Работа с поврежденными архивами мониторинга
В системе Proceset сбор пользовательской активности выполняет агент мониторинга. Агент ежедневно отправляет на сервер архивы с данными. После приема сервер выполняет первичную валидацию архивов. Архивы, которые не проходят проверку (например, поврежденный manifest.json, отсутствующий пользователь, ошибки структуры), не попадают в ClickHouse и помещаются в специальную очередь corrupted.
Обработка архивов
После поступления на сервер каждому архиву присваивается порядковый номер. Очередь обработки не имеет приоритетов и разбирается строго в порядке поступления.
Разбор очереди выполняется пачками:
- Если с момента предыдущего запуска разбора накопилось более 20 необработанных архивов — обработка запускается сразу
- Если архивов меньше 20 — обработка запускается, когда с момента предыдущего запуска прошло более 5 секунд
- За один запуск обрабатывается не более 50 архивов. Если архивов накопилось больше, оставшиеся будут обработаны в следующих запусках
Для каждого архива выполняются последовательные проверки:
- Проверка валидности архива
- Проверка
manifest.json(существование источника активности) - Проверка структуры файлов (
activity.json,inspector.json)
Если все проверки пройдены, данные из архива записываются в таблицы активности базы main. Дополнительных проверок после успешной обработки не выполняется.
Если на любом этапе возникает ошибка, архив помещается в очередь corrupted и остается в ней. Такие архивы можно обработать повторно.
Хранение архивов
Очереди активности и поврежденных архивов хранятся сегментами по 100 000 архивов.
- Архив остается доступным для скачивания, пока сегмент не обработан полностью
- После обработки всех архивов сегмент удаляется из хранилища
- После удаления сегмента архивы из него недоступны
Это правило одинаково применяется как к очереди активности, так и к очереди corrupted.
Диагностика поврежденных архивов
Для работы с поврежденными архивами используется GraphQL API.
Получение статистики по очереди corrupted
Чтобы вывести количество архивов в очереди, выполните запрос:
{
monitoring_diagnostics {
corrupted_file_query {
corrupted_file_statistic {
all_size
node_id
}
}
}
}
Где:
all_size— общее количество архивов в очередиcorruptednode_id— идентификатор узла, на котором накоплены поврежденные файлы
Список поврежденных архивов с детализацией
Чтобы получить список поврежденных архивов, выполните запрос:
{
monitoring_diagnostics {
corrupted_file_query {
corrupted_file_column_families {
min_id
max_id
column_family_name
node_id
size
corrupted_file_data_list {
id
source_file_name
}
}
}
}
}
Где:
min_id,max_id— диапазон идентификаторов архивов в группеcolumn_family_name— название группыnode_id— идентификатор сервераsize— количество архивов в группеcorrupted_file_data_list— список файлов:id— уникальный ID архиваsource_file_name— исходное имя файла (например,archive_20251114_012345.zip)
Общая статистика очереди обработки архивов
Чтобы отличить проблемы с corrupted от общей нагрузки на систему, выполните запрос:
{
monitoring_diagnostics {
agent_file_query {
agent_file_queue_statistic {
all_queue_size
wait_processing_size
processed_size
}
}
}
}
Где:
all_queue_size— всего архивов в очередиwait_processing_size— архивов, ожидающих обработкиprocessed_size— успешно обработанных и загруженных в ClickHouse
Выгрузка архива
Для выгрузки архива из очереди corrupted необходимы следующие идентификаторы, полученные из запроса на получение списка поврежденных архивов:
idархиваnode_idсервера
Выполните запрос:
{
monitoring_diagnostics {
corrupted_file_query {
corrupted_file_by_id(id: $id, runtimeNodeId: "$node_id")
}
}
}
Также можно скачать архив по имени:
{
monitoring_diagnostics {
corrupted_file_query {
corrupted_file_by_name(
name: "archive_name.zip"
runtimeNodeId: "$node_id"
)
}
}
}
После выполнения запроса:
- Удалите букву
iв адресной строке браузера изgraphiql. - Нажмите Enter.
- Начнется автоматическая загрузка архива в браузере. Файл загрузится с именем
corrupted_<id>.zip.
Работа с содержимым поврежденного архива
Поврежденный архив можно скачать и распаковать локально.
Возможные причины попадания в corrupted:
- На сервере отсутствует пользователь, для которого получен архив
- Архив невалиден, например, в нем отсутствует
manifest.json - В структуре файлов допущена ошибка (например, некорректный JSON)
Причина фиксируется в main.log.
Возможные действия:
- Если пользователь отсутствует на сервере, восстановите его (например, переустановите агент или перезапустите службу агента) и выполните повторную обработку
- Если ошибка в структуре — скачайте архив, исправьте файл вручную и загрузите архив обратно на сервер
Повторная обработка поврежденных архивов
Если архивы попали в corrupted ошибочно, запустите повторную обработку.
Перед выполнением запроса убедитесь, что в очереди corrupted находится не менее 100 000 архивов.
Выполните запрос:
mutation {
computer_activity {
reload_corrupted_activities(is_delete_on_fail: true)
}
}
Где у параметра is_delete_on_fail может быть установлено два значения:
true— если архив не удается разобрать, то он безвозвратно удаляетсяfalse— если архив не удается разобрать, то он возвращается в очередьcorrupted
Если повторная обработка успешна, данные попадут в таблицы активности.
Была ли статья полезна?