""" API Wrapper """ import base64 import json import logging import os from datetime import datetime from urllib.parse import urljoin from pyvirtualdisplay import Display import requests from selenium import webdriver from selenium.webdriver.common.keys import Keys from selenium.webdriver.common.by import By from selenium.webdriver.chrome.options import Options from selenium.webdriver.support.ui import WebDriverWait import pickle import time from substack.exceptions import SubstackAPIException, SubstackRequestException from selenium.webdriver.support import expected_conditions as EC import datetime logger = logging.getLogger(__name__) __all__ = ["Api"] class Api: """ A python interface into the Substack API """ def __init__( self, email=None, password=None, cookies_path=None, base_url=None, publication_url=None, debug=False, ): """ To create an instance of the substack.Api class: >>> import substack >>> api = substack.Api(email="substack email", password="substack password") Args: email: password: cookies_path To re-use your session without logging in each time, you can save your cookies to a json file and then load them in the next session. Make sure to re-save your cookies, as they do update over time. base_url: The base URL to use to contact the Substack API. Defaults to https://substack.com/api/v1. """ self.base_url = base_url or "https://substack.com/api/v1" self.email = email self.password = password if debug: logging.basicConfig() logging.getLogger().setLevel(logging.DEBUG) self._session = requests.Session() # Load cookies from file if provided # Helps with Captcha errors by reusing cookies from "local" auth, then switching to running code in the cloud if cookies_path is not None: with open(cookies_path) as f: cookies = json.load(f) self._session.cookies.update(cookies) elif email is not None and password is not None: self.send_magic_link(email) magic_link = input("Enter magic link: ") self.login_v2(email, password, magic_link) self.export_cookies(cookies_path) else: raise ValueError( "Must provide email and password or cookies_path to authenticate." ) user_publication = None # if the user provided a publication url, then use that if publication_url: import re # Regular expression to extract subdomain name match = re.search(r"https://(.*).substack.com", publication_url.lower()) subdomain = match.group(1) if match else None user_publications = self.get_user_publications() # search through publications to find the publication with the matching subdomain for publication in user_publications: if publication["subdomain"] == subdomain: # set the current publication to the users publication user_publication = publication break else: # get the users primary publication user_publication = self.get_user_primary_publication() # set the current publication to the users primary publication self.change_publication(user_publication) def send_magic_link(self, email): body = { "email": email, "redirect": "/", "for_pub": "", } endpoint = f"https://substack.com/api/v1/email-login/" response = self._session.post(endpoint, json=body) print("Magic link sent!") def login_v2(self, email, password, magic_link): return self._session.get(magic_link) def login(self, email, password) -> dict: """ Login to the substack account. Args: email: substack account email password: substack account password """ response = self._session.post( f"{self.base_url}/login", json={ "captcha_response": None, "email": email, "password": password, }, ) return self._handle_response(response=response) def signin_for_pub(self, publication): """ Complete the signin process """ response = self._session.get( f"https://substack.com/sign-in?redirect=%2F&for_pub={publication['subdomain']}", ) try: output = self._handle_response(response=response) except SubstackRequestException as ex: output = {} return output def change_publication(self, publication): """ Change the publication URL """ self.publication_url = urljoin(publication["publication_url"], "api/v1") # sign-in to the publication self.signin_for_pub(publication) def export_cookies(self, path: str = "cookies.json"): """ Export cookies to a json file. Args: path: path to the json file """ cookies = self._session.cookies.get_dict() with open(path, "w") as f: json.dump(cookies, f) def save_cookies(self, driver, path): with open(path, 'wb') as file: pickle.dump(driver.get_cookies(), file) def load_cookies(self, driver, path): with open(path, 'rb') as file: cookies = pickle.load(file) for cookie in cookies: driver.add_cookie(cookie) def login_with_selenium(self): """ Login using Selenium to solve CAPTCHA manually. """ # Start virtual display cookie_path = 'cookies.pkl' if os.path.exists(cookie_path): try: # Load cookies directly to session if they exist with open(cookie_path, 'rb') as file: cookies = pickle.load(file) cookies_valid = True for cookie in cookies: if 'expiry' in cookie: expiry_date = datetime.datetime.fromtimestamp(cookie['expiry']) print(f"Cookie {cookie['name']} expires on {expiry_date}") if cookie['expiry'] < time.time(): cookies_valid = False print(f"Cookie {cookie['name']} has expired.") break if cookies_valid: # Load cookies into session for cookie in cookies: self._session.cookies.set(cookie['name'], cookie['value']) print("Cookies loaded successfully. Skipping login.") return except Exception as e: print("Error loading cookies, proceeding with Selenium login.", e) #display = Display() #display.start() print("Login with selenium") chrome_options = Options() chrome_options.add_argument("--headless") chrome_options.add_argument("--no-sandbox") chrome_options.add_argument("--disable-dev-shm-usage") chrome_options.add_argument("--disable-gpu") driver = webdriver.Chrome() driver.get('https://substack.com/sign-in') # Check if already logged in by checking the presence of a user-specific element wait = WebDriverWait(driver, 10) try: # Adjust the selector to match an element that is present only when logged in dashboard_button = wait.until( EC.presence_of_element_located((By.CSS_SELECTOR, 'button[data-href*="publish/home?utm_source=menu"]')) ) print("Already logged in.") except Exception: print("Not logged in. Proceeding with login steps.") try: login_with_password_button = wait.until(EC.element_to_be_clickable((By.LINK_TEXT, 'Log in with password'))) login_with_password_button.click() time.sleep(2) # Wait for the transition to the login form # Fill in the email and password fields email_field = driver.find_element(By.NAME, 'email') email_field.send_keys(self.email) password_field = driver.find_element(By.NAME, 'password') password_field.send_keys(self.password) # Submit the form password_field.send_keys(Keys.RETURN) print("Please solve the CAPTCHA manually in the opened browser.") input("Press Enter after solving the CAPTCHA...") # Save cookies after solving the CAPTCHA self.save_cookies(driver, cookie_path) print("Cookies saved successfully.") except Exception as e: print("An error occurred during login.", e) # Save cookies after login or cookie load self.save_cookies(driver, cookie_path) # Extract cookies to use with requests cookies = driver.get_cookies() for cookie in cookies: self._session.cookies.set(cookie['name'], cookie['value']) # Close the browser driver.quit() def _handle_response(self, response: requests.Response): """ Internal helper for handling API responses from the Substack server. Raises the appropriate exceptions when necessary; otherwise, returns the response. """ if (response.status_code == 401): print("CAPTCHA detected, switching to Selenium for manual solving.") return if not (200 <= response.status_code < 300): raise SubstackAPIException(response.status_code, response.text) try: return response.json() except ValueError: raise SubstackRequestException("Invalid Response: %s" % response.text) def get_user_id(self): """ Returns: """ profile = self.get_user_profile() user_id = profile["id"] return user_id @staticmethod def get_publication_url(publication: dict) -> str: """ Gets the publication url Args: publication: """ custom_domain = publication["custom_domain"] if not custom_domain: publication_url = f"https://{publication['subdomain']}.substack.com" else: publication_url = f"https://{custom_domain}" return publication_url def get_user_primary_publication(self): """ Gets the users primary publication """ profile = self.get_user_profile() primary_publication = profile["primaryPublication"] primary_publication["publication_url"] = self.get_publication_url( primary_publication ) return primary_publication def get_user_publications(self): """ Gets the users publications """ profile = self.get_user_profile() # Loop through users "publicationUsers" list, and return a list # of dictionaries of "name", and "subdomain", and "id" user_publications = [] for publication in profile["publicationUsers"]: pub = publication["publication"] pub["publication_url"] = self.get_publication_url(pub) user_publications.append(pub) return user_publications def get_user_profile(self): """ Gets the users profile """ response = self._session.get(f"{self.base_url}/user/profile/self") return self._handle_response(response=response) def get_user_settings(self): """ Get list of users. Returns: """ response = self._session.get(f"{self.base_url}/settings") return self._handle_response(response=response) def get_publication_users(self): """ Get list of users. Returns: """ response = self._session.get(f"{self.publication_url}/publication/users") return self._handle_response(response=response) def get_publication_subscriber_count(self): """ Get subscriber count. Returns: """ response = self._session.get( f"{self.publication_url}/publication_launch_checklist" ) return self._handle_response(response=response)["subscriberCount"] def get_published_posts( self, offset=0, limit=25, order_by="post_date", order_direction="desc" ): """ Get list of published posts for the publication. """ response = self._session.get( f"{self.publication_url}/post_management/published", params={ "offset": offset, "limit": limit, "order_by": order_by, "order_direction": order_direction, }, ) return self._handle_response(response=response) def get_posts(self) -> dict: """ Returns: """ response = self._session.get(f"{self.base_url}/reader/posts") return self._handle_response(response=response) def get_drafts(self, filter=None, offset=None, limit=None): """ Args: filter: offset: limit: Returns: """ response = self._session.get( f"{self.publication_url}/drafts", params={"filter": filter, "offset": offset, "limit": limit}, ) return self._handle_response(response=response) def get_draft(self, draft_id): """ Gets a draft given it's id. """ response = self._session.get(f"{self.publication_url}/drafts/{draft_id}") return self._handle_response(response=response) def delete_draft(self, draft_id): """ Args: draft_id: Returns: """ response = self._session.delete(f"{self.publication_url}/drafts/{draft_id}") return self._handle_response(response=response) def post_draft(self, body) -> dict: """ Args: body: Returns: """ response = self._session.post(f"{self.publication_url}/drafts", json=body) return self._handle_response(response=response) def put_draft(self, draft, **kwargs) -> dict: """ Args: draft: **kwargs: Returns: """ response = self._session.put( f"{self.publication_url}/drafts/{draft}", json=kwargs, ) return self._handle_response(response=response) def prepublish_draft(self, draft) -> dict: """ Args: draft: draft id Returns: """ response = self._session.get( f"{self.publication_url}/drafts/{draft}/prepublish" ) return self._handle_response(response=response) def publish_draft( self, draft, send: bool = True, share_automatically: bool = False ) -> dict: """ Args: draft: draft id send: share_automatically: Returns: """ response = self._session.post( f"{self.publication_url}/drafts/{draft}/publish", json={"send": send, "share_automatically": share_automatically}, ) return self._handle_response(response=response) def schedule_draft(self, draft, draft_datetime: datetime) -> dict: """ Args: draft: draft id draft_datetime: datetime to schedule the draft Returns: """ response = self._session.post( f"{self.publication_url}/drafts/{draft}/schedule", json={"post_date": draft_datetime.isoformat()}, ) return self._handle_response(response=response) def unschedule_draft(self, draft) -> dict: """ Args: draft: draft id Returns: """ response = self._session.post( f"{self.publication_url}/drafts/{draft}/schedule", json={"post_date": None} ) return self._handle_response(response=response) def get_image(self, image: str): """ This method generates a new substack link that contains the image. Args: image: filepath or original url of image. Returns: """ if os.path.exists(image): with open(image, "rb") as file: image = b"data:image/jpeg;base64," + base64.b64encode(file.read()) response = self._session.post( f"{self.publication_url}/image", data={"image": image}, ) return self._handle_response(response=response) def get_categories(self): """ Retrieve list of all available categories. Returns: """ response = self._session.get(f"{self.base_url}/categories") return self._handle_response(response=response) def get_category(self, category_id, category_type, page): """ Args: category_id: category_type: page: Returns: """ response = self._session.get( f"{self.base_url}/category/public/{category_id}/{category_type}", params={"page": page}, ) return self._handle_response(response=response) def get_single_category(self, category_id, category_type, page=None, limit=None): """ Args: category_id: category_type: paid or all page: by default substack retrieves only the first 25 publications in the category. If this is left None, then all pages will be retrieved. The page size is 25 publications. limit: Returns: """ if page is not None: output = self.get_category(category_id, category_type, page) else: publications = [] page = 0 while True: page_output = self.get_category(category_id, category_type, page) publications.extend(page_output.get("publications", [])) if ( limit is not None and limit <= len(publications) ) or not page_output.get("more", False): publications = publications[:limit] break page += 1 output = { "publications": publications, "more": page_output.get("more", False), } return output def delete_all_drafts(self): """ Returns: """ response = None while True: drafts = self.get_drafts(filter="draft", limit=10, offset=0) if len(drafts) == 0: break for draft in drafts: response = self.delete_draft(draft.get("id")) return response def get_sections(self): """ Get a list of the sections of your publication. TODO: this is hacky but I cannot find another place where to get the sections. Returns: """ response = self._session.get( f"{self.publication_url}/subscriptions", ) content = Api._handle_response(response=response) sections = [ p.get("sections") for p in content.get("publications") if p.get("hostname") in self.publication_url ] return sections[0] def publication_embed(self, url): """ Args: url: Returns: """ return self.call("/publication/embed", "GET", url=url) def call(self, endpoint, method, **params): """ Args: endpoint: method: **params: Returns: """ response = self._session.request( method=method, url=f"{self.publication_url}/{endpoint}", params=params, ) return self._handle_response(response=response)