Working fetch feeds and parser; process raw and error URLs

Luciano Gervasoni
2025-03-18 14:49:12 +01:00
parent 7d7bce1e72
commit fb4b30f05e
26 changed files with 270 additions and 364 deletions

View File

@@ -72,7 +72,10 @@ python manage.py runserver
# Worker
python manage.py rqworker default
while true; do python manage.py rqworker default --burst; sleep 5; done
while true; do python manage.py rqworker default --burst -v 0; sleep 5; done
# Visualize DB
http://localhost:8080/?pgsql=matitos_db&username=supermatitos&db=matitos&ns=public&select=urls&order%5B0%5D=id
```
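The task endpoints added in this commit (see `urls.py` and `views.py` further down) can also be hit manually to enqueue a job for the worker. A minimal sketch, assuming the dev server runs on `localhost:8000` and the app is mounted under the `/api` prefix used by `link_list`:

```
# Enqueue the fetch_feeds task via the HTTP trigger (stdlib only).
import json
from urllib.request import urlopen

with urlopen("http://localhost:8000/api/fetch_feeds") as resp:
    print(json.load(resp))  # {"message": "Task has been enqueued!", "job_id": "..."}
```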
* Utils

View File

@@ -106,12 +106,3 @@ class WebsiteOfInterest(models.Model):
class Meta:
managed = False
db_table = 'website_of_interest'
class WebsiteToFilter(models.Model):
id = models.SmallAutoField(primary_key=True)
url_host = models.TextField(unique=True)
class Meta:
managed = False
db_table = 'website_to_filter'

View File

@@ -1,11 +1,10 @@
from ..models import Urls, UrlContent, UrlsSource, Source, WebsiteToFilter, StatusPatternMatching
from ..models import Urls, UrlContent, UrlsSource, UrlsDuplicate, Source, StatusPatternMatching
from .url_processor import process_url
from django.utils import timezone
from django.core.cache import cache
from django.db import IntegrityError
import hashlib
from datetime import timedelta
import re
import time
import traceback
from .logger import get_logger
logger = get_logger()
@@ -13,17 +12,29 @@ logger = get_logger()
class DB_Handler():
def __init__(self):
logger.debug("Initializing URL DB Handler")
# Inserting raw URL, cache time: 1 day
self._cache_timeout_insert_url = 86400
# Processing error URL, cache time: 2 days
self._cache_timeout_error_url = 86400*2
# URL host slowdown
self.url_host_slowdown_seconds = 5
def _get_safe_cache_key(self, raw_key):
"""Generate a safe cache key using an MD5 hash"""
return hashlib.md5(raw_key.encode()).hexdigest()
def _cache_key(self, cache_key, cache_timeout=86400):
cache.set(self._get_safe_cache_key(cache_key), True, timeout=cache_timeout)
def _cache_key(self, cache_key, hash_encode, cache_timeout):
if (hash_encode):
cache.set(self._get_safe_cache_key(cache_key), True, timeout=cache_timeout)
else:
cache.set(cache_key, True, timeout=cache_timeout)
def _is_cached_key(self, cache_key):
def _is_cached_key(self, cache_key, hash_encoded):
# Returns True if cached
return cache.get(self._get_safe_cache_key(cache_key)) is not None
if (hash_encoded):
return cache.get(self._get_safe_cache_key(cache_key)) is not None
else:
return cache.get(cache_key) is not None
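A note on the `hash_encode` flag: URL-based keys are run through `_get_safe_cache_key` because raw URLs can contain characters and lengths that some cache backends reject as keys, while short internal keys such as `error_<id>` are stored verbatim. A minimal sketch of the hashing step, independent of Django's cache:

```
# Illustrative only: turning an arbitrary URL into a fixed-length, cache-safe key.
import hashlib

raw_key = "https://example.com/news?q=año nuevo&page=2"  # hypothetical URL with awkward characters
safe_key = hashlib.md5(raw_key.encode()).hexdigest()
print(safe_key)  # 32-character hex digest, safe as a cache key
```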
def insert_raw_urls(self, urls, source):
@@ -53,10 +64,10 @@ class DB_Handler():
for url in urls_clean:
### Already processed URL?
if (self._is_cached_key(url)):
if (self._is_cached_key(url, hash_encoded=True)):
logger.debug("Already cached URL: {}".format(url))
if (self._is_cached_key("{}{}".format(source, url))):
if (self._is_cached_key("{}{}".format(source, url), hash_encoded=True)):
logger.debug("Already cached (source, URL): {} {}".format(source, url))
else:
### Insert (URL_id, source_id), since not cached
@@ -92,139 +103,189 @@ class DB_Handler():
# Insert or update cache
for url in urls_clean:
self._cache_key(url)
self._cache_key("{}{}".format(source, url))
# Hash encode URLs for special characters
self._cache_key(url, hash_encode=True, cache_timeout=self._cache_timeout_insert_url)
self._cache_key("{}{}".format(source, url), hash_encode=True, cache_timeout=self._cache_timeout_insert_url)
logger.info("Inserted #{} raw URLs".format(len(urls_to_insert)))
except Exception as e:
logger.warning("Exception inserting raw URLs: {}\n{}".format(e, traceback.format_exc()))
def _get_status_pattern_matching(self, url, article_status, list_pattern_status_tuple):
# Sort pattern tuples by priority. (pattern, priority, status)
list_pattern_status_tuple.sort(key=lambda tup: tup[1], reverse=True)
def _get_url_host(self, url):
# Strip the protocol; the URL host is the first substring before '/'
url_host = url.replace("https://", "").replace("http://", "").split("/")[0]
return url_host
def _url_host_slowdown(self, url, url_host_slowdown_seconds):
### Avoid hitting the same URL host too frequently (too many requests)
# Get URL host
url_host = self._get_url_host(url)
# Recently processed URL host? -> Slow down required
last_cached_timestamp = cache.get("processed_{}".format(url_host), None)
if last_cached_timestamp:
# Get time since last processed URL host (in seconds)
time_since_last_processed = time.time() - last_cached_timestamp
# Amount of time required to sleep?
slowdown_required = max(0, url_host_slowdown_seconds - time_since_last_processed)
logger.debug("Slow down (sleeping {:.2f}) for URL host {}".format(slowdown_required, url_host))
# Sleep
time.sleep(slowdown_required)
# About to process URL host, cache time
cache.set("processed_{}".format(url_host), time.time(), timeout=60*5) # Expire after 5 minutes
# Regex pattern to update status on "valid", "invalid", and "unknown" status only
# Status "raw", "duplicated" and "error" should remain the way they are
# Assumption: List of patterns sorted by importance
if (article_status in ["valid", "invalid", "unknown"]):
# Regular expression pattern matching: https://regexr.com/
for regex_pattern, regex_priority, status_if_match in list_pattern_status_tuple:
# Matching? Update article status
if bool(re.match(regex_pattern, url)):
if (status_if_match != article_status):
logger.debug("Regex pattern found, updating status from '{}' to '{}' for URL: {}".format(article_status, status_if_match, url))
def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error):
##### Filter URL? -> Invalid
if (status_pattern_match == "invalid"):
logger.debug("Domain filter applied to input URL: {}".format(obj_url.url))
# Update status
obj_url.status = Urls.STATUS_ENUM.INVALID
obj_url.save()
# updating_urls.append(obj_url)
# Next URL
return
##### Process URL
try:
# Slow down if required to avoid too many requests error
self._url_host_slowdown(obj_url.url, self.url_host_slowdown_seconds)
# Get data
dict_url_data = process_url(obj_url.url)
# Must not be None; otherwise handle as an exception
assert(dict_url_data is not None)
except Exception as e:
if (raise_exception_on_error):
# Simply raise exception
raise Exception("Error processing URL")
else:
# Set status to error
logger.debug("Error processing URL: {}\n{}\n".format(obj_url.url, str(e), traceback.format_exc()))
# Update status
obj_url.status = Urls.STATUS_ENUM.ERROR
obj_url.save()
# updating_urls.append(obj_url)
# Next URL
return
##### Canonical URL different? -> Duplicate
if (dict_url_data.get("url") != dict_url_data.get("url_canonical")):
# Update status
obj_url.status = Urls.STATUS_ENUM.DUPLICATE
obj_url.save()
# updating_urls.append(obj_url)
# Get or create URL with canonical form
obj_url_canonical, created = Urls.objects.get_or_create(url=dict_url_data.get("url_canonical"))
# Get the sources id associated to obj_url.id
url_sources = UrlsSource.objects.filter(id_url=obj_url)
for url_source_obj in url_sources:
# Associate same sources to url_canonical (it might already exist)
obj_urls_source, created = UrlsSource.objects.get_or_create(id_source=url_source_obj.id_source, id_url=obj_url_canonical)
# URLs duplicate association
obj_urls_duplicate, created = UrlsDuplicate.objects.get_or_create(id_url_canonical=obj_url_canonical, id_url_duplicated=obj_url)
# Next URL
return
##### Valid URL
# Update status
obj_url.status = Urls.STATUS_ENUM.VALID
obj_url.save()
# updating_urls.append(obj_url)
# Create or update extracted URL data
UrlContent.objects.update_or_create(
id_url=obj_url,
defaults = {
"date_published" : dict_url_data.get("publish_date"),
"title" : dict_url_data.get("title"),
"description" : dict_url_data.get("description"),
"content" : dict_url_data.get("content"),
"valid_content" : dict_url_data.get("valid_content"),
"language" : dict_url_data.get("language"),
"keywords" : dict_url_data.get("keywords"),
"tags" : dict_url_data.get("tags"),
"authors" : dict_url_data.get("authors"),
"image_main_url" : dict_url_data.get("image_main_url"),
"images_url" : dict_url_data.get("images_url"),
"videos_url" : dict_url_data.get("videos_url"),
"url_host" : dict_url_data.get("url_host"),
"site_name" : dict_url_data.get("site_name"),
}
)
def process_raw_urls(self, batch_size):
def _get_status_pattern_matching(url, list_pattern_status_tuple):
""" Be careful: Regex pattern should update status on "valid", "invalid", and "unknown" status only
"""
# Sort pattern tuples by priority. (pattern, priority, status)
for regex_pattern, regex_priority, status_if_match in sorted(list_pattern_status_tuple, key=lambda tup: tup[1], reverse=True):
# Regular expression pattern matching: https://regexr.com/
if bool(re.match(regex_pattern, url)):
logger.debug("Regex pattern found, status '{}' for URL: {}".format(status_if_match, url))
return status_if_match
# Pattern matching not required or not found, original article status
return article_status
return None
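For context, the `(pattern, priority, status)` tuples come from the `StatusPatternMatching` table; the helper checks them from highest to lowest priority and returns the status of the first matching regex, or `None` when nothing matches. A hypothetical illustration (these rows are not from the real table):

```
import re

# Hypothetical (pattern, priority, status) rows mirroring StatusPatternMatching.
list_pattern_status_tuple = [
    (r"https?://example\.com/tag/.*", 10, "invalid"),  # tag listings are not articles
    (r"https?://example\.com/news/.*", 5, "valid"),
]

url = "https://example.com/tag/sports"
status_match = None
for pattern, priority, status in sorted(list_pattern_status_tuple, key=lambda t: t[1], reverse=True):
    if re.match(pattern, url):
        status_match = status
        break
print(status_match)  # "invalid" -> the higher-priority tag rule wins
```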
def process_error_urls(self, batch_size=50):
# Get batch of URLs, status='error'
#error_urls = Urls.objects.SORTBY TS_FETCH....filter(status=Urls.STATUS_ENUM.RAW, ts_fetch__gte=time_delta_ts)[:batch_size]
pass
def process_raw_urls(self, time_delta=timedelta(days=1), batch_size=50):
try:
logger.debug("Processing raw URLs")
# Get list of domains to filter
list_domains_to_filter = WebsiteToFilter.objects.values_list('url_host', flat=True)
# Get batch of URLs, status='raw'
raw_urls = Urls.objects.order_by("-ts_fetch").filter(status=Urls.STATUS_ENUM.RAW)[:batch_size]
if (len(raw_urls) == 0):
logger.debug("No raw URLs to process")
return
# Get list of (pattern, priority, status) tuples to override status if required
list_pattern_status_tuple = list(StatusPatternMatching.objects.values_list("pattern", "priority", "status"))
# Fetched during last 24 hours
time_delta_ts = timezone.now() - time_delta
# Get batch of URLs, status='raw' and fetched X days ago
raw_urls = Urls.objects.filter(status=Urls.STATUS_ENUM.RAW, ts_fetch__gte=time_delta_ts)[:batch_size]
# List of objects to bulk update
updating_urls = []
# updating_urls = []
# Per URL
for obj_url in raw_urls:
##### Any domain to filter included in URL? -> Invalid
if (any([d in obj_url.url for d in list_domains_to_filter])):
logger.debug("Domain filter applied to input URL: {}".format(obj_url.url))
# Update status
obj_url.status = Urls.STATUS_ENUM.INVALID
obj_url.save()
updating_urls.append(obj_url)
# Next URL
continue
##### Process URL
try:
# Get data
dict_url_data = process_url(obj_url.url)
# Not none or handle as exception
assert(dict_url_data is not None)
except Exception as e:
logger.debug("Error processing URL: {}\n{}\n".format(obj_url.url, str(e), traceback.format_exc()))
# Update status
obj_url.status = Urls.STATUS_ENUM.ERROR
obj_url.save()
updating_urls.append(obj_url)
# Next URL
continue
# Override status if pattern matching?
status_pattern_match = _get_status_pattern_matching(obj_url.url, list_pattern_status_tuple)
# Process URL
self._process_single_url(obj_url, status_pattern_match, raise_exception_on_error=False)
##### Canonical URL different? -> Duplicate
if (dict_url_data.get("url") != dict_url_data.get("url_canonical")):
# Update status
obj_url.status = Urls.STATUS_ENUM.DUPLICATE
obj_url.save()
updating_urls.append(obj_url)
# Get or create URL with canonical form
obj_url_canonical, created = Urls.objects.get_or_create(url=dict_url_data.get("url_canonical"))
# Get the sources id associated to obj_url.id
url_sources = UrlsSource.objects.filter(id_url=obj_url)
for url_source_obj in url_sources:
# Associate same sources to url_canonical (it might already exist)
UrlsSource.objects.get_or_create(id_source=url_source_obj.id_source, id_url=obj_url_canonical)
# Next URL
continue
##### Valid URL
# Update status
obj_url.status = Urls.STATUS_ENUM.VALID
obj_url.save()
updating_urls.append(obj_url)
# Create extracted URL data
UrlContent.objects.create(
id_url=obj_url,
date_published=dict_url_data.get("publish_date"),
title=dict_url_data.get("title"),
description=dict_url_data.get("description"),
content=dict_url_data.get("content"),
valid_content=dict_url_data.get("valid_content"),
language=dict_url_data.get("language"),
keywords=dict_url_data.get("keywords"),
tags=dict_url_data.get("tags"),
authors=dict_url_data.get("authors"),
image_main_url=dict_url_data.get("image_main_url"),
images_url=dict_url_data.get("images_url"),
videos_url=dict_url_data.get("videos_url"),
url_host=dict_url_data.get("url_host"),
site_name=dict_url_data.get("site_name"),
)
##### Override status if pattern matching?
for obj_url in updating_urls:
# Check if article status needs to be updated with pattern matching
status_pattern_matching = self._get_status_pattern_matching(obj_url.url, obj_url.status, list_pattern_status_tuple)
# Update status?
if (status_pattern_matching != obj_url.status):
logger.debug("Pattern matching, overriding with status {} for URL: {}".format(status_pattern_matching, obj_url.url))
# Update, no need to append to updating_urls, already included
obj_url.status = status_pattern_matching
obj_url.save()
# TODO: Fix enum type issue. Bulk update
# TODO: Fix enum type issue. Bulk update instead of .save() for each object
# Urls.objects.bulk_update(updating_urls, ['status'])
logger.info("Updated #{} raw URLs".format(len(updating_urls)))
logger.info("Updated #{} raw URLs".format(len(raw_urls)))
except Exception as e:
logger.warning("Exception processing raw URLs: {}\n{}".format(e, traceback.format_exc()))
def process_error_urls(self, batch_size):
try:
logger.debug("Processing error URLs")
# Keep track of processed and skipped "error" URLs
num_urls_skipped, num_urls_processed = 0, 0
# Get batch of URLs, status='error'
error_urls = Urls.objects.order_by("-ts_fetch").filter(status=Urls.STATUS_ENUM.ERROR)[num_urls_skipped:batch_size+num_urls_skipped]
while ((len(error_urls) > 0) and (num_urls_processed < batch_size)):
# Per URL
for obj_url in error_urls:
# URL ID cached? -> Already retried recently, skip
if (self._is_cached_key("error_{}".format(obj_url.id), hash_encoded=False)):
num_urls_skipped += 1
continue
try:
# Process URL
self._process_single_url(obj_url, status_pattern_match=None, raise_exception_on_error=True)
num_urls_processed += 1
except Exception as e:
# Error: cache the URL ID to avoid re-processing until the cache entry expires
self._cache_key("error_{}".format(obj_url.id), hash_encode=False, cache_timeout=self._cache_timeout_error_url)
num_urls_skipped += 1
# Get following batch of URLs, status='error'
error_urls = Urls.objects.order_by("-ts_fetch").filter(status=Urls.STATUS_ENUM.ERROR)[num_urls_skipped:batch_size+num_urls_skipped]
logger.info("Updated #{}, skipped #{} error URLs".format(num_urls_processed, num_urls_skipped))
except Exception as e:
logger.warning("Exception processing error URLs: {}\n{}".format(e, traceback.format_exc()))

View File

@@ -8,15 +8,15 @@ logger = get_logger()
class FetchFeeds():
def __init__(self) -> None:
logger.debug("Initializing News feed")
logger.debug("Initializing Fetcher Feeds")
def run(self):
try:
logger.debug("Starting NewsFeed.run()")
logger.debug("Starting FetchFeeds.run()")
# Get feeds
list_url_feeds = list(Feed.objects.values_list('rss_feed', flat=True))
logger.debug("Fetching news from feeds: {}".format(list_url_feeds))
logger.debug("Fetching from feeds: {}".format(list_url_feeds))
# Process via RSS feeds
for url_feed in list_url_feeds:
@@ -47,4 +47,4 @@ class FetchFeeds():
# Write to DB
DB_Handler().insert_raw_urls(urls_fetched, source)
except Exception as e:
logger.warning("Exception in NewsFeed.run(): {}\n{}".format(e, traceback.format_exc()))
logger.warning("Exception in FetchFeeds.run(): {}\n{}".format(e, traceback.format_exc()))

View File

@@ -0,0 +1,39 @@
from .db_utils import DB_Handler
from ..models import WebsiteOfInterest
import newspaper
import traceback
from .logger import get_logger
logger = get_logger()
class FetchParser():
def __init__(self) -> None:
logger.debug("Initializing Fetcher Parser")
def run(self):
try:
logger.debug("Starting FetchParser.run() for {}")
# Get URL hosts
list_url_host = list(WebsiteOfInterest.objects.values_list('url_host', flat=True))
logger.debug("Fetching news by parsing URL hosts: {}".format(list_url_host))
# Process newspaper4k build method
for url_host_feed in list_url_host:
# Protocol
if not (url_host_feed.startswith("http")):
url_host_feed_formatted = "https://" + url_host_feed
else:
url_host_feed_formatted = url_host_feed
logger.debug("Fetching newspaper4k parsing based on URL: {}".format(url_host_feed_formatted))
# Source object
url_host_built = newspaper.build(url_host_feed_formatted)
# Get articles URL list
urls_fetched = url_host_built.article_urls()
# URL fetching source
source = "newspaper4k {}".format(url_host_feed)
# Write to DB
DB_Handler().insert_raw_urls(urls_fetched, source)
except Exception as e:
logger.warning("Exception in FetchParser.run(): {}\n{}".format(e, traceback.format_exc()))

View File

@@ -1,3 +1,4 @@
from django.core.cache import cache
from .logger import get_logger
logger = get_logger()
import newspaper
@@ -6,6 +7,7 @@ from urllib.parse import unquote
#import langdetect
#langdetect.DetectorFactory.seed = 0
def process_url(url):
try:
# Process

View File

@@ -1,6 +1,7 @@
from django_rq import job
from .src.fetch_feed import FetchFeeds
from .src.fetch_parser import FetchParser
from .src.db_utils import DB_Handler
'''
from src.fetch_parser import FetchParser
@@ -8,16 +9,13 @@ from src.fetch_search import FetchSearcher
from src.missing_kids_fetch import MissingKidsFetch
from src.missing_kids_status import MissingKidsStatus
from src.url_status import UpdateErrorURLs
from src.db_utils import DB_Handler
from src.credentials import db_connect_info, redis_connect_info
# DB Handler
db_handler = DB_Handler(db_connect_info, redis_connect_info)
'''
from .src.logger import get_logger
logger = get_logger()
# TODO: Queues with priorities, process_raw_urls least priority due to slowdown...
@job
def background_task(process_type: str):
logger.info("Task triggered: {}".format(process_type))
@@ -25,18 +23,17 @@ def background_task(process_type: str):
try:
if (process_type == "fetch_feeds"):
FetchFeeds().run()
elif (process_type == "fetch_parser"):
FetchParser().run()
elif (process_type == "process_raw_urls"):
DB_Handler().process_raw_urls(batch_size=3)
DB_Handler().process_raw_urls(batch_size=50)
elif (process_type == "process_error_urls"):
DB_Handler().process_error_urls(batch_size=50)
else:
logger.info("Task unknown!: {}".format(process_type))
'''
if (process_type == "fetch_feeds"):
FetchFeeds(db_handler).run()
elif (process_type == "fetch_parser"):
FetchParser(db_handler).run()
elif (process_type == "search") or (process_type == "search_full"):
FetchSearcher(cred.db_connect_info, cred.redis_connect_info, full=True).run()

View File

@@ -1,6 +1,7 @@
from django.urls import path
from .views import trigger_task
from .views import trigger_task, link_list
urlpatterns = [
path('links', link_list, name='link_list'),
path('<str:task>', trigger_task, name='trigger_task'),
]

View File

@@ -1,6 +1,7 @@
import django_rq
from django.http import JsonResponse
from .tasks import background_task
import os
from .src.logger import get_logger
logger = get_logger()
@@ -9,3 +10,8 @@ def trigger_task(request, task):
queue = django_rq.get_queue('default') # Get the default queue
job = queue.enqueue(background_task, task)
return JsonResponse({"message": "Task has been enqueued!", "job_id": job.id})
def link_list(request):
prefix = "http://localhost:8000/api"
links = ["fetch_feeds", "fetch_parser", "process_raw_urls", "process_error_urls"]
return JsonResponse({"links": ["http://localhost:8080/?pgsql=matitos_db&username=supermatitos&db=matitos&ns=public&select=urls&order%5B0%5D=id"] + [os.path.join(prefix, l) for l in links]})