')
if not first_image:
first_image = thumb
else:
# --- Texte + lien
ftext = ""
if "summary" in post and post["summary"]:
ftext = html.unescape(post["summary"])
ftext = re.sub("<[^<]+?>", "", ftext)
ftext = re.sub(r"L’article .* est apparu en premier sur .*", "", ftext)
if ftext:
parts.append(f"
{html.escape(ftext)}
")
if linkURL:
esc = html.escape(linkURL)
parts.append(f'
')
# --- Images dans le contenu
for link in post.get("links", []) or []:
if link.get("type") in ("image/jpg", "image/jpeg", "image/png", "image/webp"):
imgUrl = link.get("href")
if imgUrl:
if not first_image:
first_image = imgUrl
parts.append(f'')
# --- Sources
parts.append("
Sources
")
for feed in feeds:
esc = html.escape(feed.url)
parts.append(f'
Abonnez-vous pour recevoir chaque jour les news et soutenir mon travail.
')
return "\n".join(parts), first_image
@staticmethod
def _format_duration(seconds: float) -> str:
seconds = int(seconds)
days, seconds = divmod(seconds, 86400)
hours, seconds = divmod(seconds, 3600)
minutes, seconds = divmod(seconds, 60)
parts = []
if days: parts.append(f"{days} days")
if hours: parts.append(f"{hours} hours")
if minutes: parts.append(f"{minutes} minutes")
if seconds: parts.append(f"{seconds} seconds")
return ", ".join(parts) if parts else "0 seconds"
async def run_daily_at_6_05(self):
while True:
now = dt.datetime.now()
next_run = (now + dt.timedelta(days=1)).replace(hour=6, minute=5, second=0, microsecond=0)
sleep_seconds = (next_run - now).total_seconds()
while sleep_seconds > 0:
LOG.info("Waiting for %s for next scan", self._format_duration(sleep_seconds))
await asyncio.sleep(min(sleep_seconds, 5 * 60))
now = dt.datetime.now()
sleep_seconds = (next_run - now).total_seconds()
LOG.info("Going to run the daily task")
await self.daily_task()
async def daily_task(self):
# Log newsletters (debug)
try:
nls = self.ghost.get_newsletters()
LOG.info("Newsletters: %s", ", ".join(f"{n.get('name')}[{n.get('slug')}]" for n in nls))
except Exception as e:
LOG.warning("Unable to list newsletters: %s", e)
title_post = "Les news du " + self._fr_date_today()
LOG.info("Running daily task : %s", title_post)
# (Re)charge les feeds
feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
if not os.path.isfile(feeds_file):
feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", r"c:\workspace\Substack_JV\feeds.txt")
feeds: List[RSSfeed] = []
with open(feeds_file, encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
for line in lines:
feeds.append(RSSfeed(line, "youtube" in line.lower()))
self.feeds = feeds
# Fenêtre: depuis hier 06:00 UTC
yesterday_6am_utc = dt.datetime.now(dt.timezone.utc).replace(hour=6, minute=0, second=0, microsecond=0) - dt.timedelta(days=1)
all_news_posts: List[dict] = []
for feed in self.feeds:
LOG.info("Scanning feed %s", feed.url)
content = self._safe_get(feed.url, timeout=30)
if not content:
continue
fp = feedparser.parse(content)
# Sélection des items récents
new_entries = []
for e in fp.entries:
dte = self._entry_datetime(e)
if dte and dte > yesterday_6am_utc:
new_entries.append(e)
# Filtrage ad-hoc
filtered = []
for e in new_entries:
linkURL = e.get("link", "") or ""
if "actugaming" in linkURL and ("puzzle-" in linkURL or "guide-" in linkURL):
continue
# enrich YouTube id if applicable
if feed.youtube and linkURL:
vid = extract_youtube_id(linkURL)
if vid:
e["yt_videoid"] = vid
filtered.append(e)
all_news_posts.extend(filtered)
if not all_news_posts:
LOG.warning("Aucun item récupéré (flux down ?). On n'envoie pas aujourd'hui.")
return
random.shuffle(all_news_posts)
roundup_html, feature_image = self._build_html_roundup(all_news_posts, self.feeds)
# 1) Create draft (with feature image if any)
created = self.ghost.create_post_html(title_post, roundup_html, status="draft", feature_image=feature_image)
# 2) Publish + send email
published = self.ghost.publish_post(
post_id=created["id"],
updated_at=created["updated_at"],
newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"),
email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"),
)
LOG.info("Published post: %s (emailed via newsletter)", published.get("url"))
# ------------- main -------------
async def main():
setuplogger()
parser = argparse.ArgumentParser()
parser.add_argument("--runonce", action="store_true", help="Run now and exit (no scheduler)")
args = parser.parse_args()
# Feeds init (list may be reloaded inside task)
feeds: List[RSSfeed] = []
feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
if not os.path.isfile(feeds_file):
feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", r"c:\workspace\Substack_JV\feeds.txt")
with open(feeds_file, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
feeds.append(RSSfeed(line, "youtube" in line.lower()))
admin_url = os.environ["GHOST_ADMIN_URL"] # e.g. https://ghostadmin.zep.best/ghost/api/admin/
admin_key = os.environ["GHOST_ADMIN_KEY"] # integration_id:secret_hex
task = GhostTask(
feeds=feeds,
admin_url=admin_url,
admin_key=admin_key,
newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"),
email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"),
)
LOG.info("Starting bot")
if args.runonce:
await task.daily_task()
return
# Démarrage: publier l'édition du jour si elle n'existe pas encore
await task.maybe_run_today()
# Planification quotidienne à 06:05 Europe/Brussels (via heure locale du conteneur)
await task.run_daily_at_6_05()
if __name__ == "__main__":
asyncio.run(main())