fixing yt + crashes + failsafe

This commit is contained in:
Gaël
2025-09-07 16:07:03 +02:00
parent 74d61522a4
commit a0f988ed32
3 changed files with 262 additions and 116 deletions

View File

@@ -1,4 +1,4 @@
FROM python:3.8
FROM python:3.9
RUN apt-get update && apt-get install -y git
RUN git clone http://gitea.zep.best/zep/Substack_JV.git /app

View File

@@ -1,18 +1,50 @@
import asyncio
import argparse
import datetime
import datetime as dt
import html
import io
import logging
import os
import random
import re
import time
from logging.handlers import RotatingFileHandler
from typing import Optional
from typing import Optional, List
import feedparser
import requests
import jwt
import zoneinfo # Python 3.9+
from urllib.parse import urlparse, parse_qs
# ------------- YouTube helpers -------------
# HTML snippet for an embedded YouTube player (16:9 container, lazy-loaded
# iframe). The `{vid}` placeholder is filled in with `str.format(vid=...)`.
YOUTUBE_EMBED_TMPL = (
    '<div class="yt-container" style="position:relative;aspect-ratio:16/9;max-width:800px;margin:1rem 0">'
    '<iframe src="https://www.youtube.com/embed/{vid}" '
    'title="YouTube video" loading="lazy" '
    'style="position:absolute;inset:0;width:100%;height:100%;border:0" '
    'allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share" '
    'allowfullscreen></iframe></div>'
)
# Characters allowed in a video id. The id ends up inside HTML attributes, so
# anything outside this alphabet is rejected rather than passed through.
_YT_ID_RE = re.compile(r"[A-Za-z0-9_-]{6,}")

# First path segment of URL shapes that carry the id as the second segment.
_YT_PATH_PREFIXES = ("shorts", "live", "embed", "v")


def extract_youtube_id(url: str) -> Optional[str]:
    """Return the video id found in a YouTube URL, or None.

    Recognised shapes:
      * ``https://www.youtube.com/watch?v=<id>``
      * ``https://www.youtube.com/{shorts,live,embed,v}/<id>``
      * ``https://www.youtube.com/<id>`` (single 11-character segment only)
      * ``https://youtu.be/<id>``

    The host must be ``youtube.com``, ``youtube-nocookie.com`` or one of their
    subdomains (or ``youtu.be``); look-alike hosts such as ``notyoutube.com``
    are rejected. Pages like ``/channel/...`` or ``/playlist`` do not yield an
    id. Never raises: malformed or non-string input returns None.
    """
    if not url or not isinstance(url, str):
        return None
    try:
        parsed = urlparse(url)
        # `hostname` drops any port / userinfo and is already lower-cased.
        host = (parsed.hostname or "").lower()
    except ValueError:
        return None

    def _checked(candidate: Optional[str]) -> Optional[str]:
        if candidate and _YT_ID_RE.fullmatch(candidate):
            return candidate
        return None

    segments = [segment for segment in parsed.path.split("/") if segment]

    if host in ("youtu.be", "www.youtu.be"):
        return _checked(segments[0]) if segments else None

    is_youtube = any(
        host == domain or host.endswith("." + domain)
        for domain in ("youtube.com", "youtube-nocookie.com")
    )
    if not is_youtube:
        return None

    if segments == ["watch"]:
        return _checked(parse_qs(parsed.query).get("v", [None])[0])
    if len(segments) >= 2 and segments[0] in _YT_PATH_PREFIXES:
        return _checked(segments[1])
    # A bare "/<id>" is only trusted when it has the canonical 11-char length,
    # so that "/playlist", "/results", "/gaming", ... are not mistaken for ids.
    if len(segments) == 1 and len(segments[0]) == 11:
        return _checked(segments[0])
    return None
# ------------- Logging -------------
LOG = logging.getLogger("bot")
LOG_PATTERN = logging.Formatter("%(asctime)s:%(levelname)s: [%(filename)s] %(message)s")
@@ -22,19 +54,21 @@ def setuplogger():
stream_handler.setFormatter(LOG_PATTERN)
stream_handler.setLevel(logging.DEBUG)
file_handler = RotatingFileHandler("bot.log", "a", 1000000, 1)
file_handler = RotatingFileHandler("bot.log", "a", 1_000_000, 1)
file_handler.setFormatter(LOG_PATTERN)
LOG.setLevel(logging.DEBUG)
LOG.addHandler(stream_handler)
LOG.addHandler(file_handler)
# ------------- Model -------------
class RSSfeed:
    """One feed source to scan.

    Attributes:
        url: Address of the feed.
        youtube: Flag supplied by the caller as ``yt``; defaults to False.
    """

    def __init__(self, url: str, yt: bool = False):
        self.url = url
        self.youtube = yt

    def __repr__(self) -> str:
        return f"{type(self).__name__}(url={self.url!r}, yt={self.youtube!r})"
# ---------- Ghost Admin API client ----------
# ------------- Ghost Admin API client -------------
class GhostAdmin:
def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"):
@@ -56,6 +90,20 @@ class GhostAdmin:
"Content-Type": "application/json",
}
def latest_published_date(self, tz_name: str = "Europe/Brussels") -> Optional[dt.datetime]:
    """
    Return the `published_at` of the first post Ghost lists when posts are
    ordered by `published_at` descending, as an aware datetime converted to
    the `tz_name` time zone.

    Returns None when the response contains no post, or when that first post
    has no `published_at`. HTTP error statuses are raised by
    `raise_for_status()`.

    NOTE(review): the request carries no explicit status filter; confirm that
    an unpublished post can never be listed first.
    """
    url = self.base + "posts/?limit=1&order=published_at%20desc&fields=published_at"
    resp = requests.get(url, headers=self._headers(), timeout=20)
    resp.raise_for_status()
    posts = resp.json().get("posts", [])
    if not posts or not posts[0].get("published_at"):
        return None
    # ISO 8601 string -> aware datetime (a trailing "Z" is rewritten to
    # "+00:00" so fromisoformat accepts it) -> converted to the requested zone.
    dtu = dt.datetime.fromisoformat(posts[0]["published_at"].replace("Z", "+00:00"))
    return dtu.astimezone(zoneinfo.ZoneInfo(tz_name))
def get_newsletters(self):
url = self.base + "newsletters/"
resp = requests.get(url, headers=self._headers(), timeout=20)
@@ -66,22 +114,16 @@ class GhostAdmin:
def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str:
    """Choose the newsletter slug to send with.

    Resolution order:
      1. ``preferred_slug`` when it is non-empty (returned as-is, no lookup).
      2. The active newsletter flagged ``is_default``.
      3. The first active newsletter.
      4. The first newsletter of any status.

    Newsletters without a slug are ignored, so the result is always a real
    string as the return annotation promises.

    Raises:
        RuntimeError: when Ghost reports no newsletter at all, or none of
            the reported newsletters has a slug.
    """
    if preferred_slug:
        return preferred_slug

    newsletters = self.get_newsletters()
    if not newsletters:
        raise RuntimeError("No newsletters configured in Ghost (Settings → Newsletters).")

    # A slug-less entry cannot be addressed when publishing; skip it here
    # instead of handing None to the caller.
    usable = [n for n in newsletters if n.get("slug")]
    if not usable:
        raise RuntimeError("Ghost returned newsletters, but none of them has a slug.")

    actives = [n for n in usable if n.get("status") == "active"]
    for newsletter in actives:
        if newsletter.get("is_default"):
            return newsletter["slug"]
    return (actives or usable)[0]["slug"]
def create_post_html(self, title: str, html_content: str, status: str = "draft", feature_image: Optional[str] = None):
"""
Create a post with HTML source; optionally set feature_image (absolute URL).
"""
url = self.base + "posts/?source=html"
post = {"title": title, "html": html_content, "status": status}
if feature_image:
@@ -92,9 +134,6 @@ class GhostAdmin:
return resp.json()["posts"][0]
def publish_post(self, post_id: str, updated_at: str, newsletter_slug: Optional[str], email_segment: Optional[str]):
"""
Publish + (if newsletter provided) send email to the chosen audience.
"""
slug = self.pick_newsletter_slug(newsletter_slug)
params = [f"newsletter={requests.utils.quote(slug)}"]
if email_segment:
@@ -105,188 +144,265 @@ class GhostAdmin:
if resp.status_code >= 400:
raise RuntimeError(f"Ghost publish error {resp.status_code}: {resp.text}")
return resp.json()["posts"][0]
# ---------- Your task logic (ported from Substack) ----------
# ------------- Task orchestration -------------
class GhostTask:
def __init__(self, feeds, admin_url, admin_key, newsletter_slug=None, email_segment=None):
def __init__(self, feeds: List[RSSfeed], admin_url: str, admin_key: str, newsletter_slug: Optional[str] = None, email_segment: Optional[str] = None):
self.ghost = GhostAdmin(admin_url, admin_key)
self.feeds = feeds
self.newsletter_slug = newsletter_slug
self.email_segment = email_segment
for feed in self.feeds:
LOG.info("Adding feed " + feed.url)
LOG.info("Adding feed %s", feed.url)
def get_fr_date(self):
import datetime
# --- startup immediate run if not yet published today
def _published_today(self) -> bool:
    """
    Report whether the date returned by `self.ghost.latest_published_date`
    is today's calendar date in the Europe/Brussels time zone.

    Returns False when no publication date is available.
    """
    tz = zoneinfo.ZoneInfo("Europe/Brussels")
    last = self.ghost.latest_published_date("Europe/Brussels")
    if not last:
        return False
    # `last` was requested in Europe/Brussels and "now" is taken in the same
    # zone, so the two calendar dates are directly comparable.
    return last.date() == dt.datetime.now(tz).date()
async def maybe_run_today(self):
    """
    Run `daily_task` immediately unless `_published_today()` reports that a
    post was already published today. The decision is logged either way.
    """
    if not self._published_today():
        # Log text: "No newsletter published today -> generating immediately."
        LOG.info("Aucune newsletter publiée aujourd'hui → génération immédiate.")
        await self.daily_task()
    else:
        # Log text: "Already published today, waiting for the next window."
        LOG.info("Déjà publié aujourd'hui, on attend la prochaine fenêtre.")
# --- utils
@staticmethod
def _fr_date_today() -> str:
    """Return today's local date written in French, e.g. ``07 Septembre 2025``.

    The month name is looked up by month number instead of translating the
    output of ``strftime("%B")``, so the result does not depend on the
    process locale.
    """
    months = (
        "Janvier", "Février", "Mars", "Avril", "Mai", "Juin",
        "Juillet", "Août", "Septembre", "Octobre", "Novembre", "Décembre",
    )
    today = dt.datetime.now()
    return f"{today.day:02d} {months[today.month - 1]} {today.year}"
def _build_html_roundup(self, items, feeds):
@staticmethod
def _safe_get(url: str, timeout: int = 20) -> Optional[bytes]:
    """
    Fetch `url` with a GET request and return the raw response body.

    Returns None instead of raising when the request fails or the response
    has an error status; the failure is logged as a warning.
    """
    try:
        r = requests.get(url, timeout=timeout, headers={"User-Agent": "ghost-bot/1.0"})
        r.raise_for_status()
        return r.content
    except Exception as e:
        # Deliberately broad: any failure is reported and turned into None.
        # Log text: "Feed unavailable".
        LOG.warning("Flux indisponible: %s (%s)", url, e)
        return None
@staticmethod
def _entry_datetime(entry) -> Optional[dt.datetime]:
    """Return the timestamp of a feedparser entry as an aware UTC datetime.

    Sources are tried in this order; the first one that parses wins:
      1. ``entry.published`` as ISO 8601 (e.g. ``2025-09-05T10:20:33+00:00``,
         a trailing ``Z`` is accepted).
      2. ``entry.published`` as RFC 822 (e.g. ``Fri, 05 Sep 2025 10:20:33 +0000``,
         ``GMT`` is accepted as the zone).
      3. ``entry.published_parsed`` then ``entry.updated_parsed``.

    Returns None when nothing usable is present. Never raises on bad data.
    """
    utc = dt.timezone.utc

    published = getattr(entry, "published", None)
    if published:
        try:
            return dt.datetime.fromisoformat(published.replace("Z", "+00:00")).astimezone(utc)
        except (ValueError, TypeError, AttributeError):
            pass
        try:
            return dt.datetime.strptime(
                published.replace("GMT", "+0000"), "%a, %d %b %Y %H:%M:%S %z"
            ).astimezone(utc)
        except (ValueError, TypeError, AttributeError):
            pass

    # feedparser's *_parsed values are struct_time tuples already expressed in
    # UTC. Build the datetime from the fields directly: going through
    # time.mktime() would reinterpret them as local time and shift the result
    # by the machine's UTC offset.
    for attr in ("published_parsed", "updated_parsed"):
        parsed = getattr(entry, attr, None)
        if not parsed:
            continue
        try:
            return dt.datetime(*parsed[:6], tzinfo=utc)
        except (TypeError, ValueError):
            continue
    return None
# --- HTML builder
def _build_html_roundup(self, items: List[dict], feeds: List[RSSfeed]):
"""
Construit le HTML et retourne (html, feature_image_url_ou_None).
- YouTube: iframe + miniature cliquable (fallback email-safe)
- Images: collecte la première pour feature_image
"""
parts: List[str] = []
parts.append(f"<h2>Les news du {self._fr_date_today()}</h2>")
first_image: Optional[str] = None
for post in items:
title = post.get("title", "")
linkURL = post.get("link", "")
title = post.get("title", "") or ""
linkURL = post.get("link", "") or ""
parts.append(f'<hr><h3>{html.escape(title)}</h3>')
if "yt_videoid" in post:
videoId = post["yt_videoid"]
parts.append(f'<p>https://www.youtube.com/watch?v={videoId}</p>')
parts.append(f'<p><a href="{html.escape(linkURL)}">{html.escape(linkURL)}</a></p>')
# --- YouTube embed / fallback
vid = post.get("yt_videoid") or extract_youtube_id(linkURL)
if vid:
# iframe (web) + thumbnail (email-safe) + lien
thumb = f"https://i.ytimg.com/vi/{vid}/hqdefault.jpg"
parts.append(YOUTUBE_EMBED_TMPL.format(vid=vid))
parts.append(f'<p><a href="https://www.youtube.com/watch?v={vid}">Voir sur YouTube</a></p>')
parts.append(f'<p><a href="https://www.youtube.com/watch?v={vid}"><img src="{thumb}" alt="YouTube thumbnail" style="max-width:100%;height:auto;border:0" /></a></p>')
if not first_image:
first_image = thumb
else:
# --- Texte + lien
ftext = ""
if "summary" in post:
if "summary" in post and post["summary"]:
ftext = html.unescape(post["summary"])
ftext = re.sub("<[^<]+?>", "", ftext)
ftext = re.sub(r"Larticle .* est apparu en premier sur .*", "", ftext)
if ftext:
parts.append(f"<p>{html.escape(ftext)}</p>")
if linkURL:
parts.append(f'<p><a href="{html.escape(linkURL)}">{html.escape(linkURL)}</a></p>')
esc = html.escape(linkURL)
parts.append(f'<p><a href="{esc}">{esc}</a></p>')
# Attach images in the body; remember the first one for feature_image
if "links" in post:
for link in post["links"]:
if link.get("type") in ("image/jpg","image/jpeg","image/png","image/webp"):
# --- Images dans le contenu
for link in post.get("links", []) or []:
if link.get("type") in ("image/jpg", "image/jpeg", "image/png", "image/webp"):
imgUrl = link.get("href")
if imgUrl:
if not first_image:
first_image = imgUrl
parts.append(f'<figure><img src="{html.escape(imgUrl)}" loading="lazy"></figure>')
# Sources
# --- Sources
parts.append("<hr><h3>Sources</h3>")
for feed in feeds:
parts.append(f'<p><a href="{html.escape(feed.url)}">{html.escape(feed.url)}</a></p>')
esc = html.escape(feed.url)
parts.append(f'<p><a href="{esc}">{esc}</a></p>')
parts.append('<p><em>Abonnez-vous pour recevoir chaque jour les news et soutenir mon travail.</em></p>')
return "\n".join(parts), first_image
def format_duration(self, seconds):
@staticmethod
def _format_duration(seconds: float) -> str:
    """Render a duration in seconds as e.g. ``1 days, 2 hours, 5 minutes``.

    The value is truncated to whole seconds; zero-valued units are omitted.
    A zero or negative duration yields ``"0 seconds"``.
    """
    # Clamp first: divmod on a negative number would borrow from the next
    # unit and produce output such as "-1 days, 23 hours, ...".
    remaining = max(int(seconds), 0)
    days, remaining = divmod(remaining, 86400)
    hours, remaining = divmod(remaining, 3600)
    minutes, secs = divmod(remaining, 60)

    labelled = ((days, "days"), (hours, "hours"), (minutes, "minutes"), (secs, "seconds"))
    parts = [f"{value} {unit}" for value, unit in labelled if value]
    return ", ".join(parts) if parts else "0 seconds"
async def run_daily_at_6_am(self):
async def run_daily_at_6_05(self):
while True:
now = datetime.datetime.now()
next_run = (now + datetime.timedelta(days=1)).replace(hour=6, minute=5, second=0, microsecond=0)
now = dt.datetime.now()
next_run = (now + dt.timedelta(days=1)).replace(hour=6, minute=5, second=0, microsecond=0)
sleep_seconds = (next_run - now).total_seconds()
while sleep_seconds > 0:
LOG.info(f"Waiting for {self.format_duration(sleep_seconds)} for next scan")
LOG.info("Waiting for %s for next scan", self._format_duration(sleep_seconds))
await asyncio.sleep(min(sleep_seconds, 5 * 60))
now = datetime.datetime.now()
now = dt.datetime.now()
sleep_seconds = (next_run - now).total_seconds()
LOG.info("Going to run the daily task")
await self.daily_task()
async def daily_task(self):
# Log newsletters (debug)
try:
nls = self.ghost.get_newsletters()
print("Newsletters:")
for n in nls:
print(f"- title={n.get('name')} slug={n.get('slug')} status={n.get('status')} default={n.get('is_default')}")
LOG.info("Newsletters: %s", ", ".join(f"{n.get('name')}[{n.get('slug')}]" for n in nls))
except Exception as e:
LOG.warning("Unable to list newsletters: %s", e)
title_post = "Les news du " + self.get_fr_date()
LOG.info("Running daily task : " + str(title_post))
title_post = "Les news du " + self._fr_date_today()
LOG.info("Running daily task : %s", title_post)
# Re-read feeds (unchanged)
# (Re)charge les feeds
feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
if not os.path.isfile(feeds_file):
feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", "x:\\substack\\feeds.txt")
self.feeds = []
with open(feeds_file) as f:
feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", r"c:\workspace\Substack_JV\feeds.txt")
feeds: List[RSSfeed] = []
with open(feeds_file, encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
for line in lines:
self.feeds.append(RSSfeed(line, "youtube" in line))
feeds.append(RSSfeed(line, "youtube" in line.lower()))
self.feeds = feeds
yesterday_6am = datetime.datetime.now(datetime.timezone.utc).replace(
hour=6, minute=0, second=0, microsecond=0
) - datetime.timedelta(days=1)
# Fenêtre: depuis hier 06:00 UTC
yesterday_6am_utc = dt.datetime.now(dt.timezone.utc).replace(hour=6, minute=0, second=0, microsecond=0) - dt.timedelta(days=1)
all_news_posts = []
all_news_posts: List[dict] = []
for feed in self.feeds:
LOG.info("Scanning feed " + feed.url)
html_text = requests.get(feed.url, timeout=30).text
newsFeed = feedparser.parse(html_text)
LOG.info("Scanning feed %s", feed.url)
content = self._safe_get(feed.url, timeout=30)
if not content:
continue
fp = feedparser.parse(content)
if feed.youtube:
new_posts = [e for e in newsFeed.entries if datetime.datetime.fromisoformat(e.published) > yesterday_6am]
else:
try:
new_posts = [e for e in newsFeed.entries
if datetime.datetime.strptime(e.published.replace('GMT', '+0000'),
'%a, %d %b %Y %H:%M:%S %z') > yesterday_6am]
except Exception:
new_posts = [e for e in newsFeed.entries
if datetime.datetime.fromtimestamp(time.mktime(e.updated_parsed)).replace(
tzinfo=datetime.timezone.utc) > yesterday_6am]
# Sélection des items récents
new_entries = []
for e in fp.entries:
dte = self._entry_datetime(e)
if dte and dte > yesterday_6am_utc:
new_entries.append(e)
# Filtrage ad-hoc
filtered = []
for e in new_posts:
linkURL = e.get("link", "")
for e in new_entries:
linkURL = e.get("link", "") or ""
if "actugaming" in linkURL and ("puzzle-" in linkURL or "guide-" in linkURL):
continue
# enrich YouTube id if applicable
if feed.youtube and linkURL:
vid = extract_youtube_id(linkURL)
if vid:
e["yt_videoid"] = vid
filtered.append(e)
all_news_posts.extend(filtered)
if not all_news_posts:
LOG.warning("Aucun item récupéré (flux down ?). On n'envoie pas aujourd'hui.")
return
random.shuffle(all_news_posts)
roundup_html, feature_image = self._build_html_roundup(all_news_posts, self.feeds)
# 1) Create as draft WITH feature_image if we found one
# 1) Create draft (with feature image if any)
created = self.ghost.create_post_html(title_post, roundup_html, status="draft", feature_image=feature_image)
# 2) Publish AND SEND EMAIL (always)
# 2) Publish + send email
published = self.ghost.publish_post(
post_id=created["id"],
updated_at=created["updated_at"],
newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"), # may be None -> auto-pick
email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"), # may be None -> send to all
newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"),
email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"),
)
LOG.info("Published post: %s (emailed via newsletter)", published.get("url"))
LOG.info(f"Published post: {published.get('url')} (emailed via newsletter)")
def debug_list_newsletters(admin_url, admin_key):
    """Print one line per newsletter returned by Ghost: name, slug, status, default flag."""
    # Fetch before printing the header so a failed request prints nothing.
    newsletters = GhostAdmin(admin_url, admin_key).get_newsletters()
    print("Newsletters:")
    for item in newsletters:
        name, slug = item.get('name'), item.get('slug')
        status, is_default = item.get('status'), item.get('is_default')
        print(f"- title={name} slug={slug} status={status} default={is_default}")
# ---------------- main ----------------
# ------------- main -------------
async def main():
setuplogger()
# Feeds initial pass (kept for parity with your original script)
feeds = []
parser = argparse.ArgumentParser()
parser.add_argument("--run-once", action="store_true", help="Run immediately once then exit")
args = parser.parse_args()
# Feeds init (list may be reloaded inside task)
feeds: List[RSSfeed] = []
feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
if not os.path.isfile(feeds_file):
feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", r"c:\workspace\Substack_JV\feeds.txt")
with open(feeds_file) as f:
lines = [line.strip() for line in f if line.strip()]
for line in lines:
feeds.append(RSSfeed(line, "youtube" in line))
with open(feeds_file, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
feeds.append(RSSfeed(line, "youtube" in line.lower()))
admin_url = os.environ["GHOST_ADMIN_URL"]
admin_key = os.environ["GHOST_ADMIN_KEY"]
admin_url = os.environ["GHOST_ADMIN_URL"] # e.g. https://ghostadmin.zep.best/ghost/api/admin/
admin_key = os.environ["GHOST_ADMIN_KEY"] # integration_id:secret_hex
task = GhostTask(
feeds=feeds,
@@ -297,9 +413,16 @@ async def main():
)
LOG.info("Starting bot")
await task.run_daily_at_6_am()
# Or just run once:
#await task.daily_task()
if args.run-once:
await task.daily_task()
return
# Démarrage: publier l'édition du jour si elle n'existe pas encore
await task.maybe_run_today()
# Planification quotidienne à 06:05 Europe/Brussels (via heure locale du conteneur)
await task.run_daily_at_6_05()
if __name__ == "__main__":
asyncio.run(main())

23
xboxsyde.py Normal file
View File

@@ -0,0 +1,23 @@
import feedparser
import io
import html
import datetime
import requests
import time
def _entry_datetime(entry):
    """Return the entry's timestamp as an aware datetime, or None if unknown.

    The RFC 822 `published` string is tried first; when it is missing or does
    not match, the entry's `updated_parsed` tuple is used instead.
    """
    published = entry.get("published")
    if published:
        try:
            return datetime.datetime.strptime(
                published.replace('GMT', '+0000'), '%a, %d %b %Y %H:%M:%S %z'
            )
        except ValueError:
            pass
    parsed = entry.get("updated_parsed")
    if parsed:
        # feedparser's *_parsed tuples are in UTC: build the datetime from the
        # fields directly instead of round-tripping through time.mktime(),
        # which treats them as local time and drifts by an hour under DST.
        try:
            return datetime.datetime(*parsed[:6], tzinfo=datetime.timezone.utc)
        except (TypeError, ValueError):
            pass
    return None


url = r'https://www.xboxygen.com/spip.php?page=backend'
# A timeout keeps the script from hanging forever on an unresponsive server.
html_text = requests.get(url, timeout=30).text
news = feedparser.parse(html_text)

# Window start: 06:00 UTC on the previous day.
yesterday_6am = datetime.datetime.now(datetime.timezone.utc).replace(
    hour=6, minute=0, second=0, microsecond=0
) - datetime.timedelta(days=1)

# Each entry is dated on its own, so one oddly formatted entry no longer
# switches the whole feed over to the fallback field.
new_posts = []
for entry in news.entries:
    when = _entry_datetime(entry)
    if when is not None and when > yesterday_6am:
        new_posts.append(entry)

print(new_posts)