Update missing kids poster status
@@ -157,6 +157,9 @@
 " cur.execute(\"INSERT INTO URLS (url, status) values ('https://www.fox35orlando.com/news/tavares-police-florida-boys-10-9-abused-sheer-brutality', 'valid')\")\n",
 " cur.execute(\"INSERT INTO URLS (url, status) values ('https://www.google.com', 'invalid')\")\n",
 "\n",
+" cur.execute(\"INSERT INTO URLS (url, status) values ('https://www.missingkids.org/poster/USVA/VA25-0820/1', 'valid')\")\n",
+" cur.execute(\"INSERT INTO URLS (url, status) values ('https://www.missingkids.org/poster/NCMC/2045193/1', 'valid')\")\n",
+"\n",
 " cur.execute(\"INSERT INTO SOURCE (source) values ('news.google.com')\")\n",
 " cur.execute(\"INSERT INTO SOURCE (source) values ('qwant.com')\")\n",
 "\n",
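The added cells seed two real missingkids.org poster URLs as 'valid' test fixtures alongside the existing news and search-engine rows. A minimal sanity check, runnable in the same notebook, assuming its open `cur` cursor and the URLS(url, status) table used above:

# Sketch: verify the poster rows seeded above; assumes the notebook's
# existing `cur` cursor and the URLS(url, status) table.
cur.execute("SELECT url, status FROM URLS WHERE url LIKE '%missingkids.org/poster%'")
for url, status in cur.fetchall():
    print(url, status)  # both poster URLs were inserted as 'valid'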
@@ -1,4 +1,5 @@
 from ..models import Urls, UrlContent, UrlsSource, UrlsDuplicate, Source, StatusPatternMatching
+from django.db.models import Q
 from .url_processor import process_url
 from django.core.cache import cache
 from django.db import IntegrityError
@@ -135,13 +136,25 @@ class DB_Handler():
         cache.set("processed_{}".format(url_host), time.time(), timeout=60*5) # Expire after 5 minutes

     def _process_single_url(self, obj_url, status_pattern_match, raise_exception_on_error):
+
+        def set_status(obj_url, status):
+            # Update status if setting a new value
+            if (obj_url.status != status):
+                obj_url.status = status
+                obj_url.save()
+                # updating_urls.append(obj_url)
+
+        # TODO: Fix enum type issue. Bulk update instead of .save() for each object
+        # List of objects to bulk update
+        # updating_urls = []
+        # ... general processing, append to updating_urls
+        # Urls.objects.bulk_update(updating_urls, ['status'])
+
         ##### Filter URL? -> Invalid
         if (status_pattern_match == "invalid"):
             logger.debug("Domain filter applied to input URL: {}".format(obj_url.url))
             # Update status
-            obj_url.status = Urls.STATUS_ENUM.INVALID
-            obj_url.save()
-            # updating_urls.append(obj_url)
+            set_status(obj_url, Urls.STATUS_ENUM.INVALID)
             # Next URL
             return

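The new inner set_status() helper writes only when the value actually changes, and the commented-out lines record the intended follow-up: queue touched rows and flush them in one query. A minimal sketch of that bulk-update variant, assuming the enum issue the TODO mentions is resolved:

# Sketch only: the bulk-update direction described by the TODO above,
# not the committed code.
updating_urls = []  # rows touched during this batch

def set_status(obj_url, status):
    # Queue the change instead of saving each object immediately
    if obj_url.status != status:
        obj_url.status = status
        updating_urls.append(obj_url)

# ... per-URL processing appends to updating_urls ...

# One UPDATE for the whole batch instead of one .save() per row
Urls.objects.bulk_update(updating_urls, ["status"])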
@@ -161,19 +174,15 @@ class DB_Handler():
             # Set status to error
             logger.debug("Error processing URL: {}\n{}\n".format(obj_url.url, str(e), traceback.format_exc()))
             # Update status
-            obj_url.status = Urls.STATUS_ENUM.ERROR
-            obj_url.save()
-            # updating_urls.append(obj_url)
+            set_status(obj_url, Urls.STATUS_ENUM.ERROR)
             # Next URL
             return

         ##### Canonical URL different? -> Duplicate
         if (dict_url_data.get("url") != dict_url_data.get("url_canonical")):
             # Update status
-            obj_url.status = Urls.STATUS_ENUM.DUPLICATE
-            obj_url.save()
-            # updating_urls.append(obj_url)
+            set_status(obj_url, Urls.STATUS_ENUM.DUPLICATE)

             # Get or create URL with canonical form
             obj_url_canonical, created = Urls.objects.get_or_create(url=dict_url_data.get("url_canonical"))
             # Get the sources id associated to obj_url.id
@@ -190,9 +199,7 @@ class DB_Handler():

         ##### Valid URL
         # Update status
-        obj_url.status = Urls.STATUS_ENUM.VALID
-        obj_url.save()
-        # updating_urls.append(obj_url)
+        set_status(obj_url, Urls.STATUS_ENUM.VALID)

         # Create or update extracted URL data
         UrlContent.objects.update_or_create(
@@ -240,8 +247,6 @@ class DB_Handler():

             # Get list of (pattern, priority, status) tuples to override status if required
             list_pattern_status_tuple = list(StatusPatternMatching.objects.values_list("pattern", "priority", "status"))
-            # List of objects to bulk update
-            # updating_urls = []

             # Per URL
             for obj_url in raw_urls:
@@ -250,9 +255,6 @@ class DB_Handler():
                 # Process URL
                 self._process_single_url(obj_url, status_pattern_match, raise_exception_on_error=False)

-            # TODO: Fix enum type issue. Bulk update instead of .save() for each object
-            # Urls.objects.bulk_update(updating_urls, ['status'])
-
             logger.info("Updated #{} raw URLs".format(len(raw_urls)))
         except Exception as e:
             logger.warning("Exception processing raw URLs: {}\n{}".format(e, traceback.format_exc()))
@@ -288,4 +290,29 @@ class DB_Handler():

             logger.info("Updated #{}, skipped #{} error URLs".format(num_urls_processed, num_urls_skipped))
         except Exception as e:
             logger.warning("Exception processing error URLs: {}\n{}".format(e, traceback.format_exc()))
+
+    def process_missing_kids_urls(self, batch_size):
+        try:
+            logger.debug("Processing MissingKids URLs")
+            # Get batch of URLs, %missingkids.org/poster% AND (status='valid' OR status='invalid')
+            missingkids_urls = Urls.objects.order_by("-ts_fetch").filter(
+                (Q(url__contains="missingkids.org/poster") | Q(url__contains="missingkids.org/new-poster"))
+                &
+                (Q(status=Urls.STATUS_ENUM.VALID) | Q(status=Urls.STATUS_ENUM.INVALID))
+            )[:batch_size]
+
+            # Per URL
+            for obj_url in missingkids_urls:
+                try:
+                    # Process URL. If no exception -> Valid
+                    self._process_single_url(obj_url, status_pattern_match=None, raise_exception_on_error=True)
+                except Exception as e:
+                    # Raised exception -> Invalid (404 error)
+                    obj_url.status = Urls.STATUS_ENUM.INVALID
+                    obj_url.save()
+
+            logger.info("Verified status of #{} missingkids.org/poster URLs".format(len(missingkids_urls)))
+        except Exception as e:
+            logger.warning("Exception processing MissingKids URLs: {}\n{}".format(e, traceback.format_exc()))
+
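The Q composition mirrors the SQL in the comment: the two url__contains filters are OR-ed together, then AND-ed with an OR over the two statuses, ordered by newest fetch and capped at batch_size. A hedged usage sketch (DB_Handler is constructed with no arguments elsewhere in this diff, so none are passed here):

# Sketch: re-verify a batch of poster URLs after new ones are seeded.
handler = DB_Handler()
handler.process_missing_kids_urls(batch_size=50)
# Each poster is re-fetched via _process_single_url(); a raised
# exception (e.g. a 404 on a removed poster) flips the row to INVALID.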
@@ -4,18 +4,14 @@ from .src.fetch_feed import FetchFeeds
 from .src.fetch_parser import FetchParser
 from .src.db_utils import DB_Handler
 '''
-from src.fetch_parser import FetchParser
 from src.fetch_search import FetchSearcher
 from src.missing_kids_fetch import MissingKidsFetch
 from src.missing_kids_status import MissingKidsStatus
-from src.url_status import UpdateErrorURLs
 '''

 from .src.logger import get_logger
 logger = get_logger()

-# TODO: Queues with priorities, process_raw_urls least priority due to slowdown...
-
 @job
 def background_task(process_type: str):
     logger.info("Task triggered: {}".format(process_type))
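Since background_task is decorated with django_rq's @job, it can also be enqueued programmatically rather than through the HTTP trigger. A minimal sketch, assuming the app's default RQ queue and a hypothetical import path for this module:

# Sketch: enqueue the task directly instead of via the trigger view.
from .tasks import background_task  # import path assumed, not shown in the diff
background_task.delay("process_missing_kids_urls_50")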
@@ -25,10 +21,16 @@ def background_task(process_type: str):
         FetchFeeds().run()
     elif (process_type == "fetch_parser"):
         FetchParser().run()
+    # TODO: ENCODE BATCH_SIZE IN PROCESS_TYPE..
     elif (process_type == "process_raw_urls"):
         DB_Handler().process_raw_urls(batch_size=50)
     elif (process_type == "process_error_urls"):
         DB_Handler().process_error_urls(batch_size=50)
+    elif (process_type == "process_missing_kids_urls"):
+        DB_Handler().process_missing_kids_urls(batch_size=50)
+    elif ("process_missing_kids_urls" in process_type):
+        batch_size = int(process_type.split("_")[-1])
+        DB_Handler().process_missing_kids_urls(batch_size=batch_size)
     else:
         logger.info("Task unknown!: {}".format(process_type))

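Note the branch order: the exact-match arm catches a bare "process_missing_kids_urls" (whose split("_")[-1] is the non-numeric "urls") and uses the default batch of 50, while the containment arm parses a suffixed size. The parse in isolation, with a hypothetical defensive fallback that is not part of the committed code:

# Sketch of the suffix parse used above, plus an illustrative fallback.
def parse_batch_size(process_type: str, default: int = 50) -> int:
    try:
        return int(process_type.split("_")[-1])
    except ValueError:
        return default  # e.g. a trailing 'urls' is not an int

assert parse_batch_size("process_missing_kids_urls_500000") == 500000
assert parse_batch_size("process_missing_kids_urls") == 50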
@@ -5,6 +5,8 @@ import os
 from .src.logger import get_logger
 logger = get_logger()

+# TODO: Queues with priorities, process_raw_urls, process_error_urls least priority due to slowdown logic
+
 def trigger_task(request, task):
     """View that enqueues a task."""
     queue = django_rq.get_queue('default') # Get the default queue
@@ -13,5 +15,5 @@ def trigger_task(request, task):

 def link_list(request):
     prefix = "http://localhost:8000/api"
-    links = ["fetch_feeds", "fetch_parser", "process_raw_urls", "process_error_urls"]
+    links = ["fetch_feeds", "fetch_parser", "process_raw_urls", "process_error_urls", "process_missing_kids_urls_50", "process_missing_kids_urls_500000"]
     return JsonResponse({"links": ["http://localhost:8080/?pgsql=matitos_db&username=supermatitos&db=matitos&ns=public&select=urls&order%5B0%5D=id"] + [os.path.join(prefix, l) for l in links]})
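The two new entries resolve through os.path.join(prefix, l) to trigger URLs whose numeric suffix becomes the batch size in background_task. For illustration:

# The links generated for the two new entries:
import os
prefix = "http://localhost:8000/api"
for l in ["process_missing_kids_urls_50", "process_missing_kids_urls_500000"]:
    print(os.path.join(prefix, l))
# http://localhost:8000/api/process_missing_kids_urls_50
# http://localhost:8000/api/process_missing_kids_urls_500000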