# weekly_games_roundup.py # -*- coding: utf-8 -*- import asyncio import dataclasses import html import json import logging import os import random import re import time from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple from urllib.parse import urlparse, parse_qs import aiohttp import jwt # PyJWT import requests from bs4 import BeautifulSoup from datetime import datetime, timedelta, timezone import zoneinfo LOG = logging.getLogger("weekly") TZ = zoneinfo.ZoneInfo("Europe/Brussels") UA = {"User-Agent": "Mozilla/5.0 (compatible; weekly-games-roundup/1.0)"} # -------------------- Ghost Admin client -------------------- class GhostAdmin: def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"): self.base = admin_url.rstrip("/") + "/" self.key_id, self.key_secret_hex = admin_key.split(":") self.accept_version = accept_version def _jwt(self) -> str: iat = int(time.time()) payload = {"iat": iat, "exp": iat + 5 * 60, "aud": "/admin/"} headers = {"alg": "HS256", "typ": "JWT", "kid": self.key_id} token = jwt.encode(payload, bytes.fromhex(self.key_secret_hex), algorithm="HS256", headers=headers) return token if isinstance(token, str) else token.decode("utf-8") def _headers(self): return {"Authorization": f"Ghost {self._jwt()}", "Accept-Version": self.accept_version, "Content-Type": "application/json"} def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str: if preferred_slug: return preferred_slug url = self.base + "newsletters/" resp = requests.get(url, headers=self._headers(), timeout=20) resp.raise_for_status() newsletters = resp.json().get("newsletters", []) if not newsletters: raise RuntimeError("No newsletters configured in Ghost.") actives = [n for n in newsletters if n.get("status") == "active"] for n in actives: if n.get("is_default"): return n["slug"] return (actives or newsletters)[0]["slug"] def create_post_html(self, title: str, html_content: str, status: str = "draft", feature_image: Optional[str] = None) -> Dict[str, Any]: url = self.base + "posts/?source=html" body = {"posts": [{"title": title, "html": html_content, "status": status, **({"feature_image": feature_image} if feature_image else {})}]} resp = requests.post(url, headers=self._headers(), json=body, timeout=30) if resp.status_code >= 400: raise RuntimeError(f"Ghost create error {resp.status_code}: {resp.text}") return resp.json()["posts"][0] def publish_post(self, post_id: str, updated_at: str, newsletter_slug: Optional[str], email_segment: Optional[str] = None) -> Dict[str, Any]: slug = self.pick_newsletter_slug(newsletter_slug) params = [f"newsletter={requests.utils.quote(slug)}"] if email_segment: params.append(f"email_segment={requests.utils.quote(email_segment)}") url = self.base + f"posts/{post_id}/?{'&'.join(params)}" body = {"posts": [{"updated_at": updated_at, "status": "published"}]} resp = requests.put(url, headers=self._headers(), json=body, timeout=30) if resp.status_code >= 400: raise RuntimeError(f"Ghost publish error {resp.status_code}: {resp.text}") return resp.json()["posts"][0] # -------------------- Helpers (URLs, images, dates) -------------------- def _sanitize_url(u: Optional[str]) -> Optional[str]: if not u or not isinstance(u, str): return None u = u.strip().replace("\\", "/") if not u: return None if u.startswith("//"): u = "https:" + u if " " in u: u = u.replace(" ", "%20") p = urlparse(u) if p.scheme not in ("http", "https") or not p.netloc: return None return u def _fmt_dt(iso_dt: Optional[str]) -> Optional[str]: if not iso_dt: return None try: dt = datetime.fromisoformat(iso_dt.replace("Z", "+00:00")).astimezone(timezone.utc) return dt.strftime("%Y-%m-%d %H:%M UTC") except Exception: return iso_dt def _in_last_7_days(start_iso: Optional[str], end_iso: Optional[str]) -> bool: now = datetime.now(timezone.utc) window_start = now - timedelta(days=7) try: if start_iso: s = datetime.fromisoformat(start_iso.replace("Z", "+00:00")) else: s = None if end_iso: e = datetime.fromisoformat(end_iso.replace("Z", "+00:00")) else: e = None except Exception: return False # Overlap test: [s,e] intersects [now-7d, now] s = s or now e = e or now return (s <= now) and (e >= window_start) or (s >= window_start and s <= now) # -------------------- EGS freebies (async, via official endpoint) -------------------- async def fetch_egs_week(session: aiohttp.ClientSession, locale="en-US", country="US", allow_countries=None) -> List[Dict[str, Any]]: allow = allow_countries or country url = ("https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions" f"?locale={locale}&country={country}&allowCountries={allow}") async with session.get(url, headers=UA, timeout=aiohttp.ClientTimeout(total=25)) as r: r.raise_for_status() data = await r.json() out: List[Dict[str, Any]] = [] elements = (((data or {}).get("data") or {}).get("Catalog") or {}).get("searchStore", {}).get("elements", []) or [] for item in elements: title = item.get("title") slug = item.get("productSlug") if not slug: mappings = (item.get("catalogNs", {}) or {}).get("mappings") or [] if mappings: slug = mappings[0].get("pageSlug") url_item = _sanitize_url(f"https://store.epicgames.com/p/{slug}" if slug else None) key_images = item.get("keyImages") or [] pref_order = ("OfferImageWide", "DieselStoreFrontWide", "DieselStoreFront", "OfferImageTall", "Thumbnail") chosen = None for t in pref_order: chosen = next((k for k in key_images if (k.get("type") or "").lower() == t.lower()), None) if chosen: break if not chosen and key_images: chosen = key_images[0] hero = _sanitize_url(chosen.get("url")) if chosen else None thumb = hero promos = (item.get("promotions") or {}) for section in ("promotionalOffers", "upcomingPromotionalOffers"): lst = promos.get(section) or [] for entry in lst: for p in entry.get("promotionalOffers", []) or []: start = p.get("startDate") end = p.get("endDate") if _in_last_7_days(start, end): out.append({ "platform": "EGS", "title": title, "start": start, "end": end, "url": url_item, "image": hero, "thumbnail": thumb, "is_current": None, }) # keep unique by (title,start) seen = set() uniq = [] for it in out: k = (it.get("title"), it.get("start")) if k not in seen: seen.add(k) uniq.append(it) # sort by start desc uniq.sort(key=lambda e: _fmt_dt(e.get("start")) or "", reverse=True) return uniq # -------------------- PS Plus (async, blog scraping FR) -------------------- PSPLUS_CATEGORY_URL_FR = "https://blog.fr.playstation.com/category/ps-plus/" BASE_BLOG = "https://blog.fr.playstation.com/" def _soup(text: str) -> BeautifulSoup: return BeautifulSoup(text, "html.parser") def _clean_titles(candidates: List[str], limit: int = 7) -> List[str]: cleaned, seen = [], set() for g in candidates: g = re.sub(r"\s*\|\s*PS[45].*$", "", g).strip() g = re.sub(r"\s*\(PS\+\)$", "", g).strip() if not g or len(g) < 2: continue k = g.lower() if k not in seen: seen.add(k) cleaned.append(g) if len(cleaned) >= limit: break return cleaned def _abs_url(u: Optional[str], base: str) -> Optional[str]: if not u: return None if u.startswith("http://") or u.startswith("https://"): return u if u.startswith("/"): return BASE_BLOG.rstrip("/") + u return base.rstrip("/") + "/" + u def _extract_best_image(art: BeautifulSoup, article_url: str) -> Tuple[Optional[str], Optional[str]]: for sel in [ 'meta[property="og:image"]', 'meta[name="og:image"]', 'meta[name="twitter:image"]', 'meta[property="twitter:image"]', ]: tag = art.select_one(sel) if tag and tag.get("content"): img = _abs_url(tag["content"].strip(), article_url) if img: return (img, img) tag = art.select_one('link[rel="image_src"]') if tag and tag.get("href"): img = _abs_url(tag["href"].strip(), article_url) if img: return (img, img) first_img = art.select_one("article img[src]") if first_img: img = _abs_url(first_img.get("src"), article_url) return (img, img) return (None, None) async def fetch_psplus_week(session: aiohttp.ClientSession) -> Optional[Dict[str, Any]]: """Dernier billet FR “Les jeux du mois PlayStation Plus …”. On le retient s’il tombe dans T-7..T.""" async with session.get(PSPLUS_CATEGORY_URL_FR, headers=UA, timeout=aiohttp.ClientTimeout(total=25)) as r: r.raise_for_status() idx_html = await r.text() idx = _soup(idx_html) candidates: List[Tuple[str, str]] = [] for a in idx.select("a[href]"): href = a.get("href") or "" title = (a.get_text() or "").strip() if not href.startswith(BASE_BLOG): continue if title.lower().startswith("les jeux du mois playstation plus"): candidates.append((title, href)) if not candidates: return None latest: Optional[Tuple[datetime, str, str, BeautifulSoup]] = None for title, href in candidates: try: async with session.get(href, headers=UA, timeout=aiohttp.ClientTimeout(total=25)) as r: r.raise_for_status() art_html = await r.text() except Exception: continue art = _soup(art_html) time_el = art.find("time") date_iso = time_el.get("datetime") if time_el else None if not date_iso: continue try: dt_iso = datetime.fromisoformat(date_iso.replace("Z", "+00:00")) except Exception: continue if latest is None or dt_iso > latest[0]: latest = (dt_iso, title, href, art) if latest is None: return None dt_iso, title, url, art = latest now = datetime.now(timezone.utc) if (now - dt_iso) > timedelta(days=7): # Hors fenêtre -> pas de section PS+ cette semaine return None # extraire quelques titres listés dans l’article games: List[str] = [] for node in art.find_all(string=True): t = " ".join((node or "").strip().split()) if " | PS" in t and 0 < len(t) <= 120: name = t.split(" | ")[0].strip("—:-– ") if 2 <= len(name.split()) <= 8: games.append(name) if not games: for tag in art.select("strong, em"): t = " ".join(tag.get_text(" ", strip=True).split()) if 2 <= len(t.split()) <= 8: games.append(t) image_url, thumb_url = _extract_best_image(art, url) return { "title": title, "date": dt_iso.replace(tzinfo=timezone.utc).isoformat(), "url": url, "games": _clean_titles(games), "image": _sanitize_url(image_url), "thumbnail": _sanitize_url(thumb_url), } # -------------------- Xbox Game Pass (sync, SIGLS + DisplayCatalog) -------------------- def _xbox_locale_from_languages(languages: str) -> str: if not languages or "-" not in languages: return "en-US" lang, region = languages.split("-", 1) return f"{lang.lower()}-{region.upper()}" def _slugify(s: str) -> str: s = re.sub(r"[^a-zA-Z0-9]+", "-", s).strip("-").lower() return s or "game" def _pick_images_displaycatalog(prod: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]: def candidates_of(prod): cands: List[Dict[str, Any]] = [] lp = prod.get("LocalizedProperties") or [] if isinstance(lp, list) and lp: images = lp[0].get("Images") or [] if isinstance(images, list): cands.extend(images) top = prod.get("Images") or [] if isinstance(top, list): cands.extend(top) return cands def is_pref(img: Dict[str, Any]) -> bool: purpose = (img.get("ImagePurpose") or img.get("Purpose") or "").lower() return any(k in purpose for k in ("poster", "brandedkeyart", "superherowide", "superheroart")) cands = candidates_of(prod) if not cands: return None, None pref = [i for i in cands if is_pref(i)] or cands try: hero_raw = max(pref, key=lambda i: i.get("Width", 0)).get("Uri") except Exception: hero_raw = pref[0].get("Uri") try: thumb_raw = min(pref, key=lambda i: i.get("Width", 10**9)).get("Uri") except Exception: thumb_raw = hero_raw hero = _sanitize_url(hero_raw) thumb = _sanitize_url(thumb_raw) return hero or thumb, thumb or hero def fetch_xgp_recent(limit=20, market="US", languages="en-us") -> List[Dict[str, Any]]: """Best-effort: ‘Recently added’ via SIGLS; fallback All games; enrich via DisplayCatalog.""" recently_ids = [ "3fdd7f57-7092-4b65-bd40-5a9dac1b2b84", "61d6e1a1-735c-4b97-9d15-22ce8dfb0c03", "7d2d3d36-1c52-4a63-8b3e-3b7ee4d0f62a", ] all_games_id = "29a81209-df6f-41fd-a528-2ae6b91f719c" def sigls(collection_id: str) -> List[str]: try: r = requests.get("https://catalog.gamepass.com/sigls/v2", params={"language": languages, "market": market, "id": collection_id}, headers=UA, timeout=25) r.raise_for_status() payload = r.json() except Exception: return [] ids: List[str] = [] if isinstance(payload, list): for obj in payload: if isinstance(obj, dict) and "id" in obj and isinstance(obj["id"], str): ids.append(obj["id"]) elif isinstance(obj, str): ids.append(obj) return ids big_ids: List[str] = [] for cid in recently_ids: ids = sigls(cid) if ids: big_ids = ids break if not big_ids: big_ids = sigls(all_games_id) if not big_ids: return [] out: List[Dict[str, Any]] = [] seen = set() for i in range(0, len(big_ids), 20): chunk = big_ids[i:i+20] try: dc = requests.get("https://displaycatalog.mp.microsoft.com/v7.0/products", params={"bigIds": ",".join(chunk), "market": market, "languages": languages}, headers=UA, timeout=25) dc.raise_for_status() data = dc.json() except Exception: continue for p in (data.get("Products") or []): pid = p.get("ProductId") if not pid or pid in seen: continue seen.add(pid) title = pid lp = p.get("LocalizedProperties") or [] if lp: title = lp[0].get("ProductTitle") or pid locale_path = _xbox_locale_from_languages("fr-fr") url = f"https://www.xbox.com/{locale_path}/games/store/{_slugify(title)}/{pid}" hero, thumb = _pick_images_displaycatalog(p) out.append({"platform": "XGP", "title": title, "productId": pid, "url": _sanitize_url(url), "image": hero, "thumbnail": thumb}) return out[:limit] # -------------------- HTML builder -------------------- YOUTUBE_EMBED_TMPL = ( '
' '
' ) def build_html(egs: List[Dict[str, Any]], psplus: Optional[Dict[str, Any]], xgp: List[Dict[str, Any]]) -> Tuple[str, Optional[str]]: parts: List[str] = [] now_local = datetime.now(TZ) title_h2 = f"Presque Gratuit — semaine du {(now_local - timedelta(days=6)).strftime('%d/%m')} au {now_local.strftime('%d/%m')}" parts.append(f"

{html.escape(title_h2)}

") feature: Optional[str] = None # --- Epic Games Store parts.append("

🎁 Epic Games Store — Jeux gratuits (7 jours)

") if not egs: parts.append("

Aucun jeu gratuit relevé sur la période.

") else: for it in egs: title = html.escape(it.get("title") or "") url = it.get("url") or "" start = _fmt_dt(it.get("start")) or "?" end = _fmt_dt(it.get("end")) or "?" img = it.get("thumbnail") if img and not feature: feature = img parts.append("
") if img: parts.append(f'

') parts.append(f"

{title}
Période : {start} → {end}

") if url: parts.append(f'

{html.escape(url)}

') parts.append("
") # --- PlayStation Plus parts.append("

🎮 PlayStation Plus — Jeux du mois (si annoncé cette semaine)

") if not psplus: parts.append("

Aucun nouveau billet PS+ dans la fenêtre des 7 jours.

") else: if psplus.get("image") and not feature: feature = psplus["image"] parts.append("
") if psplus.get("image"): parts.append(f'

') title = html.escape(psplus.get("title") or "PlayStation Plus — Jeux du mois") url = psplus.get("url") if url: parts.append(f"

{title}

") else: parts.append(f"

{title}

") games = psplus.get("games") or [] if games: parts.append("") parts.append("
") # --- Xbox Game Pass parts.append("

🟩 Xbox Game Pass — Récemment ajoutés

") if not xgp: parts.append("

Pas d'entrées détectées.

") else: for it in xgp: title = html.escape(it.get("title") or it.get("productId") or "") url = it.get("url") or "" img = it.get("thumbnail") if img and not feature: feature = img parts.append("
") if img: parts.append(f'

') parts.append(f"

{title}

") if url: parts.append(f'

{html.escape(url)}

') parts.append("
") parts.append("

Newsletter hebdomadaire — envoyée automatiquement chaque dimanche à midi.

") return "\n".join(parts), feature # -------------------- Orchestrator -------------------- async def run_weekly(): logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") admin_url = os.environ["GHOST_ADMIN_URL"] admin_key = os.environ["GHOST_ADMIN_KEY"] newsletter_slug = os.environ.get("GHOST_NEWSLETTER_SLUG") ghost = GhostAdmin(admin_url, admin_key) # Fetch all sources (with timeouts and isolation) egs_items: List[Dict[str, Any]] = [] psplus_data: Optional[Dict[str, Any]] = None xgp_items: List[Dict[str, Any]] = [] try: async with aiohttp.ClientSession(headers=UA) as s: # EGS try: locale = os.environ.get("EGS_LOCALE", "en-US") country = os.environ.get("EGS_COUNTRY", "US") allow = os.environ.get("EGS_ALLOW_COUNTRIES", country) egs_items = await fetch_egs_week(s, locale=locale, country=country, allow_countries=allow) except Exception as e: LOG.warning("EGS fetch failed: %s", e) # PS Plus try: psplus_data = await fetch_psplus_week(s) except Exception as e: LOG.warning("PS+ fetch failed: %s", e) except Exception as e: LOG.warning("HTTP session failed: %s", e) # XGP (requests, sync) try: market = os.environ.get("XGP_MARKET", "US") languages = os.environ.get("XGP_LANGUAGES", "en-us") xgp_items = fetch_xgp_recent(limit=20, market=market, languages=languages) except Exception as e: LOG.warning("XGP fetch failed: %s", e) # Build HTML html_body, feature = build_html(egs_items, psplus_data, xgp_items) # Title (FR) start = (datetime.now(TZ) - timedelta(days=6)).strftime("%d/%m/%Y") end = datetime.now(TZ).strftime("%d/%m/%Y") title = f"Récap hebdo — EGS, PS Plus, Game Pass ({start} → {end})" # Create + publish + email created = ghost.create_post_html(title, html_body, status="draft", feature_image=feature) #ghost.publish_post(created["id"], created["updated_at"], newsletter_slug=newsletter_slug) LOG.info("Published weekly newsletter: %s", created.get("url")) # -------------------- Scheduler -------------------- async def run_forever_sunday_noon(): logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") """Run at next Sunday 12:00 Europe/Brussels, then every 7 days.""" while True: now = datetime.now(TZ) # days until Sunday (weekday(): Monday=0..Sunday=6) days = (6 - now.weekday()) % 7 target = (now + timedelta(days=days)).replace(hour=12, minute=0, second=0, microsecond=0) if target <= now: target = target + timedelta(days=7) wait = (target - now).total_seconds() LOG.info("Next run at %s (in %.0f min)", target.isoformat(), wait/60) await asyncio.sleep(wait) try: await run_weekly() except Exception as e: LOG.exception("Weekly run failed: %s", e) # then sleep 7 days await asyncio.sleep(7 * 24 * 3600) # -------------------- Entrypoint -------------------- if __name__ == "__main__": import argparse parser = argparse.ArgumentParser() parser.add_argument("--runonce", action="store_true", help="Run now and exit (no scheduler)") args = parser.parse_args() if args.runonce: asyncio.run(run_weekly()) else: asyncio.run(run_forever_sunday_noon())