Пишем бота для публикации постов в телеграм канал

0
(0)

Я решил завести телеграм канал, помимо сайт. Ссылка на него тут.

Писать посты самому естественно не наш случай, хотелось бы автоматизацию в виде бота, который и будет заниматься этой задачей.

Давайте обрисуем ТЗ:

  1. Бот проверяет раз в N время посты через API wordpress и публикует в ТГ последний добавленный.
  2. Чтобы не было дублей, нам нужен стейт для бота, в качестве него будем использовать sqlite3 базу. В БД будем хранить всего одну таблицу с записями о том, какие посты уже были отправлены.
  3. Бот должен иметь конфигурационный файл, чтобы мы не парились с переменными окружения.
  4. Отправлять пост необходимо с картинкой поста, его названием и короткой ссылкой на сам пост на сайте.

Конфигурационный файл бота:

В корне проекта создадим файл bot_config.ini со следующим содержимом:

Bash
[wordpress]
BASE_WP_URL = https://andtree.ru
API_EP_URL = /wp-json/wp/v2/posts
SCRAPE_INTERVAL = 3600
[database]
DB_PATH = /opt/publisher_bot/bot.db
DB_TABLE_NAME = posts
DB_COLUMN_NAME = post_url
[telegram]
BOT_TOKEN = token
CHANNEL_ID = id
[logger]
BOT_LOGGER_NAME = publisher_bot
BOT_LOGFILE = /var/log/publisher_bot.log

Категория wordpress:

BASE_WP_URL — адрес домена сайта.

API_EP_URL — endpoint для API запросов.

SCRAPE_INTERVAL — этот параметр установлен в секундах и влияет на частоту опроса ботом API WP с целью получения новых постов. В данном случае стоит 1 час.

Категория database:

DB_PATH — абсолютный путь до файла базы. Так как в будущем мы будем использовать systemd для управления запуском бота, это имеет смысл, если указать относительный путь, например просто имя файла, то он будет создан в корневом каталоге. Дело в том, что systemd имеет свое «пространство» для запуска и это не тоже самое, что ввести в корне проекта python main.py.

DB_TABLE_NAME — имя нашей таблицы.

DB_COLUMN_NAME — имя колонки в таблице.

Категория telegram:

BOT_TOKEN — токен бота. Создаем в телеграм как обычно через BotFather.

CHANNEL_ID — это ID канала. Его так просто не получить, как например ID чата. Чтобы получить ID канала, можно воспользоваться вот этим ботом @LeadConverterToolkitBot. Зайдя в него нажимаете /start, потом пересылаете ему любое сообщение из канала, ID которого вы хотите получить. В ответ бот пришлет вам ID.

Категория logger:

BOT_LOGGER_NAME — это имя логгера, оно будет видно, только если мы захотим его выводить в логах. Иногда это полезно, если вы используете несколько тредов.

BOT_LOGFILE — место, куда наш бот будем писать логи.

Реализовываем класс логирования:

Python
import logging
import logging.handlers
import json

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_record = {
            'asctime': self.formatTime(record, self.datefmt),
            'filename': record.filename,
            'codeLine': record.lineno,
            'levelname': record.levelname,
            'message': record.getMessage(),
        }
        return json.dumps(log_record, ensure_ascii=False)

class Logging():
    def __init__(self, logger_name, log_file):
        self.logger_name = logger_name
        self.logger = logging.getLogger(self.logger_name)
        self.logger.setLevel(logging.INFO)
        formatter = JSONFormatter()
        file_handler = logging.handlers.TimedRotatingFileHandler(
            log_file, when='midnight', interval=1, backupCount=3
        )
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(formatter)
        self.logger.addHandler(file_handler) 

    def log(self, level, message, *args):
        self.logger.log(level, message, *args)

    def info(self, message, *args):
        self.log(logging.INFO, message, *args)

    def debug(self, message, *args):
        self.log(logging.DEBUG, message, *args)

    def warning(self, message, *args):
        self.log(logging.WARNING, message, *args)

    def error(self, message, *args):
        self.log(logging.ERROR, message, *args)

    def critical(self, message, *args):
        self.log(logging.CRITICAL, message, *args)

Класс под копирку из другого проекта с телеграм ботом, который я описывал ранее в другой статье. Обычно нечто подобное я и использую, меняя только handler в зависимости от проекта и его требований.

Реализовываем класс для работы с базой данных:

Python
from logger import Logging
from dataclasses import dataclass
import sqlite3

@dataclass
class DbProcess:
    database_path: str
    column_name: str
    table_name: str
    logger: Logging

    @staticmethod
    def db_connect(database_path):
        conn = sqlite3.connect(database_path)
        conn.row_factory = sqlite3.Row
        return conn
    
    def create_table(self):
        cursor = self.db_connect(self.database_path)
        try:
            cursor.execute(f"CREATE TABLE IF NOT EXISTS {self.table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT, {self.column_name} TEXT)")
            cursor.commit()
        except sqlite3.OperationalError as err:
            self.logger.error("Ошибка создания таблици: %s", err)
        finally:
            cursor.close()

    def inser_data(self, column_data):
        cursor = self.db_connect(self.database_path)
        try:
            cursor.execute(f"INSERT INTO {self.table_name} ({self.column_name}) VALUES (?)", (column_data,))
            cursor.commit()
            self.logger.info("Данные: %s записаны в таблицу: %s", column_data, self.table_name)
        except sqlite3.OperationalError as err:
            self.logger.error("Ошибка добавления данных в таблицу: %s", err)
        finally:
            cursor.close()

    def select_data(self, column_data):
        cursor = self.db_connect(self.database_path)
        try:
            select = cursor.execute(f"SELECT {self.column_name} FROM {self.table_name} WHERE {self.column_name} = '{column_data}'").fetchone()
        except sqlite3.OperationalError as err:
            self.logger.warning("Ошибка выполнения SQL запроса: %s", err)
        finally:
            cursor.close()
        return select

В полях класса все переменные уже знакомы нам. Мы их описывали в конфиг файле и будем передавать их экземпляру класса при его создании. Дополнительно указываем logger поле, которое принимает сам класс логирования.

Метод db_connect():

Python
@staticmethod
def db_connect(database_path):
    conn = sqlite3.connect(database_path)
    conn.row_factory = sqlite3.Row
    return conn

Метод является статичным, то есть он не будет доступен через экземпляр класса, а также не может обращаться к полям класса через указатель self. Метод просто возвращает нам объект сonn через который мы будем ходить к базе данных, поэтому он вполне подходит для использования в качестве статичного метода.

conn.row_factory = sqlite3.Row — это означает, что результаты SQL-запросов будут представлены в виде объектов класса sqlite3.Row. Эти объекты предоставляют доступ к данным в виде словаря и кортежа. Если это убрать, в ответ нам вернется строка, которую надо будет дополнительно обрабатывать, убирая лишние символы.

Метод create_table():

Данный метод будет вызываться каждый раз, когда бот делает новую итерацию проверки постов, поэтому мы указываем создавать таблицу, только если она еще не создана. Также указываем для поля id автоинкремент и передаем имя нашей колонки.

Метод insert_data():

Это метод, через который мы будем вставлять данные в нашу таблицу. По этим же данным мы в дальнейшем будем сравнивать, существует ли уже такая запись или нет и понимать, нужно ли нам отправлять ее в наш канал.

Метод select_data():

Наш select. В ответ возвращает объект класса sqlite3.Row. О нем было выше и будет дальше.

Реализовываем класс для работы с API:

Python
import requests
from logger import Logging
from dataclasses import dataclass
from database import DbProcess

@dataclass
class ApiMethods:
    base_url: str
    api_ep: str
    logger: Logging
    db: DbProcess

    @staticmethod
    def send_api_requests(**kwargs):
        params={'per_page': 1, 'orderby': 'date', 'order': 'desc'}
        logger = kwargs.get("logger")
        if kwargs.get("method") == "GET":
            try:
                response = requests.get(url=kwargs.get("url"), params=params)
                if response.status_code == 200:
                    return response.json()
            except requests.exceptions.RequestException as err:
                logger.warning("Ошибка при отправке запроса на %s. Код ответа %s", kwargs.get("url"), response.status_code)
        return None

    def get_latest_post(self):
        request_url = f"{self.base_url}/{self.api_ep}"
        response = self.send_api_requests(method="GET", url=request_url, logger=self.logger)
        post_mapping = {}
        try:
            latest_post = response[0]
            post_url = latest_post["guid"]["rendered"]
            post_title= latest_post["title"]["rendered"]
            attach_url = latest_post["_links"]["wp:attachment"][0]["href"]
            logo_attach_response = self.send_api_requests(method="GET", url=attach_url, logger=self.logger)
            post_logo = logo_attach_response[0]["guid"]["rendered"]
            post_mapping = {
                "post_url": post_url,
                "post_title": post_title,
                "post_logo": post_logo
            }
            return post_mapping
        except (IndexError, KeyError, TypeError, AttributeError) as err:
            self.logger.error("Ошибка при чтении данных из API WP: %s", err)
        return None

    def upload_post_into_db(self):
        make_post_in_telegram = False
        column_name = self.db.column_name
        post_url = self.get_latest_post()[column_name]
        self.db.create_table()
        check_exist_post_in_db = self.db.select_data(post_url)
        if check_exist_post_in_db is None:
            self.db.inser_data(post_url)
            make_post_in_telegram = True
        else:
            if check_exist_post_in_db[column_name] != post_url:
                self.db.inser_data(post_url)
                make_post_in_telegram = True
        return make_post_in_telegram

Давайте разбираться, что мы тут делаем.

Метод send_api_requests():

Python
@staticmethod
def send_api_requests(**kwargs):
    params={'per_page': 1, 'orderby': 'date', 'order': 'desc'}
    logger = kwargs.get("logger")
    if kwargs.get("method") == "GET":
        try:
            response = requests.get(url=kwargs.get("url"), params=params)
            if response.status_code == 200:
                return response.json()
        except requests.exceptions.RequestException as err:
            logger.warning("Ошибка при отправке запроса на %s. Код ответа %s", kwargs.get("url"), response.status_code)
    return None

Реализовываем обычный GET запрос с параметрами к API WP. Мы указываем params такие, так как хотим видеть «наверху» json наш последний пост.

Метод get_lates_post():

Python
def get_latest_post(self):
    request_url = f"{self.base_url}/{self.api_ep}"
    response = self.send_api_requests(method="GET", url=request_url, logger=self.logger)
    post_mapping = {}
    try:
        latest_post = response[0]
        post_url = latest_post["guid"]["rendered"]
        post_title= latest_post["title"]["rendered"]
        attach_url = latest_post["_links"]["wp:attachment"][0]["href"]
        logo_attach_response = self.send_api_requests(method="GET", url=attach_url, logger=self.logger)
        post_logo = logo_attach_response[0]["guid"]["rendered"]
        post_mapping = {
            "post_url": post_url,
            "post_title": post_title,
            "post_logo": post_logo
        }
        return post_mapping
    except (IndexError, KeyError, TypeError, AttributeError) as err:
        self.logger.error("Ошибка при чтении данных из API WP: %s", err)
    return None

Здесь мы получаем последний пост в переменной latest_post. Затем получаем нужные нам данные из словаря. Помните, мы хотели отправлять имя поста, ссылку на него и картинку.

Ссылку поста и его имя мы записываем в переменные post_url и post_title соответственно. А вот с изображением схема чуть сложнее. Дело в том, что WP не дает прямую ссылку на заглавное изображения записи. Чтобы его получить, мы должны взять ссылку по пути [«_links»][«wp:attachment»], сделать GET запрос на нее и получить в ответ нужную нам ссылку на изображение. Все это мы делаем в этих строках:

Python
attach_url = latest_post["_links"]["wp:attachment"][0]["href"]
logo_attach_response = self.send_api_requests(method="GET", url=attach_url, logger=self.logger)
post_logo = logo_attach_response[0]["guid"]["rendered"]

Далее мы делаем «маппинг» наших переменных в словарь, для дальнейшей удобной работы с данными.

Метод upload_post_into_db():

Python
def upload_post_into_db(self):
    make_post_in_telegram = False
    column_name = self.db.column_name
    post_url = self.get_latest_post()[column_name]
    self.db.create_table()
    check_exist_post_in_db = self.db.select_data(post_url)
    if check_exist_post_in_db is None:
        self.db.inser_data(post_url)
        make_post_in_telegram = True
    else:
        if check_exist_post_in_db[column_name] != post_url:
            self.db.inser_data(post_url)
            make_post_in_telegram = True
    return make_post_in_telegram

Тут мы вводим bool переменную make_post_in_telegram, по ней в дальнейшем бот будет понимать, делать ли пост в телеграм или нет. Получаем урл нашего поста, именно это значение будет лежать в базе. Затем вызываем метод select_data(post_url), чтобы проверить есть ли такой урл в базе. При первом обращении, если записи нет, нам в ответ вернется None, это значит, что мы должны сделать запись в БД с урлом и сказать боту, что делать пост нужно. При повторном обращении, при условии, что новый пост еще не появился, запись с текущим последним урл поста уже будет в базе, в ответ нам вернется не None, а тот самый объект класса sqllite3.Row, который хранит в себе словарь. Следовательно мы его извлекаем тут check_exist_post_in_db[column_name] и сравниваем с текущим урлом записи, если они не равны, делаем запись в базу и говорим боту, что можно отправлять сообщения в телеграм.

Результатом работы этого метода будем значение True или False в переменной make_post_in_telegram.

Реализовываем основную логику в main.py:

Python
from logger import Logging
from api_methods import ApiMethods
from database import DbProcess
from time import sleep
import telebot
import configparser

BOT_CONFIG = "/opt/publisher_bot/bot_config.ini"

config_parser = configparser.ConfigParser()
config_parser.read(BOT_CONFIG)

BASE_URL = config_parser.get("wordpress", "BASE_WP_URL")
API_EP_URL = config_parser.get("wordpress", "API_EP_URL")
DB_PATH = config_parser.get("database", "DB_PATH")
DB_TABLE_NAME = config_parser.get("database", "DB_TABLE_NAME")
DB_COLUMN_NAME = config_parser.get("database", "DB_COLUMN_NAME")
BOT_TOKEN = config_parser.get("telegram", "BOT_TOKEN")
CHANNEL_ID = config_parser.get("telegram", "CHANNEL_ID")
BOT_LOGGER_NAME = config_parser.get("logger", "BOT_LOGGER_NAME")
BOT_LOGFILE = config_parser.get("logger", "BOT_LOGFILE")
SCRAPE_INTERVAL = int(config_parser.get("wordpress", "SCRAPE_INTERVAL"))

logger = Logging(BOT_LOGGER_NAME, BOT_LOGFILE)
db = DbProcess(DB_PATH, DB_COLUMN_NAME, DB_TABLE_NAME, logger)
api = ApiMethods(BASE_URL, API_EP_URL, logger, db)
bot = telebot.TeleBot(BOT_TOKEN)

def send_post_to_channel():
    post_mapping = api.get_latest_post()
    is_make_post = api.upload_post_into_db()
    if is_make_post is True:
        message = (
            f"Новая статья:\n"
            f"{post_mapping['post_title']}\n\n"
            f"{post_mapping['post_url']}\n"
        )
        bot.send_photo(CHANNEL_ID, post_mapping["post_logo"], message)
        logger.info("Пост %s успешно отправлен в канал: %s", post_mapping["post_url"], CHANNEL_ID)

def main():
    while True:
        send_post_to_channel()
        sleep(SCRAPE_INTERVAL)

if __name__ == "__main__":
    main()

BOT_CONFIG — указываем абсолютный путь до созданного в начале конфигурационного файла бота.

Далее мы используем либу configparser для извлечения данных из конфиг файла и присваиваем их соответствующим переменным.

SCRAPE_INTERVAL — эту переменную обязательно приводим к типу int, иначе sleep работать не будет.

Python
logger = Logging(BOT_LOGGER_NAME, BOT_LOGFILE)
db = DbProcess(DB_PATH, DB_COLUMN_NAME, DB_TABLE_NAME, logger)
api = ApiMethods(BASE_URL, API_EP_URL, logger, db)
bot = telebot.TeleBot(BOT_TOKEN)

Создаем наши экземпляры классов.

Метод send_post_to_channel():

Python
def send_post_to_channel():
    post_mapping = api.get_latest_post()
    is_make_post = api.upload_post_into_db()
    if is_make_post is True:
        message = (
            f"Новая статья:\n"
            f"{post_mapping['post_title']}\n\n"
            f"{post_mapping['post_url']}\n"
        )
        bot.send_photo(CHANNEL_ID, post_mapping["post_logo"], message)
        logger.info("Пост %s успешно отправлен в канал: %s", post_mapping["post_url"], CHANNEL_ID)

Получаем переменную с маппингом наших данных (URL статьи, ее имя, картинка). Далее в переменную is_make_post получаем булевое значение, если оно True мы формируем сообщение в переменной message, а затем используем метод send_photo, чтобы отправить нашу картинку и сообщение в виде подписи. Выглядеть это будет примерно так:

В main() методе мы просто создаем бесконечный цикл с вызовом метода выше и метода sleep, куда передаем наш интервал из конфиг файла.

Финальные штрихи:

Делаем systemd сервис:

Plaintext
[Unit]
Description=Pusblisher bot
After=multi-user.target
[Service]
Type=simple
Restart=always
ExecStart=/usr/bin/python3 /opt/publisher_bot/main.py
[Install]
WantedBy=multi-user.target

Делаем конфиг для logrotate:

Plaintext
/var/log/publisher_bot.log* {
    daily
    missingok
    rotate 3
    compress
    dateext
    dateformat -%Y-%m-%d
    create 0644 root root
}

Насколько статья полезна?

Нажмите на звезду, чтобы оценить!

Средняя оценка 0 / 5. Количество оценок: 0

Оценок пока нет. Поставьте оценку первым.

Оставить комментарий