from ..models import Urls, UrlContent, UrlsSourceSearch, UrlsDuplicate, StatusPatternMatching, Source, Search
from django.db.models import Q
from django.core.cache import cache
from django.db import IntegrityError
from django.utils import timezone
from datetime import timedelta
from .fetch_utils_url_processor import process_url, get_with_protocol, url_host_slowdown
import re
import requests
import os
import traceback
from .logger import get_logger

logger = get_logger()


class DB_Handler():
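    """Database-facing helper around the fetcher models (Urls, UrlContent,
    UrlsSourceSearch, UrlsDuplicate, StatusPatternMatching).

    Groups the insert/update routines used by the fetch pipeline: raw URL
    ingestion, content extraction, duplicate/canonical handling, and periodic
    cleanup of old content.
    """
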
    def __init__(self):
        pass

    def insert_raw_urls(self, urls, obj_source, obj_search):
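        """Insert a batch of raw URLs and their (URL, source, search) associations.

        Recently inserted URLs are tracked in the cache to skip redundant writes;
        the remaining ones are bulk-inserted, with a one-by-one fallback on
        IntegrityError (e.g. duplicated URLs).
        """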
        try:
            logger.debug("Inserting raw URLs")

            # Empty?
            if (len(urls) == 0):
                logger.debug("Empty batch of urls (not writing to DB) for source-search: {} - {}".format(obj_source.source, obj_search.search))
                return

            # Default protocol https://
            urls_clean = [get_with_protocol(url) for url in urls]

            urls_to_insert = []
            # Per URL
            for url in urls_clean:

                ### Already processed URL?
                if (cache.get("insert_{}".format(url)) is not None):
                    logger.debug("Already cached URL: {}".format(url))

                    if (cache.get("insert_{}{}{}".format(url, obj_source.source, obj_search.search)) is not None):
                        logger.debug("Already cached (URL, source, search): {} {} {}".format(url, obj_source.source, obj_search.search))
                    else:
                        ### Insert (URL_id, source_id, search_id), since not cached
                        # Get URL ID (should already be created)
                        obj_url, created = Urls.objects.get_or_create(url=url)
                        # Create (id_source, id_url) (shouldn't exist)
                        UrlsSourceSearch.objects.get_or_create(id_url=obj_url, id_source=obj_source, id_search=obj_search)
                else:
                    # Add object to insert
                    urls_to_insert.append(url)

            ### Insert URLs & (URL_id, source_id)
            try:
                ### Bulk insert, fails if duplicated URL (does not return IDs when using ignore_conflicts=True)
                # URLs (ignore_conflicts=False to return IDs)
                bulk_created_urls = Urls.objects.bulk_create([Urls(url=url) for url in urls_to_insert], ignore_conflicts=False)
                # (URL_id, source_id)
                UrlsSourceSearch.objects.bulk_create([UrlsSourceSearch(id_url=obj_url, id_source=obj_source, id_search=obj_search) for obj_url in bulk_created_urls], ignore_conflicts=True)
            except IntegrityError as e:
                ### Fallback to one-by-one insert
                logger.debug("bulk_create exception while inserting raw URLs (fails if duplicated URL), falling back to non-bulk method")
                # One by one
                for url in urls_to_insert:
                    # URL
                    obj_url, created = Urls.objects.get_or_create(url=url)
                    if (created):
                        logger.debug("Inserted: {}".format(obj_url.url))
                    else:
                        logger.debug("Not inserted: {}".format(obj_url.url))
                    # (URL, source, search)
                    UrlsSourceSearch.objects.get_or_create(id_url=obj_url, id_source=obj_source, id_search=obj_search)
            except Exception as e:
                logger.warning("bulk_create unknown exception while inserting raw URLs: {}\n{}".format(e, traceback.format_exc()))
                # Avoid caching due to error on insertion
                urls_clean = []

            # Insert or update cache
            for url in urls_clean:
                cache.set("insert_{}".format(url), True, timeout=int(os.getenv("FETCHER_INSERT_URL_CACHE_TIME", 86400)))
                cache.set("insert_{}{}{}".format(url, obj_source.source, obj_search.search), True, timeout=int(os.getenv("FETCHER_INSERT_URL_CACHE_TIME", 86400)))

            logger.info("Inserted #{} raw URLs, Source-Search {} - {}".format(len(urls_to_insert), obj_source.source, obj_search.search))

        except Exception as e:
            logger.warning("Exception inserting raw URLs: {}\n{}".format(e, traceback.format_exc()))

    def _set_status(self, obj_url, status):
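        """Persist a new status on the Urls object only if it actually changed."""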
        # Update status if setting a new value
        if (obj_url.status != status):
            obj_url.status = status
            obj_url.save()

    def _set_duplicate_and_insert_canonical(self, obj_url, url_canonical):
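        """Mark obj_url as a duplicate of url_canonical.

        Gets or creates the canonical Urls row, copies the (source, search)
        associations over to it, and records the relation in UrlsDuplicate.
        """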
        # Update status
        self._set_status(obj_url, Urls.STATUS_ENUM.DUPLICATE)
        # Get or create URL with canonical form
        obj_url_canonical, created = Urls.objects.get_or_create(url=url_canonical)
        # Get the source-search IDs associated to obj_url.id
        list_url_source_search = UrlsSourceSearch.objects.filter(id_url=obj_url)
        for obj_url_source_search in list_url_source_search:
            # Associate same sources to url_canonical (it might already exist)
            UrlsSourceSearch.objects.get_or_create(id_url=obj_url_canonical, id_source=obj_url_source_search.id_source, id_search=obj_url_source_search.id_search)
        # URLs duplicate association
        UrlsDuplicate.objects.get_or_create(id_url_canonical=obj_url_canonical, id_url_duplicated=obj_url)

    def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error, paywall_bypass=False):
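        """Process one URL: resolve its status and extract its content.

        missingkids.org poster URLs are only checked via a plain HTTP request
        (redirect -> duplicate, 200 -> unknown, 404 -> invalid). For all other
        URLs the content is extracted with process_url() and stored in
        UrlContent, unless a pattern match or an error decides the status first.

        If raise_exception_on_error is True, request/extraction failures are
        re-raised instead of setting the ERROR status.
        """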
        ##########################################################################
        # URL pattern: missingkids.org/poster OR missingkids.org/new-poster
        if ("missingkids.org" in obj_url.url) and ("poster" in obj_url.url):
            # Sleep required? To avoid too many requests error (original URL, not paywall bypassing endpoint)
            url_host_slowdown(obj_url.url, url_host_slowdown_seconds=float(os.getenv("FETCHER_URL_HOST_SLEEP", 5)))
            try:
                # Request
                r = requests.get(obj_url.url, allow_redirects=True)
            except Exception as e:
                if (raise_exception_on_error):
                    # Simply raise exception, handled in a different way
                    raise Exception("Error processing URL, raising exception as expected")
                else:
                    logger.debug("Error processing URL: {}\n{}\n{}".format(obj_url.url, str(e), traceback.format_exc()))
                    # Set status to error
                    self._set_status(obj_url, Urls.STATUS_ENUM.ERROR)
                    return

            if (r.url != obj_url.url):
                # Canonical
                url_canonical = r.url
                # Set duplicate, and insert new canonical form
                self._set_duplicate_and_insert_canonical(obj_url, url_canonical)
            elif (r.status_code == 200):
                # Not enough to determine if it is valid. Need to wait for JavaScript to finish, it might redirect to 404
                # self._set_status(obj_url, Urls.STATUS_ENUM.VALID)
                self._set_status(obj_url, Urls.STATUS_ENUM.UNKNOWN)
            elif (r.status_code == 404):
                self._set_status(obj_url, Urls.STATUS_ENUM.INVALID)
            else:
                logger.debug("Unknown request status: {} for missing kids request: {}".format(r.status_code, obj_url.url))
                self._set_status(obj_url, Urls.STATUS_ENUM.UNKNOWN)
            return
        ##########################################################################

        # Found a pattern match -> Override status
        if (status_pattern_match is not None):
            logger.debug("Pattern match, status '{}' for input URL: {}".format(status_pattern_match, obj_url.url))
            # Update status
            self._set_status(obj_url, status_pattern_match)
            ##### Filter URL? -> Invalid (don't extract content)
            if (status_pattern_match == "invalid"):
                return

        try:
            # Extract URL content
            dict_url_data = process_url(obj_url.url, paywall_bypass)
            logger.debug("Processing raw URL EXTRACT URL CONTENT OK: {}".format(obj_url.url))
        except Exception as e:
            if (raise_exception_on_error):
                # Simply raise exception, handled in a different way
                raise Exception("Error processing URL, raising exception as expected")
            else:
                logger.debug("Error processing URL: {}\n{}\n{}".format(obj_url.url, str(e), traceback.format_exc()))
                # Set status to error (below)
                dict_url_data = None

        ##### Canonical URL different? -> Duplicate
        if (dict_url_data is not None) and (dict_url_data.get("url_canonical") is not None) and (dict_url_data.get("url") != dict_url_data.get("url_canonical")):
            # URL as duplicate, insert canonical URL
            self._set_duplicate_and_insert_canonical(obj_url, dict_url_data.get("url_canonical"))
            # Next URL
            return

        # Not overriding status given pattern matching?
        if (status_pattern_match is None):
            # (dict_url_data is None) or (Exception while processing URL)? -> Error status
            if (dict_url_data is None):
                # Update status
                self._set_status(obj_url, Urls.STATUS_ENUM.ERROR)
                # Next URL
                return

            # Invalid? e.g. binary data
            if (dict_url_data.get("override_status") == "invalid"):
                # Update status
                self._set_status(obj_url, Urls.STATUS_ENUM.INVALID)
                # Next URL
                return

            ##### Valid URL
            # Update status
            self._set_status(obj_url, Urls.STATUS_ENUM.VALID)

        try:
            if (dict_url_data is not None):
                # Create or update extracted URL data
                UrlContent.objects.update_or_create(
                    id_url=obj_url,
                    defaults={
                        "date_published": dict_url_data.get("publish_date"),
                        "title": dict_url_data.get("title"),
                        "description": dict_url_data.get("description"),
                        "content": dict_url_data.get("content"),
                        "valid_content": dict_url_data.get("valid_content"),
                        "language": dict_url_data.get("language"),
                        "keywords": dict_url_data.get("keywords"),
                        "tags": dict_url_data.get("tags"),
                        "authors": dict_url_data.get("authors"),
                        "image_main_url": dict_url_data.get("image_main_url"),
                        "images_url": dict_url_data.get("images_url"),
                        "videos_url": dict_url_data.get("videos_url"),
                        "url_host": dict_url_data.get("url_host"),
                        "site_name": dict_url_data.get("site_name"),
                    }
                )
        except Exception as e:
            logger.debug("Error in update_or_create UrlContent: {}\ndict_url_data: {}\n{}\n{}".format(obj_url.url, dict_url_data, str(e), traceback.format_exc()))

    def process_raw_urls(self, batch_size):
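        """Process up to batch_size URLs with status 'raw'.

        For each URL, the StatusPatternMatching regex rules (highest priority
        first) may pre-assign a status before _process_single_url() runs.
        """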
        def _get_status_pattern_matching(url, list_pattern_status_tuple):
            """ Be careful: regex patterns should only map to the "valid", "invalid", or "unknown" status """
            # Sort pattern tuples by priority: (pattern, priority, status)
            for regex_pattern, regex_priority, status_if_match in sorted(list_pattern_status_tuple, key=lambda tup: tup[1], reverse=True):
                # Regular expression pattern matching: https://regexr.com/
                if bool(re.match(regex_pattern, url)):
                    # logger.debug("Regex pattern found, status '{}' for URL: {}".format(status_if_match, url))
                    return status_if_match
            return None

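        # Hypothetical example of a StatusPatternMatching entry, assuming the
        # (pattern, priority, status) layout used above:
        #   (r"https?://(www\.)?example\.com/tag/.*", 10, "invalid")
        # A raw URL matching that regex would be marked invalid before any
        # content extraction.
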
        try:
            logger.debug("Processing raw URLs")

            # Get batch of URLs, status='raw'
            raw_urls = Urls.objects.order_by("-ts_fetch").filter(status=Urls.STATUS_ENUM.RAW)[:batch_size]

            if (len(raw_urls) == 0):
                logger.debug("No raw URLs to process")
                return

            # Get list of (pattern, priority, status) tuples to override status if required
            list_pattern_status_tuple = list(StatusPatternMatching.objects.values_list("pattern", "priority", "status"))

            # Per URL
            for obj_url in raw_urls:
                logger.debug("Processing raw URL: {}".format(obj_url.url))
                # Override status if pattern matching?
                status_pattern_match = _get_status_pattern_matching(obj_url.url, list_pattern_status_tuple)
                # Process URL
                self._process_single_url(obj_url, status_pattern_match, raise_exception_on_error=False)
                logger.debug("Processing raw URL OK: {}".format(obj_url.url))

            logger.info("Updated #{} raw URLs".format(len(raw_urls)))
        except Exception as e:
            logger.warning("Exception processing raw URLs: {}\n{}".format(e, traceback.format_exc()))

    def process_error_urls(self, batch_size):
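        """Retry up to batch_size URLs with status 'error', bypassing paywalls.

        URLs that fail again are cached (FETCHER_ERROR_URL_CACHE_TIME) so they
        are skipped on subsequent runs until the cache entry expires.
        """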
        try:
            logger.debug("Processing error URLs")

            # Keep track of processed and skipped "error" URLs
            num_urls_skipped, num_urls_processed = 0, 0
            # Get batch of URLs, status='error'
            error_urls = Urls.objects.order_by("-ts_fetch").filter(status=Urls.STATUS_ENUM.ERROR)[num_urls_skipped:batch_size+num_urls_skipped]

            while ((len(error_urls) > 0) and (num_urls_processed < batch_size)):
                # Per URL
                for obj_url in error_urls:
                    # URL ID cached? -> Tried to process recently already, skip
                    if (cache.get("error_{}".format(obj_url.id)) is not None):
                        logger.debug("Already cached URL ID: {}".format(obj_url.id))
                        num_urls_skipped += 1
                        continue

                    try:
                        # Process URL, try bypassing paywall
                        self._process_single_url(obj_url, status_pattern_match=None, raise_exception_on_error=True, paywall_bypass=True)
                        num_urls_processed += 1
                    except Exception as e:
                        # Error, cache to avoid re-processing for X time
                        cache.set("error_{}".format(obj_url.id), True, timeout=int(os.getenv("FETCHER_ERROR_URL_CACHE_TIME", 86400)))
                        num_urls_skipped += 1

                # Get following batch of URLs, status='error'
                error_urls = Urls.objects.order_by("-ts_fetch").filter(status=Urls.STATUS_ENUM.ERROR)[num_urls_skipped:batch_size+num_urls_skipped]

            logger.info("Updated #{}, skipped #{} error URLs".format(num_urls_processed, num_urls_skipped))
        except Exception as e:
            logger.warning("Exception processing error URLs: {}\n{}".format(e, traceback.format_exc()))

    def process_missing_kids_urls(self, batch_size=None, process_status_only=None):
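        """Re-verify missingkids.org poster URLs through the Selenium service.

        POSTs each URL to the SELENIUM_ENDPOINT verify_missing_kid/ endpoint and
        updates the status (or the duplicate/canonical relation) based on the
        returned status field.
        """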
        try:
            logger.info("Processing MissingKids URLs - batch_size={} process_status_only={}".format(batch_size, process_status_only))

            if (process_status_only is None):
                status_filter = (Q(status=Urls.STATUS_ENUM.VALID) | Q(status=Urls.STATUS_ENUM.INVALID) | Q(status=Urls.STATUS_ENUM.UNKNOWN) | Q(status=Urls.STATUS_ENUM.ERROR))
            else:
                if (process_status_only == "valid"):
                    status_filter = Q(status=Urls.STATUS_ENUM.VALID)
                elif (process_status_only == "invalid"):
                    status_filter = Q(status=Urls.STATUS_ENUM.INVALID)
                elif (process_status_only == "error"):
                    status_filter = Q(status=Urls.STATUS_ENUM.ERROR)
                elif (process_status_only == "unknown"):
                    status_filter = Q(status=Urls.STATUS_ENUM.UNKNOWN)
                elif (process_status_only == "raw"):
                    status_filter = Q(status=Urls.STATUS_ENUM.RAW)
                elif (process_status_only == "duplicate"):
                    status_filter = Q(status=Urls.STATUS_ENUM.DUPLICATE)
                else:
                    logger.info("Unknown status to filter: {}".format(process_status_only))
                    return

            # Get URLs matching missingkids.org/poster or missingkids.org/new-poster with the selected status filter
            missingkids_urls = Urls.objects.order_by("-ts_fetch").filter(
                status_filter & (Q(url__contains="missingkids.org/poster") | Q(url__contains="missingkids.org/new-poster"))
            )

            # Limit to batch size
            if (batch_size is not None):
                missingkids_urls = missingkids_urls[:batch_size]

            # Per URL
            for obj_url in missingkids_urls:
                try:
                    # Missing kids fetching endpoint, verify URL
                    missingkids_fetch_endpoint = os.path.join(os.getenv("SELENIUM_ENDPOINT", "http://localhost:80"), "verify_missing_kid/")
                    data = {"url": obj_url.url}
                    # POST
                    r = requests.post(missingkids_fetch_endpoint, json=data, timeout=120)
                    # Jsonify
                    results = r.json()
                    logger.debug("Selenium results for URL {}: {}".format(obj_url.url, str(results)))

                    if (results.get("status") == "valid"):
                        self._set_status(obj_url, Urls.STATUS_ENUM.VALID)
                    elif (results.get("status") == "invalid"):
                        self._set_status(obj_url, Urls.STATUS_ENUM.INVALID)
                    elif (results.get("status") == "duplicate"):
                        self._set_duplicate_and_insert_canonical(obj_url, results.get("redirection"))
                    elif (results.get("status") == "unknown"):
                        # Nothing to do, not sure about it...
                        logger.info("Missing kid verification returned unknown for URL: {}".format(obj_url.url))

                except Exception as e:
                    logger.warning("Unknown error processing missing kids poster for URL: {}\n{}".format(obj_url.url, str(e)))

            logger.info("Verified status of #{} missingkids.org/poster / missingkids.org/new-poster URLs".format(len(missingkids_urls)))
        except Exception as e:
            logger.warning("Exception processing MissingKids URLs: {}\n{}".format(e, traceback.format_exc()))

    def clean_old_url_content(self, older_than_days=60):
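        """Delete UrlContent rows whose URL was fetched more than older_than_days days ago."""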
        try:
            # Get cut-off date
            cutoff_date = timezone.now() - timedelta(days=older_than_days)
            # Delete old UrlContent objects
            old_url_content = UrlContent.objects.filter(id_url__ts_fetch__lt=cutoff_date)
            logger.info("Cleaning URL content older than {} days: #{}".format(older_than_days, len(old_url_content)))
            old_url_content.delete()
        except Exception as e:
            logger.warning("Exception cleaning old URL content: {}\n{}".format(e, traceback.format_exc()))