from ..models import Urls, UrlContent, UrlsSourceSearch, UrlsDuplicate, StatusPatternMatching, Source, Search
from django.db.models import Q
from django.core.cache import cache
from django.db import IntegrityError
from .url_processor import process_url, get_with_protocol

import re
import traceback

from .logger import get_logger

logger = get_logger()

class DB_Handler():

    def __init__(self):
        # Inserting raw URL, cache time: 1 day
        self._cache_timeout_insert_url = 86400
        # Processing error URL, cache time: 2 days
        self._cache_timeout_error_url = 86400 * 2

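    # Cache key scheme (reference only; these mirror the cache.get()/cache.set()
    # calls in the methods below):
    #   "insert_<url>"                 -> raw URL inserted recently
    #   "insert_<url><source><search>" -> (URL, source, search) association inserted recently
    #   "error_<url_id>"               -> error URL retried recently, skip for now
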
    def insert_raw_urls(self, urls, obj_source, obj_search):
        try:
            logger.debug("Inserting raw URLs")

            # Empty batch?
            if (len(urls) == 0):
                logger.debug("Empty batch of URLs (not writing to DB) for source-search: {} - {}".format(obj_source.source, obj_search.search))
                return

            # Default protocol https://
            urls_clean = [get_with_protocol(url) for url in urls]

            urls_to_insert = []
            # Per URL
            for url in urls_clean:

                ### Already processed URL?
                if (cache.get("insert_{}".format(url)) is not None):
                    logger.debug("Already cached URL: {}".format(url))

                    if (cache.get("insert_{}{}{}".format(url, obj_source.source, obj_search.search)) is not None):
                        logger.debug("Already cached (URL, source, search): {} {} {}".format(url, obj_source.source, obj_search.search))
                    else:
                        ### Insert (URL_id, source_id, search_id), since not cached
                        # Get URL ID (should already be created)
                        obj_url, created = Urls.objects.get_or_create(url=url)
                        # Create (id_url, id_source, id_search) (shouldn't exist yet)
                        UrlsSourceSearch.objects.get_or_create(id_url=obj_url, id_source=obj_source, id_search=obj_search)
                else:
                    # Add URL to insert
                    # url_object_to_insert.append(Urls(url=url))
                    urls_to_insert.append(url)

            ### Insert URLs & (URL_id, source_id, search_id)
            try:
                ### Bulk insert; fails on duplicate URLs (IDs are not returned when using ignore_conflicts=True)
                # URLs (ignore_conflicts=False so the created IDs are returned)
                bulk_created_urls = Urls.objects.bulk_create([Urls(url=url) for url in urls_to_insert], ignore_conflicts=False)
                # (URL_id, source_id, search_id)
                UrlsSourceSearch.objects.bulk_create([UrlsSourceSearch(id_url=obj_url, id_source=obj_source, id_search=obj_search) for obj_url in bulk_created_urls], ignore_conflicts=True)
            except IntegrityError as e:
                ### Fallback to one-by-one insert
                logger.debug("bulk_create exception while inserting raw URLs (fails on duplicate URLs), falling back to non-bulk method")
                # One by one
                for url in urls_to_insert:
                    # URL
                    obj_url, created = Urls.objects.get_or_create(url=url)
                    if (created):
                        logger.info("CREATED: {}".format(obj_url.url))
                    else:
                        logger.info("NOT CREATED: {}".format(obj_url.url))
                    # (URL, source, search)
                    UrlsSourceSearch.objects.get_or_create(id_url=obj_url, id_source=obj_source, id_search=obj_search)
            except Exception as e:
                logger.warning("bulk_create unknown exception while inserting raw URLs: {}\n{}".format(e, traceback.format_exc()))
                # Avoid caching, since the insertion failed
                urls_clean = []

            # Insert or update cache
            for url in urls_clean:
                cache.set("insert_{}".format(url), True, timeout=self._cache_timeout_insert_url)
                cache.set("insert_{}{}{}".format(url, obj_source.source, obj_search.search), True, timeout=self._cache_timeout_insert_url)

            logger.info("Inserted #{} raw URLs".format(len(urls_to_insert)))

        except Exception as e:
            logger.warning("Exception inserting raw URLs: {}\n{}".format(e, traceback.format_exc()))

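    # Illustrative caller sketch (not part of the handler): a scraper that has
    # already resolved its Source and Search rows could hand over a URL batch
    # roughly like this. The lookup values below are assumptions for illustration.
    #
    #   obj_source = Source.objects.get(source="google_news")
    #   obj_search = Search.objects.get(search="missing person")
    #   DB_Handler().insert_raw_urls(["example.org/news/item-1"], obj_source, obj_search)
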
    def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error):

        def set_status(obj_url, status):
            # Update status only if setting a new value
            if (obj_url.status != status):
                obj_url.status = status
                obj_url.save()

        ##### Filter URL? -> Invalid
        if (status_pattern_match == "invalid"):
            logger.debug("Domain filter applied to input URL: {}".format(obj_url.url))
            # Update status
            set_status(obj_url, Urls.STATUS_ENUM.INVALID)
            # Next URL
            return

        ##### Process URL
        try:
            # Get data
            dict_url_data = process_url(obj_url.url)
            # Must not be None, otherwise handle as an exception
            assert(dict_url_data is not None)
        except Exception as e:
            if (raise_exception_on_error):
                # Simply raise the exception
                raise Exception("Error processing URL")
            else:
                # Set status to error
                logger.debug("Error processing URL: {}\n{}\n{}".format(obj_url.url, str(e), traceback.format_exc()))
                # Update status
                set_status(obj_url, Urls.STATUS_ENUM.ERROR)
                # Next URL
                return

        # Invalid? e.g. binary data
        if (dict_url_data.get("override_status") == "invalid"):
            # Update status
            set_status(obj_url, Urls.STATUS_ENUM.INVALID)
            # Next URL
            return

        ##### Canonical URL different? -> Duplicate
        if (dict_url_data.get("url_canonical") is not None) and (dict_url_data.get("url") != dict_url_data.get("url_canonical")):
            # Update status
            set_status(obj_url, Urls.STATUS_ENUM.DUPLICATE)

            # Get or create URL with canonical form
            obj_url_canonical, created = Urls.objects.get_or_create(url=dict_url_data.get("url_canonical"))
            # Get the source-search IDs associated to obj_url.id
            list_url_source_search = UrlsSourceSearch.objects.filter(id_url=obj_url)
            for obj_url_source_search in list_url_source_search:
                # Associate the same sources to url_canonical (the association might already exist)
                UrlsSourceSearch.objects.get_or_create(id_url=obj_url_canonical, id_source=obj_url_source_search.id_source, id_search=obj_url_source_search.id_search)

            # URL duplicate association
            UrlsDuplicate.objects.get_or_create(id_url_canonical=obj_url_canonical, id_url_duplicated=obj_url)

            # TODO: return obj_url_canonical so as to directly process the recently inserted URL.
            # Wherever this function is called, add:
            # self._process_single_url(obj_url_canonical, status_pattern_match, raise_exception_on_error)

            # Next URL
            return

        ##### Valid URL
        # Update status
        set_status(obj_url, Urls.STATUS_ENUM.VALID)

        # Create or update extracted URL data
        UrlContent.objects.update_or_create(
            id_url=obj_url,
            defaults={
                "date_published": dict_url_data.get("publish_date"),
                "title": dict_url_data.get("title"),
                "description": dict_url_data.get("description"),
                "content": dict_url_data.get("content"),
                "valid_content": dict_url_data.get("valid_content"),
                "language": dict_url_data.get("language"),
                "keywords": dict_url_data.get("keywords"),
                "tags": dict_url_data.get("tags"),
                "authors": dict_url_data.get("authors"),
                "image_main_url": dict_url_data.get("image_main_url"),
                "images_url": dict_url_data.get("images_url"),
                "videos_url": dict_url_data.get("videos_url"),
                "url_host": dict_url_data.get("url_host"),
                "site_name": dict_url_data.get("site_name"),
            }
        )

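    # For reference, _process_single_url() above reads the following keys from the
    # dict returned by process_url(): "url", "url_canonical", "override_status",
    # "publish_date", "title", "description", "content", "valid_content",
    # "language", "keywords", "tags", "authors", "image_main_url", "images_url",
    # "videos_url", "url_host" and "site_name". Any other keys are ignored here.
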
    def process_raw_urls(self, batch_size):

        def _get_status_pattern_matching(url, list_pattern_status_tuple):
            """ Be careful: regex patterns should only override the status with "valid", "invalid" or "unknown".
            """
            # Iterate over the (pattern, priority, status) tuples, sorted by priority (highest first)
            for regex_pattern, regex_priority, status_if_match in sorted(list_pattern_status_tuple, key=lambda tup: tup[1], reverse=True):
                # Regular expression pattern matching: https://regexr.com/
                if bool(re.match(regex_pattern, url)):
                    logger.debug("Regex pattern found, status '{}' for URL: {}".format(status_if_match, url))
                    return status_if_match
            return None

        try:
            logger.debug("Processing raw URLs")

            # Get batch of URLs, status='raw'
            raw_urls = Urls.objects.order_by("-ts_fetch").filter(status=Urls.STATUS_ENUM.RAW)[:batch_size]

            if (len(raw_urls) == 0):
                logger.debug("No raw URLs to process")
                return

            # Get list of (pattern, priority, status) tuples to override the status if required
            list_pattern_status_tuple = list(StatusPatternMatching.objects.values_list("pattern", "priority", "status"))

            # Per URL
            for obj_url in raw_urls:
                # Override status if a pattern matches?
                status_pattern_match = _get_status_pattern_matching(obj_url.url, list_pattern_status_tuple)
                # Process URL
                self._process_single_url(obj_url, status_pattern_match, raise_exception_on_error=False)

            logger.info("Updated #{} raw URLs".format(len(raw_urls)))
        except Exception as e:
            logger.warning("Exception processing raw URLs: {}\n{}".format(e, traceback.format_exc()))

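    # Illustrative StatusPatternMatching rows (assumed values, only to make the
    # (pattern, priority, status) tuples concrete): a row such as
    #   (r"^https://(www\.)?example\.com/tag/.*", 10, "invalid")
    # would be checked before a lower-priority row such as
    #   (r"^https://(www\.)?example\.com/.*", 1, "unknown")
    # Note that, as written, _process_single_url() only acts on an "invalid" match;
    # other statuses are passed through without overriding the result.
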
    def process_error_urls(self, batch_size):
        try:
            logger.debug("Processing error URLs")

            # Keep track of processed and skipped "error" URLs
            num_urls_skipped, num_urls_processed = 0, 0
            # Get batch of URLs, status='error'
            error_urls = Urls.objects.order_by("-ts_fetch").filter(status=Urls.STATUS_ENUM.ERROR)[num_urls_skipped:batch_size+num_urls_skipped]

            while ((len(error_urls) > 0) and (num_urls_processed < batch_size)):
                # Per URL
                for obj_url in error_urls:
                    # URL ID cached? -> Already retried recently, skip
                    if (cache.get("error_{}".format(obj_url.id)) is not None):
                        logger.debug("Already cached URL ID: {}".format(obj_url.id))
                        num_urls_skipped += 1
                        continue

                    try:
                        # Process URL
                        self._process_single_url(obj_url, status_pattern_match=None, raise_exception_on_error=True)
                        num_urls_processed += 1
                    except Exception as e:
                        # Error again, cache the ID to avoid re-processing it for a while
                        cache.set("error_{}".format(obj_url.id), True, timeout=self._cache_timeout_error_url)
                        num_urls_skipped += 1

                # Get the following batch of URLs, status='error'.
                # Skipped URLs keep their 'error' status, so the query offsets past them;
                # successfully processed URLs change status and drop out of the queryset.
                error_urls = Urls.objects.order_by("-ts_fetch").filter(status=Urls.STATUS_ENUM.ERROR)[num_urls_skipped:batch_size+num_urls_skipped]

            logger.info("Updated #{}, skipped #{} error URLs".format(num_urls_processed, num_urls_skipped))
        except Exception as e:
            logger.warning("Exception processing error URLs: {}\n{}".format(e, traceback.format_exc()))

    def process_missing_kids_urls(self, batch_size):
        try:
            logger.debug("Processing MissingKids URLs")

            # Get batch of URLs matching %missingkids.org/poster% or %missingkids.org/new-poster%,
            # with status 'valid', 'invalid' or 'error'
            missingkids_urls = Urls.objects.order_by("-ts_fetch").filter(
                (Q(url__contains="missingkids.org/poster") | Q(url__contains="missingkids.org/new-poster"))
                &
                (Q(status=Urls.STATUS_ENUM.VALID) | Q(status=Urls.STATUS_ENUM.INVALID) | Q(status=Urls.STATUS_ENUM.ERROR))
            )[:batch_size]

            # Per URL
            for obj_url in missingkids_urls:
                try:
                    # Process URL. If no exception is raised -> Valid
                    self._process_single_url(obj_url, status_pattern_match=None, raise_exception_on_error=True)
                except Exception as e:
                    # Raised exception -> Invalid (e.g. 404 error)
                    obj_url.status = Urls.STATUS_ENUM.INVALID
                    obj_url.save()

            logger.info("Verified status of #{} missingkids.org/poster URLs".format(len(missingkids_urls)))
        except Exception as e:
            logger.warning("Exception processing MissingKids URLs: {}\n{}".format(e, traceback.format_exc()))
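
# Sketch of how the batch methods might be driven by a periodic job (an assumption
# about the surrounding project, not something defined in this module):
#
#   handler = DB_Handler()
#   handler.process_raw_urls(batch_size=100)
#   handler.process_error_urls(batch_size=50)
#   handler.process_missing_kids_urls(batch_size=50)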