General search fix, status pattern match regex, find feeds on startup
244
app_urls/init_db.py
Normal file
@@ -0,0 +1,244 @@
import argparse
import os
import psycopg
import json
import time
import urllib.parse
import html5lib
import feedparser
import requests

connection_info = "host={} port={} dbname={} user={} password={} connect_timeout=60".format(
    os.environ.get("DB_HOST", "localhost"),
    os.environ.get("DB_PORT", "5432"),
    os.environ.get("DB_NAME", "matitos"),
    os.environ.get("DB_USER", "supermatitos"),
    os.environ.get("DB_PASSWORD", "supermatitos")
)
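
# The settings above come from the environment, falling back to the defaults
# shown; e.g. in a compose-style deployment one might set (the hostname "db"
# is illustrative, not something this commit defines):
#     DB_HOST=db DB_PORT=5432 DB_NAME=matitos DB_USER=supermatitos DB_PASSWORD=supermatitos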


def wait_connection():
    connected = False
    while not connected:
        try:
            # Connect to an existing database
            with psycopg.connect(connection_info) as conn:
                # Open a cursor to perform database operations
                with conn.cursor() as cur:
                    # Simple probe query to confirm the connection is usable
                    cur.execute("SELECT 1;").fetchall()
                    connected = True
        except Exception:
            # Connection not ready yet (covers psycopg.OperationalError too); retry
            time.sleep(2)

    print("DB connection ready")


def initialize_tables():
    # Connect to an existing database
    with psycopg.connect(connection_info) as conn:
        # Open a cursor to perform database operations
        with conn.cursor() as cur:
            # Commit at end of transaction block (atomic creation of tables)
            with conn.transaction():
                try:
                    # Create URLs table and related schema
                    cur.execute("""
                        CREATE TYPE URL_STATUS AS ENUM ('raw', 'error', 'valid', 'unknown', 'invalid', 'duplicate');

                        CREATE TABLE URLS (
                            id SERIAL PRIMARY KEY,
                            url TEXT NOT NULL UNIQUE,
                            ts_fetch TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                            status URL_STATUS NOT NULL DEFAULT 'raw' -- ,
                            -- status_wendy WENDY_STATUS DEFAULT NULL,
                            -- ts_wendy TIMESTAMPTZ DEFAULT NULL
                        );
                        CREATE INDEX idx_urls_status ON urls(status);
                        CREATE INDEX idx_urls_ts_fetch ON urls(ts_fetch);

                        CREATE TABLE URLS_DUPLICATE (
                            id_url_canonical INTEGER REFERENCES URLS(id),
                            id_url_duplicated INTEGER REFERENCES URLS(id),
                            PRIMARY KEY (id_url_canonical, id_url_duplicated)
                        );

                        CREATE TYPE SEARCH_TYPE AS ENUM ('rss_feed', 'keyword_search', 'url_host');
                        CREATE TABLE SEARCH (
                            id SMALLSERIAL PRIMARY KEY,
                            search TEXT NOT NULL UNIQUE,
                            type SEARCH_TYPE NOT NULL
                            -- language_country CHAR(5), -- Language: ISO 639-1 code. Country: ISO 3166 alpha-2. e.g. en-us. Required for search
                            -- UNIQUE(search, language_country)
                        );
                        CREATE INDEX idx_search_type ON SEARCH(type);

                        CREATE TABLE SOURCE (
                            id SMALLSERIAL PRIMARY KEY,
                            source TEXT NOT NULL UNIQUE
                        );

                        -- CREATE TABLE SEARCH_LANGUAGE (
                        --     language CHAR(2) NOT NULL, -- ISO 639-1 code, e.g. "en"
                        --     country CHAR(2) NOT NULL, -- ISO 3166 alpha-2, e.g. "us"
                        --     PRIMARY KEY (language, country)
                        -- );

                        CREATE TABLE URLS_SOURCE_SEARCH (
                            id_url INTEGER REFERENCES URLS(id),
                            id_source SMALLINT REFERENCES SOURCE(id) ON UPDATE CASCADE ON DELETE RESTRICT,
                            id_search SMALLINT REFERENCES SEARCH(id) ON UPDATE CASCADE ON DELETE RESTRICT,
                            PRIMARY KEY (id_url, id_source, id_search)
                        );
                        CREATE INDEX idx_source ON URLS_SOURCE_SEARCH(id_source);
                        CREATE INDEX idx_search ON URLS_SOURCE_SEARCH(id_search);

                        CREATE TABLE STATUS_PATTERN_MATCHING (
                            pattern TEXT PRIMARY KEY,
                            priority SMALLINT NOT NULL,
                            status URL_STATUS NOT NULL
                        );

                        CREATE TABLE URL_CONTENT (
                            id_url INTEGER PRIMARY KEY REFERENCES URLS(id),
                            date_published TIMESTAMPTZ DEFAULT NOW(),
                            title TEXT,
                            description TEXT,
                            content TEXT,
                            valid_content BOOLEAN,
                            language CHAR(2), -- ISO 639-1 code
                            keywords TEXT[],
                            tags TEXT[],
                            authors TEXT[],
                            image_main_url TEXT,
                            images_url TEXT[],
                            videos_url TEXT[],
                            url_host TEXT, -- www.breitbart.com
                            site_name TEXT -- Breitbart News
                        );
                        CREATE INDEX idx_tags ON URL_CONTENT USING GIN(tags);
                        CREATE INDEX idx_authors ON URL_CONTENT USING GIN(authors);
                        CREATE INDEX idx_date_published ON URL_CONTENT (date_published);
                        CREATE INDEX idx_valid_content ON URL_CONTENT (valid_content);
                        CREATE INDEX idx_language ON URL_CONTENT (language);
                        CREATE INDEX idx_url_host ON URL_CONTENT (url_host);
                    """)
                except Exception as e:
                    print(str(e))
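
# Illustrative sketch of how the STATUS_PATTERN_MATCHING table could drive the
# "status pattern match regex" step named in the commit message. Nothing in this
# file calls it: the helper name, the ascending-priority ordering, and the 'raw'
# fallback are assumptions made for illustration only.
import re

def match_url_status(cur, url):
    # Apply the stored regex patterns in priority order; the first match wins
    for pattern, status in cur.execute(
            "SELECT pattern, status FROM STATUS_PATTERN_MATCHING ORDER BY priority ASC;").fetchall():
        if re.search(pattern, url):
            return status
    # No pattern matched: keep the default status
    return 'raw'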


def find_feeds(url):
    list_feeds = []
    try:
        def get_with_protocol(url):
            # http:// -> https://
            url = url.replace("http://", "https://")
            # "" -> https://
            if not url.startswith("https://"):
                url = "https://" + url
            return url
        url = get_with_protocol(url)

        # Timeouts keep a dead host from hanging startup indefinitely
        response = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}, timeout=30)
        html = response.text
        tree = html5lib.parse(html, namespaceHTMLElements=False)

        # Base URL for resolving relative URLs
        base = tree.findall('.//base')
        base_url = base[0].attrib['href'] if base and 'href' in base[0].attrib else url

        # Prioritize Atom over RSS
        links = (tree.findall("head/link[@rel='alternate'][@type='application/atom+xml']")
                 + tree.findall("head/link[@rel='alternate'][@type='application/rss+xml']"))
        for link in links:
            href = link.attrib.get('href', '').strip()
            if href:
                r = requests.get(urllib.parse.urljoin(base_url, href), allow_redirects=True, timeout=30)
                list_feeds.append(r.url)

        # Heuristic search for common feed paths
        for suffix in [
            'feed', 'feed/', 'rss', 'atom', 'feed.xml',
            '/feed', '/feed/', '/rss', '/atom', '/feed.xml',
            'index.atom', 'index.rss', 'index.xml', 'atom.xml', 'rss.xml',
            '/index.atom', '/index.rss', '/index.xml', '/atom.xml', '/rss.xml',
            '.rss', '/.rss', '?rss=1', '?feed=rss2',
        ]:
            try:
                potential_feed = urllib.parse.urljoin(base_url, suffix)
                response = requests.get(potential_feed, allow_redirects=True, timeout=30)
                # Parse the body already fetched instead of refetching the URL via feedparser
                if (response.status_code == 200) and (len(feedparser.parse(response.content).get("entries", [])) > 0):
                    list_feeds.append(response.url)
            except Exception:
                continue

    except Exception as e:
        print(f"An error occurred: {e}")

    # Remove duplicates
    return list(set(list_feeds))
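
# Example use of find_feeds (the host is a placeholder; results depend entirely
# on what the live site exposes):
#     find_feeds("www.example.com")
#     -> e.g. ["https://www.example.com/feed/"], deduplicated, order not guaranteed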


def initialize_data():
    # Read data
    with open("init_data.json", "r") as f:
        data_json = json.loads(f.read())

    print("Initialization data:", data_json)

    # Connect to an existing database
    with psycopg.connect(connection_info) as conn:
        # Open a cursor to perform database operations
        with conn.cursor() as cur:
            # Commit at end of transaction (atomic creation of data)
            # with conn.transaction():

            # TODO: Language per search
            # cur.execute( "INSERT INTO SEARCH (search, type) VALUES ('child abuse', 'keyword_search', 'en-us');" )
            # cur.execute( "INSERT INTO SEARCH (search, type) VALUES ('child abuse', 'keyword_search', 'en-gb');" )

            for list_pattern_status_priority in data_json.get("REGEX_PATTERN_STATUS_PRIORITY", []):
                # Decode
                pattern, status, priority = list_pattern_status_priority
                # Parameterized query (avoids SQL injection and quoting issues)
                query = "INSERT INTO STATUS_PATTERN_MATCHING (pattern, priority, status) VALUES (%s, %s, %s);"
                print(query, (pattern, priority, status))
                cur.execute(query, (pattern, priority, status))

            # Feeds, URL host, keyword search
            for search_type, list_searches in data_json.get("SEARCH", {}).items():
                for search in list_searches:
                    query = "INSERT INTO SEARCH (search, type) VALUES (%s, %s);"
                    print(query, (search, search_type))
                    cur.execute(query, (search, search_type))

                    # Try finding RSS feeds for URL hosts
                    if search_type == "url_host":
                        url_host = search
                        list_feeds = find_feeds(url_host)
                        # Insert each discovered feed unless it already exists
                        for feed in list_feeds:
                            query = "INSERT INTO SEARCH (search, type) VALUES (%s, %s) ON CONFLICT DO NOTHING;"
                            print(query, (feed, "rss_feed"))
                            cur.execute(query, (feed, "rss_feed"))
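
# Expected shape of init_data.json, inferred from the keys read above
# ("REGEX_PATTERN_STATUS_PRIORITY" triples are [pattern, status, priority]);
# the concrete values are illustrative placeholders, not data from this commit:
# {
#     "REGEX_PATTERN_STATUS_PRIORITY": [
#         ["^https://www\\.example\\.com/tag/", "invalid", 0]
#     ],
#     "SEARCH": {
#         "rss_feed": ["https://www.example.com/feed/"],
#         "keyword_search": ["child abuse"],
#         "url_host": ["www.example.com"]
#     }
# }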


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Database initialization')
    parser.add_argument('--initialize_tables', help='Create DB tables', action='store_true', default=False)
    parser.add_argument('--initialize_data', help='Insert data', action='store_true', default=False)
    args = parser.parse_args()

    # Wait for DB connection
    wait_connection()

    if args.initialize_tables:
        print("Initializing tables")
        initialize_tables()
    if args.initialize_data:
        print("Initializing data")
        initialize_data()
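
# Typical invocation (flag combination illustrative; run whichever steps apply):
#     python init_db.py --initialize_tables --initialize_data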