220 lines
8.2 KiB
Python
220 lines
8.2 KiB
Python
import html
import os
import random
import time

import jwt
import requests

from .llm import OllamaClient
from ..models import Urls, UrlContent
from .logger import get_logger
|
|
# Module-level logger obtained from the project's logger helper; every
# Publisher method below reports through it.
logger = get_logger()
|
|
|
|
|
|
class Publisher():
    """Publish LLM-generated summaries of stored URL content as Ghost posts.

    Configuration is read from the environment:
      * ``GHOST_ADMIN_API_URL`` - base URL of the Ghost Admin API.
      * ``GHOST_ADMIN_API_KEY`` - Admin API key in the form ``<id>:<hex secret>``.
      * ``PEXELS_API_KEY`` - key used when a fallback feature image is needed.
    """

    # Audience claim of the Admin API token. Adjust depending on your Ghost version.
    JWT_AUDIENCE = '/v5/admin/'
    # Lifetime of each generated token, in seconds (5 minutes).
    JWT_LIFETIME_SECONDS = 5 * 60
    # Timeout (seconds) applied to every outgoing HTTP request, so a stalled
    # remote service cannot block the publisher forever.
    REQUEST_TIMEOUT = 30
    # Search query used on Pexels when the article has no usable main image.
    DEFAULT_PHOTO_QUERY = "Mountain landscape"

    def __init__(self):
        """Read the Ghost Admin API URL and key from the environment."""
        self.admin_api_url = os.getenv("GHOST_ADMIN_API_URL")
        self.admin_api_key = os.getenv("GHOST_ADMIN_API_KEY")

    def _create_jwt(self, admin_api_key):
        """Build a short-lived HS256 token for the Ghost Admin API.

        Args:
            admin_api_key: Admin API key formatted as ``<id>:<hex secret>``.

        Returns:
            The encoded token as ``str``.

        Raises:
            ValueError: If the key is missing, is not ``<id>:<secret>``, or
                the secret is not valid hexadecimal.
        """
        if not admin_api_key or admin_api_key.count(':') != 1:
            raise ValueError("GHOST_ADMIN_API_KEY must have the form '<id>:<secret>'")

        id_, secret = admin_api_key.split(':')
        iat = int(time.time())
        header = {'alg': 'HS256', 'kid': id_}
        payload = {
            'iat': iat,
            'exp': iat + self.JWT_LIFETIME_SECONDS,
            'aud': self.JWT_AUDIENCE,
        }
        token = jwt.encode(payload, bytes.fromhex(secret), algorithm='HS256', headers=header)
        # PyJWT < 2.0 returns bytes; formatting bytes into the Authorization
        # header would produce "Ghost b'...'" and be rejected.
        if isinstance(token, bytes):
            token = token.decode('utf-8')
        return token

    def _auth_headers(self):
        """Return the request headers carrying a freshly signed Admin API token."""
        return {
            'Authorization': 'Ghost {}'.format(self._create_jwt(self.admin_api_key)),
            'Content-Type': 'application/json',
        }

    def _endpoint(self, resource):
        """Return the Admin API URL for *resource* (e.g. ``"posts"``).

        Raises:
            ValueError: If ``GHOST_ADMIN_API_URL`` is not configured.
        """
        if not self.admin_api_url:
            raise ValueError("GHOST_ADMIN_API_URL is not configured")
        # URLs always use "/"; os.path.join would use the platform separator.
        return "{}/{}".format(self.admin_api_url.rstrip("/"), resource)

    def _create_ghost_post(self, post_data):
        """Create a post in Ghost from an HTML payload.

        Args:
            post_data: Dict describing a single post (title, html, tags, ...).

        Returns:
            The decoded JSON response when Ghost answers 201, otherwise ``None``
            (the failure is logged).
        """
        response = requests.post(
            self._endpoint("posts"),
            json={"posts": [post_data]},
            headers=self._auth_headers(),
            # Tell Ghost that the body is provided in the "html" field.
            params={"source": "html"},
            timeout=self.REQUEST_TIMEOUT,
        )

        if response.status_code == 201:
            logger.info("Ghost post published successfully")
            return response.json()

        logger.warning("Ghost - Failed to publish post: {} {}".format(response.status_code, response.text))
        return None

    def _published_url_id(self, url_id):
        """Return ``True`` when a post tagged with *url_id* already exists in Ghost.

        Posts are tagged with the internal tag ``#url-id-<id>``, which Ghost
        exposes under the slug ``hash-url-id-<id>``.

        Raises:
            requests.HTTPError: If Ghost answers with an error status. Failing
                loudly avoids treating an error as "not published" and
                creating a duplicate post.
        """
        # Query param filter by URL ID
        params = {"filter": "tags:hash-url-id-{}".format(url_id)}
        response = requests.get(
            self._endpoint("posts"),
            params=params,
            headers=self._auth_headers(),
            timeout=self.REQUEST_TIMEOUT,
        )
        response.raise_for_status()

        posts = response.json().get("posts") or []
        return len(posts) > 0

    def _get_photo_url(self, query):
        """Return the landscape URL of a random Pexels photo matching *query*.

        This is a best-effort lookup: any failure is logged and ``None`` is
        returned so the post can still be published without a feature image.
        """
        # TODO: Get already used photos to skip (compare photo ids against the
        # DB) and follow the response's "next_page" link once a page is exhausted.
        try:
            response = requests.get(
                "https://api.pexels.com/v1/search",
                # Let requests URL-encode the query (spaces, "&", ...).
                params={"query": query},
                headers={"Authorization": os.getenv("PEXELS_API_KEY")},
                timeout=self.REQUEST_TIMEOUT,
            )
            list_photos = response.json().get("photos", [])
            if not list_photos:
                logger.warning("Something went wrong while fetching image from Pexels: {}".format("no photos found"))
                return None

            # TODO: IMPROVE...
            return random.choice(list_photos).get("src").get("landscape")
        except Exception as e:
            logger.warning("Something went wrong while fetching image from Pexels: {}".format(str(e)))
            return None

    def _is_url_reachable(self, target_url):
        """Return ``True`` when a GET on *target_url* answers with status 200."""
        try:
            return requests.get(target_url, timeout=self.REQUEST_TIMEOUT).status_code == 200
        except requests.RequestException as e:
            logger.warning("Ghost - Could not fetch main image {}: {}".format(target_url, str(e)))
            return False

    def _resolve_feature_image(self, image_main_url):
        """Return the article's main image if usable, else a Pexels fallback."""
        if image_main_url is not None and self._is_url_reachable(image_main_url):
            return image_main_url
        # Invalid main image -> Search for one
        return self._get_photo_url(self.DEFAULT_PHOTO_QUERY)

    def _estimate_location_url(self, client_llm, model, where_description):
        """Return an OpenStreetMap URL for the place described, or ``None``.

        The LLM extracts an address from *where_description*, which is then
        looked up through the Nominatim search API. The location is optional
        enrichment, so lookup failures are logged and yield ``None``.
        """
        if not where_description:
            return None

        # Prompt to extract address / location
        prompt = 'Only answer with the location or address which can be extracted from this description: "{}"'.format(where_description)
        extracted_location = client_llm.generate(model, prompt, format=None)
        logger.debug("Estimated location: {}".format(extracted_location))
        if not extracted_location:
            return None

        # OSM API
        params = {
            'q': extracted_location,
            'format': 'json',
            'addressdetails': 1,
            'limit': 1,
        }
        try:
            response = requests.get(
                'https://nominatim.openstreetmap.org/search',
                params=params,
                headers={'User-Agent': 'App'},
                timeout=self.REQUEST_TIMEOUT,
            )
            list_data = response.json()
        except (requests.RequestException, ValueError) as e:
            logger.warning("Ghost - Location lookup failed: {}".format(str(e)))
            return None

        if not isinstance(list_data, list) or not list_data:
            return None
        data = list_data[0]
        return "https://openstreetmap.org/{}/{}".format(data.get("osm_type"), data.get("osm_id"))

    @staticmethod
    def _as_text(value, separator):
        """Flatten an LLM field (string, list of items or other) into text."""
        if isinstance(value, str):
            return value
        if isinstance(value, (list, tuple)):
            return separator.join(str(item) for item in value)
        return "" if value is None else str(value)

    @staticmethod
    def _format_generated_content(generated_content_dict):
        """Render the LLM answer as ``"<summary>\\n\\n<Key>: <value>\\n..."``.

        Any key containing "summary" (case-insensitive) provides the summary;
        every other key becomes one ``Key: value`` line.
        """
        summary = ""
        detail_lines = []
        for key, value in generated_content_dict.items():
            if "summary" in key.lower():
                summary = Publisher._as_text(value, "\n")
            else:
                detail_lines.append("{}: {}\n".format(key.capitalize(), Publisher._as_text(value, ". ")))
        return "{}\n\n{}".format(summary, "".join(detail_lines))

    @staticmethod
    def _build_html(generated_content, location_url, source_url, source_host):
        """Build the post body: one ``<p>`` per line, plus location and source links.

        All interpolated text is HTML-escaped because it comes from scraped
        pages and LLM output.
        """
        # HTML: Generate content
        parts = ["<p>{}</p>".format(html.escape(line)) for line in generated_content.split("\n")]
        # HTML: Add location if available
        if location_url is not None:
            parts.append('<p><a href="{}">Estimated location</a></p>'.format(html.escape(location_url)))
        # HTML: Add source
        host_label = (source_host or "").replace("https://", "")
        parts.append('<p><a href="{}">Source: {}</a></p>'.format(html.escape(source_url or ""), html.escape(host_label)))
        return "".join(parts)

    def publish(self, url_id):
        """Summarise the stored content of *url_id* with the LLM and publish it.

        Nothing is published (and the reason is logged) when the URL or its
        content is missing, the content is flagged invalid, the URL ID was
        already published, or the LLM yields no usable result.

        Args:
            url_id: Primary key shared by the ``Urls`` and ``UrlContent`` rows.
        """
        logger.info("Publishing URL ID {}".format(url_id))

        # URL Content
        url_content = UrlContent.objects.filter(pk=url_id).first()
        url = Urls.objects.filter(pk=url_id).first()

        if url is None:
            logger.warning("Ghost - URL not found for URL ID: {}".format(url_id))
            return
        if url_content is None:
            logger.warning("Ghost - URL Content is NULL for URL ID: {} {}".format(url_id, url.url))
            return
        if url_content.valid_content is False:
            logger.warning("Ghost - URL Content is not valid for URL ID: {} {}".format(url_id, url.url))
            return

        # URL ID already published?
        if self._published_url_id(url_id):
            logger.info("Ghost - URL ID {} already published, skipping".format(url_id))
            return

        # Generate the structured summary with the first available model.
        client_llm = OllamaClient()
        models = client_llm.get_models()
        if not models:
            logger.warning("Ghost - No LLM model available for URL ID: {}".format(url_id))
            return
        model = models[0]
        prompt = client_llm.get_prompt(url_content.content)
        generated_content_dict = client_llm.generate(model, prompt, format="json")
        logger.debug("Generated content: {}".format(generated_content_dict))
        if not isinstance(generated_content_dict, dict):
            logger.warning("Ghost - LLM returned no structured content for URL ID: {}".format(url_id))
            return

        # Optional enrichment: link to the estimated location on OpenStreetMap.
        location_url = self._estimate_location_url(client_llm, model, generated_content_dict.get("where"))

        # Parse and aggregate generated content
        generated_content = self._format_generated_content(generated_content_dict)

        photo_url = self._resolve_feature_image(url_content.image_main_url)
        html_data = self._build_html(generated_content, location_url, url.url, url_content.url_host)

        # Other Ghost fields that could be set here: slug, meta_title,
        # meta_description, feature_image_caption.
        post_data = {
            "title": url_content.title,
            "html": html_data,
            "feature_image": photo_url,
            "status": "published",
            "tags": ["#url-id-{}".format(url_id)],  # Hidden tag with associated URL ID
        }

        # Publish post
        payload = self._create_ghost_post(post_data)
        logger.debug("Ghost payload: {}".format(str(payload)))