Compare commits

...

2 Commits

Author SHA1 Message Date
Gaël
e0127a0362 reset the time to 6pm 2024-07-03 17:02:58 +02:00
Gaël
f17cd92f90 fixing cookie things, more or less 2024-07-03 17:01:29 +02:00
9 changed files with 1167 additions and 6 deletions

View File

@@ -1,10 +1,13 @@
FROM python:3.8
RUN apt-get update && apt-get install -y git
RUN git clone http://192.168.1.25:8124/zep/Substack_JV.git /app
WORKDIR /app
RUN pip install -r requirements.txt
ENV TZ=Europe/Brussels
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
COPY update_and_run.sh /app
RUN chmod +x /app/update_and_run.sh
CMD ["./update_and_run.sh"]
ENTRYPOINT ["sh", "-c", "./update_and_run.sh"]

View File

@@ -11,6 +11,9 @@ import re
from logging.handlers import RotatingFileHandler
import random
import pyvirtualdisplay
from substack import Api
from substack.post import Post
@@ -45,10 +48,11 @@ class RSSfeed():
self.youtube = yt
class SubStackTask:
def __init__(self, login, password, account, feeds):
def __init__(self, login, password, cookies_path, account, feeds):
self.api = Api(
email=login,
password=password,
cookies_path=cookies_path,
publication_url=account,
)
@@ -237,7 +241,12 @@ async def main(login, password, account):
ff = r'/data/feeds.txt'
if os.path.isfile(ff) is False:
ff = r'x:\substack\feeds.txt'
ff = r'feeds.txt'
cookies_path = r'/data/cookies.json'
if os.path.isfile(cookies_path) is False:
cookies_path = r'cookies.json'
with open(ff) as file:
lines = [line.rstrip() for line in file]
@@ -245,7 +254,7 @@ async def main(login, password, account):
youtube = "youtube" in line
feeds.append(RSSfeed(line, youtube))
task = SubStackTask(login, password, account, feeds)
task = SubStackTask(login, password, cookies_path, account, feeds)
LOG.info("Starting bot")
await task.run_daily_at_6_am()

15
feeds.txt Normal file
View File

@@ -0,0 +1,15 @@
https://www.factornews.com/rss.xml
https://nofrag.com/feed
https://dystopeek.fr/feed/
https://thepixelpost.com/rss/
https://yamukass.substack.com/feed
https://tseret.com/categorie/tests/feed
https://www.gamesidestory.com/feed
https://www.nintendo-town.fr/feed
https://jesuisungameur.com/feed
https://www.switch-actu.fr/categorie/tests/tests-de-jeux/feed
https://www.playscope.com/category/articles/test-gaming/feed
https://jrpgfr.net/category/test/feed
https://jv.jeuxonline.info/rss/dossiers/rss.xml
https://www.youtube.com/feeds/videos.xml?channel_id=UC-OvBDfZGn1OdsqMBwkOI_A
https://www.youtube.com/feeds/videos.xml?playlist_id=PLZRiqJjIUlDTrwYs_UqEIts5fVaBpaIEz

53
generate_cookie.py Normal file
View File

@@ -0,0 +1,53 @@
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import time
import pickle
def save_cookies(driver, path):
with open(path, 'wb') as file:
pickle.dump(driver.get_cookies(), file)
def generate_cookies(email, password):
cookie_path = r'x:\substack\cookies.pkl'
chrome_options = Options()
driver = webdriver.Chrome(options=chrome_options)
driver.get('https://substack.com/sign-in')
wait = WebDriverWait(driver, 10)
try:
login_with_password_button = wait.until(
EC.element_to_be_clickable((By.LINK_TEXT, 'Sign in with password'))
)
login_with_password_button.click()
time.sleep(2)
email_field = driver.find_element(By.NAME, 'email')
email_field.send_keys(email)
password_field = driver.find_element(By.NAME, 'password')
password_field.send_keys(password)
password_field.send_keys(Keys.RETURN)
save_cookies(driver, cookie_path)
print("Cookies saved successfully.")
except Exception as e:
print("An error occurred during login.", e)
driver.quit()
if __name__ == "__main__":
email = "gael.honorez@gmail.com"
password = "f3PaTGedjFc2gkr1ypi5"
generate_cookies(email, password)

View File

@@ -1,3 +1,2 @@
requests
feedparser
python-substack

11
substack/__init__.py Normal file
View File

@@ -0,0 +1,11 @@
"""A library that provides a Python interface to the Substack API."""
__author__ = "Paolo Mazza"
__email__ = "mazzapaolo2019@gmail.com"
__license__ = "MIT License"
__version__ = "1.0"
__url__ = "https://github.com/ma2za/python-substack"
__download_url__ = "https://pypi.python.org/pypi/python-substack"
__description__ = "A Python wrapper around the Substack API"
from .api import Api

708
substack/api.py Normal file
View File

@@ -0,0 +1,708 @@
"""
API Wrapper
"""
import base64
import json
import logging
import os
from datetime import datetime
from urllib.parse import urljoin
from pyvirtualdisplay import Display
import requests
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
import pickle
import time
from substack.exceptions import SubstackAPIException, SubstackRequestException
from selenium.webdriver.support import expected_conditions as EC
import datetime
logger = logging.getLogger(__name__)
__all__ = ["Api"]
class Api:
"""
A python interface into the Substack API
"""
def __init__(
self,
email=None,
password=None,
cookies_path=None,
base_url=None,
publication_url=None,
debug=False,
):
"""
To create an instance of the substack.Api class:
>>> import substack
>>> api = substack.Api(email="substack email", password="substack password")
Args:
email:
password:
cookies_path
To re-use your session without logging in each time, you can save your cookies to a json file and
then load them in the next session.
Make sure to re-save your cookies, as they do update over time.
base_url:
The base URL to use to contact the Substack API.
Defaults to https://substack.com/api/v1.
"""
self.base_url = base_url or "https://substack.com/api/v1"
self.email = email
self.password = password
if debug:
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
self._session = requests.Session()
# Load cookies from file if provided
# Helps with Captcha errors by reusing cookies from "local" auth, then switching to running code in the cloud
if cookies_path is not None:
with open(cookies_path) as f:
cookies = json.load(f)
self._session.cookies.update(cookies)
elif email is not None and password is not None:
self.send_magic_link(email)
magic_link = input("Enter magic link: ")
self.login_v2(email, password, magic_link)
self.export_cookies(cookies_path)
else:
raise ValueError(
"Must provide email and password or cookies_path to authenticate."
)
user_publication = None
# if the user provided a publication url, then use that
if publication_url:
import re
# Regular expression to extract subdomain name
match = re.search(r"https://(.*).substack.com", publication_url.lower())
subdomain = match.group(1) if match else None
user_publications = self.get_user_publications()
# search through publications to find the publication with the matching subdomain
for publication in user_publications:
if publication["subdomain"] == subdomain:
# set the current publication to the users publication
user_publication = publication
break
else:
# get the users primary publication
user_publication = self.get_user_primary_publication()
# set the current publication to the users primary publication
self.change_publication(user_publication)
def send_magic_link(self, email):
body = {
"email": email,
"redirect": "/",
"for_pub": "",
}
endpoint = f"https://substack.com/api/v1/email-login/"
response = self._session.post(endpoint, json=body)
print("Magic link sent!")
def login_v2(self, email, password, magic_link):
return self._session.get(magic_link)
def login(self, email, password) -> dict:
"""
Login to the substack account.
Args:
email: substack account email
password: substack account password
"""
response = self._session.post(
f"{self.base_url}/login",
json={
"captcha_response": None,
"email": email,
"password": password,
},
)
return self._handle_response(response=response)
def signin_for_pub(self, publication):
"""
Complete the signin process
"""
response = self._session.get(
f"https://substack.com/sign-in?redirect=%2F&for_pub={publication['subdomain']}",
)
try:
output = self._handle_response(response=response)
except SubstackRequestException as ex:
output = {}
return output
def change_publication(self, publication):
"""
Change the publication URL
"""
self.publication_url = urljoin(publication["publication_url"], "api/v1")
# sign-in to the publication
self.signin_for_pub(publication)
def export_cookies(self, path: str = "cookies.json"):
"""
Export cookies to a json file.
Args:
path: path to the json file
"""
cookies = self._session.cookies.get_dict()
with open(path, "w") as f:
json.dump(cookies, f)
def save_cookies(self, driver, path):
with open(path, 'wb') as file:
pickle.dump(driver.get_cookies(), file)
def load_cookies(self, driver, path):
with open(path, 'rb') as file:
cookies = pickle.load(file)
for cookie in cookies:
driver.add_cookie(cookie)
def login_with_selenium(self):
"""
Login using Selenium to solve CAPTCHA manually.
"""
# Start virtual display
cookie_path = 'cookies.pkl'
if os.path.exists(cookie_path):
try:
# Load cookies directly to session if they exist
with open(cookie_path, 'rb') as file:
cookies = pickle.load(file)
cookies_valid = True
for cookie in cookies:
if 'expiry' in cookie:
expiry_date = datetime.datetime.fromtimestamp(cookie['expiry'])
print(f"Cookie {cookie['name']} expires on {expiry_date}")
if cookie['expiry'] < time.time():
cookies_valid = False
print(f"Cookie {cookie['name']} has expired.")
break
if cookies_valid:
# Load cookies into session
for cookie in cookies:
self._session.cookies.set(cookie['name'], cookie['value'])
print("Cookies loaded successfully. Skipping login.")
return
except Exception as e:
print("Error loading cookies, proceeding with Selenium login.", e)
#display = Display()
#display.start()
print("Login with selenium")
chrome_options = Options()
chrome_options.add_argument("--headless")
chrome_options.add_argument("--no-sandbox")
chrome_options.add_argument("--disable-dev-shm-usage")
chrome_options.add_argument("--disable-gpu")
driver = webdriver.Chrome()
driver.get('https://substack.com/sign-in')
# Check if already logged in by checking the presence of a user-specific element
wait = WebDriverWait(driver, 10)
try:
# Adjust the selector to match an element that is present only when logged in
dashboard_button = wait.until(
EC.presence_of_element_located((By.CSS_SELECTOR, 'button[data-href*="publish/home?utm_source=menu"]'))
)
print("Already logged in.")
except Exception:
print("Not logged in. Proceeding with login steps.")
try:
login_with_password_button = wait.until(EC.element_to_be_clickable((By.LINK_TEXT, 'Log in with password')))
login_with_password_button.click()
time.sleep(2) # Wait for the transition to the login form
# Fill in the email and password fields
email_field = driver.find_element(By.NAME, 'email')
email_field.send_keys(self.email)
password_field = driver.find_element(By.NAME, 'password')
password_field.send_keys(self.password)
# Submit the form
password_field.send_keys(Keys.RETURN)
print("Please solve the CAPTCHA manually in the opened browser.")
input("Press Enter after solving the CAPTCHA...")
# Save cookies after solving the CAPTCHA
self.save_cookies(driver, cookie_path)
print("Cookies saved successfully.")
except Exception as e:
print("An error occurred during login.", e)
# Save cookies after login or cookie load
self.save_cookies(driver, cookie_path)
# Extract cookies to use with requests
cookies = driver.get_cookies()
for cookie in cookies:
self._session.cookies.set(cookie['name'], cookie['value'])
# Close the browser
driver.quit()
def _handle_response(self, response: requests.Response):
"""
Internal helper for handling API responses from the Substack server.
Raises the appropriate exceptions when necessary; otherwise, returns the
response.
"""
if (response.status_code == 401):
print("CAPTCHA detected, switching to Selenium for manual solving.")
return
if not (200 <= response.status_code < 300):
raise SubstackAPIException(response.status_code, response.text)
try:
return response.json()
except ValueError:
raise SubstackRequestException("Invalid Response: %s" % response.text)
def get_user_id(self):
"""
Returns:
"""
profile = self.get_user_profile()
user_id = profile["id"]
return user_id
@staticmethod
def get_publication_url(publication: dict) -> str:
"""
Gets the publication url
Args:
publication:
"""
custom_domain = publication["custom_domain"]
if not custom_domain:
publication_url = f"https://{publication['subdomain']}.substack.com"
else:
publication_url = f"https://{custom_domain}"
return publication_url
def get_user_primary_publication(self):
"""
Gets the users primary publication
"""
profile = self.get_user_profile()
primary_publication = profile["primaryPublication"]
primary_publication["publication_url"] = self.get_publication_url(
primary_publication
)
return primary_publication
def get_user_publications(self):
"""
Gets the users publications
"""
profile = self.get_user_profile()
# Loop through users "publicationUsers" list, and return a list
# of dictionaries of "name", and "subdomain", and "id"
user_publications = []
for publication in profile["publicationUsers"]:
pub = publication["publication"]
pub["publication_url"] = self.get_publication_url(pub)
user_publications.append(pub)
return user_publications
def get_user_profile(self):
"""
Gets the users profile
"""
response = self._session.get(f"{self.base_url}/user/profile/self")
return self._handle_response(response=response)
def get_user_settings(self):
"""
Get list of users.
Returns:
"""
response = self._session.get(f"{self.base_url}/settings")
return self._handle_response(response=response)
def get_publication_users(self):
"""
Get list of users.
Returns:
"""
response = self._session.get(f"{self.publication_url}/publication/users")
return self._handle_response(response=response)
def get_publication_subscriber_count(self):
"""
Get subscriber count.
Returns:
"""
response = self._session.get(
f"{self.publication_url}/publication_launch_checklist"
)
return self._handle_response(response=response)["subscriberCount"]
def get_published_posts(
self, offset=0, limit=25, order_by="post_date", order_direction="desc"
):
"""
Get list of published posts for the publication.
"""
response = self._session.get(
f"{self.publication_url}/post_management/published",
params={
"offset": offset,
"limit": limit,
"order_by": order_by,
"order_direction": order_direction,
},
)
return self._handle_response(response=response)
def get_posts(self) -> dict:
"""
Returns:
"""
response = self._session.get(f"{self.base_url}/reader/posts")
return self._handle_response(response=response)
def get_drafts(self, filter=None, offset=None, limit=None):
"""
Args:
filter:
offset:
limit:
Returns:
"""
response = self._session.get(
f"{self.publication_url}/drafts",
params={"filter": filter, "offset": offset, "limit": limit},
)
return self._handle_response(response=response)
def get_draft(self, draft_id):
"""
Gets a draft given it's id.
"""
response = self._session.get(f"{self.publication_url}/drafts/{draft_id}")
return self._handle_response(response=response)
def delete_draft(self, draft_id):
"""
Args:
draft_id:
Returns:
"""
response = self._session.delete(f"{self.publication_url}/drafts/{draft_id}")
return self._handle_response(response=response)
def post_draft(self, body) -> dict:
"""
Args:
body:
Returns:
"""
response = self._session.post(f"{self.publication_url}/drafts", json=body)
return self._handle_response(response=response)
def put_draft(self, draft, **kwargs) -> dict:
"""
Args:
draft:
**kwargs:
Returns:
"""
response = self._session.put(
f"{self.publication_url}/drafts/{draft}",
json=kwargs,
)
return self._handle_response(response=response)
def prepublish_draft(self, draft) -> dict:
"""
Args:
draft: draft id
Returns:
"""
response = self._session.get(
f"{self.publication_url}/drafts/{draft}/prepublish"
)
return self._handle_response(response=response)
def publish_draft(
self, draft, send: bool = True, share_automatically: bool = False
) -> dict:
"""
Args:
draft: draft id
send:
share_automatically:
Returns:
"""
response = self._session.post(
f"{self.publication_url}/drafts/{draft}/publish",
json={"send": send, "share_automatically": share_automatically},
)
return self._handle_response(response=response)
def schedule_draft(self, draft, draft_datetime: datetime) -> dict:
"""
Args:
draft: draft id
draft_datetime: datetime to schedule the draft
Returns:
"""
response = self._session.post(
f"{self.publication_url}/drafts/{draft}/schedule",
json={"post_date": draft_datetime.isoformat()},
)
return self._handle_response(response=response)
def unschedule_draft(self, draft) -> dict:
"""
Args:
draft: draft id
Returns:
"""
response = self._session.post(
f"{self.publication_url}/drafts/{draft}/schedule", json={"post_date": None}
)
return self._handle_response(response=response)
def get_image(self, image: str):
"""
This method generates a new substack link that contains the image.
Args:
image: filepath or original url of image.
Returns:
"""
if os.path.exists(image):
with open(image, "rb") as file:
image = b"data:image/jpeg;base64," + base64.b64encode(file.read())
response = self._session.post(
f"{self.publication_url}/image",
data={"image": image},
)
return self._handle_response(response=response)
def get_categories(self):
"""
Retrieve list of all available categories.
Returns:
"""
response = self._session.get(f"{self.base_url}/categories")
return self._handle_response(response=response)
def get_category(self, category_id, category_type, page):
"""
Args:
category_id:
category_type:
page:
Returns:
"""
response = self._session.get(
f"{self.base_url}/category/public/{category_id}/{category_type}",
params={"page": page},
)
return self._handle_response(response=response)
def get_single_category(self, category_id, category_type, page=None, limit=None):
"""
Args:
category_id:
category_type: paid or all
page: by default substack retrieves only the first 25 publications in the category. If this is left None,
then all pages will be retrieved. The page size is 25 publications.
limit:
Returns:
"""
if page is not None:
output = self.get_category(category_id, category_type, page)
else:
publications = []
page = 0
while True:
page_output = self.get_category(category_id, category_type, page)
publications.extend(page_output.get("publications", []))
if (
limit is not None and limit <= len(publications)
) or not page_output.get("more", False):
publications = publications[:limit]
break
page += 1
output = {
"publications": publications,
"more": page_output.get("more", False),
}
return output
def delete_all_drafts(self):
"""
Returns:
"""
response = None
while True:
drafts = self.get_drafts(filter="draft", limit=10, offset=0)
if len(drafts) == 0:
break
for draft in drafts:
response = self.delete_draft(draft.get("id"))
return response
def get_sections(self):
"""
Get a list of the sections of your publication.
TODO: this is hacky but I cannot find another place where to get the sections.
Returns:
"""
response = self._session.get(
f"{self.publication_url}/subscriptions",
)
content = Api._handle_response(response=response)
sections = [
p.get("sections")
for p in content.get("publications")
if p.get("hostname") in self.publication_url
]
return sections[0]
def publication_embed(self, url):
"""
Args:
url:
Returns:
"""
return self.call("/publication/embed", "GET", url=url)
def call(self, endpoint, method, **params):
"""
Args:
endpoint:
method:
**params:
Returns:
"""
response = self._session.request(
method=method,
url=f"{self.publication_url}/{endpoint}",
params=params,
)
return self._handle_response(response=response)

32
substack/exceptions.py Normal file
View File

@@ -0,0 +1,32 @@
import json
class SubstackAPIException(Exception):
def __init__(self, status_code, text):
try:
json_res = json.loads(text)
except ValueError:
self.message = f"Invalid JSON error message from Substack: {text}"
else:
self.message = ", ".join(
list(
map(lambda error: error.get("msg", ""), json_res.get("errors", []))
)
)
self.message = self.message or json_res.get("error", "")
self.status_code = status_code
def __str__(self):
return f"APIError(code={self.status_code}): {self.message}"
class SubstackRequestException(Exception):
def __init__(self, message):
self.message = message
def __str__(self):
return f"SubstackRequestException: {self.message}"
class SectionNotExistsException(SubstackRequestException):
pass

331
substack/post.py Normal file
View File

@@ -0,0 +1,331 @@
"""
Post Utilities
"""
import json
from typing import Dict
__all__ = ["Post"]
from substack.exceptions import SectionNotExistsException
class Post:
"""
Post utility class
"""
def __init__(
self,
title: str,
subtitle: str,
user_id,
audience: str = None,
write_comment_permissions: str = None,
):
"""
Args:
title:
subtitle:
user_id:
audience: possible values: everyone, only_paid, founding, only_free
write_comment_permissions: none, only_paid, everyone (this field is a mess)
"""
self.draft_title = title
self.draft_subtitle = subtitle
self.draft_body = {"type": "doc", "content": []}
self.draft_bylines = [{"id": int(user_id), "is_guest": False}]
self.audience = audience if audience is not None else "everyone"
self.draft_section_id = None
self.section_chosen = True
# TODO better understand the possible values and combinations with audience
if write_comment_permissions is not None:
self.write_comment_permissions = write_comment_permissions
else:
self.write_comment_permissions = self.audience
def set_section(self, name: str, sections: list):
"""
Args:
name:
sections:
Returns:
"""
section = [s for s in sections if s.get("name") == name]
if len(section) != 1:
raise SectionNotExistsException(name)
section = section[0]
self.draft_section_id = section.get("id")
def add(self, item: Dict):
"""
Add item to draft body.
Args:
item:
Returns:
"""
self.draft_body["content"] = self.draft_body.get("content", []) + [
{"type": item.get("type")}
]
content = item.get("content")
if item.get("type") == "captionedImage":
self.captioned_image(**item)
elif item.get("type") == "embeddedPublication":
self.draft_body["content"][-1]["attrs"] = item.get("url")
elif item.get("type") == "youtube2":
self.youtube(item.get("src"))
elif item.get("type") == "subscribeWidget":
self.subscribe_with_caption(item.get("message"))
else:
if content is not None:
self.add_complex_text(content)
if item.get("type") == "heading":
self.attrs(item.get("level", 1))
marks = item.get("marks")
if marks is not None:
self.marks(marks)
return self
def paragraph(self, content=None):
"""
Args:
content:
Returns:
"""
item = {"type": "paragraph"}
if content is not None:
item["content"] = content
return self.add(item)
def heading(self, content=None, level: int = 1):
"""
Args:
content:
level:
Returns:
"""
item = {"type": "heading"}
if content is not None:
item["content"] = content
item["level"] = level
return self.add(item)
def horizontal_rule(self):
"""
Returns:
"""
return self.add({"type": "horizontal_rule"})
def attrs(self, level):
"""
Args:
level:
Returns:
"""
content_attrs = self.draft_body["content"][-1].get("attrs", {})
content_attrs.update({"level": level})
self.draft_body["content"][-1]["attrs"] = content_attrs
return self
def captioned_image(
self,
src: str,
fullscreen: bool = False,
imageSize: str = "normal",
height: int = 819,
width: int = 1456,
resizeWidth: int = 728,
bytes: str = None,
alt: str = None,
title: str = None,
type: str = None,
href: str = None,
belowTheFold: bool = False,
internalRedirect: str = None,
):
"""
Add image to body.
Args:
bytes:
alt:
title:
type:
href:
belowTheFold:
internalRedirect:
src:
fullscreen:
imageSize:
height:
width:
resizeWidth:
"""
content = self.draft_body["content"][-1].get("content", [])
content += [
{
"type": "image2",
"attrs": {
"src": src,
"fullscreen": fullscreen,
"imageSize": imageSize,
"height": height,
"width": width,
"resizeWidth": resizeWidth,
"bytes": bytes,
"alt": alt,
"title": title,
"type": type,
"href": href,
"belowTheFold": belowTheFold,
"internalRedirect": internalRedirect,
},
}
]
self.draft_body["content"][-1]["content"] = content
return self
def text(self, value: str):
"""
Add text to the last paragraph.
Args:
value: Text to add to paragraph.
Returns:
"""
content = self.draft_body["content"][-1].get("content", [])
content += [{"type": "text", "text": value}]
self.draft_body["content"][-1]["content"] = content
return self
def add_complex_text(self, text):
"""
Args:
text:
"""
if isinstance(text, str):
self.text(text)
else:
for chunk in text:
if chunk:
self.text(chunk.get("content")).marks(chunk.get("marks", []))
def marks(self, marks):
"""
Args:
marks:
Returns:
"""
content = self.draft_body["content"][-1].get("content", [])[-1]
content_marks = content.get("marks", [])
for mark in marks:
new_mark = {"type": mark.get("type")}
if mark.get("type") == "link":
href = mark.get("href")
new_mark.update({"attrs": {"href": href}})
content_marks.append(new_mark)
content["marks"] = content_marks
return self
def remove_last_paragraph(self):
"""Remove last paragraph"""
del self.draft_body.get("content")[-1]
def get_draft(self):
"""
Returns:
"""
out = vars(self)
out["draft_body"] = json.dumps(out["draft_body"])
return out
def subscribe_with_caption(self, message: str = None):
"""
Add subscribe widget with caption
Args:
message:
Returns:
"""
if message is None:
message = """Thanks for reading this newsletter!
Subscribe for free to receive new posts and support my work."""
subscribe = self.draft_body["content"][-1]
subscribe["attrs"] = {
"url": "%%checkout_url%%",
"text": "Subscribe",
"language": "en",
}
subscribe["content"] = [
{
"type": "ctaCaption",
"content": [
{
"type": "text",
"text": message,
}
],
}
]
return self
def youtube(self, value: str):
"""
Add youtube video to post.
Args:
value: youtube url
Returns:
"""
content_attrs = self.draft_body["content"][-1].get("attrs", {})
content_attrs.update({"videoId": value})
self.draft_body["content"][-1]["attrs"] = content_attrs
return self