Интересные 4D-слова | Подробности интервью с двойным потоком Flink JOIN

Flink
Интересные 4D-слова | Подробности интервью с двойным потоком Flink JOIN

Эта статья приняла участие в мероприятии «Церемония создания новичков», чтобы вместе начать путь создания золота.

Сегодня я расскажу вам о проблеме двухпотокового соединения Flink. Это точка высокочастотного интервью, а также реальная сцена, часто встречающаяся в работе.

Как обеспечить двухпотоковое присоединение Flink准确性и及时性,Кроме窗口joinКакие еще способы реализации существуют и как ответить, чтобы полностью произвести впечатление на интервьюера. . Ответ вы найдете в этой статье.

1 грунтовка

1.1 ПРИСОЕДИНЯЙТЕСЬ к базе данных SQL

Давайте сначала рассмотрим операцию JOIN в базе данных SQL. SQL-запрос заказа, показанный ниже, путем изменения таблицы заказовidи лист сведений о заказеorder_idСсылка для получения информации о продукте под всеми заказами.

select 
   a.id as '订单id',
   a.order_date as '下单时间',
   a.order_amount as '订单金额',
   b.order_detail_id as '订单详情id',
   b.goods_name as '商品名称',
   b.goods_price as '商品价格',
   b.order_id as '订单id'
from 
   dwd_order_info_pfd a
right join 
   dwd_order_detail_pfd b
on a.id = b.order_id

Это очень простой код SQL и подробно описываться не будет. Здесь в основном представлен тип JOIN в SQL, который используется здесь.right join, что является правым соединением.

  • left join: Сохранить все данные в левой таблице и связанные данные в правой таблице и установить для несвязанных данных в правой таблице значение NULL.
  • right join: сохранить все данные в правой таблице и связанные данные в левой таблице, а для несвязанных данных в левой таблице установить значение NULL.
  • inner join: сохранить связанные данные левой таблицы и связанные данные правой таблицы.
  • cross join: сохранить декартово произведение данных левой и правой таблицы.

Построчное сопоставление ассоциаций на основе значений ассоциативных ключей, фильтрация данных таблицы и создание окончательных результатов для последующего анализа данных.

На этом остановимся, я не буду вдаваться в подробности о принципе JOIN в SQL баз данных, если вам интересно, вы можете изучить его самостоятельно, а обратим внимание на область больших данных.

1.2 ПРИСОЕДИНЯЙТЕСЬ к офлайн-сценариям

Предположим, есть такой сценарий:

Зная таблицу заказов и таблицу сведений о заказах в базе данных Mysql и удовлетворяя отношение «один ко многим», подсчитайте детали распределения товаров по всем заказам в день T-1.

Каждый, кто умный, должен был дать ответ, да ~ это приведенный выше SQL:

select a.*, b.*
from 
   dwd_order_info_pfd a
right join 
   dwd_order_detail_pfd b
on a.id = b.order_id

Теперь измените следующие условия: известно, что и таблица заказов, и таблица сведений о заказах亿级别данные и найти результаты анализа по тому же сценарию.

Что делать? В настоящее время реляционная база данных не кажется подходящей ~ начните расширять трюк: используйте大数据计算引擎решать.

Учитывая, что статистический сценарий T-1 имеет низкие требования к своевременности, для обработки можно использовать Hive SQL, а нижний слой выполняет задачи Mapreduce. Если вы хотите повысить скорость работы, переключитесь на вычислительный движок Flink или Spark и используйте вычисления в памяти.

Что касается SQL-запроса, он такой же, как и выше, и он инкапсулирован в запланированную по времени задачу планирования, выполнение которой запланировано системой. Если результат неверный, из-за источника данных и данных静态Без изменений, большое дело, повтор, внешний вид皆大欢喜~

Но хорошие времена не длятся долго, и враг продукта предъявил вам непреодолимое требование в это время:我要实时统计!!

2 ПРИСОЕДИНЯЙТЕСЬ к сценариям в реальном времени

Все тот же вышеприведенный сценарий, на этот раз источник данных заменяется на实时поток заказов и实时Поток сведений о заказе, напримерKafkaДве темы , требуют статистики в режиме реального времени о деталях распределения всех заказов за каждую минуту.

Теперь ситуация вроде бы усложнилась, простой анализ:

  1. источник данных. Поток данных в реальном времени, отличающийся от статического потока, данных течет в реальном времени и изменяется динамически, и вычислительная программа должна поддерживать механизм обработки в реальном времени.
  2. Актуальность. упомянутый ранее静态Данные выполняют несколько операций соединения, и данные, которые могут быть связаны с левой и правой таблицами, очень постоянны; и实时数据流(Левая и правая таблица) Если синхронизация ввода несовместима, данные, которые могли быть связаны, не будут связаны или произойдет ошибка.
  3. Задержка. Статистика в режиме реального времени, предоставляющая результаты ответов за минуты или даже секунды.

Из-за специфики объединения потоковых данных необходимо实时处理机制,低延迟,强关联性В соответствии с предпосылкой кажется, что необходимо сформулировать полную схему данных, чтобы реализовать JOIN данных реального потока.

2.1 Идеи схемы

Мы знаем, что существует связь «один ко многим» между данными заказа и подробными данными заказа, то есть одни данные заказа соответствуют нескольким подробным данным о продукте. . Часть подробных данных соответствует только данным одного заказа.

Таким образом, стратегия двухпотокового соединения может учитывать следующие идеи:

  • Когда поток данных является данными заказа. Безусловно зарезервировано, независимо от того, связано ли оно в данный момент с подробными данными, оно зарезервировано для последующего использования соединения.
  • Когда поток данных является подробными данными. После того, как он связан с данными заказа, вы можете попрощаться, иначе он будет временно зарезервирован для следующей встречи с данными заказа.
  • Завершите объединение всех данных заказа и сведений о заказе за один и тот же период времени и очистите статус хранения.

В реальных производственных сценариях необходимо учитывать более сложные ситуации, включая обработку нештатных ситуаций, таких как потеря данных в процессе JOIN, которые показаны только здесь.

Хорошо, похоже, у нас есть так себе прототип решения JOIN для потоковой передачи в реальном времени.

Кажется, мы готовы начать большую битву~ Не волнуйтесь, кто-то помог нам тайно это осознать:Apache Flink

3 Двойной поток Flink JOIN

Apache Flink — это платформа и механизм распределенной обработки для вычислений с отслеживанием состояния на неограниченных и ограниченных потоках данных. Flink предназначен для работы во всех распространенных кластерных средах, выполняя вычисления со скоростью выполнения в памяти и в произвольном масштабе.
—— Определение с официального сайта Flink

Здесь нам нужно только знать, что Flink — это вычислительный движок в реальном времени, в основном сосредоточенный на том, как он реализует JOIN с двумя потоками.

3.1 Внутренний рабочий механизм

  • 内存计算: Задачи Flink предпочтительно рассчитываются в памяти, а при недостатке памяти сохраняются на диски с высокой эффективностью доступа, обеспечивая秒级замедленная реакция.
  • 状态强一致性: Flink использует согласованные моментальные снимки для сохранения состояния и периодически проверяет локальное состояние и постоянное хранилище для обеспечения согласованности состояния.
  • 分布式执行: приложения Flink можно разделить на бесконечное количество параллельных задач для выполнения в кластере, используя почти неограниченный ЦП, основную память, диск и сетевой ввод-вывод.
  • 内置高级编程模型: Модель программирования Flink разделена на четыре уровня: SQL, Table, DataStream|DataSet API и Process, и инкапсулирована в операторы с широким набором функций, включаяJOIN类型Оператор.

Присмотритесь, были ли соблюдены предпосылки схемы JOIN потока в реальном времени, обсуждавшиеся в предыдущих главах?

  1. 实时处理机制: Flink рождается как вычислительная машина реального времени
  2. 低延迟: Задержка второго уровня вычислений в памяти Flink.
  3. 强关联性: Консистентность состояния Flink и операторы классов присоединения.

Я не могу не вздохнуть, этот Флинк действительно силен~

Оставайтесь любопытными, давайте посмотрим на истинное значение двухпотокового соединения Flink! !

3.2 Механизм реализации JOIN

Двойной поток Flink JOIN в основном делится на две категории. Одна из них — собственная операция оператора Connect на основе состояния, а другая — оконная операция JOIN. где JOIN на основе окна можно разделить наwindow joinиinterval joinдва вида.

  • Принцип реализации: базовый принцип зависит отState状态存储, сохраняя данные в состоянии для присоединения ассоциации и, наконец, выводя результат.

Неожиданно Flink использует состояние State для кэширования потока в реальном времени, ожидающего присоединения.

Вот вам вопрос:

Можно ли использовать хранилище Redis?В чем разница между хранилищем состояния и хранилищем Redis?

Более подробную информацию можно обсудить вместе, добавить личный WeChat:youlong525Пригласите вас в группу и получите бесплатный Flink PDF~

Вернемся к теме: как эти методы реализуют двухпотоковое JOIN? Давайте посмотрим вниз.

Примечание. Следующий контент будет более文字 + 代码Чтобы не быть скучным, выкладываю кучу оригинальных принципиальных схем~

4 Механизм реализации двухпотокового JOIN на основе Window Join

Как следует из названия, этот метод использует технологию Flink.窗口机制Реализует двухпотоковое соединение. Как правило, элементы в двух потоках реального времени выделяются в одно и то же временное окно для завершения соединения.

  • Основной принцип: два потока данных в реальном времени кэшируются вWindow State, когда окно инициирует расчет, выполняется операция соединения.

4.1 оператор соединения

Давайте взглянем на оператор соединения, который является одним из методов реализации соединения окна. Это включает в себя понятие окна (окна) во Flink, поэтому Window Joinan можно разделить на три типа в некоторой степени в зависимости от типа окна:

  • Кувыркающееся окно
  • Скользящее окно
  • Присоединение к просмотру сеанса

Два потока данных обрабатываются в окне (прокрутка, прокрутка, сеанс) в соответствии с соответствующим первичным ключом.inner join, Нижний уровень основан на хранилище состояний и поддерживает две временные характеристики: время обработки и время события, см. исходный код:

Краткое изложение ядра исходного кода: окно Windows + хранилище состояния + двухслойный цикл for для выполнения join()

Теперь давайте отодвинем временную шкалу немного назад, в实时场景JOINТам мы получили такую ​​просьбу: посчитать детальную раздачу товаров по всем заказам в каждую минуту.

Хорошо, используйте оператор соединения, чтобы попробовать. Мы определяем скользящее окно в 60 секунд, связываем поток заказов и поток деталей заказа через order_id и получаем следующую программу:

val env = ...
// kafka 订单流
val orderStream = ... 
// kafka 订单明细流
val orderDetailStream = ...
    
orderStream.join(orderDetailStream)
    .where(r => r._1)  //订单id
    .equalTo(r => r._2) //订单id
    .window(TumblingProcessTimeWindows.of(
          Time.seconds(60)))
    .apply {(r1, r2) => r1 + " : " + r2}
    .print()

Весь код на самом деле очень прост, сводка выглядит следующим образом:

  • Определите два входных потока реального времени A, B
  • Поток вызывает оператор join(b stream)
  • Определение отношения ассоциации: где связанный ключ потока A, equalTo - связанный ключ потока B, оба являются идентификаторами заказов.
  • Определить окно окна (интервал 60 с)
  • Метод применения определяет логический вывод

Таким образом, пока программа работает стабильно, детали распределения заказов в каждую минуту могут непрерывно рассчитываться, что, кажется, решает проблему~

Не радуйтесь, не забывайте, что тип соединения в настоящее времяinner join. Чтобы просмотреть знания: внутреннее соединение относится к сохранению только данных, связанных с двумя потоками.

Таким образом, не теряются ли несвязанные данные в двойных потоках? Не волнуйтесь, Flink также предоставляет другую операцию объединения окон:coGroupоператор.

4.2 оператор совместной группы

Оператор coGroup также основан на оконном механизме, но оператор coGroup является более гибким, чем оператор Join, и может сопоставлять данные левого или правого потока и выводить их в соответствии с логикой, заданной пользователем.

Другими словами, мы достигаем цели левого соединения и правого соединения, самостоятельно определяя вывод двойного потока.

Теперь давайте посмотрим, как оператор coGroup реализует левое соединение в том же сценарии:

#这里看看java算子的写法
orderDetailStream
  .coGroup(orderStream)
  .where(r -> r.getOrderId())
  .equalTo(r -> r.getOrderId())
  .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
  .apply(new CoGroupFunction<OrderDetail, Order, Tuple2<String, Long>>() {
    @Override
    public void coGroup(Iterable<OrderDetail> orderDetailRecords, Iterable<Order> orderRecords, Collector<Tuple2<String, Long>> collector)  {
      for (OrderDetail orderDetaill : orderDetailRecords) {
        boolean flag = false;
        for (Order orderRecord : orderRecords) {
          // 右流中有对应的记录
          collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), orderDetailRecords.getGoods_price()));
          flag = true;
        }
        if (!flag) {
          // 右流中没有对应的记录
          collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), null));
        }
      }
    }
  })
  .print();

Здесь следует отметить несколько моментов:

  • Оператор соединения заменен оператором coGroup.
  • Два потока по-прежнему должны находиться в одном окне, а связанные с ними условия определены.
  • Пользовательское суждение в методе применения, где оценивается правильное значение: если есть значение, соединение выводится, в противном случае правая сторона устанавливается в NULL.

Можно сказать, что теперь мы полностью сделали оконный двухпоточный JOIN.

Пока вы предоставляете мне определенный размер окна, я могу возиться со всеми видами соединений с помощью операторов соединения или coGroup, и это очень просто в использовании.

Но если в это время наш дорогой продукт вызывает небольшой вопрос:

В пиковый период большой акции данные о продукте не будут записаны вовремя в течение определенного периода времени.Время может быть раньше, чем заказ или позже, чем заказ.Все равно необходимо рассчитать детали распределения заказывайте товары в течение каждой минуты, без проблем~

Конечно, есть проблема: если два потока не синхронизированы, то было бы странно использовать окно для управления теми, которые могут присоединиться, легко автоматически закрыть окно, не дожидаясь потока соединения.

К счастью, я знаю, что Flink предоставляетInterval joinмеханизм.

5 Механизм реализации двухпотокового JOIN на основе интервального соединения

Interval Join основан на временном интервале смещения правого потока относительно левого потока (interval) в качестве окна ассоциации для завершения операции соединения в окне интервала смещения.

Это немного сложно понять, поэтому позвольте мне нарисовать картинку, чтобы увидеть:

поток2.время ∈ (поток1.время +низкий, поток1.время +высокий)

Удовлетворить поток данных stream2 в потоке данных stream1interval(низкий, высокий) Ассоциативные соединения в пределах интервала смещения. Чем больше интервал, тем больше данных связано, а данные за пределами интервала больше не связаны.

  • Принцип реализации: интервальное соединение также использует состояние Flink для хранения данных, но в настоящее время существует механизм сбоя состояния.ttl, который запускает операцию очистки данных.

Вот еще вопрос:

Как установить механизм состояния ttl? Приведут ли необоснованные настройки ttl к разрыву памяти?

Я подробно объясню механизм ttl State в следующих статьях, приглашаю всех к обсуждению~

Ниже приведен краткий обзор процесса реализации кода интервального соединения:

val env = ...
// kafka 订单流
val orderStream = ... 
// kafka 订单明细流
val orderDetailStream = ...
    
orderStream.keyBy(_.1)
    // 调用intervalJoin关联
    .intervalJoin(orderDetailStream._2)
    // 设定时间上限和下限
    .between(Time.milliseconds(-30), Time.milliseconds(30))  
    .process(new ProcessWindowFunction())
    
class ProcessWindowFunction extends ProcessJoinFunction...{
   override def processElement(...) {
      collector.collect((r1, r2) => r1 + " : " + r2)
   }
}

После того, как поток ордеров перейдет в программу, подождите, пока данные потока детализации ордера в интервале времени (низкий, высокий) присоединятся, в противном случае продолжите обработку следующего потока.

Из кода мы обнаружили, что интервальное соединение должно состоять из двухKeyedStreamВышеупомянутая операция называется keyBy(), а верхняя и нижняя границы интервала смещения указываются в методе between().

Следует отметить, что реализация интервального соединения такжеinner join, и в настоящее время поддерживает только время события.

6 Механизм реализации двухпотокового JOIN на основе подключения

Когда я использовал оконное соединение или интервальное соединение для реализации двухпотокового соединения, я обнаружил общие черты:

Независимо от реализации, Flink внутренне присоединится к процессу.透明化, который инкапсулирует все детали реализации в операторе.

Что это? на языке программирования抽象Концепция ~ Скрыть основные детали и открыть унифицированный API для внешнего мира.简化кодирование программы.

Но это приведет к проблеме: если программа сообщает об ошибке или данные ненормальны, как быстро выполнить настройку и устранение неполадок, а также непосредственно посмотреть исходный код? Не очень реалистично. .

Здесь основано наConnect算子Реализуя двухпоточный метод JOIN, мы можем сами контролировать логику обработки двухпотокового JOIN, сохраняя при этом своевременность и точность процесса.

6.1 Принцип работы оператора Connect

Выполните операцию подключения к двум потокам данных и преобразуйте их в ConnectedStreams.Сгенерированные потоки могут вызывать разные методы для выполнения в двух потоках в реальном времени, а состояние может быть общим для двух потоков.

Как видно из рисунка, после соединения двух потоков данных они только помещаются в один и тот же поток, а их данные и формы по-прежнему поддерживаются внутри, и два потока независимы друг от друга.

[DataStream1, DataStream2] -> ConnectedStreams[1,2]

Таким образом, мы можем написать код в ConnectedStreams внизу оператора Connect, чтобы реализовать логическую обработку двухпотокового JOIN самостоятельно.

6.2 Техническая реализация

1. Вызовите оператора подключения, сгруппируйте их в соответствии с идентификатором заказа и используйте оператор процесса для обработки двух потоков соответственно.

orderStream.connect(orderDetailStream)
  .keyBy("orderId", "orderId")
  .process(new orderProcessFunc());

2. Программирование состояния выполняется внутри метода процесса для инициализации состояния ValueState заказа, сведений о заказе и таймера.

private ValueState<OrderEvent> orderState;
private ValueState<TxEvent> orderDetailState;
private ValueState<Long> timeState;

// 初始化状态Value
orderState = getRuntimeContext().getState(
 new ValueStateDescriptor<Order>
 ("order-state",Order.class));
····

3. Сохраните состояние и создайте таймеры для каждого входящего потока данных. Присоединитесь и выведите, когда другой поток прибудет в течение временного окна, и удалите таймер, когда закончите.

@Override
public void processElement1(Order value, Context ctx, Collector<Tuple2<Order, OrderDetail>> out){
  if (orderDetailState.value() == null){
    //明细数据未到,先把订单数据放入状态
     orderState.update(value);
    //建立定时器,60秒后触发
     Long ts = (value.getEventTime()+10)*1000L;
     ctx.timerService().registerEventTimeTimer(
       ts);
     timeState.update(ts);
  }else{
    //明细数据已到,直接输出到主流
     out.collect(new Tuple2<>(value,orderDetailS
       tate.value()));
    //删除定时器
     ctx.timerService().deleteEventTimeTimer
      (timeState.value());
     //清空状态,注意清空的是支付状态
      orderDetailState.clear();
      timeState.clear();
  }
}
...
@Override
public void processElement2(){
  ...
}

4. Недостигнутый вовремя поток данных запускает вывод таймера на боковой выходной поток, сначала приходит левый поток и не приходит правый поток, затем выводится левый поток, в противном случае выводится правый поток.

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Order, OrderDetail>> out) {
  // 实现左连接
   if (orderState.value() != null){
       ctx.output(new OutputTag<String>("left-jo 
       in") {}, 
       orderState.value().getTxId());
   // 实现右连接
   }else{
      ctx.output(new OutputTag<String>("left-jo 
       in") {}, 
       orderDetailState.value().getTxId());
   }
   orderState.clear();
   orderDetailState.clear();
   timeState.clear();
}

Общая идея: Реализуйте связь между данными заказа и подробными данными заказа на основе времени данных и вывод из побочного потока вывода, если время истекло или отсутствует.

существуетconnectДля потока заказов и потока сведений о заказах сначала создайте таймер и сохраните его.stateсостояние, оно будет выполнено, когда оно находится в окнеjoin, в противном случае введите побочный выходной поток.

7 Оптимизация и сводка двухпотокового JOIN

  1. Почему наступает время присоединения к двойному потоку, но оно не срабатывает, и все время нет вывода

чекwatermarkРазумна ли установка?数据时间Это намного больше, чем водяной знак и время окна, из-за чего данные окна часто бывают пустыми

  1. Как долго будут храниться данные о состоянии и не взорвется ли память?

государство приходит сttl机制, вы можете настроить политику истечения срока действия ttl, чтобы запускать Flink для очистки просроченных данных о состоянии. в предлагаемой процедуреstate数据结构Вручную очистите его после использования.

  1. Что делать с наклоном присоединения к двойному потоку

Соединение склоняется к трем осям: фильтрация аномальных ключей, разделение таблиц для сокращения данных и разделение распределения ключей. Конечно, я рекомендую добавить больше памяти, если вы можете! Добавь памяти! Добавь памяти! !

  1. Как реализовать многопотоковое соединение

В настоящее время это не может быть реализовано одновременно. Вы можете сначала рассмотреть объединение, а затем обработать его во второй раз или выполнить сначала операцию соединения, а затем операцию соединения. Это только рекомендуется ~

  1. Будет ли процесс присоединения отложен, а несвязанные данные будут потеряны?

Вообще говоря, это не так.Процесс соединения может использовать побочный выходной поток для хранения задержанного потока; если есть аномалия, такая как сеть узлов, контрольная точка Flink также может гарантировать, что данные не будут потеряны.

когда-нибудь

Интервьюер: Вы понимаете двухпотоковое соединение Flink? Кратко объясните принцип его реализации.

Ajun: Двойной поток Flink JOIN есть. . .

Эта статья закончилась.

》》》》Для получения дополнительной информации, пожалуйста, обратите внимание на мой публичный аккаунт: Big Data Arsenal