Тщательно изучите процесс тасования в Spark (UnsafeShuffleWriter of Shuffle Writer)

искусственный интеллект алгоритм Spark .NET

Первый личный публичный аккаунт Spark, обмен технологиями, синхронизация с личным сайтом coolplayer.net, все перепечатки запрещены без моего согласия

давным-давноэта статьяВ ней должно быть написано, но я считаю эту часть слишком интересной, поэтому просто открываю статью.

Поскольку UnsafeShuffleWriter использует унифицированное управление памятью, вы можете обратиться к предыдущей статье, если мало что знаете об этом.эта статья

Общий процесс

UnsafeShuffleWriter поддерживает ShuffleExternalSorter для внешней сортировки.О внешней сортировке я уже говорил в предыдущей статье.Внешняя сортировка заключается в частичной сортировке данных и выводе данных на диск,а затем окончательно слить глобальную сортировку,так как это тоже внешняя сортировка, в чем отличие от SortShuffleWriter, тут только ShuffleInMemorySorter в памяти сначала по id раздела записи Отсортированные данные сериализуются, сжимаются и выводятся в сегмент временного файла, а позиция поиска каждого сегмента раздела записывается, чтобы данные каждого раздела можно было прочитать отдельно позже, а поток чтения распаковывается и реверсируется. Сериализованный, вы можете читать нормально.

Весь процесс заключается в непрерывной вставке данных в ShuffleInMemorySorter, применении памяти, если памяти нет, переносе их в файл, если они не могут применяться для памяти, и, наконец, объединении в большой файл, который глобально упорядочен в соответствии с идентификатором раздела.

SortShuffleWriter против UnsafeShuffleWriter

разница UnsafeShuffleWriter SortShuffleWriter
Сортировать по В конце концов, это просто сортировка по разделам Сначала отсортируйте раздел, и ключи того же раздела будут отсортированы.
aggregation Нет сериализации еды, нет агрегации поддержка агрегации

Условия использования UnsafeShuffleWriter

  • Агрегация или порядок ключей не указаны, поскольку ключ не закодирован в указателе порядка, поэтому существует только порядок на уровне раздела.

  • Сначала сериализуются исходные данные, и десериализация больше не требуется.После сортировки соответствующих метаданных сериализатору необходимо поддерживать перемещение и считывание соответствующих данных в указанном месте. KryoSerializer и пользовательские сериализаторы spark sql поддерживают эту функцию.

  • Количество разделов должно быть меньше 16777216, поскольку номер раздела представлен 24-битным числом.

  • Предполагается, что каждый раздел использует 27 бит для представления смещения записи, поэтому запись не может быть больше этого значения.

Сортировка памяти и выходные файлы

Рассмотрим пример сортировки записей. Стандартная процедура сортировки включает в себя сохранение набора указателей для записей и использование быстрой сортировки для замены указателей до тех пор, пока все записи не будут отсортированы. Сортировка обычно обеспечивает хорошую частоту попаданий в кэш в зависимости от характера последовательного сканирования. Однако сортировка набора указателей имеет низкую частоту попаданий в кэш, поскольку каждая операция сравнения требует разыменования двух указателей, соответствующих данным в двух случайных местах в памяти.

Итак, как мы можем улучшить локальность кеша при сортировке? Один из способов сделать это — хранить ключ сортировки каждой записи последовательно через указатель. Мы используем 8 байтов (идентификатор раздела в качестве ключа и реальный указатель данных), чтобы представить часть данных и поместить их в массив сортировки, Каждая операция сравнения и сортировки должна только линейно искать каждую пару указателей-ключей , чтобы не генерировать никаких случайных сканирований. Таким образом, если часть всех записей отсортирована, данные в этих данных можно отсортировать напрямую, что значительно повысит производительность.

Конечно, здесь данные сортируются. UnsafeShuffleWriter использует RadixSort. Это очень просто, поэтому я не буду его представлять. Для разных и понятных, пожалуйста, обратитесь к этому документу http://bubkoo.com/2014/01/15/sort -алгоритм/основание- сортировка/

Выше описан процесс подачи заявки на память.Применяемая память записывается как страница в выделенныхСтраницах.При разливе память освобождается.Есть используемая в данный момент currentPage.Если ее не хватает продолжаем подавать.

Вы можете посмотреть на картинку выше, каждый раз, когда запись вставляется на страницу, partitionId + pageNumber + offset in page вставляется в LongArray как элемент, и когда данные наконец прочитаны, LongArray сортируется по RadixSort, и после сортировки исходные данные индексируются в соответствии с элементом указателя по очереди, так что уровень раздела упорядочен.

При переносе файлов UnsafeShuffleInMemorySorter создает итератор данных и возвращает итератор, отсортированный по идентификатору раздела. Каждый элемент детализации итератора представляет собой указатель, соответствующий структуре данных PackedRecordPointer. Структура данных, определенная PackedRecordPointer,[24 bit partition number][13 bit memory page number][27 bit offset in page]Тогда по указателю можно получить настоящую запись, и она была сериализована при входе в UnsafeShuffleExternalSorter в начале, так что здесь чисто массив байтов записи. Данные различных разделов в файле представлены файловым сегментом, и соответствующая информация хранится в структуре данных SpillInfo.

Объединить файлы

Индекс раздела каждого файла разлива хранится в структуре данных SpillInfo.Перед окончанием задачи нам нужно сделать операцию mergeSpills.Если fastMergeEnabled и метод сжатия поддерживает конкатенацию сжатых данных, мы можем просто подключить сжатые данные один и тот же раздел вместе. и десериализовать без распаковки. Используйте эффективную технику копирования данных, такую ​​как transferTo NIO, чтобы избежать распаковки и буферная копия.

Добро пожаловать, чтобы обратить вниманиеобмен искровыми технологиями