Compare commits

...

49 Commits

Author SHA1 Message Date
Gaël
85d79db3fd new newsletter format 2026-01-31 19:21:24 +01:00
Gaël
e15d53339f fixing the loop 2025-11-24 08:50:58 +01:00
Gaël
80d7c45cfb also publish 2025-11-17 13:23:48 +01:00
Gaël
85b4ea9e24 changing scheduler 2025-11-17 13:20:37 +01:00
Gaël
91add44592 updating python 2025-10-05 19:54:32 +02:00
Gaël
898ada327f adding prime gaming 2025-10-05 19:52:50 +02:00
Gaël
c66935bcb6 wrong path 2025-09-29 09:41:15 +02:00
Gaël
1db321b7d4 remember to save! 2025-09-29 09:39:41 +02:00
Gaël
d08a598fd7 adding caching to presque gratos 2025-09-29 09:36:47 +02:00
Gaël
595c11eeb5 no title 2025-09-17 10:48:16 +02:00
Gaël
8074e7df84 python 3.9 2025-09-10 09:45:18 +02:00
Gaël
b6102d0b4f updating youtube 2025-09-10 09:38:27 +02:00
Gaël
39a4ed88de update logger 2025-09-08 19:01:59 +02:00
Gaël
542aea6602 allez on essaie ca 2025-09-08 18:53:34 +02:00
Gaël
a1e462eab9 bs4 2025-09-08 18:47:48 +02:00
Gaël
a727d5f66a adding dependency 2025-09-08 18:45:57 +02:00
Gaël
ae9c1517d9 update url 2025-09-08 18:39:50 +02:00
Gaël
6cf99fe2e1 again... 2025-09-08 18:34:12 +02:00
Gaël
7b1ee4a0ab test 2025-09-08 18:31:28 +02:00
Gaël
fae2482067 update docker 2025-09-08 18:28:03 +02:00
Gaël
5f57eab24a update docker 2025-09-08 18:23:36 +02:00
Gaël
67f85515a9 bad arg 2025-09-07 16:13:28 +02:00
Gaël
a0f988ed32 fixing yt + crashes + failsafe 2025-09-07 16:07:03 +02:00
Gaël Honorez
74d61522a4 prod 2025-09-05 15:57:53 +02:00
Gaël Honorez
8fbdf0eff6 testing 2025-09-05 15:03:16 +02:00
Gaël Honorez
34d91a3677 using ghost 2025-09-05 14:36:28 +02:00
zep
83869b0663 Update Dockerfile 2025-03-06 12:25:30 +01:00
Gaël
e3d51d6c03 oupsie 2024-10-11 16:38:16 +02:00
Gaël
08442b88e6 removed guides/puzzle from actugaming 2024-10-09 08:56:38 +02:00
Gaël
967527835b fixing xboxsyde feed. 2024-07-30 08:59:14 +02:00
Gaël
8b1a331eb4 up docker 2024-07-03 19:06:56 +02:00
Gaël
ef04c73c31 remove virtual display 2024-07-03 19:02:56 +02:00
Gaël
c78078ce35 fixing pathing 2024-07-03 18:46:00 +02:00
Gaël
668843d8e8 fixing email check 2024-07-03 18:44:36 +02:00
Gaël
e0127a0362 reset the time to 6pm 2024-07-03 17:02:58 +02:00
Gaël
f17cd92f90 fixing cookie things, more or less 2024-07-03 17:01:29 +02:00
Gaël Honorez
d2b39db82e more rounding 2024-01-02 18:38:15 +01:00
Gaël Honorez
364660a7f5 even more readeable 2024-01-02 18:35:30 +01:00
Gaël Honorez
cfe4c70a32 human readeable time. 2024-01-02 18:30:54 +01:00
Gaël Honorez
2db0a6543f scan feeds from file 2024-01-02 14:05:10 +01:00
Gaël Honorez
ef414db31f adding docker compose 2024-01-02 13:53:55 +01:00
Gaël Honorez
a3885024db waiting 5 minutes 2024-01-02 10:10:12 +01:00
Gaël Honorez
16d72ffd32 fix 2024-01-02 09:06:40 +01:00
Gaël Honorez
a3719f1a35 logging hourly 2024-01-02 08:58:01 +01:00
Gaël Honorez
f8b9ba7eb6 ok this is necessary 2024-01-02 08:42:22 +01:00
Gaël Honorez
320863ca50 correct timezone 2024-01-02 08:40:11 +01:00
Gaël Honorez
5c68063ad3 add some logs 2024-01-02 08:35:20 +01:00
Gaël Honorez
84ab8eb974 adding logs 2024-01-01 20:16:18 +01:00
Gaël Honorez
c268dd00c8 better update script 2024-01-01 20:09:48 +01:00
14 changed files with 2331 additions and 220 deletions

View File

@@ -1,8 +1,19 @@
FROM python:3.8 FROM python:3.12
RUN apt-get update && apt-get install -y git RUN apt-get update && apt-get install -y git
RUN git clone http://192.168.1.25:8124/zep/Substack_JV.git /app RUN git clone https://gitea.zep.best/zep/Substack_JV.git /app
WORKDIR /app WORKDIR /app
RUN pip install --upgrade pip
COPY requirements.txt .
RUN pip install -r requirements.txt RUN pip install -r requirements.txt
ENV TZ=Europe/Brussels
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN playwright install --with-deps chromium
COPY update_and_run.sh /app COPY update_and_run.sh /app
RUN chmod +x /app/update_and_run.sh # Normalize line endings (Windows CRLF -> LF) and ensure readable
CMD ["./update_and_run.sh"] RUN sed -i 's/\r$//' /app/update_and_run.sh && chmod a+r /app/update_and_run.sh
# Single entrypoint: run via sh (no exec bit required, survives noexec mounts)
ENTRYPOINT ["sh", "/app/update_and_run.sh"]

View File

@@ -1,209 +0,0 @@
import asyncio
import argparse
import requests
import feedparser
import io
import html
import datetime
import logging
import os
import re
from logging.handlers import RotatingFileHandler
import random
from substack import Api
from substack.post import Post
LOG = logging.getLogger('bot')
LOG_PATTERN = logging.Formatter('%(asctime)s:%(levelname)s: [%(filename)s] %(message)s')
def setuplogger():
conf_filename = None
steam_handler = logging.StreamHandler()
steam_handler.setFormatter(LOG_PATTERN)
steam_handler.setLevel(logging.DEBUG)
def setup_logger(logger_name, file_name=None, add_steam=False):
file_name = file_name or logger_name
log_filename = f"{file_name}.log"
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
file_handler = RotatingFileHandler(log_filename, "a", 1000000, 1)
file_handler.setFormatter(LOG_PATTERN)
logger.addHandler(file_handler)
if add_steam:
logger.addHandler(steam_handler)
setup_logger("bot", conf_filename, True)
class RSSfeed():
def __init__(self, url, yt=False):
self.url = url
self.youtube = yt
class SubStackTask:
def __init__(self, login, password, account, feeds):
self.api = Api(
email=login,
password=password,
publication_url=account,
)
self.user_id = self.api.get_user_id()
self.feeds = feeds
def get_fr_date(self):
# Mapping of English month names to French
months_en_to_fr = {
'January': 'Janvier', 'February': 'Février', 'March': 'Mars',
'April': 'Avril', 'May': 'Mai', 'June': 'Juin',
'July': 'Juillet', 'August': 'Août', 'September': 'Septembre',
'October': 'Octobre', 'November': 'Novembre', 'December': 'Décembre'
}
today = datetime.datetime.now()
formatted_date = today.strftime("%d %B %Y")
# Replace the English month with the French month
for en, fr in months_en_to_fr.items():
formatted_date = formatted_date.replace(en, fr)
return formatted_date
async def run_daily_at_6_am(self):
while True:
now = datetime.datetime.now()
# Calculate the time until 6 AM next day
next_run = (now + datetime.timedelta(days=1)).replace(hour=6, minute=5, second=0, microsecond=0)
sleep_seconds = (next_run - now).total_seconds()
LOG.info("Waiting for " + str(sleep_seconds) + " seconds for next scan")
# Wait until the next run time
await asyncio.sleep(sleep_seconds)
# Run the daily task
await self.daily_task()
async def daily_task(self):
title_post = "Les news du " + self.get_fr_date()
sub_stack_post = Post(
title=title_post,
subtitle="",
user_id=self.user_id
)
midnight_today = datetime.datetime.now(datetime.timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
yesterday_6am = datetime.datetime.now(datetime.timezone.utc).replace(hour=6, minute=0, second=0, microsecond=0) - datetime.timedelta(days=1)
formatted_date = midnight_today.strftime('%a, %d %b %Y %H:%M:%S %z')
all_news_posts = []
for feed in self.feeds:
html_text = requests.get(feed.url).text
newsFeed = feedparser.parse(html_text)
if feed.youtube is True:
new_posts = [entry for entry in newsFeed.entries if datetime.datetime.fromisoformat(entry.published) > yesterday_6am]
else:
new_posts = [entry for entry in newsFeed.entries if datetime.datetime.strptime(entry.published.replace('GMT', '+0000'), '%a, %d %b %Y %H:%M:%S %z') > yesterday_6am]
all_news_posts.extend(new_posts)
random.shuffle(all_news_posts)
for post in all_news_posts:
linkURL = post["link"]
title = post["title"]
ftext = ""
LOG.info("Posting " + str(title))
if "summary" in post:
ftext = html.unescape(post["summary"])
# Using regular expressions to remove HTML tags
ftext = re.sub('<[^<]+?>', '', ftext)
pattern = r"Larticle .* est apparu en premier sur .*"
ftext = re.sub(pattern, '', ftext)
if "yt_videoid" in post:
sub_stack_post.add({"type":"heading", "level":3, "content": title})
videoId = post["yt_videoid"]
sub_stack_post.add({"type":"youtube2", "src": videoId })
sub_stack_post.add({'type': 'paragraph', 'content': [
{'content': linkURL, 'marks': [{'type': "link", 'href': linkURL}]}]})
else:
if ftext != "":
sub_stack_post.add({"type":"heading", "level":3, "content": title})
sub_stack_post.add({"type":"paragraph", "content": ftext })
sub_stack_post.add({'type': 'paragraph', 'content': [
{'content': linkURL, 'marks': [{'type': "link", 'href': linkURL}]}]})
if "links" in post:
for link in post["links"]:
if link["type"] == "image/jpg":
imgUrl = link["href"]
sub_stack_post.add({'type': 'captionedImage', 'src': imgUrl})
sub_stack_post.add({"type":"horizontal_rule"})
sub_stack_post.add({"type":"heading", "level":3, "content": "Sources"})
for feed in self.feeds:
sub_stack_post.add({'type': 'paragraph', 'content': [
{'content': feed.url, 'marks': [{'type': "link", 'href': feed.url}]}]})
sub_stack_post.add({"type":"subscribeWidget", "message":"Abonnez-vous gratuitement pour recevoir chaque jour les news dans votre e-mail et soutenir mon travail."})
draft = self.api.post_draft(sub_stack_post.get_draft())
self.api.prepublish_draft(draft.get("id"))
self.api.publish_draft(draft.get("id"))
async def main(login, password, account):
setuplogger()
if os.path.exists("last_scan_date.txt"):
with open("last_scan_date.txt", "r") as f:
last_post_date = datetime.datetime.strptime(f.read().strip(), '%a, %d %b %Y %H:%M:%S %z')
else:
last_post_date = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
feeds = []
feeds.append(RSSfeed("https://www.factornews.com/rss.xml"))
feeds.append(RSSfeed("https://nofrag.com/feed"))
feeds.append(RSSfeed("https://dystopeek.fr/feed/"))
feeds.append(RSSfeed("https://thepixelpost.com/rss/"))
feeds.append(RSSfeed("https://yamukass.substack.com/feed"))
feeds.append(RSSfeed("https://tseret.com/categorie/tests/feed"))
feeds.append(RSSfeed("https://www.gamesidestory.com/feed"))
feeds.append(RSSfeed("https://www.nintendo-town.fr/feed"))
feeds.append(RSSfeed("https://www.youtube.com/feeds/videos.xml?channel_id=UC-OvBDfZGn1OdsqMBwkOI_A", True))
feeds.append(RSSfeed("https://www.youtube.com/feeds/videos.xml?playlist_id=PLZRiqJjIUlDTrwYs_UqEIts5fVaBpaIEz", True))
task = SubStackTask(login, password, account, feeds)
LOG.info("Starting bot")
await task.run_daily_at_6_am()
#await task.daily_task()
if __name__ == "__main__":
asyncio.run(main("gael.honorez@gmail.com", "f3PaTGedjFc2gkr1ypi5", "https://aggregateurjvfr.substack.com"))

162
backfill.py Normal file
View File

@@ -0,0 +1,162 @@
# backfill_from_ghost.py
from __future__ import annotations
import os, re, sys, html
from typing import Dict, List, Optional
import requests
from bs4 import BeautifulSoup
# Reuse your existing GhostAdmin client (same headers/base/proxy behavior)
# Adjust the import path if your Ghost client lives elsewhere.
from presquegratos import GhostAdmin
from storage import Storage
from keys import xgp_key, egs_key, psplus_key
# ---------------- Ghost helpers (reusing your admin client) ----------------
def ghost_list_posts(ghost: GhostAdmin, page: int = 1) -> Dict:
# Minimal params: avoid 'filter' and 'fields' to dodge 400 behind __bot proxy
url = ghost.base + "posts/"
params = {
"limit": "50",
"page": str(page),
"order": "published_at DESC",
"formats": "lexical,html", # <-- IMPORTANT
}
r = requests.get(url, headers=ghost._headers(), params=params, timeout=30)
r.raise_for_status()
return r.json()
def list_recap_posts(ghost: GhostAdmin, hard_limit: int = 2000) -> List[Dict]:
posts: List[Dict] = []
page = 1
while True:
data = ghost_list_posts(ghost, page=page)
batch = data.get("posts", [])
if not batch:
break
# client-side filter to be robust to proxy quirks
for p in batch:
title = (p.get("title") or "").strip()
if title.startswith("Récap hebdo"):
posts.append(p)
if len(batch) < 50 or len(posts) >= hard_limit:
break
page += 1
return posts
# ---------------- Parsing helpers (unchanged) ----------------
#MS_STORE_RE = re.compile(r"(?:microsoft|xbox)\.com/.*/store/.*/([0-9A-Z]{12,})", re.I)
MS_STORE_RE = re.compile(r"(?:xbox|microsoft)\.com/.*/store/.*/([0-9A-Z]{12,16})", re.I)
EPIC_RE = re.compile(r"epicgames\.com/store/.*/p/([\w\-]+)", re.I)
PSBLOG_RE = re.compile(r"blog\.playstation\.com/.*", re.I)
def clean_text(s: str) -> str:
return re.sub(r"\s+", " ", html.unescape(s or "")).strip()
def extract_sections(soup: BeautifulSoup) -> Dict[str, BeautifulSoup]:
sections: Dict[str, BeautifulSoup] = {}
current = None
current_key = None
for node in soup.find_all(["h2","h3","h4","p","ul","ol","div","section"]):
if node.name in ("h2","h3","h4"):
title = clean_text(node.get_text())
key = None
tl = title.lower()
if "game pass" in tl:
key = "xgp"
elif "egs" in tl or "epic" in tl:
key = "egs"
elif "ps plus" in tl or "ps+" in tl:
key = "psplus"
if key:
current_key = key
current = sections[key] = soup.new_tag("div")
continue
if current_key and current is not None:
current.append(node)
return sections
def parse_xgp(section: BeautifulSoup) -> List[Dict]:
items = []
for a in section.find_all("a", href=True):
href = a["href"]
m = MS_STORE_RE.search(href)
title = clean_text(a.get_text())
if m or title:
productId = m.group(1) if m else None
items.append({"title": title, "productId": productId})
uniq, seen = [], set()
for it in items:
k = xgp_key(it)
if k not in seen:
uniq.append(it); seen.add(k)
return uniq
def parse_egs(section: BeautifulSoup) -> List[Dict]:
items = []
for a in section.find_all("a", href=True):
if not EPIC_RE.search(a["href"]):
continue
title = clean_text(a.get_text()) or clean_text(a.get("title"))
items.append({"title": title, "start": ""})
uniq, seen = [], set()
for it in items:
k = egs_key(it)
if k not in seen:
uniq.append(it); seen.add(k)
return uniq
def parse_psplus(section: BeautifulSoup, post_title: str) -> Optional[Dict]:
a = section.find("a", href=PSBLOG_RE)
url = a["href"] if a else ""
m = re.search(r"(\d{2})-(\d{2})-(\d{4})", post_title)
iso = ""
if m:
d, mth, y = m.group(1), m.group(2), m.group(3)
iso = f"{y}-{mth}-{d}"
return {"url": url, "date": iso}
# ---------------- Main backfill ----------------
def backfill():
# Use the same env your main script uses; GhostAdmin will read them internally or
# you can pass them explicitly if your class expects (base_url, admin_key).
ghost = GhostAdmin(
admin_url=os.environ.get("GHOST_ADMIN_URL", "").rstrip("/") + "/",
admin_key=os.environ.get("GHOST_ADMIN_KEY", "")
)
store = Storage()
posts = list_recap_posts(ghost)
print(f"Found {len(posts)} recap posts.")
total_xgp = total_egs = total_ps = 0
dedup = []
for p in posts:
pid = p["id"]
title = p.get("title") or ""
html_body = p.get("html") or ""
soup = BeautifulSoup(html_body, "html.parser")
sections = extract_sections(soup)
for it in parse_xgp(sections.get("xgp", BeautifulSoup("", "html.parser"))):
key = xgp_key(it)
if not key in dedup:
store.remember("xgp", key, pid); total_xgp += 1
dedup.append(key)
# for it in parse_egs(sections.get("egs", BeautifulSoup("", "html.parser"))):
# store.remember("egs", egs_key(it), pid); total_egs += 1
# if "psplus" in sections:
# item = parse_psplus(sections["psplus"], title)
# store.remember("psplus", psplus_key(item), pid); total_ps += 1
print(f"Backfilled from: {title}")
print(f"Done. Inserted ~ XGP:{total_xgp} | EGS:{total_egs} | PS+:{total_ps}")
if __name__ == "__main__":
backfill()

1
cookies.json Normal file
View File

@@ -0,0 +1 @@
{"__cf_bm": "95up0icsYyESvD6suTUFG05xaWxwEr5_xuHUOv32G9I-1720025055-1.0.1.1-NlvsLW9j26FX8aPpLmVETEJ0zd.VyXefLr75kvT6iC.zHnPtkbIWgfesI0VaUGuvwV62qHpctJEoahLR9TIuHQ", "ab_experiment_sampled": "%22false%22", "ab_testing_id": "%22a6e7ba67-7dc0-452c-a935-d2f2bddd5edf%22", "ajs_anonymous_id": "%22e4535e95-1c5b-4173-82db-47807c57fb38%22", "cookie_storage_key": "f666a42c-49e8-47a2-bdbc-6eece0d6a06e", "substack.sid": "s%3ARLYSI2_XaTlGuYIpTYWjS8ib48PpuE0S.jNwCzcGzKUvUAuFdLNdfgxwewTUawIoDDZ05moubvzM", "visit_id": "%7B%22id%22%3A%22a0d46be8-56f4-406f-b1d7-14c41369b737%22%2C%22timestamp%22%3A%222024-07-03T16%3A44%3A13.349Z%22%7D", "AWSALBTG": "yw2xMbYVFbKWSzJiQsdCKp7mMH+wQ5T4/JIUc1TvywUi5iIJVXuO21AMhb+oPgegicdtpekLTDTl+zWKEekRsurS7+20skhmPxZXJf/Tl7jBd/PecbW7qa3DHkPvQtWz+SWD8+7P1rNjmY9lmyZgzH/ZeGgeiishRz9gsGO0OT/d", "AWSALBTGCORS": "yw2xMbYVFbKWSzJiQsdCKp7mMH+wQ5T4/JIUc1TvywUi5iIJVXuO21AMhb+oPgegicdtpekLTDTl+zWKEekRsurS7+20skhmPxZXJf/Tl7jBd/PecbW7qa3DHkPvQtWz+SWD8+7P1rNjmY9lmyZgzH/ZeGgeiishRz9gsGO0OT/d"}

9
docker-compose.yml Normal file
View File

@@ -0,0 +1,9 @@
version: '3.3'
services:
substackjv:
build: .
volumes:
- /path/to/your/host/directory:/data
environment:
- TZ=Europe/Brussels

7
env.bat Normal file
View File

@@ -0,0 +1,7 @@
set GHOST_ADMIN_KEY=68bad0e13546e700012dd65d:116a81b7e189d3b3d3b86082f97ef65daedb06498a3f1f902b8e0c08d095dc19
set GHOST_ADMIN_URL=https://ghostadmin.zep.best/ghost/api/admin/__bot/FF4440EBA737506D397C170A8422109C357AA7582F10938B7C5F11D6B652F5D4
set GHOST_EMAIL_SEGMENT=status:free
set GHOST_NEWSLETTER_SLUG=default-newsletter
set GHOST_CONTENT_URL=https://ghost.zep.best
set DB_FILE_FALLBACK=f:\workspace\Substack_JV\data\published.db
set MISTRAL_API_KEY=tQJHvYlmwz1ihKxOhXS3FmDNTRhBh6b3

15
feeds.txt Normal file
View File

@@ -0,0 +1,15 @@
https://www.factornews.com/rss.xml
https://nofrag.com/feed
https://dystopeek.fr/feed/
https://thepixelpost.com/rss/
https://yamukass.substack.com/feed
https://tseret.com/categorie/tests/feed
https://www.gamesidestory.com/feed
https://www.nintendo-town.fr/feed
https://jesuisungameur.com/feed
https://www.switch-actu.fr/categorie/tests/tests-de-jeux/feed
https://www.playscope.com/category/articles/test-gaming/feed
https://jrpgfr.net/category/test/feed
https://jv.jeuxonline.info/rss/dossiers/rss.xml
https://www.youtube.com/feeds/videos.xml?channel_id=UC-OvBDfZGn1OdsqMBwkOI_A
https://www.youtube.com/feeds/videos.xml?playlist_id=PLZRiqJjIUlDTrwYs_UqEIts5fVaBpaIEz

20
keys.py Normal file
View File

@@ -0,0 +1,20 @@
# keys.py (or inline in your main)
def xgp_key(item) -> str:
# Prefer stable Microsoft Store productId if present; fallback to normalized title.
pid = (item.get("productId") or "").strip()
if pid:
return f"item:xgp:{pid}"
title = (item.get("title") or "").strip().lower()
return f"item:xgp:title:{title}"
def egs_key(item) -> str:
# Use title + start window (your fetcher usually knows the free-week start)
title = (item.get("title") or "").strip()
start = (item.get("start") or "").strip() # ISO or YYYY-MM-DD
return f"item:egs:{title}|{start}"
def psplus_key(item) -> str:
# Use official PS Blog URL + the published month (or your computed date)
url = (item.get("url") or "").strip()
date = (item.get("date") or "").strip()
return f"item:psplus:{url}|{date}"

928
post_rss_to_ghost.py Normal file
View File

@@ -0,0 +1,928 @@
import asyncio
import argparse
import datetime as dt
import html
import json
import logging
import os
import re
import time
from logging.handlers import RotatingFileHandler
from typing import Optional, List, Dict
import feedparser
import requests
import jwt
import zoneinfo # Python 3.9+
from urllib.parse import urlparse, parse_qs, urljoin
# ------------- Web Crawler for Images -------------
def extract_image_from_url(url: str, timeout: int = 10) -> Optional[str]:
"""
Fetch a webpage and extract the best image (og:image, twitter:image, or first large image).
Returns the image URL or None.
"""
try:
resp = requests.get(
url,
timeout=timeout,
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml",
},
allow_redirects=True,
)
resp.raise_for_status()
html_content = resp.text
# Try OpenGraph image first (most reliable)
og_match = re.search(r'<meta[^>]+property=["\']og:image["\'][^>]+content=["\']([^"\']+)["\']', html_content, re.IGNORECASE)
if not og_match:
og_match = re.search(r'<meta[^>]+content=["\']([^"\']+)["\'][^>]+property=["\']og:image["\']', html_content, re.IGNORECASE)
if og_match:
img_url = og_match.group(1)
return urljoin(url, img_url)
# Try Twitter card image
tw_match = re.search(r'<meta[^>]+name=["\']twitter:image["\'][^>]+content=["\']([^"\']+)["\']', html_content, re.IGNORECASE)
if not tw_match:
tw_match = re.search(r'<meta[^>]+content=["\']([^"\']+)["\'][^>]+name=["\']twitter:image["\']', html_content, re.IGNORECASE)
if tw_match:
img_url = tw_match.group(1)
return urljoin(url, img_url)
# Fallback: look for article/main image
article_img = re.search(r'<article[^>]*>.*?<img[^>]+src=["\']([^"\']+)["\']', html_content, re.IGNORECASE | re.DOTALL)
if article_img:
img_url = article_img.group(1)
# Skip tiny images, icons, avatars
if not any(skip in img_url.lower() for skip in ['avatar', 'icon', 'logo', 'emoji', '1x1', 'pixel']):
return urljoin(url, img_url)
return None
except Exception as e:
LOG.debug("Failed to extract image from %s: %s", url, e)
return None
# ------------- YouTube helpers -------------
def fetch_youtube_oembed_html(youtube_url: str, timeout: int = 10) -> Optional[str]:
"""
Get YouTube oEmbed HTML exactly as provided and wrap it as a Ghost embed card.
"""
try:
resp = requests.get(
"https://www.youtube.com/oembed",
params={"url": youtube_url, "format": "json"},
headers={"User-Agent": "ghost-bot/1.0"},
timeout=timeout,
)
resp.raise_for_status()
data = resp.json()
html_content = data.get("html")
if not html_content:
return None
# Wrap in Ghost embed card container; do NOT alter the iframe attributes.
return f'<figure class="kg-card kg-embed-card">{html_content}</figure>'
except Exception:
return None
def youtube_thumbnail_url(video_id: str) -> str:
return f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
def extract_youtube_id(url: str) -> Optional[str]:
try:
u = urlparse(url)
host = u.netloc.lower()
if host.endswith("youtube.com"):
if u.path == "/watch":
return parse_qs(u.query).get("v", [None])[0]
m = re.match(r"^/(shorts/|live/)?([A-Za-z0-9_-]{6,})", u.path)
if m:
return m.group(2)
if host == "youtu.be":
slug = u.path.strip("/").split("/")[0]
return slug or None
except Exception:
return None
return None
# ------------- Logging -------------
LOG = logging.getLogger("bot")
LOG_PATTERN = logging.Formatter("%(asctime)s:%(levelname)s: [%(filename)s] %(message)s")
def setuplogger():
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(LOG_PATTERN)
stream_handler.setLevel(logging.DEBUG)
file_handler = RotatingFileHandler("bot.log", "a", 1_000_000, 1)
file_handler.setFormatter(LOG_PATTERN)
LOG.setLevel(logging.DEBUG)
LOG.addHandler(stream_handler)
LOG.addHandler(file_handler)
# ------------- Model -------------
class RSSfeed:
def __init__(self, url: str, yt: bool = False):
self.url = url
self.youtube = yt
# ------------- Mistral AI Client -------------
class MistralClient:
"""Client for Mistral AI API to filter and group news items."""
def __init__(self, api_key: str, model: str = "mistral-small-latest"):
self.api_key = api_key
self.model = model
self.base_url = "https://api.mistral.ai/v1/chat/completions"
def _call_api(self, messages: List[Dict], temperature: float = 0.3) -> Optional[str]:
"""Make a call to the Mistral API."""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload = {
"model": self.model,
"messages": messages,
"temperature": temperature,
"response_format": {"type": "json_object"},
}
try:
resp = requests.post(self.base_url, headers=headers, json=payload, timeout=120)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"]
except Exception as e:
LOG.error("Mistral API error: %s", e)
return None
def filter_news_items(self, items: List[dict], dry_run: bool = False) -> List[dict]:
"""
Filter out non-news items (tips, walkthroughs, guides, tutorials).
Returns only actual game news items.
"""
if not items:
return []
# Prepare items for analysis
items_for_analysis = []
for i, item in enumerate(items):
items_for_analysis.append({
"id": i,
"title": item.get("title", ""),
"link": item.get("link", ""),
"summary": (item.get("summary", "") or "")[:300], # Truncate for API
})
# Split into batches to avoid token limits
batch_size = 50
filtered_ids = set()
for batch_start in range(0, len(items_for_analysis), batch_size):
batch = items_for_analysis[batch_start:batch_start + batch_size]
prompt = f"""Tu analyses des articles de sites de jeux vidéo. Tu dois identifier UNIQUEMENT les articles à EXCLURE.
EXCLURE UNIQUEMENT si le titre contient EXPLICITEMENT UN de ces mots-clés:
- "guide" (le mot exact)
- "soluce" (le mot exact)
- "astuce" (le mot exact)
- "solution" (le mot exact, pas "résolution")
- "code promo"
- "bon plan"
- "-20%" ou "-30%" etc (réductions)
- "tuto" ou "tutoriel"
- "comment faire"
- "how to"
NE JAMAIS EXCLURE:
- "Early Access" = news de sortie anticipée, À GARDER
- "Test" ou "Review" = critique, À GARDER
- "Partie Rapide" = émission/podcast, À GARDER
- Tout article de news, annonce, sortie, preview
- Tout article d'opinion, éditorial, récap
- Tout le reste qui ne contient pas les mots-clés d'exclusion ci-dessus
Articles à analyser:
{json.dumps(batch, ensure_ascii=False, indent=2)}
Retourne un JSON avec "exclude_ids" contenant UNIQUEMENT les IDs des articles guides/soluces/promos.
Si aucun article ne correspond aux critères d'exclusion, retourne {{"exclude_ids": []}}
Sois TRÈS conservateur - en cas de doute, NE PAS exclure."""
messages = [{"role": "user", "content": prompt}]
response = self._call_api(messages)
if response:
try:
result = json.loads(response)
excluded_ids = set(result.get("exclude_ids", []))
# Keep all items NOT in excluded_ids
batch_ids = {item["id"] for item in batch}
kept_ids = batch_ids - excluded_ids
filtered_ids.update(kept_ids)
except json.JSONDecodeError:
LOG.warning("Failed to parse Mistral response for filtering")
# Fallback: include all items from this batch
filtered_ids.update(item["id"] for item in batch)
else:
# Fallback: include all items from this batch
filtered_ids.update(item["id"] for item in batch)
# Log filtered out items in dry-run mode
if dry_run:
excluded_ids = set(range(len(items))) - filtered_ids
if excluded_ids:
LOG.debug("=== FILTERED OUT (non-news) ===")
for i in sorted(excluded_ids):
LOG.debug(" [EXCLUDED] %s", items[i].get("title", "No title"))
LOG.debug("=== KEPT (news) ===")
for i in sorted(filtered_ids):
if i < len(items):
LOG.debug(" [KEPT] %s", items[i].get("title", "No title"))
return [items[i] for i in sorted(filtered_ids) if i < len(items)]
def group_similar_items(self, items: List[dict]) -> List[Dict]:
"""
Group news items by category (News, Tests/Reviews, Previews, etc.)
with sub-groups by game/topic within each category.
Returns a list of categories, each with sub-groups containing items.
"""
if not items:
return []
# Prepare items for analysis
items_for_analysis = []
for i, item in enumerate(items):
items_for_analysis.append({
"id": i,
"title": item.get("title", ""),
"link": item.get("link", ""),
})
prompt = f"""Organise ces articles de jeux vidéo en CATÉGORIES et SOUS-GROUPES.
Articles à organiser:
{json.dumps(items_for_analysis, ensure_ascii=False, indent=2)}
CATÉGORIES (utilise ces noms exacts):
1. "Actualités" - News, annonces, sorties, mises à jour, industrie
2. "Tests & Critiques" - Reviews, tests, avis, notes
3. "Aperçus & Previews" - Previews, impressions, démos, hands-on
4. "Vidéos" - Trailers, gameplay vidéos, podcasts
5. "Autres" - Le reste
RÈGLES DE GROUPEMENT (TRÈS IMPORTANT):
- Groupe par FRANCHISE ou SÉRIE (ex: tous les "Final Fantasy" ensemble, même FF7, FF16, FF XIV)
- Groupe par UNIVERS (ex: "Warhammer 40K" = Space Marine + Dawn of War + Darktide)
- Groupe par ÉVÉNEMENT (ex: "Nintendo Direct", "State of Play", "Game Awards")
- N'utilise JAMAIS de noms de sites web comme groupes (pas "NoFrag", "JeuxOnline", etc.)
EXEMPLES DE GROUPEMENTS CORRECTS:
- "Final Fantasy VII Rebirth sur Switch 2" + "Final Fantasy VII Remake Intergrade en tête" → groupe "Final Fantasy"
- "Techmarine dans Space Marine 2" + "Dawn of War 4 gameplay Ork" → groupe "Warhammer 40K"
- "GTA 6 trailer" + "GTA 6 date de sortie" → groupe "GTA 6"
- "Nintendo Direct annoncé" + "Zelda dans le Nintendo Direct" → groupe "Nintendo Direct"
Retourne ce JSON:
{{
"categories": [
{{
"name": "Actualités",
"subgroups": [
{{"title": "Final Fantasy", "item_ids": [0, 3, 7]}},
{{"title": "Warhammer 40K", "item_ids": [1, 2]}},
{{"title": "Steam", "item_ids": [5]}}
]
}}
]
}}
IMPORTANT: Chaque article dans UN SEUL sous-groupe. Titre = nom de franchise/série/univers, PAS nom de site."""
messages = [{"role": "user", "content": prompt}]
response = self._call_api(messages, temperature=0.2)
if response:
try:
result = json.loads(response)
categories = []
used_ids = set()
for cat_data in result.get("categories", []):
cat_name = cat_data.get("name", "Autres")
subgroups = []
for sg_data in cat_data.get("subgroups", []):
sg_title = sg_data.get("title", "Divers")
item_ids = sg_data.get("item_ids", [])
# Filter to valid, unused IDs
valid_ids = [i for i in item_ids if i < len(items) and i not in used_ids]
if valid_ids:
used_ids.update(valid_ids)
subgroups.append({
"title": sg_title,
"items": [items[i] for i in valid_ids]
})
if subgroups:
categories.append({
"name": cat_name,
"subgroups": subgroups
})
# Add any ungrouped items
ungrouped = [items[i] for i in range(len(items)) if i not in used_ids]
if ungrouped:
# Find or create "Autres" category
autres_cat = next((c for c in categories if c["name"] == "Autres"), None)
if autres_cat:
autres_cat["subgroups"].append({"title": "Divers", "items": ungrouped})
else:
categories.append({
"name": "Autres",
"subgroups": [{"title": "Divers", "items": ungrouped}]
})
return categories
except json.JSONDecodeError:
LOG.warning("Failed to parse Mistral response for grouping")
# Fallback: return all items in a single category/subgroup
return [{
"name": "Actualités de la semaine",
"subgroups": [{"title": "Toutes les news", "items": items}]
}]
# ------------- Ghost Admin API client -------------
class GhostAdmin:
def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"):
self.base = admin_url.rstrip("/") + "/"
self.key_id, self.key_secret_hex = admin_key.split(":")
self.accept_version = accept_version
def _jwt(self) -> str:
iat = int(time.time())
payload = {"iat": iat, "exp": iat + 5 * 60, "aud": "/admin/"}
headers = {"alg": "HS256", "typ": "JWT", "kid": self.key_id}
token = jwt.encode(payload, bytes.fromhex(self.key_secret_hex), algorithm="HS256", headers=headers)
return token if isinstance(token, str) else token.decode("utf-8")
def _headers(self):
return {
"Authorization": f"Ghost {self._jwt()}",
"Accept-Version": self.accept_version,
"Content-Type": "application/json",
}
def latest_published_date(self, tz_name: str = "Europe/Brussels"):
"""
Date (aware) de la dernière publication (status=published), ou None.
"""
url = self.base + "posts/?limit=1&order=published_at%20desc&fields=published_at"
resp = requests.get(url, headers=self._headers(), timeout=20)
resp.raise_for_status()
posts = resp.json().get("posts", [])
if not posts or not posts[0].get("published_at"):
return None
# ISO 8601 → aware UTC → converti tz locale
dtu = dt.datetime.fromisoformat(posts[0]["published_at"].replace("Z", "+00:00"))
return dtu.astimezone(zoneinfo.ZoneInfo(tz_name))
def get_newsletters(self):
url = self.base + "newsletters/"
resp = requests.get(url, headers=self._headers(), timeout=20)
if resp.status_code >= 400:
raise RuntimeError(f"Ghost newsletters error {resp.status_code}: {resp.text}")
return resp.json().get("newsletters", [])
def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str:
if preferred_slug:
return preferred_slug
nls = self.get_newsletters()
if not nls:
raise RuntimeError("No newsletters configured in Ghost (Settings → Newsletters).")
actives = [n for n in nls if n.get("status") == "active"]
for n in actives:
if n.get("is_default"):
return n.get("slug")
return (actives or nls)[0].get("slug")
def create_post_html(self, title: str, html_content: str, status: str = "draft", feature_image: Optional[str] = None):
url = self.base + "posts/?source=html"
post = {"title": title, "html": html_content, "status": status}
if feature_image:
post["feature_image"] = feature_image
resp = requests.post(url, headers=self._headers(), json={"posts": [post]}, timeout=30)
if resp.status_code >= 400:
raise RuntimeError(f"Ghost create error {resp.status_code}: {resp.text}")
return resp.json()["posts"][0]
def publish_post(self, post_id: str, updated_at: str, newsletter_slug: Optional[str], email_segment: Optional[str]):
slug = self.pick_newsletter_slug(newsletter_slug)
params = [f"newsletter={requests.utils.quote(slug)}"]
if email_segment:
params.append(f"email_segment={requests.utils.quote(email_segment)}")
url = self.base + f"posts/{post_id}/?{'&'.join(params)}"
body = {"posts": [{"updated_at": updated_at, "status": "published"}]}
resp = requests.put(url, headers=self._headers(), json=body, timeout=30)
if resp.status_code >= 400:
raise RuntimeError(f"Ghost publish error {resp.status_code}: {resp.text}")
return resp.json()["posts"][0]
# ------------- Task orchestration -------------
class GhostTask:
def __init__(self, feeds: List[RSSfeed], admin_url: str, admin_key: str,
mistral_api_key: Optional[str] = None,
newsletter_slug: Optional[str] = None, email_segment: Optional[str] = None,
dry_run: bool = False):
self.ghost = GhostAdmin(admin_url, admin_key)
self.feeds = feeds
self.newsletter_slug = newsletter_slug
self.email_segment = email_segment
self.mistral = MistralClient(mistral_api_key) if mistral_api_key else None
self.dry_run = dry_run
for feed in self.feeds:
LOG.info("Adding feed %s", feed.url)
# --- startup immediate run if not yet published this week
def _published_this_week(self) -> bool:
"""Check if we already published this week (since last Saturday 12:00)."""
tz = zoneinfo.ZoneInfo("Europe/Brussels")
last = self.ghost.latest_published_date("Europe/Brussels")
if not last:
return False
now = dt.datetime.now(tz)
# Find last Saturday at 12:00
days_since_saturday = (now.weekday() - 5) % 7 # Saturday = 5
last_saturday = (now - dt.timedelta(days=days_since_saturday)).replace(
hour=12, minute=0, second=0, microsecond=0
)
return last >= last_saturday
async def maybe_run_this_week(self):
if not self._published_this_week():
LOG.info("Aucune newsletter publiée cette semaine -> génération immédiate.")
await self.weekly_task()
else:
LOG.info("Déjà publié cette semaine, on attend la prochaine fenêtre.")
# --- utils
@staticmethod
def _fr_week_range() -> str:
"""Returns a French formatted date range for the past week."""
months = {
'January': 'Janvier', 'February': 'Février', 'March': 'Mars', 'April': 'Avril',
'May': 'Mai', 'June': 'Juin', 'July': 'Juillet', 'August': 'Août',
'September': 'Septembre', 'October': 'Octobre', 'November': 'Novembre', 'December': 'Décembre'
}
today = dt.datetime.now()
week_ago = today - dt.timedelta(days=7)
# Format: "24 - 31 Janvier 2025" or "28 Janvier - 4 Février 2025"
if week_ago.month == today.month:
formatted = f"{week_ago.day} - {today.strftime('%d %B %Y')}"
else:
formatted = f"{week_ago.strftime('%d %B')} - {today.strftime('%d %B %Y')}"
for en, fr in months.items():
formatted = formatted.replace(en, fr)
return formatted
@staticmethod
def _safe_get(url: str, timeout: int = 20) -> Optional[bytes]:
try:
r = requests.get(url, timeout=timeout, headers={"User-Agent": "ghost-bot/1.0"})
r.raise_for_status()
return r.content
except Exception as e:
LOG.warning("Flux indisponible: %s (%s)", url, e)
return None
@staticmethod
def _entry_datetime(entry) -> Optional[dt.datetime]:
"""
Tente de récupérer une datetime aware (UTC) pour un item feedparser.
"""
# Try common fields first
if getattr(entry, "published", None):
try:
# YouTube (ISO) e.g. 2025-09-05T10:20:33+00:00
return dt.datetime.fromisoformat(entry.published.replace("Z", "+00:00")).astimezone(dt.timezone.utc)
except Exception:
pass
try:
# RFC822 e.g. Fri, 05 Sep 2025 10:20:33 +0000
return dt.datetime.strptime(entry.published.replace('GMT', '+0000'),
'%a, %d %b %Y %H:%M:%S %z').astimezone(dt.timezone.utc)
except Exception:
pass
if getattr(entry, "updated_parsed", None):
try:
return dt.datetime.fromtimestamp(time.mktime(entry.updated_parsed), tz=dt.timezone.utc)
except Exception:
pass
return None
# --- HTML builder for grouped content
def _build_html_roundup_grouped(self, categories: List[Dict], feeds: List[RSSfeed]):
"""
Construit le HTML avec des catégories et sous-groupes thématiques.
Inclut un résumé et une table des matières en haut.
Retourne (html, feature_image_url_ou_None).
"""
parts: List[str] = []
first_image: Optional[str] = None
# --- Build Summary Section ---
parts.append('<h2>✨ En bref cette semaine</h2>')
parts.append('<ul>')
for cat in categories:
cat_name = cat.get("name", "Actualités")
subgroups = cat.get("subgroups", [])
# Get top subgroups with more than 1 item (by item count) for summary
multi_item_subgroups = [sg for sg in subgroups if len(sg.get("items", [])) > 1]
sorted_subgroups = sorted(multi_item_subgroups, key=lambda sg: len(sg.get("items", [])), reverse=True)
top_subgroups = sorted_subgroups[:5] # Max 5 highlights per category
total_items = sum(len(sg.get("items", [])) for sg in subgroups)
if top_subgroups:
highlights = ", ".join(sg.get("title", "Divers") for sg in top_subgroups)
total_items = sum(len(sg.get("items", [])) for sg in subgroups)
parts.append(f'<li><strong>{html.escape(cat_name)}</strong>: {html.escape(highlights)} ({total_items} articles)</li>')
elif total_items > 0:
parts.append(f'<li><strong>{html.escape(cat_name)}</strong>: {total_items} articles</li>')
parts.append('</ul>')
parts.append('<hr>')
# --- Build Table of Contents ---
# parts.append('<h2>📋 Sommaire</h2>')
# parts.append('<ul>')
# for cat in categories:
# cat_name = cat.get("name", "Actualités")
# cat_anchor = self._make_anchor(cat_name)
# subgroups = cat.get("subgroups", [])
# total_items = sum(len(sg.get("items", [])) for sg in subgroups)
# parts.append(f'<li><a href="#{cat_anchor}"><strong>{html.escape(cat_name)}</strong></a> ({total_items} articles)')
# if len(subgroups) > 1 or (len(subgroups) == 1 and len(subgroups[0].get("items", [])) > 1):
# parts.append('<ul>')
# for sg in subgroups:
# sg_title = sg.get("title", "Divers")
# sg_anchor = self._make_anchor(f"{cat_name}-{sg_title}")
# item_count = len(sg.get("items", []))
# parts.append(f'<li><a href="#{sg_anchor}">{html.escape(sg_title)}</a> ({item_count})</li>')
# parts.append('</ul>')
# parts.append('</li>')
# parts.append('</ul>')
# parts.append('<hr>')
# --- Build Content by Category ---
for cat in categories:
cat_name = cat.get("name", "Actualités")
cat_anchor = self._make_anchor(cat_name)
subgroups = cat.get("subgroups", [])
if not subgroups:
continue
# Category header with emoji
cat_emoji = {
"Actualités": "📰",
"Tests & Critiques": "",
"Aperçus & Previews": "👁️",
"Vidéos": "🎬",
"Autres": "📁"
}.get(cat_name, "📌")
parts.append(f'<h2 id="{cat_anchor}">{cat_emoji} {html.escape(cat_name)}</h2>')
for sg in subgroups:
sg_title = sg.get("title", "Divers")
sg_anchor = self._make_anchor(f"{cat_name}-{sg_title}")
items = sg.get("items", [])
if not items:
continue
# Sub-group header (only if more than 1 item in subgroup)
if len(items) > 1:
parts.append(f'<h3 id="{sg_anchor}">{html.escape(sg_title)}</h3>')
for post in items:
title = post.get("title", "") or ""
linkURL = post.get("link", "") or ""
parts.append(f'<h4>{html.escape(title)}</h4>')
# --- YouTube embed / fallback
vid = post.get("yt_videoid") or extract_youtube_id(linkURL)
if vid:
watch_url = f"https://www.youtube.com/watch?v={vid}"
# Try provider HTML via oEmbed (as Ghost does)
embed_html = fetch_youtube_oembed_html(watch_url, timeout=10)
if embed_html:
parts.append(embed_html)
else:
# Fallback: leave the plain URL on its own line so Ghost may still auto-embed
parts.append(f'\n<p>{watch_url}</p>\n')
# Minimal fallback link (non-intrusive for email/web)
parts.append(f'<p><a href="{watch_url}">Voir sur YouTube</a></p>')
else:
# --- Texte + lien
ftext = ""
if "summary" in post and post["summary"]:
ftext = html.unescape(post["summary"])
ftext = re.sub("<[^<]+?>", "", ftext)
ftext = re.sub(r"L'article .* est apparu en premier sur .*", "", ftext)
if ftext:
parts.append(f"<p>{html.escape(ftext)}</p>")
if linkURL:
esc = html.escape(linkURL)
parts.append(f'<p><a href="{esc}">{esc}</a></p>')
# --- Images: first try RSS metadata, then crawl the page
found_image = False
for link in post.get("links", []) or []:
if link.get("type") in ("image/jpg", "image/jpeg", "image/png", "image/webp"):
imgUrl = link.get("href")
if imgUrl:
imgUrl = imgUrl.replace("/250x250/", "/990x320/")
if not first_image:
first_image = imgUrl
parts.append(f'<figure><img src="{html.escape(imgUrl)}" loading="lazy"></figure>')
found_image = True
# If no image from RSS, try to extract from the article page
if not found_image and linkURL:
crawled_img = extract_image_from_url(linkURL, timeout=8)
if crawled_img:
if not first_image:
first_image = crawled_img
parts.append(f'<figure><img src="{html.escape(crawled_img)}" loading="lazy"></figure>')
parts.append('<hr>')
# --- Sources
parts.append("<h3>📚 Sources</h3>")
for feed in feeds:
esc = html.escape(feed.url)
parts.append(f'<p><a href="{esc}">{esc}</a></p>')
parts.append('<p><em>Abonnez-vous pour recevoir chaque semaine les news et soutenir mon travail.</em></p>')
return "\n".join(parts), first_image
@staticmethod
def _make_anchor(text: str) -> str:
"""Convert text to a valid HTML anchor ID."""
# Remove accents and special chars, lowercase, replace spaces with dashes
anchor = text.lower()
anchor = re.sub(r'[àáâãäå]', 'a', anchor)
anchor = re.sub(r'[èéêë]', 'e', anchor)
anchor = re.sub(r'[ìíîï]', 'i', anchor)
anchor = re.sub(r'[òóôõö]', 'o', anchor)
anchor = re.sub(r'[ùúûü]', 'u', anchor)
anchor = re.sub(r'[ýÿ]', 'y', anchor)
anchor = re.sub(r'[ç]', 'c', anchor)
anchor = re.sub(r'[^a-z0-9\s-]', '', anchor)
anchor = re.sub(r'\s+', '-', anchor.strip())
return anchor or "section"
@staticmethod
def _format_duration(seconds: float) -> str:
seconds = int(seconds)
days, seconds = divmod(seconds, 86400)
hours, seconds = divmod(seconds, 3600)
minutes, seconds = divmod(seconds, 60)
parts = []
if days: parts.append(f"{days} days")
if hours: parts.append(f"{hours} hours")
if minutes: parts.append(f"{minutes} minutes")
if seconds: parts.append(f"{seconds} seconds")
return ", ".join(parts) if parts else "0 seconds"
async def run_weekly_on_saturday(self):
"""Run every Saturday at 12:00 (noon)."""
while True:
now = dt.datetime.now()
# Calculate next Saturday at 12:00
days_until_saturday = (5 - now.weekday()) % 7 # Saturday = 5
if days_until_saturday == 0 and now.hour >= 12:
days_until_saturday = 7 # Already past Saturday 12:00, wait for next week
next_run = (now + dt.timedelta(days=days_until_saturday)).replace(
hour=12, minute=0, second=0, microsecond=0
)
sleep_seconds = (next_run - now).total_seconds()
while sleep_seconds > 0:
LOG.info("Waiting for %s for next scan (Saturday noon)", self._format_duration(sleep_seconds))
await asyncio.sleep(min(sleep_seconds, 5 * 60))
now = dt.datetime.now()
sleep_seconds = (next_run - now).total_seconds()
LOG.info("Going to run the weekly task")
await self.weekly_task()
async def weekly_task(self):
"""Main weekly task: collect, filter, group, and publish."""
# Log newsletters (debug)
try:
nls = self.ghost.get_newsletters()
LOG.info("Newsletters: %s", ", ".join(f"{n.get('name')}[{n.get('slug')}]" for n in nls))
except Exception as e:
LOG.warning("Unable to list newsletters: %s", e)
title_post = "Les news de la semaine du " + self._fr_week_range()
LOG.info("Running weekly task : %s", title_post)
# (Re)charge les feeds
feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
if not os.path.isfile(feeds_file):
feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", r"f:\workspace\Substack_JV\feeds.txt")
feeds: List[RSSfeed] = []
with open(feeds_file, encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
for line in lines:
feeds.append(RSSfeed(line, "youtube" in line.lower()))
self.feeds = feeds
# Fenêtre: depuis 7 jours à 06:00 UTC
week_ago_6am_utc = dt.datetime.now(dt.timezone.utc).replace(
hour=6, minute=0, second=0, microsecond=0
) - dt.timedelta(days=7)
all_news_posts: List[dict] = []
for feed in self.feeds:
LOG.info("Scanning feed %s", feed.url)
content = self._safe_get(feed.url, timeout=30)
if not content:
continue
fp = feedparser.parse(content)
# Sélection des items de la semaine
new_entries = []
for e in fp.entries:
dte = self._entry_datetime(e)
if dte and dte > week_ago_6am_utc:
new_entries.append(e)
# Basic URL-based filtering (keep existing logic)
filtered = []
for e in new_entries:
linkURL = e.get("link", "") or ""
if "actugaming" in linkURL and ("puzzle-" in linkURL or "guide-" in linkURL):
continue
# enrich YouTube id if applicable
if feed.youtube and linkURL:
vid = extract_youtube_id(linkURL)
if vid:
e["yt_videoid"] = vid
filtered.append(e)
all_news_posts.extend(filtered)
if not all_news_posts:
LOG.warning("Aucun item récupéré (flux down ?). On n'envoie pas cette semaine.")
return
LOG.info("Collected %d items from feeds", len(all_news_posts))
# Use Mistral AI for filtering and grouping if available
if self.mistral:
LOG.info("Using Mistral AI to filter non-news content...")
filtered_posts = self.mistral.filter_news_items(all_news_posts, dry_run=self.dry_run)
LOG.info("After filtering: %d items (removed %d)",
len(filtered_posts), len(all_news_posts) - len(filtered_posts))
if filtered_posts:
LOG.info("Using Mistral AI to group items by category...")
categories = self.mistral.group_similar_items(filtered_posts)
total_cats = len(categories)
total_subgroups = sum(len(cat.get("subgroups", [])) for cat in categories)
LOG.info("Created %d categories with %d sub-groups", total_cats, total_subgroups)
else:
categories = []
else:
LOG.warning("No Mistral API key configured, skipping AI filtering/grouping")
# Fallback: single category with all items
categories = [{
"name": "Actualités de la semaine",
"subgroups": [{"title": "Toutes les news", "items": all_news_posts}]
}]
if not categories or all(
len(sg.get("items", [])) == 0
for cat in categories
for sg in cat.get("subgroups", [])
):
LOG.warning("No news items after filtering. Skipping this week.")
return
roundup_html, feature_image = self._build_html_roundup_grouped(categories, self.feeds)
# 1) Create draft (with feature image if any)
created = self.ghost.create_post_html(title_post, roundup_html, status="draft", feature_image=feature_image)
LOG.info("Created draft post: %s (id: %s)", created.get("title"), created.get("id"))
# 2) Publish + send email (unless dry-run mode)
if self.dry_run:
LOG.info("DRY-RUN MODE: Post created as draft but NOT published. URL: %s",
created.get("url", "N/A"))
LOG.info("DRY-RUN MODE: Review the draft in Ghost admin, then publish manually if satisfied.")
return
published = self.ghost.publish_post(
post_id=created["id"],
updated_at=created["updated_at"],
newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"),
email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"),
)
LOG.info("Published post: %s (emailed via newsletter)", published.get("url"))
# ------------- main -------------
async def main():
setuplogger()
parser = argparse.ArgumentParser()
parser.add_argument("--runonce", action="store_true", help="Run now and exit (no scheduler)")
parser.add_argument("--dry-run", action="store_true", dest="dry_run",
help="Run immediately, create draft but do NOT publish (for testing)")
args = parser.parse_args()
# Feeds init (list may be reloaded inside task)
feeds: List[RSSfeed] = []
feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
if not os.path.isfile(feeds_file):
feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", r"f:\workspace\Substack_JV\feeds.txt")
with open(feeds_file, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
feeds.append(RSSfeed(line, "youtube" in line.lower()))
admin_url = os.environ["GHOST_ADMIN_URL"] # e.g. https://ghostadmin.zep.best/ghost/api/admin/
admin_key = os.environ["GHOST_ADMIN_KEY"] # integration_id:secret_hex
mistral_api_key = os.environ.get("MISTRAL_API_KEY") # Optional: for AI filtering/grouping
if not mistral_api_key:
LOG.warning("MISTRAL_API_KEY not set. AI filtering and grouping will be disabled.")
task = GhostTask(
feeds=feeds,
admin_url=admin_url,
admin_key=admin_key,
mistral_api_key=mistral_api_key,
newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"),
email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"),
dry_run=args.dry_run,
)
LOG.info("Starting bot (weekly mode%s)", " - DRY RUN" if args.dry_run else "")
if args.runonce:
await task.weekly_task()
return
if args.dry_run:
LOG.info("DRY-RUN: Running weekly task immediately (will create draft only)")
await task.weekly_task()
return
# Démarrage: publier l'édition de la semaine si elle n'existe pas encore
await task.maybe_run_this_week()
# Planification hebdomadaire le samedi à 12:00 Europe/Brussels
await task.run_weekly_on_saturday()
if __name__ == "__main__":
asyncio.run(main())

1040
presquegratos.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,3 +1,7 @@
requests
feedparser feedparser
python-substack PyJWT>=2.7,<3
requests>=2.31
feedparser>=6.0
aiohttp
bs4
playwright

49
storage.py Normal file
View File

@@ -0,0 +1,49 @@
# storage.py
from __future__ import annotations
import sqlite3, pathlib, datetime as dt
from typing import Optional, Iterable, Tuple
import os
DB_PATH = "/data/published.db" # bind-mount ./data:/data in docker
_SCHEMA = """
PRAGMA journal_mode = WAL;
CREATE TABLE IF NOT EXISTS published_items(
platform TEXT NOT NULL, -- e.g. xgp | egs | psplus
key TEXT PRIMARY KEY, -- your dedupe key (see below)
first_seen_utc TEXT NOT NULL, -- ISO-8601
last_post_id TEXT -- Ghost post id that recorded it
);
CREATE INDEX IF NOT EXISTS idx_platform ON published_items(platform);
"""
class Storage:
def __init__(self, db_path: str = DB_PATH):
pathlib.Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite3.connect(db_path)
self.conn.execute("PRAGMA foreign_keys = ON;")
for stmt in filter(None, _SCHEMA.split(";")):
if stmt.strip():
self.conn.execute(stmt)
def seen(self, key: str) -> bool:
cur = self.conn.execute("SELECT 1 FROM published_items WHERE key=?", (key,))
return cur.fetchone() is not None
def remember(self, platform: str, key: str, post_id: Optional[str]):
self.conn.execute(
"INSERT OR IGNORE INTO published_items(platform,key,first_seen_utc,last_post_id) VALUES(?,?,?,?)",
(platform, key, dt.datetime.utcnow().isoformat(), post_id),
)
if post_id:
self.conn.execute("UPDATE published_items SET last_post_id=? WHERE key=?", (post_id, key))
self.conn.commit()
def bulk_remember(self, platform: str, pairs: Iterable[Tuple[str, Optional[str]]]):
rows = [(platform, k, dt.datetime.utcnow().isoformat(), pid) for (k, pid) in pairs]
self.conn.executemany(
"INSERT OR IGNORE INTO published_items(platform,key,first_seen_utc,last_post_id) VALUES(?,?,?,?)",
rows
)
self.conn.commit()

View File

@@ -1,7 +1,58 @@
#!/bin/bash #!/bin/sh
set -eu
# Pull the latest changes log() { printf '%s %s\n' "[$(date -u +%FT%TZ)]" "$*"; }
git pull origin main
# Run your Python script stop() {
python Post_RSS_on_SubStack.py log "stopping..."
[ -n "${PID1-}" ] && kill -TERM "$PID1" 2>/dev/null || true
[ -n "${PID2-}" ] && kill -TERM "$PID2" 2>/dev/null || true
[ -n "${TPID-}" ] && kill -TERM "$TPID" 2>/dev/null || true
wait || true
exit 0
}
trap stop INT TERM
cd /app
export GIT_TERMINAL_PROMPT=0
# MAJ forcée du code à chaque (re)démarrage
if [ -d .git ]; then
i=0
while [ $i -lt 5 ]; do
if git fetch --all --prune && git reset --hard origin/main; then
log "git updated to origin/main"
break
fi
i=$((i+1))
log "git update failed (attempt $i/5); retrying in 10s..."
sleep 10
done
[ $i -ge 5 ] && log "WARNING: git update failed after 5 attempts — continuing with current code"
else
log "WARNING: /app is not a git repo; skipping git update"
fi
# Dossiers logs
mkdir -p /var/log
: > /var/log/daily.log
: > /var/log/weekly.log
# Lancer les 2 bots (logs non bufferisés)
python -u post_rss_to_ghost.py > /var/log/daily.log 2>&1 & PID1=$!
python -u presquegratos.py > /var/log/weekly.log 2>&1 & PID2=$!
# Suivre les 2 fichiers de logs dans la sortie du conteneur
tail -F /var/log/daily.log /var/log/weekly.log &
TPID=$!
# Attente portable (pas de wait -n en /bin/sh)
while :; do
if ! kill -0 "$PID1" 2>/dev/null; then wait "$PID1" || true; break; fi
if ! kill -0 "$PID2" 2>/dev/null; then wait "$PID2" || true; break; fi
sleep 1
done
# Si un des scripts sort, on arrête le tail (le trap TERM arrêtera l'autre script)
kill -TERM "$TPID" 2>/dev/null || true
wait || true

23
xboxsyde.py Normal file
View File

@@ -0,0 +1,23 @@
import feedparser
import io
import html
import datetime
import requests
import time
url = r'https://www.xboxygen.com/spip.php?page=backend'
html_text = requests.get(url).text
news = feedparser.parse(html_text)
yesterday_6am = datetime.datetime.now(datetime.timezone.utc).replace(hour=6, minute=0, second=0, microsecond=0) - datetime.timedelta(days=1)
try:
new_posts = [entry for entry in news.entries if datetime.datetime.strptime(entry.published.replace('GMT', '+0000'), '%a, %d %b %Y %H:%M:%S %z') > yesterday_6am]
except:
new_posts = [entry for entry in news.entries if datetime.datetime.fromtimestamp(time.mktime(entry.updated_parsed)).replace(tzinfo=datetime.timezone.utc) > yesterday_6am]
#else if
#entry.updated.replace('GMT', '+0000'), '%a, %d %b %Y %H:%M:%S %z'
print(new_posts)