Модули deny, if и assign
Модули deny, if и assign реализованы на базе единого процессорного движка с различной семантикой для разных сценариев использования. Все три модуля используют одинаковый синтаксис конфигурации и поддерживают как регулярные выражения, так и CEL-выражения.
Обзор модулей
| Модуль | Назначение | Поведение при совпадении условия | Поведение при несовпадении |
|---|---|---|---|
deny | Фильтрация (отбрасывание) событий | Событие отбрасывается, обработка прекращается | Событие продолжает обработку |
if | Условная обработка | Событие продолжает обработку | Событие пропускается (не обрабатывается далее) |
assign | Обогащение полей события | Вычисленные значения присваиваются полям | — |
Модуль deny — Фильтрация событий
Модуль deny используется для отбрасывания нежелательных событий на этапе нормализации. Это позволяет исключить из анализа шумовые события, тестовые данные или события от определённых источников.
Поведение
Когда условия фильтра срабатывают:
- Последующие процессоры в пайплайне не выполняются
- Событие НЕ отправляется в SIEM
- Инкрементируется метрика Prometheus:
denied_events_total{plugin_id="<id>"}
Базовый синтаксис
processors:
- module: deny
regex:
- name: "ИмяПоля"
value: "регулярное_выражение"
cel:
- "CEL-выражение возвращающее boolean"
Для модуля deny обязательно наличие хотя бы одного правила фильтрации (regex или cel).
Пример: Фильтрация HTTP-логов
processors:
# Сначала парсим событие
- module: grok
from: Raw
patterns:
HTTP_ANSWER: "%{IP:ECS.Client.IP} - - [%{GREEDYDATA:ECS.Geo.Timezone}] %{GREEDYDATA}"
# Отбрасываем события по условиям
- module: deny
# Фильтрация по регулярному выражению
regex:
- name: "ECS.Geo.Timezone"
value: "([0-3][0-9]/Apr/20[0-2][0-9]:[0-2][0-9]:[0-5][0-9]:[0-5][0-9])"
# Фильтрация по CEL-выражению
cel:
- 'event["ECS.Client.IP"] == "127.0.0.1"'
# Этот процессор не выполнится для отфильтрованных событий
- module: grok
from: Raw
patterns:
HTTP_ANSWER: "%{GREEDYDATA}] %{WORD:ECS.HTTP.RequestMethod} %{NUMBER:ECS.Event.Code}"
Модуль if — Условная обработка
Модуль if является логической инверсией deny. Он позволяет продолжить обработку события только если условия выполняются. Используется для создания условных ветвей обработки.
Поведение
- Условие выполнено → событие продолжает обработку
- Условие не выполнено → событие пропускается (не обрабатывается последующими процессорами)
Ветвление обработки
Модуль if позволяет выстраивать ветвление в процессе обработки событий. Под веткой понимается один элемент плагина коллектора в формате Processors YAML. Можно создавать несколько элементов плагина (веток) и обрабатывать одно событие разными плагинами:
- ветка для обработки событий Suricata
- ветка для обработки логов NGINX
- ветка для обработки логов Zeek
- ветка для обработки событий Auditd
На входе в каждую ветку верхним элементом yaml-файла processors можно поставить модуль if, который будет проверять сигнатуру события и разрешать обработку в данной ветке лишь событий данного формата. Таким образом можно направлять в один коллектор события с различных источников и обрабатывать их параллельно без конфликтов.
Пример: Условная нормализация
processors:
# Применяем нормализацию только для событий Mikrotik
- module: if
regex:
- name: "Raw"
value: "^system,info,account"
cel:
- 'event.as_string("ECS.Observer.Vendor") == "Mikrotik"'
# Этот процессор выполнится только для событий Mikrotik
- module: assign
assign:
- target: ECS.Event.Category
cel: "'authentication'"
Модуль assign — Обогащение событий
Модуль assign вычисляет значения с помощью CEL-выражений и присваивает результаты указанным полям события. В отличие от deny и if, модуль assign всегда пропускает событие дальше по пайплайну.
Базовый синтаксис
processors:
- module: assign
regex:
- name: "#имя_переменной"
value: "регулярное_выражение"
assign:
- target: ECS.ЦелевоеПоле
cel: "CEL-выражение"
Особенности
- Блок условий (
regex/celдля фильтрации) не требуется — модуль assign всегда пропускает событие - Если CEL-выражение завершается ошибкой, присваивание пропускается, но событие не блокируется
- Можно определять вспомогательные регулярные выражения в блоке
regex(имена начинающиеся с#— это переменные) - Порядок выполнения
assignправил в одном блоке не гарантирован (начиная с v4.5) - Для последовательной обработки используйте несколько блоков
assign
Практические примеры: Нормализация событий Mikrotik
Рассмотрим реальные примеры нормализации событий аутентификации от маршрутизаторов Mikrotik. Эти примеры демонстрируют различные подходы к извлечению данных из событий безопасности.
Входные данные
Типичные события аутентификации Mikrotik имеют следующий формат:
system,info,account user admin logged in from 192.168.1.100 via ssh
system,info,account user admin authentication failed from 10.0.0.50 via ssh
Пример 1: Монолитный подход с повторяющимися выражениями
Этот подход использует повторяющиеся CEL-выражения для извлечения каждого поля. Подходит для простых случаев, но приводит к многократному выполнению одного и того же регулярного выражения.
processors:
- module: assign
regex:
# Предварительно скомпилированные регулярные выражения
- name: "#mikrotik_auth_success"
value: "^system,info,account user \\w+ logged in from .* via ssh$"
- name: "#mikrotik_auth_failure"
value: "^system,info,account user \\w+ authentication failed from .* via ssh$"
- name: "#mikrotik_auth_event_signature"
value: "^system,info,account user \\w+ (authentication failed|logged in) from .* via ssh$"
- name: "#mikrotik_auth_extract"
value: "^system,info,account user (\\w+) (authentication failed|logged in) from (.*) via (ssh)$"
assign:
# Извлечение IP-адреса источника (группа захвата [3])
- target: ECS.Source.IP
cel: |
size(
event.as_string("Raw")
.re_find_submatch('^system,info,account user (\\w+) (authentication failed|logged in) from (.*) via (ssh)$')
) > 4
?
event.as_string("Raw")
.re_find_submatch('^system,info,account user (\\w+) (authentication failed|logged in) from (.*) via (ssh)$')[3]
: ''
# Извлечение имени пользователя (группа захвата [1])
- target: ECS.Source.User.Name
cel: |
size(
event.as_string("Raw")
.re_find_submatch('^system,info,account user (\\w+) (authentication failed|logged in) from (.*) via (ssh)$')
) > 4
?
event.as_string("Raw")
.re_find_submatch('^system,info,account user (\\w+) (authentication failed|logged in) from (.*) via (ssh)$')[1]
: ''
# Определение результата события (success/failure)
- target: ECS.Event.Outcome
cel: |
(event.vars("ECS.Event.Outcome") == null || event.as_string("ECS.Event.Outcome") == 'unknown')
? (
event.as_string("Raw").startsWith('system,info,account user') ?
(
event.as_string("Raw").re_match('^system,info,account user \\w+ logged in from .* via ssh$')
?
'success'
:
(
event.as_string("Raw")
.re_match('^system,info,account user \\w+ authentication failed from .* via ssh$')
?
'failure'
:
'unknown'
)
)
: 'unknown'
)
: (
event.as_string("Raw").startsWith('system,info,account user') ?
(
event.as_string("Raw").re_match('^system,info,account user \\w+ logged in from .* via ssh$')
?
'success'
:
(
event.as_string("Raw")
.re_match('^system,info,account user \\w+ authentication failed from .* via ssh$')
?
'failure'
:
event.vars("ECS.Event.Outcome")
)
)
: event.vars("ECS.Event.Outcome")
)
# Добавление категории события
- target: ECS.Event.Category
cel: |
event.as_string("Raw").re_match('^system,info,account user \\w+ (authentication failed|logged in) from .* via ssh$')
?
(
event.vars("ECS.Event.Category") == null
?
'authentication'
:
event.as_string("ECS.Event.Category")+',authentication'
)
:
event.vars("ECS.Event.Category")
# Тип события
- target: ECS.Event.Type
cel: |
event.vars("ECS.Event.Type") == null
?
(
event.as_string("Raw").re_match('^system,info,account user \\w+ (authentication failed|logged in) from .* via ssh$')
?
'info'
:
'unknown'
)
:
event.vars("ECS.Event.Type")
Для события system,info,account user admin logged in from 192.168.1.100 via ssh:
ECS.Source.IP=192.168.1.100ECS.Source.User.Name=adminECS.Event.Outcome=successECS.Event.Category=authenticationECS.Event.Type=info
Пример 2: Пайплайн-подход с промежуточными переменными
Более эффективный подход: сначала извлекаем все части события в промежуточную переменную, затем используем её для заполнения полей. Это избавляет от повторного выполнения регулярного выражения.
processors:
# ШАГ 1: Извлечение частей события в промежуточную переменную
- module: assign
regex:
- name: "#mikrotik_auth_extract"
value: "^system,info,account user (\\w+) (authentication failed|logged in) from (.*) via (ssh)$"
assign:
- target: Normalizer.Mikrotik.Auth.Parts
cel: event.as_string("Raw").re_find_submatch('^system,info,account user (\\w+) (authentication failed|logged in) from (.*) via (ssh)$')
# ШАГ 2: Заполнение ECS-полей из промежуточной переменной
# Важно: в v4.5 порядок assign правил в одном блоке не гарантирован
- module: assign
assign:
# Имя пользователя (группа [1])
- target: ECS.Source.User.Name
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
event.vars('Normalizer.Mikrotik.Auth.Parts')[?1].orValue(event.as_string("ECS.Source.User.Name"))
:
event.as_string("ECS.Source.User.Name")
# IP-адрес источника (группа [3])
- target: ECS.Source.IP
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
event.vars('Normalizer.Mikrotik.Auth.Parts')[?3].orValue(event.as_string("ECS.Source.IP"))
:
event.as_string("ECS.Source.IP")
# Сетевой протокол (группа [4])
- target: ECS.Network.Protocol
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
event.vars('Normalizer.Mikrotik.Auth.Parts')[?4].orValue(event.as_string("ECS.Network.Protocol"))
:
event.as_string("ECS.Network.Protocol")
# Результат события (success/failure) на основе группы [2]
- target: ECS.Event.Outcome
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
cel.bind(x, event.vars('Normalizer.Mikrotik.Auth.Parts')[?2].orValue(''),
x == 'logged in' ? 'success'
: ( x == 'authentication failed' ? 'failure' : event.as_string("ECS.Event.Outcome") )
)
:
event.as_string("ECS.Event.Outcome")
# Категория события
- target: ECS.Event.Category
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
( event.vars("ECS.Event.Category") == null ? 'authentication'
: event.as_string("ECS.Event.Category")+',authentication'
)
:
event.vars("ECS.Event.Category")
# Информация о производителе оборудования
- target: ECS.Observer.Vendor
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
'Mikrotik'
:
event.vars("ECS.Observer.Vendor")
- target: ECS.Observer.Product
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
'Mikrotik'
:
event.vars("ECS.Observer.Product")
# Тип события
- target: ECS.Event.Type
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
(event.vars("ECS.Event.Type") == null ? 'info' : event.as_string("ECS.Event.Type")+',info')
:
event.as_string("ECS.Event.Type")
# Тип наблюдателя
- target: ECS.Observer.Type
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
'firewall'
:
event.vars("ECS.Observer.Type")
# ШАГ 3: Очистка промежуточной переменной
- module: assign
assign:
- target: Normalizer.Mikrotik.Auth.Parts
cel: "'_empty'"
- Производительность: регулярное выражение выполняется только один раз
- Читаемость: логика извлечения отделена от логики присваивания
- Расширяемость: легко добавлять новые поля из тех же захваченных групп
- Обогащение: добавляется информация о производителе (
ECS.Observer.*)
Для события system,info,account user admin authentication failed from 10.0.0.50 via ssh:
ECS.Source.IP=10.0.0.50ECS.Source.User.Name=adminECS.Network.Protocol=sshECS.Event.Outcome=failureECS.Event.Category=authenticationECS.Observer.Vendor=MikrotikECS.Observer.Product=MikrotikECS.Observer.Type=firewallECS.Event.Type=info
Конфигурация
Структура конфигурации модулей
Поля regex, cel, assign, globals и xml-xsd задаются на уровне конфигурации модуля:
processors:
- module: deny|if|assign
# Глобальные переменные для CEL (опционально)
globals:
threshold: 100
allowed_hosts:
- "server1"
- "server2"
# XML Schema для decode_xml() (опционально)
xml-xsd:
order: |
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema ...>...</xs:schema>
# Регулярные выражения
regex:
- name: "ИмяПоля"
value: "паттерн"
# CEL-выражения (возвращают boolean)
cel:
- "выражение1"
- "выражение2"
# Присваивания (только для assign)
assign:
- target: "ЦелевоеПоле"
cel: "выражение"
celиassignнельзя использовать в одном блоке конфигурации- Для
denyиifобязательно хотя бы одно правило (regexилиcel) - Несколько
celвыражений объединяются логическим ИЛИ
Блок when — внешние условия
Блок when — это отдельная система условий, доступная для всех процессоров (не только deny/if/assign). Подробная документация блока when доступна на отдельной странице.
Поддерживаемые поля для regex
Базовые поля события
| Поле | Описание |
|---|---|
Raw | Исходное (сырое) событие |
Producer | Идентификатор источника |
PluginIDs | Список ID плагинов |
FwdIP | IP-адрес отправителя |
FwdPort | Порт отправителя |
CollectorID | ID коллектора |
CollectorType | Тип коллектора |
SetupID | ID настройки |
CTime | Время создания |
WTime | Время записи |
GenerationTime | Время генерации |
TenantID | ID тенанта |
Key | Ключ события |
AggregationName | Метка агрегации |
SL | Метка безопасности |
Count | Число агрегированных событий |
Пользовательские поля
Любое поле из пространства ECS.* и других пользовательских полей.
Поле AssetIPs пустое в момент выполнения deny — оно заполняется позже в пайплайне. Для фильтрации по IP-адресу источника используйте FwdIP.
# НЕПРАВИЛЬНО — AssetIPs пуст на этапе deny
- module: deny
regex:
- name: "AssetIPs"
value: "10\\.0\\.0\\."
# ПРАВИЛЬНО — используйте FwdIP
- module: deny
cel:
- 'event.FwdIP.in_cidr("10.0.0.0/8")'
Метрики
При фильтрации событий модулем deny инкрементируется Prometheus-метрика:
denied_events_total{plugin_id="<id>"}
Эта метрика позволяет:
- Отслеживать эффективность фильтрации
- Обнаруживать аномалии в потоке событий
- Настраивать алерты на изменение rate фильтрации
Ограничения
- Взаимоисключающие опции: нельзя использовать
celиassignв одном блоке конфигурации - Порядок assign: начиная с v4.5, порядок выполнения правил
assignв одном блоке не гарантирован - AssetIPs: недоступен для фильтрации в
deny(используйтеFwdIP) - Обязательные правила: для
denyиifтребуется хотя бы одно правило фильтрации
Рекомендации по использованию
Когда использовать deny
- Отфильтровывание шумовых событий (heartbeat, keepalive)
- Исключение тестовых источников из анализа
- Блокировка событий от определённых IP-адресов
Когда использовать if
- Условная обработка разных типов событий
- Создание ветвей нормализации для разных форматов
- Применение специфичной логики только к определённым событиям
Когда использовать assign
- Обогащение событий вычисляемыми полями
- Извлечение данных из сырых событий
- Преобразование форматов полей
- Заполнение ECS-полей из проприетарных форматов
Оптимизация производительности
- Используйте пайплайн-подход с промежуточными переменными для сложных извлечений
- Размещайте
denyправила в начале пайплайна для раннего отсева событий - Используйте предварительно скомпилированные regex (блок
regex:с именами на#) - Для последовательных зависимостей используйте несколько блоков
assign
CEL-функции
Все три модуля используют Common Expression Language (CEL) для выражений. CEL предоставляет безопасный и эффективный язык для вычислений над полями событий.
Основные возможности
| Категория | Примеры функций |
|---|---|
| Доступ к полям | event.vars("field"), event.as_string("field"), event["field"] |
| Регулярные выражения | re_match(), re_find_submatch(), re_replace_all() |
| Строки | to_lower(), split(), replace_all(), trim(), has_prefix() |
| Хеширование | sha256(), md5(), sha1(), hmac() |
| JSON/XML | decode_json(), jsonpath_get(), decode_xml() |
| IP-адреса | in_cidr() |
| Время | now(), format(), parse_time() |
| Коллекции | keys(), values(), zip(), min(), max() |
| Обработка ошибок | try(), is_error() |
Полный перечень всех доступных функций с сигнатурами и примерами использования см. в Справочнике CEL-функций.