Перейти к основному содержимому

OLAP-аналитика в KOMRAD SIEM 4.5: превращаем `events.custom` в витрину атрибут–значение и ищем аномалии логинов

· 9 мин. чтения
КОМРАД
Команда КОМРАД

Аннотация

KOMRAD SIEM 4.5 хранит события в ClickHouse (база komrad_events, таблица events), где пользовательские/расширенные поля чаще всего попадают в events.custom как “плоский” JSON‑словарь. Это удобно для ingestion и гибкости, но неудобно для последующего OLAP‑анализа: по одному JSON‑полю сложно строить быстрые срезы, сравнения “устройство/пользователь/время”, профили поведения и метрики качества телеметрии.

В этой статье мы показываем проверенный приём: нормализуем custom в пары атрибут–значение и кладём их в отдельную MergeTree‑витрину через Materialized View (безопасно, без POPULATE). На этой витрине решаем 3 практических кейса:

  1. аномальные логины + браузерный fingerprint (включая Fingerprint.UserAgentData.*),
  2. “невозможные” перемещения/резкая смена устройства,
  3. бот/скрипт‑активность по UserAgent/TLS‑параметрам.

Почему это важно именно для SIEM

У SIEM есть две особенности:

  • нагрузка бывает очень разной — от 5 EPS в пилоте до десятков/сотен тысяч EPS в промышленной эксплуатации;
  • запросы аналитиков меняются быстрее, чем схемы хранения “успевают” под них адаптироваться.

KOMRAD SIEM как продукт закрывает весь pipeline: от сбора (Syslog, SNMP, SQL, FTP, WMI, SFTP, SSH, xFlow, HTTP, eBPF) и нормализации (CEF, RFC 5424/3164, EVTX, regex RE2) до корреляции, инцидентов, виджетов визуального анализа и интеграций (ГосСОПКА API, Kafka, JetSignal и др.). Но чтобы аналитика “летала”, важно правильно подготовить витрины под OLAP в ClickHouse.

Сюжет (вымышленный, но очень реалистичный)

Представим заказчика: АО «Северо‑Уральский Приборостроительный Завод». Типичная инфраструктура: домен AD, VPN, терминальные серверы, несколько производственных сегментов, интеграции с отечественными СЗИ. KOMRAD SIEM 4.5 уже собирает события и хранит их в ClickHouse.

Проблема, с которой пришли инженеры ИБ:

  • “вчера ночью у нескольких сотрудников прошли успешные логины из непривычных браузеров/устройств”;
  • “в логе нет одного ‘правильного’ поля, всё разнесено по кастомным атрибутам”;
  • “хочется быстро строить срезы: пользователь → устройство/UA → IP → время, и делать это на больших объёмах”.

То, что почти всегда оказывается правдой в таких ситуациях: нужные данные уже есть… но лежат в custom “как придётся”.

Вход: komrad_events.events и events.custom

В komrad_events.events ключ события — композиция (key_time, key_roller, key_stream); это правильная основа для дальнейшей нормализации.

Поле customString с JSON. Внутри:

  • ключи уровня ECS (ECS.User.Name, ECS.Client.Address, ECS.UserAgent.Original, ECS.TLS.*, …);
  • поля источников и обогащения;
  • и отдельный “особый кейс”: Fingerprint.UserAgentData — JSON‑строка, внутри которой лежит JSON‑объект (string‑marshalled JSON).

Наша цель: получить из одного события N строк вида:

(key_time, key_roller, key_stream, tenant_id, attr, value_raw, value/typed_value…).

Мини-ETL: из JSON в пары (EAV) — без боли

Мы подготовили пакет SQL/скриптов, который можно использовать как:

  • ad‑hoc запросы (получить пары и дальше строить свои SELECT’ы);
  • постоянную OLAP‑витрину (таблица + MV + безопасный backfill).

Полный набор лежит рядом с этим постом:

  • sql/01_select_custom_pairs_flat.sql — пары attr/value из custom;
  • sql/02_select_custom_pairs_expand_fingerprint_useragentdata.sql — то же + раскрытие Fingerprint.UserAgentData.<k>;
  • sql/10..13_ddl_*.sql — таблицы под 4 режима (flat/expanded × single/typed);
  • sql/20..23_mv_*_safe.sql — MV без POPULATE, с отсечкой по w_time;
  • scripts/backfill_custom_kv.sh — backfill истории по партициям, с возобновлением.

Почему 4 режима — это не “сложно”, а “прагматично”

РежимЦенаПольза
flat + single-valueминимальнаябыстрый старт почти для всех
flat + typedвышечисловые агрегации/фильтры проще и быстрее
expanded + single-valueвышеудобно анализировать UA‑данные
expanded + typedмаксимальнаямаксимум удобства для аналитики

В SIEM‑эксплуатации “универсальной” схемы не бывает: мы всегда начинаем с дешёвого варианта и расширяем витрины по потребности.

Безопасное включение витрины (без POPULATE и без дублей)

Для больших EPS важно не перегрузить ClickHouse в момент включения MV и не получить дубли из‑за гонки “MV начал работать” vs “мы догружаем историю”.

Правильный рецепт:

1) фиксируем момент включения MV:

SELECT now() AS mv_start;

2) создаём целевую таблицу (например, flat + typed):

clickhouse-client --multiquery < sql/11_ddl_kv_flat_typed.sql

3) создаём MV с условием w_time >= mv_start (подставить mv_start в WITH ... AS mv_start):

clickhouse-client --multiquery < sql/21_mv_kv_flat_typed_safe.sql

4) backfill "истории" с w_time < mv_start:

./scripts/backfill_custom_kv.sh \
--database komrad_events \
--source-table events \
--mode flat_typed \
--cutoff-wtime 'YYYY-MM-DD HH:MM:SS' \
--max-threads 4 \
--max-insert-threads 2 \
--sleep 1

Итого: MV обслуживает новые события, скрипт догружает историю — без пересечения по w_time, значит без дублей.

Кейс 1. Аномальные логины + fingerprint браузера

Идея

Для успешных логинов хотим видеть:

  • пользователь (ECS.User.Name);
  • источник (ECS.Client.Address);
  • “отпечаток” (Fingerprint.Fingerprint и/или ECS.UserAgent.Original);
  • “сигнал смены платформы” из Fingerprint.UserAgentData.platform;
  • и находить:
    • новые устройства/UA у пользователя;
    • слишком много разных платформ/UA за короткое время;
    • логины с “редкими” сочетаниями параметров.

Пример запроса: у кого за сутки появилось много разных платформ

Ниже пример для витрины expanded + typed (удобно именно из‑за Fingerprint.UserAgentData.platform):

WITH
toDateTime(now() - INTERVAL 1 DAY) AS ts_from
SELECT
toDate(fromUnixTimestamp(key_time)) AS day,
tenant_id,
user,
count() AS events,
uniqExact(platform) AS uniq_platforms,
groupUniqArray(platform) AS platforms
FROM
(
SELECT
key_time, key_roller, key_stream, tenant_id,
anyIf(value_string, attr = 'ECS.User.Name') AS user,
anyIf(value_string, attr = 'Fingerprint.UserAgentData.platform') AS platform,
anyIf(value_string, attr = 'ECS.Event.Category') AS category,
anyIf(value_string, attr = 'ECS.Event.Outcome') AS outcome
FROM komrad_events.events_custom_kv_expanded_typed
WHERE key_time >= toUnixTimestamp(ts_from)
GROUP BY key_time, key_roller, key_stream, tenant_id
)
WHERE category = 'authentication'
AND outcome = 'success'
AND user != ''
AND platform != ''
GROUP BY day, tenant_id, user
HAVING uniq_platforms >= 3
ORDER BY uniq_platforms DESC, events DESC
LIMIT 50;

Интерпретация для SOC:

  • “3+ разных платформ за сутки” — это не всегда инцидент (часто это админы/VDI), но отличный триггер для обогащения правила корреляции (например, “новая платформа + новый IP + нет MFA”).

Вариант: “новый отпечаток у пользователя”

Если в custom есть Fingerprint.Fingerprint, можно строить “первое появление” устройства.

WITH
toDateTime(now() - INTERVAL 14 DAY) AS ts_from
SELECT
tenant_id,
user,
fingerprint,
min(fromUnixTimestamp(key_time)) AS first_seen,
max(fromUnixTimestamp(key_time)) AS last_seen,
count() AS logins
FROM
(
SELECT
key_time, key_roller, key_stream, tenant_id,
anyIf(value_string, attr = 'ECS.User.Name') AS user,
anyIf(value_string, attr = 'Fingerprint.Fingerprint') AS fingerprint,
anyIf(value_string, attr = 'ECS.Event.Category') AS category,
anyIf(value_string, attr = 'ECS.Event.Outcome') AS outcome
FROM komrad_events.events_custom_kv_expanded_typed
WHERE key_time >= toUnixTimestamp(ts_from)
GROUP BY key_time, key_roller, key_stream, tenant_id
)
WHERE category = 'authentication'
AND outcome = 'success'
AND user != ''
AND fingerprint != ''
GROUP BY tenant_id, user, fingerprint
ORDER BY first_seen DESC
LIMIT 200;

Дальше вы уже “накручиваете” контекст: это новый ноутбук сотрудника или компрометация учётки?

Кейс 2. “Невозможные перемещения” и резкая смена устройства

Идея

Классический “impossible travel” требует геолокации IP. В ClickHouse это обычно делается:

  • словарём (DICTIONARY) по GeoIP/MaxMind;
  • или внешней таблицей ip → (country, city, asn).

Но даже без географии уже можно ловить очень полезные паттерны:

  • один пользователь за короткое время логинится с разных подсетей;
  • один пользователь в течение часа меняет fingerprint несколько раз;
  • или один fingerprint “ходит” по разным пользователям (подозрение на shared token / бот / прокси).

Пример: “быстрая смена отпечатка у пользователя”

WITH
toDateTime(now() - INTERVAL 3 DAY) AS ts_from
SELECT
tenant_id,
user,
toStartOfHour(fromUnixTimestamp(key_time)) AS hour,
uniqExact(fingerprint) AS uniq_fingerprints,
groupUniqArray(fingerprint) AS fingerprints,
count() AS events
FROM
(
SELECT
key_time, key_roller, key_stream, tenant_id,
anyIf(value_string, attr = 'ECS.User.Name') AS user,
anyIf(value_string, attr = 'Fingerprint.Fingerprint') AS fingerprint,
anyIf(value_string, attr = 'ECS.Event.Category') AS category,
anyIf(value_string, attr = 'ECS.Event.Outcome') AS outcome
FROM komrad_events.events_custom_kv_flat_typed
WHERE key_time >= toUnixTimestamp(ts_from)
GROUP BY key_time, key_roller, key_stream, tenant_id
)
WHERE category = 'authentication'
AND outcome = 'success'
AND user != ''
AND fingerprint != ''
GROUP BY tenant_id, user, hour
HAVING uniq_fingerprints >= 3
ORDER BY uniq_fingerprints DESC, events DESC
LIMIT 100;

Практика показывает: “3+ устройства за час” — почти всегда повод открыть тикет хотя бы на проверку (или на уточнение бизнес‑процесса, если это VDI/терминалки).

Пример: “один отпечаток — много пользователей”

WITH
toDateTime(now() - INTERVAL 7 DAY) AS ts_from
SELECT
tenant_id,
fingerprint,
uniqExact(user) AS uniq_users,
groupUniqArray(user) AS users,
count() AS events
FROM
(
SELECT
key_time, key_roller, key_stream, tenant_id,
anyIf(value_string, attr = 'ECS.User.Name') AS user,
anyIf(value_string, attr = 'Fingerprint.Fingerprint') AS fingerprint,
anyIf(value_string, attr = 'ECS.Event.Category') AS category,
anyIf(value_string, attr = 'ECS.Event.Outcome') AS outcome
FROM komrad_events.events_custom_kv_flat_typed
WHERE key_time >= toUnixTimestamp(ts_from)
GROUP BY key_time, key_roller, key_stream, tenant_id
)
WHERE category = 'authentication'
AND outcome = 'success'
AND user != ''
AND fingerprint != ''
GROUP BY tenant_id, fingerprint
HAVING uniq_users >= 5
ORDER BY uniq_users DESC, events DESC
LIMIT 100;

Это полезно для поиска:

  • shared‑browser на “общих” рабочих местах;
  • плохо настроенных терминалов;
  • и да — иногда это признак автоматизации/бота, который логинится под разными учётками.

Кейс 3. Бот/скрипт‑активность по UserAgent и TLS

Идея

Для web‑логинов/порталов/шлюзов очень часто есть:

  • ECS.UserAgent.Original
  • ECS.TLS.Version, ECS.TLS.Cipher, ECS.TLS.Established, ECS.TLS.Resumed (на стороне источника)

Сигналы “бота” (в реальности — и атакующего, и внутренней автоматизации) часто выглядят так:

  • одна и та же пара (UA, TLS параметры) создаёт огромное число попыток;
  • редкий UA встречается только на атакующих IP;
  • всплеск попыток логина/запросов с одинаковым “профилем клиента”.

Пример: топ “профилей клиента” по количеству событий

WITH
toDateTime(now() - INTERVAL 1 DAY) AS ts_from
SELECT
tenant_id,
ua,
tls_version,
tls_cipher,
count() AS events,
uniqExact(src_ip) AS uniq_ips
FROM
(
SELECT
key_time, key_roller, key_stream, tenant_id,
anyIf(value_string, attr = 'ECS.UserAgent.Original') AS ua,
anyIf(value_string, attr = 'ECS.TLS.Version') AS tls_version,
anyIf(value_string, attr = 'ECS.TLS.Cipher') AS tls_cipher,
anyIf(value_string, attr = 'ECS.Client.Address') AS src_ip
FROM komrad_events.events_custom_kv_flat_typed
WHERE key_time >= toUnixTimestamp(ts_from)
GROUP BY key_time, key_roller, key_stream, tenant_id
)
WHERE ua != ''
GROUP BY tenant_id, ua, tls_version, tls_cipher
ORDER BY events DESC
LIMIT 50;

Дальше — дело аналитика:

  • выделить “нормальные” массовые профили (корпоративные прокси, VDI);
  • и искать “аномальные” (много событий, мало разных IP, редкий UA, и т.п.).

Что получает заказчик (и почему это продаёт SIEM)

На практике заказчик выигрывает в 3 измерениях:

  1. Время до ответа (TTI/TTD): многие аналитические вопросы закрываются SQL‑запросом за секунды/минуты, а не “подождите, мы добавим поле в парсер”.
  2. Гибкость: новые источники/СЗИ и их поля не требуют миграций схемы событий — достаточно писать правила нормализации, а витрина custom_kv уже готова принять новые атрибуты.
  3. Масштабирование: ClickHouse хорошо масштабируется горизонтально, а подход с витринами позволяет перенести часть нагрузки с “тяжёлых” JSON‑операций в предрасчитанные пары.

Практические рекомендации (если вы реально будете запускать это на 100k EPS)

  • Начинайте с flat + single-value или flat + typed. expanded включайте точечно (или делайте отдельную витрину только под нужные домены).
  • Для включения используйте безопасную схему “MV по w_time >= mv_start + backfill w_time < mv_start”.
  • Включайте throttling в backfill (--sleep, --max-threads) и запускайте на окнах низкой нагрузки.
  • В OLAP‑запросах старайтесь:
    • сначала ограничивать время (key_time/партиции),
    • затем агрегировать на ключ события,
    • и только потом делать HAVING/ORDER BY по сложным условиям.

Заключение

events.custom даёт гибкость, но OLAP любит структуру. В KOMRAD SIEM 4.5 обе стороны можно совместить: хранить события “как есть” (с ECS и расширениями), а для аналитики — поддерживать витрину attribute/value пар в ClickHouse.

Если вы хотите, чтобы мы подготовили “боевую” витрину под конкретные источники/домены (AD/VPN/WEB/EDR) и ваши правила корреляции — мы можем собрать whitelist атрибутов, оптимизировать типизацию и помочь выстроить SLO по задержке и стоимости хранения.