Анализ исходного кода Kafka: запущенный процесс серверной части

Kafka

​​​​​​Аннотация: Серверная часть сетевого модуля Kafka представляет процесс запуска, получения и обработки запросов на стороне сервера.

Эта статья опубликована в сообществе HUAWEI CLOUD.«Сетевой модуль Kafka — серверная часть», оригинальный автор: брат промежуточного программного обеспечения.

SocketServer — это модуль, используемый сервером Kafka для обработки запросов, который создается, инициализируется и запускается в процессе запуска Kafka.

Процесс запуска SocketServer:

  • Инициализируйте акцепторы в порядке конечных точек, каждая конечная точка соответствует акцептору, создайте процессор для каждого акцептора (количество определяется элементом конфигурации num.network.threads) и запустите акцептор. контролировать соединение через селектор, а вновь установленное соединение передается на процессор (опрос на выбор процессора)

  • запустить все процессоры

  • Acceptor запускается и прослушивает процесс подключения:

  • После того, как Acceptor запустится, он создаст serverSocketChannel, прослушает конечную точку, соответствующую акцептору, и зарегистрирует OP_ACCEPT на селекторе, а затем войдет в бесконечный цикл, в каждом цикле ключ готовности (то есть ранее зарегистрированный serverSocketChannel) получается через селектор, указывая, что есть. Когда соединение поступает, создайте socketChannel, соответствующий соединению через accept(), затем опросите, чтобы выбрать один из процессоров, ответственных за акцептор, и передайте socketChannel выбранному процессору для обработки , то есть передать подключение к процессору.

  • Акцептор передает соединение процессору для обработки, которая заключается в добавлении socketChannel в очередь соединений процессора newConnection, и процессор будет постоянно получать и обрабатывать его в методе запуска.

  • После того, как процессор получает socketChannel из newConnection, он регистрирует OP_READ в селекторе и создает соответствующий KafkaChannel.

Процесс приема и обработки запросов на стороне сервера:

  • После того, как Процессор получит событие OP_READ и будет готов, проверьте и попытайтесь выполнить рукопожатие SSL и проверку SASL (рукопожатие может быть не завершено в это время, поэтому после того, как Процессор получит событие OP_READ и будет готов, он должен сначала проверить и убедитесь, что рукопожатие было завершено, ссылка, связанная с SSL/SASL (раздел 9.4)

  • После завершения подтверждения SSL и проверки SASL данные считываются из канала, создается объект NetworkReceive и очередь обрабатывается.

  • Выньте первый элемент команды stagedReceives (удалить) и добавьте completeReceives

  • Извлеките (не удаляйте) элементы в completeReceives, создайте объект Request, добавьте requestQueue, удалите регистрацию события для OP_READ, установите для соответствующего KafkaChannel значение MUTED, а затем установите для него значение MUTED_AND_RESPONSE_PENDING.

  • KafkaRequestHandler берет элементы (удаляет) из requestQueue и передает их модулю KafkaApi для обработки запроса.

  • После того, как KafkaApi обработает запрос, он помещает ответ в responseQueue и inflightResponses соответствующего процессора и пробуждает свой селектор.

  • Процессор берет ответ (удаляет) из responseQueue.Если ответ нужно отправить обратно клиенту, он назначает отправку ответа KafkaChannel и регистрирует событие OP_WRITE

  • Когда канал готов к записи, запишите посылку в буфер записи канала, когда отправка завершится, удалите регистрацию для события OP_WRITE и добавьте отправку в завершенные отправки.

  • Удалите соответствующий ответ из inflightResponses, выполните обратный вызов ответа, установите для KafkaChannel значение MUTED, затем с MUTED на NOT_MUTED и повторно добавьте регистрацию события OP_READ.

Нажмите «Подписаться», чтобы впервые узнать о новых технологиях HUAWEI CLOUD~