Сравнение производительности между Flink 1.10 и Hive 3.0 (с демонстрационной презентацией PPT)

Flink

Автор: Ли Цзиньсон (Письмо)

Сегодняшние пакетные вычисления больших данных, с учетом зрелости хранилища данных Hive, распространенной моделью является хранилище метаданных Hive + вычислительный движок. Распространенными вычислительными механизмами являются Hive на MapReduce, Hive на Tez, Hive на Spark, Spark с интеграцией Hive, Presto с интеграцией Hive и Flink Batch SQL, доступные для производства после выпуска Flink 1.10.

Являясь унифицированным вычислительным механизмом, Flink стремится предоставить унифицированный пакетный и потоковый опыт и технологический стек. Flink объединил код Blink в версии 1.9 и улучшил большое количество функций и производительность в версии 1.10. Он может выполнять все запросы TPC-DS, и его производительность также очень конкурентоспособна. Flink 1.10 — это готовая к производству версия унифицированного ядра SQL с пакетным потоком. .

В процессе создания вычислительной платформы производительность и стоимость являются ключевыми факторами при выборе вычислительной машины. С этой целью проект Ververica flink-sql-benchmark [1] предоставляет инструменты для тестирования TPC-DS Benchmark на основе Hive Metastore, чтобы проводить тестирование ближе к реальным рабочим заданиям:

  • Все входные таблицы для теста являются стандартными таблицами Hive, а все данные находятся в хранилище данных Hive, совместимом с рабочей средой. Другие вычислительные машины также могут легко анализировать эти таблицы.
  • Формат данных использует ORC.ORC является широко используемым форматом производственных файлов, который обеспечивает более высокую степень сжатия и лучшую производительность чтения.
  • Выберите набор данных объемом 10 ТБ в тесте TPC-DS Benchmark. Набор данных объемом 10 ТБ является относительно распространенным производственным масштабом. Если это всего 1 ТБ, его можно запустить в традиционной базе данных, которая не подходит для тестирования больших данных.

Мы протестировали три движка на 20 машинах: Flink 1.10, Hive 3.0 на MapReduce, Hive 3.0 на Tez и проверили производительность движка по двум измерениям:

  • общая продолжительность: Интуитивно понятные данные о производительности, но на них могут сильно влиять отдельные запросы.
  • среднее геометрическое: указывает центральную тенденцию набора чисел, которая может лучше устранить большее влияние отдельных запросов и представить более точное среднее значение.

Резюме результатов:

  • Flink 1.10 VS Hive 3.0 on MapReduce
  • Общая продолжительность Flink в 8,7 раз выше, чем у Hive на MapReduce.
  • Среднегеометрическая производительность Flink Queries в 7,8 раз выше, чем у Hive на MapReduce.
  • Flink 1.10 VS Hive 3.0 on Tez
  • Общая продолжительность Flink в 2,1 раза выше, чем у Hive на Tez.
  • Среднегеометрическая производительность Flink Queries в 2,0 раза выше, чем у Hive на Tez.

Сравнительные результаты по общему времени работы:

1.jpg

Среднее геометрическое сравнения запросов:

2.jpg

В этой статье тестировались только указанные выше движки и наборы данных объемом 10 ТБ. Читатели могут выбрать конкретный набор данных в соответствии с размером своего кластера и использовать инструмент flink-sql-benchmark, чтобы запустить дополнительные сравнительные тесты движков.

Детали эталона

Эталонная среда

Конкретная среда и инструкции по настройке:

  • Вычислительная среда: 20 машин, параметры машины: 64-ядерный процессор Intel, 256 ГБ памяти, 1 SSD-диск для вычислительного движка, несколько SATA-дисков для HDFS, 10-гигабитная сетевая карта.
  • Кластерная среда: Yarn + HDFS + Hive.
  • Параметры флинка: flink-conf.yaml[2].
  • Параметры куста: пороговое значение MapJoin в основном настроено для повышения производительности и предотвращения OOM.
  • Выбрал более новую версию Hadoop (3.X) и выбрал более новую версию Hive и Tez.

Контрольные шаги

Подготовка окружающей среды

  1. Подготовьте среду Hadoop (HDFS + YARN)
  2. Подготовьте среду Hive

■ Генерация набора данных

  1. Распределенно сгенерируйте набор данных TPC-DS и загрузите набор данных TEXT в Hive, исходные данные находятся в формате Csv. Рекомендуется генерировать данные распределенным образом, что также требует много времени. (инструменты, которые интегрируют TPC-DS в инструмент flink-sql-benmark)
  2. Таблица Hive TEXT преобразуется в таблицу ORC.Формат ORC является распространенным форматом файла данных Hive.Смешанное хранилище строк и столбцов способствует последующему быстрому анализу и имеет высокую степень сжатия. Выполнить запрос: создать таблицу{NAME} stored as{FILE} as select * from {SOURCE}.{NAME};

3.jpg

Как показано на рисунке, создаются 7 таблиц фактов и 17 таблиц измерений, официально описанных TPC-DS.

  1. При анализе таблиц Hive статистика очень важна для оптимизации запросов аналитических заданий.Для сложного SQL эффективность выполнения Plan сильно различается. Flink не только поддерживает чтение статистики таблиц Hive, но также поддерживает чтение статистики разделов Hive и оптимизирует CBO в соответствии со статистикой. Выполните команду: проанализировать таблицу ${NAME}, вычислить статистику по столбцам;

4.jpg

Flink запускает запросы

  1. Подготовьте среду Flink и создайте среду сеанса Flink Yarn.Рекомендуется использовать автономный режим или режим сеанса, который может повторно использовать процесс Flink и ускорить выполнение аналитических задач.
  2. Напишите код для запуска запросов, подсчета времени выполнения и другой связанной информации. Конкретный код может напрямую повторно использовать проект flink-tpcds в flink-sql-benchmark.
  3. FLINK_HOME/flink run Запустить программу, выполнить все запросы, дождаться завершения выполнения и подсчитать время выполнения.

5.jpg

Другие движки запускают запросы

  1. Соберите окружение по инструкции на официальном сайте других движков.
  2. Благодаря стандартному набору данных Hive легко использовать другие механизмы для чтения данных Hive.
  3. При запуске стоит отметить, что нужно достичь узкого места кластера, такого как ЦП, например, Диск.Должно быть узкое место, чтобы доказать, что режим работы и параметры являются разумными.Для этого некоторая настройка производительности требуется.

Сравнительный анализ

Flink 1.10

Когда Flink 1.9 объединил код Blink, была проделана большая работа: глубокая генерация кода, хранение и расчет двоичных файлов, идеальная оптимизация CBO, Batch Shuffler, что заложило прочную основу для последующих прорывов в производительности.

Flink 1.10 продолжает улучшать интеграцию Hive и достиг стандартов интеграции Hive на уровне производства.Другие также проделали большую работу с точки зрения производительности и готовых решений:

  • Поддержка нескольких версий Hive, поддерживает основные версии после Hive 1.0.
  • В настоящее время векторизованное чтение ORC включено по умолчанию только в Hive 2.0 и более поздних версиях.
  • Поддержка Hive версии 1.X уже выполняется: FLINK-14802 [3]
  • Поддержка векторизованного чтения для Parquet также находится в разработке: FLINK-11899 [4]
  • Эластичное распределение памяти на основе пропорций не только помогает оператору использовать больше памяти, но и значительно упрощает настройку пользователя.Пользователям больше не нужно настраивать память оператора.Оператор может гибко получать память в соответствии со слотом, что улучшает выход Flink готовое использование. Подробнее см. FLIP-53 [5].
  • Сжатие в случайном порядке: по умолчанию Flink позволяет пакетным заданиям сбрасывать промежуточные данные на диск, что помогает избежать возможности планирования взаимоблокировок и обеспечивает хороший механизм отказоустойчивости.Однако большое количество сбрасываемых дисков может привести к узким местам задания в пропускной способности диска. Поэтому Flink 1.10 разработал сжатие Shuffle с использованием ЦП для ввода-вывода.
  • Новая структура планирования: Flink 1.10 также представляет новую структуру планирования, которая улучшает производительность планирования JobMaster и позволяет избежать того, чтобы JobMaster стал узким местом производительности, когда параллелизм слишком велик.

Параметрический анализ Флинка

Flink 1.10 оптимизировал множество параметров, чтобы улучшить работу пользователя «из коробки», однако из-за некоторых ограничений интеграции пакетного потока в настоящее время необходимо установить некоторые настройки параметров, которые кратко проанализированы в этой статье. .

Параметры слоя таблицы:

  • table.optimizer.join-reorder-enabled = true: его нужно открывать вручную. В настоящее время JoinReorder основных движков редко открывается по умолчанию. Когда статистическая информация относительно полна, его можно открыть. Вообще говоря, переупорядочить ошибки относительно редки.
  • table.optimizer.join.broadcast-threshold = 1010241024: значение по умолчанию изменено с 1 МБ на 10 МБ.В настоящее время механизм широковещательной передачи Flink нуждается в улучшении, поэтому значение по умолчанию составляет 1 МБ, но если масштаб параллелизма не такой большой, его можно открыть до 10 МБ.
  • table.exec.resource.default-parallelism = 800: Настройка параллелизма оператора.Для ввода 10T рекомендуется открыть параллелизм до 800. Не рекомендуется иметь слишком много параллелизма.Чем больше параллелизма, тем больше давление на все аспекты системы.

Анализ параметров TaskManager:

  • taskmanager.numberOfTaskSlots = 10: количество слотов в одном TM.
  • Параметры памяти TaskManager: память TaskManager в основном делится на три типа: память управления, сетевая память и другая память, связанная с JVM. Вам необходимо понимать документацию на официальном сайте, чтобы эффективно устанавливать эти параметры.
  • taskmanager.memory.process.size = 15000m: общая память TaskManager за вычетом другой памяти обычно зарезервирована для 3-5 ГБ памяти в куче.
  • taskmanager.memory.managed.size = 8000m: Управляемая память для расчетов Оператора.Разумно оставить 300-800 МБ памяти для одного слота.
  • taskmanager.network.memory.max = 2200 МБ: для двухточечной связи задачи требуется 4 буфера.Согласно расчету параллелизма предполагается, что требуется 2 ГБ.Это можно выяснить, попробовав, что если буферов недостаточно, будет выброшено исключение.

Анализ сетевых параметров

  • taskmanager.network.blocking-shuffle.type = mmap: чтение в случайном порядке использует mmap для управления памятью непосредственно системой, что является более удобной формой.
  • taskmanager.network.blocking-shuffle.compression.enabled = true: Shuffle использует сжатие.Этот параметр используется для пакетного мультиплексирования потоков.Настоятельно рекомендуется включать сжатие для пакетных заданий, иначе узкое место будет на диске.

■** Анализ параметров расписания**

  • cluster.evenly-spread-out-slots = true: равномерно распределять задачи в каждом диспетчере задач при планировании задач, что выгодно для использования всех ресурсов.
  • jobmanager.execution.failover-strategy = region: глобальная повторная попытка по умолчанию, вам необходимо включить повторную попытку для региона, чтобы включить одноточечную отработку отказа.
  • Стратегия перезапуска = фиксированная задержка: стратегию повтора необходимо установить вручную.По умолчанию нет повтора.

Другие параметры, связанные с тайм-аутом, используются, чтобы избежать дрожания сети, вызванного большим объемом данных во время планирования и выполнения, что может привести к сбою задания.

Flink 1.11 и последующие планы

В будущем сообщество Flink продолжит повышать производительность, улучшая функции:

  • Предоставляет шлюз SQL и драйвер JDBC, а в настоящее время предоставляет независимое хранилище для Flink 1.10. [6] [7]
  • Обеспечьте режим совместимости синтаксиса Hive, чтобы избежать путаницы для пользователей Hive.
  • Улучшите векторизованное чтение для ORC и Parquet.
  • Оператор N-арного потока [8]: разработайте цепную структуру уровня таблицы, чтобы еще больше избежать накладных расходов, вызванных Shuffle.

Ссылка на ссылку:

[1]GitHub.com/ver V Эрика/… [2]GitHub.com/ver V Эрика/… [3]Поскольку ah.apache.org/now ah/browse… [4]Issues.apache.org/since-ah/browse… [5]Из wiki.Apache.org/confluence/… [6]GitHub.com/ver V Эрика/… [7]GitHub.com/ver V Эрика/… [8]Из wiki.Apache.org/confluence/…

# Большие преимущества #

PPT «Демонстрация: создание автономных приложений на основе Flink SQL» уже здесь! Подпишитесь на общедоступную учетную запись WeChat «Китайского сообщества Flink» и ответьте на ключевое слово «0218SQL» в фоновом режиме, чтобы получить демо-презентацию PPT этого живого курса~