619 lines
24 KiB
Python
619 lines
24 KiB
Python
# weekly_games_roundup.py
|
||
# -*- coding: utf-8 -*-
|
||
import asyncio
|
||
import argparse
|
||
import dataclasses
|
||
import html
|
||
import json
|
||
import logging
|
||
import os
|
||
import random
|
||
import re
|
||
import time
|
||
from dataclasses import dataclass
|
||
from typing import Any, Dict, List, Optional, Tuple
|
||
from urllib.parse import urlparse, parse_qs
|
||
|
||
import aiohttp
|
||
import jwt # PyJWT
|
||
import requests
|
||
from bs4 import BeautifulSoup
|
||
from logging.handlers import RotatingFileHandler
|
||
from datetime import datetime, timedelta, timezone
|
||
import zoneinfo
|
||
|
||
# Module-wide logger used by the fetchers, the orchestrator and the scheduler.
LOG = logging.getLogger("bot_weekly")
# One formatter shared by the console and file handlers.
LOG_PATTERN = logging.Formatter("%(asctime)s:%(levelname)s: [%(filename)s] %(message)s")


def setuplogger():
    """Attach a console handler and a rotating file handler to ``LOG``.

    The file handler writes to ``bot_weekly.log`` in the current working
    directory, rotating at 1,000,000 bytes and keeping one backup.

    The call is idempotent: if ``LOG`` already has handlers nothing is added,
    so calling it twice no longer duplicates every log line.
    """
    if LOG.handlers:
        return

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(LOG_PATTERN)
    stream_handler.setLevel(logging.DEBUG)

    file_handler = RotatingFileHandler(
        "bot_weekly.log",
        mode="a",
        maxBytes=1_000_000,
        backupCount=1,
        encoding="utf-8",
    )
    file_handler.setFormatter(LOG_PATTERN)

    LOG.setLevel(logging.DEBUG)
    # LOG has its own handlers; without this, any handler installed on the
    # root logger (e.g. via logging.basicConfig) prints each record again.
    LOG.propagate = False
    LOG.addHandler(stream_handler)
    LOG.addHandler(file_handler)
|
||
|
||
|
||
# Time zone used for the dates shown in the newsletter and for the weekly schedule.
TZ = zoneinfo.ZoneInfo("Europe/Brussels")
# User-Agent header sent with the outbound HTTP requests made by the fetchers.
UA = {
    "User-Agent": "Mozilla/5.0 (compatible; weekly-games-roundup/1.0)",
}
|
||
|
||
# -------------------- Ghost Admin client --------------------
|
||
|
||
class GhostAdmin:
    """Small synchronous client for the Ghost Admin API.

    Every request carries a short-lived HS256 JWT built from the
    ``<id>:<hex secret>`` admin key, sent as ``Authorization: Ghost <token>``.
    """

    def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"):
        """Store the API root and split the admin key into id and secret.

        Args:
            admin_url: Admin API root; trailing slashes are normalised to one.
            admin_key: Key in ``<id>:<hex secret>`` form (surrounding
                whitespace, e.g. a trailing newline from an env file, is ignored).
            accept_version: Value sent in the ``Accept-Version`` header.

        Raises:
            ValueError: If *admin_key* is not ``<id>:<hex secret>``.
        """
        self.base = admin_url.rstrip("/") + "/"

        key_id, sep, secret_hex = (admin_key or "").strip().partition(":")
        if not sep or not key_id or not secret_hex:
            raise ValueError("Ghost admin key must look like '<id>:<hex secret>'")
        try:
            # Fail here with a clear message rather than later inside _jwt().
            bytes.fromhex(secret_hex)
        except ValueError as exc:
            raise ValueError("Ghost admin key secret is not valid hexadecimal") from exc

        self.key_id, self.key_secret_hex = key_id, secret_hex
        self.accept_version = accept_version

    def _jwt(self) -> str:
        """Return a freshly signed token that expires five minutes from now."""
        issued_at = int(time.time())
        payload = {"iat": issued_at, "exp": issued_at + 5 * 60, "aud": "/admin/"}
        headers = {"alg": "HS256", "typ": "JWT", "kid": self.key_id}
        token = jwt.encode(payload, bytes.fromhex(self.key_secret_hex), algorithm="HS256", headers=headers)
        # encode() may hand back bytes or str depending on the PyJWT version.
        return token if isinstance(token, str) else token.decode("utf-8")

    def _headers(self) -> Dict[str, str]:
        """Return the auth, version and JSON content-type headers for one request."""
        return {
            "Authorization": f"Ghost {self._jwt()}",
            "Accept-Version": self.accept_version,
            "Content-Type": "application/json",
        }

    def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str:
        """Return *preferred_slug* if given, otherwise look one up in Ghost.

        Lookup order: the active newsletter flagged ``is_default``, then the
        first active newsletter, then the first newsletter of any status.

        Raises:
            requests.HTTPError: If the newsletters request fails.
            RuntimeError: If Ghost reports no newsletters at all.
        """
        if preferred_slug:
            return preferred_slug

        resp = requests.get(self.base + "newsletters/", headers=self._headers(), timeout=20)
        resp.raise_for_status()
        newsletters = resp.json().get("newsletters", [])
        if not newsletters:
            raise RuntimeError("No newsletters configured in Ghost.")

        actives = [n for n in newsletters if n.get("status") == "active"]
        default = next((n for n in actives if n.get("is_default")), None)
        if default is not None:
            return default["slug"]
        return (actives or newsletters)[0]["slug"]

    def create_post_html(self, title: str, html_content: str,
                         status: str = "draft", feature_image: Optional[str] = None) -> Dict[str, Any]:
        """Create a post from raw HTML and return the created post object.

        ``feature_image`` is only sent when it is truthy.

        Raises:
            RuntimeError: If Ghost answers with an HTTP status >= 400.
        """
        post: Dict[str, Any] = {"title": title, "html": html_content, "status": status}
        if feature_image:
            post["feature_image"] = feature_image

        # ?source=html asks Ghost to ingest the "html" field as the post body.
        resp = requests.post(self.base + "posts/?source=html", headers=self._headers(),
                             json={"posts": [post]}, timeout=30)
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost create error {resp.status_code}: {resp.text}")
        return resp.json()["posts"][0]

    def publish_post(self, post_id: str, updated_at: str,
                     newsletter_slug: Optional[str], email_segment: Optional[str] = None) -> Dict[str, Any]:
        """Set an existing post to ``published`` and return the updated post.

        The newsletter slug (resolved through ``pick_newsletter_slug``) and the
        optional email segment are passed as query parameters.

        Args:
            post_id: Id of the post to publish.
            updated_at: The post's current ``updated_at`` value, echoed back in the body.
            newsletter_slug: Preferred newsletter, or ``None`` to auto-select.
            email_segment: Optional ``email_segment`` query value.

        Raises:
            RuntimeError: If Ghost answers with an HTTP status >= 400.
        """
        slug = self.pick_newsletter_slug(newsletter_slug)
        params = [f"newsletter={requests.utils.quote(slug)}"]
        if email_segment:
            params.append(f"email_segment={requests.utils.quote(email_segment)}")
        url = self.base + f"posts/{post_id}/?{'&'.join(params)}"

        body = {"posts": [{"updated_at": updated_at, "status": "published"}]}
        resp = requests.put(url, headers=self._headers(), json=body, timeout=30)
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost publish error {resp.status_code}: {resp.text}")
        return resp.json()["posts"][0]
|
||
|
||
# -------------------- Helpers (URLs, images, dates) --------------------
|
||
|
||
def _sanitize_url(u: Optional[str]) -> Optional[str]:
    """Return a cleaned absolute http(s) URL, or ``None`` if *u* is unusable.

    Cleaning steps: trim whitespace, turn backslashes into slashes, give
    protocol-relative ``//host/...`` URLs an ``https:`` scheme and encode
    spaces as ``%20``. Anything that is not a string, is empty, or does not
    end up with an http/https scheme and a host yields ``None``.
    """
    if not isinstance(u, str):
        return None

    cleaned = u.strip().replace("\\", "/")
    if not cleaned:
        return None

    if cleaned.startswith("//"):
        cleaned = "https:" + cleaned
    cleaned = cleaned.replace(" ", "%20")

    pieces = urlparse(cleaned)
    is_web_url = pieces.scheme in ("http", "https") and bool(pieces.netloc)
    return cleaned if is_web_url else None
|
||
|
||
def _fmt_dt(iso_dt: Optional[str]) -> Optional[str]:
    """Format an ISO-8601 timestamp as ``YYYY-MM-DD HH:MM UTC``.

    Args:
        iso_dt: ISO-8601 string; a trailing ``Z`` is accepted.

    Returns:
        ``None`` for empty input, the formatted UTC string when parsing
        succeeds, otherwise *iso_dt* unchanged (best-effort fallback).
    """
    if not iso_dt:
        return None
    try:
        parsed = datetime.fromisoformat(iso_dt.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            # astimezone() would read a naive value as machine-local time,
            # which makes the "UTC" label wrong on any non-UTC host.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")
    except (AttributeError, TypeError, ValueError, OverflowError):
        return iso_dt
|
||
|
||
def _in_last_7_days(start_iso: Optional[str], end_iso: Optional[str]) -> bool:
    """Tell whether the interval [start, end] touches the window [now - 7 days, now].

    Args:
        start_iso: ISO-8601 start; a missing value is treated as "now".
        end_iso: ISO-8601 end; a missing value is treated as "now".

    Returns:
        ``False`` if either value cannot be parsed or the interval starts in
        the future; otherwise whether the interval overlaps the window.
    """
    now = datetime.now(timezone.utc)
    window_start = now - timedelta(days=7)

    def _parse(value: Optional[str]) -> Optional[datetime]:
        """Parse one bound; naive values are taken as UTC so they stay comparable."""
        if not value:
            return None
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
        if parsed.tzinfo is None:
            # Comparing a naive datetime with the aware `now` raises TypeError.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return parsed

    try:
        s = _parse(start_iso) or now
        e = _parse(end_iso) or now
    except (AttributeError, TypeError, ValueError):
        return False

    # Overlap test: [s, e] intersects [now-7d, now]. The second clause keeps
    # intervals whose start is inside the window even if the end is inconsistent.
    return (s <= now and e >= window_start) or (window_start <= s <= now)
|
||
|
||
# -------------------- EGS freebies (async, via official endpoint) --------------------
|
||
|
||
_EGS_PROMOTIONS_URL = "https://store-site-backend-static.ak.epicgames.com/freeGamesPromotions"
# Key-image types tried, in this order, when choosing the picture for an item.
_EGS_IMAGE_TYPES = ("OfferImageWide", "DieselStoreFrontWide", "DieselStoreFront", "OfferImageTall", "Thumbnail")


def _egs_pick_image(key_images: List[Dict[str, Any]]) -> Optional[Dict[str, Any]]:
    """Return the first key image of the most preferred type, else the first image, else None."""
    first_by_type: Dict[str, Dict[str, Any]] = {}
    for image in key_images:
        first_by_type.setdefault((image.get("type") or "").lower(), image)
    for wanted in _EGS_IMAGE_TYPES:
        match = first_by_type.get(wanted.lower())
        if match:
            return match
    return key_images[0] if key_images else None


async def fetch_egs_week(session: aiohttp.ClientSession,
                         locale="en-US", country="US", allow_countries=None) -> List[Dict[str, Any]]:
    """Fetch Epic Games Store promotions that overlap the last 7 days.

    Args:
        session: Open aiohttp session used for the single GET request.
        locale: Value of the ``locale`` query parameter.
        country: Value of the ``country`` query parameter.
        allow_countries: Value of ``allowCountries``; defaults to *country*.

    Returns:
        One dict per promotion (keys: platform, title, start, end, url, image,
        thumbnail, is_current), unique by (title, start) and sorted by start
        date, newest first. Both current and upcoming promotion sections are
        scanned; ``_in_last_7_days`` decides what is kept.

    Raises:
        aiohttp.ClientResponseError: If the endpoint answers with an error status.
    """
    # Let the HTTP client encode the query instead of splicing raw values into the URL.
    query = {"locale": locale, "country": country, "allowCountries": allow_countries or country}
    async with session.get(_EGS_PROMOTIONS_URL, params=query, headers=UA,
                           timeout=aiohttp.ClientTimeout(total=25)) as r:
        r.raise_for_status()
        data = await r.json()

    # Every level may be missing or null in the payload, so guard each one.
    catalog = ((data or {}).get("data") or {}).get("Catalog") or {}
    elements = (catalog.get("searchStore") or {}).get("elements") or []

    found: List[Dict[str, Any]] = []
    seen = set()
    for item in elements:
        title = item.get("title")

        slug = item.get("productSlug")
        if not slug:
            mappings = (item.get("catalogNs") or {}).get("mappings") or []
            if mappings:
                slug = mappings[0].get("pageSlug")
        url_item = _sanitize_url(f"https://store.epicgames.com/p/{slug}" if slug else None)

        chosen = _egs_pick_image(item.get("keyImages") or [])
        hero = _sanitize_url(chosen.get("url")) if chosen else None

        promos = item.get("promotions") or {}
        for section in ("promotionalOffers", "upcomingPromotionalOffers"):
            for entry in promos.get(section) or []:
                for offer in entry.get("promotionalOffers") or []:
                    start = offer.get("startDate")
                    end = offer.get("endDate")
                    key = (title, start)
                    # Keep only the first in-window occurrence of each (title, start).
                    if key in seen or not _in_last_7_days(start, end):
                        continue
                    seen.add(key)
                    found.append({
                        "platform": "EGS",
                        "title": title,
                        "start": start,
                        "end": end,
                        "url": url_item,
                        "image": hero,
                        "thumbnail": hero,
                        "is_current": None,
                    })

    # The formatted timestamp sorts lexicographically in chronological order.
    found.sort(key=lambda e: _fmt_dt(e.get("start")) or "", reverse=True)
    return found
|
||
|
||
# -------------------- PS Plus (async, blog scraping FR) --------------------
|
||
|
||
# Root of the French PlayStation Blog; used to filter links and to resolve root-relative image paths.
BASE_BLOG = "https://blog.fr.playstation.com/"
# Category page that is scanned for the monthly PS Plus announcement.
PSPLUS_CATEGORY_URL_FR = "https://blog.fr.playstation.com/category/ps-plus/"
|
||
|
||
def _soup(text: str) -> BeautifulSoup:
    """Parse *text* into a BeautifulSoup tree using the ``html.parser`` backend."""
    return BeautifulSoup(markup=text, features="html.parser")
|
||
|
||
def _clean_titles(candidates: List[str], limit: int = 7) -> List[str]:
    """Normalise scraped game names and drop duplicates.

    A trailing ``| PS4...`` / ``| PS5...`` platform suffix and a trailing
    ``(PS+)`` marker are removed. Names shorter than two characters are
    skipped, duplicates are detected case-insensitively (first spelling
    wins), and collection stops as soon as *limit* names have been kept.
    """
    kept: List[str] = []
    seen_keys = set()
    for raw in candidates:
        name = re.sub(r"\s*\|\s*PS[45].*$", "", raw).strip()
        name = re.sub(r"\s*\(PS\+\)$", "", name).strip()
        key = name.lower()
        if len(name) < 2 or key in seen_keys:
            continue
        seen_keys.add(key)
        kept.append(name)
        if len(kept) >= limit:
            break
    return kept
|
||
|
||
def _abs_url(u: Optional[str], base: str) -> Optional[str]:
    """Turn a possibly relative URL found in a blog page into an absolute one.

    Args:
        u: URL as found in the page; may be absolute, protocol-relative
            (``//host/path``), root-relative (``/path``) or relative.
        base: URL the relative form is appended to.

    Returns:
        ``None`` for empty input, otherwise the absolute URL. Root-relative
        paths are resolved against ``BASE_BLOG``.
    """
    if not u:
        return None
    if u.startswith(("http://", "https://")):
        return u
    if u.startswith("//"):
        # Protocol-relative: must be checked before the "/" case, which would
        # otherwise glue the foreign host onto BASE_BLOG.
        return "https:" + u
    if u.startswith("/"):
        return BASE_BLOG.rstrip("/") + u
    return base.rstrip("/") + "/" + u
|
||
|
||
def _extract_best_image(art: BeautifulSoup, article_url: str) -> Tuple[Optional[str], Optional[str]]:
    """Pick an image URL for an article and return it as ``(image, thumbnail)``.

    Both tuple members are always the same value. Sources are tried in order:
    Open Graph / Twitter meta tags, ``<link rel="image_src">``, then the
    first ``<img src>`` inside an ``<article>``. ``(None, None)`` means
    nothing usable was found.
    """
    # (CSS selector, attribute carrying the URL), highest priority first.
    declared_sources = (
        ('meta[property="og:image"]', "content"),
        ('meta[name="og:image"]', "content"),
        ('meta[name="twitter:image"]', "content"),
        ('meta[property="twitter:image"]', "content"),
        ('link[rel="image_src"]', "href"),
    )
    for selector, attribute in declared_sources:
        node = art.select_one(selector)
        raw = node.get(attribute) if node else None
        if not raw:
            continue
        resolved = _abs_url(raw.strip(), article_url)
        if resolved:
            return (resolved, resolved)

    # Last resort: first inline image of the article body.
    inline = art.select_one("article img[src]")
    if inline:
        resolved = _abs_url(inline.get("src"), article_url)
        return (resolved, resolved)
    return (None, None)
|
||
|
||
async def fetch_psplus_week(session: aiohttp.ClientSession) -> Optional[Dict[str, Any]]:
    """Return the latest French "Les jeux du mois PlayStation Plus ..." post if it is recent.

    The PS Plus category page is scanned for links under ``BASE_BLOG`` whose
    text starts with that phrase. Each linked article is fetched, its first
    ``<time datetime=...>`` is read, and the most recent article is kept.

    Returns:
        ``None`` when no matching article is found, none has a parseable
        date, or the newest one is more than 7 days old. Otherwise a dict
        with keys title, date (ISO-8601, UTC), url, games, image, thumbnail.

    Raises:
        aiohttp.ClientResponseError: If the category page itself cannot be
            fetched. Failures on individual articles are logged and skipped.
    """
    timeout = aiohttp.ClientTimeout(total=25)
    async with session.get(PSPLUS_CATEGORY_URL_FR, headers=UA, timeout=timeout) as r:
        r.raise_for_status()
        idx_html = await r.text()
    idx = _soup(idx_html)

    # href -> title; the same article is usually linked more than once on the
    # index page, so keep only the first occurrence and fetch it once.
    candidates: Dict[str, str] = {}
    for a in idx.select("a[href]"):
        href = a.get("href") or ""
        title = (a.get_text() or "").strip()
        if not href.startswith(BASE_BLOG):
            continue
        if title.lower().startswith("les jeux du mois playstation plus"):
            candidates.setdefault(href, title)
    if not candidates:
        return None

    latest: Optional[Tuple[datetime, str, str, BeautifulSoup]] = None
    for href, title in candidates.items():
        try:
            async with session.get(href, headers=UA, timeout=timeout) as r:
                r.raise_for_status()
                art_html = await r.text()
        except (aiohttp.ClientError, asyncio.TimeoutError, UnicodeError) as exc:
            LOG.debug("PS+ article skipped (%s): %s", href, exc)
            continue

        art = _soup(art_html)
        time_el = art.find("time")
        date_iso = time_el.get("datetime") if time_el else None
        if not date_iso:
            continue
        try:
            published = datetime.fromisoformat(date_iso.replace("Z", "+00:00"))
        except (AttributeError, TypeError, ValueError):
            continue
        # Normalise to aware UTC: naive values are assumed to be UTC, offset
        # values are converted. This keeps the comparisons below from raising
        # TypeError and makes the returned date a true UTC instant.
        if published.tzinfo is None:
            published = published.replace(tzinfo=timezone.utc)
        else:
            published = published.astimezone(timezone.utc)

        if latest is None or published > latest[0]:
            latest = (published, title, href, art)

    if latest is None:
        return None

    published, title, url, art = latest
    if datetime.now(timezone.utc) - published > timedelta(days=7):
        # Outside the window -> no PS+ section this week.
        return None

    # Pull a few game titles out of the article text: first look for
    # "<name> | PS..." lines, then fall back to short <strong>/<em> fragments.
    games: List[str] = []
    for node in art.find_all(string=True):
        t = " ".join((node or "").strip().split())
        if " | PS" in t and 0 < len(t) <= 120:
            name = t.split(" | ")[0].strip("—:-– ")
            if 2 <= len(name.split()) <= 8:
                games.append(name)
    if not games:
        for tag in art.select("strong, em"):
            t = " ".join(tag.get_text(" ", strip=True).split())
            if 2 <= len(t.split()) <= 8:
                games.append(t)

    image_url, thumb_url = _extract_best_image(art, url)
    return {
        "title": title,
        "date": published.isoformat(),
        "url": url,
        "games": _clean_titles(games),
        "image": _sanitize_url(image_url),
        "thumbnail": _sanitize_url(thumb_url),
    }
|
||
|
||
# -------------------- Xbox Game Pass (sync, SIGLS + DisplayCatalog) --------------------
|
||
|
||
def _xbox_locale_from_languages(languages: str) -> str:
    """Convert a ``lang-region`` code into ``lang-REGION`` casing.

    Empty input or input without a hyphen yields ``"en-US"``. Only the first
    hyphen splits; everything after it is upper-cased.
    """
    has_region = bool(languages) and "-" in languages
    if not has_region:
        return "en-US"
    lang, _, region = languages.partition("-")
    return "-".join((lang.lower(), region.upper()))
|
||
|
||
def _slugify(s: str) -> str:
    """Build a lowercase, hyphen-separated ASCII slug from *s*.

    Every run of characters other than ASCII letters and digits becomes one
    hyphen; leading and trailing hyphens are removed. If nothing is left the
    placeholder ``"game"`` is returned.
    """
    dashed = re.sub(r"[^a-zA-Z0-9]+", "-", s)
    slug = dashed.strip("-").lower()
    if not slug:
        return "game"
    return slug
|
||
|
||
def _pick_images_displaycatalog(prod: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
    """Choose ``(hero, thumbnail)`` image URLs from a DisplayCatalog product dict.

    Images are gathered from the first ``LocalizedProperties`` entry and from
    the product's own ``Images`` list. Images whose purpose mentions poster,
    branded key art or super-hero art are preferred when any exist. The
    widest preferred image is the hero and the narrowest is the thumbnail;
    each falls back to the other when its URL does not survive sanitising.
    Returns ``(None, None)`` when the product carries no images.
    """
    wanted_purposes = ("poster", "brandedkeyart", "superherowide", "superheroart")

    # Collect every image dict the product exposes.
    pool: List[Dict[str, Any]] = []
    localized = prod.get("LocalizedProperties") or []
    if isinstance(localized, list) and localized:
        localized_images = localized[0].get("Images") or []
        if isinstance(localized_images, list):
            pool += localized_images
    product_images = prod.get("Images") or []
    if isinstance(product_images, list):
        pool += product_images
    if not pool:
        return None, None

    # Narrow down to the preferred purposes, unless that leaves nothing.
    preferred = []
    for image in pool:
        purpose = (image.get("ImagePurpose") or image.get("Purpose") or "").lower()
        if any(word in purpose for word in wanted_purposes):
            preferred.append(image)
    shortlist = preferred or pool

    # Widths may be missing or not comparable; fall back rather than fail.
    try:
        widest = max(shortlist, key=lambda image: image.get("Width", 0))
        hero_raw = widest.get("Uri")
    except Exception:
        hero_raw = shortlist[0].get("Uri")
    try:
        narrowest = min(shortlist, key=lambda image: image.get("Width", 10**9))
        thumb_raw = narrowest.get("Uri")
    except Exception:
        thumb_raw = hero_raw

    hero = _sanitize_url(hero_raw)
    thumb = _sanitize_url(thumb_raw)
    return hero or thumb, thumb or hero
|
||
|
||
def fetch_xgp_recent(limit=20, market="US", languages="en-us", store_locale="fr-fr") -> List[Dict[str, Any]]:
    """Best-effort list of Game Pass titles: SIGLS ids enriched via DisplayCatalog.

    The "recently added" collection ids are tried in order and the first one
    that returns ids wins; if none does, the "all games" collection is used.
    Product details are then requested in chunks of 20 ids.

    Args:
        limit: Maximum number of items returned.
        market: Market code sent to both services.
        languages: Language code sent to both services.
        store_locale: Locale used only for the xbox.com store links
            (default ``"fr-fr"``, the value that used to be hard-coded).

    Returns:
        Up to *limit* dicts with keys platform, title, productId, url, image,
        thumbnail. An empty list when no ids could be obtained. Network and
        decoding errors are logged and skipped, never raised.
    """
    recently_ids = (
        "3fdd7f57-7092-4b65-bd40-5a9dac1b2b84",
        "61d6e1a1-735c-4b97-9d15-22ce8dfb0c03",
        "7d2d3d36-1c52-4a63-8b3e-3b7ee4d0f62a",
    )
    all_games_id = "29a81209-df6f-41fd-a528-2ae6b91f719c"

    def sigls(collection_id: str) -> List[str]:
        """Return the product ids listed by one SIGLS collection ([] on any failure)."""
        try:
            r = requests.get("https://catalog.gamepass.com/sigls/v2",
                             params={"language": languages, "market": market, "id": collection_id},
                             headers=UA, timeout=25)
            r.raise_for_status()
            payload = r.json()
        except (requests.RequestException, ValueError) as exc:
            LOG.debug("SIGLS collection %s unavailable: %s", collection_id, exc)
            return []
        if not isinstance(payload, list):
            return []
        ids: List[str] = []
        for obj in payload:
            # Entries are either {"id": "..."} objects or bare id strings;
            # anything else (e.g. a header object without "id") is ignored.
            if isinstance(obj, dict) and isinstance(obj.get("id"), str):
                ids.append(obj["id"])
            elif isinstance(obj, str):
                ids.append(obj)
        return ids

    # map() is lazy, so collections after the first non-empty one are not requested.
    big_ids = next((ids for ids in map(sigls, recently_ids) if ids), None) or sigls(all_games_id)
    if not big_ids:
        return []

    locale_path = _xbox_locale_from_languages(store_locale)
    out: List[Dict[str, Any]] = []
    seen = set()
    for i in range(0, len(big_ids), 20):
        # Enough items collected: further chunks could only be sliced away below.
        if 0 <= limit <= len(out):
            break
        chunk = big_ids[i:i + 20]
        try:
            dc = requests.get("https://displaycatalog.mp.microsoft.com/v7.0/products",
                              params={"bigIds": ",".join(chunk), "market": market, "languages": languages},
                              headers=UA, timeout=25)
            dc.raise_for_status()
            data = dc.json()
        except (requests.RequestException, ValueError) as exc:
            LOG.debug("DisplayCatalog chunk at offset %d skipped: %s", i, exc)
            continue

        products = data.get("Products") if isinstance(data, dict) else None
        for p in products or []:
            pid = p.get("ProductId")
            if not pid or pid in seen:
                continue
            seen.add(pid)

            # Fall back to the product id when no localized title is available.
            lp = p.get("LocalizedProperties") or []
            title = (lp[0].get("ProductTitle") if lp else None) or pid

            url = f"https://www.xbox.com/{locale_path}/games/store/{_slugify(title)}/{pid}"
            hero, thumb = _pick_images_displaycatalog(p)
            out.append({"platform": "XGP", "title": title, "productId": pid, "url": _sanitize_url(url),
                        "image": hero, "thumbnail": thumb})
    return out[:limit]
|
||
|
||
# -------------------- HTML builder --------------------
|
||
|
||
# 16:9 YouTube <iframe> snippet; "{vid}" is the placeholder for the video id.
YOUTUBE_EMBED_TMPL = "".join((
    '<div class="yt-container" style="position:relative;aspect-ratio:16/9;max-width:800px;margin:1rem 0">',
    '<iframe src="https://www.youtube.com/embed/{vid}" title="YouTube video" loading="lazy" ',
    'style="position:absolute;inset:0;width:100%;height:100%;border:0" ',
    'allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" ',
    'allowfullscreen></iframe></div>',
))
|
||
|
||
def _card_image(src: str, alt: str) -> str:
    """Return the ``<p><img></p>`` snippet for one card; *alt* must already be HTML-escaped."""
    return (f'<p><img src="{html.escape(src)}" alt="{alt}" '
            'style="max-width:100%;height:auto;border:0"/></p>')


def _card_link(url: str) -> str:
    """Return a paragraph holding a link whose visible text is the URL itself."""
    safe = html.escape(url)
    return f'<p><a href="{safe}">{safe}</a></p>'


def build_html(egs: List[Dict[str, Any]],
               psplus: Optional[Dict[str, Any]],
               xgp: List[Dict[str, Any]]) -> Tuple[str, Optional[str]]:
    """Render the newsletter body and pick a feature image.

    Args:
        egs: Items as produced by ``fetch_egs_week``.
        psplus: Dict as produced by ``fetch_psplus_week``, or ``None``.
        xgp: Items as produced by ``fetch_xgp_recent``.

    Returns:
        ``(html, feature_image)``. The feature image is the first available
        picture in section order (EGS thumbnail, PS+ image, XGP thumbnail),
        or ``None`` when no item has one. Every externally sourced string is
        HTML-escaped before insertion.
    """
    parts: List[str] = []
    feature: Optional[str] = None

    now_local = datetime.now(TZ)
    week_start = (now_local - timedelta(days=6)).strftime('%d/%m')
    week_end = now_local.strftime('%d/%m')
    title_h2 = f"Presque Gratuit — semaine du {week_start} au {week_end}"
    parts.append(f"<h2>{html.escape(title_h2)}</h2>")

    # --- Epic Games Store
    parts.append("<h3>🎁 Epic Games Store — Jeux gratuits (7 jours)</h3>")
    if not egs:
        parts.append("<p>Aucun jeu gratuit relevé sur la période.</p>")
    else:
        for it in egs:
            title = html.escape(it.get("title") or "")
            url = it.get("url") or ""
            # _fmt_dt returns the raw input when it cannot parse it, so the
            # result has to be escaped like any other external string.
            start = html.escape(_fmt_dt(it.get("start")) or "?")
            end = html.escape(_fmt_dt(it.get("end")) or "?")
            img = it.get("thumbnail")
            if img and not feature:
                feature = img
            parts.append("<div style='margin:12px 0'>")
            if img:
                parts.append(_card_image(img, title))
            parts.append(f"<p><strong>{title}</strong><br/>Période : {start} → {end}</p>")
            if url:
                parts.append(_card_link(url))
            parts.append("</div>")

    # --- PlayStation Plus
    parts.append("<h3>🎮 PlayStation Plus — Jeux du mois (si annoncé cette semaine)</h3>")
    if not psplus:
        parts.append("<p>Aucun nouveau billet PS+ dans la fenêtre des 7 jours.</p>")
    else:
        title = html.escape(psplus.get("title") or "PlayStation Plus — Jeux du mois")
        img = psplus.get("image")
        if img and not feature:
            feature = img
        parts.append("<div style='margin:12px 0'>")
        if img:
            parts.append(_card_image(img, title))
        url = psplus.get("url")
        if url:
            parts.append(f"<p><strong><a href='{html.escape(url)}'>{title}</a></strong></p>")
        else:
            parts.append(f"<p><strong>{title}</strong></p>")
        games = psplus.get("games") or []
        if games:
            parts.append("<ul>" + "".join(f"<li>{html.escape(g)}</li>" for g in games) + "</ul>")
        parts.append("</div>")

    # --- Xbox Game Pass
    parts.append("<h3>🟩 Xbox Game Pass — Récemment ajoutés</h3>")
    if not xgp:
        parts.append("<p>Pas d'entrées détectées.</p>")
    else:
        for it in xgp:
            title = html.escape(it.get("title") or it.get("productId") or "")
            url = it.get("url") or ""
            img = it.get("thumbnail")
            if img and not feature:
                feature = img
            parts.append("<div style='margin:12px 0'>")
            if img:
                parts.append(_card_image(img, title))
            parts.append(f"<p><strong>{title}</strong></p>")
            if url:
                parts.append(_card_link(url))
            parts.append("</div>")

    parts.append("<hr><p><em>Newsletter hebdomadaire — envoyée automatiquement chaque dimanche à midi.</em></p>")
    return "\n".join(parts), feature
|
||
|
||
# -------------------- Orchestrator --------------------
|
||
|
||
async def run_weekly():
    """Fetch the three sources, build the newsletter and create it in Ghost as a draft.

    Required environment: ``GHOST_ADMIN_URL``, ``GHOST_ADMIN_KEY``.
    Optional environment: ``GHOST_NEWSLETTER_SLUG``, ``EGS_LOCALE``,
    ``EGS_COUNTRY``, ``EGS_ALLOW_COUNTRIES``, ``XGP_MARKET``, ``XGP_LANGUAGES``.

    Each source is isolated: a failure is logged as a warning and that
    section is rendered as empty. Errors from Ghost are not caught.

    Raises:
        KeyError: If a required environment variable is missing.
        RuntimeError: If Ghost rejects the post creation.
    """
    # Only fall back to basicConfig when LOG has no handlers of its own;
    # otherwise every record would be printed a second time by the root handler.
    if not LOG.handlers:
        logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    admin_url = os.environ["GHOST_ADMIN_URL"]
    admin_key = os.environ["GHOST_ADMIN_KEY"]
    # Only needed by the publish step, which is currently disabled (see below).
    newsletter_slug = os.environ.get("GHOST_NEWSLETTER_SLUG")
    ghost = GhostAdmin(admin_url, admin_key)

    # Fetch all sources (with timeouts and isolation)
    egs_items: List[Dict[str, Any]] = []
    psplus_data: Optional[Dict[str, Any]] = None
    xgp_items: List[Dict[str, Any]] = []

    try:
        async with aiohttp.ClientSession(headers=UA) as s:
            # EGS
            try:
                locale = os.environ.get("EGS_LOCALE", "en-US")
                country = os.environ.get("EGS_COUNTRY", "US")
                allow = os.environ.get("EGS_ALLOW_COUNTRIES", country)
                egs_items = await fetch_egs_week(s, locale=locale, country=country, allow_countries=allow)
            except Exception as e:
                LOG.warning("EGS fetch failed: %s", e)

            # PS Plus
            try:
                psplus_data = await fetch_psplus_week(s)
            except Exception as e:
                LOG.warning("PS+ fetch failed: %s", e)
    except Exception as e:
        LOG.warning("HTTP session failed: %s", e)

    # XGP (requests, sync)
    try:
        market = os.environ.get("XGP_MARKET", "US")
        languages = os.environ.get("XGP_LANGUAGES", "en-us")
        xgp_items = fetch_xgp_recent(limit=20, market=market, languages=languages)
    except Exception as e:
        LOG.warning("XGP fetch failed: %s", e)

    # Build HTML
    html_body, feature = build_html(egs_items, psplus_data, xgp_items)

    # Title (FR) - read the clock once so both dates come from the same instant.
    now_local = datetime.now(TZ)
    start = (now_local - timedelta(days=6)).strftime("%d/%m/%Y")
    end = now_local.strftime("%d/%m/%Y")
    title = f"Récap hebdo — EGS, PS Plus, Game Pass ({start} → {end})"

    # Create the draft. The publish + email step is currently switched off;
    # uncomment the call below to send the post to the newsletter.
    created = ghost.create_post_html(title, html_body, status="draft", feature_image=feature)
    # ghost.publish_post(created["id"], created["updated_at"], newsletter_slug=newsletter_slug)

    LOG.info("Created weekly newsletter draft: %s", created.get("url"))
|
||
|
||
# -------------------- Scheduler --------------------
|
||
|
||
def _next_sunday_noon(now: datetime) -> datetime:
    """Return the first Sunday 12:00, in *now*'s own time zone, strictly after *now*."""
    # weekday(): Monday=0 .. Sunday=6
    days_ahead = (6 - now.weekday()) % 7
    target = (now + timedelta(days=days_ahead)).replace(hour=12, minute=0, second=0, microsecond=0)
    if target <= now:
        target += timedelta(days=7)
    return target


async def run_forever_sunday_noon():
    """Run ``run_weekly`` every Sunday at 12:00 Europe/Brussels, forever.

    The next slot is recomputed from the clock after every run, so a slow or
    failed run cannot shift the schedule. A failing run is logged with its
    traceback and the loop continues.
    """
    if not LOG.handlers:
        logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

    last_target: Optional[datetime] = None
    while True:
        reference = datetime.now(TZ)
        if last_target is not None and reference < last_target:
            # Guard against waking a hair early: never pick the same slot twice.
            reference = last_target
        target = _next_sunday_noon(reference)
        last_target = target

        # Subtract in UTC: two datetimes sharing one tzinfo are subtracted as
        # wall-clock values, which is off by an hour across a DST change.
        wait = (target.astimezone(timezone.utc) - datetime.now(timezone.utc)).total_seconds()
        wait = max(0.0, wait)
        LOG.info("Next run at %s (in %.0f min)", target.isoformat(), wait/60)
        await asyncio.sleep(wait)

        try:
            await run_weekly()
        except Exception as e:
            LOG.exception("Weekly run failed: %s", e)
        # No fixed 7-day sleep here: the next loop iteration already targets
        # the following Sunday, and sleeping first would skip every other week.
|
||
|
||
# -------------------- Entrypoint --------------------
|
||
|
||
async def main():
    """Command-line entry point.

    Configures logging, then either performs a single run (``--runonce``) or
    hands control to the Sunday-noon scheduler.
    """
    setuplogger()

    cli = argparse.ArgumentParser()
    cli.add_argument("--runonce", action="store_true", help="Run now and exit (no scheduler)")
    options = cli.parse_args()

    job = run_weekly if options.runonce else run_forever_sunday_noon
    await job()


if __name__ == "__main__":
    asyncio.run(main())