Working fetch feeds and process raw urls
@@ -33,6 +33,16 @@ python manage.py inspectdb
# Fields default:
ts_fetch = models.DateTimeField(auto_now_add=True)
status = models.TextField(default='raw') # This field type is a guess.

# URLContent:
from django.contrib.postgres.fields import ArrayField

keywords = ArrayField(models.TextField(blank=True, null=True)) # This field type is a guess.
tags = ArrayField(models.TextField(blank=True, null=True)) # This field type is a guess.
authors = ArrayField(models.TextField(blank=True, null=True)) # This field type is a guess.
image_main_url = models.TextField(blank=True, null=True)
images_url = ArrayField(models.TextField(blank=True, null=True)) # This field type is a guess.
videos_url = ArrayField(models.TextField(blank=True, null=True)) # This field type is a guess.
```
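
The `# This field type is a guess.` comments come from `inspectdb`; the commit replaces the guessed `TextField`s with `ArrayField`s backed by Postgres `text[]` columns. A minimal usage sketch (not part of the commit; the app label `api` is taken from the migration commands below, the query values are illustrative):

```
from api.models import UrlContent

# ArrayField supports containment lookups against the Postgres text[] column
rows = UrlContent.objects.filter(keywords__contains=["python"])

# The field reads back as a plain Python list (or None when the column is NULL)
for row in rows:
    print(row.title, row.authors or [])
```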

* Environment variables

@@ -51,8 +61,8 @@ REDIS_PORT=${REDIS_PORT:-6379}
```
# Generate content for models.py
python manage.py inspectdb
python manage.py makemigrations
python manage.py migrate --fake
# Migrations
python manage.py makemigrations api; python manage.py migrate --fake-initial
```

* Deploy

@@ -1,4 +1,4 @@
# Generated by Django 5.1.7 on 2025-03-07 16:56
# Generated by Django 5.1.7 on 2025-03-13 17:01

import django.db.models.deletion
from django.db import migrations, models

@@ -62,8 +62,8 @@ class Migration(migrations.Migration):
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('url', models.TextField(unique=True)),
('ts_fetch', models.DateTimeField()),
('status', models.TextField()),
('ts_fetch', models.DateTimeField(auto_now_add=True)),
('status', models.TextField(choices=[('raw', 'Raw'), ('error', 'Error'), ('valid', 'Valid'), ('unknown', 'Unknown'), ('invalid', 'Invalid'), ('duplicate', 'Duplicate')], default='raw')),
],
options={
'db_table': 'urls',
@@ -100,9 +100,16 @@ class Migration(migrations.Migration):
('title', models.TextField(blank=True, null=True)),
('description', models.TextField(blank=True, null=True)),
('content', models.TextField(blank=True, null=True)),
('valid_content', models.BooleanField(blank=True, null=True)),
('language', models.CharField(blank=True, max_length=2, null=True)),
('keywords', models.TextField(blank=True, null=True)),
('tags', models.TextField(blank=True, null=True)),
('authors', models.TextField(blank=True, null=True)),
('image_urls', models.TextField(blank=True, null=True)),
('image_main', models.TextField(blank=True, null=True)),
('images_url', models.TextField(blank=True, null=True)),
('videos_url', models.TextField(blank=True, null=True)),
('url_host', models.TextField(blank=True, null=True)),
('site_name', models.TextField(blank=True, null=True)),
],
options={
'db_table': 'url_content',

@@ -1,4 +1,5 @@
from django.db import models
from django.contrib.postgres.fields import ArrayField

# Create your models here.
class Feed(models.Model):

@@ -44,9 +45,16 @@ class UrlContent(models.Model):
title = models.TextField(blank=True, null=True)
description = models.TextField(blank=True, null=True)
content = models.TextField(blank=True, null=True)
tags = models.TextField(blank=True, null=True) # This field type is a guess.
authors = models.TextField(blank=True, null=True) # This field type is a guess.
image_urls = models.TextField(blank=True, null=True) # This field type is a guess.
valid_content = models.BooleanField(blank=True, null=True)
language = models.CharField(max_length=2, blank=True, null=True)
keywords = ArrayField(models.TextField(blank=True, null=True)) # This field type is a guess.
tags = ArrayField(models.TextField(blank=True, null=True)) # This field type is a guess.
authors = ArrayField(models.TextField(blank=True, null=True)) # This field type is a guess.
image_main_url = models.TextField(blank=True, null=True)
images_url = ArrayField(models.TextField(blank=True, null=True)) # This field type is a guess.
videos_url = ArrayField(models.TextField(blank=True, null=True)) # This field type is a guess.
url_host = models.TextField(blank=True, null=True)
site_name = models.TextField(blank=True, null=True)

class Meta:
managed = False

@@ -54,9 +62,17 @@ class UrlContent(models.Model):

class Urls(models.Model):
class STATUS_ENUM(models.TextChoices):
RAW = "raw", "Raw"
ERROR = "error", "Error"
VALID = "valid", "Valid"
UNKNOWN = "unknown", "Unknown"
INVALID = "invalid", "Invalid"
DUPLICATE = "duplicate", "Duplicate"

url = models.TextField(unique=True)
ts_fetch = models.DateTimeField(auto_now_add=True)
status = models.TextField(default='raw') # This field type is a guess.
status = models.TextField(choices=STATUS_ENUM.choices, default=STATUS_ENUM.RAW) # This field type is a guess.

class Meta:
managed = False
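
A hedged sketch of how the new `STATUS_ENUM` choices are used; it simply mirrors the calls that appear in `db_handler.py` further down, with an illustrative batch size:

```
# Illustrative only: mirrors the filter/assign pattern in db_handler.py below
for obj_url in Urls.objects.filter(status=Urls.STATUS_ENUM.RAW)[:50]:
    obj_url.status = Urls.STATUS_ENUM.VALID      # stored as the plain string "valid"
    obj_url.save()
    print(obj_url.get_status_display())          # human-readable label, "Valid"
```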

@@ -2,6 +2,7 @@ from ..models import Urls, UrlContent, UrlsSource, Source, WebsiteToFilter, Stat
from .url_processor import process_url
from django.utils import timezone
from django.core.cache import cache
from django.db import IntegrityError
import hashlib
from datetime import timedelta
import re

@@ -25,16 +26,32 @@ class DB_Handler():
return cache.get(self._get_safe_cache_key(cache_key)) is not None

def insert_raw_urls(self, urls, source):

def clean_protocol(url):
# http:// -> https://
url = url.replace("http://", "https://")
# "" -> https://
if not (url.startswith("https://")):
url = "https://" + url
return url

try:
logger.debug("Inserting raw URLs")
# Empty?
if (len(urls) == 0):
logger.debug("Empty batch of urls (not writing to DB) for source: {}".format(source))
return

# Default protocol https://
urls_clean = [clean_protocol(url) for url in urls]

# Get the source (create if not exists)
source_obj, created = Source.objects.get_or_create(source=source)

url_object_to_insert = []
urls_to_insert = []
# Per URL
for url in urls:
for url in urls_clean:

### Already processed URL?
if (self._is_cached_key(url)):
logger.debug("Already cached URL: {}".format(url))
@@ -42,25 +59,43 @@ class DB_Handler():
if (self._is_cached_key("{}{}".format(source, url))):
logger.debug("Already cached (source, URL): {} {}".format(source, url))
else:
### Insert source
# Get the source (create if not exists)
source_obj, created = Source.objects.get_or_create(source=source)
# Get URL ID
url_obj = Urls.objects.get(url=url)
# Create (id_source, id_url)
UrlsSource.objects.create(id_source=source_obj.id, id_url=url_obj.id)
### Insert (URL_id, source_id), since not cached
# Get URL ID (should already be created)
url_obj, created = Urls.objects.get_or_create(url=url)
# Create (id_source, id_url) (shouldn't exist)
UrlsSource.objects.get_or_create(id_source=source_obj, id_url=url_obj)
else:
# Add object to insert
url_object_to_insert.append(Urls(url=url))
# url_object_to_insert.append(Urls(url=url))
urls_to_insert.append(url)

### Insert URLs & (URL_id, source_id)
try:
### Bulk insert, fails if duplicated URL (not returning IDs when using ignore_conflicts=True)
# URLs (ignore_conflicts=False to return IDs)
bulk_created_urls = Urls.objects.bulk_create([Urls(url=url) for url in urls_to_insert], ignore_conflicts=False)
# (URL_id, source_id)
UrlsSource.objects.bulk_create([UrlsSource(id_source=source_obj, id_url=url_obj) for url_obj in bulk_created_urls], ignore_conflicts=True)
except IntegrityError as e:
### Fallback to one-by-one insert
logger.debug("bulk_create exception while inserting raw URLs, falling back to non-bulk method")
# One by one
for url in urls_to_insert:
# URL
url_obj, created = Urls.objects.get_or_create(url=url)
# (URL, source)
UrlsSource.objects.get_or_create(id_source=source_obj, id_url=url_obj)
except Exception as e:
logger.warning("bulk_create unknown exception while inserting raw URLs: {}\n{}".format(e, traceback.format_exc()))
# Avoid caching due to error on insertion
urls_clean = []

### Bulk insert URLs, ignore conflicts if a url exists
bulk_created_obj = Urls.objects.bulk_create(url_object_to_insert, ignore_conflicts=True)
# Insert or update cache
for url in urls:
for url in urls_clean:
self._cache_key(url)
self._cache_key("{}{}".format(source, url))

logger.info("Inserted #{} raw URLs".format(len(url_object_to_insert)))
logger.info("Inserted #{} raw URLs".format(len(urls_to_insert)))

except Exception as e:
logger.warning("Exception inserting raw URLs: {}\n{}".format(e, traceback.format_exc()))

@@ -83,6 +118,12 @@ class DB_Handler():
# Pattern matching not required or not found, original article status
return article_status

def process_error_urls(self, batch_size=50):
# Get batch of URLs, status='error'
#error_urls = Urls.objects.SORTBY TS_FETCH....filter(status=Urls.STATUS_ENUM.RAW, ts_fetch__gte=time_delta_ts)[:batch_size]
pass

def process_raw_urls(self, time_delta=timedelta(days=1), batch_size=50):
try:
logger.debug("Processing raw URLs")
@@ -95,19 +136,18 @@ class DB_Handler():
# Fetched during last 24 hours
time_delta_ts = timezone.now() - time_delta
# Get batch of URLs, status='raw' and fetched X days ago
raw_urls = Urls.objects.filter(status='raw', ts_fetch__gte=time_delta_ts)[:batch_size]
raw_urls = Urls.objects.filter(status=Urls.STATUS_ENUM.RAW, ts_fetch__gte=time_delta_ts)[:batch_size]
# List of objects to bulk update
updating_urls = []

# Per URL
for obj_url in raw_urls:

##### Any domain to filter included in URL? -> Invalid
if (any([d in obj_url.url for d in list_domains_to_filter])):
logger.debug("Domain filter applied to input URL: {}".format(obj_url.url))
# Update status
obj_url.status = 'invalid'
# Append to bulk update
obj_url.status = Urls.STATUS_ENUM.INVALID
obj_url.save()
updating_urls.append(obj_url)
# Next URL
continue
@@ -119,10 +159,10 @@ class DB_Handler():
# Not none or handle as exception
assert(dict_url_data is not None)
except Exception as e:
logger.debug("Error processing URL: {}\n{}".format(obj_url.url, str(e)))
logger.debug("Error processing URL: {}\n{}\n{}".format(obj_url.url, str(e), traceback.format_exc()))
# Update status
obj_url.status = 'error'
# Append to bulk update
obj_url.status = Urls.STATUS_ENUM.ERROR
obj_url.save()
updating_urls.append(obj_url)
# Next URL
continue
@@ -130,30 +170,30 @@ class DB_Handler():
##### Canonical URL different? -> Duplicate
if (dict_url_data.get("url") != dict_url_data.get("url_canonical")):
# Update status
obj_url.status = 'duplicate'
# Append to bulk update
obj_url.status = Urls.STATUS_ENUM.DUPLICATE
obj_url.save()
updating_urls.append(obj_url)

# Get or create URL with canonical form
obj_url_canonical = Urls.objects.get_or_create(url=dict_url_data.get("url_canonical"))
# Associate same sources to url -> url_canonical

obj_url_canonical, created = Urls.objects.get_or_create(url=dict_url_data.get("url_canonical"))
# Get the sources id associated to obj_url.id
url_sources = UrlsSource.objects.filter(id_url=obj_url.id)
url_sources = UrlsSource.objects.filter(id_url=obj_url)
for url_source_obj in url_sources:
# Associate same sources to url_canonical (it might already exist)
UrlsSource.objects.get_or_create(id_source=url_source_obj.id_source, id_url=obj_url_canonical.id)
UrlsSource.objects.get_or_create(id_source=url_source_obj.id_source, id_url=obj_url_canonical)

# Next URL
continue

##### Valid URL
# Update status
obj_url.status = 'valid'
# Append to bulk update
obj_url.status = Urls.STATUS_ENUM.VALID
obj_url.save()
updating_urls.append(obj_url)

# Create extracted URL data
UrlContent.objects.create_or_update(
id_url=obj_url.id,
UrlContent.objects.create(
id_url=obj_url,
date_published=dict_url_data.get("publish_date"),
title=dict_url_data.get("title"),
description=dict_url_data.get("description"),
@@ -163,7 +203,7 @@ class DB_Handler():
keywords=dict_url_data.get("keywords"),
tags=dict_url_data.get("tags"),
authors=dict_url_data.get("authors"),
image_main=dict_url_data.get("image_main"),
image_main_url=dict_url_data.get("image_main_url"),
images_url=dict_url_data.get("images_url"),
videos_url=dict_url_data.get("videos_url"),
url_host=dict_url_data.get("url_host"),
@@ -180,11 +220,11 @@ class DB_Handler():
logger.debug("Pattern matching, overriding with status {} for URL: {}".format(status_pattern_matching, obj_url.url))
# Update, no need to append to updating_urls, already included
obj_url.status = status_pattern_matching
obj_url.save()

# Bulk update
Urls.objects.bulk_update(updating_urls, ['status'])
# TODO: Fix enum type issue. Bulk update
# Urls.objects.bulk_update(updating_urls, ['status'])

logger.debug("Finished processing raw URLs")
logger.info("Updated #{} raw URLs".format(len(updating_urls)))
except Exception as e:
logger.warning("Exception processing raw URLs: {}\n{}".format(e, traceback.format_exc()))

@@ -1,6 +1,7 @@
from .logger import get_logger
logger = get_logger()
import newspaper
import newspaper
from urllib.parse import unquote
# pip install langdetect
#import langdetect
#langdetect.DetectorFactory.seed = 0
@@ -30,9 +31,9 @@ def process_url(url):
"keywords": [k for k in set(article.keywords + article.meta_keywords) if k!=""],
"tags": article.tags,
"authors": article.authors,
"image_main": article.top_image, # article.meta_img
"images": article.images,
"videos": article.videos,
"image_main_url": article.top_image, # article.meta_img
"images_url": article.images,
"videos_url": article.movies,
}

'''
@@ -46,13 +47,16 @@ def process_url(url):

# Sanity check
for k in dict_data.keys():
if (type(k) is list):
# Remove empty string
dict_data[k] = [ e for e in dict_data[k] if e != "" ]
if (type(dict_data[k]) is list):
# Remove empty string, unquote special characters, e.g. "%20" -> " "
dict_data[k] = [ unquote(e) for e in dict_data[k] if e != "" ]
# NULL instead of empty list
if (len(dict_data[k]) == 0):
dict_data[k] = None
else:
elif (type(dict_data[k]) is str):
# Unquote special characters
if (dict_data[k] is not None):
dict_data[k] = unquote(dict_data[k])
# NULL instead of empty string
if (dict_data[k] == ""):
dict_data[k] = None
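
The sanity check above relies on `urllib.parse.unquote` to decode percent-escapes before insertion. A small illustration of that behaviour and of the empty-value handling (the values are made up):

```
from urllib.parse import unquote

unquote("caf%C3%A9%20society")   # -> "café society"
unquote("%20") == " "            # -> True

# Mirroring the loop above: empty lists/strings become None before insertion
data = {"keywords": [], "title": ""}
cleaned = {k: (v or None) for k, v in data.items()}
# -> {"keywords": None, "title": None}
```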

@@ -15,18 +15,20 @@ from src.credentials import db_connect_info, redis_connect_info
db_handler = DB_Handler(db_connect_info, redis_connect_info)
'''

import logging
logger = logging.getLogger(__name__)

from .src.logger import get_logger
logger = get_logger()

@job
def background_task(process_type: str):
logger.info("Task triggered: {}".format(process_type))

try:
FetchFeeds().run()

# DB_Handler().process_raw_urls()
if (process_type == "fetch_feeds"):
FetchFeeds().run()
elif (process_type == "process_raw_urls"):
DB_Handler().process_raw_urls(batch_size=3)
else:
logger.info("Task unknown!: {}".format(process_type))

'''

@@ -2,5 +2,5 @@ from django.urls import path
from .views import trigger_task

urlpatterns = [
path('fetch', trigger_task, name='trigger_task')
path('<str:task>', trigger_task, name='trigger_task'),
]

@@ -1,10 +1,11 @@
import django_rq
from django.http import JsonResponse
from .tasks import background_task
from .src.logger import get_logger
logger = get_logger()

def trigger_task(request):
def trigger_task(request, task):
"""View that enqueues a task."""
queue = django_rq.get_queue('default') # Get the default queue
job = queue.enqueue(background_task, "Hello from Django RQ!")

job = queue.enqueue(background_task, task)
return JsonResponse({"message": "Task has been enqueued!", "job_id": job.id})
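
With the catch-all `<str:task>` route, either task name handled in `tasks.py` can be enqueued over HTTP. A hedged example, assuming the app's URLs are mounted at the site root and a dev server on localhost:8000 (both assumptions, not stated in the commit):

```
# Illustration only; the URL prefix and host are assumptions.
import requests

for task in ("fetch_feeds", "process_raw_urls"):
    r = requests.get(f"http://localhost:8000/{task}")
    print(r.json())  # {"message": "Task has been enqueued!", "job_id": "..."}
```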