Настройка производительности Spark — настройка в случайном порядке и устранение неполадок

Большие данные

Настройка в случайном порядке для настройки искры

Этот раздел начинается с объясненияПеремешать основные понятия; тогда дляHashShuffle,SortShuffleнастраивать;сторона карты,уменьшить сторонуТюнинг; затем для Spark вперекос данныхПроанализируйте и настройте проблему; наконец, Spark работаетИсправление проблем.

Эта статья была впервые опубликована в публичном аккаунте【Пять минут на изучение больших данных], эта учетная запись посвящена технологии больших данных и делится высококачественными оригинальными техническими статьями о больших данных.

1. Основная концепция Shuffle

1. ShuffleMapStage и ResultStage

ShuffleMapStage与ResultStage

При делении сценыПоследний этап называется FinalStage., который по сути является объектом ResultStage,Все предыдущие этапы называются ShuffleMapStage..

Окончание ShuffleMapStage сопровождается записью файла перемешивания на диск.

ResultStage в основном соответствует оператору действия в коде, то есть применение функции к набору данных каждого раздела RDD означает завершение операции задания.

2. Количество задач в Shuffle

Мы знаем, что Spark Shuffle делится на стадию сопоставления и стадию уменьшения, или называемую стадией ShuffleRead и стадией ShuffleWrite, тогда для Shuffle процесс сопоставления и процесс сокращения будут выполняться несколькими задачами, тогда количество задач сопоставления и уменьшение задач равно Как быть уверенным?

Предполагая, что задачи Spark читают данные из HDFS, затемКоличество начальных разделов RDD определяется количеством разбиений в файле., это,Разделение соответствует разделу сгенерированного RDD, мы предполагаем, что начальное количество разделов равно N.

После того, как начальный RDD вычисляется с помощью ряда операторов (при условии, что операторы переразбиения и объединения не выполняются для переразбиения, количество разделов остается таким же, как N, и если оператор перераспределения передается, количество разделов становится M) , мы Предполагая, что количество разделов остается неизменным, когда выполняется операция Shuffle,Количество задач на стороне карты равно количеству разделов, то есть количество задач карты равно N.

Этап на стороне уменьшения принимает значение по умолчанию.spark.default.parallelismЗначение этого элемента конфигурации используется в качестве количества разделов.Если он не настроен, в качестве количества разделов используется количество разделов последнего RDD на стороне карты.(то есть N), то количество разделов определяет количество задач на стороне сокращения.

3. Чтение данных на стороне уменьшения

В соответствии с разделением на этапы мы знаем, что задачи на стороне карты и задачи на стороне редукции не находятся на одном и том же этапе.Задача карты находится в ShuffleMapStage.,задача уменьшить находится в ResultStage, задача сопоставления будет выполняться первой, так как же задача сокращения, выполняемая позже, узнает, куда извлекать данные после размещения задачи сопоставления?

Процесс извлечения данных на стороне уменьшения выглядит следующим образом.:

  1. После выполнения задачи карты такая информация, как статус расчета и расположение небольшого файла на диске, будет инкапсулирована в объект MapStatus, а затем объект MapOutPutTrackerWorker в этом процессе отправит объект mapStatus объекту MapOutPutTrackerMaster объекта процесс Драйвер;
  2. Перед началом выполнения задачи сокращения MapOutputTrackerWorker в этом процессе сначала отправит запрос MapoutPutTrakcerMaster в процессе Driver, чтобы запросить информацию о расположении небольших файлов на диске;
  3. Когда все задачи Map выполнены, MapOutPutTrackerMaster в процессе Driver получает информацию о расположении всех небольших файлов на диске. В это время MapOutPutTrackerMaster сообщит MapOutPutTrackerWorker информацию о расположении небольшого файла на диске;
  4. После завершения предыдущих операций BlockTransforService будет извлекать данные из узла, где находится Executor0, и по умолчанию будут запущены пять подпотоков. Объем данных, извлекаемых каждый раз, не может превышать 48 МБ (задача сокращения каждый раз извлекает максимум 48 МБ данных и сохраняет извлеченные данные в 20 % памяти исполнителя).

Во-вторых, анализ HashShuffle.

В следующем обсуждении предполагается, что каждый Executor имеет 1 ядро ​​процессора.

1. Неоптимизированный HashShuffleManager

На этапе записи в случайном порядке после того, как этап завершает расчет, для следующего этапа могут выполняться операторы, подобные перетасовке (например, reduceByKey), а данные, обрабатываемые каждой задачей, «делятся» по ключу. «Отделение» означаетВыполнить алгоритм хеширования на том же ключе, так что один и тот же ключ записывается в один и тот же файл на диске, и каждый файл на диске принадлежит только одной задаче нижестоящего этапа.Перед записью данных на диск данные сначала будут записаны в буфер памяти, когда буфер памяти будет заполнен, он переполнится файлом на диске..

Сколько задач будет на следующем этапе и сколько дисковых файлов будет создано для каждой задачи на текущем этапе. Например, на следующем этапе всего 100 задач, тогда каждая задача текущего этапа должна создать 100 файлов на диске. Если на текущем этапе 50 задач, всего 10 Исполнителей, и каждый Исполнитель выполняет 5 задач, то всего на каждом Исполнителе будет создано 500 дисковых файлов, а на всех Исполнителях будет создано 5000 дисковых файлов. Отсюда видно, чтоКоличество дисковых файлов, созданных неоптимизированной операцией случайной записи, ошеломляет..

Этап чтения в случайном порядке обычно является тем, что этап делает в начале. В это время на сценеКаждой задаче нужно подтянуть все одинаковые ключи в результатах вычислений предыдущего этапа от каждого узла через сеть к своему узлу, а затем выполнить операции агрегации ключей или соединения.. В процессе случайной записи задача карты создает файл на диске для каждой задачи редукции на нижестоящей стадии, поэтому в процессе чтения в случайном порядке каждой задаче редукции нужно тянуть только с тех узлов, где все задачи карты вышестоящей стадии Это ваш собственный файл на диске.

Процесс вытягивания при случайном чтении заключается в агрегировании во время вытягивания. Каждая задача случайного чтения будет иметь свой собственный буфер,Каждый раз можно извлекать только данные того же размера, что и буфер., а затем выполнять такие операции, как агрегирование с помощью Map в памяти. После агрегирования пакета данных извлекается следующий пакет данных и помещается в буфер для операций агрегирования. И так далее, пока наконец не будут подтянуты все данные и не будет получен окончательный результат.

Как работает неоптимизированный HashShuffleManagerКак показано ниже:

未优化的HashShuffleManager工作原理

2. Оптимизированный HashShuffleManager

Для оптимизации HashShuffleManager мы можем установить параметр:spark.shuffle.consolidateFiles, значение этого параметра по умолчанию — false, а установка значения true включает механизм оптимизации.Если мы используем HashShuffleManager, рекомендуется включить эту опцию.

После включения механизма консолидации в процессе случайной записи задача не создает файл на диске для каждой задачи нижестоящего этапа, который появится в это время.shuffleFileGroupКонцепция чего-либо,Каждая группа shuffleFileGroup соответствует пакету файлов на диске, а количество файлов на диске совпадает с количеством задач на нисходящем этапе. Сколько ядер процессора на Executor, столько задач может выполняться параллельно. Каждая задача, выполняемая параллельно в первом пакете, создаст группу shuffleFileGroup и запишет данные в соответствующий файл на диске..

Когда ядро ​​ЦП Исполнителя завершает выполнение пакета задач, а затем выполняет следующий пакет задач,Следующая партия задач будет повторно использовать существующую группу shuffleFileGroup., включая в него файл на диске, то есть в это время задача будет записывать данные в существующий файл на диске, а не в новый файл на диске. следовательно,Механизм консолидации позволяет различным задачам повторно использовать один и тот же пакет дисковых файлов, так что дисковые файлы нескольких задач могут быть эффективно объединены в определенной степени, что значительно сокращает количество дисковых файлов и повышает производительность случайной записи..

Предполагая, что на втором этапе 100 задач, а на первом этапе 50 задач, всего остается 10 исполнителей (количество процессоров исполнителя равно 1), и каждый исполнитель выполняет 5 задач. Затем, когда изначально используется неоптимизированный HashShuffleManager, каждый исполнитель сгенерирует 500 файлов на диске, а все исполнители сгенерируют 5000 файлов на диске. Но после оптимизации в это время формула для расчета количества файлов на диске, созданных каждым Исполнителем:cpu core的数量 * 下一个stage的task数量, то есть каждый исполнитель создаст только 100 файлов на диске в это время, а все исполнители создадут только 1000 файлов на диске.

Как работает оптимизированный HashShuffleManagerКак показано ниже:

优化后的HashShuffleManager工作原理

Три, анализ SortShuffle

Механизм работы SortShuffleManager в основном делится на два типа, один из которыхнормальный рабочий механизм, другойобходной механизм. Когда количество задач случайного чтения меньше или равноspark.shuffle.sort.bypassMergeThresholdПри значении параметра (по умолчанию 200) включается механизм обхода.

1. Общий рабочий механизм

В этом режимеДанные сначала записываются в структуру данных в памяти., в это время, в соответствии с разными операторами тасования, могут быть выбраны разные структуры данных.Если это оператор тасования класса агрегации, например, reduceByKey, то будет выбрана структура данных Map, и агрегация будет выполняться через Map при записи в память.;Если это обычный оператор тасования, такой как соединение, тогда структура данных Array будет выбрана и записана непосредственно в память.. Затем каждый раз, когда часть данных записывается в структуру данных памяти, будет оцениваться, достигнут ли определенный критический порог. Если достигнут критический порог, делается попытка переполнить данные в структуре данных в памяти на диск, а затем очистить структуру данных в памяти.

Прежде чем переполнение будет записано в файл на диске, существующие данные в структуре данных памяти сортируются в соответствии с ключом. После сортировки данные записываются в файл на диск партиями. Номер пакета по умолчанию — 10 000, то есть отсортированные данные будут записываться в файл на диск пакетами в виде 10 000 данных в пакете. Запись файлов на диск реализована через BufferedOutputStream Java.BufferedOutputStream — буферизованный поток вывода Java.Он сначала буферизует данные в памяти и снова записывает их в файл на диске после переполнения буфера памяти, что может уменьшить количество дисковых операций ввода-вывода и повысить производительность..

Когда задача записывает все данные в структуру данных памяти, происходит несколько операций переполнения диска, а также создается несколько временных файлов. Наконец, все предыдущие временные файлы на диске будут объединены.процесс слияния, данные из всех предыдущих временных файлов на диске будут считаны, а затем по очереди записаны в окончательный файл на диске. Кроме того, поскольку задача соответствует только одному файлу на диске, а значит, все данные, подготовленные задачей для задач нижестоящего этапа, находятся в этом файле, поэтому будет записана отдельная копия.индексный файл, который определяет начальное и конечное смещения данных каждой нижестоящей задачи в файле.

SortShuffleManager значительно сокращает количество файлов благодаря процессу объединения дисковых файлов. Например, на первом этапе 50 задач, всего 10 Исполнителей, каждый Исполнитель выполняет 5 задач, а на втором этапе 100 задач. Поскольку каждая задача в конечном итоге имеет только один файл на диске, в настоящее время на каждом исполнителе имеется только 5 файлов на диске и только 50 файлов на диске для всех исполнителей.

Принцип работы SortShuffleManager обычного операционного механизмаКак показано ниже:

普通运行机制的SortShuffleManager工作原理

2. Механизм обхода

Условия срабатывания механизма байпаса следующие:

  • Количество задач перетасовки карт меньше, чемspark.shuffle.sort.bypassMergeThreshold=200значение параметра.
  • Это не оператор тасования агрегатного класса.

На этом этапе каждая задача создаст временный файл на диске для каждой нижестоящей задачи, хэширует данные по ключу, а затем запишет ключ в соответствующий файл на диске в соответствии с хеш-значением ключа. Разумеется, при записи файла на диск он также сначала записывается в буфер памяти, а после заполнения буфера переполняется в файл на диске. Наконец, все временные файлы на диске также объединяются в один файл на диске и создается один индексный файл.

Механизм записи на диск этого процесса на самом деле точно такой же, как у неоптимизированного HashShuffleManager, потому что должно быть создано удивительное количество файлов на диске, но в конце будет объединен только один файл на диске. Таким образом, небольшое количество конечных файлов на диске также повышает производительность чтения в случайном порядке по сравнению с неоптимизированным HashShuffleManager.

Отличие этого механизма от обычного механизма работы SortShuffleManager состоит в том, что: во-первых, отличается механизм записи на диск, во-вторых, не выполняется сортировка. Это,Самым большим преимуществом включения этого механизма является то, что в процессе случайной записи операции сортировки данных не требуются., что экономит эту часть накладных расходов на производительность.

Принцип работы SortShuffleManager обходного механизма работыКак показано ниже:

bypass运行机制的SortShuffleManager工作原理

В-четвертых, сопоставьте и уменьшите размер бокового буфера.

Во время выполнения задачи Spark, если объем данных, обрабатываемых на стороне карты при тасовке, относительно велик, но размер буфера на стороне карты фиксирован, может случиться так, что данные буфера на стороне карты часто сбрасываются и записываются. в файл на диске, что делает производительность очень высокой.Регулируя размер буфера на стороне карты, можно избежать частых операций ввода-вывода на диск, тем самым повышая общую производительность задач Spark..

Конфигурация буфера карты по умолчанию составляет 32 КБ. Если каждая задача обрабатывает 640 КБ данных, произойдет 640/32 = 20 операций записи с переполнением. Если каждая задача обрабатывает 64 000 КБ данных, произойдет 64000/32 = 2000 операций записи с переполнением. очень серьезное влияние на производительность.

Способ настройки буфера на стороне карты:

val conf = new SparkConf()
  .set("spark.shuffle.file.buffer", "64")

В процессе Spark Shuffle размер буферного буфера задачи уменьшения в случайном порядке определяет объем данных, которые задача уменьшения может буферизовать каждый раз, то есть объем данных, которые можно извлекать каждый раз.Если ресурсов памяти достаточно, соответствующее увеличение размера буфера извлекаемых данных может уменьшить количество извлечений данных, что также может уменьшить количество сетевых передач, тем самым повысив производительность..

Размер буфера извлечения данных на стороне сокращения можно определить с помощьюspark.reducer.maxSizeInFlightпараметр для установки, значение по умолчанию составляет 48 МБ. Способ установки этого параметра следующий:

Уменьшите конфигурацию буфера извлечения побочных данных:

val conf = new SparkConf()
  .set("spark.reducer.maxSizeInFlight", "96")

5. Количество повторов на стороне уменьшения и интервал времени ожидания

Во время Spark Shuffle, когда задача сокращения извлекает свои собственные данные, она автоматически повторяет попытку в случае сбоя из-за сбоев в сети или по другим причинам.Для заданий, которые содержат особенно трудоемкие операции перемешивания, рекомендуется увеличить максимальное количество повторных попыток.(например, 60 раз), чтобы избежать сбоя извлечения данных из-за таких факторов, как полная сборка мусора JVM или нестабильность сети.На практике установлено, что для процесса тасования с большим объемом данных (от миллиардов до десятков миллиардов) настройка этого параметра может значительно повысить стабильность..

Количество повторных попыток данных на стороне сокращения может быть передано черезspark.shuffle.io.maxRetriesПараметр установлен, который представляет собой максимальное количество повторных попыток, которые могут быть сделаны. Если получение не будет успешным в течение заданного количества раз, это может привести к сбою выполнения задания.Значение по умолчанию — 3. Способ установки этого параметра следующий:

Конфигурация количества попыток извлечения данных на стороне уменьшения:

val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "6")

Во время процесса Spark Shuffle, когда задача сокращения извлекает свои собственные данные, она автоматически повторяет попытку, если происходит сбой из-за сетевых аномалий или по другим причинам.После сбоя она будет ждать определенный интервал времени перед повторной попыткой.Вы можете повысить стабильность операции перемешивания, увеличив интервал (например, 60 с)..

Сторона сокращения извлекает данные и ждет, пока не пройдет интервалspark.shuffle.io.retryWaitПараметр установлен, значение по умолчанию 5 с, метод настройки этого параметра следующий:

Сторона сокращения извлекает данные и ожидает настройки интервала:

val conf = new SparkConf()
  .set("spark.shuffle.io.retryWait", "60s")

6. Порог для обходного механизма

Для SortShuffleManager, если количество задач уменьшения в случайном порядке меньше определенного порога, операция сортировки не будет выполняться в процессе записи в случайном порядке, а данные будут записаны непосредственно способом неоптимизированного HashShuffleManager, но все временные данные сгенерированы по каждой задаче будет записано в конце.Файлы на диске объединяются в один файл, и создается отдельный индексный файл.

когда тыПри использовании SortShuffleManager, если операция сортировки действительно не нужна, рекомендуется увеличить этот параметр больше, чем количество задач чтения в случайном порядке, тогда карта-сторона в это время не будет сортироваться, уменьшая накладные расходы на сортировку, но Таким образом, все еще будет создаваться большое количество файлов на диске, поэтому производительность случайной записи необходимо улучшить..

Порог операции сортировки SortShuffleManager можно установить с помощьюspark.shuffle.sort.bypassMergeThresholdЭтот параметр установлен, значение по умолчанию 200, способ установки этого параметра следующий:

Сторона сокращения извлекает данные и ожидает настройки интервала:

val conf = new SparkConf()
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")

перекос данных

То есть количество данных, разделенных на каждую область, неравномерно, вы можете настроить разделитель, и вы можете разделить его, как хотите.

Проблема перекоса данных в Spark в основном относится к проблеме перекоса данных в процессе перемешивания, которая вызвана разными объемами данных, соответствующими разным ключам.Разные задачи обрабатывают разные объемы данных..

Например, сокращенная сторона должна обработать в общей сложности 1 миллион единиц данных.Первая и вторая задачи распределяются по 10 000 единиц данных соответственно, а расчет выполняется в течение 5 минут.Третья задача распределяется по 980 000 единиц данных. В настоящее время третья задача Задача может занять 10 часов, что делает выполнение всего задания Spark 10 часами, что является следствием перекоса данных.

Обратите внимание, чтобы отличитьперекос данныхиперегрузка данныхВ обоих случаях перекос данных означает, что нескольким задачам выделяется большая часть данных, поэтому несколько задач выполняются медленно; избыток данных означает, что всем задачам выделяется большой объем данных, и все задачи выполняются медленно.

Производительность перекоса данных:

  1. Большинство задач задания Spark выполняются быстро, и только ограниченное количество задач выполняется очень медленно.В это время может быть перекос данных, и задание может выполняться, но очень медленно;
  2. Большинство задач заданий Spark выполняются быстро, но некоторые задачи внезапно сообщают об OOM во время выполнения. После повторного выполнения несколько раз в определенной задаче сообщается об ошибке OOM. В это время может быть перекос данных, и задание не может езжу нормально..

Проблема перекоса данных позиционирования:

  1. Проверьте операторы перемешивания в коде, такие как reduceByKey, countByKey, groupByKey, join и другие, и оцените, будет ли здесь перекос данных в соответствии с логикой кода;
  2. Проверьте файл журнала задания Spark. Файл журнала будет записывать ошибку с точностью до определенной строки кода. Вы можете определить, на каком этапе произошла ошибка, основываясь на месте кода, где находится исключение, и какой оператор перетасовки соответствующий оператор тасования;

1. Предварительно агрегировать необработанные данные

1. Избегайте процесса перемешивания

В большинстве случаев источником данных для заданий Spark являются таблицы Hive, и эти таблицы Hive в основном представляют собой вчерашние данные после ETL. Чтобы избежать перекоса данных, мы можем рассмотреть возможность отказа от процесса перетасовки.Если процесс перетасовки избегается, возможность проблем с перекосом данных принципиально устраняется.

Если данные задания Spark поступают из таблицы Hive, вы можете сначала агрегировать данные в таблице Hive, например, сгруппировав по ключу, и склеив все значения, соответствующие одному и тому же ключу, в строку в специальном формате, так что ключ имеет только один фрагмент данных, после этого при обработке всех значений ключа требуется только операция сопоставления, а операция перемешивания не требуется. Таким образом, можно избежать операции перетасовки и маловероятно возникновение какой-либо проблемы перекоса данных.

Для работы данных в таблице Hive не обязательно склеивать их в строку, либо можно напрямую вычислять кумулятивный расчет каждых данных ключа.Чтобы отличить, разница между большим объемом обрабатываемых данных и перекосом данных.

2. Увеличьте степень детализации ключей (уменьшите вероятность перекоса данных и увеличьте объем данных для каждой задачи)

Если нет возможности агрегировать фрагмент данных для каждого ключа, в определенных сценариях можно рассмотреть возможность расширения детализации агрегирования ключа.

Например, в настоящее время имеется 100 000 пользовательских данных, а степень детализации текущего ключа — (провинция, город, район, дата).Теперь мы рассматриваем возможность расширения детализации и расширения детализации ключа до (провинция, город, дата). Число будет уменьшено, а разница в объеме данных между ключами также может быть уменьшена, что может смягчить явление и проблему перекоса данных. (Этот метод действителен только для определенных типов данных. Когда сценарий применения не подходит, перекос данных будет усугубляться)

2. Препроцессинг приводит к искажению ключей

1. Фильтр

Если в заданиях Spark разрешено отбрасывать некоторые данные, вы можете отфильтровать ключи, которые могут привести к искажению данных, и отфильтровать данные, соответствующие ключам, которые могут привести к искажению данных. Таким образом, искажение данных не произойдет. в Spark рабочих мест.

2. Используйте случайные ключи

При использовании таких операторов, как groupByKey и reduceByKey, вы можете использовать случайные ключи для достижения двойной агрегации, как показано на следующем рисунке:

随机key实现双重聚合

Сначала добавьте префикс случайного числа к ключу каждых данных с помощью оператора карты, разбейте ключ, измените исходный тот же ключ на другой ключ, а затем выполните первую агрегацию, чтобы исходная задача могла быть обработана данными. распределяется по нескольким задачам для локальной агрегации, затем префикс каждого ключа удаляется и агрегация выполняется снова.

Этот метод хорошо влияет на перекос данных, вызванный такими операторами, как groupByKey и reduceByKey, он применим только к операции перемешивания класса агрегации, и область его применения относительно узка. Если это операция перемешивания класса соединения, необходимо использовать другие решения..

Этот метод также можно попробовать, когда предыдущие решения не работают.

3. Образец выборки объединяет искаженные ключи по отдельности.

В Спарке,Если RDD имеет только один ключ, данные, соответствующие этому ключу, будут по умолчанию разбросаны во время процесса перемешивания и обработаны различными задачами на стороне сокращения..

Поэтому, когда данные искажены одним ключом, искаженный ключ может быть извлечен отдельно для формирования RDD, а затем RDD, составленный из искаженного ключа, может использоваться для отдельного присоединения к другим RDD. механизма данные в этом RDD будут распределены по нескольким задачам для операции объединения на этапе перемешивания.

Процесс раздельного соединения косой шпонки показан на следующем рисунке:

倾斜key单独join流程

Применимый анализ сценариев:

Для данных в RDD вы можете преобразовать их в промежуточную таблицу или напрямую использовать countByKey(), чтобы увидеть объем данных, соответствующий каждому ключу в RDD. данных для одного ключа Тем более, что вы можете рассмотреть возможность использования этого метода.

Когда объем данных очень велик, вы можете рассмотреть возможность использования выборочной выборки для получения 10 % данных, а затем проанализировать, какой ключ в 10 % данных может привести к искажению данных, а затем извлечь данные, соответствующие этот ключ отдельно.

Неприменимый анализ сценариев:

Если в RDD много ключей, вызывающих искажение данных, эта схема не применяется.

3. Улучшить, уменьшить параллелизм

Когда схема 1 и схема 2 плохо влияют на обработку перекоса данных, рассмотрите возможность увеличения параллелизма редукционной стороны в процессе перемешивания.Увеличение параллелизма редукционной стороны увеличивает количество задач на редукционной стороне , то распределяется каждая задача. Объем полученных данных будет соответственно уменьшен, тем самым облегчая проблему перекоса данных.

1. Настройка параллелизма на нижней стороне

В большинстве операторов тасования можно передать параметр настройки параллелизма, например, reduceByKey(500).Этот параметр будет определять параллелизм редукционной стороны в процессе тасования.При выполнении операции тасования он будет создан соответствующим образом. Указанное количество задач сокращения. Для операторов, подобных перетасовке в Spark SQL, таких как группировка, объединение и т. д., необходимо установить параметр, а именноspark.sql.shuffle.partitions, который представляет параллелизм задачи чтения в случайном порядке. Значение по умолчанию — 200, что слишком мало для многих сценариев.

Увеличение количества задач случайного чтения позволяет назначить несколько клавиш, изначально назначенных одной задаче, нескольким задачам, что позволяет каждой задаче обрабатывать меньше данных, чем раньше.

Например, если изначально 5 ключей, каждый ключ соответствует 10 элементам данных, и эти 5 ключей выделены задаче, то эта задача будет обрабатывать 50 элементов данных. После добавления задачи чтения в случайном порядке каждой задаче назначается ключ, то есть каждая задача обрабатывает 10 фрагментов данных, поэтому время выполнения каждой задачи естественно сократится.

2. Дефекты в настройке параллелизма на стороне редуктора

Улучшение параллелизма редукционной стороны принципиально не меняет характер и проблемы перекоса данных (схема 1 и вариант 2 принципиально предотвращают возникновение перекоса данных), просто чтобы облегчить и уменьшить нагрузку на данные при тасовке, уменьшите задачу, насколько это возможно, а также проблему перекоса данных, которая подходит для ситуации, когда объем данных, соответствующий большему количеству ключей, относительно велик.

Это решение обычно не может полностью устранить перекос данных, потому что если есть какие-то экстремальные ситуации, например, количество данных, соответствующее ключу, равно 1 миллиону, то независимо от того, насколько увеличивается номер вашей задачи, ключ, соответствующий 1 миллиону данных, будет определенно все еще будет выделено.Он обрабатывается в задаче, поэтому неизбежен перекос данных. Таким образом, можно сказать, что эта схема является средством, которое можно попытаться использовать при обнаружении перекоса данных, попытаться уменьшить перекос данных самым простым способом или использовать его в сочетании с другими схемами.

В идеале, после улучшения параллелизма на стороне редукции проблема перекоса данных будет в определенной степени облегчена или даже полностью устранена; однако в некоторых случаях это только сделает задачи, которые изначально выполнялись медленно из-за перекоса данных. работать быстрее.Немного улучшено, или устранена проблема ООМ некоторых задач, но работа по-прежнему медленная.В это время необходимо вовремя отказаться от третьего решения и начать пробовать последнее решение.

4. Используйте присоединение к карте

При нормальных обстоятельствах операция объединения будет выполнять процесс перемешивания, а также выполнять объединение с уменьшением, то есть сначала агрегировать все одинаковые ключи и соответствующие значения в задачу сокращения, а затем выполнять соединение. Процесс обычного соединения показан на следующем рисунке:

普通join过程

Обычное соединение будет проходить через процесс перемешивания, и после перемешивания это эквивалентно извлечению данных одного и того же ключа в задачу чтения в случайном порядке, а затем объединению, которое является соединением сокращения. Однако, если RDD относительно невелик, вы можете использовать широковещательный оператор полных данных + map для малого RDD, чтобы добиться того же эффекта, что и присоединение, то есть присоединение к карте. происходить. .

Примечание. RDD не может передаваться напрямую, только данные внутри RDD могут быть загружены в память драйвера путем сбора и последующей передачи..

1. Основная идея:

Вместо использования оператора соединения для операции соединения широковещательная переменная и оператор сопоставления используются для реализации операции соединения, тем самым полностью избегая операции типа перетасовки и полностью избегая возникновения и возникновения перекоса данных. Вытащите данные из меньшего СДР прямо в память на стороне Водителя через оператор сбора, а затем создайте для него широковещательную переменную; затем выполните оператор карты на другом СДР, в операторной функции, из широковещательной переменной Получите полный данные меньшего СДР и сравните их с каждыми данными текущего СДР по ключу соединения.Если ключ соединения один и тот же, то соедините данные двух СДР нужным вам способом.

В соответствии с приведенными выше идеями операция перемешивания вообще не будет выполняться, что принципиально устраняет проблему перекоса данных, которая может быть вызвана операцией объединения.

Когда операция соединения имеет проблему перекоса данных и объем данных в одном из RDD невелик, этот метод может быть предпочтительным, и эффект очень хороший..

Процесс присоединения карты показан на следующем рисунке:

map join过程

2. Неприменимый сценарный анализ:

Поскольку широковещательная переменная Spark сохраняет копию в каждом исполнителе, если объем данных обоих RDD относительно велик, то преобразование RDD с относительно большим объемом данных в широковещательную переменную может привести к переполнению памяти.

Исправление проблем

1. Избегайте OOM-out памяти

В процессе перемешивания задача редукции не ждет, пока задача на стороне карты запишет все свои данные на диск, а затем извлечет их, аКогда сторона карты записывает немного данных, задача редукции извлечет небольшую часть данных, а затем сразу же выполнит последующие операции, такие как агрегирование и использование операторных функций..

Сколько данных может извлечь задача на стороне сокращения, определяется буферным буфером, из которого сокращение извлекает данные, потому чтоИзвлеченные данные сначала помещаются в буфер, а затем выполняется последующая обработка.Размер буфера по умолчанию составляет 48 МБ..

Задача на стороне сокращения будет извлекать и вычислять одновременно, и она может не каждый раз заполняться 48 МБ данных. Она может обрабатываться, когда большую часть времени извлекается часть данных.

Несмотря на то, что увеличение размера буфера на стороне сокращения может уменьшить количество извлечений и улучшить производительность в случайном порядке, иногда объем данных на стороне карты очень велик, а скорость записи очень высока.В это время, когда все задачи на сторона сокращения, возможно, что все достигнут максимального предела собственного буфера, то есть 48 МБ, В это время вместе с кодом функции агрегации, выполняемой на стороне сокращения, может быть большое количество объектов создается, что может привести к переполнению памяти, т.е.OOM.

Если возникает проблема переполнения памяти на стороне уменьшения, мы можем рассмотреть возможность уменьшения размера буфера данных извлечения на стороне уменьшения, например, до 12 МБ..

Эта проблема возникла в реальной производственной среде, что является типичнымПринцип исполнения на исполнение. Уменьшение буфера для извлечения данных на стороне сокращения не так просто вызвать OOM, но, соответственно, количество извлечений на стороне повторения увеличивается, что приводит к увеличению накладных расходов на передачу по сети и снижению производительности.

Обратите внимание, что для обеспечения выполнения задачи рассмотрите возможность оптимизации производительности.

2. Избегайте сбоев при извлечении файлов в случайном порядке, вызванных GC

В работе Spark иногдаshuffle file not foundошибка, это очень распространенная ошибка,Иногда после возникновения этой ошибки выберите повторное выполнение, и эта ошибка больше не будет отображаться..

Возможные причины вышеуказанных проблемВ операции Shuffle задача следующего этапа хочет перейти к Исполнителю, где находится задача предыдущего этапа для извлечения данных.В результате другая сторона выполняет GC.Выполнение GC приведет к тому, что все рабочие сайты в Исполнитель остановить., такие как BlockManager, сетевое взаимодействие на основе netty и т. д., что приведет к тому, что последующая задача будет извлекать данные и не извлекать их в течение длительного времени, и об этом будет сообщено.shuffle file not foundошибка, и при повторном ее выполнении во второй раз эта ошибка больше не возникнет.

Производительность в случайном порядке можно настроить, отрегулировав два параметра: количество попыток передачи данных на стороне сокращения и временной интервал, когда сторона сокращения извлекает данные.Увеличение значения параметра увеличивает количество попыток извлечения данных на стороне сокращения, и каждый time Более длительный интервал ожидания после сбоя.

Сбой извлечения файла в случайном порядке, вызванный JVM GC, регулирует количество повторных попыток данных и временной интервал для извлечения данных на стороне сокращения:

val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "6")
  .set("spark.shuffle.io.retryWait", "60s")

3. Проблема всплеска трафика сетевой карты из-за режима YARN-CLIENT

В режиме YARN-клиент Драйвер запускается на локальном компьютере, и Драйвер отвечает за планирование всех задач, требуя частого взаимодействия с несколькими Исполнителями в кластере YARN.

Предполагая, что исполнителей 100 и задач 1000, каждому исполнителю выделяется 10 задач.После этого Драйвер часто связывается с 1000 задачами, запущенными на Исполнителе.Передача данных очень велика, а категория связи особенно высока. Это делает возможным, что во время выполнения задачи Spark трафик сетевой карты локальной машины резко возрастет из-за частого и большого сетевого обмена данными.

Уведомление,Режим YARN-клиент используется только в тестовой среде, и причина использования режима YARN-клиент заключается в том, что вы можете просматривать подробную и исчерпывающую информацию журнала., Просматривая журнал, вы можете зафиксировать проблемы в программе и избежать сбоев в производственной среде.

В производственной среде необходимо использовать режим кластера YARN. В режиме YARN-кластера не будет всплеска трафика сетевой карты локальной машины., Если в режиме YARN-кластера возникает проблема с сетевым подключением, ее должна решить группа эксплуатации и обслуживания.

4. Переполнение стека JVM в режиме YARN-CLUSTER не может быть выполнено

Когда задание Spark содержит содержимое SparkSQL, может быть возможно запустить его в режиме клиента YARN, но не удастся отправить и запустить в режиме кластера YARN (сообщается об ошибке OOM).

В режиме YARN-клиент, Драйвер работает на локальном компьютере, а конфигурация PermGen JVM, используемая Spark, представляет собой файл spark-class на локальном компьютере.Размер постоянной генерации JVM составляет 128 МБ., это не проблема, но вВ режиме YARN-кластера, Драйвер запускается на узле кластера YARN с использованием параметров по умолчанию, которые не были настроены,Размер постоянной генерации PermGen составляет 82 МБ..

SparkSQL должен выполнять очень сложный семантический анализ SQL, преобразование синтаксического дерева и т. д. Если сам оператор SQL очень сложный, это может привести к потере производительности и занятию памяти, особенно занятие PermGen будет больше. .

так,В настоящее время, если PermGen занимает более 82 МБ, но менее 128 МБ, он будет работать в режиме YARN-клиент, но не может работать в режиме YARN-кластера..

Решение вышеуказанной проблемы заключается в увеличении мощности PermGen (постоянная генерация), должен быть вspark-submitСоответствующие параметры задаются в скрипте, а метод настройки следующий:

--conf spark.driver.extraJavaOptions="-XX:PermSize=128M -XX:MaxPermSize=256M"

С помощью вышеуказанного метода устанавливается размер постоянного поколения драйвера, значение по умолчанию составляет 128 МБ, а максимальное значение составляет 256 МБ, что позволяет избежать вышеупомянутых проблем.

5. Избегайте переполнения стека SparkSQL JVM

Когда в операторе SQL SparkSQL есть сотни или тысячи ключевых слов или, может произойти переполнение памяти стека JVM на стороне драйвера.

Переполнение памяти стека JVM в основном связано с вызовом слишком большого количества уровней методов, что приводит к большому количеству очень глубокой рекурсии, которая превышает предел глубины стека JVM.. (Мы предполагаем, что когда SparkSQL имеет большое количество операторов or, при синтаксическом анализе SQL, таком как преобразование в синтаксическое дерево или создание плана выполнения, обработка или является рекурсивной. Когда имеется много операторов or, будет происходить много рекурсии. )

В настоящее время,Рекомендуется разделить оператор sql на несколько операторов sql для выполнения, и каждый оператор sql должен иметь как можно меньше предложений 100.. Согласно фактическому тесту производственной среды, ключевое слово или оператора SQL контролируется в пределах 100, что обычно не вызывает переполнения памяти стека JVM.


Чтобы увидеть больше хороших статей о больших данных, пожалуйста, обратите внимание на паблик-аккаунт [Пять минут на изучение больших данных

--end--

Рекомендация статьи:
Настройка производительности Spark — настройка оператора RDD