Compare commits

..

34 Commits

Author SHA1 Message Date
Gaël
85d79db3fd new newsletter format 2026-01-31 19:21:24 +01:00
Gaël
e15d53339f fixing the loop 2025-11-24 08:50:58 +01:00
Gaël
80d7c45cfb also publish 2025-11-17 13:23:48 +01:00
Gaël
85b4ea9e24 changing scheduler 2025-11-17 13:20:37 +01:00
Gaël
91add44592 updating python 2025-10-05 19:54:32 +02:00
Gaël
898ada327f adding prime gaming 2025-10-05 19:52:50 +02:00
Gaël
c66935bcb6 wrong path 2025-09-29 09:41:15 +02:00
Gaël
1db321b7d4 remember to save! 2025-09-29 09:39:41 +02:00
Gaël
d08a598fd7 adding caching to presque gratos 2025-09-29 09:36:47 +02:00
Gaël
595c11eeb5 no title 2025-09-17 10:48:16 +02:00
Gaël
8074e7df84 python 3.9 2025-09-10 09:45:18 +02:00
Gaël
b6102d0b4f updating youtube 2025-09-10 09:38:27 +02:00
Gaël
39a4ed88de update logger 2025-09-08 19:01:59 +02:00
Gaël
542aea6602 allez on essaie ca 2025-09-08 18:53:34 +02:00
Gaël
a1e462eab9 bs4 2025-09-08 18:47:48 +02:00
Gaël
a727d5f66a adding dependency 2025-09-08 18:45:57 +02:00
Gaël
ae9c1517d9 update url 2025-09-08 18:39:50 +02:00
Gaël
6cf99fe2e1 again... 2025-09-08 18:34:12 +02:00
Gaël
7b1ee4a0ab test 2025-09-08 18:31:28 +02:00
Gaël
fae2482067 update docker 2025-09-08 18:28:03 +02:00
Gaël
5f57eab24a update docker 2025-09-08 18:23:36 +02:00
Gaël
67f85515a9 bad arg 2025-09-07 16:13:28 +02:00
Gaël
a0f988ed32 fixing yt + crashes + failsafe 2025-09-07 16:07:03 +02:00
Gaël Honorez
74d61522a4 prod 2025-09-05 15:57:53 +02:00
Gaël Honorez
8fbdf0eff6 testing 2025-09-05 15:03:16 +02:00
Gaël Honorez
34d91a3677 using ghost 2025-09-05 14:36:28 +02:00
zep
83869b0663 Update Dockerfile 2025-03-06 12:25:30 +01:00
Gaël
e3d51d6c03 oupsie 2024-10-11 16:38:16 +02:00
Gaël
08442b88e6 removed guides/puzzle from actugaming 2024-10-09 08:56:38 +02:00
Gaël
967527835b fixing xboxsyde feed. 2024-07-30 08:59:14 +02:00
Gaël
8b1a331eb4 up docker 2024-07-03 19:06:56 +02:00
Gaël
ef04c73c31 remove virtual display 2024-07-03 19:02:56 +02:00
Gaël
c78078ce35 fixing pathing 2024-07-03 18:46:00 +02:00
Gaël
668843d8e8 fixing email check 2024-07-03 18:44:36 +02:00
17 changed files with 2303 additions and 1414 deletions

View File

@@ -1,13 +1,19 @@
FROM python:3.8 FROM python:3.12
RUN apt-get update && apt-get install -y git RUN apt-get update && apt-get install -y git
RUN git clone http://192.168.1.25:8124/zep/Substack_JV.git /app RUN git clone https://gitea.zep.best/zep/Substack_JV.git /app
WORKDIR /app WORKDIR /app
RUN pip install --upgrade pip
COPY requirements.txt .
RUN pip install -r requirements.txt RUN pip install -r requirements.txt
ENV TZ=Europe/Brussels ENV TZ=Europe/Brussels
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
RUN playwright install --with-deps chromium
COPY update_and_run.sh /app COPY update_and_run.sh /app
RUN chmod +x /app/update_and_run.sh # Normalize line endings (Windows CRLF -> LF) and ensure readable
CMD ["./update_and_run.sh"] RUN sed -i 's/\r$//' /app/update_and_run.sh && chmod a+r /app/update_and_run.sh
ENTRYPOINT ["sh", "-c", "./update_and_run.sh"]
# Single entrypoint: run via sh (no exec bit required, survives noexec mounts)
ENTRYPOINT ["sh", "/app/update_and_run.sh"]

View File

@@ -1,265 +0,0 @@
import asyncio
import argparse
import requests
import feedparser
import io
import html
import datetime
import logging
import os
import re
from logging.handlers import RotatingFileHandler
import random
import pyvirtualdisplay
from substack import Api
from substack.post import Post
LOG = logging.getLogger('bot')
LOG_PATTERN = logging.Formatter('%(asctime)s:%(levelname)s: [%(filename)s] %(message)s')
def setuplogger():
conf_filename = None
steam_handler = logging.StreamHandler()
steam_handler.setFormatter(LOG_PATTERN)
steam_handler.setLevel(logging.DEBUG)
def setup_logger(logger_name, file_name=None, add_steam=False):
file_name = file_name or logger_name
log_filename = f"{file_name}.log"
logger = logging.getLogger(logger_name)
logger.setLevel(logging.DEBUG)
file_handler = RotatingFileHandler(log_filename, "a", 1000000, 1)
file_handler.setFormatter(LOG_PATTERN)
logger.addHandler(file_handler)
if add_steam:
logger.addHandler(steam_handler)
setup_logger("bot", conf_filename, True)
class RSSfeed():
def __init__(self, url, yt=False):
self.url = url
self.youtube = yt
class SubStackTask:
def __init__(self, login, password, cookies_path, account, feeds):
self.api = Api(
email=login,
password=password,
cookies_path=cookies_path,
publication_url=account,
)
self.user_id = self.api.get_user_id()
self.feeds = feeds
for feed in self.feeds:
LOG.info("Adding feed " + feed.url)
def format_duration(self, seconds):
days, seconds = divmod(seconds, 86400)
hours, seconds = divmod(seconds, 3600)
minutes, seconds = divmod(seconds, 60)
days = round(days)
hours = round(hours)
minutes = round(minutes)
seconds = round(seconds)
parts = []
if days > 0:
parts.append(f"{days} days")
if hours > 0:
parts.append(f"{hours} hours")
if minutes > 0:
parts.append(f"{minutes} minutes")
if seconds > 0:
parts.append(f"{seconds} seconds")
return ', '.join(parts) if parts else '0 seconds'
def get_fr_date(self):
# Mapping of English month names to French
months_en_to_fr = {
'January': 'Janvier', 'February': 'Février', 'March': 'Mars',
'April': 'Avril', 'May': 'Mai', 'June': 'Juin',
'July': 'Juillet', 'August': 'Août', 'September': 'Septembre',
'October': 'Octobre', 'November': 'Novembre', 'December': 'Décembre'
}
today = datetime.datetime.now()
formatted_date = today.strftime("%d %B %Y")
# Replace the English month with the French month
for en, fr in months_en_to_fr.items():
formatted_date = formatted_date.replace(en, fr)
return formatted_date
async def run_daily_at_6_am(self):
while True:
now = datetime.datetime.now()
# Calculate the time until 6 AM next day
next_run = (now + datetime.timedelta(days=1)).replace(hour=6, minute=5, second=0, microsecond=0)
sleep_seconds = (next_run - now).total_seconds()
while sleep_seconds > 0:
# Check if the remaining time is a multiple of 3600 seconds
formatted_duration = self.format_duration(sleep_seconds)
LOG.info(f"Waiting for {formatted_duration} for next scan")
# Wait for some time before checking again
await asyncio.sleep(min(sleep_seconds, 5 * 60))
# Recalculate the remaining sleep time
now = datetime.datetime.now()
sleep_seconds = (next_run - now).total_seconds()
LOG.info("Going to run the daily task")
# Run the daily task
await self.daily_task()
async def daily_task(self):
title_post = "Les news du " + self.get_fr_date()
LOG.info("Running daily task : " + str(title_post))
ff = r'/data/feeds.txt'
if os.path.isfile(ff) is False:
ff = r'x:\substack\feeds.txt'
self.feeds = []
with open(ff) as file:
lines = [line.rstrip() for line in file]
for line in lines:
youtube = "youtube" in line
self.feeds.append(RSSfeed(line, youtube))
sub_stack_post = Post(
title=title_post,
subtitle="",
user_id=self.user_id
)
midnight_today = datetime.datetime.now(datetime.timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
yesterday_6am = datetime.datetime.now(datetime.timezone.utc).replace(hour=6, minute=0, second=0, microsecond=0) - datetime.timedelta(days=1)
formatted_date = midnight_today.strftime('%a, %d %b %Y %H:%M:%S %z')
all_news_posts = []
for feed in self.feeds:
LOG.info("Scanning feed " + feed.url)
html_text = requests.get(feed.url).text
newsFeed = feedparser.parse(html_text)
if feed.youtube is True:
new_posts = [entry for entry in newsFeed.entries if datetime.datetime.fromisoformat(entry.published) > yesterday_6am]
else:
new_posts = [entry for entry in newsFeed.entries if datetime.datetime.strptime(entry.published.replace('GMT', '+0000'), '%a, %d %b %Y %H:%M:%S %z') > yesterday_6am]
all_news_posts.extend(new_posts)
random.shuffle(all_news_posts)
for post in all_news_posts:
linkURL = post["link"]
title = post["title"]
ftext = ""
LOG.info("Posting " + str(title))
if "summary" in post:
ftext = html.unescape(post["summary"])
# Using regular expressions to remove HTML tags
ftext = re.sub('<[^<]+?>', '', ftext)
pattern = r"Larticle .* est apparu en premier sur .*"
ftext = re.sub(pattern, '', ftext)
if "yt_videoid" in post:
sub_stack_post.add({"type":"heading", "level":3, "content": title})
videoId = post["yt_videoid"]
sub_stack_post.add({"type":"youtube2", "src": videoId })
sub_stack_post.add({'type': 'paragraph', 'content': [
{'content': linkURL, 'marks': [{'type': "link", 'href': linkURL}]}]})
else:
if ftext != "":
sub_stack_post.add({"type":"heading", "level":3, "content": title})
sub_stack_post.add({"type":"paragraph", "content": ftext })
sub_stack_post.add({'type': 'paragraph', 'content': [
{'content': linkURL, 'marks': [{'type': "link", 'href': linkURL}]}]})
if "links" in post:
for link in post["links"]:
if link["type"] == "image/jpg":
imgUrl = link["href"]
sub_stack_post.add({'type': 'captionedImage', 'src': imgUrl})
sub_stack_post.add({"type":"horizontal_rule"})
sub_stack_post.add({"type":"heading", "level":3, "content": "Sources"})
for feed in self.feeds:
sub_stack_post.add({'type': 'paragraph', 'content': [
{'content': feed.url, 'marks': [{'type': "link", 'href': feed.url}]}]})
sub_stack_post.add({"type":"subscribeWidget", "message":"Abonnez-vous gratuitement pour recevoir chaque jour les news dans votre e-mail et soutenir mon travail."})
draft = self.api.post_draft(sub_stack_post.get_draft())
self.api.prepublish_draft(draft.get("id"))
self.api.publish_draft(draft.get("id"))
async def main(login, password, account):
setuplogger()
if os.path.exists("last_scan_date.txt"):
with open("last_scan_date.txt", "r") as f:
last_post_date = datetime.datetime.strptime(f.read().strip(), '%a, %d %b %Y %H:%M:%S %z')
else:
last_post_date = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc)
feeds = []
ff = r'/data/feeds.txt'
if os.path.isfile(ff) is False:
ff = r'feeds.txt'
cookies_path = r'/data/cookies.json'
if os.path.isfile(cookies_path) is False:
cookies_path = r'cookies.json'
with open(ff) as file:
lines = [line.rstrip() for line in file]
for line in lines:
youtube = "youtube" in line
feeds.append(RSSfeed(line, youtube))
task = SubStackTask(login, password, cookies_path, account, feeds)
LOG.info("Starting bot")
await task.run_daily_at_6_am()
#await task.daily_task()
if __name__ == "__main__":
asyncio.run(main("gael.honorez@gmail.com", "f3PaTGedjFc2gkr1ypi5", "https://aggregateurjvfr.substack.com"))

162
backfill.py Normal file
View File

@@ -0,0 +1,162 @@
# backfill_from_ghost.py
from __future__ import annotations
import os, re, sys, html
from typing import Dict, List, Optional
import requests
from bs4 import BeautifulSoup
# Reuse your existing GhostAdmin client (same headers/base/proxy behavior)
# Adjust the import path if your Ghost client lives elsewhere.
from presquegratos import GhostAdmin
from storage import Storage
from keys import xgp_key, egs_key, psplus_key
# ---------------- Ghost helpers (reusing your admin client) ----------------
def ghost_list_posts(ghost: GhostAdmin, page: int = 1) -> Dict:
# Minimal params: avoid 'filter' and 'fields' to dodge 400 behind __bot proxy
url = ghost.base + "posts/"
params = {
"limit": "50",
"page": str(page),
"order": "published_at DESC",
"formats": "lexical,html", # <-- IMPORTANT
}
r = requests.get(url, headers=ghost._headers(), params=params, timeout=30)
r.raise_for_status()
return r.json()
def list_recap_posts(ghost: GhostAdmin, hard_limit: int = 2000) -> List[Dict]:
posts: List[Dict] = []
page = 1
while True:
data = ghost_list_posts(ghost, page=page)
batch = data.get("posts", [])
if not batch:
break
# client-side filter to be robust to proxy quirks
for p in batch:
title = (p.get("title") or "").strip()
if title.startswith("Récap hebdo"):
posts.append(p)
if len(batch) < 50 or len(posts) >= hard_limit:
break
page += 1
return posts
# ---------------- Parsing helpers (unchanged) ----------------
#MS_STORE_RE = re.compile(r"(?:microsoft|xbox)\.com/.*/store/.*/([0-9A-Z]{12,})", re.I)
MS_STORE_RE = re.compile(r"(?:xbox|microsoft)\.com/.*/store/.*/([0-9A-Z]{12,16})", re.I)
EPIC_RE = re.compile(r"epicgames\.com/store/.*/p/([\w\-]+)", re.I)
PSBLOG_RE = re.compile(r"blog\.playstation\.com/.*", re.I)
def clean_text(s: str) -> str:
return re.sub(r"\s+", " ", html.unescape(s or "")).strip()
def extract_sections(soup: BeautifulSoup) -> Dict[str, BeautifulSoup]:
sections: Dict[str, BeautifulSoup] = {}
current = None
current_key = None
for node in soup.find_all(["h2","h3","h4","p","ul","ol","div","section"]):
if node.name in ("h2","h3","h4"):
title = clean_text(node.get_text())
key = None
tl = title.lower()
if "game pass" in tl:
key = "xgp"
elif "egs" in tl or "epic" in tl:
key = "egs"
elif "ps plus" in tl or "ps+" in tl:
key = "psplus"
if key:
current_key = key
current = sections[key] = soup.new_tag("div")
continue
if current_key and current is not None:
current.append(node)
return sections
def parse_xgp(section: BeautifulSoup) -> List[Dict]:
items = []
for a in section.find_all("a", href=True):
href = a["href"]
m = MS_STORE_RE.search(href)
title = clean_text(a.get_text())
if m or title:
productId = m.group(1) if m else None
items.append({"title": title, "productId": productId})
uniq, seen = [], set()
for it in items:
k = xgp_key(it)
if k not in seen:
uniq.append(it); seen.add(k)
return uniq
def parse_egs(section: BeautifulSoup) -> List[Dict]:
items = []
for a in section.find_all("a", href=True):
if not EPIC_RE.search(a["href"]):
continue
title = clean_text(a.get_text()) or clean_text(a.get("title"))
items.append({"title": title, "start": ""})
uniq, seen = [], set()
for it in items:
k = egs_key(it)
if k not in seen:
uniq.append(it); seen.add(k)
return uniq
def parse_psplus(section: BeautifulSoup, post_title: str) -> Optional[Dict]:
a = section.find("a", href=PSBLOG_RE)
url = a["href"] if a else ""
m = re.search(r"(\d{2})-(\d{2})-(\d{4})", post_title)
iso = ""
if m:
d, mth, y = m.group(1), m.group(2), m.group(3)
iso = f"{y}-{mth}-{d}"
return {"url": url, "date": iso}
# ---------------- Main backfill ----------------
def backfill():
# Use the same env your main script uses; GhostAdmin will read them internally or
# you can pass them explicitly if your class expects (base_url, admin_key).
ghost = GhostAdmin(
admin_url=os.environ.get("GHOST_ADMIN_URL", "").rstrip("/") + "/",
admin_key=os.environ.get("GHOST_ADMIN_KEY", "")
)
store = Storage()
posts = list_recap_posts(ghost)
print(f"Found {len(posts)} recap posts.")
total_xgp = total_egs = total_ps = 0
dedup = []
for p in posts:
pid = p["id"]
title = p.get("title") or ""
html_body = p.get("html") or ""
soup = BeautifulSoup(html_body, "html.parser")
sections = extract_sections(soup)
for it in parse_xgp(sections.get("xgp", BeautifulSoup("", "html.parser"))):
key = xgp_key(it)
if not key in dedup:
store.remember("xgp", key, pid); total_xgp += 1
dedup.append(key)
# for it in parse_egs(sections.get("egs", BeautifulSoup("", "html.parser"))):
# store.remember("egs", egs_key(it), pid); total_egs += 1
# if "psplus" in sections:
# item = parse_psplus(sections["psplus"], title)
# store.remember("psplus", psplus_key(item), pid); total_ps += 1
print(f"Backfilled from: {title}")
print(f"Done. Inserted ~ XGP:{total_xgp} | EGS:{total_egs} | PS+:{total_ps}")
if __name__ == "__main__":
backfill()

1
cookies.json Normal file
View File

@@ -0,0 +1 @@
{"__cf_bm": "95up0icsYyESvD6suTUFG05xaWxwEr5_xuHUOv32G9I-1720025055-1.0.1.1-NlvsLW9j26FX8aPpLmVETEJ0zd.VyXefLr75kvT6iC.zHnPtkbIWgfesI0VaUGuvwV62qHpctJEoahLR9TIuHQ", "ab_experiment_sampled": "%22false%22", "ab_testing_id": "%22a6e7ba67-7dc0-452c-a935-d2f2bddd5edf%22", "ajs_anonymous_id": "%22e4535e95-1c5b-4173-82db-47807c57fb38%22", "cookie_storage_key": "f666a42c-49e8-47a2-bdbc-6eece0d6a06e", "substack.sid": "s%3ARLYSI2_XaTlGuYIpTYWjS8ib48PpuE0S.jNwCzcGzKUvUAuFdLNdfgxwewTUawIoDDZ05moubvzM", "visit_id": "%7B%22id%22%3A%22a0d46be8-56f4-406f-b1d7-14c41369b737%22%2C%22timestamp%22%3A%222024-07-03T16%3A44%3A13.349Z%22%7D", "AWSALBTG": "yw2xMbYVFbKWSzJiQsdCKp7mMH+wQ5T4/JIUc1TvywUi5iIJVXuO21AMhb+oPgegicdtpekLTDTl+zWKEekRsurS7+20skhmPxZXJf/Tl7jBd/PecbW7qa3DHkPvQtWz+SWD8+7P1rNjmY9lmyZgzH/ZeGgeiishRz9gsGO0OT/d", "AWSALBTGCORS": "yw2xMbYVFbKWSzJiQsdCKp7mMH+wQ5T4/JIUc1TvywUi5iIJVXuO21AMhb+oPgegicdtpekLTDTl+zWKEekRsurS7+20skhmPxZXJf/Tl7jBd/PecbW7qa3DHkPvQtWz+SWD8+7P1rNjmY9lmyZgzH/ZeGgeiishRz9gsGO0OT/d"}

7
env.bat Normal file
View File

@@ -0,0 +1,7 @@
set GHOST_ADMIN_KEY=68bad0e13546e700012dd65d:116a81b7e189d3b3d3b86082f97ef65daedb06498a3f1f902b8e0c08d095dc19
set GHOST_ADMIN_URL=https://ghostadmin.zep.best/ghost/api/admin/__bot/FF4440EBA737506D397C170A8422109C357AA7582F10938B7C5F11D6B652F5D4
set GHOST_EMAIL_SEGMENT=status:free
set GHOST_NEWSLETTER_SLUG=default-newsletter
set GHOST_CONTENT_URL=https://ghost.zep.best
set DB_FILE_FALLBACK=f:\workspace\Substack_JV\data\published.db
set MISTRAL_API_KEY=tQJHvYlmwz1ihKxOhXS3FmDNTRhBh6b3

View File

@@ -1,53 +0,0 @@
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import pickle
def save_cookies(driver, path):
with open(path, 'wb') as file:
pickle.dump(driver.get_cookies(), file)
def generate_cookies(email, password):
cookie_path = r'x:\substack\cookies.pkl'
chrome_options = Options()
driver = webdriver.Chrome(options=chrome_options)
driver.get('https://substack.com/sign-in')
wait = WebDriverWait(driver, 10)
try:
login_with_password_button = wait.until(
EC.element_to_be_clickable((By.LINK_TEXT, 'Sign in with password'))
)
login_with_password_button.click()
time.sleep(2)
email_field = driver.find_element(By.NAME, 'email')
email_field.send_keys(email)
password_field = driver.find_element(By.NAME, 'password')
password_field.send_keys(password)
password_field.send_keys(Keys.RETURN)
save_cookies(driver, cookie_path)
print("Cookies saved successfully.")
except Exception as e:
print("An error occurred during login.", e)
driver.quit()
if __name__ == "__main__":
email = "gael.honorez@gmail.com"
password = "f3PaTGedjFc2gkr1ypi5"
generate_cookies(email, password)

20
keys.py Normal file
View File

@@ -0,0 +1,20 @@
# keys.py (or inline in your main)
def xgp_key(item) -> str:
# Prefer stable Microsoft Store productId if present; fallback to normalized title.
pid = (item.get("productId") or "").strip()
if pid:
return f"item:xgp:{pid}"
title = (item.get("title") or "").strip().lower()
return f"item:xgp:title:{title}"
def egs_key(item) -> str:
# Use title + start window (your fetcher usually knows the free-week start)
title = (item.get("title") or "").strip()
start = (item.get("start") or "").strip() # ISO or YYYY-MM-DD
return f"item:egs:{title}|{start}"
def psplus_key(item) -> str:
# Use official PS Blog URL + the published month (or your computed date)
url = (item.get("url") or "").strip()
date = (item.get("date") or "").strip()
return f"item:psplus:{url}|{date}"

928
post_rss_to_ghost.py Normal file
View File

@@ -0,0 +1,928 @@
import asyncio
import argparse
import datetime as dt
import html
import json
import logging
import os
import re
import time
from logging.handlers import RotatingFileHandler
from typing import Optional, List, Dict
import feedparser
import requests
import jwt
import zoneinfo # Python 3.9+
from urllib.parse import urlparse, parse_qs, urljoin
# ------------- Web Crawler for Images -------------
def extract_image_from_url(url: str, timeout: int = 10) -> Optional[str]:
"""
Fetch a webpage and extract the best image (og:image, twitter:image, or first large image).
Returns the image URL or None.
"""
try:
resp = requests.get(
url,
timeout=timeout,
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml",
},
allow_redirects=True,
)
resp.raise_for_status()
html_content = resp.text
# Try OpenGraph image first (most reliable)
og_match = re.search(r'<meta[^>]+property=["\']og:image["\'][^>]+content=["\']([^"\']+)["\']', html_content, re.IGNORECASE)
if not og_match:
og_match = re.search(r'<meta[^>]+content=["\']([^"\']+)["\'][^>]+property=["\']og:image["\']', html_content, re.IGNORECASE)
if og_match:
img_url = og_match.group(1)
return urljoin(url, img_url)
# Try Twitter card image
tw_match = re.search(r'<meta[^>]+name=["\']twitter:image["\'][^>]+content=["\']([^"\']+)["\']', html_content, re.IGNORECASE)
if not tw_match:
tw_match = re.search(r'<meta[^>]+content=["\']([^"\']+)["\'][^>]+name=["\']twitter:image["\']', html_content, re.IGNORECASE)
if tw_match:
img_url = tw_match.group(1)
return urljoin(url, img_url)
# Fallback: look for article/main image
article_img = re.search(r'<article[^>]*>.*?<img[^>]+src=["\']([^"\']+)["\']', html_content, re.IGNORECASE | re.DOTALL)
if article_img:
img_url = article_img.group(1)
# Skip tiny images, icons, avatars
if not any(skip in img_url.lower() for skip in ['avatar', 'icon', 'logo', 'emoji', '1x1', 'pixel']):
return urljoin(url, img_url)
return None
except Exception as e:
LOG.debug("Failed to extract image from %s: %s", url, e)
return None
# ------------- YouTube helpers -------------
def fetch_youtube_oembed_html(youtube_url: str, timeout: int = 10) -> Optional[str]:
"""
Get YouTube oEmbed HTML exactly as provided and wrap it as a Ghost embed card.
"""
try:
resp = requests.get(
"https://www.youtube.com/oembed",
params={"url": youtube_url, "format": "json"},
headers={"User-Agent": "ghost-bot/1.0"},
timeout=timeout,
)
resp.raise_for_status()
data = resp.json()
html_content = data.get("html")
if not html_content:
return None
# Wrap in Ghost embed card container; do NOT alter the iframe attributes.
return f'<figure class="kg-card kg-embed-card">{html_content}</figure>'
except Exception:
return None
def youtube_thumbnail_url(video_id: str) -> str:
return f"https://i.ytimg.com/vi/{video_id}/hqdefault.jpg"
def extract_youtube_id(url: str) -> Optional[str]:
try:
u = urlparse(url)
host = u.netloc.lower()
if host.endswith("youtube.com"):
if u.path == "/watch":
return parse_qs(u.query).get("v", [None])[0]
m = re.match(r"^/(shorts/|live/)?([A-Za-z0-9_-]{6,})", u.path)
if m:
return m.group(2)
if host == "youtu.be":
slug = u.path.strip("/").split("/")[0]
return slug or None
except Exception:
return None
return None
# ------------- Logging -------------
LOG = logging.getLogger("bot")
LOG_PATTERN = logging.Formatter("%(asctime)s:%(levelname)s: [%(filename)s] %(message)s")
def setuplogger():
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(LOG_PATTERN)
stream_handler.setLevel(logging.DEBUG)
file_handler = RotatingFileHandler("bot.log", "a", 1_000_000, 1)
file_handler.setFormatter(LOG_PATTERN)
LOG.setLevel(logging.DEBUG)
LOG.addHandler(stream_handler)
LOG.addHandler(file_handler)
# ------------- Model -------------
class RSSfeed:
def __init__(self, url: str, yt: bool = False):
self.url = url
self.youtube = yt
# ------------- Mistral AI Client -------------
class MistralClient:
"""Client for Mistral AI API to filter and group news items."""
def __init__(self, api_key: str, model: str = "mistral-small-latest"):
self.api_key = api_key
self.model = model
self.base_url = "https://api.mistral.ai/v1/chat/completions"
def _call_api(self, messages: List[Dict], temperature: float = 0.3) -> Optional[str]:
"""Make a call to the Mistral API."""
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
payload = {
"model": self.model,
"messages": messages,
"temperature": temperature,
"response_format": {"type": "json_object"},
}
try:
resp = requests.post(self.base_url, headers=headers, json=payload, timeout=120)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"]
except Exception as e:
LOG.error("Mistral API error: %s", e)
return None
def filter_news_items(self, items: List[dict], dry_run: bool = False) -> List[dict]:
"""
Filter out non-news items (tips, walkthroughs, guides, tutorials).
Returns only actual game news items.
"""
if not items:
return []
# Prepare items for analysis
items_for_analysis = []
for i, item in enumerate(items):
items_for_analysis.append({
"id": i,
"title": item.get("title", ""),
"link": item.get("link", ""),
"summary": (item.get("summary", "") or "")[:300], # Truncate for API
})
# Split into batches to avoid token limits
batch_size = 50
filtered_ids = set()
for batch_start in range(0, len(items_for_analysis), batch_size):
batch = items_for_analysis[batch_start:batch_start + batch_size]
prompt = f"""Tu analyses des articles de sites de jeux vidéo. Tu dois identifier UNIQUEMENT les articles à EXCLURE.
EXCLURE UNIQUEMENT si le titre contient EXPLICITEMENT UN de ces mots-clés:
- "guide" (le mot exact)
- "soluce" (le mot exact)
- "astuce" (le mot exact)
- "solution" (le mot exact, pas "résolution")
- "code promo"
- "bon plan"
- "-20%" ou "-30%" etc (réductions)
- "tuto" ou "tutoriel"
- "comment faire"
- "how to"
NE JAMAIS EXCLURE:
- "Early Access" = news de sortie anticipée, À GARDER
- "Test" ou "Review" = critique, À GARDER
- "Partie Rapide" = émission/podcast, À GARDER
- Tout article de news, annonce, sortie, preview
- Tout article d'opinion, éditorial, récap
- Tout le reste qui ne contient pas les mots-clés d'exclusion ci-dessus
Articles à analyser:
{json.dumps(batch, ensure_ascii=False, indent=2)}
Retourne un JSON avec "exclude_ids" contenant UNIQUEMENT les IDs des articles guides/soluces/promos.
Si aucun article ne correspond aux critères d'exclusion, retourne {{"exclude_ids": []}}
Sois TRÈS conservateur - en cas de doute, NE PAS exclure."""
messages = [{"role": "user", "content": prompt}]
response = self._call_api(messages)
if response:
try:
result = json.loads(response)
excluded_ids = set(result.get("exclude_ids", []))
# Keep all items NOT in excluded_ids
batch_ids = {item["id"] for item in batch}
kept_ids = batch_ids - excluded_ids
filtered_ids.update(kept_ids)
except json.JSONDecodeError:
LOG.warning("Failed to parse Mistral response for filtering")
# Fallback: include all items from this batch
filtered_ids.update(item["id"] for item in batch)
else:
# Fallback: include all items from this batch
filtered_ids.update(item["id"] for item in batch)
# Log filtered out items in dry-run mode
if dry_run:
excluded_ids = set(range(len(items))) - filtered_ids
if excluded_ids:
LOG.debug("=== FILTERED OUT (non-news) ===")
for i in sorted(excluded_ids):
LOG.debug(" [EXCLUDED] %s", items[i].get("title", "No title"))
LOG.debug("=== KEPT (news) ===")
for i in sorted(filtered_ids):
if i < len(items):
LOG.debug(" [KEPT] %s", items[i].get("title", "No title"))
return [items[i] for i in sorted(filtered_ids) if i < len(items)]
def group_similar_items(self, items: List[dict]) -> List[Dict]:
"""
Group news items by category (News, Tests/Reviews, Previews, etc.)
with sub-groups by game/topic within each category.
Returns a list of categories, each with sub-groups containing items.
"""
if not items:
return []
# Prepare items for analysis
items_for_analysis = []
for i, item in enumerate(items):
items_for_analysis.append({
"id": i,
"title": item.get("title", ""),
"link": item.get("link", ""),
})
prompt = f"""Organise ces articles de jeux vidéo en CATÉGORIES et SOUS-GROUPES.
Articles à organiser:
{json.dumps(items_for_analysis, ensure_ascii=False, indent=2)}
CATÉGORIES (utilise ces noms exacts):
1. "Actualités" - News, annonces, sorties, mises à jour, industrie
2. "Tests & Critiques" - Reviews, tests, avis, notes
3. "Aperçus & Previews" - Previews, impressions, démos, hands-on
4. "Vidéos" - Trailers, gameplay vidéos, podcasts
5. "Autres" - Le reste
RÈGLES DE GROUPEMENT (TRÈS IMPORTANT):
- Groupe par FRANCHISE ou SÉRIE (ex: tous les "Final Fantasy" ensemble, même FF7, FF16, FF XIV)
- Groupe par UNIVERS (ex: "Warhammer 40K" = Space Marine + Dawn of War + Darktide)
- Groupe par ÉVÉNEMENT (ex: "Nintendo Direct", "State of Play", "Game Awards")
- N'utilise JAMAIS de noms de sites web comme groupes (pas "NoFrag", "JeuxOnline", etc.)
EXEMPLES DE GROUPEMENTS CORRECTS:
- "Final Fantasy VII Rebirth sur Switch 2" + "Final Fantasy VII Remake Intergrade en tête" → groupe "Final Fantasy"
- "Techmarine dans Space Marine 2" + "Dawn of War 4 gameplay Ork" → groupe "Warhammer 40K"
- "GTA 6 trailer" + "GTA 6 date de sortie" → groupe "GTA 6"
- "Nintendo Direct annoncé" + "Zelda dans le Nintendo Direct" → groupe "Nintendo Direct"
Retourne ce JSON:
{{
"categories": [
{{
"name": "Actualités",
"subgroups": [
{{"title": "Final Fantasy", "item_ids": [0, 3, 7]}},
{{"title": "Warhammer 40K", "item_ids": [1, 2]}},
{{"title": "Steam", "item_ids": [5]}}
]
}}
]
}}
IMPORTANT: Chaque article dans UN SEUL sous-groupe. Titre = nom de franchise/série/univers, PAS nom de site."""
messages = [{"role": "user", "content": prompt}]
response = self._call_api(messages, temperature=0.2)
if response:
try:
result = json.loads(response)
categories = []
used_ids = set()
for cat_data in result.get("categories", []):
cat_name = cat_data.get("name", "Autres")
subgroups = []
for sg_data in cat_data.get("subgroups", []):
sg_title = sg_data.get("title", "Divers")
item_ids = sg_data.get("item_ids", [])
# Filter to valid, unused IDs
valid_ids = [i for i in item_ids if i < len(items) and i not in used_ids]
if valid_ids:
used_ids.update(valid_ids)
subgroups.append({
"title": sg_title,
"items": [items[i] for i in valid_ids]
})
if subgroups:
categories.append({
"name": cat_name,
"subgroups": subgroups
})
# Add any ungrouped items
ungrouped = [items[i] for i in range(len(items)) if i not in used_ids]
if ungrouped:
# Find or create "Autres" category
autres_cat = next((c for c in categories if c["name"] == "Autres"), None)
if autres_cat:
autres_cat["subgroups"].append({"title": "Divers", "items": ungrouped})
else:
categories.append({
"name": "Autres",
"subgroups": [{"title": "Divers", "items": ungrouped}]
})
return categories
except json.JSONDecodeError:
LOG.warning("Failed to parse Mistral response for grouping")
# Fallback: return all items in a single category/subgroup
return [{
"name": "Actualités de la semaine",
"subgroups": [{"title": "Toutes les news", "items": items}]
}]
# ------------- Ghost Admin API client -------------
class GhostAdmin:
def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"):
self.base = admin_url.rstrip("/") + "/"
self.key_id, self.key_secret_hex = admin_key.split(":")
self.accept_version = accept_version
def _jwt(self) -> str:
iat = int(time.time())
payload = {"iat": iat, "exp": iat + 5 * 60, "aud": "/admin/"}
headers = {"alg": "HS256", "typ": "JWT", "kid": self.key_id}
token = jwt.encode(payload, bytes.fromhex(self.key_secret_hex), algorithm="HS256", headers=headers)
return token if isinstance(token, str) else token.decode("utf-8")
def _headers(self):
return {
"Authorization": f"Ghost {self._jwt()}",
"Accept-Version": self.accept_version,
"Content-Type": "application/json",
}
def latest_published_date(self, tz_name: str = "Europe/Brussels"):
"""
Date (aware) de la dernière publication (status=published), ou None.
"""
url = self.base + "posts/?limit=1&order=published_at%20desc&fields=published_at"
resp = requests.get(url, headers=self._headers(), timeout=20)
resp.raise_for_status()
posts = resp.json().get("posts", [])
if not posts or not posts[0].get("published_at"):
return None
# ISO 8601 → aware UTC → converti tz locale
dtu = dt.datetime.fromisoformat(posts[0]["published_at"].replace("Z", "+00:00"))
return dtu.astimezone(zoneinfo.ZoneInfo(tz_name))
def get_newsletters(self):
url = self.base + "newsletters/"
resp = requests.get(url, headers=self._headers(), timeout=20)
if resp.status_code >= 400:
raise RuntimeError(f"Ghost newsletters error {resp.status_code}: {resp.text}")
return resp.json().get("newsletters", [])
def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str:
if preferred_slug:
return preferred_slug
nls = self.get_newsletters()
if not nls:
raise RuntimeError("No newsletters configured in Ghost (Settings → Newsletters).")
actives = [n for n in nls if n.get("status") == "active"]
for n in actives:
if n.get("is_default"):
return n.get("slug")
return (actives or nls)[0].get("slug")
def create_post_html(self, title: str, html_content: str, status: str = "draft", feature_image: Optional[str] = None):
url = self.base + "posts/?source=html"
post = {"title": title, "html": html_content, "status": status}
if feature_image:
post["feature_image"] = feature_image
resp = requests.post(url, headers=self._headers(), json={"posts": [post]}, timeout=30)
if resp.status_code >= 400:
raise RuntimeError(f"Ghost create error {resp.status_code}: {resp.text}")
return resp.json()["posts"][0]
def publish_post(self, post_id: str, updated_at: str, newsletter_slug: Optional[str], email_segment: Optional[str]):
slug = self.pick_newsletter_slug(newsletter_slug)
params = [f"newsletter={requests.utils.quote(slug)}"]
if email_segment:
params.append(f"email_segment={requests.utils.quote(email_segment)}")
url = self.base + f"posts/{post_id}/?{'&'.join(params)}"
body = {"posts": [{"updated_at": updated_at, "status": "published"}]}
resp = requests.put(url, headers=self._headers(), json=body, timeout=30)
if resp.status_code >= 400:
raise RuntimeError(f"Ghost publish error {resp.status_code}: {resp.text}")
return resp.json()["posts"][0]
# ------------- Task orchestration -------------
class GhostTask:
def __init__(self, feeds: List[RSSfeed], admin_url: str, admin_key: str,
mistral_api_key: Optional[str] = None,
newsletter_slug: Optional[str] = None, email_segment: Optional[str] = None,
dry_run: bool = False):
self.ghost = GhostAdmin(admin_url, admin_key)
self.feeds = feeds
self.newsletter_slug = newsletter_slug
self.email_segment = email_segment
self.mistral = MistralClient(mistral_api_key) if mistral_api_key else None
self.dry_run = dry_run
for feed in self.feeds:
LOG.info("Adding feed %s", feed.url)
# --- startup immediate run if not yet published this week
def _published_this_week(self) -> bool:
"""Check if we already published this week (since last Saturday 12:00)."""
tz = zoneinfo.ZoneInfo("Europe/Brussels")
last = self.ghost.latest_published_date("Europe/Brussels")
if not last:
return False
now = dt.datetime.now(tz)
# Find last Saturday at 12:00
days_since_saturday = (now.weekday() - 5) % 7 # Saturday = 5
last_saturday = (now - dt.timedelta(days=days_since_saturday)).replace(
hour=12, minute=0, second=0, microsecond=0
)
return last >= last_saturday
async def maybe_run_this_week(self):
if not self._published_this_week():
LOG.info("Aucune newsletter publiée cette semaine -> génération immédiate.")
await self.weekly_task()
else:
LOG.info("Déjà publié cette semaine, on attend la prochaine fenêtre.")
# --- utils
@staticmethod
def _fr_week_range() -> str:
"""Returns a French formatted date range for the past week."""
months = {
'January': 'Janvier', 'February': 'Février', 'March': 'Mars', 'April': 'Avril',
'May': 'Mai', 'June': 'Juin', 'July': 'Juillet', 'August': 'Août',
'September': 'Septembre', 'October': 'Octobre', 'November': 'Novembre', 'December': 'Décembre'
}
today = dt.datetime.now()
week_ago = today - dt.timedelta(days=7)
# Format: "24 - 31 Janvier 2025" or "28 Janvier - 4 Février 2025"
if week_ago.month == today.month:
formatted = f"{week_ago.day} - {today.strftime('%d %B %Y')}"
else:
formatted = f"{week_ago.strftime('%d %B')} - {today.strftime('%d %B %Y')}"
for en, fr in months.items():
formatted = formatted.replace(en, fr)
return formatted
@staticmethod
def _safe_get(url: str, timeout: int = 20) -> Optional[bytes]:
try:
r = requests.get(url, timeout=timeout, headers={"User-Agent": "ghost-bot/1.0"})
r.raise_for_status()
return r.content
except Exception as e:
LOG.warning("Flux indisponible: %s (%s)", url, e)
return None
@staticmethod
def _entry_datetime(entry) -> Optional[dt.datetime]:
"""
Tente de récupérer une datetime aware (UTC) pour un item feedparser.
"""
# Try common fields first
if getattr(entry, "published", None):
try:
# YouTube (ISO) e.g. 2025-09-05T10:20:33+00:00
return dt.datetime.fromisoformat(entry.published.replace("Z", "+00:00")).astimezone(dt.timezone.utc)
except Exception:
pass
try:
# RFC822 e.g. Fri, 05 Sep 2025 10:20:33 +0000
return dt.datetime.strptime(entry.published.replace('GMT', '+0000'),
'%a, %d %b %Y %H:%M:%S %z').astimezone(dt.timezone.utc)
except Exception:
pass
if getattr(entry, "updated_parsed", None):
try:
return dt.datetime.fromtimestamp(time.mktime(entry.updated_parsed), tz=dt.timezone.utc)
except Exception:
pass
return None
# --- HTML builder for grouped content
def _build_html_roundup_grouped(self, categories: List[Dict], feeds: List[RSSfeed]):
"""
Construit le HTML avec des catégories et sous-groupes thématiques.
Inclut un résumé et une table des matières en haut.
Retourne (html, feature_image_url_ou_None).
"""
parts: List[str] = []
first_image: Optional[str] = None
# --- Build Summary Section ---
parts.append('<h2>✨ En bref cette semaine</h2>')
parts.append('<ul>')
for cat in categories:
cat_name = cat.get("name", "Actualités")
subgroups = cat.get("subgroups", [])
# Get top subgroups with more than 1 item (by item count) for summary
multi_item_subgroups = [sg for sg in subgroups if len(sg.get("items", [])) > 1]
sorted_subgroups = sorted(multi_item_subgroups, key=lambda sg: len(sg.get("items", [])), reverse=True)
top_subgroups = sorted_subgroups[:5] # Max 5 highlights per category
total_items = sum(len(sg.get("items", [])) for sg in subgroups)
if top_subgroups:
highlights = ", ".join(sg.get("title", "Divers") for sg in top_subgroups)
total_items = sum(len(sg.get("items", [])) for sg in subgroups)
parts.append(f'<li><strong>{html.escape(cat_name)}</strong>: {html.escape(highlights)} ({total_items} articles)</li>')
elif total_items > 0:
parts.append(f'<li><strong>{html.escape(cat_name)}</strong>: {total_items} articles</li>')
parts.append('</ul>')
parts.append('<hr>')
# --- Build Table of Contents ---
# parts.append('<h2>📋 Sommaire</h2>')
# parts.append('<ul>')
# for cat in categories:
# cat_name = cat.get("name", "Actualités")
# cat_anchor = self._make_anchor(cat_name)
# subgroups = cat.get("subgroups", [])
# total_items = sum(len(sg.get("items", [])) for sg in subgroups)
# parts.append(f'<li><a href="#{cat_anchor}"><strong>{html.escape(cat_name)}</strong></a> ({total_items} articles)')
# if len(subgroups) > 1 or (len(subgroups) == 1 and len(subgroups[0].get("items", [])) > 1):
# parts.append('<ul>')
# for sg in subgroups:
# sg_title = sg.get("title", "Divers")
# sg_anchor = self._make_anchor(f"{cat_name}-{sg_title}")
# item_count = len(sg.get("items", []))
# parts.append(f'<li><a href="#{sg_anchor}">{html.escape(sg_title)}</a> ({item_count})</li>')
# parts.append('</ul>')
# parts.append('</li>')
# parts.append('</ul>')
# parts.append('<hr>')
# --- Build Content by Category ---
for cat in categories:
cat_name = cat.get("name", "Actualités")
cat_anchor = self._make_anchor(cat_name)
subgroups = cat.get("subgroups", [])
if not subgroups:
continue
# Category header with emoji
cat_emoji = {
"Actualités": "📰",
"Tests & Critiques": "",
"Aperçus & Previews": "👁️",
"Vidéos": "🎬",
"Autres": "📁"
}.get(cat_name, "📌")
parts.append(f'<h2 id="{cat_anchor}">{cat_emoji} {html.escape(cat_name)}</h2>')
for sg in subgroups:
sg_title = sg.get("title", "Divers")
sg_anchor = self._make_anchor(f"{cat_name}-{sg_title}")
items = sg.get("items", [])
if not items:
continue
# Sub-group header (only if more than 1 item in subgroup)
if len(items) > 1:
parts.append(f'<h3 id="{sg_anchor}">{html.escape(sg_title)}</h3>')
for post in items:
title = post.get("title", "") or ""
linkURL = post.get("link", "") or ""
parts.append(f'<h4>{html.escape(title)}</h4>')
# --- YouTube embed / fallback
vid = post.get("yt_videoid") or extract_youtube_id(linkURL)
if vid:
watch_url = f"https://www.youtube.com/watch?v={vid}"
# Try provider HTML via oEmbed (as Ghost does)
embed_html = fetch_youtube_oembed_html(watch_url, timeout=10)
if embed_html:
parts.append(embed_html)
else:
# Fallback: leave the plain URL on its own line so Ghost may still auto-embed
parts.append(f'\n<p>{watch_url}</p>\n')
# Minimal fallback link (non-intrusive for email/web)
parts.append(f'<p><a href="{watch_url}">Voir sur YouTube</a></p>')
else:
# --- Texte + lien
ftext = ""
if "summary" in post and post["summary"]:
ftext = html.unescape(post["summary"])
ftext = re.sub("<[^<]+?>", "", ftext)
ftext = re.sub(r"L'article .* est apparu en premier sur .*", "", ftext)
if ftext:
parts.append(f"<p>{html.escape(ftext)}</p>")
if linkURL:
esc = html.escape(linkURL)
parts.append(f'<p><a href="{esc}">{esc}</a></p>')
# --- Images: first try RSS metadata, then crawl the page
found_image = False
for link in post.get("links", []) or []:
if link.get("type") in ("image/jpg", "image/jpeg", "image/png", "image/webp"):
imgUrl = link.get("href")
if imgUrl:
imgUrl = imgUrl.replace("/250x250/", "/990x320/")
if not first_image:
first_image = imgUrl
parts.append(f'<figure><img src="{html.escape(imgUrl)}" loading="lazy"></figure>')
found_image = True
# If no image from RSS, try to extract from the article page
if not found_image and linkURL:
crawled_img = extract_image_from_url(linkURL, timeout=8)
if crawled_img:
if not first_image:
first_image = crawled_img
parts.append(f'<figure><img src="{html.escape(crawled_img)}" loading="lazy"></figure>')
parts.append('<hr>')
# --- Sources
parts.append("<h3>📚 Sources</h3>")
for feed in feeds:
esc = html.escape(feed.url)
parts.append(f'<p><a href="{esc}">{esc}</a></p>')
parts.append('<p><em>Abonnez-vous pour recevoir chaque semaine les news et soutenir mon travail.</em></p>')
return "\n".join(parts), first_image
@staticmethod
def _make_anchor(text: str) -> str:
"""Convert text to a valid HTML anchor ID."""
# Remove accents and special chars, lowercase, replace spaces with dashes
anchor = text.lower()
anchor = re.sub(r'[àáâãäå]', 'a', anchor)
anchor = re.sub(r'[èéêë]', 'e', anchor)
anchor = re.sub(r'[ìíîï]', 'i', anchor)
anchor = re.sub(r'[òóôõö]', 'o', anchor)
anchor = re.sub(r'[ùúûü]', 'u', anchor)
anchor = re.sub(r'[ýÿ]', 'y', anchor)
anchor = re.sub(r'[ç]', 'c', anchor)
anchor = re.sub(r'[^a-z0-9\s-]', '', anchor)
anchor = re.sub(r'\s+', '-', anchor.strip())
return anchor or "section"
@staticmethod
def _format_duration(seconds: float) -> str:
seconds = int(seconds)
days, seconds = divmod(seconds, 86400)
hours, seconds = divmod(seconds, 3600)
minutes, seconds = divmod(seconds, 60)
parts = []
if days: parts.append(f"{days} days")
if hours: parts.append(f"{hours} hours")
if minutes: parts.append(f"{minutes} minutes")
if seconds: parts.append(f"{seconds} seconds")
return ", ".join(parts) if parts else "0 seconds"
async def run_weekly_on_saturday(self):
"""Run every Saturday at 12:00 (noon)."""
while True:
now = dt.datetime.now()
# Calculate next Saturday at 12:00
days_until_saturday = (5 - now.weekday()) % 7 # Saturday = 5
if days_until_saturday == 0 and now.hour >= 12:
days_until_saturday = 7 # Already past Saturday 12:00, wait for next week
next_run = (now + dt.timedelta(days=days_until_saturday)).replace(
hour=12, minute=0, second=0, microsecond=0
)
sleep_seconds = (next_run - now).total_seconds()
while sleep_seconds > 0:
LOG.info("Waiting for %s for next scan (Saturday noon)", self._format_duration(sleep_seconds))
await asyncio.sleep(min(sleep_seconds, 5 * 60))
now = dt.datetime.now()
sleep_seconds = (next_run - now).total_seconds()
LOG.info("Going to run the weekly task")
await self.weekly_task()
async def weekly_task(self):
"""Main weekly task: collect, filter, group, and publish."""
# Log newsletters (debug)
try:
nls = self.ghost.get_newsletters()
LOG.info("Newsletters: %s", ", ".join(f"{n.get('name')}[{n.get('slug')}]" for n in nls))
except Exception as e:
LOG.warning("Unable to list newsletters: %s", e)
title_post = "Les news de la semaine du " + self._fr_week_range()
LOG.info("Running weekly task : %s", title_post)
# (Re)charge les feeds
feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
if not os.path.isfile(feeds_file):
feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", r"f:\workspace\Substack_JV\feeds.txt")
feeds: List[RSSfeed] = []
with open(feeds_file, encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
for line in lines:
feeds.append(RSSfeed(line, "youtube" in line.lower()))
self.feeds = feeds
# Fenêtre: depuis 7 jours à 06:00 UTC
week_ago_6am_utc = dt.datetime.now(dt.timezone.utc).replace(
hour=6, minute=0, second=0, microsecond=0
) - dt.timedelta(days=7)
all_news_posts: List[dict] = []
for feed in self.feeds:
LOG.info("Scanning feed %s", feed.url)
content = self._safe_get(feed.url, timeout=30)
if not content:
continue
fp = feedparser.parse(content)
# Sélection des items de la semaine
new_entries = []
for e in fp.entries:
dte = self._entry_datetime(e)
if dte and dte > week_ago_6am_utc:
new_entries.append(e)
# Basic URL-based filtering (keep existing logic)
filtered = []
for e in new_entries:
linkURL = e.get("link", "") or ""
if "actugaming" in linkURL and ("puzzle-" in linkURL or "guide-" in linkURL):
continue
# enrich YouTube id if applicable
if feed.youtube and linkURL:
vid = extract_youtube_id(linkURL)
if vid:
e["yt_videoid"] = vid
filtered.append(e)
all_news_posts.extend(filtered)
if not all_news_posts:
LOG.warning("Aucun item récupéré (flux down ?). On n'envoie pas cette semaine.")
return
LOG.info("Collected %d items from feeds", len(all_news_posts))
# Use Mistral AI for filtering and grouping if available
if self.mistral:
LOG.info("Using Mistral AI to filter non-news content...")
filtered_posts = self.mistral.filter_news_items(all_news_posts, dry_run=self.dry_run)
LOG.info("After filtering: %d items (removed %d)",
len(filtered_posts), len(all_news_posts) - len(filtered_posts))
if filtered_posts:
LOG.info("Using Mistral AI to group items by category...")
categories = self.mistral.group_similar_items(filtered_posts)
total_cats = len(categories)
total_subgroups = sum(len(cat.get("subgroups", [])) for cat in categories)
LOG.info("Created %d categories with %d sub-groups", total_cats, total_subgroups)
else:
categories = []
else:
LOG.warning("No Mistral API key configured, skipping AI filtering/grouping")
# Fallback: single category with all items
categories = [{
"name": "Actualités de la semaine",
"subgroups": [{"title": "Toutes les news", "items": all_news_posts}]
}]
if not categories or all(
len(sg.get("items", [])) == 0
for cat in categories
for sg in cat.get("subgroups", [])
):
LOG.warning("No news items after filtering. Skipping this week.")
return
roundup_html, feature_image = self._build_html_roundup_grouped(categories, self.feeds)
# 1) Create draft (with feature image if any)
created = self.ghost.create_post_html(title_post, roundup_html, status="draft", feature_image=feature_image)
LOG.info("Created draft post: %s (id: %s)", created.get("title"), created.get("id"))
# 2) Publish + send email (unless dry-run mode)
if self.dry_run:
LOG.info("DRY-RUN MODE: Post created as draft but NOT published. URL: %s",
created.get("url", "N/A"))
LOG.info("DRY-RUN MODE: Review the draft in Ghost admin, then publish manually if satisfied.")
return
published = self.ghost.publish_post(
post_id=created["id"],
updated_at=created["updated_at"],
newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"),
email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"),
)
LOG.info("Published post: %s (emailed via newsletter)", published.get("url"))
# ------------- main -------------
async def main():
setuplogger()
parser = argparse.ArgumentParser()
parser.add_argument("--runonce", action="store_true", help="Run now and exit (no scheduler)")
parser.add_argument("--dry-run", action="store_true", dest="dry_run",
help="Run immediately, create draft but do NOT publish (for testing)")
args = parser.parse_args()
# Feeds init (list may be reloaded inside task)
feeds: List[RSSfeed] = []
feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
if not os.path.isfile(feeds_file):
feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", r"f:\workspace\Substack_JV\feeds.txt")
with open(feeds_file, encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line:
continue
feeds.append(RSSfeed(line, "youtube" in line.lower()))
admin_url = os.environ["GHOST_ADMIN_URL"] # e.g. https://ghostadmin.zep.best/ghost/api/admin/
admin_key = os.environ["GHOST_ADMIN_KEY"] # integration_id:secret_hex
mistral_api_key = os.environ.get("MISTRAL_API_KEY") # Optional: for AI filtering/grouping
if not mistral_api_key:
LOG.warning("MISTRAL_API_KEY not set. AI filtering and grouping will be disabled.")
task = GhostTask(
feeds=feeds,
admin_url=admin_url,
admin_key=admin_key,
mistral_api_key=mistral_api_key,
newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"),
email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"),
dry_run=args.dry_run,
)
LOG.info("Starting bot (weekly mode%s)", " - DRY RUN" if args.dry_run else "")
if args.runonce:
await task.weekly_task()
return
if args.dry_run:
LOG.info("DRY-RUN: Running weekly task immediately (will create draft only)")
await task.weekly_task()
return
# Démarrage: publier l'édition de la semaine si elle n'existe pas encore
await task.maybe_run_this_week()
# Planification hebdomadaire le samedi à 12:00 Europe/Brussels
await task.run_weekly_on_saturday()
if __name__ == "__main__":
asyncio.run(main())

1040
presquegratos.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -1,2 +1,7 @@
requests
feedparser feedparser
PyJWT>=2.7,<3
requests>=2.31
feedparser>=6.0
aiohttp
bs4
playwright

49
storage.py Normal file
View File

@@ -0,0 +1,49 @@
# storage.py
from __future__ import annotations
import sqlite3, pathlib, datetime as dt
from typing import Optional, Iterable, Tuple
import os
DB_PATH = "/data/published.db" # bind-mount ./data:/data in docker
_SCHEMA = """
PRAGMA journal_mode = WAL;
CREATE TABLE IF NOT EXISTS published_items(
platform TEXT NOT NULL, -- e.g. xgp | egs | psplus
key TEXT PRIMARY KEY, -- your dedupe key (see below)
first_seen_utc TEXT NOT NULL, -- ISO-8601
last_post_id TEXT -- Ghost post id that recorded it
);
CREATE INDEX IF NOT EXISTS idx_platform ON published_items(platform);
"""
class Storage:
def __init__(self, db_path: str = DB_PATH):
pathlib.Path(db_path).parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite3.connect(db_path)
self.conn.execute("PRAGMA foreign_keys = ON;")
for stmt in filter(None, _SCHEMA.split(";")):
if stmt.strip():
self.conn.execute(stmt)
def seen(self, key: str) -> bool:
cur = self.conn.execute("SELECT 1 FROM published_items WHERE key=?", (key,))
return cur.fetchone() is not None
def remember(self, platform: str, key: str, post_id: Optional[str]):
self.conn.execute(
"INSERT OR IGNORE INTO published_items(platform,key,first_seen_utc,last_post_id) VALUES(?,?,?,?)",
(platform, key, dt.datetime.utcnow().isoformat(), post_id),
)
if post_id:
self.conn.execute("UPDATE published_items SET last_post_id=? WHERE key=?", (post_id, key))
self.conn.commit()
def bulk_remember(self, platform: str, pairs: Iterable[Tuple[str, Optional[str]]]):
rows = [(platform, k, dt.datetime.utcnow().isoformat(), pid) for (k, pid) in pairs]
self.conn.executemany(
"INSERT OR IGNORE INTO published_items(platform,key,first_seen_utc,last_post_id) VALUES(?,?,?,?)",
rows
)
self.conn.commit()

View File

@@ -1,11 +0,0 @@
"""A library that provides a Python interface to the Substack API."""
__author__ = "Paolo Mazza"
__email__ = "mazzapaolo2019@gmail.com"
__license__ = "MIT License"
__version__ = "1.0"
__url__ = "https://github.com/ma2za/python-substack"
__download_url__ = "https://pypi.python.org/pypi/python-substack"
__description__ = "A Python wrapper around the Substack API"
from .api import Api

View File

@@ -1,708 +0,0 @@
"""
API Wrapper
"""
import base64
import json
import logging
import os
from datetime import datetime
from urllib.parse import urljoin
from pyvirtualdisplay import Display
import requests
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
import pickle
import time
from substack.exceptions import SubstackAPIException, SubstackRequestException
from selenium.webdriver.support import expected_conditions as EC
import datetime
logger = logging.getLogger(__name__)
__all__ = ["Api"]
class Api:
"""
A python interface into the Substack API
"""
def __init__(
self,
email=None,
password=None,
cookies_path=None,
base_url=None,
publication_url=None,
debug=False,
):
"""
To create an instance of the substack.Api class:
>>> import substack
>>> api = substack.Api(email="substack email", password="substack password")
Args:
email:
password:
cookies_path
To re-use your session without logging in each time, you can save your cookies to a json file and
then load them in the next session.
Make sure to re-save your cookies, as they do update over time.
base_url:
The base URL to use to contact the Substack API.
Defaults to https://substack.com/api/v1.
"""
self.base_url = base_url or "https://substack.com/api/v1"
self.email = email
self.password = password
if debug:
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
self._session = requests.Session()
# Load cookies from file if provided
# Helps with Captcha errors by reusing cookies from "local" auth, then switching to running code in the cloud
if cookies_path is not None:
with open(cookies_path) as f:
cookies = json.load(f)
self._session.cookies.update(cookies)
elif email is not None and password is not None:
self.send_magic_link(email)
magic_link = input("Enter magic link: ")
self.login_v2(email, password, magic_link)
self.export_cookies(cookies_path)
else:
raise ValueError(
"Must provide email and password or cookies_path to authenticate."
)
user_publication = None
# if the user provided a publication url, then use that
if publication_url:
import re
# Regular expression to extract subdomain name
match = re.search(r"https://(.*).substack.com", publication_url.lower())
subdomain = match.group(1) if match else None
user_publications = self.get_user_publications()
# search through publications to find the publication with the matching subdomain
for publication in user_publications:
if publication["subdomain"] == subdomain:
# set the current publication to the users publication
user_publication = publication
break
else:
# get the users primary publication
user_publication = self.get_user_primary_publication()
# set the current publication to the users primary publication
self.change_publication(user_publication)
def send_magic_link(self, email):
body = {
"email": email,
"redirect": "/",
"for_pub": "",
}
endpoint = f"https://substack.com/api/v1/email-login/"
response = self._session.post(endpoint, json=body)
print("Magic link sent!")
def login_v2(self, email, password, magic_link):
return self._session.get(magic_link)
def login(self, email, password) -> dict:
"""
Login to the substack account.
Args:
email: substack account email
password: substack account password
"""
response = self._session.post(
f"{self.base_url}/login",
json={
"captcha_response": None,
"email": email,
"password": password,
},
)
return self._handle_response(response=response)
def signin_for_pub(self, publication):
"""
Complete the signin process
"""
response = self._session.get(
f"https://substack.com/sign-in?redirect=%2F&for_pub={publication['subdomain']}",
)
try:
output = self._handle_response(response=response)
except SubstackRequestException as ex:
output = {}
return output
def change_publication(self, publication):
"""
Change the publication URL
"""
self.publication_url = urljoin(publication["publication_url"], "api/v1")
# sign-in to the publication
self.signin_for_pub(publication)
def export_cookies(self, path: str = "cookies.json"):
"""
Export cookies to a json file.
Args:
path: path to the json file
"""
cookies = self._session.cookies.get_dict()
with open(path, "w") as f:
json.dump(cookies, f)
def save_cookies(self, driver, path):
with open(path, 'wb') as file:
pickle.dump(driver.get_cookies(), file)
def load_cookies(self, driver, path):
with open(path, 'rb') as file:
cookies = pickle.load(file)
for cookie in cookies:
driver.add_cookie(cookie)
def login_with_selenium(self):
"""
Login using Selenium to solve CAPTCHA manually.
"""
# Start virtual display
cookie_path = 'cookies.pkl'
if os.path.exists(cookie_path):
try:
# Load cookies directly to session if they exist
with open(cookie_path, 'rb') as file:
cookies = pickle.load(file)
cookies_valid = True
for cookie in cookies:
if 'expiry' in cookie:
expiry_date = datetime.datetime.fromtimestamp(cookie['expiry'])
print(f"Cookie {cookie['name']} expires on {expiry_date}")
if cookie['expiry'] < time.time():
cookies_valid = False
print(f"Cookie {cookie['name']} has expired.")
break
if cookies_valid:
# Load cookies into session
for cookie in cookies:
self._session.cookies.set(cookie['name'], cookie['value'])
print("Cookies loaded successfully. Skipping login.")
return
except Exception as e:
print("Error loading cookies, proceeding with Selenium login.", e)
#display = Display()
#display.start()
print("Login with selenium")
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-gpu")
driver = webdriver.Chrome()
driver.get('https://substack.com/sign-in')
# Check if already logged in by checking the presence of a user-specific element
wait = WebDriverWait(driver, 10)
try:
# Adjust the selector to match an element that is present only when logged in
dashboard_button = wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, 'button[data-href*="publish/home?utm_source=menu"]'))
)
print("Already logged in.")
except Exception:
print("Not logged in. Proceeding with login steps.")
try:
login_with_password_button = wait.until(EC.element_to_be_clickable((By.LINK_TEXT, 'Log in with password')))
login_with_password_button.click()
time.sleep(2) # Wait for the transition to the login form
# Fill in the email and password fields
email_field = driver.find_element(By.NAME, 'email')
email_field.send_keys(self.email)
password_field = driver.find_element(By.NAME, 'password')
password_field.send_keys(self.password)
# Submit the form
password_field.send_keys(Keys.RETURN)
print("Please solve the CAPTCHA manually in the opened browser.")
input("Press Enter after solving the CAPTCHA...")
# Save cookies after solving the CAPTCHA
self.save_cookies(driver, cookie_path)
print("Cookies saved successfully.")
except Exception as e:
print("An error occurred during login.", e)
# Save cookies after login or cookie load
self.save_cookies(driver, cookie_path)
# Extract cookies to use with requests
cookies = driver.get_cookies()
for cookie in cookies:
self._session.cookies.set(cookie['name'], cookie['value'])
# Close the browser
driver.quit()
def _handle_response(self, response: requests.Response):
"""
Internal helper for handling API responses from the Substack server.
Raises the appropriate exceptions when necessary; otherwise, returns the
response.
"""
if (response.status_code == 401):
print("CAPTCHA detected, switching to Selenium for manual solving.")
return
if not (200 <= response.status_code < 300):
raise SubstackAPIException(response.status_code, response.text)
try:
return response.json()
except ValueError:
raise SubstackRequestException("Invalid Response: %s" % response.text)
def get_user_id(self):
"""
Returns:
"""
profile = self.get_user_profile()
user_id = profile["id"]
return user_id
@staticmethod
def get_publication_url(publication: dict) -> str:
"""
Gets the publication url
Args:
publication:
"""
custom_domain = publication["custom_domain"]
if not custom_domain:
publication_url = f"https://{publication['subdomain']}.substack.com"
else:
publication_url = f"https://{custom_domain}"
return publication_url
def get_user_primary_publication(self):
"""
Gets the users primary publication
"""
profile = self.get_user_profile()
primary_publication = profile["primaryPublication"]
primary_publication["publication_url"] = self.get_publication_url(
primary_publication
)
return primary_publication
def get_user_publications(self):
"""
Gets the users publications
"""
profile = self.get_user_profile()
# Loop through users "publicationUsers" list, and return a list
# of dictionaries of "name", and "subdomain", and "id"
user_publications = []
for publication in profile["publicationUsers"]:
pub = publication["publication"]
pub["publication_url"] = self.get_publication_url(pub)
user_publications.append(pub)
return user_publications
def get_user_profile(self):
"""
Gets the users profile
"""
response = self._session.get(f"{self.base_url}/user/profile/self")
return self._handle_response(response=response)
def get_user_settings(self):
"""
Get list of users.
Returns:
"""
response = self._session.get(f"{self.base_url}/settings")
return self._handle_response(response=response)
def get_publication_users(self):
"""
Get list of users.
Returns:
"""
response = self._session.get(f"{self.publication_url}/publication/users")
return self._handle_response(response=response)
def get_publication_subscriber_count(self):
"""
Get subscriber count.
Returns:
"""
response = self._session.get(
f"{self.publication_url}/publication_launch_checklist"
)
return self._handle_response(response=response)["subscriberCount"]
def get_published_posts(
self, offset=0, limit=25, order_by="post_date", order_direction="desc"
):
"""
Get list of published posts for the publication.
"""
response = self._session.get(
f"{self.publication_url}/post_management/published",
params={
"offset": offset,
"limit": limit,
"order_by": order_by,
"order_direction": order_direction,
},
)
return self._handle_response(response=response)
def get_posts(self) -> dict:
"""
Returns:
"""
response = self._session.get(f"{self.base_url}/reader/posts")
return self._handle_response(response=response)
def get_drafts(self, filter=None, offset=None, limit=None):
"""
Args:
filter:
offset:
limit:
Returns:
"""
response = self._session.get(
f"{self.publication_url}/drafts",
params={"filter": filter, "offset": offset, "limit": limit},
)
return self._handle_response(response=response)
def get_draft(self, draft_id):
"""
Gets a draft given it's id.
"""
response = self._session.get(f"{self.publication_url}/drafts/{draft_id}")
return self._handle_response(response=response)
def delete_draft(self, draft_id):
"""
Args:
draft_id:
Returns:
"""
response = self._session.delete(f"{self.publication_url}/drafts/{draft_id}")
return self._handle_response(response=response)
def post_draft(self, body) -> dict:
"""
Args:
body:
Returns:
"""
response = self._session.post(f"{self.publication_url}/drafts", json=body)
return self._handle_response(response=response)
def put_draft(self, draft, **kwargs) -> dict:
"""
Args:
draft:
**kwargs:
Returns:
"""
response = self._session.put(
f"{self.publication_url}/drafts/{draft}",
json=kwargs,
)
return self._handle_response(response=response)
def prepublish_draft(self, draft) -> dict:
"""
Args:
draft: draft id
Returns:
"""
response = self._session.get(
f"{self.publication_url}/drafts/{draft}/prepublish"
)
return self._handle_response(response=response)
def publish_draft(
self, draft, send: bool = True, share_automatically: bool = False
) -> dict:
"""
Args:
draft: draft id
send:
share_automatically:
Returns:
"""
response = self._session.post(
f"{self.publication_url}/drafts/{draft}/publish",
json={"send": send, "share_automatically": share_automatically},
)
return self._handle_response(response=response)
def schedule_draft(self, draft, draft_datetime: datetime) -> dict:
"""
Args:
draft: draft id
draft_datetime: datetime to schedule the draft
Returns:
"""
response = self._session.post(
f"{self.publication_url}/drafts/{draft}/schedule",
json={"post_date": draft_datetime.isoformat()},
)
return self._handle_response(response=response)
def unschedule_draft(self, draft) -> dict:
"""
Args:
draft: draft id
Returns:
"""
response = self._session.post(
f"{self.publication_url}/drafts/{draft}/schedule", json={"post_date": None}
)
return self._handle_response(response=response)
def get_image(self, image: str):
"""
This method generates a new substack link that contains the image.
Args:
image: filepath or original url of image.
Returns:
"""
if os.path.exists(image):
with open(image, "rb") as file:
image = b"data:image/jpeg;base64," + base64.b64encode(file.read())
response = self._session.post(
f"{self.publication_url}/image",
data={"image": image},
)
return self._handle_response(response=response)
def get_categories(self):
"""
Retrieve list of all available categories.
Returns:
"""
response = self._session.get(f"{self.base_url}/categories")
return self._handle_response(response=response)
def get_category(self, category_id, category_type, page):
"""
Args:
category_id:
category_type:
page:
Returns:
"""
response = self._session.get(
f"{self.base_url}/category/public/{category_id}/{category_type}",
params={"page": page},
)
return self._handle_response(response=response)
def get_single_category(self, category_id, category_type, page=None, limit=None):
"""
Args:
category_id:
category_type: paid or all
page: by default substack retrieves only the first 25 publications in the category. If this is left None,
then all pages will be retrieved. The page size is 25 publications.
limit:
Returns:
"""
if page is not None:
output = self.get_category(category_id, category_type, page)
else:
publications = []
page = 0
while True:
page_output = self.get_category(category_id, category_type, page)
publications.extend(page_output.get("publications", []))
if (
limit is not None and limit <= len(publications)
) or not page_output.get("more", False):
publications = publications[:limit]
break
page += 1
output = {
"publications": publications,
"more": page_output.get("more", False),
}
return output
def delete_all_drafts(self):
"""
Returns:
"""
response = None
while True:
drafts = self.get_drafts(filter="draft", limit=10, offset=0)
if len(drafts) == 0:
break
for draft in drafts:
response = self.delete_draft(draft.get("id"))
return response
def get_sections(self):
"""
Get a list of the sections of your publication.
TODO: this is hacky but I cannot find another place where to get the sections.
Returns:
"""
response = self._session.get(
f"{self.publication_url}/subscriptions",
)
content = Api._handle_response(response=response)
sections = [
p.get("sections")
for p in content.get("publications")
if p.get("hostname") in self.publication_url
]
return sections[0]
def publication_embed(self, url):
"""
Args:
url:
Returns:
"""
return self.call("/publication/embed", "GET", url=url)
def call(self, endpoint, method, **params):
"""
Args:
endpoint:
method:
**params:
Returns:
"""
response = self._session.request(
method=method,
url=f"{self.publication_url}/{endpoint}",
params=params,
)
return self._handle_response(response=response)

View File

@@ -1,32 +0,0 @@
import json
class SubstackAPIException(Exception):
def __init__(self, status_code, text):
try:
json_res = json.loads(text)
except ValueError:
self.message = f"Invalid JSON error message from Substack: {text}"
else:
self.message = ", ".join(
list(
map(lambda error: error.get("msg", ""), json_res.get("errors", []))
)
)
self.message = self.message or json_res.get("error", "")
self.status_code = status_code
def __str__(self):
return f"APIError(code={self.status_code}): {self.message}"
class SubstackRequestException(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
return f"SubstackRequestException: {self.message}"
class SectionNotExistsException(SubstackRequestException):
pass

View File

@@ -1,331 +0,0 @@
"""
Post Utilities
"""
import json
from typing import Dict
__all__ = ["Post"]
from substack.exceptions import SectionNotExistsException
class Post:
"""
Post utility class
"""
def __init__(
self,
title: str,
subtitle: str,
user_id,
audience: str = None,
write_comment_permissions: str = None,
):
"""
Args:
title:
subtitle:
user_id:
audience: possible values: everyone, only_paid, founding, only_free
write_comment_permissions: none, only_paid, everyone (this field is a mess)
"""
self.draft_title = title
self.draft_subtitle = subtitle
self.draft_body = {"type": "doc", "content": []}
self.draft_bylines = [{"id": int(user_id), "is_guest": False}]
self.audience = audience if audience is not None else "everyone"
self.draft_section_id = None
self.section_chosen = True
# TODO better understand the possible values and combinations with audience
if write_comment_permissions is not None:
self.write_comment_permissions = write_comment_permissions
else:
self.write_comment_permissions = self.audience
def set_section(self, name: str, sections: list):
"""
Args:
name:
sections:
Returns:
"""
section = [s for s in sections if s.get("name") == name]
if len(section) != 1:
raise SectionNotExistsException(name)
section = section[0]
self.draft_section_id = section.get("id")
def add(self, item: Dict):
"""
Add item to draft body.
Args:
item:
Returns:
"""
self.draft_body["content"] = self.draft_body.get("content", []) + [
{"type": item.get("type")}
]
content = item.get("content")
if item.get("type") == "captionedImage":
self.captioned_image(**item)
elif item.get("type") == "embeddedPublication":
self.draft_body["content"][-1]["attrs"] = item.get("url")
elif item.get("type") == "youtube2":
self.youtube(item.get("src"))
elif item.get("type") == "subscribeWidget":
self.subscribe_with_caption(item.get("message"))
else:
if content is not None:
self.add_complex_text(content)
if item.get("type") == "heading":
self.attrs(item.get("level", 1))
marks = item.get("marks")
if marks is not None:
self.marks(marks)
return self
def paragraph(self, content=None):
"""
Args:
content:
Returns:
"""
item = {"type": "paragraph"}
if content is not None:
item["content"] = content
return self.add(item)
def heading(self, content=None, level: int = 1):
"""
Args:
content:
level:
Returns:
"""
item = {"type": "heading"}
if content is not None:
item["content"] = content
item["level"] = level
return self.add(item)
def horizontal_rule(self):
"""
Returns:
"""
return self.add({"type": "horizontal_rule"})
def attrs(self, level):
"""
Args:
level:
Returns:
"""
content_attrs = self.draft_body["content"][-1].get("attrs", {})
content_attrs.update({"level": level})
self.draft_body["content"][-1]["attrs"] = content_attrs
return self
def captioned_image(
self,
src: str,
fullscreen: bool = False,
imageSize: str = "normal",
height: int = 819,
width: int = 1456,
resizeWidth: int = 728,
bytes: str = None,
alt: str = None,
title: str = None,
type: str = None,
href: str = None,
belowTheFold: bool = False,
internalRedirect: str = None,
):
"""
Add image to body.
Args:
bytes:
alt:
title:
type:
href:
belowTheFold:
internalRedirect:
src:
fullscreen:
imageSize:
height:
width:
resizeWidth:
"""
content = self.draft_body["content"][-1].get("content", [])
content += [
{
"type": "image2",
"attrs": {
"src": src,
"fullscreen": fullscreen,
"imageSize": imageSize,
"height": height,
"width": width,
"resizeWidth": resizeWidth,
"bytes": bytes,
"alt": alt,
"title": title,
"type": type,
"href": href,
"belowTheFold": belowTheFold,
"internalRedirect": internalRedirect,
},
}
]
self.draft_body["content"][-1]["content"] = content
return self
def text(self, value: str):
"""
Add text to the last paragraph.
Args:
value: Text to add to paragraph.
Returns:
"""
content = self.draft_body["content"][-1].get("content", [])
content += [{"type": "text", "text": value}]
self.draft_body["content"][-1]["content"] = content
return self
def add_complex_text(self, text):
"""
Args:
text:
"""
if isinstance(text, str):
self.text(text)
else:
for chunk in text:
if chunk:
self.text(chunk.get("content")).marks(chunk.get("marks", []))
def marks(self, marks):
"""
Args:
marks:
Returns:
"""
content = self.draft_body["content"][-1].get("content", [])[-1]
content_marks = content.get("marks", [])
for mark in marks:
new_mark = {"type": mark.get("type")}
if mark.get("type") == "link":
href = mark.get("href")
new_mark.update({"attrs": {"href": href}})
content_marks.append(new_mark)
content["marks"] = content_marks
return self
def remove_last_paragraph(self):
"""Remove last paragraph"""
del self.draft_body.get("content")[-1]
def get_draft(self):
"""
Returns:
"""
out = vars(self)
out["draft_body"] = json.dumps(out["draft_body"])
return out
def subscribe_with_caption(self, message: str = None):
"""
Add subscribe widget with caption
Args:
message:
Returns:
"""
if message is None:
message = """Thanks for reading this newsletter!
Subscribe for free to receive new posts and support my work."""
subscribe = self.draft_body["content"][-1]
subscribe["attrs"] = {
"url": "%%checkout_url%%",
"text": "Subscribe",
"language": "en",
}
subscribe["content"] = [
{
"type": "ctaCaption",
"content": [
{
"type": "text",
"text": message,
}
],
}
]
return self
def youtube(self, value: str):
"""
Add youtube video to post.
Args:
value: youtube url
Returns:
"""
content_attrs = self.draft_body["content"][-1].get("attrs", {})
content_attrs.update({"videoId": value})
self.draft_body["content"][-1]["attrs"] = content_attrs
return self

View File

@@ -1,10 +1,58 @@
#!/bin/bash #!/bin/sh
set -eu
log() { printf '%s %s\n' "[$(date -u +%FT%TZ)]" "$*"; }
stop() {
log "stopping..."
[ -n "${PID1-}" ] && kill -TERM "$PID1" 2>/dev/null || true
[ -n "${PID2-}" ] && kill -TERM "$PID2" 2>/dev/null || true
[ -n "${TPID-}" ] && kill -TERM "$TPID" 2>/dev/null || true
wait || true
exit 0
}
trap stop INT TERM
# Pull the latest changes cd /app
git fetch --all export GIT_TERMINAL_PROMPT=0
git reset --hard origin/main
# Run your Python script # MAJ forcée du code à chaque (re)démarrage
python Post_RSS_on_SubStack.py if [ -d .git ]; then
i=0
while [ $i -lt 5 ]; do
if git fetch --all --prune && git reset --hard origin/main; then
log "git updated to origin/main"
break
fi
i=$((i+1))
log "git update failed (attempt $i/5); retrying in 10s..."
sleep 10
done
[ $i -ge 5 ] && log "WARNING: git update failed after 5 attempts — continuing with current code"
else
log "WARNING: /app is not a git repo; skipping git update"
fi
# Dossiers logs
mkdir -p /var/log
: > /var/log/daily.log
: > /var/log/weekly.log
# Lancer les 2 bots (logs non bufferisés)
python -u post_rss_to_ghost.py > /var/log/daily.log 2>&1 & PID1=$!
python -u presquegratos.py > /var/log/weekly.log 2>&1 & PID2=$!
# Suivre les 2 fichiers de logs dans la sortie du conteneur
tail -F /var/log/daily.log /var/log/weekly.log &
TPID=$!
# Attente portable (pas de wait -n en /bin/sh)
while :; do
if ! kill -0 "$PID1" 2>/dev/null; then wait "$PID1" || true; break; fi
if ! kill -0 "$PID2" 2>/dev/null; then wait "$PID2" || true; break; fi
sleep 1
done
# Si un des scripts sort, on arrête le tail (le trap TERM arrêtera l'autre script)
kill -TERM "$TPID" 2>/dev/null || true
wait || true

23
xboxsyde.py Normal file
View File

@@ -0,0 +1,23 @@
import feedparser
import io
import html
import datetime
import requests
import time
url = r'https://www.xboxygen.com/spip.php?page=backend'
html_text = requests.get(url).text
news = feedparser.parse(html_text)
yesterday_6am = datetime.datetime.now(datetime.timezone.utc).replace(hour=6, minute=0, second=0, microsecond=0) - datetime.timedelta(days=1)
try:
new_posts = [entry for entry in news.entries if datetime.datetime.strptime(entry.published.replace('GMT', '+0000'), '%a, %d %b %Y %H:%M:%S %z') > yesterday_6am]
except:
new_posts = [entry for entry in news.entries if datetime.datetime.fromtimestamp(time.mktime(entry.updated_parsed)).replace(tzinfo=datetime.timezone.utc) > yesterday_6am]
#else if
#entry.updated.replace('GMT', '+0000'), '%a, %d %b %Y %H:%M:%S %z'
print(new_posts)