Перейти к основному содержимому
Версия: 4.5.X

http-scraping

Введение

Плагин http-scraping позволяет отправлять HTTP-запросы на различные хосты и создавать событие на основе возвращаемого ответа. Если сравнивать этот плагин с функционалом HTTP-коллектора, то можно представить их как подходы pull и push.

Push-подход HTTP-коллектора работает следующим образом: источник, у которого есть событие, сам посылает его на коллектор (т.е. событие "проталкивается" на коллектор).

Pull-подход плагина http-scraping же работает иначе: источник, у которого есть событие, отдает его по HTTP. Коллектор с плагином делает запрос на сервер-источник, чтобы это событие получить (т.е. событие "извлекается" коллектором).

Помимо создания событий на основе тела ответа, данный плагин также может быть полезен, чтобы отслеживать задержку ответа, либо следить за состоянием TLS-сертификатов сервера.

Конфигурация

Конфигурация плагина состоит из объекта http-scraping, в котором имеется два поля:

  1. targets — список целей для HTTP-запросов. Их можно сравнить с scrape_config в Prometheus. Их структура будет описана далее
  2. include-response-headers — список заголовков ответа, которые будет включены в сформированное событие

Вложенность в объект http-scraping требуется для идентификации плагина.

Конфигурация цели включает в себя множество полей. Рассмотрим их:

  • headers — заголовки запроса в виде словаря "строка-строка"
  • query — query-параметры URL, в конфигурации задаются как словарь, где ключом является строка, а значением - либо строка, либо массив строк
  • url — URL запроса
  • body — тело запроса
  • method — метод запроса. По умолчанию равен GET
  • username — имя пользователя для Basic Auth
  • password — пароль пользователя для Basic Auth
  • bearer-token-file — путь к файлу с токеном для авторизации (для авторизации на основе токенов)
  • tls — параметры TLS. Схожи с параметрами в конфигурационных файлах сервисов KOMRAD с единственным исключением: вместо того, чтобы указывать, выключен ли TLS (disable: false), здесь указывается, включен ли он (enable: true)
  • period — частота запросов в виде time.Duration. Задается строкой по типу "2s", "5h" и т.п.
  • timeout — таймаут для запроса, задается в виде time.Duration
  • deduplication — подход к дедупликации событий
  • pagination — блок продвинутой логики (в основном для пагинации)

Пример конфигурации:

http-scraping:
include-response-headers:
- Date
- Expires
targets:
- headers:
Cookie: "session=session_key"
query:
plain_param: param
list_param:
- param1
- param2
url: example.com/api/path
body: '{"hello": "world"}'
method: POST
tls:
enabled: true
TrustedCA: "/path/to/cert"
system-pool: true
period: 15s
timeout: 3m
deduplication: none
pagination:
type: json
transforms:
- set:
target: url.params.page
value: "{{ persistent.page }}"
persistent_values:
page:
value: last_response.body.page

Дедупликация событий

Для дедупликации событий в цели можно использовать параметр deduplication в конфигурации одной цели. Основной механизм работы дедупликации - плагин запоминает определенный параметр ответа цели, который в дальнейшем сравнивается с аналогичным параметром нового ответа. Если они совпадают - новое событие не создается. Далее перечислены возможные стратегии дедупликации:

СтратегияПринцип работы
noneДедупликация отсутствует
bodyСобытия дедуплицируются на основе тела ответа
response_codeСобытия дедуплицируются на основе кода состояния ответа
errorСобытия дедуплицируются на основе произошедшей (или нет) ошибки

Например, мы дедуплицируем события на основе кода состояния ответа. Допустим, у нас следующая последовательность кодов ответа: 200 500 500 200 200. Тогда в результате получим 3 события: 200 500 200, т.к. вторые (и так далее) события убираются.

Запросы с учетом пагинации

Плагин предоставляет несколько мощных инструментов, позволяющих добавлять достаточно сложную логику в создание запросов. Источником вдохновения служил модуль HTTP JSON в filebeat. Эти инструменты описываются в блоке pagination конфигурации.

В данный момент есть только два типа пагинации: отсутствующая пагинация - none (по умолчанию), и пагинация json на основе JSON.

Среди инструментов пагинации json:

  • модификация запроса на основе шаблонов Liquid перед его отправкой
  • модификация запросов для пагинации и их отправка в рамках одной "сессии сбора" (т.е. несколько запросов идут подряд, без интервала)
  • сохранение значений на диске

Перед тем, как рассматривать параметры конфигурации, необходимо рассмотреть то, как работает пагинация json. Для этого используем диаграмму последовательностей:

persistentHTTP-клиентБлок применения измененийПагинация jsonpersistentHTTP-клиентБлок применения измененийПагинация jsontransformsЗапросЗавершениеbreak[кодстатуса >=400]after_response_transformspersistent_valuespagination_transformsЗавершениеbreak[отсутствиеизмененийпагинацииилипровал]loop[Цикл применения запросов]Передача долговечных переменных и содержимого последнего HTTP-ответа (если есть)1Первоначальное изменение HTTP-запроса и переменных2HTTP-запрос3HTTP-ответ4Передача переменных (долговечных и нет), содержимого ответа5Изменение HTTP-запроса и переменных6Передача долговечных переменных7Перевычисление и сохранение долговечных переменных8Передача переменных, содержимого ответа9Изменение HTTP-запроса и переменных10

Цикл запросов:

  • 0: (при запуске плагина) извлечение с диска или создание долговечных переменных, создание шаблона запроса (на базе основной конфигурации http-scraping)

  • 1-2: сброс переменных цикла, применение трансформаций transforms к шаблону запроса и переменным цикла. Если произойдет провал применения трансформаций - выход из цикла

  • 3-4: создание запроса на основе шаблона, передача основному http-scraping, выполнение запроса клиентом и создание события на основе ответа. Если на данном этапе произойдет провал запроса, либо ответ будет указывать на неудачное исполнение (код статуса ответа - ошибка клиента или сервера), то это приведет к выходу из текущего цикла запросов

  • 5-6: применение трансформаций after_response_transforms для изменения запроса и переменных цикла. Если произойдет провал применения трансформаций - выход из цикла

  • 7-8: Применение блока persistent_values - перевычисление значений долговечных переменных и их сохранение

  • 9-10: применение трансформаций пагинации pagination_transforms. Если произойдет провал, либо блок pagination_transforms пустой - выход из цикла. Иначе - возвращение на шаг 3

Контекст трансформаций (состояние пагинации)

Блок пагинации сохраняет состояние, которое можно применять в трансформациях. В состоянии (или, иначе, контексте трансформаций) на разных этапах можно встретить следующие элементы:

  • persistent - долговечные переменные, сохраняющиеся между циклами запросов и даже запусками коллектора
  • vars - переменные текущего цикла запросов, сбрасываются при каждом цикле
  • last_response.url.value - полный URL запроса, связанного с последним успешным ответом
  • last_response.url.params - отображение (map) для параметров (query) URL запроса. Если говорить точнее, то это Go-тип url.Values
  • last_response.body - тело последнего успешного ответа (JSON)
  • last_response.page - номер "страницы" последнего успешного ответа, т.е. порядковый номер ответа (начиная с нуля) в цикле запросов
  • last_response.header - заголовки последнего успешного ответа (в виде отображения (map)).
  • url - URL запроса в виде Go-типа url.URL
  • body - тело запроса (JSON)
  • header - заголовки запроса

Все значения (кроме persistent и vars) сохраняются на протяжении всего времени работы плагина (т.е. до его выключения или выключения коллектора).

Трансформации

В конфигурации описывается три блока трансформаций - transforms, after_response_transforms и pagination_transforms. То, когда они применяются, описано выше. Обсудим здесь то, как они применяются.

Каждая трансформация относится к одному из трех типов:

  • set - установить значение
  • append - добавить значение к массиву
  • delete - удалить значение
set

Трансформация set устанавливает значение в некотором элементе (теле, заголовках и т.д.), переопределяя старое, если есть. В конфигурации данной трансформации содержатся следующие поля:

  • target - целевое поле, куда устанавливается значение
  • value - Liquid-шаблон, возвращающий при применении значение
  • default - Liquid-шаблон, используемый при ошибке применения шаблона value
  • fail_on_template_error - вызывать провал при ошибочном применении шаблона, что приводит к выходу из цикла запросов. Если установлено в false, то при провале изменения не применятся
  • value_type - тип значения (string, int или json)
append

Трансформация append добавляет значение к массиву (или создает массив из одного элемента, если его не было). Поля:

  • target - целевое поле, куда добавляется значение
  • value - Liquid-шаблон, возвращающий при применении значение
  • default - Liquid-шаблон, используемый при ошибке применения шаблона value
  • fail_on_template_error - вызывать провал при ошибочном применении шаблона, что приводит к выходу из цикла запросов. Если установлено в false, то при провале изменения не применятся
  • value_type - тип значения (string, int или json)
delete

Трансформация delete удаляет заданное поле. Параметры:

  • target - удаляемое поле

Шаблоны

В модуле пагинации для шаблонов используется язык Liquid, аналогично шаблонам уведомлений электронной почты. Помимо стандартных фильтров, также определены дополнительные:

ФильтрОписаниеАргументыВозвращаемое значение
toUnixMillisecondsКонвертирует время (time.Time) в целое число, представляющее количество миллисекунд в формате Unixtime.Timeint64
unixMillisecondsToDateКонвертирует целое число, представляющее количество миллисекунд в формате Unix, во время (time.Time)int64time.Time
toUTCМеняет часовой пояс времени на UTC (соответственно корректирует само время, чтобы оно указывало на тот же момент времени)time.Timetime.Time

Конфигурация

Конфигурация зависит от типа пагинации, которая определяется полем type (возможные значения: none, json). Поскольку в настоящее время поддерживается только пагинация на основе JSON, дальше будет описание полей для данного типа.

transforms

В этом блоке перечисляются трансформации, применяемые только в начале цикла запросов.

Поля для чтения: last_response.* (если это не самый первый цикл), persistent.*, header.*, url.*, body.*.

Поля для записи: header.*, url.*, body.*, vars.*.

В целом, данный блок нужен для модификации запроса или создания переменных (например, на основе долговечных переменных). Поскольку в начале каждого цикла запросов недолговечные переменные обнуляются, читать их смысла нет.

Пример:

transforms:
- set:
target: url.params.since
value: "{% if persistent.since %}{{ persistent.since }}{% endif %}"
after_response_transforms

В этом блоке перечисляются трансформации, применяемые после каждого ответа. Этот блок полезен, чтобы установить значения в переменные и дальше использовать их при установке долговечных переменных.

Поля для чтения: last_response.*, persistent.*, vars.*, header.*, url.*, body.*.

Поля для записи: header.*, url.*, body.*, vars.*.

Пример:

after_response_transforms:
- append:
target: vars.array
value: "{{ last_response.body.value }}"
persistent_values

В блоке persistent_values указываются долговечные переменные. Этот блок представлен в виде словаря "имя переменной - конфигурация".

Конфигурация каждой переменной содержит следующие поля:

  • value - Liquid-шаблон, возвращающий при применении значение
  • default - Liquid-шаблон, используемый при ошибке применения шаблона value
  • ignore_empty_values - булево значение, определяющее то, менять ли значение переменной, если результатом шаблона является пустое значение (т.е., если ignore_empty_values: true, то при возвращении шаблоном пустого значение старое значение переменной не переопределяется)

Пример:

persistent_values:
page:
value: "{{ url.params.page | plus: 1 }}"
ignore_empty_values: true
key:
value: "{{ last_response.body.key }}"
pagination_transforms

Блок pagination_transforms содержит трансформации, применяющиеся для создания следующего запроса в цикле (для получения следующей страницы). Этот блок примечателен тем, что с его помощью можно управлять пагинацией и циклом запросов в целом - если любая трансформация в этот блоке вернет ошибку, то текущий цикл завершится.

Поля для чтения: last_response.*, persistent.*, vars.*, header.*, url.*, body.*.

Поля для записи: header.*, url.*, body.*, vars.*.

Пример:

pagination_transforms:
- set:
target: url.params.page
value: "{% if last_response.body.has_more_pages %}{{ url.params.page | plus: 1}}{% endif %}"
fail_on_template_error: true

Парсинг ответа

Плагин парсит множество полей, связанных с HTTP-соединением. В таблице ниже перечислены извлекаемые поля:

ПолеЗначениеКогда добавляется к событию
ECS.Destination.AddressАдрес сервераПри создании подключения
(обычно TCP)
ECS.Destination.DomainДомен сервераПри начале DNS запроса
ECS.Destination.IPIP сервераВ конце DNS запроса
ECS.Event.DurationДлительность запросаПосле получения ответа
ECS.Event.StartВремя начала запросаПосле получения ответа
ECS.Event.EndВремя конца запроса (время получения ответа)После получения ответа
ECS.HTTP.ResponseBodyContentТело ответаПосле получения ответа
ECS.HTTP.ResponseBodyBytesРазмер тела ответа в байтахПосле получения ответа
ECS.HTTP.VersionВерсия HTTPПосле получения ответа
ECS.HTTP.Header.*Значения заголовков, заданных в поле include-response-headersПосле получения ответа
ECS.TLS.VersionВерсия TLSПосле получения ответа
ECS.TLS.CipherАлгоритм шифрования TLSПосле получения ответа
ECS.TLS.ResumedБыло ли соединение TLS возобновленоПосле получения ответа
ECS.TLS.EstablishedБыло ли соединение TLS успешно установленоПосле получения ответа
ECS.TLS.ServerIssuerИздатель сертификата сервераПосле получения ответа либо при неудачном рукопожатии TLS
ECS.TLS.ServerNotAfterВремя окончания сертификата сервераПосле получения ответа либо при неудачном рукопожатии TLS
ECS.TLS.ServerNotBeforeВремя начала действия сертификата сервераПосле получения ответа либо при неудачном рукопожатии TLS
ECS.TLS.ServerSubjectСубъект сертификата сервераПосле получения ответа либо при неудачном рукопожатии TLS

Примеры

Проверка здоровья сервиса

В разработке сервисов есть паттерн "проверки здоровья" (healthcheck), помогающий узнать, работает ли сервис исправно или есть какие-то неполадки. Для этого в HTTP API добавляется дополнительный маршрут (обычно, его называют /health), который в нормальной ситуации возвращает код состояния 200 и какую-то информацию, например, о состоянии системы). Если информации нет, можно возвращать просто какой-либо текст (например, текст кода состояния).

Можем использовать плагин http-scraping для того, чтобы создавать события, когда сервис не здоров. В качестве подопытного используем сам KOMRAD. Для сбора информации о здоровье используем следующий конфигурационный файл:

http-scraping:
targets:
- url: 'https://search-komrad.etecs.ru/api/health'
method: GET
tls:
enabled: true
TrustedCA: "{путь к CA сертификату}"
system-pool: true
period: 30s
timeout: 15s
deduplication: response_code

Чтобы отфильтровывать события с успешным кодом возврата, можем использовать плагин if:

processors:
- module: deny
cel:
- event.vars("ECS.HTTP.ResponseStatusCode") == 200

Поскольку, скорее всего, search-komrad работает, то событий мы получить не должны. Чтобы их получить, можем попробовать добавить опечатку. Например, поменять health на healthx. Тогда получим одно (т.к. добавлена дедупликация на основе кода статуса) событие с текстом 404 page not found, что указывает нам на отсутствие страницы и нашу опечатку.

Обнаружение истечения срока действия TLS-сертификата

Плагин http-scraping можно использовать для обнаружения истекших сертификатов. Для этих целей можно использовать любой URL, например, для проверки здоровья из предыдущего примера.

Для создания сервера с истекшим (или, точнее, близким к истеканию - 1 день до конца) сертификатом я сгенерировал следующий набор с помощью openssl:

  • Закрытый ключ корневого сертификата
  • Корневой сертификат
  • Закрытый ключ сертификата сервера
  • Сертификат сервера

Также это можно сделать с помощью komrad-cli.

Далее я создал простой сервер, который запускается с созданным сертификатом и ключ и обслуживает только один endpoint - /health.

Чтобы извлекать из этого сервера события, используем следующую конфигурацию:

http-scraping:
targets:
- url: 'https://localhost:33333/health'
method: GET
tls:
enabled: true
TrustedCA: "{путь к сгенерированному корневому сертификату}"
period: 15s
timeout: 5s

После запуска плагина будем получать события, где в поле ECS.TLS.ServerNotAfter будет указана дата окончания действия сертификата. Можем использовать ее для фильтрации и корреляции. Например, можем использовать плагин deny для отсечения событий с долгоживущими сертификатами:

processors:
- module: deny
cel:
- event.vars("ECS.TLS.ServerNotAfter") != null && (event.vars("ECS.TLS.ServerNotAfter") - now() > duration("72h"))
- module: assign
assign:
- target: CertIsGoingToBeExpired
cel: "true"

В данном плагине deny отсеиваются все события, оставшийся срок действия сертификата которых превышает три дня. Также с помощью модуля assign добавляется поле, указывающее на скорое истечение сертификата.

После применения плагинов http-scraping и deny мы будем получать только те события от http-scraping, в которых срок действия сертификата подходит к концу.

Отслеживание высокой задержки HTTP-запросов

С помощью плагина http-scraping совместно с deny или assign можно отслеживать высокую задержку HTTP-ответа от сервера. Для этого в конфигурации необходимо "нацелить" http-scraping на нужный URL. Например:

http-scraping:
targets:
- url: http://localhost:44444/
method: POST
body: Hello HTTP body!
period: 20s

В данном случае я написал простой сервер, который при запросе на "/" с методом POST засыпает на две секунды, а затем возвращает тело запроса в ответе. Чтобы отследить высокую задержку, можно написать еще один плагин. Я решил использовать assign, чтобы обогатить событие новым полем:

processors:
- module: assign
assign:
- target: HighLatency
cel: event.vars("ECS.Event.Duration") != null && event.vars("ECS.Event.Duration") > duration("1s")

Этот плагин записывает в поле HighLatency то, длился ли запрос больше одной секунды.

Сбор логов аудита Jira

Поскольку API Jira использует пагинацию, да и в целом нам нужно как-то отделять уже обработанные записи от новых, то этот случай потребует использования продвинутой логики пагинации. Т.к. API Jira использует JSON в качестве формата данных, то можем использовать пагинацию json.

Сперва рассмотрим, как получить возможность собирать логи. Для начала вам нужны права администратора, чтобы получить доступ к логам аудита (самому их можно посмотреть в Администрирование -> Система -> Журнал). Как только права получены, необходимо выпустить персональный токен доступа. Для этого переходим в свой профиль и открываем вкладку Персональные токены доступа. В этой вкладке нажимаем кнопку Создать токен, и далее задаем токену имя и срок действия. После создания копируем токен и сохраняем его в файл, он нам понадобится.

Чтобы составить конфигурацию, рассмотрим более подробно получение записей лога аудита. Для этого используется API /rest/auditing/1.0/events. Данное API использует пагинацию на основе токена (pageCursor в API). При данном подходе цикл запросов должен выглядеть примерно так:

  1. Делаем запрос без токена.

  2. Получаем ответ, извлекаем из него токен.

  3. Делаем запрос с извлеченным токеном, возвращаемся на шаг 2.

При такой ситуации конфигурация должна была бы выглядеть примерно так:

pagination:
type: json
transforms:
- set:
target: url.params.pageCursor
value: "{{ persistent.pageCursor }}"
persistent_values:
pageCursor:
value: "{{ last_response.body.pagingInfo.nextPageCursor }}"
ignore_empty_values: true
pagination_trasnforms:
- set:
target: url.params.pageCursor
value: "{{ last_response.body.pagingInfo.nextPageCursor }}"
fail_on_template_error: true

Однако, есть большой нюанс - API возвращает данные, отсортированные от более новых к более старым, и это поведение нельзя переопределить. Т.е. с каждой новой страницей мы возвращаемся назад в прошлое и теряем доступ к новым событиям. Чтобы избежать ситуации, когда мы теряем доступ к событиям, следует использовать следующий подход:

  1. Делаем запрос.

  2. Получаем ответ, запоминаем время самого нового события из всех, если еще не запоминали. Извлекаем токен пагинации, если его нет - переходим к шагу 4.

  3. Добавляем токен в запрос, возвращаемся на шаг 1.

  4. Убираем токен из запроса, добавляем сохраненное время в качестве фильтра "От" ("From"). Выходим из текущего цикла запросов.

Попробуем адаптировать этот алгоритм к конфигурации и получим окончательный вариант:

http-scraping:
targets:
- query:
limit: 1000
url: 'https://jira-dev.etecs.ru/rest/auditing/1.0/events'
method: GET
bearer-token-file: "путь к сохраненному файлу с токеном"
period: 1m
timeout: 30s
deduplication: body
pagination:
type: json
transforms:
- set:
target: url.params.from
value: "{% if persistent.timestamp %}{{ persistent.timestamp | plus: 1 | unixMillisecondsToDate | toUTC | date: '%Y-%m-%dT%H:%M:%S.%LZ'}}{% endif %}"
after_response_transforms:
- set:
target: vars.timestamp
value: >
{%- assign first_entity = last_response.body.entities | first -%}
{%- assign first_entity_timestamp = first_entity.timestamp | toUnixMilliseconds -%}
{%- unless vars.timestamp and first_entity_timestamp <= vars.timestamp -%}
{{- first_entity_timestamp -}}
{%- else -%}
{{- vars.timestamp -}}
{%- endunless -%}
value_type: int
pagination_transforms:
- set:
target: url.params.pageCursor
value: "{{ last_response.body.pagingInfo.nextPageCursor }}"
fail_on_template_error: true
persistent_values:
timestamp:
value: "{{ vars.timestamp }}"
ignore_empty_values: true

Пояснения:

  • В блоке transforms в начале циклов запросов пытаемся установить фильтр "From", используя сохраненное ранее время. Для этого берем сохраненное количество миллисекунд Unix, добавляем единицу (чтобы исключить событие из предыдущего цикла), переводим в дату в часовом поясе UTC (Jira API не совсем адекватно с прочими часовыми поясами работает) и в формате ISO8601, как описано в документации.

  • В блоке after_response_transforms пытаемся извлечь время самого нового события в переменную. Для этого берем самое первое событие (поскольку они отсортированы от более нового к более старому) и сравниваем время с уже имеющимся (потому что мы можем быть не на первой странице и события на ней уже не самые актуальные). Если время события больше, либо ранее мы не сохраняли время - меняем.

  • В блоке pagination_transforms просто-напросто поставляем токен пагинации. Если мы на последней странице, то в ответе токена не будет и подстановка провалится. Чтобы в таком случае выйти из цикла запросов, добавляем fail_on_template_error: true.

  • В блоке persistent_values сохраняем на диск время самого последнего события, чтобы использовать его в следующем цикле запросов.

Чтобы обрабатывать события, можно использовать следующий плагин:

processors:
- module: split
type: json_array
path: entities
- module: json
patterns:
- expr: timestamp
to: JiraAuditTimestamp
- expr: author.name
to: JiraAuditAuthorName
- expr: type.action
to: JiraAuditAction

Здесь создаются события на основе элементов entities, а затем происходит их парсинг. Также можно добавить какой-нибудь фильтрующий плагин (if, deny), чтобы отсеивать события, относящиеся к доступу к самому модулю аудита. Эти события будут появляться каждый раз, когда происходит доступ к API аудита - т.е. сам сбор событий порождает новое событие.

Сбор issue в Gitlab

Для доступа API Gitlab, как и в Jira, необходимо выпустить персональный токен доступа. Для этого нажмите на свой аватар слева сверху, выберите пункт Edit profile. Затем выберите Access tokens на левой панели, нажмите Add new token, укажите название, срок годности и задайте права на чтение (я добавлял read_api, read_user и read_repository, но, возможно, хватит только read_api). После создания сохраните токен в файл.

Для составления конфигурации снова рассмотрим API для получения issue в выбранном проекте. За эту часть отвечает API /projects/:id/issues. Чтобы получить ID проекта, откройте его, нажмите на троеточие сверху справа (рядом с колокольчиком, кнопками Star и Fork) и скопируйте ID проекта. В данном примере мы будем использовать offset-based пагинацию, поэтому для сбора issue применим следующий алгоритм:

  1. Делаем запрос с возрастающей сортировкой по времени обновления.

  2. Получаем ответ, запоминаем время обновления последнего issue. Если мы на последней странице - переходим к шагу 4.

  3. Увеличиваем номер страницы на 1.

  4. Сбрасываем номер страницы, добавляем сохраненное время в качестве фильтра "Обновлено после" (updated_after). Выходим из цикла запросов.

Информация о страницах в API Gitlab расположена в HTTP-заголовках (X-Page для текущей страницы и X-Last-Page для последней страницы). С учетом этого, составим конфигурацию:

http-scraping:
targets:
- url: 'https://gitlab.etecs.ru/api/v4/projects/477/issues'
query:
page: 1
per_page: 25
order_by: updated_at
sort: asc
method: GET
bearer-token-file: "путь к сохраненному токену"
period: 2m
timeout: 30s
deduplication: body
pagination:
type: json
transforms:
- set:
target: url.params.updated_after
value: "{% if persistent.timestamp %}{{ persistent.timestamp | plus: 1 | unixMillisecondsToDate | toUTC | date: '%Y-%m-%dT%H:%M:%S.%LZ'}}{% endif %}"
pagination_transforms:
- set:
target: url.params.page
value: >
{%- unless last_response.header.X-Total-Pages >= last_response.header.X-Page -%}
{{- last_response.header.X-Next-Page -}}
{%- endunless -%}
fail_on_template_error: true
persistent_values:
timestamp:
value: "{% assign last_issue = last_response.body | last %}{{ last_issue.updated_at }}"
ignore_empty_values: true

Пояснения:

  • 477 в URL запроса - это ID проекта (в данном случае - KOMRAD).
  • В блоке transforms устанавливаем фильтр updated_after, используя сохраненное ранее время. Для этого берем сохраненное количество миллисекунд Unix, добавляем единицу (чтобы исключить событие из предыдущего цикла), переводим в дату в часовом поясе UTC и в формате ISO8601, как описано в документации.
  • В блоке pagination_transforms увеличиваем меняем номер страницы на следующую. Если мы на последней странице, не подставляем значение. Поскольку fail_on_template_error: true, произойдет выход из цикла запросов.
  • В блоке persistent_values сохраняем время обновления самого последнего issue из ответа - поскольку в ответе применена возрастающая сортировка по времени обновления, то это и самое большое время обновления из всех на странице.

Для обработки событий можно использовать плагин split:

processors:
- module: split
type: json_array

Т.к. ответ от Gitlab API содержит только массив с issue, можно не указывать путь к полю с массивом.