Эта статья приняла участие в мероприятии «Церемония создания новичков», чтобы вместе начать путь создания золота.
Сегодня я расскажу вам о проблеме двухпотокового соединения Flink. Это точка высокочастотного интервью, а также реальная сцена, часто встречающаяся в работе.
Как обеспечить двухпотоковое присоединение Flink准确性
и及时性
,Кроме窗口join
Какие еще способы реализации существуют и как ответить, чтобы полностью произвести впечатление на интервьюера. . Ответ вы найдете в этой статье.
1 грунтовка
1.1 ПРИСОЕДИНЯЙТЕСЬ к базе данных SQL
Давайте сначала рассмотрим операцию JOIN в базе данных SQL. SQL-запрос заказа, показанный ниже, путем изменения таблицы заказовid
и лист сведений о заказеorder_id
Ссылка для получения информации о продукте под всеми заказами.
select
a.id as '订单id',
a.order_date as '下单时间',
a.order_amount as '订单金额',
b.order_detail_id as '订单详情id',
b.goods_name as '商品名称',
b.goods_price as '商品价格',
b.order_id as '订单id'
from
dwd_order_info_pfd a
right join
dwd_order_detail_pfd b
on a.id = b.order_id
Это очень простой код SQL и подробно описываться не будет. Здесь в основном представлен тип JOIN в SQL, который используется здесь.right join
, что является правым соединением.
-
left join
: Сохранить все данные в левой таблице и связанные данные в правой таблице и установить для несвязанных данных в правой таблице значение NULL. -
right join
: сохранить все данные в правой таблице и связанные данные в левой таблице, а для несвязанных данных в левой таблице установить значение NULL. -
inner join
: сохранить связанные данные левой таблицы и связанные данные правой таблицы. -
cross join
: сохранить декартово произведение данных левой и правой таблицы.
Построчное сопоставление ассоциаций на основе значений ассоциативных ключей, фильтрация данных таблицы и создание окончательных результатов для последующего анализа данных.
На этом остановимся, я не буду вдаваться в подробности о принципе JOIN в SQL баз данных, если вам интересно, вы можете изучить его самостоятельно, а обратим внимание на область больших данных.
1.2 ПРИСОЕДИНЯЙТЕСЬ к офлайн-сценариям
Предположим, есть такой сценарий:
Зная таблицу заказов и таблицу сведений о заказах в базе данных Mysql и удовлетворяя отношение «один ко многим», подсчитайте детали распределения товаров по всем заказам в день T-1.
Каждый, кто умный, должен был дать ответ, да ~ это приведенный выше SQL:
select a.*, b.*
from
dwd_order_info_pfd a
right join
dwd_order_detail_pfd b
on a.id = b.order_id
Теперь измените следующие условия: известно, что и таблица заказов, и таблица сведений о заказах亿级别
данные и найти результаты анализа по тому же сценарию.
Что делать? В настоящее время реляционная база данных не кажется подходящей ~ начните расширять трюк: используйте大数据计算引擎
решать.
Учитывая, что статистический сценарий T-1 имеет низкие требования к своевременности, для обработки можно использовать Hive SQL, а нижний слой выполняет задачи Mapreduce. Если вы хотите повысить скорость работы, переключитесь на вычислительный движок Flink или Spark и используйте вычисления в памяти.
Что касается SQL-запроса, он такой же, как и выше, и он инкапсулирован в запланированную по времени задачу планирования, выполнение которой запланировано системой. Если результат неверный, из-за источника данных и данных静态
Без изменений, большое дело, повтор, внешний вид皆大欢喜
~
Но хорошие времена не длятся долго, и враг продукта предъявил вам непреодолимое требование в это время:我要实时统计!!
2 ПРИСОЕДИНЯЙТЕСЬ к сценариям в реальном времени
Все тот же вышеприведенный сценарий, на этот раз источник данных заменяется на实时
поток заказов и实时
Поток сведений о заказе, напримерKafka
Две темы , требуют статистики в режиме реального времени о деталях распределения всех заказов за каждую минуту.
Теперь ситуация вроде бы усложнилась, простой анализ:
- источник данных. Поток данных в реальном времени, отличающийся от статического потока, данных течет в реальном времени и изменяется динамически, и вычислительная программа должна поддерживать механизм обработки в реальном времени.
- Актуальность. упомянутый ранее
静态
Данные выполняют несколько операций соединения, и данные, которые могут быть связаны с левой и правой таблицами, очень постоянны; и实时数据流
(Левая и правая таблица) Если синхронизация ввода несовместима, данные, которые могли быть связаны, не будут связаны или произойдет ошибка.- Задержка. Статистика в режиме реального времени, предоставляющая результаты ответов за минуты или даже секунды.
Из-за специфики объединения потоковых данных необходимо实时处理机制
,低延迟
,强关联性
В соответствии с предпосылкой кажется, что необходимо сформулировать полную схему данных, чтобы реализовать JOIN данных реального потока.
2.1 Идеи схемы
Мы знаем, что существует связь «один ко многим» между данными заказа и подробными данными заказа, то есть одни данные заказа соответствуют нескольким подробным данным о продукте. . Часть подробных данных соответствует только данным одного заказа.
Таким образом, стратегия двухпотокового соединения может учитывать следующие идеи:
- Когда поток данных является данными заказа. Безусловно зарезервировано, независимо от того, связано ли оно в данный момент с подробными данными, оно зарезервировано для последующего использования соединения.
- Когда поток данных является подробными данными. После того, как он связан с данными заказа, вы можете попрощаться, иначе он будет временно зарезервирован для следующей встречи с данными заказа.
- Завершите объединение всех данных заказа и сведений о заказе за один и тот же период времени и очистите статус хранения.
В реальных производственных сценариях необходимо учитывать более сложные ситуации, включая обработку нештатных ситуаций, таких как потеря данных в процессе JOIN, которые показаны только здесь.
Хорошо, похоже, у нас есть так себе прототип решения JOIN для потоковой передачи в реальном времени.
Кажется, мы готовы начать большую битву~ Не волнуйтесь, кто-то помог нам тайно это осознать:Apache Flink
3 Двойной поток Flink JOIN
Apache Flink — это платформа и механизм распределенной обработки для вычислений с отслеживанием состояния на неограниченных и ограниченных потоках данных. Flink предназначен для работы во всех распространенных кластерных средах, выполняя вычисления со скоростью выполнения в памяти и в произвольном масштабе.
—— Определение с официального сайта Flink
Здесь нам нужно только знать, что Flink — это вычислительный движок в реальном времени, в основном сосредоточенный на том, как он реализует JOIN с двумя потоками.
3.1 Внутренний рабочий механизм
-
内存计算
: Задачи Flink предпочтительно рассчитываются в памяти, а при недостатке памяти сохраняются на диски с высокой эффективностью доступа, обеспечивая秒级
замедленная реакция. -
状态强一致性
: Flink использует согласованные моментальные снимки для сохранения состояния и периодически проверяет локальное состояние и постоянное хранилище для обеспечения согласованности состояния. -
分布式执行
: приложения Flink можно разделить на бесконечное количество параллельных задач для выполнения в кластере, используя почти неограниченный ЦП, основную память, диск и сетевой ввод-вывод. -
内置高级编程模型
: Модель программирования Flink разделена на четыре уровня: SQL, Table, DataStream|DataSet API и Process, и инкапсулирована в операторы с широким набором функций, включаяJOIN类型
Оператор.
Присмотритесь, были ли соблюдены предпосылки схемы JOIN потока в реальном времени, обсуждавшиеся в предыдущих главах?
-
实时处理机制
: Flink рождается как вычислительная машина реального времени -
低延迟
: Задержка второго уровня вычислений в памяти Flink. -
强关联性
: Консистентность состояния Flink и операторы классов присоединения.
Я не могу не вздохнуть, этот Флинк действительно силен~
Оставайтесь любопытными, давайте посмотрим на истинное значение двухпотокового соединения Flink! !
3.2 Механизм реализации JOIN
Двойной поток Flink JOIN в основном делится на две категории. Одна из них — собственная операция оператора Connect на основе состояния, а другая — оконная операция JOIN. где JOIN на основе окна можно разделить наwindow join
иinterval join
два вида.
- Принцип реализации: базовый принцип зависит от
State状态存储
, сохраняя данные в состоянии для присоединения ассоциации и, наконец, выводя результат.
Неожиданно Flink использует состояние State для кэширования потока в реальном времени, ожидающего присоединения.
Вот вам вопрос:
Можно ли использовать хранилище Redis?В чем разница между хранилищем состояния и хранилищем Redis?
Более подробную информацию можно обсудить вместе, добавить личный WeChat:youlong525
Пригласите вас в группу и получите бесплатный Flink PDF~
Вернемся к теме: как эти методы реализуют двухпотоковое JOIN? Давайте посмотрим вниз.
Примечание. Следующий контент будет более
文字
+代码
Чтобы не быть скучным, выкладываю кучу оригинальных принципиальных схем~
4 Механизм реализации двухпотокового JOIN на основе Window Join
Как следует из названия, этот метод использует технологию Flink.窗口机制
Реализует двухпотоковое соединение. Как правило, элементы в двух потоках реального времени выделяются в одно и то же временное окно для завершения соединения.
- Основной принцип: два потока данных в реальном времени кэшируются в
Window State
, когда окно инициирует расчет, выполняется операция соединения.
4.1 оператор соединения
Давайте взглянем на оператор соединения, который является одним из методов реализации соединения окна. Это включает в себя понятие окна (окна) во Flink, поэтому Window Joinan можно разделить на три типа в некоторой степени в зависимости от типа окна:
- Кувыркающееся окно
- Скользящее окно
- Присоединение к просмотру сеанса
Два потока данных обрабатываются в окне (прокрутка, прокрутка, сеанс) в соответствии с соответствующим первичным ключом.inner join
, Нижний уровень основан на хранилище состояний и поддерживает две временные характеристики: время обработки и время события, см. исходный код:
Краткое изложение ядра исходного кода: окно Windows + хранилище состояния + двухслойный цикл for для выполнения join()
Теперь давайте отодвинем временную шкалу немного назад, в实时场景JOIN
Там мы получили такую просьбу: посчитать детальную раздачу товаров по всем заказам в каждую минуту.
Хорошо, используйте оператор соединения, чтобы попробовать. Мы определяем скользящее окно в 60 секунд, связываем поток заказов и поток деталей заказа через order_id и получаем следующую программу:
val env = ...
// kafka 订单流
val orderStream = ...
// kafka 订单明细流
val orderDetailStream = ...
orderStream.join(orderDetailStream)
.where(r => r._1) //订单id
.equalTo(r => r._2) //订单id
.window(TumblingProcessTimeWindows.of(
Time.seconds(60)))
.apply {(r1, r2) => r1 + " : " + r2}
.print()
Весь код на самом деле очень прост, сводка выглядит следующим образом:
- Определите два входных потока реального времени A, B
- Поток вызывает оператор join(b stream)
- Определение отношения ассоциации: где связанный ключ потока A, equalTo - связанный ключ потока B, оба являются идентификаторами заказов.
- Определить окно окна (интервал 60 с)
- Метод применения определяет логический вывод
Таким образом, пока программа работает стабильно, детали распределения заказов в каждую минуту могут непрерывно рассчитываться, что, кажется, решает проблему~
Не радуйтесь, не забывайте, что тип соединения в настоящее времяinner join
. Чтобы просмотреть знания: внутреннее соединение относится к сохранению только данных, связанных с двумя потоками.
Таким образом, не теряются ли несвязанные данные в двойных потоках? Не волнуйтесь, Flink также предоставляет другую операцию объединения окон:coGroup
оператор.
4.2 оператор совместной группы
Оператор coGroup также основан на оконном механизме, но оператор coGroup является более гибким, чем оператор Join, и может сопоставлять данные левого или правого потока и выводить их в соответствии с логикой, заданной пользователем.
Другими словами, мы достигаем цели левого соединения и правого соединения, самостоятельно определяя вывод двойного потока.
Теперь давайте посмотрим, как оператор coGroup реализует левое соединение в том же сценарии:
#这里看看java算子的写法
orderDetailStream
.coGroup(orderStream)
.where(r -> r.getOrderId())
.equalTo(r -> r.getOrderId())
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.apply(new CoGroupFunction<OrderDetail, Order, Tuple2<String, Long>>() {
@Override
public void coGroup(Iterable<OrderDetail> orderDetailRecords, Iterable<Order> orderRecords, Collector<Tuple2<String, Long>> collector) {
for (OrderDetail orderDetaill : orderDetailRecords) {
boolean flag = false;
for (Order orderRecord : orderRecords) {
// 右流中有对应的记录
collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), orderDetailRecords.getGoods_price()));
flag = true;
}
if (!flag) {
// 右流中没有对应的记录
collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), null));
}
}
}
})
.print();
Здесь следует отметить несколько моментов:
- Оператор соединения заменен оператором coGroup.
- Два потока по-прежнему должны находиться в одном окне, а связанные с ними условия определены.
- Пользовательское суждение в методе применения, где оценивается правильное значение: если есть значение, соединение выводится, в противном случае правая сторона устанавливается в NULL.
Можно сказать, что теперь мы полностью сделали оконный двухпоточный JOIN.
Пока вы предоставляете мне определенный размер окна, я могу возиться со всеми видами соединений с помощью операторов соединения или coGroup, и это очень просто в использовании.
Но если в это время наш дорогой продукт вызывает небольшой вопрос:
В пиковый период большой акции данные о продукте не будут записаны вовремя в течение определенного периода времени.Время может быть раньше, чем заказ или позже, чем заказ.Все равно необходимо рассчитать детали распределения заказывайте товары в течение каждой минуты, без проблем~
Конечно, есть проблема: если два потока не синхронизированы, то было бы странно использовать окно для управления теми, которые могут присоединиться, легко автоматически закрыть окно, не дожидаясь потока соединения.
К счастью, я знаю, что Flink предоставляетInterval join
механизм.
5 Механизм реализации двухпотокового JOIN на основе интервального соединения
Interval Join основан на временном интервале смещения правого потока относительно левого потока (interval
) в качестве окна ассоциации для завершения операции соединения в окне интервала смещения.
Это немного сложно понять, поэтому позвольте мне нарисовать картинку, чтобы увидеть:
поток2.время ∈ (поток1.время +низкий, поток1.время +высокий)
Удовлетворить поток данных stream2 в потоке данных stream1interval
(низкий, высокий) Ассоциативные соединения в пределах интервала смещения. Чем больше интервал, тем больше данных связано, а данные за пределами интервала больше не связаны.
- Принцип реализации: интервальное соединение также использует состояние Flink для хранения данных, но в настоящее время существует механизм сбоя состояния.
ttl
, который запускает операцию очистки данных.
Вот еще вопрос:
Как установить механизм состояния ttl? Приведут ли необоснованные настройки ttl к разрыву памяти?
Я подробно объясню механизм ttl State в следующих статьях, приглашаю всех к обсуждению~
Ниже приведен краткий обзор процесса реализации кода интервального соединения:
val env = ...
// kafka 订单流
val orderStream = ...
// kafka 订单明细流
val orderDetailStream = ...
orderStream.keyBy(_.1)
// 调用intervalJoin关联
.intervalJoin(orderDetailStream._2)
// 设定时间上限和下限
.between(Time.milliseconds(-30), Time.milliseconds(30))
.process(new ProcessWindowFunction())
class ProcessWindowFunction extends ProcessJoinFunction...{
override def processElement(...) {
collector.collect((r1, r2) => r1 + " : " + r2)
}
}
После того, как поток ордеров перейдет в программу, подождите, пока данные потока детализации ордера в интервале времени (низкий, высокий) присоединятся, в противном случае продолжите обработку следующего потока.
Из кода мы обнаружили, что интервальное соединение должно состоять из двухKeyedStream
Вышеупомянутая операция называется keyBy(), а верхняя и нижняя границы интервала смещения указываются в методе between().
Следует отметить, что реализация интервального соединения такжеinner join
, и в настоящее время поддерживает только время события.
6 Механизм реализации двухпотокового JOIN на основе подключения
Когда я использовал оконное соединение или интервальное соединение для реализации двухпотокового соединения, я обнаружил общие черты:
Независимо от реализации, Flink внутренне присоединится к процессу.
透明化
, который инкапсулирует все детали реализации в операторе.
Что это? на языке программирования抽象
Концепция ~ Скрыть основные детали и открыть унифицированный API для внешнего мира.简化
кодирование программы.
Но это приведет к проблеме: если программа сообщает об ошибке или данные ненормальны, как быстро выполнить настройку и устранение неполадок, а также непосредственно посмотреть исходный код? Не очень реалистично. .
Здесь основано наConnect算子
Реализуя двухпоточный метод JOIN, мы можем сами контролировать логику обработки двухпотокового JOIN, сохраняя при этом своевременность и точность процесса.
6.1 Принцип работы оператора Connect
Выполните операцию подключения к двум потокам данных и преобразуйте их в ConnectedStreams.Сгенерированные потоки могут вызывать разные методы для выполнения в двух потоках в реальном времени, а состояние может быть общим для двух потоков.
Как видно из рисунка, после соединения двух потоков данных они только помещаются в один и тот же поток, а их данные и формы по-прежнему поддерживаются внутри, и два потока независимы друг от друга.
[DataStream1, DataStream2] -> ConnectedStreams[1,2]
Таким образом, мы можем написать код в ConnectedStreams внизу оператора Connect, чтобы реализовать логическую обработку двухпотокового JOIN самостоятельно.
6.2 Техническая реализация
1. Вызовите оператора подключения, сгруппируйте их в соответствии с идентификатором заказа и используйте оператор процесса для обработки двух потоков соответственно.
orderStream.connect(orderDetailStream)
.keyBy("orderId", "orderId")
.process(new orderProcessFunc());
2. Программирование состояния выполняется внутри метода процесса для инициализации состояния ValueState заказа, сведений о заказе и таймера.
private ValueState<OrderEvent> orderState;
private ValueState<TxEvent> orderDetailState;
private ValueState<Long> timeState;
// 初始化状态Value
orderState = getRuntimeContext().getState(
new ValueStateDescriptor<Order>
("order-state",Order.class));
····
3. Сохраните состояние и создайте таймеры для каждого входящего потока данных. Присоединитесь и выведите, когда другой поток прибудет в течение временного окна, и удалите таймер, когда закончите.
@Override
public void processElement1(Order value, Context ctx, Collector<Tuple2<Order, OrderDetail>> out){
if (orderDetailState.value() == null){
//明细数据未到,先把订单数据放入状态
orderState.update(value);
//建立定时器,60秒后触发
Long ts = (value.getEventTime()+10)*1000L;
ctx.timerService().registerEventTimeTimer(
ts);
timeState.update(ts);
}else{
//明细数据已到,直接输出到主流
out.collect(new Tuple2<>(value,orderDetailS
tate.value()));
//删除定时器
ctx.timerService().deleteEventTimeTimer
(timeState.value());
//清空状态,注意清空的是支付状态
orderDetailState.clear();
timeState.clear();
}
}
...
@Override
public void processElement2(){
...
}
4. Недостигнутый вовремя поток данных запускает вывод таймера на боковой выходной поток, сначала приходит левый поток и не приходит правый поток, затем выводится левый поток, в противном случае выводится правый поток.
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Order, OrderDetail>> out) {
// 实现左连接
if (orderState.value() != null){
ctx.output(new OutputTag<String>("left-jo
in") {},
orderState.value().getTxId());
// 实现右连接
}else{
ctx.output(new OutputTag<String>("left-jo
in") {},
orderDetailState.value().getTxId());
}
orderState.clear();
orderDetailState.clear();
timeState.clear();
}
Общая идея: Реализуйте связь между данными заказа и подробными данными заказа на основе времени данных и вывод из побочного потока вывода, если время истекло или отсутствует.
существует
connect
Для потока заказов и потока сведений о заказах сначала создайте таймер и сохраните его.state
состояние, оно будет выполнено, когда оно находится в окнеjoin
, в противном случае введите побочный выходной поток.
7 Оптимизация и сводка двухпотокового JOIN
- Почему наступает время присоединения к двойному потоку, но оно не срабатывает, и все время нет вывода
чек
watermark
Разумна ли установка?数据时间
Это намного больше, чем водяной знак и время окна, из-за чего данные окна часто бывают пустыми
- Как долго будут храниться данные о состоянии и не взорвется ли память?
государство приходит с
ttl机制
, вы можете настроить политику истечения срока действия ttl, чтобы запускать Flink для очистки просроченных данных о состоянии. в предлагаемой процедуреstate数据结构
Вручную очистите его после использования.
- Что делать с наклоном присоединения к двойному потоку
Соединение склоняется к трем осям: фильтрация аномальных ключей, разделение таблиц для сокращения данных и разделение распределения ключей. Конечно, я рекомендую добавить больше памяти, если вы можете! Добавь памяти! Добавь памяти! !
- Как реализовать многопотоковое соединение
В настоящее время это не может быть реализовано одновременно. Вы можете сначала рассмотреть объединение, а затем обработать его во второй раз или выполнить сначала операцию соединения, а затем операцию соединения. Это только рекомендуется ~
- Будет ли процесс присоединения отложен, а несвязанные данные будут потеряны?
Вообще говоря, это не так.Процесс соединения может использовать побочный выходной поток для хранения задержанного потока; если есть аномалия, такая как сеть узлов, контрольная точка Flink также может гарантировать, что данные не будут потеряны.
когда-нибудь
Интервьюер: Вы понимаете двухпотоковое соединение Flink? Кратко объясните принцип его реализации.
Ajun: Двойной поток Flink JOIN есть. . .
Эта статья закончилась.
》》》》Для получения дополнительной информации, пожалуйста, обратите внимание на мой публичный аккаунт: Big Data Arsenal