# File: matitos_news/app_selenium/missing_kids.py (Python, 160 lines, 6.5 KiB)

from utils import get_webdriver, kill_process_tree
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import time
import os
from logger import get_logger
# Module-level logger used by every method below; built by the project's get_logger() helper.
logger = get_logger()
class MissingKidsFetcher():
    """Selenium-based helper for missingkids.org poster pages.

    Each public method obtains its own WebDriver through ``get_webdriver()``
    and releases it before returning, so an instance holds no state.
    """

    def __init__(self) -> None:
        pass

    @staticmethod
    def _image_sources(driver):
        """Return the non-empty ``src`` of every ``<img>`` element in the current page."""
        sources = (img.get_attribute("src") for img in driver.find_elements(By.XPATH, "//img[@src]"))
        return [src for src in sources if src]

    @staticmethod
    def _load_finished(driver):
        """Return True once the page contains a base64-encoded PNG image."""
        # If base64 image exists, loading finished
        return any("data:image/png;base64" in src for src in MissingKidsFetcher._image_sources(driver))

    @staticmethod
    def _classify_page(requested_url, current_url, title, image_urls):
        """Translate the state of a loaded poster page into a result dict.

        Args:
            requested_url: URL that was originally asked for.
            current_url: URL the browser ended up on.
            title: Title of the loaded page.
            image_urls: ``src`` values of the images found in the page.

        Returns:
            ``{"status": "invalid"}`` when the page is a 404 (by URL or by
            its 404 thumbnail), ``{"status": "duplicate", "redirection": ...}``
            when the browser was redirected elsewhere, ``{"status": "valid"}``
            when the title is a poster title, ``{"status": "unknown"}`` otherwise.
        """
        # Redirects to 404?
        if ("missingkids.org/404" in current_url) or any("thumb-404.png" in src for src in image_urls):
            return {"status": "invalid"}
        # Redirection to valid URL? -> Duplicate
        if current_url != requested_url:
            return {"status": "duplicate", "redirection": current_url}
        # Valid
        if "Have you seen this child?" in title:
            return {"status": "valid"}
        return {"status": "unknown"}

    @staticmethod
    def _release_driver(driver, service):
        """Quit the browser, then kill the driver service's process tree.

        Both arguments may be None (driver creation failed before anything
        was started); in that case nothing is done.
        """
        if driver is not None:
            try:
                driver.quit()
                time.sleep(1)
            except Exception as e:
                logger.warning("Exception while closing/quitting driver: {}".format(str(e)), exc_info=True)
        # The service may never have been created, or may never have spawned a process
        process = getattr(service, "process", None)
        if process is not None:
            kill_process_tree(process.pid)

    @staticmethod
    def _sleep_per_page():
        """Return the per-page pause in seconds (env SELENIUM_SLEEP_PER_PAGE, default 4)."""
        return float(os.getenv("SELENIUM_SLEEP_PER_PAGE", 4))

    @staticmethod
    def _log_pagination_links(driver):
        """Log the text of the links from "<<" up to (excluding) ">>".

        Purely diagnostic: any failure here is logged and swallowed so it
        cannot abort the crawl that called it.
        """
        try:
            start_print = False
            for link in driver.find_elements(By.PARTIAL_LINK_TEXT, ""):
                # Read the text once; every access is a round trip to the browser
                text = link.text
                if text == "<<":
                    start_print = True
                if text == ">>":
                    break
                if start_print:
                    logger.info(text)
        except Exception as e:
            logger.debug("Could not list pagination links: {}".format(str(e)))

    def verify_missing_kid_url(self, url):
        """Load a poster URL in a browser and report whether it is still valid.

        Args:
            url: Poster URL to check.

        Returns:
            A dict whose ``"status"`` is ``"invalid"``, ``"duplicate"`` (with
            the target under ``"redirection"``), ``"valid"`` or ``"unknown"``;
            an empty dict when any exception occurred while checking.
        """
        # Pre-bind so cleanup is safe even when get_webdriver() itself fails
        driver = service = None
        results = {}
        try:
            # Initialize
            logger.debug("Initializing driver")
            driver, service = get_webdriver()
            # Load URL
            logger.debug("Get URL: {}".format(url))
            driver.get(url)
            # Give a 404 title up to 2 seconds to show up
            try:
                WebDriverWait(driver, 2).until(EC.title_contains("404"))
                logger.debug("WebDriverWait -> title contains 404")
            except TimeoutException:
                logger.debug("WebDriverWait timeout, no 404 appeared")

            if "404" in driver.title:
                # Status invalid
                results = {"status": "invalid"}
            else:
                # Poll once per second until the poster image is there (at most 11 waits)
                remaining_checks = 10
                while (remaining_checks >= 0) and (not self._load_finished(driver)):
                    time.sleep(1)
                    remaining_checks -= 1
                image_urls = self._image_sources(driver)
                results = self._classify_page(url, driver.current_url, driver.title, image_urls)
        except Exception as e:
            logger.warning("Exception while verifying MissingKid URL {}\n{}".format(url, str(e)), exc_info=True)
            results = {}
        finally:
            # Release memory, whatever happened above
            self._release_driver(driver, service)

        logger.info("Results: {} for URL: {}".format(str(results), url))
        return results

    def get_missing_kids_urls(self, first_n_pages=-1):
        """Collect poster URLs from the paginated poster search results.

        Args:
            first_n_pages: Stop after this page number has been processed.
                Values below 1 (the default is -1) never match a page
                number, so the crawl goes on until the next page cannot be
                reached twice in a row.

        Returns:
            A set of hrefs containing ``missingkids.org/poster``; an empty
            set when an unexpected exception interrupted the crawl.
        """
        logger.info("Get MissingKids, #pages: {}".format(first_n_pages))
        # Poster URL
        url = "https://www.missingkids.org/gethelpnow/search/poster-search-results"
        # URLs
        set_urls = set()
        # Pre-bind so cleanup is safe even when get_webdriver() itself fails
        driver = service = None
        try:
            logger.debug("Initializing driver")
            driver, service = get_webdriver()
            logger.debug("Get URL: {}".format(url))
            # Go to URL
            driver.get(url)
            # Iterate
            i, continue_iterating, num_exceptions = 1, True, 0
            while continue_iterating:
                logger.debug("Processing page: {}...".format(i))
                try:
                    time.sleep(self._sleep_per_page())
                    # Fetch poster URLs
                    for elem in driver.find_elements(By.TAG_NAME, "a"):
                        href = elem.get_attribute('href')
                        if (href is not None) and ("missingkids.org/poster" in href):
                            set_urls.add(href)
                    logger.debug("#URLS: {}".format(len(set_urls)))
                    # Next page
                    elem = driver.find_element(By.LINK_TEXT, str(i+1))
                    logger.debug("Clicking: {}...".format(elem.text))
                    elem.click()
                    # Ok
                    processed_ok = True
                except Exception:
                    # +1 exception
                    num_exceptions += 1
                    processed_ok = False
                    if num_exceptions == 2:
                        # Second failure in a row on the same page: give up
                        continue_iterating = False
                    else:
                        logger.info("Exception while clicking page {}, retrying...".format(i+1))
                        self._log_pagination_links(driver)
                        time.sleep(self._sleep_per_page())

                if i == first_n_pages:
                    continue_iterating = False
                if processed_ok:
                    i += 1
                    num_exceptions = 0
        except Exception as e:
            logger.warning("Exception while fetching MissingKids {}".format(str(e)), exc_info=True)
            set_urls = set()
        finally:
            # Release memory, whatever happened above
            self._release_driver(driver, service)

        return set_urls