Urls source search, cleaning code
@@ -1,11 +1,9 @@
from ..models import Urls, UrlContent, UrlsSource, UrlsDuplicate, Source, StatusPatternMatching
from ..models import Urls, UrlContent, UrlsSourceSearch, UrlsDuplicate, StatusPatternMatching, Source, Search
from django.db.models import Q
from .url_processor import process_url
from django.core.cache import cache
from django.db import IntegrityError
import hashlib
from .url_processor import process_url, get_with_protocol
import re
import time
import traceback
from .logger import get_logger
logger = get_logger()
@@ -19,61 +17,32 @@ class DB_Handler():
# URL host slowdown
self.url_host_slowdown_seconds = 5

def _get_safe_cache_key(self, raw_key):
"""Generate a safe cache key using an MD5 hash"""
return hashlib.md5(raw_key.encode()).hexdigest()

def _cache_key(self, cache_key, hash_encode, cache_timeout):
if (hash_encode):
cache.set(self._get_safe_cache_key(cache_key), True, timeout=cache_timeout)
else:
cache.set(cache_key, True, timeout=cache_timeout)

def _is_cached_key(self, cache_key, hash_encoded):
# Returns True if cached
if (hash_encoded):
return cache.get(self._get_safe_cache_key(cache_key)) is not None
else:
return cache.get(cache_key) is not None

def _clean_protocol(self, url):
# http:// -> https://
url = url.replace("http://", "https://")
# "" -> https://
if not (url.startswith("https://")):
url = "https://" + url
return url

def insert_raw_urls(self, urls, source):
def insert_raw_urls(self, urls, obj_source, obj_search):
try:
logger.debug("Inserting raw URLs")
# Empty?
if (len(urls) == 0):
logger.debug("Empty batch of urls (not writing to DB) for source: {}".format(source))
logger.debug("Empty batch of urls (not writing to DB) for source-search: {} - {}".format(obj_source.source, obj_search.search))
return

# Default protocol https://
urls_clean = [self._clean_protocol(url) for url in urls]

# Get the source (create if not exists)
source_obj, created = Source.objects.get_or_create(source=source)
urls_clean = [get_with_protocol(url) for url in urls]

urls_to_insert = []
# Per URL
for url in urls_clean:

### Already processed URL?
if (self._is_cached_key(url, hash_encoded=True)):
if (cache.get("insert_{}".format(url)) is not None):
logger.debug("Already cached URL: {}".format(url))

if (self._is_cached_key("{}{}".format(source, url), hash_encoded=True)):
logger.debug("Already cached (source, URL): {} {}".format(source, url))
if (cache.get("insert_{}{}{}".format(url, obj_source.source, obj_search.search)) is not None):
logger.debug("Already cached (URL, source, search): {} {} {}".format(url, obj_source.source, obj_search.search))
else:
### Insert (URL_id, source_id), since not cached
### Insert (URL_id, source_id, search_id), since not cached
# Get URL ID (should already be created)
url_obj, created = Urls.objects.get_or_create(url=url)
obj_url, created = Urls.objects.get_or_create(url=url)
# Create (id_source, id_url) (shouldn't exist)
UrlsSource.objects.get_or_create(id_source=source_obj, id_url=url_obj)
UrlsSourceSearch.objects.get_or_create(id_url=obj_url, id_source=obj_source, id_search=obj_search)
else:
# Add object to insert
# url_object_to_insert.append(Urls(url=url))
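Aside: the new cache guard in insert_raw_urls is a plain check-then-mark pattern keyed on the (URL, source, search) triple, replacing the hashed keys of _get_safe_cache_key (the MD5 step existed to keep special characters out of cache keys). A minimal sketch of the same pattern, assuming a configured Django cache; the helper names and the timeout value are illustrative, not from this module:

    from django.core.cache import cache

    def is_recently_inserted(url, source_name, search_term):
        # Mirrors the "insert_{url}{source}{search}" keys used above
        return cache.get("insert_{}{}{}".format(url, source_name, search_term)) is not None

    def mark_inserted(url, source_name, search_term, timeout_seconds=3600):
        # Called after a successful write so later batches skip this triple
        cache.set("insert_{}{}{}".format(url, source_name, search_term), True, timeout=timeout_seconds)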
@@ -85,16 +54,20 @@ class DB_Handler():
# URLs (ignore_conflicts=False to return IDs)
bulk_created_urls = Urls.objects.bulk_create([Urls(url=url) for url in urls_to_insert], ignore_conflicts=False)
# (URL_id, source_id)
UrlsSource.objects.bulk_create([UrlsSource(id_source=source_obj, id_url=url_obj) for url_obj in bulk_created_urls], ignore_conflicts=True)
UrlsSourceSearch.objects.bulk_create([UrlsSourceSearch(id_url=obj_url, id_source=obj_source, id_search=obj_search) for obj_url in bulk_created_urls], ignore_conflicts=True)
except IntegrityError as e:
### Fallback to one-by-one insert
logger.debug("bulk_create exception while inserting raw URLs (fails if duplicated URL), falling back to non-bulk method")
# One by one
for url in urls_to_insert:
# URL
url_obj, created = Urls.objects.get_or_create(url=url)
# (URL, source)
UrlsSource.objects.get_or_create(id_source=source_obj, id_url=url_obj)
obj_url, created = Urls.objects.get_or_create(url=url)
if (created):
logger.info("CREATED: {}".format(obj_url.url))
else:
logger.info("NOT CREATED: {}".format(obj_url.url))
# (URL, source, search)
UrlsSourceSearch.objects.get_or_create(id_url=obj_url, id_source=obj_source, id_search=obj_search)
except Exception as e:
logger.warning("bulk_create unknown exception while inserting raw URLs: {}\n{}".format(e, traceback.format_exc()))
# Avoid caching due to error on insertion
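Aside: the insert itself follows a bulk-first, fall-back-on-conflict pattern: bulk_create with ignore_conflicts=False so the returned rows carry primary keys, then per-row get_or_create if a duplicate URL raises IntegrityError. A condensed sketch under those assumptions, reusing the Urls and UrlsSourceSearch models imported above (the function name is illustrative):

    from django.db import IntegrityError

    def bulk_insert_urls(urls_to_insert, obj_source, obj_search):
        try:
            # ignore_conflicts=False so created rows come back with their IDs
            created = Urls.objects.bulk_create(
                [Urls(url=url) for url in urls_to_insert], ignore_conflicts=False)
            UrlsSourceSearch.objects.bulk_create(
                [UrlsSourceSearch(id_url=u, id_source=obj_source, id_search=obj_search) for u in created],
                ignore_conflicts=True)
        except IntegrityError:
            # A duplicated URL aborts the whole bulk insert; retry row by row
            for url in urls_to_insert:
                obj_url, _ = Urls.objects.get_or_create(url=url)
                UrlsSourceSearch.objects.get_or_create(
                    id_url=obj_url, id_source=obj_source, id_search=obj_search)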
@@ -102,37 +75,14 @@ class DB_Handler():
# Insert or update cache
for url in urls_clean:
# Hash encode URLs for special characters
self._cache_key(url, hash_encode=True, cache_timeout=self._cache_timeout_insert_url)
self._cache_key("{}{}".format(source, url), hash_encode=True, cache_timeout=self._cache_timeout_insert_url)
cache.set("insert_{}".format(url), True, timeout=self._cache_timeout_insert_url)
cache.set("insert_{}{}{}".format(url, obj_source.source, obj_search.search), True, timeout=self._cache_timeout_insert_url)

logger.info("Inserted #{} raw URLs".format(len(urls_to_insert)))

except Exception as e:
logger.warning("Exception inserting raw URLs: {}\n{}".format(e, traceback.format_exc()))

def _get_url_host(self, url):
# URL no protocol, first substring before '/'
url_host = url.replace("https://", "").replace("http://", "").split("/")[0]
return url_host

def _url_host_slowdown(self, url, url_host_slowdown_seconds):
### Avoid (frequent) too many requests to the same URL host
# Get URL host
url_host = self._get_url_host(url)
# Recently processed URL host? -> Slow down required
last_cached_timestamp = cache.get("processed_{}".format(url_host), None)
if last_cached_timestamp:
# Get time since last processed URL host (in seconds)
time_since_last_processed = time.time() - last_cached_timestamp
# Amount of time required to sleep?
slowdown_required = max(0, url_host_slowdown_seconds - time_since_last_processed)
logger.debug("Slow down (sleeping {:.2f}) for URL host {}".format(slowdown_required, url_host))
# Sleep
time.sleep(slowdown_required)
# About to process URL host, cache time
cache.set("processed_{}".format(url_host), time.time(), timeout=60*5) # Expire after 5 minutes

def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error):

def set_status(obj_url, status):
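Aside: _url_host_slowdown is effectively a cache-backed, per-host throttle: it remembers when a host was last hit and sleeps off the remainder of the minimum interval. The same idea as a standalone sketch (throttle_host and min_interval_seconds are illustrative names):

    import time
    from django.core.cache import cache

    def throttle_host(url_host, min_interval_seconds=5):
        # Keep consecutive requests to url_host at least min_interval_seconds apart,
        # using the cache as shared state between calls
        last_hit = cache.get("processed_{}".format(url_host))
        if last_hit is not None:
            time.sleep(max(0, min_interval_seconds - (time.time() - last_hit)))
        # Record this hit; let the entry expire after 5 minutes, as above
        cache.set("processed_{}".format(url_host), time.time(), timeout=60 * 5)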
@@ -158,8 +108,6 @@ class DB_Handler():
##### Process URL
try:
# Slow down if required to avoid too many requests error
self._url_host_slowdown(obj_url.url, self.url_host_slowdown_seconds)
# Get data
dict_url_data = process_url(obj_url.url)
# Not None, or handle as exception
@@ -190,17 +138,17 @@ class DB_Handler():
# Get or create URL with canonical form
obj_url_canonical, created = Urls.objects.get_or_create(url=dict_url_data.get("url_canonical"))
# Get the source IDs associated to obj_url.id
url_sources = UrlsSource.objects.filter(id_url=obj_url)
for url_source_obj in url_sources:
# Get the source-search IDs associated to obj_url.id
list_url_source_search = UrlsSourceSearch.objects.filter(id_url=obj_url)
for obj_url_source_search in list_url_source_search:
# Associate same sources to url_canonical (it might already exist)
obj_urls_source, created = UrlsSource.objects.get_or_create(id_source=url_source_obj.id_source, id_url=obj_url_canonical)
UrlsSourceSearch.objects.get_or_create(id_url=obj_url_canonical, id_source=obj_url_source_search.id_source, id_search=obj_url_source_search.id_search)

# URLs duplicate association
obj_urls_duplicate, created = UrlsDuplicate.objects.get_or_create(id_url_canonical=obj_url_canonical, id_url_duplicated=obj_url)
UrlsDuplicate.objects.get_or_create(id_url_canonical=obj_url_canonical, id_url_duplicated=obj_url)

# TODO: return obj_url_canonical so as to directly process the recently inserted URL
# Whever this function is called, add:
# Wherever this function is called, add:
# self._process_single_url(obj_url_canonical, status_pattern_match, raise_exception_on_error)

# Next URL
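Aside: the canonicalisation step copies every (source, search) association of the fetched URL onto its canonical form and records the duplicate relation. A sketch of that step with the models used above (link_canonical is an illustrative name, and it assumes the corrected filter(...) call):

    def link_canonical(obj_url, url_canonical):
        # Reuse the canonical row if it already exists
        obj_url_canonical, _ = Urls.objects.get_or_create(url=url_canonical)
        # Copy every (source, search) pair of the duplicated URL to the canonical one
        for assoc in UrlsSourceSearch.objects.filter(id_url=obj_url):
            UrlsSourceSearch.objects.get_or_create(
                id_url=obj_url_canonical,
                id_source=assoc.id_source,
                id_search=assoc.id_search)
        # Record which URL the canonical one deduplicates
        UrlsDuplicate.objects.get_or_create(
            id_url_canonical=obj_url_canonical, id_url_duplicated=obj_url)
        return obj_url_canonical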
@@ -281,7 +229,7 @@ class DB_Handler():
# Per URL
for obj_url in error_urls:
# URL ID cached? -> Tried to process recently already, skip
if (self._is_cached_key("error_{}".format(obj_url.id), hash_encoded=False)):
if (cache.get("error_{}".format(obj_url.id)) is not None):
logger.debug("Already cached URL ID: {}".format(obj_url.id))
num_urls_skipped += 1
continue
@@ -292,7 +240,7 @@ class DB_Handler():
num_urls_processed += 1
except Exception as e:
# Error, cache to avoid re-processing for X time
self._cache_key("error_{}".format(obj_url.id), hash_encode=False, cache_timeout=self._cache_timeout_error_url)
cache.set("error_{}".format(obj_url.id), True, timeout=self._cache_timeout_insert_url)
num_urls_skipped += 1

# Get following batch of URLs, status='error'
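Aside: the error path uses the same check-then-mark caching idea, keyed on the URL ID, so a URL that just failed is skipped until its cache entry expires. A minimal sketch (helper names and the timeout are illustrative):

    from django.core.cache import cache

    def should_retry(url_id):
        # Skip URLs whose last failure is still cached
        return cache.get("error_{}".format(url_id)) is None

    def mark_failed(url_id, timeout_seconds=3600):
        # Back off from this URL ID until the entry expires
        cache.set("error_{}".format(url_id), True, timeout=timeout_seconds)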