Files
matitos_news/app_urls/fetcher/views.py
Luciano Gervasoni b876f6d720 LLM view refactor
2025-04-14 13:49:06 +02:00

315 lines
14 KiB
Python

from .views_base import link_list, logs, log_db, trigger_task
from django.core.paginator import Paginator
from django.shortcuts import render, get_object_or_404
from django.http import StreamingHttpResponse, JsonResponse
from django.db.models import Q, Count
from django.utils import timezone
from django.utils.timezone import now, timedelta
from .models import Urls, Source, Search, UrlContent, UrlsSourceSearch, UrlsDuplicate
import ollama
import os
import json
####################################################################################################
class OllamaClient():
    """Small wrapper around ``ollama.Client`` used by the LLM views.

    The server address is read from the ``ENDPOINT_OLLAMA`` environment
    variable, with a built-in fallback URL when the variable is not set.
    """

    def __init__(self):
        endpoint = os.getenv("ENDPOINT_OLLAMA", "https://ollamamodel.matitos.org")
        self.client = ollama.Client(host=endpoint)

    def _get_default_model(self):
        """Return the name of the model that should be listed first."""
        return "llama3.2:3b"

    def get_models(self):
        """Return the model names reported by the server.

        Names are sorted alphabetically; when the default model is among
        them it is moved to the front of the list.
        """
        preferred = self._get_default_model()
        names = sorted(entry.model for entry in self.client.list().models)
        if preferred not in names:
            return names
        return [preferred, *(name for name in names if name != preferred)]

    def get_prompt(self):
        """Return the fixed summarisation instruction offered as the prompt."""
        return (
            "Rewrite the text below into a clear and concise summary, presenting "
            "the key points as if they are newly written insights. "
            "Do not mention or reference the original text, its source, or any "
            "phrases like 'According to' or 'The text states'. "
            "Instead, write in a natural, standalone format that feels like an "
            "original explanation. "
            "Keep it brief, engaging, informative, in the style of a news article, "
            "and no longer than a paragraph:"
        )
def llm(request):
    """Stream an Ollama chat completion for a JSON POST request.

    The request body must be a JSON object with:
        message: the text sent to the model as a single user message.
        model:   the name of the Ollama model to run.

    Returns:
        StreamingHttpResponse (``text/plain``) yielding the generated text
        chunk by chunk; or a JsonResponse with status 400 when the body is
        not a JSON object or a required field is missing, or status 405 for
        any method other than POST.
    """
    def stream_response(model, text):
        # Generator: the chat call only happens once Django starts iterating
        # the response, so inputs must be validated before we get here.
        msg_content = {
            "role": "user",
            "content": text,
        }
        response = OllamaClient().client.chat(model=model, messages=[msg_content], stream=True)
        for chunk in response:
            yield chunk["message"]["content"]  # Stream each chunk of text

    if request.method != 'POST':
        return JsonResponse({'error': 'Only POST method allowed'}, status=405)

    try:
        body_data = json.loads(request.body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        # UnicodeDecodeError: body bytes that cannot be decoded are not a
        # JSONDecodeError and would otherwise surface as a server error.
        return JsonResponse({'error': 'Invalid JSON'}, status=400)

    # Valid JSON that is not an object (list, number, string...) has no .get()
    if not isinstance(body_data, dict):
        return JsonResponse({'error': 'Invalid JSON'}, status=400)

    message = body_data.get('message')
    model = body_data.get('model')
    if message is None:
        return JsonResponse({'error': 'No message found in request'}, status=400)
    if not model:
        # Without a model name the chat call can only fail mid-stream, after
        # the response has started; reject the request up front instead.
        return JsonResponse({'error': 'No model found in request'}, status=400)

    return StreamingHttpResponse(stream_response(model, message), content_type="text/plain")
def url_detail_view(request, id):
    """Render the detail page for one URL.

    Args:
        request: the incoming HTTP request.
        id: primary key of the ``Urls`` row to show (404 when it does not exist).

    The template context carries the URL itself, its distinct sources and
    searches, its canonical URL when it is flagged as a duplicate, its
    ``UrlContent`` record (an empty dict when there is none), and the model
    list / prompt used by the LLM form on the page.
    """
    url_item = get_object_or_404(Urls, id=id)
    url_sources = list(Source.objects.filter(urlssourcesearch__id_url=url_item).distinct())
    url_searches = list(Search.objects.filter(urlssourcesearch__id_url=url_item).distinct())

    url_canonical = None
    if url_item.status == Urls.STATUS_ENUM.DUPLICATE:
        try:
            url_canonical = UrlsDuplicate.objects.get(id_url_duplicated=url_item).id_url_canonical
        except UrlsDuplicate.DoesNotExist:
            # Flagged as duplicate but no mapping row: show the page without
            # a canonical link rather than failing, like the content lookup below.
            url_canonical = None

    try:
        url_content = UrlContent.objects.get(pk=id)
    except UrlContent.DoesNotExist:
        url_content = {}

    # TODO: https://github.com/ollama/ollama-python?tab=readme-ov-file#async-client
    # Named llm_client so the local does not shadow the imported ``ollama`` module.
    llm_client = OllamaClient()
    context = {
        'url_item': url_item,
        'sources': url_sources,
        'searches': url_searches,
        'models': llm_client.get_models(),
        'prompt': llm_client.get_prompt(),
        'url_content': url_content,
        'url_canonical': url_canonical,
    }
    return render(request, 'url_detail.html', context)
####################################################################################################
def charts(request):
    """Render the ``charts.html`` template with no extra context."""
    template_name = 'charts.html'
    return render(request, template_name)
# Look-back window (in days) used when ``days`` is missing or unusable.
_DEFAULT_CHART_DAYS = 30.0


def _parse_days(raw, default=_DEFAULT_CHART_DAYS):
    """Return ``raw`` as a number of days usable in a timedelta, else ``default``."""
    try:
        days = float(raw)
        # Building the timedelta validates the value: NaN raises ValueError,
        # infinite or too-large magnitudes raise OverflowError.
        timedelta(days=days)
    except (TypeError, ValueError, OverflowError):
        return default
    return days


def _window_start(request):
    """Return the earliest fetch timestamp selected by the ``days`` query parameter."""
    current = timezone.now()
    try:
        return current - timedelta(days=_parse_days(request.GET.get('days')))
    except OverflowError:
        # A valid timedelta can still reach before the earliest representable date.
        return current - timedelta(days=_DEFAULT_CHART_DAYS)


def _chart_payload(rows, label_key):
    """Turn aggregated rows into the ``{'labels': [...], 'values': [...]}`` chart shape."""
    rows = list(rows)
    return {
        'labels': [row[label_key] for row in rows],
        'values': [row['count'] for row in rows],
    }


def urls_by_fetch_date(request):
    """Return, as JSON, the number of URLs fetched per date within the window.

    Query parameter ``days`` (float, default 30) sets how far back to look;
    an unusable value falls back to the default.
    """
    # Count the number of URLs grouped by fetch date
    urls_data = Urls.objects.filter(ts_fetch__gte=_window_start(request)) \
        .values('ts_fetch__date') \
        .annotate(count=Count('id')) \
        .order_by('ts_fetch__date')
    return JsonResponse(_chart_payload(urls_data, 'ts_fetch__date'))


def urls_per_status(request):
    """Return, as JSON, the number of URLs per status within the window.

    Query parameter ``days`` (float, default 30) sets how far back to look;
    an unusable value falls back to the default.
    """
    # Count the number of URLs grouped by status within the date range
    urls_data = Urls.objects.filter(ts_fetch__gte=_window_start(request)) \
        .values('status') \
        .annotate(count=Count('id')) \
        .order_by('status')
    return JsonResponse(_chart_payload(urls_data, 'status'))


def urls_per_source(request):
    """Return, as JSON, the number of URL links per source within the window.

    Query parameter ``days`` (float, default 30) sets how far back to look;
    an unusable value falls back to the default.
    """
    # Count the number of URLs grouped by source
    urls_data = UrlsSourceSearch.objects \
        .filter(id_url__ts_fetch__gte=_window_start(request)) \
        .values('id_source__source') \
        .annotate(count=Count('id_url')) \
        .order_by('id_source__source')
    return JsonResponse(_chart_payload(urls_data, 'id_source__source'))


def urls_per_search(request):
    """Return, as JSON, the number of URL links per search within the window.

    Query parameter ``days`` (float, default 30) sets how far back to look;
    an unusable value falls back to the default.
    """
    # Count the number of URLs grouped by search
    urls_data = UrlsSourceSearch.objects \
        .filter(id_url__ts_fetch__gte=_window_start(request)) \
        .values('id_search__search') \
        .annotate(count=Count('id_url')) \
        .order_by('id_search__search')
    return JsonResponse(_chart_payload(urls_data, 'id_search__search'))
####################################################################################################
def _parse_int(raw):
    """Return ``int(raw)``, or ``None`` when ``raw`` is missing or not an integer."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return None


def filtered_urls(request):
    """Render the paginated, filterable list of URLs.

    Query parameters (all optional):
        status, search, source, language, valid_content: multi-valued filters.
            A filter that is absent while other parameters are present
            selects nothing (internal "null" marker).
        min_sources: minimum number of distinct sources per URL (default 1).
        days: look-back window on the fetch timestamp (float, default 30).
        per_page: page size (default 100).
        page: page number.

    A request with no parameters, or with only ``page``, shows everything.
    Unusable ``min_sources``, ``days`` or ``per_page`` values fall back to
    their defaults instead of raising.
    """
    statuses = Urls.STATUS_ENUM.choices
    searches = Search.objects.all()
    sources = Source.objects.all()
    languages = list(UrlContent.objects.distinct('language').values_list('language', flat=True))  # TODO: Cache languages
    languages = ["Unknown"] + [lang for lang in languages if lang is not None]
    valid_contents = ["True", "False", "Unknown"]

    # Get selected parameters
    selected_status = request.GET.getlist('status', ["null"])
    selected_search = request.GET.getlist('search', ["null"])
    selected_source = request.GET.getlist('source', ["null"])
    selected_language = request.GET.getlist('language', ["null"])
    selected_valid_contents = request.GET.getlist('valid_content', ["null"])

    selected_min_sources = _parse_int(request.GET.get('min_sources', 1))
    if selected_min_sources is None:
        selected_min_sources = 1

    # The raw value is kept for the template; only the parsed window is used
    # in the query. Non-numeric, NaN, infinite or out-of-range -> default.
    selected_days = request.GET.get("days", 30)
    try:
        window_start = now() - timedelta(days=float(selected_days))
    except (TypeError, ValueError, OverflowError):
        selected_days = 30
        window_start = now() - timedelta(days=30)

    # Default is X URLs per page; the paginator needs a positive integer.
    per_page = request.GET.get('per_page', 100)
    per_page_number = _parse_int(per_page)
    if per_page_number is None or per_page_number < 1:
        per_page = 100
    page_number = request.GET.get('page')  # Get the current page number

    # Override with default filters? [Case: no params update on URL] -> Only on "Home" click, or "Next page"
    if set(request.GET.keys()) <= {"page"}:
        selected_status = ["all"]
        selected_search = ["all"]
        selected_source = ["all"]
        selected_language = ["all"]
        selected_valid_contents = ["all"]
    else:
        # All elements
        all_status = [str(status[0]) for status in statuses]
        all_search = [str(search.id) for search in searches]
        all_source = [str(source.id) for source in sources]
        all_languages = languages
        all_valid_contents = valid_contents
        # Non-default parameters, if list with all elements, replace with "all" and avoid heavy query
        selected_status = ["all"] if (set(selected_status) == set(all_status)) else selected_status
        selected_search = ["all"] if (set(selected_search) == set(all_search)) else selected_search
        selected_source = ["all"] if (set(selected_source) == set(all_source)) else selected_source
        selected_language = ["all"] if (set(selected_language) == set(all_languages)) else selected_language
        selected_valid_contents = ["all"] if (set(selected_valid_contents) == set(all_valid_contents)) else selected_valid_contents

    # Filter URLs based on selected filters
    selections = [selected_status, selected_search, selected_source, selected_language, selected_valid_contents]
    if any('null' in selection for selection in selections):
        # At least one filter has nothing selected -> nothing can match
        urls = []
    else:
        # Filter by date
        query = Q(ts_fetch__gte=window_start)
        # Additional filters
        if "all" not in selected_status:
            query &= Q(status__in=selected_status)
        if "all" not in selected_source:
            query &= Q(urlssourcesearch__id_source__in=selected_source)
        if "all" not in selected_search:
            query &= Q(urlssourcesearch__id_search__in=selected_search)
        if "all" not in selected_language:
            # URLs with selected languages
            subquery = Q(urlcontent__language__in=selected_language)
            if "Unknown" in selected_language:
                # URLs with NULL language
                subquery |= Q(urlcontent__language__isnull=True)
                # URLs with no UrlContent record at all (similar to URLs with NULL language)
                subquery |= Q(urlcontent__id_url__isnull=True)
            # Update query
            query &= subquery
        if "all" not in selected_valid_contents:
            # Boolean array
            bool_array = []
            if 'True' in selected_valid_contents:
                bool_array.append(True)
            if 'False' in selected_valid_contents:
                bool_array.append(False)
            # URLs with selected valid_contents
            subquery = Q(urlcontent__valid_content__in=bool_array)
            if "Unknown" in selected_valid_contents:
                # URLs with NULL valid_content
                subquery |= Q(urlcontent__valid_content__isnull=True)
                # URLs with no UrlContent record at all (similar to URLs with NULL valid_content)
                subquery |= Q(urlcontent__id_url__isnull=True)
            # Update query
            query &= subquery
        if selected_min_sources > 1:
            query &= Q(pk__in=UrlsSourceSearch.objects.values('id_url').annotate(search_count=Count('id_source', distinct=True)).filter(search_count__gte=selected_min_sources).values('id_url'))
        # Run query
        urls = Urls.objects.filter(query).distinct()  # .order_by('-ts_fetch')

    # Pagination
    paginator = Paginator(urls, per_page)  # Paginate the filtered URLs
    page_obj = paginator.get_page(page_number)  # Get the current page object

    # Map URL IDs to their sources & searches, only for subset of URLs (page of interest)
    # NOTE: each map issues one query per URL on the page.
    sources_map = {
        url.id: list(Source.objects.filter(urlssourcesearch__id_url=url).distinct()) for url in page_obj.object_list
    }
    searches_map = {
        url.id: list(Search.objects.filter(urlssourcesearch__id_url=url).distinct()) for url in page_obj.object_list
    }
    url_content_map = {
        url.id: UrlContent.objects.filter(pk=url).first() for url in page_obj.object_list
    }

    # Custom replace search type text
    for s in searches:
        s.type = s.type.replace("rss_feed", "rss").replace("url_host", "url").replace("keyword_search", "keyword")

    context = {
        'urls': page_obj,  # Pass the paginated URLs
        'per_page': per_page,  # Send per_page value for dynamic pagination
        'statuses': statuses,
        'searches': sorted(searches, key=lambda x: (x.type, x.search)),
        'sources': sorted(sources, key=lambda x: x.source),
        'languages': sorted(languages, key=lambda x: (x is None, x)),
        'valid_contents': valid_contents,
        # Selection
        'selected_status': selected_status,
        'selected_search': selected_search,
        'selected_source': selected_source,
        'selected_language': selected_language,
        'selected_valid_contents': selected_valid_contents,
        "selected_min_sources": selected_min_sources,
        "selected_days": selected_days,
        # Map
        "sources_map": sources_map,
        "searches_map": searches_map,
        "url_content_map": url_content_map,
        # "charts": charts,
        # "list_per_page": [15, 100, 500],
        # "list_days_text": ([0.25, 1, 7, 30, 365], ["Last 6 hours", "Last 24 hours", "Last 7 days", "Last 30 days", "Last 365 days"])
    }
    return render(request, 'filtered_urls.html', context)
####################################################################################################
def content_generation(request):
    '''
    Placeholder for a content-generation view; not implemented yet.

    The body holds no statements besides this docstring, so the function
    currently returns None.
    NOTE(review): a Django view is expected to return an HttpResponse --
    confirm this function is not routed until it is implemented.

    Draft of the first steps, kept as written:

    # Get list of URLs ID
    selected_urls = request.GET.getlist('urls', [])
    # Sample URLs
    selected_urls = [13460, 13455, 13454, 13452, 13210]
    '''
####################################################################################################