new newsletter format

This commit is contained in:
Gaël
2026-01-31 19:21:24 +01:00
parent e15d53339f
commit 85d79db3fd
2 changed files with 573 additions and 87 deletions

7
env.bat Normal file
View File

@@ -0,0 +1,7 @@
set GHOST_ADMIN_KEY=68bad0e13546e700012dd65d:116a81b7e189d3b3d3b86082f97ef65daedb06498a3f1f902b8e0c08d095dc19
set GHOST_ADMIN_URL=https://ghostadmin.zep.best/ghost/api/admin/__bot/FF4440EBA737506D397C170A8422109C357AA7582F10938B7C5F11D6B652F5D4
set GHOST_EMAIL_SEGMENT=status:free
set GHOST_NEWSLETTER_SLUG=default-newsletter
set GHOST_CONTENT_URL=https://ghost.zep.best
set DB_FILE_FALLBACK=f:\workspace\Substack_JV\data\published.db
set MISTRAL_API_KEY=tQJHvYlmwz1ihKxOhXS3FmDNTRhBh6b3

View File

@@ -2,18 +2,67 @@ import asyncio
import argparse
import datetime as dt
import html
import json
import logging
import os
import random
import re
import time
from logging.handlers import RotatingFileHandler
from typing import Optional, List
from typing import Optional, List, Dict
import feedparser
import requests
import jwt
import zoneinfo # Python 3.9+
from urllib.parse import urlparse, parse_qs
from urllib.parse import urlparse, parse_qs, urljoin
# ------------- Web Crawler for Images -------------
def extract_image_from_url(url: str, timeout: int = 10) -> Optional[str]:
"""
Fetch a webpage and extract the best image (og:image, twitter:image, or first large image).
Returns the image URL or None.
"""
try:
resp = requests.get(
url,
timeout=timeout,
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml",
},
allow_redirects=True,
)
resp.raise_for_status()
html_content = resp.text
# Try OpenGraph image first (most reliable)
og_match = re.search(r'<meta[^>]+property=["\']og:image["\'][^>]+content=["\']([^"\']+)["\']', html_content, re.IGNORECASE)
if not og_match:
og_match = re.search(r'<meta[^>]+content=["\']([^"\']+)["\'][^>]+property=["\']og:image["\']', html_content, re.IGNORECASE)
if og_match:
img_url = og_match.group(1)
return urljoin(url, img_url)
# Try Twitter card image
tw_match = re.search(r'<meta[^>]+name=["\']twitter:image["\'][^>]+content=["\']([^"\']+)["\']', html_content, re.IGNORECASE)
if not tw_match:
tw_match = re.search(r'<meta[^>]+content=["\']([^"\']+)["\'][^>]+name=["\']twitter:image["\']', html_content, re.IGNORECASE)
if tw_match:
img_url = tw_match.group(1)
return urljoin(url, img_url)
# Fallback: look for article/main image
article_img = re.search(r'<article[^>]*>.*?<img[^>]+src=["\']([^"\']+)["\']', html_content, re.IGNORECASE | re.DOTALL)
if article_img:
img_url = article_img.group(1)
# Skip tiny images, icons, avatars
if not any(skip in img_url.lower() for skip in ['avatar', 'icon', 'logo', 'emoji', '1x1', 'pixel']):
return urljoin(url, img_url)
return None
except Exception as e:
LOG.debug("Failed to extract image from %s: %s", url, e)
return None
# ------------- YouTube helpers -------------
@@ -30,11 +79,11 @@ def fetch_youtube_oembed_html(youtube_url: str, timeout: int = 10) -> Optional[s
)
resp.raise_for_status()
data = resp.json()
html = data.get("html")
if not html:
html_content = data.get("html")
if not html_content:
return None
# Wrap in Ghost embed card container; do NOT alter the iframe attributes.
return f'<figure class="kg-card kg-embed-card">{html}</figure>'
return f'<figure class="kg-card kg-embed-card">{html_content}</figure>'
except Exception:
return None
@@ -82,6 +131,239 @@ class RSSfeed:
self.url = url
self.youtube = yt
# ------------- Mistral AI Client -------------
class MistralClient:
"""Client for Mistral AI API to filter and group news items."""
def __init__(self, api_key: str, model: str = "mistral-small-latest"):
self.api_key = api_key
self.model = model
self.base_url = "https://api.mistral.ai/v1/chat/completions"
def _call_api(self, messages: List[Dict], temperature: float = 0.3) -> Optional[str]:
"""Make a call to the Mistral API."""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload = {
"model": self.model,
"messages": messages,
"temperature": temperature,
"response_format": {"type": "json_object"},
}
try:
resp = requests.post(self.base_url, headers=headers, json=payload, timeout=120)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"]
except Exception as e:
LOG.error("Mistral API error: %s", e)
return None
def filter_news_items(self, items: List[dict], dry_run: bool = False) -> List[dict]:
"""
Filter out non-news items (tips, walkthroughs, guides, tutorials).
Returns only actual game news items.
"""
if not items:
return []
# Prepare items for analysis
items_for_analysis = []
for i, item in enumerate(items):
items_for_analysis.append({
"id": i,
"title": item.get("title", ""),
"link": item.get("link", ""),
"summary": (item.get("summary", "") or "")[:300], # Truncate for API
})
# Split into batches to avoid token limits
batch_size = 50
filtered_ids = set()
for batch_start in range(0, len(items_for_analysis), batch_size):
batch = items_for_analysis[batch_start:batch_start + batch_size]
prompt = f"""Tu analyses des articles de sites de jeux vidéo. Tu dois identifier UNIQUEMENT les articles à EXCLURE.
EXCLURE UNIQUEMENT si le titre contient EXPLICITEMENT UN de ces mots-clés:
- "guide" (le mot exact)
- "soluce" (le mot exact)
- "astuce" (le mot exact)
- "solution" (le mot exact, pas "résolution")
- "code promo"
- "bon plan"
- "-20%" ou "-30%" etc (réductions)
- "tuto" ou "tutoriel"
- "comment faire"
- "how to"
NE JAMAIS EXCLURE:
- "Early Access" = news de sortie anticipée, À GARDER
- "Test" ou "Review" = critique, À GARDER
- "Partie Rapide" = émission/podcast, À GARDER
- Tout article de news, annonce, sortie, preview
- Tout article d'opinion, éditorial, récap
- Tout le reste qui ne contient pas les mots-clés d'exclusion ci-dessus
Articles à analyser:
{json.dumps(batch, ensure_ascii=False, indent=2)}
Retourne un JSON avec "exclude_ids" contenant UNIQUEMENT les IDs des articles guides/soluces/promos.
Si aucun article ne correspond aux critères d'exclusion, retourne {{"exclude_ids": []}}
Sois TRÈS conservateur - en cas de doute, NE PAS exclure."""
messages = [{"role": "user", "content": prompt}]
response = self._call_api(messages)
if response:
try:
result = json.loads(response)
excluded_ids = set(result.get("exclude_ids", []))
# Keep all items NOT in excluded_ids
batch_ids = {item["id"] for item in batch}
kept_ids = batch_ids - excluded_ids
filtered_ids.update(kept_ids)
except json.JSONDecodeError:
LOG.warning("Failed to parse Mistral response for filtering")
# Fallback: include all items from this batch
filtered_ids.update(item["id"] for item in batch)
else:
# Fallback: include all items from this batch
filtered_ids.update(item["id"] for item in batch)
# Log filtered out items in dry-run mode
if dry_run:
excluded_ids = set(range(len(items))) - filtered_ids
if excluded_ids:
LOG.debug("=== FILTERED OUT (non-news) ===")
for i in sorted(excluded_ids):
LOG.debug(" [EXCLUDED] %s", items[i].get("title", "No title"))
LOG.debug("=== KEPT (news) ===")
for i in sorted(filtered_ids):
if i < len(items):
LOG.debug(" [KEPT] %s", items[i].get("title", "No title"))
return [items[i] for i in sorted(filtered_ids) if i < len(items)]
def group_similar_items(self, items: List[dict]) -> List[Dict]:
"""
Group news items by category (News, Tests/Reviews, Previews, etc.)
with sub-groups by game/topic within each category.
Returns a list of categories, each with sub-groups containing items.
"""
if not items:
return []
# Prepare items for analysis
items_for_analysis = []
for i, item in enumerate(items):
items_for_analysis.append({
"id": i,
"title": item.get("title", ""),
"link": item.get("link", ""),
})
prompt = f"""Organise ces articles de jeux vidéo en CATÉGORIES et SOUS-GROUPES.
Articles à organiser:
{json.dumps(items_for_analysis, ensure_ascii=False, indent=2)}
CATÉGORIES (utilise ces noms exacts):
1. "Actualités" - News, annonces, sorties, mises à jour, industrie
2. "Tests & Critiques" - Reviews, tests, avis, notes
3. "Aperçus & Previews" - Previews, impressions, démos, hands-on
4. "Vidéos" - Trailers, gameplay vidéos, podcasts
5. "Autres" - Le reste
RÈGLES DE GROUPEMENT (TRÈS IMPORTANT):
- Groupe par FRANCHISE ou SÉRIE (ex: tous les "Final Fantasy" ensemble, même FF7, FF16, FF XIV)
- Groupe par UNIVERS (ex: "Warhammer 40K" = Space Marine + Dawn of War + Darktide)
- Groupe par ÉVÉNEMENT (ex: "Nintendo Direct", "State of Play", "Game Awards")
- N'utilise JAMAIS de noms de sites web comme groupes (pas "NoFrag", "JeuxOnline", etc.)
EXEMPLES DE GROUPEMENTS CORRECTS:
- "Final Fantasy VII Rebirth sur Switch 2" + "Final Fantasy VII Remake Intergrade en tête" → groupe "Final Fantasy"
- "Techmarine dans Space Marine 2" + "Dawn of War 4 gameplay Ork" → groupe "Warhammer 40K"
- "GTA 6 trailer" + "GTA 6 date de sortie" → groupe "GTA 6"
- "Nintendo Direct annoncé" + "Zelda dans le Nintendo Direct" → groupe "Nintendo Direct"
Retourne ce JSON:
{{
"categories": [
{{
"name": "Actualités",
"subgroups": [
{{"title": "Final Fantasy", "item_ids": [0, 3, 7]}},
{{"title": "Warhammer 40K", "item_ids": [1, 2]}},
{{"title": "Steam", "item_ids": [5]}}
]
}}
]
}}
IMPORTANT: Chaque article dans UN SEUL sous-groupe. Titre = nom de franchise/série/univers, PAS nom de site."""
messages = [{"role": "user", "content": prompt}]
response = self._call_api(messages, temperature=0.2)
if response:
try:
result = json.loads(response)
categories = []
used_ids = set()
for cat_data in result.get("categories", []):
cat_name = cat_data.get("name", "Autres")
subgroups = []
for sg_data in cat_data.get("subgroups", []):
sg_title = sg_data.get("title", "Divers")
item_ids = sg_data.get("item_ids", [])
# Filter to valid, unused IDs
valid_ids = [i for i in item_ids if i < len(items) and i not in used_ids]
if valid_ids:
used_ids.update(valid_ids)
subgroups.append({
"title": sg_title,
"items": [items[i] for i in valid_ids]
})
if subgroups:
categories.append({
"name": cat_name,
"subgroups": subgroups
})
# Add any ungrouped items
ungrouped = [items[i] for i in range(len(items)) if i not in used_ids]
if ungrouped:
# Find or create "Autres" category
autres_cat = next((c for c in categories if c["name"] == "Autres"), None)
if autres_cat:
autres_cat["subgroups"].append({"title": "Divers", "items": ungrouped})
else:
categories.append({
"name": "Autres",
"subgroups": [{"title": "Divers", "items": ungrouped}]
})
return categories
except json.JSONDecodeError:
LOG.warning("Failed to parse Mistral response for grouping")
# Fallback: return all items in a single category/subgroup
return [{
"name": "Actualités de la semaine",
"subgroups": [{"title": "Toutes les news", "items": items}]
}]
# ------------- Ghost Admin API client -------------
class GhostAdmin:
@@ -163,41 +445,63 @@ class GhostAdmin:
# ------------- Task orchestration -------------
class GhostTask:
def __init__(self, feeds: List[RSSfeed], admin_url: str, admin_key: str, newsletter_slug: Optional[str] = None, email_segment: Optional[str] = None):
def __init__(self, feeds: List[RSSfeed], admin_url: str, admin_key: str,
mistral_api_key: Optional[str] = None,
newsletter_slug: Optional[str] = None, email_segment: Optional[str] = None,
dry_run: bool = False):
self.ghost = GhostAdmin(admin_url, admin_key)
self.feeds = feeds
self.newsletter_slug = newsletter_slug
self.email_segment = email_segment
self.mistral = MistralClient(mistral_api_key) if mistral_api_key else None
self.dry_run = dry_run
for feed in self.feeds:
LOG.info("Adding feed %s", feed.url)
# --- startup immediate run if not yet published today
# --- startup immediate run if not yet published this week
def _published_today(self) -> bool:
def _published_this_week(self) -> bool:
"""Check if we already published this week (since last Saturday 12:00)."""
tz = zoneinfo.ZoneInfo("Europe/Brussels")
last = self.ghost.latest_published_date("Europe/Brussels")
if not last:
return False
return last.date() == dt.datetime.now(tz).date()
async def maybe_run_today(self):
if not self._published_today():
LOG.info("Aucune newsletter publiée aujourd'hui -> génération immédiate.")
await self.daily_task()
now = dt.datetime.now(tz)
# Find last Saturday at 12:00
days_since_saturday = (now.weekday() - 5) % 7 # Saturday = 5
last_saturday = (now - dt.timedelta(days=days_since_saturday)).replace(
hour=12, minute=0, second=0, microsecond=0
)
return last >= last_saturday
async def maybe_run_this_week(self):
if not self._published_this_week():
LOG.info("Aucune newsletter publiée cette semaine -> génération immédiate.")
await self.weekly_task()
else:
LOG.info("Déjà publié aujourd'hui, on attend la prochaine fenêtre.")
LOG.info("Déjà publié cette semaine, on attend la prochaine fenêtre.")
# --- utils
@staticmethod
def _fr_date_today() -> str:
def _fr_week_range() -> str:
"""Returns a French formatted date range for the past week."""
months = {
'January': 'Janvier', 'February': 'Février', 'March': 'Mars', 'April': 'Avril',
'May': 'Mai', 'June': 'Juin', 'July': 'Juillet', 'August': 'Août',
'September': 'Septembre', 'October': 'Octobre', 'November': 'Novembre', 'December': 'Décembre'
}
today = dt.datetime.now()
formatted = today.strftime("%d %B %Y")
week_ago = today - dt.timedelta(days=7)
# Format: "24 - 31 Janvier 2025" or "28 Janvier - 4 Février 2025"
if week_ago.month == today.month:
formatted = f"{week_ago.day} - {today.strftime('%d %B %Y')}"
else:
formatted = f"{week_ago.strftime('%d %B')} - {today.strftime('%d %B %Y')}"
for en, fr in months.items():
formatted = formatted.replace(en, fr)
return formatted
@@ -237,22 +541,103 @@ class GhostTask:
pass
return None
# --- HTML builder
# --- HTML builder for grouped content
def _build_html_roundup(self, items: List[dict], feeds: List[RSSfeed]):
def _build_html_roundup_grouped(self, categories: List[Dict], feeds: List[RSSfeed]):
"""
Construit le HTML et retourne (html, feature_image_url_ou_None).
- YouTube: iframe + miniature cliquable (fallback email-safe)
- Images: collecte la première pour feature_image
Construit le HTML avec des catégories et sous-groupes thématiques.
Inclut un résumé et une table des matières en haut.
Retourne (html, feature_image_url_ou_None).
"""
parts: List[str] = []
#parts.append(f"<h2>Les news du {self._fr_date_today()}</h2>")
first_image: Optional[str] = None
# --- Build Summary Section ---
parts.append('<h2>✨ En bref cette semaine</h2>')
parts.append('<ul>')
for cat in categories:
cat_name = cat.get("name", "Actualités")
subgroups = cat.get("subgroups", [])
# Get top subgroups with more than 1 item (by item count) for summary
multi_item_subgroups = [sg for sg in subgroups if len(sg.get("items", [])) > 1]
sorted_subgroups = sorted(multi_item_subgroups, key=lambda sg: len(sg.get("items", [])), reverse=True)
top_subgroups = sorted_subgroups[:5] # Max 5 highlights per category
total_items = sum(len(sg.get("items", [])) for sg in subgroups)
if top_subgroups:
highlights = ", ".join(sg.get("title", "Divers") for sg in top_subgroups)
total_items = sum(len(sg.get("items", [])) for sg in subgroups)
parts.append(f'<li><strong>{html.escape(cat_name)}</strong>: {html.escape(highlights)} ({total_items} articles)</li>')
elif total_items > 0:
parts.append(f'<li><strong>{html.escape(cat_name)}</strong>: {total_items} articles</li>')
parts.append('</ul>')
parts.append('<hr>')
# --- Build Table of Contents ---
# parts.append('<h2>📋 Sommaire</h2>')
# parts.append('<ul>')
# for cat in categories:
# cat_name = cat.get("name", "Actualités")
# cat_anchor = self._make_anchor(cat_name)
# subgroups = cat.get("subgroups", [])
# total_items = sum(len(sg.get("items", [])) for sg in subgroups)
# parts.append(f'<li><a href="#{cat_anchor}"><strong>{html.escape(cat_name)}</strong></a> ({total_items} articles)')
# if len(subgroups) > 1 or (len(subgroups) == 1 and len(subgroups[0].get("items", [])) > 1):
# parts.append('<ul>')
# for sg in subgroups:
# sg_title = sg.get("title", "Divers")
# sg_anchor = self._make_anchor(f"{cat_name}-{sg_title}")
# item_count = len(sg.get("items", []))
# parts.append(f'<li><a href="#{sg_anchor}">{html.escape(sg_title)}</a> ({item_count})</li>')
# parts.append('</ul>')
# parts.append('</li>')
# parts.append('</ul>')
# parts.append('<hr>')
# --- Build Content by Category ---
for cat in categories:
cat_name = cat.get("name", "Actualités")
cat_anchor = self._make_anchor(cat_name)
subgroups = cat.get("subgroups", [])
if not subgroups:
continue
# Category header with emoji
cat_emoji = {
"Actualités": "📰",
"Tests & Critiques": "",
"Aperçus & Previews": "👁️",
"Vidéos": "🎬",
"Autres": "📁"
}.get(cat_name, "📌")
parts.append(f'<h2 id="{cat_anchor}">{cat_emoji} {html.escape(cat_name)}</h2>')
for sg in subgroups:
sg_title = sg.get("title", "Divers")
sg_anchor = self._make_anchor(f"{cat_name}-{sg_title}")
items = sg.get("items", [])
if not items:
continue
# Sub-group header (only if more than 1 item in subgroup)
if len(items) > 1:
parts.append(f'<h3 id="{sg_anchor}">{html.escape(sg_title)}</h3>')
for post in items:
title = post.get("title", "") or ""
linkURL = post.get("link", "") or ""
parts.append(f'<hr><h3>{html.escape(title)}</h3>')
parts.append(f'<h4>{html.escape(title)}</h4>')
# --- YouTube embed / fallback
vid = post.get("yt_videoid") or extract_youtube_id(linkURL)
@@ -275,14 +660,15 @@ class GhostTask:
if "summary" in post and post["summary"]:
ftext = html.unescape(post["summary"])
ftext = re.sub("<[^<]+?>", "", ftext)
ftext = re.sub(r"Larticle .* est apparu en premier sur .*", "", ftext)
ftext = re.sub(r"L'article .* est apparu en premier sur .*", "", ftext)
if ftext:
parts.append(f"<p>{html.escape(ftext)}</p>")
if linkURL:
esc = html.escape(linkURL)
parts.append(f'<p><a href="{esc}">{esc}</a></p>')
# --- Images dans le contenu
# --- Images: first try RSS metadata, then crawl the page
found_image = False
for link in post.get("links", []) or []:
if link.get("type") in ("image/jpg", "image/jpeg", "image/png", "image/webp"):
imgUrl = link.get("href")
@@ -291,16 +677,43 @@ class GhostTask:
if not first_image:
first_image = imgUrl
parts.append(f'<figure><img src="{html.escape(imgUrl)}" loading="lazy"></figure>')
found_image = True
# If no image from RSS, try to extract from the article page
if not found_image and linkURL:
crawled_img = extract_image_from_url(linkURL, timeout=8)
if crawled_img:
if not first_image:
first_image = crawled_img
parts.append(f'<figure><img src="{html.escape(crawled_img)}" loading="lazy"></figure>')
parts.append('<hr>')
# --- Sources
parts.append("<hr><h3>Sources</h3>")
parts.append("<h3>📚 Sources</h3>")
for feed in feeds:
esc = html.escape(feed.url)
parts.append(f'<p><a href="{esc}">{esc}</a></p>')
parts.append('<p><em>Abonnez-vous pour recevoir chaque jour les news et soutenir mon travail.</em></p>')
parts.append('<p><em>Abonnez-vous pour recevoir chaque semaine les news et soutenir mon travail.</em></p>')
return "\n".join(parts), first_image
@staticmethod
def _make_anchor(text: str) -> str:
"""Convert text to a valid HTML anchor ID."""
# Remove accents and special chars, lowercase, replace spaces with dashes
anchor = text.lower()
anchor = re.sub(r'[àáâãäå]', 'a', anchor)
anchor = re.sub(r'[èéêë]', 'e', anchor)
anchor = re.sub(r'[ìíîï]', 'i', anchor)
anchor = re.sub(r'[òóôõö]', 'o', anchor)
anchor = re.sub(r'[ùúûü]', 'u', anchor)
anchor = re.sub(r'[ýÿ]', 'y', anchor)
anchor = re.sub(r'[ç]', 'c', anchor)
anchor = re.sub(r'[^a-z0-9\s-]', '', anchor)
anchor = re.sub(r'\s+', '-', anchor.strip())
return anchor or "section"
@staticmethod
def _format_duration(seconds: float) -> str:
seconds = int(seconds)
@@ -314,20 +727,32 @@ class GhostTask:
if seconds: parts.append(f"{seconds} seconds")
return ", ".join(parts) if parts else "0 seconds"
async def run_daily_at_6_05(self):
async def run_weekly_on_saturday(self):
"""Run every Saturday at 12:00 (noon)."""
while True:
now = dt.datetime.now()
next_run = (now + dt.timedelta(days=1)).replace(hour=6, minute=5, second=0, microsecond=0)
# Calculate next Saturday at 12:00
days_until_saturday = (5 - now.weekday()) % 7 # Saturday = 5
if days_until_saturday == 0 and now.hour >= 12:
days_until_saturday = 7 # Already past Saturday 12:00, wait for next week
next_run = (now + dt.timedelta(days=days_until_saturday)).replace(
hour=12, minute=0, second=0, microsecond=0
)
sleep_seconds = (next_run - now).total_seconds()
while sleep_seconds > 0:
LOG.info("Waiting for %s for next scan", self._format_duration(sleep_seconds))
LOG.info("Waiting for %s for next scan (Saturday noon)", self._format_duration(sleep_seconds))
await asyncio.sleep(min(sleep_seconds, 5 * 60))
now = dt.datetime.now()
sleep_seconds = (next_run - now).total_seconds()
LOG.info("Going to run the daily task")
await self.daily_task()
async def daily_task(self):
LOG.info("Going to run the weekly task")
await self.weekly_task()
async def weekly_task(self):
"""Main weekly task: collect, filter, group, and publish."""
# Log newsletters (debug)
try:
nls = self.ghost.get_newsletters()
@@ -335,8 +760,8 @@ class GhostTask:
except Exception as e:
LOG.warning("Unable to list newsletters: %s", e)
title_post = "Les news du " + self._fr_date_today()
LOG.info("Running daily task : %s", title_post)
title_post = "Les news de la semaine du " + self._fr_week_range()
LOG.info("Running weekly task : %s", title_post)
# (Re)charge les feeds
feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
@@ -349,8 +774,10 @@ class GhostTask:
feeds.append(RSSfeed(line, "youtube" in line.lower()))
self.feeds = feeds
# Fenêtre: depuis hier 06:00 UTC
yesterday_6am_utc = dt.datetime.now(dt.timezone.utc).replace(hour=6, minute=0, second=0, microsecond=0) - dt.timedelta(days=1)
# Fenêtre: depuis 7 jours à 06:00 UTC
week_ago_6am_utc = dt.datetime.now(dt.timezone.utc).replace(
hour=6, minute=0, second=0, microsecond=0
) - dt.timedelta(days=7)
all_news_posts: List[dict] = []
for feed in self.feeds:
@@ -360,14 +787,14 @@ class GhostTask:
continue
fp = feedparser.parse(content)
# Sélection des items récents
# Sélection des items de la semaine
new_entries = []
for e in fp.entries:
dte = self._entry_datetime(e)
if dte and dte > yesterday_6am_utc:
if dte and dte > week_ago_6am_utc:
new_entries.append(e)
# Filtrage ad-hoc
# Basic URL-based filtering (keep existing logic)
filtered = []
for e in new_entries:
linkURL = e.get("link", "") or ""
@@ -383,16 +810,55 @@ class GhostTask:
all_news_posts.extend(filtered)
if not all_news_posts:
LOG.warning("Aucun item récupéré (flux down ?). On n'envoie pas aujourd'hui.")
LOG.warning("Aucun item récupéré (flux down ?). On n'envoie pas cette semaine.")
return
random.shuffle(all_news_posts)
roundup_html, feature_image = self._build_html_roundup(all_news_posts, self.feeds)
LOG.info("Collected %d items from feeds", len(all_news_posts))
# Use Mistral AI for filtering and grouping if available
if self.mistral:
LOG.info("Using Mistral AI to filter non-news content...")
filtered_posts = self.mistral.filter_news_items(all_news_posts, dry_run=self.dry_run)
LOG.info("After filtering: %d items (removed %d)",
len(filtered_posts), len(all_news_posts) - len(filtered_posts))
if filtered_posts:
LOG.info("Using Mistral AI to group items by category...")
categories = self.mistral.group_similar_items(filtered_posts)
total_cats = len(categories)
total_subgroups = sum(len(cat.get("subgroups", [])) for cat in categories)
LOG.info("Created %d categories with %d sub-groups", total_cats, total_subgroups)
else:
categories = []
else:
LOG.warning("No Mistral API key configured, skipping AI filtering/grouping")
# Fallback: single category with all items
categories = [{
"name": "Actualités de la semaine",
"subgroups": [{"title": "Toutes les news", "items": all_news_posts}]
}]
if not categories or all(
len(sg.get("items", [])) == 0
for cat in categories
for sg in cat.get("subgroups", [])
):
LOG.warning("No news items after filtering. Skipping this week.")
return
roundup_html, feature_image = self._build_html_roundup_grouped(categories, self.feeds)
# 1) Create draft (with feature image if any)
created = self.ghost.create_post_html(title_post, roundup_html, status="draft", feature_image=feature_image)
LOG.info("Created draft post: %s (id: %s)", created.get("title"), created.get("id"))
# 2) Publish + send email (unless dry-run mode)
if self.dry_run:
LOG.info("DRY-RUN MODE: Post created as draft but NOT published. URL: %s",
created.get("url", "N/A"))
LOG.info("DRY-RUN MODE: Review the draft in Ghost admin, then publish manually if satisfied.")
return
# 2) Publish + send email
published = self.ghost.publish_post(
post_id=created["id"],
updated_at=created["updated_at"],
@@ -408,6 +874,8 @@ async def main():
parser = argparse.ArgumentParser()
parser.add_argument("--runonce", action="store_true", help="Run now and exit (no scheduler)")
parser.add_argument("--dry-run", action="store_true", dest="dry_run",
help="Run immediately, create draft but do NOT publish (for testing)")
args = parser.parse_args()
# Feeds init (list may be reloaded inside task)
@@ -424,26 +892,37 @@ async def main():
admin_url = os.environ["GHOST_ADMIN_URL"] # e.g. https://ghostadmin.zep.best/ghost/api/admin/
admin_key = os.environ["GHOST_ADMIN_KEY"] # integration_id:secret_hex
mistral_api_key = os.environ.get("MISTRAL_API_KEY") # Optional: for AI filtering/grouping
if not mistral_api_key:
LOG.warning("MISTRAL_API_KEY not set. AI filtering and grouping will be disabled.")
task = GhostTask(
feeds=feeds,
admin_url=admin_url,
admin_key=admin_key,
mistral_api_key=mistral_api_key,
newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"),
email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"),
dry_run=args.dry_run,
)
LOG.info("Starting bot")
LOG.info("Starting bot (weekly mode%s)", " - DRY RUN" if args.dry_run else "")
if args.runonce:
await task.daily_task()
await task.weekly_task()
return
# Démarrage: publier l'édition du jour si elle n'existe pas encore
await task.maybe_run_today()
if args.dry_run:
LOG.info("DRY-RUN: Running weekly task immediately (will create draft only)")
await task.weekly_task()
return
# Planification quotidienne à 06:05 Europe/Brussels (via heure locale du conteneur)
await task.run_daily_at_6_05()
# Démarrage: publier l'édition de la semaine si elle n'existe pas encore
await task.maybe_run_this_week()
# Planification hebdomadaire le samedi à 12:00 Europe/Brussels
await task.run_weekly_on_saturday()
if __name__ == "__main__":
asyncio.run(main())