25 Commits

Author             SHA1        Message                                                Date
Luciano Gervasoni  cbc422df36  Notify status bot token                                2025-10-16 10:38:32 +02:00
Luciano Gervasoni  dc784dabec  Notify status schedule                                 2025-10-16 10:12:17 +02:00
Luciano Gervasoni  d8ef738d19  Notifier fix                                           2025-10-14 13:10:54 +02:00
Luciano Gervasoni  2f035a4222  Notifier fix                                           2025-10-14 12:23:05 +02:00
Luciano Gervasoni  e057568af0  Telegram bot tokens                                    2025-10-14 11:36:19 +02:00
Luciano Gervasoni  7924857fe5  Schools NL tuples, traceback on notify err             2025-10-14 11:33:17 +02:00
Luciano Gervasoni  f44b784715  Notifications, info and warning, try catch             2025-09-09 22:06:23 +02:00
Luciano Gervasoni  24510d26e2  Notifications, info and warning                        2025-09-08 17:55:03 +02:00
Luciano Gervasoni  ef51a96db6  Process missing kids url based on API endpoint, fix2   2025-09-08 16:20:39 +02:00
Luciano Gervasoni  079b2473f8  Process missing kids url based on API endpoint         2025-09-08 16:12:27 +02:00
Luciano Gervasoni  1fbc5beb6e  URL Logs                                               2025-09-08 12:45:53 +02:00
Luciano Gervasoni  7886d16264  Flower allow API handling                              2025-09-08 12:44:48 +02:00
Luciano Gervasoni  2ed86e31ec  Workers light,default,heavy                            2025-09-08 12:34:47 +02:00
Luciano Gervasoni  892fb984d1  Debug enlarge                                          2025-09-05 14:18:33 +02:00
Luciano Gervasoni  c17f09a94f  Debug enlarge                                          2025-09-05 14:06:10 +02:00
Luciano Gervasoni  e4a325d6b4  Request timeout debugging                              2025-09-05 14:00:50 +02:00
Luciano Gervasoni  2fae0a3a9d  Request timeout                                        2025-09-05 13:52:34 +02:00
Luciano Gervasoni  35f9260b94  Debug process raw url                                  2025-09-05 13:45:16 +02:00
Luciano Gervasoni  b40611bd3e  Flower port update                                     2025-09-04 09:26:44 +02:00
Luciano Gervasoni  346d7c9187  Debug workers                                          2025-09-04 09:04:04 +02:00
Luciano Gervasoni  a21ff9c5d6  Celery scheduler DB based                              2025-09-04 08:46:04 +02:00
Luciano Gervasoni  7b0d24309c  Redis cache and celery, avoid overflow (3)             2025-09-03 23:20:56 +02:00
Luciano Gervasoni  334062b0ec  Redis cache and celery, avoid overflow                 2025-09-03 23:18:43 +02:00
Luciano Gervasoni  a9074f45b5  Redis cache and celery, avoid overflow                 2025-09-03 23:07:03 +02:00
Luciano Gervasoni  569e7d4676  Disable fetch missing kids all                         2025-08-28 11:23:47 +02:00
16 changed files with 439 additions and 148 deletions

View File

@@ -24,8 +24,10 @@ DB_PASSWORD=supermatitos
 DB_USER=supermatitos
 DB_HOST=fetcher_db
 DB_PORT=5432
-REDIS_HOST=fetcher_redis
-REDIS_PORT=6379
+REDIS_CACHE_HOST=fetcher_redis_cache
+REDIS_CACHE_PORT=6379
+REDIS_CELERY_HOST=fetcher_redis_celery
+REDIS_CELERY_PORT=6379
 # Job timeout: 30 min
 JOB_DEFAULT_TIMEOUT=1800
@@ -55,3 +57,9 @@ PEXELS_API_KEY=Y6clJkY32eihf34ukX4JsINYu9lzxh3xDdNq2HMAmGwXp0a0tt6vr6S9
 # Ollama
 ENDPOINT_OLLAMA=https://ollamamodelnpu.matitos.org
 OLLAMA_MODEL_DEFAULT=qwen2.5-instruct:3b
+
+# Telegram
+TELEGRAM_INFO_BOT_TOKEN="..."
+TELEGRAM_INFO_CHAT_ID="..."
+TELEGRAM_WARNING_BOT_TOKEN="..."
+TELEGRAM_WARNING_CHAT_ID="..."

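A minimal sanity check for the new per-channel Telegram variables (illustrative only, not part of the commit; it only reads the keys defined in the .env diff above):

import os

for channel in ("INFO", "WARNING"):
    token = os.environ.get("TELEGRAM_{}_BOT_TOKEN".format(channel), "")
    chat_id = os.environ.get("TELEGRAM_{}_CHAT_ID".format(channel), "")
    # Both values must be set for the notifier to post to this channel
    print("Telegram {} channel configured: {}".format(channel, bool(token and chat_id)))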
View File

@@ -97,9 +97,10 @@ DATABASES = {
 CACHES = {
     "default": {
         "BACKEND": "django_redis.cache.RedisCache",
-        "LOCATION": "redis://{}:{}".format(
-            os.environ.get("REDIS_HOST", "localhost"),
-            os.environ.get("REDIS_PORT", 6379)
+        "LOCATION": "redis://{}:{}/{}".format(
+            os.environ.get("REDIS_CACHE_HOST", "localhost"),
+            os.environ.get("REDIS_CACHE_PORT", 6379),
+            2 # DB for Caching
         ),
         "OPTIONS": {
             "MEMCACHE_MAX_KEY_LENGTH": 2048,
@@ -108,13 +109,14 @@ CACHES = {
     }
 }

 # Celery configuration
-CELERY_BROKER_URL = 'redis://{}:{}/{}'.format(os.environ.get("REDIS_HOST", "localhost"), os.environ.get("REDIS_PORT", 6379), os.environ.get("REDIS_DB", 0))
-CELERY_RESULT_BACKEND = 'redis://{}:{}/{}'.format(os.environ.get("REDIS_HOST", "localhost"), os.environ.get("REDIS_PORT", 6379), os.environ.get("REDIS_DB_RESULTS", 1))
+CELERY_BROKER_URL = 'redis://{}:{}/{}'.format(os.environ.get("REDIS_CELERY_HOST", "localhost"), os.environ.get("REDIS_CELERY_PORT", 6379), 0)
+CELERY_RESULT_BACKEND = 'redis://{}:{}/{}'.format(os.environ.get("REDIS_CELERY_HOST", "localhost"), os.environ.get("REDIS_CELERY_PORT", 6379), 1)
 CELERY_ACCEPT_CONTENT = ['json']
 CELERY_TASK_SERIALIZER = 'json'
+CELERY_RESULT_EXPIRES = 3600 # Auto clean results after 1 hour
+CELERY_ENABLE_UTC = True
+CELERY_TIMEZONE = "UTC"
 # Celery Beat scheduler (required for django-celery-beat to work)
 CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers.DatabaseScheduler'

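A sketch of the connection strings the new settings resolve to, assuming only the defaults shown in the diff (cache on DB 2 of fetcher_redis_cache, Celery broker on DB 0 and results on DB 1 of fetcher_redis_celery); illustrative only:

import os

# Reconstruct the three URLs exactly as the settings above build them
cache_location = "redis://{}:{}/{}".format(
    os.environ.get("REDIS_CACHE_HOST", "localhost"),
    os.environ.get("REDIS_CACHE_PORT", 6379),
    2)  # cache DB, as in the diff
broker_url = "redis://{}:{}/0".format(
    os.environ.get("REDIS_CELERY_HOST", "localhost"),
    os.environ.get("REDIS_CELERY_PORT", 6379))
result_backend = "redis://{}:{}/1".format(
    os.environ.get("REDIS_CELERY_HOST", "localhost"),
    os.environ.get("REDIS_CELERY_PORT", 6379))
print(cache_location, broker_url, result_backend, sep="\n")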
View File

@@ -4,7 +4,8 @@ from django.core.cache import cache
 from django.db import IntegrityError
 from django.utils import timezone
 from datetime import timedelta
-from .fetch_utils_url_processor import process_url, get_with_protocol, url_host_slowdown
+from .fetch_utils_url_processor import process_url, verify_missing_kid_url
+from .utils import get_with_protocol
 import re
 import requests
 import os
@@ -16,7 +17,7 @@ class DB_Handler():
     def __init__(self):
         pass

     def insert_raw_urls(self, urls, obj_source, obj_search):
         try:
             logger.debug("Inserting raw URLs")
             # Empty?
@@ -100,15 +101,13 @@
             # URLs duplciate association
             UrlsDuplicate.objects.get_or_create(id_url_canonical=obj_url_canonical, id_url_duplicated=obj_url)

-    def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error, paywall_bypass=False):
+    def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error, paywall_bypass=False, request_timeout=15):
         ##########################################################################
         # URL pattern: missingkids.org/poster OR missingkids.org/new-poster
         if ("missingkids.org" in obj_url.url) and ("poster" in obj_url.url):
-            # Sleep required? To avoid too many requests error (original URL, not paywall bypassing endpoint)
-            url_host_slowdown(obj_url.url, url_host_slowdown_seconds=float(os.getenv("FETCHER_URL_HOST_SLEEP", 5)))
             try:
-                # Request
-                r = requests.get(obj_url.url, allow_redirects=True)
+                # Verify missing kid URL
+                results = verify_missing_kid_url(obj_url.url)
             except Exception as e:
                 if (raise_exception_on_error):
                     # Simply raise exception, handled in a different way
@@ -118,20 +117,16 @@
                 # Set status to error
                 self._set_status(obj_url, Urls.STATUS_ENUM.ERROR)
                 return
-            if (r.url != obj_url.url):
-                # Canonical
-                url_canonical = r.url
-                # Set duplicate, and insert new canonical form
-                self._set_duplicate_and_insert_canonical(obj_url, url_canonical)
-            elif (r.status_code == 200):
-                # Not enough to determine if it is valid. Need to wait to finish javascript, it might redirect to 404
-                # self._set_status(obj_url, Urls.STATUS_ENUM.VALID)
-                self._set_status(obj_url, Urls.STATUS_ENUM.UNKNOWN)
-            elif (r.status_code == 404):
+            if (results.get("status") == "valid"):
+                self._set_status(obj_url, Urls.STATUS_ENUM.VALID)
+            elif (results.get("status") == "invalid"):
                 self._set_status(obj_url, Urls.STATUS_ENUM.INVALID)
-            else:
-                logger.debug("Unknown request status: {} for missing kids request: {}".format(r.status_code, obj_url.url))
+            elif (results.get("status") == "duplicate"):
+                self._set_duplicate_and_insert_canonical(obj_url, results.get("redirection"))
+            elif (results.get("status") == "unknown"):
+                # Nothing to do, not sure about it...
+                logger.info("Missing kid verification returned unknown for URL: {}".format(obj_url.url))
                 self._set_status(obj_url, Urls.STATUS_ENUM.UNKNOWN)
             return
         ##########################################################################
@@ -147,7 +142,7 @@
         try:
             # Extract URL content
-            dict_url_data = process_url(obj_url.url, paywall_bypass)
+            dict_url_data = process_url(obj_url.url, paywall_bypass, request_timeout)
         except Exception as e:
             if (raise_exception_on_error):
                 # Simply raise exception, handled in a different way
@@ -314,14 +309,20 @@
         # Per URL
         for obj_url in missingkids_urls:
             try:
-                # Missing kids fetching endpoint, verify URL
-                missingkids_fetch_endpoint = os.path.join(os.getenv("SELENIUM_ENDPOINT", "http://localhost:80"), "verify_missing_kid/")
-                data = {"url": obj_url.url}
-                # POST
-                r = requests.post(missingkids_fetch_endpoint, json=data, timeout=120)
-                # Jsonify
-                results = r.json()
-                logger.debug("Selenium results for URL {}: {}".format(obj_url.url, str(results)))
+                SELENIUM_BASED_MISSINGKID_VERIFICATION = False
+                if (SELENIUM_BASED_MISSINGKID_VERIFICATION):
+                    # Missing kids fetching endpoint, verify URL
+                    missingkids_fetch_endpoint = os.path.join(os.getenv("SELENIUM_ENDPOINT", "http://localhost:80"), "verify_missing_kid/")
+                    data = {"url": obj_url.url}
+                    # POST
+                    r = requests.post(missingkids_fetch_endpoint, json=data, timeout=120)
+                    # Jsonify
+                    results = r.json()
+                    logger.debug("Missingkids Selenium results for URL {}: {}".format(obj_url.url, str(results)))
+                else:
+                    # Verify
+                    results = verify_missing_kid_url(obj_url.url)
+                    logger.debug("Missingkids verify results for URL {}: {}".format(obj_url.url, str(results)))
                 if (results.get("status") == "valid"):
                     self._set_status(obj_url, Urls.STATUS_ENUM.VALID)

View File

@@ -1,6 +1,7 @@
 from .db_utils import DB_Handler
 from ..models import Search, Source
-from .fetch_utils_url_processor import get_with_protocol, url_host_slowdown
+from .fetch_utils_url_processor import url_host_slowdown
+from .utils import get_with_protocol
 import newspaper
 import traceback
 from .logger import get_logger

View File

@@ -9,14 +9,6 @@ from urllib.parse import unquote
 import langdetect
 langdetect.DetectorFactory.seed = 0

-def get_with_protocol(url):
-    # http:// -> https://
-    url = url.replace("http://", "https://")
-    # "" -> https://
-    if not (url.startswith("https://")):
-        url = "https://" + url
-    return url
-
 def get_url_host(url):
     # URL no protocol, first substring before '/'
     url_host = url.replace("https://", "").replace("http://", "").split("/")[0]
@@ -39,7 +31,48 @@ def url_host_slowdown(url, url_host_slowdown_seconds):
     # About to process URL host, cache time
     cache.set("process_{}".format(url_host).encode("utf-8"), time.time(), timeout=60*5) # Expire after 5 minutes

-def process_url(url, paywall_bypass=False):
+def verify_missing_kid_url(url):
+    # Sleep required? To avoid too many requests error
+    url_host_slowdown(url, url_host_slowdown_seconds=float(os.getenv("FETCHER_URL_HOST_SLEEP", 5)))
+    # Request, get redirection
+    r = requests.get(url, allow_redirects=True)
+    # Redirection?
+    if (url != r.url):
+        url_redirection = r.url
+        return {"status": "duplicate", "redirection": url_redirection}
+    # Sample URL: "https://www.missingkids.org/poster/NCMC/2058896/1"
+    org_prefix, case_num = url.split("/")[-3], url.split("/")[-2]
+    # Fill details to API endpoint
+    base_url = "https://www.missingkids.org/bin/ncmecEndpoint?action=childDetail&orgPrefix={}&caseNum={}"
+    url_endpoint = base_url.format(org_prefix, case_num)
+    # Cache timeout missingkids.org
+    time.sleep(0.25)
+    # Request
+    r = requests.get(url_endpoint)
+    # Analyze status code and status result
+    if (r.status_code == 200):
+        r_json = r.json()
+        # Valid poster
+        if (r_json.get("status") == "success"):
+            return {"status": "valid"}
+        # Invalid poster
+        elif (r_json.get("status") == "error"):
+            return {"status": "invalid"}
+        else:
+            # ?
+            logger.info("Unknown json status: {} when verifying missing kid: {}".format(str(r_json), url))
+            return {"status": "unknown"}
+    else:
+        # Error status code
+        logger.info("Unknown request status: {} when verifying missing kid: {}".format(r.status_code, url))
+        return {"status": "unknown"}
+
+def process_url(url, paywall_bypass=False, request_timeout=15):
     if (paywall_bypass):
         # TODO: Implement self-hosted instance
@@ -58,7 +91,7 @@ def process_url(url, paywall_bypass=False):
     # Process
     if ("foxnews.com" in url_of_interest) or ("zerohedge" in url_of_interest):
         # Request
-        r = requests.get(url, headers={"User-Agent": user_agent})
+        r = requests.get(url, headers={"User-Agent": user_agent}, timeout=request_timeout)
         # Raise for error code
         r.raise_for_status()
         # Parse
@@ -67,8 +100,10 @@
         # Config: Fake user agent
         config = newspaper.configuration.Configuration()
         config.headers = {'User-Agent': user_agent}
+        config.request_timeout = request_timeout
         # Default mode
         article = newspaper.article(url_of_interest, config=config)
     except newspaper.ArticleBinaryDataException:
         logger.warning("ArticleException for input URL {}".format(url))
         return {"override_status": "invalid"}
@@ -106,7 +141,7 @@
         # Try simple request, valid response but couldn't parse article? e.g. getting blocked? -> unknown
         time.sleep(0.25)
-        r = requests.get(url_of_interest)
+        r = requests.get(url_of_interest, timeout=request_timeout)
         if (r.status_code == 200):
             return {"override_status": "unknown"}
         else:
@@ -117,7 +152,7 @@
         except Exception as e:
             logger.warning("Exception for input URL {}\n{}".format(url, str(e)))
             return None
     # Not a valid URL?
     if (not article.is_valid_url()):
         logger.debug("Invalid URL found: {}".format(url))

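Usage sketch for the new verify_missing_kid_url helper; the sample poster URL is the one quoted in the code above, the return values follow the branches shown in the diff, and the import path is an assumption (adjust to wherever fetch_utils_url_processor lives in the app):

from fetcher.src.fetch_utils_url_processor import verify_missing_kid_url  # import path assumed

# Returns one of:
#   {"status": "valid"} | {"status": "invalid"} | {"status": "unknown"}
#   {"status": "duplicate", "redirection": "<canonical URL>"}
results = verify_missing_kid_url("https://www.missingkids.org/poster/NCMC/2058896/1")
if results.get("status") == "duplicate":
    print("Canonical form:", results.get("redirection"))
else:
    print("Poster status:", results.get("status"))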
View File

@@ -4,54 +4,150 @@ from ..models import Urls, Source, Search, UrlContent, UrlsSourceSearch, UrlsDup
 from django.db.models import Count
 import requests
 import os
+import traceback
+from .logger import get_logger
+logger = get_logger()

-def notify_telegram(last_hours=24):
-    start_date = timezone.now() - timedelta(hours=last_hours)
-    # Count the number of URLs grouped by status within the date range
-    urls_data_status = Urls.objects.filter(ts_fetch__gte=start_date) \
-        .values('status') \
-        .annotate(count=Count('id')) \
-        .order_by('status')
-    # Count the number of URLs grouped by source
-    urls_data_source = UrlsSourceSearch.objects \
-        .filter(id_url__ts_fetch__gte=start_date) \
-        .values('id_source__source') \
-        .annotate(count=Count('id_url')) \
-        .order_by('id_source__source')
-    # Count the number of URLs grouped by search
-    urls_data_search = UrlsSourceSearch.objects \
-        .filter(id_url__ts_fetch__gte=start_date) \
-        .values('id_search__search') \
-        .annotate(count=Count('id_url')) \
-        .order_by('id_search__search')
-    bot_token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
-    chat_id = os.environ.get("TELEGRAM_CHAT_ID", "")
-    message = "During the last {} hours:\n".format(last_hours)
-    message += "\nURLs per status:\n"
-    for o in urls_data_status:
-        message += " {}: {}\n".format(o.get("status"), o.get("count"))
-    message += "\nURLs per source:\n"
-    for o in urls_data_source:
-        message += " {}: {}\n".format(o.get("id_source__source"), o.get("count"))
-    message += "\nURLs per search:\n"
-    for o in urls_data_search:
-        message += " {}: {}\n".format(o.get("id_search__search"), o.get("count"))
-    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
-    params = {
-        "chat_id": chat_id,
-        "text": message
-    }
-    # POST
-    response = requests.post(url, params=params)
+def notify_telegram_info(last_hours, channel="INFO"):
+    try:
+        start_date = timezone.now() - timedelta(hours=last_hours)
+        # Count the number of URLs grouped by status within the date range
+        urls_data_status = Urls.objects.filter(ts_fetch__gte=start_date) \
+            .values('status') \
+            .annotate(count=Count('id')) \
+            .order_by('status')
+        # Count the number of URLs grouped by source
+        urls_data_source = UrlsSourceSearch.objects \
+            .filter(id_url__ts_fetch__gte=start_date) \
+            .values('id_source__source') \
+            .annotate(count=Count('id_url')) \
+            .order_by('id_source__source')
+        # Count the number of URLs grouped by search
+        urls_data_search = UrlsSourceSearch.objects \
+            .filter(id_url__ts_fetch__gte=start_date) \
+            .values('id_search__search') \
+            .annotate(count=Count('id_url')) \
+            .order_by('id_search__search')
+        bot_token = os.environ.get("TELEGRAM_{}_BOT_TOKEN".format(channel), "")
+        chat_id = os.environ.get("TELEGRAM_{}_CHAT_ID".format(channel), "")
+        message = "During the last {} hours:\n".format(last_hours)
+        message += "\nURLs per status:\n"
+        for o in urls_data_status:
+            message += " {}: {}\n".format(o.get("status"), o.get("count"))
+        message += "\nURLs per source:\n"
+        for o in urls_data_source:
+            message += " {}: {}\n".format(o.get("id_source__source"), o.get("count"))
+        message += "\nURLs per search:\n"
+        for o in urls_data_search:
+            message += " {}: {}\n".format(o.get("id_search__search"), o.get("count"))
+        url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
+        params = {
+            "chat_id": chat_id,
+            "text": message
+        }
+        # POST
+        response = requests.post(url, params=params)
+    except Exception as e:
+        logger.info("Exception while notifying status: {}\n{}".format(str(e), traceback.format_exc()))
+
+def notify_telegram_warning(last_hours, channel="WARNING"):
+    try:
+        # Message appending logic
+        message = ""
+        start_date = timezone.now() - timedelta(hours=last_hours)
+        # Count the number of URLs grouped by status within the date range
+        urls_data_status = Urls.objects.filter(ts_fetch__gte=start_date) \
+            .values('status') \
+            .annotate(count=Count('id')) \
+            .order_by('status')
+        # Build dictionary
+        urls_data_status_dict = {}
+        for o in urls_data_status:
+            # #STATUS
+            urls_data_status_dict[o.get("status")] = o.get("count")
+            # #TOTAL
+            urls_data_status_dict["total"] = urls_data_status_dict.get("total", 0) + o.get("count")
+        MINIMUM_URLS_THRESHOLD = 10
+        MINIMUM_PROCESSED_URLS_RATIO = 0.7
+        # Minimum amount of URLs
+        if (urls_data_status_dict.get("total") < MINIMUM_URLS_THRESHOLD):
+            message += "WARNING - Total #URLS during the last {} hours: {}\n".format(last_hours, urls_data_status_dict.get("total"))
+            message += "\nURLs per status:\n"
+            for o in urls_data_status:
+                message += " {}: {}\n".format(o.get("status"), o.get("count"))
+        # Minimum ratio of processed raw urls
+        if (urls_data_status_dict.get("total") > 0):
+            if (urls_data_status_dict.get("raw", 0) / urls_data_status_dict.get("total") >= MINIMUM_PROCESSED_URLS_RATIO):
+                message += "WARNING - Small ratio of processed raw URLs during the last {} hours: {}\n".format(last_hours, urls_data_status_dict.get("total"))
+                message += "\nURLs per status:\n"
+                for o in urls_data_status:
+                    message += " {}: {}\n".format(o.get("status"), o.get("count"))
+        # Count the number of URLs grouped by source
+        urls_data_source = UrlsSourceSearch.objects \
+            .filter(id_url__ts_fetch__gte=start_date) \
+            .values('id_source__source') \
+            .annotate(count=Count('id_url')) \
+            .order_by('id_source__source')
+        MINIMUM_SOURCES = 3
+        if (len(urls_data_source) < MINIMUM_SOURCES):
+            message += "WARNING - Very few sources found URLs during the last {} hours".format(last_hours)
+            message += "\nURLs per source:\n"
+            for o in urls_data_source:
+                message += " {}: {}\n".format(o.get("id_source__source"), o.get("count"))
+        """
+        # TODO: URLs per search, key should be present for cnbc.com, foxnews.com, zerohedge.com, breitbart.com, child abuse, child neglect
+        # Count the number of URLs grouped by search
+        urls_data_search = UrlsSourceSearch.objects \
+            .filter(id_url__ts_fetch__gte=start_date) \
+            .values('id_search__search') \
+            .annotate(count=Count('id_url')) \
+            .order_by('id_search__search')
+        message += "\nURLs per search:\n"
+        for o in urls_data_search:
+            message += " {}: {}\n".format(o.get("id_search__search"), o.get("count"))
+        """
+        # Valid message body?
+        if (message != ""):
+            bot_token = os.environ.get("TELEGRAM_{}_BOT_TOKEN".format(channel), "")
+            chat_id = os.environ.get("TELEGRAM_{}_CHAT_ID".format(channel), "")
+            url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
+            params = {
+                "chat_id": chat_id,
+                "text": message
+            }
+            # POST
+            response = requests.post(url, params=params)
+    except Exception as e:
+        logger.info("Exception while notifying status: {}\n{}".format(str(e)), traceback.format_exc())
+
+def notify_telegram(last_hours=12):
+    # INFO
+    notify_telegram_info(last_hours, channel="INFO")
+    # WARNING
+    notify_telegram_warning(last_hours, channel="WARNING")

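Both notifier functions finish with the same Bot API call; a standalone sketch of that send step (endpoint and parameters mirror the code above, the message text is a placeholder):

import os
import requests

bot_token = os.environ.get("TELEGRAM_INFO_BOT_TOKEN", "")
chat_id = os.environ.get("TELEGRAM_INFO_CHAT_ID", "")
url = "https://api.telegram.org/bot{}/sendMessage".format(bot_token)
# POST, same call as in notify_telegram_info / notify_telegram_warning
response = requests.post(url, params={"chat_id": chat_id, "text": "Fetcher status: test message"})
print(response.status_code)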
View File

@@ -0,0 +1,8 @@
+def get_with_protocol(url):
+    # http:// -> https://
+    url = url.replace("http://", "https://")
+    # "" -> https://
+    if not (url.startswith("https://")):
+        url = "https://" + url
+    return url
+

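Quick usage sketch of the relocated helper (example inputs only; the import path is an assumption):

from fetcher.src.utils import get_with_protocol  # import path assumed

print(get_with_protocol("example.com"))          # https://example.com
print(get_with_protocol("http://example.com"))   # https://example.com
print(get_with_protocol("https://example.com"))  # https://example.com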
View File

@@ -13,13 +13,30 @@ from .src.logger import get_logger
 logger = get_logger()

+@shared_task(queue='light')
+def process_raw_urls(batch_size=100):
+    task = "Process raw URLs"
+    logger.info("Task triggered: {}".format(task))
+    DB_Handler().process_raw_urls(batch_size=batch_size)
+    logger.info("Task completed: {}".format(task))
+
 @shared_task(queue='default')
+def process_error_urls(batch_size=50):
+    task = "Process error URLs"
+    logger.info("Task triggered: {}".format(task))
+    DB_Handler().process_error_urls(batch_size=batch_size)
+    logger.info("Task completed: {}".format(task))
+
+@shared_task(queue='light')
 def fetch_feeds():
     task = "Fetch Feeds"
     logger.info("Task triggered: {}".format(task))
     FetchFeeds().run()
     logger.info("Task completed: {}".format(task))

 @shared_task(queue='default')
 def fetch_parser():
     task = "Fetch Parser"
@@ -34,41 +51,31 @@ def fetch_search():
     FetchSearcher().run()
     logger.info("Task completed: {}".format(task))

-@shared_task(queue='low')
+@shared_task(queue='heavy')
 def fetch_selenium_search():
     task = "Fetch Selenium search"
     logger.info("Task triggered: {}".format(task))
     FetchSeleniumSourceSearch().run()
     logger.info("Task completed: {}".format(task))

-@shared_task(queue='low')
+@shared_task(queue='heavy')
 def fetch_missing_kids(number_pages=5):
     task = "Fetch MissingKids"
     logger.info("Task triggered: {}".format(task))
     FetchMissingKids().run(number_pages)
     logger.info("Task completed: {}".format(task))

-@shared_task(queue='default')
-def process_raw_urls(batch_size=100):
-    task = "Process raw URLs"
-    logger.info("Task triggered: {}".format(task))
-    DB_Handler().process_raw_urls(batch_size=batch_size)
-    logger.info("Task completed: {}".format(task))
-
-@shared_task(queue='default')
-def process_error_urls(batch_size=50):
-    task = "Process error URLs"
-    logger.info("Task triggered: {}".format(task))
-    DB_Handler().process_error_urls(batch_size=batch_size)
-    logger.info("Task completed: {}".format(task))
-
-@shared_task(queue='low')
+@shared_task(queue='heavy')
 def process_missing_kids_urls(batch_size=None, process_status_only=None):
     task = "Process Missing Kids URLs - batch_size={} process_status_only={}".format(batch_size, process_status_only)
     logger.info("Task triggered: {}".format(task))
     DB_Handler().process_missing_kids_urls(batch_size=batch_size, process_status_only=process_status_only)
     logger.info("Task completed: {}".format(task))

 @shared_task(queue='default')
 def clean_old_url_content(older_than_days=14):
     task = "Clean old URL content"
@@ -76,7 +83,7 @@ def clean_old_url_content(older_than_days=14):
     DB_Handler().clean_old_url_content(older_than_days=older_than_days)
     logger.info("Task completed: {}".format(task))

-@shared_task(queue='default')
+@shared_task(queue='light')
 def notify_status():
     task = "Notify status"
     logger.info("Task triggered: {}".format(task))

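The tasks above are now pinned to the light/default/heavy queues by their decorators; a hedged sketch of enqueuing them with standard Celery calls (the fetcher.tasks module path is taken from the Celery Beat fixture, the explicit queue override is optional):

from fetcher.tasks import process_raw_urls, fetch_missing_kids  # module path as in the beat fixture

process_raw_urls.delay(batch_size=100)  # routed to the 'light' queue by its decorator
fetch_missing_kids.apply_async(kwargs={"number_pages": 5}, queue="heavy")  # explicit queue override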
View File

@@ -16,14 +16,10 @@ def trigger_task(request, task):
 def link_list(request):
     # Base URL path
     app_url = request.build_absolute_uri()
-    # Tasks
-    links_fetch = ["fetch_feeds", "fetch_parser", "fetch_search", "fetch_missingkids_5", "fetch_missingkids_all", "fetch_selenium_search"]
-    links_process = ["process_raw_urls_50", "process_error_urls_50", "process_missing_kids_urls_50", "process_missing_kids_urls_valid_all", "process_missing_kids_urls_invalid_all", "process_missing_kids_urls_unknown_all", "process_missing_kids_urls_all", "clean_old_url_content_60"]
     # List of links
     list_links = \
         [ os.path.join(app_url, "admin"), os.path.join(app_url, "urls") ] + \
-        [ os.path.join(app_url, "logs", log_type) for log_type in ["database", "debug", "info", "warning", "server", "beat", "worker_default", "worker_low"] ] #+ \
-        #[ os.path.join(app_url, "task", l) for l in links_fetch + links_process ]
+        [ os.path.join(app_url, "logs", log_type) for log_type in ["database", "debug", "info", "warning", "server", "beat", "worker_light", "worker_default", "worker_heavy"] ]
     # Links tuple
     links = [(l, l) for l in list_links]

View File

@@ -17,19 +17,20 @@
         "cnbc.com"
     ],
     "keyword_search": [
-        "child abuse"
+        "child abuse",
+        "child neglect"
     ]
 },
 "REGEX_PATTERN_STATUS_PRIORITY": [
     [".*(youtube|tiktok|twitter|reddit)\\.com\\/.*", "invalid", 50],
     ["https:\\/\\/x.com\\/.*", "invalid", 50],
     [".*cnbc\\.com\\/(video|quotes)\\/.*", "invalid", 75],
-    [".*foxnews\\.com\\/(video|category|person|books|html-sitemap)\\/.*", "invalid", 75],
-    [".*radio\\.foxnews\\.com\\/.*", "invalid", 75],
     [".*breitbart\\.com\\/(tag|author)\\/.*", "invalid", 75],
+    [".*breitbart\\.com\\/(economy|entertainment|border|crime|clips)\\/.*", "valid", 50],
     [".*zerohedge\\.com\\/(user|contributors)\\/.*", "invalid", 75],
     [".*zerohedge\\.com\\/(economics|political|markets|)\\/.*", "valid", 50],
-    [".*breitbart\\.com\\/(economy|entertainment|border|crime|clips)\\/.*", "valid", 50],
+    [".*radio\\.foxnews\\.com\\/.*", "invalid", 75],
+    [".*foxnews\\.com\\/(video|category|person|books|html-sitemap)\\/.*", "invalid", 75],
     [".*foxnews\\.com\\/(lifestyle|opinion|sports|world)\\/.*", "valid", 50],
     [".*foxnews\\.com\\/[^\\/]+\\/?$", "invalid", 25]
 ]

View File

@@ -160,7 +160,7 @@
         "expire_seconds": null,
         "one_off": false,
         "start_time": null,
-        "enabled": false,
+        "enabled": true,
         "last_run_at": null,
         "total_run_count": 0,
         "date_changed": "2025-07-17T16:20:19.969Z",
@@ -188,7 +188,7 @@
         "expire_seconds": null,
         "one_off": false,
         "start_time": null,
-        "enabled": false,
+        "enabled": true,
        "last_run_at": null,
         "total_run_count": 0,
         "date_changed": "2025-07-17T16:21:30.809Z",
@@ -397,7 +397,7 @@
     "fields": {
         "name": "Notify status",
         "task": "fetcher.tasks.notify_status",
-        "interval": 3,
+        "interval": 4,
         "crontab": null,
         "solar": null,
         "clocked": null,

View File

@@ -15,7 +15,7 @@ stdout_logfile_maxbytes=20MB
 stdout_logfile_backups=1

 [program:beat]
-command=celery -A core beat -l info --logfile=/opt/logs/beat.log
+command=celery -A core beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler --logfile=/opt/logs/beat.log
 directory=/opt/app
 autostart=true
 autorestart=true
@@ -40,14 +40,27 @@ redirect_stderr=true
 stdout_logfile_maxbytes=20MB
 stdout_logfile_backups=1

-[program:worker_low]
-command=celery -A core worker -l info --logfile=/opt/logs/worker_low.log --concurrency=1 -Q low -n low
+[program:worker_light]
+command=celery -A core worker -l info --logfile=/opt/logs/worker_light.log --concurrency=1 -Q light -n light
 directory=/opt/app
 autostart=true
 autorestart=true
 ; Unified log file
-stdout_logfile=/opt/logs/worker_low.log
-stderr_logfile=/opt/logs/worker_low.log
+stdout_logfile=/opt/logs/worker_light.log
+stderr_logfile=/opt/logs/worker_light.log
+redirect_stderr=true
+; Rotate when file reaches max size
+stdout_logfile_maxbytes=20MB
+stdout_logfile_backups=1
+
+[program:worker_heavy]
+command=celery -A core worker -l info --logfile=/opt/logs/worker_heavy.log --concurrency=1 -Q heavy -n heavy
+directory=/opt/app
+autostart=true
+autorestart=true
+; Unified log file
+stdout_logfile=/opt/logs/worker_heavy.log
+stderr_logfile=/opt/logs/worker_heavy.log
 redirect_stderr=true
 ; Rotate when file reaches max size
 stdout_logfile_maxbytes=20MB

View File

@@ -43,8 +43,10 @@ services:
       - DB_PASSWORD=${DB_PASSWORD}
       - DB_HOST=${DB_HOST}
       - DB_PORT=${DB_PORT}
-      - REDIS_HOST=${REDIS_HOST}
-      - REDIS_PORT=${REDIS_PORT}
+      - REDIS_CACHE_HOST=${REDIS_CACHE_HOST}
+      - REDIS_CACHE_PORT=${REDIS_CACHE_PORT}
+      - REDIS_CELERY_HOST=${REDIS_CELERY_HOST}
+      - REDIS_CELERY_PORT=${REDIS_CELERY_PORT}
       # Job timeout: 30 min
       - JOB_DEFAULT_TIMEOUT=${JOB_DEFAULT_TIMEOUT}
       # Fetcher
@@ -64,22 +66,32 @@ services:
       - PEXELS_API_KEY=${PEXELS_API_KEY}
       - OLLAMA_MODEL_DEFAULT=${OLLAMA_MODEL_DEFAULT}
       # Telegram
-      - TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN}
-      - TELEGRAM_CHAT_ID=${TELEGRAM_CHAT_ID}
+      - TELEGRAM_INFO_BOT_TOKEN=${TELEGRAM_INFO_BOT_TOKEN}
+      - TELEGRAM_INFO_CHAT_ID=${TELEGRAM_INFO_CHAT_ID}
+      - TELEGRAM_WARNING_BOT_TOKEN=${TELEGRAM_WARNING_BOT_TOKEN}
+      - TELEGRAM_WARNING_CHAT_ID=${TELEGRAM_WARNING_CHAT_ID}
     ########################
     ports:
       - 8000
    depends_on:
       - fetcher_db
-      - fetcher_redis
+      - fetcher_redis_cache
+      - fetcher_redis_celery
       - fetcher_app_selenium
     dns:
       - 1.1.1.1
       - 1.0.0.1

-  fetcher_redis:
+  fetcher_redis_cache:
     image: redis:alpine
-    container_name: fetcher_redis
+    container_name: fetcher_redis_cache
+    restart: unless-stopped
+    ports:
+      - 6379
+
+  fetcher_redis_celery:
+    image: redis:alpine
+    container_name: fetcher_redis_celery
     restart: unless-stopped
     ports:
       - 6379
@@ -94,6 +106,7 @@ services:
     ports:
       - 5555
     environment:
-      - CELERY_BROKER_URL=redis://fetcher_redis:6379/0
+      - CELERY_BROKER_URL=redis://fetcher_redis_celery:6379/0
+      - FLOWER_UNAUTHENTICATED_API=true
     depends_on:
-      - fetcher_redis
+      - fetcher_redis_celery

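With fetcher_redis split into fetcher_redis_cache and fetcher_redis_celery, a small connectivity check from the app container can confirm both services are reachable (an illustrative sketch using the redis-py client, which is not part of this change):

import os
import redis

for host_var, port_var in (("REDIS_CACHE_HOST", "REDIS_CACHE_PORT"),
                           ("REDIS_CELERY_HOST", "REDIS_CELERY_PORT")):
    host = os.environ.get(host_var, "localhost")
    port = int(os.environ.get(port_var, 6379))
    # ping() returns True when the Redis service answers
    ok = redis.Redis(host=host, port=port, socket_connect_timeout=2).ping()
    print("{}:{} reachable: {}".format(host, port, ok))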
View File

@@ -52,12 +52,19 @@ services:
     #volumes: # Persistent DB?
     #  - ./postgres:/var/lib/postgresql/data

-  fetcher_redis:
+  fetcher_redis_cache:
     extends:
       file: docker-compose-base.yml
-      service: fetcher_redis
+      service: fetcher_redis_cache
     ports:
-      - 6379:6379
+      - 6379
+
+  fetcher_redis_celery:
+    extends:
+      file: docker-compose-base.yml
+      service: fetcher_redis_celery
+    ports:
+      - 6379

   fetcher_flower:
     extends:

View File

@@ -47,12 +47,19 @@ services:
         autossh -M 15885 -N -L 0.0.0.0:5432:127.0.0.1:5432 ${REMOTE_USERNAME}@${REMOTE_HOST}
         # autossh -M 15885 -N -o 'GatewayPorts yes' -L 0.0.0.0:5432:127.0.0.1:5432 ${REMOTE_USERNAME}@${REMOTE_HOST}

-  fetcher_redis:
+  fetcher_redis_cache:
     extends:
       file: docker-compose-base.yml
-      service: fetcher_redis
+      service: fetcher_redis_cache
     ports:
-      - 6379:6379
+      - 6379
+
+  fetcher_redis_celery:
+    extends:
+      file: docker-compose-base.yml
+      service: fetcher_redis_celery
+    ports:
+      - 6379

   fetcher_flower:
     extends:

View File

@@ -14,7 +14,6 @@
     "import json\n",
     "import csv\n",
     "\n",
-    "\n",
     "headers = {\"User-Agent\": \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36\"}"
    ]
   },
@@ -329,13 +328,22 @@
     "    main()"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  },
   {
    "cell_type": "code",
    "execution_count": null,
    "metadata": {},
    "outputs": [],
    "source": [
-    "df = pd.read_csv(\"scholenopdekaart.csv\", index_col=0)\n",
+    "import pandas as pd\n",
+    "\n",
+    "df = pd.read_csv(\"~/Downloads/scholenopdekaart.csv\", index_col=0)\n",
     "\n",
     "df.head()"
    ]
@@ -346,13 +354,101 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "df.tail()"
+    "def to_dict(row):\n",
+    "    # Empty?\n",
+    "    if (pd.isna(row)):\n",
+    "        return {}\n",
+    "    # Evaluate, to dict\n",
+    "    dict_data = dict(eval(row))\n",
+    "    # Remove None values\n",
+    "    for k in list(dict_data.keys()):\n",
+    "        if dict_data[k] is None:\n",
+    "            del dict_data[k]\n",
+    "    # Prefix\n",
+    "    return {f\"{column}_{k}\": v for k, v in dict_data.items()}\n",
+    "\n",
+    "for column in [\"students_per_year_trend\", \"num_students_per_group\", \"num_students_per_age\"]:\n",
+    "    print(column)\n",
+    "    # Convert the list of tuples into a dictionary per row\n",
+    "    df_dicts = df[column].apply(to_dict)\n",
+    "    # Expand into separate columns\n",
+    "    df_expanded = pd.json_normalize(df_dicts)\n",
+    "    # Sort\n",
+    "    df_expanded = df_expanded[sorted(df_expanded.columns)]\n",
+    "    # Combine with original columns\n",
+    "    df = pd.concat([df.drop(columns=[column]), df_expanded], axis=1)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def to_dict(row):\n",
+    "    # Empty?\n",
+    "    if (pd.isna(row)):\n",
+    "        return {}\n",
+    "    # Evaluate, to dict\n",
+    "    data = eval(row)\n",
+    "    # Remove first useless data\n",
+    "    data = data[1:]\n",
+    "\n",
+    "    # Generate dict\n",
+    "    dict_data = {}\n",
+    "    for (zipcode, num, percentage) in data:\n",
+    "        dict_data[f\"num_students_zipcode_{zipcode}\"] = num\n",
+    "        dict_data[f\"percentage_students_zipcode_{zipcode}\"] = percentage\n",
+    "\n",
+    "    # Remove None values\n",
+    "    for k in list(dict_data.keys()):\n",
+    "        if dict_data[k] is None:\n",
+    "            del dict_data[k]\n",
+    "    return dict_data\n",
+    "\n",
+    "for column in [\"students_per_zipcode\"]:\n",
+    "    print(column)\n",
+    "    # Convert the list of tuples into a dictionary per row\n",
+    "    df_dicts = df[column].apply(to_dict)\n",
+    "    # Expand into separate columns\n",
+    "    df_expanded = pd.json_normalize(df_dicts)\n",
+    "    # Sort\n",
+    "    df_expanded = df_expanded[sorted(df_expanded.columns)]\n",
+    "    # Combine with original columns\n",
+    "    df = pd.concat([df.drop(columns=[column]), df_expanded], axis=1)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df.to_csv(\"schools_nl.csv\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df.head()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "list(df.columns)"
    ]
   }
  ],
 "metadata": {
  "kernelspec": {
-   "display_name": "matitos_urls",
+   "display_name": "fetcher",
   "language": "python",
   "name": "python3"
  },
@@ -366,7 +462,7 @@
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
-   "version": "3.12.9"
+   "version": "3.12.11"
  }
 },
 "nbformat": 4,