Error URLs processed with marreta. Env vars updated

Luciano Gervasoni
2025-04-16 11:50:55 +02:00
parent b876f6d720
commit 148ec72658
6 changed files with 44 additions and 26 deletions

.env
@@ -35,6 +35,8 @@ FETCHER_URL_HOST_SLEEP=1.5
 FETCHER_GOOGLE_GENERAL_PAGE_ITER_SLEEP=5
 FETCHER_BETWEEN_SEARCHES_SLEEP=1
 FETCHER_LANGUAGE_DETECTION_MIN_CHAR=100
+FETCHER_INSERT_URL_CACHE_TIME=86400
+FETCHER_ERROR_URL_CACHE_TIME=172800

 # Selenium
 SELENIUM_ENDPOINT=http://fetcher_app_selenium:80
@@ -43,4 +45,8 @@ ENDPOINT_OLLAMA=https://ollamamodel.matitos.org
 # APP: Selenium
 ARCH=amd64 # arm64, amd64
 SELENIUM_SLEEP_PER_PAGE=4
 PATH_LOGS_DIRECTORY=/opt/logs
+
+# Deploy resources per App
+DEPLOY_CPUS=2
+DEPLOY_RAM=4G
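These two values feed the env-driven cache timeouts used by the URL handler below. A minimal sketch of how they are read back, using the same os.getenv pattern this commit introduces (the fallback defaults mirror the .env values above and are otherwise an assumption):

import os

# Cache window for freshly inserted raw URLs (default: 1 day) and for
# URLs that errored during processing (default: 2 days).
insert_cache_seconds = int(os.getenv("FETCHER_INSERT_URL_CACHE_TIME", 86400))
error_cache_seconds = int(os.getenv("FETCHER_ERROR_URL_CACHE_TIME", 172800))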

.gitignore (vendored)
@@ -2,4 +2,5 @@ __pycache__/
 *.pyc
 **/credentials.py
 logs/
 postgres/
+docker_data/


@@ -6,16 +6,14 @@ from django.utils import timezone
 from datetime import timedelta
 from .url_processor import process_url, get_with_protocol
 import re
+import os
 import traceback
 from .logger import get_logger
 logger = get_logger()

 class DB_Handler():
     def __init__(self):
-        # Inserting raw URL, cache time: 1 day
-        self._cache_timeout_insert_url = 86400
-        # Processing error URL, cache time: 2 days
-        self._cache_timeout_error_url = 86400*2
+        pass

     def insert_raw_urls(self, urls, obj_source, obj_search):
         try:
@@ -75,8 +73,8 @@ class DB_Handler():
             # Insert or update cache
             for url in urls_clean:
-                cache.set("insert_{}".format(url), True, timeout=self._cache_timeout_insert_url)
-                cache.set("insert_{}{}{}".format(url, obj_source.source, obj_search.search), True, timeout=self._cache_timeout_insert_url)
+                cache.set("insert_{}".format(url), True, timeout=int(os.getenv("FETCHER_INSERT_URL_CACHE_TIME", 86400)))
+                cache.set("insert_{}{}{}".format(url, obj_source.source, obj_search.search), True, timeout=int(os.getenv("FETCHER_INSERT_URL_CACHE_TIME", 86400)))

             logger.info("Inserted #{} raw URLs, Source-Search {} - {}".format(len(urls_to_insert), obj_source.source, obj_search.search))
@@ -84,7 +82,7 @@ class DB_Handler():
             logger.warning("Exception inserting raw URLs: {}\n{}".format(e, traceback.format_exc()))

-    def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error):
+    def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error, override_url=None):
         def set_status(obj_url, status):
             # Update status if setting a new value
@@ -102,8 +100,13 @@ class DB_Handler():
             return

         try:
+            # Override URL for request?
+            if (override_url is not None):
+                url_of_interest = override_url
+            else:
+                url_of_interest = obj_url.url
             # Extract URL content
-            dict_url_data = process_url(obj_url.url)
+            dict_url_data = process_url(url_of_interest)
         except Exception as e:
             if (raise_exception_on_error):
                 # Simply raise exception, handled in a different way
@@ -232,11 +235,12 @@ class DB_Handler():
                 try:
                     # Process URL
-                    self._process_single_url(obj_url, status_pattern_match=None, raise_exception_on_error=True)
+                    override_url = "https://marreta.pcdomanual.com/p/{}".format(obj_url.url)
+                    self._process_single_url(obj_url, status_pattern_match=None, raise_exception_on_error=True, override_url=override_url)
                     num_urls_processed += 1
                 except Exception as e:
                     # Error, cache to avoid re-processing for X time
-                    cache.set("error_{}".format(obj_url.id), True, timeout=self._cache_timeout_insert_url)
+                    cache.set("error_{}".format(obj_url.id), True, timeout=int(os.getenv("FETCHER_ERROR_URL_CACHE_TIME", 86400)))
                     num_urls_skipped += 1

             # Get following batch of URLs, status='error'
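The retry path now fetches error URLs through the marreta instance instead of hitting the original host directly, by prefixing the stored URL with the proxy's /p/ path. A hypothetical example of the rewrite (example.com is a placeholder):

url = "https://example.com/some-article"
override_url = "https://marreta.pcdomanual.com/p/{}".format(url)
# -> "https://marreta.pcdomanual.com/p/https://example.com/some-article"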


@@ -100,6 +100,7 @@ def background_task(process_type: str):
            number_pages = int(process_type.split("_")[-1])
        except Exception as e:
            number_pages = -1
+
        FetchMissingKids().run(number_pages=number_pages)
    elif ("process_" in process_type):
@@ -108,6 +109,7 @@ def background_task(process_type: str):
            batch_size = int(process_type.split("_")[-1])
        except Exception as e:
            batch_size = None
+
        # Task type
        if ("process_raw_urls" in process_type):
            DB_Handler().process_raw_urls(batch_size=batch_size)
@@ -122,6 +124,7 @@ def background_task(process_type: str):
            older_than_days = float(process_type.split("_")[-1])
        except Exception as e:
            older_than_days = None
+
        DB_Handler().clean_old_url_content(older_than_days=older_than_days)
    else:
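Each task encodes its numeric argument as a suffix of process_type, parsed with split("_")[-1]; when the suffix is not a number, the except branch falls back to a default. For illustration (the suffixed task name is a hypothetical example):

batch_size = int("process_raw_urls_50".split("_")[-1])   # -> 50
# "process_raw_urls" alone: int("urls") raises ValueError -> batch_size = None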


@@ -22,8 +22,8 @@ services:
     deploy:
       resources:
         limits:
-          cpus: '4'
-          memory: 4G
+          cpus: '${DEPLOY_CPUS}'
+          memory: ${DEPLOY_RAM}

   fetcher_app_urls:
     image: fetcher_app_urls
@@ -56,9 +56,11 @@ services:
       # Fetcher
       - FETCHER_GNEWS_DECODE_SLEEP=${FETCHER_GNEWS_DECODE_SLEEP}
       - FETCHER_GOOGLE_GENERAL_PAGE_ITER_SLEEP=${FETCHER_GOOGLE_GENERAL_PAGE_ITER_SLEEP}
-      - FETCHER_BETWEEN_SEARCHES_SLEEP=${FETCHER_BETWEEN_SEARCHES_SLEEP}
-      - FETCHER_URL_HOST_SLEEP=${FETCHER_URL_HOST_SLEEP}
-      - FETCHER_LANGUAGE_DETECTION_MIN_CHAR=100
+      - FETCHER_BETWEEN_SEARCHES_SLEEP=${FETCHER_BETWEEN_SEARCHES_SLEEP} # Sleep time between each search
+      - FETCHER_URL_HOST_SLEEP=${FETCHER_URL_HOST_SLEEP} # Sleep time between requests to same URL host
+      - FETCHER_LANGUAGE_DETECTION_MIN_CHAR=${FETCHER_LANGUAGE_DETECTION_MIN_CHAR} # Min amount of characters to run language detection
+      - FETCHER_INSERT_URL_CACHE_TIME=${FETCHER_INSERT_URL_CACHE_TIME} # Cache time: Insert raw URL
+      - FETCHER_ERROR_URL_CACHE_TIME=${FETCHER_ERROR_URL_CACHE_TIME} # Cache time: Error on processing URL
       # Selenium
       - SELENIUM_ENDPOINT=${SELENIUM_ENDPOINT}
       - ENDPOINT_OLLAMA=${ENDPOINT_OLLAMA}
@@ -77,8 +79,8 @@ services:
     deploy:
       resources:
         limits:
-          cpus: '4'
-          memory: 4G
+          cpus: '${DEPLOY_CPUS}'
+          memory: ${DEPLOY_RAM}
     #labels: # Reverse proxy sample
     #  - "traefik.enable=true"
     #  - "traefik.http.routers.fetcher.rule=Host(`urls.yourdomain.com`)"


@@ -22,8 +22,8 @@ services:
     deploy:
       resources:
         limits:
-          cpus: '4'
-          memory: 4G
+          cpus: '${DEPLOY_CPUS}'
+          memory: ${DEPLOY_RAM}

   fetcher_app_urls:
     image: fetcher_app_urls
@@ -56,9 +56,11 @@ services:
       # Fetcher
       - FETCHER_GNEWS_DECODE_SLEEP=${FETCHER_GNEWS_DECODE_SLEEP}
       - FETCHER_GOOGLE_GENERAL_PAGE_ITER_SLEEP=${FETCHER_GOOGLE_GENERAL_PAGE_ITER_SLEEP}
-      - FETCHER_BETWEEN_SEARCHES_SLEEP=${FETCHER_BETWEEN_SEARCHES_SLEEP}
-      - FETCHER_URL_HOST_SLEEP=${FETCHER_URL_HOST_SLEEP}
-      - FETCHER_LANGUAGE_DETECTION_MIN_CHAR=100
+      - FETCHER_BETWEEN_SEARCHES_SLEEP=${FETCHER_BETWEEN_SEARCHES_SLEEP} # Sleep time between each search
+      - FETCHER_URL_HOST_SLEEP=${FETCHER_URL_HOST_SLEEP} # Sleep time between requests to same URL host
+      - FETCHER_LANGUAGE_DETECTION_MIN_CHAR=${FETCHER_LANGUAGE_DETECTION_MIN_CHAR} # Min amount of characters to run language detection
+      - FETCHER_INSERT_URL_CACHE_TIME=${FETCHER_INSERT_URL_CACHE_TIME} # Cache time: Insert raw URL
+      - FETCHER_ERROR_URL_CACHE_TIME=${FETCHER_ERROR_URL_CACHE_TIME} # Cache time: Error on processing URL
       # Selenium
       - SELENIUM_ENDPOINT=${SELENIUM_ENDPOINT}
       - ENDPOINT_OLLAMA=${ENDPOINT_OLLAMA}
@@ -77,8 +79,8 @@ services:
     deploy:
       resources:
         limits:
-          cpus: '4'
-          memory: 4G
+          cpus: '${DEPLOY_CPUS}'
+          memory: ${DEPLOY_RAM}
     labels: # Reverse proxy sample
       - "traefik.enable=true"
       - "traefik.http.routers.fetcher.rule=Host(`${REVERSE_PROXY_URL}`)"