Шина данных на базе Apache Kafka, состоящая из отдельных компонентов-микросервисов
- Данные поступают во входной топик
Kafka, обработанные данные считываются из выходного топикаKafka - Данные обрабатываются 3-мя сервисами: фильтрация (
Filtering), дедубликация (Deduplication), обогащение (Enrichment) - Правила для сервисов поступают из соответствующих таблиц базы данных
PostgreSQL, правила вычитываются раз в заданный интервал (конфигурация:updateIntervalSec) - Правила конфигурируются с помощью сервиса-менеджера (
Management) черезSwagger UI(либо же через эндпоинты напрямую) - Сервис дедубликации использует
Redisдля хранения состояния дублей - Сервис обогащения использует
MongoDBдля хранения обогащающих данных - Все сервисы конфигурируются через переменные среды (возможна конфигурация через
application.confнапрямую)
Description:
Фильтрует данные по заданным правилам
Rules:
filter_id: id фильтраrule_id: id правилаfield_name: json-поле сообщения, по которому выполняется фильтрацияfilter_function_name: название функции фильтрации (equals, contains, not_equals, not_contains)filter_value: значение для сравнения
Configuration:
DB_URL: JDBC-URL базы данных с правилами фильтрацииDB_USER: имя пользователя базы данныхDB_PASSWORD: пароль базы данныхKAFKA_CONSUMER_BOOTSTRAP_SERVERS: URL для подключения к KafkaKAFKA_CONSUMER_CLIENT_ID: id клиента-консьюмера KafkaKAFKA_CONSUMER_GROUP_ID: id группы консьюмеров KafkaKAFKA_CONSUMER_AUTO_OFFSET_RESET: сдвиг для чтения из топика KafkaKAFKA_CONSUMER_TOPIC: входной топик Kafka для фильтрацииKAFKA_PRODUCER_BOOTSTRAP_SERVERS: URL для подключения к KafkaKAFKA_PRODUCER_CLIENT_ID: id клиента-продюсера KafkaKAFKA_PRODUCER_ACKS: фактор подтверждения отправки от брокеров KafkaKAFKA_PRODUCER_TOPIC: выходной топик Kafka после фильтрацииUPDATE_INTERVAL_SEC: интервал чтения правил фильтрации из базы данных
Description:
Очищает данные от дубликатов, используя для хранения ключей дедубликации Redis (несколько правил фильтрации объединяются в один ключ дедубликации)
Rules:
deduplication_id: id сервиса дедубликацииrule_id: id правилаfield_name: json-поле сообщения, по которому выполняется дедубликацияtime_to_live_sec: время жизни ключа в Redisis_active: вкл/выкл правила
Configuration:
DB_URL: JDBC-URL базы данных с правилами дедубликацииDB_USER: имя пользователя базы данныхDB_PASSWORD: пароль базы данныхKAFKA_CONSUMER_BOOTSTRAP_SERVERS: URL для подключения к KafkaKAFKA_CONSUMER_CLIENT_ID: id клиента-консьюмера KafkaKAFKA_CONSUMER_GROUP_ID: id группы консьюмеров KafkaKAFKA_CONSUMER_AUTO_OFFSET_RESET: сдвиг для чтения из топика KafkaKAFKA_CONSUMER_TOPIC: входной топик Kafka для дедубликацииKAFKA_PRODUCER_BOOTSTRAP_SERVERS: URL для подключения к KafkaKAFKA_PRODUCER_CLIENT_ID: id клиента-продюсера KafkaKAFKA_PRODUCER_ACKS: фактор подтверждения отправки от брокеров KafkaKAFKA_PRODUCER_TOPIC: выходной топик Kafka после дедубликацииREDIS_HOST: хост для подключения к RedisREDIS_PORT: порт для подключения к RedisUPDATE_INTERVAL_SEC: интервал чтения правил дедубликации из базы данных
Description:
Обогащает данные дополнительной информацией, использует для обогащения документы из MongoDB. Несколько правил обогащения объединяются и применяются для одного сообщения. Если два правила обогащают одно и то же поле разными документами, то актуальным правилом является то правило, чей rule_id больше. Если одному правилу соответствует несколько документов, то актуальным является тот документ, чей _id больше (максимальный из всех). Если по актуальному правилу документа в MongoDB нет, то поле обогащается значением по умолчанию из правила. Если правил нет, сообщение не обогащается и отправляется в том виде, в котором есть в выходной топик.
Rules:
enricher_id: id обогатителяrule_id: id правилаfield_name: json-поле сообщения, которое нужно обогатитьfield_name_enrichment: название поля в коллекции MongoDB для обогащенияfield_value: поле сообщения, из которого берется значение поля field_name_enrichment, по которому нужно найти документ в коллекции MongoDBfield_default_value: значение по умолчанию, если значение для обогащения не найдено в MongoDB
Configuration:
DB_URL: JDBC-URL базы данных с правилами обогащенияDB_USER: имя пользователя базы данныхDB_PASSWORD: пароль базы данныхKAFKA_CONSUMER_BOOTSTRAP_SERVERS: URL для подключения к KafkaKAFKA_CONSUMER_CLIENT_ID: id клиента-консьюмера KafkaKAFKA_CONSUMER_GROUP_ID: id группы консьюмеров KafkaKAFKA_CONSUMER_AUTO_OFFSET_RESET: сдвиг для чтения из топика KafkaKAFKA_CONSUMER_TOPIC: входной топик Kafka для обогащенияKAFKA_PRODUCER_BOOTSTRAP_SERVERS: URL для подключения к KafkaKAFKA_PRODUCER_CLIENT_ID: id клиента-продюсера KafkaKAFKA_PRODUCER_ACKS: фактор подтверждения отправки от брокеров KafkaKAFKA_PRODUCER_TOPIC: выходной топик Kafka после обогащенияMONGO_CONNECTION_STRING: строка подключения к MongoDBMONGO_DATABASE: название базы данных в MongoDBMONGO_COLLECTION: название коллекции с данными обогащенияENRICHMENT_ID: id сервиса-обогатителяUPDATE_INTERVAL_SEC: интервал чтения правил обогащения из базы данных
Description:
Позволяет конфигурировать правила фильтрации, дедубликации, обогащения. Валидирует вводимые правила. Имеет несколько метрик.
Endpoints:
/filter:/findAll: получить информацию о всех фильтрах в БД/findAll/{id}: получить информацию о всех фильтрах в БД по filter_id/find/{filterId}/{ruleId}: получить информацию о фильтре по filter_id и rule_id/delete: удалить информацию о всех фильтрах/delete/{filterId}/{ruleId}: удалить информацию по конкретному фильтру filter_id и rule_id/save: создать фильтр
/deduplication:/findAll: получить информацию о всех правилах дедубликации в БД/findAll/{id}: получить информацию о всех правилах дедубликации в БД по deduplication_id/find/{filterId}/{ruleId}: получить информацию о правиле дедубликации по deduplication_id и rule_id/delete: удалить информацию о всех правилах дедубликации/delete/{filterId}/{ruleId}: удалить информацию по конкретному правилу дедубликации с deduplication_id и rule_id/save: создать правило дедубликации
/enrichment:/findAll: получить информацию о всех правилах обогащения в БД/findAll/{id}: получить информацию о всех правилах обогащения в БД по enrichment_id/find/{filterId}/{ruleId}: получить информацию о правиле обогащения по enrichment_id и rule_id/delete: удалить информацию о всех правилах обогащения/delete/{filterId}/{ruleId}: удалить информацию по конкретному правилу обогащения с enrichment_id и rule_id/save: создать правило обогащения
Metrics:
countFilters: количество правил фильтрацииcountDeduplications: количество правил дедубликацииcountEnrichments: количество правил обогащения (URL по умолчанию: /actuator/metrics)
Configuration:
DB_URL: JDBC-URL базы данных с правиламиDB_USER: имя пользователя базы данныхDB_PASSWORD: пароль базы данных
