diff --git a/post_rss_to_ghost.py b/post_rss_to_ghost.py index c15aa54..fba2bcd 100644 --- a/post_rss_to_ghost.py +++ b/post_rss_to_ghost.py @@ -414,6 +414,10 @@ async def main(): LOG.info("Starting bot") + if args.run_once: + await task.daily_task() + return + # Démarrage: publier l'édition du jour si elle n'existe pas encore await task.maybe_run_today() diff --git a/presquegratos.py b/presquegratos.py new file mode 100644 index 0000000..ca15e5c --- /dev/null +++ b/presquegratos.py @@ -0,0 +1,597 @@ +# weekly_games_roundup.py +# -*- coding: utf-8 -*- +import asyncio +import dataclasses +import html +import json +import logging +import os +import random +import re +import time +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import urlparse, parse_qs + +import aiohttp +import jwt # PyJWT +import requests +from bs4 import BeautifulSoup +from datetime import datetime, timedelta, timezone +import zoneinfo + +LOG = logging.getLogger("weekly") +TZ = zoneinfo.ZoneInfo("Europe/Brussels") +UA = {"User-Agent": "Mozilla/5.0 (compatible; weekly-games-roundup/1.0)"} + +# -------------------- Ghost Admin client -------------------- + +class GhostAdmin: + def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"): + self.base = admin_url.rstrip("/") + "/" + self.key_id, self.key_secret_hex = admin_key.split(":") + self.accept_version = accept_version + + def _jwt(self) -> str: + iat = int(time.time()) + payload = {"iat": iat, "exp": iat + 5 * 60, "aud": "/admin/"} + headers = {"alg": "HS256", "typ": "JWT", "kid": self.key_id} + token = jwt.encode(payload, bytes.fromhex(self.key_secret_hex), algorithm="HS256", headers=headers) + return token if isinstance(token, str) else token.decode("utf-8") + + def _headers(self): + return {"Authorization": f"Ghost {self._jwt()}", + "Accept-Version": self.accept_version, + "Content-Type": "application/json"} + + def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str: + if preferred_slug: + return preferred_slug + url = self.base + "newsletters/" + resp = requests.get(url, headers=self._headers(), timeout=20) + resp.raise_for_status() + newsletters = resp.json().get("newsletters", []) + if not newsletters: + raise RuntimeError("No newsletters configured in Ghost.") + actives = [n for n in newsletters if n.get("status") == "active"] + for n in actives: + if n.get("is_default"): + return n["slug"] + return (actives or newsletters)[0]["slug"] + + def create_post_html(self, title: str, html_content: str, + status: str = "draft", feature_image: Optional[str] = None) -> Dict[str, Any]: + url = self.base + "posts/?source=html" + body = {"posts": [{"title": title, "html": html_content, "status": status, + **({"feature_image": feature_image} if feature_image else {})}]} + resp = requests.post(url, headers=self._headers(), json=body, timeout=30) + if resp.status_code >= 400: + raise RuntimeError(f"Ghost create error {resp.status_code}: {resp.text}") + return resp.json()["posts"][0] + + def publish_post(self, post_id: str, updated_at: str, + newsletter_slug: Optional[str], email_segment: Optional[str] = None) -> Dict[str, Any]: + slug = self.pick_newsletter_slug(newsletter_slug) + params = [f"newsletter={requests.utils.quote(slug)}"] + if email_segment: + params.append(f"email_segment={requests.utils.quote(email_segment)}") + url = self.base + f"posts/{post_id}/?{'&'.join(params)}" + body = {"posts": [{"updated_at": updated_at, "status": "published"}]} + resp = requests.put(url, headers=self._headers(), json=body, timeout=30) + if resp.status_code >= 400: + raise RuntimeError(f"Ghost publish error {resp.status_code}: {resp.text}") + return resp.json()["posts"][0] + +# -------------------- Helpers (URLs, images, dates) -------------------- + +def _sanitize_url(u: Optional[str]) -> Optional[str]: + if not u or not isinstance(u, str): + return None + u = u.strip().replace("\\", "/") + if not u: + return None + if u.startswith("//"): + u = "https:" + u + if " " in u: + u = u.replace(" ", "%20") + p = urlparse(u) + if p.scheme not in ("http", "https") or not p.netloc: + return None + return u + +def _fmt_dt(iso_dt: Optional[str]) -> Optional[str]: + if not iso_dt: + return None + try: + dt = datetime.fromisoformat(iso_dt.replace("Z", "+00:00")).astimezone(timezone.utc) + return dt.strftime("%Y-%m-%d %H:%M UTC") + except Exception: + return iso_dt + +def _in_last_7_days(start_iso: Optional[str], end_iso: Optional[str]) -> bool: + now = datetime.now(timezone.utc) + window_start = now - timedelta(days=7) + try: + if start_iso: + s = datetime.fromisoformat(start_iso.replace("Z", "+00:00")) + else: + s = None + if end_iso: + e = datetime.fromisoformat(end_iso.replace("Z", "+00:00")) + else: + e = None + except Exception: + return False + # Overlap test: [s,e] intersects [now-7d, now] + s = s or now + e = e or now + return (s <= now) and (e >= window_start) or (s >= window_start and s <= now) + +# -------------------- EGS freebies (async, via official endpoint) -------------------- + +async def fetch_egs_week(session: aiohttp.ClientSession, + locale="en-US", country="US", allow_countries=None) -> List[Dict[str, Any]]: + allow = allow_countries or country + url = ("https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions" + f"?locale={locale}&country={country}&allowCountries={allow}") + async with session.get(url, headers=UA, timeout=aiohttp.ClientTimeout(total=25)) as r: + r.raise_for_status() + data = await r.json() + + out: List[Dict[str, Any]] = [] + elements = (((data or {}).get("data") or {}).get("Catalog") or {}).get("searchStore", {}).get("elements", []) or [] + for item in elements: + title = item.get("title") + slug = item.get("productSlug") + if not slug: + mappings = (item.get("catalogNs", {}) or {}).get("mappings") or [] + if mappings: + slug = mappings[0].get("pageSlug") + url_item = _sanitize_url(f"https://store.epicgames.com/p/{slug}" if slug else None) + key_images = item.get("keyImages") or [] + pref_order = ("OfferImageWide", "DieselStoreFrontWide", "DieselStoreFront", "OfferImageTall", "Thumbnail") + chosen = None + for t in pref_order: + chosen = next((k for k in key_images if (k.get("type") or "").lower() == t.lower()), None) + if chosen: + break + if not chosen and key_images: + chosen = key_images[0] + hero = _sanitize_url(chosen.get("url")) if chosen else None + thumb = hero + + promos = (item.get("promotions") or {}) + for section in ("promotionalOffers", "upcomingPromotionalOffers"): + lst = promos.get(section) or [] + for entry in lst: + for p in entry.get("promotionalOffers", []) or []: + start = p.get("startDate") + end = p.get("endDate") + if _in_last_7_days(start, end): + out.append({ + "platform": "EGS", + "title": title, + "start": start, + "end": end, + "url": url_item, + "image": hero, + "thumbnail": thumb, + "is_current": None, + }) + # keep unique by (title,start) + seen = set() + uniq = [] + for it in out: + k = (it.get("title"), it.get("start")) + if k not in seen: + seen.add(k) + uniq.append(it) + # sort by start desc + uniq.sort(key=lambda e: _fmt_dt(e.get("start")) or "", reverse=True) + return uniq + +# -------------------- PS Plus (async, blog scraping FR) -------------------- + +PSPLUS_CATEGORY_URL_FR = "https://blog.fr.playstation.com/category/ps-plus/" +BASE_BLOG = "https://blog.fr.playstation.com/" + +def _soup(text: str) -> BeautifulSoup: + return BeautifulSoup(text, "html.parser") + +def _clean_titles(candidates: List[str], limit: int = 7) -> List[str]: + cleaned, seen = [], set() + for g in candidates: + g = re.sub(r"\s*\|\s*PS[45].*$", "", g).strip() + g = re.sub(r"\s*\(PS\+\)$", "", g).strip() + if not g or len(g) < 2: + continue + k = g.lower() + if k not in seen: + seen.add(k) + cleaned.append(g) + if len(cleaned) >= limit: + break + return cleaned + +def _abs_url(u: Optional[str], base: str) -> Optional[str]: + if not u: + return None + if u.startswith("http://") or u.startswith("https://"): + return u + if u.startswith("/"): + return BASE_BLOG.rstrip("/") + u + return base.rstrip("/") + "/" + u + +def _extract_best_image(art: BeautifulSoup, article_url: str) -> Tuple[Optional[str], Optional[str]]: + for sel in [ + 'meta[property="og:image"]', + 'meta[name="og:image"]', + 'meta[name="twitter:image"]', + 'meta[property="twitter:image"]', + ]: + tag = art.select_one(sel) + if tag and tag.get("content"): + img = _abs_url(tag["content"].strip(), article_url) + if img: + return (img, img) + tag = art.select_one('link[rel="image_src"]') + if tag and tag.get("href"): + img = _abs_url(tag["href"].strip(), article_url) + if img: + return (img, img) + first_img = art.select_one("article img[src]") + if first_img: + img = _abs_url(first_img.get("src"), article_url) + return (img, img) + return (None, None) + +async def fetch_psplus_week(session: aiohttp.ClientSession) -> Optional[Dict[str, Any]]: + """Dernier billet FR “Les jeux du mois PlayStation Plus …”. On le retient s’il tombe dans T-7..T.""" + async with session.get(PSPLUS_CATEGORY_URL_FR, headers=UA, timeout=aiohttp.ClientTimeout(total=25)) as r: + r.raise_for_status() + idx_html = await r.text() + idx = _soup(idx_html) + candidates: List[Tuple[str, str]] = [] + for a in idx.select("a[href]"): + href = a.get("href") or "" + title = (a.get_text() or "").strip() + if not href.startswith(BASE_BLOG): + continue + if title.lower().startswith("les jeux du mois playstation plus"): + candidates.append((title, href)) + if not candidates: + return None + + latest: Optional[Tuple[datetime, str, str, BeautifulSoup]] = None + for title, href in candidates: + try: + async with session.get(href, headers=UA, timeout=aiohttp.ClientTimeout(total=25)) as r: + r.raise_for_status() + art_html = await r.text() + except Exception: + continue + art = _soup(art_html) + time_el = art.find("time") + date_iso = time_el.get("datetime") if time_el else None + if not date_iso: + continue + try: + dt_iso = datetime.fromisoformat(date_iso.replace("Z", "+00:00")) + except Exception: + continue + if latest is None or dt_iso > latest[0]: + latest = (dt_iso, title, href, art) + + if latest is None: + return None + + dt_iso, title, url, art = latest + now = datetime.now(timezone.utc) + if (now - dt_iso) > timedelta(days=7): + # Hors fenêtre -> pas de section PS+ cette semaine + return None + + # extraire quelques titres listés dans l’article + games: List[str] = [] + for node in art.find_all(string=True): + t = " ".join((node or "").strip().split()) + if " | PS" in t and 0 < len(t) <= 120: + name = t.split(" | ")[0].strip("—:-– ") + if 2 <= len(name.split()) <= 8: + games.append(name) + if not games: + for tag in art.select("strong, em"): + t = " ".join(tag.get_text(" ", strip=True).split()) + if 2 <= len(t.split()) <= 8: + games.append(t) + + image_url, thumb_url = _extract_best_image(art, url) + return { + "title": title, + "date": dt_iso.replace(tzinfo=timezone.utc).isoformat(), + "url": url, + "games": _clean_titles(games), + "image": _sanitize_url(image_url), + "thumbnail": _sanitize_url(thumb_url), + } + +# -------------------- Xbox Game Pass (sync, SIGLS + DisplayCatalog) -------------------- + +def _xbox_locale_from_languages(languages: str) -> str: + if not languages or "-" not in languages: + return "en-US" + lang, region = languages.split("-", 1) + return f"{lang.lower()}-{region.upper()}" + +def _slugify(s: str) -> str: + s = re.sub(r"[^a-zA-Z0-9]+", "-", s).strip("-").lower() + return s or "game" + +def _pick_images_displaycatalog(prod: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]: + def candidates_of(prod): + cands: List[Dict[str, Any]] = [] + lp = prod.get("LocalizedProperties") or [] + if isinstance(lp, list) and lp: + images = lp[0].get("Images") or [] + if isinstance(images, list): + cands.extend(images) + top = prod.get("Images") or [] + if isinstance(top, list): + cands.extend(top) + return cands + + def is_pref(img: Dict[str, Any]) -> bool: + purpose = (img.get("ImagePurpose") or img.get("Purpose") or "").lower() + return any(k in purpose for k in ("poster", "brandedkeyart", "superherowide", "superheroart")) + + cands = candidates_of(prod) + if not cands: + return None, None + pref = [i for i in cands if is_pref(i)] or cands + try: + hero_raw = max(pref, key=lambda i: i.get("Width", 0)).get("Uri") + except Exception: + hero_raw = pref[0].get("Uri") + try: + thumb_raw = min(pref, key=lambda i: i.get("Width", 10**9)).get("Uri") + except Exception: + thumb_raw = hero_raw + hero = _sanitize_url(hero_raw) + thumb = _sanitize_url(thumb_raw) + return hero or thumb, thumb or hero + +def fetch_xgp_recent(limit=20, market="US", languages="en-us") -> List[Dict[str, Any]]: + """Best-effort: ‘Recently added’ via SIGLS; fallback All games; enrich via DisplayCatalog.""" + recently_ids = [ + "3fdd7f57-7092-4b65-bd40-5a9dac1b2b84", + "61d6e1a1-735c-4b97-9d15-22ce8dfb0c03", + "7d2d3d36-1c52-4a63-8b3e-3b7ee4d0f62a", + ] + all_games_id = "29a81209-df6f-41fd-a528-2ae6b91f719c" + + def sigls(collection_id: str) -> List[str]: + try: + r = requests.get("https://catalog.gamepass.com/sigls/v2", + params={"language": languages, "market": market, "id": collection_id}, + headers=UA, timeout=25) + r.raise_for_status() + payload = r.json() + except Exception: + return [] + ids: List[str] = [] + if isinstance(payload, list): + for obj in payload: + if isinstance(obj, dict) and "id" in obj and isinstance(obj["id"], str): + ids.append(obj["id"]) + elif isinstance(obj, str): + ids.append(obj) + return ids + + big_ids: List[str] = [] + for cid in recently_ids: + ids = sigls(cid) + if ids: + big_ids = ids + break + if not big_ids: + big_ids = sigls(all_games_id) + if not big_ids: + return [] + + out: List[Dict[str, Any]] = [] + seen = set() + for i in range(0, len(big_ids), 20): + chunk = big_ids[i:i+20] + try: + dc = requests.get("https://displaycatalog.mp.microsoft.com/v7.0/products", + params={"bigIds": ",".join(chunk), "market": market, "languages": languages}, + headers=UA, timeout=25) + dc.raise_for_status() + data = dc.json() + except Exception: + continue + for p in (data.get("Products") or []): + pid = p.get("ProductId") + if not pid or pid in seen: + continue + seen.add(pid) + title = pid + lp = p.get("LocalizedProperties") or [] + if lp: + title = lp[0].get("ProductTitle") or pid + locale_path = _xbox_locale_from_languages("fr-fr") + url = f"https://www.xbox.com/{locale_path}/games/store/{_slugify(title)}/{pid}" + hero, thumb = _pick_images_displaycatalog(p) + out.append({"platform": "XGP", "title": title, "productId": pid, "url": _sanitize_url(url), + "image": hero, "thumbnail": thumb}) + return out[:limit] + +# -------------------- HTML builder -------------------- + +YOUTUBE_EMBED_TMPL = ( + '
Aucun jeu gratuit relevé sur la période.
") + else: + for it in egs: + title = html.escape(it.get("title") or "") + url = it.get("url") or "" + start = _fmt_dt(it.get("start")) or "?" + end = _fmt_dt(it.get("end")) or "?" + img = it.get("thumbnail") + if img and not feature: + feature = img + parts.append("{title}
Période : {start} → {end}
Aucun nouveau billet PS+ dans la fenêtre des 7 jours.
") + else: + if psplus.get("image") and not feature: + feature = psplus["image"] + parts.append("{title}
") + games = psplus.get("games") or [] + if games: + parts.append("Pas d'entrées détectées.
") + else: + for it in xgp: + title = html.escape(it.get("title") or it.get("productId") or "") + url = it.get("url") or "" + img = it.get("thumbnail") + if img and not feature: + feature = img + parts.append("{title}
") + if url: + parts.append(f'') + parts.append("Newsletter hebdomadaire — envoyée automatiquement chaque dimanche à midi.
") + return "\n".join(parts), feature + +# -------------------- Orchestrator -------------------- + +async def run_weekly(): + logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") + admin_url = os.environ["GHOST_ADMIN_URL"] + admin_key = os.environ["GHOST_ADMIN_KEY"] + newsletter_slug = os.environ.get("GHOST_NEWSLETTER_SLUG") + ghost = GhostAdmin(admin_url, admin_key) + + # Fetch all sources (with timeouts and isolation) + egs_items: List[Dict[str, Any]] = [] + psplus_data: Optional[Dict[str, Any]] = None + xgp_items: List[Dict[str, Any]] = [] + + try: + async with aiohttp.ClientSession(headers=UA) as s: + # EGS + try: + locale = os.environ.get("EGS_LOCALE", "en-US") + country = os.environ.get("EGS_COUNTRY", "US") + allow = os.environ.get("EGS_ALLOW_COUNTRIES", country) + egs_items = await fetch_egs_week(s, locale=locale, country=country, allow_countries=allow) + except Exception as e: + LOG.warning("EGS fetch failed: %s", e) + + # PS Plus + try: + psplus_data = await fetch_psplus_week(s) + except Exception as e: + LOG.warning("PS+ fetch failed: %s", e) + except Exception as e: + LOG.warning("HTTP session failed: %s", e) + + # XGP (requests, sync) + try: + market = os.environ.get("XGP_MARKET", "US") + languages = os.environ.get("XGP_LANGUAGES", "en-us") + xgp_items = fetch_xgp_recent(limit=20, market=market, languages=languages) + except Exception as e: + LOG.warning("XGP fetch failed: %s", e) + + # Build HTML + html_body, feature = build_html(egs_items, psplus_data, xgp_items) + # Title (FR) + start = (datetime.now(TZ) - timedelta(days=6)).strftime("%d/%m/%Y") + end = datetime.now(TZ).strftime("%d/%m/%Y") + title = f"Récap hebdo — EGS, PS Plus, Game Pass ({start} → {end})" + + # Create + publish + email + created = ghost.create_post_html(title, html_body, status="draft", feature_image=feature) + #ghost.publish_post(created["id"], created["updated_at"], newsletter_slug=newsletter_slug) + + LOG.info("Published weekly newsletter: %s", created.get("url")) + +# -------------------- Scheduler -------------------- + +async def run_forever_sunday_noon(): + """Run at next Sunday 12:00 Europe/Brussels, then every 7 days.""" + while True: + now = datetime.now(TZ) + # days until Sunday (weekday(): Monday=0..Sunday=6) + days = (6 - now.weekday()) % 7 + target = (now + timedelta(days=days)).replace(hour=12, minute=0, second=0, microsecond=0) + if target <= now: + target = target + timedelta(days=7) + wait = (target - now).total_seconds() + LOG.info("Next run at %s (in %.0f min)", target.isoformat(), wait/60) + await asyncio.sleep(wait) + try: + await run_weekly() + except Exception as e: + LOG.exception("Weekly run failed: %s", e) + # then sleep 7 days + await asyncio.sleep(7 * 24 * 3600) + +# -------------------- Entrypoint -------------------- + +if __name__ == "__main__": + import argparse + parser = argparse.ArgumentParser() + parser.add_argument("--run-once", action="store_true", help="Run now and exit (no scheduler)") + args = parser.parse_args() + if args.run_once: + asyncio.run(run_weekly()) + else: + asyncio.run(run_forever_sunday_noon()) diff --git a/update_and_run.sh b/update_and_run.sh index 1b4b7ec..8118f94 100644 --- a/update_and_run.sh +++ b/update_and_run.sh @@ -1,10 +1,14 @@ #!/bin/bash +set -euo pipefail +trap 'echo "[stop] shutting down..."; kill 0' SIGINT SIGTERM +git fetch --all || true +git reset --hard origin/main || true -# Pull the latest changes -git fetch --all -git reset --hard origin/main +# Lancer les 2 bots en parallèle +python -u post_rss_to_ghost.py & PID1=$! +python -u presquegratos.py & PID2=$! -# Run your Python script -python post_rss_to_ghost.py \ No newline at end of file +# Attendre que l'un des deux meure (et laisser l'autre se faire tuer par le trap) +wait -n "$PID1" "$PID2" || true \ No newline at end of file