Compare commits
2 Commits
d2b39db82e
...
e0127a0362
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e0127a0362 | ||
|
|
f17cd92f90 |
@@ -1,10 +1,13 @@
|
|||||||
FROM python:3.8
|
FROM python:3.8
|
||||||
|
|
||||||
RUN apt-get update && apt-get install -y git
|
RUN apt-get update && apt-get install -y git
|
||||||
RUN git clone http://192.168.1.25:8124/zep/Substack_JV.git /app
|
RUN git clone http://192.168.1.25:8124/zep/Substack_JV.git /app
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
RUN pip install -r requirements.txt
|
RUN pip install -r requirements.txt
|
||||||
ENV TZ=Europe/Brussels
|
ENV TZ=Europe/Brussels
|
||||||
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone
|
||||||
|
|
||||||
COPY update_and_run.sh /app
|
COPY update_and_run.sh /app
|
||||||
RUN chmod +x /app/update_and_run.sh
|
RUN chmod +x /app/update_and_run.sh
|
||||||
CMD ["./update_and_run.sh"]
|
CMD ["./update_and_run.sh"]
|
||||||
|
ENTRYPOINT ["sh", "-c", "./update_and_run.sh"]
|
||||||
@@ -11,6 +11,9 @@ import re
|
|||||||
from logging.handlers import RotatingFileHandler
|
from logging.handlers import RotatingFileHandler
|
||||||
import random
|
import random
|
||||||
|
|
||||||
|
import pyvirtualdisplay
|
||||||
|
|
||||||
|
|
||||||
from substack import Api
|
from substack import Api
|
||||||
from substack.post import Post
|
from substack.post import Post
|
||||||
|
|
||||||
@@ -45,10 +48,11 @@ class RSSfeed():
|
|||||||
self.youtube = yt
|
self.youtube = yt
|
||||||
|
|
||||||
class SubStackTask:
|
class SubStackTask:
|
||||||
def __init__(self, login, password, account, feeds):
|
def __init__(self, login, password, cookies_path, account, feeds):
|
||||||
self.api = Api(
|
self.api = Api(
|
||||||
email=login,
|
email=login,
|
||||||
password=password,
|
password=password,
|
||||||
|
cookies_path=cookies_path,
|
||||||
publication_url=account,
|
publication_url=account,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -237,7 +241,12 @@ async def main(login, password, account):
|
|||||||
|
|
||||||
ff = r'/data/feeds.txt'
|
ff = r'/data/feeds.txt'
|
||||||
if os.path.isfile(ff) is False:
|
if os.path.isfile(ff) is False:
|
||||||
ff = r'x:\substack\feeds.txt'
|
ff = r'feeds.txt'
|
||||||
|
|
||||||
|
cookies_path = r'/data/cookies.json'
|
||||||
|
if os.path.isfile(cookies_path) is False:
|
||||||
|
cookies_path = r'cookies.json'
|
||||||
|
|
||||||
with open(ff) as file:
|
with open(ff) as file:
|
||||||
lines = [line.rstrip() for line in file]
|
lines = [line.rstrip() for line in file]
|
||||||
|
|
||||||
@@ -245,7 +254,7 @@ async def main(login, password, account):
|
|||||||
youtube = "youtube" in line
|
youtube = "youtube" in line
|
||||||
feeds.append(RSSfeed(line, youtube))
|
feeds.append(RSSfeed(line, youtube))
|
||||||
|
|
||||||
task = SubStackTask(login, password, account, feeds)
|
task = SubStackTask(login, password, cookies_path, account, feeds)
|
||||||
|
|
||||||
LOG.info("Starting bot")
|
LOG.info("Starting bot")
|
||||||
await task.run_daily_at_6_am()
|
await task.run_daily_at_6_am()
|
||||||
|
|||||||
15
feeds.txt
Normal file
15
feeds.txt
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
https://www.factornews.com/rss.xml
|
||||||
|
https://nofrag.com/feed
|
||||||
|
https://dystopeek.fr/feed/
|
||||||
|
https://thepixelpost.com/rss/
|
||||||
|
https://yamukass.substack.com/feed
|
||||||
|
https://tseret.com/categorie/tests/feed
|
||||||
|
https://www.gamesidestory.com/feed
|
||||||
|
https://www.nintendo-town.fr/feed
|
||||||
|
https://jesuisungameur.com/feed
|
||||||
|
https://www.switch-actu.fr/categorie/tests/tests-de-jeux/feed
|
||||||
|
https://www.playscope.com/category/articles/test-gaming/feed
|
||||||
|
https://jrpgfr.net/category/test/feed
|
||||||
|
https://jv.jeuxonline.info/rss/dossiers/rss.xml
|
||||||
|
https://www.youtube.com/feeds/videos.xml?channel_id=UC-OvBDfZGn1OdsqMBwkOI_A
|
||||||
|
https://www.youtube.com/feeds/videos.xml?playlist_id=PLZRiqJjIUlDTrwYs_UqEIts5fVaBpaIEz
|
||||||
53
generate_cookie.py
Normal file
53
generate_cookie.py
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
from selenium import webdriver
|
||||||
|
from selenium.webdriver.common.by import By
|
||||||
|
from selenium.webdriver.common.keys import Keys
|
||||||
|
from selenium.webdriver.chrome.options import Options
|
||||||
|
from selenium.webdriver.support.ui import WebDriverWait
|
||||||
|
from selenium.webdriver.support import expected_conditions as EC
|
||||||
|
|
||||||
|
import time
|
||||||
|
import pickle
|
||||||
|
|
||||||
|
def save_cookies(driver, path):
|
||||||
|
with open(path, 'wb') as file:
|
||||||
|
pickle.dump(driver.get_cookies(), file)
|
||||||
|
|
||||||
|
def generate_cookies(email, password):
|
||||||
|
cookie_path = r'x:\substack\cookies.pkl'
|
||||||
|
|
||||||
|
|
||||||
|
chrome_options = Options()
|
||||||
|
|
||||||
|
driver = webdriver.Chrome(options=chrome_options)
|
||||||
|
driver.get('https://substack.com/sign-in')
|
||||||
|
|
||||||
|
wait = WebDriverWait(driver, 10)
|
||||||
|
|
||||||
|
try:
|
||||||
|
login_with_password_button = wait.until(
|
||||||
|
EC.element_to_be_clickable((By.LINK_TEXT, 'Sign in with password'))
|
||||||
|
)
|
||||||
|
login_with_password_button.click()
|
||||||
|
|
||||||
|
time.sleep(2)
|
||||||
|
|
||||||
|
email_field = driver.find_element(By.NAME, 'email')
|
||||||
|
email_field.send_keys(email)
|
||||||
|
|
||||||
|
password_field = driver.find_element(By.NAME, 'password')
|
||||||
|
password_field.send_keys(password)
|
||||||
|
|
||||||
|
password_field.send_keys(Keys.RETURN)
|
||||||
|
|
||||||
|
save_cookies(driver, cookie_path)
|
||||||
|
print("Cookies saved successfully.")
|
||||||
|
except Exception as e:
|
||||||
|
print("An error occurred during login.", e)
|
||||||
|
|
||||||
|
driver.quit()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
email = "gael.honorez@gmail.com"
|
||||||
|
password = "f3PaTGedjFc2gkr1ypi5"
|
||||||
|
generate_cookies(email, password)
|
||||||
@@ -1,3 +1,2 @@
|
|||||||
requests
|
requests
|
||||||
feedparser
|
feedparser
|
||||||
python-substack
|
|
||||||
11
substack/__init__.py
Normal file
11
substack/__init__.py
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
"""A library that provides a Python interface to the Substack API."""
|
||||||
|
|
||||||
|
__author__ = "Paolo Mazza"
|
||||||
|
__email__ = "mazzapaolo2019@gmail.com"
|
||||||
|
__license__ = "MIT License"
|
||||||
|
__version__ = "1.0"
|
||||||
|
__url__ = "https://github.com/ma2za/python-substack"
|
||||||
|
__download_url__ = "https://pypi.python.org/pypi/python-substack"
|
||||||
|
__description__ = "A Python wrapper around the Substack API"
|
||||||
|
|
||||||
|
from .api import Api
|
||||||
708
substack/api.py
Normal file
708
substack/api.py
Normal file
@@ -0,0 +1,708 @@
|
|||||||
|
"""
|
||||||
|
|
||||||
|
API Wrapper
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
import base64
|
||||||
|
import json
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
from datetime import datetime
|
||||||
|
from urllib.parse import urljoin
|
||||||
|
from pyvirtualdisplay import Display
|
||||||
|
import requests
|
||||||
|
from selenium import webdriver
|
||||||
|
from selenium.webdriver.common.keys import Keys
|
||||||
|
from selenium.webdriver.common.by import By
|
||||||
|
from selenium.webdriver.chrome.options import Options
|
||||||
|
from selenium.webdriver.support.ui import WebDriverWait
|
||||||
|
import pickle
|
||||||
|
import time
|
||||||
|
from substack.exceptions import SubstackAPIException, SubstackRequestException
|
||||||
|
from selenium.webdriver.support import expected_conditions as EC
|
||||||
|
import datetime
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
__all__ = ["Api"]
|
||||||
|
|
||||||
|
|
||||||
|
class Api:
|
||||||
|
"""
|
||||||
|
|
||||||
|
A python interface into the Substack API
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
email=None,
|
||||||
|
password=None,
|
||||||
|
cookies_path=None,
|
||||||
|
base_url=None,
|
||||||
|
publication_url=None,
|
||||||
|
debug=False,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
|
||||||
|
To create an instance of the substack.Api class:
|
||||||
|
>>> import substack
|
||||||
|
>>> api = substack.Api(email="substack email", password="substack password")
|
||||||
|
|
||||||
|
Args:
|
||||||
|
email:
|
||||||
|
password:
|
||||||
|
cookies_path
|
||||||
|
To re-use your session without logging in each time, you can save your cookies to a json file and
|
||||||
|
then load them in the next session.
|
||||||
|
Make sure to re-save your cookies, as they do update over time.
|
||||||
|
base_url:
|
||||||
|
The base URL to use to contact the Substack API.
|
||||||
|
Defaults to https://substack.com/api/v1.
|
||||||
|
"""
|
||||||
|
self.base_url = base_url or "https://substack.com/api/v1"
|
||||||
|
self.email = email
|
||||||
|
self.password = password
|
||||||
|
|
||||||
|
if debug:
|
||||||
|
logging.basicConfig()
|
||||||
|
logging.getLogger().setLevel(logging.DEBUG)
|
||||||
|
|
||||||
|
self._session = requests.Session()
|
||||||
|
|
||||||
|
# Load cookies from file if provided
|
||||||
|
# Helps with Captcha errors by reusing cookies from "local" auth, then switching to running code in the cloud
|
||||||
|
|
||||||
|
if cookies_path is not None:
|
||||||
|
with open(cookies_path) as f:
|
||||||
|
cookies = json.load(f)
|
||||||
|
self._session.cookies.update(cookies)
|
||||||
|
|
||||||
|
elif email is not None and password is not None:
|
||||||
|
self.send_magic_link(email)
|
||||||
|
magic_link = input("Enter magic link: ")
|
||||||
|
self.login_v2(email, password, magic_link)
|
||||||
|
self.export_cookies(cookies_path)
|
||||||
|
|
||||||
|
else:
|
||||||
|
raise ValueError(
|
||||||
|
"Must provide email and password or cookies_path to authenticate."
|
||||||
|
)
|
||||||
|
|
||||||
|
user_publication = None
|
||||||
|
# if the user provided a publication url, then use that
|
||||||
|
if publication_url:
|
||||||
|
import re
|
||||||
|
|
||||||
|
# Regular expression to extract subdomain name
|
||||||
|
match = re.search(r"https://(.*).substack.com", publication_url.lower())
|
||||||
|
subdomain = match.group(1) if match else None
|
||||||
|
|
||||||
|
user_publications = self.get_user_publications()
|
||||||
|
# search through publications to find the publication with the matching subdomain
|
||||||
|
for publication in user_publications:
|
||||||
|
if publication["subdomain"] == subdomain:
|
||||||
|
# set the current publication to the users publication
|
||||||
|
user_publication = publication
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
# get the users primary publication
|
||||||
|
user_publication = self.get_user_primary_publication()
|
||||||
|
|
||||||
|
# set the current publication to the users primary publication
|
||||||
|
self.change_publication(user_publication)
|
||||||
|
|
||||||
|
def send_magic_link(self, email):
|
||||||
|
body = {
|
||||||
|
"email": email,
|
||||||
|
"redirect": "/",
|
||||||
|
"for_pub": "",
|
||||||
|
}
|
||||||
|
endpoint = f"https://substack.com/api/v1/email-login/"
|
||||||
|
response = self._session.post(endpoint, json=body)
|
||||||
|
print("Magic link sent!")
|
||||||
|
|
||||||
|
|
||||||
|
def login_v2(self, email, password, magic_link):
|
||||||
|
return self._session.get(magic_link)
|
||||||
|
|
||||||
|
def login(self, email, password) -> dict:
|
||||||
|
"""
|
||||||
|
|
||||||
|
Login to the substack account.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
email: substack account email
|
||||||
|
password: substack account password
|
||||||
|
"""
|
||||||
|
|
||||||
|
response = self._session.post(
|
||||||
|
f"{self.base_url}/login",
|
||||||
|
json={
|
||||||
|
"captcha_response": None,
|
||||||
|
"email": email,
|
||||||
|
"password": password,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def signin_for_pub(self, publication):
|
||||||
|
"""
|
||||||
|
Complete the signin process
|
||||||
|
"""
|
||||||
|
response = self._session.get(
|
||||||
|
f"https://substack.com/sign-in?redirect=%2F&for_pub={publication['subdomain']}",
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
output = self._handle_response(response=response)
|
||||||
|
except SubstackRequestException as ex:
|
||||||
|
output = {}
|
||||||
|
return output
|
||||||
|
|
||||||
|
def change_publication(self, publication):
|
||||||
|
"""
|
||||||
|
Change the publication URL
|
||||||
|
"""
|
||||||
|
self.publication_url = urljoin(publication["publication_url"], "api/v1")
|
||||||
|
|
||||||
|
# sign-in to the publication
|
||||||
|
self.signin_for_pub(publication)
|
||||||
|
|
||||||
|
def export_cookies(self, path: str = "cookies.json"):
|
||||||
|
"""
|
||||||
|
Export cookies to a json file.
|
||||||
|
Args:
|
||||||
|
path: path to the json file
|
||||||
|
"""
|
||||||
|
cookies = self._session.cookies.get_dict()
|
||||||
|
with open(path, "w") as f:
|
||||||
|
json.dump(cookies, f)
|
||||||
|
|
||||||
|
def save_cookies(self, driver, path):
|
||||||
|
with open(path, 'wb') as file:
|
||||||
|
pickle.dump(driver.get_cookies(), file)
|
||||||
|
|
||||||
|
def load_cookies(self, driver, path):
|
||||||
|
with open(path, 'rb') as file:
|
||||||
|
cookies = pickle.load(file)
|
||||||
|
for cookie in cookies:
|
||||||
|
driver.add_cookie(cookie)
|
||||||
|
|
||||||
|
def login_with_selenium(self):
|
||||||
|
"""
|
||||||
|
Login using Selenium to solve CAPTCHA manually.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Start virtual display
|
||||||
|
cookie_path = 'cookies.pkl'
|
||||||
|
|
||||||
|
if os.path.exists(cookie_path):
|
||||||
|
try:
|
||||||
|
# Load cookies directly to session if they exist
|
||||||
|
with open(cookie_path, 'rb') as file:
|
||||||
|
cookies = pickle.load(file)
|
||||||
|
cookies_valid = True
|
||||||
|
for cookie in cookies:
|
||||||
|
if 'expiry' in cookie:
|
||||||
|
expiry_date = datetime.datetime.fromtimestamp(cookie['expiry'])
|
||||||
|
print(f"Cookie {cookie['name']} expires on {expiry_date}")
|
||||||
|
if cookie['expiry'] < time.time():
|
||||||
|
cookies_valid = False
|
||||||
|
print(f"Cookie {cookie['name']} has expired.")
|
||||||
|
break
|
||||||
|
if cookies_valid:
|
||||||
|
# Load cookies into session
|
||||||
|
for cookie in cookies:
|
||||||
|
self._session.cookies.set(cookie['name'], cookie['value'])
|
||||||
|
|
||||||
|
print("Cookies loaded successfully. Skipping login.")
|
||||||
|
return
|
||||||
|
except Exception as e:
|
||||||
|
print("Error loading cookies, proceeding with Selenium login.", e)
|
||||||
|
|
||||||
|
#display = Display()
|
||||||
|
#display.start()
|
||||||
|
print("Login with selenium")
|
||||||
|
chrome_options = Options()
|
||||||
|
chrome_options.add_argument("--headless")
|
||||||
|
chrome_options.add_argument("--no-sandbox")
|
||||||
|
chrome_options.add_argument("--disable-dev-shm-usage")
|
||||||
|
chrome_options.add_argument("--disable-gpu")
|
||||||
|
|
||||||
|
driver = webdriver.Chrome()
|
||||||
|
driver.get('https://substack.com/sign-in')
|
||||||
|
|
||||||
|
# Check if already logged in by checking the presence of a user-specific element
|
||||||
|
wait = WebDriverWait(driver, 10)
|
||||||
|
try:
|
||||||
|
|
||||||
|
# Adjust the selector to match an element that is present only when logged in
|
||||||
|
dashboard_button = wait.until(
|
||||||
|
EC.presence_of_element_located((By.CSS_SELECTOR, 'button[data-href*="publish/home?utm_source=menu"]'))
|
||||||
|
)
|
||||||
|
print("Already logged in.")
|
||||||
|
|
||||||
|
except Exception:
|
||||||
|
print("Not logged in. Proceeding with login steps.")
|
||||||
|
try:
|
||||||
|
login_with_password_button = wait.until(EC.element_to_be_clickable((By.LINK_TEXT, 'Log in with password')))
|
||||||
|
login_with_password_button.click()
|
||||||
|
|
||||||
|
time.sleep(2) # Wait for the transition to the login form
|
||||||
|
|
||||||
|
# Fill in the email and password fields
|
||||||
|
email_field = driver.find_element(By.NAME, 'email')
|
||||||
|
email_field.send_keys(self.email)
|
||||||
|
|
||||||
|
password_field = driver.find_element(By.NAME, 'password')
|
||||||
|
password_field.send_keys(self.password)
|
||||||
|
|
||||||
|
# Submit the form
|
||||||
|
password_field.send_keys(Keys.RETURN)
|
||||||
|
|
||||||
|
print("Please solve the CAPTCHA manually in the opened browser.")
|
||||||
|
input("Press Enter after solving the CAPTCHA...")
|
||||||
|
|
||||||
|
# Save cookies after solving the CAPTCHA
|
||||||
|
self.save_cookies(driver, cookie_path)
|
||||||
|
print("Cookies saved successfully.")
|
||||||
|
except Exception as e:
|
||||||
|
print("An error occurred during login.", e)
|
||||||
|
|
||||||
|
# Save cookies after login or cookie load
|
||||||
|
self.save_cookies(driver, cookie_path)
|
||||||
|
|
||||||
|
# Extract cookies to use with requests
|
||||||
|
cookies = driver.get_cookies()
|
||||||
|
for cookie in cookies:
|
||||||
|
self._session.cookies.set(cookie['name'], cookie['value'])
|
||||||
|
|
||||||
|
# Close the browser
|
||||||
|
driver.quit()
|
||||||
|
|
||||||
|
|
||||||
|
def _handle_response(self, response: requests.Response):
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
Internal helper for handling API responses from the Substack server.
|
||||||
|
Raises the appropriate exceptions when necessary; otherwise, returns the
|
||||||
|
response.
|
||||||
|
|
||||||
|
"""
|
||||||
|
if (response.status_code == 401):
|
||||||
|
print("CAPTCHA detected, switching to Selenium for manual solving.")
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
if not (200 <= response.status_code < 300):
|
||||||
|
raise SubstackAPIException(response.status_code, response.text)
|
||||||
|
try:
|
||||||
|
return response.json()
|
||||||
|
except ValueError:
|
||||||
|
raise SubstackRequestException("Invalid Response: %s" % response.text)
|
||||||
|
|
||||||
|
def get_user_id(self):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
profile = self.get_user_profile()
|
||||||
|
user_id = profile["id"]
|
||||||
|
|
||||||
|
return user_id
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_publication_url(publication: dict) -> str:
|
||||||
|
"""
|
||||||
|
Gets the publication url
|
||||||
|
|
||||||
|
Args:
|
||||||
|
publication:
|
||||||
|
"""
|
||||||
|
custom_domain = publication["custom_domain"]
|
||||||
|
if not custom_domain:
|
||||||
|
publication_url = f"https://{publication['subdomain']}.substack.com"
|
||||||
|
else:
|
||||||
|
publication_url = f"https://{custom_domain}"
|
||||||
|
|
||||||
|
return publication_url
|
||||||
|
|
||||||
|
def get_user_primary_publication(self):
|
||||||
|
"""
|
||||||
|
Gets the users primary publication
|
||||||
|
"""
|
||||||
|
|
||||||
|
profile = self.get_user_profile()
|
||||||
|
primary_publication = profile["primaryPublication"]
|
||||||
|
primary_publication["publication_url"] = self.get_publication_url(
|
||||||
|
primary_publication
|
||||||
|
)
|
||||||
|
|
||||||
|
return primary_publication
|
||||||
|
|
||||||
|
def get_user_publications(self):
|
||||||
|
"""
|
||||||
|
Gets the users publications
|
||||||
|
"""
|
||||||
|
|
||||||
|
profile = self.get_user_profile()
|
||||||
|
|
||||||
|
# Loop through users "publicationUsers" list, and return a list
|
||||||
|
# of dictionaries of "name", and "subdomain", and "id"
|
||||||
|
user_publications = []
|
||||||
|
for publication in profile["publicationUsers"]:
|
||||||
|
pub = publication["publication"]
|
||||||
|
pub["publication_url"] = self.get_publication_url(pub)
|
||||||
|
user_publications.append(pub)
|
||||||
|
|
||||||
|
return user_publications
|
||||||
|
|
||||||
|
def get_user_profile(self):
|
||||||
|
"""
|
||||||
|
Gets the users profile
|
||||||
|
"""
|
||||||
|
response = self._session.get(f"{self.base_url}/user/profile/self")
|
||||||
|
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def get_user_settings(self):
|
||||||
|
"""
|
||||||
|
Get list of users.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.get(f"{self.base_url}/settings")
|
||||||
|
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def get_publication_users(self):
|
||||||
|
"""
|
||||||
|
Get list of users.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.get(f"{self.publication_url}/publication/users")
|
||||||
|
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def get_publication_subscriber_count(self):
|
||||||
|
|
||||||
|
"""
|
||||||
|
Get subscriber count.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.get(
|
||||||
|
f"{self.publication_url}/publication_launch_checklist"
|
||||||
|
)
|
||||||
|
|
||||||
|
return self._handle_response(response=response)["subscriberCount"]
|
||||||
|
|
||||||
|
def get_published_posts(
|
||||||
|
self, offset=0, limit=25, order_by="post_date", order_direction="desc"
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
Get list of published posts for the publication.
|
||||||
|
"""
|
||||||
|
response = self._session.get(
|
||||||
|
f"{self.publication_url}/post_management/published",
|
||||||
|
params={
|
||||||
|
"offset": offset,
|
||||||
|
"limit": limit,
|
||||||
|
"order_by": order_by,
|
||||||
|
"order_direction": order_direction,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def get_posts(self) -> dict:
|
||||||
|
"""
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.get(f"{self.base_url}/reader/posts")
|
||||||
|
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def get_drafts(self, filter=None, offset=None, limit=None):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filter:
|
||||||
|
offset:
|
||||||
|
limit:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.get(
|
||||||
|
f"{self.publication_url}/drafts",
|
||||||
|
params={"filter": filter, "offset": offset, "limit": limit},
|
||||||
|
)
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def get_draft(self, draft_id):
|
||||||
|
"""
|
||||||
|
Gets a draft given it's id.
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.get(f"{self.publication_url}/drafts/{draft_id}")
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def delete_draft(self, draft_id):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
draft_id:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.delete(f"{self.publication_url}/drafts/{draft_id}")
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def post_draft(self, body) -> dict:
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
body:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.post(f"{self.publication_url}/drafts", json=body)
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def put_draft(self, draft, **kwargs) -> dict:
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
draft:
|
||||||
|
**kwargs:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.put(
|
||||||
|
f"{self.publication_url}/drafts/{draft}",
|
||||||
|
json=kwargs,
|
||||||
|
)
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def prepublish_draft(self, draft) -> dict:
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
draft: draft id
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
response = self._session.get(
|
||||||
|
f"{self.publication_url}/drafts/{draft}/prepublish"
|
||||||
|
)
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def publish_draft(
|
||||||
|
self, draft, send: bool = True, share_automatically: bool = False
|
||||||
|
) -> dict:
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
draft: draft id
|
||||||
|
send:
|
||||||
|
share_automatically:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.post(
|
||||||
|
f"{self.publication_url}/drafts/{draft}/publish",
|
||||||
|
json={"send": send, "share_automatically": share_automatically},
|
||||||
|
)
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def schedule_draft(self, draft, draft_datetime: datetime) -> dict:
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
draft: draft id
|
||||||
|
draft_datetime: datetime to schedule the draft
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.post(
|
||||||
|
f"{self.publication_url}/drafts/{draft}/schedule",
|
||||||
|
json={"post_date": draft_datetime.isoformat()},
|
||||||
|
)
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def unschedule_draft(self, draft) -> dict:
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
draft: draft id
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.post(
|
||||||
|
f"{self.publication_url}/drafts/{draft}/schedule", json={"post_date": None}
|
||||||
|
)
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def get_image(self, image: str):
|
||||||
|
"""
|
||||||
|
|
||||||
|
This method generates a new substack link that contains the image.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
image: filepath or original url of image.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
if os.path.exists(image):
|
||||||
|
with open(image, "rb") as file:
|
||||||
|
image = b"data:image/jpeg;base64," + base64.b64encode(file.read())
|
||||||
|
|
||||||
|
response = self._session.post(
|
||||||
|
f"{self.publication_url}/image",
|
||||||
|
data={"image": image},
|
||||||
|
)
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def get_categories(self):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Retrieve list of all available categories.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.get(f"{self.base_url}/categories")
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def get_category(self, category_id, category_type, page):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
category_id:
|
||||||
|
category_type:
|
||||||
|
page:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.get(
|
||||||
|
f"{self.base_url}/category/public/{category_id}/{category_type}",
|
||||||
|
params={"page": page},
|
||||||
|
)
|
||||||
|
return self._handle_response(response=response)
|
||||||
|
|
||||||
|
def get_single_category(self, category_id, category_type, page=None, limit=None):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
category_id:
|
||||||
|
category_type: paid or all
|
||||||
|
page: by default substack retrieves only the first 25 publications in the category. If this is left None,
|
||||||
|
then all pages will be retrieved. The page size is 25 publications.
|
||||||
|
limit:
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
if page is not None:
|
||||||
|
output = self.get_category(category_id, category_type, page)
|
||||||
|
else:
|
||||||
|
publications = []
|
||||||
|
page = 0
|
||||||
|
while True:
|
||||||
|
page_output = self.get_category(category_id, category_type, page)
|
||||||
|
publications.extend(page_output.get("publications", []))
|
||||||
|
if (
|
||||||
|
limit is not None and limit <= len(publications)
|
||||||
|
) or not page_output.get("more", False):
|
||||||
|
publications = publications[:limit]
|
||||||
|
break
|
||||||
|
page += 1
|
||||||
|
output = {
|
||||||
|
"publications": publications,
|
||||||
|
"more": page_output.get("more", False),
|
||||||
|
}
|
||||||
|
return output
|
||||||
|
|
||||||
|
def delete_all_drafts(self):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = None
|
||||||
|
while True:
|
||||||
|
drafts = self.get_drafts(filter="draft", limit=10, offset=0)
|
||||||
|
if len(drafts) == 0:
|
||||||
|
break
|
||||||
|
for draft in drafts:
|
||||||
|
response = self.delete_draft(draft.get("id"))
|
||||||
|
return response
|
||||||
|
|
||||||
|
def get_sections(self):
|
||||||
|
"""
|
||||||
|
Get a list of the sections of your publication.
|
||||||
|
|
||||||
|
TODO: this is hacky but I cannot find another place where to get the sections.
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.get(
|
||||||
|
f"{self.publication_url}/subscriptions",
|
||||||
|
)
|
||||||
|
content = Api._handle_response(response=response)
|
||||||
|
sections = [
|
||||||
|
p.get("sections")
|
||||||
|
for p in content.get("publications")
|
||||||
|
if p.get("hostname") in self.publication_url
|
||||||
|
]
|
||||||
|
return sections[0]
|
||||||
|
|
||||||
|
def publication_embed(self, url):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
url:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
return self.call("/publication/embed", "GET", url=url)
|
||||||
|
|
||||||
|
def call(self, endpoint, method, **params):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
endpoint:
|
||||||
|
method:
|
||||||
|
**params:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
response = self._session.request(
|
||||||
|
method=method,
|
||||||
|
url=f"{self.publication_url}/{endpoint}",
|
||||||
|
params=params,
|
||||||
|
)
|
||||||
|
return self._handle_response(response=response)
|
||||||
32
substack/exceptions.py
Normal file
32
substack/exceptions.py
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
import json
|
||||||
|
|
||||||
|
|
||||||
|
class SubstackAPIException(Exception):
|
||||||
|
def __init__(self, status_code, text):
|
||||||
|
try:
|
||||||
|
json_res = json.loads(text)
|
||||||
|
except ValueError:
|
||||||
|
self.message = f"Invalid JSON error message from Substack: {text}"
|
||||||
|
else:
|
||||||
|
self.message = ", ".join(
|
||||||
|
list(
|
||||||
|
map(lambda error: error.get("msg", ""), json_res.get("errors", []))
|
||||||
|
)
|
||||||
|
)
|
||||||
|
self.message = self.message or json_res.get("error", "")
|
||||||
|
self.status_code = status_code
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f"APIError(code={self.status_code}): {self.message}"
|
||||||
|
|
||||||
|
|
||||||
|
class SubstackRequestException(Exception):
|
||||||
|
def __init__(self, message):
|
||||||
|
self.message = message
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return f"SubstackRequestException: {self.message}"
|
||||||
|
|
||||||
|
|
||||||
|
class SectionNotExistsException(SubstackRequestException):
|
||||||
|
pass
|
||||||
331
substack/post.py
Normal file
331
substack/post.py
Normal file
@@ -0,0 +1,331 @@
|
|||||||
|
"""
|
||||||
|
|
||||||
|
Post Utilities
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
from typing import Dict
|
||||||
|
|
||||||
|
__all__ = ["Post"]
|
||||||
|
|
||||||
|
from substack.exceptions import SectionNotExistsException
|
||||||
|
|
||||||
|
|
||||||
|
class Post:
|
||||||
|
"""
|
||||||
|
|
||||||
|
Post utility class
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
title: str,
|
||||||
|
subtitle: str,
|
||||||
|
user_id,
|
||||||
|
audience: str = None,
|
||||||
|
write_comment_permissions: str = None,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
title:
|
||||||
|
subtitle:
|
||||||
|
user_id:
|
||||||
|
audience: possible values: everyone, only_paid, founding, only_free
|
||||||
|
write_comment_permissions: none, only_paid, everyone (this field is a mess)
|
||||||
|
"""
|
||||||
|
self.draft_title = title
|
||||||
|
self.draft_subtitle = subtitle
|
||||||
|
self.draft_body = {"type": "doc", "content": []}
|
||||||
|
self.draft_bylines = [{"id": int(user_id), "is_guest": False}]
|
||||||
|
self.audience = audience if audience is not None else "everyone"
|
||||||
|
self.draft_section_id = None
|
||||||
|
self.section_chosen = True
|
||||||
|
|
||||||
|
# TODO better understand the possible values and combinations with audience
|
||||||
|
if write_comment_permissions is not None:
|
||||||
|
self.write_comment_permissions = write_comment_permissions
|
||||||
|
else:
|
||||||
|
self.write_comment_permissions = self.audience
|
||||||
|
|
||||||
|
def set_section(self, name: str, sections: list):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name:
|
||||||
|
sections:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
section = [s for s in sections if s.get("name") == name]
|
||||||
|
if len(section) != 1:
|
||||||
|
raise SectionNotExistsException(name)
|
||||||
|
section = section[0]
|
||||||
|
self.draft_section_id = section.get("id")
|
||||||
|
|
||||||
|
def add(self, item: Dict):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Add item to draft body.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
item:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
self.draft_body["content"] = self.draft_body.get("content", []) + [
|
||||||
|
{"type": item.get("type")}
|
||||||
|
]
|
||||||
|
content = item.get("content")
|
||||||
|
if item.get("type") == "captionedImage":
|
||||||
|
self.captioned_image(**item)
|
||||||
|
elif item.get("type") == "embeddedPublication":
|
||||||
|
self.draft_body["content"][-1]["attrs"] = item.get("url")
|
||||||
|
elif item.get("type") == "youtube2":
|
||||||
|
self.youtube(item.get("src"))
|
||||||
|
elif item.get("type") == "subscribeWidget":
|
||||||
|
self.subscribe_with_caption(item.get("message"))
|
||||||
|
else:
|
||||||
|
if content is not None:
|
||||||
|
self.add_complex_text(content)
|
||||||
|
|
||||||
|
if item.get("type") == "heading":
|
||||||
|
self.attrs(item.get("level", 1))
|
||||||
|
|
||||||
|
marks = item.get("marks")
|
||||||
|
if marks is not None:
|
||||||
|
self.marks(marks)
|
||||||
|
|
||||||
|
return self
|
||||||
|
|
||||||
|
def paragraph(self, content=None):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
content:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
item = {"type": "paragraph"}
|
||||||
|
if content is not None:
|
||||||
|
item["content"] = content
|
||||||
|
return self.add(item)
|
||||||
|
|
||||||
|
def heading(self, content=None, level: int = 1):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
content:
|
||||||
|
level:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
item = {"type": "heading"}
|
||||||
|
if content is not None:
|
||||||
|
item["content"] = content
|
||||||
|
item["level"] = level
|
||||||
|
return self.add(item)
|
||||||
|
|
||||||
|
def horizontal_rule(self):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
return self.add({"type": "horizontal_rule"})
|
||||||
|
|
||||||
|
def attrs(self, level):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
level:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
content_attrs = self.draft_body["content"][-1].get("attrs", {})
|
||||||
|
content_attrs.update({"level": level})
|
||||||
|
self.draft_body["content"][-1]["attrs"] = content_attrs
|
||||||
|
return self
|
||||||
|
|
||||||
|
def captioned_image(
|
||||||
|
self,
|
||||||
|
src: str,
|
||||||
|
fullscreen: bool = False,
|
||||||
|
imageSize: str = "normal",
|
||||||
|
height: int = 819,
|
||||||
|
width: int = 1456,
|
||||||
|
resizeWidth: int = 728,
|
||||||
|
bytes: str = None,
|
||||||
|
alt: str = None,
|
||||||
|
title: str = None,
|
||||||
|
type: str = None,
|
||||||
|
href: str = None,
|
||||||
|
belowTheFold: bool = False,
|
||||||
|
internalRedirect: str = None,
|
||||||
|
):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Add image to body.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
bytes:
|
||||||
|
alt:
|
||||||
|
title:
|
||||||
|
type:
|
||||||
|
href:
|
||||||
|
belowTheFold:
|
||||||
|
internalRedirect:
|
||||||
|
src:
|
||||||
|
fullscreen:
|
||||||
|
imageSize:
|
||||||
|
height:
|
||||||
|
width:
|
||||||
|
resizeWidth:
|
||||||
|
"""
|
||||||
|
|
||||||
|
content = self.draft_body["content"][-1].get("content", [])
|
||||||
|
content += [
|
||||||
|
{
|
||||||
|
"type": "image2",
|
||||||
|
"attrs": {
|
||||||
|
"src": src,
|
||||||
|
"fullscreen": fullscreen,
|
||||||
|
"imageSize": imageSize,
|
||||||
|
"height": height,
|
||||||
|
"width": width,
|
||||||
|
"resizeWidth": resizeWidth,
|
||||||
|
"bytes": bytes,
|
||||||
|
"alt": alt,
|
||||||
|
"title": title,
|
||||||
|
"type": type,
|
||||||
|
"href": href,
|
||||||
|
"belowTheFold": belowTheFold,
|
||||||
|
"internalRedirect": internalRedirect,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
]
|
||||||
|
self.draft_body["content"][-1]["content"] = content
|
||||||
|
return self
|
||||||
|
|
||||||
|
def text(self, value: str):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Add text to the last paragraph.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
value: Text to add to paragraph.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
content = self.draft_body["content"][-1].get("content", [])
|
||||||
|
content += [{"type": "text", "text": value}]
|
||||||
|
self.draft_body["content"][-1]["content"] = content
|
||||||
|
return self
|
||||||
|
|
||||||
|
def add_complex_text(self, text):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
text:
|
||||||
|
"""
|
||||||
|
if isinstance(text, str):
|
||||||
|
self.text(text)
|
||||||
|
else:
|
||||||
|
for chunk in text:
|
||||||
|
if chunk:
|
||||||
|
self.text(chunk.get("content")).marks(chunk.get("marks", []))
|
||||||
|
|
||||||
|
def marks(self, marks):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
marks:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
content = self.draft_body["content"][-1].get("content", [])[-1]
|
||||||
|
content_marks = content.get("marks", [])
|
||||||
|
for mark in marks:
|
||||||
|
new_mark = {"type": mark.get("type")}
|
||||||
|
if mark.get("type") == "link":
|
||||||
|
href = mark.get("href")
|
||||||
|
new_mark.update({"attrs": {"href": href}})
|
||||||
|
content_marks.append(new_mark)
|
||||||
|
content["marks"] = content_marks
|
||||||
|
return self
|
||||||
|
|
||||||
|
def remove_last_paragraph(self):
|
||||||
|
"""Remove last paragraph"""
|
||||||
|
del self.draft_body.get("content")[-1]
|
||||||
|
|
||||||
|
def get_draft(self):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
out = vars(self)
|
||||||
|
out["draft_body"] = json.dumps(out["draft_body"])
|
||||||
|
return out
|
||||||
|
|
||||||
|
def subscribe_with_caption(self, message: str = None):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Add subscribe widget with caption
|
||||||
|
|
||||||
|
Args:
|
||||||
|
message:
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
|
if message is None:
|
||||||
|
message = """Thanks for reading this newsletter!
|
||||||
|
Subscribe for free to receive new posts and support my work."""
|
||||||
|
|
||||||
|
subscribe = self.draft_body["content"][-1]
|
||||||
|
subscribe["attrs"] = {
|
||||||
|
"url": "%%checkout_url%%",
|
||||||
|
"text": "Subscribe",
|
||||||
|
"language": "en",
|
||||||
|
}
|
||||||
|
subscribe["content"] = [
|
||||||
|
{
|
||||||
|
"type": "ctaCaption",
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"type": "text",
|
||||||
|
"text": message,
|
||||||
|
}
|
||||||
|
],
|
||||||
|
}
|
||||||
|
]
|
||||||
|
return self
|
||||||
|
|
||||||
|
def youtube(self, value: str):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Add youtube video to post.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
value: youtube url
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
|
||||||
|
"""
|
||||||
|
content_attrs = self.draft_body["content"][-1].get("attrs", {})
|
||||||
|
content_attrs.update({"videoId": value})
|
||||||
|
self.draft_body["content"][-1]["attrs"] = content_attrs
|
||||||
|
return self
|
||||||
Reference in New Issue
Block a user