"""Daily news aggregator that republishes RSS/YouTube feed items to Substack.

Once a day the bot downloads a fixed list of feeds, keeps the entries that
were published since 06:00 UTC of the previous day, shuffles them and
publishes them as a single Substack post titled "Les news du <date>".

Credentials are never stored in this file: pass them on the command line or
through the SUBSTACK_LOGIN / SUBSTACK_PASSWORD environment variables.
"""
import argparse
import asyncio
import datetime
import html
import io
import logging
import os
import random
import re
from logging.handlers import RotatingFileHandler

import feedparser
import requests
from substack import Api
from substack.post import Post

LOG = logging.getLogger('bot')
LOG_PATTERN = logging.Formatter('%(asctime)s:%(levelname)s: [%(filename)s] %(message)s')

# Publication used when neither --account nor SUBSTACK_PUBLICATION_URL is set.
DEFAULT_PUBLICATION_URL = "https://aggregateurjvfr.substack.com"

# Seconds to wait for a feed server before giving up on that feed.
REQUEST_TIMEOUT = 30

# Date layout of the <pubDate> field of the non-YouTube feeds.
RSS_DATE_FORMAT = '%a, %d %b %Y %H:%M:%S %z'

# French month names indexed by ``month - 1``.
FRENCH_MONTHS = (
    'Janvier', 'Février', 'Mars', 'Avril', 'Mai', 'Juin',
    'Juillet', 'Août', 'Septembre', 'Octobre', 'Novembre', 'Décembre',
)

# Enclosure MIME types rendered as images ("image/jpg" is a non-standard
# spelling some feeds use, "image/jpeg" is the registered one).
IMAGE_MIME_TYPES = frozenset({"image/jpg", "image/jpeg"})

HTML_TAG_RE = re.compile('<[^<]+?>')
# Footer line appended to summaries by some feeds; removed up to end of line.
FEED_FOOTER_RE = re.compile(r"L’article .* est apparu en premier sur .*")


def _configure_logger(logger_name, file_name=None, add_stream=False):
    """Attach a rotating file handler (and optionally a console handler).

    The log file is ``<file_name>.log``; ``file_name`` defaults to the logger
    name. Does nothing when the logger already has handlers, so calling it
    twice does not duplicate every log line.
    """
    logger = logging.getLogger(logger_name)
    if logger.handlers:
        return
    logger.setLevel(logging.DEBUG)

    log_filename = f"{file_name or logger_name}.log"
    file_handler = RotatingFileHandler(
        log_filename, mode="a", maxBytes=1000000, backupCount=1, encoding="utf-8"
    )
    file_handler.setFormatter(LOG_PATTERN)
    logger.addHandler(file_handler)

    if add_stream:
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(LOG_PATTERN)
        stream_handler.setLevel(logging.DEBUG)
        logger.addHandler(stream_handler)


def setuplogger():
    """Configure the ``bot`` logger to write to ``bot.log`` and to the console."""
    _configure_logger("bot", None, True)


class RSSfeed():
    """A feed to aggregate.

    Attributes:
        url: Address of the RSS/Atom document.
        youtube: True when the feed is a YouTube feed, whose ``published``
            field is an ISO 8601 timestamp instead of an RSS date.
    """

    def __init__(self, url, yt=False):
        self.url = url
        self.youtube = yt


def _entry_published(entry, is_youtube):
    """Return the publication time of a feed entry as an aware datetime.

    The raw ``published`` string is parsed first (ISO 8601 for YouTube feeds,
    RSS date format otherwise). When it is missing or malformed, feedparser's
    pre-parsed ``published_parsed`` value is used instead. Returns None when
    no usable date is available.
    """
    raw = entry.get("published")
    published = None
    if raw:
        try:
            if is_youtube:
                published = datetime.datetime.fromisoformat(raw)
            else:
                published = datetime.datetime.strptime(
                    raw.replace('GMT', '+0000'), RSS_DATE_FORMAT
                )
        except ValueError:
            published = None

    if published is None:
        parsed = entry.get("published_parsed")
        if parsed is None:
            return None
        # feedparser documents its *_parsed dates as UTC time tuples.
        published = datetime.datetime(*parsed[:6], tzinfo=datetime.timezone.utc)

    if published.tzinfo is None:
        # A naive value cannot be compared with the aware cut-off; treat as UTC.
        published = published.replace(tzinfo=datetime.timezone.utc)
    return published


def _clean_summary(summary):
    """Return ``summary`` as plain text: entities decoded, tags and footer removed."""
    text = html.unescape(summary)
    text = HTML_TAG_RE.sub('', text)
    return FEED_FOOTER_RE.sub('', text)


def _link_paragraph(url):
    """Build the paragraph node that shows ``url`` as a clickable link."""
    return {'type': 'paragraph', 'content': [
        {'content': url, 'marks': [{'type': "link", 'href': url}]}]}


class SubStackTask:
    """Builds and publishes the daily digest for one Substack publication."""

    def __init__(self, login, password, account, feeds):
        """Log in to Substack.

        Args:
            login: E-mail address of the Substack account.
            password: Password of the Substack account.
            account: URL of the publication to post to.
            feeds: Iterable of RSSfeed objects to aggregate.
        """
        self.api = Api(
            email=login,
            password=password,
            publication_url=account,
        )
        self.user_id = self.api.get_user_id()
        self.feeds = feeds

    def get_fr_date(self):
        """Return today's local date as ``DD <French month> YYYY``.

        The month name comes from a lookup table rather than ``strftime`` so
        the result does not depend on the process locale.
        """
        today = datetime.datetime.now()
        return f"{today.day:02d} {FRENCH_MONTHS[today.month - 1]} {today.year}"

    async def run_daily_at_6_am(self):
        """Run ``daily_task`` forever, once a day at 06:05 local time.

        The first run happens today when 06:05 has not passed yet, otherwise
        tomorrow. A failing run is logged and does not stop the schedule.
        """
        while True:
            now = datetime.datetime.now()
            next_run = now.replace(hour=6, minute=5, second=0, microsecond=0)
            if next_run <= now:
                next_run += datetime.timedelta(days=1)

            await asyncio.sleep((next_run - now).total_seconds())

            try:
                await self.daily_task()
            except asyncio.CancelledError:
                raise
            except Exception:
                # Top-level boundary: keep the bot alive for the next day.
                LOG.exception("Daily task failed; retrying at the next scheduled run")

    async def daily_task(self):
        """Collect recent feed entries and publish them as one Substack post.

        Entries published after 06:00 UTC of the previous day are kept and
        shuffled. Nothing is published when no entry could be collected.
        """
        now_utc = datetime.datetime.now(datetime.timezone.utc)
        yesterday_6am = (
            now_utc.replace(hour=6, minute=0, second=0, microsecond=0)
            - datetime.timedelta(days=1)
        )

        all_news_posts = []
        for feed in self.feeds:
            all_news_posts.extend(self._fetch_new_entries(feed, yesterday_6am))

        if not all_news_posts:
            # Also covers "every feed failed": do not send an empty newsletter.
            LOG.warning("No new entries found; nothing published")
            return

        random.shuffle(all_news_posts)

        title_post = "Les news du " + self.get_fr_date()
        sub_stack_post = Post(
            title=title_post,
            subtitle="",
            user_id=self.user_id
        )

        for post in all_news_posts:
            self._add_entry(sub_stack_post, post)

        sub_stack_post.add({"type":"heading", "level":3, "content": "Sources"})
        for feed in self.feeds:
            sub_stack_post.add(_link_paragraph(feed.url))

        sub_stack_post.add({"type":"subscribeWidget", "message":"Abonnez-vous gratuitement pour recevoir chaque jour les news dans votre e-mail et soutenir mon travail."})

        draft = self.api.post_draft(sub_stack_post.get_draft())
        draft_id = draft.get("id")
        self.api.prepublish_draft(draft_id)
        self.api.publish_draft(draft_id)

    def _fetch_new_entries(self, feed, cutoff):
        """Return the entries of ``feed`` published strictly after ``cutoff``.

        A feed that cannot be downloaded is logged and yields an empty list;
        entries without a usable publication date are skipped.
        """
        LOG.info("Fetching %s", feed.url)
        try:
            response = requests.get(feed.url, timeout=REQUEST_TIMEOUT)
            response.raise_for_status()
        except requests.RequestException:
            LOG.exception("Could not fetch feed %s", feed.url)
            return []

        parsed_feed = feedparser.parse(response.text)
        fresh_entries = []
        for entry in parsed_feed.entries:
            published = _entry_published(entry, feed.youtube is True)
            if published is None:
                LOG.warning("Entry without usable date skipped in %s", feed.url)
                continue
            if published > cutoff:
                fresh_entries.append(entry)
        return fresh_entries

    @staticmethod
    def _add_entry(sub_stack_post, post):
        """Append one feed entry (video or article) to the Substack post."""
        link_url = post.get("link")
        title = post.get("title")
        if not link_url or not title:
            LOG.warning("Entry without link or title skipped")
            return

        if "yt_videoid" in post:
            video_id = post["yt_videoid"]
            LOG.info("Adding video %s", video_id)
            sub_stack_post.add({"type":"heading", "level":3, "content": title})
            sub_stack_post.add({"type":"youtube2", "src": video_id })
            sub_stack_post.add(_link_paragraph(link_url))
        else:
            text = _clean_summary(post.get("summary", ""))
            if text == "":
                # An article without any text is not worth a section.
                return
            sub_stack_post.add({"type":"heading", "level":3, "content": title})
            sub_stack_post.add({"type":"paragraph", "content": text })
            sub_stack_post.add(_link_paragraph(link_url))

            for link in post.get("links", []):
                image_url = link.get("href")
                if image_url and link.get("type") in IMAGE_MIME_TYPES:
                    sub_stack_post.add({'type': 'captionedImage', 'src': image_url})

        # NOTE(review): the separator is emitted after every rendered entry;
        # confirm this matches the intended layout (versus a single rule
        # placed just before the "Sources" section).
        sub_stack_post.add({"type":"horizontal_rule"})


async def main(login, password, account, run_once=False):
    """Set up logging, build the feed list and start the bot.

    Args:
        login: E-mail address of the Substack account.
        password: Password of the Substack account.
        account: URL of the publication to post to.
        run_once: When True, publish immediately and return instead of
            entering the daily schedule (useful for manual testing).
    """
    setuplogger()

    feeds = [
        RSSfeed("https://www.factornews.com/rss.xml"),
        RSSfeed("https://nofrag.com/feed"),
        RSSfeed("https://dystopeek.fr/feed/"),
        RSSfeed("https://thepixelpost.com/rss/"),
        RSSfeed("https://yamukass.substack.com/feed"),
        RSSfeed("https://tseret.com/categorie/tests/feed"),
        RSSfeed("https://www.gamesidestory.com/feed"),
        RSSfeed("https://www.nintendo-town.fr/feed"),
        RSSfeed("https://www.youtube.com/feeds/videos.xml?channel_id=UC-OvBDfZGn1OdsqMBwkOI_A", True),
        RSSfeed("https://www.youtube.com/feeds/videos.xml?playlist_id=PLZRiqJjIUlDTrwYs_UqEIts5fVaBpaIEz", True),
    ]

    task = SubStackTask(login, password, account, feeds)
    if run_once:
        await task.daily_task()
    else:
        await task.run_daily_at_6_am()


def _parse_args(argv=None):
    """Read credentials and options from the command line or the environment."""
    parser = argparse.ArgumentParser(
        description="Publish a daily digest of RSS/YouTube feeds to Substack."
    )
    parser.add_argument(
        "--login", default=os.environ.get("SUBSTACK_LOGIN"),
        help="Substack account e-mail (default: $SUBSTACK_LOGIN)",
    )
    parser.add_argument(
        "--password", default=os.environ.get("SUBSTACK_PASSWORD"),
        help="Substack account password (default: $SUBSTACK_PASSWORD)",
    )
    parser.add_argument(
        "--account",
        default=os.environ.get("SUBSTACK_PUBLICATION_URL", DEFAULT_PUBLICATION_URL),
        help="Publication URL (default: $SUBSTACK_PUBLICATION_URL)",
    )
    parser.add_argument(
        "--once", action="store_true",
        help="publish immediately and exit instead of running every day",
    )
    args = parser.parse_args(argv)
    if not args.login or not args.password:
        parser.error(
            "credentials are required: use --login/--password or set "
            "SUBSTACK_LOGIN and SUBSTACK_PASSWORD"
        )
    return args


if __name__ == "__main__":
    cli_args = _parse_args()
    asyncio.run(main(cli_args.login, cli_args.password, cli_args.account, cli_args.once))