# Source file: Substack_JV/post_rss_to_ghost.py
# Last commit: 74d61522a4 ("prod") by Gaël Honorez, 2025-09-05 15:57:53 +02:00
# NOTE: the hosting UI flagged this file as containing ambiguous Unicode
# characters (characters that can be confused with look-alikes). Review string
# literals and regular expressions with that in mind.
import asyncio
import argparse
import datetime
import html
import io
import logging
import os
import random
import re
import time
from logging.handlers import RotatingFileHandler
from typing import Optional
import feedparser
import requests
import jwt
# Logger shared by the whole script; handlers are attached in setuplogger().
LOG = logging.getLogger("bot")

# Line layout used by every handler: timestamp, level, source file, message.
LOG_PATTERN = logging.Formatter(
    fmt="%(asctime)s:%(levelname)s: [%(filename)s] %(message)s",
)
def setuplogger():
    """Configure the module logger with a console handler and a rotating file handler.

    Both handlers share LOG_PATTERN. The logger itself is set to DEBUG, as is
    the console handler; the file handler keeps its default level.
    """
    LOG.setLevel(logging.DEBUG)

    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(LOG_PATTERN)

    # Append to bot.log, rolling over at 1,000,000 bytes with one backup kept.
    logfile = RotatingFileHandler("bot.log", mode="a", maxBytes=1000000, backupCount=1)
    logfile.setFormatter(LOG_PATTERN)

    for handler in (console, logfile):
        LOG.addHandler(handler)
class RSSfeed:
    """A feed URL together with a flag marking it as a YouTube feed."""

    def __init__(self, url, yt=False):
        # The constructor argument is named ``yt``; it is stored as ``youtube``.
        self.url, self.youtube = url, yt
# ---------- Ghost Admin API client ----------
class GhostAdmin:
    """Small client for the Ghost Admin API endpoints this script uses.

    Every request carries a freshly signed, short-lived HS256 JWT built from an
    admin key of the form ``<key id>:<hex secret>``.

    Attributes:
        base: Admin API base URL, normalised to end with exactly one ``/``.
        key_id: Key identifier, sent as the JWT ``kid`` header.
        key_secret_hex: Hex-encoded signing secret.
        accept_version: Value sent in the ``Accept-Version`` header.
    """

    def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"):
        """Store the connection settings and validate the admin key eagerly.

        Validation happens here rather than at the first request so that a
        mistyped key fails at start-up instead of at the first API call.

        Raises:
            ValueError: If the key is not ``<id>:<hex secret>``.
        """
        self.base = admin_url.rstrip("/") + "/"

        # Surrounding whitespace (e.g. a trailing newline from an env file) is
        # never part of a valid key.
        key_id, sep, secret_hex = admin_key.strip().partition(":")
        if not sep or not key_id or not secret_hex:
            # The key itself is deliberately left out of the message: it is a secret.
            raise ValueError("Ghost admin key must have the form '<id>:<hex secret>'")
        try:
            bytes.fromhex(secret_hex)
        except ValueError as err:
            raise ValueError("Ghost admin key secret must be a hexadecimal string") from err

        self.key_id = key_id
        self.key_secret_hex = secret_hex
        self.accept_version = accept_version

    def _jwt(self) -> str:
        """Return a signed token valid for five minutes with audience ``/admin/``."""
        iat = int(time.time())
        payload = {"iat": iat, "exp": iat + 5 * 60, "aud": "/admin/"}
        headers = {"alg": "HS256", "typ": "JWT", "kid": self.key_id}
        token = jwt.encode(payload, bytes.fromhex(self.key_secret_hex), algorithm="HS256", headers=headers)
        # jwt.encode may hand back bytes or str depending on the library version.
        return token if isinstance(token, str) else token.decode("utf-8")

    def _headers(self):
        """Return the HTTP headers for one request (a new token is signed each time)."""
        return {
            "Authorization": f"Ghost {self._jwt()}",
            "Accept-Version": self.accept_version,
            "Content-Type": "application/json",
        }

    def get_newsletters(self):
        """Return the list under the ``newsletters`` key of the API response.

        Raises:
            RuntimeError: If the API answers with an HTTP status >= 400.
        """
        url = self.base + "newsletters/"
        resp = requests.get(url, headers=self._headers(), timeout=20)
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost newsletters error {resp.status_code}: {resp.text}")
        return resp.json().get("newsletters", [])

    def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str:
        """Return ``preferred_slug`` if given, otherwise choose a newsletter.

        Selection order without a preference: an active newsletter flagged
        ``is_default``, else the first active one, else the first newsletter
        of any status.

        Raises:
            RuntimeError: If no newsletter exists or the chosen one has no slug.
        """
        if preferred_slug:
            return preferred_slug

        nls = self.get_newsletters()
        if not nls:
            raise RuntimeError("No newsletters configured in Ghost (Settings → Newsletters).")

        actives = [n for n in nls if n.get("status") == "active"]
        chosen = next((n for n in actives if n.get("is_default")), None)
        if chosen is None:
            chosen = (actives or nls)[0]

        slug = chosen.get("slug")
        if not slug:
            # Without this check a None slug would only fail later, while
            # building the publish URL.
            raise RuntimeError("The selected Ghost newsletter has no slug.")
        return slug

    def create_post_html(self, title: str, html_content: str, status: str = "draft", feature_image: Optional[str] = None):
        """Create a post from HTML source and return the created post object.

        Args:
            title: Post title.
            html_content: Post body as HTML (sent with ``?source=html``).
            status: Initial post status; defaults to ``"draft"``.
            feature_image: Optional absolute image URL for the post.

        Raises:
            RuntimeError: If the API answers with an HTTP status >= 400.
        """
        url = self.base + "posts/?source=html"
        post = {"title": title, "html": html_content, "status": status}
        if feature_image:
            post["feature_image"] = feature_image
        resp = requests.post(url, headers=self._headers(), json={"posts": [post]}, timeout=30)
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost create error {resp.status_code}: {resp.text}")
        return resp.json()["posts"][0]

    def publish_post(self, post_id: str, updated_at: str, newsletter_slug: Optional[str], email_segment: Optional[str]):
        """Set a post to ``published`` and attach it to a newsletter.

        A newsletter is always resolved: when ``newsletter_slug`` is empty one
        is chosen via ``pick_newsletter_slug``. The slug (and the optional
        ``email_segment``) are passed as query parameters of the update call.

        Args:
            post_id: Identifier of the post to publish.
            updated_at: The post's current ``updated_at`` value, echoed back in
                the request body.
            newsletter_slug: Newsletter to use, or None/empty to auto-pick.
            email_segment: Optional audience filter; omitted when empty.

        Returns:
            The updated post object from the API response.

        Raises:
            RuntimeError: If the API answers with an HTTP status >= 400, or no
                usable newsletter can be found.
        """
        slug = self.pick_newsletter_slug(newsletter_slug)
        params = [f"newsletter={requests.utils.quote(slug)}"]
        if email_segment:
            params.append(f"email_segment={requests.utils.quote(email_segment)}")
        url = self.base + f"posts/{post_id}/?{'&'.join(params)}"
        body = {"posts": [{"updated_at": updated_at, "status": "published"}]}
        resp = requests.put(url, headers=self._headers(), json=body, timeout=30)
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost publish error {resp.status_code}: {resp.text}")
        return resp.json()["posts"][0]
# ---------- Your task logic (ported from Substack) ----------
class GhostTask:
    """Daily job: gather recent feed entries, build an HTML roundup and publish it.

    Attributes:
        ghost: GhostAdmin client used for every API call.
        feeds: Feed objects exposing ``url``; reloaded from the feeds file on
            each run of ``daily_task``.
        newsletter_slug: Newsletter to publish through. When empty, the
            GHOST_NEWSLETTER_SLUG environment variable is used, and failing
            that the client picks a newsletter itself.
        email_segment: Optional audience filter. When empty, the
            GHOST_EMAIL_SEGMENT environment variable is used.
    """

    # Local wall-clock time of the daily run.
    RUN_HOUR = 6
    RUN_MINUTE = 5

    _FR_MONTHS = (
        "Janvier", "Février", "Mars", "Avril", "Mai", "Juin",
        "Juillet", "Août", "Septembre", "Octobre", "Novembre", "Décembre",
    )
    _IMAGE_TYPES = ("image/jpg", "image/jpeg", "image/png", "image/webp")
    _TAG_RE = re.compile("<[^<]+?>")
    # Footer sentence appended to some feed summaries. The apostrophe may be
    # typographic (U+2019), plain ASCII, or absent, so all three are accepted.
    _FOOTER_RE = re.compile(r"L['\u2019]?article .* est apparu en premier sur .*")

    def __init__(self, feeds, admin_url, admin_key, newsletter_slug=None, email_segment=None):
        """Create the Ghost client and remember the feeds and delivery options."""
        self.ghost = GhostAdmin(admin_url, admin_key)
        self.feeds = feeds
        self.newsletter_slug = newsletter_slug
        self.email_segment = email_segment
        for feed in self.feeds:
            LOG.info("Adding feed " + feed.url)

    def get_fr_date(self, today=None):
        """Return a date as ``DD <French month> YYYY``, e.g. ``05 Septembre 2025``.

        Args:
            today: Date or datetime to format; defaults to the current local time.

        The month name is looked up by number instead of translating the
        output of ``strftime("%B")``, so the result does not depend on the
        process locale.
        """
        if today is None:
            today = datetime.datetime.now()
        return f"{today.day:02d} {self._FR_MONTHS[today.month - 1]} {today.year}"

    def _clean_summary(self, summary):
        """Return a feed summary as plain text: entities decoded, tags and footer removed."""
        text = html.unescape(summary or "")
        text = self._TAG_RE.sub("", text)
        text = self._FOOTER_RE.sub("", text)
        return text.strip()

    def _build_html_roundup(self, items, feeds):
        """Build the roundup HTML and pick the first image for ``feature_image``.

        Args:
            items: Feed entries (mappings with optional ``title``, ``link``,
                ``summary``, ``yt_videoid`` and ``links`` keys).
            feeds: Feed objects exposing ``url``; listed in the Sources section.

        Returns:
            ``(html_string, first_image_url_or_None)``.
        """
        parts = [f"<h2>Les news du {self.get_fr_date()}</h2>"]
        first_image: Optional[str] = None

        for post in items:
            title = post.get("title", "")
            link_url = post.get("link", "")
            link_html = f'<p><a href="{html.escape(link_url)}">{html.escape(link_url)}</a></p>'

            parts.append(f'<hr><h3>{html.escape(title)}</h3>')
            if "yt_videoid" in post:
                # Feed-provided value: escape it like every other field.
                video_id = html.escape(str(post["yt_videoid"]))
                parts.append(f'<p>https://www.youtube.com/watch?v={video_id}</p>')
                parts.append(link_html)
            else:
                text = self._clean_summary(post.get("summary", ""))
                if text:
                    parts.append(f"<p>{html.escape(text)}</p>")
                if link_url:
                    parts.append(link_html)

            # Attach images in the body; remember the first one for feature_image.
            for link in post.get("links", None) or []:
                if link.get("type") not in self._IMAGE_TYPES:
                    continue
                img_url = link.get("href")
                if not img_url:
                    continue
                if not first_image:
                    first_image = img_url
                parts.append(f'<figure><img src="{html.escape(img_url)}" loading="lazy"></figure>')

        # Sources
        parts.append("<hr><h3>Sources</h3>")
        for feed in feeds:
            parts.append(f'<p><a href="{html.escape(feed.url)}">{html.escape(feed.url)}</a></p>')
        parts.append('<p><em>Abonnez-vous pour recevoir chaque jour les news et soutenir mon travail.</em></p>')
        return "\n".join(parts), first_image

    def format_duration(self, seconds):
        """Format a duration in seconds as e.g. ``1 days, 2 hours, 3 minutes``.

        Fractional seconds are truncated first, so float input (such as
        ``timedelta.total_seconds()``) does not produce ``23.0 hours``.
        """
        seconds = int(seconds)
        days, seconds = divmod(seconds, 86400)
        hours, seconds = divmod(seconds, 3600)
        minutes, seconds = divmod(seconds, 60)
        labelled = ((days, "days"), (hours, "hours"), (minutes, "minutes"), (seconds, "seconds"))
        parts = [f"{value} {unit}" for value, unit in labelled if value > 0]
        return ", ".join(parts) if parts else "0 seconds"

    def _next_run_time(self, now):
        """Return the first RUN_HOUR:RUN_MINUTE strictly after ``now``.

        This is today's slot when it is still ahead, otherwise tomorrow's, so
        a process started during the night does not skip the coming run.
        """
        candidate = now.replace(hour=self.RUN_HOUR, minute=self.RUN_MINUTE, second=0, microsecond=0)
        if candidate <= now:
            candidate += datetime.timedelta(days=1)
        return candidate

    async def run_daily_at_6_am(self):
        """Run ``daily_task`` forever, once per day at the scheduled local time.

        The wait is split into chunks of at most five minutes, with a progress
        line logged before each chunk. A failing run is logged and the loop
        carries on to the next day instead of terminating.
        """
        while True:
            next_run = self._next_run_time(datetime.datetime.now())
            sleep_seconds = (next_run - datetime.datetime.now()).total_seconds()
            while sleep_seconds > 0:
                LOG.info(f"Waiting for {self.format_duration(sleep_seconds)} for next scan")
                await asyncio.sleep(min(sleep_seconds, 5 * 60))
                sleep_seconds = (next_run - datetime.datetime.now()).total_seconds()
            LOG.info("Going to run the daily task")
            try:
                await self.daily_task()
            except Exception:
                # Top-level boundary of the scheduler: keep running.
                LOG.exception("Daily task failed; retrying at the next scheduled run")

    @staticmethod
    def _entry_datetime(entry):
        """Return an entry's publication time as an aware UTC datetime, or None.

        Reads the ``published_parsed`` field, falling back to
        ``updated_parsed``. feedparser documents both as UTC time tuples.
        """
        for key in ("published_parsed", "updated_parsed"):
            stamp = entry.get(key)
            if not stamp:
                continue
            try:
                return datetime.datetime(*stamp[:6], tzinfo=datetime.timezone.utc)
            except (TypeError, ValueError):
                continue
        return None

    async def daily_task(self):
        """Collect recent entries from every feed, then create and publish the roundup.

        Steps: log the available newsletters, reload the feed list from the
        feeds file, keep entries published after yesterday's RUN_HOUR:00
        (local time), build the HTML, create a draft and publish it. Nothing
        is published when no entry qualifies.

        NOTE(review): the HTTP calls are blocking; this looks acceptable only
        because no other coroutine appears to share the event loop.
        """
        for n in self.ghost.get_newsletters():
            LOG.debug(f"- title={n.get('name')} slug={n.get('slug')} status={n.get('status')} default={n.get('is_default')}")

        title_post = "Les news du " + self.get_fr_date()
        LOG.info("Running daily task : " + str(title_post))

        # Reload the feed list; self.feeds is only replaced once the file was read.
        feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
        if not os.path.isfile(feeds_file):
            feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", "x:\\substack\\feeds.txt")
        with open(feeds_file) as f:
            urls = [line.strip() for line in f if line.strip()]
        self.feeds = [RSSfeed(url, "youtube" in url) for url in urls]

        # The cutoff uses the same local clock as the scheduler, so successive
        # runs overlap slightly instead of leaving a gap on non-UTC hosts.
        cutoff = datetime.datetime.now().astimezone().replace(
            hour=self.RUN_HOUR, minute=0, second=0, microsecond=0
        ) - datetime.timedelta(days=1)

        all_news_posts = []
        for feed in self.feeds:
            LOG.info("Scanning feed " + feed.url)
            try:
                resp = requests.get(feed.url, timeout=30)
                resp.raise_for_status()
            except requests.RequestException:
                # One unreachable feed must not cancel the whole roundup.
                LOG.exception("Skipping feed " + feed.url)
                continue

            for entry in feedparser.parse(resp.text).entries:
                published = self._entry_datetime(entry)
                if published is None or published <= cutoff:
                    continue
                link_url = entry.get("link", "")
                if "actugaming" in link_url and ("puzzle-" in link_url or "guide-" in link_url):
                    continue
                all_news_posts.append(entry)

        if not all_news_posts:
            LOG.info("No new entries since " + cutoff.isoformat() + "; nothing to publish")
            return

        random.shuffle(all_news_posts)
        roundup_html, feature_image = self._build_html_roundup(all_news_posts, self.feeds)

        # 1) Create as draft, with feature_image if one was found.
        created = self.ghost.create_post_html(title_post, roundup_html, status="draft", feature_image=feature_image)

        # 2) Publish through a newsletter. Constructor values win; the
        #    environment is the fallback; None lets the client auto-pick.
        published_post = self.ghost.publish_post(
            post_id=created["id"],
            updated_at=created["updated_at"],
            newsletter_slug=self.newsletter_slug or os.environ.get("GHOST_NEWSLETTER_SLUG"),
            email_segment=self.email_segment or os.environ.get("GHOST_EMAIL_SEGMENT"),
        )
        LOG.info(f"Published post: {published_post.get('url')} (emailed via newsletter)")
def debug_list_newsletters(admin_url, admin_key):
    """Print one line per newsletter returned by the Ghost client.

    Each line shows the newsletter's name, slug, status and default flag.
    """
    client = GhostAdmin(admin_url, admin_key)
    newsletters = client.get_newsletters()

    print("Newsletters:")
    for entry in newsletters:
        name = entry.get('name')
        slug = entry.get('slug')
        status = entry.get('status')
        is_default = entry.get('is_default')
        print(f"- title={name} slug={slug} status={status} default={is_default}")
# ---------------- main ----------------
async def main():
    """Configure logging, load the feed list, and start the daily scheduler.

    The feeds file path comes from FEEDS_FILE (default ``/data/feeds.txt``);
    if that path is not a file, FEEDS_FILE_FALLBACK (or its built-in default)
    is used instead. GHOST_ADMIN_URL and GHOST_ADMIN_KEY must be set, while
    GHOST_NEWSLETTER_SLUG and GHOST_EMAIL_SEGMENT are optional.
    """
    setuplogger()

    # Initial pass over the feeds file; one non-blank line per feed URL.
    path = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
    if not os.path.isfile(path):
        path = os.environ.get("FEEDS_FILE_FALLBACK", r"c:\workspace\Substack_JV\feeds.txt")
    with open(path) as handle:
        urls = [stripped for stripped in (raw.strip() for raw in handle) if stripped]

    # A URL containing "youtube" is flagged as a YouTube feed.
    feeds = [RSSfeed(url, "youtube" in url) for url in urls]

    task = GhostTask(
        feeds=feeds,
        admin_url=os.environ["GHOST_ADMIN_URL"],
        admin_key=os.environ["GHOST_ADMIN_KEY"],
        newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"),
        email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"),
    )

    LOG.info("Starting bot")
    await task.run_daily_at_6_am()
    # To run a single pass instead of the scheduler, await task.daily_task().
# Start the event loop only when the file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())