Как использовать Kafka в приложении Rails?

искусственный интеллект Kafka Rails Ruby

Автор | АГИС АНАСТАСОПУЛОС
компилировать | не знать
Редактор | Эмили
Введение

Какое-то время нашим системам требовались распределенные системы потоковой передачи и обмена сообщениями, и Apache Kafka казался нам надежной основой для создания критически важных бизнес-приложений. Его можно использовать во многих сценариях, таких как конвейеры обновления продуктов, отслеживание заказов, уведомления пользователей в режиме реального времени, выставление счетов продавцам и многое другое.

Далее следует рассказ о том, как мы внедрили Kafka в нашу монолитную кодовую базу Rails, о технических деталях, проблемах, с которыми мы столкнулись, и технических решениях, которые мы приняли на этом пути.

Для получения дополнительных галантерейных товаров, пожалуйста, обратите внимание на публичный аккаунт WeChat «AI Frontline» (ID: ai-front)

немедленный вызов

Первая проблема заключается в том, что Kafka обеспечивает лишь относительно низкоуровневую абстракцию. Хотя это имеет определенные преимущества, это также означает, что разработчикам клиентов приходится сталкиваться с большим количеством API и иметь дело с большим количеством деталей, что делает реализацию клиента Kafka сложной задачей.

Как проект на основе Ruby, мы пробовали различные клиенты Kafka, разработанные на Ruby, но всегда сталкивались с трудными для диагностики ошибками. В Ruby отсутствуют примитивы параллелизма, и написать эффективный клиент непросто.

Мы обходим эти проблемы несколькими способами: скрывая основную сложность с помощью автономных сервисов и предоставляя клиентам только минимальный набор API. Этот сервис может быть разработан на языках, отличных от Ruby, поэтому мы можем использовать проверенную librdkafka, которую мы использовали в других приложениях Python и Go.

Итак, мы разработали Rafka, прокси-сервис перед Kafka, и представили его с помощью простой семантики и API. Он обеспечивает разумную конфигурацию по умолчанию, скрывая от пользователя множество сложных деталей. Мы выбрали Go, потому что он уже имеет надежный клиент Kafka на основе librdkafka и предоставляет необходимые инструменты для реализации необходимых нам функций.

Чтобы не усложнять разработку клиента, мы решили использовать подмножество протокола Redis. Все, что нам нужно сделать, это добавить слой поверх клиента Ruby Redis.

Несколько дней спустя у нас был клиент, разработанный на Ruby, упакованный в виде драгоценного камня под названием rafka-rb, который содержал потребителей и производителей.

С Rafka и сопровождающим его клиентом Ruby наши сервисы и приложения Rails могут легко читать и писать в Kafka.

Большая часть времени наших разработчиков уходит на наше основное приложение Rails, поэтому стало важно иметь возможность легко использовать потребителей и производителей Kafka в приложении. Следующий шаг — позволить разработчикам Rails напрямую использовать потребителей и производителей Kafka.

Отправить данные в приложении Rails

Интеграция производителя в существующее приложение на самом деле довольно проста, потому что даже если вам нужно использовать несколько тем, вам нужен только один производитель.

Поэтому мы используем единственный экземпляр производителя и создаем его при инициализации приложения, который используется всей кодовой базой:

# config/initializers/kafka_producer.rb
Skroutz.kafka_producer = Rafka::Producer.new(...)

Отправить сообщение очень просто:

Skroutz.kafka_producer.produce("greetings", "Hello there!")
Чтение данных в приложении Rails

Использование потребителя немного отличается, потому что потребление сообщений занимает много времени. Далее мы увидим, как использовать потребителей Kafka с Rafka в кодовой базе Rails.

Ссылки на исходный код соответствующих компонентов приведены в конце статьи.

Потребители — это обычные объекты Ruby, классы которых определены в приложении Rails. Они наследуют абстрактный класс KafkaConsumer, который интегрирует statsd для статистики и Sentry для отслеживания ошибок и, возможно, другие вещи в будущем. Имена их классов имеют суффикс «Consumer», а соответствующие файлы названы в соответствии с соглашениями Rails.

Типичный потребитель выглядит так:

Здесь экземпляр Rafka::Consumer используется для каждого потребителя.

После написания нового потребителя необходимо включить его в файле конфигурации:

- name: "price_drops"
  scale: 2

По соглашению Rails имя потребителя происходит от имени класса.

Дело в том, что все экземпляры-потребители в основном являются независимыми потребителями Kafka и принадлежат к одной и той же группе потребителей.

Во время развертывания Capistrano считывает файл конфигурации и создает на сервере соответствующий экземпляр потребителя.

Это то, что нужно сделать для разработки и развертывания потребителей.

Возникает следующий вопрос: как использовать потребителей в качестве долговременных процессов?

длительный потребительский процесс

После реализации потребителей следующим шагом будет их запуск.

Мы используем класс под названием KafkaConsumerWorker, который упаковывает потребительские объекты и делает их длительными процессами. Упрощенная версия кода для этого класса приведена ниже:

KafkaConsumerWorker постоянно вызывает метод #process базового потребителя для обработки сообщений в цикле. Он также обеспечивает изящную функциональность выхода. Он также интегрирует потребителей с systemd для обеспечения надежности, проверки работоспособности, видимости и возможностей мониторинга.

Его также легко разрабатывать или отлаживать без прямого взаимодействия с KafkaConsumerWorker:

consumer = PriceDropsConsumer.new(...)
worker = KafkaConsumerWorker.new(consumer)

# start work loop
worker.work

Следующим шагом будет запуск KafkaConsumerWorker с помощью systemd. Мы использовали простой служебный файл systemd:

Каждый экземпляр потребителя идентифицируется с помощью строки, содержащей имя потребителя и номер экземпляра (например, price_drops: 1), передаваемых в systemd в качестве параметра шаблона (часть %i). Таким образом, мы можем создавать разные экземпляры потребителей, используя один и тот же файл службы.

Интеграция потребителя с systemd означает, что мы можем использовать множество функций, встроенных в потребителя:


  • Команды управления потребителями (запуск, остановка, перезагрузка, статус)

  • Оповещение потребителей при возникновении исключений

  • Видимость: статус каждого потребителя (работает, ожидает заданий, закрыт), его текущее смещение/тема/раздел (используя sd_notify(3))

  • Автоматический перезапуск отказавших потребителей

  • Автоматически перезапускать приостановленных потребителей с помощью сторожевого таймера systemd.

  • Упрощенное ведение журнала: мы просто регистрируемся в stdout/stderr, а systemd позаботится обо всем остальном.

Проверяя экземпляр потребителя, мы можем получить очень полезную информацию, которую можно использовать для отладки проблемы, когда что-то пойдет не так:

Последний вопрос, который необходимо решить, — какую команду systemd нужно вызвать, чтобы перезапустить потребителя. Эта команда на самом деле представляет собой обычную задачу rake, которая устанавливает потребителя в качестве рабочего и запускает его.

Как и в случае с другими компонентами, соответствующий код для этой задачи также размещен в кодовой базе Rails:

развертывать

Поскольку мы используем Capistrano для развертывания, мы добавили задачу Capistrano, отвечающую за остановку и запуск потребителя. Его упрощенная версия выглядит следующим образом:


kafkactl — это скрипт-оболочка, отвечающий за выполнение необходимых команд systemctl.

Когда кто-то развертывает приложение, Capistrano читает файл конфигурации YAML и создает потребителя:

После развертывания потребителя мы проверяем панель инструментов Grafana, чтобы убедиться, что все работает, и проверяем Slack, чтобы убедиться, что оповещения не срабатывают.

Общая структура

Наша инфраструктура интеграции Kafka/Rails состоит из следующих компонентов:


  • Rafka: служба брокера Kafka с простой семантикой и минимальным набором API.

  • rafka-rb: Ruby-клиент для Rafka

  • KafkaConsumer: абстрактный класс Ruby, конкретный класс реализации потребителя наследует этот класс.

  • KafkaConsumerWorker: класс Ruby для обработки потребителей как длительных процессов.

  • kafka:consumer: рейк-задачи, запускающие экземпляры-потребители

  • kafka_consumers.yml: файл конфигурации, который определяет, какие потребители должны работать в рабочей среде и сколько экземпляров использовать.

  • kafka-consumer@.service: создайте файл службы systemd потребителя, вызвав задачу rake.

Взаимодействие между ними показано на следующем рисунке:

Как видно из рисунка, эти компоненты распределены ортогонально, и каждый из них можно использовать отдельно от других, будь то для отладки, тестирования или прототипирования.

монитор

Поскольку многие потребители выполняют критически важные задачи, они должны надлежащим образом контролироваться.

Мониторинг происходит на различных уровнях, и каждый потребитель предоставляет следующие возможности:

  • Icinga предупреждает, когда потребители терпят неудачу (через systemd)

  • Генерировать событие Sentry при возникновении исключения


  • Статистика: время обработки задания и пропускная способность потребителя (обработанных сообщений в секунду).


  • Когда потребители отстают (через Burrow и Grafana)

Эти возможности в первую очередь связаны с использованием нами общей потребительской инфраструктуры.

перспективы на будущее

Нам очень нравится взаимодействовать с Kafka таким образом, и мы получили очень положительные отзывы.

Возможность разработки и развертывания потребителей за несколько простых шагов значительно повышает эффективность команды разработчиков, и мы можем последовательно и эффективно разрабатывать приложения на основе Kafka.

В будущем мы надеемся открыть исходный код всех компонентов, описанных в этой статье, чтобы другие организации могли извлечь из них пользу.

Наконец, мы планируем добавить больше функциональности в Rafka и инфраструктуру потребителя/производителя, в том числе:


  • Пакетная функция

  • многоцелевой потребитель

  • Примитивы на основе KSQL (агрегаты, объединения и т. д.)

  • Потребительский крючок (крючок)

Ссылка на исходный код компонента:


  • Рафка: https://github.com/skroutz/rafka

  • рафка-рб: https://github.com/skroutz/rafka-rb

  • Потребительская инфраструктура Rails: https://github.com/skroutz/rails-kafka-consumers


Для получения дополнительных галантерейных товаров, пожалуйста, обратите внимание на публичный аккаунт WeChat «AI Frontline» (ID: ai-front)