Refactoring fetcher

Luciano Gervasoni
2025-03-07 17:23:09 +01:00
parent 95b9766245
commit e024b200bb
17 changed files with 4904 additions and 422 deletions


@@ -1,43 +1,21 @@
import src.credentials as cred
import logging
from logging.handlers import RotatingFileHandler
logging.basicConfig(format='%(filename)s | %(levelname)s | %(asctime)s | %(message)s')
logger = logging.getLogger("news_fetcher")
logger.setLevel(logging.INFO)
from src.fetch_feed import FetchFeed
from src.fetch_parser import FetchParser
from src.fetch_search import FetchSearch
import os
os.makedirs("logs", exist_ok=True)
# To file log
fh = RotatingFileHandler(filename="logs/log_app_fetcher.log", mode="a", maxBytes=10000000, backupCount=4)
fh.setFormatter(logging.Formatter('%(levelname)s | %(asctime)s | %(message)s'))
logger.addHandler(fh)
# To file log: WARNING / ERROR
fh_ = RotatingFileHandler(filename="logs/log_app_fetcher_error.log", mode="a", maxBytes=10000000, backupCount=1)
fh_.setFormatter(logging.Formatter('%(levelname)s | %(asctime)s | %(message)s'))
fh_.setLevel(logging.WARNING)
logger.addHandler(fh_)
logger.info("Environment: {}".format(cred.ENVIRONMENT))
##################################################################################################
from src.news_feed import NewsFeed
from src.news_parsing import NewsSiteParsing
from src.news_search import NewsSearch
from src.news_missing_kids import NewsMissingKids
from src.missing_kids_fetch import MissingKidsFetch
from src.missing_kids_status import MissingKidsStatus
from src.url_status import UpdateErrorURLs
from src.fetcher_status import FetcherStatus
from src.url_status import UpdateErrorURLs
from src.db_utils import DB_Handler
import src.credentials as cred
from logging_ import get_logger
from fastapi import FastAPI, BackgroundTasks
# import requests
# from fastapi_utils.tasks import repeat_every
# import time
# time.sleep(10)
# import gc
##################################################################################################
logger = get_logger()
logger.info("Environment: {}".format(cred.ENVIRONMENT))
db_handler = DB_Handler(cred.db_connect_info, cred.redis_connect_info)
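
The refactored imports above replace the inline logging setup (the removed RotatingFileHandler block) with a shared get_logger() imported from logging_. That module is not part of this hunk; the sketch below is only a guess at its contents, reconstructed from the removed lines, so the function signature, file names and size limits are assumptions rather than the actual implementation.

# Hypothetical logging_.py, reconstructed from the removed inline setup above
import logging
import os
from logging.handlers import RotatingFileHandler

def get_logger(name="news_fetcher"):
    logger = logging.getLogger(name)
    if logger.handlers:
        # Already configured; avoid attaching duplicate handlers on re-import
        return logger
    logger.setLevel(logging.INFO)
    os.makedirs("logs", exist_ok=True)
    fmt = logging.Formatter('%(levelname)s | %(asctime)s | %(message)s')
    # Main rotating application log (values copied from the removed setup)
    fh = RotatingFileHandler(filename="logs/log_app_fetcher.log", mode="a", maxBytes=10000000, backupCount=4)
    fh.setFormatter(fmt)
    logger.addHandler(fh)
    # Separate rotating file for WARNING and above
    fh_err = RotatingFileHandler(filename="logs/log_app_fetcher_error.log", mode="a", maxBytes=10000000, backupCount=1)
    fh_err.setFormatter(fmt)
    fh_err.setLevel(logging.WARNING)
    logger.addHandler(fh_err)
    return logger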
@@ -47,49 +25,55 @@ app = FastAPI()
def hello_world():
    return {"message": "Ok"}

@app.get("/{fetch_type}")
async def fetch(background_tasks: BackgroundTasks, fetch_type: str):
@app.get("/{process_type}")
async def process(background_tasks: BackgroundTasks, process_type: str):
    # Concurrent job running
    logger.info("Triggered fetch: {}".format(fetch_type))
    logger.info("Triggered: {}".format(process_type))
    if (fetch_type == "feeds"):
        task_run = NewsFeed(db_handler).run
    elif (fetch_type == "parser"):
        task_run = NewsSiteParsing(db_handler).run
    elif (fetch_type == "fetch_missing_kids_reduced"):
        task_run = NewsMissingKids(cred.db_connect_info, cred.redis_connect_info, num_pages=4).run
    elif (fetch_type == "fetch_missing_kids_full"):
        task_run = NewsMissingKids(cred.db_connect_info, cred.redis_connect_info, num_pages=100000).run
    elif (fetch_type == "search") or (fetch_type == "search_full"):
        task_run = NewsSearch(cred.db_connect_info, cred.redis_connect_info, full=True).run
    elif (fetch_type == "search_reduced"):
        task_run = NewsSearch(cred.db_connect_info, cred.redis_connect_info, full=False).run
    elif (fetch_type == "update_missing_kids_status_reduced"):
    if (process_type == "fetch_feeds"):
        task_run = FetchFeed(db_handler).run
    elif (process_type == "fetch_parser"):
        task_run = FetchParser(db_handler).run
    elif (process_type == "search") or (process_type == "search_full"):
        task_run = FetchSearch(cred.db_connect_info, cred.redis_connect_info, full=True).run
    elif (process_type == "search_reduced"):
        task_run = FetchSearch(cred.db_connect_info, cred.redis_connect_info, full=False).run
    # Selenium based
    elif (process_type == "fetch_missing_kids_reduced"):
        task_run = MissingKidsFetch(db_handler, num_pages=4).run
    elif (process_type == "fetch_missing_kids_full"):
        task_run = MissingKidsFetch(db_handler, num_pages=100000).run
    elif (process_type == "update_missing_kids_status_reduced"):
        task_run = MissingKidsStatus(cred.db_connect_info, cred.redis_connect_info, num_urls=50).update_missing_kids_status
    elif (fetch_type == "update_missing_kids_status_full"):
    elif (process_type == "update_missing_kids_status_full"):
        task_run = MissingKidsStatus(cred.db_connect_info, cred.redis_connect_info, num_urls=None).update_missing_kids_status
    elif (fetch_type == "update_error_urls"):
    elif (process_type == "update_error_urls"):
        task_run = UpdateErrorURLs(cred.db_connect_info, cred.redis_connect_info, num_urls=100).update_error_urls_status
    elif (fetch_type == "fetch_warning_check"):
        task_run = FetcherStatus(cred.db_connect_info, cred.redis_connect_info, last_minutes_check=180).check_warning
    else:
        return {"message": "ERROR. Unknown fetcher type!"}
    # Run task
    background_tasks.add_task(task_run)
    # Return message
    return {"message": "Started fetching {}: Ok".format(fetch_type)}
    return {"message": "Started {}: Ok".format(process_type)}
##################################################################################################
"""
# TODO: Instead of background tasks!
###########################
'''
@app.on_event("startup")
def verify_db() -> None:
logger.info("Testing DB connection")
import psycopg
with psycopg.connect(cred.db_connect_info) as conn:
url_test_msg = "Num URLs: {}".format(conn.execute("SELECT COUNT(*) FROM URLS;").fetchall())
logger.info(url_test_msg)
'''
###########################
import rq
import redis
# Redis connection
redis_conn = redis.Redis(host='localhost', port=6379, db=0)
queue = rq.Queue(connection=redis_conn)
# ...
# Queue the processing task
dict_args = {"db_handler": db_handler}
queue.enqueue(task_run, **dict_args)
# https://python-rq.org/
"""