Тройка из Storm, Spark Streaming и потоковой обработки Flink имеет свои преимущества.
Storm имеет низкую задержку и имеет определенную позицию на рынке, и многие компании до сих пор используют его.
Spark Streaming использует системные преимущества Spark и активное сообщество, а также занимает определенную долю.
Flink же ближе к потоковой обработке по дизайну и имеет удобный API, который обязательно будет развиваться в будущем.
Но все они неотделимы от передачи сообщений Kafka, поэтому Kafka запустила собственный фреймворк обработки потоков Kafka Streams в версии 0.10.0.0. Позиционирование Kafka также официально стало Apache Kafka® — это *платформа распределенной потоковой передачи, *платформа распределенной потоковой обработки.
потоковая передача в реальном времени
Потоковые вычисления в реальном времени быстро развивались в последние годы, в основном из-за ценности данных в реальном времени и их влияния на архитектуру обработки данных. Потоковые вычисления в реальном времени включаютНеограниченные данные. Согласованность, близкая к реальному времени. Повторяющиеся результаты.и так далее.a type of data processing engine that is designed with infinite data sets in mindМеханизм обработки данных, учитывающий беспроводные наборы данных.
1. Неограниченные данные: постоянно растущий, практически неограниченный набор данных. Их часто называют «потоковыми данными». Бесконечные потоковые наборы данных можно назвать неограниченными данными, а относительно ограниченные пакетные данные — ограниченными данными.
2. Неограниченная обработка данных: режим непрерывной обработки данных, применяемый к вышеуказанным неограниченным данным. Пакетная обработка данных (вычисления в автономном режиме) также может выполняться многократно для обработки данных, но будут узкие места в производительности.
3. Результаты с малой задержкой и практически в реальном времени: по сравнению с автономными вычислениями, автономные вычисления не учитывают проблему задержки.
Решая две проблемы, потоковая обработка может заменить пакетные системы:
1. Корректность: при этом это эквивалентно пакетным вычислениям.
Для потоковой передачи требуются данные, которые все еще могут быть рассчитаны для определенного временного окна с течением времени. Spark Streaming решает эту проблему с помощью идеи микропакетов и сохраняет согласованные системы реального времени и автономные системы, которые должны быть удовлетворены в будущих вычислительных системах реального времени.
2. Инструменты для логического вывода: это позволяет нам выйти за рамки пакетных вычислений.
Хорошие инструменты временных рассуждений необходимы для работы с неограниченными и неупорядоченными данными о различных событиях.
Время делится на время события и время обработки.
Есть также много связанных концепций потоковых вычислений в реальном времени, которые не будут повторяться здесь.
Введение в потоки Kafka
Kafka Streams считается самым простым способом разработки приложений реального времени. Это клиентская API-библиотека Kafka, которая может реализовать потоковую обработку, написав простой код java и scala.
Преимущество:
-
Эластичный, масштабируемый, отказоустойчивый
-
Развертывание в контейнерах, виртуальных машинах, на «голом железе», в облаке
-
То же самое для малых, средних и больших вариантов использования
-
Полностью интегрирован с системой безопасности Kafka.
-
Написание стандартных приложений Java и Scala
-
Разработка на Mac, Linux, Windows
-
Семантика ровно один раз
Пример:
The New York Times использует Apache Kafkaи Kafka Streams для хранения и распространения опубликованного контента в режиме реального времени в различных приложениях и системах для чтения читателями.
PinterestмассивныйИспользование Apache Kafka и Kafka Streamsдля поддержки системы интеллектуального бюджетирования в режиме реального времени для своей рекламной инфраструктуры. С Kafka Streams прогнозы точнее, чем когда-либо.
Являясь ведущим европейским интернет-магазином модной одежды, Zalando использует Kafka в качестве ESB (Enterprise Service Bus), помогая нам перейти от монолитной к микросервисной архитектуре. Обработка с Кафкойпоток событийПозволяет нашей технической команде получать бизнес-аналитику практически в режиме реального времени.
Rabobank входит в тройку крупнейших банков Нидерландов. Его цифровая нервная система Business Event Bus работает на базе Apache Kafka. Он используется растущим числом финансовых процессов и сервисов, одним из которых является Rabo Alerts. Эта услуга оповещает клиентов в режиме реального времени о финансовых событиях иПостроен с помощью Kafka Streams.
LINE использует Apache KafkaВ качестве центральной базы данных для наших услуг, чтобы общаться друг с другом. Каждый день генерируются сотни миллиардов сообщений для выполнения различной бизнес-логики, обнаружения угроз, индексации поиска и анализа данных. LINE использует Kafka Streams для надежного преобразования и фильтрации тем, позволяя потребителям эффективно использовать подтемы, сохраняя при этом простоту обслуживания благодаря своей сложной и простой кодовой базе.
Topology
Kafka Streams через один или несколькотопологияОпределите его вычислительную логику, где топология — это граф, образованный потоками (ребрами) и потоковыми процессорами (узлами).
В топологии есть два особых типа процессоров.
- исходный процессор: Исходный процессор — это особый тип потокового процессора без каких-либо вышестоящих процессоров. Он генерирует входные потоки для своей топологии из одной или нескольких тем Kafka, потребляя записи из этих тем и пересылая их нижестоящим процессорам.
- процессор приемника: Процессор приемника — это особый тип потокового процессора без последующих процессоров. Он отправляет любые записи, которые он получает от вышестоящих процессоров, в указанную тему Kafka.
В обычных процессорных узлах данные также могут отправляться в удаленные системы. Таким образом, обработанные результаты могут быть переданы обратно в Kafka или записаны во внешнюю систему.
Kafka предоставляет наиболее часто используемые операции преобразования данных среди них, такие какmap
,filter
,join
иaggregations
и т.д. Простой и удобный в использовании.
Конечно, есть еще кое-что о времени, работе с окнами, агрегации, неупорядоченной обработке и т. д. В дальнейшем мы будем подробно знакомить их один за другим, а затем разработаем простой вводный кейс.
Быстрый старт
Сначала предоставьте версии WordCount для Java и scala.
Java8+:
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;
import java.util.Arrays;
import java.util.Properties;
public class WordCountApplication {
public static void main(final String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = builder.stream("TextLinesTopic");
KTable<String, Long> wordCounts = textLines
.flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));
KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}
скала:
import java.util.Properties
import java.util.concurrent.TimeUnit
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala._
import org.apache.kafka.streams.scala.kstream._
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
object WordCountApplication extends App {
import Serdes._
val props: Properties = {
val p = new Properties()
p.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application")
p.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092")
p
}
val builder: StreamsBuilder = new StreamsBuilder
val textLines: KStream[String, String] = builder.stream[String, String]("TextLinesTopic")
val wordCounts: KTable[String, Long] = textLines
.flatMapValues(textLine => textLine.toLowerCase.split("\\W+"))
.groupBy((_, word) => word)
.count()(Materialized.as("counts-store"))
wordCounts.toStream.to("WordsWithCountsTopic")
val streams: KafkaStreams = new KafkaStreams(builder.build(), props)
streams.start()
sys.ShutdownHookThread {
streams.close(10, TimeUnit.SECONDS)
}
}
Если kafka уже запущена, вы можете пропустить первые два шага.
1. Скачать
скачать2.3.0 и разархивируйте его. Обратите внимание, что существует несколько загружаемых версий Scala, мы выбрали рекомендуемую версию (2.12):
> tar -xzf kafka_2.12-2.3.0.tgz
> cd kafka_2.12-2.3.0
2. Старт
Кафка используетРаботник зоопарка,Поэтому, если у вас еще нет сервера ZooKeeper, вам нужно сначала его запустить.
> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
Запустите сервер Кафки:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...
3. Создайте тему для запуска производителя
Мы создаемstreams-plaintext-inputвходной субъект и названныйstreams-wordcount-outputВыходная тема:
> bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-plaintext-input
Created topic "streams-plaintext-input".
> bin/kafka-topics.sh --create \
--bootstrap-server localhost:9092 \
--replication-factor 1 \
--partitions 1 \
--topic streams-wordcount-output \
--config cleanup.policy=compact
Created topic "streams-wordcount-output".
Проверять:
> bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Topic:streams-plaintext-input PartitionCount:1 ReplicationFactor:1 Configs:
Topic: streams-plaintext-input Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic:streams-wordcount-output PartitionCount:1 ReplicationFactor:1 Configs:cleanup.policy=compact
Topic: streams-wordcount-output Partition: 0 Leader: 0 Replicas: 0 Isr: 0
4. Запустите WordCount
Следующая команда запускает демонстрационное приложение WordCount:
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
Демонстрационное приложение запустится с темы вводаstream-plaintext-inputЧтение, выполнение вычисления алгоритма WordCount для каждого прочитанного сообщения и непрерывная запись его текущего результата в выходную темуstreams-wordcount-output. Следовательно, не будет никаких выходных данных STDOUT, кроме записей журнала, поскольку результаты записываются обратно в Kafka.
Теперь мы можем запустить генератор консоли в отдельном терминале и написать некоторые входные данные для этой темы:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
И проверьте вывод демо-приложения WordCount, прочитав его тему вывода с потребителем консоли в отдельном терминале:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
5. Обработка данных
Вводим некоторые данные на стороне производителя.
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
Вывод:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all 1
streams 1
lead 1
to 1
kafka 1
Продолжайте вводить:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
all streams lead to kafka
hello kafka streams
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
Мы видим, что поскольку данные вводятся в реальном времени, результаты подсчета слов выводятся в реальном времени.
6. Остановить программу
Теперь вы можете пройтиCtrl-C нажатьПоследовательно остановите потребителя консоли, производителя консоли, приложение Wordcount, брокер Kafka и сервер ZooKeeper.
Что такое Кафка? Обзор инструментов мониторинга Kafka Кафка Быстрый старт Потребитель в основе Kafka Продюсер в основе Kafka
Альтернатива Flume — введение в Kafka Connect
Больше вычислений в реальном времени, Flink, Kafka и других связанных технических сообщений в блогах, добро пожаловать, чтобы обратить внимание на потоковые вычисления в реальном времени.