from ..models import Urls, UrlContent, UrlsSourceSearch, UrlsDuplicate, StatusPatternMatching, Source, Search
from django.db.models import Q
from django.core.cache import cache
from django.db import IntegrityError
from django.utils import timezone
from datetime import timedelta
from .fetch_utils_url_processor import process_url, verify_missing_kid_url
from .utils import get_with_protocol
import re
import requests
import os
import traceback
from .logger import get_logger

logger = get_logger()


class DB_Handler():

    def __init__(self):
        pass

    def insert_raw_urls(self, urls, obj_source, obj_search):
        """Insert a batch of raw URLs and their (URL, source, search) associations."""
        try:
            logger.debug("Inserting raw URLs")

            # Empty?
            if (len(urls) == 0):
                logger.debug("Empty batch of urls (not writing to DB) for source-search: {} - {}".format(obj_source.source, obj_search.search))
                return

            # Default protocol https://
            urls_clean = [get_with_protocol(url) for url in urls]

            urls_to_insert = []
            # Per URL
            for url in urls_clean:
                ### Already processed URL?
                if (cache.get("insert_{}".format(url)) is not None):
                    logger.debug("Already cached URL: {}".format(url))
                    if (cache.get("insert_{}{}{}".format(url, obj_source.source, obj_search.search)) is not None):
                        logger.debug("Already cached (URL, source, search): {} {} {}".format(url, obj_source.source, obj_search.search))
                    else:
                        ### Insert (URL_id, source_id, search_id), since not cached
                        # Get URL ID (should already be created)
                        obj_url, created = Urls.objects.get_or_create(url=url)
                        # Create (id_url, id_source, id_search) (shouldn't exist)
                        UrlsSourceSearch.objects.get_or_create(id_url=obj_url, id_source=obj_source, id_search=obj_search)
                else:
                    # Add object to insert
                    urls_to_insert.append(url)

            ### Insert URLs & (URL_id, source_id, search_id)
            try:
                ### Bulk insert, fails if duplicated URL (IDs are not returned when using ignore_conflicts=True)
                # URLs (ignore_conflicts=False to return IDs)
                bulk_created_urls = Urls.objects.bulk_create([Urls(url=url) for url in urls_to_insert], ignore_conflicts=False)
                # (URL_id, source_id, search_id)
                UrlsSourceSearch.objects.bulk_create([UrlsSourceSearch(id_url=obj_url, id_source=obj_source, id_search=obj_search) for obj_url in bulk_created_urls], ignore_conflicts=True)
            except IntegrityError:
                ### Fallback to one-by-one insert
                logger.debug("bulk_create exception while inserting raw URLs (fails if duplicated URL), falling back to non-bulk method")
                # One by one
                for url in urls_to_insert:
                    # URL
                    obj_url, created = Urls.objects.get_or_create(url=url)
                    if (created):
                        logger.debug("Inserted: {}".format(obj_url.url))
                    else:
                        logger.debug("Not inserted: {}".format(obj_url.url))
                    # (URL, source, search)
                    UrlsSourceSearch.objects.get_or_create(id_url=obj_url, id_source=obj_source, id_search=obj_search)
            except Exception as e:
                logger.warning("bulk_create unknown exception while inserting raw URLs: {}\n{}".format(e, traceback.format_exc()))
                # Avoid caching due to error on insertion
                urls_clean = []

            # Insert or update cache
            for url in urls_clean:
                cache.set("insert_{}".format(url), True, timeout=int(os.getenv("FETCHER_INSERT_URL_CACHE_TIME", 86400)))
                cache.set("insert_{}{}{}".format(url, obj_source.source, obj_search.search), True, timeout=int(os.getenv("FETCHER_INSERT_URL_CACHE_TIME", 86400)))

            logger.info("Inserted #{} raw URLs, Source-Search {} - {}".format(len(urls_to_insert), obj_source.source, obj_search.search))
        except Exception as e:
            logger.warning("Exception inserting raw URLs: {}\n{}".format(e, traceback.format_exc()))

    def _set_status(self, obj_url, status):
        # Update status only if setting a new value
        if (obj_url.status != status):
            obj_url.status = status
            obj_url.save()
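    # Illustrative note on the insert-cache keys used above (values are hypothetical, not taken
    # from real data): for url "https://example.com/article", source "google" and search
    # "missing child", the two keys are plain string concatenations:
    #   "insert_https://example.com/article"                       (per URL)
    #   "insert_https://example.com/articlegooglemissing child"    (per URL, source, search)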
    def _set_duplicate_and_insert_canonical(self, obj_url, url_canonical):
        """Mark obj_url as duplicate and associate its sources/searches to the canonical URL."""
        # Update status
        self._set_status(obj_url, Urls.STATUS_ENUM.DUPLICATE)
        # Get or create URL with canonical form
        obj_url_canonical, created = Urls.objects.get_or_create(url=url_canonical)
        # Get the source-search IDs associated to obj_url.id
        list_url_source_search = UrlsSourceSearch.objects.filter(id_url=obj_url)
        for obj_url_source_search in list_url_source_search:
            # Associate same sources to url_canonical (it might already exist)
            UrlsSourceSearch.objects.get_or_create(id_url=obj_url_canonical, id_source=obj_url_source_search.id_source, id_search=obj_url_source_search.id_search)
        # URLs duplicate association
        UrlsDuplicate.objects.get_or_create(id_url_canonical=obj_url_canonical, id_url_duplicated=obj_url)

    def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error, paywall_bypass=False, request_timeout=15):
        """Verify or extract content for a single URL and update its status accordingly."""
        ##########################################################################
        # URL pattern: missingkids.org/poster OR missingkids.org/new-poster
        if ("missingkids.org" in obj_url.url) and ("poster" in obj_url.url):
            try:
                # Verify missing kid URL
                results = verify_missing_kid_url(obj_url.url)
            except Exception as e:
                if (raise_exception_on_error):
                    # Simply raise exception, handled in a different way
                    raise Exception("Error processing URL, raising exception as expected")
                else:
                    logger.debug("Error processing URL: {}\n{}\n{}".format(obj_url.url, str(e), traceback.format_exc()))
                    # Set status to error
                    self._set_status(obj_url, Urls.STATUS_ENUM.ERROR)
                    return

            if (results.get("status") == "valid"):
                self._set_status(obj_url, Urls.STATUS_ENUM.VALID)
            elif (results.get("status") == "invalid"):
                self._set_status(obj_url, Urls.STATUS_ENUM.INVALID)
            elif (results.get("status") == "duplicate"):
                self._set_duplicate_and_insert_canonical(obj_url, results.get("redirection"))
            elif (results.get("status") == "unknown"):
                # Nothing to do, not sure about it...
                logger.info("Missing kid verification returned unknown for URL: {}".format(obj_url.url))
                self._set_status(obj_url, Urls.STATUS_ENUM.UNKNOWN)
            return

        ##########################################################################
        # Found a pattern match -> Override status
        if (status_pattern_match is not None):
            logger.debug("Pattern match, status '{}' for input URL: {}".format(status_pattern_match, obj_url.url))
            # Update status
            self._set_status(obj_url, status_pattern_match)
            ##### Filter URL? -> Invalid (don't extract content)
            if (status_pattern_match == "invalid"):
                return

        try:
            # Extract URL content
            dict_url_data = process_url(obj_url.url, paywall_bypass, request_timeout)
        except Exception as e:
            if (raise_exception_on_error):
                # Simply raise exception, handled in a different way
                raise Exception("Error processing URL, raising exception as expected")
            else:
                logger.debug("Error processing URL: {}\n{}\n{}".format(obj_url.url, str(e), traceback.format_exc()))
                # Set status to error
                dict_url_data = None

        ##### Canonical URL different? -> Duplicate
        if (dict_url_data is not None) and (dict_url_data.get("url_canonical") is not None) and (dict_url_data.get("url") != dict_url_data.get("url_canonical")):
            # URL as duplicate, insert canonical URL
            self._set_duplicate_and_insert_canonical(obj_url, dict_url_data.get("url_canonical"))
            # Next URL
            return

        # Not overriding status given pattern matching?
        if (status_pattern_match is None):
            # (dict_url_data is None) or (exception while processing URL)? -> Error status
            if (dict_url_data is None):
                # Update status
                self._set_status(obj_url, Urls.STATUS_ENUM.ERROR)
                # Next URL
                return
            # Invalid? e.g. binary data
            if (dict_url_data.get("override_status") == "invalid"):
                # Update status
                self._set_status(obj_url, Urls.STATUS_ENUM.INVALID)
                # Next URL
                return
            ##### Valid URL
            # Update status
            self._set_status(obj_url, Urls.STATUS_ENUM.VALID)

        try:
            if (dict_url_data is not None):
                # Create or update extracted URL data
                UrlContent.objects.update_or_create(
                    id_url=obj_url,
                    defaults={
                        "date_published": dict_url_data.get("publish_date"),
                        "title": dict_url_data.get("title"),
                        "description": dict_url_data.get("description"),
                        "content": dict_url_data.get("content"),
                        "valid_content": dict_url_data.get("valid_content"),
                        "language": dict_url_data.get("language"),
                        "keywords": dict_url_data.get("keywords"),
                        "tags": dict_url_data.get("tags"),
                        "authors": dict_url_data.get("authors"),
                        "image_main_url": dict_url_data.get("image_main_url"),
                        "images_url": dict_url_data.get("images_url"),
                        "videos_url": dict_url_data.get("videos_url"),
                        "url_host": dict_url_data.get("url_host"),
                        "site_name": dict_url_data.get("site_name"),
                    }
                )
        except Exception as e:
            logger.debug("Error in update_or_create UrlContent: {}\ndict_url_data: {}\n{}\n{}".format(obj_url.url, dict_url_data, str(e), traceback.format_exc()))

    def process_raw_urls(self, batch_size):
        """Process a batch of URLs with status 'raw'."""

        def _get_status_pattern_matching(url, list_pattern_status_tuple):
            """
            Be careful: regex patterns should override status to "valid", "invalid", or "unknown" only
            """
            # Sort pattern tuples by priority: (pattern, priority, status)
            for regex_pattern, regex_priority, status_if_match in sorted(list_pattern_status_tuple, key=lambda tup: tup[1], reverse=True):
                # Regular expression pattern matching: https://regexr.com/
                if bool(re.match(regex_pattern, url)):
                    # logger.debug("Regex pattern found, status '{}' for URL: {}".format(status_if_match, url))
                    return status_if_match
            return None

        try:
            logger.debug("Processing raw URLs")

            # Get batch of URLs, status='raw'
            raw_urls = Urls.objects.order_by("-ts_fetch").filter(status=Urls.STATUS_ENUM.RAW)[:batch_size]
            if (len(raw_urls) == 0):
                logger.debug("No raw URLs to process")
                return

            # Get list of (pattern, priority, status) tuples to override status if required
            list_pattern_status_tuple = list(StatusPatternMatching.objects.values_list("pattern", "priority", "status"))

            # Per URL
            for obj_url in raw_urls:
                # Override status if pattern matching?
                status_pattern_match = _get_status_pattern_matching(obj_url.url, list_pattern_status_tuple)
                # Process URL
                self._process_single_url(obj_url, status_pattern_match, raise_exception_on_error=False)

            logger.info("Updated #{} raw URLs".format(len(raw_urls)))
        except Exception as e:
            logger.warning("Exception processing raw URLs: {}\n{}".format(e, traceback.format_exc()))

    def process_error_urls(self, batch_size):
        """Retry URLs with status 'error', bypassing paywalls and caching failures to avoid immediate retries."""
        try:
            logger.debug("Processing error URLs")

            # Keep track of processed and skipped "error" URLs
            num_urls_skipped, num_urls_processed = 0, 0

            # Get batch of URLs, status='error'
            error_urls = Urls.objects.order_by("-ts_fetch").filter(status=Urls.STATUS_ENUM.ERROR)[num_urls_skipped:batch_size + num_urls_skipped]
            while ((len(error_urls) > 0) and (num_urls_processed < batch_size)):
                # Per URL
                for obj_url in error_urls:
                    # URL ID cached? -> Tried to process recently already, skip
                    if (cache.get("error_{}".format(obj_url.id)) is not None):
                        logger.debug("Already cached URL ID: {}".format(obj_url.id))
                        num_urls_skipped += 1
                        continue
                    try:
                        # Process URL, try bypassing paywall
                        self._process_single_url(obj_url, status_pattern_match=None, raise_exception_on_error=True, paywall_bypass=True)
                        num_urls_processed += 1
                    except Exception:
                        # Error, cache to avoid re-processing for some time
                        cache.set("error_{}".format(obj_url.id), True, timeout=int(os.getenv("FETCHER_ERROR_URL_CACHE_TIME", 86400)))
                        num_urls_skipped += 1

                # Get following batch of URLs, status='error'
                error_urls = Urls.objects.order_by("-ts_fetch").filter(status=Urls.STATUS_ENUM.ERROR)[num_urls_skipped:batch_size + num_urls_skipped]

            logger.info("Updated #{}, skipped #{} error URLs".format(num_urls_processed, num_urls_skipped))
        except Exception as e:
            logger.warning("Exception processing error URLs: {}\n{}".format(e, traceback.format_exc()))

    def process_missing_kids_urls(self, batch_size=None, process_status_only=None):
        """Re-verify the status of missingkids.org poster URLs, optionally restricted to a single status."""
        try:
            logger.info("Processing MissingKids URLs - batch_size={} process_status_only={}".format(batch_size, process_status_only))

            # Build the status filter
            if (process_status_only is None):
                status_filter = (Q(status=Urls.STATUS_ENUM.VALID) | Q(status=Urls.STATUS_ENUM.INVALID) | Q(status=Urls.STATUS_ENUM.UNKNOWN) | Q(status=Urls.STATUS_ENUM.ERROR))
            else:
                if (process_status_only == "valid"):
                    status_filter = Q(status=Urls.STATUS_ENUM.VALID)
                elif (process_status_only == "invalid"):
                    status_filter = Q(status=Urls.STATUS_ENUM.INVALID)
                elif (process_status_only == "error"):
                    status_filter = Q(status=Urls.STATUS_ENUM.ERROR)
                elif (process_status_only == "unknown"):
                    status_filter = Q(status=Urls.STATUS_ENUM.UNKNOWN)
                elif (process_status_only == "raw"):
                    status_filter = Q(status=Urls.STATUS_ENUM.RAW)
                elif (process_status_only == "duplicate"):
                    status_filter = Q(status=Urls.STATUS_ENUM.DUPLICATE)
                else:
                    logger.info("Unknown status to filter: {}".format(process_status_only))
                    return

            # Get URLs matching missingkids.org/poster or missingkids.org/new-poster with the selected status filter
            missingkids_urls = Urls.objects.order_by("-ts_fetch").filter(
                status_filter & (Q(url__contains="missingkids.org/poster") | Q(url__contains="missingkids.org/new-poster"))
            )
            # Apply batch size
            if (batch_size is not None):
                missingkids_urls = missingkids_urls[:batch_size]

            # Per URL
            for obj_url in missingkids_urls:
                try:
                    SELENIUM_BASED_MISSINGKID_VERIFICATION = False
                    if (SELENIUM_BASED_MISSINGKID_VERIFICATION):
                        # Missing kids fetching endpoint, verify URL
                        missingkids_fetch_endpoint = os.path.join(os.getenv("SELENIUM_ENDPOINT", "http://localhost:80"), "verify_missing_kid/")
                        data = {"url": obj_url.url}
                        # POST
                        r = requests.post(missingkids_fetch_endpoint, json=data, timeout=120)
                        # Jsonify
                        results = r.json()
                        logger.debug("Missingkids Selenium results for URL {}: {}".format(obj_url.url, str(results)))
                    else:
                        # Verify
                        results = verify_missing_kid_url(obj_url.url)
                        logger.debug("Missingkids verify results for URL {}: {}".format(obj_url.url, str(results)))

                    if (results.get("status") == "valid"):
                        self._set_status(obj_url, Urls.STATUS_ENUM.VALID)
                    elif (results.get("status") == "invalid"):
                        self._set_status(obj_url, Urls.STATUS_ENUM.INVALID)
                    elif (results.get("status") == "duplicate"):
                        self._set_duplicate_and_insert_canonical(obj_url, results.get("redirection"))
                    elif (results.get("status") == "unknown"):
                        # Nothing to do, not sure about it...
logger.info("Missing kid verification returned unknown for URL: {}".format(obj_url.url)) pass except Exception as e: logger.warning("Unknown error processing missing kids poster for URL: {}\n{}".format(obj_url.url, str(e))) logger.info("Verified status of #{} missingkids.org/poster / missingkids.org/new-poster URLs".format(len(missingkids_urls))) except Exception as e: logger.warning("Exception processing MissingKids URLs: {}\n{}".format(e, traceback.format_exc())) def clean_old_url_content(self, older_than_days=60): try: # Get cut off date cutoff_date = timezone.now() - timedelta(days=older_than_days) # Delete old UrlContent objects old_url_content = UrlContent.objects.filter(id_url__ts_fetch__lt=cutoff_date) logger.info("Cleaning URL content older than {} days: #{}".format(older_than_days, len(old_url_content))) old_url_content.delete() except Exception as e: logger.warning("Exception cleaning old URL content: {}\n{}".format(e, traceback.format_exc()))