Selenium-based fetch of different sources

Luciano Gervasoni
2025-07-08 18:18:26 +02:00
parent f729bd1cb2
commit 0cf61026e8
10 changed files with 235 additions and 31 deletions

View File

@@ -1,6 +1,7 @@
from fastapi import FastAPI
from pydantic import BaseModel
from missing_kids import MissingKidsFetcher
from search import SearchFetcher
from logger import get_logger
logger = get_logger()
@@ -12,17 +13,41 @@ def get_missing_kids(pages: int = -1):
logger.info("Get missing kids, #pages={}".format(pages))
res = {"list_urls": MissingKidsFetcher().get_missing_kids_urls(first_n_pages=pages)}
except Exception as e:
logger.warning("Exception: {}".format(str(e)), exc_info=True)
res = {}
return res
class Body(BaseModel):
class BodyVerifyMissingKid(BaseModel):
url: str
@app.post("/verify_missing_kid/")
def get_missing_kids(data: Body):
def verify_missing_kid(data: BodyVerifyMissingKid):
try:
logger.info("Verify missing kid, URL={}".format(data.url))
res = MissingKidsFetcher().verify_missing_kid_url(data.url)
except Exception as e:
logger.warning("Exception: {}".format(str(e)), exc_info=True)
res = {}
return res
class BodyFetchSearch(BaseModel):
search: str
@app.post("/fetch_search/")
def fetch_search(data: BodyFetchSearch):
try:
# Initialize the fetcher and the results container
search_fetcher, results = SearchFetcher(), {}
# Query every available source
for source in search_fetcher.get_available_sources():
logger.info("Fetching search results, source={} search={}".format(source, data.search))
# Fetch the result URLs for this source, reusing the fetcher created above
results[source] = search_fetcher.search(source, data.search)
# Drop sources that returned no results
if len(results[source]) == 0:
results.pop(source)
except Exception as e:
logger.warning("Exception: {}".format(str(e)), exc_info=True)
results = {}
return results
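For context, a minimal sketch of how the new /fetch_search/ endpoint could be exercised once the service is up; the base URL http://localhost:8000 and the use of the requests library are assumptions, not part of this commit.

# Hypothetical client call for the /fetch_search/ endpoint (base URL assumed).
import requests

resp = requests.post(
    "http://localhost:8000/fetch_search/",
    json={"search": "child abuse"},
    timeout=120,  # Selenium-backed searches can take a while
)
resp.raise_for_status()
for source, urls in resp.json().items():
    print("{}: {} URLs".format(source, len(urls)))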

View File

@@ -8,10 +8,10 @@ logs_directory = os.getenv("PATH_LOGS_DIRECTORY", "logs")
os.makedirs(logs_directory, exist_ok=True)
logging.basicConfig(format='%(filename)s | %(levelname)s | %(asctime)s | %(message)s')
logger = logging.getLogger("app_selenium")
logger.setLevel(logging.DEBUG)
logger = logging.getLogger("selenium")
logger.setLevel(logging.INFO)
# To file log: INFO / WARNING / ERROR / CRITICAL
# To file log: DEBUG / INFO / WARNING / ERROR / CRITICAL
fh = logging.handlers.RotatingFileHandler(filename=os.path.join(logs_directory, "debug.log"), mode="a", maxBytes=10000000, backupCount=1)
fh.setFormatter(logging.Formatter('%(levelname)s | %(asctime)s | %(message)s'))
fh.setLevel(logging.DEBUG)

View File

@@ -1,7 +1,5 @@
from selenium import webdriver
from utils import get_webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
@@ -11,16 +9,6 @@ import os
from logger import get_logger
logger = get_logger()
def get_webdriver():
options = Options()
options.add_argument('--headless') # Optional
options.binary_location = '/opt/firefox/firefox'
service = Service('/usr/local/bin/geckodriver')
driver = webdriver.Firefox(options=options, service=service)
return driver
class MissingKidsFetcher():
def __init__(self) -> None:
pass

106 app_selenium/search.py Normal file
View File

@@ -0,0 +1,106 @@
from utils import get_webdriver
from selenium.webdriver.common.by import By
from urllib.parse import quote
import time
from logger import get_logger
logger = get_logger()
class SearchFetcher():
def __init__(self):
pass
def get_available_sources(self):
return ["foxnews", "breitbart", "zerohedge"]
def search(self, source, search="child abuse"):
try:
if (source == "foxnews"):
return self._search_foxnews(search)
elif (source == "breitbart"):
return self._search_breitbart(search)
elif (source == "zerohedge"):
return self._search_zerohedge(search)
else:
logger.warning("Search not implemented for source={} search={}".format(source, search))
return []
except Exception as e:
logger.warning("Error searching for source={} search={}".format(source, search))
return []
def _search_foxnews(self, search):
url_host = "foxnews.com"
# URL search
url_unquoted = "https://www.foxnews.com/search-results/search#q={}".format(search)
url = quote(url_unquoted, safe=":/?=&#")
# Initialize
driver = get_webdriver()
# Load URL
driver.get(url)
time.sleep(2)
# Find the element with class "page"
page_element = driver.find_element(By.CLASS_NAME, "page")
# Find the articles
articles = page_element.find_elements(By.CLASS_NAME, "article")
# Extract URLs
urls = [ art.find_element(By.CLASS_NAME, "m").find_element(By.TAG_NAME, "a").get_attribute("href") for art in articles ]
# Close the browser session to avoid leaking Firefox processes
driver.quit()
# Remove duplicates, remove None
urls = [u for u in set(urls) if u is not None]
# Filter by URL host
urls = [u for u in urls if url_host in u]
return urls
def _search_breitbart(self, search):
url_host = "breitbart.com"
# URL search
url_unquoted = "https://www.breitbart.com/search/?s={}".format(search.replace(" ", "+"))
url = quote(url_unquoted, safe=":/?=&#")
# Initialize
driver = get_webdriver()
# Load URL
driver.get(url)
time.sleep(4)
# Find the element with class "page"
page_element = driver.find_element(By.CLASS_NAME, "gsc-expansionArea")
# Find the articles
articles = page_element.find_elements(By.CLASS_NAME, "gs-title")
# Extract URLs
urls = [ art.get_attribute("href") for art in articles ]
# Close the browser session to avoid leaking Firefox processes
driver.quit()
# Remove duplicates, remove None
urls = [u for u in set(urls) if u is not None]
# Filter by URL host
urls = [u for u in urls if url_host in u]
return urls
def _search_zerohedge(self, search):
url_host = "zerohedge.com"
# URL search
url_unquoted = "https://www.zerohedge.com/search-content?qTitleBody={}".format(search.replace(" ", "+"))
url = quote(url_unquoted, safe=":/?=&#")
# Initialize
driver = get_webdriver()
# Load URL
driver.get(url)
time.sleep(2)
# Find the element with class "page"
page_element = driver.find_element(By.CLASS_NAME, "main-content")
# Find the articles
articles = page_element.find_elements(By.TAG_NAME, "a")
# Extract URLs
urls = [ art.get_attribute("href") for art in articles ]
# Close the browser session to avoid leaking Firefox processes
driver.quit()
# Remove duplicates, remove None
urls = [u for u in set(urls) if u is not None]
# Filter by URL host
urls = [u for u in urls if url_host in u]
return urls
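As a usage note, a small sketch of driving SearchFetcher directly, outside FastAPI; it assumes Firefox and geckodriver are installed at the paths configured in utils.get_webdriver.

# Sketch: querying every available source directly, without the HTTP layer.
from search import SearchFetcher

fetcher = SearchFetcher()
for source in fetcher.get_available_sources():
    urls = fetcher.search(source, "child abuse")
    print("{}: {} result URLs".format(source, len(urls)))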

13 app_selenium/utils.py Normal file
View File

@@ -0,0 +1,13 @@
from selenium import webdriver
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
def get_webdriver():
options = Options()
options.add_argument('--headless') # Run Firefox without a visible UI
options.binary_location = '/opt/firefox/firefox'
service = Service('/usr/local/bin/geckodriver')
driver = webdriver.Firefox(options=options, service=service)
return driver
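A possible follow-up, not part of this commit: wrapping get_webdriver in a context manager so the Firefox process is released even when a selector lookup raises. A sketch under that assumption:

# Sketch: hypothetical context-manager wrapper around get_webdriver;
# it guarantees driver.quit() on both the success and the error path.
from contextlib import contextmanager

@contextmanager
def managed_webdriver():
    driver = get_webdriver()
    try:
        yield driver
    finally:
        driver.quit()

# Possible usage inside a fetcher method:
# with managed_webdriver() as driver:
#     driver.get(url)
#     ...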