Refactoring fetcher, working feeds and raw url writer
app_urls/api/src/db_utils.py (new file, 190 lines)
@@ -0,0 +1,190 @@
from ..models import Urls, UrlContent, UrlsSource, Source, WebsiteToFilter, StatusPatternMatching
from .url_processor import process_url
from django.utils import timezone
from django.core.cache import cache
import hashlib
from datetime import timedelta
import re
import traceback

from .logger import get_logger
logger = get_logger()


class DB_Handler():
    def __init__(self):
        logger.debug("Initializing URL DB Handler")

    def _get_safe_cache_key(self, raw_key):
        """Generate a safe cache key using an MD5 hash"""
        return hashlib.md5(raw_key.encode()).hexdigest()

    def _cache_key(self, cache_key, cache_timeout=86400):
        # Cache the key for cache_timeout seconds (default: 24 hours)
        cache.set(self._get_safe_cache_key(cache_key), True, timeout=cache_timeout)

    def _is_cached_key(self, cache_key):
        # Returns True if cached
        return cache.get(self._get_safe_cache_key(cache_key)) is not None

    def insert_raw_urls(self, urls, source):
        try:
            logger.debug("Inserting raw URLs")
            # Empty?
            if (len(urls) == 0):
                logger.debug("Empty batch of urls (not writing to DB) for source: {}".format(source))
                return

            url_object_to_insert = []
            # Per URL
            for url in urls:
                ### Already processed URL?
                if (self._is_cached_key(url)):
                    logger.debug("Already cached URL: {}".format(url))

                    if (self._is_cached_key("{}{}".format(source, url))):
                        logger.debug("Already cached (source, URL): {} {}".format(source, url))
                    else:
                        ### Insert source
                        # Get the source (create if not exists)
                        source_obj, created = Source.objects.get_or_create(source=source)
                        # Get URL ID
                        url_obj = Urls.objects.get(url=url)
                        # Create (id_source, id_url)
                        UrlsSource.objects.create(id_source=source_obj.id, id_url=url_obj.id)
                else:
                    # Add object to insert
                    url_object_to_insert.append(Urls(url=url))

            ### Bulk insert URLs, ignore conflicts if a url exists
            bulk_created_obj = Urls.objects.bulk_create(url_object_to_insert, ignore_conflicts=True)
            # Insert or update cache
            for url in urls:
                self._cache_key(url)
                self._cache_key("{}{}".format(source, url))

            logger.info("Inserted #{} raw URLs".format(len(url_object_to_insert)))

        except Exception as e:
            logger.warning("Exception inserting raw URLs: {}\n{}".format(e, traceback.format_exc()))

    def _get_status_pattern_matching(self, url, article_status, list_pattern_status_tuple):
        # Sort (pattern, priority, status) tuples by priority, highest first
        list_pattern_status_tuple.sort(key=lambda tup: tup[1], reverse=True)

        # Regex patterns may override the status on "valid", "invalid" and "unknown" only;
        # "raw", "duplicate" and "error" statuses remain the way they are.
        # Assumption: list of patterns sorted by importance
        if (article_status in ["valid", "invalid", "unknown"]):
            # Regular expression pattern matching: https://regexr.com/
            for regex_pattern, regex_priority, status_if_match in list_pattern_status_tuple:
                # Matching? The highest-priority match decides the article status
                if bool(re.match(regex_pattern, url)):
                    if (status_if_match != article_status):
                        logger.debug("Regex pattern found, updating status from '{}' to '{}' for URL: {}".format(article_status, status_if_match, url))
                    return status_if_match
        # Pattern matching not required or no match found, keep the original article status
        return article_status

    def process_raw_urls(self, time_delta=timedelta(days=1), batch_size=50):
        try:
            logger.debug("Processing raw URLs")

            # Get list of domains to filter
            list_domains_to_filter = WebsiteToFilter.objects.values_list('url_host', flat=True)
            # Get list of (pattern, priority, status) tuples to override status if required
            list_pattern_status_tuple = list(StatusPatternMatching.objects.values_list("pattern", "priority", "status"))

            # Fetched during the last time_delta (default: 24 hours)
            time_delta_ts = timezone.now() - time_delta
            # Get a batch of URLs with status='raw' fetched within that window
            raw_urls = Urls.objects.filter(status='raw', ts_fetch__gte=time_delta_ts)[:batch_size]
            # List of objects to bulk update
            updating_urls = []

            # Per URL
            for obj_url in raw_urls:

                ##### Any domain to filter included in URL? -> Invalid
                if (any([d in obj_url.url for d in list_domains_to_filter])):
                    logger.debug("Domain filter applied to input URL: {}".format(obj_url.url))
                    # Update status
                    obj_url.status = 'invalid'
                    # Append to bulk update
                    updating_urls.append(obj_url)
                    # Next URL
                    continue

                ##### Process URL
                try:
                    # Get data
                    dict_url_data = process_url(obj_url.url)
                    # Not None, or handle as exception
                    assert(dict_url_data is not None)
                except Exception as e:
                    logger.debug("Error processing URL: {}\n{}".format(obj_url.url, str(e)))
                    # Update status
                    obj_url.status = 'error'
                    # Append to bulk update
                    updating_urls.append(obj_url)
                    # Next URL
                    continue

                ##### Canonical URL different? -> Duplicate
                if (dict_url_data.get("url") != dict_url_data.get("url_canonical")):
                    # Update status
                    obj_url.status = 'duplicate'
                    # Append to bulk update
                    updating_urls.append(obj_url)

                    # Get or create URL with canonical form (get_or_create returns an (object, created) tuple)
                    obj_url_canonical, _ = Urls.objects.get_or_create(url=dict_url_data.get("url_canonical"))

                    # Associate the same sources to url_canonical:
                    # get the source ids associated to obj_url.id
                    url_sources = UrlsSource.objects.filter(id_url=obj_url.id)
                    for url_source_obj in url_sources:
                        # Associate same sources to url_canonical (it might already exist)
                        UrlsSource.objects.get_or_create(id_source=url_source_obj.id_source, id_url=obj_url_canonical.id)
                    # Next URL
                    continue

                ##### Valid URL
                # Update status
                obj_url.status = 'valid'
                # Append to bulk update
                updating_urls.append(obj_url)
                # Create or update extracted URL content, keyed by id_url (standard Django update_or_create)
                UrlContent.objects.update_or_create(
                    id_url=obj_url.id,
                    defaults={
                        "date_published": dict_url_data.get("publish_date"),
                        "title": dict_url_data.get("title"),
                        "description": dict_url_data.get("description"),
                        "content": dict_url_data.get("content"),
                        "valid_content": dict_url_data.get("valid_content"),
                        "language": dict_url_data.get("language"),
                        "keywords": dict_url_data.get("keywords"),
                        "tags": dict_url_data.get("tags"),
                        "authors": dict_url_data.get("authors"),
                        "image_main": dict_url_data.get("image_main"),
                        "images_url": dict_url_data.get("images_url"),
                        "videos_url": dict_url_data.get("videos_url"),
                        "url_host": dict_url_data.get("url_host"),
                        "site_name": dict_url_data.get("site_name"),
                    },
                )

            ##### Override status if pattern matching?
            for obj_url in updating_urls:
                # Check if article status needs to be updated with pattern matching
                status_pattern_matching = self._get_status_pattern_matching(obj_url.url, obj_url.status, list_pattern_status_tuple)
                # Update status?
                if (status_pattern_matching != obj_url.status):
                    logger.debug("Pattern matching, overriding with status {} for URL: {}".format(status_pattern_matching, obj_url.url))
                    # Update; no need to append to updating_urls, already included
                    obj_url.status = status_pattern_matching

            # Bulk update
            Urls.objects.bulk_update(updating_urls, ['status'])

            logger.debug("Finished processing raw URLs")
        except Exception as e:
            logger.warning("Exception processing raw URLs: {}\n{}".format(e, traceback.format_exc()))
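
For context, the two public entry points above (insert_raw_urls and process_raw_urls) are meant to be driven from outside this module. A minimal sketch of one way to wire them up, assuming a standard Django management command under a management/commands/ directory; the command, import path and example URLs are hypothetical and not part of this commit:

# Hypothetical wiring (illustration only, not part of this commit)
from datetime import timedelta

from django.core.management.base import BaseCommand

from api.src.db_utils import DB_Handler  # import path assumed


class Command(BaseCommand):
    help = "Insert a batch of raw URLs, then resolve their status and content"

    def handle(self, *args, **options):
        handler = DB_Handler()
        # Write a batch of raw URLs tagged with a fetching source
        handler.insert_raw_urls(
            ["https://example.com/a", "https://example.com/b"],
            source="manual test",
        )
        # Filter, deduplicate and extract content for recently fetched raw URLs
        handler.process_raw_urls(time_delta=timedelta(days=1), batch_size=50)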
app_urls/api/src/fetch_feed.py (new file, 50 lines)
@@ -0,0 +1,50 @@
from .db_utils import DB_Handler
from ..models import Feed
import feedparser
import dateutil.parser
import traceback

from .logger import get_logger
logger = get_logger()


class FetchFeeds():
    def __init__(self) -> None:
        logger.debug("Initializing News feed")

    def run(self):
        try:
            logger.debug("Starting FetchFeeds.run()")

            # Get feeds
            list_url_feeds = list(Feed.objects.values_list('rss_feed', flat=True))
            logger.debug("Fetching news from feeds: {}".format(list_url_feeds))

            # Process via RSS feeds
            for url_feed in list_url_feeds:
                # Initialize
                urls_fetched, urls_publish_date = [], []
                # Fetch feed
                feeds = feedparser.parse(url_feed)
                # Parse entries
                for f in feeds.get("entries", []):
                    # Get URL
                    url = f.get("link", None)
                    # Process?
                    if (url is not None):
                        # Available publish date?
                        publish_date_parsed = f.get("published_parsed")
                        if (publish_date_parsed is None):
                            publish_date = f.get("published", None)
                            if (publish_date is not None):
                                publish_date_parsed = dateutil.parser.parse(publish_date)

                        # Published date (collected, not yet written to the DB)
                        urls_publish_date.append(publish_date_parsed)
                        # URL
                        urls_fetched.append(url)

                # URL fetching source
                source = "feed {}".format(url_feed)
                # Write to DB
                DB_Handler().insert_raw_urls(urls_fetched, source)
        except Exception as e:
            logger.warning("Exception in FetchFeeds.run(): {}\n{}".format(e, traceback.format_exc()))
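
One detail worth noting about the date handling above: feedparser exposes published_parsed as a time.struct_time (in UTC), while the dateutil.parser.parse fallback returns a datetime, so urls_publish_date can end up holding mixed types. A small normalization helper along these lines (purely illustrative, not part of the commit) would make the list uniform before it is ever persisted:

import calendar
import time
from datetime import datetime, timezone

def to_datetime(publish_date_parsed):
    # Normalize feedparser's UTC time.struct_time and dateutil's datetime to one type
    if publish_date_parsed is None:
        return None
    if isinstance(publish_date_parsed, time.struct_time):
        return datetime.fromtimestamp(calendar.timegm(publish_date_parsed), tz=timezone.utc)
    return publish_date_parsed  # already a datetime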
app_urls/api/src/logger.py (new file, 22 lines)
@@ -0,0 +1,22 @@
import logging
import logging.handlers

import os
os.makedirs("logs", exist_ok=True)

logging.basicConfig(format='%(filename)s | %(levelname)s | %(asctime)s | %(message)s')
logger = logging.getLogger("news_fetcher")
logger.setLevel(logging.DEBUG)

# To file log: INFO / WARNING / ERROR
fh = logging.handlers.RotatingFileHandler(filename="logs/log_app_fetcher.log", mode="a", maxBytes=10000000, backupCount=4)
fh.setFormatter(logging.Formatter('%(levelname)s | %(asctime)s | %(message)s'))
fh.setLevel(logging.INFO)
logger.addHandler(fh)

# To file log: WARNING / ERROR
fh_ = logging.handlers.RotatingFileHandler(filename="logs/log_app_fetcher_error.log", mode="a", maxBytes=10000000, backupCount=1)
fh_.setFormatter(logging.Formatter('%(levelname)s | %(asctime)s | %(message)s'))
fh_.setLevel(logging.WARNING)
logger.addHandler(fh_)


def get_logger():
    return logger
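
The module configures a single shared "news_fetcher" logger at import time, so every caller gets the same handlers. A typical call site, matching how the other files in this commit already use it:

from .logger import get_logger

logger = get_logger()

logger.debug("Console output via the root handler")        # below the INFO threshold of the main log file
logger.info("Written to logs/log_app_fetcher.log")
logger.warning("Also rotated into logs/log_app_fetcher_error.log")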
app_urls/api/src/url_processor.py (new file, 60 lines)
@@ -0,0 +1,60 @@
from .logger import get_logger
logger = get_logger()

import newspaper
# pip install langdetect
#import langdetect
#langdetect.DetectorFactory.seed = 0


def process_url(url):
    try:
        # Process
        article = newspaper.article(url)
    except newspaper.ArticleException as e:
        logger.warning("ArticleException for input URL {}\n{}".format(url, str(e)))
        return None
    except Exception as e:
        logger.warning("Exception for input URL {}\n{}".format(url, str(e)))
        return None

    dict_data = {
        "url": url,
        "url_canonical": article.canonical_link,
        "url_host": article.source_url,
        "site_name": article.meta_site_name,
        "publish_date": article.publish_date,
        "language": article.meta_lang,  # langdetect.detect(article.text)
        "title": article.title,
        "description": article.meta_description,
        "content": article.text,
        "valid_content": article.is_valid_body(),
        "keywords": [k for k in set(article.keywords + article.meta_keywords) if k != ""],
        "tags": article.tags,
        "authors": article.authors,
        "image_main": article.top_image,  # article.meta_img
        "images_url": article.images,  # key names match what db_utils.py reads
        "videos_url": article.videos,
    }

    '''
    # TODO: If exists, add tags article.meta_data.get("classification-tags", "").split(",")
    if (dict_data["tags"] is None):
        dict_data["tags"] = []
    for k in article.meta_data.keys():
        if ("tags" in k):
            dict_data["tags"] += article.meta_data[k].split(",")
    '''

    # Sanity check
    for k in dict_data.keys():
        if (type(dict_data[k]) is list):
            # Remove empty strings
            dict_data[k] = [e for e in dict_data[k] if e != ""]
            # NULL instead of empty list
            if (len(dict_data[k]) == 0):
                dict_data[k] = None
        else:
            # NULL instead of empty string
            if (dict_data[k] == ""):
                dict_data[k] = None

    return dict_data
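
As a quick sanity check of the extractor in isolation, it can be exercised from a Django shell (python manage.py shell); the import path and URL below are placeholders, not part of this commit:

from api.src.url_processor import process_url  # import path assumed

data = process_url("https://example.com/some-article")
if data is None:
    print("Extraction failed (see the fetcher logs)")
else:
    print(data["title"])
    print(data["url_canonical"])
    print((data["content"] or "")[:200])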