25 Commits

Author | SHA1 | Message | Date
Luciano Gervasoni | cbc422df36 | Notify status bot token | 2025-10-16 10:38:32 +02:00
Luciano Gervasoni | dc784dabec | Notify status schedule | 2025-10-16 10:12:17 +02:00
Luciano Gervasoni | d8ef738d19 | Notifier fix | 2025-10-14 13:10:54 +02:00
Luciano Gervasoni | 2f035a4222 | Notifier fix | 2025-10-14 12:23:05 +02:00
Luciano Gervasoni | e057568af0 | Telegram bot tokens | 2025-10-14 11:36:19 +02:00
Luciano Gervasoni | 7924857fe5 | Schools NL tuples, traceback on notify err | 2025-10-14 11:33:17 +02:00
Luciano Gervasoni | f44b784715 | Notifications, info and warning, try catch | 2025-09-09 22:06:23 +02:00
Luciano Gervasoni | 24510d26e2 | Notifications, info and warning | 2025-09-08 17:55:03 +02:00
Luciano Gervasoni | ef51a96db6 | Process missing kids url based on API endpoint, fix2 | 2025-09-08 16:20:39 +02:00
Luciano Gervasoni | 079b2473f8 | Process missing kids url based on API endpoint | 2025-09-08 16:12:27 +02:00
Luciano Gervasoni | 1fbc5beb6e | URL Logs | 2025-09-08 12:45:53 +02:00
Luciano Gervasoni | 7886d16264 | Flower allow API handling | 2025-09-08 12:44:48 +02:00
Luciano Gervasoni | 2ed86e31ec | Workers light,default,heavy | 2025-09-08 12:34:47 +02:00
Luciano Gervasoni | 892fb984d1 | Debug enlarge | 2025-09-05 14:18:33 +02:00
Luciano Gervasoni | c17f09a94f | Debug enlarge | 2025-09-05 14:06:10 +02:00
Luciano Gervasoni | e4a325d6b4 | Request timeout debugging | 2025-09-05 14:00:50 +02:00
Luciano Gervasoni | 2fae0a3a9d | Request timeout | 2025-09-05 13:52:34 +02:00
Luciano Gervasoni | 35f9260b94 | Debug process raw url | 2025-09-05 13:45:16 +02:00
Luciano Gervasoni | b40611bd3e | Flower port update | 2025-09-04 09:26:44 +02:00
Luciano Gervasoni | 346d7c9187 | Debug workers | 2025-09-04 09:04:04 +02:00
Luciano Gervasoni | a21ff9c5d6 | Celery scheduler DB based | 2025-09-04 08:46:04 +02:00
Luciano Gervasoni | 7b0d24309c | Redis cache and celery, avoid overflow (3) | 2025-09-03 23:20:56 +02:00
Luciano Gervasoni | 334062b0ec | Redis cache and celery, avoid overflow | 2025-09-03 23:18:43 +02:00
Luciano Gervasoni | a9074f45b5 | Redis cache and celery, avoid overflow | 2025-09-03 23:07:03 +02:00
Luciano Gervasoni | 569e7d4676 | Disable fetch missing kids all | 2025-08-28 11:23:47 +02:00
16 changed files with 439 additions and 148 deletions

View File

@@ -24,8 +24,10 @@ DB_PASSWORD=supermatitos
DB_USER=supermatitos
DB_HOST=fetcher_db
DB_PORT=5432
REDIS_HOST=fetcher_redis
REDIS_PORT=6379
REDIS_CACHE_HOST=fetcher_redis_cache
REDIS_CACHE_PORT=6379
REDIS_CELERY_HOST=fetcher_redis_celery
REDIS_CELERY_PORT=6379
# Job timeout: 30 min
JOB_DEFAULT_TIMEOUT=1800
@@ -55,3 +57,9 @@ PEXELS_API_KEY=Y6clJkY32eihf34ukX4JsINYu9lzxh3xDdNq2HMAmGwXp0a0tt6vr6S9
# Ollama
ENDPOINT_OLLAMA=https://ollamamodelnpu.matitos.org
OLLAMA_MODEL_DEFAULT=qwen2.5-instruct:3b
# Telegram
TELEGRAM_INFO_BOT_TOKEN="..."
TELEGRAM_INFO_CHAT_ID="..."
TELEGRAM_WARNING_BOT_TOKEN="..."
TELEGRAM_WARNING_CHAT_ID="..."
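Note: a minimal sketch (not part of this diff) of how the channel-specific variables above can be read on the application side; it mirrors the lookups used later in the notify functions.
import os

def get_telegram_credentials(channel="INFO"):
    # channel is "INFO" or "WARNING"; both token and chat id fall back to empty strings
    bot_token = os.environ.get("TELEGRAM_{}_BOT_TOKEN".format(channel), "")
    chat_id = os.environ.get("TELEGRAM_{}_CHAT_ID".format(channel), "")
    return bot_token, chat_id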

View File

@@ -97,9 +97,10 @@ DATABASES = {
CACHES = {
"default": {
"BACKEND": "django_redis.cache.RedisCache",
"LOCATION": "redis://{}:{}".format(
os.environ.get("REDIS_HOST", "localhost"),
os.environ.get("REDIS_PORT", 6379)
"LOCATION": "redis://{}:{}/{}".format(
os.environ.get("REDIS_CACHE_HOST", "localhost"),
os.environ.get("REDIS_CACHE_PORT", 6379),
2 # DB for Caching
),
"OPTIONS": {
"MEMCACHE_MAX_KEY_LENGTH": 2048,
@@ -108,13 +109,14 @@ CACHES = {
}
}
# Celery configuration
CELERY_BROKER_URL = 'redis://{}:{}/{}'.format(os.environ.get("REDIS_HOST", "localhost"), os.environ.get("REDIS_PORT", 6379), os.environ.get("REDIS_DB", 0))
CELERY_RESULT_BACKEND = 'redis://{}:{}/{}'.format(os.environ.get("REDIS_HOST", "localhost"), os.environ.get("REDIS_PORT", 6379), os.environ.get("REDIS_DB_RESULTS", 1))
CELERY_BROKER_URL = 'redis://{}:{}/{}'.format(os.environ.get("REDIS_CELERY_HOST", "localhost"), os.environ.get("REDIS_CELERY_PORT", 6379), 0)
CELERY_RESULT_BACKEND = 'redis://{}:{}/{}'.format(os.environ.get("REDIS_CELERY_HOST", "localhost"), os.environ.get("REDIS_CELERY_PORT", 6379), 1)
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_EXPIRES = 3600 # Auto clean results after 1 hour
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "UTC"
# Celery Beat scheduler (required for django-celery-beat to work)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers.DatabaseScheduler'
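Note: for reference, a short sketch of the connection URLs these settings produce, assuming the defaults shown above; the cache lives on DB 2 of the cache instance, while the Celery broker and result backend use DBs 0 and 1 of a separate instance.
import os

cache_url = "redis://{}:{}/2".format(
    os.environ.get("REDIS_CACHE_HOST", "localhost"),
    os.environ.get("REDIS_CACHE_PORT", 6379),
)
broker_url = "redis://{}:{}/0".format(
    os.environ.get("REDIS_CELERY_HOST", "localhost"),
    os.environ.get("REDIS_CELERY_PORT", 6379),
)
result_backend = "redis://{}:{}/1".format(
    os.environ.get("REDIS_CELERY_HOST", "localhost"),
    os.environ.get("REDIS_CELERY_PORT", 6379),
)
# With the docker-compose values further down, these resolve to
# redis://fetcher_redis_cache:6379/2, redis://fetcher_redis_celery:6379/0 and .../1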

View File

@@ -4,7 +4,8 @@ from django.core.cache import cache
from django.db import IntegrityError
from django.utils import timezone
from datetime import timedelta
from .fetch_utils_url_processor import process_url, get_with_protocol, url_host_slowdown
from .fetch_utils_url_processor import process_url, verify_missing_kid_url
from .utils import get_with_protocol
import re
import requests
import os
@@ -16,7 +17,7 @@ class DB_Handler():
def __init__(self):
pass
def insert_raw_urls(self, urls, obj_source, obj_search):
def insert_raw_urls(self, urls, obj_source, obj_search):
try:
logger.debug("Inserting raw URLs")
# Empty?
@@ -100,15 +101,13 @@ class DB_Handler():
# URLs duplicate association
UrlsDuplicate.objects.get_or_create(id_url_canonical=obj_url_canonical, id_url_duplicated=obj_url)
def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error, paywall_bypass=False):
def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error, paywall_bypass=False, request_timeout=15):
##########################################################################
# URL pattern: missingkids.org/poster OR missingkids.org/new-poster
if ("missingkids.org" in obj_url.url) and ("poster" in obj_url.url):
# Sleep required? To avoid too many requests error (original URL, not paywall bypassing endpoint)
url_host_slowdown(obj_url.url, url_host_slowdown_seconds=float(os.getenv("FETCHER_URL_HOST_SLEEP", 5)))
try:
# Request
r = requests.get(obj_url.url, allow_redirects=True)
# Verify missing kid URL
results = verify_missing_kid_url(obj_url.url)
except Exception as e:
if (raise_exception_on_error):
# Simply raise exception, handled in a different way
@@ -118,20 +117,16 @@ class DB_Handler():
# Set status to error
self._set_status(obj_url, Urls.STATUS_ENUM.ERROR)
return
if (r.url != obj_url.url):
# Canonical
url_canonical = r.url
# Set duplicate, and insert new canonical form
self._set_duplicate_and_insert_canonical(obj_url, url_canonical)
elif (r.status_code == 200):
# Not enough to determine if it is valid. Need to wait to finish javascript, it might redirect to 404
# self._set_status(obj_url, Urls.STATUS_ENUM.VALID)
self._set_status(obj_url, Urls.STATUS_ENUM.UNKNOWN)
elif (r.status_code == 404):
if (results.get("status") == "valid"):
self._set_status(obj_url, Urls.STATUS_ENUM.VALID)
elif (results.get("status") == "invalid"):
self._set_status(obj_url, Urls.STATUS_ENUM.INVALID)
else:
logger.debug("Unknown request status: {} for missing kids request: {}".format(r.status_code, obj_url.url))
elif (results.get("status") == "duplicate"):
self._set_duplicate_and_insert_canonical(obj_url, results.get("redirection"))
elif (results.get("status") == "unknown"):
# Nothing to do, not sure about it...
logger.info("Missing kid verification returned unknown for URL: {}".format(obj_url.url))
self._set_status(obj_url, Urls.STATUS_ENUM.UNKNOWN)
return
##########################################################################
@@ -147,7 +142,7 @@ class DB_Handler():
try:
# Extract URL content
dict_url_data = process_url(obj_url.url, paywall_bypass)
dict_url_data = process_url(obj_url.url, paywall_bypass, request_timeout)
except Exception as e:
if (raise_exception_on_error):
# Simply raise exception, handled in a different way
@@ -314,14 +309,20 @@ class DB_Handler():
# Per URL
for obj_url in missingkids_urls:
try:
# Missing kids fetching endpoint, verify URL
missingkids_fetch_endpoint = os.path.join(os.getenv("SELENIUM_ENDPOINT", "http://localhost:80"), "verify_missing_kid/")
data = {"url": obj_url.url}
# POST
r = requests.post(missingkids_fetch_endpoint, json=data, timeout=120)
# Jsonify
results = r.json()
logger.debug("Selenium results for URL {}: {}".format(obj_url.url, str(results)))
SELENIUM_BASED_MISSINGKID_VERIFICATION = False
if (SELENIUM_BASED_MISSINGKID_VERIFICATION):
# Missing kids fetching endpoint, verify URL
missingkids_fetch_endpoint = os.path.join(os.getenv("SELENIUM_ENDPOINT", "http://localhost:80"), "verify_missing_kid/")
data = {"url": obj_url.url}
# POST
r = requests.post(missingkids_fetch_endpoint, json=data, timeout=120)
# Jsonify
results = r.json()
logger.debug("Missingkids Selenium results for URL {}: {}".format(obj_url.url, str(results)))
else:
# Verify
results = verify_missing_kid_url(obj_url.url)
logger.debug("Missingkids verify results for URL {}: {}".format(obj_url.url, str(results)))
if (results.get("status") == "valid"):
self._set_status(obj_url, Urls.STATUS_ENUM.VALID)

View File

@@ -1,6 +1,7 @@
from .db_utils import DB_Handler
from ..models import Search, Source
from .fetch_utils_url_processor import get_with_protocol, url_host_slowdown
from .fetch_utils_url_processor import url_host_slowdown
from .utils import get_with_protocol
import newspaper
import traceback
from .logger import get_logger

View File

@@ -9,14 +9,6 @@ from urllib.parse import unquote
import langdetect
langdetect.DetectorFactory.seed = 0
def get_with_protocol(url):
# http:// -> https://
url = url.replace("http://", "https://")
# "" -> https://
if not (url.startswith("https://")):
url = "https://" + url
return url
def get_url_host(url):
# URL no protocol, first substring before '/'
url_host = url.replace("https://", "").replace("http://", "").split("/")[0]
@@ -39,7 +31,48 @@ def url_host_slowdown(url, url_host_slowdown_seconds):
# About to process URL host, cache time
cache.set("process_{}".format(url_host).encode("utf-8"), time.time(), timeout=60*5) # Expire after 5 minutes
def process_url(url, paywall_bypass=False):
def verify_missing_kid_url(url):
# Sleep required? To avoid too many requests error
url_host_slowdown(url, url_host_slowdown_seconds=float(os.getenv("FETCHER_URL_HOST_SLEEP", 5)))
# Request, get redirection
r = requests.get(url, allow_redirects=True)
# Redirection?
if (url != r.url):
url_redirection = r.url
return {"status": "duplicate", "redirection": url_redirection}
# Sample URL: "https://www.missingkids.org/poster/NCMC/2058896/1"
org_prefix, case_num = url.split("/")[-3], url.split("/")[-2]
# Fill details to API endpoint
base_url = "https://www.missingkids.org/bin/ncmecEndpoint?action=childDetail&orgPrefix={}&caseNum={}"
url_endpoint = base_url.format(org_prefix, case_num)
# Short pause to avoid hammering the missingkids.org API endpoint
time.sleep(0.25)
# Request
r = requests.get(url_endpoint)
# Analyze status code and status result
if (r.status_code == 200):
r_json = r.json()
# Valid poster
if (r_json.get("status") == "success"):
return {"status": "valid"}
# Invalid poster
elif (r_json.get("status") == "error"):
return {"status": "invalid"}
else:
# Unexpected JSON status value
logger.info("Unknown json status: {} when verifying missing kid: {}".format(str(r_json), url))
return {"status": "unknown"}
else:
# Error status code
logger.info("Unknown request status: {} when verifying missing kid: {}".format(r.status_code, url))
return {"status": "unknown"}
def process_url(url, paywall_bypass=False, request_timeout=15):
if (paywall_bypass):
# TODO: Implement self-hosted instance
@@ -58,7 +91,7 @@ def process_url(url, paywall_bypass=False):
# Process
if ("foxnews.com" in url_of_interest) or ("zerohedge" in url_of_interest):
# Request
r = requests.get(url, headers={"User-Agent": user_agent})
r = requests.get(url, headers={"User-Agent": user_agent}, timeout=request_timeout)
# Raise for error code
r.raise_for_status()
# Parse
@@ -67,8 +100,10 @@ def process_url(url, paywall_bypass=False):
# Config: Fake user agent
config = newspaper.configuration.Configuration()
config.headers = {'User-Agent': user_agent}
config.request_timeout = request_timeout
# Default mode
article = newspaper.article(url_of_interest, config=config)
except newspaper.ArticleBinaryDataException:
logger.warning("ArticleException for input URL {}".format(url))
return {"override_status": "invalid"}
@@ -106,7 +141,7 @@ def process_url(url, paywall_bypass=False):
# Try simple request, valid response but couldn't parse article? e.g. getting blocked? -> unknown
time.sleep(0.25)
r = requests.get(url_of_interest)
r = requests.get(url_of_interest, timeout=request_timeout)
if (r.status_code == 200):
return {"override_status": "unknown"}
else:
@@ -117,7 +152,7 @@ def process_url(url, paywall_bypass=False):
except Exception as e:
logger.warning("Exception for input URL {}\n{}".format(url, str(e)))
return None
# Not a valid URL?
if (not article.is_valid_url()):
logger.debug("Invalid URL found: {}".format(url))

View File

@@ -4,54 +4,150 @@ from ..models import Urls, Source, Search, UrlContent, UrlsSourceSearch, UrlsDup
from django.db.models import Count
import requests
import os
import traceback
from .logger import get_logger
logger = get_logger()
def notify_telegram_info(last_hours, channel="INFO"):
try:
start_date = timezone.now() - timedelta(hours=last_hours)
# Count the number of URLs grouped by status within the date range
urls_data_status = Urls.objects.filter(ts_fetch__gte=start_date) \
.values('status') \
.annotate(count=Count('id')) \
.order_by('status')
# Count the number of URLs grouped by source
urls_data_source = UrlsSourceSearch.objects \
.filter(id_url__ts_fetch__gte=start_date) \
.values('id_source__source') \
.annotate(count=Count('id_url')) \
.order_by('id_source__source')
# Count the number of URLs grouped by search
urls_data_search = UrlsSourceSearch.objects \
.filter(id_url__ts_fetch__gte=start_date) \
.values('id_search__search') \
.annotate(count=Count('id_url')) \
.order_by('id_search__search')
def notify_telegram(last_hours=24):
start_date = timezone.now() - timedelta(hours=last_hours)
# Count the number of URLs grouped by status within the date range
urls_data_status = Urls.objects.filter(ts_fetch__gte=start_date) \
.values('status') \
.annotate(count=Count('id')) \
.order_by('status')
# Count the number of URLs grouped by source
urls_data_source = UrlsSourceSearch.objects \
.filter(id_url__ts_fetch__gte=start_date) \
.values('id_source__source') \
.annotate(count=Count('id_url')) \
.order_by('id_source__source')
# Count the number of URLs grouped by search
urls_data_search = UrlsSourceSearch.objects \
.filter(id_url__ts_fetch__gte=start_date) \
.values('id_search__search') \
.annotate(count=Count('id_url')) \
.order_by('id_search__search')
bot_token = os.environ.get("TELEGRAM_{}_BOT_TOKEN".format(channel), "")
chat_id = os.environ.get("TELEGRAM_{}_CHAT_ID".format(channel), "")
message = "During the last {} hours:\n".format(last_hours)
message += "\nURLs per status:\n"
for o in urls_data_status:
message += " {}: {}\n".format(o.get("status"), o.get("count"))
message += "\nURLs per source:\n"
for o in urls_data_source:
message += " {}: {}\n".format(o.get("id_source__source"), o.get("count"))
message += "\nURLs per search:\n"
for o in urls_data_search:
message += " {}: {}\n".format(o.get("id_search__search"), o.get("count"))
bot_token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
chat_id = os.environ.get("TELEGRAM_CHAT_ID", "")
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
params = {
"chat_id": chat_id,
"text": message
}
# POST
response = requests.post(url, params=params)
except Exception as e:
logger.info("Exception while notifying status: {}\n{}".format(str(e), traceback.format_exc()))
message = "During the last {} hours:\n".format(last_hours)
message += "\nURLs per status:\n"
for o in urls_data_status:
message += " {}: {}\n".format(o.get("status"), o.get("count"))
message += "\nURLs per source:\n"
for o in urls_data_source:
message += " {}: {}\n".format(o.get("id_source__source"), o.get("count"))
message += "\nURLs per search:\n"
for o in urls_data_search:
message += " {}: {}\n".format(o.get("id_search__search"), o.get("count"))
def notify_telegram_warning(last_hours, channel="WARNING"):
try:
# Message appending logic
message = ""
start_date = timezone.now() - timedelta(hours=last_hours)
# Count the number of URLs grouped by status within the date range
urls_data_status = Urls.objects.filter(ts_fetch__gte=start_date) \
.values('status') \
.annotate(count=Count('id')) \
.order_by('status')
# Build dictionary
urls_data_status_dict = {}
for o in urls_data_status:
# #STATUS
urls_data_status_dict[o.get("status")] = o.get("count")
# #TOTAL
urls_data_status_dict["total"] = urls_data_status_dict.get("total", 0) + o.get("count")
MINIMUM_URLS_THRESHOLD = 10
MINIMUM_PROCESSED_URLS_RATIO = 0.7
# Minimum amount of URLs
if (urls_data_status_dict.get("total", 0) < MINIMUM_URLS_THRESHOLD):
message += "WARNING - Total #URLs during the last {} hours: {}\n".format(last_hours, urls_data_status_dict.get("total", 0))
message += "\nURLs per status:\n"
for o in urls_data_status:
message += " {}: {}\n".format(o.get("status"), o.get("count"))
# Minimum ratio of processed raw urls
if (urls_data_status_dict.get("total", 0) > 0):
if (urls_data_status_dict.get("raw", 0) / urls_data_status_dict.get("total") >= MINIMUM_PROCESSED_URLS_RATIO):
message += "WARNING - Small ratio of processed raw URLs during the last {} hours: {}\n".format(last_hours, urls_data_status_dict.get("total"))
message += "\nURLs per status:\n"
for o in urls_data_status:
message += " {}: {}\n".format(o.get("status"), o.get("count"))
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
params = {
"chat_id": chat_id,
"text": message
}
# Count the number of URLs grouped by source
urls_data_source = UrlsSourceSearch.objects \
.filter(id_url__ts_fetch__gte=start_date) \
.values('id_source__source') \
.annotate(count=Count('id_url')) \
.order_by('id_source__source')
MINIMUM_SOURCES = 3
if (len(urls_data_source) < MINIMUM_SOURCES):
message += "WARNING - Very few sources found URLs during the last {} hours".format(last_hours)
message += "\nURLs per source:\n"
for o in urls_data_source:
message += " {}: {}\n".format(o.get("id_source__source"), o.get("count"))
# POST
response = requests.post(url, params=params)
"""
# TODO: URLs per search, key should be present for cnbc.com, foxnews.com, zerohedge.com, breitbart.com, child abuse, child neglect
# Count the number of URLs grouped by search
urls_data_search = UrlsSourceSearch.objects \
.filter(id_url__ts_fetch__gte=start_date) \
.values('id_search__search') \
.annotate(count=Count('id_url')) \
.order_by('id_search__search')
message += "\nURLs per search:\n"
for o in urls_data_search:
message += " {}: {}\n".format(o.get("id_search__search"), o.get("count"))
"""
# Valid message body?
if (message != ""):
bot_token = os.environ.get("TELEGRAM_{}_BOT_TOKEN".format(channel), "")
chat_id = os.environ.get("TELEGRAM_{}_CHAT_ID".format(channel), "")
url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
params = {
"chat_id": chat_id,
"text": message
}
# POST
response = requests.post(url, params=params)
except Exception as e:
logger.info("Exception while notifying status: {}\n{}".format(str(e)), traceback.format_exc())
def notify_telegram(last_hours=12):
# INFO
notify_telegram_info(last_hours, channel="INFO")
# WARNING
notify_telegram_warning(last_hours, channel="WARNING")
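Note: a minimal, self-contained sketch of the Telegram call both notify functions make, assuming the TELEGRAM_<CHANNEL>_* variables from the .env diff above are set; the timeout is an added defensive assumption, the committed code posts without one.
import os
import requests

def send_telegram_message(message, channel="INFO"):
    bot_token = os.environ.get("TELEGRAM_{}_BOT_TOKEN".format(channel), "")
    chat_id = os.environ.get("TELEGRAM_{}_CHAT_ID".format(channel), "")
    url = "https://api.telegram.org/bot{}/sendMessage".format(bot_token)
    # Telegram accepts chat_id and text as query parameters, as the code above does
    return requests.post(url, params={"chat_id": chat_id, "text": message}, timeout=30)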

View File

@@ -0,0 +1,8 @@
def get_with_protocol(url):
# http:// -> https://
url = url.replace("http://", "https://")
# "" -> https://
if not (url.startswith("https://")):
url = "https://" + url
return url

View File

@@ -13,13 +13,30 @@ from .src.logger import get_logger
logger = get_logger()
@shared_task(queue='light')
def process_raw_urls(batch_size=100):
task = "Process raw URLs"
logger.info("Task triggered: {}".format(task))
DB_Handler().process_raw_urls(batch_size=batch_size)
logger.info("Task completed: {}".format(task))
@shared_task(queue='default')
def process_error_urls(batch_size=50):
task = "Process error URLs"
logger.info("Task triggered: {}".format(task))
DB_Handler().process_error_urls(batch_size=batch_size)
logger.info("Task completed: {}".format(task))
@shared_task(queue='light')
def fetch_feeds():
task = "Fetch Feeds"
logger.info("Task triggered: {}".format(task))
FetchFeeds().run()
logger.info("Task completed: {}".format(task))
@shared_task(queue='default')
def fetch_parser():
task = "Fetch Parser"
@@ -34,41 +51,31 @@ def fetch_search():
FetchSearcher().run()
logger.info("Task completed: {}".format(task))
@shared_task(queue='low')
@shared_task(queue='heavy')
def fetch_selenium_search():
task = "Fetch Selenium search"
logger.info("Task triggered: {}".format(task))
FetchSeleniumSourceSearch().run()
logger.info("Task completed: {}".format(task))
@shared_task(queue='low')
@shared_task(queue='heavy')
def fetch_missing_kids(number_pages=5):
task = "Fetch MissingKids"
logger.info("Task triggered: {}".format(task))
FetchMissingKids().run(number_pages)
logger.info("Task completed: {}".format(task))
@shared_task(queue='default')
def process_raw_urls(batch_size=100):
task = "Process raw URLs"
logger.info("Task triggered: {}".format(task))
DB_Handler().process_raw_urls(batch_size=batch_size)
logger.info("Task completed: {}".format(task))
@shared_task(queue='default')
def process_error_urls(batch_size=50):
task = "Process error URLs"
logger.info("Task triggered: {}".format(task))
DB_Handler().process_error_urls(batch_size=batch_size)
logger.info("Task completed: {}".format(task))
@shared_task(queue='low')
@shared_task(queue='heavy')
def process_missing_kids_urls(batch_size=None, process_status_only=None):
task = "Process Missing Kids URLs - batch_size={} process_status_only={}".format(batch_size, process_status_only)
logger.info("Task triggered: {}".format(task))
DB_Handler().process_missing_kids_urls(batch_size=batch_size, process_status_only=process_status_only)
logger.info("Task completed: {}".format(task))
@shared_task(queue='default')
def clean_old_url_content(older_than_days=14):
task = "Clean old URL content"
@@ -76,7 +83,7 @@ def clean_old_url_content(older_than_days=14):
DB_Handler().clean_old_url_content(older_than_days=older_than_days)
logger.info("Task completed: {}".format(task))
@shared_task(queue='default')
@shared_task(queue='light')
def notify_status():
task = "Notify status"
logger.info("Task triggered: {}".format(task))

View File

@@ -16,14 +16,10 @@ def trigger_task(request, task):
def link_list(request):
# Base URL path
app_url = request.build_absolute_uri()
# Tasks
links_fetch = ["fetch_feeds", "fetch_parser", "fetch_search", "fetch_missingkids_5", "fetch_missingkids_all", "fetch_selenium_search"]
links_process = ["process_raw_urls_50", "process_error_urls_50", "process_missing_kids_urls_50", "process_missing_kids_urls_valid_all", "process_missing_kids_urls_invalid_all", "process_missing_kids_urls_unknown_all", "process_missing_kids_urls_all", "clean_old_url_content_60"]
# List of links
list_links = \
[ os.path.join(app_url, "admin"), os.path.join(app_url, "urls") ] + \
[ os.path.join(app_url, "logs", log_type) for log_type in ["database", "debug", "info", "warning", "server", "beat", "worker_default", "worker_low"] ] #+ \
#[ os.path.join(app_url, "task", l) for l in links_fetch + links_process ]
[ os.path.join(app_url, "logs", log_type) for log_type in ["database", "debug", "info", "warning", "server", "beat", "worker_light", "worker_default", "worker_heavy"] ]
# Links tuple
links = [(l, l) for l in list_links]

View File

@@ -17,19 +17,20 @@
"cnbc.com"
],
"keyword_search": [
"child abuse"
"child abuse",
"child neglect"
]
},
"REGEX_PATTERN_STATUS_PRIORITY": [
[".*(youtube|tiktok|twitter|reddit)\\.com\\/.*", "invalid", 50],
["https:\\/\\/x.com\\/.*", "invalid", 50],
[".*cnbc\\.com\\/(video|quotes)\\/.*", "invalid", 75],
[".*foxnews\\.com\\/(video|category|person|books|html-sitemap)\\/.*", "invalid", 75],
[".*radio\\.foxnews\\.com\\/.*", "invalid", 75],
[".*breitbart\\.com\\/(tag|author)\\/.*", "invalid", 75],
[".*breitbart\\.com\\/(economy|entertainment|border|crime|clips)\\/.*", "valid", 50],
[".*zerohedge\\.com\\/(user|contributors)\\/.*", "invalid", 75],
[".*zerohedge\\.com\\/(economics|political|markets|)\\/.*", "valid", 50],
[".*breitbart\\.com\\/(economy|entertainment|border|crime|clips)\\/.*", "valid", 50],
[".*radio\\.foxnews\\.com\\/.*", "invalid", 75],
[".*foxnews\\.com\\/(video|category|person|books|html-sitemap)\\/.*", "invalid", 75],
[".*foxnews\\.com\\/(lifestyle|opinion|sports|world)\\/.*", "valid", 50],
[".*foxnews\\.com\\/[^\\/]+\\/?$", "invalid", 25]
]
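Note: the code consuming REGEX_PATTERN_STATUS_PRIORITY is not part of this diff; the sketch below only illustrates how a [pattern, status, priority] triple could be applied, using a truncated, illustrative pattern list.
import re

REGEX_PATTERN_STATUS_PRIORITY = [
    [".*(youtube|tiktok|twitter|reddit)\\.com\\/.*", "invalid", 50],
    [".*foxnews\\.com\\/(lifestyle|opinion|sports|world)\\/.*", "valid", 50],
]

def match_status(url):
    # Return (status, priority) of the first matching pattern, or None
    for pattern, status, priority in REGEX_PATTERN_STATUS_PRIORITY:
        if re.match(pattern, url):
            return status, priority
    return None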

View File

@@ -160,7 +160,7 @@
"expire_seconds": null,
"one_off": false,
"start_time": null,
"enabled": false,
"enabled": true,
"last_run_at": null,
"total_run_count": 0,
"date_changed": "2025-07-17T16:20:19.969Z",
@@ -188,7 +188,7 @@
"expire_seconds": null,
"one_off": false,
"start_time": null,
"enabled": false,
"enabled": true,
"last_run_at": null,
"total_run_count": 0,
"date_changed": "2025-07-17T16:21:30.809Z",
@@ -397,7 +397,7 @@
"fields": {
"name": "Notify status",
"task": "fetcher.tasks.notify_status",
"interval": 3,
"interval": 4,
"crontab": null,
"solar": null,
"clocked": null,

View File

@@ -15,7 +15,7 @@ stdout_logfile_maxbytes=20MB
stdout_logfile_backups=1
[program:beat]
command=celery -A core beat -l info --logfile=/opt/logs/beat.log
command=celery -A core beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler --logfile=/opt/logs/beat.log
directory=/opt/app
autostart=true
autorestart=true
@@ -40,14 +40,27 @@ redirect_stderr=true
stdout_logfile_maxbytes=20MB
stdout_logfile_backups=1
[program:worker_low]
command=celery -A core worker -l info --logfile=/opt/logs/worker_low.log --concurrency=1 -Q low -n low
[program:worker_light]
command=celery -A core worker -l info --logfile=/opt/logs/worker_light.log --concurrency=1 -Q light -n light
directory=/opt/app
autostart=true
autorestart=true
; Unified log file
stdout_logfile=/opt/logs/worker_low.log
stderr_logfile=/opt/logs/worker_low.log
stdout_logfile=/opt/logs/worker_light.log
stderr_logfile=/opt/logs/worker_light.log
redirect_stderr=true
; Rotate when file reaches max size
stdout_logfile_maxbytes=20MB
stdout_logfile_backups=1
[program:worker_heavy]
command=celery -A core worker -l info --logfile=/opt/logs/worker_heavy.log --concurrency=1 -Q heavy -n heavy
directory=/opt/app
autostart=true
autorestart=true
; Unified log file
stdout_logfile=/opt/logs/worker_heavy.log
stderr_logfile=/opt/logs/worker_heavy.log
redirect_stderr=true
; Rotate when file reaches max size
stdout_logfile_maxbytes=20MB

View File

@@ -43,8 +43,10 @@ services:
- DB_PASSWORD=${DB_PASSWORD}
- DB_HOST=${DB_HOST}
- DB_PORT=${DB_PORT}
- REDIS_HOST=${REDIS_HOST}
- REDIS_PORT=${REDIS_PORT}
- REDIS_CACHE_HOST=${REDIS_CACHE_HOST}
- REDIS_CACHE_PORT=${REDIS_CACHE_PORT}
- REDIS_CELERY_HOST=${REDIS_CELERY_HOST}
- REDIS_CELERY_PORT=${REDIS_CELERY_PORT}
# Job timeout: 30 min
- JOB_DEFAULT_TIMEOUT=${JOB_DEFAULT_TIMEOUT}
# Fetcher
@@ -64,22 +66,32 @@ services:
- PEXELS_API_KEY=${PEXELS_API_KEY}
- OLLAMA_MODEL_DEFAULT=${OLLAMA_MODEL_DEFAULT}
# Telegram
- TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN}
- TELEGRAM_CHAT_ID=${TELEGRAM_CHAT_ID}
- TELEGRAM_INFO_BOT_TOKEN=${TELEGRAM_INFO_BOT_TOKEN}
- TELEGRAM_INFO_CHAT_ID=${TELEGRAM_INFO_CHAT_ID}
- TELEGRAM_WARNING_BOT_TOKEN=${TELEGRAM_WARNING_BOT_TOKEN}
- TELEGRAM_WARNING_CHAT_ID=${TELEGRAM_WARNING_CHAT_ID}
########################
ports:
- 8000
depends_on:
- fetcher_db
- fetcher_redis
- fetcher_redis_cache
- fetcher_redis_celery
- fetcher_app_selenium
dns:
- 1.1.1.1
- 1.0.0.1
fetcher_redis:
fetcher_redis_cache:
image: redis:alpine
container_name: fetcher_redis
container_name: fetcher_redis_cache
restart: unless-stopped
ports:
- 6379
fetcher_redis_celery:
image: redis:alpine
container_name: fetcher_redis_celery
restart: unless-stopped
ports:
- 6379
@@ -94,6 +106,7 @@ services:
ports:
- 5555
environment:
- CELERY_BROKER_URL=redis://fetcher_redis:6379/0
- CELERY_BROKER_URL=redis://fetcher_redis_celery:6379/0
- FLOWER_UNAUTHENTICATED_API=true
depends_on:
- fetcher_redis
- fetcher_redis_celery

View File

@@ -52,12 +52,19 @@ services:
#volumes: # Persistent DB?
# - ./postgres:/var/lib/postgresql/data
fetcher_redis:
fetcher_redis_cache:
extends:
file: docker-compose-base.yml
service: fetcher_redis
service: fetcher_redis_cache
ports:
- 6379:6379
- 6379
fetcher_redis_celery:
extends:
file: docker-compose-base.yml
service: fetcher_redis_celery
ports:
- 6379
fetcher_flower:
extends:

View File

@@ -47,12 +47,19 @@ services:
autossh -M 15885 -N -L 0.0.0.0:5432:127.0.0.1:5432 ${REMOTE_USERNAME}@${REMOTE_HOST}
# autossh -M 15885 -N -o 'GatewayPorts yes' -L 0.0.0.0:5432:127.0.0.1:5432 ${REMOTE_USERNAME}@${REMOTE_HOST}
fetcher_redis:
fetcher_redis_cache:
extends:
file: docker-compose-base.yml
service: fetcher_redis
service: fetcher_redis_cache
ports:
- 6379:6379
- 6379
fetcher_redis_celery:
extends:
file: docker-compose-base.yml
service: fetcher_redis_celery
ports:
- 6379
fetcher_flower:
extends:

View File

@@ -14,7 +14,6 @@
"import json\n",
"import csv\n",
"\n",
"\n",
"headers = {\"User-Agent\": \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36\"}"
]
},
@@ -329,13 +328,22 @@
" main()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df = pd.read_csv(\"scholenopdekaart.csv\", index_col=0)\n",
"import pandas as pd\n",
"\n",
"df = pd.read_csv(\"~/Downloads/scholenopdekaart.csv\", index_col=0)\n",
"\n",
"df.head()"
]
@@ -346,13 +354,101 @@
"metadata": {},
"outputs": [],
"source": [
"df.tail()"
"def to_dict(row):\n",
" # Empty?\n",
" if (pd.isna(row)):\n",
" return {}\n",
" # Evaluate, to dict\n",
" dict_data = dict(eval(row))\n",
" # Remove None values\n",
" for k in list(dict_data.keys()):\n",
" if dict_data[k] is None:\n",
" del dict_data[k]\n",
" # Prefix\n",
" return {f\"{column}_{k}\": v for k, v in dict_data.items()}\n",
"\n",
"for column in [\"students_per_year_trend\", \"num_students_per_group\", \"num_students_per_age\"]:\n",
" print(column)\n",
" # Convert the list of tuples into a dictionary per row\n",
" df_dicts = df[column].apply(to_dict)\n",
" # Expand into separate columns\n",
" df_expanded = pd.json_normalize(df_dicts)\n",
" # Sort\n",
" df_expanded = df_expanded[sorted(df_expanded.columns)]\n",
" # Combine with original columns\n",
" df = pd.concat([df.drop(columns=[column]), df_expanded], axis=1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def to_dict(row):\n",
" # Empty?\n",
" if (pd.isna(row)):\n",
" return {}\n",
" # Evaluate, to dict\n",
" data = eval(row)\n",
" # Remove first useless data\n",
" data = data[1:]\n",
"\n",
" # Generate dict\n",
" dict_data = {}\n",
" for (zipcode, num, percentage) in data:\n",
" dict_data[f\"num_students_zipcode_{zipcode}\"] = num\n",
" dict_data[f\"percentage_students_zipcode_{zipcode}\"] = percentage\n",
"\n",
" # Remove None values\n",
" for k in list(dict_data.keys()):\n",
" if dict_data[k] is None:\n",
" del dict_data[k]\n",
" return dict_data\n",
"\n",
"for column in [\"students_per_zipcode\"]:\n",
" print(column)\n",
" # Convert the list of tuples into a dictionary per row\n",
" df_dicts = df[column].apply(to_dict)\n",
" # Expand into separate columns\n",
" df_expanded = pd.json_normalize(df_dicts)\n",
" # Sort\n",
" df_expanded = df_expanded[sorted(df_expanded.columns)]\n",
" # Combine with original columns\n",
" df = pd.concat([df.drop(columns=[column]), df_expanded], axis=1)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df.to_csv(\"schools_nl.csv\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df.head()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"list(df.columns)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "matitos_urls",
"display_name": "fetcher",
"language": "python",
"name": "python3"
},
@@ -366,7 +462,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.9"
"version": "3.12.11"
}
},
"nbformat": 4,