# Files
# Substack_JV/presquegratos.py
# 2025-09-29 09:39:41 +02:00
#
# 644 lines
# 24 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# weekly_games_roundup.py
# -*- coding: utf-8 -*-
import asyncio
import argparse
import dataclasses
import html
import json
import logging
import os
import random
import re
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse, parse_qs
import aiohttp
import jwt # PyJWT
import requests
from bs4 import BeautifulSoup
from logging.handlers import RotatingFileHandler
from datetime import datetime, timedelta, timezone
import zoneinfo
from storage import Storage
from keys import xgp_key
# Module-wide logger named "bot_weekly"; setuplogger() attaches its handlers.
LOG = logging.getLogger("bot_weekly")
# Record layout shared by the handlers: timestamp, level, source file name, message.
LOG_PATTERN = logging.Formatter("%(asctime)s:%(levelname)s: [%(filename)s] %(message)s")
def setuplogger():
    """Attach a console handler and a rotating file handler to ``LOG``.

    The file handler appends to ``bot_weekly.log`` (UTF-8), rotating at about
    1 MB and keeping one backup. Calling this function again is a no-op once
    ``LOG`` has handlers, so log lines are never duplicated by repeated setup.
    """
    LOG.setLevel(logging.DEBUG)
    if LOG.handlers:
        # Already configured: adding the handlers again would emit every record twice.
        return

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(LOG_PATTERN)
    stream_handler.setLevel(logging.DEBUG)

    file_handler = RotatingFileHandler(
        "bot_weekly.log",
        mode="a",
        maxBytes=1_000_000,
        backupCount=1,
        # Explicit encoding so non-ASCII text logs the same way on every platform.
        encoding="utf-8",
    )
    file_handler.setFormatter(LOG_PATTERN)

    LOG.addHandler(stream_handler)
    LOG.addHandler(file_handler)
# Timezone used for the dates shown in the newsletter and for the Sunday-noon schedule.
TZ = zoneinfo.ZoneInfo("Europe/Brussels")
# Default headers sent with the outgoing HTTP requests to the scraped/queried services.
UA = {"User-Agent": "Mozilla/5.0 (compatible; weekly-games-roundup/1.0)"}
# -------------------- Ghost Admin client --------------------
class GhostAdmin:
    """Small client for the Ghost Admin API (newsletters and posts).

    Args:
        admin_url: Base URL of the Admin API; a trailing slash is enforced.
        admin_key: Admin API key formatted as ``"<key id>:<hex secret>"``.
        accept_version: Value sent in the ``Accept-Version`` header.

    Raises:
        ValueError: If ``admin_key`` is not ``id:secret`` or the secret is not
            a hexadecimal string. The key is checked here so that a bad
            configuration fails before any network call is attempted.
    """

    def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"):
        self.base = admin_url.rstrip("/") + "/"
        key_id, sep, secret_hex = admin_key.partition(":")
        if not sep or not key_id or not secret_hex:
            raise ValueError("Ghost admin key must look like '<id>:<hex secret>'.")
        try:
            bytes.fromhex(secret_hex)
        except ValueError as err:
            raise ValueError("Ghost admin key secret must be a hexadecimal string.") from err
        self.key_id, self.key_secret_hex = key_id, secret_hex
        self.accept_version = accept_version

    def _jwt(self) -> str:
        """Return a signed HS256 token valid for five minutes, audience ``/admin/``."""
        iat = int(time.time())
        payload = {"iat": iat, "exp": iat + 5 * 60, "aud": "/admin/"}
        headers = {"alg": "HS256", "typ": "JWT", "kid": self.key_id}
        token = jwt.encode(payload, bytes.fromhex(self.key_secret_hex), algorithm="HS256", headers=headers)
        # Some PyJWT versions return bytes rather than str.
        return token if isinstance(token, str) else token.decode("utf-8")

    def _headers(self):
        """Return the request headers, including a freshly signed token."""
        return {
            "Authorization": f"Ghost {self._jwt()}",
            "Accept-Version": self.accept_version,
            "Content-Type": "application/json",
        }

    def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str:
        """Return ``preferred_slug`` if given, otherwise choose one from the API.

        Preference order: the active newsletter flagged ``is_default``, then the
        first active one, then the first newsletter of any status.

        Raises:
            RuntimeError: If the site has no newsletter at all.
            requests.HTTPError: If the listing request fails.
        """
        if preferred_slug:
            return preferred_slug
        resp = requests.get(self.base + "newsletters/", headers=self._headers(), timeout=20)
        resp.raise_for_status()
        newsletters = resp.json().get("newsletters", [])
        if not newsletters:
            raise RuntimeError("No newsletters configured in Ghost.")
        actives = [n for n in newsletters if n.get("status") == "active"]
        default = next((n for n in actives if n.get("is_default")), None)
        if default is not None:
            return default["slug"]
        return (actives or newsletters)[0]["slug"]

    def create_post_html(self, title: str, html_content: str,
                         status: str = "draft", feature_image: Optional[str] = None) -> Dict[str, Any]:
        """Create a post from raw HTML and return the created post object.

        Raises:
            RuntimeError: If the API answers with a status code >= 400.
        """
        post: Dict[str, Any] = {"title": title, "html": html_content, "status": status}
        if feature_image:
            post["feature_image"] = feature_image
        # ``source=html`` tells the API that the body is supplied as HTML.
        resp = requests.post(self.base + "posts/?source=html", headers=self._headers(),
                             json={"posts": [post]}, timeout=30)
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost create error {resp.status_code}: {resp.text}")
        return resp.json()["posts"][0]

    def publish_post(self, post_id: str, updated_at: str,
                     newsletter_slug: Optional[str], email_segment: Optional[str] = None) -> Dict[str, Any]:
        """Set a post to ``published`` and request delivery through a newsletter.

        Args:
            post_id: Identifier of the post to publish.
            updated_at: The post's current ``updated_at`` value, sent back in the body.
            newsletter_slug: Newsletter to use; resolved via ``pick_newsletter_slug`` when empty.
            email_segment: Optional value for the ``email_segment`` query parameter.

        Raises:
            RuntimeError: If the API answers with a status code >= 400.
        """
        slug = self.pick_newsletter_slug(newsletter_slug)
        params = [f"newsletter={requests.utils.quote(slug)}"]
        if email_segment:
            params.append(f"email_segment={requests.utils.quote(email_segment)}")
        url = self.base + f"posts/{post_id}/?{'&'.join(params)}"
        body = {"posts": [{"updated_at": updated_at, "status": "published"}]}
        resp = requests.put(url, headers=self._headers(), json=body, timeout=30)
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost publish error {resp.status_code}: {resp.text}")
        return resp.json()["posts"][0]
# -------------------- Helpers (URLs, images, dates) --------------------
def _sanitize_url(u: Optional[str]) -> Optional[str]:
if not u or not isinstance(u, str):
return None
u = u.strip().replace("\\", "/")
if not u:
return None
if u.startswith("//"):
u = "https:" + u
if " " in u:
u = u.replace(" ", "%20")
p = urlparse(u)
if p.scheme not in ("http", "https") or not p.netloc:
return None
return u
def _fmt_dt(iso_dt: Optional[str]) -> Optional[str]:
if not iso_dt:
return None
try:
dt = datetime.fromisoformat(iso_dt.replace("Z", "+00:00")).astimezone(timezone.utc)
return dt.strftime("%Y-%m-%d %H:%M UTC")
except Exception:
return iso_dt
def _in_last_7_days(start_iso: Optional[str], end_iso: Optional[str]) -> bool:
now = datetime.now(timezone.utc)
window_start = now - timedelta(days=7)
try:
if start_iso:
s = datetime.fromisoformat(start_iso.replace("Z", "+00:00"))
else:
s = None
if end_iso:
e = datetime.fromisoformat(end_iso.replace("Z", "+00:00"))
else:
e = None
except Exception:
return False
# Overlap test: [s,e] intersects [now-7d, now]
s = s or now
e = e or now
return (s <= now) and (e >= window_start) or (s >= window_start and s <= now)
# -------------------- EGS freebies (async, via official endpoint) --------------------
# Image types tried, in order, when choosing an item's artwork.
_EGS_IMAGE_PREFERENCE = (
    "OfferImageWide",
    "DieselStoreFrontWide",
    "DieselStoreFront",
    "OfferImageTall",
    "Thumbnail",
)


def _egs_dig(obj: Any, *keys: str) -> Any:
    """Walk nested dicts along *keys*; return ``None`` when a level is missing or not a dict."""
    for key in keys:
        if not isinstance(obj, dict):
            return None
        obj = obj.get(key)
    return obj


def _egs_pick_image(key_images: List[Any]) -> Optional[str]:
    """Return the sanitized URL of the preferred key image, else of the first one."""
    images = [img for img in key_images if isinstance(img, dict)]
    chosen = None
    for wanted in _EGS_IMAGE_PREFERENCE:
        chosen = next((img for img in images if (img.get("type") or "").lower() == wanted.lower()), None)
        if chosen:
            break
    if not chosen and images:
        chosen = images[0]
    return _sanitize_url(chosen.get("url")) if chosen else None


async def fetch_egs_week(session: aiohttp.ClientSession,
                         locale="en-US", country="US", allow_countries=None) -> List[Dict[str, Any]]:
    """Fetch Epic Games Store promotions that touch the last seven days.

    Queries the ``freeGamesPromotions`` endpoint and keeps every current or
    upcoming promotional offer accepted by ``_in_last_7_days``.

    Args:
        session: Open aiohttp session used for the request.
        locale: Value of the ``locale`` query parameter.
        country: Value of the ``country`` query parameter.
        allow_countries: Value of ``allowCountries``; defaults to ``country``.

    Returns:
        One dict per unique ``(title, start)`` pair with the keys ``platform``,
        ``title``, ``start``, ``end``, ``url``, ``image``, ``thumbnail`` and
        ``is_current``, sorted by start date, most recent first.

    Raises:
        aiohttp.ClientResponseError: If the endpoint answers with an error status.
    """
    # NOTE(review): the offer's discount is not inspected here -- confirm that
    # every promotion returned by this endpoint is actually a free game.
    allow = allow_countries or country
    url = ("https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions"
           f"?locale={locale}&country={country}&allowCountries={allow}")
    async with session.get(url, headers=UA, timeout=aiohttp.ClientTimeout(total=25)) as r:
        r.raise_for_status()
        data = await r.json()

    # Every level may be absent or null in the payload, so walk it defensively.
    elements = _egs_dig(data, "data", "Catalog", "searchStore", "elements") or []

    out: List[Dict[str, Any]] = []
    seen = set()
    for item in elements:
        if not isinstance(item, dict):
            continue
        title = item.get("title")

        slug = item.get("productSlug")
        if not slug:
            # Fall back to the first page mapping when there is no product slug.
            mappings = _egs_dig(item, "catalogNs", "mappings") or []
            if mappings and isinstance(mappings[0], dict):
                slug = mappings[0].get("pageSlug")
        url_item = _sanitize_url(f"https://store.epicgames.com/p/{slug}" if slug else None)

        hero = _egs_pick_image(item.get("keyImages") or [])

        for section in ("promotionalOffers", "upcomingPromotionalOffers"):
            for entry in _egs_dig(item, "promotions", section) or []:
                for offer in _egs_dig(entry, "promotionalOffers") or []:
                    if not isinstance(offer, dict):
                        continue
                    start = offer.get("startDate")
                    end = offer.get("endDate")
                    if not _in_last_7_days(start, end):
                        continue
                    # Keep a single entry per (title, start).
                    key = (title, start)
                    if key in seen:
                        continue
                    seen.add(key)
                    out.append({
                        "platform": "EGS",
                        "title": title,
                        "start": start,
                        "end": end,
                        "url": url_item,
                        "image": hero,
                        "thumbnail": hero,
                        "is_current": None,
                    })

    # Most recent start first; the formatted form sorts chronologically as text.
    out.sort(key=lambda e: _fmt_dt(e.get("start")) or "", reverse=True)
    return out
# -------------------- PS Plus (async, blog scraping FR) --------------------
# Listing page of the PS Plus category on the French PlayStation Blog (scraped for article links).
PSPLUS_CATEGORY_URL_FR = "https://blog.fr.playstation.com/category/ps-plus/"
# Blog root: used to keep only on-site links and to resolve root-relative URLs.
BASE_BLOG = "https://blog.fr.playstation.com/"
def _soup(text: str) -> BeautifulSoup:
    """Parse *text* into a BeautifulSoup tree using the ``html.parser`` backend."""
    parser_name = "html.parser"
    return BeautifulSoup(text, parser_name)
def _clean_titles(candidates: List[str], limit: int = 7) -> List[str]:
cleaned, seen = [], set()
for g in candidates:
g = re.sub(r"\s*\|\s*PS[45].*$", "", g).strip()
g = re.sub(r"\s*\(PS\+\)$", "", g).strip()
if not g or len(g) < 2:
continue
k = g.lower()
if k not in seen:
seen.add(k)
cleaned.append(g)
if len(cleaned) >= limit:
break
return cleaned
def _abs_url(u: Optional[str], base: str) -> Optional[str]:
if not u:
return None
if u.startswith("http://") or u.startswith("https://"):
return u
if u.startswith("/"):
return BASE_BLOG.rstrip("/") + u
return base.rstrip("/") + "/" + u
def _extract_best_image(art: BeautifulSoup, article_url: str) -> Tuple[Optional[str], Optional[str]]:
    """Pick an illustration for an article and return it as ``(image, thumbnail)``.

    Sources are tried in order: Open Graph / Twitter ``meta`` tags, the
    ``image_src`` link tag, then the first ``<img>`` inside ``<article>``.
    Both tuple members are always the same URL; ``(None, None)`` means nothing
    was found.
    """
    # (CSS selector, attribute holding the URL), in priority order.
    lookups = (
        ('meta[property="og:image"]', "content"),
        ('meta[name="og:image"]', "content"),
        ('meta[name="twitter:image"]', "content"),
        ('meta[property="twitter:image"]', "content"),
        ('link[rel="image_src"]', "href"),
    )
    for selector, attribute in lookups:
        node = art.select_one(selector)
        raw = node.get(attribute) if node else None
        if not raw:
            continue
        resolved = _abs_url(raw.strip(), article_url)
        if resolved:
            return (resolved, resolved)

    fallback = art.select_one("article img[src]")
    if fallback:
        resolved = _abs_url(fallback.get("src"), article_url)
        return (resolved, resolved)
    return (None, None)
def _psplus_game_names(art: BeautifulSoup) -> List[str]:
    """Collect candidate game names from an article, best effort.

    First looks for text nodes shaped like ``Name | PS5``; if none is found,
    falls back to the text of ``<strong>`` / ``<em>`` tags. Only names of two
    to eight words are kept.
    """
    names: List[str] = []
    for node in art.find_all(string=True):
        text = " ".join((node or "").strip().split())
        if " | PS" in text and 0 < len(text) <= 120:
            name = text.split(" | ")[0].strip("—:- ")
            if 2 <= len(name.split()) <= 8:
                names.append(name)
    if names:
        return names
    for tag in art.select("strong, em"):
        text = " ".join(tag.get_text(" ", strip=True).split())
        if 2 <= len(text.split()) <= 8:
            names.append(text)
    return names


async def fetch_psplus_week(session: aiohttp.ClientSession) -> Optional[Dict[str, Any]]:
    """Return the latest French "Les jeux du mois PlayStation Plus" post if recent.

    The category page is scanned for on-site links whose text starts with
    "Les jeux du mois PlayStation Plus"; each linked article is downloaded and
    dated from its first ``<time datetime=...>`` element. The most recent
    article is kept only when it is at most seven days old.

    Returns:
        ``None`` when no suitable article exists, otherwise a dict with the
        keys ``title``, ``date`` (ISO-8601, UTC), ``url``, ``games``,
        ``image`` and ``thumbnail``.

    Raises:
        aiohttp.ClientResponseError: If the category page cannot be fetched.
    """
    timeout = aiohttp.ClientTimeout(total=25)
    async with session.get(PSPLUS_CATEGORY_URL_FR, headers=UA, timeout=timeout) as r:
        r.raise_for_status()
        idx = _soup(await r.text())

    # The same article is usually linked several times; fetch each URL once.
    candidates: List[Tuple[str, str]] = []
    seen_hrefs = set()
    for a in idx.select("a[href]"):
        href = a.get("href") or ""
        title = (a.get_text() or "").strip()
        if not href.startswith(BASE_BLOG) or href in seen_hrefs:
            continue
        if title.lower().startswith("les jeux du mois playstation plus"):
            seen_hrefs.add(href)
            candidates.append((title, href))
    if not candidates:
        return None

    latest: Optional[Tuple[datetime, str, str, BeautifulSoup]] = None
    for title, href in candidates:
        try:
            async with session.get(href, headers=UA, timeout=timeout) as r:
                r.raise_for_status()
                art_html = await r.text()
        except Exception as err:
            # Best effort: one unreachable article must not hide the others.
            LOG.debug("PS+ article fetch failed (%s): %s", href, err)
            continue
        art = _soup(art_html)
        time_el = art.find("time")
        date_iso = time_el.get("datetime") if time_el else None
        if not date_iso:
            continue
        try:
            published = datetime.fromisoformat(date_iso.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            continue
        if published.tzinfo is None:
            # Assume UTC when the page gives no offset, so that dates can be
            # compared and subtracted without raising TypeError.
            published = published.replace(tzinfo=timezone.utc)
        # Convert (not relabel) to UTC so the instant is preserved.
        published = published.astimezone(timezone.utc)
        if latest is None or published > latest[0]:
            latest = (published, title, href, art)
    if latest is None:
        return None

    published, title, url, art = latest
    if datetime.now(timezone.utc) - published > timedelta(days=7):
        # Outside the window: no PS+ section this week.
        return None

    image_url, thumb_url = _extract_best_image(art, url)
    return {
        "title": title,
        "date": published.isoformat(),
        "url": url,
        "games": _clean_titles(_psplus_game_names(art)),
        "image": _sanitize_url(image_url),
        "thumbnail": _sanitize_url(thumb_url),
    }
# -------------------- Xbox Game Pass (sync, SIGLS + DisplayCatalog) --------------------
def _xbox_locale_from_languages(languages: str) -> str:
if not languages or "-" not in languages:
return "en-US"
lang, region = languages.split("-", 1)
return f"{lang.lower()}-{region.upper()}"
def _slugify(s: str) -> str:
s = re.sub(r"[^a-zA-Z0-9]+", "-", s).strip("-").lower()
return s or "game"
def _pick_images_displaycatalog(prod: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
    """Choose ``(hero, thumbnail)`` image URLs from a DisplayCatalog product.

    Images are gathered from the first ``LocalizedProperties`` entry and from
    the product's own ``Images`` list. Images whose purpose mentions poster /
    branded key art / super hero art are preferred when any exist. The widest
    image becomes the hero and the narrowest the thumbnail; each falls back to
    the other when its URL is unusable. Returns ``(None, None)`` without images.
    """
    purpose_markers = ("poster", "brandedkeyart", "superherowide", "superheroart")

    pool: List[Dict[str, Any]] = []
    localized = prod.get("LocalizedProperties") or []
    if isinstance(localized, list) and localized:
        localized_images = localized[0].get("Images") or []
        if isinstance(localized_images, list):
            pool.extend(localized_images)
    top_level_images = prod.get("Images") or []
    if isinstance(top_level_images, list):
        pool.extend(top_level_images)
    if not pool:
        return None, None

    def has_preferred_purpose(image: Dict[str, Any]) -> bool:
        purpose = (image.get("ImagePurpose") or image.get("Purpose") or "").lower()
        return any(marker in purpose for marker in purpose_markers)

    shortlist = [image for image in pool if has_preferred_purpose(image)] or pool

    def width_or_zero(image):
        return image.get("Width", 0)

    def width_or_huge(image):
        return image.get("Width", 10**9)

    try:
        hero_raw = max(shortlist, key=width_or_zero).get("Uri")
    except Exception:
        # Widths could not be compared: settle for the first shortlisted image.
        hero_raw = shortlist[0].get("Uri")
    try:
        thumb_raw = min(shortlist, key=width_or_huge).get("Uri")
    except Exception:
        thumb_raw = hero_raw

    hero = _sanitize_url(hero_raw)
    thumb = _sanitize_url(thumb_raw)
    return hero or thumb, thumb or hero
def fetch_xgp_recent(limit=20, market="US", languages="en-us", store_locale="fr-fr") -> List[Dict[str, Any]]:
    """List Game Pass titles, best effort.

    Product ids come from the SIGLS endpoint: the hard-coded "recently added"
    collection ids are tried in order and the all-games collection is the
    fallback. Ids are then resolved in chunks of 20 through DisplayCatalog to
    get titles and images.

    Args:
        limit: Maximum number of entries returned.
        market: ``market`` parameter sent to both services.
        languages: ``language`` / ``languages`` parameter sent to the services.
        store_locale: Locale used in the generated ``www.xbox.com`` store links.

    Returns:
        Dicts with the keys ``platform``, ``title``, ``productId``, ``url``,
        ``image`` and ``thumbnail``; an empty list when nothing could be fetched.
    """
    recently_ids = [
        "3fdd7f57-7092-4b65-bd40-5a9dac1b2b84",
        "61d6e1a1-735c-4b97-9d15-22ce8dfb0c03",
        "7d2d3d36-1c52-4a63-8b3e-3b7ee4d0f62a",
    ]
    all_games_id = "29a81209-df6f-41fd-a528-2ae6b91f719c"

    def sigls(collection_id: str) -> List[str]:
        """Return the product ids of one SIGLS collection, or [] on failure."""
        try:
            r = requests.get("https://catalog.gamepass.com/sigls/v2",
                             params={"language": languages, "market": market, "id": collection_id},
                             headers=UA, timeout=25)
            r.raise_for_status()
            payload = r.json()
        except (requests.RequestException, ValueError) as err:
            LOG.debug("SIGLS request failed for %s: %s", collection_id, err)
            return []
        ids: List[str] = []
        if isinstance(payload, list):
            for obj in payload:
                # Entries are either {"id": "..."} objects or bare id strings;
                # anything else is ignored.
                if isinstance(obj, dict) and isinstance(obj.get("id"), str):
                    ids.append(obj["id"])
                elif isinstance(obj, str):
                    ids.append(obj)
        return ids

    big_ids: List[str] = []
    for cid in recently_ids:
        big_ids = sigls(cid)
        if big_ids:
            break
    if not big_ids:
        big_ids = sigls(all_games_id)
    if not big_ids:
        return []

    locale_path = _xbox_locale_from_languages(store_locale)
    out: List[Dict[str, Any]] = []
    seen = set()
    for i in range(0, len(big_ids), 20):
        if 0 < limit <= len(out):
            # Enough entries collected: skip the remaining catalog requests.
            break
        chunk = big_ids[i:i + 20]
        try:
            dc = requests.get("https://displaycatalog.mp.microsoft.com/v7.0/products",
                              params={"bigIds": ",".join(chunk), "market": market, "languages": languages},
                              headers=UA, timeout=25)
            dc.raise_for_status()
            data = dc.json()
        except (requests.RequestException, ValueError) as err:
            LOG.debug("DisplayCatalog request failed: %s", err)
            continue
        if not isinstance(data, dict):
            continue
        for p in (data.get("Products") or []):
            pid = p.get("ProductId")
            if not pid or pid in seen:
                continue
            seen.add(pid)
            title = pid
            lp = p.get("LocalizedProperties") or []
            if lp:
                title = lp[0].get("ProductTitle") or pid
            url = f"https://www.xbox.com/{locale_path}/games/store/{_slugify(title)}/{pid}"
            hero, thumb = _pick_images_displaycatalog(p)
            out.append({"platform": "XGP", "title": title, "productId": pid, "url": _sanitize_url(url),
                        "image": hero, "thumbnail": thumb})
    return out[:limit]
# -------------------- HTML builder --------------------
# HTML snippet for a responsive 16:9 YouTube iframe; "{vid}" is the placeholder
# for the video id. It is not referenced anywhere else in the visible part of this file.
YOUTUBE_EMBED_TMPL = (
'<div class="yt-container" style="position:relative;aspect-ratio:16/9;max-width:800px;margin:1rem 0">'
'<iframe src="https://www.youtube.com/embed/{vid}" title="YouTube video" loading="lazy" '
'style="position:absolute;inset:0;width:100%;height:100%;border:0" '
'allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" '
'allowfullscreen></iframe></div>'
)
def _html_image(src: str) -> str:
    """Return a paragraph holding a full-width image for *src* (escaped)."""
    return f'<p><img src="{html.escape(src)}" style="max-width:100%;height:auto;border:0"/></p>'


def _html_plain_link(url: str) -> str:
    """Return a paragraph holding a link whose text is the URL itself (escaped)."""
    safe = html.escape(url)
    return f'<p><a href="{safe}">{safe}</a></p>'


def build_html(egs: List[Dict[str, Any]],
               psplus: Optional[Dict[str, Any]],
               xgp: List[Dict[str, Any]]) -> Tuple[str, Optional[str]]:
    """Render the weekly newsletter body.

    Args:
        egs: Epic Games Store entries (``title``, ``url``, ``start``, ``end``, ``thumbnail``).
        psplus: PlayStation Plus article data, or ``None`` when there is none this week.
        xgp: Xbox Game Pass entries (``title``, ``productId``, ``url``, ``thumbnail``).

    Returns:
        ``(html, feature_image)`` where ``feature_image`` is the first image
        met while rendering (EGS, then PS+, then Game Pass) or ``None``.
    """
    parts: List[str] = []
    now_local = datetime.now(TZ)
    title_h2 = (f"Presque Gratuit — semaine du {(now_local - timedelta(days=6)).strftime('%d/%m')}"
                f" au {now_local.strftime('%d/%m')}")
    parts.append(f"<h2>{html.escape(title_h2)}</h2>")
    feature: Optional[str] = None

    # --- Epic Games Store
    parts.append("<h3>🎁 Epic Games Store — Jeux gratuits (7 jours)</h3>")
    if not egs:
        parts.append("<p>Aucun jeu gratuit relevé sur la période.</p>")
    else:
        for it in egs:
            title = html.escape(it.get("title") or "")
            url = it.get("url") or ""
            # _fmt_dt may hand back the raw source value, so escape it too.
            start = html.escape(_fmt_dt(it.get("start")) or "?")
            end = html.escape(_fmt_dt(it.get("end")) or "?")
            img = it.get("thumbnail")
            if img and not feature:
                feature = img
            parts.append("<div style='margin:12px 0'>")
            if img:
                parts.append(_html_image(img))
            # A separator is required here, otherwise both dates run together.
            parts.append(f"<p><strong>{title}</strong><br/>Période : {start} – {end}</p>")
            if url:
                parts.append(_html_plain_link(url))
            parts.append("</div>")

    # --- PlayStation Plus
    parts.append("<h3>🎮 PlayStation Plus — Jeux du mois (si annoncé cette semaine)</h3>")
    if not psplus:
        parts.append("<p>Aucun nouveau billet PS+ dans la fenêtre des 7 jours.</p>")
    else:
        ps_image = psplus.get("image")
        if ps_image and not feature:
            feature = ps_image
        parts.append("<div style='margin:12px 0'>")
        if ps_image:
            parts.append(_html_image(ps_image))
        title = html.escape(psplus.get("title") or "PlayStation Plus — Jeux du mois")
        url = psplus.get("url")
        if url:
            parts.append(f"<p><strong><a href='{html.escape(url)}'>{title}</a></strong></p>")
        else:
            parts.append(f"<p><strong>{title}</strong></p>")
        games = psplus.get("games") or []
        if games:
            parts.append("<ul>" + "".join(f"<li>{html.escape(g)}</li>" for g in games) + "</ul>")
        parts.append("</div>")

    # --- Xbox Game Pass
    parts.append("<h3>🟩 Xbox Game Pass — Récemment ajoutés</h3>")
    if not xgp:
        parts.append("<p>Pas d'entrées détectées.</p>")
    else:
        for it in xgp:
            title = html.escape(it.get("title") or it.get("productId") or "")
            url = it.get("url") or ""
            img = it.get("thumbnail")
            if img and not feature:
                feature = img
            parts.append("<div style='margin:12px 0'>")
            if img:
                parts.append(_html_image(img))
            parts.append(f"<p><strong>{title}</strong></p>")
            if url:
                parts.append(_html_plain_link(url))
            parts.append("</div>")

    parts.append("<hr><p><em>Newsletter hebdomadaire — envoyée automatiquement chaque dimanche à midi.</em></p>")
    return "\n".join(parts), feature
# -------------------- Orchestrator --------------------
async def run_weekly():
    """Collect the three sources, render the newsletter and create a Ghost draft.

    Environment variables:
        GHOST_ADMIN_URL, GHOST_ADMIN_KEY: required (``KeyError`` if missing).
        EGS_LOCALE, EGS_COUNTRY, EGS_ALLOW_COUNTRIES: optional Epic Games Store query settings.
        XGP_MARKET, XGP_LANGUAGES: optional Game Pass query settings.

    Each source is fetched in isolation: a failure is logged as a warning and
    the corresponding section is rendered empty. Game Pass entries already
    recorded in ``Storage`` are left out, and the ones used are recorded
    against the new post id.

    Raises:
        RuntimeError: If Ghost rejects the post creation.
    """
    if not LOG.handlers:
        # Fallback for callers that did not configure LOG. Skipped otherwise,
        # because a root handler would print every record a second time.
        logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    admin_url = os.environ["GHOST_ADMIN_URL"]
    admin_key = os.environ["GHOST_ADMIN_KEY"]
    ghost = GhostAdmin(admin_url, admin_key)

    egs_items: List[Dict[str, Any]] = []
    psplus_data: Optional[Dict[str, Any]] = None
    xgp_items: List[Dict[str, Any]] = []

    # EGS and PS+ share one aiohttp session.
    try:
        async with aiohttp.ClientSession(headers=UA) as s:
            try:
                locale = os.environ.get("EGS_LOCALE", "en-US")
                country = os.environ.get("EGS_COUNTRY", "US")
                allow = os.environ.get("EGS_ALLOW_COUNTRIES", country)
                egs_items = await fetch_egs_week(s, locale=locale, country=country, allow_countries=allow)
            except Exception as e:
                LOG.warning("EGS fetch failed: %s", e)
            try:
                psplus_data = await fetch_psplus_week(s)
            except Exception as e:
                LOG.warning("PS+ fetch failed: %s", e)
    except Exception as e:
        LOG.warning("HTTP session failed: %s", e)

    # XGP goes through the synchronous requests-based helper.
    try:
        market = os.environ.get("XGP_MARKET", "US")
        languages = os.environ.get("XGP_LANGUAGES", "en-us")
        xgp_items = fetch_xgp_recent(limit=20, market=market, languages=languages)
    except Exception as e:
        LOG.warning("XGP fetch failed: %s", e)

    store = Storage()

    def keep_new(items, key_fn):
        """Keep the items whose key the store has not seen, tagging each with its key."""
        fresh = []
        for it in items:
            k = key_fn(it)
            if not store.seen(k):
                it["_dedup_key"] = k
                fresh.append(it)
        return fresh

    # Only the Game Pass list is de-duplicated against earlier issues.
    xgp_items = keep_new(xgp_items, xgp_key)
    html_body, feature = build_html(egs_items, psplus_data, xgp_items)

    # Title (FR); both dates derive from a single "now".
    now_local = datetime.now(TZ)
    start = (now_local - timedelta(days=6)).strftime("%d/%m/%Y")
    end = now_local.strftime("%d/%m/%Y")
    title = f"Récap hebdo — EGS, PS Plus, Game Pass ({start} – {end})"

    # NOTE: the post is only created as a draft. Publishing and emailing are
    # currently disabled; to re-enable them, call
    # ghost.publish_post(created["id"], created["updated_at"],
    #                    newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG")).
    created = ghost.create_post_html(title, html_body, status="draft", feature_image=feature)
    post_id = created["id"]
    for it in xgp_items:
        store.remember("xgp", it["_dedup_key"], post_id)
    LOG.info("Created weekly newsletter draft: %s", created.get("url"))
# -------------------- Scheduler --------------------
async def run_forever_sunday_noon():
    """Run ``run_weekly`` every Sunday at 12:00 Europe/Brussels, forever.

    The next slot is recomputed from the current time on every iteration, so a
    long run cannot push the schedule past the following Sunday. A failing run
    is logged and does not stop the loop.
    """
    if not LOG.handlers:
        # Fallback for callers that did not configure LOG. Skipped otherwise,
        # because a root handler would print every record a second time.
        logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    last_target: Optional[datetime] = None
    while True:
        now = datetime.now(TZ)
        # Days until Sunday (weekday(): Monday=0 .. Sunday=6).
        days = (6 - now.weekday()) % 7
        target = (now + timedelta(days=days)).replace(hour=12, minute=0, second=0, microsecond=0)
        # Skip a slot that is already past or was already served.
        while target <= now or (last_target is not None and target <= last_target):
            target = target + timedelta(days=7)
        # Use absolute timestamps: subtracting two datetimes that share a
        # tzinfo ignores UTC offsets and is an hour off across a DST change.
        wait = max(0.0, target.timestamp() - now.timestamp())
        LOG.info("Next run at %s (in %.0f min)", target.isoformat(), wait / 60)
        await asyncio.sleep(wait)
        last_target = target
        try:
            await run_weekly()
        except Exception as e:
            LOG.exception("Weekly run failed: %s", e)
        # No fixed 7-day sleep here: combined with the slot computation above
        # it would land just after the next Sunday noon and skip that week.
# -------------------- Entrypoint --------------------
async def main():
    """Set up logging, then run once (``--runonce``) or start the weekly scheduler."""
    setuplogger()
    cli = argparse.ArgumentParser()
    cli.add_argument("--runonce", action="store_true", help="Run now and exit (no scheduler)")
    options = cli.parse_args()
    entry_point = run_weekly if options.runonce else run_forever_sunday_noon
    await entry_point()


if __name__ == "__main__":
    # Script entry point: drive the coroutine on a fresh event loop.
    asyncio.run(main())