Switching to django celery for workers

This commit is contained in:
Luciano Gervasoni
2025-07-17 22:29:06 +02:00
parent 50e8666162
commit cb621c9d6b
15 changed files with 540 additions and 348 deletions

View File

@@ -1,48 +1,37 @@
import logging
import os
# Set to warning
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("newspaper").setLevel(logging.WARNING)
# Get env var
logs_directory = os.getenv("PATH_LOGS_DIRECTORY", "logs")
# Directory of logs
os.makedirs(logs_directory, exist_ok=True)
class PPIDFilter(logging.Filter):
def filter(self, record):
# record.ppid = str(os.getppid()) + " " + multiprocessing.current_process().name # os.environ.get("PPID", "*" + os.environ.get("PID"))
record.ppid = os.getppid()
return True
logging.basicConfig(format='%(filename)s | PPID=%(ppid)s | %(levelname)s | %(asctime)s | %(message)s')
logging.basicConfig(format='%(filename)s | %(levelname)s | %(asctime)s | %(message)s')
logger = logging.getLogger("fetcher")
# logger.setFormatter(logging.Formatter('%(levelname)s | PPID=%(ppid)s | %(asctime)s | %(message)s'))
logger.addFilter(PPIDFilter())
logger.setLevel(logging.INFO)
logger.setLevel(logging.DEBUG)
# To file log: DEBUG / INFO / WARNING / ERROR / CRITICAL
fh = logging.handlers.RotatingFileHandler(filename=os.path.join(logs_directory, "debug.log"), mode="a", maxBytes=10000000, backupCount=1)
fh.setFormatter(logging.Formatter('%(levelname)s | PPID=%(ppid)s | %(asctime)s | %(message)s'))
fh.addFilter(PPIDFilter())
fh.setFormatter(logging.Formatter('%(levelname)s | %(asctime)s | %(message)s'))
fh.setLevel(logging.DEBUG)
logger.addHandler(fh)
# To file log: INFO / WARNING / ERROR
fh = logging.handlers.RotatingFileHandler(filename=os.path.join(logs_directory, "info.log"), mode="a", maxBytes=10000000, backupCount=1)
fh.setFormatter(logging.Formatter('%(levelname)s | PPID=%(ppid)s | %(asctime)s | %(message)s'))
fh.addFilter(PPIDFilter())
fh.setFormatter(logging.Formatter('%(levelname)s | %(asctime)s | %(message)s'))
fh.setLevel(logging.INFO)
logger.addHandler(fh)
# To file log: WARNING / ERROR / CRITICAL
fh = logging.handlers.RotatingFileHandler(filename=os.path.join(logs_directory, "warning.log"), mode="a", maxBytes=10000000, backupCount=1)
fh.setFormatter(logging.Formatter('%(levelname)s | PPID=%(ppid)s | %(asctime)s | %(message)s'))
fh.addFilter(PPIDFilter())
fh.setFormatter(logging.Formatter('%(levelname)s | %(asctime)s | %(message)s'))
fh.setLevel(logging.WARNING)
logger.addHandler(fh)
# Set to warning
logging.getLogger("urllib3").setLevel(logging.WARNING)
logging.getLogger("newspaper").setLevel(logging.WARNING)
def get_logger():
return logger

View File

@@ -1,4 +1,4 @@
from scheduler import job
from celery import shared_task
from .src.fetch_feed import FetchFeeds
from .src.fetch_parser import FetchParser
@@ -11,63 +11,64 @@ from .src.publisher import Publisher
from .src.logger import get_logger
logger = get_logger()
@job('default')
@shared_task(queue='default')
def fetch_feeds():
task = "Fetch Feeds"
logger.info("Task triggered: {}".format(task))
FetchFeeds().run()
logger.info("Task completed: {}".format(task))
@job('default')
@shared_task(queue='default')
def fetch_parser():
task = "Fetch Parser"
logger.info("Task triggered: {}".format(task))
FetchParser().run()
logger.info("Task completed: {}".format(task))
@job('default')
@shared_task(queue='default')
def fetch_search():
task = "Fetch Search"
logger.info("Task triggered: {}".format(task))
FetchSearcher().run()
logger.info("Task completed: {}".format(task))
@job('default')
@shared_task(queue='low')
def fetch_selenium_search():
task = "Fetch Selenium search"
logger.info("Task triggered: {}".format(task))
FetchSeleniumSourceSearch().run()
logger.info("Task completed: {}".format(task))
@job('default')
@shared_task(queue='low')
def fetch_missing_kids(number_pages=5):
task = "Fetch MissingKids"
logger.info("Task triggered: {}".format(task))
FetchMissingKids().run(number_pages)
logger.info("Task completed: {}".format(task))
@job('default')
@shared_task(queue='default')
def process_raw_urls(batch_size=100):
task = "Process raw URLs"
logger.info("Task triggered: {}".format(task))
DB_Handler().process_raw_urls(batch_size=batch_size)
logger.info("Task completed: {}".format(task))
@job('default')
@shared_task(queue='default')
def process_error_urls(batch_size=50):
task = "Process error URLs"
logger.info("Task triggered: {}".format(task))
DB_Handler().process_error_urls(batch_size=batch_size)
logger.info("Task completed: {}".format(task))
@job('default')
@shared_task(queue='low')
def process_missing_kids_urls(batch_size=None, process_status_only=None):
task = "Process Missing Kids URLs - batch_size={} process_status_only={}".format(batch_size, process_status_only)
logger.info("Task triggered: {}".format(task))
DB_Handler().process_missing_kids_urls(batch_size=batch_size, process_status_only=process_status_only)
logger.info("Task completed: {}".format(task))
@job('default')
@shared_task(queue='default')
def clean_old_url_content(older_than_days=14):
task = "Clean old URL content"
logger.info("Task triggered: {}".format(task))
@@ -75,6 +76,7 @@ def clean_old_url_content(older_than_days=14):
logger.info("Task completed: {}".format(task))
'''
@job('default')
def background_task(process_type: str):
logger.info("Task triggered: {}".format(process_type))
@@ -143,3 +145,4 @@ def background_task(process_type: str):
logger.info("Task completed: {}".format(process_type))
except Exception as e:
logger.error(e)
'''

View File

@@ -7,8 +7,6 @@ urlpatterns = [
path('logs/database', views.log_db, name='log_db'),
path('logs/<str:log_type>', views.logs, name='logs'),
#
path('task/<str:task>', views.trigger_task, name='trigger_task'),
#
path('urls/charts/', views.charts, name='charts'),
path('urls-by-fetch-date/', views.urls_by_fetch_date, name='urls_by_fetch_date'),
path('urls-per-status/', views.urls_per_status, name='urls_per_status'),

View File

@@ -1,4 +1,4 @@
from .views_base import link_list, logs, log_db, trigger_task
from .views_base import link_list, logs, log_db #, trigger_task,
from django.core.paginator import Paginator
from django.shortcuts import render, get_object_or_404

View File

@@ -1,15 +1,17 @@
import os
from .tasks import background_task
from django.http import JsonResponse, HttpResponse
from django.db import connection
####################################################################################################
"""
### from .tasks import background_task
def trigger_task(request, task):
# Enqueue function in "default" queue
background_task.delay(task)
return JsonResponse({"message": "Task has been enqueued!", "task": task})
"""
####################################################################################################
def link_list(request):
# Base URL path
app_url = request.build_absolute_uri()
@@ -19,8 +21,8 @@ def link_list(request):
# List of links
list_links = \
[ os.path.join(app_url, "admin"), os.path.join(app_url, "urls") ] + \
[ os.path.join(app_url, "logs", log_type) for log_type in ["database", "debug", "info", "warning"] ] + \
[ os.path.join(app_url, "task", l) for l in links_fetch + links_process ]
[ os.path.join(app_url, "logs", log_type) for log_type in ["database", "debug", "info", "warning"] ] #+ \
#[ os.path.join(app_url, "task", l) for l in links_fetch + links_process ]
# Links tuple
links = [(l, l) for l in list_links]
@@ -32,6 +34,7 @@ def link_list(request):
return HttpResponse(html)
####################################################################################################
def logs(request, log_type):
# Capture output: python manage.py rqstats