Перейти к основному содержимому
Версия: 4.5.X

Работа с фильтрами

После того, как события были получены от источника и прошли процесс нормализации, они попадают в хранилище и коррелятор. Для выбора событий, соответствующих необходимым критериям, используются фильтры (запросы). Их можно формировать с помощью графического конструктора, который автоматически преобразовывает блоки конструктора в выражение для поиска событий по поставленным условиям. Продвинутые пользователи могут сразу использовать язык Lua, не обращаясь к графическому представлению фильтров. В изделии на странице создания фильтров есть встроенное руководство по Lua, объясняющее работу с полями событий и синтаксис языка.

Графическое создание фильтров

Фильтры представляют собой набор логических выражений, объединенных между собой операторами И/ИЛИ. В выражениях может быть использовано множество различных операторов сравнения, например: «равно», «не равно», «содержит», «не содержит», «начинается с», «не начинается с», «содержит примерно» и др. Расположение: "События" ⇒ Фильтры.

Действия:

  1. Нажать на кнопку Добавить
  2. В открывшейся вкладке ввести название фильтра и его описание
  3. Во вкладке «Конструктор фильтра» выбрать условия объединения параметров запроса («И» или «ИЛИ», по умолчанию используется «И»)
  4. Добавить ограничение на поле события, нажав на кнопку Добавить
  5. Задать необходимые ограничения на поля событий, выбрав нужное поле и воспользовавшись доступными для данного поля операторами
  6. Сохранить запрос

Операции

Во вкладке "Конструктор фильтра", в зависимости от типа поля к нему могут применяться разные операции

  • Равно
  • Не равно
  • Равно, игнорируя регистр
  • Не содержит, игнорируя регистр
  • Начинается с
  • Не начинается с
  • Заканчивается на
  • Не заканчивается на
  • Пусто
  • Не пусто
  • Входит в подсеть
  • Не входит в подсеть
  • До
  • После
  • Исключая дни недели
  • Между
  • Больше
  • Больше или равно
  • Меньше
  • Меньше или равно
  • Не содержит
  • Содержит
  • Содержит примерно - показывать событие с заданным значением отклонения от эталонного
  • Содержит один из - показывать события, где есть хотя бы один из выбранных параметров
  • Содержит все - показывать события, которые содержат одновременно все выбранные параметры
  • Не содержит один из - не показывать события, где есть хотя бы один из выбранных параметров
  • Не содержит все - не показывать события, которые содержат одновременно все выбранные параметры

Поля категоризации

event.kind

event.kind дает высокоуровневую информацию о том, какой тип информации содержит событие, без указания содержания события. Например, значения этого поля отличают события предупреждений от событий метрик.

Значение этого поля может использоваться для информирования о том, как следует обрабатывать события такого рода. Они могут гарантировать различное хранение, другой контроль доступа, это также может помочь понять, поступают ли данные через регулярные промежутки времени или нет.

event.category

event.category представляет «большие ведра» категорий ECS. Например, фильтрация по event.category:process дает все события, связанные с активностью процесса. Это поле тесно связано с полем event.type, которое используется как подкатегория.

Это поле представляет собой массив. Это позволит правильно классифицировать некоторые события, попадающие в несколько категорий.

event.type

event.type представляет собой «подсегмент» категоризации, который при использовании вместе с event.category значениями полей позволяет фильтровать события до уровня, подходящего для одиночной визуализации.

Это поле представляет собой массив. Это позволит правильно классифицировать некоторые события, которые относятся к нескольким типам событий.

event.outcome

event.outcome просто обозначает, представляет ли событие успех или неудачу с точки зрения объекта, вызвавшего событие.

Обратите внимание, что когда одна транзакция описывается в нескольких событиях, каждое событие может содержать разные значения event.outcome, в зависимости от их точки зрения.

Также обратите внимание, что в случае составного события (одно событие, содержащее несколько логических событий) это поле должно быть заполнено значением, которое лучше всего отражает общий успех или неудачу с точки зрения производителя события.

Кроме того, обратите внимание, что не все события будут иметь связанный результат. Например, это поле обычно не заполняется для событий метрик, событий с event.type:info значком или любых событий, для которых результат не имеет логического смысла.

Создание фильтров с помощью Lua

Расположение: "События" ⇒ "Фильтры"

Действия:

  1. Нажать на кнопку Добавить

  2. В открывшейся вкладке ввести название фильтра и его описание

  3. Во вкладке «Код» набрать текст запроса на языке Lua

  4. Сохранить запрос

примечание

В случае, если выбран вариант самостоятельного написания запроса на языке Luа, содержимое страницы конструктора запроса будет оставаться пустым

Структура запроса

Запрос должен быть оформлен как функция с фиксированной входной переменной:

event (событие):
function filter(event)
-- код запроса
end

Можно объявлять дополнительные функции и переменные как до объявления функции filter так и после.

Переменные

Lua – язык с динамической типизацией. В языке нет определений типов, каждое значение несет свой собственный тип. Переменная может быть:

  1. Числом, например:

    local number = 42;

  2. Строкой, например:

    local str = "string";

  3. Таблицей (ассоциативным массивом), например:

    local empty_table = {}

    local pre_defined_table = {field="data"}

    Таблица - основной тип данных в Lua. Таблица может быть как массивом, так и классом с методами. Переменная event является таблицей, содержащей поля события, значения которых можно извлекать по ключу-названию.

    Извлеченные данные можно помещать в переменные без предварительного объявления:

    cl = event:get('Info_Classification')

    Если значение переменной равно nil, значит нет данных для извлечения по таким условиям.

  4. Булевым значением (true или false. Так же nil можно воспринимать как false), например:

    local empty = nil or {}

    В итоге empty будет пустой таблицей, а не nil.

  5. userdata - специфичный тип для данных, которых изначально в Lua нет. По сути, это типы данных того языка, на котором Lua выполняется.

Извлечение данных из событий

В качестве аргумента указанных ниже функций задается имя поля события. Название поля события может заключаться:

в одинарные кавычки: 'TenantID'

в двойные кавычки: "TenantID"

Для извлечения из события значения поля можно использовать метод get: event:get("Raw")

Тип извлеченного значения будет таким же, как и в событии. Для уточнения см. руководство по языку Lua «Поля событий».

Небольшой пример:

local event_code = event:get("ECS.Event.Code")

Оператор local объявляет переменную в локальной области видимости (функция, тело цикла).

По умолчанию объявленная без local переменная будет глобальной. Если event_code == nil, значит в событии отсутствует поле "ECS.Event.Code". Также можно извлекать данные явно, указывая их тип. Описания возвращаемых значений для событий представлены в таблице.

МетодОписание
event:getNumber('TenantID')Извлечение числа из поля события
event:getIP("ECS.Source.IP")Извлечение IP-адреса
event:getIPFromNumber("DrWeb.Event.IP")Извлечение IP-адреса, закодированного в число int64
event:getIPs("AssetIPs")Извлечение массива IP адресов
event:getString ('FwdIP')Извлечение строки. Важно: любое поле можно извлечь как строку
event:getStrings('AssetIPs')Извлечение массива строк
event:getBool ('BoolField')Извлечение булево значения

Как пример, следующий фрагмент кода, вставленный в самое начало фильтра, будет прекращать работу фильтра, если полученные события не получены от узла с IP-адресом 10.0.0.34:

if (event:get('FwdIP') ~= "10.0.0.34") then
return
end

Работа со временем

Описания возвращаемых значений из метода get для работы со временем представлены в таблице.

МетодОписание
local gen_time = event:get("ECS.File.Accessed")
local evt_time = event:get("ECS.Event.Start")
if gen_time:After(evt_time) then return end
Возвращается значение типа bool
local gen_time = event:get("ECS.File.Accessed")
local evt_time = event:get("ECS.Event.Start")
if evt_time:Before(gen_time) then return end
Возвращается значение типа bool. Результат будет аналогичен предыдущему
hour, min, sec = event:get('WTime'):Clock()Извлечение из поля события значений часа, минут и секунд в отдельные переменные
hyear, month, day = event:get('WTime'):Date()Извлекает из поля события год, месяц и день в отдельные переменные
year = event:get('WTime'):Year()Извлечение года
month = event:get('WTime'):Month()Извлечение месяца
day = event:get('WTime'):Day()Извлечение дня месяца
hour = event:get('WTime'):Hour()Извлечение дня месяца
min = event:get('WTime'):Minute()Извлечение минуты
sec = event:get('WTime'):Second()Извлечение секунды
timeString = event:get('WTime'):String()Извлечение времени в виде строки формата «2006-01-02 15:04:05.999999999 -0700 MST»
wday = event:get('WTime'):Weekday()Извлечение дня недели: Воскресенье – 0, Понедельник – 1,
Вторник – 2, Среда – 3, Четверг – 4, Пятница – 5, Суббота – 6
yeard = event:get('WTime'):YearDay()Функция возвращает номер дня в году

Строки

local inspect = require("inspect")
local strings = require("strings")-- strings.split(string, sep)
local result = strings.split("a b c d", " ")
print(inspect(result, {newline="", indent=""}))
-- Output: { "a", "b", "c", "d" }-- strings.has_prefix(string, prefix)
local result = strings.has_prefix("abcd", "a")
-- Output: true-- strings.has_suffix(string, suffix)
local result = strings.has_suffix("abcd", "d")
-- Output: true-- strings.trim(string, cutset)
local result = strings.trim("abcd", "d")
-- Output: abc-- strings.contains(string, substring)
local result = strings.contains("abcd", "d")
-- Output: true

Поиск подстроки

first, last = string.find(source, substring)

Где:

  • first - индекс начала подстроки в исходной строке
  • last - индекс конца подстроки в исходной строке

В качестве подстроки можно задавать паттерны, как в регулярном выражении.

Поиск значения в таблице

for key, value in pairs(tb) do 
if value == element then
k = key
end
осторожно

Если tb == nil, то будет ошибка

Работа с IP

Доступные методы представлены в следующей таблице.

МетодОписание
EqualЕдинственный аргумент - переменная того же типа ip. Возвращаемое значение - bool
StringВозвращает ip, записанный в строку

Условия

Для проверки условия используется конструкция if ... then:

if (category == 'ET EXPLOIT ETERNALBLUE Exploit M2 MS17- 010') and 
(cl == 'Attempted Administrator Privilege Gain') then
return {IP=ip}
end

Циклы

Для работы с массивами и таблицами можно использовать цикл for:

for key, value in pairs(assets) do 
-- код логики фильтрации событий
end

где assets - таблица (ассоциативный массив).

В языке Lua массивы тоже являются таблицами, в которых индекс элемента - ключ.

pairs(assets) - позволяет последовательно проходить по полям таблицы.

Использование массивов и локальных функций

В фильтрах можно задавать списки значений для сравнения значений, извлекаемых из событий. Для этого потребуется объявить массив и реализовать функцию по проверке вхождения значения в массив.

Массив задается следующим образом:

users = {'admin','administrator','root'}
Локальная функция поиска значения в массиве:
local function array_has_value (tab, val)
for index, value in ipairs(tab) do
if value == val then
return true
end
end
return false
end

Возвращаемые значения

Для события, которое подходит под фильтр, функция filter обязательно должна возвращать параметры события, которые впоследствии могут использоваться в директивах корреляции:

local Table = {}
Table["asset_ips"] = event:get('AssetIPs')
Table["src_ip"] = event:get('ECS.Source.IP')
return Table

В директиве использование этой переменной будет выглядеть так: ep.asset_ip != ep.src_ip

Регулярные выражения

При работе с фильтрами можно использовать регулярные выражения.

Пример:

local regexp = require("regexp")
local inspect = require("inspect")local fieldValue = e.getString("Raw")-- regexp.match(regexp, data)
local result, err = regexp.match(fieldValue, "hello world")if err then
return
end
-- regexp.find_all_string_submatch(regexp, data)
local result, err = regexp.find_all_string_submatch("string: '(.**)\\s+(.**)'$", fieldValue)
if err then
return
endif not(result[1][2] == "hello") then
local result = tostring(result[1][2]))
endif not(result[1][3] == "world") then
local result = tostring(result[1][3])
end-- regexp:match()
local reg, err = regexp.compile("hello")
if err then error(err) end
local result = reg:match(fieldValue)
if not(result==true) then
return
end
-- regexp:find_all_string_submatch()
local reg, err = regexp.compile("string: '(.**)\\s+(.**)'$")
if err then
error(err)
end
local result = reg:find_all_string_submatch(fieldValue)
local result = inspect(result, {newline="", indent=""})
if not(result == [[{ { "string: 'hello world'", "hello", "world" } }]]) then error("regexp:find_all_string_submatch()") end-- regexp.find_all_string_submatch(regexp, data)
local result, err = regexp.find_all_string_submatch("string: '(.**)\\s+(.**)'$", fieldValue)
if err then error(err) end
if not(result[1][2] == "hello") then error("not found: "..tostring(result[1][2])) end
if not(result[1][3] == "world") then error("not found: "..tostring(result[1][3])) end

XML

Извлекать XML:
----
local xmlpath = require("xmlpath")local data = [[
<channels>
<channel id="1" xz1="600" />
<channel id="2" />
<channel id="x" xz2="600" />
</channels>
]]
local data_path = "//channel/@id"-- xmlpath.load(data string)
local node, err = xmlpath.load(data)
if err then error(err) end-- xmlpath.compile(path string)
local path, err = xmlpath.compile(data_path)
if err then error(err) end-- path:iter(node)
local iter = path:iter(node)for k, v in pairs(iter) do print(v:string()) end
-- Output:
-- 1
-- 2
-- x

Неэкранированные строки JSON

И работать с неэкранированными строками JSON в lua-фильтрах:
local json = require("json")
local inspect = require("inspect")-- json.decode()
local jsonString = [[
{
"a": {"b":1}
}
]]
local result, err = json.decode(jsonString)
if err then
error(err)
end
local result = inspect(result, { newline = "", indent = "" })
if not (result == "{a = {b = 1}}") then
error("json.decode")
end-- json.encode()
local table = { a = { b = 1 } }
local result, err = json.encode(table)
if err then
error(err)
end
local result = inspect(result, { newline = "", indent = "" })
if not (result == [['{"a":{"b":1}}']]) then
error("json.encode")
end

Контрольные суммы

Производить подсчёт контрольных сумм:

local crypto = require("crypto")-- md5
if not(crypto.md5("1\n") == "b026324c6904b2a9cb4b88d6d61c81d1") then
error("md5")
end-- sha256
if not(crypto.sha256("1\n") == "4355a46b19d348dc2f57c046f8ef63d4538ebb936000f3c9ee954a27460dd865") then
error("sha256")
end

Специальные функции

Имеется ряд специальных функций для работы с данными:

  1. Расстояние Левенштейна. Возвращает число, расстояние Левенштейна между значением ПолеСобытия и подстрокой: event:levenstein ("ПолеСобытия", "подстрока")

  2. Поиск по IP:

    • возвращает true, если связанные с событиями активы имеют указанный IP-адреса, либо входят в подсеть, либо в интервал IP-адресов. Примеры интервалов: 10.0.4.10-10.0.4.25, 192.168.0.1/24.
    • возможно извлечь IPv4 в виде строки из представления в виде числа uint64 с помощью getIPFromNumber. Такое представление, например, используется в базе данных Dr.Web

    Пример:

    event:hasAssetIP("10.0.4.10")
    ---
    event:hasAssetIP("10.0.4.10-10.0.4.25")
    ---
    event:hasAssetIP("192.168.0.1/24")
    ---
    event:isIPInSubnets("ECS.Source.IP", "<интервал ip адресов>")
    ---
    event:isIPInSubnets("ECS.Source.IP", "10.0.4.10")
    ---
    event:isIPInSubnets("ECS.Source.IP", "10.0.4.10-10.0.4.25")
    ---
    event:isIPInSubnets("ECS.Source.IP", "192.168.0.1/24")
    ---
    event:isIPInSubnets("ECS.Source.IP", event:getIPFromNumber("DrWeb.IPInUint64"))
    • Поиск по времени. Возвращает true, если распознанное время в поле предшествует указанного времени, либо позже указанного времени:
    event:before("CTime", "2021-06-24T17:38:29+03:00")
    ---
    event:after("CTime", "2021-06-24T17:38:29+03:00")
    • Поиск аномалий смены раскладки. Злоумышленник может использовать метод подмены символа в имени пользователя, идентификаторе на похожий в другой раскладке, например, буквы "а" в кириллице и латинице. Функция hasNonASCIICharsAtLeast проверяет число вхождений не-ASCII символов. Функция hasASCIICharsAtLeast проверяет число вхождений ASCII символов:

    -- false для Иван, true для Ив<латинская а>н

    event:hasASCIICharsAtLeast("ECS.User.Name", 1)

    -- false для ivan, true для iv<русская а>n

    event:hasNonASCIICharsAtLeast("ECS.User.Name", 1)

    • Поиск вхождения символа. Для поиска на вхождение UTF символа в строку поля события используйте:

    -- true для примеров: "Ãa", "ßb", "©c", "Ãß©", "Ãß", "é"

    -- false для примеров: "a", "b", ""

    event:containsAny("ECS.User.Name", "Ãß©")

    • Поиск вхождения строки. Для поиска на равенство строки поля события типов string, string_slice используйте:
    -- ECS.User.Name = "I.Petrov"
    -- true
    event:hasString("ECS.User.Name", "i.petrov")
    -- false
    event:hasString("ECS.User.Name", "I.petrov")
    -- true
    event:hasStringCaseInsensitive("ECS.User.Name", "i.petrov")
    ---
    -- ECS.Related.Users = ["i.petrov", "a.ivanov", "M.Petrov"]
    -- true
    event:hasString("ECS.User.Name", "a.petrov")
    -- false
    event:hasString("ECS.User.Name", "m.petrov")
    -- true
    event:hasStringCaseInsensitive("ECS.User.Name", "m.petrov")

    Для поиска на вхождение подстроки поля события типов string, string_slice используйте:

    -- ECS.User.Name = "I.Petrov"
    -- true
    event:hasSubString("ECS.User.Name", "Pet")
    -- false
    event:hasSubString("ECS.User.Name", "fetr")
    -- true
    event:hasSubStringCaseInsensitive("ECS.User.Name", "pet")
    ---
    -- ECS.Related.Users = ["i.petrov", "a.ivanov", "M.Petrov"]
    -- true
    event:hasSubString("ECS.User.Name", "petr")
    -- false
    event:hasSubString("ECS.User.Name", "Petr")
    -- true
    event:hasSubStringCaseInsensitive("ECS.User.Name", "m.p")

    Для поиска на начало строки у поля события типов string, string_slice используйте:

    -- ECS.User.Name = "I.Petrov"
    -- true
    event:startsWith("ECS.User.Name", "I.Pet")
    -- false
    event:startsWith("ECS.User.Name", "fetr")
    -- true
    event:startsWithCaseInsensitive("ECS.User.Name", "i.p")
    ---
    -- ECS.Related.Users = ["i.petrov", "a.ivanov", "M.Petrov"]
    -- true
    event:startsWith("ECS.User.Name", "i.p")
    -- false
    event:startsWith("ECS.User.Name", "Petr")
    -- true
    event:startsWithCaseInsensitive("ECS.User.Name", "m.p")

    Для поиска пересечений комбинаций строк string, string_slice используйте:

    -- User.Array = "foo,bar,beta.times,beta.plus"
    -- true
    event:isSubsetOfStrings("User.Array", "foo,bar")
    -- false
    event:isSubsetOfStrings("User.Array", "foo,baz")
    -- true
    event:isSupersetOfStrings("User.Array", "foo,baz")
    -- true
    event:isEqualSetOfStrings("User.Array", "foo,baz")
    ---
    -- ECS.Related.Users = ["i.petrov", "a.ivanov", "M.Petrov"]
    -- true
    event:isSubsetOfStrings("ECS.Related.Users", "i.petrov,a.ivanov")
    -- false
    event:isSubsetOfStrings("ECS.Related.Users", "M.Petrov,a.ivanov")
    -- true
    event:isSupersetOfStrings("ECS.Related.Users", "a.ivanov")
    -- true
    event:isEqualSetOfStrings("ECS.Related.Users", "i.petrov")

Пример фильтра

Данный фильтр поможет искать нам события в определённые интервалы времени с точностью до секунды.

В примере с 00:00:00 до 10:47:00

Принцип работы - заполнить левую границу и правую границу left bound и right bound

local strings = require("strings")

function is_time_in_range(h, m, s)
-- left bound
local l_hour = 0
local l_minute = 0
local l_second = 0
local l_time = l_hour * 60 * 60 + l_minute * 60 + l_second

-- right bound
local r_hour = 10
local r_minute = 47
local r_second = 0
local r_time = r_hour * 60 * 60 + r_minute * 60 + r_second

-- logic
local cur_time = h * 60 * 60 + m * 60 + s

if l_time <= r_time then
if (cur_time >= l_time) and (cur_time <= r_time) then
return true
end
else
if cur_time >= l_time or cur_time <= r_time then
return true
end
end
return false
end

function filter(event)
local ret = {}
local ok_1 = false
-- and
local tmp = event:getString("GenerationTime")
local h_str,m_str,s_str = tmp:match("T(%d%d):(%d%d):(%d%d)")

local h_num = tonumber(h_str)
local m_num = tonumber(m_str)
local s_num = tonumber(s_str)

do
local ok_2 = is_time_in_range(h_num, m_num, s_num)
if ok_2 then
ok_1 = true
end
end
if ok_1 then return ret end
end

Ретро-индексация и ретро-фильтрация

При изменении фильтра меняется и результат поиска по данному фильтру, это называется ретро-индексацией.

Для поиска события в прошлом используется ретро-фильтрация. Ретро-фильтрация — это фильтр с диапазоном в прошлое. Его можно использовать для поиска событий или для проверки работы фильтра.

Ограничения ретро-индексации. Для ограничения нагрузки на процессор ретро-индексация фильтра происходит по очереди, это значит, что Вы не сможете изменить другой фильтр в момент, когда происходит ретро-индексация уже изменённого.

По умолчанию диапазон поиска в конфигурации сервера установлен на 24 часа.

к сведению

Для того чтобы увеличить диапазон поиска по событиям, необходимо добавить в конфигурационный файл komrad-processor.yaml параметр ttl-days

Порядок выполняемых действий:

  1. Для редактирования конфигурационного файла выполните команду:

    sudo nano /etc/echelon/komrad/komrad-processor.yaml
  2. Добавьте строку ttl-days: 365 в Параметры блока индексации событий ИБ

    # Параметры блока индексации событий ИБ
    indexation:
    debug: false
    # Таймаут сохранения индекса (при превышении индекс будет утерян)
    putindextimeout: 3m0s
    ttl-days: 365
  3. Перезапустите KOMRAD-процессор

    sudo systemctl restart komrad-processor.service
примечание

После перезагрузки все вновь созданные фильтры будут работать в диапазоне 365 дней. Для корректной работы ранее созданных фильтров необходимо произвести их переиндексацию.