Я решил завести телеграм канал, помимо сайт. Ссылка на него тут.
Писать посты самому естественно не наш случай, хотелось бы автоматизацию в виде бота, который и будет заниматься этой задачей.
Давайте обрисуем ТЗ:
- Бот проверяет раз в N время посты через API wordpress и публикует в ТГ последний добавленный.
- Чтобы не было дублей, нам нужен стейт для бота, в качестве него будем использовать sqlite3 базу. В БД будем хранить всего одну таблицу с записями о том, какие посты уже были отправлены.
- Бот должен иметь конфигурационный файл, чтобы мы не парились с переменными окружения.
- Отправлять пост необходимо с картинкой поста, его названием и короткой ссылкой на сам пост на сайте.
Конфигурационный файл бота:
В корне проекта создадим файл bot_config.ini со следующим содержимом:
[wordpress]
BASE_WP_URL = https://andtree.ru
API_EP_URL = /wp-json/wp/v2/posts
SCRAPE_INTERVAL = 3600
[database]
DB_PATH = /opt/publisher_bot/bot.db
DB_TABLE_NAME = posts
DB_COLUMN_NAME = post_url
[telegram]
BOT_TOKEN = token
CHANNEL_ID = id
[logger]
BOT_LOGGER_NAME = publisher_bot
BOT_LOGFILE = /var/log/publisher_bot.log
Категория wordpress:
BASE_WP_URL — адрес домена сайта.
API_EP_URL — endpoint для API запросов.
SCRAPE_INTERVAL — этот параметр установлен в секундах и влияет на частоту опроса ботом API WP с целью получения новых постов. В данном случае стоит 1 час.
Категория database:
DB_PATH — абсолютный путь до файла базы. Так как в будущем мы будем использовать systemd для управления запуском бота, это имеет смысл, если указать относительный путь, например просто имя файла, то он будет создан в корневом каталоге. Дело в том, что systemd имеет свое «пространство» для запуска и это не тоже самое, что ввести в корне проекта python main.py.
DB_TABLE_NAME — имя нашей таблицы.
DB_COLUMN_NAME — имя колонки в таблице.
Категория telegram:
BOT_TOKEN — токен бота. Создаем в телеграм как обычно через BotFather.
CHANNEL_ID — это ID канала. Его так просто не получить, как например ID чата. Чтобы получить ID канала, можно воспользоваться вот этим ботом @LeadConverterToolkitBot. Зайдя в него нажимаете /start, потом пересылаете ему любое сообщение из канала, ID которого вы хотите получить. В ответ бот пришлет вам ID.
Категория logger:
BOT_LOGGER_NAME — это имя логгера, оно будет видно, только если мы захотим его выводить в логах. Иногда это полезно, если вы используете несколько тредов.
BOT_LOGFILE — место, куда наш бот будем писать логи.
Реализовываем класс логирования:
import logging
import logging.handlers
import json
class JSONFormatter(logging.Formatter):
def format(self, record):
log_record = {
'asctime': self.formatTime(record, self.datefmt),
'filename': record.filename,
'codeLine': record.lineno,
'levelname': record.levelname,
'message': record.getMessage(),
}
return json.dumps(log_record, ensure_ascii=False)
class Logging():
def __init__(self, logger_name, log_file):
self.logger_name = logger_name
self.logger = logging.getLogger(self.logger_name)
self.logger.setLevel(logging.INFO)
formatter = JSONFormatter()
file_handler = logging.handlers.TimedRotatingFileHandler(
log_file, when='midnight', interval=1, backupCount=3
)
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
def log(self, level, message, *args):
self.logger.log(level, message, *args)
def info(self, message, *args):
self.log(logging.INFO, message, *args)
def debug(self, message, *args):
self.log(logging.DEBUG, message, *args)
def warning(self, message, *args):
self.log(logging.WARNING, message, *args)
def error(self, message, *args):
self.log(logging.ERROR, message, *args)
def critical(self, message, *args):
self.log(logging.CRITICAL, message, *args)
Класс под копирку из другого проекта с телеграм ботом, который я описывал ранее в другой статье. Обычно нечто подобное я и использую, меняя только handler в зависимости от проекта и его требований.
Реализовываем класс для работы с базой данных:
from logger import Logging
from dataclasses import dataclass
import sqlite3
@dataclass
class DbProcess:
database_path: str
column_name: str
table_name: str
logger: Logging
@staticmethod
def db_connect(database_path):
conn = sqlite3.connect(database_path)
conn.row_factory = sqlite3.Row
return conn
def create_table(self):
cursor = self.db_connect(self.database_path)
try:
cursor.execute(f"CREATE TABLE IF NOT EXISTS {self.table_name} (id INTEGER PRIMARY KEY AUTOINCREMENT, {self.column_name} TEXT)")
cursor.commit()
except sqlite3.OperationalError as err:
self.logger.error("Ошибка создания таблици: %s", err)
finally:
cursor.close()
def inser_data(self, column_data):
cursor = self.db_connect(self.database_path)
try:
cursor.execute(f"INSERT INTO {self.table_name} ({self.column_name}) VALUES (?)", (column_data,))
cursor.commit()
self.logger.info("Данные: %s записаны в таблицу: %s", column_data, self.table_name)
except sqlite3.OperationalError as err:
self.logger.error("Ошибка добавления данных в таблицу: %s", err)
finally:
cursor.close()
def select_data(self, column_data):
cursor = self.db_connect(self.database_path)
try:
select = cursor.execute(f"SELECT {self.column_name} FROM {self.table_name} WHERE {self.column_name} = '{column_data}'").fetchone()
except sqlite3.OperationalError as err:
self.logger.warning("Ошибка выполнения SQL запроса: %s", err)
finally:
cursor.close()
return select
В полях класса все переменные уже знакомы нам. Мы их описывали в конфиг файле и будем передавать их экземпляру класса при его создании. Дополнительно указываем logger поле, которое принимает сам класс логирования.
Метод db_connect():
@staticmethod
def db_connect(database_path):
conn = sqlite3.connect(database_path)
conn.row_factory = sqlite3.Row
return conn
Метод является статичным, то есть он не будет доступен через экземпляр класса, а также не может обращаться к полям класса через указатель self. Метод просто возвращает нам объект сonn через который мы будем ходить к базе данных, поэтому он вполне подходит для использования в качестве статичного метода.
conn.row_factory = sqlite3.Row — это означает, что результаты SQL-запросов будут представлены в виде объектов класса sqlite3.Row
. Эти объекты предоставляют доступ к данным в виде словаря и кортежа. Если это убрать, в ответ нам вернется строка, которую надо будет дополнительно обрабатывать, убирая лишние символы.
Метод create_table():
Данный метод будет вызываться каждый раз, когда бот делает новую итерацию проверки постов, поэтому мы указываем создавать таблицу, только если она еще не создана. Также указываем для поля id автоинкремент и передаем имя нашей колонки.
Метод insert_data():
Это метод, через который мы будем вставлять данные в нашу таблицу. По этим же данным мы в дальнейшем будем сравнивать, существует ли уже такая запись или нет и понимать, нужно ли нам отправлять ее в наш канал.
Метод select_data():
Наш select. В ответ возвращает объект класса sqlite3.Row. О нем было выше и будет дальше.
Реализовываем класс для работы с API:
import requests
from logger import Logging
from dataclasses import dataclass
from database import DbProcess
@dataclass
class ApiMethods:
base_url: str
api_ep: str
logger: Logging
db: DbProcess
@staticmethod
def send_api_requests(**kwargs):
params={'per_page': 1, 'orderby': 'date', 'order': 'desc'}
logger = kwargs.get("logger")
if kwargs.get("method") == "GET":
try:
response = requests.get(url=kwargs.get("url"), params=params)
if response.status_code == 200:
return response.json()
except requests.exceptions.RequestException as err:
logger.warning("Ошибка при отправке запроса на %s. Код ответа %s", kwargs.get("url"), response.status_code)
return None
def get_latest_post(self):
request_url = f"{self.base_url}/{self.api_ep}"
response = self.send_api_requests(method="GET", url=request_url, logger=self.logger)
post_mapping = {}
try:
latest_post = response[0]
post_url = latest_post["guid"]["rendered"]
post_title= latest_post["title"]["rendered"]
attach_url = latest_post["_links"]["wp:attachment"][0]["href"]
logo_attach_response = self.send_api_requests(method="GET", url=attach_url, logger=self.logger)
post_logo = logo_attach_response[0]["guid"]["rendered"]
post_mapping = {
"post_url": post_url,
"post_title": post_title,
"post_logo": post_logo
}
return post_mapping
except (IndexError, KeyError, TypeError, AttributeError) as err:
self.logger.error("Ошибка при чтении данных из API WP: %s", err)
return None
def upload_post_into_db(self):
make_post_in_telegram = False
column_name = self.db.column_name
post_url = self.get_latest_post()[column_name]
self.db.create_table()
check_exist_post_in_db = self.db.select_data(post_url)
if check_exist_post_in_db is None:
self.db.inser_data(post_url)
make_post_in_telegram = True
else:
if check_exist_post_in_db[column_name] != post_url:
self.db.inser_data(post_url)
make_post_in_telegram = True
return make_post_in_telegram
Давайте разбираться, что мы тут делаем.
Метод send_api_requests():
@staticmethod
def send_api_requests(**kwargs):
params={'per_page': 1, 'orderby': 'date', 'order': 'desc'}
logger = kwargs.get("logger")
if kwargs.get("method") == "GET":
try:
response = requests.get(url=kwargs.get("url"), params=params)
if response.status_code == 200:
return response.json()
except requests.exceptions.RequestException as err:
logger.warning("Ошибка при отправке запроса на %s. Код ответа %s", kwargs.get("url"), response.status_code)
return None
Реализовываем обычный GET запрос с параметрами к API WP. Мы указываем params такие, так как хотим видеть «наверху» json наш последний пост.
Метод get_lates_post():
def get_latest_post(self):
request_url = f"{self.base_url}/{self.api_ep}"
response = self.send_api_requests(method="GET", url=request_url, logger=self.logger)
post_mapping = {}
try:
latest_post = response[0]
post_url = latest_post["guid"]["rendered"]
post_title= latest_post["title"]["rendered"]
attach_url = latest_post["_links"]["wp:attachment"][0]["href"]
logo_attach_response = self.send_api_requests(method="GET", url=attach_url, logger=self.logger)
post_logo = logo_attach_response[0]["guid"]["rendered"]
post_mapping = {
"post_url": post_url,
"post_title": post_title,
"post_logo": post_logo
}
return post_mapping
except (IndexError, KeyError, TypeError, AttributeError) as err:
self.logger.error("Ошибка при чтении данных из API WP: %s", err)
return None
Здесь мы получаем последний пост в переменной latest_post. Затем получаем нужные нам данные из словаря. Помните, мы хотели отправлять имя поста, ссылку на него и картинку.
Ссылку поста и его имя мы записываем в переменные post_url и post_title соответственно. А вот с изображением схема чуть сложнее. Дело в том, что WP не дает прямую ссылку на заглавное изображения записи. Чтобы его получить, мы должны взять ссылку по пути [«_links»][«wp:attachment»], сделать GET запрос на нее и получить в ответ нужную нам ссылку на изображение. Все это мы делаем в этих строках:
attach_url = latest_post["_links"]["wp:attachment"][0]["href"]
logo_attach_response = self.send_api_requests(method="GET", url=attach_url, logger=self.logger)
post_logo = logo_attach_response[0]["guid"]["rendered"]
Далее мы делаем «маппинг» наших переменных в словарь, для дальнейшей удобной работы с данными.
Метод upload_post_into_db():
def upload_post_into_db(self):
make_post_in_telegram = False
column_name = self.db.column_name
post_url = self.get_latest_post()[column_name]
self.db.create_table()
check_exist_post_in_db = self.db.select_data(post_url)
if check_exist_post_in_db is None:
self.db.inser_data(post_url)
make_post_in_telegram = True
else:
if check_exist_post_in_db[column_name] != post_url:
self.db.inser_data(post_url)
make_post_in_telegram = True
return make_post_in_telegram
Тут мы вводим bool переменную make_post_in_telegram, по ней в дальнейшем бот будет понимать, делать ли пост в телеграм или нет. Получаем урл нашего поста, именно это значение будет лежать в базе. Затем вызываем метод select_data(post_url), чтобы проверить есть ли такой урл в базе. При первом обращении, если записи нет, нам в ответ вернется None, это значит, что мы должны сделать запись в БД с урлом и сказать боту, что делать пост нужно. При повторном обращении, при условии, что новый пост еще не появился, запись с текущим последним урл поста уже будет в базе, в ответ нам вернется не None, а тот самый объект класса sqllite3.Row, который хранит в себе словарь. Следовательно мы его извлекаем тут check_exist_post_in_db[column_name] и сравниваем с текущим урлом записи, если они не равны, делаем запись в базу и говорим боту, что можно отправлять сообщения в телеграм.
Результатом работы этого метода будем значение True или False в переменной make_post_in_telegram.
Реализовываем основную логику в main.py:
from logger import Logging
from api_methods import ApiMethods
from database import DbProcess
from time import sleep
import telebot
import configparser
BOT_CONFIG = "/opt/publisher_bot/bot_config.ini"
config_parser = configparser.ConfigParser()
config_parser.read(BOT_CONFIG)
BASE_URL = config_parser.get("wordpress", "BASE_WP_URL")
API_EP_URL = config_parser.get("wordpress", "API_EP_URL")
DB_PATH = config_parser.get("database", "DB_PATH")
DB_TABLE_NAME = config_parser.get("database", "DB_TABLE_NAME")
DB_COLUMN_NAME = config_parser.get("database", "DB_COLUMN_NAME")
BOT_TOKEN = config_parser.get("telegram", "BOT_TOKEN")
CHANNEL_ID = config_parser.get("telegram", "CHANNEL_ID")
BOT_LOGGER_NAME = config_parser.get("logger", "BOT_LOGGER_NAME")
BOT_LOGFILE = config_parser.get("logger", "BOT_LOGFILE")
SCRAPE_INTERVAL = int(config_parser.get("wordpress", "SCRAPE_INTERVAL"))
logger = Logging(BOT_LOGGER_NAME, BOT_LOGFILE)
db = DbProcess(DB_PATH, DB_COLUMN_NAME, DB_TABLE_NAME, logger)
api = ApiMethods(BASE_URL, API_EP_URL, logger, db)
bot = telebot.TeleBot(BOT_TOKEN)
def send_post_to_channel():
post_mapping = api.get_latest_post()
is_make_post = api.upload_post_into_db()
if is_make_post is True:
message = (
f"Новая статья:\n"
f"{post_mapping['post_title']}\n\n"
f"{post_mapping['post_url']}\n"
)
bot.send_photo(CHANNEL_ID, post_mapping["post_logo"], message)
logger.info("Пост %s успешно отправлен в канал: %s", post_mapping["post_url"], CHANNEL_ID)
def main():
while True:
send_post_to_channel()
sleep(SCRAPE_INTERVAL)
if __name__ == "__main__":
main()
BOT_CONFIG — указываем абсолютный путь до созданного в начале конфигурационного файла бота.
Далее мы используем либу configparser для извлечения данных из конфиг файла и присваиваем их соответствующим переменным.
SCRAPE_INTERVAL — эту переменную обязательно приводим к типу int, иначе sleep работать не будет.
logger = Logging(BOT_LOGGER_NAME, BOT_LOGFILE)
db = DbProcess(DB_PATH, DB_COLUMN_NAME, DB_TABLE_NAME, logger)
api = ApiMethods(BASE_URL, API_EP_URL, logger, db)
bot = telebot.TeleBot(BOT_TOKEN)
Создаем наши экземпляры классов.
Метод send_post_to_channel():
def send_post_to_channel():
post_mapping = api.get_latest_post()
is_make_post = api.upload_post_into_db()
if is_make_post is True:
message = (
f"Новая статья:\n"
f"{post_mapping['post_title']}\n\n"
f"{post_mapping['post_url']}\n"
)
bot.send_photo(CHANNEL_ID, post_mapping["post_logo"], message)
logger.info("Пост %s успешно отправлен в канал: %s", post_mapping["post_url"], CHANNEL_ID)
Получаем переменную с маппингом наших данных (URL статьи, ее имя, картинка). Далее в переменную is_make_post получаем булевое значение, если оно True мы формируем сообщение в переменной message, а затем используем метод send_photo, чтобы отправить нашу картинку и сообщение в виде подписи. Выглядеть это будет примерно так:
В main() методе мы просто создаем бесконечный цикл с вызовом метода выше и метода sleep, куда передаем наш интервал из конфиг файла.
Финальные штрихи:
Делаем systemd сервис:
[Unit]
Description=Pusblisher bot
After=multi-user.target
[Service]
Type=simple
Restart=always
ExecStart=/usr/bin/python3 /opt/publisher_bot/main.py
[Install]
WantedBy=multi-user.target
Делаем конфиг для logrotate:
/var/log/publisher_bot.log* {
daily
missingok
rotate 3
compress
dateext
dateformat -%Y-%m-%d
create 0644 root root
}