"""Weekly video-game news roundup bot for a Ghost blog.

The bot collects the past week's entries from a list of RSS/Atom feeds,
optionally asks the Mistral API to drop non-news items and to group the rest
by category/franchise, renders an HTML roundup, creates it as a Ghost draft
and (unless running in dry-run mode) publishes it as a newsletter.

NOTE(review): the copy of this file that was reviewed had lost the HTML markup
inside several string literals and regexes. That markup has been reconstructed
from the surrounding code and comments; every such place is flagged with a
NOTE(review) comment and should be compared with the rendered output.
"""

import argparse
import asyncio
import calendar
import datetime as dt
import html
import json
import logging
import os
import re
import time
import zoneinfo  # Python 3.9+
from logging.handlers import RotatingFileHandler
from typing import Dict, List, Optional, Set, Tuple
from urllib.parse import parse_qs, urljoin, urlparse

import feedparser
import jwt
import requests

# ------------- Constants -------------

# Time zone used for every "is it Saturday noon yet?" decision.
TIMEZONE = "Europe/Brussels"

BOT_USER_AGENT = "ghost-bot/1.0"
BROWSER_USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)

FR_MONTHS = (
    "Janvier", "Février", "Mars", "Avril", "Mai", "Juin",
    "Juillet", "Août", "Septembre", "Octobre", "Novembre", "Décembre",
)

CATEGORY_EMOJI = {
    "Actualités": "📰",
    "Tests & Critiques": "⭐",
    "Aperçus & Previews": "👁️",
    "Vidéos": "🎬",
    "Autres": "📁",
}

RSS_IMAGE_TYPES = ("image/jpg", "image/jpeg", "image/png", "image/webp")

# ------------- Logging -------------

LOG = logging.getLogger("bot")
LOG_PATTERN = logging.Formatter("%(asctime)s:%(levelname)s: [%(filename)s] %(message)s")


def setuplogger():
    """Attach a console handler and a rotating ``bot.log`` handler to ``LOG``."""
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(LOG_PATTERN)
    stream_handler.setLevel(logging.DEBUG)
    file_handler = RotatingFileHandler("bot.log", "a", 1_000_000, 1)
    file_handler.setFormatter(LOG_PATTERN)
    LOG.setLevel(logging.DEBUG)
    LOG.addHandler(stream_handler)
    LOG.addHandler(file_handler)


# ------------- Web Crawler for Images -------------

# NOTE(review): the "<meta[^>" / "<article[^>]*>.*?<img[^>" prefixes were
# missing from the reviewed copy (patterns started at "]+property=") and have
# been restored; without them the patterns cannot match real HTML.
_META_IMAGE_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        # OpenGraph first (most reliable), both attribute orders.
        r'<meta[^>]+property=["\']og:image["\'][^>]+content=["\']([^"\']+)["\']',
        r'<meta[^>]+content=["\']([^"\']+)["\'][^>]+property=["\']og:image["\']',
        # Then Twitter card, both attribute orders.
        r'<meta[^>]+name=["\']twitter:image["\'][^>]+content=["\']([^"\']+)["\']',
        r'<meta[^>]+content=["\']([^"\']+)["\'][^>]+name=["\']twitter:image["\']',
    )
)
_ARTICLE_IMAGE_PATTERN = re.compile(
    r'<article[^>]*>.*?<img[^>]+src=["\']([^"\']+)["\']',
    re.IGNORECASE | re.DOTALL,
)
# Substrings that mark tiny images, icons, avatars and tracking pixels.
_IMAGE_SKIP_MARKERS = ("avatar", "icon", "logo", "emoji", "1x1", "pixel")


def extract_image_from_url(url: str, timeout: int = 10) -> Optional[str]:
    """Fetch a web page and return the URL of its most representative image.

    Tries, in order: the ``og:image`` meta tag, the ``twitter:image`` meta
    tag, then the first ``<img>`` inside an ``<article>`` element (skipped if
    its URL looks like an avatar/icon/logo/pixel).

    Args:
        url: Page to fetch.
        timeout: HTTP timeout in seconds.

    Returns:
        An absolute image URL, or ``None`` if nothing suitable was found or
        the page could not be fetched. Never raises: this is a best-effort
        enrichment step.
    """
    try:
        resp = requests.get(
            url,
            timeout=timeout,
            headers={
                "User-Agent": BROWSER_USER_AGENT,
                "Accept": "text/html,application/xhtml+xml",
            },
            allow_redirects=True,
        )
        resp.raise_for_status()
        page = resp.text

        for pattern in _META_IMAGE_PATTERNS:
            match = pattern.search(page)
            if match:
                # Attribute values are entity-encoded in HTML (e.g. "&amp;").
                return urljoin(url, html.unescape(match.group(1)))

        match = _ARTICLE_IMAGE_PATTERN.search(page)
        if match:
            img_url = html.unescape(match.group(1))
            lowered = img_url.lower()
            if not any(marker in lowered for marker in _IMAGE_SKIP_MARKERS):
                return urljoin(url, img_url)
        return None
    except Exception as e:  # deliberate best-effort boundary
        LOG.debug("Failed to extract image from %s: %s", url, e)
        return None


# ------------- YouTube helpers -------------

_YOUTUBE_PATH_ID = re.compile(r"^/(?:shorts|live|embed|v)/([A-Za-z0-9_-]{6,})")


def fetch_youtube_oembed_html(youtube_url: str, timeout: int = 10) -> Optional[str]:
    """Return YouTube's oEmbed HTML wrapped as a Ghost embed card.

    The provider HTML is inserted untouched (iframe attributes are not
    altered).

    Returns:
        The wrapped HTML, or ``None`` if the oEmbed call failed or returned
        no ``html`` field.
    """
    try:
        resp = requests.get(
            "https://www.youtube.com/oembed",
            params={"url": youtube_url, "format": "json"},
            headers={"User-Agent": BOT_USER_AGENT},
            timeout=timeout,
        )
        resp.raise_for_status()
        embed = resp.json().get("html")
    except Exception as e:  # best-effort: caller falls back to a plain link
        LOG.debug("YouTube oEmbed failed for %s: %s", youtube_url, e)
        return None
    if not embed:
        return None
    # NOTE(review): wrapper markup reconstructed (it was stripped from the
    # reviewed copy); this is Ghost's usual embed-card container.
    return f'<figure class="kg-card kg-embed-card">{embed}</figure>'


def youtube_thumbnail_url(video_id: str) -> str:
    """Return the high-quality default thumbnail URL for a YouTube video ID."""
    return f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"


def extract_youtube_id(url: str) -> Optional[str]:
    """Extract the video ID from a YouTube URL.

    Supports ``youtube.com/watch?v=ID``, ``youtube.com/{shorts,live,embed,v}/ID``
    and ``youtu.be/ID``. Only real YouTube hosts are accepted (a host merely
    *ending* in "youtube.com", such as "notyoutube.com", is rejected) and
    non-video paths such as ``/channel/...`` or ``/playlist`` yield ``None``.

    Returns:
        The video ID, or ``None`` if the URL is not a recognised video URL.
    """
    try:
        parsed = urlparse(url)
        host = (parsed.hostname or "").lower()
    except Exception:
        return None

    if host == "youtube.com" or host.endswith(".youtube.com"):
        if parsed.path == "/watch":
            return parse_qs(parsed.query).get("v", [None])[0]
        match = _YOUTUBE_PATH_ID.match(parsed.path)
        return match.group(1) if match else None
    if host == "youtu.be":
        slug = parsed.path.strip("/").split("/")[0]
        return slug or None
    return None


# ------------- Model -------------

class RSSfeed:
    """A feed URL plus a flag telling whether it is a YouTube feed."""

    def __init__(self, url: str, yt: bool = False):
        self.url = url
        self.youtube = yt


def load_feeds() -> List[RSSfeed]:
    """Read the feed list (one URL per line, blank lines ignored).

    The path comes from ``FEEDS_FILE`` (default ``/data/feeds.txt``); if that
    file does not exist, ``FEEDS_FILE_FALLBACK`` is used instead. A feed is
    flagged as YouTube when its URL contains "youtube".

    Raises:
        OSError: if the selected file cannot be opened.
    """
    feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
    if not os.path.isfile(feeds_file):
        feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", r"f:\workspace\Substack_JV\feeds.txt")
    with open(feeds_file, encoding="utf-8") as f:
        urls = [line.strip() for line in f]
    return [RSSfeed(url, "youtube" in url.lower()) for url in urls if url]


# ------------- Mistral AI Client -------------

# NOTE(review): line breaks inside both prompts were lost in the reviewed copy
# and have been re-inserted at the obvious list/paragraph boundaries; the
# wording is unchanged. Doubled braces are literal braces for str.format().
_FILTER_PROMPT = """Tu analyses des articles de sites de jeux vidéo. Tu dois identifier UNIQUEMENT les articles à EXCLURE.

EXCLURE UNIQUEMENT si le titre contient EXPLICITEMENT UN de ces mots-clés:
- "guide" (le mot exact)
- "soluce" (le mot exact)
- "astuce" (le mot exact)
- "solution" (le mot exact, pas "résolution")
- "code promo"
- "bon plan"
- "-20%" ou "-30%" etc (réductions)
- "tuto" ou "tutoriel"
- "comment faire"
- "how to"

NE JAMAIS EXCLURE:
- "Early Access" = news de sortie anticipée, À GARDER
- "Test" ou "Review" = critique, À GARDER
- "Partie Rapide" = émission/podcast, À GARDER
- Tout article de news, annonce, sortie, preview
- Tout article d'opinion, éditorial, récap
- Tout le reste qui ne contient pas les mots-clés d'exclusion ci-dessus

Articles à analyser:
{articles}

Retourne un JSON avec "exclude_ids" contenant UNIQUEMENT les IDs des articles guides/soluces/promos.
Si aucun article ne correspond aux critères d'exclusion, retourne {{"exclude_ids": []}}
Sois TRÈS conservateur - en cas de doute, NE PAS exclure."""

_GROUP_PROMPT = """Organise ces articles de jeux vidéo en CATÉGORIES et SOUS-GROUPES.

Articles à organiser:
{articles}

CATÉGORIES (utilise ces noms exacts):
1. "Actualités" - News, annonces, sorties, mises à jour, industrie
2. "Tests & Critiques" - Reviews, tests, avis, notes
3. "Aperçus & Previews" - Previews, impressions, démos, hands-on
4. "Vidéos" - Trailers, gameplay vidéos, podcasts
5. "Autres" - Le reste

RÈGLES DE GROUPEMENT (TRÈS IMPORTANT):
- Groupe par FRANCHISE ou SÉRIE (ex: tous les "Final Fantasy" ensemble, même FF7, FF16, FF XIV)
- Groupe par UNIVERS (ex: "Warhammer 40K" = Space Marine + Dawn of War + Darktide)
- Groupe par ÉVÉNEMENT (ex: "Nintendo Direct", "State of Play", "Game Awards")
- N'utilise JAMAIS de noms de sites web comme groupes (pas "NoFrag", "JeuxOnline", etc.)

EXEMPLES DE GROUPEMENTS CORRECTS:
- "Final Fantasy VII Rebirth sur Switch 2" + "Final Fantasy VII Remake Intergrade en tête" → groupe "Final Fantasy"
- "Techmarine dans Space Marine 2" + "Dawn of War 4 gameplay Ork" → groupe "Warhammer 40K"
- "GTA 6 trailer" + "GTA 6 date de sortie" → groupe "GTA 6"
- "Nintendo Direct annoncé" + "Zelda dans le Nintendo Direct" → groupe "Nintendo Direct"

Retourne ce JSON:
{{
  "categories": [
    {{
      "name": "Actualités",
      "subgroups": [
        {{"title": "Final Fantasy", "item_ids": [0, 3, 7]}},
        {{"title": "Warhammer 40K", "item_ids": [1, 2]}},
        {{"title": "Steam", "item_ids": [5]}}
      ]
    }}
  ]
}}

IMPORTANT: Chaque article dans UN SEUL sous-groupe.
Titre = nom de franchise/série/univers, PAS nom de site."""


def _is_item_id(value, count: int) -> bool:
    """Tell whether *value* is a usable index into a list of *count* items.

    Model output is untrusted: reject bools, non-ints, negatives (which would
    silently index from the end) and out-of-range values.
    """
    return isinstance(value, int) and not isinstance(value, bool) and 0 <= value < count


class MistralClient:
    """Client for Mistral AI API to filter and group news items."""

    def __init__(self, api_key: str, model: str = "mistral-small-latest"):
        self.api_key = api_key
        self.model = model
        self.base_url = "https://api.mistral.ai/v1/chat/completions"

    def _call_api(self, messages: List[Dict], temperature: float = 0.3) -> Optional[str]:
        """POST a chat completion request in JSON mode.

        Returns:
            The content of the first choice, or ``None`` on any HTTP or
            response-shape error (the error is logged).
        """
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }
        payload = {
            "model": self.model,
            "messages": messages,
            "temperature": temperature,
            "response_format": {"type": "json_object"},
        }
        try:
            resp = requests.post(self.base_url, headers=headers, json=payload, timeout=120)
            resp.raise_for_status()
            return resp.json()["choices"][0]["message"]["content"]
        except Exception as e:
            LOG.error("Mistral API error: %s", e)
            return None

    def _ask_json(self, prompt: str, temperature: float, purpose: str) -> Optional[dict]:
        """Send *prompt* and return the decoded JSON object, or ``None``.

        ``None`` covers: API failure, invalid JSON, or a JSON root that is not
        an object. *purpose* only appears in the warning message.
        """
        response = self._call_api([{"role": "user", "content": prompt}], temperature=temperature)
        if not response:
            return None
        try:
            result = json.loads(response)
        except json.JSONDecodeError:
            LOG.warning("Failed to parse Mistral response for %s", purpose)
            return None
        if not isinstance(result, dict):
            LOG.warning("Failed to parse Mistral response for %s", purpose)
            return None
        return result

    def filter_news_items(self, items: List[dict], dry_run: bool = False) -> List[dict]:
        """Drop non-news items (guides, walkthroughs, tips, promos).

        Items are sent to the model in batches of 50. A batch whose response
        is missing or malformed is kept in full, so a model failure never
        removes articles.

        Args:
            items: Feed entries (dict-like, with title/link/summary).
            dry_run: When true, log the kept and excluded titles at DEBUG.

        Returns:
            The kept items, in their original order.
        """
        if not items:
            return []

        candidates = [
            {
                "id": i,
                "title": item.get("title", ""),
                "link": item.get("link", ""),
                "summary": (item.get("summary", "") or "")[:300],  # truncate for API
            }
            for i, item in enumerate(items)
        ]

        # Split into batches to avoid token limits.
        batch_size = 50
        kept_ids: Set[int] = set()
        for start in range(0, len(candidates), batch_size):
            batch = candidates[start:start + batch_size]
            batch_ids = {entry["id"] for entry in batch}
            prompt = _FILTER_PROMPT.format(
                articles=json.dumps(batch, ensure_ascii=False, indent=2)
            )
            result = self._ask_json(prompt, temperature=0.3, purpose="filtering")
            raw_excluded = result.get("exclude_ids") if result else None
            if isinstance(raw_excluded, list):
                excluded = {v for v in raw_excluded if _is_item_id(v, len(items))}
            else:
                excluded = set()  # fallback: keep the whole batch
            kept_ids |= batch_ids - excluded

        if dry_run:
            dropped = set(range(len(items))) - kept_ids
            if dropped:
                LOG.debug("=== FILTERED OUT (non-news) ===")
                for i in sorted(dropped):
                    LOG.debug(" [EXCLUDED] %s", items[i].get("title", "No title"))
            LOG.debug("=== KEPT (news) ===")
            for i in sorted(kept_ids):
                LOG.debug(" [KEPT] %s", items[i].get("title", "No title"))

        return [items[i] for i in sorted(kept_ids)]

    def group_similar_items(self, items: List[dict]) -> List[Dict]:
        """Group items by category, then by game/franchise/event.

        Returns:
            A list of ``{"name": str, "subgroups": [{"title": str,
            "items": [...]}]}`` dicts. Every input item appears exactly once:
            items the model forgot land in "Autres" / "Divers", and if the
            model response is unusable everything is returned in a single
            category.
        """
        if not items:
            return []

        candidates = [
            {"id": i, "title": item.get("title", ""), "link": item.get("link", "")}
            for i, item in enumerate(items)
        ]
        prompt = _GROUP_PROMPT.format(
            articles=json.dumps(candidates, ensure_ascii=False, indent=2)
        )
        result = self._ask_json(prompt, temperature=0.2, purpose="grouping")
        if result is None:
            # Fallback: return all items in a single category/subgroup.
            return [{
                "name": "Actualités de la semaine",
                "subgroups": [{"title": "Toutes les news", "items": items}],
            }]

        categories: List[Dict] = []
        used_ids: Set[int] = set()
        raw_categories = result.get("categories", [])
        for cat_data in raw_categories if isinstance(raw_categories, list) else []:
            if not isinstance(cat_data, dict):
                continue
            subgroups = []
            raw_subgroups = cat_data.get("subgroups", [])
            for sg_data in raw_subgroups if isinstance(raw_subgroups, list) else []:
                if not isinstance(sg_data, dict):
                    continue
                raw_ids = sg_data.get("item_ids", [])
                valid_ids = []
                for item_id in raw_ids if isinstance(raw_ids, list) else []:
                    # Each article goes into the first subgroup that claims it.
                    if _is_item_id(item_id, len(items)) and item_id not in used_ids:
                        used_ids.add(item_id)
                        valid_ids.append(item_id)
                if valid_ids:
                    subgroups.append({
                        "title": sg_data.get("title", "Divers"),
                        "items": [items[i] for i in valid_ids],
                    })
            if subgroups:
                categories.append({
                    "name": cat_data.get("name", "Autres"),
                    "subgroups": subgroups,
                })

        # Add any items the model left out.
        ungrouped = [item for i, item in enumerate(items) if i not in used_ids]
        if ungrouped:
            leftovers = {"title": "Divers", "items": ungrouped}
            autres = next((c for c in categories if c["name"] == "Autres"), None)
            if autres:
                autres["subgroups"].append(leftovers)
            else:
                categories.append({"name": "Autres", "subgroups": [leftovers]})
        return categories


# ------------- Ghost Admin API client -------------

class GhostAdmin:
    """Minimal client for the Ghost Admin API (posts and newsletters)."""

    def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"):
        """
        Args:
            admin_url: Admin API base URL, e.g. ``https://host/ghost/api/admin/``.
            admin_key: Integration key in ``<id>:<secret_hex>`` form.
            accept_version: Value sent in the ``Accept-Version`` header.

        Raises:
            ValueError: if *admin_key* is not of the form ``<id>:<secret_hex>``.
        """
        self.base = admin_url.rstrip("/") + "/"
        key_parts = admin_key.split(":")
        if len(key_parts) != 2:
            raise ValueError("Ghost admin key must look like '<id>:<secret_hex>'")
        self.key_id, self.key_secret_hex = key_parts
        self.accept_version = accept_version

    def _jwt(self) -> str:
        """Build a 5-minute HS256 token signed with the integration secret."""
        iat = int(time.time())
        payload = {"iat": iat, "exp": iat + 5 * 60, "aud": "/admin/"}
        headers = {"alg": "HS256", "typ": "JWT", "kid": self.key_id}
        token = jwt.encode(payload, bytes.fromhex(self.key_secret_hex), algorithm="HS256",
                           headers=headers)
        # PyJWT 1.x returns bytes, 2.x returns str.
        return token if isinstance(token, str) else token.decode("utf-8")

    def _headers(self):
        """Return request headers carrying a freshly signed token."""
        return {
            "Authorization": f"Ghost {self._jwt()}",
            "Accept-Version": self.accept_version,
            "Content-Type": "application/json",
        }

    def latest_published_date(self, tz_name: str = TIMEZONE):
        """Return the aware datetime (in *tz_name*) of the latest post returned
        by the posts endpoint ordered by ``published_at`` descending, or
        ``None`` if there is none.

        Raises:
            requests.HTTPError: on an HTTP error status.
        """
        url = self.base + "posts/?limit=1&order=published_at%20desc&fields=published_at"
        resp = requests.get(url, headers=self._headers(), timeout=20)
        resp.raise_for_status()
        posts = resp.json().get("posts", [])
        if not posts or not posts[0].get("published_at"):
            return None
        # ISO 8601 -> aware UTC -> converted to the requested zone.
        published = dt.datetime.fromisoformat(posts[0]["published_at"].replace("Z", "+00:00"))
        return published.astimezone(zoneinfo.ZoneInfo(tz_name))

    def get_newsletters(self):
        """Return the list of newsletters configured in Ghost.

        Raises:
            RuntimeError: on an HTTP error status.
        """
        resp = requests.get(self.base + "newsletters/", headers=self._headers(), timeout=20)
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost newsletters error {resp.status_code}: {resp.text}")
        return resp.json().get("newsletters", [])

    def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str:
        """Return *preferred_slug* if given, else the default active newsletter,
        else the first active one, else the first one.

        Raises:
            RuntimeError: if Ghost has no newsletter at all.
        """
        if preferred_slug:
            return preferred_slug
        newsletters = self.get_newsletters()
        if not newsletters:
            raise RuntimeError("No newsletters configured in Ghost (Settings → Newsletters).")
        actives = [n for n in newsletters if n.get("status") == "active"]
        for newsletter in actives:
            if newsletter.get("is_default"):
                return newsletter.get("slug")
        return (actives or newsletters)[0].get("slug")

    def create_post_html(self, title: str, html_content: str, status: str = "draft",
                         feature_image: Optional[str] = None):
        """Create a post from HTML and return the created post object.

        Raises:
            RuntimeError: on an HTTP error status.
        """
        post = {"title": title, "html": html_content, "status": status}
        if feature_image:
            post["feature_image"] = feature_image
        resp = requests.post(self.base + "posts/?source=html", headers=self._headers(),
                             json={"posts": [post]}, timeout=30)
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost create error {resp.status_code}: {resp.text}")
        return resp.json()["posts"][0]

    def publish_post(self, post_id: str, updated_at: str, newsletter_slug: Optional[str],
                     email_segment: Optional[str]):
        """Publish an existing post and send it through a newsletter.

        Args:
            post_id: ID of the draft to publish.
            updated_at: The draft's current ``updated_at`` value, sent back
                unchanged in the update body.
            newsletter_slug: Newsletter to send with; auto-selected if falsy.
            email_segment: Optional recipient filter.

        Raises:
            RuntimeError: on an HTTP error status.
        """
        slug = self.pick_newsletter_slug(newsletter_slug)
        params = [f"newsletter={requests.utils.quote(slug)}"]
        if email_segment:
            params.append(f"email_segment={requests.utils.quote(email_segment)}")
        url = self.base + f"posts/{post_id}/?{'&'.join(params)}"
        body = {"posts": [{"updated_at": updated_at, "status": "published"}]}
        resp = requests.put(url, headers=self._headers(), json=body, timeout=30)
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost publish error {resp.status_code}: {resp.text}")
        return resp.json()["posts"][0]


# ------------- Task orchestration -------------

_ANCHOR_TRANSLATION = str.maketrans({
    **dict.fromkeys("àáâãäå", "a"),
    **dict.fromkeys("èéêë", "e"),
    **dict.fromkeys("ìíîï", "i"),
    **dict.fromkeys("òóôõö", "o"),
    **dict.fromkeys("ùúûü", "u"),
    **dict.fromkeys("ýÿ", "y"),
    "ç": "c",
})


class GhostTask:
    """Collects feed entries weekly and publishes the roundup to Ghost."""

    def __init__(self, feeds: List[RSSfeed], admin_url: str, admin_key: str,
                 mistral_api_key: Optional[str] = None,
                 newsletter_slug: Optional[str] = None,
                 email_segment: Optional[str] = None,
                 dry_run: bool = False):
        self.ghost = GhostAdmin(admin_url, admin_key)
        self.feeds = feeds
        self.newsletter_slug = newsletter_slug
        self.email_segment = email_segment
        self.mistral = MistralClient(mistral_api_key) if mistral_api_key else None
        self.dry_run = dry_run
        for feed in self.feeds:
            LOG.info("Adding feed %s", feed.url)

    # --- scheduling helpers

    @staticmethod
    def _last_saturday_noon(now: dt.datetime) -> dt.datetime:
        """Return the most recent Saturday 12:00 that is not after *now*."""
        days_back = (now.weekday() - 5) % 7  # Saturday = 5
        candidate = (now - dt.timedelta(days=days_back)).replace(
            hour=12, minute=0, second=0, microsecond=0
        )
        # On a Saturday morning the candidate is today's noon, which is still
        # in the future: the window actually opened a week earlier.
        if candidate > now:
            candidate -= dt.timedelta(days=7)
        return candidate

    @staticmethod
    def _next_saturday_noon(now: dt.datetime) -> dt.datetime:
        """Return the first Saturday 12:00 strictly after *now*."""
        days_ahead = (5 - now.weekday()) % 7  # Saturday = 5
        candidate = (now + dt.timedelta(days=days_ahead)).replace(
            hour=12, minute=0, second=0, microsecond=0
        )
        if candidate <= now:
            candidate += dt.timedelta(days=7)
        return candidate

    # --- startup immediate run if not yet published this week

    def _published_this_week(self) -> bool:
        """Check if we already published this week (since last Saturday 12:00)."""
        last = self.ghost.latest_published_date(TIMEZONE)
        if not last:
            return False
        now = dt.datetime.now(zoneinfo.ZoneInfo(TIMEZONE))
        return last >= self._last_saturday_noon(now)

    async def maybe_run_this_week(self):
        """Run the weekly task right away if nothing was published this week."""
        if not self._published_this_week():
            LOG.info("Aucune newsletter publiée cette semaine -> génération immédiate.")
            await self.weekly_task()
        else:
            LOG.info("Déjà publié cette semaine, on attend la prochaine fenêtre.")

    # --- utils

    @staticmethod
    def _fr_week_range() -> str:
        """Return the past week as a French date range.

        Format: "24 - 31 Janvier 2025" or "28 Janvier - 4 Février 2025".
        Month names come from a fixed table, so the result does not depend on
        the process locale, and days are never zero-padded.
        """
        today = dt.datetime.now(zoneinfo.ZoneInfo(TIMEZONE))
        week_ago = today - dt.timedelta(days=7)
        end = f"{today.day} {FR_MONTHS[today.month - 1]} {today.year}"
        if (week_ago.year, week_ago.month) == (today.year, today.month):
            return f"{week_ago.day} - {end}"
        return f"{week_ago.day} {FR_MONTHS[week_ago.month - 1]} - {end}"

    @staticmethod
    def _safe_get(url: str, timeout: int = 20) -> Optional[bytes]:
        """GET *url* and return the body, or ``None`` (with a warning) on failure."""
        try:
            r = requests.get(url, timeout=timeout, headers={"User-Agent": BOT_USER_AGENT})
            r.raise_for_status()
            return r.content
        except Exception as e:
            LOG.warning("Flux indisponible: %s (%s)", url, e)
            return None

    @staticmethod
    def _entry_datetime(entry) -> Optional[dt.datetime]:
        """Try to get an aware UTC datetime for a feedparser entry.

        Order: the raw ``published`` string as ISO 8601, then as RFC 822, then
        feedparser's pre-parsed ``published_parsed`` / ``updated_parsed``.
        """
        published = getattr(entry, "published", None)
        if published:
            try:
                # YouTube (ISO) e.g. 2025-09-05T10:20:33+00:00
                return dt.datetime.fromisoformat(
                    published.replace("Z", "+00:00")
                ).astimezone(dt.timezone.utc)
            except Exception:
                pass
            try:
                # RFC 822 e.g. Fri, 05 Sep 2025 10:20:33 +0000
                return dt.datetime.strptime(
                    published.replace("GMT", "+0000"), "%a, %d %b %Y %H:%M:%S %z"
                ).astimezone(dt.timezone.utc)
            except Exception:
                pass
        for field in ("published_parsed", "updated_parsed"):
            parsed = getattr(entry, field, None)
            if parsed:
                try:
                    # feedparser's *_parsed values are UTC struct_time, so use
                    # timegm; mktime would interpret them as local time.
                    return dt.datetime.fromtimestamp(calendar.timegm(parsed), tz=dt.timezone.utc)
                except Exception:
                    pass
        return None

    # --- HTML builder for grouped content

    @staticmethod
    def _render_summary(categories: List[Dict]) -> List[str]:
        """Render the "in brief" list: one line per category with its biggest topics."""
        # NOTE(review): tags in this method are reconstructed; only the text
        # content survived in the reviewed copy.
        parts = ["<h2>✨ En bref cette semaine</h2>", "<ul>"]
        for cat in categories:
            cat_name = html.escape(cat.get("name", "Actualités"))
            subgroups = cat.get("subgroups", [])
            total_items = sum(len(sg.get("items", [])) for sg in subgroups)
            # Highlights: up to 5 subgroups with more than one item, biggest first.
            multi = [sg for sg in subgroups if len(sg.get("items", [])) > 1]
            multi.sort(key=lambda sg: len(sg.get("items", [])), reverse=True)
            top = multi[:5]
            if top:
                highlights = ", ".join(sg.get("title", "Divers") for sg in top)
                parts.append(
                    f"<li><strong>{cat_name}</strong>: {html.escape(highlights)} "
                    f"({total_items} articles)</li>"
                )
            elif total_items > 0:
                parts.append(f"<li><strong>{cat_name}</strong>: {total_items} articles</li>")
        parts.append("</ul>")
        parts.append("<hr>")
        return parts

    @staticmethod
    def _render_post(post) -> Tuple[List[str], Optional[str]]:
        """Render one article; return (html_parts, first_image_url_or_None)."""
        # NOTE(review): tags in this method are reconstructed; only the text
        # content survived in the reviewed copy.
        parts: List[str] = []
        first_image: Optional[str] = None
        title = post.get("title", "") or ""
        link_url = post.get("link", "") or ""
        parts.append(f"<h4>{html.escape(title)}</h4>")

        vid = post.get("yt_videoid") or extract_youtube_id(link_url)
        if vid:
            # --- YouTube embed / fallback
            watch_url = f"https://www.youtube.com/watch?v={vid}"
            embed_html = fetch_youtube_oembed_html(watch_url, timeout=10)
            if embed_html:
                parts.append(embed_html)
            else:
                esc_watch = html.escape(watch_url)
                # Plain URL on its own line so Ghost may still auto-embed.
                parts.append(f"\n<p>{esc_watch}</p>\n")
                # Minimal fallback link (non-intrusive for email/web).
                parts.append(f'<p><a href="{esc_watch}">Voir sur YouTube</a></p>')
        else:
            # --- Text + link
            summary = post.get("summary")
            if summary:
                text = html.unescape(summary)
                text = re.sub("<[^<]+?>", "", text)
                # Drop the WordPress "this article first appeared on ..." footer.
                text = re.sub(r"L'article .* est apparu en premier sur .*", "", text).strip()
                if text:
                    parts.append(f"<p>{html.escape(text)}</p>")
            if link_url:
                esc = html.escape(link_url)
                parts.append(f'<p><a href="{esc}">{esc}</a></p>')

            # --- Images: first try RSS metadata, then crawl the page.
            # NOTE(review): indentation was lost in the reviewed copy, so it is
            # not certain whether this step also applied to YouTube posts; it
            # is limited to articles here because embeds already show a
            # thumbnail.
            image_urls = []
            for link in post.get("links", []) or []:
                if link.get("type") in RSS_IMAGE_TYPES and link.get("href"):
                    image_urls.append(link["href"].replace("/250x250/", "/990x320/"))
            if not image_urls and link_url:
                crawled = extract_image_from_url(link_url, timeout=8)
                if crawled:
                    image_urls.append(crawled)
            for image_url in image_urls:
                first_image = first_image or image_url
                # Feed/page supplied URL: escape before placing in an attribute.
                src = html.escape(image_url, quote=True)
                parts.append(f'<figure class="kg-card kg-image-card"><img src="{src}"></figure>')

        parts.append("<hr>")
        return parts, first_image

    def _build_html_roundup_grouped(self, categories: List[Dict], feeds: List[RSSfeed]):
        """Build the roundup HTML from categories and thematic subgroups.

        Layout: summary, then one section per category (with a sub-heading for
        each subgroup holding more than one article), then the list of source
        feeds and a subscription call to action.

        Returns:
            ``(html, feature_image_url_or_None)`` where the feature image is
            the first image found while rendering articles.
        """
        # NOTE(review): heading/paragraph tags below are reconstructed.
        parts = self._render_summary(categories)
        first_image: Optional[str] = None

        for cat in categories:
            cat_name = cat.get("name", "Actualités")
            subgroups = cat.get("subgroups", [])
            if not subgroups:
                continue
            cat_emoji = CATEGORY_EMOJI.get(cat_name, "📌")
            cat_anchor = self._make_anchor(cat_name)
            # Headings carry ids so a table of contents can link to them.
            parts.append(f'<h2 id="{cat_anchor}">{cat_emoji} {html.escape(cat_name)}</h2>')

            for sg in subgroups:
                items = sg.get("items", [])
                if not items:
                    continue
                # Sub-group header only when it groups several articles.
                if len(items) > 1:
                    sg_title = sg.get("title", "Divers")
                    sg_anchor = self._make_anchor(f"{cat_name}-{sg_title}")
                    parts.append(f'<h3 id="{sg_anchor}">{html.escape(sg_title)}</h3>')
                for post in items:
                    post_parts, post_image = self._render_post(post)
                    parts.extend(post_parts)
                    first_image = first_image or post_image

        # --- Sources
        parts.append("<h2>📚 Sources</h2>")
        for feed in feeds:
            esc = html.escape(feed.url)
            parts.append(f'<p><a href="{esc}">{esc}</a></p>')
        parts.append(
            "<p>Abonnez-vous pour recevoir chaque semaine les news et soutenir mon travail.</p>"
        )
        return "\n".join(parts), first_image

    @staticmethod
    def _make_anchor(text: str) -> str:
        """Convert text to a valid HTML anchor ID.

        Lowercases, strips common French accents, drops every character that
        is not ``a-z``, ``0-9``, whitespace or ``-``, and turns whitespace
        runs into single dashes. Returns "section" if nothing is left.
        """
        anchor = text.lower().translate(_ANCHOR_TRANSLATION)
        anchor = re.sub(r"[^a-z0-9\s-]", "", anchor)
        anchor = re.sub(r"\s+", "-", anchor.strip())
        return anchor or "section"

    @staticmethod
    def _format_duration(seconds: float) -> str:
        """Format a number of seconds as "D days, H hours, M minutes, S seconds".

        Zero-valued units are omitted; returns "0 seconds" when all are zero.
        """
        remaining = int(seconds)
        parts = []
        for label, size in (("days", 86400), ("hours", 3600), ("minutes", 60), ("seconds", 1)):
            value, remaining = divmod(remaining, size)
            if value:
                parts.append(f"{value} {label}")
        return ", ".join(parts) if parts else "0 seconds"

    async def run_weekly_on_saturday(self):
        """Run the weekly task every Saturday at 12:00 (Europe/Brussels), forever."""
        tz = zoneinfo.ZoneInfo(TIMEZONE)
        while True:
            next_run = self._next_saturday_noon(dt.datetime.now(tz))
            while True:
                # Compare absolute timestamps: subtracting two datetimes that
                # share a tzinfo ignores DST changes in between.
                sleep_seconds = next_run.timestamp() - time.time()
                if sleep_seconds <= 0:
                    break
                LOG.info("Waiting for %s for next scan (Saturday noon)",
                         self._format_duration(sleep_seconds))
                # Wake up at least every 5 minutes so clock changes are noticed.
                await asyncio.sleep(min(sleep_seconds, 5 * 60))
            LOG.info("Going to run the weekly task")
            await self.weekly_task()

    def _collect_week_entries(self) -> List[dict]:
        """Fetch every feed and return the entries published in the last week."""
        # Window: since 7 days ago at 06:00 UTC.
        window_start = dt.datetime.now(dt.timezone.utc).replace(
            hour=6, minute=0, second=0, microsecond=0
        ) - dt.timedelta(days=7)

        collected: List[dict] = []
        for feed in self.feeds:
            LOG.info("Scanning feed %s", feed.url)
            content = self._safe_get(feed.url, timeout=30)
            if not content:
                continue
            for entry in feedparser.parse(content).entries:
                published = self._entry_datetime(entry)
                if not published or published <= window_start:
                    continue
                link_url = entry.get("link", "") or ""
                # Basic URL-based filtering of known guide/puzzle sections.
                if "actugaming" in link_url and ("puzzle-" in link_url or "guide-" in link_url):
                    continue
                # Enrich with the YouTube id if applicable.
                if feed.youtube and link_url:
                    vid = extract_youtube_id(link_url)
                    if vid:
                        entry["yt_videoid"] = vid
                collected.append(entry)
        return collected

    async def weekly_task(self):
        """Main weekly task: collect, filter, group, and publish.

        Nothing is created in Ghost when no entry was collected or when
        filtering removed everything. In dry-run mode the post is created as
        a draft and left unpublished.
        """
        # Log newsletters (debug aid; failure here is not fatal).
        try:
            nls = self.ghost.get_newsletters()
            LOG.info("Newsletters: %s",
                     ", ".join(f"{n.get('name')}[{n.get('slug')}]" for n in nls))
        except Exception as e:
            LOG.warning("Unable to list newsletters: %s", e)

        title_post = "Les news de la semaine du " + self._fr_week_range()
        LOG.info("Running weekly task : %s", title_post)

        # Reload the feed list so edits to the file apply without a restart.
        self.feeds = load_feeds()

        all_news_posts = self._collect_week_entries()
        if not all_news_posts:
            LOG.warning("Aucun item récupéré (flux down ?). On n'envoie pas cette semaine.")
            return
        LOG.info("Collected %d items from feeds", len(all_news_posts))

        # Use Mistral AI for filtering and grouping if available.
        if self.mistral:
            LOG.info("Using Mistral AI to filter non-news content...")
            filtered_posts = self.mistral.filter_news_items(all_news_posts, dry_run=self.dry_run)
            LOG.info("After filtering: %d items (removed %d)",
                     len(filtered_posts), len(all_news_posts) - len(filtered_posts))
            if filtered_posts:
                LOG.info("Using Mistral AI to group items by category...")
                categories = self.mistral.group_similar_items(filtered_posts)
                total_subgroups = sum(len(cat.get("subgroups", [])) for cat in categories)
                LOG.info("Created %d categories with %d sub-groups",
                         len(categories), total_subgroups)
            else:
                categories = []
        else:
            LOG.warning("No Mistral API key configured, skipping AI filtering/grouping")
            # Fallback: single category with all items.
            categories = [{
                "name": "Actualités de la semaine",
                "subgroups": [{"title": "Toutes les news", "items": all_news_posts}],
            }]

        has_items = any(
            sg.get("items") for cat in categories for sg in cat.get("subgroups", [])
        )
        if not has_items:
            LOG.warning("No news items after filtering. Skipping this week.")
            return

        roundup_html, feature_image = self._build_html_roundup_grouped(categories, self.feeds)

        # 1) Create draft (with feature image if any).
        created = self.ghost.create_post_html(title_post, roundup_html, status="draft",
                                              feature_image=feature_image)
        LOG.info("Created draft post: %s (id: %s)", created.get("title"), created.get("id"))

        # 2) Publish + send email (unless dry-run mode).
        if self.dry_run:
            LOG.info("DRY-RUN MODE: Post created as draft but NOT published. URL: %s",
                     created.get("url", "N/A"))
            LOG.info("DRY-RUN MODE: Review the draft in Ghost admin, then publish manually if satisfied.")
            return

        # Constructor arguments win; the environment is only a fallback.
        published = self.ghost.publish_post(
            post_id=created["id"],
            updated_at=created["updated_at"],
            newsletter_slug=self.newsletter_slug or os.environ.get("GHOST_NEWSLETTER_SLUG"),
            email_segment=self.email_segment or os.environ.get("GHOST_EMAIL_SEGMENT"),
        )
        LOG.info("Published post: %s (emailed via newsletter)", published.get("url"))


# ------------- main -------------

async def main():
    """Parse CLI flags, build the task from the environment and run it.

    ``--runonce`` and ``--dry-run`` both run the weekly task once and exit;
    otherwise the bot catches up on the current week if needed, then loops
    on the Saturday-noon schedule.
    """
    setuplogger()
    parser = argparse.ArgumentParser()
    parser.add_argument("--runonce", action="store_true",
                        help="Run now and exit (no scheduler)")
    parser.add_argument("--dry-run", action="store_true", dest="dry_run",
                        help="Run immediately, create draft but do NOT publish (for testing)")
    args = parser.parse_args()

    # Initial feed list (it is reloaded inside the task on every run).
    feeds = load_feeds()

    admin_url = os.environ["GHOST_ADMIN_URL"]  # e.g. https://ghostadmin.zep.best/ghost/api/admin/
    admin_key = os.environ["GHOST_ADMIN_KEY"]  # integration_id:secret_hex
    mistral_api_key = os.environ.get("MISTRAL_API_KEY")  # optional: AI filtering/grouping
    if not mistral_api_key:
        LOG.warning("MISTRAL_API_KEY not set. AI filtering and grouping will be disabled.")

    task = GhostTask(
        feeds=feeds,
        admin_url=admin_url,
        admin_key=admin_key,
        mistral_api_key=mistral_api_key,
        newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"),
        email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"),
        dry_run=args.dry_run,
    )

    LOG.info("Starting bot (weekly mode%s)", " - DRY RUN" if args.dry_run else "")

    if args.runonce or args.dry_run:
        if args.dry_run and not args.runonce:
            LOG.info("DRY-RUN: Running weekly task immediately (will create draft only)")
        await task.weekly_task()
        return

    # Startup: publish this week's edition if it does not exist yet.
    await task.maybe_run_this_week()
    # Weekly schedule: Saturday at 12:00 Europe/Brussels.
    await task.run_weekly_on_saturday()


if __name__ == "__main__":
    asyncio.run(main())