Files
Substack_JV/post_rss_to_ghost.py
2025-09-08 18:23:36 +02:00

429 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import argparse
import datetime as dt
import html
import logging
import os
import random
import re
import time
from logging.handlers import RotatingFileHandler
from typing import Optional, List
import feedparser
import requests
import jwt
import zoneinfo # Python 3.9+
from urllib.parse import urlparse, parse_qs
# ------------- YouTube helpers -------------
# HTML snippet for a responsive 16:9 YouTube player.
# The single ``{vid}`` placeholder is filled with ``str.format(vid=...)`` and
# ends up inside the iframe ``src`` URL, so the caller is responsible for
# passing a value that is safe to place in an HTML attribute.
YOUTUBE_EMBED_TMPL = (
    '<div class="yt-container" style="position:relative;aspect-ratio:16/9;max-width:800px;margin:1rem 0">'
    '<iframe src="https://www.youtube.com/embed/{vid}" '
    'title="YouTube video" loading="lazy" '
    'style="position:absolute;inset:0;width:100%;height:100%;border:0" '
    'allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" '
    'allowfullscreen></iframe></div>'
)
# Characters YouTube uses in video ids; the minimum length of 6 is kept from
# the original pattern.
_YT_ID_RE = re.compile(r"[A-Za-z0-9_-]{6,}")
# Path shapes that carry a video id as their second segment.
_YT_PATH_RE = re.compile(r"^/(?:shorts|live|embed|v)/([A-Za-z0-9_-]{6,})(?:/|$)")
_YT_HOSTS = ("youtube.com", "youtube-nocookie.com")


def extract_youtube_id(url: str) -> Optional[str]:
    """Return the YouTube video id contained in *url*, or ``None``.

    Recognised URL shapes:

    * ``youtube.com/watch?v=<id>`` (any sub-domain, e.g. ``www.`` or ``m.``)
    * ``youtube.com/shorts/<id>``, ``/live/<id>``, ``/embed/<id>``, ``/v/<id>``
    * ``youtu.be/<id>``

    Anything else (channel pages, playlists, look-alike domains, malformed
    or empty input) yields ``None``.  The returned id is always validated
    against ``[A-Za-z0-9_-]{6,}`` because callers interpolate it into HTML.
    """
    if not isinstance(url, str) or not url:
        return None
    try:
        parsed = urlparse(url)
    except ValueError:
        # urlparse raises ValueError on e.g. malformed IPv6 literals.
        return None

    # ``hostname`` drops any port/userinfo that ``netloc`` would include.
    host = (parsed.hostname or "").lower()

    candidate: Optional[str] = None
    if host == "youtu.be":
        candidate = parsed.path.strip("/").split("/")[0]
    elif any(host == h or host.endswith("." + h) for h in _YT_HOSTS):
        if parsed.path == "/watch":
            candidate = parse_qs(parsed.query).get("v", [None])[0]
        else:
            # Require an explicit video prefix: a bare first path segment
            # would also match "/channel", "/playlist", "/results", ...
            match = _YT_PATH_RE.match(parsed.path)
            if match:
                candidate = match.group(1)

    if candidate and _YT_ID_RE.fullmatch(candidate):
        return candidate
    return None
# ------------- Logging -------------
# Module-wide logger; handlers are attached by setuplogger().
LOG = logging.getLogger("bot")
# Shared line format: timestamp, level, source file name, then the message.
LOG_PATTERN = logging.Formatter("%(asctime)s:%(levelname)s: [%(filename)s] %(message)s")
def setuplogger():
    """Attach a console handler and a rotating file handler to ``LOG``.

    Both handlers use ``LOG_PATTERN``.  The file handler writes to
    ``bot.log`` in the current working directory, rotating at 1,000,000
    bytes and keeping one backup.

    The function is idempotent: if ``LOG`` already has handlers it returns
    without adding more, so calling it twice does not duplicate every line.
    """
    if LOG.handlers:
        return

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(LOG_PATTERN)
    stream_handler.setLevel(logging.DEBUG)

    # Log messages contain non-ASCII text (accents, arrows); force UTF-8 so
    # the file handler does not depend on the platform's locale encoding.
    file_handler = RotatingFileHandler(
        "bot.log",
        mode="a",
        maxBytes=1_000_000,
        backupCount=1,
        encoding="utf-8",
    )
    file_handler.setFormatter(LOG_PATTERN)

    LOG.setLevel(logging.DEBUG)
    LOG.addHandler(stream_handler)
    LOG.addHandler(file_handler)
# ------------- Model -------------
class RSSfeed:
    """A single feed to poll.

    Attributes:
        url: address of the RSS/Atom feed.
        youtube: ``True`` when the feed should be treated as a YouTube feed.
    """

    def __init__(self, url: str, yt: bool = False):
        self.youtube: bool = yt
        self.url: str = url
# ------------- Ghost Admin API client -------------
class GhostAdmin:
    """Small client for the parts of the Ghost Admin API this bot uses.

    Every request is authenticated with a freshly signed, short-lived JWT
    built from the integration's admin key (``<key id>:<hex secret>``).
    """

    def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"):
        """Store the API base URL and split the admin key.

        Args:
            admin_url: Admin API root, e.g. ``https://host/ghost/api/admin/``.
                A trailing slash is added if missing.
            admin_key: ``<key id>:<hex secret>`` as shown by Ghost for a
                custom integration.
            accept_version: value sent in the ``Accept-Version`` header.

        Raises:
            ValueError: if *admin_key* is not exactly ``id:secret`` or the
                secret is not valid hexadecimal.  The key itself is never
                echoed in the message.
        """
        self.base = admin_url.rstrip("/") + "/"

        key_id, sep, secret_hex = admin_key.strip().partition(":")
        if not sep or not key_id or not secret_hex or ":" in secret_hex:
            raise ValueError("Ghost admin key must have the form '<id>:<hex secret>'")
        try:
            # Fail fast here instead of on the first signed request.
            bytes.fromhex(secret_hex)
        except ValueError as err:
            raise ValueError("Ghost admin key secret must be hexadecimal") from err

        self.key_id = key_id
        self.key_secret_hex = secret_hex
        self.accept_version = accept_version

    def _jwt(self) -> str:
        """Return a signed HS256 token valid for five minutes."""
        iat = int(time.time())
        payload = {"iat": iat, "exp": iat + 5 * 60, "aud": "/admin/"}
        headers = {"alg": "HS256", "typ": "JWT", "kid": self.key_id}
        token = jwt.encode(payload, bytes.fromhex(self.key_secret_hex), algorithm="HS256", headers=headers)
        # Handle both str and bytes return values from jwt.encode.
        return token if isinstance(token, str) else token.decode("utf-8")

    def _headers(self):
        """Return the HTTP headers (auth, API version, JSON) for one request."""
        return {
            "Authorization": f"Ghost {self._jwt()}",
            "Accept-Version": self.accept_version,
            "Content-Type": "application/json",
        }

    @staticmethod
    def _ensure_ok(resp, action: str) -> None:
        """Raise ``RuntimeError`` with the response body if *resp* is a 4xx/5xx."""
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost {action} error {resp.status_code}: {resp.text}")

    def latest_published_date(self, tz_name: str = "Europe/Brussels"):
        """Return the publication time of the most recent *published* post.

        The query is restricted to ``status:published`` so that drafts and
        scheduled posts (whose ``published_at`` may lie in the future) are
        not mistaken for the latest publication.

        Args:
            tz_name: IANA zone the result is converted to.

        Returns:
            An aware ``datetime`` in *tz_name*, or ``None`` when no
            published post exists.

        Raises:
            requests.HTTPError: on a 4xx/5xx response.
        """
        url = (
            self.base
            + "posts/?limit=1&order=published_at%20desc&fields=published_at"
            + "&filter=status%3Apublished"
        )
        resp = requests.get(url, headers=self._headers(), timeout=20)
        resp.raise_for_status()
        posts = resp.json().get("posts", [])
        if not posts or not posts[0].get("published_at"):
            return None
        # ISO 8601 with a "Z" suffix -> aware UTC -> requested local zone.
        published_utc = dt.datetime.fromisoformat(posts[0]["published_at"].replace("Z", "+00:00"))
        return published_utc.astimezone(zoneinfo.ZoneInfo(tz_name))

    def get_newsletters(self):
        """Return the list of newsletter dicts configured in Ghost.

        Raises:
            RuntimeError: on a 4xx/5xx response.
        """
        resp = requests.get(self.base + "newsletters/", headers=self._headers(), timeout=20)
        self._ensure_ok(resp, "newsletters")
        return resp.json().get("newsletters", [])

    def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str:
        """Choose the newsletter slug to send with.

        Order of preference: *preferred_slug* if given; otherwise the active
        newsletter flagged ``is_default``; otherwise the first active one;
        otherwise the first newsletter of any status.

        Raises:
            RuntimeError: if Ghost has no newsletters or the listing fails.
        """
        if preferred_slug:
            return preferred_slug
        newsletters = self.get_newsletters()
        if not newsletters:
            raise RuntimeError("No newsletters configured in Ghost (Settings → Newsletters).")
        actives = [n for n in newsletters if n.get("status") == "active"]
        default = next((n for n in actives if n.get("is_default")), None)
        if default is not None:
            return default.get("slug")
        return (actives or newsletters)[0].get("slug")

    def create_post_html(self, title: str, html_content: str, status: str = "draft", feature_image: Optional[str] = None):
        """Create a post from raw HTML and return the created post dict.

        Args:
            title: post title.
            html_content: body HTML (sent with ``?source=html``).
            status: Ghost post status, ``"draft"`` by default.
            feature_image: optional feature image URL.

        Raises:
            RuntimeError: on a 4xx/5xx response.
        """
        post = {"title": title, "html": html_content, "status": status}
        if feature_image:
            post["feature_image"] = feature_image
        resp = requests.post(
            self.base + "posts/?source=html",
            headers=self._headers(),
            json={"posts": [post]},
            timeout=30,
        )
        self._ensure_ok(resp, "create")
        return resp.json()["posts"][0]

    def publish_post(self, post_id: str, updated_at: str, newsletter_slug: Optional[str], email_segment: Optional[str]):
        """Set a post to ``published`` and send it through a newsletter.

        Args:
            post_id: id of the post to publish.
            updated_at: the post's current ``updated_at`` value, sent back
                unchanged in the request body.
            newsletter_slug: newsletter to send with; resolved through
                :meth:`pick_newsletter_slug` when falsy.
            email_segment: optional member segment filter.

        Returns:
            The updated post dict.

        Raises:
            RuntimeError: on a 4xx/5xx response or when no newsletter exists.
        """
        slug = self.pick_newsletter_slug(newsletter_slug)
        params = [f"newsletter={requests.utils.quote(slug)}"]
        if email_segment:
            params.append(f"email_segment={requests.utils.quote(email_segment)}")
        url = self.base + f"posts/{post_id}/?{'&'.join(params)}"
        body = {"posts": [{"updated_at": updated_at, "status": "published"}]}
        resp = requests.put(url, headers=self._headers(), json=body, timeout=30)
        self._ensure_ok(resp, "publish")
        return resp.json()["posts"][0]
# ------------- Task orchestration -------------
class GhostTask:
    """Build the daily news round-up from RSS/Atom feeds and publish it on Ghost.

    One run (:meth:`daily_task`) reloads the feed list, gathers entries newer
    than yesterday 06:00 UTC, renders them to HTML, creates a draft post and
    publishes it through a newsletter.
    """

    # Zone that defines "today" for the title, the scheduler and the
    # already-published check, so all three agree.
    TZ_NAME = "Europe/Brussels"
    _FR_MONTHS = (
        "Janvier", "Février", "Mars", "Avril", "Mai", "Juin",
        "Juillet", "Août", "Septembre", "Octobre", "Novembre", "Décembre",
    )
    _TAG_RE = re.compile("<[^<]+?>")
    # Feed footer "L’article <title> est apparu en premier sur <site>."
    # Accept a curly or straight apostrophe, or none at all.
    _BOILERPLATE_RE = re.compile(r"L['’]?article .* est apparu en premier sur .*")
    _IMAGE_TYPES = ("image/jpg", "image/jpeg", "image/png", "image/webp")

    def __init__(self, feeds: List["RSSfeed"], admin_url: str, admin_key: str,
                 newsletter_slug: Optional[str] = None, email_segment: Optional[str] = None):
        """Create the Ghost client and remember the feeds and send options.

        Args:
            feeds: initial feed list (replaced on every :meth:`daily_task`).
            admin_url: Ghost Admin API root URL.
            admin_key: Ghost admin key, ``<id>:<hex secret>``.
            newsletter_slug: newsletter to send with, if any.
            email_segment: member segment to e-mail, if any.
        """
        self.ghost = GhostAdmin(admin_url, admin_key)
        self.feeds = feeds
        self.newsletter_slug = newsletter_slug
        self.email_segment = email_segment
        for feed in self.feeds:
            LOG.info("Adding feed %s", feed.url)

    # --- startup: run immediately if nothing was published today
    def _published_today(self) -> bool:
        """Return ``True`` if the latest published post is dated today (TZ_NAME)."""
        tz = zoneinfo.ZoneInfo(self.TZ_NAME)
        last = self.ghost.latest_published_date(self.TZ_NAME)
        if not last:
            return False
        return last.date() == dt.datetime.now(tz).date()

    async def maybe_run_today(self):
        """Run :meth:`daily_task` now unless a post was already published today."""
        if not self._published_today():
            LOG.info("Aucune newsletter publiée aujourd'hui → génération immédiate.")
            await self.daily_task()
        else:
            LOG.info("Déjà publié aujourd'hui, on attend la prochaine fenêtre.")

    # --- utils
    @staticmethod
    def _fr_date_today() -> str:
        """Return today's date as ``"DD <French month> YYYY"``.

        The date is taken in ``TZ_NAME`` and the month name comes from a
        fixed table, so the result depends neither on the host time zone nor
        on the process locale.
        """
        today = dt.datetime.now(zoneinfo.ZoneInfo(GhostTask.TZ_NAME))
        return f"{today.day:02d} {GhostTask._FR_MONTHS[today.month - 1]} {today.year:04d}"

    @staticmethod
    def _safe_get(url: str, timeout: int = 20) -> Optional[bytes]:
        """Download *url* and return the body, or ``None`` on any failure.

        Failures are logged as warnings; this is deliberately best-effort so
        one broken feed never aborts the whole run.
        """
        try:
            r = requests.get(url, timeout=timeout, headers={"User-Agent": "ghost-bot/1.0"})
            r.raise_for_status()
            return r.content
        except Exception as e:
            LOG.warning("Flux indisponible: %s (%s)", url, e)
            return None

    @staticmethod
    def _entry_datetime(entry) -> Optional[dt.datetime]:
        """Return an aware UTC datetime for a feed entry, or ``None``.

        Tried in order: ``entry.published`` as ISO 8601, ``entry.published``
        as RFC 822, then the ``published_parsed`` / ``updated_parsed`` time
        tuples.  A timestamp without zone information is taken as UTC.
        """
        utc = dt.timezone.utc
        parse_errors = (ValueError, TypeError, AttributeError, OverflowError)

        published = getattr(entry, "published", None)
        if published:
            try:
                # ISO, e.g. 2025-09-05T10:20:33+00:00 (YouTube feeds)
                stamp = dt.datetime.fromisoformat(published.replace("Z", "+00:00"))
                if stamp.tzinfo is None:
                    stamp = stamp.replace(tzinfo=utc)
                return stamp.astimezone(utc)
            except parse_errors:
                pass
            try:
                # RFC 822, e.g. Fri, 05 Sep 2025 10:20:33 +0000
                stamp = dt.datetime.strptime(published.replace("GMT", "+0000"), "%a, %d %b %Y %H:%M:%S %z")
                return stamp.astimezone(utc)
            except parse_errors:
                pass

        for field in ("published_parsed", "updated_parsed"):
            parsed = getattr(entry, field, None)
            if not parsed:
                continue
            try:
                # The tuple is already expressed in UTC, so build the
                # datetime directly; time.mktime() would read it as local
                # time and shift the result by the host's UTC offset.
                return dt.datetime(*parsed[:6], tzinfo=utc)
            except parse_errors:
                continue
        return None

    @classmethod
    def _clean_summary(cls, raw: str) -> str:
        """Turn a feed summary into plain text without the 'est apparu' footer."""
        text = html.unescape(raw)
        text = cls._TAG_RE.sub("", text)
        text = cls._BOILERPLATE_RE.sub("", text)
        return text.strip()

    # --- HTML builder
    def _build_html_roundup(self, items: List[dict], feeds: List["RSSfeed"]):
        """Render *items* to HTML.

        For entries with a YouTube id: an iframe, a text link and a clickable
        thumbnail (a fallback where iframes are not rendered).  For the
        others: cleaned summary text and the entry link.  Image enclosures
        are appended as figures.  A "Sources" section lists every feed URL.

        Returns:
            ``(html, feature_image)`` where *feature_image* is the first
            thumbnail/image URL encountered, or ``None``.
        """
        parts: List[str] = [f"<h2>Les news du {self._fr_date_today()}</h2>"]
        first_image: Optional[str] = None

        for post in items:
            title = post.get("title", "") or ""
            link_url = post.get("link", "") or ""
            parts.append(f'<hr><h3>{html.escape(title)}</h3>')

            vid = post.get("yt_videoid") or extract_youtube_id(link_url)
            if vid:
                # The id comes from feed data: escape it before it is placed
                # inside HTML attributes.
                safe_vid = html.escape(str(vid), quote=True)
                thumb = f"https://i.ytimg.com/vi/{vid}/hqdefault.jpg"
                parts.append(YOUTUBE_EMBED_TMPL.format(vid=safe_vid))
                parts.append(f'<p><a href="https://www.youtube.com/watch?v={safe_vid}">Voir sur YouTube</a></p>')
                parts.append(
                    f'<p><a href="https://www.youtube.com/watch?v={safe_vid}">'
                    f'<img src="{html.escape(thumb, quote=True)}" alt="YouTube thumbnail" '
                    f'style="max-width:100%;height:auto;border:0" /></a></p>'
                )
                if not first_image:
                    first_image = thumb
            else:
                summary = post.get("summary")
                text = self._clean_summary(summary) if summary else ""
                if text:
                    parts.append(f"<p>{html.escape(text)}</p>")
                if link_url:
                    esc = html.escape(link_url)
                    parts.append(f'<p><a href="{esc}">{esc}</a></p>')

            # Images attached to the entry
            for link in post.get("links", []) or []:
                if link.get("type") not in self._IMAGE_TYPES:
                    continue
                img_url = link.get("href")
                if not img_url:
                    continue
                if not first_image:
                    first_image = img_url
                parts.append(f'<figure><img src="{html.escape(img_url)}" loading="lazy"></figure>')

        # Sources
        parts.append("<hr><h3>Sources</h3>")
        for feed in feeds:
            esc = html.escape(feed.url)
            parts.append(f'<p><a href="{esc}">{esc}</a></p>')
        parts.append('<p><em>Abonnez-vous pour recevoir chaque jour les news et soutenir mon travail.</em></p>')
        return "\n".join(parts), first_image

    @staticmethod
    def _format_duration(seconds: float) -> str:
        """Format a number of seconds as ``"D days, H hours, M minutes, S seconds"``.

        Zero-valued units are omitted; a zero duration gives ``"0 seconds"``.
        """
        seconds = int(seconds)
        days, seconds = divmod(seconds, 86400)
        hours, seconds = divmod(seconds, 3600)
        minutes, seconds = divmod(seconds, 60)
        parts = []
        if days:
            parts.append(f"{days} days")
        if hours:
            parts.append(f"{hours} hours")
        if minutes:
            parts.append(f"{minutes} minutes")
        if seconds:
            parts.append(f"{seconds} seconds")
        return ", ".join(parts) if parts else "0 seconds"

    async def run_daily_at_6_05(self):
        """Run :meth:`daily_task` every day at 06:05 in ``TZ_NAME``, forever.

        The next run is always *tomorrow* 06:05, so a run triggered at
        start-up is never followed by a second one on the same day.  The wait
        is sliced into sleeps of at most five minutes with a progress log.  A
        failing run is logged and does not stop the scheduler.
        """
        tz = zoneinfo.ZoneInfo(self.TZ_NAME)
        utc = dt.timezone.utc
        while True:
            now_local = dt.datetime.now(tz)
            next_run = (now_local + dt.timedelta(days=1)).replace(hour=6, minute=5, second=0, microsecond=0)
            # Compare in UTC so a DST change between now and the next run
            # is accounted for in the remaining time.
            next_run_utc = next_run.astimezone(utc)
            sleep_seconds = (next_run_utc - dt.datetime.now(utc)).total_seconds()
            while sleep_seconds > 0:
                LOG.info("Waiting for %s for next scan", self._format_duration(sleep_seconds))
                await asyncio.sleep(min(sleep_seconds, 5 * 60))
                sleep_seconds = (next_run_utc - dt.datetime.now(utc)).total_seconds()
            LOG.info("Going to run the daily task")
            try:
                await self.daily_task()
            except Exception:
                # Scheduler boundary: keep the bot alive for the next day.
                LOG.exception("Daily task failed; retrying at the next scheduled run")

    @staticmethod
    def _load_feeds() -> List["RSSfeed"]:
        """Read the feed list (one URL per line, blank lines ignored).

        The path comes from ``FEEDS_FILE`` (default ``/data/feeds.txt``); if
        that file does not exist, ``FEEDS_FILE_FALLBACK`` is used instead.  A
        feed is flagged as YouTube when its URL contains "youtube".
        """
        feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
        if not os.path.isfile(feeds_file):
            feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", r"c:\workspace\Substack_JV\feeds.txt")
        with open(feeds_file, encoding="utf-8") as f:
            urls = [line.strip() for line in f if line.strip()]
        return [RSSfeed(url, "youtube" in url.lower()) for url in urls]

    def _collect_recent_entries(self, since: dt.datetime) -> List[dict]:
        """Return entries of all feeds dated strictly after *since* (aware UTC)."""
        collected: List[dict] = []
        for feed in self.feeds:
            LOG.info("Scanning feed %s", feed.url)
            content = self._safe_get(feed.url, timeout=30)
            if not content:
                continue
            parsed_feed = feedparser.parse(content)
            for entry in parsed_feed.entries:
                when = self._entry_datetime(entry)
                if not when or when <= since:
                    continue
                link_url = entry.get("link", "") or ""
                # Ad-hoc filter: skip actugaming puzzle/guide pages.
                if "actugaming" in link_url and ("puzzle-" in link_url or "guide-" in link_url):
                    continue
                # Record the YouTube id on the entry when one can be extracted.
                if feed.youtube and link_url:
                    vid = extract_youtube_id(link_url)
                    if vid:
                        entry["yt_videoid"] = vid
                collected.append(entry)
        return collected

    async def daily_task(self):
        """Build today's round-up and publish it; do nothing if no item is found.

        Raises:
            OSError: if the feeds file cannot be opened.
            RuntimeError: if creating or publishing the post fails.
        """
        # Log the available newsletters (debug aid only, failures ignored).
        try:
            nls = self.ghost.get_newsletters()
            LOG.info("Newsletters: %s", ", ".join(f"{n.get('name')}[{n.get('slug')}]" for n in nls))
        except Exception as e:
            LOG.warning("Unable to list newsletters: %s", e)

        title_post = "Les news du " + self._fr_date_today()
        LOG.info("Running daily task : %s", title_post)

        # (Re)load the feed list so edits to the file apply without restart.
        self.feeds = self._load_feeds()

        # Window: everything since yesterday 06:00 UTC.
        yesterday_6am_utc = (
            dt.datetime.now(dt.timezone.utc).replace(hour=6, minute=0, second=0, microsecond=0)
            - dt.timedelta(days=1)
        )
        all_news_posts = self._collect_recent_entries(yesterday_6am_utc)
        if not all_news_posts:
            LOG.warning("Aucun item récupéré (flux down ?). On n'envoie pas aujourd'hui.")
            return

        random.shuffle(all_news_posts)
        roundup_html, feature_image = self._build_html_roundup(all_news_posts, self.feeds)

        # 1) Create the draft (with a feature image if one was found)
        created = self.ghost.create_post_html(title_post, roundup_html, status="draft", feature_image=feature_image)

        # 2) Publish and send the e-mail.  Constructor values win; the
        #    environment is only consulted when they were not provided.
        published = self.ghost.publish_post(
            post_id=created["id"],
            updated_at=created["updated_at"],
            newsletter_slug=self.newsletter_slug or os.environ.get("GHOST_NEWSLETTER_SLUG"),
            email_segment=self.email_segment or os.environ.get("GHOST_EMAIL_SEGMENT"),
        )
        LOG.info("Published post: %s (emailed via newsletter)", published.get("url"))
# ------------- main -------------
async def main():
    """Command-line entry point.

    Sets up logging, reads the feed list and the Ghost credentials from the
    environment, then either runs the task once (``--run-once``) or performs
    the start-up check followed by the endless daily schedule.
    """
    setuplogger()

    cli = argparse.ArgumentParser()
    cli.add_argument("--run-once", action="store_true", help="Run immediately once then exit")
    options = cli.parse_args()

    # Initial feed list (the task may reload it on each run).
    primary_path = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
    if os.path.isfile(primary_path):
        feeds_path = primary_path
    else:
        feeds_path = os.environ.get("FEEDS_FILE_FALLBACK", r"c:\workspace\Substack_JV\feeds.txt")
    with open(feeds_path, encoding="utf-8") as handle:
        urls = [url for url in (raw.strip() for raw in handle) if url]
    feeds: List[RSSfeed] = [RSSfeed(url, "youtube" in url.lower()) for url in urls]

    # Both variables are mandatory; a missing one raises KeyError.
    admin_url = os.environ["GHOST_ADMIN_URL"]  # e.g. https://ghostadmin.zep.best/ghost/api/admin/
    admin_key = os.environ["GHOST_ADMIN_KEY"]  # integration_id:secret_hex

    task = GhostTask(
        feeds=feeds,
        admin_url=admin_url,
        admin_key=admin_key,
        newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"),
        email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"),
    )
    LOG.info("Starting bot")

    if not options.run_once:
        # Start-up: publish today's edition if it does not exist yet,
        # then hand over to the daily 06:05 scheduler (never returns).
        await task.maybe_run_today()
        await task.run_daily_at_6_05()
        return

    await task.daily_task()


if __name__ == "__main__":
    asyncio.run(main())