# File: matitos_news/app_urls/init_db.py
# Snapshot: 2025-08-13 14:29:44 +02:00
# Reported by the repository viewer as: 255 lines, 11 KiB, Python

import argparse
import os
import psycopg
import json
import time
import urllib.parse
import html5lib
import feedparser
import requests
def build_connection_info(env=None):
    """Build the libpq connection string shared by the functions in this script.

    Args:
        env: Mapping the ``DB_HOST``, ``DB_PORT``, ``DB_NAME``, ``DB_USER`` and
            ``DB_PASSWORD`` settings are read from. Defaults to ``os.environ``.

    Returns:
        A ``keyword=value`` connection string ending in ``connect_timeout=60``.
        Plain values are emitted unchanged. Values that are empty or contain
        whitespace, a single quote or a backslash are wrapped in single quotes
        with ``'`` and ``\\`` backslash-escaped, because an unquoted value of
        that kind (e.g. a password with a space) is not parsed as intended.
    """
    if env is None:
        env = os.environ

    def quote(value):
        value = str(value)
        needs_quoting = not value or any(ch.isspace() or ch in "'\\" for ch in value)
        if not needs_quoting:
            return value
        # Escape backslashes first so the ones added for quotes are not doubled
        escaped = value.replace("\\", "\\\\").replace("'", "\\'")
        return f"'{escaped}'"

    settings = (
        ("host", env.get("DB_HOST", "localhost")),
        ("port", env.get("DB_PORT", "5432")),
        ("dbname", env.get("DB_NAME", "matitos")),
        ("user", env.get("DB_USER", "supermatitos")),
        ("password", env.get("DB_PASSWORD", "supermatitos")),
    )
    parts = [f"{keyword}={quote(value)}" for keyword, value in settings]
    parts.append("connect_timeout=60")
    return " ".join(parts)


# Connection string used by every psycopg.connect() call in this file
connection_info = build_connection_info()
def wait_connection():
    """Block until the database answers a query.

    Repeatedly opens a connection using ``connection_info`` and runs
    ``SELECT 1``. Every failure is printed and followed by a 15-second pause
    before the next attempt; there is no upper bound on the number of retries.
    """
    ready = False
    while not ready:
        try:
            with psycopg.connect(connection_info) as conn, conn.cursor() as cur:
                # Trivial round-trip: proves the server accepts and answers queries
                cur.execute("SELECT 1;").fetchall()
                ready = True
        except Exception as exc:
            # psycopg.OperationalError (connection not ready) and any other
            # failure are treated alike: report, wait, try again
            print(str(exc))
            time.sleep(15)

    print("DB connection ready")
def initialize_tables():
    """Create the enum types, tables and indexes used by the application.

    The whole schema is sent as a single multi-statement ``execute`` call
    inside one ``conn.transaction()`` block. Any exception raised by that call
    is printed and swallowed, so the caller is not told when creation fails.
    """
    # Schema definition; commented-out SQL inside is kept verbatim
    schema_ddl = """
CREATE TYPE URL_STATUS AS ENUM ('raw', 'error', 'valid', 'unknown', 'invalid', 'duplicate');
CREATE TABLE URLS (
id SERIAL PRIMARY KEY,
url TEXT NOT NULL UNIQUE,
ts_fetch TIMESTAMPTZ NOT NULL DEFAULT NOW(),
status URL_STATUS NOT NULL DEFAULT 'raw' -- ,
-- status_wendy WENDY_STATUS DEFAULT NULL,
-- ts_wendy TIMESTAMPTZ DEFAULT NULL,
-- child_abuse BOOLEAN DEFAULT NULL,
);
CREATE INDEX idx_urls_status ON urls(status);
CREATE INDEX idx_urls_ts_fetch ON urls(ts_fetch);
CREATE TABLE URLS_DUPLICATE (
id_url_canonical INTEGER REFERENCES URLS(id),
id_url_duplicated INTEGER REFERENCES URLS(id),
PRIMARY KEY (id_url_canonical, id_url_duplicated)
);
CREATE TYPE SEARCH_TYPE AS ENUM ('rss_feed', 'keyword_search', 'url_host');
CREATE TABLE SEARCH (
id SMALLSERIAL PRIMARY KEY,
search TEXT NOT NULL UNIQUE,
type SEARCH_TYPE NOT NULL
-- language_country CHAR(5), -- Language: ISO 639-1 Code. Country: ISO 3166 ALPHA-2. e.g.: en-us. Required for search
-- UNIQUE(search, language_country)
);
CREATE INDEX idx_search_type ON SEARCH(type);
CREATE TABLE SOURCE (
id SMALLSERIAL PRIMARY KEY,
source TEXT NOT NULL UNIQUE
);
-- CREATE TABLE SEARCH_LANGUAGE (
-- language CHAR(2) NOT NULL, -- ISO 639-1 Code, e.g. "en"
-- country CHAR(2) NOT NULL, -- ISO 3166 ALPHA-2, e.g. "us"
-- PRIMARY KEY (language, country)
-- );
CREATE TABLE URLS_SOURCE_SEARCH (
id_url INTEGER REFERENCES URLS(id),
id_source SMALLINT REFERENCES SOURCE(id) ON UPDATE CASCADE ON DELETE RESTRICT,
id_search SMALLINT REFERENCES SEARCH(id) ON UPDATE CASCADE ON DELETE RESTRICT,
PRIMARY KEY(id_url, id_source, id_search)
);
CREATE INDEX idx_source ON URLS_SOURCE_SEARCH(id_source);
CREATE INDEX idx_search ON URLS_SOURCE_SEARCH(id_search);
CREATE TABLE STATUS_PATTERN_MATCHING (
pattern TEXT PRIMARY KEY,
priority SMALLINT NOT NULL,
status URL_STATUS NOT NULL
);
CREATE TABLE URL_CONTENT (
id_url INTEGER PRIMARY KEY REFERENCES URLS(id),
date_published TIMESTAMPTZ DEFAULT NOW(),
title TEXT,
description TEXT,
content TEXT,
valid_content BOOLEAN,
language CHAR(2), -- ISO 639-1 Code
keywords TEXT[],
tags TEXT[],
authors TEXT[],
image_main_url TEXT,
images_url TEXT[],
videos_url TEXT[],
url_host TEXT, -- www.breitbart.com
site_name TEXT -- Breitbart News
);
CREATE INDEX idx_tags ON URL_CONTENT USING GIN(tags);
CREATE INDEX idx_authors ON URL_CONTENT USING GIN(authors);
CREATE INDEX idx_date_published ON URL_CONTENT (date_published);
CREATE INDEX idx_valid_content ON URL_CONTENT (valid_content);
CREATE INDEX idx_language ON URL_CONTENT (language);
CREATE INDEX idx_url_host ON URL_CONTENT (url_host);
"""

    # Connection -> cursor -> transaction block, entered in that order
    with psycopg.connect(connection_info) as conn, conn.cursor() as cur, conn.transaction():
        try:
            cur.execute(schema_ddl)
        except Exception as exc:
            # The error is only reported; it is not re-raised
            print(str(exc))
def find_feeds(url, timeout=10):
    """Discover RSS/Atom feed URLs for a site.

    The page at ``url`` is downloaded and parsed. Feeds advertised through
    ``<link rel="alternate">`` elements in ``<head>`` are collected first
    (Atom before RSS), then a list of common feed paths is probed and kept
    when the response is HTTP 200 and parses to at least one feed entry.

    Args:
        url: Site host or URL. A leading ``http://`` is upgraded to
            ``https://``; anything without an ``https://`` prefix gets one.
        timeout: Seconds passed to every HTTP request as its ``timeout``, so
            an unresponsive host cannot stall the caller indefinitely.

    Returns:
        List of final (post-redirect) feed URLs, without duplicates and
        without any URL containing ``/comments``. Order of discovery is
        preserved. Errors are printed, never raised; whatever was collected
        before the error is still returned.
    """
    # Same User-Agent and timeout on every request made by this function
    request_kwargs = {
        "headers": {"User-Agent": "Mozilla/5.0"},
        "timeout": timeout,
        "allow_redirects": True,
    }
    list_feeds = []

    try:
        # Only rewrite the scheme prefix, never an "http://" embedded later in the URL
        if url.startswith("http://"):
            url = "https://" + url[len("http://"):]
        elif not url.startswith("https://"):
            url = "https://" + url

        response = requests.get(url, **request_kwargs)
        tree = html5lib.parse(response.text, namespaceHTMLElements=False)

        # base for relative URLs
        base = tree.findall('.//base')
        base_url = base[0].attrib['href'] if base and 'href' in base[0].attrib else url

        # prioritize Atom over RSS
        links = (
            tree.findall("""head/link[@rel='alternate'][@type='application/atom+xml']""")
            + tree.findall("""head/link[@rel='alternate'][@type='application/rss+xml']""")
        )
        for link in links:
            href = link.attrib.get('href', '').strip()
            if not href:
                continue
            try:
                # Follow redirects so the stored URL is the final one
                r = requests.get(urllib.parse.urljoin(base_url, href), **request_kwargs)
            except (requests.RequestException, ValueError) as e:
                # One unreachable advertised feed must not cancel the rest of the discovery
                print(f"An error occurred: {e}")
                continue
            list_feeds.append(r.url)

        # heuristic search for common feed paths
        for suffix in [
            'feed', 'feed/', 'rss', 'atom', 'feed.xml',
            '/feed', '/feed/', '/rss', '/atom', '/feed.xml',
            'index.atom', 'index.rss', 'index.xml', 'atom.xml', 'rss.xml',
            '/index.atom', '/index.rss', '/index.xml', '/atom.xml', '/rss.xml',
            '.rss', '/.rss', '?rss=1', '?feed=rss2',
        ]:
            try:
                potential_feed = urllib.parse.urljoin(base_url, suffix)
                response = requests.get(potential_feed, **request_kwargs)
                # Parse the body already downloaded instead of fetching the URL a second time
                if response.status_code == 200 and feedparser.parse(response.content).get("entries"):
                    list_feeds.append(response.url)
            except Exception:
                continue
    except Exception as e:
        print(f"An error occurred: {e}")

    # Drop comment feeds, then de-duplicate while keeping discovery order
    return list(dict.fromkeys(f for f in list_feeds if "/comments" not in f))
def initialize_data():
    """Insert the seed data from ``init_data.json`` into the database.

    Two keys of the JSON document are used:

    * ``REGEX_PATTERN_STATUS_PRIORITY``: list of ``[pattern, status, priority]``
      triples inserted into ``STATUS_PATTERN_MATCHING``.
    * ``SEARCH``: mapping of search type to a list of search strings, each
      inserted into ``SEARCH``. For entries of type ``url_host`` the host is
      additionally probed with ``find_feeds`` and every feed found is inserted
      as an ``rss_feed`` search (existing rows are left untouched).

    All values are passed as query parameters rather than formatted into the
    SQL text, so patterns or URLs containing quotes cannot break or alter the
    statements.

    Raises:
        Whatever ``open``, ``json.load`` or psycopg raise; nothing is caught here.
    """
    # Read data
    with open("init_data.json", "r", encoding="utf-8") as f:
        data_json = json.load(f)
    print("Initialization data:", data_json)

    insert_pattern = "INSERT INTO STATUS_PATTERN_MATCHING (pattern, priority, status) VALUES (%s, %s, %s);"
    insert_search = "INSERT INTO SEARCH (search, type) VALUES (%s, %s);"
    insert_feed = "INSERT INTO SEARCH (search, type) VALUES (%s, %s) ON CONFLICT DO NOTHING;"

    # First connection: status patterns
    with psycopg.connect(connection_info) as conn, conn.cursor() as cur:
        # TODO: Language per search (e.g. 'en-us', 'en-gb' variants of a keyword search)
        for pattern, status, priority in data_json.get("REGEX_PATTERN_STATUS_PRIORITY", []):
            # JSON order is (pattern, status, priority); column order differs
            params = (pattern, priority, status)
            print(insert_pattern, params)
            cur.execute(insert_pattern, params)

    # Second connection: feeds, URL hosts, keyword searches
    with psycopg.connect(connection_info) as conn, conn.cursor() as cur:
        for search_type, list_searches in data_json.get("SEARCH", {}).items():
            for search in list_searches:
                params = (search, search_type)
                print(insert_search, params)
                cur.execute(insert_search, params)

                if search_type != "url_host":
                    continue

                # Try finding RSS feeds for the host; insert those not present yet
                for feed in find_feeds(search):
                    params = (feed, "rss_feed")
                    print(insert_feed, params)
                    cur.execute(insert_feed, params)
def main():
    """Parse the command-line flags and run the requested initialization steps.

    The database is always waited for first. ``--initialize_tables`` then
    creates the schema and ``--initialize_data`` inserts the seed data, in
    that order when both flags are given.
    """
    cli = argparse.ArgumentParser(description='Database initialization')
    cli.add_argument('--initialize_tables', action='store_true', default=False, help='Create DB tables')
    cli.add_argument('--initialize_data', action='store_true', default=False, help='Insert data')
    options = cli.parse_args()

    # Wait for DB connection
    wait_connection()

    if options.initialize_tables:
        print("Initializing tables")
        initialize_tables()

    if options.initialize_data:
        print("Initializing data")
        initialize_data()


if __name__ == '__main__':
    main()