Перейти к основному содержимому
Версия: 4.6.X

Модули deny, if и assign

Единый движок обработки

Модули deny, if и assign реализованы на базе единого процессорного движка с различной семантикой для разных сценариев использования. Все три модуля используют одинаковый синтаксис конфигурации и поддерживают как регулярные выражения, так и CEL-выражения.

Обзор модулей

МодульНазначениеПоведение при совпадении условияПоведение при несовпадении
denyФильтрация (отбрасывание) событийСобытие отбрасывается, обработка прекращаетсяСобытие продолжает обработку
ifУсловная обработкаСобытие продолжает обработкуСобытие пропускается (не обрабатывается далее)
assignОбогащение полей событияВычисленные значения присваиваются полям

Модуль deny — Фильтрация событий

Модуль deny используется для отбрасывания нежелательных событий на этапе нормализации. Это позволяет исключить из анализа шумовые события, тестовые данные или события от определённых источников.

Поведение

Когда условия фильтра срабатывают:

  • Последующие процессоры в пайплайне не выполняются
  • Событие НЕ отправляется в SIEM
  • Инкрементируется метрика Prometheus: denied_events_total{plugin_id="<id>"}

Базовый синтаксис

processors:
- module: deny
regex:
- name: "ИмяПоля"
value: "регулярное_выражение"
cel:
- "CEL-выражение возвращающее boolean"
Важно

Для модуля deny обязательно наличие хотя бы одного правила фильтрации (regex или cel).

Пример: Фильтрация HTTP-логов

processors:
# Сначала парсим событие
- module: grok
from: Raw
patterns:
HTTP_ANSWER: "%{IP:ECS.Client.IP} - - [%{GREEDYDATA:ECS.Geo.Timezone}] %{GREEDYDATA}"

# Отбрасываем события по условиям
- module: deny
# Фильтрация по регулярному выражению
regex:
- name: "ECS.Geo.Timezone"
value: "([0-3][0-9]/Apr/20[0-2][0-9]:[0-2][0-9]:[0-5][0-9]:[0-5][0-9])"
# Фильтрация по CEL-выражению
cel:
- 'event["ECS.Client.IP"] == "127.0.0.1"'

# Этот процессор не выполнится для отфильтрованных событий
- module: grok
from: Raw
patterns:
HTTP_ANSWER: "%{GREEDYDATA}] %{WORD:ECS.HTTP.RequestMethod} %{NUMBER:ECS.Event.Code}"

Модуль if — Условная обработка

Модуль if является логической инверсией deny. Он позволяет продолжить обработку события только если условия выполняются. Используется для создания условных ветвей обработки.

Поведение

  • Условие выполнено → событие продолжает обработку
  • Условие не выполнено → событие пропускается (не обрабатывается последующими процессорами)

Ветвление обработки

Модуль if позволяет выстраивать ветвление в процессе обработки событий. Под веткой понимается один элемент плагина коллектора в формате Processors YAML. Можно создавать несколько элементов плагина (веток) и обрабатывать одно событие разными плагинами:

mermaid

  • ветка для обработки событий Suricata
  • ветка для обработки логов NGINX
  • ветка для обработки логов Zeek
  • ветка для обработки событий Auditd

На входе в каждую ветку верхним элементом yaml-файла processors можно поставить модуль if, который будет проверять сигнатуру события и разрешать обработку в данной ветке лишь событий данного формата. Таким образом можно направлять в один коллектор события с различных источников и обрабатывать их параллельно без конфликтов.

Пример: Условная нормализация

processors:
# Применяем нормализацию только для событий Mikrotik
- module: if
regex:
- name: "Raw"
value: "^system,info,account"
cel:
- 'event.as_string("ECS.Observer.Vendor") == "Mikrotik"'

# Этот процессор выполнится только для событий Mikrotik
- module: assign
assign:
- target: ECS.Event.Category
cel: "'authentication'"

Модуль assign — Обогащение событий

Модуль assign вычисляет значения с помощью CEL-выражений и присваивает результаты указанным полям события. В отличие от deny и if, модуль assign всегда пропускает событие дальше по пайплайну.

Базовый синтаксис

processors:
- module: assign
regex:
- name: "#имя_переменной"
value: "регулярное_выражение"
assign:
- target: ECS.ЦелевоеПоле
cel: "CEL-выражение"

Особенности

  • Блок условий (regex/cel для фильтрации) не требуется — модуль assign всегда пропускает событие
  • Если CEL-выражение завершается ошибкой, присваивание пропускается, но событие не блокируется
  • Можно определять вспомогательные регулярные выражения в блоке regex (имена начинающиеся с # — это переменные)
  • Порядок выполнения assign правил в одном блоке не гарантирован (начиная с v4.5)
  • Для последовательной обработки используйте несколько блоков assign

Практические примеры: Нормализация событий Mikrotik

Рассмотрим реальные примеры нормализации событий аутентификации от маршрутизаторов Mikrotik. Эти примеры демонстрируют различные подходы к извлечению данных из событий безопасности.

Входные данные

Типичные события аутентификации Mikrotik имеют следующий формат:

system,info,account user admin logged in from 192.168.1.100 via ssh
system,info,account user admin authentication failed from 10.0.0.50 via ssh

Пример 1: Монолитный подход с повторяющимися выражениями

Этот подход использует повторяющиеся CEL-выражения для извлечения каждого поля. Подходит для простых случаев, но приводит к многократному выполнению одного и того же регулярного выражения.

assign-mikrotik.yaml
processors:
- module: assign
regex:
# Предварительно скомпилированные регулярные выражения
- name: "#mikrotik_auth_success"
value: "^system,info,account user \\w+ logged in from .* via ssh$"
- name: "#mikrotik_auth_failure"
value: "^system,info,account user \\w+ authentication failed from .* via ssh$"
- name: "#mikrotik_auth_event_signature"
value: "^system,info,account user \\w+ (authentication failed|logged in) from .* via ssh$"
- name: "#mikrotik_auth_extract"
value: "^system,info,account user (\\w+) (authentication failed|logged in) from (.*) via (ssh)$"
assign:
# Извлечение IP-адреса источника (группа захвата [3])
- target: ECS.Source.IP
cel: |
size(
event.as_string("Raw")
.re_find_submatch('^system,info,account user (\\w+) (authentication failed|logged in) from (.*) via (ssh)$')
) > 4
?
event.as_string("Raw")
.re_find_submatch('^system,info,account user (\\w+) (authentication failed|logged in) from (.*) via (ssh)$')[3]
: ''

# Извлечение имени пользователя (группа захвата [1])
- target: ECS.Source.User.Name
cel: |
size(
event.as_string("Raw")
.re_find_submatch('^system,info,account user (\\w+) (authentication failed|logged in) from (.*) via (ssh)$')
) > 4
?
event.as_string("Raw")
.re_find_submatch('^system,info,account user (\\w+) (authentication failed|logged in) from (.*) via (ssh)$')[1]
: ''

# Определение результата события (success/failure)
- target: ECS.Event.Outcome
cel: |
(event.vars("ECS.Event.Outcome") == null || event.as_string("ECS.Event.Outcome") == 'unknown')
? (
event.as_string("Raw").startsWith('system,info,account user') ?
(
event.as_string("Raw").re_match('^system,info,account user \\w+ logged in from .* via ssh$')
?
'success'
:
(
event.as_string("Raw")
.re_match('^system,info,account user \\w+ authentication failed from .* via ssh$')
?
'failure'
:
'unknown'
)
)
: 'unknown'
)
: (
event.as_string("Raw").startsWith('system,info,account user') ?
(
event.as_string("Raw").re_match('^system,info,account user \\w+ logged in from .* via ssh$')
?
'success'
:
(
event.as_string("Raw")
.re_match('^system,info,account user \\w+ authentication failed from .* via ssh$')
?
'failure'
:
event.vars("ECS.Event.Outcome")
)
)
: event.vars("ECS.Event.Outcome")
)

# Добавление категории события
- target: ECS.Event.Category
cel: |
event.as_string("Raw").re_match('^system,info,account user \\w+ (authentication failed|logged in) from .* via ssh$')
?
(
event.vars("ECS.Event.Category") == null
?
'authentication'
:
event.as_string("ECS.Event.Category")+',authentication'
)
:
event.vars("ECS.Event.Category")

# Тип события
- target: ECS.Event.Type
cel: |
event.vars("ECS.Event.Type") == null
?
(
event.as_string("Raw").re_match('^system,info,account user \\w+ (authentication failed|logged in) from .* via ssh$')
?
'info'
:
'unknown'
)
:
event.vars("ECS.Event.Type")
Результат нормализации

Для события system,info,account user admin logged in from 192.168.1.100 via ssh:

  • ECS.Source.IP = 192.168.1.100
  • ECS.Source.User.Name = admin
  • ECS.Event.Outcome = success
  • ECS.Event.Category = authentication
  • ECS.Event.Type = info

Пример 2: Пайплайн-подход с промежуточными переменными

Более эффективный подход: сначала извлекаем все части события в промежуточную переменную, затем используем её для заполнения полей. Это избавляет от повторного выполнения регулярного выражения.

assign-mikrotik-pipeline.yaml
processors:
# ШАГ 1: Извлечение частей события в промежуточную переменную
- module: assign
regex:
- name: "#mikrotik_auth_extract"
value: "^system,info,account user (\\w+) (authentication failed|logged in) from (.*) via (ssh)$"
assign:
- target: Normalizer.Mikrotik.Auth.Parts
cel: event.as_string("Raw").re_find_submatch('^system,info,account user (\\w+) (authentication failed|logged in) from (.*) via (ssh)$')

# ШАГ 2: Заполнение ECS-полей из промежуточной переменной
# Важно: в v4.5 порядок assign правил в одном блоке не гарантирован
- module: assign
assign:
# Имя пользователя (группа [1])
- target: ECS.Source.User.Name
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
event.vars('Normalizer.Mikrotik.Auth.Parts')[?1].orValue(event.as_string("ECS.Source.User.Name"))
:
event.as_string("ECS.Source.User.Name")

# IP-адрес источника (группа [3])
- target: ECS.Source.IP
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
event.vars('Normalizer.Mikrotik.Auth.Parts')[?3].orValue(event.as_string("ECS.Source.IP"))
:
event.as_string("ECS.Source.IP")

# Сетевой протокол (группа [4])
- target: ECS.Network.Protocol
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
event.vars('Normalizer.Mikrotik.Auth.Parts')[?4].orValue(event.as_string("ECS.Network.Protocol"))
:
event.as_string("ECS.Network.Protocol")

# Результат события (success/failure) на основе группы [2]
- target: ECS.Event.Outcome
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
cel.bind(x, event.vars('Normalizer.Mikrotik.Auth.Parts')[?2].orValue(''),
x == 'logged in' ? 'success'
: ( x == 'authentication failed' ? 'failure' : event.as_string("ECS.Event.Outcome") )
)
:
event.as_string("ECS.Event.Outcome")

# Категория события
- target: ECS.Event.Category
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
( event.vars("ECS.Event.Category") == null ? 'authentication'
: event.as_string("ECS.Event.Category")+',authentication'
)
:
event.vars("ECS.Event.Category")

# Информация о производителе оборудования
- target: ECS.Observer.Vendor
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
'Mikrotik'
:
event.vars("ECS.Observer.Vendor")

- target: ECS.Observer.Product
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
'Mikrotik'
:
event.vars("ECS.Observer.Product")

# Тип события
- target: ECS.Event.Type
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
(event.vars("ECS.Event.Type") == null ? 'info' : event.as_string("ECS.Event.Type")+',info')
:
event.as_string("ECS.Event.Type")

# Тип наблюдателя
- target: ECS.Observer.Type
cel: |
type(event.vars('Normalizer.Mikrotik.Auth.Parts')) == list
?
'firewall'
:
event.vars("ECS.Observer.Type")

# ШАГ 3: Очистка промежуточной переменной
- module: assign
assign:
- target: Normalizer.Mikrotik.Auth.Parts
cel: "'_empty'"
Преимущества пайплайн-подхода
  1. Производительность: регулярное выражение выполняется только один раз
  2. Читаемость: логика извлечения отделена от логики присваивания
  3. Расширяемость: легко добавлять новые поля из тех же захваченных групп
  4. Обогащение: добавляется информация о производителе (ECS.Observer.*)
Результат нормализации

Для события system,info,account user admin authentication failed from 10.0.0.50 via ssh:

  • ECS.Source.IP = 10.0.0.50
  • ECS.Source.User.Name = admin
  • ECS.Network.Protocol = ssh
  • ECS.Event.Outcome = failure
  • ECS.Event.Category = authentication
  • ECS.Observer.Vendor = Mikrotik
  • ECS.Observer.Product = Mikrotik
  • ECS.Observer.Type = firewall
  • ECS.Event.Type = info

Конфигурация

Структура конфигурации модулей

Поля regex, cel, assign, globals и xml-xsd задаются на уровне конфигурации модуля:

processors:
- module: deny|if|assign
# Глобальные переменные для CEL (опционально)
globals:
threshold: 100
allowed_hosts:
- "server1"
- "server2"

# XML Schema для decode_xml() (опционально)
xml-xsd:
order: |
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema ...>...</xs:schema>

# Регулярные выражения
regex:
- name: "ИмяПоля"
value: "паттерн"

# CEL-выражения (возвращают boolean)
cel:
- "выражение1"
- "выражение2"

# Присваивания (только для assign)
assign:
- target: "ЦелевоеПоле"
cel: "выражение"
Правила комбинирования
  • cel и assign нельзя использовать в одном блоке конфигурации
  • Для deny и if обязательно хотя бы одно правило (regex или cel)
  • Несколько cel выражений объединяются логическим ИЛИ

Блок when — внешние условия

Блок when — это отдельная система условий, доступная для всех процессоров (не только deny/if/assign). Подробная документация блока when доступна на отдельной странице.


Поддерживаемые поля для regex

Базовые поля события

ПолеОписание
RawИсходное (сырое) событие
ProducerИдентификатор источника
PluginIDsСписок ID плагинов
FwdIPIP-адрес отправителя
FwdPortПорт отправителя
CollectorIDID коллектора
CollectorTypeТип коллектора
SetupIDID настройки
CTimeВремя создания
WTimeВремя записи
GenerationTimeВремя генерации
TenantIDID тенанта
KeyКлюч события
AggregationNameМетка агрегации
SLМетка безопасности
CountЧисло агрегированных событий

Пользовательские поля

Любое поле из пространства ECS.* и других пользовательских полей.

AssetIPs недоступен для deny

Поле AssetIPs пустое в момент выполнения deny — оно заполняется позже в пайплайне. Для фильтрации по IP-адресу источника используйте FwdIP.

# НЕПРАВИЛЬНО — AssetIPs пуст на этапе deny
- module: deny
regex:
- name: "AssetIPs"
value: "10\\.0\\.0\\."

# ПРАВИЛЬНО — используйте FwdIP
- module: deny
cel:
- 'event.FwdIP.in_cidr("10.0.0.0/8")'

Метрики

При фильтрации событий модулем deny инкрементируется Prometheus-метрика:

denied_events_total{plugin_id="<id>"}

Эта метрика позволяет:

  • Отслеживать эффективность фильтрации
  • Обнаруживать аномалии в потоке событий
  • Настраивать алерты на изменение rate фильтрации

Ограничения

  1. Взаимоисключающие опции: нельзя использовать cel и assign в одном блоке конфигурации
  2. Порядок assign: начиная с v4.5, порядок выполнения правил assign в одном блоке не гарантирован
  3. AssetIPs: недоступен для фильтрации в deny (используйте FwdIP)
  4. Обязательные правила: для deny и if требуется хотя бы одно правило фильтрации

Рекомендации по использованию

Когда использовать deny

  • Отфильтровывание шумовых событий (heartbeat, keepalive)
  • Исключение тестовых источников из анализа
  • Блокировка событий от определённых IP-адресов

Когда использовать if

  • Условная обработка разных типов событий
  • Создание ветвей нормализации для разных форматов
  • Применение специфичной логики только к определённым событиям

Когда использовать assign

  • Обогащение событий вычисляемыми полями
  • Извлечение данных из сырых событий
  • Преобразование форматов полей
  • Заполнение ECS-полей из проприетарных форматов

Оптимизация производительности

  1. Используйте пайплайн-подход с промежуточными переменными для сложных извлечений
  2. Размещайте deny правила в начале пайплайна для раннего отсева событий
  3. Используйте предварительно скомпилированные regex (блок regex: с именами на #)
  4. Для последовательных зависимостей используйте несколько блоков assign

CEL-функции

Все три модуля используют Common Expression Language (CEL) для выражений. CEL предоставляет безопасный и эффективный язык для вычислений над полями событий.

Основные возможности

КатегорияПримеры функций
Доступ к полямevent.vars("field"), event.as_string("field"), event["field"]
Регулярные выраженияre_match(), re_find_submatch(), re_replace_all()
Строкиto_lower(), split(), replace_all(), trim(), has_prefix()
Хешированиеsha256(), md5(), sha1(), hmac()
JSON/XMLdecode_json(), jsonpath_get(), decode_xml()
IP-адресаin_cidr()
Времяnow(), format(), parse_time()
Коллекцииkeys(), values(), zip(), min(), max()
Обработка ошибокtry(), is_error()

Полный перечень всех доступных функций с сигнатурами и примерами использования см. в Справочнике CEL-функций.