# Page size requested from Ghost. list_recap_posts relies on the same value to
# recognise the last (short) page, so both functions share this constant.
_PAGE_SIZE = 50


def ghost_list_posts(ghost: GhostAdmin, page: int = 1) -> Dict:
    """Fetch one page of posts through the Ghost admin client.

    Args:
        ghost: Admin client; only ``ghost.base`` (URL prefix) and
            ``ghost._headers()`` (request headers) are used here.
        page: Page number to request (the first page is 1).

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    # Minimal params: avoid 'filter' and 'fields' to dodge 400 behind __bot proxy
    query = {
        "limit": str(_PAGE_SIZE),
        "page": str(page),
        "order": "published_at DESC",
        # The backfill parses the "html" field of each post, so it must be requested.
        "formats": "lexical,html",
    }
    response = requests.get(
        ghost.base + "posts/",
        headers=ghost._headers(),
        params=query,
        timeout=30,
    )
    response.raise_for_status()
    return response.json()


def list_recap_posts(ghost: GhostAdmin, hard_limit: int = 2000) -> List[Dict]:
    """Collect every post whose title starts with "Récap hebdo".

    Pages are fetched one after another until an empty or short page is
    returned, or until ``hard_limit`` matching posts have been gathered.

    Args:
        ghost: Admin client forwarded to ``ghost_list_posts``.
        hard_limit: Maximum number of posts to return.

    Returns:
        At most ``hard_limit`` post dicts, in the order the API returned them.
    """
    recaps: List[Dict] = []
    page = 1
    while True:
        batch = ghost_list_posts(ghost, page=page).get("posts", [])
        if not batch:
            break
        # Client-side filter to be robust to proxy quirks.
        recaps.extend(
            post
            for post in batch
            if (post.get("title") or "").strip().startswith("Récap hebdo")
        )
        if len(batch) < _PAGE_SIZE or len(recaps) >= hard_limit:
            break
        page += 1
    # A whole page is appended before the limit is checked, so trim the overshoot.
    return recaps[:max(hard_limit, 0)]
# Day, month, year as written in a recap title. The separator may be "-", "/"
# or "." because the weekly title is formatted with "%d/%m/%Y".
_TITLE_DATE_RE = re.compile(r"(\d{2})[-/.](\d{2})[-/.](\d{4})")


def _unique_by_key(items: List[Dict], key_fn) -> List[Dict]:
    """Return ``items`` without later entries whose ``key_fn`` value repeats."""
    seen = set()
    unique = []
    for item in items:
        key = key_fn(item)
        if key not in seen:
            seen.add(key)
            unique.append(item)
    return unique


def extract_sections(soup: BeautifulSoup) -> Dict[str, BeautifulSoup]:
    """Group the nodes of a recap post under the platform heading they follow.

    A heading (h2/h3/h4) whose text mentions "game pass", "egs"/"epic" or
    "ps plus"/"ps+" opens a section keyed "xgp", "egs" or "psplus". Every
    following node is appended to that section until another matching heading
    is met. Headings that match no platform do not close the current section.

    Args:
        soup: Parsed HTML body of the post.

    Returns:
        Mapping of section key to a new ``div`` tag holding the section nodes.
        A key seen twice keeps only the nodes that follow its last heading.
    """
    sections: Dict[str, BeautifulSoup] = {}
    current = None
    for node in soup.find_all(["h2", "h3", "h4", "p", "ul", "ol", "div", "section"]):
        if node.name in ("h2", "h3", "h4"):
            heading = clean_text(node.get_text()).lower()
            key = None
            if "game pass" in heading:
                key = "xgp"
            elif "egs" in heading or "epic" in heading:
                key = "egs"
            elif "ps plus" in heading or "ps+" in heading:
                key = "psplus"
            if key:
                current = sections[key] = soup.new_tag("div")
                continue
        if current is not None:
            # NOTE(review): bs4 appears to re-parent the node on append, so the
            # original soup is modified while sections are built - confirm.
            current.append(node)
    return sections


def parse_xgp(section: BeautifulSoup) -> List[Dict]:
    """Extract Game Pass entries from the links of a section.

    A link is kept when it points at a Microsoft/Xbox store product page or
    when it has visible text. ``productId`` is ``None`` for non-store links.

    Returns:
        Dicts with "title" and "productId", de-duplicated by ``xgp_key``.
    """
    items = []
    for link in section.find_all("a", href=True):
        match = MS_STORE_RE.search(link["href"])
        title = clean_text(link.get_text())
        if match or title:
            items.append({"title": title, "productId": match.group(1) if match else None})
    return _unique_by_key(items, xgp_key)


def parse_egs(section: BeautifulSoup) -> List[Dict]:
    """Extract Epic Games Store entries from the links of a section.

    Only links matching ``EPIC_RE`` are kept. The title is the link text, or
    the link's ``title`` attribute when the text is empty. ``start`` is always
    the empty string because the post does not carry the offer start date.

    Returns:
        Dicts with "title" and "start", de-duplicated by ``egs_key``.
    """
    items = []
    for link in section.find_all("a", href=True):
        if not EPIC_RE.search(link["href"]):
            continue
        title = clean_text(link.get_text()) or clean_text(link.get("title"))
        items.append({"title": title, "start": ""})
    return _unique_by_key(items, egs_key)


def parse_psplus(section: BeautifulSoup, post_title: str) -> Optional[Dict]:
    """Build the PS Plus entry of a recap post.

    Args:
        section: PS Plus section of the post.
        post_title: Title of the post; its first dd/mm/yyyy, dd-mm-yyyy or
            dd.mm.yyyy date becomes the entry date.

    Returns:
        Dict with "url" (first PlayStation Blog link, or "") and "date"
        (ISO ``yyyy-mm-dd``, or "" when the title holds no date).
    """
    link = section.find("a", href=PSBLOG_RE)
    url = link["href"] if link else ""
    iso = ""
    match = _TITLE_DATE_RE.search(post_title)
    if match:
        day, month, year = match.groups()
        iso = f"{year}-{month}-{day}"
    return {"url": url, "date": iso}
# keys.py (or inline in your main)
def _text_field(item, name: str) -> str:
    """Return ``item[name]`` stripped, or "" when it is missing or falsy."""
    return (item.get(name) or "").strip()


def xgp_key(item) -> str:
    """Return the de-duplication key of a Game Pass item.

    The Microsoft Store ``productId`` is preferred because it is stable. It is
    upper-cased so that an id scraped from a URL with a case-insensitive
    pattern yields the same key as the id in its usual uppercase form. Without
    a product id the key falls back to the lower-cased title.

    Args:
        item: Mapping with optional "productId" and "title" entries.
    """
    product_id = _text_field(item, "productId").upper()
    if product_id:
        return f"item:xgp:{product_id}"
    return f"item:xgp:title:{_text_field(item, 'title').lower()}"


def egs_key(item) -> str:
    """Return the de-duplication key of an Epic Games Store item.

    The key combines the title with the start of the free window ("start",
    ISO or YYYY-MM-DD); either part may be empty.
    """
    return f"item:egs:{_text_field(item, 'title')}|{_text_field(item, 'start')}"


def psplus_key(item) -> str:
    """Return the de-duplication key of a PS Plus item.

    The key combines the PS Blog URL ("url") with the entry date ("date");
    either part may be empty.
    """
    return f"item:psplus:{_text_field(item, 'url')}|{_text_field(item, 'date')}"
Pas d'entrées détectées.
") @@ -568,6 +572,23 @@ async def run_weekly(): LOG.warning("XGP fetch failed: %s", e) # Build HTML + store = Storage() + + def keep_new(items, key_fn): + fresh = [] + for it in items: + k = key_fn(it) + + if not store.seen(k): + + it["_dedup_key"] = k + fresh.append(it) + return fresh + + # Fetch your three sources as you already do: + + xgp_items = keep_new(xgp_items, xgp_key) + html_body, feature = build_html(egs_items, psplus_data, xgp_items) # Title (FR) start = (datetime.now(TZ) - timedelta(days=6)).strftime("%d/%m/%Y") diff --git a/storage.py b/storage.py new file mode 100644 index 0000000..9a7a540 --- /dev/null +++ b/storage.py @@ -0,0 +1,55 @@ +# storage.py +from __future__ import annotations +import sqlite3, pathlib, datetime as dt +from typing import Optional, Iterable, Tuple +import os +DB_PATH = "/data/published.db" # bind-mount ./data:/data in docker + +_SCHEMA = """ +PRAGMA journal_mode = WAL; +CREATE TABLE IF NOT EXISTS published_items( + platform TEXT NOT NULL, -- e.g. xgp | egs | psplus + key TEXT PRIMARY KEY, -- your dedupe key (see below) + first_seen_utc TEXT NOT NULL, -- ISO-8601 + last_post_id TEXT -- Ghost post id that recorded it +); +CREATE INDEX IF NOT EXISTS idx_platform ON published_items(platform); +""" + +class Storage: + def __init__(self, db_path: str = DB_PATH): + + + if not os.path.isfile(db_path): + db_path = os.environ.get("DB_FILE_FALLBACK", r"f:\workspace\Substack_JV\data\published.db") + + print(db_path) + pathlib.Path(db_path).parent.mkdir(parents=True, exist_ok=True) + self.conn = sqlite3.connect(db_path) + self.conn.execute("PRAGMA foreign_keys = ON;") + for stmt in filter(None, _SCHEMA.split(";")): + if stmt.strip(): + self.conn.execute(stmt) + + def seen(self, key: str) -> bool: + cur = self.conn.execute("SELECT 1 FROM published_items WHERE key=?", (key,)) + return cur.fetchone() is not None + + def remember(self, platform: str, key: str, post_id: Optional[str]): + self.conn.execute( + "INSERT OR IGNORE INTO 
published_items(platform,key,first_seen_utc,last_post_id) VALUES(?,?,?,?)", + (platform, key, dt.datetime.utcnow().isoformat(), post_id), + ) + if post_id: + self.conn.execute("UPDATE published_items SET last_post_id=? WHERE key=?", (post_id, key)) + self.conn.commit() + + def bulk_remember(self, platform: str, pairs: Iterable[Tuple[str, Optional[str]]]): + rows = [(platform, k, dt.datetime.utcnow().isoformat(), pid) for (k, pid) in pairs] + self.conn.executemany( + "INSERT OR IGNORE INTO published_items(platform,key,first_seen_utc,last_post_id) VALUES(?,?,?,?)", + rows + ) + self.conn.commit() + +