# weekly_games_roundup.py # -*- coding: utf-8 -*- import asyncio import argparse import dataclasses import html import json import logging import os import random import re import time from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple from urllib.parse import urlparse, parse_qs import aiohttp import jwt # PyJWT import requests from bs4 import BeautifulSoup from logging.handlers import RotatingFileHandler from datetime import datetime, timedelta, timezone import zoneinfo from storage import Storage from keys import xgp_key try: from playwright.async_api import async_playwright except Exception: async_playwright = None LOG = logging.getLogger("bot_weekly") LOG_PATTERN = logging.Formatter("%(asctime)s:%(levelname)s: [%(filename)s] %(message)s") PRIME_HOME = "https://gaming.amazon.com/home" CARD_COLUMNS = 2 CARD_IMG_H = 180 # même hauteur partout CARD_TITLE_EM = 3 # ~2 lignes de texte mini def setuplogger(): stream_handler = logging.StreamHandler() stream_handler.setFormatter(LOG_PATTERN) stream_handler.setLevel(logging.DEBUG) file_handler = RotatingFileHandler("bot_weekly.log", "a", 1_000_000, 1) file_handler.setFormatter(LOG_PATTERN) LOG.setLevel(logging.DEBUG) LOG.addHandler(stream_handler) LOG.addHandler(file_handler) TZ = zoneinfo.ZoneInfo("Europe/Brussels") UA = {"User-Agent": "Mozilla/5.0 (compatible; weekly-games-roundup/1.0)"} # -------------------- Ghost Admin client -------------------- class GhostAdmin: def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"): self.base = admin_url.rstrip("/") + "/" self.key_id, self.key_secret_hex = admin_key.split(":") self.accept_version = accept_version def _jwt(self) -> str: iat = int(time.time()) payload = {"iat": iat, "exp": iat + 5 * 60, "aud": "/admin/"} headers = {"alg": "HS256", "typ": "JWT", "kid": self.key_id} token = jwt.encode(payload, bytes.fromhex(self.key_secret_hex), algorithm="HS256", headers=headers) return token if isinstance(token, str) else token.decode("utf-8") def _headers(self): return {"Authorization": f"Ghost {self._jwt()}", "Accept-Version": self.accept_version, "Content-Type": "application/json"} def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str: if preferred_slug: return preferred_slug url = self.base + "newsletters/" resp = requests.get(url, headers=self._headers(), timeout=20) resp.raise_for_status() newsletters = resp.json().get("newsletters", []) if not newsletters: raise RuntimeError("No newsletters configured in Ghost.") actives = [n for n in newsletters if n.get("status") == "active"] for n in actives: if n.get("is_default"): return n["slug"] return (actives or newsletters)[0]["slug"] def create_post_html(self, title: str, html_content: str, status: str = "draft", feature_image: Optional[str] = None) -> Dict[str, Any]: url = self.base + "posts/?source=html" body = {"posts": [{"title": title, "html": html_content, "status": status, **({"feature_image": feature_image} if feature_image else {})}]} resp = requests.post(url, headers=self._headers(), json=body, timeout=30) if resp.status_code >= 400: raise RuntimeError(f"Ghost create error {resp.status_code}: {resp.text}") return resp.json()["posts"][0] def publish_post(self, post_id: str, updated_at: str, newsletter_slug: Optional[str], email_segment: Optional[str] = None) -> Dict[str, Any]: slug = self.pick_newsletter_slug(newsletter_slug) params = [f"newsletter={requests.utils.quote(slug)}"] if email_segment: params.append(f"email_segment={requests.utils.quote(email_segment)}") url = self.base + f"posts/{post_id}/?{'&'.join(params)}" body = {"posts": [{"updated_at": updated_at, "status": "published"}]} resp = requests.put(url, headers=self._headers(), json=body, timeout=30) if resp.status_code >= 400: raise RuntimeError(f"Ghost publish error {resp.status_code}: {resp.text}") return resp.json()["posts"][0] # -------------------- Helpers (URLs, images, dates) -------------------- def _sanitize_url(u: Optional[str]) -> Optional[str]: if not u or not isinstance(u, str): return None u = u.strip().replace("\\", "/") if not u: return None if u.startswith("//"): u = "https:" + u if " " in u: u = u.replace(" ", "%20") p = urlparse(u) if p.scheme not in ("http", "https") or not p.netloc: return None return u def _fmt_dt(iso_dt: Optional[str]) -> Optional[str]: if not iso_dt: return None try: dt = datetime.fromisoformat(iso_dt.replace("Z", "+00:00")).astimezone(timezone.utc) return dt.strftime("%Y-%m-%d %H:%M UTC") except Exception: return iso_dt def _egs_is_free_now(item: Dict[str, Any], now: Optional[datetime] = None) -> bool: """Vrai si l'offre est GRATUITE en ce moment (pas juste en promo payante).""" now = now or datetime.now(timezone.utc) # 1) Prix courant = 0 ? price = (item.get("price") or {}).get("totalPrice") or {} try: # certains champs sont des int, d'autres des str discount_price = int(str(price.get("discountPrice") or 0)) if discount_price == 0: # si Epic renvoie 0, c'est bien 'Free Now' return True except Exception: pass # 2) Promo 100% active dans la fenêtre de dates ? promos = (item.get("promotions") or {}) for group_key in ("promotionalOffers", "upcomingPromotionalOffers"): for group in promos.get(group_key) or []: for p in group.get("promotionalOffers") or []: try: start = datetime.fromisoformat(p.get("startDate").replace("Z", "+00:00")) end = datetime.fromisoformat(p.get("endDate").replace("Z", "+00:00")) except Exception: start = end = None discount = ((p.get("discountSetting") or {}).get("discountPercentage")) # Plusieurs backends EGS renvoient 0 pour 'gratuit', d'autres 100. # On accepte les deux mais on exige que la fenêtre englobe 'now'. if discount in (0, 100): if (start is None or start <= now) and (end is None or now <= end): return True return False def _in_last_7_days(start_iso: Optional[str], end_iso: Optional[str]) -> bool: now = datetime.now(timezone.utc) window_start = now - timedelta(days=7) try: if start_iso: s = datetime.fromisoformat(start_iso.replace("Z", "+00:00")) else: s = None if end_iso: e = datetime.fromisoformat(end_iso.replace("Z", "+00:00")) else: e = None except Exception: return False # Overlap test: [s,e] intersects [now-7d, now] s = s or now e = e or now return (s <= now) and (e >= window_start) or (s >= window_start and s <= now) # Prime gaming def _prime_key(it: Dict[str, Any]) -> str: # priorité à la clé GraphQL si dispo, sinon fallback stable sur (title|url) k = it.get("key") if isinstance(k, str) and k.strip(): return k t = (it.get("title") or "").strip().lower() u = (it.get("url") or "").strip().lower() return f"{t}|{u}" def _pick_img(it: Dict[str, Any]) -> Optional[str]: # quelques champs possibles venant du payload GraphQL for cand in ("image", "hero", "boxArt", "thumb", "thumbnail", "cover"): v = it.get(cand) if isinstance(v, str) and v.startswith(("http://", "https://")): return _sanitize_url(v) return None def _coerce_items_from_graphql(data: dict) -> list[dict]: """Extract Prime Gaming Item records from several response shapes.""" items: List[Dict[str, Any]] = [] def push_from_item(it: dict): if not isinstance(it, dict): return iid = it.get("id") or it.get("itemId") assets = it.get("assets") or {} title = (assets.get("title") or it.get("title") or "").strip() url = ( assets.get("externalClaimLink") or it.get("claimUrl") or f"https://gaming.amazon.com/home?itemId={iid}" ) media = (assets.get("cardMedia") or {}).get("defaultMedia") or {} img = media.get("src1x") or media.get("src2x") or None end = None offers = it.get("offers") if isinstance(offers, list) and offers: end = offers[0].get("endTime") or offers[0].get("endDate") items.append( { "key": str(iid) if iid else title, # used for de-dupe & DB "platform": "PRIME", "title": title or (it.get("game", {}) or {}).get("assets", {}).get("title") or "Unknown", "game": title or "", "end": end, "url": _sanitize_url(url), "image": _sanitize_url(img), "thumbnail": _sanitize_url(img), "category": it.get("category"), "isFGWP": it.get("isFGWP"), } ) # shape 1: {"data":{"items":{"items":[...]}}} page_items = (((data or {}).get("data") or {}).get("items") or {}).get("items") if isinstance(page_items, list): for it in page_items: push_from_item(it) # shape 2: sections with ItemsPage sections = [ "inGameLoot", "expiring", "popular", "games", "featuredContent", "eventRow1", "eventRow2", ] root = (data or {}).get("data") or {} for sec in sections: arr = (root.get(sec) or {}).get("items") if isinstance(arr, list): for it in arr: if isinstance(it, dict) and it.get("assets"): push_from_item(it) return items async def _fetch_prime_offers_playwright() -> List[Dict[str, Any]]: """Open Prime Gaming in a real browser and capture GraphQL responses.""" if async_playwright is None: raise RuntimeError( "Playwright not installed. `pip install playwright` then `playwright install`" ) async with async_playwright() as pw: browser = await pw.chromium.launch(headless=True) context = await browser.new_context( user_agent=UA["User-Agent"], extra_http_headers={k: v for k, v in UA.items() if k.lower() != "user-agent"}, ) page = await context.new_page() offers: List[Dict[str, Any]] = [] async def handle_response(resp): try: if "/graphql" in resp.url and resp.request.method == "POST" and resp.status == 200: data = await resp.json() extracted = _coerce_items_from_graphql(data) offers.extend(extracted) except Exception: LOG.exception("PRIME: parse error on %s", resp.url) page.on("response", handle_response) await page.goto(PRIME_HOME, wait_until="domcontentloaded", timeout=45000) try: await page.wait_for_load_state("networkidle", timeout=20000) except Exception: pass # Nudge lazy sections if not offers: try: await page.evaluate("window.scrollTo(0, document.body.scrollHeight)") await page.wait_for_timeout(2000) await page.evaluate("window.scrollTo(0, 0)") await page.wait_for_timeout(1000) except Exception: pass await context.close() await browser.close() # de-dupe by key uniq = {} for it in offers: k = it.get("key") if k and k not in uniq: uniq[k] = it # keep only FULL_GAME (drop in-game loot) and items with a title + url final = [ v for v in uniq.values() if (v.get("category") == "FULL_GAME" or v.get("isFGWP")) and v.get("title") and v.get("url") ] return final # Prime Gaming PRIME_HOME = "https://gaming.amazon.com/" def _prime_key(it: Dict[str, Any]) -> str: # priorité à la clé GraphQL si dispo, sinon fallback stable sur (title|url) k = it.get("key") if isinstance(k, str) and k.strip(): return k t = (it.get("title") or "").strip().lower() u = (it.get("url") or "").strip().lower() return f"{t}|{u}" def _pick_img(it: Dict[str, Any]) -> Optional[str]: # quelques champs possibles venant du payload GraphQL for cand in ("image", "hero", "boxArt", "thumb", "thumbnail", "cover"): v = it.get(cand) if isinstance(v, str) and v.startswith(("http://", "https://")): return _sanitize_url(v) return None async def fetch_primegaming_week() -> Optional[Dict[str, Any]]: """ Ouvre Prime Gaming (Playwright), capte les réponses GraphQL, garde les FULL_GAME/FGWP et filtre sur 7 jours si possible. Ne touche pas à la DB ici. """ offers = await _fetch_prime_offers_playwright() # fourni par toi if not offers: return None # filtre temporel (si dates présentes) windowed: List[Dict[str, Any]] = [] for it in offers: start_iso = it.get("start") or it.get("startDate") or it.get("availabilityStartDate") end_iso = it.get("end") or it.get("endDate") or it.get("availabilityEndDate") if (start_iso or end_iso): if _in_last_7_days(start_iso, end_iso): windowed.append(it) else: windowed.append(it) if not windowed: return None # dédup locale + clés calculées pour le filtrage DB ultérieur uniq: Dict[str, Dict[str, Any]] = {} for it in windowed: k = _prime_key(it) if k not in uniq: it["_dedup_key"] = k uniq[k] = it items = list(uniq.values()) if not items: return None # Construire un bloc "brut" (les titres seront re-dérivés après filtrage DB) hero = _pick_img(items[0]) return { "title": "This Month on Prime Gaming", "date": datetime.now(timezone.utc).isoformat(), "url": PRIME_HOME, "games": [ (it.get("title") or "").strip() for it in items if it.get("title") ][:12], "image": hero, "thumbnail": hero, "_items": items, # pour filtrage DB/remember au call-site } # -------------------- EGS freebies (async, via official endpoint) -------------------- async def fetch_egs_week(session: aiohttp.ClientSession, locale="en-US", country="US", allow_countries=None) -> List[Dict[str, Any]]: allow = allow_countries or country url = ("https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions" f"?locale={locale}&country={country}&allowCountries={allow}") async with session.get(url, headers=UA, timeout=aiohttp.ClientTimeout(total=25)) as r: r.raise_for_status() data = await r.json() out: List[Dict[str, Any]] = [] elements = (((data or {}).get("data") or {}).get("Catalog") or {}).get("searchStore", {}).get("elements", []) or [] for item in elements: if not _egs_is_free_now(item): continue title = item.get("title") slug = item.get("productSlug") if not slug: mappings = (item.get("catalogNs", {}) or {}).get("mappings") or [] if mappings: slug = mappings[0].get("pageSlug") url_item = _sanitize_url(f"https://store.epicgames.com/p/{slug}" if slug else None) key_images = item.get("keyImages") or [] pref_order = ("OfferImageWide", "DieselStoreFrontWide", "DieselStoreFront", "OfferImageTall", "Thumbnail") chosen = None for t in pref_order: chosen = next((k for k in key_images if (k.get("type") or "").lower() == t.lower()), None) if chosen: break if not chosen and key_images: chosen = key_images[0] hero = _sanitize_url(chosen.get("url")) if chosen else None thumb = hero promos = (item.get("promotions") or {}) for section in ("promotionalOffers", "upcomingPromotionalOffers"): lst = promos.get(section) or [] for entry in lst: for p in entry.get("promotionalOffers", []) or []: start = p.get("startDate") end = p.get("endDate") if _in_last_7_days(start, end): out.append({ "platform": "EGS", "title": title, "start": start, "end": end, "url": url_item, "image": hero, "thumbnail": thumb, "is_current": None, }) # keep unique by (title,start) seen = set() uniq = [] for it in out: k = (it.get("title"), it.get("start")) if k not in seen: seen.add(k) uniq.append(it) # sort by start desc uniq.sort(key=lambda e: _fmt_dt(e.get("start")) or "", reverse=True) return uniq # -------------------- PS Plus (async, blog scraping FR) -------------------- PSPLUS_CATEGORY_URL_FR = "https://blog.fr.playstation.com/category/ps-plus/" BASE_BLOG = "https://blog.fr.playstation.com/" def _soup(text: str) -> BeautifulSoup: return BeautifulSoup(text, "html.parser") def _clean_titles(candidates: List[str], limit: int = 7) -> List[str]: cleaned, seen = [], set() for g in candidates: g = re.sub(r"\s*\|\s*PS[45].*$", "", g).strip() g = re.sub(r"\s*\(PS\+\)$", "", g).strip() if not g or len(g) < 2: continue k = g.lower() if k not in seen: seen.add(k) cleaned.append(g) if len(cleaned) >= limit: break return cleaned def _abs_url(u: Optional[str], base: str) -> Optional[str]: if not u: return None if u.startswith("http://") or u.startswith("https://"): return u if u.startswith("/"): return BASE_BLOG.rstrip("/") + u return base.rstrip("/") + "/" + u def _extract_best_image(art: BeautifulSoup, article_url: str) -> Tuple[Optional[str], Optional[str]]: for sel in [ 'meta[property="og:image"]', 'meta[name="og:image"]', 'meta[name="twitter:image"]', 'meta[property="twitter:image"]', ]: tag = art.select_one(sel) if tag and tag.get("content"): img = _abs_url(tag["content"].strip(), article_url) if img: return (img, img) tag = art.select_one('link[rel="image_src"]') if tag and tag.get("href"): img = _abs_url(tag["href"].strip(), article_url) if img: return (img, img) first_img = art.select_one("article img[src]") if first_img: img = _abs_url(first_img.get("src"), article_url) return (img, img) return (None, None) async def fetch_psplus_week(session: aiohttp.ClientSession) -> Optional[Dict[str, Any]]: """Dernier billet FR “Les jeux du mois PlayStation Plus …”. On le retient s’il tombe dans T-7..T.""" async with session.get(PSPLUS_CATEGORY_URL_FR, headers=UA, timeout=aiohttp.ClientTimeout(total=25)) as r: r.raise_for_status() idx_html = await r.text() idx = _soup(idx_html) candidates: List[Tuple[str, str]] = [] for a in idx.select("a[href]"): href = a.get("href") or "" title = (a.get_text() or "").strip() if not href.startswith(BASE_BLOG): continue if title.lower().startswith("les jeux du mois playstation plus"): candidates.append((title, href)) if not candidates: return None latest: Optional[Tuple[datetime, str, str, BeautifulSoup]] = None for title, href in candidates: try: async with session.get(href, headers=UA, timeout=aiohttp.ClientTimeout(total=25)) as r: r.raise_for_status() art_html = await r.text() except Exception: continue art = _soup(art_html) time_el = art.find("time") date_iso = time_el.get("datetime") if time_el else None if not date_iso: continue try: dt_iso = datetime.fromisoformat(date_iso.replace("Z", "+00:00")) except Exception: continue if latest is None or dt_iso > latest[0]: latest = (dt_iso, title, href, art) if latest is None: return None dt_iso, title, url, art = latest now = datetime.now(timezone.utc) if (now - dt_iso) > timedelta(days=7): # Hors fenêtre -> pas de section PS+ cette semaine return None # extraire quelques titres listés dans l’article games: List[str] = [] for node in art.find_all(string=True): t = " ".join((node or "").strip().split()) if " | PS" in t and 0 < len(t) <= 120: name = t.split(" | ")[0].strip("—:-– ") if 2 <= len(name.split()) <= 8: games.append(name) if not games: for tag in art.select("strong, em"): t = " ".join(tag.get_text(" ", strip=True).split()) if 2 <= len(t.split()) <= 8: games.append(t) image_url, thumb_url = _extract_best_image(art, url) return { "title": title, "date": dt_iso.replace(tzinfo=timezone.utc).isoformat(), "url": url, "games": _clean_titles(games), "image": _sanitize_url(image_url), "thumbnail": _sanitize_url(thumb_url), } # -------------------- Xbox Game Pass (sync, SIGLS + DisplayCatalog) -------------------- def _xbox_locale_from_languages(languages: str) -> str: if not languages or "-" not in languages: return "en-US" lang, region = languages.split("-", 1) return f"{lang.lower()}-{region.upper()}" def _slugify(s: str) -> str: s = re.sub(r"[^a-zA-Z0-9]+", "-", s).strip("-").lower() return s or "game" def _pick_images_displaycatalog(prod: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]: def candidates_of(prod): cands: List[Dict[str, Any]] = [] lp = prod.get("LocalizedProperties") or [] if isinstance(lp, list) and lp: images = lp[0].get("Images") or [] if isinstance(images, list): cands.extend(images) top = prod.get("Images") or [] if isinstance(top, list): cands.extend(top) return cands def is_pref(img: Dict[str, Any]) -> bool: purpose = (img.get("ImagePurpose") or img.get("Purpose") or "").lower() return any(k in purpose for k in ("poster", "brandedkeyart", "superherowide", "superheroart")) cands = candidates_of(prod) if not cands: return None, None pref = [i for i in cands if is_pref(i)] or cands try: hero_raw = max(pref, key=lambda i: i.get("Width", 0)).get("Uri") except Exception: hero_raw = pref[0].get("Uri") try: thumb_raw = min(pref, key=lambda i: i.get("Width", 10**9)).get("Uri") except Exception: thumb_raw = hero_raw hero = _sanitize_url(hero_raw) thumb = _sanitize_url(thumb_raw) return hero or thumb, thumb or hero def fetch_xgp_recent(limit=20, market="US", languages="en-us") -> List[Dict[str, Any]]: """Best-effort: ‘Recently added’ via SIGLS; fallback All games; enrich via DisplayCatalog.""" recently_ids = [ "3fdd7f57-7092-4b65-bd40-5a9dac1b2b84", "61d6e1a1-735c-4b97-9d15-22ce8dfb0c03", "7d2d3d36-1c52-4a63-8b3e-3b7ee4d0f62a", ] all_games_id = "29a81209-df6f-41fd-a528-2ae6b91f719c" def sigls(collection_id: str) -> List[str]: try: r = requests.get("https://catalog.gamepass.com/sigls/v2", params={"language": languages, "market": market, "id": collection_id}, headers=UA, timeout=25) r.raise_for_status() payload = r.json() except Exception: return [] ids: List[str] = [] if isinstance(payload, list): for obj in payload: if isinstance(obj, dict) and "id" in obj and isinstance(obj["id"], str): ids.append(obj["id"]) elif isinstance(obj, str): ids.append(obj) return ids big_ids: List[str] = [] for cid in recently_ids: ids = sigls(cid) if ids: big_ids = ids break if not big_ids: big_ids = sigls(all_games_id) if not big_ids: return [] out: List[Dict[str, Any]] = [] seen = set() for i in range(0, len(big_ids), 20): chunk = big_ids[i:i+20] try: dc = requests.get("https://displaycatalog.mp.microsoft.com/v7.0/products", params={"bigIds": ",".join(chunk), "market": market, "languages": languages}, headers=UA, timeout=25) dc.raise_for_status() data = dc.json() except Exception: continue for p in (data.get("Products") or []): pid = p.get("ProductId") if not pid or pid in seen: continue seen.add(pid) title = pid lp = p.get("LocalizedProperties") or [] if lp: title = lp[0].get("ProductTitle") or pid locale_path = _xbox_locale_from_languages("fr-fr") url = f"https://www.xbox.com/{locale_path}/games/store/{_slugify(title)}/{pid}" hero, thumb = _pick_images_displaycatalog(p) out.append({"platform": "XGP", "title": title, "productId": pid, "url": _sanitize_url(url), "image": hero, "thumbnail": thumb}) return out[:limit] # -------------------- HTML builder -------------------- YOUTUBE_EMBED_TMPL = ( '
Aucun élément disponible.
" items = items[:12] col_pct = 100 // max(1, columns) # explicit col widths => no leftover gap colgroup = "".join([f'Aucun jeu gratuit relevé sur la période.
") else: parts.append(render_cards_grid(egs, columns=CARD_COLUMNS, fixed_h=CARD_IMG_H, fallback_url="https://store.epicgames.com/")) for it in egs: # title = html.escape(it.get("title") or "") # url = it.get("url") or "" # start = _fmt_dt(it.get("start")) or "?" # end = _fmt_dt(it.get("end")) or "?" img = it.get("thumbnail") if img and not feature: feature = img # parts.append("{title}
Période : {start} → {end}
Aucun nouveau billet Prime Gaming cette semaine.
") else: # utilise l’image de section comme “feature” si rien d’autre n’a été choisi # if prime.get("image") and not feature: # feature = prime["image"] # titre cliquable vers la page "This Month on Prime Gaming" title = html.escape(prime.get("title") or "This Month on Prime Gaming") url = prime.get("url") or "https://gaming.amazon.com/" #parts.append(f"") # grille de cartes (image au-dessus, nom en dessous, lien vers l’offre) parts.append(render_cards_grid(prime.get("_items", []), columns=CARD_COLUMNS, fixed_h=CARD_IMG_H, fallback_url=url)) # --- Xbox Game Pass parts.append("Pas d'entrées détectées.
") else: parts.append(render_cards_grid(xgp, columns=CARD_COLUMNS, fixed_h=CARD_IMG_H, fallback_url="https://www.xbox.com/xbox-game-pass")) for it in xgp: # title = html.escape(it.get("title") or it.get("productId") or "") # url = it.get("url") or "" # img = it.get("thumbnail") if img and not feature: feature = img # parts.append("{title}
") # if url: # parts.append(f'') # parts.append("Aucune nouvelle annonce PS+.
") else: if psplus.get("image") and not feature: feature = psplus["image"] parts.append("{title}
") games = psplus.get("games") or [] if games: parts.append("Newsletter hebdomadaire — envoyée automatiquement chaque dimanche à midi.
") return "\n".join(parts), feature # -------------------- Orchestrator -------------------- async def run_weekly(): logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") admin_url = os.environ["GHOST_ADMIN_URL"] admin_key = os.environ["GHOST_ADMIN_KEY"] newsletter_slug = os.environ.get("GHOST_NEWSLETTER_SLUG") ghost = GhostAdmin(admin_url, admin_key) # Fetch all sources (with timeouts and isolation) egs_items: List[Dict[str, Any]] = [] psplus_data: Optional[Dict[str, Any]] = None xgp_items: List[Dict[str, Any]] = [] prime_data: Optional[Dict[str, Any]] = None store = Storage() try: async with aiohttp.ClientSession(headers=UA) as s: # EGS try: locale = os.environ.get("EGS_LOCALE", "en-US") country = os.environ.get("EGS_COUNTRY", "US") allow = os.environ.get("EGS_ALLOW_COUNTRIES", country) egs_items = await fetch_egs_week(s, locale=locale, country=country, allow_countries=allow) except Exception as e: LOG.warning("EGS fetch failed: %s", e) # PS Plus try: psplus_data = await fetch_psplus_week(s) except Exception as e: LOG.warning("PS+ fetch failed: %s", e) except Exception as e: LOG.warning("HTTP session failed: %s", e) #prime try: prime_data = await fetch_primegaming_week() except Exception as e: LOG.warning("prime fetch failed: %s", e) if prime_data: # 2.a Filtre DB (ne garder que ce qui n’a jamais été publié) fresh = [] for it in prime_data["_items"]: k = it.get("_dedup_key") or _prime_key(it) if not store.seen(k): # ← usage DB (filtrage) it["_dedup_key"] = k fresh.append(it) if fresh: # régénère la liste "games" et l'image en fonction du filtrage prime_data["_items"] = fresh prime_data["games"] = [ (it.get("title") or "").strip() for it in fresh if it.get("title") ][:12] prime_data["image"] = prime_data.get("image") or _pick_img(fresh[0]) prime_data["thumbnail"] = prime_data["image"] else: prime_data = None # rien de neuf → pas de section # XGP (requests, sync) try: market = os.environ.get("XGP_MARKET", "US") languages = os.environ.get("XGP_LANGUAGES", "en-us") xgp_items = fetch_xgp_recent(limit=20, market=market, languages=languages) except Exception as e: LOG.warning("XGP fetch failed: %s", e) # Build HTML def keep_new(items, key_fn): fresh = [] for it in items: k = key_fn(it) if not store.seen(k): it["_dedup_key"] = k fresh.append(it) return fresh # Fetch your three sources as you already do: xgp_items = keep_new(xgp_items, xgp_key) html_body, feature = build_html(egs_items, psplus_data, xgp_items, prime_data) # Title (FR) start = (datetime.now(TZ) - timedelta(days=6)).strftime("%d/%m/%Y") end = datetime.now(TZ).strftime("%d/%m/%Y") title = f"Récap hebdo — EGS, Prime Gaming, PS Plus, Game Pass ({start} → {end})" # Create + publish + email created = ghost.create_post_html(title, html_body, status="draft", feature_image=feature) ghost.publish_post(created["id"], created["updated_at"], newsletter_slug=newsletter_slug) post_id = created["id"] if xgp_items and post_id: for it in xgp_items: store.remember("xgp", it["_dedup_key"], post_id) # 2.b Marquer comme publié (clé + post_id) if prime_data and post_id: store.bulk_remember("prime", [(it["_dedup_key"], post_id) for it in prime_data["_items"]]) LOG.info("Published weekly newsletter: %s", created.get("url")) # -------------------- Scheduler -------------------- def _format_duration(seconds: float) -> str: seconds = int(seconds) days, seconds = divmod(seconds, 86400) hours, seconds = divmod(seconds, 3600) minutes, seconds = divmod(seconds, 60) parts = [] if days: parts.append(f"{days} days") if hours: parts.append(f"{hours} hours") if minutes: parts.append(f"{minutes} minutes") if seconds: parts.append(f"{seconds} seconds") return ", ".join(parts) if parts else "0 seconds" async def run_forever_sunday_noon(): logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") while True: now = datetime.now() days = (6 - now.weekday()) % 7 target = (now + timedelta(days=days)).replace(hour=12, minute=0, second=0, microsecond=0) sleep_seconds = (target - now).total_seconds() while sleep_seconds > 0: LOG.info("Waiting for %s for next scan", _format_duration(sleep_seconds)) await asyncio.sleep(min(sleep_seconds, 5 * 60)) now = datetime.now() sleep_seconds = (target - now).total_seconds() LOG.info("Going to run the weekly task") await run_weekly() # -------------------- Entrypoint -------------------- async def main(): setuplogger() parser = argparse.ArgumentParser() parser.add_argument("--runonce", action="store_true", help="Run now and exit (no scheduler)") args = parser.parse_args() if args.runonce: await run_weekly() else: await run_forever_sunday_noon() if __name__ == "__main__": asyncio.run(main())