Files
matitos_news/app_urls/db.py
2025-04-04 12:28:22 +02:00

173 lines
8.7 KiB
Python

import argparse
import os
import psycopg
import re
import time
# libpq keyword/value connection string (DSN) for psycopg, built from
# environment variables with local-development defaults.
connection_info = (
    f"host={os.environ.get('DB_HOST', 'localhost')} "
    f"port={os.environ.get('DB_PORT', '5432')} "
    f"dbname={os.environ.get('DB_NAME', 'matitos')} "
    f"user={os.environ.get('DB_USER', 'supermatitos')} "
    f"password={os.environ.get('DB_PASSWORD', 'supermatitos')} "
    "connect_timeout=60"
)
def wait_connection():
    """Block until the database accepts connections.

    Polls the server every 2 seconds, each attempt opening a connection
    and running a trivial ``SELECT 1`` to confirm the server answers
    queries. Loops forever until one attempt succeeds, then prints a
    readiness message.
    """
    connected = False
    while not connected:
        try:
            # Connect to an existing database and verify it with a
            # no-op query before declaring it ready.
            with psycopg.connect(connection_info) as conn:
                with conn.cursor() as cur:
                    cur.execute("SELECT 1;").fetchall()
                connected = True
        except Exception:
            # Connection not ready (server still starting, auth not yet
            # configured, transient network error, ...): back off and
            # retry. Original code had two identical handlers
            # (OperationalError and Exception); one generic handler
            # covers both.
            time.sleep(2)
    print("DB connection ready")
def initialize_tables():
    """Create the full schema: enum types, tables, and indexes.

    All DDL runs inside a single transaction, so either every object is
    created or none are. Not idempotent: raises if the objects already
    exist (psycopg surfaces the "already exists" error from Postgres).
    """
    # Connect to an existing database
    with psycopg.connect(connection_info) as conn:
        # Open a cursor to perform database operations
        with conn.cursor() as cur:
            # Commit at end of transaction (atomic creation of tables)
            with conn.transaction():
                cur.execute("""
                    CREATE TYPE URL_STATUS AS ENUM ('raw', 'error', 'valid', 'unknown', 'invalid', 'duplicate');
                    CREATE TABLE URLS (
                        id SERIAL PRIMARY KEY,
                        url TEXT NOT NULL UNIQUE,
                        ts_fetch TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                        status URL_STATUS NOT NULL DEFAULT 'raw' -- ,
                        -- status_wendy WENDY_STATUS DEFAULT NULL,
                        -- ts_wendy TIMESTAMPTZ DEFAULT NULL
                    );
                    CREATE INDEX idx_urls_status ON urls(status);
                    CREATE INDEX idx_urls_ts_fetch ON urls(ts_fetch);
                    CREATE TABLE URLS_DUPLICATE (
                        id_url_canonical INTEGER REFERENCES URLS(id),
                        id_url_duplicated INTEGER REFERENCES URLS(id),
                        PRIMARY KEY (id_url_canonical, id_url_duplicated)
                    );
                    CREATE TYPE SEARCH_TYPE AS ENUM ('rss_feed', 'keyword_search', 'url_host');
                    CREATE TABLE SEARCH (
                        id SMALLSERIAL PRIMARY KEY,
                        search TEXT NOT NULL UNIQUE,
                        type SEARCH_TYPE NOT NULL
                        -- language_country CHAR(5), -- Language: ISO 639-1 Code. Country: ISO 3166 ALPHA-2. e.g.: en-us. Required for search
                        -- UNIQUE(search, language_country)
                    );
                    CREATE INDEX idx_search_type ON SEARCH(type);
                    CREATE TABLE SOURCE (
                        id SMALLSERIAL PRIMARY KEY,
                        source TEXT NOT NULL UNIQUE
                    );
                    -- CREATE TABLE SEARCH_LANGUAGE (
                    --     language CHAR(2) NOT NULL, -- ISO 639-1 Code, e.g. "en"
                    --     country CHAR(2) NOT NULL, -- ISO 3166 ALPHA-2, e.g. "us"
                    --     PRIMARY KEY (language, country)
                    -- );
                    CREATE TABLE URLS_SOURCE_SEARCH (
                        id_url INTEGER REFERENCES URLS(id),
                        id_source SMALLINT REFERENCES SOURCE(id) ON UPDATE CASCADE ON DELETE RESTRICT,
                        id_search SMALLINT REFERENCES SEARCH(id) ON UPDATE CASCADE ON DELETE RESTRICT,
                        PRIMARY KEY(id_url, id_source, id_search)
                    );
                    CREATE INDEX idx_source ON URLS_SOURCE_SEARCH(id_source);
                    CREATE INDEX idx_search ON URLS_SOURCE_SEARCH(id_search);
                    CREATE TABLE STATUS_PATTERN_MATCHING (
                        pattern TEXT PRIMARY KEY,
                        priority SMALLINT NOT NULL,
                        status URL_STATUS NOT NULL
                    );
                    CREATE TABLE URL_CONTENT (
                        id_url INTEGER PRIMARY KEY REFERENCES URLS(id),
                        date_published TIMESTAMPTZ DEFAULT NOW(),
                        title TEXT,
                        description TEXT,
                        content TEXT,
                        valid_content BOOLEAN,
                        language CHAR(2), -- ISO 639-1 Code
                        keywords TEXT[],
                        tags TEXT[],
                        authors TEXT[],
                        image_main_url TEXT,
                        images_url TEXT[],
                        videos_url TEXT[],
                        url_host TEXT, -- www.breitbart.com
                        site_name TEXT -- Breitbart News
                    );
                    CREATE INDEX idx_tags ON URL_CONTENT USING GIN(tags);
                    CREATE INDEX idx_authors ON URL_CONTENT USING GIN(authors);
                    CREATE INDEX idx_date_published ON URL_CONTENT (date_published);
                    CREATE INDEX idx_valid_content ON URL_CONTENT (valid_content);
                    CREATE INDEX idx_language ON URL_CONTENT (language);
                    CREATE INDEX idx_url_host ON URL_CONTENT (url_host);
                """)
def initialize_data():
# Connect to an existing database
with psycopg.connect(connection_info) as conn:
# Open a cursor to perform database operations
with conn.cursor() as cur:
# Autocommit at end of transaction (Atomic creation of data)
with conn.transaction() as tx:
# Feeds
cur.execute( "INSERT INTO SEARCH (search, type) VALUES ('https://api.missingkids.org/missingkids/servlet/XmlServlet?act=rss&LanguageCountry=en_US&orgPrefix=NCMC', 'rss_feed');" )
# Websites of interest
cur.execute( "INSERT INTO SEARCH (search, type) VALUES ('missingkids.org/poster', 'url_host');" )
cur.execute( "INSERT INTO SEARCH (search, type) VALUES ('missingkids.org/new-poster', 'url_host');" )
cur.execute( "INSERT INTO SEARCH (search, type) VALUES ('breitbart.com', 'url_host');" )
# Search keywords
cur.execute( "INSERT INTO SEARCH (search, type) VALUES ('child abuse', 'keyword_search');" )
# TODO: Language per search
# cur.execute( "INSERT INTO SEARCH (search, type) VALUES ('child abuse', 'keyword_search', 'en-us');" )
# cur.execute( "INSERT INTO SEARCH (search, type) VALUES ('child abuse', 'keyword_search', 'en-gb');" )
# Status update based on pattern matching (with priority to apply in order). Regex test https://regex101.com/
cur.execute( "INSERT INTO STATUS_PATTERN_MATCHING (pattern, priority, status) VALUES ('{}', 50, 'invalid');".format(".*{}.*".format(re.escape("youtube.com/"))) )
cur.execute( "INSERT INTO STATUS_PATTERN_MATCHING (pattern, priority, status) VALUES ('{}', 50, 'invalid');".format(".*{}.*".format(re.escape("tiktok.com/"))) )
cur.execute( "INSERT INTO STATUS_PATTERN_MATCHING (pattern, priority, status) VALUES ('{}', 50, 'invalid');".format(".*{}.*".format(re.escape("twitter.com/"))) )
cur.execute( "INSERT INTO STATUS_PATTERN_MATCHING (pattern, priority, status) VALUES ('{}', 50, 'invalid');".format(".*{}.*".format(re.escape("reddit.com/"))) )
cur.execute( "INSERT INTO STATUS_PATTERN_MATCHING (pattern, priority, status) VALUES ('{}', 50, 'invalid');".format(".*{}.*".format(re.escape("libreddit.de/"))) )
cur.execute( "INSERT INTO STATUS_PATTERN_MATCHING (pattern, priority, status) VALUES ('{}', 50, 'invalid');".format(".*{}.*".format(re.escape("radio.foxnews.com/"))) )
def main(name):
    """Print a greeting addressed to *name*."""
    greeting = f"Hello, {name}!"
    print(greeting)
if __name__ == '__main__':
    # Command-line entry point: pick which initialization steps to run.
    parser = argparse.ArgumentParser(description='Database initialization')
    parser.add_argument('--initialize_tables', help='Create DB tables',
                        action='store_true', default=False)
    parser.add_argument('--initialize_data', help='Insert data',
                        action='store_true', default=False)
    args = parser.parse_args()

    # Wait for DB connection before doing anything else.
    wait_connection()

    if args.initialize_tables:
        print("Initializing tables")
        initialize_tables()
    if args.initialize_data:
        print("Initializing data")
        initialize_data()