From f17cd92f90eb8c24e37db8ff1e2eb93604c02b5a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ga=C3=ABl?= Date: Wed, 3 Jul 2024 17:01:29 +0200 Subject: [PATCH] fixing cookie things, more or less --- Dockerfile | 5 +- Post_RSS_on_SubStack.py | 21 +- feeds.txt | 15 + generate_cookie.py | 53 +++ requirements.txt | 3 +- substack/__init__.py | 11 + substack/api.py | 708 ++++++++++++++++++++++++++++++++++++++++ substack/exceptions.py | 32 ++ substack/post.py | 331 +++++++++++++++++++ 9 files changed, 1170 insertions(+), 9 deletions(-) create mode 100644 feeds.txt create mode 100644 generate_cookie.py create mode 100644 substack/__init__.py create mode 100644 substack/api.py create mode 100644 substack/exceptions.py create mode 100644 substack/post.py diff --git a/Dockerfile b/Dockerfile index 8310260..ab5816c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,10 +1,13 @@ FROM python:3.8 + RUN apt-get update && apt-get install -y git RUN git clone http://192.168.1.25:8124/zep/Substack_JV.git /app WORKDIR /app RUN pip install -r requirements.txt ENV TZ=Europe/Brussels RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone + COPY update_and_run.sh /app RUN chmod +x /app/update_and_run.sh -CMD ["./update_and_run.sh"] \ No newline at end of file +CMD ["./update_and_run.sh"] +ENTRYPOINT ["sh", "-c", "./update_and_run.sh"] \ No newline at end of file diff --git a/Post_RSS_on_SubStack.py b/Post_RSS_on_SubStack.py index de47ce7..3e4fc3a 100644 --- a/Post_RSS_on_SubStack.py +++ b/Post_RSS_on_SubStack.py @@ -11,6 +11,9 @@ import re from logging.handlers import RotatingFileHandler import random +import pyvirtualdisplay + + from substack import Api from substack.post import Post @@ -45,10 +48,11 @@ class RSSfeed(): self.youtube = yt class SubStackTask: - def __init__(self, login, password, account, feeds): + def __init__(self, login, password, cookies_path, account, feeds): self.api = Api( email=login, password=password, + cookies_path=cookies_path, publication_url=account, ) @@ -95,11 +99,11 @@ class SubStackTask: formatted_date = formatted_date.replace(en, fr) return formatted_date - async def run_daily_at_6_am(self): + async def run_daily_at_11_am(self): while True: now = datetime.datetime.now() # Calculate the time until 6 AM next day - next_run = (now + datetime.timedelta(days=1)).replace(hour=6, minute=5, second=0, microsecond=0) + next_run = (now + datetime.timedelta(days=1)).replace(hour=11, minute=0, second=0, microsecond=0) sleep_seconds = (next_run - now).total_seconds() while sleep_seconds > 0: @@ -237,7 +241,12 @@ async def main(login, password, account): ff = r'/data/feeds.txt' if os.path.isfile(ff) is False: - ff = r'x:\substack\feeds.txt' + ff = r'feeds.txt' + + cookies_path = r'/data/cookies.json' + if os.path.isfile(cookies_path) is False: + cookies_path = r'cookies.json' + with open(ff) as file: lines = [line.rstrip() for line in file] @@ -245,10 +254,10 @@ async def main(login, password, account): youtube = "youtube" in line feeds.append(RSSfeed(line, youtube)) - task = SubStackTask(login, password, account, feeds) + task = SubStackTask(login, password, cookies_path, account, feeds) LOG.info("Starting bot") - await task.run_daily_at_6_am() + await task.run_daily_at_11_am() #await task.daily_task() diff --git a/feeds.txt b/feeds.txt new file mode 100644 index 0000000..dcfd355 --- /dev/null +++ b/feeds.txt @@ -0,0 +1,15 @@ +https://www.factornews.com/rss.xml +https://nofrag.com/feed +https://dystopeek.fr/feed/ +https://thepixelpost.com/rss/ +https://yamukass.substack.com/feed +https://tseret.com/categorie/tests/feed +https://www.gamesidestory.com/feed +https://www.nintendo-town.fr/feed +https://jesuisungameur.com/feed +https://www.switch-actu.fr/categorie/tests/tests-de-jeux/feed +https://www.playscope.com/category/articles/test-gaming/feed +https://jrpgfr.net/category/test/feed +https://jv.jeuxonline.info/rss/dossiers/rss.xml +https://www.youtube.com/feeds/videos.xml?channel_id=UC-OvBDfZGn1OdsqMBwkOI_A +https://www.youtube.com/feeds/videos.xml?playlist_id=PLZRiqJjIUlDTrwYs_UqEIts5fVaBpaIEz \ No newline at end of file diff --git a/generate_cookie.py b/generate_cookie.py new file mode 100644 index 0000000..24bd307 --- /dev/null +++ b/generate_cookie.py @@ -0,0 +1,53 @@ +from selenium import webdriver +from selenium.webdriver.common.by import By +from selenium.webdriver.common.keys import Keys +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC + +import time +import pickle + +def save_cookies(driver, path): + with open(path, 'wb') as file: + pickle.dump(driver.get_cookies(), file) + +def generate_cookies(email, password): + cookie_path = r'x:\substack\cookies.pkl' + + + chrome_options = Options() + + driver = webdriver.Chrome(options=chrome_options) + driver.get('https://substack.com/sign-in') + + wait = WebDriverWait(driver, 10) + + try: + login_with_password_button = wait.until( + EC.element_to_be_clickable((By.LINK_TEXT, 'Sign in with password')) + ) + login_with_password_button.click() + + time.sleep(2) + + email_field = driver.find_element(By.NAME, 'email') + email_field.send_keys(email) + + password_field = driver.find_element(By.NAME, 'password') + password_field.send_keys(password) + + password_field.send_keys(Keys.RETURN) + + save_cookies(driver, cookie_path) + print("Cookies saved successfully.") + except Exception as e: + print("An error occurred during login.", e) + + driver.quit() + + +if __name__ == "__main__": + email = "gael.honorez@gmail.com" + password = "f3PaTGedjFc2gkr1ypi5" + generate_cookies(email, password) diff --git a/requirements.txt b/requirements.txt index 7189d4d..5d59535 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,2 @@ requests -feedparser -python-substack \ No newline at end of file +feedparser \ No newline at end of file diff --git a/substack/__init__.py b/substack/__init__.py new file mode 100644 index 0000000..0d8268f --- /dev/null +++ b/substack/__init__.py @@ -0,0 +1,11 @@ +"""A library that provides a Python interface to the Substack API.""" + +__author__ = "Paolo Mazza" +__email__ = "mazzapaolo2019@gmail.com" +__license__ = "MIT License" +__version__ = "1.0" +__url__ = "https://github.com/ma2za/python-substack" +__download_url__ = "https://pypi.python.org/pypi/python-substack" +__description__ = "A Python wrapper around the Substack API" + +from .api import Api diff --git a/substack/api.py b/substack/api.py new file mode 100644 index 0000000..5a6cfc4 --- /dev/null +++ b/substack/api.py @@ -0,0 +1,708 @@ +""" + +API Wrapper + +""" + +import base64 +import json +import logging +import os +from datetime import datetime +from urllib.parse import urljoin +from pyvirtualdisplay import Display +import requests +from selenium import webdriver +from selenium.webdriver.common.keys import Keys +from selenium.webdriver.common.by import By +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.support.ui import WebDriverWait +import pickle +import time +from substack.exceptions import SubstackAPIException, SubstackRequestException +from selenium.webdriver.support import expected_conditions as EC +import datetime +logger = logging.getLogger(__name__) + +__all__ = ["Api"] + + +class Api: + """ + + A python interface into the Substack API + + """ + + def __init__( + self, + email=None, + password=None, + cookies_path=None, + base_url=None, + publication_url=None, + debug=False, + ): + """ + + To create an instance of the substack.Api class: + >>> import substack + >>> api = substack.Api(email="substack email", password="substack password") + + Args: + email: + password: + cookies_path + To re-use your session without logging in each time, you can save your cookies to a json file and + then load them in the next session. + Make sure to re-save your cookies, as they do update over time. + base_url: + The base URL to use to contact the Substack API. + Defaults to https://substack.com/api/v1. + """ + self.base_url = base_url or "https://substack.com/api/v1" + self.email = email + self.password = password + + if debug: + logging.basicConfig() + logging.getLogger().setLevel(logging.DEBUG) + + self._session = requests.Session() + + # Load cookies from file if provided + # Helps with Captcha errors by reusing cookies from "local" auth, then switching to running code in the cloud + + if cookies_path is not None: + with open(cookies_path) as f: + cookies = json.load(f) + self._session.cookies.update(cookies) + + elif email is not None and password is not None: + self.send_magic_link(email) + magic_link = input("Enter magic link: ") + self.login_v2(email, password, magic_link) + self.export_cookies(cookies_path) + + else: + raise ValueError( + "Must provide email and password or cookies_path to authenticate." + ) + + user_publication = None + # if the user provided a publication url, then use that + if publication_url: + import re + + # Regular expression to extract subdomain name + match = re.search(r"https://(.*).substack.com", publication_url.lower()) + subdomain = match.group(1) if match else None + + user_publications = self.get_user_publications() + # search through publications to find the publication with the matching subdomain + for publication in user_publications: + if publication["subdomain"] == subdomain: + # set the current publication to the users publication + user_publication = publication + break + else: + # get the users primary publication + user_publication = self.get_user_primary_publication() + + # set the current publication to the users primary publication + self.change_publication(user_publication) + + def send_magic_link(self, email): + body = { + "email": email, + "redirect": "/", + "for_pub": "", + } + endpoint = f"https://substack.com/api/v1/email-login/" + response = self._session.post(endpoint, json=body) + print("Magic link sent!") + + + def login_v2(self, email, password, magic_link): + return self._session.get(magic_link) + + def login(self, email, password) -> dict: + """ + + Login to the substack account. + + Args: + email: substack account email + password: substack account password + """ + + response = self._session.post( + f"{self.base_url}/login", + json={ + "captcha_response": None, + "email": email, + "password": password, + }, + ) + + return self._handle_response(response=response) + + + + def signin_for_pub(self, publication): + """ + Complete the signin process + """ + response = self._session.get( + f"https://substack.com/sign-in?redirect=%2F&for_pub={publication['subdomain']}", + ) + try: + output = self._handle_response(response=response) + except SubstackRequestException as ex: + output = {} + return output + + def change_publication(self, publication): + """ + Change the publication URL + """ + self.publication_url = urljoin(publication["publication_url"], "api/v1") + + # sign-in to the publication + self.signin_for_pub(publication) + + def export_cookies(self, path: str = "cookies.json"): + """ + Export cookies to a json file. + Args: + path: path to the json file + """ + cookies = self._session.cookies.get_dict() + with open(path, "w") as f: + json.dump(cookies, f) + + def save_cookies(self, driver, path): + with open(path, 'wb') as file: + pickle.dump(driver.get_cookies(), file) + + def load_cookies(self, driver, path): + with open(path, 'rb') as file: + cookies = pickle.load(file) + for cookie in cookies: + driver.add_cookie(cookie) + + def login_with_selenium(self): + """ + Login using Selenium to solve CAPTCHA manually. + """ + + # Start virtual display + cookie_path = 'cookies.pkl' + + if os.path.exists(cookie_path): + try: + # Load cookies directly to session if they exist + with open(cookie_path, 'rb') as file: + cookies = pickle.load(file) + cookies_valid = True + for cookie in cookies: + if 'expiry' in cookie: + expiry_date = datetime.datetime.fromtimestamp(cookie['expiry']) + print(f"Cookie {cookie['name']} expires on {expiry_date}") + if cookie['expiry'] < time.time(): + cookies_valid = False + print(f"Cookie {cookie['name']} has expired.") + break + if cookies_valid: + # Load cookies into session + for cookie in cookies: + self._session.cookies.set(cookie['name'], cookie['value']) + + print("Cookies loaded successfully. Skipping login.") + return + except Exception as e: + print("Error loading cookies, proceeding with Selenium login.", e) + + #display = Display() + #display.start() + print("Login with selenium") + chrome_options = Options() + chrome_options.add_argument("--headless") + chrome_options.add_argument("--no-sandbox") + chrome_options.add_argument("--disable-dev-shm-usage") + chrome_options.add_argument("--disable-gpu") + + driver = webdriver.Chrome() + driver.get('https://substack.com/sign-in') + + # Check if already logged in by checking the presence of a user-specific element + wait = WebDriverWait(driver, 10) + try: + + # Adjust the selector to match an element that is present only when logged in + dashboard_button = wait.until( + EC.presence_of_element_located((By.CSS_SELECTOR, 'button[data-href*="publish/home?utm_source=menu"]')) + ) + print("Already logged in.") + + except Exception: + print("Not logged in. Proceeding with login steps.") + try: + login_with_password_button = wait.until(EC.element_to_be_clickable((By.LINK_TEXT, 'Log in with password'))) + login_with_password_button.click() + + time.sleep(2) # Wait for the transition to the login form + + # Fill in the email and password fields + email_field = driver.find_element(By.NAME, 'email') + email_field.send_keys(self.email) + + password_field = driver.find_element(By.NAME, 'password') + password_field.send_keys(self.password) + + # Submit the form + password_field.send_keys(Keys.RETURN) + + print("Please solve the CAPTCHA manually in the opened browser.") + input("Press Enter after solving the CAPTCHA...") + + # Save cookies after solving the CAPTCHA + self.save_cookies(driver, cookie_path) + print("Cookies saved successfully.") + except Exception as e: + print("An error occurred during login.", e) + + # Save cookies after login or cookie load + self.save_cookies(driver, cookie_path) + + # Extract cookies to use with requests + cookies = driver.get_cookies() + for cookie in cookies: + self._session.cookies.set(cookie['name'], cookie['value']) + + # Close the browser + driver.quit() + + + def _handle_response(self, response: requests.Response): + + """ + + Internal helper for handling API responses from the Substack server. + Raises the appropriate exceptions when necessary; otherwise, returns the + response. + + """ + if (response.status_code == 401): + print("CAPTCHA detected, switching to Selenium for manual solving.") + + return + + if not (200 <= response.status_code < 300): + raise SubstackAPIException(response.status_code, response.text) + try: + return response.json() + except ValueError: + raise SubstackRequestException("Invalid Response: %s" % response.text) + + def get_user_id(self): + """ + + Returns: + + """ + profile = self.get_user_profile() + user_id = profile["id"] + + return user_id + + @staticmethod + def get_publication_url(publication: dict) -> str: + """ + Gets the publication url + + Args: + publication: + """ + custom_domain = publication["custom_domain"] + if not custom_domain: + publication_url = f"https://{publication['subdomain']}.substack.com" + else: + publication_url = f"https://{custom_domain}" + + return publication_url + + def get_user_primary_publication(self): + """ + Gets the users primary publication + """ + + profile = self.get_user_profile() + primary_publication = profile["primaryPublication"] + primary_publication["publication_url"] = self.get_publication_url( + primary_publication + ) + + return primary_publication + + def get_user_publications(self): + """ + Gets the users publications + """ + + profile = self.get_user_profile() + + # Loop through users "publicationUsers" list, and return a list + # of dictionaries of "name", and "subdomain", and "id" + user_publications = [] + for publication in profile["publicationUsers"]: + pub = publication["publication"] + pub["publication_url"] = self.get_publication_url(pub) + user_publications.append(pub) + + return user_publications + + def get_user_profile(self): + """ + Gets the users profile + """ + response = self._session.get(f"{self.base_url}/user/profile/self") + + return self._handle_response(response=response) + + def get_user_settings(self): + """ + Get list of users. + + Returns: + + """ + response = self._session.get(f"{self.base_url}/settings") + + return self._handle_response(response=response) + + def get_publication_users(self): + """ + Get list of users. + + Returns: + + """ + response = self._session.get(f"{self.publication_url}/publication/users") + + return self._handle_response(response=response) + + def get_publication_subscriber_count(self): + + """ + Get subscriber count. + + Returns: + + """ + response = self._session.get( + f"{self.publication_url}/publication_launch_checklist" + ) + + return self._handle_response(response=response)["subscriberCount"] + + def get_published_posts( + self, offset=0, limit=25, order_by="post_date", order_direction="desc" + ): + """ + Get list of published posts for the publication. + """ + response = self._session.get( + f"{self.publication_url}/post_management/published", + params={ + "offset": offset, + "limit": limit, + "order_by": order_by, + "order_direction": order_direction, + }, + ) + + return self._handle_response(response=response) + + def get_posts(self) -> dict: + """ + + Returns: + + """ + response = self._session.get(f"{self.base_url}/reader/posts") + + return self._handle_response(response=response) + + def get_drafts(self, filter=None, offset=None, limit=None): + """ + + Args: + filter: + offset: + limit: + + Returns: + + """ + response = self._session.get( + f"{self.publication_url}/drafts", + params={"filter": filter, "offset": offset, "limit": limit}, + ) + return self._handle_response(response=response) + + def get_draft(self, draft_id): + """ + Gets a draft given it's id. + + """ + response = self._session.get(f"{self.publication_url}/drafts/{draft_id}") + return self._handle_response(response=response) + + def delete_draft(self, draft_id): + """ + + Args: + draft_id: + + Returns: + + """ + response = self._session.delete(f"{self.publication_url}/drafts/{draft_id}") + return self._handle_response(response=response) + + def post_draft(self, body) -> dict: + """ + + Args: + body: + + Returns: + + """ + response = self._session.post(f"{self.publication_url}/drafts", json=body) + return self._handle_response(response=response) + + def put_draft(self, draft, **kwargs) -> dict: + """ + + Args: + draft: + **kwargs: + + Returns: + + """ + response = self._session.put( + f"{self.publication_url}/drafts/{draft}", + json=kwargs, + ) + return self._handle_response(response=response) + + def prepublish_draft(self, draft) -> dict: + """ + + Args: + draft: draft id + + Returns: + + """ + + response = self._session.get( + f"{self.publication_url}/drafts/{draft}/prepublish" + ) + return self._handle_response(response=response) + + def publish_draft( + self, draft, send: bool = True, share_automatically: bool = False + ) -> dict: + """ + + Args: + draft: draft id + send: + share_automatically: + + Returns: + + """ + response = self._session.post( + f"{self.publication_url}/drafts/{draft}/publish", + json={"send": send, "share_automatically": share_automatically}, + ) + return self._handle_response(response=response) + + def schedule_draft(self, draft, draft_datetime: datetime) -> dict: + """ + + Args: + draft: draft id + draft_datetime: datetime to schedule the draft + + Returns: + + """ + response = self._session.post( + f"{self.publication_url}/drafts/{draft}/schedule", + json={"post_date": draft_datetime.isoformat()}, + ) + return self._handle_response(response=response) + + def unschedule_draft(self, draft) -> dict: + """ + + Args: + draft: draft id + + Returns: + + """ + response = self._session.post( + f"{self.publication_url}/drafts/{draft}/schedule", json={"post_date": None} + ) + return self._handle_response(response=response) + + def get_image(self, image: str): + """ + + This method generates a new substack link that contains the image. + + Args: + image: filepath or original url of image. + + Returns: + + """ + if os.path.exists(image): + with open(image, "rb") as file: + image = b"data:image/jpeg;base64," + base64.b64encode(file.read()) + + response = self._session.post( + f"{self.publication_url}/image", + data={"image": image}, + ) + return self._handle_response(response=response) + + def get_categories(self): + """ + + Retrieve list of all available categories. + + Returns: + + """ + response = self._session.get(f"{self.base_url}/categories") + return self._handle_response(response=response) + + def get_category(self, category_id, category_type, page): + """ + + Args: + category_id: + category_type: + page: + + Returns: + + """ + response = self._session.get( + f"{self.base_url}/category/public/{category_id}/{category_type}", + params={"page": page}, + ) + return self._handle_response(response=response) + + def get_single_category(self, category_id, category_type, page=None, limit=None): + """ + + Args: + category_id: + category_type: paid or all + page: by default substack retrieves only the first 25 publications in the category. If this is left None, + then all pages will be retrieved. The page size is 25 publications. + limit: + Returns: + + """ + if page is not None: + output = self.get_category(category_id, category_type, page) + else: + publications = [] + page = 0 + while True: + page_output = self.get_category(category_id, category_type, page) + publications.extend(page_output.get("publications", [])) + if ( + limit is not None and limit <= len(publications) + ) or not page_output.get("more", False): + publications = publications[:limit] + break + page += 1 + output = { + "publications": publications, + "more": page_output.get("more", False), + } + return output + + def delete_all_drafts(self): + """ + + Returns: + + """ + response = None + while True: + drafts = self.get_drafts(filter="draft", limit=10, offset=0) + if len(drafts) == 0: + break + for draft in drafts: + response = self.delete_draft(draft.get("id")) + return response + + def get_sections(self): + """ + Get a list of the sections of your publication. + + TODO: this is hacky but I cannot find another place where to get the sections. + Returns: + + """ + response = self._session.get( + f"{self.publication_url}/subscriptions", + ) + content = Api._handle_response(response=response) + sections = [ + p.get("sections") + for p in content.get("publications") + if p.get("hostname") in self.publication_url + ] + return sections[0] + + def publication_embed(self, url): + """ + + Args: + url: + + Returns: + + """ + return self.call("/publication/embed", "GET", url=url) + + def call(self, endpoint, method, **params): + """ + + Args: + endpoint: + method: + **params: + + Returns: + + """ + response = self._session.request( + method=method, + url=f"{self.publication_url}/{endpoint}", + params=params, + ) + return self._handle_response(response=response) diff --git a/substack/exceptions.py b/substack/exceptions.py new file mode 100644 index 0000000..e9b6f29 --- /dev/null +++ b/substack/exceptions.py @@ -0,0 +1,32 @@ +import json + + +class SubstackAPIException(Exception): + def __init__(self, status_code, text): + try: + json_res = json.loads(text) + except ValueError: + self.message = f"Invalid JSON error message from Substack: {text}" + else: + self.message = ", ".join( + list( + map(lambda error: error.get("msg", ""), json_res.get("errors", [])) + ) + ) + self.message = self.message or json_res.get("error", "") + self.status_code = status_code + + def __str__(self): + return f"APIError(code={self.status_code}): {self.message}" + + +class SubstackRequestException(Exception): + def __init__(self, message): + self.message = message + + def __str__(self): + return f"SubstackRequestException: {self.message}" + + +class SectionNotExistsException(SubstackRequestException): + pass diff --git a/substack/post.py b/substack/post.py new file mode 100644 index 0000000..825baf5 --- /dev/null +++ b/substack/post.py @@ -0,0 +1,331 @@ +""" + +Post Utilities + +""" + +import json +from typing import Dict + +__all__ = ["Post"] + +from substack.exceptions import SectionNotExistsException + + +class Post: + """ + + Post utility class + + """ + + def __init__( + self, + title: str, + subtitle: str, + user_id, + audience: str = None, + write_comment_permissions: str = None, + ): + """ + + Args: + title: + subtitle: + user_id: + audience: possible values: everyone, only_paid, founding, only_free + write_comment_permissions: none, only_paid, everyone (this field is a mess) + """ + self.draft_title = title + self.draft_subtitle = subtitle + self.draft_body = {"type": "doc", "content": []} + self.draft_bylines = [{"id": int(user_id), "is_guest": False}] + self.audience = audience if audience is not None else "everyone" + self.draft_section_id = None + self.section_chosen = True + + # TODO better understand the possible values and combinations with audience + if write_comment_permissions is not None: + self.write_comment_permissions = write_comment_permissions + else: + self.write_comment_permissions = self.audience + + def set_section(self, name: str, sections: list): + """ + + Args: + name: + sections: + + Returns: + + """ + section = [s for s in sections if s.get("name") == name] + if len(section) != 1: + raise SectionNotExistsException(name) + section = section[0] + self.draft_section_id = section.get("id") + + def add(self, item: Dict): + """ + + Add item to draft body. + + Args: + item: + + Returns: + + """ + + self.draft_body["content"] = self.draft_body.get("content", []) + [ + {"type": item.get("type")} + ] + content = item.get("content") + if item.get("type") == "captionedImage": + self.captioned_image(**item) + elif item.get("type") == "embeddedPublication": + self.draft_body["content"][-1]["attrs"] = item.get("url") + elif item.get("type") == "youtube2": + self.youtube(item.get("src")) + elif item.get("type") == "subscribeWidget": + self.subscribe_with_caption(item.get("message")) + else: + if content is not None: + self.add_complex_text(content) + + if item.get("type") == "heading": + self.attrs(item.get("level", 1)) + + marks = item.get("marks") + if marks is not None: + self.marks(marks) + + return self + + def paragraph(self, content=None): + """ + + Args: + content: + + Returns: + + """ + item = {"type": "paragraph"} + if content is not None: + item["content"] = content + return self.add(item) + + def heading(self, content=None, level: int = 1): + """ + + Args: + content: + level: + + Returns: + + """ + + item = {"type": "heading"} + if content is not None: + item["content"] = content + item["level"] = level + return self.add(item) + + def horizontal_rule(self): + """ + + Returns: + + """ + return self.add({"type": "horizontal_rule"}) + + def attrs(self, level): + """ + + Args: + level: + + Returns: + + """ + content_attrs = self.draft_body["content"][-1].get("attrs", {}) + content_attrs.update({"level": level}) + self.draft_body["content"][-1]["attrs"] = content_attrs + return self + + def captioned_image( + self, + src: str, + fullscreen: bool = False, + imageSize: str = "normal", + height: int = 819, + width: int = 1456, + resizeWidth: int = 728, + bytes: str = None, + alt: str = None, + title: str = None, + type: str = None, + href: str = None, + belowTheFold: bool = False, + internalRedirect: str = None, + ): + """ + + Add image to body. + + Args: + bytes: + alt: + title: + type: + href: + belowTheFold: + internalRedirect: + src: + fullscreen: + imageSize: + height: + width: + resizeWidth: + """ + + content = self.draft_body["content"][-1].get("content", []) + content += [ + { + "type": "image2", + "attrs": { + "src": src, + "fullscreen": fullscreen, + "imageSize": imageSize, + "height": height, + "width": width, + "resizeWidth": resizeWidth, + "bytes": bytes, + "alt": alt, + "title": title, + "type": type, + "href": href, + "belowTheFold": belowTheFold, + "internalRedirect": internalRedirect, + }, + } + ] + self.draft_body["content"][-1]["content"] = content + return self + + def text(self, value: str): + """ + + Add text to the last paragraph. + + Args: + value: Text to add to paragraph. + + Returns: + + """ + content = self.draft_body["content"][-1].get("content", []) + content += [{"type": "text", "text": value}] + self.draft_body["content"][-1]["content"] = content + return self + + def add_complex_text(self, text): + """ + + Args: + text: + """ + if isinstance(text, str): + self.text(text) + else: + for chunk in text: + if chunk: + self.text(chunk.get("content")).marks(chunk.get("marks", [])) + + def marks(self, marks): + """ + + Args: + marks: + + Returns: + + """ + content = self.draft_body["content"][-1].get("content", [])[-1] + content_marks = content.get("marks", []) + for mark in marks: + new_mark = {"type": mark.get("type")} + if mark.get("type") == "link": + href = mark.get("href") + new_mark.update({"attrs": {"href": href}}) + content_marks.append(new_mark) + content["marks"] = content_marks + return self + + def remove_last_paragraph(self): + """Remove last paragraph""" + del self.draft_body.get("content")[-1] + + def get_draft(self): + """ + + Returns: + + """ + out = vars(self) + out["draft_body"] = json.dumps(out["draft_body"]) + return out + + def subscribe_with_caption(self, message: str = None): + """ + + Add subscribe widget with caption + + Args: + message: + + Returns: + + """ + + if message is None: + message = """Thanks for reading this newsletter! + Subscribe for free to receive new posts and support my work.""" + + subscribe = self.draft_body["content"][-1] + subscribe["attrs"] = { + "url": "%%checkout_url%%", + "text": "Subscribe", + "language": "en", + } + subscribe["content"] = [ + { + "type": "ctaCaption", + "content": [ + { + "type": "text", + "text": message, + } + ], + } + ] + return self + + def youtube(self, value: str): + """ + + Add youtube video to post. + + Args: + value: youtube url + + Returns: + + """ + content_attrs = self.draft_body["content"][-1].get("attrs", {}) + content_attrs.update({"videoId": value}) + self.draft_body["content"][-1]["attrs"] = content_attrs + return self