Как выполнить настройку производительности Spark? Обмен реальными боями с большими данными

Spark Apache Hive
Как выполнить настройку производительности Spark? Обмен реальными боями с большими данными

предисловие

В настоящее время Spark является основным механизмом обработки больших данных.Его функции охватывают различные типы вычислительных операций, такие как автономная пакетная обработка, SQL-подобная обработка, потоковые вычисления/вычисления в реальном времени, машинное обучение и графические вычисления в области больших данных. . какПлатформа вычислений в памяти, ИскраБыстрая работа, и может удовлетворить различные требования к расчету и обработке данных, такие как UDF, объединение таблиц размеров и множественный вывод.

Являясь отечественным профессиональным поставщиком услуг по анализу данных,Getui представил Spark из ранней версии 1.3, а также создать хранилище данных на основе Spark для выполнения вычислений больших объемов данных в автономном режиме и в режиме реального времени. Поскольку оптимизация Spark до версии 2.x была сосредоточена на вычислительном движке, никаких серьезных улучшений и обновлений в управлении метаданными не производилось. Поэтому Getui по-прежнему использует Hive для управления метаданными, используяУправление метаданными Hive + вычислительный движок Sparkдля поддержки развития собственного бизнеса больших данных. Getui также широко использует Spark для анализа отчетов, машинного обучения и других сценариев, чтобы предоставить отраслевым клиентам и государственным учреждениямАнализ населения в режиме реального времени и построение группового портретаи другие услуги.

▲В реальном бизнес-сценарии SparkSQL и HiveSQL использовались для расчета данных 3T, и на приведенном выше рисунке показана скорость работы. Данные показывают, что при условии блокировки очереди (120 ГБ памяти,

Для предприятий эффективность и стоимость всегда являются теми вопросами, на которые они должны обращать внимание при обработке и вычислении больших объемов данных. Как в полной мере использовать преимущества Spark, чтобы действительно сократить расходы и повысить эффективность операций с большими данными? Толчок будет накапливаться годамиСоветы по настройке производительности SparkОбобщил и поделился с вами.

Настройка производительности Spark — основы

Как мы все знаем, правильная настройка параметров может значительно повысить эффективность Spark. следовательно,против Для пользователей Spark, которые не понимают основных принципов, мы предоставляем шаблон конфигурации параметров, который может напрямую копировать задания., чтобы помочь разработчикам связанных данных и аналитикам более эффективно использовать Spark для автономной пакетной обработки и анализа отчетов SQL.

Рекомендуемый шаблон конфигурации параметров выглядит следующим образом:

  1. Скрипт метода отправки Spark-submit

  1. Скрипт метода отправки spark-sql

Настройка производительности Spark — расширенный

Для читателей, которые хотят понять основные принципы Spark, в этой статье представлены диаграммы взаимодействия трех распространенных методов отправки задач, таких как автономный, Yarn-клиент и Yarn-кластер, чтобы помочь соответствующим пользователям понять основные технические принципы Spark. Spark более интуитивно понятен и подходит для чтения. Следующие расширенные главы закладывают прочную основу.

standalone

  1. Spark-submit отправляет и создает процесс DriverActor путем отражения;

  2. Процесс Driver выполняет написанное приложение, конструирует sparkConf и конструирует sparkContext;

  3. При инициализации SparkContext создаются DAGScheduler и TaskScheduler, и причал запускает webui;

  4. TaskScheduler имеет процесс sparkdeployschedulebackend для связи с Мастером и запроса на регистрацию приложения;

  5. После того, как мастер принимает сообщение, он регистрирует приложение, использует алгоритм планирования ресурсов, уведомляет обработчик и позволяет обработчику запустить исполнителя;

  6. Рабочий запустит исполнитель для приложения, и после запуска исполнителя он будет зарегистрирован в TaskScheduler в обратном порядке;

  7. После того, как все Исполнители обратно зарегистрированы в TaskScheduler, Драйвер завершает инициализацию sparkContext;

  8. Драйвер продолжает выполнять написанное приложение, и каждый раз при выполнении действия будет создаваться задание;

  9. Задание будет отправлено в DAGScheduler, и DAGScheduler разделит задание на несколько этапов (алгоритм разделения этапов), и каждый этап создает набор задач;

  10. TaskScheduler будет передавать каждую задачу в наборе задач исполнителю для выполнения (алгоритм распределения задач);

  11. Каждый раз, когда Executor получает задачу, он использует taskRunner для инкапсуляции задачи, а затем извлекает поток из пула потоков исполнителя для выполнения taskRunner. (выполнение задачи: скопируйте написанный код/оператор/функцию, десериализуйте, а затем выполните задачу).

Yarn-client

  1. Отправить запрос в ResourceManager (RM), запрос на запуск ApplicationMaster (AM);

  2. RM размещает контейнер в NodeManager (NM) и запускает AM, который на самом деле является ExecutorLauncher;

  3. AM относится к RM для контейнера;

  4. RM назначает контейнер AM;

  5. AM запрашивает у NM запуск соответствующего Executor;

  6. После запуска исполнителя он обратно регистрируется в процессе Драйвера;

  7. Последующее разделение этапов, отправка набора задач аналогична автономному режиму.

Yarn-cluster

  1. Отправить запрос в ResourceManager (RM), запрос на запуск ApplicationMaster (AM);

  2. RM размещает контейнер на определенном NodeManager (NM) и запускает AM;

  3. AM относится к RM для контейнера;

  4. RM назначает контейнер AM;

  5. AM запрашивает у NM запуск соответствующего Executor;

  6. После запуска исполнителя он регистрируется обратно в AM;

  7. Последующее разделение этапов, отправка набора задач аналогична автономному режиму.

После понимания лежащего в основе взаимодействия трех вышеупомянутых общих задач следующая статья начинается сФормат хранения, перекос данных, конфигурация параметровПодождите, пока 3 аспекта будут расширены, и поделитесь с вами расширенной позицией по настройке производительности Spark.

Формат хранения (формат файла, алгоритм сжатия)

Как мы все знаем, разные SQL-движки имеют разные методы оптимизации для разных форматов хранения, например, Hive предпочитает orc, а Spark — паркет. В то же время при выполнении операций с большими даннымиТочечный поиск, запросы к широким таблицам и операции объединения больших таблиц встречаются относительно часто., для которого требуется формат файла, который лучше всего использоватьСтолбчатое хранилище и разделяемое. Поэтому мы рекомендуем форматы файлов колоночного хранилища на основе parquet и orc, а также алгоритмы сжатия на основе gzip, snappy и zlib. В сочетании мы рекомендуем использоватьпаркет+gzip, орк+zlibсочетание, такое сочетаниеПринимая во внимание колоночное хранение и делимостьПо сравнению с неразделимой комбинацией txt+gz, он больше подходит для нужд описанных выше сценариев больших данных.

Взяв в качестве примера онлайн-данные объемом около 500 ГБ, Getui провела тесты производительности для различных форматов файлов хранилища и комбинаций алгоритмов в различных кластерных средах и механизмах SQL. Данные испытаний показывают, что:При тех же условиях ресурсов формат хранения parquet+gz как минимум на 60 % быстрее, чем формат хранения text+gz, в многозначных запросах и соединениях с несколькими таблицами.

В сочетании с результатами тестов мы определили рекомендуемые форматы хранения для различных кластерных сред и механизмов SQL, как показано в следующей таблице:

В то же время мы также проверили потребление памяти parquet+gz и orc+zlib.Взяв в качестве примера один исторический раздел данных таблицы, parquet+gz и orc+zlib экономят 26% и 49% дискового пространства соответственно по сравнению с txt+gz.

Полные результаты теста следующие:

Видно, что parquet+gz и orc+zlib действительно эффективны в снижении затрат и повышении эффективности.Итак, как использовать эти два формата хранения? Действуйте следующим образом:

➤hive и spark включают алгоритм сжатия указанного формата файла

  • spark: set spark.sql.parquet.compression.codec=gzip; set spark.sql.orc.compression.codec=zlib;
  • hive: set hive.exec.compress.output=true; set mapreduce.output.fileoutputformat.compress=true; set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;

➤ Укажите формат файла при создании таблицы

  • parquet 文件格式(序列化,输入输出类) CREATE EXTERNAL TABLE `test`(rand_num double) PARTITIONED BY (`day` int) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' ;
  • orc 文件格式(序列化,输入输出类) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat' ;

➤Онлайн настройка часов

ALTER TABLE db1.table1_std SET TBLPROPERTIES ('parquet.compression'='gzip');ALTER TABLE db2.table2_std SET TBLPROPERTIES ('orc.compression'='ZLIB');

➤ктас создать таблицу

create table tablename stored as parquet as select ……;create table tablename stored as orc TBLPROPERTIES ('orc.compress'='ZLIB')  as select ……;

перекос данных

Существует два типа перекоса данных: перекос карты и перекос уменьшения. Эта статья посвященауменьшить наклон, такие как группировка, объединение, которые распространены в SQL, могут быть наиболее тяжелыми. Когда возникает перекос данных, это обычно проявляется следующим образом:Некоторые задачи выполняются значительно медленнее, чем задачи в том же пакете, объем данных задачи значительно больше, чем у других задач, а некоторые задачи теряют файлы OOM и spark shuffle и т. д.. Как показано в приведенном ниже примере, в столбце «Длительность» и столбце «shuffleReadSize/Records» мы можем четко обнаружить, что объем данных, обрабатываемых некоторыми задачами, значительно увеличивается, а время, затрачиваемое на них, увеличивается, что приводит к перекосу данных:

Как решить проблему искажения данных?

Мы собрали 7 решений для исправления искажений данных, которые могут помочь вам решить распространенные проблемы искажения данных:

Решение 1. Предварительная обработка данных с помощью Hive ETL

То есть в кровных отношениях данных проблема перекоса продвигается вперед, так что нижестоящим пользователям не нужно учитывать проблему перекоса данных.

Эта программа распространяется наСильное нисходящее взаимодействиеуслуги, такие как запрос второго уровня/минутного уровня.

Решение 2. Отфильтруйте несколько ключей, вызывающих перекос

То есть убрать перекошенный большой ключ, схема вообще такаяИспользование с процентилями, если 99,99% записей с идентификаторами находятся в пределах 100 записей, то можно считать, что идентификаторы, отличные от 100 записей, исключены.

программаПолезно в статистических сценариях, а в подробном сценарии вам нужно увидеть, находится ли отфильтрованный большой ключ в центре внимания и внимания бизнеса.

Решение 3. Улучшите параллелизм операций перемешивания

То есть параметры spark.sql.shuffle.partitions динамически настраиваются, а количество разделов, записываемых задачей произвольной записи, увеличивается для достижения равномерного распределения ключей. SparkSQL2.3 По умолчанию значение равно 200. Разработчики могут добавить в сценарий запуска следующие параметры для динамической настройки значения:

  • conf spark.sql.shuffle.partitions=10000
  • conf spark.sql.adaptive.enabled=true
  • conf spark.sql.adaptive.shuffle.targetPostShuffleInputSize=134217728

программаочень простой, но он может играть более важную роль в оптимизации равномерного распределения ключей. Например, если изначально имеется 10 ключей, каждый с 50 записями, и только 1 раздел, то следующей задаче необходимо обработать 500 записей. При увеличении количества разделов каждая задача может обрабатывать 50 записей, 10 задач выполняются параллельно, а затраты времени составляют всего 1/10 от исходной задачи.Однако эту схему трудно оптимизировать для больших ключей.Например, если для большого ключа имеются миллионы записей, большой ключ все равно будет выделен одной задаче.

Решение 4. Преобразование reducejoin в mapjoin

Относится к объединению на стороне карты без процесса перемешивания. Взяв Spark в качестве примера, данные небольшого RDD могут быть отправлены на каждый рабочий узел (NM в режиме Yarn) с помощью широковещательных переменных, и соединение выполняется на каждом рабочем узле.

программаПодходит для небольших столов, соединяющих большие столы(Объем данных более 100 Гб). Пороговое значение по умолчанию для небольших таблиц здесь составляет 10 М. Небольшие таблицы ниже этого порога могут быть распределены по рабочим узлам. Конкретный регулируемый верхний предел должен быть меньше памяти, выделенной контейнером.

Решение 5. Выборка искаженных ключей и разделение операций соединения

Как показано в следующем примере: таблица соединяется с таблицей B, таблица A имеет большой ключ, таблица B не имеет большого ключа, идентификатор большого ключа равен 1, и имеется 3 записи.

Как разделить операцию соединения?

  • Во-первых, id1 в таблице A и таблице B разбиваются отдельно, а A' и B' больших ключей исключаются первыми, чтобы соединиться для достижения ненаклонной скорости;

  • Добавляем случайный префикс к большому ключу таблицы А, расширяем вместимость таблицы Б в N раз и соединяемся отдельно, только убираем случайный префикс после соединения;

  • Затем соедините две вышеуказанные части.

**⁕** Суть этого решения заключается в снижении риска искажения данных, возникающего, когда одна задача обрабатывает слишком много данных.Подходит для менее крупных ключейСлучай.

Решение 6. Присоединитесь, используя случайный префикс и масштабируя RDD

Например, когда таблица A присоединяется к таблице B, возьмем пример, в котором таблица A имеет большой ключ, а таблица B не имеет большого ключа:

  • Добавьте случайный префикс [1,n] к каждой записи в таблице A, расширьте таблицу B в N раз и выполните соединение.

  • Удалите случайные префиксы после завершения соединения.

программаПодходит для более крупных клавиш, но и увеличивает потребление ресурсов.

Решение седьмое: объединитель

То есть операция объединения выполняется на стороне карты, чтобы уменьшить объем данных, получаемых при тасовке.

**⁕** Эта схема подходит для таких сценариев, как накопление и суммирование.

В реальных сценариях рекомендуется, чтобы соответствующие разработчики анализировали конкретную ситуацию, и вышеуказанные методы также могут использоваться в сочетании для решения сложных проблем.

Конфигурация параметров искры

В случае отсутствия перекоса данных мы отсортировали и обобщили справочную таблицу конфигурации параметров, чтобы помочь вам выполнить настройку производительности Spark. Настройки этих параметров подходят для понимания и применения данных около 2T и в основном отвечают потребностям настройки в большинство сценариев.

Суммировать

В настоящее время Spark разработан до Spark3.x, а последняя версия Spark 3.1.2 выпущена (01 июня 2021 г.). Многие новые функции Spark 3.x, такие как динамическая обрезка разделов, значительные улучшения API-интерфейса Pandas, а также улучшенное отсечение и сжатие вложенных столбцов, дают хорошую идею для дальнейшего снижения затрат и повышения эффективности. В будущем Getui будет продолжать уделять внимание развитию Spark, а также продолжать практиковаться и делиться.