diff --git a/app_urls/fetcher/src/db_utils.py b/app_urls/fetcher/src/db_utils.py
index 18a4618..5ef6fa1 100644
--- a/app_urls/fetcher/src/db_utils.py
+++ b/app_urls/fetcher/src/db_utils.py
@@ -100,7 +100,7 @@ class DB_Handler():
             # URLs duplciate association
             UrlsDuplicate.objects.get_or_create(id_url_canonical=obj_url_canonical, id_url_duplicated=obj_url)
 
-    def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error, paywall_bypass=False):
+    def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error, paywall_bypass=False, request_timeout=15):
         ##########################################################################
         # URL pattern: missingkids.org/poster OR missingkids.org/new-poster
         if ("missingkids.org" in obj_url.url) and ("poster" in obj_url.url):
@@ -147,8 +147,7 @@ class DB_Handler():
 
         try:
             # Extract URL content
-            dict_url_data = process_url(obj_url.url, paywall_bypass)
-            logger.debug("Processing raw URL EXTRACT URL CONTENT OK: {}".format(obj_url.url))
+            dict_url_data = process_url(obj_url.url, paywall_bypass, request_timeout)
         except Exception as e:
             if (raise_exception_on_error):
                 # Simply raise exception, handled in a different way
@@ -238,12 +237,10 @@ class DB_Handler():
 
             # Per URL
             for obj_url in raw_urls:
-                logger.debug("Processing raw URL: {}".format(obj_url.url))
                 # Override status if pattern matching?
                 status_pattern_match = _get_status_pattern_matching(obj_url.url, list_pattern_status_tuple)
                 # Process URL
                 self._process_single_url(obj_url, status_pattern_match, raise_exception_on_error=False)
-                logger.debug("Processing raw URL OK: {}".format(obj_url.url))
 
            logger.info("Updated #{} raw URLs".format(len(raw_urls)))
        except Exception as e:
diff --git a/app_urls/fetcher/src/fetch_utils_url_processor.py b/app_urls/fetcher/src/fetch_utils_url_processor.py
index 0fef5a3..5ae6d9e 100644
--- a/app_urls/fetcher/src/fetch_utils_url_processor.py
+++ b/app_urls/fetcher/src/fetch_utils_url_processor.py
@@ -39,7 +39,7 @@ def url_host_slowdown(url, url_host_slowdown_seconds):
     # About to process URL host, cache time
     cache.set("process_{}".format(url_host).encode("utf-8"), time.time(), timeout=60*5) # Expire after 5 minutes
 
-def process_url(url, paywall_bypass=False):
+def process_url(url, paywall_bypass=False, request_timeout=15):
 
     logger.debug("Processing raw URL 1: {}".format(url))
     if (paywall_bypass):
@@ -59,7 +59,7 @@
     # Process
     if ("foxnews.com" in url_of_interest) or ("zerohedge" in url_of_interest):
         # Request
-        r = requests.get(url, headers={"User-Agent": user_agent}, timeout=15)
+        r = requests.get(url, headers={"User-Agent": user_agent}, timeout=request_timeout)
         # Raise for error code
         r.raise_for_status()
         # Parse
@@ -68,7 +68,7 @@
         # Config: Fake user agent
         config = newspaper.configuration.Configuration()
         config.headers = {'User-Agent': user_agent}
-        config.request_timeout = 15 # timeout in seconds
+        config.request_timeout = request_timeout
 
         # Default mode
         article = newspaper.article(url_of_interest, config=config)
@@ -110,7 +110,7 @@
 
         # Try simple request, valid response but couldn't parse article? e.g. getting blocked? -> unknown
         time.sleep(0.25)
-        r = requests.get(url_of_interest, timeout=15)
+        r = requests.get(url_of_interest, timeout=request_timeout)
         if (r.status_code == 200):
             return {"override_status": "unknown"}
         else:
diff --git a/app_urls/fetcher/tasks.py b/app_urls/fetcher/tasks.py
index 9249d25..606d897 100644
--- a/app_urls/fetcher/tasks.py
+++ b/app_urls/fetcher/tasks.py
@@ -13,13 +13,30 @@ from .src.logger import get_logger
 
 logger = get_logger()
 
+@shared_task(queue='light')
+def process_raw_urls(batch_size=100):
+    task = "Process raw URLs"
+    logger.info("Task triggered: {}".format(task))
+    DB_Handler().process_raw_urls(batch_size=batch_size)
+    logger.info("Task completed: {}".format(task))
+
 @shared_task(queue='default')
+def process_error_urls(batch_size=50):
+    task = "Process error URLs"
+    logger.info("Task triggered: {}".format(task))
+    DB_Handler().process_error_urls(batch_size=batch_size)
+    logger.info("Task completed: {}".format(task))
+
+
+
+@shared_task(queue='light')
 def fetch_feeds():
     task = "Fetch Feeds"
     logger.info("Task triggered: {}".format(task))
     FetchFeeds().run()
     logger.info("Task completed: {}".format(task))
 
+
 @shared_task(queue='default')
 def fetch_parser():
     task = "Fetch Parser"
@@ -34,41 +51,31 @@ def fetch_search():
     FetchSearcher().run()
     logger.info("Task completed: {}".format(task))
 
-@shared_task(queue='low')
+
+
+@shared_task(queue='heavy')
 def fetch_selenium_search():
     task = "Fetch Selenium search"
     logger.info("Task triggered: {}".format(task))
     FetchSeleniumSourceSearch().run()
     logger.info("Task completed: {}".format(task))
 
-@shared_task(queue='low')
+@shared_task(queue='heavy')
 def fetch_missing_kids(number_pages=5):
     task = "Fetch MissingKids"
     logger.info("Task triggered: {}".format(task))
     FetchMissingKids().run(number_pages)
     logger.info("Task completed: {}".format(task))
 
-@shared_task(queue='default')
-def process_raw_urls(batch_size=100):
-    task = "Process raw URLs"
-    logger.info("Task triggered: {}".format(task))
-    DB_Handler().process_raw_urls(batch_size=batch_size)
-    logger.info("Task completed: {}".format(task))
-
-@shared_task(queue='default')
-def process_error_urls(batch_size=50):
-    task = "Process error URLs"
-    logger.info("Task triggered: {}".format(task))
-    DB_Handler().process_error_urls(batch_size=batch_size)
-    logger.info("Task completed: {}".format(task))
-
-@shared_task(queue='low')
+@shared_task(queue='heavy')
 def process_missing_kids_urls(batch_size=None, process_status_only=None):
     task = "Process Missing Kids URLs - batch_size={} process_status_only={}".format(batch_size, process_status_only)
     logger.info("Task triggered: {}".format(task))
     DB_Handler().process_missing_kids_urls(batch_size=batch_size, process_status_only=process_status_only)
     logger.info("Task completed: {}".format(task))
 
+
+
 @shared_task(queue='default')
 def clean_old_url_content(older_than_days=14):
     task = "Clean old URL content"
@@ -76,7 +83,7 @@
     DB_Handler().clean_old_url_content(older_than_days=older_than_days)
     logger.info("Task completed: {}".format(task))
 
-@shared_task(queue='default')
+@shared_task(queue='light')
 def notify_status():
     task = "Notify status"
     logger.info("Task triggered: {}".format(task))
diff --git a/app_urls/supervisord.conf b/app_urls/supervisord.conf
index af131de..158d25b 100644
--- a/app_urls/supervisord.conf
+++ b/app_urls/supervisord.conf
@@ -40,14 +40,27 @@ redirect_stderr=true
 stdout_logfile_maxbytes=20MB
 stdout_logfile_backups=1
 
-[program:worker_low]
-command=celery -A core worker -l info --logfile=/opt/logs/worker_low.log --concurrency=1 -Q low -n low
+[program:worker_light]
+command=celery -A core worker -l info --logfile=/opt/logs/worker_light.log --concurrency=1 -Q light -n light
 directory=/opt/app
 autostart=true
 autorestart=true
 ; Unified log file
-stdout_logfile=/opt/logs/worker_low.log
-stderr_logfile=/opt/logs/worker_low.log
+stdout_logfile=/opt/logs/worker_light.log
+stderr_logfile=/opt/logs/worker_light.log
+redirect_stderr=true
+; Rotate when file reaches max size
+stdout_logfile_maxbytes=20MB
+stdout_logfile_backups=1
+
+[program:worker_heavy]
+command=celery -A core worker -l info --logfile=/opt/logs/worker_heavy.log --concurrency=1 -Q heavy -n heavy
+directory=/opt/app
+autostart=true
+autorestart=true
+; Unified log file
+stdout_logfile=/opt/logs/worker_heavy.log
+stderr_logfile=/opt/logs/worker_heavy.log
 redirect_stderr=true
 ; Rotate when file reaches max size
 stdout_logfile_maxbytes=20MB