Алертинг по логам с elastalert

5
(1)

Существует инструмент для реализации алертинга на основе логов, называется elastalert. Он подходит для 5 и 6 elasticsearch, но по факту его можно успешно использовать с 7 и 8. Также на больших объемах логов (индексы в 10ки терабайт в сутки) мы столкнулись с проблемой, elastalert намертво зависает, без каких-либо ошибок и прочего.

Интеграция с 8 elasticsearch

Для интеграции с 8 еластиком, нужно всего лишь добавить одно поле в правило elastalert — doc_type: «»

YAML
name: "Too many 529 response code Android"
type: frequency
index: android-*
use_count_query: True
doc_type: ""
num_events: 3000
run_every:
  minutes: 3
timeframe:
  minutes: 3
filter:
  - term:
      response: "529"

При таком подходе, elastalert будет пытаться стянуть все поля, что есть в логах, из-за чего на больших объемах он зависает. Можно добавить параметр use_count_query: True, тогда будет возращаться небольшой json вида

JSON
[{'@timestamp': '2024-06-28T10:46:09.106546Z', 'num_hits': 17422, 'num_matches': 1}]

Работает реактивно быстро, без зависаний и отлично подходит для накопительных алертов, например в правиле выше, при нахождении 3000 и более 529 ошибок в индексе, за 3 минуты, мы получаем алерт. Все бы хорошо, но во-первых мы хотим видеть реальное кол-во ошибок за данный период, а по дефолту elastalert делает выборку за время, указанное в run_every основного конфиг файла, а не конфиг файла правила. Следовательно в json в поле num_hits мы видим общее кол-во логов за период из глобального параметра run_every, а не нашего параметра timeframe внутри правила. Казалось бы, решение простое, сделать глобально параметр run_every равный timeframe внутри каждых правил, тогда мы в алертинге увидим корректное кол-во ошибок за период, но это не гибко, правила разные, как и временные рамки. Собственно тут я приведу код, который это правит.

Правки в elastalert

Во-первых я решил переписать сборку на poetry

Файл pyproject.toml

TOML
[tool.poetry]
name = "elastalert"
version = "0.0.1"
description = "Custom elastalert"
authors = ["Quentin Long <qlo@yelp.com>", "Andrei Trusikhin <modern@for.project"]
include = ["schema.yaml", "es_mappings/**/*.json"]

[tool.poetry.dependencies]
python = ">=3.6,<3.8"
apscheduler = "^3.3.0"
aws-requests-auth = "^0.3.0"
blist = "^1.3.6"
boto3 = "^1.4.4"
configparser = "^3.5.0"
croniter = "^0.3.16"
elasticsearch = "==7.0.0"
envparse = "^0.2.0"
exotel = "^0.1.3"
jira = "^2.0.0"
jsonschema = "^3.0.2"
mock = "^2.0.0"
prison = "^0.1.2"
PyStaticConfiguration = "^0.10.3"
python-dateutil = ">=2.6.0,<2.7.0"
PyYAML = "^5.1"
requests = "^2.10.0"
"stomp.py" = "^4.1.17"
texttable = "^0.8.8"
twilio = ">=6.0.0,<6.1"
cffi = "^1.11.5"

[build-system]
requires = ["poetry-core>=1.0.0", "poetry-dynamic-versioning>=1.0.0,<2.0.0"]
build-backend = "poetry_dynamic_versioning.backend"

[[tool.poetry.source]]
name = "gitlab_elastalert"
url = "pypiregistry"

[tool.poetry.scripts]
elastalert-create-index = "elastalert.create_index:main"
elastalert-test-rule = "elastalert.test_rule:main"
elastalert-rule-from-kibana = "elastalert.rule_from_kibana:main"
elastalert = "elastalert.elastalert:main"

В CI я использую небольшой bash скрипт для релиза пакета, собственно для него и нужен poetry_dynamic_version

Bash
#!/bin/bash

function pre_init() {
    apt-get update && apt-get install git python3-pip -y
    pip3.10 install -r requirements.txt
    poetry config certificates.gitlab_elastalert.cert /usr/local/share/ca-certificates/rca.crt
    poetry self add "poetry-dynamic-versioning[plugin]"
}

function build_and_publish() {
    poetry env use python3.7
    poetry install
    poetry build
    poetry publish --repository gitlab_elastalert -u $CI_DEPLOY_USER -p $CI_DEPLOY_PASSWORD
}

function get_version() {
    cat $PYPROJECT_CONFIG | grep -w "version" | awk '{print $3}'|tr -d '"'
}

function git_fetch() {
    git config --global --add safe.directory '*'
    git remote set-url origin "https://oauth2:$BUMP_TOKEN@$CI_SERVER_HOST/$CI_PROJECT_PATH.git"
    git config --global user.name $GITLAB_USER_NAME
    git config --global user.email $GITLAB_USER_EMAIL
    git fetch
    git checkout $CI_COMMIT_REF_NAME
    git reset --hard "origin/$CI_COMMIT_REF_NAME"
}

function git_commit() {
    git commit -am "CI. Version changed $1 -> $2"
    git push origin $CI_COMMIT_REF_NAME
}

function main() {
    pre_init
    git_fetch
    CURRENT_VERSION=$(get_version)
    poetry version $1
    NEW_VERSION=$(get_version)
    echo -e "\033[0;32mCURRENT_VERSION: $CURRENT_VERSION\nNEW_VERSION: $NEW_VERSION\e[0m"
    build_and_publish
    git_commit "${CURRENT_VERSION}" "${NEW_VERSION}"
}

if [[ $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH ]];then
    main "patch"
else
    main "prerelease"
fi;

Логика тут простая. При комите не в DEFAULT_BRANCH в pyproject.toml в поле version увеличиться патч версия на одно значение вперед, а также добавить число-буквенное значение. При следующих комитах число-буквенное значение будет менять и следовательно будет легко установить любую тестовую версию. При комите в DEFAULT_BRANCH то, число-буквенное значение отбрасывается и остается только уже увеличенная патч версия.

Кусок из gitlab-ci

YAML
release:
  stage: release
  variables:
    PYPROJECT_CONFIG: "pyproject.toml"
  image: python3.7
  script:
   - ./release.sh
  rules:
    - { when: manual }

Касательно правок в самом коде. Нам нужно, чтобы run_every параметр можно было считывать из конфига самого правила и чтобы он учитывался для каждого созданного правила.

В elastalert.py в методе start приводим код к следующему виду:

YAML
def start(self):
    """ Periodically go through each rule and run it """
    if self.starttime:
        if self.starttime == 'NOW':
            self.starttime = ts_now()
        else:
            try:
                self.starttime = ts_to_dt(self.starttime)
            except (TypeError, ValueError):
                self.handle_error("%s is not a valid ISO8601 timestamp (YYYY-MM-DDTHH:MM:SS+XX:00)" % (self.starttime))
                exit(1)
    for rule in self.rules:
        rule['initial_starttime'] = self.starttime
    self.wait_until_responsive(timeout=self.args.timeout)
    self.running = True
    elastalert_logger.info("Starting up")
    for rule in self.rules:
        run_every = rule.get('run_every', self.run_every)
        if isinstance(run_every, dict):
            run_every = datetime.timedelta(**run_every)
        self.scheduler.add_job(self.handle_pending_alerts, 'interval',
                           seconds=run_every.total_seconds(), id=f'_internal_handle_pending_alerts{rule["name"]}')
        self.scheduler.add_job(self.handle_config_change, 'interval',
                           seconds=run_every.total_seconds(), id=f'_internal_handle_config_change{rule["name"]}')
    self.scheduler.start()
    while self.running:
        next_run = datetime.datetime.utcnow() + run_every
        # Quit after end_time has been reached
        if self.args.end:
            endtime = ts_to_dt(self.args.end)
            if next_run.replace(tzinfo=dateutil.tz.tzutc()) > endtime:
                exit(0)
        if next_run < datetime.datetime.utcnow():
            continue
        # Show disabled rules
        if self.show_disabled_rules:
            elastalert_logger.info("Disabled rules are: %s" % (str(self.get_disabled_rules())))
        # Wait before querying again
        sleep_duration = total_seconds(next_run - datetime.datetime.utcnow())
        self.sleep_for(sleep_duration)

Что мы добавили?

Во-первых цикл с чтением правил, где получаем значение run_every и перенесли код шедулинга в этот цикл, так как там используется наш параметр

YAML
for rule in self.rules:
    run_every = rule.get('run_every', self.run_every)
    if isinstance(run_every, dict):
        run_every = datetime.timedelta(**run_every
    self.scheduler.add_job(self.handle_pending_alerts, 'interval',
                       seconds=run_every.total_seconds(), id=f'_internal_handle_pending_alerts{rule["name"]}')
    self.scheduler.add_job(self.handle_config_change, 'interval',
                       seconds=run_every.total_seconds(), id=f'_internal_handle_config_change{rule["name"]}')

Во-вторых во всем данном модуле (elastalert.py) нужно заменить self.run_every на rule[‘run_every’].

Теперь при запуске мы берем данный параметр из конфига конкретного правила, а не из общего конфига. Тем самым мы можем использовать разный тайминг в конфиге разных правил.

Параметры run_every и timeframe в конфиге правила должны иметь одинаковое значение для получения корректного значение количества hits.

Насколько статья полезна?

Нажмите на звезду, чтобы оценить!

Средняя оценка 5 / 5. Количество оценок: 1

Оценок пока нет. Поставьте оценку первым.

Оставить комментарий