Существует инструмент для реализации алертинга на основе логов, называется elastalert. Он подходит для 5 и 6 elasticsearch, но по факту его можно успешно использовать с 7 и 8. Также на больших объемах логов (индексы в 10ки терабайт в сутки) мы столкнулись с проблемой, elastalert намертво зависает, без каких-либо ошибок и прочего.
Интеграция с 8 elasticsearch
Для интеграции с 8 еластиком, нужно всего лишь добавить одно поле в правило elastalert — doc_type: «»
name: "Too many 529 response code Android"
type: frequency
index: android-*
use_count_query: True
doc_type: ""
num_events: 3000
run_every:
minutes: 3
timeframe:
minutes: 3
filter:
- term:
response: "529"
При таком подходе, elastalert будет пытаться стянуть все поля, что есть в логах, из-за чего на больших объемах он зависает. Можно добавить параметр use_count_query: True, тогда будет возращаться небольшой json вида
[{'@timestamp': '2024-06-28T10:46:09.106546Z', 'num_hits': 17422, 'num_matches': 1}]
Работает реактивно быстро, без зависаний и отлично подходит для накопительных алертов, например в правиле выше, при нахождении 3000 и более 529 ошибок в индексе, за 3 минуты, мы получаем алерт. Все бы хорошо, но во-первых мы хотим видеть реальное кол-во ошибок за данный период, а по дефолту elastalert делает выборку за время, указанное в run_every основного конфиг файла, а не конфиг файла правила. Следовательно в json в поле num_hits мы видим общее кол-во логов за период из глобального параметра run_every, а не нашего параметра timeframe внутри правила. Казалось бы, решение простое, сделать глобально параметр run_every равный timeframe внутри каждых правил, тогда мы в алертинге увидим корректное кол-во ошибок за период, но это не гибко, правила разные, как и временные рамки. Собственно тут я приведу код, который это правит.
Правки в elastalert
Во-первых я решил переписать сборку на poetry
Файл pyproject.toml
[tool.poetry]
name = "elastalert"
version = "0.0.1"
description = "Custom elastalert"
authors = ["Quentin Long <qlo@yelp.com>", "Andrei Trusikhin <modern@for.project"]
include = ["schema.yaml", "es_mappings/**/*.json"]
[tool.poetry.dependencies]
python = ">=3.6,<3.8"
apscheduler = "^3.3.0"
aws-requests-auth = "^0.3.0"
blist = "^1.3.6"
boto3 = "^1.4.4"
configparser = "^3.5.0"
croniter = "^0.3.16"
elasticsearch = "==7.0.0"
envparse = "^0.2.0"
exotel = "^0.1.3"
jira = "^2.0.0"
jsonschema = "^3.0.2"
mock = "^2.0.0"
prison = "^0.1.2"
PyStaticConfiguration = "^0.10.3"
python-dateutil = ">=2.6.0,<2.7.0"
PyYAML = "^5.1"
requests = "^2.10.0"
"stomp.py" = "^4.1.17"
texttable = "^0.8.8"
twilio = ">=6.0.0,<6.1"
cffi = "^1.11.5"
[build-system]
requires = ["poetry-core>=1.0.0", "poetry-dynamic-versioning>=1.0.0,<2.0.0"]
build-backend = "poetry_dynamic_versioning.backend"
[[tool.poetry.source]]
name = "gitlab_elastalert"
url = "pypiregistry"
[tool.poetry.scripts]
elastalert-create-index = "elastalert.create_index:main"
elastalert-test-rule = "elastalert.test_rule:main"
elastalert-rule-from-kibana = "elastalert.rule_from_kibana:main"
elastalert = "elastalert.elastalert:main"
В CI я использую небольшой bash скрипт для релиза пакета, собственно для него и нужен poetry_dynamic_version
#!/bin/bash
function pre_init() {
apt-get update && apt-get install git python3-pip -y
pip3.10 install -r requirements.txt
poetry config certificates.gitlab_elastalert.cert /usr/local/share/ca-certificates/rca.crt
poetry self add "poetry-dynamic-versioning[plugin]"
}
function build_and_publish() {
poetry env use python3.7
poetry install
poetry build
poetry publish --repository gitlab_elastalert -u $CI_DEPLOY_USER -p $CI_DEPLOY_PASSWORD
}
function get_version() {
cat $PYPROJECT_CONFIG | grep -w "version" | awk '{print $3}'|tr -d '"'
}
function git_fetch() {
git config --global --add safe.directory '*'
git remote set-url origin "https://oauth2:$BUMP_TOKEN@$CI_SERVER_HOST/$CI_PROJECT_PATH.git"
git config --global user.name $GITLAB_USER_NAME
git config --global user.email $GITLAB_USER_EMAIL
git fetch
git checkout $CI_COMMIT_REF_NAME
git reset --hard "origin/$CI_COMMIT_REF_NAME"
}
function git_commit() {
git commit -am "CI. Version changed $1 -> $2"
git push origin $CI_COMMIT_REF_NAME
}
function main() {
pre_init
git_fetch
CURRENT_VERSION=$(get_version)
poetry version $1
NEW_VERSION=$(get_version)
echo -e "\033[0;32mCURRENT_VERSION: $CURRENT_VERSION\nNEW_VERSION: $NEW_VERSION\e[0m"
build_and_publish
git_commit "${CURRENT_VERSION}" "${NEW_VERSION}"
}
if [[ $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH ]];then
main "patch"
else
main "prerelease"
fi;
Логика тут простая. При комите не в DEFAULT_BRANCH в pyproject.toml в поле version увеличиться патч версия на одно значение вперед, а также добавить число-буквенное значение. При следующих комитах число-буквенное значение будет менять и следовательно будет легко установить любую тестовую версию. При комите в DEFAULT_BRANCH то, число-буквенное значение отбрасывается и остается только уже увеличенная патч версия.
Кусок из gitlab-ci
release:
stage: release
variables:
PYPROJECT_CONFIG: "pyproject.toml"
image: python3.7
script:
- ./release.sh
rules:
- { when: manual }
Касательно правок в самом коде. Нам нужно, чтобы run_every параметр можно было считывать из конфига самого правила и чтобы он учитывался для каждого созданного правила.
В elastalert.py в методе start приводим код к следующему виду:
def start(self):
""" Periodically go through each rule and run it """
if self.starttime:
if self.starttime == 'NOW':
self.starttime = ts_now()
else:
try:
self.starttime = ts_to_dt(self.starttime)
except (TypeError, ValueError):
self.handle_error("%s is not a valid ISO8601 timestamp (YYYY-MM-DDTHH:MM:SS+XX:00)" % (self.starttime))
exit(1)
for rule in self.rules:
rule['initial_starttime'] = self.starttime
self.wait_until_responsive(timeout=self.args.timeout)
self.running = True
elastalert_logger.info("Starting up")
for rule in self.rules:
run_every = rule.get('run_every', self.run_every)
if isinstance(run_every, dict):
run_every = datetime.timedelta(**run_every)
self.scheduler.add_job(self.handle_pending_alerts, 'interval',
seconds=run_every.total_seconds(), id=f'_internal_handle_pending_alerts{rule["name"]}')
self.scheduler.add_job(self.handle_config_change, 'interval',
seconds=run_every.total_seconds(), id=f'_internal_handle_config_change{rule["name"]}')
self.scheduler.start()
while self.running:
next_run = datetime.datetime.utcnow() + run_every
# Quit after end_time has been reached
if self.args.end:
endtime = ts_to_dt(self.args.end)
if next_run.replace(tzinfo=dateutil.tz.tzutc()) > endtime:
exit(0)
if next_run < datetime.datetime.utcnow():
continue
# Show disabled rules
if self.show_disabled_rules:
elastalert_logger.info("Disabled rules are: %s" % (str(self.get_disabled_rules())))
# Wait before querying again
sleep_duration = total_seconds(next_run - datetime.datetime.utcnow())
self.sleep_for(sleep_duration)
Что мы добавили?
Во-первых цикл с чтением правил, где получаем значение run_every и перенесли код шедулинга в этот цикл, так как там используется наш параметр
for rule in self.rules:
run_every = rule.get('run_every', self.run_every)
if isinstance(run_every, dict):
run_every = datetime.timedelta(**run_every
self.scheduler.add_job(self.handle_pending_alerts, 'interval',
seconds=run_every.total_seconds(), id=f'_internal_handle_pending_alerts{rule["name"]}')
self.scheduler.add_job(self.handle_config_change, 'interval',
seconds=run_every.total_seconds(), id=f'_internal_handle_config_change{rule["name"]}')
Во-вторых во всем данном модуле (elastalert.py) нужно заменить self.run_every на rule[‘run_every’].
Теперь при запуске мы берем данный параметр из конфига конкретного правила, а не из общего конфига. Тем самым мы можем использовать разный тайминг в конфиге разных правил.
Параметры run_every и timeframe в конфиге правила должны иметь одинаковое значение для получения корректного значение количества hits.