1040 lines
39 KiB
Python
1040 lines
39 KiB
Python
# weekly_games_roundup.py
|
||
# -*- coding: utf-8 -*-
|
||
import asyncio
|
||
import argparse
|
||
import dataclasses
|
||
import html
|
||
import json
|
||
import logging
|
||
import os
|
||
import random
|
||
import re
|
||
import time
|
||
from dataclasses import dataclass
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
from urllib.parse import urlparse, parse_qs
|
||
|
||
import aiohttp
|
||
import jwt # PyJWT
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
from logging.handlers import RotatingFileHandler
|
||
from datetime import datetime, timedelta, timezone
|
||
import zoneinfo
|
||
from storage import Storage
|
||
from keys import xgp_key
|
||
|
||
|
||
try:
|
||
from playwright.async_api import async_playwright
|
||
except Exception:
|
||
async_playwright = None
|
||
|
||
# Module logger and the formatter shared by the handlers attached to it.
LOG = logging.getLogger("bot_weekly")
LOG_PATTERN = logging.Formatter("%(asctime)s:%(levelname)s: [%(filename)s] %(message)s")

# Prime Gaming landing page.
# NOTE(review): PRIME_HOME is assigned a second time further down in this
# module; once the module has loaded, that later value is the one in effect.
PRIME_HOME = "https://gaming.amazon.com/home"

# Layout defaults for the card grids rendered in the newsletter.
CARD_COLUMNS = 2
CARD_IMG_H = 180  # same image height everywhere
CARD_TITLE_EM = 3  # roughly two lines of text at minimum
|
||
|
||
def setuplogger():
    """Configure ``LOG`` with a console handler and a rotating file handler.

    The logger level is set to DEBUG. The console handler emits DEBUG and
    above; the file handler writes to ``bot_weekly.log`` (1,000,000 bytes per
    file, one backup). Both use ``LOG_PATTERN``.

    The function is idempotent: a handler kind that is already attached is
    not added a second time, so calling it twice does not duplicate output.
    """
    LOG.setLevel(logging.DEBUG)

    # RotatingFileHandler derives from StreamHandler, hence the exact-type
    # check when looking for an existing console handler.
    has_console = any(type(h) is logging.StreamHandler for h in LOG.handlers)
    has_file = any(isinstance(h, RotatingFileHandler) for h in LOG.handlers)

    if not has_console:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(LOG_PATTERN)
        stream_handler.setLevel(logging.DEBUG)
        LOG.addHandler(stream_handler)

    if not has_file:
        file_handler = RotatingFileHandler("bot_weekly.log", "a", 1_000_000, 1)
        file_handler.setFormatter(LOG_PATTERN)
        LOG.addHandler(file_handler)
|
||
|
||
|
||
# Timezone used when computing the date range shown in the newsletter title.
TZ = zoneinfo.ZoneInfo("Europe/Brussels")
# HTTP headers passed to the requests / aiohttp / Playwright calls in this module.
UA = {"User-Agent": "Mozilla/5.0 (compatible; weekly-games-roundup/1.0)"}
|
||
|
||
# -------------------- Ghost Admin client --------------------
|
||
|
||
class GhostAdmin:
    """Small synchronous client for the Ghost Admin API.

    Only what this script needs is implemented: choosing a newsletter,
    creating a post from HTML and publishing it.
    """

    def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"):
        """Store the API base URL and split the admin key into id and secret.

        Args:
            admin_url: Admin API root; normalized to end with exactly one "/".
            admin_key: Key of the form ``<id>:<hex secret>``.
            accept_version: Value sent in the ``Accept-Version`` header.

        Raises:
            ValueError: If ``admin_key`` is not exactly ``<id>:<secret>``.
        """
        self.base = admin_url.rstrip("/") + "/"
        key_id, sep, secret = admin_key.partition(":")
        # Fail here with a readable message rather than with an unpacking
        # error (or later, inside the JWT signing step).
        if not sep or not key_id or not secret or ":" in secret:
            raise ValueError("admin_key must look like '<id>:<hex secret>'")
        self.key_id, self.key_secret_hex = key_id, secret
        self.accept_version = accept_version

    def _jwt(self) -> str:
        """Return a freshly signed HS256 token valid for five minutes."""
        iat = int(time.time())
        payload = {"iat": iat, "exp": iat + 5 * 60, "aud": "/admin/"}
        headers = {"alg": "HS256", "typ": "JWT", "kid": self.key_id}
        token = jwt.encode(payload, bytes.fromhex(self.key_secret_hex), algorithm="HS256", headers=headers)
        # jwt.encode may hand back bytes; always return text.
        return token if isinstance(token, str) else token.decode("utf-8")

    def _headers(self):
        """Return the request headers, including a new auth token."""
        return {
            "Authorization": f"Ghost {self._jwt()}",
            "Accept-Version": self.accept_version,
            "Content-Type": "application/json",
        }

    def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str:
        """Return the newsletter slug to send with.

        ``preferred_slug`` wins when given (no request is made). Otherwise the
        newsletters are listed and the choice is, in order: the default among
        the active ones, the first active one, the first one of any status.

        Raises:
            RuntimeError: If the API returns no newsletters.
            requests.HTTPError: If the listing request fails.
        """
        if preferred_slug:
            return preferred_slug
        url = self.base + "newsletters/"
        resp = requests.get(url, headers=self._headers(), timeout=20)
        resp.raise_for_status()
        newsletters = resp.json().get("newsletters", [])
        if not newsletters:
            raise RuntimeError("No newsletters configured in Ghost.")
        actives = [n for n in newsletters if n.get("status") == "active"]
        for n in actives:
            if n.get("is_default"):
                return n["slug"]
        return (actives or newsletters)[0]["slug"]

    def create_post_html(self, title: str, html_content: str,
                         status: str = "draft", feature_image: Optional[str] = None) -> Dict[str, Any]:
        """Create a post from HTML and return the created post object.

        ``feature_image`` is only included in the payload when truthy.

        Raises:
            RuntimeError: If the API answers with a status code >= 400.
        """
        url = self.base + "posts/?source=html"
        post: Dict[str, Any] = {"title": title, "html": html_content, "status": status}
        if feature_image:
            post["feature_image"] = feature_image
        resp = requests.post(url, headers=self._headers(), json={"posts": [post]}, timeout=30)
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost create error {resp.status_code}: {resp.text}")
        return resp.json()["posts"][0]

    def publish_post(self, post_id: str, updated_at: str,
                     newsletter_slug: Optional[str], email_segment: Optional[str] = None) -> Dict[str, Any]:
        """Set a post to ``published`` and return the updated post object.

        The newsletter slug (resolved via ``pick_newsletter_slug``) and the
        optional ``email_segment`` are sent as query parameters;
        ``updated_at`` is sent in the request body.

        Raises:
            RuntimeError: If the API answers with a status code >= 400.
        """
        slug = self.pick_newsletter_slug(newsletter_slug)
        params = [f"newsletter={requests.utils.quote(slug)}"]
        if email_segment:
            params.append(f"email_segment={requests.utils.quote(email_segment)}")
        url = self.base + f"posts/{post_id}/?{'&'.join(params)}"
        body = {"posts": [{"updated_at": updated_at, "status": "published"}]}
        resp = requests.put(url, headers=self._headers(), json=body, timeout=30)
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost publish error {resp.status_code}: {resp.text}")
        return resp.json()["posts"][0]
|
||
|
||
# -------------------- Helpers (URLs, images, dates) --------------------
|
||
|
||
def _sanitize_url(u: Optional[str]) -> Optional[str]:
|
||
if not u or not isinstance(u, str):
|
||
return None
|
||
u = u.strip().replace("\\", "/")
|
||
if not u:
|
||
return None
|
||
if u.startswith("//"):
|
||
u = "https:" + u
|
||
if " " in u:
|
||
u = u.replace(" ", "%20")
|
||
p = urlparse(u)
|
||
if p.scheme not in ("http", "https") or not p.netloc:
|
||
return None
|
||
return u
|
||
|
||
def _fmt_dt(iso_dt: Optional[str]) -> Optional[str]:
|
||
if not iso_dt:
|
||
return None
|
||
try:
|
||
dt = datetime.fromisoformat(iso_dt.replace("Z", "+00:00")).astimezone(timezone.utc)
|
||
return dt.strftime("%Y-%m-%d %H:%M UTC")
|
||
except Exception:
|
||
return iso_dt
|
||
|
||
|
||
def _egs_is_free_now(item: Dict[str, Any], now: Optional[datetime] = None) -> bool:
|
||
"""Vrai si l'offre est GRATUITE en ce moment (pas juste en promo payante)."""
|
||
now = now or datetime.now(timezone.utc)
|
||
|
||
# 1) Prix courant = 0 ?
|
||
price = (item.get("price") or {}).get("totalPrice") or {}
|
||
try:
|
||
# certains champs sont des int, d'autres des str
|
||
discount_price = int(str(price.get("discountPrice") or 0))
|
||
if discount_price == 0:
|
||
# si Epic renvoie 0, c'est bien 'Free Now'
|
||
return True
|
||
except Exception:
|
||
pass
|
||
|
||
# 2) Promo 100% active dans la fenêtre de dates ?
|
||
promos = (item.get("promotions") or {})
|
||
for group_key in ("promotionalOffers", "upcomingPromotionalOffers"):
|
||
for group in promos.get(group_key) or []:
|
||
for p in group.get("promotionalOffers") or []:
|
||
try:
|
||
start = datetime.fromisoformat(p.get("startDate").replace("Z", "+00:00"))
|
||
end = datetime.fromisoformat(p.get("endDate").replace("Z", "+00:00"))
|
||
except Exception:
|
||
start = end = None
|
||
discount = ((p.get("discountSetting") or {}).get("discountPercentage"))
|
||
# Plusieurs backends EGS renvoient 0 pour 'gratuit', d'autres 100.
|
||
# On accepte les deux mais on exige que la fenêtre englobe 'now'.
|
||
if discount in (0, 100):
|
||
if (start is None or start <= now) and (end is None or now <= end):
|
||
return True
|
||
return False
|
||
|
||
|
||
|
||
def _in_last_7_days(start_iso: Optional[str], end_iso: Optional[str]) -> bool:
|
||
now = datetime.now(timezone.utc)
|
||
window_start = now - timedelta(days=7)
|
||
try:
|
||
if start_iso:
|
||
s = datetime.fromisoformat(start_iso.replace("Z", "+00:00"))
|
||
else:
|
||
s = None
|
||
if end_iso:
|
||
e = datetime.fromisoformat(end_iso.replace("Z", "+00:00"))
|
||
else:
|
||
e = None
|
||
except Exception:
|
||
return False
|
||
# Overlap test: [s,e] intersects [now-7d, now]
|
||
s = s or now
|
||
e = e or now
|
||
return (s <= now) and (e >= window_start) or (s >= window_start and s <= now)
|
||
|
||
|
||
# Prime gaming
|
||
|
||
def _prime_key(it: Dict[str, Any]) -> str:
|
||
# priorité à la clé GraphQL si dispo, sinon fallback stable sur (title|url)
|
||
k = it.get("key")
|
||
if isinstance(k, str) and k.strip():
|
||
return k
|
||
t = (it.get("title") or "").strip().lower()
|
||
u = (it.get("url") or "").strip().lower()
|
||
return f"{t}|{u}"
|
||
|
||
def _pick_img(it: Dict[str, Any]) -> Optional[str]:
|
||
# quelques champs possibles venant du payload GraphQL
|
||
for cand in ("image", "hero", "boxArt", "thumb", "thumbnail", "cover"):
|
||
v = it.get(cand)
|
||
if isinstance(v, str) and v.startswith(("http://", "https://")):
|
||
return _sanitize_url(v)
|
||
return None
|
||
|
||
def _coerce_items_from_graphql(data: dict) -> list[dict]:
|
||
"""Extract Prime Gaming Item records from several response shapes."""
|
||
items: List[Dict[str, Any]] = []
|
||
|
||
def push_from_item(it: dict):
|
||
if not isinstance(it, dict):
|
||
return
|
||
iid = it.get("id") or it.get("itemId")
|
||
assets = it.get("assets") or {}
|
||
title = (assets.get("title") or it.get("title") or "").strip()
|
||
url = (
|
||
assets.get("externalClaimLink")
|
||
or it.get("claimUrl")
|
||
or f"https://gaming.amazon.com/home?itemId={iid}"
|
||
)
|
||
media = (assets.get("cardMedia") or {}).get("defaultMedia") or {}
|
||
img = media.get("src1x") or media.get("src2x") or None
|
||
end = None
|
||
offers = it.get("offers")
|
||
if isinstance(offers, list) and offers:
|
||
end = offers[0].get("endTime") or offers[0].get("endDate")
|
||
|
||
items.append(
|
||
{
|
||
"key": str(iid) if iid else title, # used for de-dupe & DB
|
||
"platform": "PRIME",
|
||
"title": title
|
||
or (it.get("game", {}) or {}).get("assets", {}).get("title")
|
||
or "Unknown",
|
||
"game": title or "",
|
||
"end": end,
|
||
"url": _sanitize_url(url),
|
||
"image": _sanitize_url(img),
|
||
"thumbnail": _sanitize_url(img),
|
||
"category": it.get("category"),
|
||
"isFGWP": it.get("isFGWP"),
|
||
}
|
||
)
|
||
|
||
# shape 1: {"data":{"items":{"items":[...]}}}
|
||
page_items = (((data or {}).get("data") or {}).get("items") or {}).get("items")
|
||
if isinstance(page_items, list):
|
||
for it in page_items:
|
||
push_from_item(it)
|
||
|
||
# shape 2: sections with ItemsPage
|
||
sections = [
|
||
"inGameLoot",
|
||
"expiring",
|
||
"popular",
|
||
"games",
|
||
"featuredContent",
|
||
"eventRow1",
|
||
"eventRow2",
|
||
]
|
||
root = (data or {}).get("data") or {}
|
||
for sec in sections:
|
||
arr = (root.get(sec) or {}).get("items")
|
||
if isinstance(arr, list):
|
||
for it in arr:
|
||
if isinstance(it, dict) and it.get("assets"):
|
||
push_from_item(it)
|
||
|
||
return items
|
||
|
||
|
||
|
||
async def _fetch_prime_offers_playwright() -> List[Dict[str, Any]]:
    """Open Prime Gaming in a headless browser and capture GraphQL responses.

    Every successful (HTTP 200) POST response whose URL contains
    ``/graphql`` is parsed with ``_coerce_items_from_graphql``. The collected
    records are de-duplicated on ``key`` (first occurrence wins) and reduced
    to entries that are full games (``category == "FULL_GAME"`` or a truthy
    ``isFGWP``) and have both a title and a URL.

    Raises:
        RuntimeError: If Playwright could not be imported.
    """
    if async_playwright is None:
        raise RuntimeError(
            "Playwright not installed. `pip install playwright` then `playwright install`"
        )

    offers: List[Dict[str, Any]] = []

    async def handle_response(resp):
        """Harvest items from one network response; never raises."""
        try:
            if "/graphql" in resp.url and resp.request.method == "POST" and resp.status == 200:
                data = await resp.json()
                offers.extend(_coerce_items_from_graphql(data))
        except Exception:
            LOG.exception("PRIME: parse error on %s", resp.url)

    async with async_playwright() as pw:
        browser = await pw.chromium.launch(headless=True)
        try:
            context = await browser.new_context(
                user_agent=UA["User-Agent"],
                extra_http_headers={k: v for k, v in UA.items() if k.lower() != "user-agent"},
            )
            page = await context.new_page()
            page.on("response", handle_response)

            await page.goto(PRIME_HOME, wait_until="domcontentloaded", timeout=45000)
            try:
                await page.wait_for_load_state("networkidle", timeout=20000)
            except Exception as exc:
                # Best effort: a page that never goes idle may still have
                # delivered the responses we need.
                LOG.debug("PRIME: network idle wait gave up: %s", exc)

            # Nudge lazily loaded sections when nothing was captured yet.
            if not offers:
                try:
                    await page.evaluate("window.scrollTo(0, document.body.scrollHeight)")
                    await page.wait_for_timeout(2000)
                    await page.evaluate("window.scrollTo(0, 0)")
                    await page.wait_for_timeout(1000)
                except Exception as exc:
                    LOG.debug("PRIME: scroll nudge failed: %s", exc)

            await context.close()
        finally:
            # Close the browser even when navigation raised.
            await browser.close()

    # De-dupe by key, keeping the first record seen for each key.
    uniq: Dict[str, Dict[str, Any]] = {}
    for it in offers:
        k = it.get("key")
        if k and k not in uniq:
            uniq[k] = it

    # Keep only full games (drop in-game loot) that have a title and a URL.
    return [
        v
        for v in uniq.values()
        if (v.get("category") == "FULL_GAME" or v.get("isFGWP"))
        and v.get("title")
        and v.get("url")
    ]
|
||
|
||
|
||
# Prime Gaming
|
||
# NOTE: this rebinds PRIME_HOME, which is first set to
# "https://gaming.amazon.com/home" near the top of this module. Functions
# read the global when they run, so once the module has finished loading
# this later value is the one they see.
PRIME_HOME = "https://gaming.amazon.com/"
|
||
|
||
def _prime_key(it: Dict[str, Any]) -> str:
|
||
# priorité à la clé GraphQL si dispo, sinon fallback stable sur (title|url)
|
||
k = it.get("key")
|
||
if isinstance(k, str) and k.strip():
|
||
return k
|
||
t = (it.get("title") or "").strip().lower()
|
||
u = (it.get("url") or "").strip().lower()
|
||
return f"{t}|{u}"
|
||
|
||
def _pick_img(it: Dict[str, Any]) -> Optional[str]:
|
||
# quelques champs possibles venant du payload GraphQL
|
||
for cand in ("image", "hero", "boxArt", "thumb", "thumbnail", "cover"):
|
||
v = it.get(cand)
|
||
if isinstance(v, str) and v.startswith(("http://", "https://")):
|
||
return _sanitize_url(v)
|
||
return None
|
||
|
||
async def fetch_primegaming_week() -> Optional[Dict[str, Any]]:
    """Collect this week's Prime Gaming offers into one section dict.

    Offers come from ``_fetch_prime_offers_playwright``. An offer that
    carries a start or end date is kept only if ``_in_last_7_days`` accepts
    it; an offer without any date is always kept. Survivors are
    de-duplicated with ``_prime_key`` and each kept item gets a
    ``_dedup_key`` entry. No database access happens here.

    Returns:
        None when nothing is left, otherwise a dict with ``title``, ``date``,
        ``url``, ``games`` (at most 12 titles), ``image``, ``thumbnail`` and
        ``_items`` (the full item dicts, for filtering at the call site).
    """
    offers = await _fetch_prime_offers_playwright()
    if not offers:
        return None

    # Time filter, applied only to offers that expose at least one date.
    in_window: List[Dict[str, Any]] = []
    for offer in offers:
        start_iso = offer.get("start") or offer.get("startDate") or offer.get("availabilityStartDate")
        end_iso = offer.get("end") or offer.get("endDate") or offer.get("availabilityEndDate")
        undated = not (start_iso or end_iso)
        if undated or _in_last_7_days(start_iso, end_iso):
            in_window.append(offer)
    if not in_window:
        return None

    # Local de-dupe; the computed key is stored for later DB filtering.
    by_key: Dict[str, Dict[str, Any]] = {}
    for offer in in_window:
        dedup_key = _prime_key(offer)
        if dedup_key in by_key:
            continue
        offer["_dedup_key"] = dedup_key
        by_key[dedup_key] = offer

    items = list(by_key.values())
    if not items:
        return None

    # Raw section block; titles are re-derived by the caller after filtering.
    hero = _pick_img(items[0])
    titles = [(entry.get("title") or "").strip() for entry in items if entry.get("title")]
    section: Dict[str, Any] = {}
    section["title"] = "This Month on Prime Gaming"
    section["date"] = datetime.now(timezone.utc).isoformat()
    section["url"] = PRIME_HOME
    section["games"] = titles[:12]
    section["image"] = hero
    section["thumbnail"] = hero
    section["_items"] = items
    return section
|
||
|
||
|
||
|
||
|
||
# -------------------- EGS freebies (async, via official endpoint) --------------------
|
||
|
||
async def fetch_egs_week(session: aiohttp.ClientSession,
                         locale="en-US", country="US", allow_countries=None) -> List[Dict[str, Any]]:
    """Fetch Epic Games Store freebies whose promotion touches the last week.

    The ``freeGamesPromotions`` endpoint is queried; elements rejected by
    ``_egs_is_free_now`` are skipped. For each remaining element, every
    promotional offer accepted by ``_in_last_7_days`` produces one record.

    Args:
        session: Open aiohttp session used for the request.
        locale: Value of the ``locale`` query parameter.
        country: Value of the ``country`` query parameter.
        allow_countries: Value of ``allowCountries``; defaults to *country*.

    Returns:
        Records with ``platform``, ``title``, ``start``, ``end``, ``url``,
        ``image``, ``thumbnail`` and ``is_current`` (always None), unique on
        (title, start) and sorted by start, newest first.

    Raises:
        aiohttp.ClientResponseError: On a non-success HTTP status.
    """
    allow = allow_countries or country
    url = ("https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions"
           f"?locale={locale}&country={country}&allowCountries={allow}")
    async with session.get(url, headers=UA, timeout=aiohttp.ClientTimeout(total=25)) as r:
        r.raise_for_status()
        data = await r.json()

    # Every level may be missing or null; fall back to an empty container.
    catalog = ((data or {}).get("data") or {}).get("Catalog") or {}
    elements = (catalog.get("searchStore") or {}).get("elements") or []

    # Image types in order of preference (compared case-insensitively).
    pref_order = ("OfferImageWide", "DieselStoreFrontWide", "DieselStoreFront", "OfferImageTall", "Thumbnail")

    out: List[Dict[str, Any]] = []
    for item in elements:
        if not _egs_is_free_now(item):
            continue

        title = item.get("title")
        slug = item.get("productSlug")
        if not slug:
            mappings = (item.get("catalogNs") or {}).get("mappings") or []
            if mappings:
                slug = mappings[0].get("pageSlug")
        url_item = _sanitize_url(f"https://store.epicgames.com/p/{slug}" if slug else None)

        key_images = item.get("keyImages") or []
        chosen = None
        for wanted in pref_order:
            chosen = next((k for k in key_images if (k.get("type") or "").lower() == wanted.lower()), None)
            if chosen:
                break
        if not chosen and key_images:
            chosen = key_images[0]
        hero = _sanitize_url(chosen.get("url")) if chosen else None
        thumb = hero

        promos = item.get("promotions") or {}
        for section in ("promotionalOffers", "upcomingPromotionalOffers"):
            for entry in promos.get(section) or []:
                for p in entry.get("promotionalOffers") or []:
                    start = p.get("startDate")
                    end = p.get("endDate")
                    if not _in_last_7_days(start, end):
                        continue
                    out.append({
                        "platform": "EGS",
                        "title": title,
                        "start": start,
                        "end": end,
                        "url": url_item,
                        "image": hero,
                        "thumbnail": thumb,
                        "is_current": None,
                    })

    # Keep one record per (title, start), first occurrence wins.
    seen = set()
    uniq = []
    for it in out:
        k = (it.get("title"), it.get("start"))
        if k not in seen:
            seen.add(k)
            uniq.append(it)

    # Sort by start, descending; the formatted form sorts chronologically.
    uniq.sort(key=lambda e: _fmt_dt(e.get("start")) or "", reverse=True)
    return uniq
|
||
|
||
# -------------------- PS Plus (async, blog scraping FR) --------------------
|
||
|
||
# PS Plus category index of the French PlayStation blog (scraped for posts).
PSPLUS_CATEGORY_URL_FR = "https://blog.fr.playstation.com/category/ps-plus/"
# Blog root: used to recognize blog links and to resolve root-relative URLs.
BASE_BLOG = "https://blog.fr.playstation.com/"
|
||
|
||
def _soup(text: str) -> BeautifulSoup:
    """Parse *text* as HTML using the ``html.parser`` backend."""
    parsed = BeautifulSoup(markup=text, features="html.parser")
    return parsed
|
||
|
||
def _clean_titles(candidates: List[str], limit: int = 7) -> List[str]:
|
||
cleaned, seen = [], set()
|
||
for g in candidates:
|
||
g = re.sub(r"\s*\|\s*PS[45].*$", "", g).strip()
|
||
g = re.sub(r"\s*\(PS\+\)$", "", g).strip()
|
||
if not g or len(g) < 2:
|
||
continue
|
||
k = g.lower()
|
||
if k not in seen:
|
||
seen.add(k)
|
||
cleaned.append(g)
|
||
if len(cleaned) >= limit:
|
||
break
|
||
return cleaned
|
||
|
||
def _abs_url(u: Optional[str], base: str) -> Optional[str]:
|
||
if not u:
|
||
return None
|
||
if u.startswith("http://") or u.startswith("https://"):
|
||
return u
|
||
if u.startswith("/"):
|
||
return BASE_BLOG.rstrip("/") + u
|
||
return base.rstrip("/") + "/" + u
|
||
|
||
def _extract_best_image(art: BeautifulSoup, article_url: str) -> Tuple[Optional[str], Optional[str]]:
    """Find an illustration for an article page.

    Sources are tried in order: Open Graph / Twitter ``meta`` tags, then
    ``link[rel="image_src"]``, then the first ``img`` with a ``src`` inside
    an ``article`` element. URLs are resolved with ``_abs_url`` against
    *article_url*.

    Returns:
        ``(image, thumbnail)``; both entries hold the same value, and both
        are None when nothing was found.
    """
    # (CSS selector, attribute holding the URL), in priority order.
    head_sources = (
        ('meta[property="og:image"]', "content"),
        ('meta[name="og:image"]', "content"),
        ('meta[name="twitter:image"]', "content"),
        ('meta[property="twitter:image"]', "content"),
        ('link[rel="image_src"]', "href"),
    )
    for selector, attribute in head_sources:
        node = art.select_one(selector)
        if not (node and node.get(attribute)):
            continue
        resolved = _abs_url(node[attribute].strip(), article_url)
        if resolved:
            return (resolved, resolved)

    inline = art.select_one("article img[src]")
    if not inline:
        return (None, None)
    resolved = _abs_url(inline.get("src"), article_url)
    return (resolved, resolved)
|
||
|
||
async def fetch_psplus_week(session: aiohttp.ClientSession) -> Optional[Dict[str, Any]]:
    """Return the latest French "monthly games" PS Plus post, if recent.

    The PS Plus category page is scanned for blog links whose text starts
    with "Les jeux du mois PlayStation Plus". Each candidate article is
    downloaded and dated from its first ``<time datetime=...>`` element; the
    newest one is kept only when it was published within the last 7 days.

    Returns:
        None when there is no such post, otherwise a dict with ``title``,
        ``date`` (ISO-8601, UTC), ``url``, ``games``, ``image`` and
        ``thumbnail``.

    Raises:
        aiohttp.ClientResponseError: If the category page request fails.
    """
    async with session.get(PSPLUS_CATEGORY_URL_FR, headers=UA, timeout=aiohttp.ClientTimeout(total=25)) as r:
        r.raise_for_status()
        idx_html = await r.text()
    idx = _soup(idx_html)

    candidates: List[Tuple[str, str]] = []
    for a in idx.select("a[href]"):
        href = a.get("href") or ""
        title = (a.get_text() or "").strip()
        if not href.startswith(BASE_BLOG):
            continue
        if title.lower().startswith("les jeux du mois playstation plus"):
            candidates.append((title, href))
    if not candidates:
        return None

    latest: Optional[Tuple[datetime, str, str, BeautifulSoup]] = None
    for title, href in candidates:
        try:
            async with session.get(href, headers=UA, timeout=aiohttp.ClientTimeout(total=25)) as r:
                r.raise_for_status()
                art_html = await r.text()
        except Exception as exc:
            # Best effort: one unreachable article must not sink the section.
            LOG.debug("PS+: could not fetch %s: %s", href, exc)
            continue
        art = _soup(art_html)
        time_el = art.find("time")
        date_iso = time_el.get("datetime") if time_el else None
        if not date_iso:
            continue
        try:
            published = datetime.fromisoformat(date_iso.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            continue
        # Normalize to aware UTC so that dates can be compared with each
        # other and with "now"; offset-less values are taken to be UTC.
        if published.tzinfo is None:
            published = published.replace(tzinfo=timezone.utc)
        published = published.astimezone(timezone.utc)
        if latest is None or published > latest[0]:
            latest = (published, title, href, art)

    if latest is None:
        return None

    published, title, url, art = latest
    now = datetime.now(timezone.utc)
    if (now - published) > timedelta(days=7):
        # Outside the window: no PS+ section this week.
        return None

    # Pull a few game names listed in the article ("<name> | PS5 ..." lines).
    games: List[str] = []
    for node in art.find_all(string=True):
        t = " ".join((node or "").strip().split())
        if " | PS" in t and 0 < len(t) <= 120:
            name = t.split(" | ")[0].strip("—:-– ")
            if 2 <= len(name.split()) <= 8:
                games.append(name)
    if not games:
        # Fallback: short emphasized phrases.
        for tag in art.select("strong, em"):
            t = " ".join(tag.get_text(" ", strip=True).split())
            if 2 <= len(t.split()) <= 8:
                games.append(t)

    image_url, thumb_url = _extract_best_image(art, url)
    return {
        "title": title,
        "date": published.isoformat(),
        "url": url,
        "games": _clean_titles(games),
        "image": _sanitize_url(image_url),
        "thumbnail": _sanitize_url(thumb_url),
    }
|
||
|
||
# -------------------- Xbox Game Pass (sync, SIGLS + DisplayCatalog) --------------------
|
||
|
||
def _xbox_locale_from_languages(languages: str) -> str:
|
||
if not languages or "-" not in languages:
|
||
return "en-US"
|
||
lang, region = languages.split("-", 1)
|
||
return f"{lang.lower()}-{region.upper()}"
|
||
|
||
def _slugify(s: str) -> str:
|
||
s = re.sub(r"[^a-zA-Z0-9]+", "-", s).strip("-").lower()
|
||
return s or "game"
|
||
|
||
def _pick_images_displaycatalog(prod: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
|
||
def candidates_of(prod):
|
||
cands: List[Dict[str, Any]] = []
|
||
lp = prod.get("LocalizedProperties") or []
|
||
if isinstance(lp, list) and lp:
|
||
images = lp[0].get("Images") or []
|
||
if isinstance(images, list):
|
||
cands.extend(images)
|
||
top = prod.get("Images") or []
|
||
if isinstance(top, list):
|
||
cands.extend(top)
|
||
return cands
|
||
|
||
def is_pref(img: Dict[str, Any]) -> bool:
|
||
purpose = (img.get("ImagePurpose") or img.get("Purpose") or "").lower()
|
||
return any(k in purpose for k in ("poster", "brandedkeyart", "superherowide", "superheroart"))
|
||
|
||
cands = candidates_of(prod)
|
||
if not cands:
|
||
return None, None
|
||
pref = [i for i in cands if is_pref(i)] or cands
|
||
try:
|
||
hero_raw = max(pref, key=lambda i: i.get("Width", 0)).get("Uri")
|
||
except Exception:
|
||
hero_raw = pref[0].get("Uri")
|
||
try:
|
||
thumb_raw = min(pref, key=lambda i: i.get("Width", 10**9)).get("Uri")
|
||
except Exception:
|
||
thumb_raw = hero_raw
|
||
hero = _sanitize_url(hero_raw)
|
||
thumb = _sanitize_url(thumb_raw)
|
||
return hero or thumb, thumb or hero
|
||
|
||
def fetch_xgp_recent(limit=20, market="US", languages="en-us") -> List[Dict[str, Any]]:
    """Best-effort list of recently added Xbox Game Pass titles.

    Product ids are read from the SIGLS catalog: each "recently added"
    collection id is tried in turn and the first non-empty answer wins, with
    the "all games" collection as a fallback. Ids are then enriched through
    DisplayCatalog in chunks of 20. Network and parsing failures are logged
    and skipped; they never raise.

    Args:
        limit: Maximum number of records returned.
        market: Market code sent to both services.
        languages: Language tag sent to both services.

    Returns:
        Records with ``platform``, ``title``, ``productId``, ``url``,
        ``image`` and ``thumbnail``; an empty list when no ids were found.
    """
    recently_ids = [
        "3fdd7f57-7092-4b65-bd40-5a9dac1b2b84",
        "61d6e1a1-735c-4b97-9d15-22ce8dfb0c03",
        "7d2d3d36-1c52-4a63-8b3e-3b7ee4d0f62a",
    ]
    all_games_id = "29a81209-df6f-41fd-a528-2ae6b91f719c"

    def sigls(collection_id: str) -> List[str]:
        """Return the product ids of one SIGLS collection ([] on failure)."""
        try:
            r = requests.get("https://catalog.gamepass.com/sigls/v2",
                             params={"language": languages, "market": market, "id": collection_id},
                             headers=UA, timeout=25)
            r.raise_for_status()
            payload = r.json()
        except Exception as exc:
            LOG.warning("XGP: SIGLS lookup failed for %s: %s", collection_id, exc)
            return []
        if not isinstance(payload, list):
            return []
        ids: List[str] = []
        for obj in payload:
            # Entries are either {"id": "..."} objects or bare id strings.
            if isinstance(obj, dict) and isinstance(obj.get("id"), str):
                ids.append(obj["id"])
            elif isinstance(obj, str):
                ids.append(obj)
        return ids

    # map() is lazy, so lookups stop at the first collection with results.
    big_ids = next((ids for ids in map(sigls, recently_ids) if ids), None) or sigls(all_games_id)
    if not big_ids:
        return []

    # NOTE(review): store links always use the fr-FR storefront, whatever
    # `languages` says; kept as found — confirm this is intended.
    locale_path = _xbox_locale_from_languages("fr-fr")

    out: List[Dict[str, Any]] = []
    seen = set()
    for i in range(0, len(big_ids), 20):
        # Enough records already: later chunks could only be sliced away.
        if limit is not None and 0 <= limit <= len(out):
            break
        chunk = big_ids[i:i + 20]
        try:
            dc = requests.get("https://displaycatalog.mp.microsoft.com/v7.0/products",
                              params={"bigIds": ",".join(chunk), "market": market, "languages": languages},
                              headers=UA, timeout=25)
            dc.raise_for_status()
            data = dc.json()
        except Exception as exc:
            LOG.warning("XGP: DisplayCatalog lookup failed: %s", exc)
            continue
        products = (data.get("Products") or []) if isinstance(data, dict) else []
        for p in products:
            pid = p.get("ProductId")
            if not pid or pid in seen:
                continue
            seen.add(pid)
            title = pid
            lp = p.get("LocalizedProperties") or []
            if lp:
                title = lp[0].get("ProductTitle") or pid
            url = f"https://www.xbox.com/{locale_path}/games/store/{_slugify(title)}/{pid}"
            hero, thumb = _pick_images_displaycatalog(p)
            out.append({"platform": "XGP", "title": title, "productId": pid, "url": _sanitize_url(url),
                        "image": hero, "thumbnail": thumb})
    return out[:limit]
|
||
|
||
# -------------------- HTML builder --------------------
|
||
|
||
# HTML snippet for a 16:9 YouTube player; "{vid}" is the placeholder for
# the video id (presumably filled with str.format — confirm at the call site).
YOUTUBE_EMBED_TMPL = (
    '<div class="yt-container" style="position:relative;aspect-ratio:16/9;max-width:800px;margin:1rem 0">'
    '<iframe src="https://www.youtube.com/embed/{vid}" title="YouTube video" loading="lazy" '
    'style="position:absolute;inset:0;width:100%;height:100%;border:0" '
    'allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" '
    'allowfullscreen></iframe></div>'
)
|
||
|
||
|
||
def render_cards_grid(items, columns=2, fixed_h=180, title_em=3, fallback_url="#"):
    """Render items as an e-mail friendly HTML table of linked cards.

    Each card shows an optional image above a centered title, both wrapped
    in a link. At most 12 items are rendered and the last row is padded
    with empty cells.

    Args:
        items: Dicts read for ``title``/``name``, ``url`` and an image under
            ``image``, ``hero``, ``thumb`` or ``thumbnail``.
        columns: Cards per row; values below 1 are treated as 1.
        fixed_h: Image height in pixels.
        title_em: Minimum height of the title block, in em.
        fallback_url: Link used for items without a URL.

    Returns:
        The table markup, or a short "nothing available" paragraph when
        *items* is empty.
    """
    if not items:
        return "<p>Aucun élément disponible.</p>"

    # range() below needs a positive count/step.
    columns = max(1, columns)
    items = items[:12]
    col_pct = 100 // columns
    # Explicit column widths, so that no leftover gap appears.
    colgroup = "".join(f'<col style="width:{col_pct}%;">' for _ in range(columns))

    cells = []
    for it in items:
        title = (it.get("title") or it.get("name") or "Voir l’offre").strip()
        url = (it.get("url") or fallback_url).strip()
        # "thumbnail" is the key the fetchers in this module actually fill.
        raw_img = it.get("image") or it.get("hero") or it.get("thumb") or it.get("thumbnail") or ""
        img = _sanitize_url(raw_img) if raw_img else ""

        img_tag = ""
        if img:
            img_tag = (
                f'<img src="{html.escape(img)}" alt="{html.escape(title)}" '
                f'style="width:100%;height:{fixed_h}px;object-fit:cover;object-position:center;'
                'border:0;border-radius:12px;display:block;background:#000;">'
            )

        cells.append(f"""
<td width="{col_pct}%" style="padding:8px;vertical-align:top;width:{col_pct}%;">
<a href="{html.escape(url)}" style="text-decoration:none;color:inherit;display:block;">
{img_tag}
<div style="margin-top:6px;font-weight:600;line-height:1.25;
min-height:{title_em}em; text-align:center; overflow:hidden;">{html.escape(title)}</div>
</a>
</td>""")

    # Group cells into rows, padding the last one with empty cells.
    empty_cell = f'<td width="{col_pct}%" style="padding:8px;width:{col_pct}%;"></td>'
    rows = []
    for i in range(0, len(cells), columns):
        row = cells[i:i + columns]
        row += [empty_cell] * (columns - len(row))
        rows.append("<tr>" + "".join(row) + "</tr>")
    table_body = "".join(rows)

    # Fixed layout and zero spacing keep the columns equal across clients.
    return f"""
<table role="presentation" cellpadding="0" cellspacing="0" border="0" width="100%"
style="margin:8px 0; table-layout:fixed; border-collapse:collapse; border-spacing:0;">
<colgroup>{colgroup}</colgroup>
{table_body}
</table>"""
|
||
|
||
def build_html(egs: List[Dict[str, Any]],
               psplus: Optional[Dict[str, Any]],
               xgp: List[Dict[str, Any]],
               prime: Optional[Dict[str, Any]] = None) -> Tuple[str, Optional[str]]:
    """Assemble the newsletter body and pick a feature image.

    Sections appear in this order: Epic Games Store, Prime Gaming, Xbox Game
    Pass, PlayStation Plus, then a footer. An empty source gets a short
    placeholder paragraph instead of content.

    Args:
        egs: Epic Games Store records (rendered as cards).
        psplus: PS Plus section dict, or None.
        xgp: Xbox Game Pass records (rendered as cards).
        prime: Prime Gaming section dict whose ``_items`` are rendered as
            cards, or None.

    Returns:
        ``(html, feature_image)``. The feature image is the first available
        among: an EGS thumbnail, an XGP thumbnail, the PS Plus image.
    """

    def first_thumbnail(entries: List[Dict[str, Any]]) -> Optional[str]:
        """Return the first truthy ``thumbnail`` among *entries*, else None."""
        return next((e.get("thumbnail") for e in entries if e.get("thumbnail")), None)

    parts: List[str] = []
    now_local = datetime.now(TZ)
    title_h2 = f"Presque Gratuit — semaine du {(now_local - timedelta(days=6)).strftime('%d/%m')} au {now_local.strftime('%d/%m')}"
    parts.append(f"<h2>{html.escape(title_h2)}</h2>")
    feature: Optional[str] = None

    # --- Epic Games Store
    parts.append("<h3>🎁 Epic Games Store — Jeux gratuits cette semaine</h3>")
    if not egs:
        parts.append("<p>Aucun jeu gratuit relevé sur la période.</p>")
    else:
        parts.append(render_cards_grid(egs, columns=CARD_COLUMNS, fixed_h=CARD_IMG_H, fallback_url="https://store.epicgames.com/"))
        feature = feature or first_thumbnail(egs)

    # --- Prime Gaming
    parts.append("<h3>🟣 Prime Gaming — Jeux offerts</h3>")
    if not prime:
        parts.append("<p>Aucun nouveau billet Prime Gaming cette semaine.</p>")
    else:
        # Card grid: image on top, name below, each linking to its offer.
        url = prime.get("url") or "https://gaming.amazon.com/"
        parts.append(render_cards_grid(prime.get("_items", []), columns=CARD_COLUMNS, fixed_h=CARD_IMG_H, fallback_url=url))

    # --- Xbox Game Pass
    parts.append("<h3>🟩 Xbox Game Pass — Récemment ajoutés</h3>")
    if not xgp:
        parts.append("<p>Pas d'entrées détectées.</p>")
    else:
        parts.append(render_cards_grid(xgp, columns=CARD_COLUMNS, fixed_h=CARD_IMG_H, fallback_url="https://www.xbox.com/xbox-game-pass"))
        # Read each XGP item's own thumbnail: a variable left over from the
        # EGS section is unbound when that section is empty and stale when
        # it is not.
        feature = feature or first_thumbnail(xgp)

    # --- PlayStation Plus
    parts.append("<h3>🎮 PlayStation Plus — Jeux du mois</h3>")
    if not psplus:
        parts.append("<p>Aucune nouvelle annonce PS+.</p>")
    else:
        if psplus.get("image") and not feature:
            feature = psplus["image"]
        parts.append("<div style='margin:12px 0'>")
        if psplus.get("image"):
            parts.append(f'<p><img src="{html.escape(psplus["image"])}" style="max-width:100%;height:auto;border:0"/></p>')
        title = html.escape(psplus.get("title") or "PlayStation Plus — Jeux du mois")
        url = psplus.get("url")
        if url:
            parts.append(f"<p><strong><a href='{html.escape(url)}'>{title}</a></strong></p>")
        else:
            parts.append(f"<p><strong>{title}</strong></p>")
        games = psplus.get("games") or []
        if games:
            parts.append("<ul>" + "".join(f"<li>{html.escape(g)}</li>" for g in games) + "</ul>")
        parts.append("</div>")

    parts.append("<hr><p><em>Newsletter hebdomadaire — envoyée automatiquement chaque dimanche à midi.</em></p>")
    return "\n".join(parts), feature
|
||
|
||
# -------------------- Orchestrator --------------------
|
||
|
||
async def run_weekly():
    """Collect this week's offers, publish the Ghost newsletter, record what was sent.

    Steps, in order:
      1. Fetch Epic Games Store and PlayStation Plus data over one shared
         aiohttp session, then Prime Gaming, then Xbox Game Pass. Each fetch
         has its own ``try``/``except`` so a failing source only leaves its
         section empty instead of aborting the run.
      2. Drop Prime Gaming and Game Pass entries whose dedup key is already
         known to ``Storage``.
      3. Render the HTML body, create a draft post and publish it (passing
         ``GHOST_NEWSLETTER_SLUG`` along when it is set).
      4. Remember the dedup keys of the Prime / Game Pass entries that were
         just published.

    Environment:
      ``GHOST_ADMIN_URL`` and ``GHOST_ADMIN_KEY`` are required.
      ``GHOST_NEWSLETTER_SLUG``, ``EGS_LOCALE``, ``EGS_COUNTRY``,
      ``EGS_ALLOW_COUNTRIES``, ``XGP_MARKET`` and ``XGP_LANGUAGES`` are optional.

    Raises:
      KeyError: if a required environment variable is missing.
      Exception: anything raised by the Ghost client or the storage layer;
        those calls are intentionally left unguarded.
    """
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
    admin_url = os.environ["GHOST_ADMIN_URL"]
    admin_key = os.environ["GHOST_ADMIN_KEY"]
    newsletter_slug = os.environ.get("GHOST_NEWSLETTER_SLUG")
    ghost = GhostAdmin(admin_url, admin_key)

    # Defaults used when the corresponding source fails or returns nothing.
    egs_items: List[Dict[str, Any]] = []
    psplus_data: Optional[Dict[str, Any]] = None
    xgp_items: List[Dict[str, Any]] = []
    prime_data: Optional[Dict[str, Any]] = None

    store = Storage()

    def keep_new(items, key_fn):
        """Return the items whose key is not yet stored, tagging each with ``_dedup_key``."""
        fresh = []
        for it in items:
            k = key_fn(it)
            if not store.seen(k):
                it["_dedup_key"] = k
                fresh.append(it)
        return fresh

    # --- Fetch all sources; each one is isolated from the others ---
    try:
        async with aiohttp.ClientSession(headers=UA) as s:
            # Epic Games Store
            try:
                locale = os.environ.get("EGS_LOCALE", "en-US")
                country = os.environ.get("EGS_COUNTRY", "US")
                allow = os.environ.get("EGS_ALLOW_COUNTRIES", country)
                egs_items = await fetch_egs_week(s, locale=locale, country=country, allow_countries=allow)
            except Exception as e:
                LOG.warning("EGS fetch failed: %s", e)

            # PlayStation Plus
            try:
                psplus_data = await fetch_psplus_week(s)
            except Exception as e:
                LOG.warning("PS+ fetch failed: %s", e)
    except Exception as e:
        LOG.warning("HTTP session failed: %s", e)

    # Prime Gaming
    try:
        prime_data = await fetch_primegaming_week()
    except Exception as e:
        LOG.warning("prime fetch failed: %s", e)

    if prime_data:
        # Keep only entries that were never published. A payload without
        # "_items" is treated as "nothing new" rather than crashing the run.
        fresh = keep_new(
            prime_data.get("_items") or [],
            lambda it: it.get("_dedup_key") or _prime_key(it),
        )
        if fresh:
            # Rebuild the "games" list from the filtered entries and make sure
            # an image is set (falling back to the first fresh entry).
            prime_data["_items"] = fresh
            prime_data["games"] = [(it.get("title") or "").strip() for it in fresh if it.get("title")][:12]
            prime_data["image"] = prime_data.get("image") or _pick_img(fresh[0])
            prime_data["thumbnail"] = prime_data["image"]
        else:
            prime_data = None  # nothing new -> no Prime section

    # Xbox Game Pass (synchronous fetch)
    try:
        market = os.environ.get("XGP_MARKET", "US")
        languages = os.environ.get("XGP_LANGUAGES", "en-us")
        xgp_items = fetch_xgp_recent(limit=20, market=market, languages=languages)
    except Exception as e:
        LOG.warning("XGP fetch failed: %s", e)

    xgp_items = keep_new(xgp_items, xgp_key)

    # --- Build HTML ---
    html_body, feature = build_html(egs_items, psplus_data, xgp_items, prime_data)

    # Title (French). Sample the clock once so both dates come from the same
    # instant even when the run crosses midnight.
    now = datetime.now(TZ)
    start = (now - timedelta(days=6)).strftime("%d/%m/%Y")
    end = now.strftime("%d/%m/%Y")
    title = f"Récap hebdo — EGS, Prime Gaming, PS Plus, Game Pass ({start} → {end})"

    # --- Create as draft, then publish ---
    created = ghost.create_post_html(title, html_body, status="draft", feature_image=feature)
    post_id = created["id"]
    ghost.publish_post(post_id, created["updated_at"], newsletter_slug=newsletter_slug)

    # --- Mark entries as published (dedup key + post id) ---
    if xgp_items and post_id:
        for it in xgp_items:
            store.remember("xgp", it["_dedup_key"], post_id)

    if prime_data and post_id:
        store.bulk_remember("prime", [(it["_dedup_key"], post_id) for it in prime_data["_items"]])

    LOG.info("Published weekly newsletter: %s", created.get("url"))
|
||
|
||
# -------------------- Scheduler --------------------
|
||
|
||
def _format_duration(seconds: float) -> str:
|
||
seconds = int(seconds)
|
||
days, seconds = divmod(seconds, 86400)
|
||
hours, seconds = divmod(seconds, 3600)
|
||
minutes, seconds = divmod(seconds, 60)
|
||
parts = []
|
||
if days: parts.append(f"{days} days")
|
||
if hours: parts.append(f"{hours} hours")
|
||
if minutes: parts.append(f"{minutes} minutes")
|
||
if seconds: parts.append(f"{seconds} seconds")
|
||
return ", ".join(parts) if parts else "0 seconds"
|
||
|
||
async def run_forever_sunday_noon():
    """Run ``run_weekly`` every Sunday at 12:00 wall-clock time in ``TZ``, forever.

    The wait is split into slices of at most five minutes; after each slice
    the remaining time is recomputed from the clock and logged.

    A failure inside ``run_weekly`` is logged with its traceback and does not
    stop the scheduler: the next attempt happens at the following Sunday noon.
    This coroutine never returns normally.
    """
    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    while True:
        # Use the module's configured zone (the same one run_weekly uses for
        # its dates) instead of whatever the host's local time happens to be.
        now = datetime.now(TZ)

        # weekday(): Monday == 0 ... Sunday == 6, so this is always in 0..6.
        days_ahead = 6 - now.weekday()
        target = (now + timedelta(days=days_ahead)).replace(
            hour=12, minute=0, second=0, microsecond=0
        )

        # If it's already past this Sunday's noon, schedule for next week.
        if target <= now:
            target += timedelta(days=7)

        # Compare POSIX timestamps: subtracting two aware datetimes that share
        # a tzinfo ignores UTC-offset changes (DST) between them.
        sleep_seconds = target.timestamp() - now.timestamp()
        while sleep_seconds > 0:
            LOG.info("Waiting for %s for next scan", _format_duration(sleep_seconds))
            await asyncio.sleep(min(sleep_seconds, 5 * 60))
            sleep_seconds = target.timestamp() - datetime.now(TZ).timestamp()

        LOG.info("Going to run the weekly task")
        try:
            await run_weekly()
        except Exception:
            # Keep the scheduler alive; one bad week must not end all future runs.
            LOG.exception("Weekly task failed; will retry at the next scheduled slot")
|
||
|
||
# -------------------- Entrypoint --------------------
|
||
|
||
async def main():
    """Configure logging, parse the command line and start the selected mode.

    With ``--runonce`` the weekly job runs immediately and the coroutine
    returns; without it the Sunday-noon scheduler is started.
    """
    setuplogger()

    cli = argparse.ArgumentParser()
    cli.add_argument("--runonce", action="store_true", help="Run now and exit (no scheduler)")
    options = cli.parse_args()

    # Pick the coroutine function first, then await a single call to it.
    job = run_weekly if options.runonce else run_forever_sunday_noon
    await job()
|
||
|
||
|
||
# Script entry point: only when executed directly (not on import), run the
# async main() to completion via asyncio.run().
if __name__ == "__main__":
    asyncio.run(main())