using ghost

This commit is contained in:
Gaël Honorez
2025-09-05 14:36:28 +02:00
parent 83869b0663
commit 34d91a3677
10 changed files with 309 additions and 1542 deletions

View File

@@ -1,272 +0,0 @@
import asyncio
import argparse
import requests
import feedparser
import io
import html
import datetime
import time
import logging
import os
import re
from logging.handlers import RotatingFileHandler
import random
from substack import Api
from substack.post import Post
LOG = logging.getLogger('bot')
LOG_PATTERN = logging.Formatter('%(asctime)s:%(levelname)s: [%(filename)s] %(message)s')


def setuplogger():
    """Attach a rotating ``bot.log`` file handler and a console handler to the ``bot`` logger.

    Both handlers use ``LOG_PATTERN``. Handlers are added on every call, so
    calling this more than once duplicates each log line.
    """
    console = logging.StreamHandler()
    console.setFormatter(LOG_PATTERN)
    console.setLevel(logging.DEBUG)

    def _configure(name, base_name=None, with_console=False):
        # The log file is named after ``base_name`` and falls back to the logger name.
        target = logging.getLogger(name)
        target.setLevel(logging.DEBUG)
        rotating = RotatingFileHandler(f"{base_name or name}.log", "a", 1000000, 1)
        rotating.setFormatter(LOG_PATTERN)
        target.addHandler(rotating)
        if with_console:
            target.addHandler(console)

    _configure("bot", None, True)
class RSSfeed():
    """A feed URL together with a flag telling whether it is a YouTube feed."""

    def __init__(self, url, yt=False):
        # ``youtube`` selects the date format used when the feed entries are filtered.
        self.url, self.youtube = url, yt
class SubStackTask:
    """Daily job that gathers recent RSS/YouTube entries into one Substack post."""

    # French month names indexed by ``month - 1`` so the formatted date does
    # not depend on the locale the process happens to run with.
    _FR_MONTHS = (
        'Janvier', 'Février', 'Mars', 'Avril', 'Mai', 'Juin',
        'Juillet', 'Août', 'Septembre', 'Octobre', 'Novembre', 'Décembre',
    )

    def __init__(self, login, password, cookies_path, account, feeds):
        """Log in to Substack and remember the feeds to scan.

        Args:
            login: Substack account e-mail.
            password: Substack account password.
            cookies_path: path of the JSON cookie jar handed to ``Api``.
            account: publication URL handed to ``Api``.
            feeds: iterable of ``RSSfeed`` objects.
        """
        self.api = Api(
            email=login,
            password=password,
            cookies_path=cookies_path,
            publication_url=account,
        )
        self.user_id = self.api.get_user_id()
        self.feeds = feeds
        for feed in self.feeds:
            LOG.info("Adding feed " + feed.url)

    def format_duration(self, seconds):
        """Return ``seconds`` as text such as ``"1 hours, 5 minutes"``.

        The total is rounded once, before it is split, so a component can
        never be displayed as "60 seconds" or "60 minutes".
        """
        total = round(seconds)
        days, total = divmod(total, 86400)
        hours, total = divmod(total, 3600)
        minutes, secs = divmod(total, 60)
        labelled = ((days, "days"), (hours, "hours"), (minutes, "minutes"), (secs, "seconds"))
        parts = [f"{amount} {unit}" for amount, unit in labelled if amount > 0]
        return ', '.join(parts) if parts else '0 seconds'

    def get_fr_date(self, today=None):
        """Return a date formatted like ``"05 Septembre 2025"``.

        Args:
            today: ``datetime`` to format; defaults to the current local time.
        """
        if today is None:
            today = datetime.datetime.now()
        return f"{today.day:02d} {self._FR_MONTHS[today.month - 1]} {today.year}"

    async def run_daily_at_6_am(self):
        """Run ``daily_task`` forever, once a day at 06:05 local time.

        The first run always happens on the day after start-up.
        """
        while True:
            now = datetime.datetime.now()
            next_run = (now + datetime.timedelta(days=1)).replace(hour=6, minute=5, second=0, microsecond=0)
            sleep_seconds = (next_run - now).total_seconds()
            while sleep_seconds > 0:
                LOG.info(f"Waiting for {self.format_duration(sleep_seconds)} for next scan")
                # Sleep in slices of at most five minutes and log progress each time.
                await asyncio.sleep(min(sleep_seconds, 5 * 60))
                sleep_seconds = (next_run - datetime.datetime.now()).total_seconds()
            LOG.info("Going to run the daily task")
            try:
                await self.daily_task()
            except Exception:
                # One failed run must not stop the scheduler; retry tomorrow.
                LOG.exception("Daily task failed")

    @staticmethod
    def _entry_datetime(entry, youtube):
        """Return the timezone-aware publication time of one feed entry."""
        if youtube:
            return datetime.datetime.fromisoformat(entry.published)
        try:
            return datetime.datetime.strptime(
                entry.published.replace('GMT', '+0000'), '%a, %d %b %Y %H:%M:%S %z')
        except (AttributeError, ValueError):
            # ``updated_parsed`` is built directly as UTC; going through
            # time.mktime() would read it as local time and skew it under DST.
            return datetime.datetime(*entry.updated_parsed[:6], tzinfo=datetime.timezone.utc)

    @staticmethod
    def _add_link(sub_stack_post, url):
        """Append a paragraph whose text and target are both ``url``."""
        sub_stack_post.add({'type': 'paragraph', 'content': [
            {'content': url, 'marks': [{'type': "link", 'href': url}]}]})

    async def daily_task(self):
        """Re-read the feed list, collect entries newer than yesterday 06:00 UTC and publish them."""
        title_post = "Les news du " + self.get_fr_date()
        LOG.info("Running daily task : " + str(title_post))

        feeds_path = r'/data/feeds.txt'
        if not os.path.isfile(feeds_path):
            feeds_path = r'x:\substack\feeds.txt'
        with open(feeds_path) as file:
            # Blank lines would otherwise become feeds with an empty URL.
            urls = [line.strip() for line in file if line.strip()]
        self.feeds = [RSSfeed(url, "youtube" in url) for url in urls]

        sub_stack_post = Post(
            title=title_post,
            subtitle="",
            user_id=self.user_id
        )
        yesterday_6am = datetime.datetime.now(datetime.timezone.utc).replace(
            hour=6, minute=0, second=0, microsecond=0) - datetime.timedelta(days=1)

        all_news_posts = []
        for feed in self.feeds:
            LOG.info("Scanning feed " + feed.url)
            try:
                html_text = requests.get(feed.url, timeout=30).text
            except requests.RequestException:
                # An unreachable feed is skipped instead of cancelling the whole post.
                LOG.exception("Could not fetch feed " + feed.url)
                continue
            news_feed = feedparser.parse(html_text)
            all_news_posts.extend(
                entry for entry in news_feed.entries
                if self._entry_datetime(entry, feed.youtube) > yesterday_6am
            )
        random.shuffle(all_news_posts)

        for post in all_news_posts:
            link_url = post["link"]
            title = post["title"]
            if "actugaming" in link_url and ("puzzle-" in link_url or "guide-" in link_url):
                continue
            LOG.info("Posting " + str(title))

            text = ""
            if "summary" in post:
                # Strip HTML tags, then the trailing "Larticle ... est apparu ..." boilerplate.
                text = re.sub('<[^<]+?>', '', html.unescape(post["summary"]))
                text = re.sub(r"Larticle .* est apparu en premier sur .*", '', text)

            heading = {"type": "heading", "level": 3, "content": title}
            if "yt_videoid" in post:
                sub_stack_post.add(heading)
                sub_stack_post.add({"type": "youtube2", "src": post["yt_videoid"]})
                self._add_link(sub_stack_post, link_url)
            elif text != "":
                sub_stack_post.add(heading)
                sub_stack_post.add({"type": "paragraph", "content": text})
                self._add_link(sub_stack_post, link_url)

            for link in post.get("links", []):
                if link.get("type") == "image/jpg":
                    sub_stack_post.add({'type': 'captionedImage', 'src': link["href"]})
            sub_stack_post.add({"type": "horizontal_rule"})

        sub_stack_post.add({"type": "heading", "level": 3, "content": "Sources"})
        for feed in self.feeds:
            self._add_link(sub_stack_post, feed.url)
        sub_stack_post.add({"type": "subscribeWidget", "message": "Abonnez-vous gratuitement pour recevoir chaque jour les news dans votre e-mail et soutenir mon travail."})

        draft = self.api.post_draft(sub_stack_post.get_draft())
        self.api.prepublish_draft(draft.get("id"))
        self.api.publish_draft(draft.get("id"))
async def main(login, password, account):
    """Configure logging, load the feed list and start the daily scheduler.

    Args:
        login: Substack account e-mail.
        password: Substack account password.
        account: URL of the publication to post to.

    The feeds file and the cookie jar are read from ``/data``; when a file is
    missing there, the ``x:\\substack`` location is used instead.
    """
    setuplogger()

    feeds_path = r'/data/feeds.txt'
    if not os.path.isfile(feeds_path):
        feeds_path = r'x:\substack\feeds.txt'
    cookies_path = r'/data/cookies.json'
    if not os.path.isfile(cookies_path):
        cookies_path = r'x:\substack\cookies.json'

    with open(feeds_path) as file:
        # Blank lines would otherwise become feeds with an empty URL.
        urls = [line.strip() for line in file if line.strip()]
    feeds = [RSSfeed(url, "youtube" in url) for url in urls]

    task = SubStackTask(login, password, cookies_path, account, feeds)
    LOG.info("Starting bot")
    await task.run_daily_at_6_am()
if __name__ == "__main__":
    # Credentials are read from the environment so that no secret lives in the
    # source tree. A password that was committed earlier should be rotated.
    asyncio.run(main(
        os.environ["SUBSTACK_LOGIN"],
        os.environ["SUBSTACK_PASSWORD"],
        os.environ.get("SUBSTACK_PUBLICATION_URL", "https://aggregateurjvfr.substack.com"),
    ))

View File

@@ -1,114 +0,0 @@
import os.path
import base64
import imaplib
import email
from email.header import decode_header
import google.auth
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from bs4 import BeautifulSoup # Import BeautifulSoup for parsing HTML
from datetime import datetime
# If modifying these SCOPES, delete the file token.json.
SCOPES = ['https://mail.google.com/']


def authenticate_gmail():
    """Return Google OAuth credentials for ``SCOPES``.

    A stored token is loaded when the token file exists. If the credentials
    are missing or invalid they are refreshed (expired with a refresh token)
    or obtained through the local-server authorization flow, then written
    back to the token file.
    """
    def _first_existing(primary, fallback):
        # Use the /data location when the file is there, else the x: drive path.
        return primary if os.path.isfile(primary) else fallback

    token_path = _first_existing(r'/data/token.json', r'x:\substack\token.json')
    secret_path = _first_existing(
        r'/data/client_secret_396578640529-o4dsukvomuo43j5d4j0bogg17e3e8l7f.apps.googleusercontent.com.json',
        r'x:\substack\client_secret_396578640529-o4dsukvomuo43j5d4j0bogg17e3e8l7f.apps.googleusercontent.com.json',
    )

    creds = None
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)
    if creds and creds.valid:
        return creds

    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    else:
        flow = InstalledAppFlow.from_client_secrets_file(secret_path, SCOPES)
        creds = flow.run_local_server(port=0)
    # Save the credentials for the next run.
    with open(token_path, 'w') as token_file:
        token_file.write(creds.to_json())
    return creds
def generate_oauth2_string(username, access_token):
    """Build the XOAUTH2 authentication string for ``username`` and ``access_token``.

    The fields are separated by the control character ``\\x01`` and the string
    ends with two of them.
    """
    sep = "\x01"
    return "user={}{sep}auth=Bearer {}{sep}{sep}".format(username, access_token, sep=sep)
def decode_mime_words(s):
    """Decode a header value that may contain MIME encoded-words into plain text.

    Byte fragments are decoded with their declared charset, or UTF-8 when
    none is declared; text fragments are kept as they are.
    """
    pieces = []
    for fragment, charset in decode_header(s):
        if isinstance(fragment, bytes):
            fragment = fragment.decode(charset or 'utf-8')
        pieces.append(fragment)
    return ''.join(pieces)
def get_verification_link(email_user, sender_email, start_time):
    """Search the Gmail inbox for the Substack sign-in e-mail and return its link.

    Args:
        email_user: Gmail address used for the XOAUTH2 login.
        sender_email: sender address the search is restricted to.
        start_time: reference Unix timestamp; messages dated more than
            12 hours away from it are ignored.

    Returns:
        The first line containing ``http`` from a multipart message's
        text/plain part, or the ``href`` of the sign-in anchor of a
        single-part HTML message; ``None`` when authentication fails or no
        matching message is found.
    """
    # ``import email`` alone does not guarantee that the submodule is loaded.
    import email.utils

    creds = authenticate_gmail()
    auth_string = generate_oauth2_string(email_user, creds.token)

    # The context manager logs out on every exit path; this function is polled
    # repeatedly, so leaving connections open would accumulate them.
    with imaplib.IMAP4_SSL("imap.gmail.com") as mail:
        try:
            mail.authenticate('XOAUTH2', lambda x: auth_string)
        except imaplib.IMAP4.error as e:
            print(f"IMAP authentication error: {e}")
            return None

        mail.select("inbox")
        result, data = mail.search(None, f'(FROM "{sender_email}" SUBJECT "Finish signing in to Substack")')
        id_list = data[0].split()

        for num in reversed(id_list):  # Check the most recent emails first
            result, data = mail.fetch(num, "(RFC822)")
            msg = email.message_from_bytes(data[0][1])
            subject = decode_mime_words(msg["Subject"])

            email_date_tuple = email.utils.parsedate_tz(msg["Date"])
            if email_date_tuple is None:
                # Unparsable Date header: the message cannot be matched to this request.
                continue
            email_timestamp = email.utils.mktime_tz(email_date_tuple)
            print(subject, start_time, email_timestamp)
            if abs(email_timestamp - start_time) > 12 * 3600:
                continue

            if msg.is_multipart():
                for part in msg.walk():
                    if part.get_content_type() == "text/plain":
                        body = part.get_payload(decode=True).decode()
                        for line in body.split("\n"):
                            if "http" in line:
                                return line.strip()
            else:
                print("-----")
                body = msg.get_payload(decode=True).decode()
                soup = BeautifulSoup(body, 'html.parser')
                link = soup.find('a', href=True, text="Connectez-vous dès maintenant")
                if link:
                    return link['href']
    return None
if __name__ == "__main__":
    email_user = "gael.honorez@gmail.com"
    sender_email = "no-reply@substack.com"
    # get_verification_link() requires a reference timestamp; use "now" so that
    # messages dated within 12 hours of this manual run are considered.
    verification_link = get_verification_link(email_user, sender_email, datetime.now().timestamp())
    if verification_link:
        print("Verification link found:", verification_link)
    else:
        print("No verification link found.")

View File

@@ -1,53 +0,0 @@
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import pickle
def save_cookies(driver, path):
    """Pickle the cookies reported by ``driver.get_cookies()`` into the file at ``path``."""
    with open(path, 'wb') as sink:
        cookies = driver.get_cookies()
        sink.write(pickle.dumps(cookies))
def generate_cookies(email, password):
    """Sign in to Substack in a Chrome window and pickle the session cookies.

    Args:
        email: account e-mail typed into the sign-in form.
        password: account password typed into the sign-in form.

    Login errors are printed, not raised. The browser is always closed,
    including when loading the sign-in page itself fails.
    """
    cookie_path = r'x:\substack\cookies.pkl'
    chrome_options = Options()
    driver = webdriver.Chrome(options=chrome_options)
    try:
        driver.get('https://substack.com/sign-in')
        wait = WebDriverWait(driver, 10)
        try:
            login_with_password_button = wait.until(
                EC.element_to_be_clickable((By.LINK_TEXT, 'Sign in with password'))
            )
            login_with_password_button.click()
            time.sleep(2)  # give the password form time to appear
            email_field = driver.find_element(By.NAME, 'email')
            email_field.send_keys(email)
            password_field = driver.find_element(By.NAME, 'password')
            password_field.send_keys(password)
            password_field.send_keys(Keys.RETURN)
            # NOTE(review): cookies are saved right after submitting, without
            # waiting for the sign-in to finish; confirm this is intended.
            save_cookies(driver, cookie_path)
            print("Cookies saved successfully.")
        except Exception as e:
            print("An error occurred during login.", e)
    finally:
        driver.quit()
if __name__ == "__main__":
    import os

    # Credentials are read from the environment so that no secret lives in the
    # source tree. A password that was committed earlier should be rotated.
    generate_cookies(os.environ["SUBSTACK_LOGIN"], os.environ["SUBSTACK_PASSWORD"])

305
post_rss_to_ghost.py Normal file
View File

@@ -0,0 +1,305 @@
import asyncio
import argparse
import datetime
import html
import io
import logging
import os
import random
import re
import time
from logging.handlers import RotatingFileHandler
from typing import Optional
import feedparser
import requests
import jwt
LOG = logging.getLogger("bot")
LOG_PATTERN = logging.Formatter("%(asctime)s:%(levelname)s: [%(filename)s] %(message)s")


def setuplogger():
    """Send ``bot`` log records to the console and to a rotating ``bot.log`` file.

    Handlers are added on every call, so calling this more than once
    duplicates each log line.
    """
    console = logging.StreamHandler()
    console.setFormatter(LOG_PATTERN)
    console.setLevel(logging.DEBUG)

    rotating = RotatingFileHandler("bot.log", "a", 1000000, 1)
    rotating.setFormatter(LOG_PATTERN)

    LOG.setLevel(logging.DEBUG)
    for handler in (console, rotating):
        LOG.addHandler(handler)
class RSSfeed:
    """One feed to scan: its URL and whether it is a YouTube feed."""

    def __init__(self, url, yt=False):
        # The YouTube flag decides how entry dates are parsed later on.
        self.youtube = yt
        self.url = url
# ---------- Ghost Admin API client ----------
class GhostAdmin:
    """Small client for the Ghost Admin API endpoints this bot uses."""

    def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"):
        """Store the API base URL and split ``admin_key`` into its ``id:secret`` halves."""
        key_id, secret_hex = admin_key.split(":")
        self.base = admin_url.rstrip("/") + "/"
        self.key_id = key_id
        self.key_secret_hex = secret_hex
        self.accept_version = accept_version

    @staticmethod
    def _fail_on_error(resp, label: str):
        """Raise ``RuntimeError`` when ``resp`` has a status code of 400 or more."""
        if resp.status_code >= 400:
            raise RuntimeError(f"Ghost {label} error {resp.status_code}: {resp.text}")

    def _jwt(self) -> str:
        """Return an HS256 token signed with the key secret, valid for five minutes."""
        issued_at = int(time.time())
        encoded = jwt.encode(
            {"iat": issued_at, "exp": issued_at + 5 * 60, "aud": "/admin/"},
            bytes.fromhex(self.key_secret_hex),
            algorithm="HS256",
            headers={"alg": "HS256", "typ": "JWT", "kid": self.key_id},
        )
        # Some jwt versions return bytes rather than str.
        if isinstance(encoded, str):
            return encoded
        return encoded.decode("utf-8")

    def _headers(self):
        """Return the request headers, including a freshly signed token."""
        headers = {"Authorization": f"Ghost {self._jwt()}"}
        headers["Accept-Version"] = self.accept_version
        headers["Content-Type"] = "application/json"
        return headers

    def get_newsletters(self):
        """Return the ``newsletters`` list of the response, or ``[]`` when absent.

        Raises:
            RuntimeError: on an HTTP status of 400 or more.
        """
        resp = requests.get(self.base + "newsletters/", headers=self._headers(), timeout=20)
        self._fail_on_error(resp, "newsletters")
        return resp.json().get("newsletters", [])

    def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str:
        """Return ``preferred_slug`` when given, otherwise choose a newsletter.

        The choice order is: the first active newsletter flagged as default,
        then the first active one, then the first newsletter of any status.

        Raises:
            RuntimeError: when no newsletter is configured.
        """
        if preferred_slug:
            return preferred_slug
        available = self.get_newsletters()
        if not available:
            raise RuntimeError("No newsletters configured in Ghost (Settings → Newsletters).")
        active = [item for item in available if item.get("status") == "active"]
        flagged = next((item for item in active if item.get("is_default")), None)
        chosen = flagged if flagged is not None else (active or available)[0]
        return chosen.get("slug")

    def create_post_html(self, title: str, html_content: str, status: str = "draft", feature_image: Optional[str] = None):
        """Create a post from HTML source and return the created post object.

        ``feature_image`` is sent only when it is non-empty.

        Raises:
            RuntimeError: on an HTTP status of 400 or more.
        """
        post = {"title": title, "html": html_content, "status": status}
        if feature_image:
            post["feature_image"] = feature_image
        resp = requests.post(
            self.base + "posts/?source=html",
            headers=self._headers(),
            json={"posts": [post]},
            timeout=30,
        )
        self._fail_on_error(resp, "create")
        return resp.json()["posts"][0]

    def publish_post(self, post_id: str, updated_at: str, newsletter_slug: Optional[str], email_segment: Optional[str]):
        """Set the post to ``published`` with a ``newsletter`` query parameter.

        The newsletter is ``newsletter_slug`` or, when that is empty, the one
        chosen by ``pick_newsletter_slug``. ``email_segment`` is added to the
        query only when given.

        Raises:
            RuntimeError: on an HTTP status of 400 or more.
        """
        quote = requests.utils.quote
        query = ["newsletter=" + quote(self.pick_newsletter_slug(newsletter_slug))]
        if email_segment:
            query.append("email_segment=" + quote(email_segment))
        url = self.base + f"posts/{post_id}/?{'&'.join(query)}"
        body = {"posts": [{"updated_at": updated_at, "status": "published"}]}
        resp = requests.put(url, headers=self._headers(), json=body, timeout=30)
        self._fail_on_error(resp, "publish")
        return resp.json()["posts"][0]
# ---------- Your task logic (ported from Substack) ----------
class GhostTask:
    """Daily job that gathers recent RSS/YouTube entries into one Ghost post."""

    # French month names indexed by ``month - 1`` so the formatted date does
    # not depend on the locale the process happens to run with.
    _FR_MONTHS = (
        'Janvier', 'Février', 'Mars', 'Avril', 'Mai', 'Juin',
        'Juillet', 'Août', 'Septembre', 'Octobre', 'Novembre', 'Décembre',
    )
    _IMAGE_TYPES = ("image/jpg", "image/jpeg", "image/png", "image/webp")

    def __init__(self, feeds, admin_url, admin_key, newsletter_slug=None, email_segment=None):
        """Create the Ghost client and remember feeds and e-mail settings.

        Args:
            feeds: iterable of ``RSSfeed`` objects.
            admin_url: Ghost Admin API base URL.
            admin_key: Ghost Admin API key in ``id:secret`` form.
            newsletter_slug: newsletter used when publishing; optional.
            email_segment: audience filter used when publishing; optional.
        """
        self.ghost = GhostAdmin(admin_url, admin_key)
        self.feeds = feeds
        self.newsletter_slug = newsletter_slug
        self.email_segment = email_segment
        for feed in self.feeds:
            LOG.info("Adding feed " + feed.url)

    def get_fr_date(self, today=None):
        """Return a date formatted like ``"05 Septembre 2025"``.

        Args:
            today: ``datetime`` to format; defaults to the current local time.
        """
        if today is None:
            today = datetime.datetime.now()
        return f"{today.day:02d} {self._FR_MONTHS[today.month - 1]} {today.year}"

    def _build_html_roundup(self, items, feeds):
        """Build the post HTML from feed entries.

        Returns:
            ``(html_string, first_image_url_or_None)``; the image is the first
            image enclosure met and is meant to be used as feature image.
        """
        parts = [f"<h2>Les news du {self.get_fr_date()}</h2>"]
        first_image: Optional[str] = None

        for post in items:
            title = post.get("title", "")
            link_url = post.get("link", "")
            link_html = f'<p><a href="{html.escape(link_url)}">{html.escape(link_url)}</a></p>'
            parts.append(f'<hr><h3>{html.escape(title)}</h3>')

            if "yt_videoid" in post:
                # The id comes from the feed, so it is escaped like every other field.
                video_id = html.escape(post["yt_videoid"])
                parts.append(f'<p>https://www.youtube.com/watch?v={video_id}</p>')
                parts.append(link_html)
            else:
                text = ""
                if "summary" in post:
                    # Strip HTML tags, then the trailing "Larticle ... est apparu ..." boilerplate.
                    text = re.sub("<[^<]+?>", "", html.unescape(post["summary"]))
                    text = re.sub(r"Larticle .* est apparu en premier sur .*", "", text)
                if text:
                    parts.append(f"<p>{html.escape(text)}</p>")
                if link_url:
                    parts.append(link_html)

            # Attach images in the body; remember the first one for feature_image.
            for link in post.get("links", []):
                image_url = link.get("href")
                if link.get("type") in self._IMAGE_TYPES and image_url:
                    if not first_image:
                        first_image = image_url
                    parts.append(f'<figure><img src="{html.escape(image_url)}" loading="lazy"></figure>')

        parts.append("<hr><h3>Sources</h3>")
        for feed in feeds:
            parts.append(f'<p><a href="{html.escape(feed.url)}">{html.escape(feed.url)}</a></p>')
        parts.append('<p><em>Abonnez-vous pour recevoir chaque jour les news et soutenir mon travail.</em></p>')
        return "\n".join(parts), first_image

    def format_duration(self, seconds):
        """Return ``seconds`` as text such as ``"1 hours, 5 minutes"``.

        The value is rounded to whole seconds first, so a float input never
        shows up as "23.0 hours" or a long fractional second count.
        """
        total = round(seconds)
        days, total = divmod(total, 86400)
        hours, total = divmod(total, 3600)
        minutes, secs = divmod(total, 60)
        labelled = ((days, "days"), (hours, "hours"), (minutes, "minutes"), (secs, "seconds"))
        parts = [f"{amount} {unit}" for amount, unit in labelled if amount > 0]
        return ", ".join(parts) if parts else "0 seconds"

    async def run_daily_at_6_am(self):
        """Run ``daily_task`` forever, once a day at 06:05 local time.

        The first run always happens on the day after start-up.
        """
        while True:
            now = datetime.datetime.now()
            next_run = (now + datetime.timedelta(days=1)).replace(hour=6, minute=5, second=0, microsecond=0)
            sleep_seconds = (next_run - now).total_seconds()
            while sleep_seconds > 0:
                LOG.info(f"Waiting for {self.format_duration(sleep_seconds)} for next scan")
                # Sleep in slices of at most five minutes and log progress each time.
                await asyncio.sleep(min(sleep_seconds, 5 * 60))
                sleep_seconds = (next_run - datetime.datetime.now()).total_seconds()
            LOG.info("Going to run the daily task")
            try:
                await self.daily_task()
            except Exception:
                # One failed run must not stop the scheduler; retry tomorrow.
                LOG.exception("Daily task failed")

    @staticmethod
    def _entry_datetime(entry, youtube):
        """Return the timezone-aware publication time of one feed entry."""
        if youtube:
            return datetime.datetime.fromisoformat(entry.published)
        try:
            return datetime.datetime.strptime(
                entry.published.replace('GMT', '+0000'), '%a, %d %b %Y %H:%M:%S %z')
        except (AttributeError, ValueError):
            # ``updated_parsed`` is built directly as UTC; going through
            # time.mktime() would read it as local time and skew it under DST.
            return datetime.datetime(*entry.updated_parsed[:6], tzinfo=datetime.timezone.utc)

    async def daily_task(self):
        """Re-read the feed list, build the roundup and publish it with e-mail delivery."""
        nls = self.ghost.get_newsletters()
        print("Newsletters:")
        for n in nls:
            print(f"- title={n.get('name')} slug={n.get('slug')} status={n.get('status')} default={n.get('is_default')}")

        title_post = "Les news du " + self.get_fr_date()
        LOG.info("Running daily task : " + str(title_post))

        # NOTE(review): this fallback path differs from the one used in main(); confirm which is intended.
        feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
        if not os.path.isfile(feeds_file):
            feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", "x:\\substack\\feeds.txt")
        with open(feeds_file) as f:
            urls = [line.strip() for line in f if line.strip()]
        self.feeds = [RSSfeed(url, "youtube" in url) for url in urls]

        yesterday_6am = datetime.datetime.now(datetime.timezone.utc).replace(
            hour=6, minute=0, second=0, microsecond=0
        ) - datetime.timedelta(days=1)

        all_news_posts = []
        for feed in self.feeds:
            LOG.info("Scanning feed " + feed.url)
            try:
                html_text = requests.get(feed.url, timeout=30).text
            except requests.RequestException:
                # An unreachable feed is skipped instead of cancelling the whole post.
                LOG.exception("Could not fetch feed " + feed.url)
                continue
            news_feed = feedparser.parse(html_text)
            for entry in news_feed.entries:
                if not self._entry_datetime(entry, feed.youtube) > yesterday_6am:
                    continue
                link_url = entry.get("link", "")
                if "actugaming" in link_url and ("puzzle-" in link_url or "guide-" in link_url):
                    continue
                all_news_posts.append(entry)
        random.shuffle(all_news_posts)

        roundup_html, feature_image = self._build_html_roundup(all_news_posts, self.feeds)

        # 1) Create as draft, with a feature image when one was found.
        created = self.ghost.create_post_html(title_post, roundup_html, status="draft", feature_image=feature_image)
        # 2) Publish and e-mail. Values given to the constructor win; the
        #    environment is only consulted when they were not provided.
        published = self.ghost.publish_post(
            post_id=created["id"],
            updated_at=created["updated_at"],
            newsletter_slug=self.newsletter_slug or os.environ.get("GHOST_NEWSLETTER_SLUG"),
            email_segment=self.email_segment or os.environ.get("GHOST_EMAIL_SEGMENT"),
        )
        LOG.info(f"Published post: {published.get('url')} (emailed via newsletter)")
def debug_list_newsletters(admin_url, admin_key):
    """Print name, slug, status and default flag of every newsletter returned by Ghost."""
    client = GhostAdmin(admin_url, admin_key)
    newsletters = client.get_newsletters()
    print("Newsletters:")
    for item in newsletters:
        name, slug = item.get('name'), item.get('slug')
        status, is_default = item.get('status'), item.get('is_default')
        print(f"- title={name} slug={slug} status={status} default={is_default}")
# ---------------- main ----------------
async def main():
    """Configure logging, load the feed list and start the daily scheduler.

    Environment variables read: ``FEEDS_FILE``, ``FEEDS_FILE_FALLBACK``,
    ``GHOST_ADMIN_URL`` and ``GHOST_ADMIN_KEY`` (both required), and the
    optional ``GHOST_NEWSLETTER_SLUG`` and ``GHOST_EMAIL_SEGMENT``.
    """
    setuplogger()

    # Initial feed list; the task re-reads the file on every run.
    path = os.environ.get("FEEDS_FILE", "/data/feeds.txt")
    if not os.path.isfile(path):
        path = os.environ.get("FEEDS_FILE_FALLBACK", r"c:\workspace\Substack_JV\feeds.txt")
    with open(path) as handle:
        urls = [stripped for stripped in (raw.strip() for raw in handle) if stripped]
    feeds = [RSSfeed(url, "youtube" in url) for url in urls]

    env = os.environ
    task = GhostTask(
        feeds=feeds,
        admin_url=env["GHOST_ADMIN_URL"],
        admin_key=env["GHOST_ADMIN_KEY"],
        newsletter_slug=env.get("GHOST_NEWSLETTER_SLUG"),
        email_segment=env.get("GHOST_EMAIL_SEGMENT"),
    )
    LOG.info("Starting bot")
    # To run a single pass instead, await task.daily_task() here.
    await task.run_daily_at_6_am()


if __name__ == "__main__":
    asyncio.run(main())

View File

@@ -1,6 +1,4 @@
requests
feedparser
google-auth
google-auth-oauthlib
google-auth-httplib2
beautifulsoup4
PyJWT>=2.7,<3
requests>=2.31
feedparser>=6.0

View File

@@ -1,11 +0,0 @@
"""A library that provides a Python interface to the Substack API."""
__author__ = "Paolo Mazza"
__email__ = "mazzapaolo2019@gmail.com"
__license__ = "MIT License"
__version__ = "1.0"
__url__ = "https://github.com/ma2za/python-substack"
__download_url__ = "https://pypi.python.org/pypi/python-substack"
__description__ = "A Python wrapper around the Substack API"
from .api import Api

View File

@@ -1,723 +0,0 @@
"""
API Wrapper
"""
import base64
import json
import logging
import os
from urllib.parse import urljoin
import requests
import pickle
import time
from substack.exceptions import SubstackAPIException, SubstackRequestException
from checkemail import get_verification_link
from datetime import datetime
logger = logging.getLogger(__name__)
__all__ = ["Api"]
class Api:
"""
A python interface into the Substack API
"""
def __init__(
    self,
    email=None,
    password=None,
    cookies_path=None,
    base_url=None,
    publication_url=None,
    debug=False,
):
    """
    To create an instance of the substack.Api class:
        >>> import substack
        >>> api = substack.Api(email="substack email", password="substack password")

    Args:
        email: account e-mail.
        password: account password.
        cookies_path: JSON file holding saved session cookies. When the
            file is missing, or its cookies are reported as expired, a
            magic link is requested, followed, and the new cookies are
            written back to this path.
        base_url: base URL of the Substack API. Defaults to
            https://substack.com/api/v1.
        publication_url: ``https://<subdomain>.substack.com`` URL of the
            publication to work on; the primary publication is used when
            omitted.
        debug: enable DEBUG logging on the root logger.

    Raises:
        ValueError: when neither ``cookies_path`` nor email and password
            are given, or when ``publication_url`` matches none of the
            user's publications.
        Exception: when no verification link could be obtained.
    """
    self.base_url = base_url or "https://substack.com/api/v1"
    self.email = email
    self.password = password
    if debug:
        logging.basicConfig()
        logging.getLogger().setLevel(logging.DEBUG)
    self._session = requests.Session()

    # Load cookies from file if provided
    if cookies_path is not None:
        cookies = None
        if os.path.exists(cookies_path):
            with open(cookies_path) as f:
                cookies = json.load(f)
            self._session.cookies.update(cookies)
        if cookies is None or self.are_cookies_expired(cookies):
            print("Cookies are expired. Sending magic link and waiting for verification.")
            start_time = time.time()  # Record the time when the magic link is sent
            self.send_magic_link(email)
            verification_link = self.wait_for_verification_link(start_time)
            if verification_link:
                self.login_v2(email, password, verification_link)
                self.export_cookies(cookies_path)
            else:
                raise Exception("Failed to get the verification link.")
    elif email is not None and password is not None:
        self.login(email, password)
    else:
        raise ValueError("Must provide email and password or cookies_path to authenticate.")

    if publication_url:
        import re

        # Dots are escaped and the subdomain may not contain "." or "/", so
        # only a genuine ``<subdomain>.substack.com`` host is accepted.
        match = re.search(r"https://([^./]+)\.substack\.com", publication_url.lower())
        subdomain = match.group(1) if match else None
        user_publication = next(
            (pub for pub in self.get_user_publications() if pub["subdomain"] == subdomain),
            None,
        )
        if user_publication is None:
            # Fail with a clear message instead of a TypeError on None later on.
            raise ValueError(f"No publication matching {publication_url!r} found for this account.")
    else:
        # get the users primary publication
        user_publication = self.get_user_primary_publication()

    self.change_publication(user_publication)
def are_cookies_expired(self, cookies):
    """Return True when at least one cookie record has an ``expiry`` in the past.

    Args:
        cookies: either a list of cookie records (dicts that may carry an
            ``expiry`` Unix timestamp) or a plain ``{name: value}`` mapping
            as written by ``export_cookies``.

    A ``{name: value}`` mapping carries no expiry information, so it is
    reported as not expired rather than inspected name by name.
    """
    if isinstance(cookies, dict):
        return False
    now = time.time()
    return any(
        isinstance(cookie, dict) and 'expiry' in cookie and cookie['expiry'] < now
        for cookie in cookies
    )
def wait_for_verification_link(self, start_time):
    """Poll the mailbox every 10 seconds until a verification link is found.

    There is no timeout: the call blocks until ``get_verification_link``
    returns a truthy value.
    """
    sender_email = "no-reply@substack.com"
    link = get_verification_link(self.email, sender_email, start_time)
    while not link:
        time.sleep(10)  # pause before asking the mailbox again
        link = get_verification_link(self.email, sender_email, start_time)
    return link
def send_magic_link(self, email):
    """Ask Substack to e-mail a sign-in link to ``email``.

    Raises:
        Whatever ``response.raise_for_status()`` raises when the request is
        rejected, so that the caller does not wait for an e-mail that will
        never be sent.
    """
    body = {
        "email": email,
        "redirect": "/",
        "for_pub": "",
    }
    endpoint = "https://substack.com/api/v1/email-login/"
    response = self._session.post(endpoint, json=body)
    response.raise_for_status()
    print("Magic link sent!")
def login_v2(self, email, password, magic_link):
    """Open ``magic_link`` with the session and return the response.

    ``email`` and ``password`` are accepted but not used.
    """
    session = self._session
    return session.get(magic_link)
def login(self, email, password) -> dict:
    """
    Post the credentials to ``<base_url>/login`` and return the handled response.

    Args:
        email: substack account email
        password: substack account password
    """
    credentials = {"captcha_response": None, "email": email, "password": password}
    reply = self._session.post(f"{self.base_url}/login", json=credentials)
    return self._handle_response(response=reply)
def signin_for_pub(self, publication):
    """
    Request the sign-in page for the publication's subdomain.

    Returns the handled response, or ``{}`` when handling it raises
    ``SubstackRequestException``.
    """
    target = f"https://substack.com/sign-in?redirect=%2F&for_pub={publication['subdomain']}"
    reply = self._session.get(target)
    try:
        return self._handle_response(response=reply)
    except SubstackRequestException:
        return {}
def change_publication(self, publication):
    """
    Point ``self.publication_url`` at the publication's API root, then sign in to it.
    """
    api_root = urljoin(publication["publication_url"], "api/v1")
    self.publication_url = api_root
    self.signin_for_pub(publication)
def export_cookies(self, path: str = "cookies.json"):
    """
    Write the session cookies to a JSON file as a name-to-value mapping.

    Args:
        path: path to the json file
    """
    jar = self._session.cookies.get_dict()
    with open(path, "w") as out:
        out.write(json.dumps(jar))
def save_cookies(self, driver, path):
    """Pickle the cookies reported by ``driver.get_cookies()`` into the file at ``path``."""
    with open(path, 'wb') as sink:
        cookies = driver.get_cookies()
        sink.write(pickle.dumps(cookies))
def load_cookies(self, driver, path):
    """Read pickled cookies from ``path`` and add each of them to ``driver``."""
    # NOTE(review): unpickling runs arbitrary code from the file; only load
    # cookie files this program wrote itself.
    with open(path, 'rb') as source:
        stored = pickle.load(source)
    for entry in stored:
        driver.add_cookie(entry)
def login_with_selenium(self):
    """
    Reuse pickled cookies when they are still valid, otherwise log in
    through a Chrome window where the user solves the CAPTCHA manually.

    Cookies are read from and written to ``cookies.pkl`` in the current
    directory and are copied into the requests session. The browser is
    always closed, including when a step fails.
    """
    # NOTE(review): webdriver, WebDriverWait, EC, By and Keys are not imported
    # in the visible part of this module; confirm they are in scope.
    cookie_path = 'cookies.pkl'
    if os.path.exists(cookie_path):
        try:
            # NOTE(review): unpickling runs arbitrary code from the file.
            with open(cookie_path, 'rb') as file:
                cookies = pickle.load(file)
            cookies_valid = True
            for cookie in cookies:
                if 'expiry' in cookie:
                    # ``datetime`` is the class here (``from datetime import
                    # datetime``), so fromtimestamp is called on it directly.
                    expiry_date = datetime.fromtimestamp(cookie['expiry'])
                    print(f"Cookie {cookie['name']} expires on {expiry_date}")
                    if cookie['expiry'] < time.time():
                        cookies_valid = False
                        print(f"Cookie {cookie['name']} has expired.")
                        break
            if cookies_valid:
                # Load cookies into session
                for cookie in cookies:
                    self._session.cookies.set(cookie['name'], cookie['value'])
                print("Cookies loaded successfully. Skipping login.")
                return
        except Exception as e:
            print("Error loading cookies, proceeding with Selenium login.", e)

    print("Login with selenium")
    # No headless options: the CAPTCHA below is solved by hand in the window.
    driver = webdriver.Chrome()
    try:
        driver.get('https://substack.com/sign-in')
        wait = WebDriverWait(driver, 10)
        try:
            # This element is expected only when a user is already logged in.
            wait.until(
                EC.presence_of_element_located((By.CSS_SELECTOR, 'button[data-href*="publish/home?utm_source=menu"]'))
            )
            print("Already logged in.")
        except Exception:
            print("Not logged in. Proceeding with login steps.")
            try:
                login_with_password_button = wait.until(EC.element_to_be_clickable((By.LINK_TEXT, 'Log in with password')))
                login_with_password_button.click()
                time.sleep(2)  # Wait for the transition to the login form
                email_field = driver.find_element(By.NAME, 'email')
                email_field.send_keys(self.email)
                password_field = driver.find_element(By.NAME, 'password')
                password_field.send_keys(self.password)
                password_field.send_keys(Keys.RETURN)
                print("Please solve the CAPTCHA manually in the opened browser.")
                input("Press Enter after solving the CAPTCHA...")
                self.save_cookies(driver, cookie_path)
                print("Cookies saved successfully.")
            except Exception as e:
                print("An error occurred during login.", e)

        # Save cookies after login or cookie load
        self.save_cookies(driver, cookie_path)
        # Copy the browser cookies into the requests session
        for cookie in driver.get_cookies():
            self._session.cookies.set(cookie['name'], cookie['value'])
    finally:
        driver.quit()
def _handle_response(self, response: requests.Response):
    """
    Turn an HTTP response into its decoded JSON body.

    Returns ``None`` after printing a notice when the status is 401.

    Raises:
        SubstackAPIException: for any other status outside 200-299.
        SubstackRequestException: when the body is not valid JSON.
    """
    status = response.status_code
    if status == 401:
        print("CAPTCHA detected, switching to Selenium for manual solving.")
        return None
    if status < 200 or status >= 300:
        raise SubstackAPIException(status, response.text)
    try:
        payload = response.json()
    except ValueError:
        raise SubstackRequestException("Invalid Response: %s" % response.text)
    return payload
def get_user_id(self):
    """
    Return the ``id`` field of the current user's profile.
    """
    return self.get_user_profile()["id"]
@staticmethod
def get_publication_url(publication: dict) -> str:
"""
Gets the publication url
Args:
publication:
"""
custom_domain = publication["custom_domain"]
if not custom_domain:
publication_url = f"https://{publication['subdomain']}.substack.com"
else:
publication_url = f"https://{custom_domain}"
return publication_url
def get_user_primary_publication(self):
    """
    Fetch the primary publication of the logged-in user.

    Returns:
        The "primaryPublication" entry of the user profile, with an extra
        "publication_url" key computed by get_publication_url().
    """
    publication = self.get_user_profile()["primaryPublication"]
    publication["publication_url"] = self.get_publication_url(publication)
    return publication
def get_user_publications(self):
    """
    List the publications the logged-in user belongs to.

    Returns:
        A list with the "publication" mapping of every entry in the
        profile's "publicationUsers" list; each mapping gains a
        "publication_url" key computed by get_publication_url().
    """
    result = []
    for membership in self.get_user_profile()["publicationUsers"]:
        entry = membership["publication"]
        entry["publication_url"] = self.get_publication_url(entry)
        result.append(entry)
    return result
def get_user_profile(self):
    """
    Fetch the profile of the logged-in user.

    Returns:
        The result of _handle_response() for a GET on
        "{base_url}/user/profile/self".
    """
    profile_url = f"{self.base_url}/user/profile/self"
    return self._handle_response(response=self._session.get(profile_url))
def get_user_settings(self):
    """
    Fetch the settings of the logged-in user.

    Returns:
        The result of _handle_response() for a GET on "{base_url}/settings".
    """
    settings_url = f"{self.base_url}/settings"
    return self._handle_response(response=self._session.get(settings_url))
def get_publication_users(self):
    """
    Fetch the users of the publication.

    Returns:
        The result of _handle_response() for a GET on
        "{publication_url}/publication/users".
    """
    users_url = f"{self.publication_url}/publication/users"
    return self._handle_response(response=self._session.get(users_url))
def get_publication_subscriber_count(self):
    """
    Fetch the subscriber count of the publication.

    Returns:
        The "subscriberCount" entry of the payload returned for a GET on
        "{publication_url}/publication_launch_checklist".
    """
    checklist_url = f"{self.publication_url}/publication_launch_checklist"
    checklist = self._handle_response(response=self._session.get(checklist_url))
    return checklist["subscriberCount"]
def get_published_posts(
    self, offset=0, limit=25, order_by="post_date", order_direction="desc"
):
    """
    Fetch a page of published posts of the publication.

    Args:
        offset: sent as the "offset" query parameter.
        limit: sent as the "limit" query parameter.
        order_by: sent as the "order_by" query parameter.
        order_direction: sent as the "order_direction" query parameter.

    Returns:
        The result of _handle_response() for a GET on
        "{publication_url}/post_management/published".
    """
    listing_url = f"{self.publication_url}/post_management/published"
    query = dict(
        offset=offset,
        limit=limit,
        order_by=order_by,
        order_direction=order_direction,
    )
    resp = self._session.get(listing_url, params=query)
    return self._handle_response(response=resp)
def get_posts(self) -> dict:
    """
    Fetch the reader posts feed.

    Returns:
        The result of _handle_response() for a GET on
        "{base_url}/reader/posts".
    """
    posts_url = f"{self.base_url}/reader/posts"
    return self._handle_response(response=self._session.get(posts_url))
def get_drafts(self, filter=None, offset=None, limit=None):
    """
    Fetch drafts of the publication.

    Args:
        filter: sent as the "filter" query parameter.
        offset: sent as the "offset" query parameter.
        limit: sent as the "limit" query parameter.

    Returns:
        The result of _handle_response() for a GET on
        "{publication_url}/drafts".
    """
    drafts_url = f"{self.publication_url}/drafts"
    query = dict(filter=filter, offset=offset, limit=limit)
    resp = self._session.get(drafts_url, params=query)
    return self._handle_response(response=resp)
def get_draft(self, draft_id):
    """
    Fetch a single draft by its id.

    Args:
        draft_id: id interpolated into the request URL.

    Returns:
        The result of _handle_response() for a GET on
        "{publication_url}/drafts/{draft_id}".
    """
    draft_url = f"{self.publication_url}/drafts/{draft_id}"
    return self._handle_response(response=self._session.get(draft_url))
def delete_draft(self, draft_id):
    """
    Delete a single draft by its id.

    Args:
        draft_id: id interpolated into the request URL.

    Returns:
        The result of _handle_response() for a DELETE on
        "{publication_url}/drafts/{draft_id}".
    """
    draft_url = f"{self.publication_url}/drafts/{draft_id}"
    return self._handle_response(response=self._session.delete(draft_url))
def post_draft(self, body) -> dict:
    """
    Create a new draft.

    Args:
        body: object sent as the JSON body of the request.

    Returns:
        The result of _handle_response() for a POST on
        "{publication_url}/drafts".
    """
    drafts_url = f"{self.publication_url}/drafts"
    resp = self._session.post(drafts_url, json=body)
    return self._handle_response(response=resp)
def put_draft(self, draft, **kwargs) -> dict:
    """
    Update an existing draft.

    Args:
        draft: draft id interpolated into the request URL.
        **kwargs: fields sent as the JSON body of the request.

    Returns:
        The result of _handle_response() for a PUT on
        "{publication_url}/drafts/{draft}".
    """
    draft_url = f"{self.publication_url}/drafts/{draft}"
    resp = self._session.put(draft_url, json=kwargs)
    return self._handle_response(response=resp)
def prepublish_draft(self, draft) -> dict:
    """
    Call the prepublish endpoint of a draft.

    Args:
        draft: draft id interpolated into the request URL.

    Returns:
        The result of _handle_response() for a GET on
        "{publication_url}/drafts/{draft}/prepublish".
    """
    prepublish_url = f"{self.publication_url}/drafts/{draft}/prepublish"
    return self._handle_response(response=self._session.get(prepublish_url))
def publish_draft(
    self, draft, send: bool = True, share_automatically: bool = False
) -> dict:
    """
    Publish a draft.

    Args:
        draft: draft id interpolated into the request URL.
        send: sent as the "send" field of the JSON body.
        share_automatically: sent as the "share_automatically" field of
            the JSON body.

    Returns:
        The result of _handle_response() for a POST on
        "{publication_url}/drafts/{draft}/publish".
    """
    publish_url = f"{self.publication_url}/drafts/{draft}/publish"
    options = {"send": send, "share_automatically": share_automatically}
    resp = self._session.post(publish_url, json=options)
    return self._handle_response(response=resp)
def schedule_draft(self, draft, draft_datetime: datetime) -> dict:
    """
    Schedule a draft for publication.

    Args:
        draft: draft id interpolated into the request URL.
        draft_datetime: its isoformat() value is sent as "post_date".

    Returns:
        The result of _handle_response() for a POST on
        "{publication_url}/drafts/{draft}/schedule".
    """
    schedule_url = f"{self.publication_url}/drafts/{draft}/schedule"
    payload = {"post_date": draft_datetime.isoformat()}
    resp = self._session.post(schedule_url, json=payload)
    return self._handle_response(response=resp)
def unschedule_draft(self, draft) -> dict:
    """
    Remove the scheduled date of a draft.

    Posts a null "post_date" to the same schedule endpoint that
    schedule_draft() uses.

    Args:
        draft: draft id interpolated into the request URL.

    Returns:
        The result of _handle_response() for a POST on
        "{publication_url}/drafts/{draft}/schedule".
    """
    schedule_url = f"{self.publication_url}/drafts/{draft}/schedule"
    resp = self._session.post(schedule_url, json={"post_date": None})
    return self._handle_response(response=resp)
def get_image(self, image: str):
    """
    Upload an image and get back Substack's response for it.

    Args:
        image: path of a local file, or any other string (e.g. the
            original URL of the image). When the string names an existing
            path, the file content is sent base64-encoded; otherwise the
            string is sent unchanged.

    Returns:
        The result of _handle_response() for a POST on
        "{publication_url}/image".
    """
    payload = image
    if os.path.exists(payload):
        with open(payload, "rb") as handle:
            encoded = base64.b64encode(handle.read())
        # NOTE(review): the data URI prefix always declares image/jpeg,
        # whatever the actual file type is.
        payload = b"data:image/jpeg;base64," + encoded
    upload_url = f"{self.publication_url}/image"
    resp = self._session.post(upload_url, data={"image": payload})
    return self._handle_response(response=resp)
def get_categories(self):
    """
    Fetch the list of available categories.

    Returns:
        The result of _handle_response() for a GET on
        "{base_url}/categories".
    """
    categories_url = f"{self.base_url}/categories"
    return self._handle_response(response=self._session.get(categories_url))
def get_category(self, category_id, category_type, page):
    """
    Fetch one page of a category listing.

    Args:
        category_id: interpolated into the request URL.
        category_type: interpolated into the request URL.
        page: sent as the "page" query parameter.

    Returns:
        The result of _handle_response() for a GET on
        "{base_url}/category/public/{category_id}/{category_type}".
    """
    category_url = f"{self.base_url}/category/public/{category_id}/{category_type}"
    resp = self._session.get(category_url, params={"page": page})
    return self._handle_response(response=resp)
def get_single_category(self, category_id, category_type, page=None, limit=None):
    """
    Fetch the publications of a category, one page or all of them.

    Args:
        category_id: forwarded to get_category().
        category_type: forwarded to get_category() (paid or all).
        page: when given, only that page is fetched and returned as-is.
            When None, pages are fetched starting at 0 until a page
            reports no "more" or the limit is reached.
        limit: maximum number of publications to return when paging
            through everything; None means no cap.

    Returns:
        The raw page payload when page is given; otherwise a dict with
        the accumulated "publications" (truncated to limit) and the
        "more" flag of the last page fetched.
    """
    if page is not None:
        return self.get_category(category_id, category_type, page)

    collected = []
    page_number = 0
    while True:
        batch = self.get_category(category_id, category_type, page_number)
        collected.extend(batch.get("publications", []))
        has_more = batch.get("more", False)
        reached_limit = limit is not None and len(collected) >= limit
        if reached_limit or not has_more:
            break
        page_number += 1
    return {"publications": collected[:limit], "more": has_more}
def delete_all_drafts(self):
    """
    Delete every draft of the publication, ten at a time.

    Drafts are re-queried after each batch until the query comes back
    empty.

    Returns:
        The result of the last delete_draft() call, or None when there
        was nothing to delete.
    """
    last_result = None
    batch = self.get_drafts(filter="draft", limit=10, offset=0)
    while len(batch) != 0:
        for entry in batch:
            last_result = self.delete_draft(entry.get("id"))
        batch = self.get_drafts(filter="draft", limit=10, offset=0)
    return last_result
def get_sections(self):
    """
    Get the list of sections of this publication.

    TODO: this is hacky but I cannot find another place where to get the
    sections: they are read from the subscriptions payload, picking the
    publication whose "hostname" is contained in publication_url.

    Returns:
        The "sections" value of the first matching publication.

    Raises:
        IndexError: when no publication in the payload matches
            publication_url.
    """
    response = self._session.get(
        f"{self.publication_url}/subscriptions",
    )
    # _handle_response is an instance method, so it has to be called on
    # self; calling it through the class leaves "self" unbound.
    content = self._handle_response(response=response)
    sections = []
    for publication in content.get("publications"):
        hostname = publication.get("hostname")
        # A missing hostname can never match; "None in str" would raise.
        if hostname is not None and hostname in self.publication_url:
            sections.append(publication.get("sections"))
    return sections[0]
def publication_embed(self, url):
    """
    Query the publication embed endpoint for a URL.

    Args:
        url: sent as the "url" query parameter.

    Returns:
        The result of call() for a GET on "/publication/embed".
    """
    return self.call(endpoint="/publication/embed", method="GET", url=url)
def call(self, endpoint, method, **params):
    """
    Send an arbitrary request to an endpoint of the publication.

    Args:
        endpoint: path below publication_url, with or without a leading
            slash.
        method: HTTP method name passed to the session.
        **params: sent as query parameters.

    Returns:
        The result of _handle_response() for the request.
    """
    # The URL template already supplies the separating "/"; strip any
    # leading slash so "/publication/embed" does not become "//publication/embed".
    path = str(endpoint).lstrip("/")
    response = self._session.request(
        method=method,
        url=f"{self.publication_url}/{path}",
        params=params,
    )
    return self._handle_response(response=response)

View File

@@ -1,32 +0,0 @@
import json
class SubstackAPIException(Exception):
    """
    Error reported by the Substack API through a non-2xx response.

    Attributes are set from the response: status_code is kept as given
    and message is extracted from the body text.
    """

    def __init__(self, status_code, text):
        """
        Args:
            status_code: HTTP status code of the response.
            text: raw response body. When it is a JSON object, the "msg"
                of every entry in "errors" is joined with ", ", falling
                back to the "error" field when that yields nothing.
                Any other body is quoted verbatim in the message.
        """
        try:
            json_res = json.loads(text)
        except (TypeError, ValueError):
            self.message = f"Invalid JSON error message from Substack: {text}"
        else:
            if isinstance(json_res, dict):
                self.message = ", ".join(
                    self._error_text(error)
                    for error in (json_res.get("errors") or [])
                )
                self.message = self.message or json_res.get("error", "")
            else:
                # Valid JSON that is not an object (list, string, null, ...)
                # has no "errors"/"error" fields to read; building the
                # exception must not itself fail and hide the API error.
                self.message = f"Unexpected error payload from Substack: {text}"
        self.status_code = status_code

    @staticmethod
    def _error_text(error):
        """Return the text of one "errors" entry, tolerating non-dict entries."""
        if isinstance(error, dict):
            return str(error.get("msg", ""))
        return str(error)

    def __str__(self):
        return f"APIError(code={self.status_code}): {self.message}"
class SubstackRequestException(Exception):
    """Error carrying a free-form message about a failed Substack request."""

    def __init__(self, message):
        """
        Args:
            message: description of the problem; stored on the instance
                and shown by str().
        """
        self.message = message

    def __str__(self):
        return "SubstackRequestException: {}".format(self.message)
class SectionNotExistsException(SubstackRequestException):
    """Request error signalling that a requested section could not be resolved."""

View File

@@ -1,331 +0,0 @@
"""
Post Utilities
"""
import json
from typing import Dict
__all__ = ["Post"]
from substack.exceptions import SectionNotExistsException
class Post:
    """
    Builder for the payload of a Substack draft.

    The body is kept as a nested dict ({"type": "doc", "content": [...]})
    that the node helpers below append to; get_draft() turns the instance
    into the dict that is sent to the API. Most helpers return self so
    calls can be chained.
    """

    def __init__(
        self,
        title: str,
        subtitle: str,
        user_id,
        audience: str = None,
        write_comment_permissions: str = None,
    ):
        """
        Args:
            title: draft title.
            subtitle: draft subtitle.
            user_id: id of the author; converted with int() for the byline.
            audience: possible values: everyone, only_paid, founding,
                only_free. Defaults to "everyone".
            write_comment_permissions: none, only_paid, everyone (this
                field is a mess). Defaults to the audience value.
        """
        self.draft_title = title
        self.draft_subtitle = subtitle
        self.draft_body = {"type": "doc", "content": []}
        self.draft_bylines = [{"id": int(user_id), "is_guest": False}]
        self.audience = audience if audience is not None else "everyone"
        self.draft_section_id = None
        self.section_chosen = True

        # TODO better understand the possible values and combinations with audience
        if write_comment_permissions is not None:
            self.write_comment_permissions = write_comment_permissions
        else:
            self.write_comment_permissions = self.audience

    def set_section(self, name: str, sections: list):
        """
        Select the section of the draft by name.

        Args:
            name: name of the section to select.
            sections: list of section dicts, each with "name" and "id".

        Raises:
            SectionNotExistsException: when the name does not match
                exactly one entry of sections.
        """
        section = [s for s in sections if s.get("name") == name]
        if len(section) != 1:
            raise SectionNotExistsException(name)
        section = section[0]
        self.draft_section_id = section.get("id")

    def add(self, item: Dict):
        """
        Add item to draft body.

        A new node with the item's "type" is appended, then filled in
        according to that type: captionedImage, embeddedPublication,
        youtube2 and subscribeWidget have dedicated handling, every other
        type gets its "content" added as text. A "heading" also receives
        its "level" (default 1), and "marks" are applied to the last text
        chunk when present.

        Args:
            item: node description; must contain "type".

        Returns:
            self, to allow chaining.
        """
        self.draft_body["content"] = self.draft_body.get("content", []) + [
            {"type": item.get("type")}
        ]
        content = item.get("content")
        if item.get("type") == "captionedImage":
            # NOTE: the whole item is forwarded as keyword arguments, so its
            # own "type" key ends up as the image's "type" attribute.
            self.captioned_image(**item)
        elif item.get("type") == "embeddedPublication":
            self.draft_body["content"][-1]["attrs"] = item.get("url")
        elif item.get("type") == "youtube2":
            self.youtube(item.get("src"))
        elif item.get("type") == "subscribeWidget":
            self.subscribe_with_caption(item.get("message"))
        else:
            if content is not None:
                self.add_complex_text(content)

        if item.get("type") == "heading":
            self.attrs(item.get("level", 1))

        marks = item.get("marks")
        if marks is not None:
            self.marks(marks)

        return self

    def paragraph(self, content=None):
        """
        Append a paragraph node.

        Args:
            content: optional text, or list of text chunks, for the
                paragraph (see add_complex_text).

        Returns:
            self, to allow chaining.
        """
        item = {"type": "paragraph"}
        if content is not None:
            item["content"] = content
        return self.add(item)

    def heading(self, content=None, level: int = 1):
        """
        Append a heading node.

        Args:
            content: optional text, or list of text chunks, for the heading.
            level: heading level stored in the node's attrs.

        Returns:
            self, to allow chaining.
        """
        item = {"type": "heading"}
        if content is not None:
            item["content"] = content
        item["level"] = level
        return self.add(item)

    def horizontal_rule(self):
        """
        Append a horizontal rule node.

        Returns:
            self, to allow chaining.
        """
        return self.add({"type": "horizontal_rule"})

    def attrs(self, level):
        """
        Set the "level" attribute of the last node of the body.

        Args:
            level: value stored under attrs["level"].

        Returns:
            self, to allow chaining.
        """
        content_attrs = self.draft_body["content"][-1].get("attrs", {})
        content_attrs.update({"level": level})
        self.draft_body["content"][-1]["attrs"] = content_attrs
        return self

    def captioned_image(
        self,
        src: str,
        fullscreen: bool = False,
        imageSize: str = "normal",
        height: int = 819,
        width: int = 1456,
        resizeWidth: int = 728,
        bytes: str = None,
        alt: str = None,
        title: str = None,
        type: str = None,
        href: str = None,
        belowTheFold: bool = False,
        internalRedirect: str = None,
    ):
        """
        Add image to body.

        An "image2" child carrying all arguments as attrs is appended to
        the content of the last node of the body.

        Args:
            src: image source.
            fullscreen: stored as the "fullscreen" attribute.
            imageSize: stored as the "imageSize" attribute.
            height: stored as the "height" attribute.
            width: stored as the "width" attribute.
            resizeWidth: stored as the "resizeWidth" attribute.
            bytes: stored as the "bytes" attribute.
            alt: stored as the "alt" attribute.
            title: stored as the "title" attribute.
            type: stored as the "type" attribute.
            href: stored as the "href" attribute.
            belowTheFold: stored as the "belowTheFold" attribute.
            internalRedirect: stored as the "internalRedirect" attribute.

        Returns:
            self, to allow chaining.
        """
        content = self.draft_body["content"][-1].get("content", [])
        content += [
            {
                "type": "image2",
                "attrs": {
                    "src": src,
                    "fullscreen": fullscreen,
                    "imageSize": imageSize,
                    "height": height,
                    "width": width,
                    "resizeWidth": resizeWidth,
                    "bytes": bytes,
                    "alt": alt,
                    "title": title,
                    "type": type,
                    "href": href,
                    "belowTheFold": belowTheFold,
                    "internalRedirect": internalRedirect,
                },
            }
        ]
        self.draft_body["content"][-1]["content"] = content
        return self

    def text(self, value: str):
        """
        Add text to the last paragraph.

        Args:
            value: Text to add to paragraph.

        Returns:
            self, to allow chaining.
        """
        content = self.draft_body["content"][-1].get("content", [])
        content += [{"type": "text", "text": value}]
        self.draft_body["content"][-1]["content"] = content
        return self

    def add_complex_text(self, text):
        """
        Add plain or marked-up text to the last node of the body.

        Args:
            text: either a string, added as a single text chunk, or an
                iterable of dicts with "content" and optional "marks";
                falsy entries are skipped.
        """
        if isinstance(text, str):
            self.text(text)
        else:
            for chunk in text:
                if chunk:
                    self.text(chunk.get("content")).marks(chunk.get("marks", []))

    def marks(self, marks):
        """
        Attach marks to the last text chunk of the last node.

        Args:
            marks: iterable of dicts with a "type"; a "link" mark also
                carries its "href", which is stored under attrs.

        Returns:
            self, to allow chaining.

        Raises:
            IndexError: when the last node has no content yet.
        """
        content = self.draft_body["content"][-1].get("content", [])[-1]
        content_marks = content.get("marks", [])
        for mark in marks:
            new_mark = {"type": mark.get("type")}
            if mark.get("type") == "link":
                href = mark.get("href")
                new_mark.update({"attrs": {"href": href}})
            content_marks.append(new_mark)
        content["marks"] = content_marks
        return self

    def remove_last_paragraph(self):
        """Remove last paragraph"""
        del self.draft_body.get("content")[-1]

    def get_draft(self):
        """
        Export the post as the dict to send to the API.

        Returns:
            A new dict with every instance attribute; "draft_body" is
            replaced by its JSON string. The post itself is left
            untouched, so it can still be edited and exported again.
        """
        # vars(self) is the live attribute dict: writing the JSON string
        # into it would overwrite self.draft_body. Work on a copy instead.
        out = dict(vars(self))
        out["draft_body"] = json.dumps(out["draft_body"])
        return out

    def subscribe_with_caption(self, message: str = None):
        """
        Add subscribe widget with caption

        Fills the last node of the body with the widget attrs and a
        "ctaCaption" child holding the message.

        Args:
            message: caption text; a default invitation is used when None.

        Returns:
            self, to allow chaining.
        """
        if message is None:
            message = """Thanks for reading this newsletter!
Subscribe for free to receive new posts and support my work."""

        subscribe = self.draft_body["content"][-1]
        subscribe["attrs"] = {
            "url": "%%checkout_url%%",
            "text": "Subscribe",
            "language": "en",
        }
        subscribe["content"] = [
            {
                "type": "ctaCaption",
                "content": [
                    {
                        "type": "text",
                        "text": message,
                    }
                ],
            }
        ]
        return self

    def youtube(self, value: str):
        """
        Add youtube video to post.

        Args:
            value: stored as the "videoId" attribute of the last node.

        Returns:
            self, to allow chaining.
        """
        content_attrs = self.draft_body["content"][-1].get("attrs", {})
        content_attrs.update({"videoId": value})
        self.draft_body["content"][-1]["attrs"] = content_attrs
        return self

View File

@@ -7,4 +7,4 @@ git fetch --all
git reset --hard origin/main
# Run your Python script
python Post_RSS_on_SubStack.py
python post_rss_to_ghost.py