Compare commits: app_read_o ... main (25 commits)

| Author | SHA1 | Date |
|---|---|---|
| | cbc422df36 | |
| | dc784dabec | |
| | d8ef738d19 | |
| | 2f035a4222 | |
| | e057568af0 | |
| | 7924857fe5 | |
| | f44b784715 | |
| | 24510d26e2 | |
| | ef51a96db6 | |
| | 079b2473f8 | |
| | 1fbc5beb6e | |
| | 7886d16264 | |
| | 2ed86e31ec | |
| | 892fb984d1 | |
| | c17f09a94f | |
| | e4a325d6b4 | |
| | 2fae0a3a9d | |
| | 35f9260b94 | |
| | b40611bd3e | |
| | 346d7c9187 | |
| | a21ff9c5d6 | |
| | 7b0d24309c | |
| | 334062b0ec | |
| | a9074f45b5 | |
| | 569e7d4676 | |
.env.sample (12 changed lines)

@@ -24,8 +24,10 @@ DB_PASSWORD=supermatitos
 DB_USER=supermatitos
 DB_HOST=fetcher_db
 DB_PORT=5432
-REDIS_HOST=fetcher_redis
-REDIS_PORT=6379
+REDIS_CACHE_HOST=fetcher_redis_cache
+REDIS_CACHE_PORT=6379
+REDIS_CELERY_HOST=fetcher_redis_celery
+REDIS_CELERY_PORT=6379

 # Job timeout: 30 min
 JOB_DEFAULT_TIMEOUT=1800
@@ -55,3 +57,9 @@ PEXELS_API_KEY=Y6clJkY32eihf34ukX4JsINYu9lzxh3xDdNq2HMAmGwXp0a0tt6vr6S9
 # Ollama
 ENDPOINT_OLLAMA=https://ollamamodelnpu.matitos.org
 OLLAMA_MODEL_DEFAULT=qwen2.5-instruct:3b
+
+# Telegram
+TELEGRAM_INFO_BOT_TOKEN="..."
+TELEGRAM_INFO_CHAT_ID="..."
+TELEGRAM_WARNING_BOT_TOKEN="..."
+TELEGRAM_WARNING_CHAT_ID="..."
@@ -97,9 +97,10 @@ DATABASES = {
 CACHES = {
     "default": {
         "BACKEND": "django_redis.cache.RedisCache",
-        "LOCATION": "redis://{}:{}".format(
-            os.environ.get("REDIS_HOST", "localhost"),
-            os.environ.get("REDIS_PORT", 6379)
+        "LOCATION": "redis://{}:{}/{}".format(
+            os.environ.get("REDIS_CACHE_HOST", "localhost"),
+            os.environ.get("REDIS_CACHE_PORT", 6379),
+            2 # DB for Caching
         ),
         "OPTIONS": {
             "MEMCACHE_MAX_KEY_LENGTH": 2048,
@@ -108,13 +109,14 @@ CACHES = {
         }
     }

 # Celery configuration
-CELERY_BROKER_URL = 'redis://{}:{}/{}'.format(os.environ.get("REDIS_HOST", "localhost"), os.environ.get("REDIS_PORT", 6379), os.environ.get("REDIS_DB", 0))
-CELERY_RESULT_BACKEND = 'redis://{}:{}/{}'.format(os.environ.get("REDIS_HOST", "localhost"), os.environ.get("REDIS_PORT", 6379), os.environ.get("REDIS_DB_RESULTS", 1))
+CELERY_BROKER_URL = 'redis://{}:{}/{}'.format(os.environ.get("REDIS_CELERY_HOST", "localhost"), os.environ.get("REDIS_CELERY_PORT", 6379), 0)
+CELERY_RESULT_BACKEND = 'redis://{}:{}/{}'.format(os.environ.get("REDIS_CELERY_HOST", "localhost"), os.environ.get("REDIS_CELERY_PORT", 6379), 1)
 CELERY_ACCEPT_CONTENT = ['json']
 CELERY_TASK_SERIALIZER = 'json'
 CELERY_RESULT_EXPIRES = 3600 # Auto clean results after 1 hour
 CELERY_ENABLE_UTC = True
 CELERY_TIMEZONE = "UTC"

 # Celery Beat scheduler (required for django-celery-beat to work)
+CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers.DatabaseScheduler'
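The cache now points at its own Redis instance (database 2 on `REDIS_CACHE_HOST`/`REDIS_CACHE_PORT`), while the Celery broker and result backend use databases 0 and 1 on `REDIS_CELERY_HOST`/`REDIS_CELERY_PORT`. A minimal sketch of the URL-building pattern these settings rely on (variable names come from the diff; the helper itself is illustrative, not repository code):

```python
import os

def redis_url(host_var, port_var, db):
    # Same "redis://host:port/db" pattern as settings.py, driven by env vars
    return "redis://{}:{}/{}".format(
        os.environ.get(host_var, "localhost"),
        os.environ.get(port_var, 6379),
        db,
    )

CACHE_URL = redis_url("REDIS_CACHE_HOST", "REDIS_CACHE_PORT", 2)     # django_redis cache
BROKER_URL = redis_url("REDIS_CELERY_HOST", "REDIS_CELERY_PORT", 0)  # Celery broker
RESULT_URL = redis_url("REDIS_CELERY_HOST", "REDIS_CELERY_PORT", 1)  # Celery results
```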
@@ -4,7 +4,8 @@ from django.core.cache import cache
 from django.db import IntegrityError
 from django.utils import timezone
 from datetime import timedelta
-from .fetch_utils_url_processor import process_url, get_with_protocol, url_host_slowdown
+from .fetch_utils_url_processor import process_url, verify_missing_kid_url
+from .utils import get_with_protocol
 import re
 import requests
 import os
@@ -16,7 +17,7 @@ class DB_Handler():
     def __init__(self):
         pass

-    def insert_raw_urls(self, urls, obj_source, obj_search):
+    def insert_raw_urls(self, urls, obj_source, obj_search):
         try:
             logger.debug("Inserting raw URLs")
             # Empty?
@@ -100,15 +101,13 @@ class DB_Handler():
         # URLs duplciate association
         UrlsDuplicate.objects.get_or_create(id_url_canonical=obj_url_canonical, id_url_duplicated=obj_url)

-    def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error, paywall_bypass=False):
+    def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error, paywall_bypass=False, request_timeout=15):
         ##########################################################################
         # URL pattern: missingkids.org/poster OR missingkids.org/new-poster
         if ("missingkids.org" in obj_url.url) and ("poster" in obj_url.url):
-            # Sleep required? To avoid too many requests error (original URL, not paywall bypassing endpoint)
-            url_host_slowdown(obj_url.url, url_host_slowdown_seconds=float(os.getenv("FETCHER_URL_HOST_SLEEP", 5)))
             try:
-                # Request
-                r = requests.get(obj_url.url, allow_redirects=True)
+                # Verify missing kid URL
+                results = verify_missing_kid_url(obj_url.url)
             except Exception as e:
                 if (raise_exception_on_error):
                     # Simply raise exception, handled in a different way
@@ -118,20 +117,16 @@ class DB_Handler():
                 # Set status to error
                 self._set_status(obj_url, Urls.STATUS_ENUM.ERROR)
                 return

-            if (r.url != obj_url.url):
-                # Canonical
-                url_canonical = r.url
-                # Set duplicate, and insert new canonical form
-                self._set_duplicate_and_insert_canonical(obj_url, url_canonical)
-            elif (r.status_code == 200):
-                # Not enough to determine if it is valid. Need to wait to finish javascript, it might redirect to 404
-                # self._set_status(obj_url, Urls.STATUS_ENUM.VALID)
-                self._set_status(obj_url, Urls.STATUS_ENUM.UNKNOWN)
-            elif (r.status_code == 404):
-
+            if (results.get("status") == "valid"):
+                self._set_status(obj_url, Urls.STATUS_ENUM.VALID)
+            elif (results.get("status") == "invalid"):
                 self._set_status(obj_url, Urls.STATUS_ENUM.INVALID)
-            else:
-                logger.debug("Unknown request status: {} for missing kids request: {}".format(r.status_code, obj_url.url))
+            elif (results.get("status") == "duplicate"):
+                self._set_duplicate_and_insert_canonical(obj_url, results.get("redirection"))
+            elif (results.get("status") == "unknown"):
+                # Nothing to do, not sure about it...
+                logger.info("Missing kid verification returned unknown for URL: {}".format(obj_url.url))
+                self._set_status(obj_url, Urls.STATUS_ENUM.UNKNOWN)
             return
         ##########################################################################
@@ -147,7 +142,7 @@ class DB_Handler():

         try:
             # Extract URL content
-            dict_url_data = process_url(obj_url.url, paywall_bypass)
+            dict_url_data = process_url(obj_url.url, paywall_bypass, request_timeout)
         except Exception as e:
             if (raise_exception_on_error):
                 # Simply raise exception, handled in a different way
@@ -314,14 +309,20 @@ class DB_Handler():
         # Per URL
         for obj_url in missingkids_urls:
             try:
-                # Missing kids fetching endpoint, verify URL
-                missingkids_fetch_endpoint = os.path.join(os.getenv("SELENIUM_ENDPOINT", "http://localhost:80"), "verify_missing_kid/")
-                data = {"url": obj_url.url}
-                # POST
-                r = requests.post(missingkids_fetch_endpoint, json=data, timeout=120)
-                # Jsonify
-                results = r.json()
-                logger.debug("Selenium results for URL {}: {}".format(obj_url.url, str(results)))
+                SELENIUM_BASED_MISSINGKID_VERIFICATION = False
+                if (SELENIUM_BASED_MISSINGKID_VERIFICATION):
+                    # Missing kids fetching endpoint, verify URL
+                    missingkids_fetch_endpoint = os.path.join(os.getenv("SELENIUM_ENDPOINT", "http://localhost:80"), "verify_missing_kid/")
+                    data = {"url": obj_url.url}
+                    # POST
+                    r = requests.post(missingkids_fetch_endpoint, json=data, timeout=120)
+                    # Jsonify
+                    results = r.json()
+                    logger.debug("Missingkids Selenium results for URL {}: {}".format(obj_url.url, str(results)))
+                else:
+                    # Verify
+                    results = verify_missing_kid_url(obj_url.url)
+                    logger.debug("Missingkids verify results for URL {}: {}".format(obj_url.url, str(results)))

                 if (results.get("status") == "valid"):
                     self._set_status(obj_url, Urls.STATUS_ENUM.VALID)
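Both call sites above branch on the dict returned by `verify_missing_kid_url`. For reference, these are the result shapes the diff shows, collected in one place (illustrative summary, not additional repository code):

```python
# Result shapes of verify_missing_kid_url, as they appear in the diff
POSSIBLE_RESULTS = [
    {"status": "valid"},                                            # poster exists
    {"status": "invalid"},                                          # poster removed / error status
    {"status": "unknown"},                                          # unexpected JSON or HTTP status
    {"status": "duplicate", "redirection": "https://example.org"},  # redirected; value is the canonical URL
]

for result in POSSIBLE_RESULTS:
    assert result.get("status") in {"valid", "invalid", "unknown", "duplicate"}
```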
@@ -1,6 +1,7 @@
 from .db_utils import DB_Handler
 from ..models import Search, Source
-from .fetch_utils_url_processor import get_with_protocol, url_host_slowdown
+from .fetch_utils_url_processor import url_host_slowdown
+from .utils import get_with_protocol
 import newspaper
 import traceback
 from .logger import get_logger

@@ -9,14 +9,6 @@ from urllib.parse import unquote
 import langdetect
 langdetect.DetectorFactory.seed = 0

-def get_with_protocol(url):
-    # http:// -> https://
-    url = url.replace("http://", "https://")
-    # "" -> https://
-    if not (url.startswith("https://")):
-        url = "https://" + url
-    return url
-
 def get_url_host(url):
     # URL no protocol, first substring before '/'
     url_host = url.replace("https://", "").replace("http://", "").split("/")[0]
@@ -39,7 +31,48 @@ def url_host_slowdown(url, url_host_slowdown_seconds):
     # About to process URL host, cache time
     cache.set("process_{}".format(url_host).encode("utf-8"), time.time(), timeout=60*5) # Expire after 5 minutes

-def process_url(url, paywall_bypass=False):
+
+def verify_missing_kid_url(url):
+    # Sleep required? To avoid too many requests error
+    url_host_slowdown(url, url_host_slowdown_seconds=float(os.getenv("FETCHER_URL_HOST_SLEEP", 5)))
+
+    # Request, get redirection
+    r = requests.get(url, allow_redirects=True)
+    # Redirection?
+    if (url != r.url):
+        url_redirection = r.url
+        return {"status": "duplicate", "redirection": url_redirection}
+
+    # Sample URL: "https://www.missingkids.org/poster/NCMC/2058896/1"
+    org_prefix, case_num = url.split("/")[-3], url.split("/")[-2]
+    # Fill details to API endpoint
+    base_url = "https://www.missingkids.org/bin/ncmecEndpoint?action=childDetail&orgPrefix={}&caseNum={}"
+    url_endpoint = base_url.format(org_prefix, case_num)
+
+    # Cache timeout missingkids.org
+    time.sleep(0.25)
+
+    # Request
+    r = requests.get(url_endpoint)
+    # Analyze status code and status result
+    if (r.status_code == 200):
+        r_json = r.json()
+        # Valid poster
+        if (r_json.get("status") == "success"):
+            return {"status": "valid"}
+        # Invalid poster
+        elif (r_json.get("status") == "error"):
+            return {"status": "invalid"}
+        else:
+            # ?
+            logger.info("Unknown json status: {} when verifying missing kid: {}".format(str(r_json), url))
+            return {"status": "unknown"}
+    else:
+        # Error status code
+        logger.info("Unknown request status: {} when verifying missing kid: {}".format(r.status_code, url))
+        return {"status": "unknown"}
+
+def process_url(url, paywall_bypass=False, request_timeout=15):

     if (paywall_bypass):
         # TODO: Implement self-hosted instance
@@ -58,7 +91,7 @@ def process_url(url, paywall_bypass=False):
     # Process
     if ("foxnews.com" in url_of_interest) or ("zerohedge" in url_of_interest):
         # Request
-        r = requests.get(url, headers={"User-Agent": user_agent})
+        r = requests.get(url, headers={"User-Agent": user_agent}, timeout=request_timeout)
         # Raise for error code
         r.raise_for_status()
         # Parse
@@ -67,8 +100,10 @@ def process_url(url, paywall_bypass=False):
         # Config: Fake user agent
         config = newspaper.configuration.Configuration()
         config.headers = {'User-Agent': user_agent}
+        config.request_timeout = request_timeout
         # Default mode
         article = newspaper.article(url_of_interest, config=config)

     except newspaper.ArticleBinaryDataException:
         logger.warning("ArticleException for input URL {}".format(url))
         return {"override_status": "invalid"}
@@ -106,7 +141,7 @@ def process_url(url, paywall_bypass=False):

         # Try simple request, valid response but couldn't parse article? e.g. getting blocked? -> unknown
         time.sleep(0.25)
-        r = requests.get(url_of_interest)
+        r = requests.get(url_of_interest, timeout=request_timeout)
         if (r.status_code == 200):
             return {"override_status": "unknown"}
         else:
@@ -117,7 +152,7 @@ def process_url(url, paywall_bypass=False):
     except Exception as e:
         logger.warning("Exception for input URL {}\n{}".format(url, str(e)))
         return None


     # Not a valid URL?
     if (not article.is_valid_url()):
         logger.debug("Invalid URL found: {}".format(url))
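`verify_missing_kid_url` resolves a poster URL to the NCMEC `childDetail` endpoint and maps the JSON response onto a small status dict. Hypothetical usage (the import path is an assumption based on the package layout implied by the diff; adjust it to wherever the module actually lives):

```python
# Hypothetical usage; the module path is assumed, the poster URL is the sample from the code comments.
from fetcher.src.fetch_utils_url_processor import verify_missing_kid_url

result = verify_missing_kid_url("https://www.missingkids.org/poster/NCMC/2058896/1")
if result["status"] == "duplicate":
    print("Redirected to canonical URL:", result["redirection"])
else:
    print("Poster status:", result["status"])  # "valid", "invalid" or "unknown"
```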
@@ -4,54 +4,150 @@ from ..models import Urls, Source, Search, UrlContent, UrlsSourceSearch, UrlsDup
 from django.db.models import Count
 import requests
 import os
 import traceback
 from .logger import get_logger
 logger = get_logger()

+def notify_telegram_info(last_hours, channel="INFO"):
+    try:
+        start_date = timezone.now() - timedelta(hours=last_hours)
+
+        # Count the number of URLs grouped by status within the date range
+        urls_data_status = Urls.objects.filter(ts_fetch__gte=start_date) \
+            .values('status') \
+            .annotate(count=Count('id')) \
+            .order_by('status')
+
+        # Count the number of URLs grouped by source
+        urls_data_source = UrlsSourceSearch.objects \
+            .filter(id_url__ts_fetch__gte=start_date) \
+            .values('id_source__source') \
+            .annotate(count=Count('id_url')) \
+            .order_by('id_source__source')
+
+        # Count the number of URLs grouped by search
+        urls_data_search = UrlsSourceSearch.objects \
+            .filter(id_url__ts_fetch__gte=start_date) \
+            .values('id_search__search') \
+            .annotate(count=Count('id_url')) \
+            .order_by('id_search__search')
-
-def notify_telegram(last_hours=24):
-    start_date = timezone.now() - timedelta(hours=last_hours)
-
-    # Count the number of URLs grouped by status within the date range
-    urls_data_status = Urls.objects.filter(ts_fetch__gte=start_date) \
-        .values('status') \
-        .annotate(count=Count('id')) \
-        .order_by('status')
-
-    # Count the number of URLs grouped by source
-    urls_data_source = UrlsSourceSearch.objects \
-        .filter(id_url__ts_fetch__gte=start_date) \
-        .values('id_source__source') \
-        .annotate(count=Count('id_url')) \
-        .order_by('id_source__source')
-
-    # Count the number of URLs grouped by search
-    urls_data_search = UrlsSourceSearch.objects \
-        .filter(id_url__ts_fetch__gte=start_date) \
-        .values('id_search__search') \
-        .annotate(count=Count('id_url')) \
-        .order_by('id_search__search')
+        bot_token = os.environ.get("TELEGRAM_{}_BOT_TOKEN".format(channel), "")
+        chat_id = os.environ.get("TELEGRAM_{}_CHAT_ID".format(channel), "")
+
+        message = "During the last {} hours:\n".format(last_hours)
+
+        message += "\nURLs per status:\n"
+        for o in urls_data_status:
+            message += " {}: {}\n".format(o.get("status"), o.get("count"))
+        message += "\nURLs per source:\n"
+        for o in urls_data_source:
+            message += " {}: {}\n".format(o.get("id_source__source"), o.get("count"))
+        message += "\nURLs per search:\n"
+        for o in urls_data_search:
+            message += " {}: {}\n".format(o.get("id_search__search"), o.get("count"))
-
-    bot_token = os.environ.get("TELEGRAM_BOT_TOKEN", "")
-    chat_id = os.environ.get("TELEGRAM_CHAT_ID", "")
         url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
         params = {
             "chat_id": chat_id,
             "text": message
         }

         # POST
         response = requests.post(url, params=params)
+    except Exception as e:
+        logger.info("Exception while notifying status: {}\n{}".format(str(e), traceback.format_exc()))
-
-    message = "During the last {} hours:\n".format(last_hours)
-
-    message += "\nURLs per status:\n"
-    for o in urls_data_status:
-        message += " {}: {}\n".format(o.get("status"), o.get("count"))
-    message += "\nURLs per source:\n"
-    for o in urls_data_source:
-        message += " {}: {}\n".format(o.get("id_source__source"), o.get("count"))
-    message += "\nURLs per search:\n"
-    for o in urls_data_search:
-        message += " {}: {}\n".format(o.get("id_search__search"), o.get("count"))
+
+def notify_telegram_warning(last_hours, channel="WARNING"):
+    try:
+        # Message appending logic
+        message = ""
+
+        start_date = timezone.now() - timedelta(hours=last_hours)
+
+        # Count the number of URLs grouped by status within the date range
+        urls_data_status = Urls.objects.filter(ts_fetch__gte=start_date) \
+            .values('status') \
+            .annotate(count=Count('id')) \
+            .order_by('status')
+
+        # Build dictionary
+        urls_data_status_dict = {}
+        for o in urls_data_status:
+            # #STATUS
+            urls_data_status_dict[o.get("status")] = o.get("count")
+            # #TOTAL
+            urls_data_status_dict["total"] = urls_data_status_dict.get("total", 0) + o.get("count")
+
+        MINIMUM_URLS_THRESHOLD = 10
+        MINIMUM_PROCESSED_URLS_RATIO = 0.7
+
+        # Minimum amount of URLs
+        if (urls_data_status_dict.get("total") < MINIMUM_URLS_THRESHOLD):
+            message += "WARNING - Total #URLS during the last {} hours: {}\n".format(last_hours, urls_data_status_dict.get("total"))
+            message += "\nURLs per status:\n"
+            for o in urls_data_status:
+                message += " {}: {}\n".format(o.get("status"), o.get("count"))
+
+        # Minimum ratio of processed raw urls
+        if (urls_data_status_dict.get("total") > 0):
+            if (urls_data_status_dict.get("raw", 0) / urls_data_status_dict.get("total") >= MINIMUM_PROCESSED_URLS_RATIO):
+                message += "WARNING - Small ratio of processed raw URLs during the last {} hours: {}\n".format(last_hours, urls_data_status_dict.get("total"))
+                message += "\nURLs per status:\n"
+                for o in urls_data_status:
+                    message += " {}: {}\n".format(o.get("status"), o.get("count"))

         url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
         params = {
             "chat_id": chat_id,
             "text": message
         }
+        # Count the number of URLs grouped by source
+        urls_data_source = UrlsSourceSearch.objects \
+            .filter(id_url__ts_fetch__gte=start_date) \
+            .values('id_source__source') \
+            .annotate(count=Count('id_url')) \
+            .order_by('id_source__source')
+
+        MINIMUM_SOURCES = 3
+        if (len(urls_data_source) < MINIMUM_SOURCES):
+            message += "WARNING - Very few sources found URLs during the last {} hours".format(last_hours)
+            message += "\nURLs per source:\n"
+            for o in urls_data_source:
+                message += " {}: {}\n".format(o.get("id_source__source"), o.get("count"))

         # POST
         response = requests.post(url, params=params)
+        """
+        # TODO: URLs per search, key should be present for cnbc.com, foxnews.com, zerohedge.com, breitbart.com, child abuse, child neglect
+        # Count the number of URLs grouped by search
+        urls_data_search = UrlsSourceSearch.objects \
+            .filter(id_url__ts_fetch__gte=start_date) \
+            .values('id_search__search') \
+            .annotate(count=Count('id_url')) \
+            .order_by('id_search__search')
+
+        message += "\nURLs per search:\n"
+        for o in urls_data_search:
+            message += " {}: {}\n".format(o.get("id_search__search"), o.get("count"))
+        """
+
+        # Valid message body?
+        if (message != ""):
+            bot_token = os.environ.get("TELEGRAM_{}_BOT_TOKEN".format(channel), "")
+            chat_id = os.environ.get("TELEGRAM_{}_CHAT_ID".format(channel), "")
+
+            url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
+            params = {
+                "chat_id": chat_id,
+                "text": message
+            }
+
+            # POST
+            response = requests.post(url, params=params)
+    except Exception as e:
+        logger.info("Exception while notifying status: {}\n{}".format(str(e)), traceback.format_exc())
+
+
+def notify_telegram(last_hours=12):
+    # INFO
+    notify_telegram_info(last_hours, channel="INFO")
+    # WARNING
+    notify_telegram_warning(last_hours, channel="WARNING")
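The notifier is split into an INFO digest and a WARNING check, each reading channel-specific credentials (`TELEGRAM_INFO_*`, `TELEGRAM_WARNING_*`). A minimal sketch of the send step, using the same Bot API call as the diff (the helper itself is illustrative):

```python
import os
import requests

def send_telegram(message, channel="INFO"):
    # Channel-specific credentials, matching the env var naming in the diff
    bot_token = os.environ.get("TELEGRAM_{}_BOT_TOKEN".format(channel), "")
    chat_id = os.environ.get("TELEGRAM_{}_CHAT_ID".format(channel), "")
    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    # Bot API sendMessage; parameters mirror the diff (chat_id + text)
    return requests.post(url, params={"chat_id": chat_id, "text": message}, timeout=30)
```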
app_urls/fetcher/src/utils.py (new file, 8 lines)

@@ -0,0 +1,8 @@
+
+def get_with_protocol(url):
+    # http:// -> https://
+    url = url.replace("http://", "https://")
+    # "" -> https://
+    if not (url.startswith("https://")):
+        url = "https://" + url
+    return url
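Expected behaviour of the relocated helper, for reference (the import path is an assumption, depending on how `app_urls` is rooted on the Python path):

```python
# Illustrative checks only; import path is assumed.
from fetcher.src.utils import get_with_protocol

assert get_with_protocol("example.com") == "https://example.com"
assert get_with_protocol("http://example.com/a") == "https://example.com/a"
assert get_with_protocol("https://example.com") == "https://example.com"
```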
@@ -13,13 +13,30 @@ from .src.logger import get_logger
 logger = get_logger()


+@shared_task(queue='light')
+def process_raw_urls(batch_size=100):
+    task = "Process raw URLs"
+    logger.info("Task triggered: {}".format(task))
+    DB_Handler().process_raw_urls(batch_size=batch_size)
+    logger.info("Task completed: {}".format(task))
+
+@shared_task(queue='default')
+def process_error_urls(batch_size=50):
+    task = "Process error URLs"
+    logger.info("Task triggered: {}".format(task))
+    DB_Handler().process_error_urls(batch_size=batch_size)
+    logger.info("Task completed: {}".format(task))
+
+
 @shared_task(queue='light')
 def fetch_feeds():
     task = "Fetch Feeds"
     logger.info("Task triggered: {}".format(task))
     FetchFeeds().run()
     logger.info("Task completed: {}".format(task))


 @shared_task(queue='default')
 def fetch_parser():
     task = "Fetch Parser"
@@ -34,41 +51,31 @@ def fetch_search():
     FetchSearcher().run()
     logger.info("Task completed: {}".format(task))

-@shared_task(queue='low')
+@shared_task(queue='heavy')
 def fetch_selenium_search():
     task = "Fetch Selenium search"
     logger.info("Task triggered: {}".format(task))
     FetchSeleniumSourceSearch().run()
     logger.info("Task completed: {}".format(task))

-@shared_task(queue='low')
+@shared_task(queue='heavy')
 def fetch_missing_kids(number_pages=5):
     task = "Fetch MissingKids"
     logger.info("Task triggered: {}".format(task))
     FetchMissingKids().run(number_pages)
     logger.info("Task completed: {}".format(task))

-@shared_task(queue='default')
-def process_raw_urls(batch_size=100):
-    task = "Process raw URLs"
-    logger.info("Task triggered: {}".format(task))
-    DB_Handler().process_raw_urls(batch_size=batch_size)
-    logger.info("Task completed: {}".format(task))
-
-@shared_task(queue='default')
-def process_error_urls(batch_size=50):
-    task = "Process error URLs"
-    logger.info("Task triggered: {}".format(task))
-    DB_Handler().process_error_urls(batch_size=batch_size)
-    logger.info("Task completed: {}".format(task))
-
-@shared_task(queue='low')
+@shared_task(queue='heavy')
 def process_missing_kids_urls(batch_size=None, process_status_only=None):
     task = "Process Missing Kids URLs - batch_size={} process_status_only={}".format(batch_size, process_status_only)
     logger.info("Task triggered: {}".format(task))
     DB_Handler().process_missing_kids_urls(batch_size=batch_size, process_status_only=process_status_only)
     logger.info("Task completed: {}".format(task))



 @shared_task(queue='default')
 def clean_old_url_content(older_than_days=14):
     task = "Clean old URL content"
@@ -76,7 +83,7 @@ def clean_old_url_content(older_than_days=14):
     DB_Handler().clean_old_url_content(older_than_days=older_than_days)
     logger.info("Task completed: {}".format(task))

-@shared_task(queue='default')
+@shared_task(queue='light')
 def notify_status():
     task = "Notify status"
     logger.info("Task triggered: {}".format(task))
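The tasks are now pinned to `light`, `default`, and `heavy` queues via `@shared_task(queue=...)`, matching the `worker_light`/`worker_default`/`worker_heavy` programs in the supervisor config further down. A sketch of enqueueing two of them with standard Celery calls (the module path `fetcher.tasks` matches the task names in the beat fixture; everything else is plain Celery usage):

```python
# Standard Celery calls; the exact invocation in the project may differ.
from fetcher.tasks import process_raw_urls, fetch_missing_kids

process_raw_urls.delay(batch_size=100)          # routed via the queue set in the decorator
fetch_missing_kids.apply_async(
    kwargs={"number_pages": 5},
    queue="heavy",                              # explicit queue override is also possible
)
```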
@@ -16,14 +16,10 @@ def trigger_task(request, task):
 def link_list(request):
     # Base URL path
     app_url = request.build_absolute_uri()
-    # Tasks
-    links_fetch = ["fetch_feeds", "fetch_parser", "fetch_search", "fetch_missingkids_5", "fetch_missingkids_all", "fetch_selenium_search"]
-    links_process = ["process_raw_urls_50", "process_error_urls_50", "process_missing_kids_urls_50", "process_missing_kids_urls_valid_all", "process_missing_kids_urls_invalid_all", "process_missing_kids_urls_unknown_all", "process_missing_kids_urls_all", "clean_old_url_content_60"]
     # List of links
     list_links = \
         [ os.path.join(app_url, "admin"), os.path.join(app_url, "urls") ] + \
-        [ os.path.join(app_url, "logs", log_type) for log_type in ["database", "debug", "info", "warning", "server", "beat", "worker_default", "worker_low"] ] #+ \
-        #[ os.path.join(app_url, "task", l) for l in links_fetch + links_process ]
+        [ os.path.join(app_url, "logs", log_type) for log_type in ["database", "debug", "info", "warning", "server", "beat", "worker_light", "worker_default", "worker_heavy"] ]

     # Links tuple
     links = [(l, l) for l in list_links]
@@ -17,19 +17,20 @@
         "cnbc.com"
     ],
     "keyword_search": [
-        "child abuse"
+        "child abuse",
+        "child neglect"
     ]
 },
 "REGEX_PATTERN_STATUS_PRIORITY": [
     [".*(youtube|tiktok|twitter|reddit)\\.com\\/.*", "invalid", 50],
     ["https:\\/\\/x.com\\/.*", "invalid", 50],
     [".*cnbc\\.com\\/(video|quotes)\\/.*", "invalid", 75],
-    [".*foxnews\\.com\\/(video|category|person|books|html-sitemap)\\/.*", "invalid", 75],
-    [".*radio\\.foxnews\\.com\\/.*", "invalid", 75],
     [".*breitbart\\.com\\/(tag|author)\\/.*", "invalid", 75],
-    [".*breitbart\\.com\\/(economy|entertainment|border|crime|clips)\\/.*", "valid", 50],
     [".*zerohedge\\.com\\/(user|contributors)\\/.*", "invalid", 75],
     [".*zerohedge\\.com\\/(economics|political|markets|)\\/.*", "valid", 50],
+    [".*breitbart\\.com\\/(economy|entertainment|border|crime|clips)\\/.*", "valid", 50],
+    [".*radio\\.foxnews\\.com\\/.*", "invalid", 75],
+    [".*foxnews\\.com\\/(video|category|person|books|html-sitemap)\\/.*", "invalid", 75],
     [".*foxnews\\.com\\/(lifestyle|opinion|sports|world)\\/.*", "valid", 50],
     [".*foxnews\\.com\\/[^\\/]+\\/?$", "invalid", 25]
 ]

@@ -160,7 +160,7 @@
 "expire_seconds": null,
 "one_off": false,
 "start_time": null,
-"enabled": false,
+"enabled": true,
 "last_run_at": null,
 "total_run_count": 0,
 "date_changed": "2025-07-17T16:20:19.969Z",
@@ -188,7 +188,7 @@
 "expire_seconds": null,
 "one_off": false,
 "start_time": null,
-"enabled": false,
+"enabled": true,
 "last_run_at": null,
 "total_run_count": 0,
 "date_changed": "2025-07-17T16:21:30.809Z",
@@ -397,7 +397,7 @@
 "fields": {
     "name": "Notify status",
     "task": "fetcher.tasks.notify_status",
-    "interval": 3,
+    "interval": 4,
     "crontab": null,
     "solar": null,
     "clocked": null,
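Each `REGEX_PATTERN_STATUS_PRIORITY` entry is a `[pattern, status, priority]` triple. The code that consumes this table is not part of the diff, so the sketch below only illustrates one plausible way to evaluate it; in particular, "highest matching priority wins" is an assumption, not documented behaviour:

```python
import re

# [pattern, status, priority] triples, a subset of the config above
RULES = [
    [r".*foxnews\.com\/(video|category|person|books|html-sitemap)\/.*", "invalid", 75],
    [r".*foxnews\.com\/(lifestyle|opinion|sports|world)\/.*", "valid", 50],
    [r".*foxnews\.com\/[^\/]+\/?$", "invalid", 25],
]

def classify(url):
    # Assumption: the highest-priority matching rule decides the status
    matches = [(prio, status) for pattern, status, prio in RULES if re.match(pattern, url)]
    return max(matches)[1] if matches else None

print(classify("https://www.foxnews.com/opinion/some-article"))  # "valid"
```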
@@ -15,7 +15,7 @@ stdout_logfile_maxbytes=20MB
 stdout_logfile_backups=1

 [program:beat]
-command=celery -A core beat -l info --logfile=/opt/logs/beat.log
+command=celery -A core beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler --logfile=/opt/logs/beat.log
 directory=/opt/app
 autostart=true
 autorestart=true
@@ -40,14 +40,27 @@ redirect_stderr=true
 stdout_logfile_maxbytes=20MB
 stdout_logfile_backups=1

-[program:worker_low]
-command=celery -A core worker -l info --logfile=/opt/logs/worker_low.log --concurrency=1 -Q low -n low
+[program:worker_light]
+command=celery -A core worker -l info --logfile=/opt/logs/worker_light.log --concurrency=1 -Q light -n light
 directory=/opt/app
 autostart=true
 autorestart=true
 ; Unified log file
-stdout_logfile=/opt/logs/worker_low.log
-stderr_logfile=/opt/logs/worker_low.log
+stdout_logfile=/opt/logs/worker_light.log
+stderr_logfile=/opt/logs/worker_light.log
 redirect_stderr=true
 ; Rotate when file reaches max size
 stdout_logfile_maxbytes=20MB
 stdout_logfile_backups=1

+[program:worker_heavy]
+command=celery -A core worker -l info --logfile=/opt/logs/worker_heavy.log --concurrency=1 -Q heavy -n heavy
+directory=/opt/app
+autostart=true
+autorestart=true
+; Unified log file
+stdout_logfile=/opt/logs/worker_heavy.log
+stderr_logfile=/opt/logs/worker_heavy.log
+redirect_stderr=true
+; Rotate when file reaches max size
+stdout_logfile_maxbytes=20MB
@@ -43,8 +43,10 @@ services:
       - DB_PASSWORD=${DB_PASSWORD}
       - DB_HOST=${DB_HOST}
       - DB_PORT=${DB_PORT}
-      - REDIS_HOST=${REDIS_HOST}
-      - REDIS_PORT=${REDIS_PORT}
+      - REDIS_CACHE_HOST=${REDIS_CACHE_HOST}
+      - REDIS_CACHE_PORT=${REDIS_CACHE_PORT}
+      - REDIS_CELERY_HOST=${REDIS_CELERY_HOST}
+      - REDIS_CELERY_PORT=${REDIS_CELERY_PORT}
       # Job timeout: 30 min
       - JOB_DEFAULT_TIMEOUT=${JOB_DEFAULT_TIMEOUT}
       # Fetcher
@@ -64,22 +66,32 @@ services:
       - PEXELS_API_KEY=${PEXELS_API_KEY}
       - OLLAMA_MODEL_DEFAULT=${OLLAMA_MODEL_DEFAULT}
       # Telegram
-      - TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN}
-      - TELEGRAM_CHAT_ID=${TELEGRAM_CHAT_ID}
+      - TELEGRAM_INFO_BOT_TOKEN=${TELEGRAM_INFO_BOT_TOKEN}
+      - TELEGRAM_INFO_CHAT_ID=${TELEGRAM_INFO_CHAT_ID}
+      - TELEGRAM_WARNING_BOT_TOKEN=${TELEGRAM_WARNING_BOT_TOKEN}
+      - TELEGRAM_WARNING_CHAT_ID=${TELEGRAM_WARNING_CHAT_ID}
       ########################
     ports:
       - 8000
     depends_on:
       - fetcher_db
-      - fetcher_redis
+      - fetcher_redis_cache
+      - fetcher_redis_celery
      - fetcher_app_selenium
     dns:
       - 1.1.1.1
       - 1.0.0.1

-  fetcher_redis:
+  fetcher_redis_cache:
     image: redis:alpine
-    container_name: fetcher_redis
+    container_name: fetcher_redis_cache
     restart: unless-stopped
     ports:
       - 6379
+
+  fetcher_redis_celery:
+    image: redis:alpine
+    container_name: fetcher_redis_celery
+    restart: unless-stopped
+    ports:
+      - 6379
@@ -94,6 +106,7 @@ services:
     ports:
       - 5555
     environment:
-      - CELERY_BROKER_URL=redis://fetcher_redis:6379/0
+      - CELERY_BROKER_URL=redis://fetcher_redis_celery:6379/0
+      - FLOWER_UNAUTHENTICATED_API=true
     depends_on:
-      - fetcher_redis
+      - fetcher_redis_celery
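The single `fetcher_redis` service is replaced by dedicated `fetcher_redis_cache` and `fetcher_redis_celery` containers, and the app and Flower now depend on the appropriate one. A quick connectivity check from inside the app container (assumes the `redis` Python package is installed and that the compose network resolves these container names):

```python
# Connectivity sanity check; hostnames are the compose service names from the diff.
import redis

for host in ("fetcher_redis_cache", "fetcher_redis_celery"):
    client = redis.Redis(host=host, port=6379, socket_connect_timeout=2)
    print(host, "ping:", client.ping())
```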
@@ -52,12 +52,19 @@ services:
     #volumes: # Persistent DB?
     #  - ./postgres:/var/lib/postgresql/data

-  fetcher_redis:
+  fetcher_redis_cache:
     extends:
       file: docker-compose-base.yml
-      service: fetcher_redis
+      service: fetcher_redis_cache
     ports:
-      - 6379:6379
+      - 6379
+
+  fetcher_redis_celery:
+    extends:
+      file: docker-compose-base.yml
+      service: fetcher_redis_celery
+    ports:
+      - 6379

   fetcher_flower:
     extends:
@@ -47,12 +47,19 @@ services:
       autossh -M 15885 -N -L 0.0.0.0:5432:127.0.0.1:5432 ${REMOTE_USERNAME}@${REMOTE_HOST}
       # autossh -M 15885 -N -o 'GatewayPorts yes' -L 0.0.0.0:5432:127.0.0.1:5432 ${REMOTE_USERNAME}@${REMOTE_HOST}

-  fetcher_redis:
+  fetcher_redis_cache:
     extends:
       file: docker-compose-base.yml
-      service: fetcher_redis
+      service: fetcher_redis_cache
     ports:
-      - 6379:6379
+      - 6379
+
+  fetcher_redis_celery:
+    extends:
+      file: docker-compose-base.yml
+      service: fetcher_redis_celery
+    ports:
+      - 6379

   fetcher_flower:
     extends:
@@ -14,7 +14,6 @@
     "import json\n",
     "import csv\n",
     "\n",
-    "\n",
     "headers = {\"User-Agent\": \"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/51.0.2704.103 Safari/537.36\"}"
    ]
   },
@@ -329,13 +328,22 @@
     " main()"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  },
   {
    "cell_type": "code",
    "execution_count": null,
    "metadata": {},
    "outputs": [],
    "source": [
-    "df = pd.read_csv(\"scholenopdekaart.csv\", index_col=0)\n",
+    "import pandas as pd\n",
+    "\n",
+    "df = pd.read_csv(\"~/Downloads/scholenopdekaart.csv\", index_col=0)\n",
+    "\n",
     "df.head()"
    ]
@@ -346,13 +354,101 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "df.tail()"
+    "def to_dict(row):\n",
+    "    # Empty?\n",
+    "    if (pd.isna(row)):\n",
+    "        return {}\n",
+    "    # Evaluate, to dict\n",
+    "    dict_data = dict(eval(row))\n",
+    "    # Remove None values\n",
+    "    for k in list(dict_data.keys()):\n",
+    "        if dict_data[k] is None:\n",
+    "            del dict_data[k]\n",
+    "    # Prefix\n",
+    "    return {f\"{column}_{k}\": v for k, v in dict_data.items()}\n",
+    "\n",
+    "for column in [\"students_per_year_trend\", \"num_students_per_group\", \"num_students_per_age\"]:\n",
+    "    print(column)\n",
+    "    # Convert the list of tuples into a dictionary per row\n",
+    "    df_dicts = df[column].apply(to_dict)\n",
+    "    # Expand into separate columns\n",
+    "    df_expanded = pd.json_normalize(df_dicts)\n",
+    "    # Sort\n",
+    "    df_expanded = df_expanded[sorted(df_expanded.columns)]\n",
+    "    # Combine with original columns\n",
+    "    df = pd.concat([df.drop(columns=[column]), df_expanded], axis=1)"
    ]
   },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "def to_dict(row):\n",
+    "    # Empty?\n",
+    "    if (pd.isna(row)):\n",
+    "        return {}\n",
+    "    # Evaluate, to dict\n",
+    "    data = eval(row)\n",
+    "    # Remove first useless data\n",
+    "    data = data[1:]\n",
+    "\n",
+    "    # Generate dict\n",
+    "    dict_data = {}\n",
+    "    for (zipcode, num, percentage) in data:\n",
+    "        dict_data[f\"num_students_zipcode_{zipcode}\"] = num\n",
+    "        dict_data[f\"percentage_students_zipcode_{zipcode}\"] = percentage\n",
+    "\n",
+    "    # Remove None values\n",
+    "    for k in list(dict_data.keys()):\n",
+    "        if dict_data[k] is None:\n",
+    "            del dict_data[k]\n",
+    "    return dict_data\n",
+    "\n",
+    "for column in [\"students_per_zipcode\"]:\n",
+    "    print(column)\n",
+    "    # Convert the list of tuples into a dictionary per row\n",
+    "    df_dicts = df[column].apply(to_dict)\n",
+    "    # Expand into separate columns\n",
+    "    df_expanded = pd.json_normalize(df_dicts)\n",
+    "    # Sort\n",
+    "    df_expanded = df_expanded[sorted(df_expanded.columns)]\n",
+    "    # Combine with original columns\n",
+    "    df = pd.concat([df.drop(columns=[column]), df_expanded], axis=1)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df.to_csv(\"schools_nl.csv\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df.head()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "list(df.columns)"
+   ]
+  }
  ],
  "metadata": {
   "kernelspec": {
-   "display_name": "matitos_urls",
+   "display_name": "fetcher",
    "language": "python",
    "name": "python3"
   },
@@ -366,7 +462,7 @@
    "name": "python",
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
-   "version": "3.12.9"
+   "version": "3.12.11"
   }
 },
 "nbformat": 4,
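Both new notebook cells follow the same pattern: parse a stringified list of tuples into a dict per row, then widen it into columns with `pd.json_normalize`. A self-contained toy example of that expansion (column name and values are made up):

```python
import pandas as pd

# Toy frame mimicking the notebook's input: a column of stringified (key, value) tuples
df = pd.DataFrame({
    "school": ["A", "B"],
    "num_students_per_group": ["[('3', 20), ('4', None)]", None],
})

def to_dict(row, column="num_students_per_group"):
    if pd.isna(row):
        return {}
    dict_data = dict(eval(row))                                # list of tuples -> dict
    dict_data = {k: v for k, v in dict_data.items() if v is not None}
    return {f"{column}_{k}": v for k, v in dict_data.items()}  # prefix keys with the column name

expanded = pd.json_normalize(df["num_students_per_group"].apply(to_dict).tolist())
df = pd.concat([df.drop(columns=["num_students_per_group"]), expanded], axis=1)
print(df)  # school column plus one new column per group key
```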