matitos_news/app_fetcher/src/db_utils.py
Luciano Gervasoni 54ebd58070 Url content
2025-03-07 00:34:46 +01:00

import psycopg
import redis
import traceback
import random
import requests
import json
import os
from .url_utils import process_article
import logging
logging.basicConfig(format='%(filename)s | %(levelname)s | %(asctime)s | %(message)s')
logger = logging.getLogger("news_fetcher")
# TODO: URL_DB_HANDLER, _get_search_list, _get_url_host, _get_url_host_list, ...
# The rest, elsewhere
class URL_DB_Writer():
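    """Write batches of fetched news URLs to PostgreSQL, caching recently processed URLs in Redis."""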
def __init__(self, db_connect_info, redis_connect_info):
logger.debug("Initializing URL DB writer")
self.db_connect_info = db_connect_info
self.redis_instance = redis.Redis(host=redis_connect_info.get("redis_host"), port=redis_connect_info.get("redis_port"))
self.redis_expiry_seconds = redis_connect_info.get("expiry_seconds", 172800) # Default: 48 hours
try:
self.redis_instance.ping()
logger.debug("Succesfully pinged Redis")
except Exception as e:
logger.warning("Error trying to ping Redis: {}".format(str(e)))
def get_urls_count(self, last_minutes_check):
#####################
### Get number of URLs within last X minutes
#####################
try:
            # Query the count of recently fetched URLs
with psycopg.connect(self.db_connect_info) as conn:
# Open cursor
cursor = conn.cursor()
num_urls = cursor.execute("SELECT COUNT(*) FROM URLS WHERE ts_fetch >= current_timestamp - interval '{} minutes';".format(last_minutes_check)).fetchone()[0]
except Exception as e:
logger.warning("Error updating URLs status: {}".format(str(e)))
num_urls = None
return num_urls
def _format(self, values):
        # Replace single quote ' with ''. Based on https://stackoverflow.com/a/12320729
        # String -> 'string', Int -> '1' (string-based), None -> NULL (no quotes, so pgSQL interprets a NULL value)
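        # e.g. (illustrative values) _format(("O'Brien", None, 3)) -> "('O''Brien', NULL, '3')"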
if (type(values) == list) or (type(values) == tuple):
insert_args = "(" + ", ".join([ "NULL" if v is None else "'" + str(v).replace("'", "''") + "'" for v in values]) + ")"
        elif (type(values) == str):
            insert_args = "({})".format("'" + values.replace("'", "''") + "'")
        else:
            logger.warning("Error formatting input values: {}".format(values))
            raise TypeError("Unsupported type for SQL value formatting: {}".format(type(values)))
return insert_args
def _get_cached_canonical_url(self, url):
### Redis: URL processed recently? -> Avoid increasing SERIAL counter & efficiency of DB
try:
filter_url = self.redis_instance.get(url)
if (filter_url is not None):
filter_url = filter_url.decode("utf-8")
except Exception as e:
logger.warning("Exception querying Redis: {}".format(str(e)))
filter_url = None
return filter_url
def _update_urls_status(self, dict_status_ids):
#####################
### Update status to array of URL IDs
#####################
try:
# Update
with psycopg.connect(self.db_connect_info) as conn:
# Open cursor
cursor = conn.cursor()
# Autocommit at end of transaction (Atomic insert of URLs and sources)
with conn.transaction() as tx:
for key_status, value_ids in dict_status_ids.items():
cursor.execute("UPDATE URLS SET status='{}' WHERE id IN ({});".format(key_status, ",".join([str(v) for v in value_ids])))
except Exception as e:
logger.warning("Error updating URLs status: {}".format(str(e)))
def _get_missing_kids_urls(self, num_urls=None):
#####################
### Get list of Missing Kids URLs
#####################
try:
missing_kids_ids_and_urls = []
if (num_urls is None):
limit = 500
else:
limit = num_urls
offset = 0
with psycopg.connect(self.db_connect_info) as conn:
# Open cursor
cursor = conn.cursor()
while True:
# Query
missing_kids_ids_and_urls_query = cursor.execute("SELECT id, url, status FROM URLS WHERE url LIKE '%missingkids.org/poster%' ORDER BY ts_fetch DESC LIMIT {} OFFSET {};".format(limit, offset)).fetchall()
# Finished?
if (len(missing_kids_ids_and_urls_query) == 0):
break
# Extend
missing_kids_ids_and_urls = missing_kids_ids_and_urls + missing_kids_ids_and_urls_query
# Offset
offset += len(missing_kids_ids_and_urls_query)
# Stop?
if (num_urls is not None) and (len(missing_kids_ids_and_urls) >= num_urls):
break
except Exception as e:
logger.warning("Error getting Missing Kids URLs: {}".format(str(e)))
missing_kids_ids_and_urls = []
return missing_kids_ids_and_urls
def _get_error_urls(self, num_urls=None):
#####################
        ### Get list of URLs with status 'error'
#####################
try:
error_urls = []
if (num_urls is None):
limit = 500
else:
limit = num_urls
offset = 0
with psycopg.connect(self.db_connect_info) as conn:
# Open cursor
cursor = conn.cursor()
while True:
# Query
error_urls_query = cursor.execute("SELECT id, url FROM URLS WHERE status='error' ORDER BY ts_fetch DESC LIMIT {} OFFSET {};".format(limit, offset)).fetchall()
# Finished?
if (len(error_urls_query) == 0):
break
# Extend
error_urls = error_urls + error_urls_query
# Offset
offset += len(error_urls_query)
# Stop?
if (num_urls is not None) and (len(error_urls) >= num_urls):
break
except Exception as e:
logger.warning("Error getting Error URLs: {}".format(str(e)))
error_urls = []
return error_urls
def _decode_urls(self, urls_fetched, list_domains_to_filter, list_pattern_status_tuple): # TODO: language for urls_fetched...
"""
# TODO: REFACTOR
For each input url
Already processed?
-> Update on Redis expire time
-> Associate to source
Not processed? Get main URL:
-> URL Canonical valid?
-> Rely on this as main URL
-> URL Canonical not valid?
-> Use input url, unless it's a news.google.com link
-> If news.google.com link, filter out. REDIS?
Main URL processing:
-> Update in REDIS, association url -> url_canonical
-> url != url_canonical: Add in duplicate table
If both != news.google.com
"""
# URLs to insert, URLs duplicated association, URL to Canonical form
list_insert_url_tuple_args, list_tuple_canonical_duplicate_urls, dict_full_urls_to_canonical = [], [], {}
# URL VS CANONICAL:
# News URL returned: https://news.google.com/articles/CBMifmh0dHBzOi8vd3d3LmJyZWl0YmFydC5jb20vMm5kLWFtZW5kbWVudC8yMDIzLzA0LzAzL2dvdi1kZXNhbnRpcy1zaWducy1iaWxsLW1ha2luZy1mbG9yaWRhLXRoZS0yNnRoLWNvbnN0aXR1dGlvbmFsLWNhcnJ5LXN0YXRlL9IBAA?hl=en-US&gl=US&ceid=US%3Aen
# Corresponds to canonical URL: https://www.breitbart.com/2nd-amendment/2023/04/03/gov-desantis-signs-bill-making-florida-the-26th-constitutional-carry-state/
for url in urls_fetched:
# Domain to filter? Input url
filter_due_to_domain = False
for domain_to_filter in list_domains_to_filter:
if (domain_to_filter in url):
logger.debug("Domain filter applied based on {} for input URL: {}".format(domain_to_filter, url))
filter_due_to_domain = True
if (filter_due_to_domain):
continue
# URL processed recently? -> Filter and avoid increasing SERIAL counter & efficiency of DB
cached_canonical_url = self._get_cached_canonical_url(url)
if (cached_canonical_url is not None):
# Even if url processed, need to add url_canonical to list_filtered_urls, so as to associate search source to canonical URL (canonical is the main URL entry)
dict_full_urls_to_canonical[url] = cached_canonical_url # X -> Y
# If url has been processed, so was its canonical form
logger.debug("Filtering out already inserted (processed) URL and its canonical form: {} {}".format(url, cached_canonical_url))
continue
# Process TODO: Add language...
url_canonical, article_elements, article_status = process_article(url, list_pattern_status_tuple)
# TODO: Store article_elements information to insert into OS after inserted into DB (and therefore having associated url_id)
# Could not retrieve redirection for news.google.com based URL? Continue (avoid inserting in DB)
if (url_canonical is None) and ("news.google.com" in url):
logger.debug("Filtering empty canonical link for base URL based on news.google.com: {}".format(url))
continue
# Canonical URL still news.google.com? Continue (avoid inserting in DB)
if (url_canonical is not None) and ("news.google.com" in url_canonical):
logger.debug("Filtering canonical news.google.com based URL: {}".format(url_canonical))
continue
# Domain to filter? Input canonical_url
filter_due_to_domain = False
for domain_to_filter in list_domains_to_filter:
if (url_canonical is not None) and (domain_to_filter in url_canonical):
filter_due_to_domain = True
if (filter_due_to_domain):
logger.info("Filtering due to domain input URL, Canonical_URL: {} {}".format(url, url_canonical))
continue
if (url_canonical is None) or (article_status == "error"):
logger.debug("Processing failed for URL: {}".format(url))
# Still insert URL with "error"? -> If processed later, might have inconsistent sources (url vs url_canonical). Only store if not news.google.com based
if ("news.google.com" in url) or ("consent.google.com" in url):
logging.debug("Not able to process Google News link, skipping: {}".format(url))
else:
dict_full_urls_to_canonical[url] = url # X -> X
list_insert_url_tuple_args.append( (url, article_status) )
continue
# URL was not processed (not sure canonical yet). Generate URL_CANONICAL <-> URL_ORIGINAL association if they're different
if (url_canonical != url):
list_tuple_canonical_duplicate_urls.append( (url_canonical, url) )
# Dict: url -> canonical (update association)
dict_full_urls_to_canonical[url] = url_canonical # X -> Y or X
# Canonical URL processed recently? -> Filter and avoid increasing SERIAL counter & efficiency of DB
if (self._get_cached_canonical_url(url_canonical) is not None):
# Canonical URL was already processed
logger.debug("Filtering out already inserted (processed) URL canonical: {}".format(url_canonical))
else:
# Insert url_canonical to DB formatted
list_insert_url_tuple_args.append( (url_canonical, article_status) )
# Canonical URL different? Process
if (url_canonical != url):
if ("news.google.com" in url) or ("consent.google.com" in url):
logging.debug("Not adding google.news.com based link, skipping: {}".format(url))
else:
# Fetched url -> duplicate (using canonical as main link)
article_status = "duplicate"
# Insert url (non-canonical) to DB formatted
list_insert_url_tuple_args.append( (url, article_status) )
return list_insert_url_tuple_args, list_tuple_canonical_duplicate_urls, dict_full_urls_to_canonical
def _insert_urls(self, cursor, list_insert_url_tuple_args):
#####################
### Insert URLs with status
#####################
if (len(list_insert_url_tuple_args) > 0):
insert_args = ', '.join( [ self._format(t) for t in list_insert_url_tuple_args] )
# Insert. (url_1, status_1), (url_2, status_2), ...
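            # e.g. INSERT INTO URLS (url, status) VALUES ('https://example.com/a', 'fetched') ON CONFLICT (url) DO NOTHING;  (illustrative url/status values)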
sql_code = "INSERT INTO URLS {} VALUES {} ON CONFLICT (url) DO NOTHING;".format("(url, status)", insert_args)
# logger.debug("SQL CODE: {}".format(sql_code))
c = cursor.execute(sql_code)
# NOTE: Not using "RETURNING id" since previously inserted URLs are not returned (ON CONFLICT)
# https://stackoverflow.com/questions/35949877/how-to-include-excluded-rows-in-returning-from-insert-on-conflict/35953488#35953488
def _insert_urls_duplicated(self, cursor, list_tuple_canonical_duplicate_urls):
#####################
### Insert duplicated URLs
#####################
if (len(list_tuple_canonical_duplicate_urls) > 0):
# Flatten, format, set to remove duplicates
args_duplicated_urls_set = "(" + ', '.join( set( [ "'" + str(y).replace("'", "''") + "'" for x in list_tuple_canonical_duplicate_urls for y in x] ) ) + ")"
# Dict: url -> id
dict_url_to_id = {}
# Get url -> id association to populate duplicated URLs
for (id_, url_) in cursor.execute("SELECT id, url FROM URLS WHERE url IN {};".format(args_duplicated_urls_set)).fetchall():
dict_url_to_id[url_] = id_
# Convert tuples (url_canonical, url) -> (id_url_canonical, id_url) to insert in DB
# ORIGINAL CODE. Issue, might not have found association to all urls
### list_tuple_canonical_duplicate_urls_ids = [ (dict_url_to_id[t[0]], dict_url_to_id[t[1]]) for t in list_tuple_canonical_duplicate_urls]
list_tuple_canonical_duplicate_urls_ids = []
for (url_1, url_2) in list_tuple_canonical_duplicate_urls:
id_url_1, id_url_2 = dict_url_to_id.get(url_1), dict_url_to_id.get(url_2)
if (id_url_1 is None) or (id_url_2 is None):
logger.debug("Skipping duplicate association due to no url -> id_url mapping available for tuple: {} {}".format(url_1, url_2))
else:
list_tuple_canonical_duplicate_urls_ids.append( (id_url_1, id_url_2) )
if (len(list_tuple_canonical_duplicate_urls_ids) > 0):
insert_args = ', '.join( [ self._format(t) for t in list_tuple_canonical_duplicate_urls_ids] )
# Insert. (id_url_canonical_1, id_url_1), ...
sql_code = "INSERT INTO URLS_DUPLICATE {} VALUES {} ON CONFLICT DO NOTHING;".format("(id_url_canonical, id_url_duplicated)", insert_args)
# logger.debug("SQL CODE: {}".format(sql_code))
c = cursor.execute(sql_code)
def _get_pattern_status_list(self):
#####################
        ### Get list of (pattern, priority, status) tuples
#####################
# TODO: Cache on redis and query once every N hours? ...
try:
with psycopg.connect(self.db_connect_info) as conn:
# Open cursor
cursor = conn.cursor()
# TODO: Cache on Redis
list_pattern_status = cursor.execute("SELECT pattern, priority, status FROM STATUS_PATTERN_MATCHING;").fetchall()
except Exception as e:
logger.warning("Error getting pattern status list: {}".format(str(e)))
list_pattern_status = []
return list_pattern_status
def _get_domains_to_filter(self):
#####################
### Get list of domains to filter
#####################
# TODO: Cache on redis and query once every N hours? ...
try:
with psycopg.connect(self.db_connect_info) as conn:
# Open cursor
cursor = conn.cursor()
# TODO: Cache on Redis
sites_to_filter = [e[0] for e in cursor.execute("SELECT url_host FROM WEBSITE_TO_FILTER;").fetchall() ]
except Exception as e:
logger.warning("Error getting domains to filter: {}".format(str(e)))
sites_to_filter = []
return sites_to_filter
def _get_cached_source_id(self, source):
        ### Redis: source already cached? -> Avoid extra DB queries when resolving the source id
try:
source_id = self.redis_instance.get(source)
if (source_id is not None):
source_id = source_id.decode("utf-8")
except Exception as e:
logger.warning("Exception querying Redis: {}".format(str(e)))
source_id = None
return source_id
def _get_source_id(self, cursor, source):
#####################
### Get source corresponding id
#####################
# Cached?
id_source = self._get_cached_source_id(source)
if (id_source is None):
c = cursor.execute("SELECT id FROM SOURCE WHERE source='{}'".format(source.replace("'", "''"))).fetchone()
if (c is None) or (len(c) == 0):
# Source does not exist, insert and get id
c = cursor.execute("INSERT INTO SOURCE (source) VALUES ('{}') RETURNING id;".format(source.replace("'", "''"))).fetchone()
# Decode source id
id_source = c[0]
            # Cache (ignore Redis failures so the enclosing DB transaction still succeeds)
            try:
                self.redis_instance.set(source, id_source, ex=self.redis_expiry_seconds)
            except Exception as e:
                logger.warning("Exception running set in Redis: {}".format(str(e)))
return id_source
def _get_urls_id(self, cursor, urls_full):
#####################
### Get id of inserted and filtered URLs
#####################
# TODO: Cache url -> url_id, url_canonical
if (len(urls_full) == 0):
return []
# Get inserted and filtered URL ids (unnested). Filtered URLs are also retrieved since they might have been fetched from a new source
in_inserted_filtered_urls = "(" + ', '.join(["'" + u.replace("'", "''") + "'" for u in urls_full]) + ")"
id_urls_related = [ i[0] for i in cursor.execute("SELECT id FROM URLS WHERE url IN {};".format(in_inserted_filtered_urls)).fetchall() ]
return id_urls_related
def _insert_urls_source(self, cursor, id_urls_related, id_source):
#####################
### Insert URL sources: (id_url_1, id_source), (id_url_2, id_source), ...
#####################
if (len(id_urls_related) == 0) or (id_source is None):
return
columns = "(id_url, id_source)"
insert_args = ', '.join( [ self._format([id_url, id_source]) for id_url in id_urls_related ] )
# Insert
sql_code = "INSERT INTO URLS_SOURCE {} VALUES {} ON CONFLICT DO NOTHING;".format(columns, insert_args)
# logger.debug("SQL CODE: {}".format(sql_code))
c = cursor.execute(sql_code)
def write_batch(self, urls_fetched, source):
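        """Write fetched URLs to the DB in chunks of 50, associating each URL to the given source."""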
# Chunks of 50 elements
n = 50
# Divide in small chunks
urls_fetched_chunks = [urls_fetched[i:i + n] for i in range(0, len(urls_fetched), n)]
# Process
for urls_fetched_chunk_i in urls_fetched_chunks:
self._write_small_batch(urls_fetched_chunk_i, source)
def _write_small_batch(self, urls_fetched, source):
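        """Process one chunk of URLs: filter and canonicalize them, insert them atomically into the DB,
        refresh the Redis cache and optionally notify a webhook about newly inserted URLs."""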
try:
logger.info("Fetched #{} URLs, source: {}".format(len(urls_fetched), source))
if (len(urls_fetched) == 0):
logger.debug("Empty batch of urls (not writing to DB) for source: {}".format(source))
return
            # Shuffle URLs to avoid consecutive requests to the same host (reduces the chance of being blocked)
random.shuffle(urls_fetched)
# Get list of domains to filter
list_domains_to_filter = self._get_domains_to_filter()
# Get list of (pattern, priority, status) tuples to override status if required
list_pattern_status_tuple = self._get_pattern_status_list()
# Sort pattern tuples by priority
list_pattern_status_tuple.sort(key=lambda tup: tup[1], reverse=True)
# Process URLs to update DB
list_insert_url_tuple_args, list_tuple_canonical_duplicate_urls, dict_full_urls_to_canonical = self._decode_urls(urls_fetched, list_domains_to_filter, list_pattern_status_tuple)
# Full set of URL and its canonical form (to associate them to a search), both to insert and filter
urls_full = set(dict_full_urls_to_canonical.keys()).union( set(dict_full_urls_to_canonical.values()) )
# Insert
with psycopg.connect(self.db_connect_info) as conn:
# Open cursor
cursor = conn.cursor()
# Autocommit at end of transaction (Atomic insert of URLs and sources)
with conn.transaction() as tx:
# Insert processed URLs
self._insert_urls(cursor, list_insert_url_tuple_args)
# Insert URLs duplicated (canonical != fetched url)
self._insert_urls_duplicated(cursor, list_tuple_canonical_duplicate_urls)
# Get source id in DB
id_source = self._get_source_id(cursor, source)
# Get IDs of all related URLs
id_urls_related = self._get_urls_id(cursor, urls_full)
# Insert search source associated to URLs
self._insert_urls_source(cursor, id_urls_related, id_source)
# Update Redis status of inserted and filtered URLs after writing to DB
for url, url_canonical in dict_full_urls_to_canonical.items():
try:
# Set with updated expiry time
self.redis_instance.set(url, url_canonical, ex=self.redis_expiry_seconds)
if (url != url_canonical):
self.redis_instance.set(url_canonical, url_canonical, ex=self.redis_expiry_seconds)
except Exception as e:
logger.warning("Exception running set in Redis: {}".format(str(e)))
if (len(list_insert_url_tuple_args) > 0):
try:
webhook_token = os.environ.get("CLIQ_WEBHOOK_TOKEN")
endpoint_message = "https://cliq.zoho.com/api/v2/channelsbyname/urlretrievalbot/message?zapikey={}".format(webhook_token)
payload = json.dumps({"text": "Fetched #{} new URLs, source: {}".format(len(list_insert_url_tuple_args), source) })
                    r = requests.post(endpoint_message, data=payload, timeout=10)  # Timeout so a slow webhook cannot block the fetcher
except Exception as e:
logger.warning("Webhook failed: {}".format(str(e)))
logger.debug("URL DB write finished")
except Exception as e:
logger.warning( "Exception writing to URL_DB:\n{}".format(traceback.format_exc()) )
logger.debug( "Exception --- List of URLs: {}".format(str(urls_fetched)) )