diff --git a/Post_RSS_on_SubStack.py b/Post_RSS_on_SubStack.py deleted file mode 100644 index e8110ef..0000000 --- a/Post_RSS_on_SubStack.py +++ /dev/null @@ -1,272 +0,0 @@ -import asyncio -import argparse -import requests -import feedparser -import io -import html -import datetime -import time -import logging -import os -import re -from logging.handlers import RotatingFileHandler -import random - - -from substack import Api -from substack.post import Post - -LOG = logging.getLogger('bot') -LOG_PATTERN = logging.Formatter('%(asctime)s:%(levelname)s: [%(filename)s] %(message)s') - -def setuplogger(): - - conf_filename = None - - steam_handler = logging.StreamHandler() - steam_handler.setFormatter(LOG_PATTERN) - steam_handler.setLevel(logging.DEBUG) - - def setup_logger(logger_name, file_name=None, add_steam=False): - file_name = file_name or logger_name - log_filename = f"{file_name}.log" - - logger = logging.getLogger(logger_name) - logger.setLevel(logging.DEBUG) - file_handler = RotatingFileHandler(log_filename, "a", 1000000, 1) - file_handler.setFormatter(LOG_PATTERN) - logger.addHandler(file_handler) - if add_steam: - logger.addHandler(steam_handler) - - setup_logger("bot", conf_filename, True) - -class RSSfeed(): - def __init__(self, url, yt=False): - self.url = url - self.youtube = yt - -class SubStackTask: - def __init__(self, login, password, cookies_path, account, feeds): - self.api = Api( - email=login, - password=password, - cookies_path=cookies_path, - publication_url=account, - ) - - self.user_id = self.api.get_user_id() - self.feeds = feeds - for feed in self.feeds: - LOG.info("Adding feed " + feed.url) - - - def format_duration(self, seconds): - days, seconds = divmod(seconds, 86400) - hours, seconds = divmod(seconds, 3600) - minutes, seconds = divmod(seconds, 60) - - days = round(days) - hours = round(hours) - minutes = round(minutes) - seconds = round(seconds) - - parts = [] - if days > 0: - parts.append(f"{days} days") - if hours > 0: - parts.append(f"{hours} hours") - if minutes > 0: - parts.append(f"{minutes} minutes") - if seconds > 0: - parts.append(f"{seconds} seconds") - - return ', '.join(parts) if parts else '0 seconds' - - def get_fr_date(self): - # Mapping of English month names to French - months_en_to_fr = { - 'January': 'Janvier', 'February': 'Février', 'March': 'Mars', - 'April': 'Avril', 'May': 'Mai', 'June': 'Juin', - 'July': 'Juillet', 'August': 'Août', 'September': 'Septembre', - 'October': 'Octobre', 'November': 'Novembre', 'December': 'Décembre' - } - today = datetime.datetime.now() - formatted_date = today.strftime("%d %B %Y") - # Replace the English month with the French month - for en, fr in months_en_to_fr.items(): - formatted_date = formatted_date.replace(en, fr) - return formatted_date - - async def run_daily_at_6_am(self): - while True: - now = datetime.datetime.now() - # Calculate the time until 6 AM next day - next_run = (now + datetime.timedelta(days=1)).replace(hour=6, minute=5, second=0, microsecond=0) - sleep_seconds = (next_run - now).total_seconds() - - while sleep_seconds > 0: - # Check if the remaining time is a multiple of 3600 seconds - formatted_duration = self.format_duration(sleep_seconds) - LOG.info(f"Waiting for {formatted_duration} for next scan") - - # Wait for some time before checking again - await asyncio.sleep(min(sleep_seconds, 5 * 60)) - - # Recalculate the remaining sleep time - now = datetime.datetime.now() - sleep_seconds = (next_run - now).total_seconds() - - LOG.info("Going to run the daily task") - # Run the daily task - await self.daily_task() - - - - async def daily_task(self): - - title_post = "Les news du " + self.get_fr_date() - - LOG.info("Running daily task : " + str(title_post)) - - ff = r'/data/feeds.txt' - if os.path.isfile(ff) is False: - ff = r'x:\substack\feeds.txt' - - self.feeds = [] - with open(ff) as file: - lines = [line.rstrip() for line in file] - - for line in lines: - youtube = "youtube" in line - self.feeds.append(RSSfeed(line, youtube)) - - sub_stack_post = Post( - title=title_post, - subtitle="", - user_id=self.user_id - ) - - midnight_today = datetime.datetime.now(datetime.timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) - yesterday_6am = datetime.datetime.now(datetime.timezone.utc).replace(hour=6, minute=0, second=0, microsecond=0) - datetime.timedelta(days=1) - - formatted_date = midnight_today.strftime('%a, %d %b %Y %H:%M:%S %z') - - all_news_posts = [] - - for feed in self.feeds: - LOG.info("Scanning feed " + feed.url) - html_text = requests.get(feed.url).text - newsFeed = feedparser.parse(html_text) - - - if feed.youtube is True: - new_posts = [entry for entry in newsFeed.entries if datetime.datetime.fromisoformat(entry.published) > yesterday_6am] - else: - try: - new_posts = [entry for entry in newsFeed.entries if datetime.datetime.strptime(entry.published.replace('GMT', '+0000'), '%a, %d %b %Y %H:%M:%S %z') > yesterday_6am] - except: - new_posts = [entry for entry in newsFeed.entries if datetime.datetime.fromtimestamp(time.mktime(entry.updated_parsed)).replace(tzinfo=datetime.timezone.utc) > yesterday_6am] - - - all_news_posts.extend(new_posts) - - - random.shuffle(all_news_posts) - - - for post in all_news_posts: - linkURL = post["link"] - title = post["title"] - ftext = "" - - if "actugaming" in linkURL: - if "puzzle-" in linkURL or "guide-" in linkURL: - continue - - LOG.info("Posting " + str(title)) - - if "summary" in post: - ftext = html.unescape(post["summary"]) - # Using regular expressions to remove HTML tags - ftext = re.sub('<[^<]+?>', '', ftext) - pattern = r"L’article .* est apparu en premier sur .*" - ftext = re.sub(pattern, '', ftext) - - if "yt_videoid" in post: - sub_stack_post.add({"type":"heading", "level":3, "content": title}) - videoId = post["yt_videoid"] - sub_stack_post.add({"type":"youtube2", "src": videoId }) - sub_stack_post.add({'type': 'paragraph', 'content': [ - {'content': linkURL, 'marks': [{'type': "link", 'href': linkURL}]}]}) - else: - - - - - if ftext != "": - sub_stack_post.add({"type":"heading", "level":3, "content": title}) - sub_stack_post.add({"type":"paragraph", "content": ftext }) - sub_stack_post.add({'type': 'paragraph', 'content': [ - {'content': linkURL, 'marks': [{'type': "link", 'href': linkURL}]}]}) - - if "links" in post: - for link in post["links"]: - - if link["type"] == "image/jpg": - imgUrl = link["href"] - sub_stack_post.add({'type': 'captionedImage', 'src': imgUrl}) - - - sub_stack_post.add({"type":"horizontal_rule"}) - - - - sub_stack_post.add({"type":"heading", "level":3, "content": "Sources"}) - for feed in self.feeds: - sub_stack_post.add({'type': 'paragraph', 'content': [ - {'content': feed.url, 'marks': [{'type': "link", 'href': feed.url}]}]}) - - - sub_stack_post.add({"type":"subscribeWidget", "message":"Abonnez-vous gratuitement pour recevoir chaque jour les news dans votre e-mail et soutenir mon travail."}) - - draft = self.api.post_draft(sub_stack_post.get_draft()) - self.api.prepublish_draft(draft.get("id")) - self.api.publish_draft(draft.get("id")) - -async def main(login, password, account): - - setuplogger() - - if os.path.exists("last_scan_date.txt"): - with open("last_scan_date.txt", "r") as f: - last_post_date = datetime.datetime.strptime(f.read().strip(), '%a, %d %b %Y %H:%M:%S %z') - else: - last_post_date = datetime.datetime.min.replace(tzinfo=datetime.timezone.utc) - - feeds = [] - - ff = r'/data/feeds.txt' - if os.path.isfile(ff) is False: - ff = r'x:\substack\feeds.txt' - - cookies_path = r'/data/cookies.json' - if os.path.isfile(cookies_path) is False: - cookies_path = r'x:\substack\cookies.json' - - with open(ff) as file: - lines = [line.rstrip() for line in file] - - for line in lines: - youtube = "youtube" in line - feeds.append(RSSfeed(line, youtube)) - - task = SubStackTask(login, password, cookies_path, account, feeds) - - LOG.info("Starting bot") - await task.run_daily_at_6_am() - #await task.daily_task() - - -if __name__ == "__main__": - asyncio.run(main("gael.honorez@gmail.com", "f3PaTGedjFc2gkr1ypi5", "https://aggregateurjvfr.substack.com")) \ No newline at end of file diff --git a/checkemail.py b/checkemail.py deleted file mode 100644 index c6df40b..0000000 --- a/checkemail.py +++ /dev/null @@ -1,114 +0,0 @@ -import os.path -import base64 -import imaplib -import email -from email.header import decode_header -import google.auth -from google.oauth2.credentials import Credentials -from google_auth_oauthlib.flow import InstalledAppFlow -from google.auth.transport.requests import Request -from bs4 import BeautifulSoup # Import BeautifulSoup for parsing HTML -from datetime import datetime - -# If modifying these SCOPES, delete the file token.json. -SCOPES = ['https://mail.google.com/'] - -def authenticate_gmail(): - """Shows basic usage of the Gmail API. - Lists the user's Gmail labels. - """ - creds = None - # The file token.json stores the user's access and refresh tokens, and is - # created automatically when the authorization flow completes for the first - # time. - - token = r'/data/token.json' - if os.path.isfile(token) is False: - token = r'x:\substack\token.json' - - cred = r'/data/client_secret_396578640529-o4dsukvomuo43j5d4j0bogg17e3e8l7f.apps.googleusercontent.com.json' - if os.path.isfile(cred) is False: - cred = r'x:\substack\client_secret_396578640529-o4dsukvomuo43j5d4j0bogg17e3e8l7f.apps.googleusercontent.com.json' - - if os.path.exists(token): - creds = Credentials.from_authorized_user_file(token, SCOPES) - # If there are no (valid) credentials available, let the user log in. - if not creds or not creds.valid: - if creds and creds.expired and creds.refresh_token: - creds.refresh(Request()) - else: - flow = InstalledAppFlow.from_client_secrets_file(cred, SCOPES) - creds = flow.run_local_server(port=0) - # Save the credentials for the next run - with open(token, 'w') as token: - token.write(creds.to_json()) - - return creds - -def generate_oauth2_string(username, access_token): - return f"user={username}\1auth=Bearer {access_token}\1\1" - -def decode_mime_words(s): - return ''.join( - word.decode(encoding or 'utf-8') if isinstance(word, bytes) else word - for word, encoding in decode_header(s) - ) - -def get_verification_link(email_user, sender_email, start_time): - creds = authenticate_gmail() - auth_string = generate_oauth2_string(email_user, creds.token) - - mail = imaplib.IMAP4_SSL("imap.gmail.com") - #mail.debug = 4 # Enable IMAP debug output for more detailed logs - - try: - mail.authenticate('XOAUTH2', lambda x: auth_string) - except imaplib.IMAP4.error as e: - print(f"IMAP authentication error: {e}") - return None - - mail.select("inbox") - - result, data = mail.search(None, f'(FROM "{sender_email}" SUBJECT "Finish signing in to Substack")') - mail_ids = data[0] - id_list = mail_ids.split() - - for num in reversed(id_list): # Check the most recent emails first - result, data = mail.fetch(num, "(RFC822)") - raw_email = data[0][1] - msg = email.message_from_bytes(raw_email) - # Decode and print the email subject - subject = decode_mime_words(msg["Subject"]) - # Get email date - email_date_tuple = email.utils.parsedate_tz(msg["Date"]) - email_timestamp = email.utils.mktime_tz(email_date_tuple) - print(subject, start_time, email_timestamp) - - if abs(email_timestamp - start_time) > 12 * 3600: - continue - - if msg.is_multipart(): - for part in msg.walk(): - if part.get_content_type() == "text/plain": - body = part.get_payload(decode=True).decode() - for line in body.split("\n"): - if "http" in line: - return line.strip() - else: - print("-----") - body = msg.get_payload(decode=True).decode() - soup = BeautifulSoup(body, 'html.parser') - link = soup.find('a', href=True, text="Connectez-vous dès maintenant") - if link: - return link['href'] - - return None - -if __name__ == "__main__": - email_user = "gael.honorez@gmail.com" - sender_email = "no-reply@substack.com" - verification_link = get_verification_link(email_user, sender_email) - if verification_link: - print("Verification link found:", verification_link) - else: - print("No verification link found.") \ No newline at end of file diff --git a/generate_cookie.py b/generate_cookie.py deleted file mode 100644 index 24bd307..0000000 --- a/generate_cookie.py +++ /dev/null @@ -1,53 +0,0 @@ -from selenium import webdriver -from selenium.webdriver.common.by import By -from selenium.webdriver.common.keys import Keys -from selenium.webdriver.chrome.options import Options -from selenium.webdriver.support.ui import WebDriverWait -from selenium.webdriver.support import expected_conditions as EC - -import time -import pickle - -def save_cookies(driver, path): - with open(path, 'wb') as file: - pickle.dump(driver.get_cookies(), file) - -def generate_cookies(email, password): - cookie_path = r'x:\substack\cookies.pkl' - - - chrome_options = Options() - - driver = webdriver.Chrome(options=chrome_options) - driver.get('https://substack.com/sign-in') - - wait = WebDriverWait(driver, 10) - - try: - login_with_password_button = wait.until( - EC.element_to_be_clickable((By.LINK_TEXT, 'Sign in with password')) - ) - login_with_password_button.click() - - time.sleep(2) - - email_field = driver.find_element(By.NAME, 'email') - email_field.send_keys(email) - - password_field = driver.find_element(By.NAME, 'password') - password_field.send_keys(password) - - password_field.send_keys(Keys.RETURN) - - save_cookies(driver, cookie_path) - print("Cookies saved successfully.") - except Exception as e: - print("An error occurred during login.", e) - - driver.quit() - - -if __name__ == "__main__": - email = "gael.honorez@gmail.com" - password = "f3PaTGedjFc2gkr1ypi5" - generate_cookies(email, password) diff --git a/post_rss_to_ghost.py b/post_rss_to_ghost.py new file mode 100644 index 0000000..4cfec87 --- /dev/null +++ b/post_rss_to_ghost.py @@ -0,0 +1,305 @@ +import asyncio +import argparse +import datetime +import html +import io +import logging +import os +import random +import re +import time +from logging.handlers import RotatingFileHandler +from typing import Optional +import feedparser +import requests +import jwt + +LOG = logging.getLogger("bot") +LOG_PATTERN = logging.Formatter("%(asctime)s:%(levelname)s: [%(filename)s] %(message)s") + +def setuplogger(): + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(LOG_PATTERN) + stream_handler.setLevel(logging.DEBUG) + + file_handler = RotatingFileHandler("bot.log", "a", 1000000, 1) + file_handler.setFormatter(LOG_PATTERN) + + LOG.setLevel(logging.DEBUG) + LOG.addHandler(stream_handler) + LOG.addHandler(file_handler) + +class RSSfeed: + def __init__(self, url, yt=False): + self.url = url + self.youtube = yt + +# ---------- Ghost Admin API client ---------- + +class GhostAdmin: + def __init__(self, admin_url: str, admin_key: str, accept_version: str = "v6.0"): + self.base = admin_url.rstrip("/") + "/" + self.key_id, self.key_secret_hex = admin_key.split(":") + self.accept_version = accept_version + + def _jwt(self) -> str: + iat = int(time.time()) + payload = {"iat": iat, "exp": iat + 5 * 60, "aud": "/admin/"} + headers = {"alg": "HS256", "typ": "JWT", "kid": self.key_id} + token = jwt.encode(payload, bytes.fromhex(self.key_secret_hex), algorithm="HS256", headers=headers) + return token if isinstance(token, str) else token.decode("utf-8") + + def _headers(self): + return { + "Authorization": f"Ghost {self._jwt()}", + "Accept-Version": self.accept_version, + "Content-Type": "application/json", + } + + def get_newsletters(self): + url = self.base + "newsletters/" + resp = requests.get(url, headers=self._headers(), timeout=20) + if resp.status_code >= 400: + raise RuntimeError(f"Ghost newsletters error {resp.status_code}: {resp.text}") + return resp.json().get("newsletters", []) + + def pick_newsletter_slug(self, preferred_slug: Optional[str]) -> str: + if preferred_slug: + return preferred_slug + # Fallback: choose the first active newsletter (favor default if present) + nls = self.get_newsletters() + if not nls: + raise RuntimeError("No newsletters configured in Ghost (Settings → Newsletters).") + # try 'status=active' first + actives = [n for n in nls if n.get("status") == "active"] + # prefer default one if flagged + for n in actives: + if n.get("is_default"): + return n.get("slug") + return (actives or nls)[0].get("slug") + + def create_post_html(self, title: str, html_content: str, status: str = "draft", feature_image: Optional[str] = None): + """ + Create a post with HTML source; optionally set feature_image (absolute URL). + """ + url = self.base + "posts/?source=html" + post = {"title": title, "html": html_content, "status": status} + if feature_image: + post["feature_image"] = feature_image + resp = requests.post(url, headers=self._headers(), json={"posts": [post]}, timeout=30) + if resp.status_code >= 400: + raise RuntimeError(f"Ghost create error {resp.status_code}: {resp.text}") + return resp.json()["posts"][0] + + def publish_post(self, post_id: str, updated_at: str, newsletter_slug: Optional[str], email_segment: Optional[str]): + """ + Publish + (if newsletter provided) send email to the chosen audience. + """ + slug = self.pick_newsletter_slug(newsletter_slug) + params = [f"newsletter={requests.utils.quote(slug)}"] + if email_segment: + params.append(f"email_segment={requests.utils.quote(email_segment)}") + url = self.base + f"posts/{post_id}/?{'&'.join(params)}" + body = {"posts": [{"updated_at": updated_at, "status": "published"}]} + resp = requests.put(url, headers=self._headers(), json=body, timeout=30) + if resp.status_code >= 400: + raise RuntimeError(f"Ghost publish error {resp.status_code}: {resp.text}") + return resp.json()["posts"][0] +# ---------- Your task logic (ported from Substack) ---------- + +class GhostTask: + def __init__(self, feeds, admin_url, admin_key, newsletter_slug=None, email_segment=None): + self.ghost = GhostAdmin(admin_url, admin_key) + self.feeds = feeds + self.newsletter_slug = newsletter_slug + self.email_segment = email_segment + for feed in self.feeds: + LOG.info("Adding feed " + feed.url) + + def get_fr_date(self): + import datetime + months = { + 'January':'Janvier','February':'Février','March':'Mars','April':'Avril','May':'Mai','June':'Juin', + 'July':'Juillet','August':'Août','September':'Septembre','October':'Octobre','November':'Novembre','December':'Décembre' + } + today = datetime.datetime.now() + formatted = today.strftime("%d %B %Y") + for en, fr in months.items(): + formatted = formatted.replace(en, fr) + return formatted + + def _build_html_roundup(self, items, feeds): + """ + Build HTML and capture the first encountered image URL (for feature_image). + Returns (html_string, first_image_url_or_None). + """ + parts = [] + parts.append(f"
https://www.youtube.com/watch?v={videoId}
') + parts.append(f'') + else: + ftext = "" + if "summary" in post: + ftext = html.unescape(post["summary"]) + ftext = re.sub("<[^<]+?>", "", ftext) + ftext = re.sub(r"L’article .* est apparu en premier sur .*", "", ftext) + if ftext: + parts.append(f"{html.escape(ftext)}
") + if linkURL: + parts.append(f'') + + # Attach images in the body; remember the first one for feature_image + if "links" in post: + for link in post["links"]: + if link.get("type") in ("image/jpg","image/jpeg","image/png","image/webp"): + imgUrl = link.get("href") + if imgUrl: + if not first_image: + first_image = imgUrl + parts.append(f'Abonnez-vous pour recevoir chaque jour les news et soutenir mon travail.
') + return "\n".join(parts), first_image + + def format_duration(self, seconds): + days, seconds = divmod(seconds, 86400) + hours, seconds = divmod(seconds, 3600) + minutes, seconds = divmod(seconds, 60) + parts = [] + if days > 0: parts.append(f"{days} days") + if hours > 0: parts.append(f"{hours} hours") + if minutes > 0: parts.append(f"{minutes} minutes") + if seconds > 0: parts.append(f"{seconds} seconds") + return ", ".join(parts) if parts else "0 seconds" + + async def run_daily_at_6_am(self): + while True: + now = datetime.datetime.now() + next_run = (now + datetime.timedelta(days=1)).replace(hour=6, minute=5, second=0, microsecond=0) + sleep_seconds = (next_run - now).total_seconds() + while sleep_seconds > 0: + LOG.info(f"Waiting for {self.format_duration(sleep_seconds)} for next scan") + await asyncio.sleep(min(sleep_seconds, 5 * 60)) + now = datetime.datetime.now() + sleep_seconds = (next_run - now).total_seconds() + LOG.info("Going to run the daily task") + await self.daily_task() + + async def daily_task(self): + + nls = self.ghost.get_newsletters() + print("Newsletters:") + for n in nls: + print(f"- title={n.get('name')} slug={n.get('slug')} status={n.get('status')} default={n.get('is_default')}") + + title_post = "Les news du " + self.get_fr_date() + LOG.info("Running daily task : " + str(title_post)) + + # Re-read feeds (unchanged) + feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt") + if not os.path.isfile(feeds_file): + feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", "x:\\substack\\feeds.txt") + self.feeds = [] + with open(feeds_file) as f: + lines = [line.strip() for line in f if line.strip()] + for line in lines: + self.feeds.append(RSSfeed(line, "youtube" in line)) + + yesterday_6am = datetime.datetime.now(datetime.timezone.utc).replace( + hour=6, minute=0, second=0, microsecond=0 + ) - datetime.timedelta(days=1) + + all_news_posts = [] + for feed in self.feeds: + LOG.info("Scanning feed " + feed.url) + html_text = requests.get(feed.url, timeout=30).text + newsFeed = feedparser.parse(html_text) + + if feed.youtube: + new_posts = [e for e in newsFeed.entries if datetime.datetime.fromisoformat(e.published) > yesterday_6am] + else: + try: + new_posts = [e for e in newsFeed.entries + if datetime.datetime.strptime(e.published.replace('GMT', '+0000'), + '%a, %d %b %Y %H:%M:%S %z') > yesterday_6am] + except Exception: + new_posts = [e for e in newsFeed.entries + if datetime.datetime.fromtimestamp(time.mktime(e.updated_parsed)).replace( + tzinfo=datetime.timezone.utc) > yesterday_6am] + + filtered = [] + for e in new_posts: + linkURL = e.get("link", "") + if "actugaming" in linkURL and ("puzzle-" in linkURL or "guide-" in linkURL): + continue + filtered.append(e) + all_news_posts.extend(filtered) + + random.shuffle(all_news_posts) + roundup_html, feature_image = self._build_html_roundup(all_news_posts, self.feeds) + + # 1) Create as draft WITH feature_image if we found one + created = self.ghost.create_post_html(title_post, roundup_html, status="draft", feature_image=feature_image) + + # 2) Publish AND SEND EMAIL (always) + published = self.ghost.publish_post( + post_id=created["id"], + updated_at=created["updated_at"], + newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"), # may be None -> auto-pick + email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"), # may be None -> send to all + ) + + LOG.info(f"Published post: {published.get('url')} (emailed via newsletter)") + + def debug_list_newsletters(admin_url, admin_key): + g = GhostAdmin(admin_url, admin_key) + nls = g.get_newsletters() + print("Newsletters:") + for n in nls: + print(f"- title={n.get('name')} slug={n.get('slug')} status={n.get('status')} default={n.get('is_default')}") +# ---------------- main ---------------- + +async def main(): + setuplogger() + # Feeds initial pass (kept for parity with your original script) + feeds = [] + feeds_file = os.environ.get("FEEDS_FILE", "/data/feeds.txt") + if not os.path.isfile(feeds_file): + feeds_file = os.environ.get("FEEDS_FILE_FALLBACK", r"c:\workspace\Substack_JV\feeds.txt") + with open(feeds_file) as f: + lines = [line.strip() for line in f if line.strip()] + for line in lines: + feeds.append(RSSfeed(line, "youtube" in line)) + + admin_url = os.environ["GHOST_ADMIN_URL"] + admin_key = os.environ["GHOST_ADMIN_KEY"] + + task = GhostTask( + feeds=feeds, + admin_url=admin_url, + admin_key=admin_key, + newsletter_slug=os.environ.get("GHOST_NEWSLETTER_SLUG"), + email_segment=os.environ.get("GHOST_EMAIL_SEGMENT"), + ) + + LOG.info("Starting bot") + await task.run_daily_at_6_am() + # Or just run once: + #await task.daily_task() + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/requirements.txt b/requirements.txt index 05eb052..2a5aa3c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,4 @@ -requests feedparser -google-auth -google-auth-oauthlib -google-auth-httplib2 -beautifulsoup4 +PyJWT>=2.7,<3 +requests>=2.31 +feedparser>=6.0 diff --git a/substack/__init__.py b/substack/__init__.py deleted file mode 100644 index 0d8268f..0000000 --- a/substack/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -"""A library that provides a Python interface to the Substack API.""" - -__author__ = "Paolo Mazza" -__email__ = "mazzapaolo2019@gmail.com" -__license__ = "MIT License" -__version__ = "1.0" -__url__ = "https://github.com/ma2za/python-substack" -__download_url__ = "https://pypi.python.org/pypi/python-substack" -__description__ = "A Python wrapper around the Substack API" - -from .api import Api diff --git a/substack/api.py b/substack/api.py deleted file mode 100644 index 9106621..0000000 --- a/substack/api.py +++ /dev/null @@ -1,723 +0,0 @@ -""" - -API Wrapper - -""" - -import base64 -import json -import logging -import os -from urllib.parse import urljoin -import requests -import pickle -import time -from substack.exceptions import SubstackAPIException, SubstackRequestException -from checkemail import get_verification_link -from datetime import datetime -logger = logging.getLogger(__name__) - -__all__ = ["Api"] - - -class Api: - """ - - A python interface into the Substack API - - """ - - def __init__( - self, - email=None, - password=None, - cookies_path=None, - base_url=None, - publication_url=None, - debug=False, - ): - """ - - To create an instance of the substack.Api class: - >>> import substack - >>> api = substack.Api(email="substack email", password="substack password") - - Args: - email: - password: - cookies_path - To re-use your session without logging in each time, you can save your cookies to a json file and - then load them in the next session. - Make sure to re-save your cookies, as they do update over time. - base_url: - The base URL to use to contact the Substack API. - Defaults to https://substack.com/api/v1. - """ - self.base_url = base_url or "https://substack.com/api/v1" - self.email = email - self.password = password - - if debug: - logging.basicConfig() - logging.getLogger().setLevel(logging.DEBUG) - - self._session = requests.Session() - - # Load cookies from file if provided - if cookies_path is not None: - - if os.path.exists(cookies_path): - with open(cookies_path) as f: - cookies = json.load(f) - self._session.cookies.update(cookies) - - if not os.path.exists(cookies_path) or self.are_cookies_expired(cookies): - print("Cookies are expired. Sending magic link and waiting for verification.") - start_time = time.time() # Record the time when the magic link is sent - self.send_magic_link(email) - verification_link = self.wait_for_verification_link(start_time) - if verification_link: - self.login_v2(email, password, verification_link) - self.export_cookies(cookies_path) - - else: - raise Exception("Failed to get the verification link.") - - elif email is not None and password is not None: - self.login(email, password) - else: - raise ValueError("Must provide email and password or cookies_path to authenticate.") - - user_publication = None - # if the user provided a publication url, then use that - if publication_url: - import re - - # Regular expression to extract subdomain name - match = re.search(r"https://(.*).substack.com", publication_url.lower()) - subdomain = match.group(1) if match else None - - user_publications = self.get_user_publications() - # search through publications to find the publication with the matching subdomain - for publication in user_publications: - if publication["subdomain"] == subdomain: - # set the current publication to the users publication - user_publication = publication - break - else: - # get the users primary publication - user_publication = self.get_user_primary_publication() - - # set the current publication to the users primary publication - self.change_publication(user_publication) - - def are_cookies_expired(self, cookies): - for cookie in cookies: - if 'expiry' in cookie and cookie['expiry'] < time.time(): - return True - return False - - def wait_for_verification_link(self, start_time): - sender_email = "no-reply@substack.com" - - while True: - verification_link = get_verification_link(self.email, sender_email, start_time) - if verification_link: - return verification_link - time.sleep(10) # Wait for X seconds before checking again - - - def send_magic_link(self, email): - body = { - "email": email, - "redirect": "/", - "for_pub": "", - } - endpoint = f"https://substack.com/api/v1/email-login/" - response = self._session.post(endpoint, json=body) - print("Magic link sent!") - - - def login_v2(self, email, password, magic_link): - return self._session.get(magic_link) - - def login(self, email, password) -> dict: - """ - - Login to the substack account. - - Args: - email: substack account email - password: substack account password - """ - - response = self._session.post( - f"{self.base_url}/login", - json={ - "captcha_response": None, - "email": email, - "password": password, - }, - ) - - return self._handle_response(response=response) - - - - def signin_for_pub(self, publication): - """ - Complete the signin process - """ - response = self._session.get( - f"https://substack.com/sign-in?redirect=%2F&for_pub={publication['subdomain']}", - ) - try: - output = self._handle_response(response=response) - except SubstackRequestException as ex: - output = {} - return output - - def change_publication(self, publication): - """ - Change the publication URL - """ - self.publication_url = urljoin(publication["publication_url"], "api/v1") - - # sign-in to the publication - self.signin_for_pub(publication) - - def export_cookies(self, path: str = "cookies.json"): - """ - Export cookies to a json file. - Args: - path: path to the json file - """ - cookies = self._session.cookies.get_dict() - with open(path, "w") as f: - json.dump(cookies, f) - - def save_cookies(self, driver, path): - with open(path, 'wb') as file: - pickle.dump(driver.get_cookies(), file) - - def load_cookies(self, driver, path): - with open(path, 'rb') as file: - cookies = pickle.load(file) - for cookie in cookies: - driver.add_cookie(cookie) - - def login_with_selenium(self): - """ - Login using Selenium to solve CAPTCHA manually. - """ - - # Start virtual display - cookie_path = 'cookies.pkl' - - if os.path.exists(cookie_path): - try: - # Load cookies directly to session if they exist - with open(cookie_path, 'rb') as file: - cookies = pickle.load(file) - cookies_valid = True - for cookie in cookies: - if 'expiry' in cookie: - expiry_date = datetime.datetime.fromtimestamp(cookie['expiry']) - print(f"Cookie {cookie['name']} expires on {expiry_date}") - if cookie['expiry'] < time.time(): - cookies_valid = False - print(f"Cookie {cookie['name']} has expired.") - break - if cookies_valid: - # Load cookies into session - for cookie in cookies: - self._session.cookies.set(cookie['name'], cookie['value']) - - print("Cookies loaded successfully. Skipping login.") - return - except Exception as e: - print("Error loading cookies, proceeding with Selenium login.", e) - - #display = Display() - #display.start() - print("Login with selenium") - chrome_options = Options() - chrome_options.add_argument("--headless") - chrome_options.add_argument("--no-sandbox") - chrome_options.add_argument("--disable-dev-shm-usage") - chrome_options.add_argument("--disable-gpu") - - driver = webdriver.Chrome() - driver.get('https://substack.com/sign-in') - - # Check if already logged in by checking the presence of a user-specific element - wait = WebDriverWait(driver, 10) - try: - - # Adjust the selector to match an element that is present only when logged in - dashboard_button = wait.until( - EC.presence_of_element_located((By.CSS_SELECTOR, 'button[data-href*="publish/home?utm_source=menu"]')) - ) - print("Already logged in.") - - except Exception: - print("Not logged in. Proceeding with login steps.") - try: - login_with_password_button = wait.until(EC.element_to_be_clickable((By.LINK_TEXT, 'Log in with password'))) - login_with_password_button.click() - - time.sleep(2) # Wait for the transition to the login form - - # Fill in the email and password fields - email_field = driver.find_element(By.NAME, 'email') - email_field.send_keys(self.email) - - password_field = driver.find_element(By.NAME, 'password') - password_field.send_keys(self.password) - - # Submit the form - password_field.send_keys(Keys.RETURN) - - print("Please solve the CAPTCHA manually in the opened browser.") - input("Press Enter after solving the CAPTCHA...") - - # Save cookies after solving the CAPTCHA - self.save_cookies(driver, cookie_path) - print("Cookies saved successfully.") - except Exception as e: - print("An error occurred during login.", e) - - # Save cookies after login or cookie load - self.save_cookies(driver, cookie_path) - - # Extract cookies to use with requests - cookies = driver.get_cookies() - for cookie in cookies: - self._session.cookies.set(cookie['name'], cookie['value']) - - # Close the browser - driver.quit() - - - def _handle_response(self, response: requests.Response): - - """ - - Internal helper for handling API responses from the Substack server. - Raises the appropriate exceptions when necessary; otherwise, returns the - response. - - """ - if (response.status_code == 401): - print("CAPTCHA detected, switching to Selenium for manual solving.") - - return - - if not (200 <= response.status_code < 300): - raise SubstackAPIException(response.status_code, response.text) - try: - return response.json() - except ValueError: - raise SubstackRequestException("Invalid Response: %s" % response.text) - - def get_user_id(self): - """ - - Returns: - - """ - profile = self.get_user_profile() - user_id = profile["id"] - - return user_id - - @staticmethod - def get_publication_url(publication: dict) -> str: - """ - Gets the publication url - - Args: - publication: - """ - custom_domain = publication["custom_domain"] - if not custom_domain: - publication_url = f"https://{publication['subdomain']}.substack.com" - else: - publication_url = f"https://{custom_domain}" - - return publication_url - - def get_user_primary_publication(self): - """ - Gets the users primary publication - """ - - profile = self.get_user_profile() - primary_publication = profile["primaryPublication"] - primary_publication["publication_url"] = self.get_publication_url( - primary_publication - ) - - return primary_publication - - def get_user_publications(self): - """ - Gets the users publications - """ - - profile = self.get_user_profile() - - # Loop through users "publicationUsers" list, and return a list - # of dictionaries of "name", and "subdomain", and "id" - user_publications = [] - for publication in profile["publicationUsers"]: - pub = publication["publication"] - pub["publication_url"] = self.get_publication_url(pub) - user_publications.append(pub) - - return user_publications - - def get_user_profile(self): - """ - Gets the users profile - """ - response = self._session.get(f"{self.base_url}/user/profile/self") - - return self._handle_response(response=response) - - def get_user_settings(self): - """ - Get list of users. - - Returns: - - """ - response = self._session.get(f"{self.base_url}/settings") - - return self._handle_response(response=response) - - def get_publication_users(self): - """ - Get list of users. - - Returns: - - """ - response = self._session.get(f"{self.publication_url}/publication/users") - - return self._handle_response(response=response) - - def get_publication_subscriber_count(self): - - """ - Get subscriber count. - - Returns: - - """ - response = self._session.get( - f"{self.publication_url}/publication_launch_checklist" - ) - - return self._handle_response(response=response)["subscriberCount"] - - def get_published_posts( - self, offset=0, limit=25, order_by="post_date", order_direction="desc" - ): - """ - Get list of published posts for the publication. - """ - response = self._session.get( - f"{self.publication_url}/post_management/published", - params={ - "offset": offset, - "limit": limit, - "order_by": order_by, - "order_direction": order_direction, - }, - ) - - return self._handle_response(response=response) - - def get_posts(self) -> dict: - """ - - Returns: - - """ - response = self._session.get(f"{self.base_url}/reader/posts") - - return self._handle_response(response=response) - - def get_drafts(self, filter=None, offset=None, limit=None): - """ - - Args: - filter: - offset: - limit: - - Returns: - - """ - response = self._session.get( - f"{self.publication_url}/drafts", - params={"filter": filter, "offset": offset, "limit": limit}, - ) - return self._handle_response(response=response) - - def get_draft(self, draft_id): - """ - Gets a draft given it's id. - - """ - response = self._session.get(f"{self.publication_url}/drafts/{draft_id}") - return self._handle_response(response=response) - - def delete_draft(self, draft_id): - """ - - Args: - draft_id: - - Returns: - - """ - response = self._session.delete(f"{self.publication_url}/drafts/{draft_id}") - return self._handle_response(response=response) - - def post_draft(self, body) -> dict: - """ - - Args: - body: - - Returns: - - """ - response = self._session.post(f"{self.publication_url}/drafts", json=body) - return self._handle_response(response=response) - - def put_draft(self, draft, **kwargs) -> dict: - """ - - Args: - draft: - **kwargs: - - Returns: - - """ - response = self._session.put( - f"{self.publication_url}/drafts/{draft}", - json=kwargs, - ) - return self._handle_response(response=response) - - def prepublish_draft(self, draft) -> dict: - """ - - Args: - draft: draft id - - Returns: - - """ - - response = self._session.get( - f"{self.publication_url}/drafts/{draft}/prepublish" - ) - return self._handle_response(response=response) - - def publish_draft( - self, draft, send: bool = True, share_automatically: bool = False - ) -> dict: - """ - - Args: - draft: draft id - send: - share_automatically: - - Returns: - - """ - response = self._session.post( - f"{self.publication_url}/drafts/{draft}/publish", - json={"send": send, "share_automatically": share_automatically}, - ) - return self._handle_response(response=response) - - def schedule_draft(self, draft, draft_datetime: datetime) -> dict: - """ - - Args: - draft: draft id - draft_datetime: datetime to schedule the draft - - Returns: - - """ - response = self._session.post( - f"{self.publication_url}/drafts/{draft}/schedule", - json={"post_date": draft_datetime.isoformat()}, - ) - return self._handle_response(response=response) - - def unschedule_draft(self, draft) -> dict: - """ - - Args: - draft: draft id - - Returns: - - """ - response = self._session.post( - f"{self.publication_url}/drafts/{draft}/schedule", json={"post_date": None} - ) - return self._handle_response(response=response) - - def get_image(self, image: str): - """ - - This method generates a new substack link that contains the image. - - Args: - image: filepath or original url of image. - - Returns: - - """ - if os.path.exists(image): - with open(image, "rb") as file: - image = b"data:image/jpeg;base64," + base64.b64encode(file.read()) - - response = self._session.post( - f"{self.publication_url}/image", - data={"image": image}, - ) - return self._handle_response(response=response) - - def get_categories(self): - """ - - Retrieve list of all available categories. - - Returns: - - """ - response = self._session.get(f"{self.base_url}/categories") - return self._handle_response(response=response) - - def get_category(self, category_id, category_type, page): - """ - - Args: - category_id: - category_type: - page: - - Returns: - - """ - response = self._session.get( - f"{self.base_url}/category/public/{category_id}/{category_type}", - params={"page": page}, - ) - return self._handle_response(response=response) - - def get_single_category(self, category_id, category_type, page=None, limit=None): - """ - - Args: - category_id: - category_type: paid or all - page: by default substack retrieves only the first 25 publications in the category. If this is left None, - then all pages will be retrieved. The page size is 25 publications. - limit: - Returns: - - """ - if page is not None: - output = self.get_category(category_id, category_type, page) - else: - publications = [] - page = 0 - while True: - page_output = self.get_category(category_id, category_type, page) - publications.extend(page_output.get("publications", [])) - if ( - limit is not None and limit <= len(publications) - ) or not page_output.get("more", False): - publications = publications[:limit] - break - page += 1 - output = { - "publications": publications, - "more": page_output.get("more", False), - } - return output - - def delete_all_drafts(self): - """ - - Returns: - - """ - response = None - while True: - drafts = self.get_drafts(filter="draft", limit=10, offset=0) - if len(drafts) == 0: - break - for draft in drafts: - response = self.delete_draft(draft.get("id")) - return response - - def get_sections(self): - """ - Get a list of the sections of your publication. - - TODO: this is hacky but I cannot find another place where to get the sections. - Returns: - - """ - response = self._session.get( - f"{self.publication_url}/subscriptions", - ) - content = Api._handle_response(response=response) - sections = [ - p.get("sections") - for p in content.get("publications") - if p.get("hostname") in self.publication_url - ] - return sections[0] - - def publication_embed(self, url): - """ - - Args: - url: - - Returns: - - """ - return self.call("/publication/embed", "GET", url=url) - - def call(self, endpoint, method, **params): - """ - - Args: - endpoint: - method: - **params: - - Returns: - - """ - response = self._session.request( - method=method, - url=f"{self.publication_url}/{endpoint}", - params=params, - ) - return self._handle_response(response=response) diff --git a/substack/exceptions.py b/substack/exceptions.py deleted file mode 100644 index e9b6f29..0000000 --- a/substack/exceptions.py +++ /dev/null @@ -1,32 +0,0 @@ -import json - - -class SubstackAPIException(Exception): - def __init__(self, status_code, text): - try: - json_res = json.loads(text) - except ValueError: - self.message = f"Invalid JSON error message from Substack: {text}" - else: - self.message = ", ".join( - list( - map(lambda error: error.get("msg", ""), json_res.get("errors", [])) - ) - ) - self.message = self.message or json_res.get("error", "") - self.status_code = status_code - - def __str__(self): - return f"APIError(code={self.status_code}): {self.message}" - - -class SubstackRequestException(Exception): - def __init__(self, message): - self.message = message - - def __str__(self): - return f"SubstackRequestException: {self.message}" - - -class SectionNotExistsException(SubstackRequestException): - pass diff --git a/substack/post.py b/substack/post.py deleted file mode 100644 index 825baf5..0000000 --- a/substack/post.py +++ /dev/null @@ -1,331 +0,0 @@ -""" - -Post Utilities - -""" - -import json -from typing import Dict - -__all__ = ["Post"] - -from substack.exceptions import SectionNotExistsException - - -class Post: - """ - - Post utility class - - """ - - def __init__( - self, - title: str, - subtitle: str, - user_id, - audience: str = None, - write_comment_permissions: str = None, - ): - """ - - Args: - title: - subtitle: - user_id: - audience: possible values: everyone, only_paid, founding, only_free - write_comment_permissions: none, only_paid, everyone (this field is a mess) - """ - self.draft_title = title - self.draft_subtitle = subtitle - self.draft_body = {"type": "doc", "content": []} - self.draft_bylines = [{"id": int(user_id), "is_guest": False}] - self.audience = audience if audience is not None else "everyone" - self.draft_section_id = None - self.section_chosen = True - - # TODO better understand the possible values and combinations with audience - if write_comment_permissions is not None: - self.write_comment_permissions = write_comment_permissions - else: - self.write_comment_permissions = self.audience - - def set_section(self, name: str, sections: list): - """ - - Args: - name: - sections: - - Returns: - - """ - section = [s for s in sections if s.get("name") == name] - if len(section) != 1: - raise SectionNotExistsException(name) - section = section[0] - self.draft_section_id = section.get("id") - - def add(self, item: Dict): - """ - - Add item to draft body. - - Args: - item: - - Returns: - - """ - - self.draft_body["content"] = self.draft_body.get("content", []) + [ - {"type": item.get("type")} - ] - content = item.get("content") - if item.get("type") == "captionedImage": - self.captioned_image(**item) - elif item.get("type") == "embeddedPublication": - self.draft_body["content"][-1]["attrs"] = item.get("url") - elif item.get("type") == "youtube2": - self.youtube(item.get("src")) - elif item.get("type") == "subscribeWidget": - self.subscribe_with_caption(item.get("message")) - else: - if content is not None: - self.add_complex_text(content) - - if item.get("type") == "heading": - self.attrs(item.get("level", 1)) - - marks = item.get("marks") - if marks is not None: - self.marks(marks) - - return self - - def paragraph(self, content=None): - """ - - Args: - content: - - Returns: - - """ - item = {"type": "paragraph"} - if content is not None: - item["content"] = content - return self.add(item) - - def heading(self, content=None, level: int = 1): - """ - - Args: - content: - level: - - Returns: - - """ - - item = {"type": "heading"} - if content is not None: - item["content"] = content - item["level"] = level - return self.add(item) - - def horizontal_rule(self): - """ - - Returns: - - """ - return self.add({"type": "horizontal_rule"}) - - def attrs(self, level): - """ - - Args: - level: - - Returns: - - """ - content_attrs = self.draft_body["content"][-1].get("attrs", {}) - content_attrs.update({"level": level}) - self.draft_body["content"][-1]["attrs"] = content_attrs - return self - - def captioned_image( - self, - src: str, - fullscreen: bool = False, - imageSize: str = "normal", - height: int = 819, - width: int = 1456, - resizeWidth: int = 728, - bytes: str = None, - alt: str = None, - title: str = None, - type: str = None, - href: str = None, - belowTheFold: bool = False, - internalRedirect: str = None, - ): - """ - - Add image to body. - - Args: - bytes: - alt: - title: - type: - href: - belowTheFold: - internalRedirect: - src: - fullscreen: - imageSize: - height: - width: - resizeWidth: - """ - - content = self.draft_body["content"][-1].get("content", []) - content += [ - { - "type": "image2", - "attrs": { - "src": src, - "fullscreen": fullscreen, - "imageSize": imageSize, - "height": height, - "width": width, - "resizeWidth": resizeWidth, - "bytes": bytes, - "alt": alt, - "title": title, - "type": type, - "href": href, - "belowTheFold": belowTheFold, - "internalRedirect": internalRedirect, - }, - } - ] - self.draft_body["content"][-1]["content"] = content - return self - - def text(self, value: str): - """ - - Add text to the last paragraph. - - Args: - value: Text to add to paragraph. - - Returns: - - """ - content = self.draft_body["content"][-1].get("content", []) - content += [{"type": "text", "text": value}] - self.draft_body["content"][-1]["content"] = content - return self - - def add_complex_text(self, text): - """ - - Args: - text: - """ - if isinstance(text, str): - self.text(text) - else: - for chunk in text: - if chunk: - self.text(chunk.get("content")).marks(chunk.get("marks", [])) - - def marks(self, marks): - """ - - Args: - marks: - - Returns: - - """ - content = self.draft_body["content"][-1].get("content", [])[-1] - content_marks = content.get("marks", []) - for mark in marks: - new_mark = {"type": mark.get("type")} - if mark.get("type") == "link": - href = mark.get("href") - new_mark.update({"attrs": {"href": href}}) - content_marks.append(new_mark) - content["marks"] = content_marks - return self - - def remove_last_paragraph(self): - """Remove last paragraph""" - del self.draft_body.get("content")[-1] - - def get_draft(self): - """ - - Returns: - - """ - out = vars(self) - out["draft_body"] = json.dumps(out["draft_body"]) - return out - - def subscribe_with_caption(self, message: str = None): - """ - - Add subscribe widget with caption - - Args: - message: - - Returns: - - """ - - if message is None: - message = """Thanks for reading this newsletter! - Subscribe for free to receive new posts and support my work.""" - - subscribe = self.draft_body["content"][-1] - subscribe["attrs"] = { - "url": "%%checkout_url%%", - "text": "Subscribe", - "language": "en", - } - subscribe["content"] = [ - { - "type": "ctaCaption", - "content": [ - { - "type": "text", - "text": message, - } - ], - } - ] - return self - - def youtube(self, value: str): - """ - - Add youtube video to post. - - Args: - value: youtube url - - Returns: - - """ - content_attrs = self.draft_body["content"][-1].get("attrs", {}) - content_attrs.update({"videoId": value}) - self.draft_body["content"][-1]["attrs"] = content_attrs - return self diff --git a/update_and_run.sh b/update_and_run.sh index 6754a57..1b4b7ec 100644 --- a/update_and_run.sh +++ b/update_and_run.sh @@ -7,4 +7,4 @@ git fetch --all git reset --hard origin/main # Run your Python script -python Post_RSS_on_SubStack.py \ No newline at end of file +python post_rss_to_ghost.py \ No newline at end of file