Files
Substack_JV/presquegratos.py
2025-10-05 19:52:50 +02:00

1019 lines
38 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# weekly_games_roundup.py
# -*- coding: utf-8 -*-
import asyncio
import argparse
import dataclasses
import html
import json
import logging
import os
import random
import re
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse, parse_qs
import aiohttp
import jwt # PyJWT
import requests
from bs4 import BeautifulSoup
from logging.handlers import RotatingFileHandler
from datetime import datetime, timedelta, timezone
import zoneinfo
from storage import Storage
from keys import xgp_key
try:
from playwright.async_api import async_playwright
except Exception:
async_playwright = None
# Module-wide logger; handlers are attached by setuplogger().
LOG = logging.getLogger("bot_weekly")
# Shared formatter: timestamp, level, source file name, message.
LOG_PATTERN = logging.Formatter("%(asctime)s:%(levelname)s: [%(filename)s] %(message)s")
# Prime Gaming landing page.
# NOTE(review): PRIME_HOME is assigned a second time further down in this file,
# and that later value is the one in effect once the module has loaded.
PRIME_HOME = "https://gaming.amazon.com/home"
# Layout parameters for the card grid built by render_cards_grid().
CARD_COLUMNS = 2
CARD_IMG_H = 180  # same image height for every card
# Minimum title height in em (roughly two lines of text).
# NOTE(review): not passed by the call sites visible in this file.
CARD_TITLE_EM = 3
def setuplogger():
    """Attach a console handler and a rotating file handler to ``LOG``.

    The console handler emits DEBUG and above; the file handler writes to
    ``bot_weekly.log`` (about 1 MB per file, one backup). Calling this
    function more than once is harmless: handlers are only attached the
    first time, so log lines are never duplicated.
    """
    LOG.setLevel(logging.DEBUG)
    if LOG.handlers:
        # Already configured; adding the handlers again would duplicate output.
        return

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(LOG_PATTERN)
    stream_handler.setLevel(logging.DEBUG)

    # Explicit UTF-8 so accented game titles do not depend on the platform
    # default encoding.
    file_handler = RotatingFileHandler(
        "bot_weekly.log",
        mode="a",
        maxBytes=1_000_000,
        backupCount=1,
        encoding="utf-8",
    )
    file_handler.setFormatter(LOG_PATTERN)

    LOG.addHandler(stream_handler)
    LOG.addHandler(file_handler)
# Time zone used for the human-facing dates and for the Sunday-noon schedule.
TZ = zoneinfo.ZoneInfo("Europe/Brussels")
# Default HTTP headers sent with the outgoing requests made in this file.
UA = {"User-Agent": "Mozilla/5.0 (compatible; weekly-games-roundup/1.0)"}
# -------------------- Ghost Admin client --------------------
class GhostAdmin:
    """Small client for the Ghost Admin API.

    Authenticates every request with a short-lived HS256 JWT derived from
    an admin key of the form ``"<key id>:<hex secret>"``.
    """

    def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"):
        """Store the API base URL (with exactly one trailing slash) and the key parts."""
        key_id, secret_hex = admin_key.split(":")
        self.base = f"{admin_url.rstrip('/')}/"
        self.key_id = key_id
        self.key_secret_hex = secret_hex
        self.accept_version = accept_version

    def _jwt(self) -> str:
        """Build a token valid for five minutes, scoped to ``/admin/``."""
        issued_at = int(time.time())
        claims = {"iat": issued_at, "exp": issued_at + 5 * 60, "aud": "/admin/"}
        jose_header = {"alg": "HS256", "typ": "JWT", "kid": self.key_id}
        secret = bytes.fromhex(self.key_secret_hex)
        signed = jwt.encode(claims, secret, algorithm="HS256", headers=jose_header)
        # jwt.encode may hand back bytes or str; always return str.
        if isinstance(signed, str):
            return signed
        return signed.decode("utf-8")

    def _headers(self):
        """Return the headers required by every Admin API call."""
        return {
            "Authorization": f"Ghost {self._jwt()}",
            "Accept-Version": self.accept_version,
            "Content-Type": "application/json",
        }

    def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str:
        """Return ``preferred_slug`` if given, else look one up in Ghost.

        Lookup order: the first active newsletter flagged as default, then
        the first active newsletter, then the first newsletter of any status.

        Raises:
            RuntimeError: if Ghost reports no newsletters at all.
        """
        if preferred_slug:
            return preferred_slug
        resp = requests.get(f"{self.base}newsletters/", headers=self._headers(), timeout=20)
        resp.raise_for_status()
        newsletters = resp.json().get("newsletters", [])
        if not newsletters:
            raise RuntimeError("No newsletters configured in Ghost.")
        actives = [n for n in newsletters if n.get("status") == "active"]
        default = next((n for n in actives if n.get("is_default")), None)
        chosen = default if default is not None else (actives or newsletters)[0]
        return chosen["slug"]

    def create_post_html(self, title: str, html_content: str,
                         status: str = "draft", feature_image: Optional[str] = None) -> Dict[str, Any]:
        """Create a post from raw HTML and return the created post object.

        Raises:
            RuntimeError: if Ghost answers with an HTTP status >= 400.
        """
        post: Dict[str, Any] = {"title": title, "html": html_content, "status": status}
        if feature_image:
            post["feature_image"] = feature_image
        resp = requests.post(
            self.base + "posts/?source=html",
            headers=self._headers(),
            json={"posts": [post]},
            timeout=30,
        )
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost create error {resp.status_code}: {resp.text}")
        return resp.json()["posts"][0]

    def publish_post(self, post_id: str, updated_at: str,
                     newsletter_slug: Optional[str], email_segment: Optional[str] = None) -> Dict[str, Any]:
        """Publish an existing post and send it through a newsletter.

        ``updated_at`` is echoed back in the request body next to the new status.

        Raises:
            RuntimeError: if Ghost answers with an HTTP status >= 400.
        """
        slug = self.pick_newsletter_slug(newsletter_slug)
        query = "newsletter=" + requests.utils.quote(slug)
        if email_segment:
            query += "&email_segment=" + requests.utils.quote(email_segment)
        endpoint = f"{self.base}posts/{post_id}/?{query}"
        payload = {"posts": [{"updated_at": updated_at, "status": "published"}]}
        resp = requests.put(endpoint, headers=self._headers(), json=payload, timeout=30)
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost publish error {resp.status_code}: {resp.text}")
        return resp.json()["posts"][0]
# -------------------- Helpers (URLs, images, dates) --------------------
def _sanitize_url(u: Optional[str]) -> Optional[str]:
if not u or not isinstance(u, str):
return None
u = u.strip().replace("\\", "/")
if not u:
return None
if u.startswith("//"):
u = "https:" + u
if " " in u:
u = u.replace(" ", "%20")
p = urlparse(u)
if p.scheme not in ("http", "https") or not p.netloc:
return None
return u
def _fmt_dt(iso_dt: Optional[str]) -> Optional[str]:
if not iso_dt:
return None
try:
dt = datetime.fromisoformat(iso_dt.replace("Z", "+00:00")).astimezone(timezone.utc)
return dt.strftime("%Y-%m-%d %H:%M UTC")
except Exception:
return iso_dt
def _egs_is_free_now(item: Dict[str, Any], now: Optional[datetime] = None) -> bool:
"""Vrai si l'offre est GRATUITE en ce moment (pas juste en promo payante)."""
now = now or datetime.now(timezone.utc)
# 1) Prix courant = 0 ?
price = (item.get("price") or {}).get("totalPrice") or {}
try:
# certains champs sont des int, d'autres des str
discount_price = int(str(price.get("discountPrice") or 0))
if discount_price == 0:
# si Epic renvoie 0, c'est bien 'Free Now'
return True
except Exception:
pass
# 2) Promo 100% active dans la fenêtre de dates ?
promos = (item.get("promotions") or {})
for group_key in ("promotionalOffers", "upcomingPromotionalOffers"):
for group in promos.get(group_key) or []:
for p in group.get("promotionalOffers") or []:
try:
start = datetime.fromisoformat(p.get("startDate").replace("Z", "+00:00"))
end = datetime.fromisoformat(p.get("endDate").replace("Z", "+00:00"))
except Exception:
start = end = None
discount = ((p.get("discountSetting") or {}).get("discountPercentage"))
# Plusieurs backends EGS renvoient 0 pour 'gratuit', d'autres 100.
# On accepte les deux mais on exige que la fenêtre englobe 'now'.
if discount in (0, 100):
if (start is None or start <= now) and (end is None or now <= end):
return True
return False
def _in_last_7_days(start_iso: Optional[str], end_iso: Optional[str]) -> bool:
now = datetime.now(timezone.utc)
window_start = now - timedelta(days=7)
try:
if start_iso:
s = datetime.fromisoformat(start_iso.replace("Z", "+00:00"))
else:
s = None
if end_iso:
e = datetime.fromisoformat(end_iso.replace("Z", "+00:00"))
else:
e = None
except Exception:
return False
# Overlap test: [s,e] intersects [now-7d, now]
s = s or now
e = e or now
return (s <= now) and (e >= window_start) or (s >= window_start and s <= now)
# Prime gaming
def _prime_key(it: Dict[str, Any]) -> str:
# priorité à la clé GraphQL si dispo, sinon fallback stable sur (title|url)
k = it.get("key")
if isinstance(k, str) and k.strip():
return k
t = (it.get("title") or "").strip().lower()
u = (it.get("url") or "").strip().lower()
return f"{t}|{u}"
def _pick_img(it: Dict[str, Any]) -> Optional[str]:
# quelques champs possibles venant du payload GraphQL
for cand in ("image", "hero", "boxArt", "thumb", "thumbnail", "cover"):
v = it.get(cand)
if isinstance(v, str) and v.startswith(("http://", "https://")):
return _sanitize_url(v)
return None
def _coerce_items_from_graphql(data: dict) -> list[dict]:
"""Extract Prime Gaming Item records from several response shapes."""
items: List[Dict[str, Any]] = []
def push_from_item(it: dict):
if not isinstance(it, dict):
return
iid = it.get("id") or it.get("itemId")
assets = it.get("assets") or {}
title = (assets.get("title") or it.get("title") or "").strip()
url = (
assets.get("externalClaimLink")
or it.get("claimUrl")
or f"https://gaming.amazon.com/home?itemId={iid}"
)
media = (assets.get("cardMedia") or {}).get("defaultMedia") or {}
img = media.get("src1x") or media.get("src2x") or None
end = None
offers = it.get("offers")
if isinstance(offers, list) and offers:
end = offers[0].get("endTime") or offers[0].get("endDate")
items.append(
{
"key": str(iid) if iid else title, # used for de-dupe & DB
"platform": "PRIME",
"title": title
or (it.get("game", {}) or {}).get("assets", {}).get("title")
or "Unknown",
"game": title or "",
"end": end,
"url": _sanitize_url(url),
"image": _sanitize_url(img),
"thumbnail": _sanitize_url(img),
"category": it.get("category"),
"isFGWP": it.get("isFGWP"),
}
)
# shape 1: {"data":{"items":{"items":[...]}}}
page_items = (((data or {}).get("data") or {}).get("items") or {}).get("items")
if isinstance(page_items, list):
for it in page_items:
push_from_item(it)
# shape 2: sections with ItemsPage
sections = [
"inGameLoot",
"expiring",
"popular",
"games",
"featuredContent",
"eventRow1",
"eventRow2",
]
root = (data or {}).get("data") or {}
for sec in sections:
arr = (root.get(sec) or {}).get("items")
if isinstance(arr, list):
for it in arr:
if isinstance(it, dict) and it.get("assets"):
push_from_item(it)
return items
async def _fetch_prime_offers_playwright() -> List[Dict[str, Any]]:
    """Load Prime Gaming in headless Chromium and harvest its GraphQL replies.

    Every successful POST to a ``/graphql`` URL is decoded and passed to
    ``_coerce_items_from_graphql``. When nothing was captured after the page
    settled, the page is scrolled once to trigger lazily loaded sections.

    Returns:
        Offers de-duplicated by ``key`` (first occurrence wins), restricted
        to full games (``category == "FULL_GAME"`` or a truthy ``isFGWP``)
        that have both a title and a URL.

    Raises:
        RuntimeError: if Playwright could not be imported.
    """
    if async_playwright is None:
        raise RuntimeError(
            "Playwright not installed. `pip install playwright` then `playwright install`"
        )

    captured: List[Dict[str, Any]] = []

    async def on_response(response):
        try:
            is_graphql_reply = (
                "/graphql" in response.url
                and response.request.method == "POST"
                and response.status == 200
            )
            if is_graphql_reply:
                payload = await response.json()
                captured.extend(_coerce_items_from_graphql(payload))
        except Exception:
            LOG.exception("PRIME: parse error on %s", response.url)

    other_headers = {
        name: value for name, value in UA.items() if name.lower() != "user-agent"
    }

    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True)
        context = await browser.new_context(
            user_agent=UA["User-Agent"],
            extra_http_headers=other_headers,
        )
        page = await context.new_page()
        page.on("response", on_response)

        await page.goto(PRIME_HOME, wait_until="domcontentloaded", timeout=45000)
        try:
            await page.wait_for_load_state("networkidle", timeout=20000)
        except Exception:
            pass

        if not captured:
            # Nothing yet: scroll down and back up to wake lazy sections.
            try:
                await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                await page.wait_for_timeout(2000)
                await page.evaluate("window.scrollTo(0, 0)")
                await page.wait_for_timeout(1000)
            except Exception:
                pass

        await context.close()
        await browser.close()

    # First occurrence of each key wins; offers without a key are dropped.
    by_key: Dict[Any, Dict[str, Any]] = {}
    for offer in captured:
        key = offer.get("key")
        if key:
            by_key.setdefault(key, offer)

    def is_full_game(offer: Dict[str, Any]) -> bool:
        return offer.get("category") == "FULL_GAME" or bool(offer.get("isFGWP"))

    return [
        offer
        for offer in by_key.values()
        if is_full_game(offer) and offer.get("title") and offer.get("url")
    ]
# Prime Gaming
# NOTE(review): this overrides the PRIME_HOME value assigned near the top of
# the file ("https://gaming.amazon.com/home"); functions that read the global
# at call time see this later value. Confirm which URL is intended.
PRIME_HOME = "https://gaming.amazon.com/"
def _prime_key(it: Dict[str, Any]) -> str:
# priorité à la clé GraphQL si dispo, sinon fallback stable sur (title|url)
k = it.get("key")
if isinstance(k, str) and k.strip():
return k
t = (it.get("title") or "").strip().lower()
u = (it.get("url") or "").strip().lower()
return f"{t}|{u}"
def _pick_img(it: Dict[str, Any]) -> Optional[str]:
# quelques champs possibles venant du payload GraphQL
for cand in ("image", "hero", "boxArt", "thumb", "thumbnail", "cover"):
v = it.get(cand)
if isinstance(v, str) and v.startswith(("http://", "https://")):
return _sanitize_url(v)
return None
async def fetch_primegaming_week() -> Optional[Dict[str, Any]]:
    """Build the raw Prime Gaming section for the newsletter.

    Offers come from ``_fetch_prime_offers_playwright``. Items that carry
    a start or end date are kept only when ``_in_last_7_days`` accepts
    them; undated items are always kept. Survivors are de-duplicated with
    ``_prime_key`` and tagged with ``_dedup_key``. This function does not
    touch the database.

    Returns:
        None when nothing is left, otherwise a dict with ``title``,
        ``date``, ``url``, ``games`` (at most 12 titles), ``image``,
        ``thumbnail`` and ``_items`` (the full item list, meant for the
        caller's database filtering).
    """
    offers = await _fetch_prime_offers_playwright()
    if not offers:
        return None

    def in_window(offer: Dict[str, Any]) -> bool:
        begins = offer.get("start") or offer.get("startDate") or offer.get("availabilityStartDate")
        ends = offer.get("end") or offer.get("endDate") or offer.get("availabilityEndDate")
        # Undated offers cannot be filtered by time, so they pass through.
        return not (begins or ends) or _in_last_7_days(begins, ends)

    recent = [offer for offer in offers if in_window(offer)]
    if not recent:
        return None

    # Local de-duplication; the computed key is stored for later DB filtering.
    by_key: Dict[str, Dict[str, Any]] = {}
    for offer in recent:
        dedup_key = _prime_key(offer)
        if dedup_key in by_key:
            continue
        offer["_dedup_key"] = dedup_key
        by_key[dedup_key] = offer

    items = list(by_key.values())
    if not items:
        return None

    # Raw block: titles are re-derived by the caller after DB filtering.
    hero = _pick_img(items[0])
    titles = [(offer.get("title") or "").strip() for offer in items if offer.get("title")]
    return {
        "title": "This Month on Prime Gaming",
        "date": datetime.now(timezone.utc).isoformat(),
        "url": PRIME_HOME,
        "games": titles[:12],
        "image": hero,
        "thumbnail": hero,
        "_items": items,
    }
# -------------------- EGS freebies (async, via official endpoint) --------------------
async def fetch_egs_week(session: aiohttp.ClientSession,
                         locale="en-US", country="US", allow_countries=None) -> List[Dict[str, Any]]:
    """Fetch the Epic Games Store giveaways relevant to the last seven days.

    Args:
        session: open aiohttp session used for the request.
        locale: locale passed to the Epic endpoint.
        country: country passed to the Epic endpoint.
        allow_countries: value of ``allowCountries``; defaults to ``country``.

    Returns:
        One dict per (title, start) pair with the keys ``platform``,
        ``title``, ``start``, ``end``, ``url``, ``image``, ``thumbnail``
        and ``is_current``, sorted by start date, newest first. Only items
        that ``_egs_is_free_now`` accepts and whose promotion window passes
        ``_in_last_7_days`` are included.

    Raises:
        aiohttp.ClientError: on HTTP or transport failure.
    """
    allow = allow_countries or country
    url = ("https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions"
           f"?locale={locale}&country={country}&allowCountries={allow}")
    async with session.get(url, headers=UA, timeout=aiohttp.ClientTimeout(total=25)) as r:
        r.raise_for_status()
        data = await r.json()

    # Every level may be an explicit JSON null, hence the `or {}` chain
    # (dict.get's default is not used when the key exists with a None value).
    catalog = ((data or {}).get("data") or {}).get("Catalog") or {}
    elements = (catalog.get("searchStore") or {}).get("elements") or []

    pref_order = ("OfferImageWide", "DieselStoreFrontWide", "DieselStoreFront", "OfferImageTall", "Thumbnail")
    out: List[Dict[str, Any]] = []
    for item in elements:
        if not isinstance(item, dict) or not _egs_is_free_now(item):
            continue
        title = item.get("title")

        slug = item.get("productSlug")
        if not slug:
            mappings = (item.get("catalogNs") or {}).get("mappings") or []
            if mappings:
                slug = (mappings[0] or {}).get("pageSlug")
        url_item = _sanitize_url(f"https://store.epicgames.com/p/{slug}" if slug else None)

        # Pick the preferred image type (case-insensitive), else the first image.
        key_images = [k for k in (item.get("keyImages") or []) if isinstance(k, dict)]
        chosen = None
        for wanted in pref_order:
            chosen = next(
                (k for k in key_images if (k.get("type") or "").lower() == wanted.lower()),
                None,
            )
            if chosen:
                break
        if not chosen and key_images:
            chosen = key_images[0]
        hero = _sanitize_url(chosen.get("url")) if chosen else None
        thumb = hero

        promos = item.get("promotions") or {}
        for section in ("promotionalOffers", "upcomingPromotionalOffers"):
            for entry in promos.get(section) or []:
                for p in (entry or {}).get("promotionalOffers") or []:
                    p = p or {}
                    start = p.get("startDate")
                    end = p.get("endDate")
                    if _in_last_7_days(start, end):
                        out.append({
                            "platform": "EGS",
                            "title": title,
                            "start": start,
                            "end": end,
                            "url": url_item,
                            "image": hero,
                            "thumbnail": thumb,
                            "is_current": None,
                        })

    # keep unique by (title, start); first occurrence wins
    seen = set()
    uniq = []
    for it in out:
        k = (it.get("title"), it.get("start"))
        if k not in seen:
            seen.add(k)
            uniq.append(it)

    # sort by start, newest first (the formatted UTC string sorts chronologically)
    uniq.sort(key=lambda e: _fmt_dt(e.get("start")) or "", reverse=True)
    return uniq
# -------------------- PS Plus (async, blog scraping FR) --------------------
# French PlayStation Blog: PS Plus category index scanned for the monthly post.
PSPLUS_CATEGORY_URL_FR = "https://blog.fr.playstation.com/category/ps-plus/"
# Blog root: used to keep only on-site links and to resolve root-relative URLs.
BASE_BLOG = "https://blog.fr.playstation.com/"
def _soup(text: str) -> BeautifulSoup:
    """Parse an HTML document with the ``html.parser`` backend."""
    parser_name = "html.parser"
    return BeautifulSoup(text, parser_name)
def _clean_titles(candidates: List[str], limit: int = 7) -> List[str]:
cleaned, seen = [], set()
for g in candidates:
g = re.sub(r"\s*\|\s*PS[45].*$", "", g).strip()
g = re.sub(r"\s*\(PS\+\)$", "", g).strip()
if not g or len(g) < 2:
continue
k = g.lower()
if k not in seen:
seen.add(k)
cleaned.append(g)
if len(cleaned) >= limit:
break
return cleaned
def _abs_url(u: Optional[str], base: str) -> Optional[str]:
if not u:
return None
if u.startswith("http://") or u.startswith("https://"):
return u
if u.startswith("/"):
return BASE_BLOG.rstrip("/") + u
return base.rstrip("/") + "/" + u
def _extract_best_image(art: BeautifulSoup, article_url: str) -> Tuple[Optional[str], Optional[str]]:
for sel in [
'meta[property="og:image"]',
'meta[name="og:image"]',
'meta[name="twitter:image"]',
'meta[property="twitter:image"]',
]:
tag = art.select_one(sel)
if tag and tag.get("content"):
img = _abs_url(tag["content"].strip(), article_url)
if img:
return (img, img)
tag = art.select_one('link[rel="image_src"]')
if tag and tag.get("href"):
img = _abs_url(tag["href"].strip(), article_url)
if img:
return (img, img)
first_img = art.select_one("article img[src]")
if first_img:
img = _abs_url(first_img.get("src"), article_url)
return (img, img)
return (None, None)
async def fetch_psplus_week(session: aiohttp.ClientSession) -> Optional[Dict[str, Any]]:
    """Return the latest French "Les jeux du mois PlayStation Plus" post, if recent.

    The PS Plus category page is scanned for on-site links whose text
    starts with that phrase. Each candidate article is downloaded, its
    ``<time datetime=...>`` is read, and the most recent article is kept.
    The result is only returned when that article is at most seven days old.

    Args:
        session: open aiohttp session used for all requests.

    Returns:
        None when there is no suitable article, otherwise a dict with
        ``title``, ``date`` (ISO-8601, UTC), ``url``, ``games``, ``image``
        and ``thumbnail``.

    Raises:
        aiohttp.ClientError: if the category page cannot be fetched.
            Failures on individual articles are skipped.
    """
    timeout = aiohttp.ClientTimeout(total=25)
    async with session.get(PSPLUS_CATEGORY_URL_FR, headers=UA, timeout=timeout) as r:
        r.raise_for_status()
        idx_html = await r.text()
    idx = _soup(idx_html)

    candidates: List[Tuple[str, str]] = []
    seen_hrefs = set()
    for a in idx.select("a[href]"):
        href = a.get("href") or ""
        title = (a.get_text() or "").strip()
        if not href.startswith(BASE_BLOG):
            continue
        if not title.lower().startswith("les jeux du mois playstation plus"):
            continue
        # The same article is often linked several times; fetch it once.
        if href in seen_hrefs:
            continue
        seen_hrefs.add(href)
        candidates.append((title, href))
    if not candidates:
        return None

    latest: Optional[Tuple[datetime, str, str, BeautifulSoup]] = None
    for title, href in candidates:
        try:
            async with session.get(href, headers=UA, timeout=timeout) as r:
                r.raise_for_status()
                art_html = await r.text()
        except Exception:
            continue
        art = _soup(art_html)
        time_el = art.find("time")
        date_iso = time_el.get("datetime") if time_el else None
        if not date_iso:
            continue
        try:
            published = datetime.fromisoformat(date_iso.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            continue
        # Normalise to aware UTC: naive values are assumed to be UTC, aware
        # ones are converted so the instant is preserved.
        if published.tzinfo is None:
            published = published.replace(tzinfo=timezone.utc)
        else:
            published = published.astimezone(timezone.utc)
        if latest is None or published > latest[0]:
            latest = (published, title, href, art)
    if latest is None:
        return None

    published, title, url, art = latest
    now = datetime.now(timezone.utc)
    if (now - published) > timedelta(days=7):
        # Outside the window: no PS+ section this week.
        return None

    # Pull a few game names out of the article text ("Name | PS5" lines).
    games: List[str] = []
    for node in art.find_all(string=True):
        t = " ".join((node or "").strip().split())
        if " | PS" in t and 0 < len(t) <= 120:
            name = t.split(" | ")[0].strip("—:- ")
            if 2 <= len(name.split()) <= 8:
                games.append(name)
    if not games:
        # Fallback: short emphasised phrases.
        for tag in art.select("strong, em"):
            t = " ".join(tag.get_text(" ", strip=True).split())
            if 2 <= len(t.split()) <= 8:
                games.append(t)

    image_url, thumb_url = _extract_best_image(art, url)
    return {
        "title": title,
        "date": published.isoformat(),
        "url": url,
        "games": _clean_titles(games),
        "image": _sanitize_url(image_url),
        "thumbnail": _sanitize_url(thumb_url),
    }
# -------------------- Xbox Game Pass (sync, SIGLS + DisplayCatalog) --------------------
def _xbox_locale_from_languages(languages: str) -> str:
if not languages or "-" not in languages:
return "en-US"
lang, region = languages.split("-", 1)
return f"{lang.lower()}-{region.upper()}"
def _slugify(s: str) -> str:
s = re.sub(r"[^a-zA-Z0-9]+", "-", s).strip("-").lower()
return s or "game"
def _pick_images_displaycatalog(prod: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
def candidates_of(prod):
cands: List[Dict[str, Any]] = []
lp = prod.get("LocalizedProperties") or []
if isinstance(lp, list) and lp:
images = lp[0].get("Images") or []
if isinstance(images, list):
cands.extend(images)
top = prod.get("Images") or []
if isinstance(top, list):
cands.extend(top)
return cands
def is_pref(img: Dict[str, Any]) -> bool:
purpose = (img.get("ImagePurpose") or img.get("Purpose") or "").lower()
return any(k in purpose for k in ("poster", "brandedkeyart", "superherowide", "superheroart"))
cands = candidates_of(prod)
if not cands:
return None, None
pref = [i for i in cands if is_pref(i)] or cands
try:
hero_raw = max(pref, key=lambda i: i.get("Width", 0)).get("Uri")
except Exception:
hero_raw = pref[0].get("Uri")
try:
thumb_raw = min(pref, key=lambda i: i.get("Width", 10**9)).get("Uri")
except Exception:
thumb_raw = hero_raw
hero = _sanitize_url(hero_raw)
thumb = _sanitize_url(thumb_raw)
return hero or thumb, thumb or hero
def fetch_xgp_recent(limit=20, market="US", languages="en-us",
                     store_languages="fr-fr") -> List[Dict[str, Any]]:
    """Best-effort list of recently added Xbox Game Pass titles.

    Product ids come from the SIGLS "recently added" collections (the
    first one that answers with ids wins), falling back to the "all
    games" collection. Ids are then enriched through DisplayCatalog in
    chunks of 20.

    Args:
        limit: maximum number of products returned.
        market: market code sent to both services.
        languages: language tag sent to both services.
        store_languages: language tag used to build the xbox.com store
            link of each product (defaults to the previously hard-coded
            ``"fr-fr"``).

    Returns:
        Dicts with ``platform``, ``title``, ``productId``, ``url``,
        ``image`` and ``thumbnail``. Empty when no ids could be obtained.
        Network or decoding failures are logged and skipped, never raised.
    """
    recently_ids = [
        "3fdd7f57-7092-4b65-bd40-5a9dac1b2b84",
        "61d6e1a1-735c-4b97-9d15-22ce8dfb0c03",
        "7d2d3d36-1c52-4a63-8b3e-3b7ee4d0f62a",
    ]
    all_games_id = "29a81209-df6f-41fd-a528-2ae6b91f719c"

    def sigls(collection_id: str) -> List[str]:
        """Return the product ids of one SIGLS collection ([] on failure)."""
        try:
            r = requests.get("https://catalog.gamepass.com/sigls/v2",
                             params={"language": languages, "market": market, "id": collection_id},
                             headers=UA, timeout=25)
            r.raise_for_status()
            payload = r.json()
        except Exception as exc:
            LOG.warning("XGP: SIGLS collection %s failed: %s", collection_id, exc)
            return []
        ids: List[str] = []
        if isinstance(payload, list):
            for obj in payload:
                # Entries are either {"id": "..."} objects or bare strings.
                if isinstance(obj, dict) and "id" in obj and isinstance(obj["id"], str):
                    ids.append(obj["id"])
                elif isinstance(obj, str):
                    ids.append(obj)
        return ids

    big_ids: List[str] = []
    for cid in recently_ids:
        big_ids = sigls(cid)
        if big_ids:
            break
    if not big_ids:
        big_ids = sigls(all_games_id)
    if not big_ids:
        return []

    # Loop-invariant: the locale segment of every store link.
    locale_path = _xbox_locale_from_languages(store_languages)
    can_stop_early = isinstance(limit, int) and limit >= 0

    out: List[Dict[str, Any]] = []
    seen = set()
    for i in range(0, len(big_ids), 20):
        # Only the first `limit` products are returned, so further chunks
        # (hundreds of ids in the "all games" fallback) need not be fetched.
        if can_stop_early and len(out) >= limit:
            break
        chunk = big_ids[i:i + 20]
        try:
            dc = requests.get("https://displaycatalog.mp.microsoft.com/v7.0/products",
                              params={"bigIds": ",".join(chunk), "market": market, "languages": languages},
                              headers=UA, timeout=25)
            dc.raise_for_status()
            data = dc.json()
        except Exception as exc:
            LOG.warning("XGP: DisplayCatalog chunk at offset %d failed: %s", i, exc)
            continue
        products = data.get("Products") if isinstance(data, dict) else None
        for p in products or []:
            pid = p.get("ProductId")
            if not pid or pid in seen:
                continue
            seen.add(pid)
            title = pid
            lp = p.get("LocalizedProperties") or []
            if lp:
                title = lp[0].get("ProductTitle") or pid
            url = f"https://www.xbox.com/{locale_path}/games/store/{_slugify(title)}/{pid}"
            hero, thumb = _pick_images_displaycatalog(p)
            out.append({"platform": "XGP", "title": title, "productId": pid, "url": _sanitize_url(url),
                        "image": hero, "thumbnail": thumb})
    return out[:limit]
# -------------------- HTML builder --------------------
# HTML snippet for a responsive 16:9 YouTube embed; the `{vid}` placeholder
# takes the video id (fill it with str.format).
# NOTE(review): not referenced by the code visible in this file.
YOUTUBE_EMBED_TMPL = (
'<div class="yt-container" style="position:relative;aspect-ratio:16/9;max-width:800px;margin:1rem 0">'
'<iframe src="https://www.youtube.com/embed/{vid}" title="YouTube video" loading="lazy" '
'style="position:absolute;inset:0;width:100%;height:100%;border:0" '
'allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" '
'allowfullscreen></iframe></div>'
)
def render_cards_grid(items, columns=2, fixed_h=180, title_em=3, fallback_url="#"):
    """Render items as an email-friendly HTML table of cards.

    Each card is a link wrapping an optional image and a centred title.

    Args:
        items: dicts with optional ``title``/``name``, ``url`` and
            ``image``/``hero``/``thumb`` keys. Only the first 12 are rendered.
        columns: number of cards per row; values below 1 (or values that
            are not integers) are treated as 1.
        fixed_h: image height in pixels, identical for every card.
        title_em: minimum height of the title area, in em.
        fallback_url: link target for items without a URL.

    Returns:
        The HTML of the table, or a short "nothing available" paragraph
        when ``items`` is empty.
    """
    if not items:
        return "<p>Aucun élément disponible.</p>"

    # A non-positive column count would make the row loop's range step invalid.
    try:
        columns = max(1, int(columns))
    except (TypeError, ValueError):
        columns = 1

    items = items[:12]
    col_pct = 100 // columns
    # Explicit column widths so no leftover gap appears.
    colgroup = "".join(f'<col style="width:{col_pct}%;">' for _ in range(columns))
    empty_cell = f'<td width="{col_pct}%" style="padding:8px;width:{col_pct}%;"></td>'

    cells = []
    for it in items:
        title = (it.get("title") or it.get("name") or "Voir l'offre").strip()
        url = (it.get("url") or fallback_url).strip()
        img = it.get("image") or it.get("hero") or it.get("thumb") or ""
        img = _sanitize_url(img) if img else ""
        img_tag = ""
        if img:
            img_tag = (
                f'<img src="{html.escape(img)}" alt="{html.escape(title)}" '
                f'style="width:100%;height:{fixed_h}px;object-fit:cover;object-position:center;'
                'border:0;border-radius:12px;display:block;background:#000;">'
            )
        cells.append(
            f'\n<td width="{col_pct}%" style="padding:8px;vertical-align:top;width:{col_pct}%;">\n'
            f'<a href="{html.escape(url)}" style="text-decoration:none;color:inherit;display:block;">\n'
            f'{img_tag}\n'
            '<div style="margin-top:6px;font-weight:600;line-height:1.25;\n'
            f'min-height:{title_em}em; text-align:center; overflow:hidden;">{html.escape(title)}</div>\n'
            '</a>\n'
            '</td>'
        )

    # Group cells into rows, padding the last row with empty cells.
    rows = []
    for i in range(0, len(cells), columns):
        row = cells[i:i + columns]
        row += [empty_cell] * (columns - len(row))
        rows.append("<tr>" + "".join(row) + "</tr>")

    # Fixed layout and zero spacing keep the columns equal in mail clients.
    return (
        '\n<table role="presentation" cellpadding="0" cellspacing="0" border="0" width="100%"\n'
        'style="margin:8px 0; table-layout:fixed; border-collapse:collapse; border-spacing:0;">\n'
        f'<colgroup>{colgroup}</colgroup>\n'
        f'{"".join(rows)}\n'
        '</table>'
    )
def build_html(egs: List[Dict[str, Any]],
               psplus: Optional[Dict[str, Any]],
               xgp: List[Dict[str, Any]],
               prime: Optional[Dict[str, Any]] = None) -> Tuple[str, Optional[str]]:
    """Assemble the newsletter body.

    Sections appear in this order: Epic Games Store, Prime Gaming, Xbox
    Game Pass, PlayStation Plus. An empty section gets a short placeholder
    sentence instead of a card grid.

    Args:
        egs: Epic Games Store items.
        psplus: PlayStation Plus article summary, or None.
        xgp: Xbox Game Pass items.
        prime: Prime Gaming section (its ``_items`` are rendered), or None.

    Returns:
        ``(html, feature_image)``. The feature image is the first thumbnail
        found among the EGS items, then the XGP items, then the PS Plus
        image; None when there is none.
    """
    parts: List[str] = []
    now_local = datetime.now(TZ)
    title_h2 = (
        f"Presque Gratuit — semaine du {(now_local - timedelta(days=6)).strftime('%d/%m')}"
        f" au {now_local.strftime('%d/%m')}"
    )
    parts.append(f"<h2>{html.escape(title_h2)}</h2>")
    feature: Optional[str] = None

    def first_thumbnail(items: List[Dict[str, Any]]) -> Optional[str]:
        """Return the first truthy ``thumbnail`` among the items, if any."""
        return next((it.get("thumbnail") for it in items if it.get("thumbnail")), None)

    # --- Epic Games Store
    parts.append("<h3>🎁 Epic Games Store — Jeux gratuits cette semaine</h3>")
    if not egs:
        parts.append("<p>Aucun jeu gratuit relevé sur la période.</p>")
    else:
        parts.append(render_cards_grid(egs, columns=CARD_COLUMNS, fixed_h=CARD_IMG_H,
                                       fallback_url="https://store.epicgames.com/"))
        feature = feature or first_thumbnail(egs)

    # --- Prime Gaming
    parts.append("<h3>🟣 Prime Gaming — Jeux offerts</h3>")
    if not prime:
        parts.append("<p>Aucun nouveau billet Prime Gaming cette semaine.</p>")
    else:
        # Cards link to each offer; items without a URL use the section URL.
        url = prime.get("url") or "https://gaming.amazon.com/"
        parts.append(render_cards_grid(prime.get("_items", []), columns=CARD_COLUMNS,
                                       fixed_h=CARD_IMG_H, fallback_url=url))

    # --- Xbox Game Pass
    parts.append("<h3>🟩 Xbox Game Pass — Récemment ajoutés</h3>")
    if not xgp:
        parts.append("<p>Pas d'entrées détectées.</p>")
    else:
        parts.append(render_cards_grid(xgp, columns=CARD_COLUMNS, fixed_h=CARD_IMG_H,
                                       fallback_url="https://www.xbox.com/xbox-game-pass"))
        # Read each XGP item's own thumbnail; previously this loop relied on
        # an `img` variable left over from the EGS loop (or undefined).
        feature = feature or first_thumbnail(xgp)

    # --- PlayStation Plus
    parts.append("<h3>🎮 PlayStation Plus — Jeux du mois</h3>")
    if not psplus:
        parts.append("<p>Aucune nouvelle annonce PS+.</p>")
    else:
        if psplus.get("image") and not feature:
            feature = psplus["image"]
        parts.append("<div style='margin:12px 0'>")
        if psplus.get("image"):
            parts.append(
                f'<p><img src="{html.escape(psplus["image"])}" '
                'style="max-width:100%;height:auto;border:0"/></p>'
            )
        title = html.escape(psplus.get("title") or "PlayStation Plus — Jeux du mois")
        url = psplus.get("url")
        if url:
            parts.append(f"<p><strong><a href='{html.escape(url)}'>{title}</a></strong></p>")
        else:
            parts.append(f"<p><strong>{title}</strong></p>")
        games = psplus.get("games") or []
        if games:
            parts.append("<ul>" + "".join(f"<li>{html.escape(g)}</li>" for g in games) + "</ul>")
        parts.append("</div>")

    parts.append("<hr><p><em>Newsletter hebdomadaire — envoyée automatiquement chaque dimanche à midi.</em></p>")
    return "\n".join(parts), feature
# -------------------- Orchestrator --------------------
async def run_weekly():
    """Collect this week's offers, build the newsletter and create a Ghost draft.

    Steps:
    1. Fetch EGS and PS Plus (shared aiohttp session), Prime Gaming
       (Playwright) and Xbox Game Pass (blocking ``requests`` calls). Each
       source is isolated: a failure is logged and leaves that section empty.
    2. Drop Prime and XGP items already recorded in ``Storage``.
    3. Build the HTML and create the post in Ghost as a *draft*.
    4. Record the Prime and XGP items that went into the post.

    Environment:
        GHOST_ADMIN_URL, GHOST_ADMIN_KEY (required), GHOST_NEWSLETTER_SLUG,
        EGS_LOCALE, EGS_COUNTRY, EGS_ALLOW_COUNTRIES, XGP_MARKET,
        XGP_LANGUAGES (optional).

    Raises:
        KeyError: if a required environment variable is missing.
        RuntimeError: if Ghost rejects the post creation.
    """
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    admin_url = os.environ["GHOST_ADMIN_URL"]
    admin_key = os.environ["GHOST_ADMIN_KEY"]
    # Only needed by the publish step, which is currently disabled (see below).
    newsletter_slug = os.environ.get("GHOST_NEWSLETTER_SLUG")
    ghost = GhostAdmin(admin_url, admin_key)

    egs_items: List[Dict[str, Any]] = []
    psplus_data: Optional[Dict[str, Any]] = None
    xgp_items: List[Dict[str, Any]] = []
    prime_data: Optional[Dict[str, Any]] = None
    store = Storage()

    def keep_new(items, key_fn):
        """Return the items whose key is unknown to the store, tagging each with its key."""
        fresh = []
        for it in items:
            k = key_fn(it)
            if not store.seen(k):
                it["_dedup_key"] = k
                fresh.append(it)
        return fresh

    # --- EGS and PS Plus share one HTTP session
    try:
        async with aiohttp.ClientSession(headers=UA) as s:
            try:
                locale = os.environ.get("EGS_LOCALE", "en-US")
                country = os.environ.get("EGS_COUNTRY", "US")
                allow = os.environ.get("EGS_ALLOW_COUNTRIES", country)
                egs_items = await fetch_egs_week(s, locale=locale, country=country, allow_countries=allow)
            except Exception as e:
                LOG.warning("EGS fetch failed: %s", e)
            try:
                psplus_data = await fetch_psplus_week(s)
            except Exception as e:
                LOG.warning("PS+ fetch failed: %s", e)
    except Exception as e:
        LOG.warning("HTTP session failed: %s", e)

    # --- Prime Gaming
    try:
        prime_data = await fetch_primegaming_week()
    except Exception as e:
        LOG.warning("prime fetch failed: %s", e)
    if prime_data:
        # Keep only what has never been published before.
        fresh = keep_new(prime_data["_items"], lambda it: it.get("_dedup_key") or _prime_key(it))
        if fresh:
            # Re-derive the title list and image from the filtered items.
            prime_data["_items"] = fresh
            prime_data["games"] = [(it.get("title") or "").strip() for it in fresh if it.get("title")][:12]
            prime_data["image"] = prime_data.get("image") or _pick_img(fresh[0])
            prime_data["thumbnail"] = prime_data["image"]
        else:
            prime_data = None  # nothing new: no section

    # --- Xbox Game Pass (requests, synchronous)
    try:
        market = os.environ.get("XGP_MARKET", "US")
        languages = os.environ.get("XGP_LANGUAGES", "en-us")
        xgp_items = fetch_xgp_recent(limit=20, market=market, languages=languages)
    except Exception as e:
        LOG.warning("XGP fetch failed: %s", e)
    xgp_items = keep_new(xgp_items, xgp_key)

    # --- Build HTML
    html_body, feature = build_html(egs_items, psplus_data, xgp_items, prime_data)

    # Title (FR). One clock reading so both dates belong to the same instant,
    # and an explicit separator between them.
    now_local = datetime.now(TZ)
    start = (now_local - timedelta(days=6)).strftime("%d/%m/%Y")
    end = now_local.strftime("%d/%m/%Y")
    title = f"Récap hebdo — EGS, Prime Gaming, PS Plus, Game Pass ({start} – {end})"

    # --- Create the draft. Publishing and e-mailing is disabled on purpose;
    # re-enable with:
    # ghost.publish_post(created["id"], created["updated_at"], newsletter_slug=newsletter_slug)
    created = ghost.create_post_html(title, html_body, status="draft", feature_image=feature)
    post_id = created["id"]

    # --- Remember what went into this post (key + post id)
    if xgp_items and post_id:
        for it in xgp_items:
            store.remember("xgp", it["_dedup_key"], post_id)
    if prime_data and post_id:
        store.bulk_remember("prime", [(it["_dedup_key"], post_id) for it in prime_data["_items"]])

    LOG.info("Created weekly newsletter draft: %s", created.get("url"))
# -------------------- Scheduler --------------------
def _next_sunday_noon(after: datetime) -> datetime:
    """Return the first Sunday 12:00 (in ``after``'s time zone) strictly later than ``after``."""
    # weekday(): Monday=0 .. Sunday=6
    days_ahead = (6 - after.weekday()) % 7
    target = (after + timedelta(days=days_ahead)).replace(hour=12, minute=0, second=0, microsecond=0)
    if target <= after:
        target += timedelta(days=7)
    return target


async def run_forever_sunday_noon():
    """Run ``run_weekly`` every Sunday at 12:00 Europe/Brussels, forever.

    The next run time is recomputed from the wall clock after every run,
    so no fixed 7-day sleep is needed. (Sleeping seven days *and*
    recomputing the target used to skip every other Sunday.) A failing
    run is logged and does not stop the scheduler.
    """
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    last_target: Optional[datetime] = None
    while True:
        now = datetime.now(TZ)
        # Never schedule the slot that has just been served, even if the
        # previous run returned before the clock moved past it.
        reference = now if last_target is None else max(now, last_target)
        target = _next_sunday_noon(reference)
        wait = max(0.0, (target - now).total_seconds())
        LOG.info("Next run at %s (in %.0f min)", target.isoformat(), wait / 60)
        await asyncio.sleep(wait)
        try:
            await run_weekly()
        except Exception as e:
            LOG.exception("Weekly run failed: %s", e)
        last_target = target
# -------------------- Entrypoint --------------------
async def main():
    """Command-line entry point.

    With ``--runonce`` the weekly job runs immediately and the program
    exits; without it the Sunday-noon scheduler runs forever.
    """
    setuplogger()
    cli = argparse.ArgumentParser()
    cli.add_argument("--runonce", action="store_true", help="Run now and exit (no scheduler)")
    options = cli.parse_args()
    job = run_weekly if options.runonce else run_forever_sunday_noon
    await job()


if __name__ == "__main__":
    asyncio.run(main())