Refactoring fetcher WIP
.gitignore (vendored): 1 line changed
@@ -1,3 +1,4 @@
 __pycache__/
 *.pyc
 **/credentials.py
+logs/
@@ -34,5 +34,5 @@ docker run --rm -it -p 12343:80 image_generation

 # Deploy
 ```
-python manage.py runserver
+python app_web/manage.py runserver
 ```
app_fetcher/Dev.ipynb: new file, 46 lines
@@ -0,0 +1,46 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "conda create -n matitos_fetcher python=3.12\n",
+    "conda activate matitos_fetcher\n",
+    "conda install -c conda-forge curl\n",
+    "pip install ipykernel \"psycopg[binary]\" git+https://github.com/ranahaani/GNews.git GoogleNews duckduckgo_search newspaper4k numpy beautifulsoup4 requests feedparser pytz redis fastapi uvicorn fastapi-utils lxml[html_clean]"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "!uvicorn app:app --host 0.0.0.0 --port 5000 --reload"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "matitos_fetcher",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.12.9"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
@@ -1,4 +1,4 @@
-FROM continuumio/miniconda3:23.10.0-1
+FROM continuumio/miniconda3:25.1.1-2

 # App repository
 COPY . /opt/app/
@@ -10,6 +10,7 @@ RUN pip freeze

 WORKDIR /opt/app

+# https://www.uvicorn.org/settings/#resource-limits
 CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "80"]

 # docker build -t fetch_app .
@@ -1,5 +1,13 @@
 # Fetcher

+```
+conda create -n matitos_fetcher python=3.12
+conda activate matitos_fetcher
+conda install -c conda-forge curl
+pip install ipykernel "psycopg[binary]" git+https://github.com/ranahaani/GNews.git GoogleNews duckduckgo_search newspaper4k numpy beautifulsoup4 requests feedparser pytz redis fastapi uvicorn fastapi-utils lxml[html_clean]
+```
+
+
 * Fetcher app
 - Contains several endpoints to perform a specific fetching type task
 - For more details, check in [app.py](app.py) /{fetch_type}
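A quick way to confirm the environment described above was created correctly is to import the packages the pip line installs. This is an editorial sketch, not part of the commit; module names are the usual import names of those packages.

```
# Sketch (not part of the commit): sanity-check the matitos_fetcher environment.
# Any missing dependency raises ImportError.
import psycopg, redis, feedparser, newspaper, bs4, requests, fastapi, uvicorn  # noqa: F401
print("fetcher environment OK")
```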
@@ -30,6 +30,8 @@ from src.missing_kids_status import MissingKidsStatus
 from src.url_status import UpdateErrorURLs
 from src.fetcher_status import FetcherStatus

+from src.db_utils import DB_Handler
+
 from fastapi import FastAPI, BackgroundTasks
 # import requests
 # from fastapi_utils.tasks import repeat_every
@@ -37,11 +39,13 @@ from fastapi import FastAPI, BackgroundTasks
 # time.sleep(10)
 # import gc

+db_handler = DB_Handler(cred.db_connect_info, cred.redis_connect_info)
+
 app = FastAPI()

 @app.get("/")
 def hello_world():
-    return {"message": "OK"}
+    return {"message": "Ok"}

 @app.get("/{fetch_type}")
 async def fetch(background_tasks: BackgroundTasks, fetch_type: str):
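The module-level `db_handler` is built from a gitignored `credentials.py` (note `**/credentials.py` in .gitignore), so its contents are not in the diff. A minimal sketch of what `cred` presumably provides, inferred only from how `DB_Handler` uses it (`psycopg.connect(db_connect_info)` plus `redis_connect_info.get("host"/"port"/"expiry_seconds")`); all values below are assumptions:

```
# credentials.py (hypothetical sketch; the real file is gitignored and not in this commit)
# db_connect_info must be something psycopg.connect() accepts, e.g. a connection string.
db_connect_info = "host=localhost port=5432 dbname=matitos user=postgres password=..."

# Read via .get("host"), .get("port") and .get("expiry_seconds", 172800) in DB_Handler.
redis_connect_info = {"host": "localhost", "port": 6379, "expiry_seconds": 172800}
```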
@@ -49,9 +53,9 @@ async def fetch(background_tasks: BackgroundTasks, fetch_type: str):
     logger.info("Triggered fetch: {}".format(fetch_type))

     if (fetch_type == "feeds"):
-        task_run = NewsFeed(cred.db_connect_info, cred.redis_connect_info).run
+        task_run = NewsFeed(db_handler).run
     elif (fetch_type == "parser"):
-        task_run = NewsSiteParsing(cred.db_connect_info, cred.redis_connect_info).run
+        task_run = NewsSiteParsing(db_handler).run
     elif (fetch_type == "fetch_missing_kids_reduced"):
         task_run = NewsMissingKids(cred.db_connect_info, cred.redis_connect_info, num_pages=4).run
     elif (fetch_type == "fetch_missing_kids_full"):
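Each fetch type is triggered with a plain GET on /{fetch_type} and runs as a background task. A minimal sketch of calling the endpoints (not part of the commit), assuming the dev server from Dev.ipynb on port 5000; the Docker image listens on port 80 instead:

```
# Sketch (not part of the commit): trigger fetch types over HTTP.
import json
import urllib.request

BASE = "http://localhost:5000"  # dev server from Dev.ipynb; the container uses port 80

for fetch_type in ("feeds", "parser", "fetch_missing_kids_reduced"):
    with urllib.request.urlopen(f"{BASE}/{fetch_type}") as resp:
        print(fetch_type, json.load(resp))
```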
@@ -13,11 +13,11 @@ logger = logging.getLogger("news_fetcher")
 # TODO: URL_DB_HANDLER, _get_search_list, _get_url_host, _get_url_host_list, ...
 # The rest, elsewhere

-class URL_DB_Writer():
+class DB_Handler():
     def __init__(self, db_connect_info, redis_connect_info):
         logger.debug("Initializing URL DB writer")
         self.db_connect_info = db_connect_info
-        self.redis_instance = redis.Redis(host=redis_connect_info.get("redis_host"), port=redis_connect_info.get("redis_port"))
+        self.redis_instance = redis.Redis(host=redis_connect_info.get("host"), port=redis_connect_info.get("port"))
         self.redis_expiry_seconds = redis_connect_info.get("expiry_seconds", 172800) # Default: 48 hours

         try:
@@ -41,6 +41,28 @@ class URL_DB_Writer():
             num_urls = None
         return num_urls

+    def _get_feed_urls(self):
+        try:
+            with psycopg.connect(self.db_connect_info) as conn:
+                list_url_feeds = conn.execute("SELECT rss_feed FROM FEED;").fetchall()
+                # Decode (tuple with 1 element)
+                list_url_feeds = [l[0] for l in list_url_feeds]
+        except Exception as e:
+            logger.warning("Exception fetching RSS sites: " + str(e))
+            list_url_feeds = []
+        return list_url_feeds
+
+    def _get_url_hosts(self):
+        try:
+            with psycopg.connect(self.db_connect_info) as conn:
+                list_url_hosts = conn.execute("SELECT url_host FROM WEBSITE_OF_INTEREST;").fetchall()
+                # Decode (tuple with 1 element)
+                list_url_hosts = [l[0] for l in list_url_hosts]
+        except Exception as e:
+            logger.warning("Exception fetching RSS sites: " + str(e))
+            list_url_hosts = []
+        return list_url_hosts
+
     def _format(self, values):
         # Repalce single quote ' with ''. Based on https://stackoverflow.com/a/12320729
         # String -> 'string', Int -> '1' (string-based), None -> NULL (no quotes for pgSQL to interpret Null value)
@@ -352,6 +374,7 @@ class URL_DB_Writer():
             # Decode source id
             id_source = c[0]
             # Cache
+            print("*"*10, source, id_source)
             self.redis_instance.set(source, id_source, ex=self.redis_expiry_seconds)
         return id_source

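With the URL queries folded into the renamed DB_Handler, the fetchers share one handler instead of opening their own connections. A minimal usage sketch (not part of the commit; the `credentials` import and the `src.news_feed` / `src.news_parser` module paths are assumptions, since the diff does not show them):

```
# Sketch (not part of the commit): one DB_Handler shared by the fetchers.
from src.db_utils import DB_Handler
from src.news_feed import NewsFeed          # module path assumed
from src.news_parser import NewsSiteParsing  # module path assumed
import credentials as cred                   # hypothetical; the real module is gitignored

db_handler = DB_Handler(cred.db_connect_info, cred.redis_connect_info)
NewsFeed(db_handler).run()         # feed URLs come from db_handler._get_feed_urls()
NewsSiteParsing(db_handler).run()  # URL hosts come from db_handler._get_url_hosts()
```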
@@ -1,34 +1,20 @@
-from .db_utils import URL_DB_Writer
+from .db_utils import DB_Handler
 import feedparser
 import dateutil
-import psycopg
 import logging
 logging.basicConfig(format='%(filename)s | %(levelname)s | %(asctime)s | %(message)s')
 logger = logging.getLogger("news_fetcher")

 class NewsFeed():
-    def __init__(self, db_connect_info, redis_connect_info) -> None:
+    def __init__(self, db_handler: DB_Handler) -> None:
         logger.debug("Initializing News feed")
-        self.db_connect_info = db_connect_info
-        self.redis_connect_info = redis_connect_info
-
-    def _get_feed_urls(self):
-        try:
-            with psycopg.connect(self.db_connect_info) as conn:
-                list_url_feeds = conn.execute("SELECT rss_feed FROM FEED;").fetchall()
-                # Decode (tuple with 1 element)
-                list_url_feeds = [l[0] for l in list_url_feeds]
-        except Exception as e:
-            logger.warning("Exception fetching RSS sites: " + str(e))
-            list_url_feeds = []
-        return list_url_feeds
+        self.db_handler = db_handler

     def run(self):
         try:
             logger.debug("Starting NewsFeed.run()")

             # Get feeds
-            list_url_feeds = self._get_feed_urls()
+            list_url_feeds = self.db_handler._get_feed_urls()
             logger.debug("Fetching news from feeds: {}".format(str(list_url_feeds)))

             # Process via RSS feeds
@@ -44,17 +30,20 @@ class NewsFeed():
                 # Process?
                 if (url is not None):
                     # Available publish date?
-                    publish_date = f.get("published", None)
-                    if (publish_date is not None):
-                        publish_date = dateutil.parser.parse(publish_date)
-                    urls_publish_date.append(publish_date)
+                    publish_date_parsed = f.get("published_parsed")
+                    if (publish_date_parsed is None):
+                        publish_date = f.get("published", None)
+                        if (publish_date is not None):
+                            publish_date_parsed = dateutil.parser.parse(publish_date)
+
+                    # Published date
+                    urls_publish_date.append(publish_date_parsed)
                     # URL
                     urls_fetched.append(url)

             # URL fetching source
             source = "feed {}".format(url_feed)
             # Write to DB
-            db_writer = URL_DB_Writer(self.db_connect_info, self.redis_connect_info)
-            db_writer.write_batch(urls_fetched, source)
+            self.db_handler.write_batch(urls_fetched, source)
         except Exception as e:
             logger.warning("Exception in NewsFeed.run(): {}".format(str(e)))
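The hunk above switches to feedparser's pre-parsed timestamp: `published_parsed` is the `time.struct_time` feedparser derives from the raw `published` string, and dateutil is only used as a fallback when it is absent. A standalone sketch of that logic (not part of the commit; placeholder feed URL):

```
# Sketch (not part of the commit): the publish-date fallback in isolation.
import feedparser
import dateutil.parser

d = feedparser.parse("https://example.com/rss")  # placeholder feed URL
for entry in d.entries:
    publish_date_parsed = entry.get("published_parsed")   # time.struct_time or None
    if publish_date_parsed is None:
        publish_date = entry.get("published", None)        # raw date string, if any
        if publish_date is not None:
            publish_date_parsed = dateutil.parser.parse(publish_date)
    print(entry.get("link"), publish_date_parsed)
```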
@@ -1,27 +1,15 @@
-from .db_utils import URL_DB_Writer
+from .db_utils import DB_Handler
 import newspaper
-import psycopg
 import logging
 logging.basicConfig(format='%(filename)s | %(levelname)s | %(asctime)s | %(message)s')
 logger = logging.getLogger("news_fetcher")

 class NewsSiteParsing():
-    def __init__(self, db_connect_info, redis_connect_info) -> None:
-        logger.debug("Initializing News SiteParsing newspaper3k")
-        self.db_connect_info = db_connect_info
-        self.redis_connect_info = redis_connect_info
-
-    def _get_url_hosts(self):
-        try:
-            with psycopg.connect(self.db_connect_info) as conn:
-                list_url_hosts = conn.execute("SELECT url_host FROM WEBSITE_OF_INTEREST;").fetchall()
-                # Decode (tuple with 1 element)
-                list_url_hosts = [l[0] for l in list_url_hosts]
-        except Exception as e:
-            logger.warning("Exception fetching RSS sites: " + str(e))
-            list_url_hosts = []
-        return list_url_hosts
+    def __init__(self, db_handler: DB_Handler) -> None:
+        logger.debug("Initializing News SiteParsing newspaper4k")
+        self.db_handler = db_handler

+    # TODO: MOVE LOGIC ELSEWHERE!
     def _postprocess(self, article_urls):
         return [url.replace("#comment-stream", "") for url in article_urls]

@@ -29,11 +17,11 @@ class NewsSiteParsing():
         try:
             logger.debug("Starting NewsSiteParsing.run() for {}")

-            # Get feeds
-            list_url_hosts = self._get_url_hosts()
+            # Get URL hosts
+            list_url_hosts = self.db_handler._get_url_hosts()
             logger.info("Fetching news by parsing URL hosts: {}".format(str(list_url_hosts)))

-            # Process newspaper3k build method
+            # Process newspaper4k build method
             for url_host_feed in list_url_hosts:
                 # Protocol
                 if not (url_host_feed.startswith("http")):
@@ -41,18 +29,18 @@ class NewsSiteParsing():
                 else:
                     url_host_feed_formatted = url_host_feed

-                logger.debug("Fetching newspaper3k parsing based on URL: {}".format(url_host_feed_formatted))
+                logger.debug("Fetching newspaper4k parsing based on URL: {}".format(url_host_feed_formatted))
                 # Source object
                 url_host_built = newspaper.build(url_host_feed_formatted)
                 # Get articles URL list
                 urls_fetched = url_host_built.article_urls()
+                # TODO: MOVE!
                 # Post-processing
                 urls_fetched = self._postprocess(urls_fetched)

                 # URL fetching source
-                source = "newspaper3k {}".format(url_host_feed)
+                source = "newspaper4k {}".format(url_host_feed)
                 # Write to DB
-                db_writer = URL_DB_Writer(self.db_connect_info, self.redis_connect_info)
-                db_writer.write_batch(urls_fetched, source)
+                self.db_handler.write_batch(urls_fetched, source)
         except Exception as e:
             logger.warning("Exception in NewsSiteParsing.run(): {}".format(str(e)))
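Apart from the 3k-to-4k naming, the parsing path is unchanged: build a Source for each host of interest and collect its article URLs. A standalone sketch of that path (not part of the commit; placeholder host):

```
# Sketch (not part of the commit): the newspaper4k parsing path in isolation.
import newspaper

url_host = "https://example.com"   # placeholder; real hosts come from WEBSITE_OF_INTEREST
source = newspaper.build(url_host)                        # crawl the host, build a Source object
urls = source.article_urls()                              # discovered article URLs
urls = [u.replace("#comment-stream", "") for u in urls]   # mirrors _postprocess()
print(len(urls), "article URLs found")
```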
@@ -18,6 +18,15 @@ services:
     ports:
       - 5432:5432

+  matitos_redis:
+    image: redis:alpine
+    container_name: db_redis
+    restart: unless-stopped
+    ports:
+      - 6379:6379
+    #expose:
+    #  - 6379
+
   # django:
   # Env: DB_HOST=matitos_db
   # DJANGO_DB_NAME=${DB_DATABASE_NAME:-matitos}
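The new matitos_redis service publishes 6379 on the host, which lines up with the "host"/"port" keys DB_Handler now reads from redis_connect_info. A small connectivity sketch (not part of the commit; hostname and values assumed):

```
# Sketch (not part of the commit): reach the matitos_redis service with the same
# keys DB_Handler reads ("host", "port", "expiry_seconds"). Values are assumed.
import redis

redis_connect_info = {"host": "localhost", "port": 6379, "expiry_seconds": 172800}
r = redis.Redis(host=redis_connect_info.get("host"), port=redis_connect_info.get("port"))
r.ping()  # raises redis.exceptions.ConnectionError if the container is not up
r.set("some_source", 1, ex=redis_connect_info.get("expiry_seconds", 172800))
```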