fixing email check

This commit is contained in:
Gaël
2024-07-03 18:44:36 +02:00
parent e0127a0362
commit 668843d8e8
4 changed files with 159 additions and 25 deletions

114
checkemail.py Normal file
View File

@@ -0,0 +1,114 @@
import os.path
import base64
import imaplib
import email
from email.header import decode_header
import google.auth
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from bs4 import BeautifulSoup # Import BeautifulSoup for parsing HTML
from datetime import datetime
# If modifying these SCOPES, delete the file token.json.
SCOPES = ['https://mail.google.com/']
def authenticate_gmail():
"""Shows basic usage of the Gmail API.
Lists the user's Gmail labels.
"""
creds = None
# The file token.json stores the user's access and refresh tokens, and is
# created automatically when the authorization flow completes for the first
# time.
token = r'/data/token.json'
if os.path.isfile(token) is False:
token = r'x:\substack\token.json'
cred = r'/data/client_secret_396578640529-o4dsukvomuo43j5d4j0bogg17e3e8l7f.apps.googleusercontent.com.json'
if os.path.isfile(cred) is False:
cred = r'x:\substack\client_secret_396578640529-o4dsukvomuo43j5d4j0bogg17e3e8l7f.apps.googleusercontent.com.json'
if os.path.exists(token):
creds = Credentials.from_authorized_user_file(token, SCOPES)
# If there are no (valid) credentials available, let the user log in.
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(cred, SCOPES)
creds = flow.run_local_server(port=0)
# Save the credentials for the next run
with open(token, 'w') as token:
token.write(creds.to_json())
return creds
def generate_oauth2_string(username, access_token):
return f"user={username}\1auth=Bearer {access_token}\1\1"
def decode_mime_words(s):
return ''.join(
word.decode(encoding or 'utf-8') if isinstance(word, bytes) else word
for word, encoding in decode_header(s)
)
def get_verification_link(email_user, sender_email, start_time):
creds = authenticate_gmail()
auth_string = generate_oauth2_string(email_user, creds.token)
mail = imaplib.IMAP4_SSL("imap.gmail.com")
#mail.debug = 4 # Enable IMAP debug output for more detailed logs
try:
mail.authenticate('XOAUTH2', lambda x: auth_string)
except imaplib.IMAP4.error as e:
print(f"IMAP authentication error: {e}")
return None
mail.select("inbox")
result, data = mail.search(None, f'(FROM "{sender_email}" SUBJECT "Finish signing in to Substack")')
mail_ids = data[0]
id_list = mail_ids.split()
for num in reversed(id_list): # Check the most recent emails first
result, data = mail.fetch(num, "(RFC822)")
raw_email = data[0][1]
msg = email.message_from_bytes(raw_email)
# Decode and print the email subject
subject = decode_mime_words(msg["Subject"])
# Get email date
email_date_tuple = email.utils.parsedate_tz(msg["Date"])
email_timestamp = email.utils.mktime_tz(email_date_tuple)
print(subject, start_time, email_timestamp)
if abs(email_timestamp - start_time) > 12 * 3600:
continue
if msg.is_multipart():
for part in msg.walk():
if part.get_content_type() == "text/plain":
body = part.get_payload(decode=True).decode()
for line in body.split("\n"):
if "http" in line:
return line.strip()
else:
print("-----")
body = msg.get_payload(decode=True).decode()
soup = BeautifulSoup(body, 'html.parser')
link = soup.find('a', href=True, text="Connectez-vous dès maintenant")
if link:
return link['href']
return None
if __name__ == "__main__":
email_user = "gael.honorez@gmail.com"
sender_email = "no-reply@substack.com"
verification_link = get_verification_link(email_user, sender_email)
if verification_link:
print("Verification link found:", verification_link)
else:
print("No verification link found.")

1
cookies.json Normal file
View File

@@ -0,0 +1 @@
{"__cf_bm": "95up0icsYyESvD6suTUFG05xaWxwEr5_xuHUOv32G9I-1720025055-1.0.1.1-NlvsLW9j26FX8aPpLmVETEJ0zd.VyXefLr75kvT6iC.zHnPtkbIWgfesI0VaUGuvwV62qHpctJEoahLR9TIuHQ", "ab_experiment_sampled": "%22false%22", "ab_testing_id": "%22a6e7ba67-7dc0-452c-a935-d2f2bddd5edf%22", "ajs_anonymous_id": "%22e4535e95-1c5b-4173-82db-47807c57fb38%22", "cookie_storage_key": "f666a42c-49e8-47a2-bdbc-6eece0d6a06e", "substack.sid": "s%3ARLYSI2_XaTlGuYIpTYWjS8ib48PpuE0S.jNwCzcGzKUvUAuFdLNdfgxwewTUawIoDDZ05moubvzM", "visit_id": "%7B%22id%22%3A%22a0d46be8-56f4-406f-b1d7-14c41369b737%22%2C%22timestamp%22%3A%222024-07-03T16%3A44%3A13.349Z%22%7D", "AWSALBTG": "yw2xMbYVFbKWSzJiQsdCKp7mMH+wQ5T4/JIUc1TvywUi5iIJVXuO21AMhb+oPgegicdtpekLTDTl+zWKEekRsurS7+20skhmPxZXJf/Tl7jBd/PecbW7qa3DHkPvQtWz+SWD8+7P1rNjmY9lmyZgzH/ZeGgeiishRz9gsGO0OT/d", "AWSALBTGCORS": "yw2xMbYVFbKWSzJiQsdCKp7mMH+wQ5T4/JIUc1TvywUi5iIJVXuO21AMhb+oPgegicdtpekLTDTl+zWKEekRsurS7+20skhmPxZXJf/Tl7jBd/PecbW7qa3DHkPvQtWz+SWD8+7P1rNjmY9lmyZgzH/ZeGgeiishRz9gsGO0OT/d"}

View File

@@ -1,2 +1,6 @@
requests requests
feedparser feedparser
google-auth
google-auth-oauthlib
google-auth-httplib2
beautifulsoup4

View File

@@ -8,20 +8,13 @@ import base64
import json import json
import logging import logging
import os import os
from datetime import datetime
from urllib.parse import urljoin from urllib.parse import urljoin
from pyvirtualdisplay import Display
import requests import requests
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.ui import WebDriverWait
import pickle import pickle
import time import time
from substack.exceptions import SubstackAPIException, SubstackRequestException from substack.exceptions import SubstackAPIException, SubstackRequestException
from selenium.webdriver.support import expected_conditions as EC from checkemail import get_verification_link
import datetime from datetime import datetime
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
__all__ = ["Api"] __all__ = ["Api"]
@@ -71,23 +64,29 @@ class Api:
self._session = requests.Session() self._session = requests.Session()
# Load cookies from file if provided # Load cookies from file if provided
# Helps with Captcha errors by reusing cookies from "local" auth, then switching to running code in the cloud
if cookies_path is not None: if cookies_path is not None:
if os.path.exists(cookies_path):
with open(cookies_path) as f: with open(cookies_path) as f:
cookies = json.load(f) cookies = json.load(f)
self._session.cookies.update(cookies) self._session.cookies.update(cookies)
elif email is not None and password is not None: if not os.path.exists(cookies_path) or self.are_cookies_expired(cookies):
print("Cookies are expired. Sending magic link and waiting for verification.")
start_time = time.time() # Record the time when the magic link is sent
self.send_magic_link(email) self.send_magic_link(email)
magic_link = input("Enter magic link: ") verification_link = self.wait_for_verification_link(start_time)
self.login_v2(email, password, magic_link) if verification_link:
self.login_v2(email, password, verification_link)
self.export_cookies(cookies_path) self.export_cookies(cookies_path)
else: else:
raise ValueError( raise Exception("Failed to get the verification link.")
"Must provide email and password or cookies_path to authenticate."
) elif email is not None and password is not None:
self.login(email, password)
else:
raise ValueError("Must provide email and password or cookies_path to authenticate.")
user_publication = None user_publication = None
# if the user provided a publication url, then use that # if the user provided a publication url, then use that
@@ -112,6 +111,22 @@ class Api:
# set the current publication to the users primary publication # set the current publication to the users primary publication
self.change_publication(user_publication) self.change_publication(user_publication)
def are_cookies_expired(self, cookies):
for cookie in cookies:
if 'expiry' in cookie and cookie['expiry'] < time.time():
return True
return False
def wait_for_verification_link(self, start_time):
sender_email = "no-reply@substack.com"
while True:
verification_link = get_verification_link(self.email, sender_email, start_time)
if verification_link:
return verification_link
time.sleep(10) # Wait for X seconds before checking again
def send_magic_link(self, email): def send_magic_link(self, email):
body = { body = {
"email": email, "email": email,