Применение и практика Flink в Ele.me

Большие данные

Автор этой статьи: И Вэйпин (голоден?)

Аранжировка: Цзи Пин (отдел вычислений реального времени Alibaba)

Эта статья покажет вам работу, проделанную платформой больших данных Ele.me для вычислений в реальном времени, и эволюцию вычислительного движка, вы сможете узнать о преимуществах и недостатках Storm, Spark и Flink. Как выбрать подходящую вычислительную машину реального времени? Какие преимущества есть у Flink, чтобы стать первым выбором Ele.me? Эта статья поможет вам разгадать тайну один за другим.

Статус платформы

Ниже представлена ​​схема текущей архитектуры текущей платформы Ele.me:

Данные из нескольких источников данных записываются в kafka, а вычислительные механизмы — это в основном Storm, Spark и Flink, а полученные данные из вычислительных механизмов затем помещаются в различные хранилища.

В настоящее время существует около 100 задач Storm, около 50 задач Spark, а Flink пока относительно невелик.

В настоящее время ежедневный объем данных нашего кластера составляет 60 ТБ, количество вычислений — 1000000000, а узлов — 400. Здесь следует упомянуть, что и Spark, и Flink работают на пряже, Flink onyarn в основном используется для изоляции менеджера заданий между задачами, а Storm — в автономном режиме.

Сценарии применения

1. Семантика согласованности

Прежде чем описывать сценарии наших приложений, мы сначала подчеркнем важную концепцию вычислений в реальном времени, семантику согласованности:

  1. at-most-once: то есть выстрелил и забыл, мы обычно пишем java-приложение, не учитывая управление смещением источника и идемпотентность нисходящего потока, это просто не более одного раза, данные приходят, независимо от того, какое промежуточное состояние. , Какой статус записи данных, механизма акка нет.

  2. at-least-once:Механизм повторной передачи, повторная передача данных, чтобы гарантировать, что все данные обрабатываются хотя бы один раз.

  3. ровно один раз:Используйте грубый контроль детализации контрольных точек для реализации точного однократного выполнения.Большая часть точного однократного обращения, о котором мы говорим, относится к точно однократному выполнению в вычислительном движке, то есть может ли быть воспроизведено внутреннее состояние оператора на каждом шаге; задание зависает, может быть Нет, оно плавно восстанавливается из последнего состояния и не использует понятие идемпотентности вывода на сток.

  4. at-least-one + idempotent = exactly-one: Если мы можем гарантировать идемпотентность последующих операций, таких как реализация обновления по дублирующемуся ключу на основе mysql, или если вы используете es, cassandra и т. д., вы можете использовать первичный ключ для реализации семантики обновления, чтобы обеспечить в то же время, в сочетании с идемпотентностью ровно один раз.

2. Storm

Ele.me использовал Storm в первые дни, до 16 лет он все еще был Storm, а Sparkstreaming и Structed-streaming начались в 2017 году. Storm использовался ранее и в основном имеет следующие концепции:

  1. Данные основаны на кортежах

  2. миллисекундная задержка

  3. В основном поддерживает java, а теперь также поддерживает python и использует apache beam.

  4. Функции Sql еще не завершены. Мы инкапсулировали тифон внутри. Пользователям нужно только расширить некоторые из наших интерфейсов, чтобы использовать многие основные функции. Flux — лучший инструмент для Storm, и ему нужно только написать файл yaml. задача в определенной степени удовлетворяет некоторым требованиям, но все же требует от пользователя быть инженером, умеющим писать на Java, а аналитики данных не могут ее использовать.

★ 2.1 Резюме
  1. Простота использования: Из-за высокого порога использования его продвижение ограничено.

2) StateBackend: больше требуется внешнее хранилище, например хранилище kv, такое как Redis.

  1. С точки зрения распределения ресурсов: используется метод предварительной установки воркеров и слотов, кроме того, из-за меньшего количества точек оптимизации производительность движка относительно низкая.

3. Sparkstreaming

Однажды к нам подошла деловая группа и спросила, можем ли мы написать SQL и опубликовать вычислительную задачу в реальном времени за несколько минут. Итак, мы начали заниматься Sparkstreaming. Его основные концепции заключаются в следующем:

  1. Микропакет: вам нужно заранее установить окно, а затем обработать данные в окне.

  2. Задержка находится на втором уровне и в лучшем случае составляет около 500 мс.

  3. Языки разработки — java и scala.

  4. Streaming SQL, в основном наша работа, мы хотим предоставить платформу для Streaming SQL.

Функции:

  1. Экология Spark и SparkSQL: в этом Spark лучше.Стек технологий унифицирован, а пакеты SQL, графовых вычислений и машинного обучения могут интермодулироваться. Поскольку он сначала выполняет пакетную обработку, в отличие от Flink, его естественные API в реальном времени и автономные API унифицированы.

  2. Контрольная точка на hdfs.

  3. На пряже: Spark принадлежит к экосистеме хаупов и тесно интегрирован с пряжей.

  4. Высокая пропускная способность: Поскольку это микропакетный метод, пропускная способность также относительно высока.

Ниже приведен общий обзор шагов, необходимых пользователям нашей платформы для быстрой публикации страницы выполнения задачи в реальном времени. Здесь мы пишем не операторы DDL и DML, а то, как пользовательский интерфейс отображает страницу.

На странице пользователю будет предложено выбрать некоторые необходимые параметры, какой кластер kafka будет выбран первым, сколько потребляет каждый раздел, а также по умолчанию включено обратное давление. Место потребления должно быть указано пользователем каждый раз.Возможно, что в следующий раз, когда пользователь переписывает задачу в реальном времени, смещенная точка потребления может быть выбрана в соответствии с потребностями бизнеса.

В середине пользователь может описать конвейер. SQL — это несколько тем kafka, выбираем выходную таблицу для вывода, SQL регистрирует использованный выше kafka DStream как таблицу, затем записывает серию конвейеров, и, наконец, мы инкапсулируем некоторые внешние приемники для пользователей (поддерживаются все виды только что упомянутых хранилищ , если хранилище может достичьupsertС точки зрения семантики мы все это поддерживаем).

★ 3.1 Многопотоковое соединение

Хотя вычислительные требования в общих пакетах без сохранения состояния только что были выполнены, некоторые пользователи хотят сказать, что мне делать, если я хочу выполнять потоковое соединение, ранний Spark1.5 может ссылаться наSpark-streamingsqlЭтот проект с открытым исходным кодом регистрирует DStream как таблицу, а затем выполняет операции соединения с этой таблицей, но он поддерживает только версии до 1.5, и проект был заброшен после того, как Spark 2.0 представила структурированную потоковую передачу. у нас естьtrickyПуть:

Пусть Sparkstreaming потребляет несколько топиков, но я конвертирую каждый пакет RDD в потребляемом DStream в DataFrame по некоторым условиям, чтобы его можно было зарегистрировать как таблицу, и разделить на две таблицы по конкретным условиям, просто можно просто сделать соединение.Проблема этого соединения полностью зависит от данных, потребляемых на этот раз.Условия для их соединения неуправляемы, и это хитрый способ. Например, в следующем примере два топика потребляются, а затем просто разделяются на две таблицы с помощью условия фильтрации, после чего может быть выполнено объединение двух таблиц, но по сути это поток.

★ 3.2 Ровно один раз

Ровно-один раз требует особого внимания к одному моменту:

Мы должны потребовать, чтобы приемник данных был отправлен во внешнее хранилище, прежде чем можно будет зафиксировать смещение.Будь то в zookeeper или mysql, вам лучше убедиться, что он находится в транзакции и должен быть выведен во внешнее хранилище (здесь лучше всего обеспечитьupsertПосле этого исходный драйвер генерирует kafka RDD в соответствии с сохраненным смещением, а исполнитель потребляет данные в соответствии со смещением каждого раздела kafka. Если эти условия соблюдены, может быть достигнут сквозной ровно один раз, что является основной предпосылкой.

★ 3.3 Резюме
  1. SQL обработки с отслеживанием состояния (: Если мы хотим реализовать расчет с состоянием по батчам, то в версии 1.Х мы делаем это через эти два интерфейса, но нам все равно нужно хранить это состояние в hdfs или снаружи, что чуть более хлопотно реализовать.

  2. Real Multi-Stream Join: невозможно достичь семантики истинного соединения нескольких потоков.

  3. **Семантика End-To-End Exactly-Once:** Его семантика end-to-end Exactly-Once громоздка для реализации. После погружения во внешнее хранилище вам необходимо вручную зафиксировать смещения в транзакции.

4. STRUCTURED STREAMING

Мы исследовали, а затем перешли к использованию инкрементных вычислений с состоянием после Spark2.X. Картинка ниже взята с официального сайта:

Все расчеты потока относятся к потоку данных Google, который имеет важное понятие: время обработки данных и время события, то есть существует разрыв между временем обработки данных и реальным временем возникновения. Таким образом, в области потоковых вычислений также существует водяной знак.Водяной знак должен поддерживать уровень воды текущего входящего события.Водяной знак может указывать диапазон времени задержки.Данные за пределами окна задержки могут быть отброшены, и данные, которые поступают в бизнес поздно, бессмысленны.

Ниже представлена ​​диаграмма архитектуры структурированной потоковой передачи:

Это реализация шагов 1, 2 и 3 точно один раз в sparkstreaming только что. По сути, это пакетный метод, смещение поддерживается само по себе, hdfs для хранения состояния, а внешний приемник не выполняет подобную идемпотентность. .Операция не идет на фиксацию смещения после записи, она просто гарантирует отказоустойчивость и добивается ровно одного внутреннего движка.

★ 4.1 Особенности
  1. Stateful Processing SQL&DSL: может удовлетворить расчет потока с состоянием

  2. Real Multi-Stream Join: Вы можете реализовать объединение нескольких потоков через Spark 2.3. Метод объединения нескольких потоков аналогичен методу Flink. Вам необходимо определить условия двух потоков (в основном время как условие), например, есть два потока поступающих тем , а затем вы хотите ограничить данные, которые должны быть буферизованы полем (обычно временем события) в определенной схеме, чтобы можно было реализовать реальное присоединение к потоку.

3) Легче реализовать сквозную семантику ровно один раз, и добиться этого можно только путем расширения интерфейса приемника для поддержки идемпотентных операций.

В частности, API структурированной потоковой передачи немного отличается от API нативной потоковой передачи, когда он создает Dataframe таблицы, ему необходимо указать схему таблицы, а это означает, что вам нужно указать схему заранее. Кроме того, его водяной знак не поддерживает SQL, поэтому мы добавили расширение для полной записи SQL, которое можно преобразовать слева направо (ниже), мы надеемся, что пользователи не только программисты, но и надеемся, что не писать программные данные Аналитики и другие студенты также могут использовать его.

★ 4.2 Резюме
  1. Триггер (время обработки, непрерывный): До версии 2.3 это в основном основывалось на времени обработки.После обработки каждого пакета данных немедленно запускался расчет следующего пакета. 2.3 введен триггер для непрерывной обработки записи за записью.

  2. Continuous Processing (Only Map-Like Operations): В настоящее время он поддерживает только операции, подобные картам, а поддержка sql также несколько ограничена.

  3. LowEnd-To-End Latency With Exactly-Once Guarantees: Сквозная гарантия ровно один раз требует некоторых дополнительных расширений.Мы обнаружили, что kafka 0.11 предоставляет функцию транзакций, которая может быть реализована от источника к движку и приемнику на основе этого соображения, в реальном смысле end-to-- закончить ровно один раз.

  4. CEP(Drools): мы обнаружили, что некоторым деловым сторонам необходимо предоставлять сложные функции обработки событий, такие как CEP. В настоящее время наша грамматика не может напрямую поддерживать это. Мы позволяем пользователям использовать механизм правил Drools, а затем запускать на каждом исполнителе, полагаясь на функцию механизма правил. реализовать КЭП.

Поэтому, основываясь на перечисленных выше функциях и недостатках структурированной потоковой передачи Spark, мы рассматриваем возможность использования Flink для выполнения этих задач.

5.Flink

Цель Flink — сравнить Spark, Streaming более продвинут, и его амбиции относительно велики, в нем есть графовые вычисления, машинное обучение и т. д., а нижний слой также поддерживает пряжу, tez и т. д. Для большего объема хранилища, используемого сообществом, официальная поддержка сообщества Flink относительно хороша.

Каркасная схема Flink:

JobManager во Flink эквивалентен роли Driver Spark, а TaskManger эквивалентен Executor, а задачи в нем аналогичны задачам Spark. Однако RPC, используемый Flink,akkaПри этом Flink Core настраивает фреймворк сериализации памяти, кроме того, задачам не нужно ждать друг друга, как на каждом этапе Spark, а отправлять данные вниз по течению после обработки.

Оператор обработки бинарных данных Flink:

Пользователи сериализации Spark обычно используют сериализацию по умолчанию kryo или java, а проект Tungsten также оптимизирует программы Spark на уровне JVM и генерации кода. По сравнению со Spark, Flink реализовал фреймворк сериализации на основе памяти, который поддерживает концепцию ключа и указателя, его ключ хранится постоянно, и некоторые оптимизации будут выполняться на уровне процессора, а вероятность промаха кеша крайне мала. При сравнении и сортировке нет необходимости сравнивать реальные данные.Сначала сравните ключ через этот ключ.Только при его равенстве данные будут десериализованы из памяти,а потом будут сравниваться конкретные данные.Это хорошая производительность точка оптимизации.

Цепочка задач Flink:

Цепочка операторов в Task — хорошая концепция. Если распределение восходящего и нисходящего потока не нужно снова тасовать, например, источник на рисунке — это источник kafka, а следующая карта — это просто простой фильтр данных, мы помещаем его в поток, чтобы уменьшить стоимость переключение контекста потока.

Концепция параллелизма

Например, в нем будет 5 задач и одновременно будет запущено несколько потоков, если цепочка вверх, то она будет запущена в одном потоке для повышения производительности передачи данных. Spark — это черный ящик, и каждый оператор не может устанавливать степень параллелизма, в то время как Flink может устанавливать степень параллелизма для каждого оператора, что может быть более гибким, а использование ресурсов задания выше.

Spark обычно регулирует степень параллелизма с помощью Spark.default.parallelism. Если есть операция перемешивания, степень параллелизма обычно настраивается с помощью параметра Spark.sql.shuffle.partitions. При расчете в реальном времени его следует корректировать. на меньшее значение, как у нас в продакшне и кафке.Количество партиций примерно такое же, а партия будет больше в продакшене.Выставляем на 1000.На картинке слева ставим concurrency на 2 , а максимум 10. Таким образом, мы сначала разделяем на 2 одновременных прогона, а затем делаем группировку по ключу.Концепцию , можно разделить не более чем на 10 групп, а данные можно разбить как насколько это возможно.

State & Checkpoint

Поскольку данные Flink обрабатываются один за другим, каждый фрагмент данных в Flink отправляется в нисходящий поток сразу после обработки, в отличие от spark, которому необходимо дождаться выполнения всех задач на этапе, где находится оператор, прежде чем отправить его вниз.

Flink имеет крупнозернистый механизм контрольных точек, который присваивает концепцию моментального снимка каждому элементу за очень небольшую плату.Расчет запускается только тогда, когда поступают все данные, принадлежащие этому снимку.После расчета данные буфера отправляются вниз, в настоящее время Flink sql не предоставляет интерфейс для управления временем ожидания буфера, то есть сколько времени требуется для отправки моих данных в буфер. При построении контекста Flink можно указать время ожидания буфера равным 0, и обработанные данные будут отправлены немедленно, и нет необходимости ждать достижения определенного порога перед отправкой.

Бэкэнд по умолчанию поддерживается в памяти jobmanager.Что мы используем больше, так это запись в hdfs.Статус каждого оператора записывается в rockdb, а затем асинхронный цикл инкрементно синхронизируется с внешним хранилищем.

Отказоустойчивость

Красный узел в левой половине рисунка имеет отработку отказа. Если это хотя бы один раз, можно повторно отправить данные в самом восходящем направлении; но если это ровно один раз, каждый вычислительный узел должен начать с последняя неудача. Время переиграть.

Exactly Once Two-Phase Commit

После Flink 1.4 появились двухфазные фиксации для поддержки ровно один раз. Его концепция заключается в том, что после потребления данных из вышестоящей кафки каждый шаг будет инициировать голосование для записи статуса, обработки метки через барьер контрольной точки и записи ее в кафку (версия после 0.11) только в конце и только после окончательного завершение, будет ли это статус каждого шага может быть закреплен координатором в менеджере по работе, чтобы уведомить, чтобы достичь ровно один раз.

Savepoints

Еще одна хорошая вещь о Flink заключается в том, что он реализует функцию точки сохранения на основе своей контрольной точки. Бизнес-сторона требует, чтобы каждый узел восстановления приложения был разным, а также можно было указать восстанавливаемую версию, что лучше. Эта точка сохранения не только восстанавливает данные, но и восстанавливает состояние вычислений.

Функции:

  1. Триггер (время обработки, время события, время приема): Напротив, Flink поддерживает более богатую семантику потоковой передачи, а не только поддерживаетProcessing Time, также поддерживаетEvent timeиIngestion Time.

2)Continuous Processing & Window: Поддерживает непрерывную обработку в чистом виде, запись за записью, и оконная тоже лучше, чем Spark.

  1. Low End-To-End Latency With Exactly-Once Guarantees: поскольку отправка выполняется в два этапа, пользователи могут выбрать настройку в соответствии с потребностями бизнеса, чтобы обеспечить однократную сквозную передачу за счет определенной пропускной способности.

  2. CEP: Хорошо поддерживается.

  3. Savepoints: Вы можете осуществлять некоторый контроль версий в соответствии с потребностями бизнеса.

Есть и плохие:

1)SQL (синтаксическая функция, параллелизм): функция SQL не очень полная.Большинство пользователей мигрировали из улья, а Spark поддерживает охват улья более 99%. Функции SQL не поддерживаются, и в настоящее время невозможно установить параллелизм для одного оператора.

  1. машинное обучение, график и т. д.: машинное обучение, графовые вычисления и другие области немного слабее, чем Spark, но сообщество также сосредоточено на постоянном улучшении этой проблемы.

Последующее планирование

Поскольку Ele.me теперь является членом Alibaba, в будущем Flink будет использоваться чаще, и я также с нетерпением жду возможности использовать Blink.

Для получения дополнительной информации, пожалуйста, посетитеВеб-сайт китайского сообщества Apache Flink