adding caching to presque gratos
This commit is contained in:
162
backfill.py
Normal file
162
backfill.py
Normal file
@@ -0,0 +1,162 @@
|
||||
# backfill_from_ghost.py
|
||||
from __future__ import annotations
|
||||
import os, re, sys, html
|
||||
from typing import Dict, List, Optional
|
||||
import requests
|
||||
from bs4 import BeautifulSoup
|
||||
|
||||
# Reuse your existing GhostAdmin client (same headers/base/proxy behavior)
|
||||
# Adjust the import path if your Ghost client lives elsewhere.
|
||||
from presquegratos import GhostAdmin
|
||||
|
||||
from storage import Storage
|
||||
from keys import xgp_key, egs_key, psplus_key
|
||||
|
||||
# ---------------- Ghost helpers (reusing your admin client) ----------------
|
||||
def ghost_list_posts(ghost: GhostAdmin, page: int = 1) -> Dict:
|
||||
# Minimal params: avoid 'filter' and 'fields' to dodge 400 behind __bot proxy
|
||||
url = ghost.base + "posts/"
|
||||
params = {
|
||||
"limit": "50",
|
||||
"page": str(page),
|
||||
"order": "published_at DESC",
|
||||
"formats": "lexical,html", # <-- IMPORTANT
|
||||
}
|
||||
r = requests.get(url, headers=ghost._headers(), params=params, timeout=30)
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
def list_recap_posts(ghost: GhostAdmin, hard_limit: int = 2000) -> List[Dict]:
|
||||
posts: List[Dict] = []
|
||||
page = 1
|
||||
while True:
|
||||
data = ghost_list_posts(ghost, page=page)
|
||||
batch = data.get("posts", [])
|
||||
if not batch:
|
||||
break
|
||||
# client-side filter to be robust to proxy quirks
|
||||
for p in batch:
|
||||
title = (p.get("title") or "").strip()
|
||||
if title.startswith("Récap hebdo"):
|
||||
posts.append(p)
|
||||
if len(batch) < 50 or len(posts) >= hard_limit:
|
||||
break
|
||||
page += 1
|
||||
return posts
|
||||
|
||||
# ---------------- Parsing helpers (unchanged) ----------------
|
||||
#MS_STORE_RE = re.compile(r"(?:microsoft|xbox)\.com/.*/store/.*/([0-9A-Z]{12,})", re.I)
|
||||
MS_STORE_RE = re.compile(r"(?:xbox|microsoft)\.com/.*/store/.*/([0-9A-Z]{12,16})", re.I)
|
||||
EPIC_RE = re.compile(r"epicgames\.com/store/.*/p/([\w\-]+)", re.I)
|
||||
PSBLOG_RE = re.compile(r"blog\.playstation\.com/.*", re.I)
|
||||
|
||||
def clean_text(s: str) -> str:
|
||||
return re.sub(r"\s+", " ", html.unescape(s or "")).strip()
|
||||
|
||||
def extract_sections(soup: BeautifulSoup) -> Dict[str, BeautifulSoup]:
|
||||
sections: Dict[str, BeautifulSoup] = {}
|
||||
current = None
|
||||
current_key = None
|
||||
for node in soup.find_all(["h2","h3","h4","p","ul","ol","div","section"]):
|
||||
if node.name in ("h2","h3","h4"):
|
||||
title = clean_text(node.get_text())
|
||||
key = None
|
||||
tl = title.lower()
|
||||
if "game pass" in tl:
|
||||
key = "xgp"
|
||||
elif "egs" in tl or "epic" in tl:
|
||||
key = "egs"
|
||||
elif "ps plus" in tl or "ps+" in tl:
|
||||
key = "psplus"
|
||||
if key:
|
||||
current_key = key
|
||||
current = sections[key] = soup.new_tag("div")
|
||||
continue
|
||||
if current_key and current is not None:
|
||||
current.append(node)
|
||||
return sections
|
||||
|
||||
def parse_xgp(section: BeautifulSoup) -> List[Dict]:
|
||||
items = []
|
||||
for a in section.find_all("a", href=True):
|
||||
href = a["href"]
|
||||
m = MS_STORE_RE.search(href)
|
||||
title = clean_text(a.get_text())
|
||||
if m or title:
|
||||
productId = m.group(1) if m else None
|
||||
items.append({"title": title, "productId": productId})
|
||||
uniq, seen = [], set()
|
||||
for it in items:
|
||||
k = xgp_key(it)
|
||||
if k not in seen:
|
||||
uniq.append(it); seen.add(k)
|
||||
return uniq
|
||||
|
||||
def parse_egs(section: BeautifulSoup) -> List[Dict]:
|
||||
items = []
|
||||
for a in section.find_all("a", href=True):
|
||||
if not EPIC_RE.search(a["href"]):
|
||||
continue
|
||||
title = clean_text(a.get_text()) or clean_text(a.get("title"))
|
||||
items.append({"title": title, "start": ""})
|
||||
uniq, seen = [], set()
|
||||
for it in items:
|
||||
k = egs_key(it)
|
||||
if k not in seen:
|
||||
uniq.append(it); seen.add(k)
|
||||
return uniq
|
||||
|
||||
def parse_psplus(section: BeautifulSoup, post_title: str) -> Optional[Dict]:
|
||||
a = section.find("a", href=PSBLOG_RE)
|
||||
url = a["href"] if a else ""
|
||||
m = re.search(r"(\d{2})-(\d{2})-(\d{4})", post_title)
|
||||
iso = ""
|
||||
if m:
|
||||
d, mth, y = m.group(1), m.group(2), m.group(3)
|
||||
iso = f"{y}-{mth}-{d}"
|
||||
return {"url": url, "date": iso}
|
||||
|
||||
# ---------------- Main backfill ----------------
|
||||
def backfill():
|
||||
# Use the same env your main script uses; GhostAdmin will read them internally or
|
||||
# you can pass them explicitly if your class expects (base_url, admin_key).
|
||||
ghost = GhostAdmin(
|
||||
admin_url=os.environ.get("GHOST_ADMIN_URL", "").rstrip("/") + "/",
|
||||
admin_key=os.environ.get("GHOST_ADMIN_KEY", "")
|
||||
)
|
||||
store = Storage()
|
||||
|
||||
posts = list_recap_posts(ghost)
|
||||
print(f"Found {len(posts)} recap posts.")
|
||||
|
||||
total_xgp = total_egs = total_ps = 0
|
||||
|
||||
dedup = []
|
||||
|
||||
for p in posts:
|
||||
|
||||
pid = p["id"]
|
||||
title = p.get("title") or ""
|
||||
html_body = p.get("html") or ""
|
||||
|
||||
soup = BeautifulSoup(html_body, "html.parser")
|
||||
sections = extract_sections(soup)
|
||||
|
||||
for it in parse_xgp(sections.get("xgp", BeautifulSoup("", "html.parser"))):
|
||||
key = xgp_key(it)
|
||||
if not key in dedup:
|
||||
store.remember("xgp", key, pid); total_xgp += 1
|
||||
dedup.append(key)
|
||||
|
||||
# for it in parse_egs(sections.get("egs", BeautifulSoup("", "html.parser"))):
|
||||
# store.remember("egs", egs_key(it), pid); total_egs += 1
|
||||
# if "psplus" in sections:
|
||||
# item = parse_psplus(sections["psplus"], title)
|
||||
# store.remember("psplus", psplus_key(item), pid); total_ps += 1
|
||||
|
||||
print(f"Backfilled from: {title}")
|
||||
|
||||
print(f"Done. Inserted ~ XGP:{total_xgp} | EGS:{total_egs} | PS+:{total_ps}")
|
||||
|
||||
if __name__ == "__main__":
|
||||
backfill()
|
||||
Reference in New Issue
Block a user