Простейший движок потоковой обработки — введение в Kafka Streams

Kafka

file
До версии 0.10.0.0 Kafka позиционировалась как распределенная, разделенная служба отправки журналов с механизмом резервного копирования. А Kafka раньше не предоставляла услуги по обработке данных. Все расчеты потоковой обработки в основном основаны на платформах потоковой обработки, таких как Storm, Spark Streaming и Flink.

file

Тройка из Storm, Spark Streaming и потоковой обработки Flink имеет свои преимущества.

Storm имеет низкую задержку и имеет определенную позицию на рынке, и многие компании до сих пор используют его.

Spark Streaming использует системные преимущества Spark и активное сообщество, а также занимает определенную долю.

Flink же ближе к потоковой обработке по дизайну и имеет удобный API, который обязательно будет развиваться в будущем.

file

Но все они неотделимы от передачи сообщений Kafka, поэтому Kafka запустила собственный фреймворк обработки потоков Kafka Streams в версии 0.10.0.0. Позиционирование Kafka также официально стало Apache Kafka® — это *платформа распределенной потоковой передачи, *платформа распределенной потоковой обработки.

потоковая передача в реальном времени

Потоковые вычисления в реальном времени быстро развивались в последние годы, в основном из-за ценности данных в реальном времени и их влияния на архитектуру обработки данных. Потоковые вычисления в реальном времени включаютНеограниченные данные. Согласованность, близкая к реальному времени. Повторяющиеся результаты.и так далее.a type of data processing engine that is designed with infinite data sets in mindМеханизм обработки данных, учитывающий беспроводные наборы данных.

1. Неограниченные данные: постоянно растущий, практически неограниченный набор данных. Их часто называют «потоковыми данными». Бесконечные потоковые наборы данных можно назвать неограниченными данными, а относительно ограниченные пакетные данные — ограниченными данными.

2. Неограниченная обработка данных: режим непрерывной обработки данных, применяемый к вышеуказанным неограниченным данным. Пакетная обработка данных (вычисления в автономном режиме) также может выполняться многократно для обработки данных, но будут узкие места в производительности.

3. Результаты с малой задержкой и практически в реальном времени: по сравнению с автономными вычислениями, автономные вычисления не учитывают проблему задержки.

Решая две проблемы, потоковая обработка может заменить пакетные системы:

1. Корректность: при этом это эквивалентно пакетным вычислениям.

Для потоковой передачи требуются данные, которые все еще могут быть рассчитаны для определенного временного окна с течением времени. Spark Streaming решает эту проблему с помощью идеи микропакетов и сохраняет согласованные системы реального времени и автономные системы, которые должны быть удовлетворены в будущих вычислительных системах реального времени.

2. Инструменты для логического вывода: это позволяет нам выйти за рамки пакетных вычислений.

Хорошие инструменты временных рассуждений необходимы для работы с неограниченными и неупорядоченными данными о различных событиях.

Время делится на время события и время обработки.

Есть также много связанных концепций потоковых вычислений в реальном времени, которые не будут повторяться здесь.

Введение в потоки Kafka

Kafka Streams считается самым простым способом разработки приложений реального времени. Это клиентская API-библиотека Kafka, которая может реализовать потоковую обработку, написав простой код java и scala.

Преимущество:

  • Эластичный, масштабируемый, отказоустойчивый

  • Развертывание в контейнерах, виртуальных машинах, на «голом железе», в облаке

  • То же самое для малых, средних и больших вариантов использования

  • Полностью интегрирован с системой безопасности Kafka.

  • Написание стандартных приложений Java и Scala

  • Разработка на Mac, Linux, Windows

  • Семантика ровно один раз

Пример:

The New York Times использует Apache Kafkaи Kafka Streams для хранения и распространения опубликованного контента в режиме реального времени в различных приложениях и системах для чтения читателями.

PinterestмассивныйИспользование Apache Kafka и Kafka Streamsдля поддержки системы интеллектуального бюджетирования в режиме реального времени для своей рекламной инфраструктуры. С Kafka Streams прогнозы точнее, чем когда-либо.

Являясь ведущим европейским интернет-магазином модной одежды, Zalando использует Kafka в качестве ESB (Enterprise Service Bus), помогая нам перейти от монолитной к микросервисной архитектуре. Обработка с Кафкойпоток событийПозволяет нашей технической команде получать бизнес-аналитику практически в режиме реального времени.

Rabobank входит в тройку крупнейших банков Нидерландов. Его цифровая нервная система Business Event Bus работает на базе Apache Kafka. Он используется растущим числом финансовых процессов и сервисов, одним из которых является Rabo Alerts. Эта услуга оповещает клиентов в режиме реального времени о финансовых событиях иПостроен с помощью Kafka Streams.

LINE использует Apache KafkaВ качестве центральной базы данных для наших услуг, чтобы общаться друг с другом. Каждый день генерируются сотни миллиардов сообщений для выполнения различной бизнес-логики, обнаружения угроз, индексации поиска и анализа данных. LINE использует Kafka Streams для надежного преобразования и фильтрации тем, позволяя потребителям эффективно использовать подтемы, сохраняя при этом простоту обслуживания благодаря своей сложной и простой кодовой базе.

Topology

Kafka Streams через один или несколькотопологияОпределите его вычислительную логику, где топология — это граф, образованный потоками (ребрами) и потоковыми процессорами (узлами).

file

В топологии есть два особых типа процессоров.

  • исходный процессор: Исходный процессор — это особый тип потокового процессора без каких-либо вышестоящих процессоров. Он генерирует входные потоки для своей топологии из одной или нескольких тем Kafka, потребляя записи из этих тем и пересылая их нижестоящим процессорам.
  • процессор приемника: Процессор приемника — это особый тип потокового процессора без последующих процессоров. Он отправляет любые записи, которые он получает от вышестоящих процессоров, в указанную тему Kafka.

В обычных процессорных узлах данные также могут отправляться в удаленные системы. Таким образом, обработанные результаты могут быть переданы обратно в Kafka или записаны во внешнюю систему.

Kafka предоставляет наиболее часто используемые операции преобразования данных среди них, такие какmap,filter,joinиaggregationsи т.д. Простой и удобный в использовании.

Конечно, есть еще кое-что о времени, работе с окнами, агрегации, неупорядоченной обработке и т. д. В дальнейшем мы будем подробно знакомить их один за другим, а затем разработаем простой вводный кейс.

Быстрый старт

Сначала предоставьте версии WordCount для Java и scala.

Java8+:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
 
import java.util.Arrays;
import java.util.Properties;
 
public class WordCountApplication {
 
    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
 
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
 
}

скала:

import java.util.Properties
import java.util.concurrent.TimeUnit
 
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
 
object WordCountApplication extends App {
  import Serdes._
 
  val props: Properties = {
    val p = new Properties()
    p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
    p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
    p
  }
 
  val builder: StreamsBuilder = new StreamsBuilder
  val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
  val wordCounts: KTable[String, Long] = textLines
    .flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
    .groupBy((_, word) => word)
    .count()(Materialized.as("counts-store"))
  wordCounts.toStream.to("WordsWithCountsTopic")
 
  val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
  streams.start()
 
  sys.ShutdownHookThread {
     streams.close(10, TimeUnit.SECONDS)
  }
}

Если kafka уже запущена, вы можете пропустить первые два шага.

1. Скачать

скачать2.3.0 и разархивируйте его. Обратите внимание, что существует несколько загружаемых версий Scala, мы выбрали рекомендуемую версию (2.12):

> tar -xzf kafka_2.12-2.3.0.tgz
> cd kafka_2.12-2.3.0

2. Старт

Кафка используетРаботник зоопарка,Поэтому, если у вас еще нет сервера ZooKeeper, вам нужно сначала его запустить.

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...

Запустите сервер Кафки:

> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

3. Создайте тему для запуска производителя

Мы создаемstreams-plaintext-inputвходной субъект и названныйstreams-wordcount-outputВыходная тема:

> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-plaintext-input
Created topic "streams-plaintext-input".


> bin/kafka-topics.sh --create \
    --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic streams-wordcount-output \
    --config cleanup.policy=compact
Created topic "streams-wordcount-output".

Проверять:

> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
 
Topic:streams-plaintext-input   PartitionCount:1    ReplicationFactor:1 Configs:
    Topic: streams-plaintext-input  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
Topic:streams-wordcount-output  PartitionCount:1    ReplicationFactor:1 Configs:cleanup.policy=compact
    Topic: streams-wordcount-output Partition: 0    Leader: 0   Replicas: 0 Isr: 0

4. Запустите WordCount

Следующая команда запускает демонстрационное приложение WordCount:

> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo

Демонстрационное приложение запустится с темы вводаstream-plaintext-inputЧтение, выполнение вычисления алгоритма WordCount для каждого прочитанного сообщения и непрерывная запись его текущего результата в выходную темуstreams-wordcount-output. Следовательно, не будет никаких выходных данных STDOUT, кроме записей журнала, поскольку результаты записываются обратно в Kafka.

Теперь мы можем запустить генератор консоли в отдельном терминале и написать некоторые входные данные для этой темы:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input

И проверьте вывод демо-приложения WordCount, прочитав его тему вывода с потребителем консоли в отдельном терминале:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

5. Обработка данных

Вводим некоторые данные на стороне производителя.

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka

Вывод:

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1

Продолжайте вводить:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic streams-wordcount-output \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
 
all     1
streams 1
lead    1
to      1
kafka   1
hello   1
kafka   2
streams 2

Мы видим, что поскольку данные вводятся в реальном времени, результаты подсчета слов выводятся в реальном времени.

6. Остановить программу

Теперь вы можете пройтиCtrl-C нажатьПоследовательно остановите потребителя консоли, производителя консоли, приложение Wordcount, брокер Kafka и сервер ZooKeeper.

Что такое Кафка? Обзор инструментов мониторинга Kafka Кафка Быстрый старт Потребитель в основе Kafka Продюсер в основе Kafka

Альтернатива Flume — введение в Kafka Connect

Больше вычислений в реальном времени, Flink, Kafka и других связанных технических сообщений в блогах, добро пожаловать, чтобы обратить внимание на потоковые вычисления в реальном времени.

file