Dockerization, whitenoise serving static, refactor
This commit is contained in:
273
app_urls/fetcher/src/db_utils.py
Normal file
273
app_urls/fetcher/src/db_utils.py
Normal file
@@ -0,0 +1,273 @@
|
||||
from ..models import Urls, UrlContent, UrlsSourceSearch, UrlsDuplicate, StatusPatternMatching, Source, Search
|
||||
from django.db.models import Q
|
||||
from django.core.cache import cache
|
||||
from django.db import IntegrityError
|
||||
from .url_processor import process_url, get_with_protocol
|
||||
import re
|
||||
import traceback
|
||||
from .logger import get_logger
|
||||
logger = get_logger()
|
||||
|
||||
class DB_Handler():
|
||||
def __init__(self):
|
||||
# Inserting raw URL, cache time: 1 day
|
||||
self._cache_timeout_insert_url = 86400
|
||||
# Processing error URL, cache time: 2 days
|
||||
self._cache_timeout_error_url = 86400*2
|
||||
|
||||
def insert_raw_urls(self, urls, obj_source, obj_search):
|
||||
try:
|
||||
logger.debug("Inserting raw URLs")
|
||||
# Empty?
|
||||
if (len(urls) == 0):
|
||||
logger.debug("Empty batch of urls (not writing to DB) for source-search: {} - {}".format(obj_source.source, obj_search.search))
|
||||
return
|
||||
# Default protocol https://
|
||||
urls_clean = [get_with_protocol(url) for url in urls]
|
||||
|
||||
urls_to_insert = []
|
||||
# Per URL
|
||||
for url in urls_clean:
|
||||
|
||||
### Already processed URL?
|
||||
if (cache.get("insert_{}".format(url)) is not None):
|
||||
logger.debug("Already cached URL: {}".format(url))
|
||||
|
||||
if (cache.get("insert_{}{}{}".format(url, obj_source.source, obj_search.search)) is not None):
|
||||
logger.debug("Already cached (URL, source, search): {} {} {}".format(url, obj_source.source, obj_search.search))
|
||||
else:
|
||||
### Insert (URL_id, source_id, search_id), since not cached
|
||||
# Get URL ID (should already be created)
|
||||
obj_url, created = Urls.objects.get_or_create(url=url)
|
||||
# Create (id_source, id_url) (shouldn't exist)
|
||||
UrlsSourceSearch.objects.get_or_create(id_url=obj_url, id_source=obj_source, id_search=obj_search)
|
||||
else:
|
||||
# Add object to insert
|
||||
# url_object_to_insert.append(Urls(url=url))
|
||||
urls_to_insert.append(url)
|
||||
|
||||
### Insert URLs & (URL_id, source_id)
|
||||
try:
|
||||
### Bulk insert, fails if duplicated URL (not retuning IDs when using ignore_conflicts=True)
|
||||
# URLs (ignore_conflicts=False to return IDs)
|
||||
bulk_created_urls = Urls.objects.bulk_create([Urls(url=url) for url in urls_to_insert], ignore_conflicts=False)
|
||||
# (URL_id, source_id)
|
||||
UrlsSourceSearch.objects.bulk_create([UrlsSourceSearch(id_url=obj_url, id_source=obj_source, id_search=obj_search) for obj_url in bulk_created_urls], ignore_conflicts=True)
|
||||
except IntegrityError as e:
|
||||
### Fallback to one-by-one insert
|
||||
logger.debug("bulk_create exception while inserting raw URLs (fails if duplicated URL), falling back to non-bulk method")
|
||||
# One by one
|
||||
for url in urls_to_insert:
|
||||
# URL
|
||||
obj_url, created = Urls.objects.get_or_create(url=url)
|
||||
if (created):
|
||||
logger.debug("Inserted: {}".format(obj_url.url))
|
||||
else:
|
||||
logger.debug("Not inserted: {}".format(obj_url.url))
|
||||
# (URL, source, search)
|
||||
UrlsSourceSearch.objects.get_or_create(id_url=obj_url, id_source=obj_source, id_search=obj_search)
|
||||
except Exception as e:
|
||||
logger.warning("bulk_create unknown exception while inserting raw URLs: {}\n{}".format(e, traceback.format_exc()))
|
||||
# Avoid caching due to error on insertion
|
||||
urls_clean = []
|
||||
|
||||
# Insert or update cache
|
||||
for url in urls_clean:
|
||||
cache.set("insert_{}".format(url), True, timeout=self._cache_timeout_insert_url)
|
||||
cache.set("insert_{}{}{}".format(url, obj_source.source, obj_search.search), True, timeout=self._cache_timeout_insert_url)
|
||||
|
||||
logger.info("Inserted #{} raw URLs, Source-Search {} - {}".format(len(urls_to_insert), obj_source.source, obj_search.search))
|
||||
|
||||
except Exception as e:
|
||||
logger.warning("Exception inserting raw URLs: {}\n{}".format(e, traceback.format_exc()))
|
||||
|
||||
def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error):
|
||||
|
||||
def set_status(obj_url, status):
|
||||
# Update status if setting a new value
|
||||
if (obj_url.status != status):
|
||||
obj_url.status = status
|
||||
obj_url.save()
|
||||
|
||||
##### Filter URL? -> Invalid
|
||||
if (status_pattern_match == "invalid"):
|
||||
logger.debug("Domain filter applied to input URL: {}".format(obj_url.url))
|
||||
# Update status
|
||||
set_status(obj_url, Urls.STATUS_ENUM.INVALID)
|
||||
# Next URL
|
||||
return
|
||||
|
||||
##### Process URL
|
||||
try:
|
||||
# Get data
|
||||
dict_url_data = process_url(obj_url.url)
|
||||
except Exception as e:
|
||||
if (raise_exception_on_error):
|
||||
# Simply raise exception, handled in a different way
|
||||
raise Exception("Error processing URL, raising exception as expected")
|
||||
else:
|
||||
logger.debug("Error processing URL: {}\n{}\n{}".format(obj_url.url, str(e), traceback.format_exc()))
|
||||
# Set status to error
|
||||
dict_url_data = None
|
||||
|
||||
# (dict_url_data is None) or (Exception while processing URL) ? -> Error status
|
||||
if (dict_url_data is None):
|
||||
# Update status
|
||||
set_status(obj_url, Urls.STATUS_ENUM.ERROR)
|
||||
# Next URL
|
||||
return
|
||||
|
||||
# Invalid? e.g. binary data
|
||||
if (dict_url_data.get("override_status") == "invalid"):
|
||||
# Update status
|
||||
set_status(obj_url, Urls.STATUS_ENUM.INVALID)
|
||||
# Next URL
|
||||
return
|
||||
|
||||
##### Canonical URL different? -> Duplicate
|
||||
if (dict_url_data.get("url_canonical") is not None) and(dict_url_data.get("url") != dict_url_data.get("url_canonical")):
|
||||
# Update status
|
||||
set_status(obj_url, Urls.STATUS_ENUM.DUPLICATE)
|
||||
|
||||
# Get or create URL with canonical form
|
||||
obj_url_canonical, created = Urls.objects.get_or_create(url=dict_url_data.get("url_canonical"))
|
||||
# Get the source-search IDs associated to obj_url.id
|
||||
list_url_source_search = UrlsSourceSearch.objects.filter(id_url=obj_url)
|
||||
for obj_url_source_search in list_url_source_search:
|
||||
# Associate same sources to url_canonical (it might already exist)
|
||||
UrlsSourceSearch.objects.get_or_create(id_url=obj_url_canonical, id_source=obj_url_source_search.id_source, id_search=obj_url_source_search.id_search)
|
||||
|
||||
# URLs duplciate association
|
||||
UrlsDuplicate.objects.get_or_create(id_url_canonical=obj_url_canonical, id_url_duplicated=obj_url)
|
||||
|
||||
# TODO: return obj_url_canonical so as to directly process the recently inserted URL
|
||||
# Wherever this function is called, add:
|
||||
# self._process_single_url(obj_url_canonical, status_pattern_match, raise_exception_on_error)
|
||||
|
||||
# Next URL
|
||||
return
|
||||
|
||||
##### Valid URL
|
||||
# Update status
|
||||
set_status(obj_url, Urls.STATUS_ENUM.VALID)
|
||||
|
||||
# Create or update extracted URL data
|
||||
UrlContent.objects.update_or_create(
|
||||
id_url=obj_url,
|
||||
defaults = {
|
||||
"date_published" : dict_url_data.get("publish_date"),
|
||||
"title" : dict_url_data.get("title"),
|
||||
"description" : dict_url_data.get("description"),
|
||||
"content" : dict_url_data.get("content"),
|
||||
"valid_content" : dict_url_data.get("valid_content"),
|
||||
"language" : dict_url_data.get("language"),
|
||||
"keywords" : dict_url_data.get("keywords"),
|
||||
"tags" : dict_url_data.get("tags"),
|
||||
"authors" : dict_url_data.get("authors"),
|
||||
"image_main_url" : dict_url_data.get("image_main_url"),
|
||||
"images_url" : dict_url_data.get("images_url"),
|
||||
"videos_url" : dict_url_data.get("videos_url"),
|
||||
"url_host" : dict_url_data.get("url_host"),
|
||||
"site_name" : dict_url_data.get("site_name"),
|
||||
}
|
||||
)
|
||||
|
||||
def process_raw_urls(self, batch_size):
|
||||
|
||||
def _get_status_pattern_matching(url, list_pattern_status_tuple):
|
||||
""" Be careful: Regex pattern should update status on "valid", "invalid", and "unknown" status only
|
||||
"""
|
||||
# Sort pattern tuples by priority. (pattern, priority, status)
|
||||
for regex_pattern, regex_priority, status_if_match in sorted(list_pattern_status_tuple, key=lambda tup: tup[1], reverse=True):
|
||||
# Regular expression pattern matching: https://regexr.com/
|
||||
if bool(re.match(regex_pattern, obj_url.url)):
|
||||
logger.debug("Regex pattern found, status '{}' for URL: {}".format(status_if_match, url))
|
||||
return status_if_match
|
||||
return None
|
||||
|
||||
try:
|
||||
logger.debug("Processing raw URLs")
|
||||
|
||||
# Get batch of URLs, status='raw'
|
||||
raw_urls = Urls.objects.order_by("-ts_fetch").filter(status=Urls.STATUS_ENUM.RAW)[:batch_size]
|
||||
|
||||
if (len(raw_urls) == 0):
|
||||
logger.debug("No raw URLs to process")
|
||||
return
|
||||
|
||||
# Get list of (pattern, priority, status) tuples to override status if required
|
||||
list_pattern_status_tuple = list(StatusPatternMatching.objects.values_list("pattern", "priority", "status"))
|
||||
|
||||
# Per URL
|
||||
for obj_url in raw_urls:
|
||||
# Override status if pattern matching?
|
||||
status_pattern_match = _get_status_pattern_matching(obj_url.url, list_pattern_status_tuple)
|
||||
# Process URL
|
||||
self._process_single_url(obj_url, status_pattern_match, raise_exception_on_error=False)
|
||||
|
||||
logger.info("Updated #{} raw URLs".format(len(raw_urls)))
|
||||
except Exception as e:
|
||||
logger.warning("Exception processing raw URLs: {}\n{}".format(e, traceback.format_exc()))
|
||||
|
||||
def process_error_urls(self, batch_size):
|
||||
try:
|
||||
logger.debug("Processing error URLs")
|
||||
|
||||
# Keep track of processed and skipped "error" URLs
|
||||
num_urls_skipped, num_urls_processed = 0, 0
|
||||
# Get batch of URLs, status='error'
|
||||
error_urls = Urls.objects.order_by("-ts_fetch").filter(status=Urls.STATUS_ENUM.ERROR)[num_urls_skipped:batch_size+num_urls_skipped]
|
||||
|
||||
while ((len(error_urls) > 0) and (num_urls_processed < batch_size)):
|
||||
# Per URL
|
||||
for obj_url in error_urls:
|
||||
# URL ID cached? -> Tried to process recently already, skip
|
||||
if (cache.get("error_{}".format(obj_url.id)) is not None):
|
||||
logger.debug("Already cached URL ID: {}".format(obj_url.id))
|
||||
num_urls_skipped += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
# Process URL
|
||||
self._process_single_url(obj_url, status_pattern_match=None, raise_exception_on_error=True)
|
||||
num_urls_processed += 1
|
||||
except Exception as e:
|
||||
# Error, cache to avoid re-processing for X time
|
||||
cache.set("error_{}".format(obj_url.id), True, timeout=self._cache_timeout_insert_url)
|
||||
num_urls_skipped += 1
|
||||
|
||||
# Get following batch of URLs, status='error'
|
||||
error_urls = Urls.objects.order_by("-ts_fetch").filter(status=Urls.STATUS_ENUM.ERROR)[num_urls_skipped:batch_size+num_urls_skipped]
|
||||
|
||||
logger.info("Updated #{}, skipped #{} error URLs".format(num_urls_processed, num_urls_skipped))
|
||||
except Exception as e:
|
||||
logger.warning("Exception processing error URLs: {}\n{}".format(e, traceback.format_exc()))
|
||||
|
||||
def process_missing_kids_urls(self, batch_size=None):
|
||||
try:
|
||||
logger.debug("Processing MissingKids URLs - batch_size={}".format(batch_size))
|
||||
# Get batch of URLs, %missingkids.org/poster% AND (status='valid' OR status='invalid')
|
||||
missingkids_urls = Urls.objects.order_by("-ts_fetch").filter(
|
||||
(Q(url__contains="missingkids.org/poster") | Q(url__contains="missingkids.org/new-poster"))
|
||||
&
|
||||
(Q(status=Urls.STATUS_ENUM.VALID) | Q(status=Urls.STATUS_ENUM.INVALID) | Q(status=Urls.STATUS_ENUM.ERROR))
|
||||
)
|
||||
|
||||
# Get batch size
|
||||
if (batch_size is not None):
|
||||
missingkids_urls = missingkids_urls[:batch_size]
|
||||
|
||||
# Per URL
|
||||
for obj_url in missingkids_urls:
|
||||
try:
|
||||
# Process URL. If no exception -> Valid
|
||||
self._process_single_url(obj_url, status_pattern_match=None, raise_exception_on_error=True)
|
||||
except Exception as e:
|
||||
# Raised exception -> Invalid (404 error)
|
||||
obj_url.status = Urls.STATUS_ENUM.INVALID
|
||||
obj_url.save()
|
||||
|
||||
logger.info("Verified status of #{} missingkids.org/poster URLs".format(len(missingkids_urls)))
|
||||
except Exception as e:
|
||||
logger.warning("Exception processing MissingKids URLs: {}\n{}".format(e, traceback.format_exc()))
|
||||
|
||||
Reference in New Issue
Block a user