Workers light,default,heavy

Author: Luciano Gervasoni
Date: 2025-09-08 12:34:47 +02:00
parent 892fb984d1
commit 2ed86e31ec
4 changed files with 48 additions and 31 deletions

View File

@@ -100,7 +100,7 @@ class DB_Handler():
# URLs duplicate association
UrlsDuplicate.objects.get_or_create(id_url_canonical=obj_url_canonical, id_url_duplicated=obj_url)
-def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error, paywall_bypass=False):
+def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error, paywall_bypass=False, request_timeout=15):
##########################################################################
# URL pattern: missingkids.org/poster OR missingkids.org/new-poster
if ("missingkids.org" in obj_url.url) and ("poster" in obj_url.url):
@@ -147,8 +147,7 @@ class DB_Handler():
try:
# Extract URL content
-dict_url_data = process_url(obj_url.url, paywall_bypass)
-logger.debug("Processing raw URL EXTRACT URL CONTENT OK: {}".format(obj_url.url))
+dict_url_data = process_url(obj_url.url, paywall_bypass, request_timeout)
except Exception as e:
if (raise_exception_on_error):
# Simply raise exception, handled in a different way
@@ -238,12 +237,10 @@ class DB_Handler():
# Per URL
for obj_url in raw_urls:
logger.debug("Processing raw URL: {}".format(obj_url.url))
# Override status if pattern matching?
status_pattern_match = _get_status_pattern_matching(obj_url.url, list_pattern_status_tuple)
# Process URL
self._process_single_url(obj_url, status_pattern_match, raise_exception_on_error=False)
logger.debug("Processing raw URL OK: {}".format(obj_url.url))
logger.info("Updated #{} raw URLs".format(len(raw_urls)))
except Exception as e:
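The new request_timeout argument defaults to 15 seconds, so the existing caller in process_raw_urls above keeps its old behaviour; it only matters when a caller overrides it. A minimal sketch of how a batch method could forward a longer timeout down to _process_single_url for slow hosts; the helper name process_slow_urls and the 60-second value are hypothetical and not part of this commit, and handler is assumed to be a DB_Handler instance in the same module as _get_status_pattern_matching:

# Hypothetical helper (not part of this commit): forward a longer per-request
# timeout to _process_single_url while reusing the same pattern matching.
def process_slow_urls(handler, raw_urls, list_pattern_status_tuple, request_timeout=60):
    for obj_url in raw_urls:
        # Same status-override logic as process_raw_urls
        status_pattern_match = _get_status_pattern_matching(obj_url.url, list_pattern_status_tuple)
        # request_timeout is passed through to process_url via _process_single_url
        handler._process_single_url(
            obj_url,
            status_pattern_match,
            raise_exception_on_error=False,
            request_timeout=request_timeout,
        )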

View File

@@ -39,7 +39,7 @@ def url_host_slowdown(url, url_host_slowdown_seconds):
# About to process URL host, cache time
cache.set("process_{}".format(url_host).encode("utf-8"), time.time(), timeout=60*5) # Expire after 5 minutes
-def process_url(url, paywall_bypass=False):
+def process_url(url, paywall_bypass=False, request_timeout=15):
logger.debug("Processing raw URL 1: {}".format(url))
if (paywall_bypass):
@@ -59,7 +59,7 @@ def process_url(url, paywall_bypass=False):
# Process
if ("foxnews.com" in url_of_interest) or ("zerohedge" in url_of_interest):
# Request
-r = requests.get(url, headers={"User-Agent": user_agent}, timeout=15)
+r = requests.get(url, headers={"User-Agent": user_agent}, timeout=request_timeout)
# Raise for error code
r.raise_for_status()
# Parse
@@ -68,7 +68,7 @@ def process_url(url, paywall_bypass=False):
# Config: Fake user agent
config = newspaper.configuration.Configuration()
config.headers = {'User-Agent': user_agent}
-config.request_timeout = 15 # timeout in seconds
+config.request_timeout = request_timeout
# Default mode
article = newspaper.article(url_of_interest, config=config)
@@ -110,7 +110,7 @@ def process_url(url, paywall_bypass=False):
# Try simple request, valid response but couldn't parse article? e.g. getting blocked? -> unknown
time.sleep(0.25)
-r = requests.get(url_of_interest, timeout=15)
+r = requests.get(url_of_interest, timeout=request_timeout)
if (r.status_code == 200):
return {"override_status": "unknown"}
else:
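With request_timeout now reaching both the plain requests.get calls and the newspaper Configuration, a caller can trade latency for reliability per URL. A minimal usage sketch, assuming process_url is imported from the same module and that a timeout surfaces as requests.exceptions.Timeout (newspaper may instead wrap it in its own exception type); the wrapper below is illustrative, not part of this commit:

import requests

# Hypothetical wrapper (not part of this commit): retry a URL once with a
# longer timeout before giving up.
def process_url_with_retry(url, paywall_bypass=False, timeouts=(15, 60)):
    last_error = None
    for request_timeout in timeouts:
        try:
            return process_url(url, paywall_bypass, request_timeout)
        except requests.exceptions.Timeout as e:
            last_error = e  # retry with the next, longer timeout
    raise last_error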

View File

@@ -13,13 +13,30 @@ from .src.logger import get_logger
logger = get_logger()
+@shared_task(queue='light')
+def process_raw_urls(batch_size=100):
+task = "Process raw URLs"
+logger.info("Task triggered: {}".format(task))
+DB_Handler().process_raw_urls(batch_size=batch_size)
+logger.info("Task completed: {}".format(task))
+@shared_task(queue='default')
+def process_error_urls(batch_size=50):
+task = "Process error URLs"
+logger.info("Task triggered: {}".format(task))
+DB_Handler().process_error_urls(batch_size=batch_size)
+logger.info("Task completed: {}".format(task))
@shared_task(queue='light')
def fetch_feeds():
task = "Fetch Feeds"
logger.info("Task triggered: {}".format(task))
FetchFeeds().run()
logger.info("Task completed: {}".format(task))
@shared_task(queue='default')
def fetch_parser():
task = "Fetch Parser"
@@ -34,41 +51,31 @@ def fetch_search():
FetchSearcher().run()
logger.info("Task completed: {}".format(task))
-@shared_task(queue='low')
+@shared_task(queue='heavy')
def fetch_selenium_search():
task = "Fetch Selenium search"
logger.info("Task triggered: {}".format(task))
FetchSeleniumSourceSearch().run()
logger.info("Task completed: {}".format(task))
-@shared_task(queue='low')
+@shared_task(queue='heavy')
def fetch_missing_kids(number_pages=5):
task = "Fetch MissingKids"
logger.info("Task triggered: {}".format(task))
FetchMissingKids().run(number_pages)
logger.info("Task completed: {}".format(task))
-@shared_task(queue='default')
-def process_raw_urls(batch_size=100):
-task = "Process raw URLs"
-logger.info("Task triggered: {}".format(task))
-DB_Handler().process_raw_urls(batch_size=batch_size)
-logger.info("Task completed: {}".format(task))
-@shared_task(queue='default')
-def process_error_urls(batch_size=50):
-task = "Process error URLs"
-logger.info("Task triggered: {}".format(task))
-DB_Handler().process_error_urls(batch_size=batch_size)
-logger.info("Task completed: {}".format(task))
-@shared_task(queue='low')
+@shared_task(queue='heavy')
def process_missing_kids_urls(batch_size=None, process_status_only=None):
task = "Process Missing Kids URLs - batch_size={} process_status_only={}".format(batch_size, process_status_only)
logger.info("Task triggered: {}".format(task))
DB_Handler().process_missing_kids_urls(batch_size=batch_size, process_status_only=process_status_only)
logger.info("Task completed: {}".format(task))
@shared_task(queue='default')
def clean_old_url_content(older_than_days=14):
task = "Clean old URL content"
@@ -76,7 +83,7 @@ def clean_old_url_content(older_than_days=14):
DB_Handler().clean_old_url_content(older_than_days=older_than_days)
logger.info("Task completed: {}".format(task))
-@shared_task(queue='default')
+@shared_task(queue='light')
def notify_status():
task = "Notify status"
logger.info("Task triggered: {}".format(task))

View File

@@ -40,14 +40,27 @@ redirect_stderr=true
stdout_logfile_maxbytes=20MB
stdout_logfile_backups=1
-[program:worker_low]
-command=celery -A core worker -l info --logfile=/opt/logs/worker_low.log --concurrency=1 -Q low -n low
+[program:worker_light]
+command=celery -A core worker -l info --logfile=/opt/logs/worker_light.log --concurrency=1 -Q light -n light
directory=/opt/app
autostart=true
autorestart=true
; Unified log file
-stdout_logfile=/opt/logs/worker_low.log
-stderr_logfile=/opt/logs/worker_low.log
+stdout_logfile=/opt/logs/worker_light.log
+stderr_logfile=/opt/logs/worker_light.log
redirect_stderr=true
; Rotate when file reaches max size
stdout_logfile_maxbytes=20MB
stdout_logfile_backups=1
+[program:worker_heavy]
+command=celery -A core worker -l info --logfile=/opt/logs/worker_heavy.log --concurrency=1 -Q heavy -n heavy
+directory=/opt/app
+autostart=true
+autorestart=true
+; Unified log file
+stdout_logfile=/opt/logs/worker_heavy.log
+stderr_logfile=/opt/logs/worker_heavy.log
+redirect_stderr=true
+; Rotate when file reaches max size
+stdout_logfile_maxbytes=20MB
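With one single-concurrency worker per queue, a misnamed -Q flag would leave a queue without a consumer and its tasks would sit unprocessed. A small check, assuming the Celery app is importable as core.celery.app (matching the celery -A core commands above), that each queue used by the tasks has at least one consumer:

# Hypothetical health check (not part of this commit).
from core.celery import app

expected = {"light", "default", "heavy"}
active = app.control.inspect().active_queues() or {}
consumed = {q["name"] for queues in active.values() for q in queues}

missing = expected - consumed
if missing:
    print("No worker is consuming: {}".format(", ".join(sorted(missing))))
else:
    print("All queues have a consumer: {}".format(", ".join(sorted(consumed))))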