Плагин aggregation
Плагин aggregation
Плагин aggregation позволяет объединять несколько событий в одно (агрегировать). В результате агрегации в суммарном событии можно выводить поля с помощью агрегирующих функций (first, last, distinct, avg, sum, min, max, custom). Агрегация также позволяет уменьшить поток событий, избавляясь от «шума».
Конфигурация
processors:
- module: aggregation
aggregation:
rule:
fields: ["Raw"]
window: 10s
min_events: 2
derive:
fields:
- deriving_data:
init_expr: '"hello aggregation"'
state_expr: 'agg_state'
finalize_expr: 'agg_state'
to: HelloAggregation
deriving_type: custom
condition:
equals:
Raw: "1"
keep-originals: false
| Параметр | Тип | По умолчанию | Описание |
|---|---|---|---|
rule | object | — | Правило агрегации (см. ниже) |
condition | object | — | Фильтр событий, подпадающих под агрегацию (опционально). Синтаксис соответствует блоку when |
keep-originals | bool | false | Сохранять ли оригинальные события, использовавшиеся для агрегации |
Правило агрегации
Блок rule определяет параметры окна агрегации, группировку и вывод полей.
| Параметр | Тип | Обязательный | Описание |
|---|---|---|---|
fields | array | нет | Массив полей группировки (см. ниже) |
window | duration | да | Размер временного окна агрегации ("5m", "1h" и т.п.) |
max_events | int | нет | Максимальное количество событий в окне. При достижении окно закрывается немедленно |
min_events | int | нет | Минимальное количество событий в окне. Агрегированное событие создаётся при закрытии окна, если накоплено не менее min_events событий |
additional_rules | array | нет | Дополнительные правила агрегации (см. ниже) |
derive | object | нет | Блок определения вывода полей для результата агрегации |
Хотя бы одно из полей max_events или min_events должно быть указано. Конфигурация с обоими значениями равными 0 (или не указанными) приведёт к ошибке.
Поля группировки
Агрегация поддерживает три режима группировки:
- Без группировки — агрегация по всем событиям (или по всем, прошедшим фильтр
condition) - По одному полю — аналогично классической агрегации
- По нескольким полям — события разделяются на группы по комбинации значений полей
Поле fields можно задать двумя способами:
Массив строк (параметр unnest считается равным false):
fields: ["ECS.Source.IP", "ECS.Event.Category"]
Массив объектов с управлением unnest:
fields:
- name: "ECS.Source.IP"
unnest: false
- name: "AssetIPs"
unnest: true
| Параметр | Тип | Описание |
|---|---|---|
name | string | Название поля |
unnest | bool | Если false — группировка по значению массива целиком. Если true — группировка отдельно по каждому элементу массива |
Условия завершения агрегации
Поведение определяется комбинацией max_events и min_events:
max_events | min_events | Поведение |
|---|---|---|
| > 0 | > 0 | Окно закрывается немедленно при max_events событиях или при истечении окна, если накоплено >= min_events |
| 0 | > 0 | Окно закрывается только при истечении окна, если накоплено >= min_events |
| > 0 | 0 | Окно закрывается при достижении max_events событий |
| 0 | 0 | Невалидная конфигурация — ошибка |
Дополнительные правила агрегации
Блок additional_rules — массив объектов, позволяющих определить дополнительные условия успешной агрегации (например, обнаружение всплеска количества событий).
additional_rules:
- window: 5s
max_events: 100
immediate_trigger: true
| Параметр | Тип | Обязательный | Описание |
|---|---|---|---|
window | duration | да | Размер дополнительного временного окна |
max_events | int | нет | Максимальное количество событий в дополнительном окне. При достижении — агрегация считается успешной при закрытии основного окна |
min_events | int | нет | Минимальное количество событий. При достижении на момент закрытия дополнительного окна — агрегация успешна |
immediate_trigger | bool | нет | Если true, основная агрегация завершается немедленно при успехе дополнительного правила. Если false — результат учитывается только при закрытии основного окна |
Вывод полей
Блок derive определяет, какие поля попадут в агрегированное событие и каким образом их значения будут вычислены.
Поля derive.fields
Массив объектов, каждый из которых определяет одно выходное поле:
| Параметр | Тип | Описание |
|---|---|---|
to | string | Название поля в результирующем событии |
deriving_type | string | Способ вывода (см. таблицу ниже) |
deriving_data | object | Данные для конкретного способа вывода |
Для типов first, last, distinct, avg, sum, min, max в блоке deriving_data необходимо указать поле from — название исходного поля.
Поля derive.all
Общее правило вывода полей, применяемое ко всем полям события:
| Параметр | Тип | Описание |
|---|---|---|
enable | bool | Активировать общий вывод полей |
overwrite | bool | Перезаписывать существующие значения полей |
deriving_type | string | Способ вывода: только first или last |
Доступные способы вывода (deriving_type)
| Тип | Описание | Применимость |
|---|---|---|
first | Значение из первого события | Все типы полей |
last | Значение из последнего события | Все типы полей |
distinct | Массив уникальных значений из всех событий | Все типы полей. Для полей-массивов применяется к элементам |
avg | Среднее значение | Только числовые поля |
sum | Сумма значений | Только числовые поля |
min | Минимальное значение | Только числовые поля |
max | Максимальное значение | Только числовые поля |
custom | Произвольный вывод на основе CEL-выражений | Все типы полей |
Пользовательский вывод (custom)
Способ custom позволяет создавать произвольный вывод полей на основе CEL-выражений, аналогично пользовательским агрегатным функциям в SQL.
При указании deriving_type: custom в deriving_data ожидаются следующие параметры:
| Параметр | Тип | Описание |
|---|---|---|
init_expr | string | CEL-выражение инициализации начального состояния агрегации |
state_expr | string | CEL-выражение функции перехода. Доступна переменная agg_state (текущее состояние) и все поля события |
finalize_expr | string | CEL-выражение финализации. Результат присваивается целевому полю |
disable_type_checking | bool | Отключить проверку типов в CEL-выражениях. Если true, agg_state имеет динамический тип |
Порядок выполнения при успешной агрегации:
- Вызывается
init_expr— определяет начальное значениеagg_state - Для каждого события в окне вызывается
state_expr. Результат становится новым значениемagg_state - Вызывается
finalize_expr. Результат присваивается целевому полю
Если disable_type_checking не указан или равен false, тип agg_state определяется результатом init_expr, а state_expr обязан возвращать тот же тип.
Фильтр событий
Фильтр задаётся опциональным блоком condition, синтаксис которого полностью соответствует блоку when. Если фильтр указан, под агрегацию подпадают только события, удовлетворяющие условию. Все прочие события игнорируют данный плагин агрегации.
Формирование событий
Возможные сценарии при активных плагинах агрегации:
| Сценарий | Результат |
|---|---|
| Событие не подходит ни под одно условие | Создаётся как есть, без агрегации |
Подходит, и хотя бы один плагин имеет keep-originals: true | Создаётся как есть и влияет на формирование агрегированного события |
Подходит, ни один плагин не имеет keep-originals: true | Только влияет на агрегированное событие. При провале агрегации (например, не набрано достаточно событий) оригинал создаётся |
Агрегированное событие содержит следующие поля:
| Поле | Описание |
|---|---|
| Поля группировки | Значения полей, по которым проводилась группировка |
ECS.Event.Start | Время генерации самого раннего события из агрегированных |
ECS.Event.End | Время генерации самого последнего события из агрегированных |
AssetIPs | Адреса всех активов из всех агрегированных событий |
Count | Количество агрегированных событий |
Producer | Значение Aggregator |
Поля из derive | Поля, определённые блоком rule.derive |
Пример
Агрегация событий по полю Raw с временным окном 10 секунд, минимум 2 события, с пользовательским выводом и фильтром:
processors:
- module: aggregation
aggregation:
rule:
fields: ["Raw"]
window: 10s
min_events: 2
derive:
fields:
# Стандартный вывод: первый IP-адрес источника
- deriving_data:
from: "ECS.Source.IP"
to: "ECS.Source.IP"
deriving_type: first
# Стандартный вывод: все уникальные имена пользователей
- deriving_data:
from: "ECS.User.Name"
to: "ECS.User.Name"
deriving_type: distinct
# Пользовательский вывод: конкатенация через CEL
- deriving_data:
init_expr: '"hello aggregation"'
state_expr: 'agg_state'
finalize_expr: 'agg_state'
to: HelloAggregation
deriving_type: custom
all:
enable: true
overwrite: false
deriving_type: first
condition:
equals:
Raw: "1"
keep-originals: false