Compare commits

..

1 Commits

Author SHA1 Message Date
Gaël Honorez
30f88b1d51 Merge branch 'main' 2024-01-01 19:54:01 +01:00
14 changed files with 220 additions and 2331 deletions

View File

@@ -1,19 +1,8 @@
FROM python:3.12 FROM python:3.8
RUN apt-get update && apt-get install -y git RUN apt-get update && apt-get install -y git
RUN git clone https://gitea.zep.best/zep/Substack_JV.git /app RUN git clone http://192.168.1.25:8124/zep/Substack_JV.git /app
WORKDIR /app WORKDIR /app
RUN pip install --upgrade pip
COPY requirements.txt .
RUN pip install -r requirements.txt RUN pip install -r requirements.txt
ENV TZ=Europe/Brussels
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN playwright install --with-deps chromium
COPY update_and_run.sh /app COPY update_and_run.sh /app
# Normalize line endings (Windows CRLF -> LF) and ensure readable RUN chmod +x /app/update_and_run.sh
RUN sed -i 's/\r$//' /app/update_and_run.sh && chmod a+r /app/update_and_run.sh CMD ["./update_and_run.sh"]
# Single entrypoint: run via sh (no exec bit required, survives noexec mounts)
ENTRYPOINT ["sh", "/app/update_and_run.sh"]

209
Post_RSS_on_SubStack.py Normal file
View File

@@ -0,0 +1,209 @@
import asyncio
import argparse
import requests
import feedparser
import io
import html
import datetime
import logging
import os
import re
from logging.handlers import RotatingFileHandler
import random
from substack import Api
from substack.post import Post
LOG = logging.getLogger('bot')
LOG_PATTERN = logging.Formatter('%(asctime)s:%(levelname)s: [%(filename)s] %(message)s')
def setuplogger():
conf_filename = None
steam_handler = logging.StreamHandler()
steam_handler.setFormatter(LOG_PATTERN)
steam_handler.setLevel(logging.DEBUG)
def setup_logger(logger_name, file_name=None, add_steam=False):
file_name = file_name or logger_name
log_filename = f"{file_name}.log"
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
file_handler = RotatingFileHandler(log_filename, "a", 1000000, 1)
file_handler.setFormatter(LOG_PATTERN)
logger.addHandler(file_handler)
if add_steam:
logger.addHandler(steam_handler)
setup_logger("bot", conf_filename, True)
class RSSfeed():
def __init__(self, url, yt=False):
self.url = url
self.youtube = yt
class SubStackTask:
def __init__(self, login, password, account, feeds):
self.api = Api(
email=login,
password=password,
publication_url=account,
)
self.user_id = self.api.get_user_id()
self.feeds = feeds
def get_fr_date(self):
# Mapping of English month names to French
months_en_to_fr = {
'January': 'Janvier', 'February': 'Février', 'March': 'Mars',
'April': 'Avril', 'May': 'Mai', 'June': 'Juin',
'July': 'Juillet', 'August': 'Août', 'September': 'Septembre',
'October': 'Octobre', 'November': 'Novembre', 'December': 'Décembre'
}
today = datetime.datetime.now()
formatted_date = today.strftime("%d %B %Y")
# Replace the English month with the French month
for en, fr in months_en_to_fr.items():
formatted_date = formatted_date.replace(en, fr)
return formatted_date
async def run_daily_at_6_am(self):
while True:
now = datetime.datetime.now()
# Calculate the time until 6 AM next day
next_run = (now + datetime.timedelta(days=1)).replace(hour=6, minute=5, second=0, microsecond=0)
sleep_seconds = (next_run - now).total_seconds()
LOG.info("Waiting for " + str(sleep_seconds) + " seconds for next scan")
# Wait until the next run time
await asyncio.sleep(sleep_seconds)
# Run the daily task
await self.daily_task()
async def daily_task(self):
title_post = "Les news du " + self.get_fr_date()
sub_stack_post = Post(
title=title_post,
subtitle="",
user_id=self.user_id
)
midnight_today = datetime.datetime.now(datetime.timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
yesterday_6am = datetime.datetime.now(datetime.timezone.utc).replace(hour=6, minute=0, second=0, microsecond=0) - datetime.timedelta(days=1)
formatted_date = midnight_today.strftime('%a, %d %b %Y %H:%M:%S %z')
all_news_posts = []
for feed in self.feeds:
html_text = requests.get(feed.url).text
newsFeed = feedparser.parse(html_text)
if feed.youtube is True:
new_posts = [entry for entry in newsFeed.entries if datetime.datetime.fromisoformat(entry.published) > yesterday_6am]
else:
new_posts = [entry for entry in newsFeed.entries if datetime.datetime.strptime(entry.published.replace('GMT', '+0000'), '%a, %d %b %Y %H:%M:%S %z') > yesterday_6am]
all_news_posts.extend(new_posts)
random.shuffle(all_news_posts)
for post in all_news_posts:
linkURL = post["link"]
title = post["title"]
ftext = ""
LOG.info("Posting " + str(title))
if "summary" in post:
ftext = html.unescape(post["summary"])
# Using regular expressions to remove HTML tags
ftext = re.sub('<[^<]+?>', '', ftext)
pattern = r"Larticle .* est apparu en premier sur .*"
ftext = re.sub(pattern, '', ftext)
if "yt_videoid" in post:
sub_stack_post.add({"type":"heading", "level":3, "content": title})
videoId = post["yt_videoid"]
sub_stack_post.add({"type":"youtube2", "src": videoId })
sub_stack_post.add({'type': 'paragraph', 'content': [
{'content': linkURL, 'marks': [{'type': "link", 'href': linkURL}]}]})
else:
if ftext != "":
sub_stack_post.add({"type":"heading", "level":3, "content": title})
sub_stack_post.add({"type":"paragraph", "content": ftext })
sub_stack_post.add({'type': 'paragraph', 'content': [
{'content': linkURL, 'marks': [{'type': "link", 'href': linkURL}]}]})
if "links" in post:
for link in post["links"]:
if link["type"] == "image/jpg":
imgUrl = link["href"]
sub_stack_post.add({'type': 'captionedImage', 'src': imgUrl})
sub_stack_post.add({"type":"horizontal_rule"})
sub_stack_post.add({"type":"heading", "level":3, "content": "Sources"})
for feed in self.feeds:
sub_stack_post.add({'type': 'paragraph', 'content': [
{'content': feed.url, 'marks': [{'type': "link", 'href': feed.url}]}]})
sub_stack_post.add({"type":"subscribeWidget", "message":"Abonnez-vous gratuitement pour recevoir chaque jour les news dans votre e-mail et soutenir mon travail."})
draft = self.api.post_draft(sub_stack_post.get_draft())
self.api.prepublish_draft(draft.get("id"))
self.api.publish_draft(draft.get("id"))
async def main(login, password, account):
setuplogger()
if os.path.exists("last_scan_date.txt"):
with open("last_scan_date.txt", "r") as f:
last_post_date = datetime.datetime.strptime(f.read().strip(), '%a, %d %b %Y %H:%M:%S %z')
else:
last_post_date = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
feeds = []
feeds.append(RSSfeed("https://www.factornews.com/rss.xml"))
feeds.append(RSSfeed("https://nofrag.com/feed"))
feeds.append(RSSfeed("https://dystopeek.fr/feed/"))
feeds.append(RSSfeed("https://thepixelpost.com/rss/"))
feeds.append(RSSfeed("https://yamukass.substack.com/feed"))
feeds.append(RSSfeed("https://tseret.com/categorie/tests/feed"))
feeds.append(RSSfeed("https://www.gamesidestory.com/feed"))
feeds.append(RSSfeed("https://www.nintendo-town.fr/feed"))
feeds.append(RSSfeed("https://www.youtube.com/feeds/videos.xml?channel_id=UC-OvBDfZGn1OdsqMBwkOI_A", True))
feeds.append(RSSfeed("https://www.youtube.com/feeds/videos.xml?playlist_id=PLZRiqJjIUlDTrwYs_UqEIts5fVaBpaIEz", True))
task = SubStackTask(login, password, account, feeds)
LOG.info("Starting bot")
await task.run_daily_at_6_am()
#await task.daily_task()
if __name__ == "__main__":
asyncio.run(main("gael.honorez@gmail.com", "f3PaTGedjFc2gkr1ypi5", "https://aggregateurjvfr.substack.com"))

View File

@@ -1,162 +0,0 @@
# backfill_from_ghost.py
from __future__ import annotations
import os, re, sys, html
from typing import Dict, List, Optional
import requests
from bs4 import BeautifulSoup
# Reuse your existing GhostAdmin client (same headers/base/proxy behavior)
# Adjust the import path if your Ghost client lives elsewhere.
from presquegratos import GhostAdmin
from storage import Storage
from keys import xgp_key, egs_key, psplus_key
# ---------------- Ghost helpers (reusing your admin client) ----------------
def ghost_list_posts(ghost: GhostAdmin, page: int = 1) -> Dict:
# Minimal params: avoid 'filter' and 'fields' to dodge 400 behind __bot proxy
url = ghost.base + "posts/"
params = {
"limit": "50",
"page": str(page),
"order": "published_at DESC",
"formats": "lexical,html", # <-- IMPORTANT
}
r = requests.get(url, headers=ghost._headers(), params=params, timeout=30)
r.raise_for_status()
return r.json()
def list_recap_posts(ghost: GhostAdmin, hard_limit: int = 2000) -> List[Dict]:
posts: List[Dict] = []
page = 1
while True:
data = ghost_list_posts(ghost, page=page)
batch = data.get("posts", [])
if not batch:
break
# client-side filter to be robust to proxy quirks
for p in batch:
title = (p.get("title") or "").strip()
if title.startswith("Récap hebdo"):
posts.append(p)
if len(batch) < 50 or len(posts) >= hard_limit:
break
page += 1
return posts
# ---------------- Parsing helpers (unchanged) ----------------
#MS_STORE_RE = re.compile(r"(?:microsoft|xbox)\.com/.*/store/.*/([0-9A-Z]{12,})", re.I)
MS_STORE_RE = re.compile(r"(?:xbox|microsoft)\.com/.*/store/.*/([0-9A-Z]{12,16})", re.I)
EPIC_RE = re.compile(r"epicgames\.com/store/.*/p/([\w\-]+)", re.I)
PSBLOG_RE = re.compile(r"blog\.playstation\.com/.*", re.I)
def clean_text(s: str) -> str:
return re.sub(r"\s+", " ", html.unescape(s or "")).strip()
def extract_sections(soup: BeautifulSoup) -> Dict[str, BeautifulSoup]:
sections: Dict[str, BeautifulSoup] = {}
current = None
current_key = None
for node in soup.find_all(["h2","h3","h4","p","ul","ol","div","section"]):
if node.name in ("h2","h3","h4"):
title = clean_text(node.get_text())
key = None
tl = title.lower()
if "game pass" in tl:
key = "xgp"
elif "egs" in tl or "epic" in tl:
key = "egs"
elif "ps plus" in tl or "ps+" in tl:
key = "psplus"
if key:
current_key = key
current = sections[key] = soup.new_tag("div")
continue
if current_key and current is not None:
current.append(node)
return sections
def parse_xgp(section: BeautifulSoup) -> List[Dict]:
items = []
for a in section.find_all("a", href=True):
href = a["href"]
m = MS_STORE_RE.search(href)
title = clean_text(a.get_text())
if m or title:
productId = m.group(1) if m else None
items.append({"title": title, "productId": productId})
uniq, seen = [], set()
for it in items:
k = xgp_key(it)
if k not in seen:
uniq.append(it); seen.add(k)
return uniq
def parse_egs(section: BeautifulSoup) -> List[Dict]:
items = []
for a in section.find_all("a", href=True):
if not EPIC_RE.search(a["href"]):
continue
title = clean_text(a.get_text()) or clean_text(a.get("title"))
items.append({"title": title, "start": ""})
uniq, seen = [], set()
for it in items:
k = egs_key(it)
if k not in seen:
uniq.append(it); seen.add(k)
return uniq
def parse_psplus(section: BeautifulSoup, post_title: str) -> Optional[Dict]:
a = section.find("a", href=PSBLOG_RE)
url = a["href"] if a else ""
m = re.search(r"(\d{2})-(\d{2})-(\d{4})", post_title)
iso = ""
if m:
d, mth, y = m.group(1), m.group(2), m.group(3)
iso = f"{y}-{mth}-{d}"
return {"url": url, "date": iso}
# ---------------- Main backfill ----------------
def backfill():
# Use the same env your main script uses; GhostAdmin will read them internally or
# you can pass them explicitly if your class expects (base_url, admin_key).
ghost = GhostAdmin(
admin_url=os.environ.get("GHOST_ADMIN_URL", "").rstrip("/") + "/",
admin_key=os.environ.get("GHOST_ADMIN_KEY", "")
)
store = Storage()
posts = list_recap_posts(ghost)
print(f"Found {len(posts)} recap posts.")
total_xgp = total_egs = total_ps = 0
dedup = []
for p in posts:
pid = p["id"]
title = p.get("title") or ""
html_body = p.get("html") or ""
soup = BeautifulSoup(html_body, "html.parser")
sections = extract_sections(soup)
for it in parse_xgp(sections.get("xgp", BeautifulSoup("", "html.parser"))):
key = xgp_key(it)
if not key in dedup:
store.remember("xgp", key, pid); total_xgp += 1
dedup.append(key)
# for it in parse_egs(sections.get("egs", BeautifulSoup("", "html.parser"))):
# store.remember("egs", egs_key(it), pid); total_egs += 1
# if "psplus" in sections:
# item = parse_psplus(sections["psplus"], title)
# store.remember("psplus", psplus_key(item), pid); total_ps += 1
print(f"Backfilled from: {title}")
print(f"Done. Inserted ~ XGP:{total_xgp} | EGS:{total_egs} | PS+:{total_ps}")
if __name__ == "__main__":
backfill()

View File

@@ -1 +0,0 @@
{"__cf_bm": "95up0icsYyESvD6suTUFG05xaWxwEr5_xuHUOv32G9I-1720025055-1.0.1.1-NlvsLW9j26FX8aPpLmVETEJ0zd.VyXefLr75kvT6iC.zHnPtkbIWgfesI0VaUGuvwV62qHpctJEoahLR9TIuHQ", "ab_experiment_sampled": "%22false%22", "ab_testing_id": "%22a6e7ba67-7dc0-452c-a935-d2f2bddd5edf%22", "ajs_anonymous_id": "%22e4535e95-1c5b-4173-82db-47807c57fb38%22", "cookie_storage_key": "f666a42c-49e8-47a2-bdbc-6eece0d6a06e", "substack.sid": "s%3ARLYSI2_XaTlGuYIpTYWjS8ib48PpuE0S.jNwCzcGzKUvUAuFdLNdfgxwewTUawIoDDZ05moubvzM", "visit_id": "%7B%22id%22%3A%22a0d46be8-56f4-406f-b1d7-14c41369b737%22%2C%22timestamp%22%3A%222024-07-03T16%3A44%3A13.349Z%22%7D", "AWSALBTG": "yw2xMbYVFbKWSzJiQsdCKp7mMH+wQ5T4/JIUc1TvywUi5iIJVXuO21AMhb+oPgegicdtpekLTDTl+zWKEekRsurS7+20skhmPxZXJf/Tl7jBd/PecbW7qa3DHkPvQtWz+SWD8+7P1rNjmY9lmyZgzH/ZeGgeiishRz9gsGO0OT/d", "AWSALBTGCORS": "yw2xMbYVFbKWSzJiQsdCKp7mMH+wQ5T4/JIUc1TvywUi5iIJVXuO21AMhb+oPgegicdtpekLTDTl+zWKEekRsurS7+20skhmPxZXJf/Tl7jBd/PecbW7qa3DHkPvQtWz+SWD8+7P1rNjmY9lmyZgzH/ZeGgeiishRz9gsGO0OT/d"}

View File

@@ -1,9 +0,0 @@
version: '3.3'
services:
substackjv:
build: .
volumes:
- /path/to/your/host/directory:/data
environment:
- TZ=Europe/Brussels

View File

@@ -1,7 +0,0 @@
set GHOST_ADMIN_KEY=68bad0e13546e700012dd65d:116a81b7e189d3b3d3b86082f97ef65daedb06498a3f1f902b8e0c08d095dc19
set GHOST_ADMIN_URL=https://ghostadmin.zep.best/ghost/api/admin/__bot/FF4440EBA737506D397C170A8422109C357AA7582F10938B7C5F11D6B652F5D4
set GHOST_EMAIL_SEGMENT=status:free
set GHOST_NEWSLETTER_SLUG=default-newsletter
set GHOST_CONTENT_URL=https://ghost.zep.best
set DB_FILE_FALLBACK=f:\workspace\Substack_JV\data\published.db
set MISTRAL_API_KEY=tQJHvYlmwz1ihKxOhXS3FmDNTRhBh6b3

View File

@@ -1,15 +0,0 @@
https://www.factornews.com/rss.xml
https://nofrag.com/feed
https://dystopeek.fr/feed/
https://thepixelpost.com/rss/
https://yamukass.substack.com/feed
https://tseret.com/categorie/tests/feed
https://www.gamesidestory.com/feed
https://www.nintendo-town.fr/feed
https://jesuisungameur.com/feed
https://www.switch-actu.fr/categorie/tests/tests-de-jeux/feed
https://www.playscope.com/category/articles/test-gaming/feed
https://jrpgfr.net/category/test/feed
https://jv.jeuxonline.info/rss/dossiers/rss.xml
https://www.youtube.com/feeds/videos.xml?channel_id=UC-OvBDfZGn1OdsqMBwkOI_A
https://www.youtube.com/feeds/videos.xml?playlist_id=PLZRiqJjIUlDTrwYs_UqEIts5fVaBpaIEz

20
keys.py
View File

@@ -1,20 +0,0 @@
# keys.py (or inline in your main)
def xgp_key(item) -> str:
# Prefer stable Microsoft Store productId if present; fallback to normalized title.
pid = (item.get("productId") or "").strip()
if pid:
return f"item:xgp:{pid}"
title = (item.get("title") or "").strip().lower()
return f"item:xgp:title:{title}"
def egs_key(item) -> str:
# Use title + start window (your fetcher usually knows the free-week start)
title = (item.get("title") or "").strip()
start = (item.get("start") or "").strip() # ISO or YYYY-MM-DD
return f"item:egs:{title}|{start}"
def psplus_key(item) -> str:
# Use official PS Blog URL + the published month (or your computed date)
url = (item.get("url") or "").strip()
date = (item.get("date") or "").strip()
return f"item:psplus:{url}|{date}"

View File

@@ -1,928 +0,0 @@
import asyncio
import argparse
import datetime as dt
import html
import json
import logging
import os
import re
import time
from logging.handlers import RotatingFileHandler
from typing import Optional, List, Dict
import feedparser
import requests
import jwt
import zoneinfo # Python 3.9+
from urllib.parse import urlparse, parse_qs, urljoin
# ------------- Web Crawler for Images -------------
def extract_image_from_url(url: str, timeout: int = 10) -> Optional[str]:
"""
Fetch a webpage and extract the best image (og:image, twitter:image, or first large image).
Returns the image URL or None.
"""
try:
resp = requests.get(
url,
timeout=timeout,
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml",
},
allow_redirects=True,
)
resp.raise_for_status()
html_content = resp.text
# Try OpenGraph image first (most reliable)
og_match = re.search(r'<meta[^>]+property=["\']og:image["\'][^>]+content=["\']([^"\']+)["\']', html_content, re.IGNORECASE)
if not og_match:
og_match = re.search(r'<meta[^>]+content=["\']([^"\']+)["\'][^>]+property=["\']og:image["\']', html_content, re.IGNORECASE)
if og_match:
img_url = og_match.group(1)
return urljoin(url, img_url)
# Try Twitter card image
tw_match = re.search(r'<meta[^>]+name=["\']twitter:image["\'][^>]+content=["\']([^"\']+)["\']', html_content, re.IGNORECASE)
if not tw_match:
tw_match = re.search(r'<meta[^>]+content=["\']([^"\']+)["\'][^>]+name=["\']twitter:image["\']', html_content, re.IGNORECASE)
if tw_match:
img_url = tw_match.group(1)
return urljoin(url, img_url)
# Fallback: look for article/main image
article_img = re.search(r'<article[^>]*>.*?<img[^>]+src=["\']([^"\']+)["\']', html_content, re.IGNORECASE | re.DOTALL)
if article_img:
img_url = article_img.group(1)
# Skip tiny images, icons, avatars
if not any(skip in img_url.lower() for skip in ['avatar', 'icon', 'logo', 'emoji', '1x1', 'pixel']):
return urljoin(url, img_url)
return None
except Exception as e:
LOG.debug("Failed to extract image from %s: %s", url, e)
return None
# ------------- YouTube helpers -------------
def fetch_youtube_oembed_html(youtube_url: str, timeout: int = 10) -> Optional[str]:
"""
Get YouTube oEmbed HTML exactly as provided and wrap it as a Ghost embed card.
"""
try:
resp = requests.get(
"https://www.youtube.com/oembed",
params={"url": youtube_url, "format": "json"},
headers={"User-Agent": "ghost-bot/1.0"},
timeout=timeout,
)
resp.raise_for_status()
data = resp.json()
html_content = data.get("html")
if not html_content:
return None
# Wrap in Ghost embed card container; do NOT alter the iframe attributes.
return f'<figure class="kg-card kg-embed-card">{html_content}</figure>'
except Exception:
return None
def youtube_thumbnail_url(video_id: str) -> str:
return f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
def extract_youtube_id(url: str) -> Optional[str]:
try:
u = urlparse(url)
host = u.netloc.lower()
if host.endswith("youtube.com"):
if u.path == "/watch":
return parse_qs(u.query).get("v", [None])[0]
m = re.match(r"^/(shorts/|live/)?([A-Za-z0-9_-]{6,})", u.path)
if m:
return m.group(2)
if host == "youtu.be":
slug = u.path.strip("/").split("/")[0]
return slug or None
except Exception:
return None
return None
# ------------- Logging -------------
LOG = logging.getLogger("bot")
LOG_PATTERN = logging.Formatter("%(asctime)s:%(levelname)s: [%(filename)s] %(message)s")
def setuplogger():
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(LOG_PATTERN)
stream_handler.setLevel(logging.DEBUG)
file_handler = RotatingFileHandler("bot.log", "a", 1_000_000, 1)
file_handler.setFormatter(LOG_PATTERN)
LOG.setLevel(logging.DEBUG)
LOG.addHandler(stream_handler)
LOG.addHandler(file_handler)
# ------------- Model -------------
class RSSfeed:
def __init__(self, url: str, yt: bool = False):
self.url = url
self.youtube = yt
# ------------- Mistral AI Client -------------
class MistralClient:
"""Client for Mistral AI API to filter and group news items."""
def __init__(self, api_key: str, model: str = "mistral-small-latest"):
self.api_key = api_key
self.model = model
self.base_url = "https://api.mistral.ai/v1/chat/completions"
def _call_api(self, messages: List[Dict], temperature: float = 0.3) -> Optional[str]:
"""Make a call to the Mistral API."""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload = {
"model": self.model,
"messages": messages,
"temperature": temperature,
"response_format": {"type": "json_object"},
}
try:
resp = requests.post(self.base_url, headers=headers, json=payload, timeout=120)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"]
except Exception as e:
LOG.error("Mistral API error: %s", e)
return None
def filter_news_items(self, items: List[dict], dry_run: bool = False) -> List[dict]:
"""
Filter out non-news items (tips, walkthroughs, guides, tutorials).
Returns only actual game news items.
"""
if not items:
return []
# Prepare items for analysis
items_for_analysis = []
for i, item in enumerate(items):
items_for_analysis.append({
"id": i,
"title": item.get("title", ""),
"link": item.get("link", ""),
"summary": (item.get("summary", "") or "")[:300], # Truncate for API
})
# Split into batches to avoid token limits
batch_size = 50
filtered_ids = set()
for batch_start in range(0, len(items_for_analysis), batch_size):
batch = items_for_analysis[batch_start:batch_start + batch_size]
prompt = f"""Tu analyses des articles de sites de jeux vidéo. Tu dois identifier UNIQUEMENT les articles à EXCLURE.
EXCLURE UNIQUEMENT si le titre contient EXPLICITEMENT UN de ces mots-clés:
- "guide" (le mot exact)
- "soluce" (le mot exact)
- "astuce" (le mot exact)
- "solution" (le mot exact, pas "résolution")
- "code promo"
- "bon plan"
- "-20%" ou "-30%" etc (réductions)
- "tuto" ou "tutoriel"
- "comment faire"
- "how to"
NE JAMAIS EXCLURE:
- "Early Access" = news de sortie anticipée, À GARDER
- "Test" ou "Review" = critique, À GARDER
- "Partie Rapide" = émission/podcast, À GARDER
- Tout article de news, annonce, sortie, preview
- Tout article d'opinion, éditorial, récap
- Tout le reste qui ne contient pas les mots-clés d'exclusion ci-dessus
Articles à analyser:
{json.dumps(batch, ensure_ascii=False, indent=2)}
Retourne un JSON avec "exclude_ids" contenant UNIQUEMENT les IDs des articles guides/soluces/promos.
Si aucun article ne correspond aux critères d'exclusion, retourne {{"exclude_ids": []}}
Sois TRÈS conservateur - en cas de doute, NE PAS exclure."""
messages = [{"role": "user", "content": prompt}]
response = self._call_api(messages)
if response:
try:
result = json.loads(response)
excluded_ids = set(result.get("exclude_ids", []))
# Keep all items NOT in excluded_ids
batch_ids = {item["id"] for item in batch}
kept_ids = batch_ids - excluded_ids
filtered_ids.update(kept_ids)
except json.JSONDecodeError:
LOG.warning("Failed to parse Mistral response for filtering")
# Fallback: include all items from this batch
filtered_ids.update(item["id"] for item in batch)
else:
# Fallback: include all items from this batch
filtered_ids.update(item["id"] for item in batch)
# Log filtered out items in dry-run mode
if dry_run:
excluded_ids = set(range(len(items))) - filtered_ids
if excluded_ids:
LOG.debug("=== FILTERED OUT (non-news) ===")
for i in sorted(excluded_ids):
LOG.debug(" [EXCLUDED] %s", items[i].get("title", "No title"))
LOG.debug("=== KEPT (news) ===")
for i in sorted(filtered_ids):
if i < len(items):
LOG.debug(" [KEPT] %s", items[i].get("title", "No title"))
return [items[i] for i in sorted(filtered_ids) if i < len(items)]
def group_similar_items(self, items: List[dict]) -> List[Dict]:
"""
Group news items by category (News, Tests/Reviews, Previews, etc.)
with sub-groups by game/topic within each category.
Returns a list of categories, each with sub-groups containing items.
"""
if not items:
return []
# Prepare items for analysis
items_for_analysis = []
for i, item in enumerate(items):
items_for_analysis.append({
"id": i,
"title": item.get("title", ""),
"link": item.get("link", ""),
})
prompt = f"""Organise ces articles de jeux vidéo en CATÉGORIES et SOUS-GROUPES.
Articles à organiser:
{json.dumps(items_for_analysis, ensure_ascii=False, indent=2)}
CATÉGORIES (utilise ces noms exacts):
1. "Actualités" - News, annonces, sorties, mises à jour, industrie
2. "Tests & Critiques" - Reviews, tests, avis, notes
3. "Aperçus & Previews" - Previews, impressions, démos, hands-on
4. "Vidéos" - Trailers, gameplay vidéos, podcasts
5. "Autres" - Le reste
RÈGLES DE GROUPEMENT (TRÈS IMPORTANT):
- Groupe par FRANCHISE ou SÉRIE (ex: tous les "Final Fantasy" ensemble, même FF7, FF16, FF XIV)
- Groupe par UNIVERS (ex: "Warhammer 40K" = Space Marine + Dawn of War + Darktide)
- Groupe par ÉVÉNEMENT (ex: "Nintendo Direct", "State of Play", "Game Awards")
- N'utilise JAMAIS de noms de sites web comme groupes (pas "NoFrag", "JeuxOnline", etc.)
EXEMPLES DE GROUPEMENTS CORRECTS:
- "Final Fantasy VII Rebirth sur Switch 2" + "Final Fantasy VII Remake Intergrade en tête" → groupe "Final Fantasy"
- "Techmarine dans Space Marine 2" + "Dawn of War 4 gameplay Ork" → groupe "Warhammer 40K"
- "GTA 6 trailer" + "GTA 6 date de sortie" → groupe "GTA 6"
- "Nintendo Direct annoncé" + "Zelda dans le Nintendo Direct" → groupe "Nintendo Direct"
Retourne ce JSON:
{{
"categories": [
{{
"name": "Actualités",
"subgroups": [
{{"title": "Final Fantasy", "item_ids": [0, 3, 7]}},
{{"title": "Warhammer 40K", "item_ids": [1, 2]}},
{{"title": "Steam", "item_ids": [5]}}
]
}}
]
}}
IMPORTANT: Chaque article dans UN SEUL sous-groupe. Titre = nom de franchise/série/univers, PAS nom de site."""
messages = [{"role": "user", "content": prompt}]
response = self._call_api(messages, temperature=0.2)
if response:
try:
result = json.loads(response)
categories = []
used_ids = set()
for cat_data in result.get("categories", []):
cat_name = cat_data.get("name", "Autres")
subgroups = []
for sg_data in cat_data.get("subgroups", []):
sg_title = sg_data.get("title", "Divers")
item_ids = sg_data.get("item_ids", [])
# Filter to valid, unused IDs
valid_ids = [i for i in item_ids if i < len(items) and i not in used_ids]
if valid_ids:
used_ids.update(valid_ids)
subgroups.append({
"title": sg_title,
"items": [items[i] for i in valid_ids]
})
if subgroups:
categories.append({
"name": cat_name,
"subgroups": subgroups
})
# Add any ungrouped items
ungrouped = [items[i] for i in range(len(items)) if i not in used_ids]
if ungrouped:
# Find or create "Autres" category
autres_cat = next((c for c in categories if c["name"] == "Autres"), None)
if autres_cat:
autres_cat["subgroups"].append({"title": "Divers", "items": ungrouped})
else:
categories.append({
"name": "Autres",
"subgroups": [{"title": "Divers", "items": ungrouped}]
})
return categories
except json.JSONDecodeError:
LOG.warning("Failed to parse Mistral response for grouping")
# Fallback: return all items in a single category/subgroup
return [{
"name": "Actualités de la semaine",
"subgroups": [{"title": "Toutes les news", "items": items}]
}]
# ------------- Ghost Admin API client -------------
class GhostAdmin:
def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"):
self.base = admin_url.rstrip("/") + "/"
self.key_id, self.key_secret_hex = admin_key.split(":")
self.accept_version = accept_version
def _jwt(self) -> str:
iat = int(time.time())
payload = {"iat": iat, "exp": iat + 5 * 60, "aud": "/admin/"}
headers = {"alg": "HS256", "typ": "JWT", "kid": self.key_id}
token = jwt.encode(payload, bytes.fromhex(self.key_secret_hex), algorithm="HS256", headers=headers)
return token if isinstance(token, str) else token.decode("utf-8")
def _headers(self):
return {
"Authorization": f"Ghost {self._jwt()}",
"Accept-Version": self.accept_version,
"Content-Type": "application/json",
}
def latest_published_date(self, tz_name: str = "Europe/Brussels"):
"""
Date (aware) de la dernière publication (status=published), ou None.
"""
url = self.base + "posts/?limit=1&order=published_at%20desc&fields=published_at"
resp = requests.get(url, headers=self._headers(), timeout=20)
resp.raise_for_status()
posts = resp.json().get("posts", [])
if not posts or not posts[0].get("published_at"):
return None
# ISO 8601 → aware UTC → converti tz locale
dtu = dt.datetime.fromisoformat(posts[0]["published_at"].replace("Z", "+00:00"))
return dtu.astimezone(zoneinfo.ZoneInfo(tz_name))
def get_newsletters(self):
url = self.base + "newsletters/"
resp = requests.get(url, headers=self._headers(), timeout=20)
if resp.status_code >= 400:
raise RuntimeError(f"Ghost newsletters error {resp.status_code}: {resp.text}")
return resp.json().get("newsletters", [])
def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str:
if preferred_slug:
return preferred_slug
nls = self.get_newsletters()
if not nls:
raise RuntimeError("No newsletters configured in Ghost (Settings → Newsletters).")
actives = [n for n in nls if n.get("status") == "active"]
for n in actives:
if n.get("is_default"):
return n.get("slug")
return (actives or nls)[0].get("slug")
def create_post_html(self, title: str, html_content: str, status: str = "draft", feature_image: Optional[str] = None):
url = self.base + "posts/?source=html"
post = {"title": title, "html": html_content, "status": status}
if feature_image:
post["feature_image"] = feature_image
resp = requests.post(url, headers=self._headers(), json={"posts": [post]}, timeout=30)
if resp.status_code >= 400:
raise RuntimeError(f"Ghost create error {resp.status_code}: {resp.text}")
return resp.json()["posts"][0]
def publish_post(self, post_id: str, updated_at: str, newsletter_slug: Optional[str], email_segment: Optional[str]):
slug = self.pick_newsletter_slug(newsletter_slug)
params = [f"newsletter={requests.utils.quote(slug)}"]
if email_segment:
params.append(f"email_segment={requests.utils.quote(email_segment)}")
url = self.base + f"posts/{post_id}/?{'&'.join(params)}"
body = {"posts": [{"updated_at": updated_at, "status": "published"}]}
resp = requests.put(url, headers=self._headers(), json=body, timeout=30)
if resp.status_code >= 400:
raise RuntimeError(f"Ghost publish error {resp.status_code}: {resp.text}")
return resp.json()["posts"][0]
# ------------- Task orchestration -------------
class GhostTask:
def __init__(self, feeds: List[RSSfeed], admin_url: str, admin_key: str,
mistral_api_key: Optional[str] = None,
newsletter_slug: Optional[str] = None, email_segment: Optional[str] = None,
dry_run: bool = False):
self.ghost = GhostAdmin(admin_url, admin_key)
self.feeds = feeds
self.newsletter_slug = newsletter_slug
self.email_segment = email_segment
self.mistral = MistralClient(mistral_api_key) if mistral_api_key else None
self.dry_run = dry_run
for feed in self.feeds:
LOG.info("Adding feed %s", feed.url)
# --- startup immediate run if not yet published this week
def _published_this_week(self) -> bool:
"""Check if we already published this week (since last Saturday 12:00)."""
tz = zoneinfo.ZoneInfo("Europe/Brussels")
last = self.ghost.latest_published_date("Europe/Brussels")
if not last:
return False
now = dt.datetime.now(tz)
# Find last Saturday at 12:00
days_since_saturday = (now.weekday() - 5) % 7 # Saturday = 5
last_saturday = (now - dt.timedelta(days=days_since_saturday)).replace(
hour=12, minute=0, second=0, microsecond=0
)
return last >= last_saturday
async def maybe_run_this_week(self):
if not self._published_this_week():
LOG.info("Aucune newsletter publiée cette semaine -> génération immédiate.")
await self.weekly_task()
else:
LOG.info("Déjà publié cette semaine, on attend la prochaine fenêtre.")
# --- utils
@staticmethod
def _fr_week_range() -> str:
"""Returns a French formatted date range for the past week."""
months = {
'January': 'Janvier', 'February': 'Février', 'March': 'Mars', 'April': 'Avril',
'May': 'Mai', 'June': 'Juin', 'July': 'Juillet', 'August': 'Août',
'September': 'Septembre', 'October': 'Octobre', 'November': 'Novembre', 'December': 'Décembre'
}
today = dt.datetime.now()
week_ago = today - dt.timedelta(days=7)
# Format: "24 - 31 Janvier 2025" or "28 Janvier - 4 Février 2025"
if week_ago.month == today.month:
formatted = f"{week_ago.day} - {today.strftime('%d %B %Y')}"
else:
formatted = f"{week_ago.strftime('%d %B')} - {today.strftime('%d %B %Y')}"
for en, fr in months.items():
formatted = formatted.replace(en, fr)
return formatted
@staticmethod
def _safe_get(url: str, timeout: int = 20) -> Optional[bytes]:
try:
r = requests.get(url, timeout=timeout, headers={"User-Agent": "ghost-bot/1.0"})
r.raise_for_status()
return r.content
except Exception as e:
LOG.warning("Flux indisponible: %s (%s)", url, e)
return None
@staticmethod
def _entry_datetime(entry) -> Optional[dt.datetime]:
"""
Tente de récupérer une datetime aware (UTC) pour un item feedparser.
"""
# Try common fields first
if getattr(entry, "published", None):
try:
# YouTube (ISO) e.g. 2025-09-05T10:20:33+00:00
return dt.datetime.fromisoformat(entry.published.replace("Z", "+00:00")).astimezone(dt.timezone.utc)
except Exception:
pass
try:
# RFC822 e.g. Fri, 05 Sep 2025 10:20:33 +0000
return dt.datetime.strptime(entry.published.replace('GMT', '+0000'),
'%a, %d %b %Y %H:%M:%S %z').astimezone(dt.timezone.utc)
except Exception:
pass
if getattr(entry, "updated_parsed", None):
try:
return dt.datetime.fromtimestamp(time.mktime(entry.updated_parsed), tz=dt.timezone.utc)
except Exception:
pass
return None
# --- HTML builder for grouped content
def _build_html_roundup_grouped(self, categories: List[Dict], feeds: List[RSSfeed]):
"""
Construit le HTML avec des catégories et sous-groupes thématiques.
Inclut un résumé et une table des matières en haut.
Retourne (html, feature_image_url_ou_None).
"""
parts: List[str] = []
first_image: Optional[str] = None
# --- Build Summary Section ---
parts.append('<h2>✨ En bref cette semaine</h2>')
parts.append('<ul>')
for cat in categories:
cat_name = cat.get("name", "Actualités")
subgroups = cat.get("subgroups", [])
# Get top subgroups with more than 1 item (by item count) for summary
multi_item_subgroups = [sg for sg in subgroups if len(sg.get("items", [])) > 1]
sorted_subgroups = sorted(multi_item_subgroups, key=lambda sg: len(sg.get("items", [])), reverse=True)
top_subgroups = sorted_subgroups[:5] # Max 5 highlights per category
total_items = sum(len(sg.get("items", [])) for sg in subgroups)
if top_subgroups:
highlights = ", ".join(sg.get("title", "Divers") for sg in top_subgroups)
total_items = sum(len(sg.get("items", [])) for sg in subgroups)
parts.append(f'<li><strong>{html.escape(cat_name)}</strong>: {html.escape(highlights)} ({total_items} articles)</li>')
elif total_items > 0:
parts.append(f'<li><strong>{html.escape(cat_name)}</strong>: {total_items} articles</li>')
parts.append('</ul>')
parts.append('<hr>')
# --- Build Table of Contents ---
# parts.append('<h2>📋 Sommaire</h2>')
# parts.append('<ul>')
# for cat in categories:
# cat_name = cat.get("name", "Actualités")
# cat_anchor = self._make_anchor(cat_name)
# subgroups = cat.get("subgroups", [])
# total_items = sum(len(sg.get("items", [])) for sg in subgroups)
# parts.append(f'<li><a href="#{cat_anchor}"><strong>{html.escape(cat_name)}</strong></a> ({total_items} articles)')
# if len(subgroups) > 1 or (len(subgroups) == 1 and len(subgroups[0].get("items", [])) > 1):
# parts.append('<ul>')
# for sg in subgroups:
# sg_title = sg.get("title", "Divers")
# sg_anchor = self._make_anchor(f"{cat_name}-{sg_title}")
# item_count = len(sg.get("items", []))
# parts.append(f'<li><a href="#{sg_anchor}">{html.escape(sg_title)}</a> ({item_count})</li>')
# parts.append('</ul>')
# parts.append('</li>')
# parts.append('</ul>')
# parts.append('<hr>')
# --- Build Content by Category ---
for cat in categories:
cat_name = cat.get("name", "Actualités")
cat_anchor = self._make_anchor(cat_name)
subgroups = cat.get("subgroups", [])
if not subgroups:
continue
# Category header with emoji
cat_emoji = {
"Actualités": "📰",
"Tests & Critiques": "",
"Aperçus & Previews": "👁️",
"Vidéos": "🎬",
"Autres": "📁"
}.get(cat_name, "📌")
parts.append(f'<h2 id="{cat_anchor}">{cat_emoji} {html.escape(cat_name)}</h2>')
for sg in subgroups:
sg_title = sg.get("title", "Divers")
sg_anchor = self._make_anchor(f"{cat_name}-{sg_title}")
items = sg.get("items", [])
if not items:
continue
# Sub-group header (only if more than 1 item in subgroup)
if len(items) > 1:
parts.append(f'<h3 id="{sg_anchor}">{html.escape(sg_title)}</h3>')
for post in items:
title = post.get("title", "") or ""
linkURL = post.get("link", "") or ""
parts.append(f'<h4>{html.escape(title)}</h4>')
# --- YouTube embed / fallback
vid = post.get("yt_videoid") or extract_youtube_id(linkURL)
if vid:
watch_url = f"https://www.youtube.com/watch?v={vid}"
# Try provider HTML via oEmbed (as Ghost does)
embed_html = fetch_youtube_oembed_html(watch_url, timeout=10)
if embed_html:
parts.append(embed_html)
else:
# Fallback: leave the plain URL on its own line so Ghost may still auto-embed
parts.append(f'\n<p>{watch_url}</p>\n')
# Minimal fallback link (non-intrusive for email/web)
parts.append(f'<p><a href="{watch_url}">Voir sur YouTube</a></p>')
else:
# --- Texte + lien
ftext = ""
if "summary" in post and post["summary"]:
ftext = html.unescape(post["summary"])
ftext = re.sub("<[^<]+?>", "", ftext)
ftext = re.sub(r"L'article .* est apparu en premier sur .*", "", ftext)
if ftext:
parts.append(f"<p>{html.escape(ftext)}</p>")
if linkURL:
esc = html.escape(linkURL)
parts.append(f'<p><a href="{esc}">{esc}</a></p>')
# --- Images: first try RSS metadata, then crawl the page
found_image = False
for link in post.get("links", []) or []:
if link.get("type") in ("image/jpg", "image/jpeg", "image/png", "image/webp"):
imgUrl = link.get("href")
if imgUrl:
imgUrl = imgUrl.replace("/250x250/", "/990x320/")
if not first_image:
first_image = imgUrl
parts.append(f'<figure><img src="{html.escape(imgUrl)}" loading="lazy"></figure>')
found_image = True
# If no image from RSS, try to extract from the article page
if not found_image and linkURL:
crawled_img = extract_image_from_url(linkURL, timeout=8)
if crawled_img:
if not first_image:
first_image = crawled_img
parts.append(f'<figure><img src="{html.escape(crawled_img)}" loading="lazy"></figure>')
parts.append('<hr>')
# --- Sources
parts.append("<h3>📚 Sources</h3>")
for feed in feeds:
esc = html.escape(feed.url)
parts.append(f'<p><a href="{esc}">{esc}</a></p>')
parts.append('<p><em>Abonnez-vous pour recevoir chaque semaine les news et soutenir mon travail.</em></p>')
return "\n".join(parts), first_image
@staticmethod
def _make_anchor(text: str) -> str:
"""Convert text to a valid HTML anchor ID."""
# Remove accents and special chars, lowercase, replace spaces with dashes
anchor = text.lower()
anchor = re.sub(r'[àáâãäå]', 'a', anchor)
anchor = re.sub(r'[èéêë]', 'e', anchor)
anchor = re.sub(r'[ìíîï]', 'i', anchor)
anchor = re.sub(r'[òóôõö]', 'o', anchor)
anchor = re.sub(r'[ùúûü]', 'u', anchor)
anchor = re.sub(r'[ýÿ]', 'y', anchor)
anchor = re.sub(r'[ç]', 'c', anchor)
anchor = re.sub(r'[^a-z0-9\s-]', '', anchor)
anchor = re.sub(r'\s+', '-', anchor.strip())
return anchor or "section"
@staticmethod
def _format_duration(seconds: float) -> str:
seconds = int(seconds)
days, seconds = divmod(seconds, 86400)
hours, seconds = divmod(seconds, 3600)
minutes, seconds = divmod(seconds, 60)
parts = []
if days: parts.append(f"{days} days")
if hours: parts.append(f"{hours} hours")
if minutes: parts.append(f"{minutes} minutes")
if seconds: parts.append(f"{seconds} seconds")
return ", ".join(parts) if parts else "0 seconds"
async def run_weekly_on_saturday(self):
"""Run every Saturday at 12:00 (noon)."""
while True:
now = dt.datetime.now()
# Calculate next Saturday at 12:00
days_until_saturday = (5 - now.weekday()) % 7 # Saturday = 5
if days_until_saturday == 0 and now.hour >= 12:
days_until_saturday = 7 # Already past Saturday 12:00, wait for next week
next_run = (now + dt.timedelta(days=days_until_saturday)).replace(
hour=12, minute=0, second=0, microsecond=0
)
sleep_seconds = (next_run - now).total_seconds()
while sleep_seconds > 0:
LOG.info("Waiting for %s for next scan (Saturday noon)", self._format_duration(sleep_seconds))
await asyncio.sleep(min(sleep_seconds, 5 * 60))
now = dt.datetime.now()
sleep_seconds = (next_run - now).total_seconds()
LOG.info("Going to run the weekly task")
await self.weekly_task()
async def weekly_task(self):
"""Main weekly task: collect, filter, group, and publish."""
# Log newsletters (debug)
try:
nls = self.ghost.get_newsletters()
LOG.info("Newsletters: %s", ", ".join(f"{n.get('name')}[{n.get('slug')}]" for n in nls))
except Exception as e:
LOG.warning("Unable to list newsletters: %s", e)
title_post = "Les news de la semaine du " + self._fr_week_range()
LOG.info("Running weekly task : %s", title_post)
# (Re)charge les feeds
feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
if not os.path.isfile(feeds_file):
feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", r"f:\workspace\Substack_JV\feeds.txt")
feeds: List[RSSfeed] = []
with open(feeds_file, encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
for line in lines:
feeds.append(RSSfeed(line, "youtube" in line.lower()))
self.feeds = feeds
# Fenêtre: depuis 7 jours à 06:00 UTC
week_ago_6am_utc = dt.datetime.now(dt.timezone.utc).replace(
hour=6, minute=0, second=0, microsecond=0
) - dt.timedelta(days=7)
all_news_posts: List[dict] = []
for feed in self.feeds:
LOG.info("Scanning feed %s", feed.url)
content = self._safe_get(feed.url, timeout=30)
if not content:
continue
fp = feedparser.parse(content)
# Sélection des items de la semaine
new_entries = []
for e in fp.entries:
dte = self._entry_datetime(e)
if dte and dte > week_ago_6am_utc:
new_entries.append(e)
# Basic URL-based filtering (keep existing logic)
filtered = []
for e in new_entries:
linkURL = e.get("link", "") or ""
if "actugaming" in linkURL and ("puzzle-" in linkURL or "guide-" in linkURL):
continue
# enrich YouTube id if applicable
if feed.youtube and linkURL:
vid = extract_youtube_id(linkURL)
if vid:
e["yt_videoid"] = vid
filtered.append(e)
all_news_posts.extend(filtered)
if not all_news_posts:
LOG.warning("Aucun item récupéré (flux down ?). On n'envoie pas cette semaine.")
return
LOG.info("Collected %d items from feeds", len(all_news_posts))
# Use Mistral AI for filtering and grouping if available
if self.mistral:
LOG.info("Using Mistral AI to filter non-news content...")
filtered_posts = self.mistral.filter_news_items(all_news_posts, dry_run=self.dry_run)
LOG.info("After filtering: %d items (removed %d)",
len(filtered_posts), len(all_news_posts) - len(filtered_posts))
if filtered_posts:
LOG.info("Using Mistral AI to group items by category...")
categories = self.mistral.group_similar_items(filtered_posts)
total_cats = len(categories)
total_subgroups = sum(len(cat.get("subgroups", [])) for cat in categories)
LOG.info("Created %d categories with %d sub-groups", total_cats, total_subgroups)
else:
categories = []
else:
LOG.warning("No Mistral API key configured, skipping AI filtering/grouping")
# Fallback: single category with all items
categories = [{
"name": "Actualités de la semaine",
"subgroups": [{"title": "Toutes les news", "items": all_news_posts}]
}]
if not categories or all(
len(sg.get("items", [])) == 0
for cat in categories
for sg in cat.get("subgroups", [])
):
LOG.warning("No news items after filtering. Skipping this week.")
return
roundup_html, feature_image = self._build_html_roundup_grouped(categories, self.feeds)
# 1) Create draft (with feature image if any)
created = self.ghost.create_post_html(title_post, roundup_html, status="draft", feature_image=feature_image)
LOG.info("Created draft post: %s (id: %s)", created.get("title"), created.get("id"))
# 2) Publish + send email (unless dry-run mode)
if self.dry_run:
LOG.info("DRY-RUN MODE: Post created as draft but NOT published. URL: %s",
created.get("url", "N/A"))
LOG.info("DRY-RUN MODE: Review the draft in Ghost admin, then publish manually if satisfied.")
return
published = self.ghost.publish_post(
post_id=created["id"],
updated_at=created["updated_at"],
newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"),
email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"),
)
LOG.info("Published post: %s (emailed via newsletter)", published.get("url"))
# ------------- main -------------
async def main():
setuplogger()
parser = argparse.ArgumentParser()
parser.add_argument("--runonce", action="store_true", help="Run now and exit (no scheduler)")
parser.add_argument("--dry-run", action="store_true", dest="dry_run",
help="Run immediately, create draft but do NOT publish (for testing)")
args = parser.parse_args()
# Feeds init (list may be reloaded inside task)
feeds: List[RSSfeed] = []
feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
if not os.path.isfile(feeds_file):
feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", r"f:\workspace\Substack_JV\feeds.txt")
with open(feeds_file, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
feeds.append(RSSfeed(line, "youtube" in line.lower()))
admin_url = os.environ["GHOST_ADMIN_URL"] # e.g. https://ghostadmin.zep.best/ghost/api/admin/
admin_key = os.environ["GHOST_ADMIN_KEY"] # integration_id:secret_hex
mistral_api_key = os.environ.get("MISTRAL_API_KEY") # Optional: for AI filtering/grouping
if not mistral_api_key:
LOG.warning("MISTRAL_API_KEY not set. AI filtering and grouping will be disabled.")
task = GhostTask(
feeds=feeds,
admin_url=admin_url,
admin_key=admin_key,
mistral_api_key=mistral_api_key,
newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"),
email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"),
dry_run=args.dry_run,
)
LOG.info("Starting bot (weekly mode%s)", " - DRY RUN" if args.dry_run else "")
if args.runonce:
await task.weekly_task()
return
if args.dry_run:
LOG.info("DRY-RUN: Running weekly task immediately (will create draft only)")
await task.weekly_task()
return
# Démarrage: publier l'édition de la semaine si elle n'existe pas encore
await task.maybe_run_this_week()
# Planification hebdomadaire le samedi à 12:00 Europe/Brussels
await task.run_weekly_on_saturday()
if __name__ == "__main__":
asyncio.run(main())

File diff suppressed because it is too large Load Diff

View File

@@ -1,7 +1,3 @@
requests
feedparser feedparser
PyJWT>=2.7,<3 python-substack
requests>=2.31
feedparser>=6.0
aiohttp
bs4
playwright

View File

@@ -1,49 +0,0 @@
# storage.py
from __future__ import annotations
import sqlite3, pathlib, datetime as dt
from typing import Optional, Iterable, Tuple
import os
DB_PATH = "/data/published.db" # bind-mount ./data:/data in docker
_SCHEMA = """
PRAGMA journal_mode = WAL;
CREATE TABLE IF NOT EXISTS published_items(
platform TEXT NOT NULL, -- e.g. xgp | egs | psplus
key TEXT PRIMARY KEY, -- your dedupe key (see below)
first_seen_utc TEXT NOT NULL, -- ISO-8601
last_post_id TEXT -- Ghost post id that recorded it
);
CREATE INDEX IF NOT EXISTS idx_platform ON published_items(platform);
"""
class Storage:
def __init__(self, db_path: str = DB_PATH):
pathlib.Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite3.connect(db_path)
self.conn.execute("PRAGMA foreign_keys = ON;")
for stmt in filter(None, _SCHEMA.split(";")):
if stmt.strip():
self.conn.execute(stmt)
def seen(self, key: str) -> bool:
cur = self.conn.execute("SELECT 1 FROM published_items WHERE key=?", (key,))
return cur.fetchone() is not None
def remember(self, platform: str, key: str, post_id: Optional[str]):
self.conn.execute(
"INSERT OR IGNORE INTO published_items(platform,key,first_seen_utc,last_post_id) VALUES(?,?,?,?)",
(platform, key, dt.datetime.utcnow().isoformat(), post_id),
)
if post_id:
self.conn.execute("UPDATE published_items SET last_post_id=? WHERE key=?", (post_id, key))
self.conn.commit()
def bulk_remember(self, platform: str, pairs: Iterable[Tuple[str, Optional[str]]]):
rows = [(platform, k, dt.datetime.utcnow().isoformat(), pid) for (k, pid) in pairs]
self.conn.executemany(
"INSERT OR IGNORE INTO published_items(platform,key,first_seen_utc,last_post_id) VALUES(?,?,?,?)",
rows
)
self.conn.commit()

View File

@@ -1,58 +1,7 @@
#!/bin/sh #!/bin/bash
set -eu
log() { printf '%s %s\n' "[$(date -u +%FT%TZ)]" "$*"; } # Pull the latest changes
git pull origin main
stop() { # Run your Python script
log "stopping..." python Post_RSS_on_SubStack.py
[ -n "${PID1-}" ] && kill -TERM "$PID1" 2>/dev/null || true
[ -n "${PID2-}" ] && kill -TERM "$PID2" 2>/dev/null || true
[ -n "${TPID-}" ] && kill -TERM "$TPID" 2>/dev/null || true
wait || true
exit 0
}
trap stop INT TERM
cd /app
export GIT_TERMINAL_PROMPT=0
# MAJ forcée du code à chaque (re)démarrage
if [ -d .git ]; then
i=0
while [ $i -lt 5 ]; do
if git fetch --all --prune && git reset --hard origin/main; then
log "git updated to origin/main"
break
fi
i=$((i+1))
log "git update failed (attempt $i/5); retrying in 10s..."
sleep 10
done
[ $i -ge 5 ] && log "WARNING: git update failed after 5 attempts — continuing with current code"
else
log "WARNING: /app is not a git repo; skipping git update"
fi
# Dossiers logs
mkdir -p /var/log
: > /var/log/daily.log
: > /var/log/weekly.log
# Lancer les 2 bots (logs non bufferisés)
python -u post_rss_to_ghost.py > /var/log/daily.log 2>&1 & PID1=$!
python -u presquegratos.py > /var/log/weekly.log 2>&1 & PID2=$!
# Suivre les 2 fichiers de logs dans la sortie du conteneur
tail -F /var/log/daily.log /var/log/weekly.log &
TPID=$!
# Attente portable (pas de wait -n en /bin/sh)
while :; do
if ! kill -0 "$PID1" 2>/dev/null; then wait "$PID1" || true; break; fi
if ! kill -0 "$PID2" 2>/dev/null; then wait "$PID2" || true; break; fi
sleep 1
done
# Si un des scripts sort, on arrête le tail (le trap TERM arrêtera l'autre script)
kill -TERM "$TPID" 2>/dev/null || true
wait || true

View File

@@ -1,23 +0,0 @@
import feedparser
import io
import html
import datetime
import requests
import time
url = r'https://www.xboxygen.com/spip.php?page=backend'
html_text = requests.get(url).text
news = feedparser.parse(html_text)
yesterday_6am = datetime.datetime.now(datetime.timezone.utc).replace(hour=6, minute=0, second=0, microsecond=0) - datetime.timedelta(days=1)
try:
new_posts = [entry for entry in news.entries if datetime.datetime.strptime(entry.published.replace('GMT', '+0000'), '%a, %d %b %Y %H:%M:%S %z') > yesterday_6am]
except:
new_posts = [entry for entry in news.entries if datetime.datetime.fromtimestamp(time.mktime(entry.updated_parsed)).replace(tzinfo=datetime.timezone.utc) > yesterday_6am]
#else if
#entry.updated.replace('GMT', '+0000'), '%a, %d %b %Y %H:%M:%S %z'
print(new_posts)