255 lines
11 KiB
Python
255 lines
11 KiB
Python
import argparse
|
|
import os
|
|
import psycopg
|
|
import json
|
|
import time
|
|
import urllib.parse
|
|
import html5lib
|
|
import feedparser
|
|
import requests
|
|
|
|
# libpq-style connection string. Each field is read from a DB_* environment
# variable and falls back to the local-development default listed beside it.
connection_info = "host={} port={} dbname={} user={} password={} connect_timeout=60".format(
    *(
        os.environ.get(env_name, fallback)
        for env_name, fallback in (
            ("DB_HOST", "localhost"),
            ("DB_PORT", "5432"),
            ("DB_NAME", "matitos"),
            ("DB_USER", "supermatitos"),
            ("DB_PASSWORD", "supermatitos"),
        )
    )
)
|
|
|
|
def wait_connection(retry_delay=15):
    """Block until the database accepts a connection and answers a query.

    Repeatedly opens a connection with the module-level ``connection_info``
    and runs ``SELECT 1;``. Every failure is printed and retried after a
    pause; there is no upper bound on the number of attempts.

    Args:
        retry_delay: Seconds to sleep after a failed attempt. Defaults to 15,
            the value that used to be hard-coded.
    """
    while True:
        try:
            with psycopg.connect(connection_info) as conn:
                with conn.cursor() as cur:
                    # Round-trip a trivial query to prove the server is usable
                    cur.execute("SELECT 1;").fetchall()
        except Exception as e:
            # psycopg.OperationalError (server not reachable yet) and any
            # other failure were handled identically, so a single handler
            # covers both: report, wait, try again.
            print(str(e))
            time.sleep(retry_delay)
        else:
            # Query succeeded and the connection closed cleanly
            break

    print("DB connection ready")
|
|
|
|
def initialize_tables():
    """Create the enum types, tables and indexes used by the application.

    The whole schema is sent as one multi-statement script inside a single
    transaction, so either every object is created or none is.

    Any failure (for example because the objects already exist) is printed
    and NOT re-raised, so the caller carries on. The ``try`` wraps the
    transaction block rather than sitting inside it: that way the
    transaction context manager sees the exception and rolls back, instead
    of being asked to commit a transaction the server has already aborted.
    """
    with psycopg.connect(connection_info) as conn:
        with conn.cursor() as cur:
            try:
                # Commit at the end of the block (atomic creation of tables)
                with conn.transaction():
                    cur.execute("""
                        CREATE TYPE URL_STATUS AS ENUM ('raw', 'error', 'valid', 'unknown', 'invalid', 'duplicate');

                        CREATE TABLE URLS (
                            id SERIAL PRIMARY KEY,
                            url TEXT NOT NULL UNIQUE,
                            ts_fetch TIMESTAMPTZ NOT NULL DEFAULT NOW(),
                            status URL_STATUS NOT NULL DEFAULT 'raw' -- ,
                            -- status_wendy WENDY_STATUS DEFAULT NULL,
                            -- ts_wendy TIMESTAMPTZ DEFAULT NULL,
                            -- child_abuse BOOLEAN DEFAULT NULL,
                        );
                        CREATE INDEX idx_urls_status ON urls(status);
                        CREATE INDEX idx_urls_ts_fetch ON urls(ts_fetch);

                        CREATE TABLE URLS_DUPLICATE (
                            id_url_canonical INTEGER REFERENCES URLS(id),
                            id_url_duplicated INTEGER REFERENCES URLS(id),
                            PRIMARY KEY (id_url_canonical, id_url_duplicated)
                        );

                        CREATE TYPE SEARCH_TYPE AS ENUM ('rss_feed', 'keyword_search', 'url_host');
                        CREATE TABLE SEARCH (
                            id SMALLSERIAL PRIMARY KEY,
                            search TEXT NOT NULL UNIQUE,
                            type SEARCH_TYPE NOT NULL
                            -- language_country CHAR(5), -- Language: ISO 639-1 Code. Country: ISO 3166 ALPHA-2. e.g.: en-us. Required for search
                            -- UNIQUE(search, language_country)
                        );
                        CREATE INDEX idx_search_type ON SEARCH(type);

                        CREATE TABLE SOURCE (
                            id SMALLSERIAL PRIMARY KEY,
                            source TEXT NOT NULL UNIQUE
                        );

                        -- CREATE TABLE SEARCH_LANGUAGE (
                        --     language CHAR(2) NOT NULL, -- ISO 639-1 Code, e.g. "en"
                        --     country CHAR(2) NOT NULL, -- ISO 3166 ALPHA-2, e.g. "us"
                        --     PRIMARY KEY (language, country)
                        -- );

                        CREATE TABLE URLS_SOURCE_SEARCH (
                            id_url INTEGER REFERENCES URLS(id),
                            id_source SMALLINT REFERENCES SOURCE(id) ON UPDATE CASCADE ON DELETE RESTRICT,
                            id_search SMALLINT REFERENCES SEARCH(id) ON UPDATE CASCADE ON DELETE RESTRICT,
                            PRIMARY KEY(id_url, id_source, id_search)
                        );
                        CREATE INDEX idx_source ON URLS_SOURCE_SEARCH(id_source);
                        CREATE INDEX idx_search ON URLS_SOURCE_SEARCH(id_search);

                        CREATE TABLE STATUS_PATTERN_MATCHING (
                            pattern TEXT PRIMARY KEY,
                            priority SMALLINT NOT NULL,
                            status URL_STATUS NOT NULL
                        );


                        CREATE TABLE URL_CONTENT (
                            id_url INTEGER PRIMARY KEY REFERENCES URLS(id),
                            date_published TIMESTAMPTZ DEFAULT NOW(),
                            title TEXT,
                            description TEXT,
                            content TEXT,
                            valid_content BOOLEAN,
                            language CHAR(2), -- ISO 639-1 Code
                            keywords TEXT[],
                            tags TEXT[],
                            authors TEXT[],
                            image_main_url TEXT,
                            images_url TEXT[],
                            videos_url TEXT[],
                            url_host TEXT, -- www.breitbart.com
                            site_name TEXT -- Breitbart News
                        );
                        CREATE INDEX idx_tags ON URL_CONTENT USING GIN(tags);
                        CREATE INDEX idx_authors ON URL_CONTENT USING GIN(authors);
                        CREATE INDEX idx_date_published ON URL_CONTENT (date_published);
                        CREATE INDEX idx_valid_content ON URL_CONTENT (valid_content);
                        CREATE INDEX idx_language ON URL_CONTENT (language);
                        CREATE INDEX idx_url_host ON URL_CONTENT (url_host);
                    """)
            except Exception as e:
                # Best effort: report the problem and let the caller continue
                print(str(e))
|
|
|
|
|
|
def _with_https(url):
    """Return *url* with an ``https://`` scheme, upgrading ``http://`` or adding it when absent."""
    if url.startswith("http://"):
        # Only the leading scheme is rewritten; "http://" occurring later
        # (e.g. inside a query string) is left alone.
        return "https://" + url[len("http://"):]
    if not url.startswith("https://"):
        return "https://" + url
    return url


def find_feeds(url, timeout=10):
    """Discover RSS/Atom feed URLs for a site.

    Two strategies are combined:

    1. ``<link rel="alternate">`` elements in the page ``<head>`` whose type
       is Atom or RSS (Atom links are listed first).
    2. A fixed list of commonly used feed paths, each kept only if it
       answers with HTTP 200 and parses to a feed with at least one entry.

    Args:
        url: Site host or URL. ``http://`` is upgraded to ``https://`` and a
            missing scheme is filled in with ``https://``.
        timeout: Seconds to wait on each HTTP request before giving up.
            Without a timeout a single unresponsive server would block the
            call indefinitely.

    Returns:
        A list of feed URLs (after following redirects) in discovery order,
        without duplicates and without any URL containing ``/comments``.
        The function does not raise: an unexpected error is printed and
        whatever was collected up to that point is returned.
    """
    headers = {'User-Agent': 'Mozilla/5.0'}
    list_feeds = []
    try:
        url = _with_https(url)

        response = requests.get(url, headers=headers, timeout=timeout)
        tree = html5lib.parse(response.text, namespaceHTMLElements=False)

        # Base for relative URLs. A <base href> may itself be relative, so it
        # is resolved against the page URL (an absolute href is unchanged).
        base = tree.findall('.//base')
        if base and 'href' in base[0].attrib:
            base_url = urllib.parse.urljoin(url, base[0].attrib['href'])
        else:
            base_url = url

        # Prioritize Atom over RSS
        links = (
            tree.findall("""head/link[@rel='alternate'][@type='application/atom+xml']""")
            + tree.findall("""head/link[@rel='alternate'][@type='application/rss+xml']""")
        )
        for link in links:
            href = link.attrib.get('href', '').strip()
            if href:
                r = requests.get(
                    urllib.parse.urljoin(base_url, href),
                    headers=headers,
                    allow_redirects=True,
                    timeout=timeout,
                )
                list_feeds.append(r.url)

        # Heuristic search for common feed paths
        tried = set()
        for suffix in [
            'feed', 'feed/', 'rss', 'atom', 'feed.xml',
            '/feed', '/feed/', '/rss', '/atom', '/feed.xml',
            'index.atom', 'index.rss', 'index.xml', 'atom.xml', 'rss.xml',
            '/index.atom', '/index.rss', '/index.xml', '/atom.xml', '/rss.xml',
            '.rss', '/.rss', '?rss=1', '?feed=rss2',
        ]:
            potential_feed = urllib.parse.urljoin(base_url, suffix)
            # Relative and absolute suffixes often resolve to the same URL;
            # fetch each distinct candidate only once.
            if potential_feed in tried:
                continue
            tried.add(potential_feed)
            try:
                response = requests.get(
                    potential_feed,
                    headers=headers,
                    allow_redirects=True,
                    timeout=timeout,
                )
                # Parse the body that was just downloaded instead of handing
                # the URL to feedparser, which would fetch it a second time
                # (and without any timeout).
                if response.status_code == 200 and feedparser.parse(response.content).get("entries"):
                    list_feeds.append(response.url)
            except Exception:
                # A failing candidate must not stop the remaining ones
                continue

    except Exception as e:
        print(f"An error occurred: {e}")

    # Drop comment feeds, then de-duplicate while keeping discovery order so
    # the Atom-before-RSS priority survives (a set would scramble it).
    return list(dict.fromkeys(f for f in list_feeds if "/comments" not in f))
|
|
|
|
def initialize_data():
    """Load ``init_data.json`` and insert its seed rows into the database.

    Two top-level keys of the JSON document are read:

    * ``REGEX_PATTERN_STATUS_PRIORITY``: a list of
      ``[pattern, status, priority]`` triples, inserted into
      ``STATUS_PATTERN_MATCHING``.
    * ``SEARCH``: a mapping of search type to a list of search strings,
      inserted into ``SEARCH``. For every ``url_host`` entry the site is also
      probed with ``find_feeds`` and each discovered feed is inserted as an
      ``rss_feed`` search, skipping feeds that are already present.

    All values are passed to the database as bound parameters rather than
    being formatted into the SQL text, so a regex pattern or search term
    containing a quote can neither break the statement nor inject SQL.

    The two stages use separate connections, so the pattern rows are
    committed before the (slower, network-bound) search stage begins.
    """
    # Read data
    with open("init_data.json", "r", encoding="utf-8") as f:
        data_json = json.load(f)

    print("Initialization data:", data_json)

    insert_pattern = "INSERT INTO STATUS_PATTERN_MATCHING (pattern, priority, status) VALUES (%s, %s, %s);"
    insert_search = "INSERT INTO SEARCH (search, type) VALUES (%s, %s);"
    insert_feed = "INSERT INTO SEARCH (search, type) VALUES (%s, %s) ON CONFLICT DO NOTHING;"

    # Stage 1: status pattern matching rules
    with psycopg.connect(connection_info) as conn:
        with conn.cursor() as cur:
            # TODO: Language per search
            for pattern, status, priority in data_json.get("REGEX_PATTERN_STATUS_PRIORITY", []):
                # The JSON order is (pattern, status, priority); the column
                # order in the statement is (pattern, priority, status).
                params = (pattern, priority, status)
                print(insert_pattern, params)
                cur.execute(insert_pattern, params)

    # Stage 2: feeds, URL hosts and keyword searches
    with psycopg.connect(connection_info) as conn:
        with conn.cursor() as cur:
            for search_type, list_searches in data_json.get("SEARCH", {}).items():
                for search in list_searches:
                    params = (search, search_type)
                    print(insert_search, params)
                    cur.execute(insert_search, params)

                    # Try finding RSS feeds for the host
                    if search_type == "url_host":
                        for feed in find_feeds(search):
                            # Insert the feed only if it does not exist yet
                            params = (feed, "rss_feed")
                            print(insert_feed, params)
                            cur.execute(insert_feed, params)
|
|
|
|
|
|
if __name__ == '__main__':
    # Command-line entry point: each flag opts in to one initialization stage.
    cli = argparse.ArgumentParser(description='Database initialization')
    cli.add_argument('--initialize_tables', action='store_true', help='Create DB tables')
    cli.add_argument('--initialize_data', action='store_true', help='Insert data')
    options = cli.parse_args()

    # Wait for the DB connection before running any stage
    wait_connection()

    if options.initialize_tables:
        print("Initializing tables")
        initialize_tables()

    if options.initialize_data:
        print("Initializing data")
        initialize_data()
|