import argparse import os import psycopg import json import time import urllib.parse import html5lib import feedparser import requests connection_info = "host={} port={} dbname={} user={} password={} connect_timeout=60".format( os.environ.get("DB_HOST", "localhost"), os.environ.get("DB_PORT", "5432"), os.environ.get("DB_NAME", "matitos"), os.environ.get("DB_USER", "supermatitos"), os.environ.get("DB_PASSWORD", "supermatitos") ) def wait_connection(): connected = False while (not connected): try: # Connect to an existing database with psycopg.connect(connection_info) as conn: # Open a cursor to perform database operations with conn.cursor() as cur: # Create URLs table c = cur.execute("SELECT 1;").fetchall() connected = True except psycopg.OperationalError as e: print(str(e)) # Connection not ready... # print(".", end="") time.sleep(15) except Exception as e: print(str(e)) # Connection not ready... # print("e", end="") time.sleep(15) print("DB connection ready") def initialize_tables(): # Connect to an existing database with psycopg.connect(connection_info) as conn: # Open a cursor to perform database operations with conn.cursor() as cur: # Autocommit at end of transaction (Atomic creation of tables) with conn.transaction() as tx: try: # Create URLs table c = cur.execute(""" CREATE TYPE URL_STATUS AS ENUM ('raw', 'error', 'valid', 'unknown', 'invalid', 'duplicate'); CREATE TABLE URLS ( id SERIAL PRIMARY KEY, url TEXT NOT NULL UNIQUE, ts_fetch TIMESTAMPTZ NOT NULL DEFAULT NOW(), status URL_STATUS NOT NULL DEFAULT 'raw' -- , -- status_wendy WENDY_STATUS DEFAULT NULL, -- ts_wendy TIMESTAMPTZ DEFAULT NULL, -- child_abuse BOOLEAN DEFAULT NULL, ); CREATE INDEX idx_urls_status ON urls(status); CREATE INDEX idx_urls_ts_fetch ON urls(ts_fetch); CREATE TABLE URLS_DUPLICATE ( id_url_canonical INTEGER REFERENCES URLS(id), id_url_duplicated INTEGER REFERENCES URLS(id), PRIMARY KEY (id_url_canonical, id_url_duplicated) ); CREATE TYPE SEARCH_TYPE AS ENUM ('rss_feed', 'keyword_search', 'url_host'); CREATE TABLE SEARCH ( id SMALLSERIAL PRIMARY KEY, search TEXT NOT NULL UNIQUE, type SEARCH_TYPE NOT NULL -- language_country CHAR(5), -- Language: ISO 639-1 Code. Country: ISO 3166 ALPHA-2. e.g.: en-us. Required for search -- UNIQUE(search, language_country) ); CREATE INDEX idx_search_type ON SEARCH(type); CREATE TABLE SOURCE ( id SMALLSERIAL PRIMARY KEY, source TEXT NOT NULL UNIQUE ); -- CREATE TABLE SEARCH_LANGUAGE ( -- language CHAR(2) NOT NULL, -- ISO 639-1 Code, e.g. "en" -- country CHAR(2) NOT NULL, -- ISO 3166 ALPHA-2, e.g. "us" -- PRIMARY KEY (language, country) -- ); CREATE TABLE URLS_SOURCE_SEARCH ( id_url INTEGER REFERENCES URLS(id), id_source SMALLINT REFERENCES SOURCE(id) ON UPDATE CASCADE ON DELETE RESTRICT, id_search SMALLINT REFERENCES SEARCH(id) ON UPDATE CASCADE ON DELETE RESTRICT, PRIMARY KEY(id_url, id_source, id_search) ); CREATE INDEX idx_source ON URLS_SOURCE_SEARCH(id_source); CREATE INDEX idx_search ON URLS_SOURCE_SEARCH(id_search); CREATE TABLE STATUS_PATTERN_MATCHING ( pattern TEXT PRIMARY KEY, priority SMALLINT NOT NULL, status URL_STATUS NOT NULL ); CREATE TABLE URL_CONTENT ( id_url INTEGER PRIMARY KEY REFERENCES URLS(id), date_published TIMESTAMPTZ DEFAULT NOW(), title TEXT, description TEXT, content TEXT, valid_content BOOLEAN, language CHAR(2), -- ISO 639-1 Code keywords TEXT[], tags TEXT[], authors TEXT[], image_main_url TEXT, images_url TEXT[], videos_url TEXT[], url_host TEXT, -- www.breitbart.com site_name TEXT -- Breitbart News ); CREATE INDEX idx_tags ON URL_CONTENT USING GIN(tags); CREATE INDEX idx_authors ON URL_CONTENT USING GIN(authors); CREATE INDEX idx_date_published ON URL_CONTENT (date_published); CREATE INDEX idx_valid_content ON URL_CONTENT (valid_content); CREATE INDEX idx_language ON URL_CONTENT (language); CREATE INDEX idx_url_host ON URL_CONTENT (url_host); """) except Exception as e: print(str(e)) def find_feeds(url): list_feeds = [] try: def get_with_protocol(url): # http:// -> https:// url = url.replace("http://", "https://") # "" -> https:// if not (url.startswith("https://")): url = "https://" + url return url url = get_with_protocol(url) response = requests.get(url, headers={'User-Agent': 'Mozilla/5.0'}) html = response.text tree = html5lib.parse(html, namespaceHTMLElements=False) # base for relative URLs base = tree.findall('.//base') base_url = base[0].attrib['href'] if base and 'href' in base[0].attrib else url # prioritize Atom over RSS links = tree.findall("""head/link[@rel='alternate'][@type='application/atom+xml']""") + tree.findall("""head/link[@rel='alternate'][@type='application/rss+xml']""") for link in links: href = link.attrib.get('href', '').strip() if href: r = requests.get(urllib.parse.urljoin(base_url, href), allow_redirects=True) list_feeds.append(r.url) # heuristic search for common feed paths for suffix in [ 'feed', 'feed/', 'rss', 'atom', 'feed.xml', '/feed', '/feed/', '/rss', '/atom', '/feed.xml', 'index.atom', 'index.rss', 'index.xml', 'atom.xml', 'rss.xml', '/index.atom', '/index.rss', '/index.xml', '/atom.xml', '/rss.xml', '.rss', '/.rss', '?rss=1', '?feed=rss2', ]: try: potential_feed = urllib.parse.urljoin(base_url, suffix) response = requests.get(potential_feed, allow_redirects=True) if (response.status_code == 200) and (len(feedparser.parse(potential_feed).get("entries")) > 0): list_feeds.append(response.url) except Exception: continue except Exception as e: print(f"An error occurred: {e}") # Remove comments feed list_feeds = [f for f in list_feeds if "/comments" not in f] # Remove duplicates return list(set(list_feeds)) def initialize_data(): # Read data with open("init_data.json", "r") as f: data_json = json.loads(f.read()) print("Initialization data:", data_json) # Connect to an existing database with psycopg.connect(connection_info) as conn: # Open a cursor to perform database operations with conn.cursor() as cur: # Autocommit at end of transaction (Atomic creation of data) # with conn.transaction() as tx: # TODO: Language per search # cur.execute( "INSERT INTO SEARCH (search, type) VALUES ('child abuse', 'keyword_search', 'en-us');" ) # cur.execute( "INSERT INTO SEARCH (search, type) VALUES ('child abuse', 'keyword_search', 'en-gb');" ) for list_pattern_status_priority in data_json.get("REGEX_PATTERN_STATUS_PRIORITY", []): # Decode pattern, status, priority = list_pattern_status_priority # Query query = "INSERT INTO STATUS_PATTERN_MATCHING (pattern, priority, status) VALUES ('{}', {}, '{}');".format(pattern, priority, status) print(query) cur.execute(query) # Connect to an existing database with psycopg.connect(connection_info) as conn: # Open a cursor to perform database operations with conn.cursor() as cur: # Feeds, URL host, keyword search for search_type, list_searches in data_json.get("SEARCH", {}).items(): for search in list_searches: query = "INSERT INTO SEARCH (search, type) VALUES ('{}', '{}');".format(search, search_type) print(query) cur.execute(query) # Try finding RSS feed if (search_type == "url_host"): url_host = search list_feeds = find_feeds(url_host) # If not exists, insert feed for feed in list_feeds: query = "INSERT INTO SEARCH (search, type) VALUES ('{}', '{}') ON CONFLICT DO NOTHING;".format(feed, "rss_feed") print(query) cur.execute(query) if __name__ == '__main__': parser = argparse.ArgumentParser(description='Database initialization') parser.add_argument('--initialize_tables', help='Create DB tables', action='store_true', default=False) parser.add_argument('--initialize_data', help='Insert data', action='store_true', default=False) args = parser.parse_args() # Wait for DB connection wait_connection() if (args.initialize_tables): print("Initializing tables") initialize_tables() if (args.initialize_data): print("Initializing data") initialize_data()