В среде MapReduce этап Shuffle является мостом между Map и Reduce, а этап Map выводит данные на этап Reduce через процесс Shuffle.Поскольку Shuffle включает в себя чтение и запись на диск и сетевой ввод-вывод, производительность Shuffle напрямую влияет на производительность всей программы.. В Spark также есть фазы Map и Reduce, поэтому также происходит Shuffle.
> Статьи сначала будут публиковаться в публичном аккаунте【Пять минут на изучение больших данных】
Spark Shuffle
Spark Shuffle делится на два типа: перемешивание на основе хэширования и перемешивание на основе сортировки. Давайте сначала представим их процесс разработки, который поможет нам лучше понять Shuffle:
До Spark 1.1 в Spark был реализован только один метод Shuffle, то есть Shuffle на основе хэша. Реализация Shuffle на основе Sort была представлена в Spark 1.1, а после Spark 1.2 реализация по умолчанию была изменена с Shuffle на основе Hash на Shuffle на основе Sort, то есть используемый ShuffleManager был изменен с хэша по умолчанию на sort.В версии Spark 2.0 Hash Shuffle больше не используется..
Причина, по которой Spark с самого начала предоставляет механизм реализации Shuffle на основе Hash, заключается в том, чтобы избежать ненужной сортировки.Все думают о MapReduce в Hadoop, который использует сортировку как фиксированный шаг, а есть много задач, которые не требуют сортировки. также будет сортировать его, вызывая много ненужных накладных расходов.
В реализации Shuffle на основе хэша задача каждого этапа Mapper будет задачей каждого этапа Reduce. Создайте файл, обычно большое количество файлов (то есть соответствующий промежуточным файлам M*R, где M представляет Mapper Количество задач на этапе, R представляет количество задач на этапе сокращения) с большим количеством случайных операций ввода-вывода на диск и большим объемом накладных расходов памяти.
Чтобы решить вышеуказанные проблемы, в Spark 0.8.1 была введена реализация Shuffle на основе хэша.Перемешать
Механизм консолидации (т.е. механизм слияния файлов), механизм обработки для слияния промежуточных файлов, созданных на стороне Mapper. через свойства конфигурацииspark.shuffie.consolidateFiles=true
, уменьшая количество промежуточных файлов. Путем слияния файлов генерация промежуточных файлов может быть изменена таким образом, чтобы каждая исполнительная единица предназначалась для каждого сокращения.
Задача фазы создает файл.
> Единица выполнения соответствует: количеству ядер на каждой стороне картографа / каждой задаче > Количество выделенных ядер (по умолчанию 1). Наконец, количество файлов может быть изменено с M*R на E*C/T*R, где > E представляет количество Исполнителей, C представляет количество доступных Ядер, а T представляет количество Ядер, выделенных Задачей.
Spark 1.1 представляет Sort Shuffle:
В реализации Hash-based Shuffle количество сгенерированных файлов промежуточных результатов будет зависеть от количества задач в фазе Reduce, то есть от параллелизма стороны Reduce, поэтому количество файлов по-прежнему неуправляемо и не может реально решить проблема. Чтобы лучше решить эту проблему, в Spark 1.1 была введена реализация Shuffle на основе Sort, а после Spark 1.2 реализация по умолчанию также была изменена с реализации Shuffle на основе Hash на реализацию Shuffle на основе Sort, то есть использование ShuffleManager изменен с хэша по умолчанию на сортировку.
В Sort-based Shuffle задачи каждого этапа Mapper не создают отдельный файл для задач каждого этапа Reduce, а все записываются в файл данных, и в то же время создается индексный файл. этап может получать связанные данные через индексный файл. Непосредственным преимуществом отказа от большого количества файлов является сокращение случайного дискового ввода-вывода и накладных расходов на память. Количество окончательно сгенерированных файлов уменьшается до 2*M, где M представляет количество задач на этапе Mapper, а задачи на каждом этапе Mapper генерируют два файла (1 файл данных, 1 индексный файл), а окончательное количество files — это M файлы данных и M индексные файлы. Таким образом, окончательное количество файлов равно 2*M.
Начиная со Spark 1.4, реализация Shuffie на основе Tungsten-Sort также была введена в процесс Shuffle Оптимизация, выполненная проектом Tungsten, может значительно повысить производительность Spark при обработке данных. (Вольфрам в переводе на китайский язык - вольфрамовая проволока)
> Примечание. В некоторых конкретных сценариях приложений производительность механизма Shuffle на основе хэша будет превышать производительность механизма реализации Shuffle на основе Sort.
Картинка для понимания истории итераций Spark Shuffle:
Почему Spark наконец отказался от HashShuffle и стал использовать Sorted-Based Shuffle?
Мы можем найти ответ в самой фундаментальной и актуальной проблеме оптимизации Spark: Spark, использующий HashShuffle, создает большое количество файлов во время перемешивания. При увеличении объема данных количество генерируемых файлов становится неконтролируемым, что серьезно ограничивает производительность и масштабируемость Spark, поэтому Spark должен решить эту проблему и уменьшить количество файлов, генерируемых ShuffleWriter на стороне Mapper, чтобы Spark может От масштаба сотен кластеров до масштаба, который может поддерживать тысячи или даже десятки тысяч кластеров.
Но идеально ли использовать Sorted-Based Shuffle? Ответ — нет. Sorted-Based Shuffle также имеет недостатки. Его недостатком является функция сортировки. Она заставляет данные сначала сортироваться на стороне Mapper, что приводит к увеличению скорости сортировка немного медленная. К счастью, появился Tungsten-Sort Shuffle, который улучшил алгоритм сортировки и оптимизировал скорость сортировки. Вольфрам-Сорт Shuffle был включен в Sorted-Based Shuffle, и механизм Spark автоматически распознает, что программа нуждается в Sorted-Based Shuffle. Перемешивание или перемешивание вольфрамовой сортировки.
Ниже приводится подробный анализ основного принципа выполнения каждого Shuffle:
1. Анализ перемешивания хэшей
В следующем обсуждении предполагается, что каждый Executor имеет 1 ядро процессора.
1. HashShuffleManager
На этапе записи в случайном порядке, после завершения расчета одного этапа, для следующего этапа могут быть выполнены операторы, подобные перетасовке (например, reduceByKey), и данные, обрабатываемые каждой задачей, «делятся» по ключу. «Отделение» означаетВыполнить алгоритм хеширования на том же ключе, так что один и тот же ключ записывается в один и тот же файл на диске, и каждый файл на диске принадлежит только одной задаче нижестоящего этапа.Перед записью данных на диск данные сначала записываются в буфер памяти, когда буфер памяти заполняется, он переполняется в файл на диске..
Сколько задач будет на следующем этапе и сколько дисковых файлов будет создано для каждой задачи на текущем этапе. Например, на следующем этапе всего 100 задач, тогда каждая задача текущего этапа должна создать 100 файлов на диске. Если на текущем этапе 50 задач, всего 10 Исполнителей, и каждый Исполнитель выполняет 5 задач, то всего на каждом Исполнителе будет создано 500 дисковых файлов, а на всех Исполнителях будет создано 5000 дисковых файлов. Отсюда видно, чтоКоличество дисковых файлов, созданных неоптимизированной операцией случайной записи, ошеломляет..
Этап чтения в случайном порядке обычно является тем, что этап делает в начале. В это время на сценеКаждой задаче нужно подтянуть все одинаковые ключи в результатах вычислений предыдущего этапа от каждого узла через сеть к своему узлу, а затем выполнить операции агрегации ключей или соединения.. В процессе случайной записи задача карты создает файл на диске для каждой задачи редукции нижестоящей стадии, поэтому в процессе чтения в случайном порядке каждой задаче редукции нужно тянуть только с тех узлов, где все задачи карты вышестоящей стадии принадлежит Это ваш собственный файл на диске.
Процесс извлечения случайного чтения представляет собой агрегацию во время извлечения. Каждая задача чтения в случайном порядке будет иметь свой собственный буферный буфер,Каждый раз извлекайте данные того же размера, что и буферный буфер., а затем выполнять такие операции, как агрегирование с помощью Map в памяти. После агрегирования пакета данных извлекается следующий пакет данных и помещается в буфер для операций агрегирования. И так далее, пока наконец не будут подтянуты все данные и не будет получен окончательный результат.
Как работает HashShuffleManagerКак показано ниже:
2. Оптимизированный HashShuffleManager
Для оптимизации HashShuffleManager мы можем установить параметр:spark.shuffle.consolidateFiles
, значение этого параметра по умолчанию — false, установите для него значение true, чтобы включить механизм оптимизации.Если мы используем HashShuffleManager, рекомендуется включить эту опцию.
После включения механизма консолидации в процессе случайной записи задача не создает файл на диске для каждой задачи нижестоящего этапа.shuffleFileGroupКонцепция чего-либо,Каждая группа shuffleFileGroup соответствует пакету файлов на диске, а количество файлов на диске совпадает с количеством задач на нисходящем этапе. Сколько ядер процессора на Executor, столько задач может выполняться параллельно. Каждая задача, выполняемая параллельно в первом пакете, создаст группу shuffleFileGroup и запишет данные в соответствующий файл на диске..
Когда ядро ЦП Исполнителя завершает выполнение пакета задач, а затем выполняет следующий пакет задач,Следующая партия задач будет повторно использовать существующую группу shuffleFileGroup., включая в него файл на диске, то есть в это время задача будет записывать данные в существующий файл на диске, а не в новый файл на диске. следовательно,Механизм консолидации позволяет различным задачам повторно использовать один и тот же пакет дисковых файлов, так что дисковые файлы нескольких задач могут быть эффективно объединены в определенной степени, что значительно сокращает количество дисковых файлов и повышает производительность случайной записи..
Предполагая, что на втором этапе 100 задач, на первом этапе 50 задач, всего остается 10 Исполнителей (количество ЦП Исполнителя равно 1), и каждый Исполнитель выполняет 5 задач. Затем, когда изначально используется неоптимизированный HashShuffleManager, каждый исполнитель сгенерирует 500 файлов на диске, а все исполнители сгенерируют 5000 файлов на диске. Но после оптимизации в это время формула для расчета количества файлов на диске, созданных каждым Исполнителем:cpu core的数量 * 下一个stage的task数量
, то есть каждый исполнитель в это время создаст только 100 файлов на диске, а все исполнители создадут только 1000 файлов на диске.
> Эта функция имеет очевидные преимущества, но почему Spark не установил эту функцию в качестве опции по умолчанию в реализации на основе Hash Shuffle, официальное заявление заключается в том, что эта функция нестабильна.
Как работает оптимизированный HashShuffleManagerКак показано ниже:
Преимущества и недостатки механизма Shuffle на основе хэша
преимущество:
- Ненужные накладные расходы на сортировку могут быть опущены.
- Избегает накладных расходов памяти, необходимых для сортировки.
недостаток:
- Создается слишком много файлов, что может оказать давление на файловую систему.
- Случайное чтение и запись большого количества небольших файлов приводит к определенной нагрузке на диск.
- Объем кэш-памяти, необходимый для записи блока данных, также увеличится, что приведет к увеличению нагрузки на память.
Во-вторых, анализ SortShuffle
Механизм работы SortShuffleManager в основном делится на три типа:
1. нормальный рабочий механизм;
2. обходной рабочий механизм, когда количество задач случайного чтения меньше или равноspark.shuffle.sort.bypassMergeThreshold
При значении параметра (по умолчанию 200) будет включен механизм обхода;
3. Как работает сортировка вольфрама, вам необходимо установить элементы конфигурации, чтобы включить этот операционный механизмspark.shuffle.manager=tungsten-sort
. Включение этой конфигурации также не гарантирует, что этот операционный механизм будет использоваться (поясняется позже).
1. Общий рабочий механизм
В этом режимеДанные сначала записываются в структуру данных в памяти., в это время, в соответствии с разными операторами тасования, могут быть выбраны разные структуры данных.Если это оператор тасования класса агрегации, например, reduceByKey, то будет выбрана структура данных Map, и агрегация будет выполняться через Map при записи в память.;Если это обычный оператор тасования, такой как соединение, тогда структура данных Array будет выбрана и записана непосредственно в память.. Затем каждый раз, когда часть данных записывается в структуру данных памяти, будет оцениваться, достигнут ли определенный критический порог. Если достигнут критический порог, то делается попытка переполнить данные в структуре данных в памяти на диск, а затем очистить структуру данных в памяти.
Прежде чем переполнение будет записано в файл на диске, существующие данные в структуре данных памяти сортируются в соответствии с ключом. После сортировки данные записываются в файл на диск партиями. Номер пакета по умолчанию — 10 000, то есть отсортированные данные будут записываться в файл на диск пакетами в виде 10 000 данных в пакете. Запись файлов на диск реализована через BufferedOutputStream Java.BufferedOutputStream — это буферизованный поток вывода Java.Он сначала буферизует данные в памяти и снова записывает их в файл на диске, когда буфер памяти переполняется, что может уменьшить количество дисковых операций ввода-вывода и повысить производительность..
Когда задача записывает все данные в структуру данных памяти, происходит несколько операций переполнения диска, а также создается несколько временных файлов. Наконец, все предыдущие временные файлы на диске будут объединены.процесс слияния, данные из всех предыдущих временных файлов на диске будут считаны, а затем по очереди записаны в окончательный файл на диске. Кроме того, поскольку задаче соответствует только один файл на диске, а значит, все данные, подготовленные задачей для задачи нижестоящего этапа, находятся в этом файле, поэтому будет записана отдельная копия.индексный файл, который определяет начальное и конечное смещения данных каждой нижестоящей задачи в файле.
SortShuffleManager значительно сокращает количество файлов из-за процесса слияния файлов на диске. Например, на первом этапе 50 задач, всего 10 Исполнителей, каждый Исполнитель выполняет 5 задач, а на втором этапе 100 задач. Поскольку каждая задача имеет только один дисковый файл, на данный момент на каждом исполнителе имеется только 5 дисковых файлов и только 50 дисковых файлов на всех исполнителях.
Принцип работы SortShuffleManager обычного рабочего механизмаКак показано ниже:
2. обходной рабочий механизм
Когда количество задач на стороне Reducer относительно невелико, механизм реализации на основе Hash Shuffle оказывается значительно быстрее, чем механизм реализации на основе Sort Shuffle, поэтому механизм реализации на основе Sort Shuffle обеспечивает запасное решение, которым является обход механизм работы.. Для побочных задач Reducer меньше, чем свойства конфигурацииspark.shuffle.sort.bypassMergeThreshold
При установке номера используйте запасной план со стилем Hash.
Условия срабатывания механизма байпаса следующие:
- Количество задач перетасовки карты меньше, чемspark.shuffle.sort.bypassMergeThreshold=200
значение параметра.
- оператор тасования, не являющийся агрегатным классом.
На этом этапе каждая задача создаст временный файл на диске для каждой последующей задачи, хэширует данные в соответствии с ключом, а затем запишет ключ в соответствующий файл на диске в соответствии с хеш-значением ключа. Разумеется, при записи файла на диск он также сначала записывается в буфер памяти, а после заполнения буфера переполняется в файл на диске. Наконец, все временные файлы на диске также объединяются в один файл на диске и создается один индексный файл.
Механизм записи на диск этого процесса на самом деле точно такой же, как у неоптимизированного HashShuffleManager, потому что должно быть создано удивительное количество файлов на диске, но в конце будет объединен только один файл на диске. Таким образом, небольшое количество конечных файлов на диске также повышает эффективность этого механизма при случайном чтении по сравнению с неоптимизированным HashShuffleManager.
Отличие этого механизма от обычного механизма работы SortShuffleManager в том, что: во-первых, другой механизм записи на диск, во-вторых, он не будет сортироваться. Это,Самым большим преимуществом включения этого механизма является то, что в процессе случайной записи операции сортировки данных не требуются., что экономит эту часть накладных расходов на производительность.
Как работает SortShuffleManager обходного рабочего механизмаКак показано ниже:
3. Механизм работы Tungsten Sort Shuffle
Механизм реализации Shuffle на основе Tungsten Sort в основном использует оптимизацию, сделанную проектом Tungsten для эффективной обработки. Перемешать.
Spark предоставляет свойства конфигурации для выбора конкретного механизма реализации Shuffle, но следует отметить, что, хотя Spark по умолчанию открывает механизм реализации, основанный на SortShuffle, на самом деле, обратитесь к части ядра фреймворка Shuffle, чтобы узнать о реализации, основанной на SortShuffle. Механизм и механизм реализации, основанный на Tungsten Sort Shuffle, используют SortShuffleManager, а конкретный внутренний механизм реализации оценивается двумя предоставленными методами:
Если он не основан на сортировке вольфрамом, пропуститеSortShuffleWriter.shouldBypassMergeSort
Метод оценивает, нужно ли вернуться к механизму реализации Shuffle в стиле Hash.Если условия, возвращаемые методом, не выполняются, передатьSortShuffleManager.canUseSerializedShuffle
Метод оценивает, нужно ли использовать механизм реализации на основе Tungsten Sort Shuffle, и когда оба метода возвращают false, то есть когда соответствующие условия не выполняются, автоматически будет использован механизм нормальной работы.
Поэтому при установкеspark.shuffle.manager=tungsten-sort
Однако нет гарантии, что будет использоваться механизм реализации Shuffle, основанный на сортировке вольфрамом.
Для реализации механизма Tungsten Sort Shuffle должны быть выполнены следующие условия.:
1. Нет операции агрегирования или требования для сортировки вывода в зависимости Shuffle.
2. Сериализатор Shuffle поддерживает перемещение сериализованных значений (в настоящее время поддерживаются только сериализаторы, настроенные с помощью KryoSerializer Spark SQL framework).
3. Количество выходных разделов в процессе Shuffle меньше 16777216.
На самом деле существуют и другие ограничения в процессе использования, такие как введение модели управления памятью в стиле страницы, длина одной внутренней записи не может превышать 128 МБ (для конкретной модели памяти обратитесь к классу PackedRecordPointer) . Кроме того, ограничение на количество разделов также вызвано этой моделью памяти.
Таким образом, текущий механизм реализации, основанный на Tungsten Sort Shuffle, все еще является относительно жестким.
Преимущества и недостатки механизма перемешивания на основе сортировки
преимущество:
- Значительно уменьшено количество мелких файлов и уменьшено использование памяти на стороне Mapper;
- Spark может обрабатывать не только мелкомасштабные данные, но даже при обработке крупномасштабных данных он не будет легко достигать узких мест производительности.
недостаток:
- Если количество Задач в Mapper слишком велико, все равно будет генерироваться много мелких файлов.В это время, в процессе передачи данных из Shuffle на сторону Reducer, Reducer должен будет записывать большое количество записей на столько же времени на десериализацию, что приводит к большому потреблению памяти и огромной нагрузке на сборщик мусора, что приводит к замедлению работы системы или даже к сбою;
- Форсирует сортировку на стороне Mapper, даже если сами данные не нуждаются в сортировке;
- Его необходимо сортировать на основе самих записей, что является самым фатальным потреблением производительности Sort-Based Shuffle.
Использованная литература:
- «Бизнес-трилогия Spark Big Data»