spark --rdd часто используемые операции преобразования и действия

Spark

Эта статья возникла из личного публичного аккаунта:TechFlow, оригинальность это не просто, прошу внимания


Сегоднятретья часть искрыВ статье мы продолжаем рассматривать некоторые операции RDD.

Ранее мы упоминали, что операции RDD в spark можно разделить на два типа.трансформация, другойдействие действие. В операции преобразования spark не будет вычислять за нас результат, а сгенерирует новый узел RDD для записи операции. Только когда операция действия будет выполнена, spark произведет весь расчет с нуля.

Операции преобразования можно дополнительно разделить на операции преобразования для элементов и операции преобразования для множеств.

Конверсионные действия для отдельных элементов

Операции преобразования элементов очень распространены, и наиболее часто используемыми являются карта и плоская карта. Судя по названию, обе они являются операциями карты.Мы все знаем операции карты, которые упоминались в предыдущих статьях MapReduce и статьях о карте Python и сокращении использования. Короче говоря, вы можете сопоставить операцию с каждым элементом.

Например, предположим, что у нас есть последовательность [1, 3, 4, 7], и мы хотим возвести в квадрат каждый элемент в ней. Конечно, мы можем сделать это с помощью цикла for, но в spark лучше использовать карту.

nums = sc.parallelize([1, 3, 4, 7])
spuare = nums.map(lambda x: x * x)

Мы знаем, что карта — это операция преобразования, поэтому квадратвсе еще RDD, мы не получим результат, выведя его напрямую, а получим только соответствующую информацию СДР:

График преобразования внутреннего RDD выглядит следующим образом:

Если мы хотим увидеть результат, мы должны выполнить операцию действия,например взять, давайте посмотрим, чтобы увидеть результаты:

В соответствии с нашими ожиданиями, операция с картой должна быть знакома учащимся, которые обращали на нее внимание раньше, так что же это за плоская карта?

Разница в этом флэте, мы все знаем, что флэт значит плоск, а флэтмап означает результат после выполнения карты.Плоский. Грубо говоря, если результатом выполнения карты является массив, то массив будет дизассемблирован, а содержимое внутри будет извлечено и объединено воедино.

Давайте посмотрим на пример:

texts = sc.parallelize(['now test', 'spark rdd'])
split = texts.map(lambda x: x.split(' '))

Поскольку объект, который мы выполняем, представляет собой строку, строка получит строку после операции разделения.массив строк. Если мы выполним карту, результатом будет:

Что, если мы сделаем плоскую карту? Мы также можем попробовать:

Сравните, вы заметили разницу?

Да, результатом выполнения карты является массив массивов, потому что каждая строка является массивом после разделения, и мы, соединяя массивы вместе, естественно получаем массив массивов. И flatMap сгладит эти массивы и соберет их вместе, что является самой большой разницей между ними.

Конверсионные действия в коллекциях

Выше описаны операции преобразования для элементов, давайте посмотрим на операции преобразования для коллекций.

Операции над множествами, вероятно, объединение, различные, пересечение и вычитание. Мы можем сначала посмотреть на следующий рисунок, чтобы иметь интуитивное ощущение, а затем мы проанализируем их один за другим:

Прежде всего, давайте посмотрим на Different, который, как следует из названия, предназначен для устранения дублирования. Это то же самое, что и отдельные в SQL.Ввод этой операции - два набора RDD.После выполнения будет создан новый RDD.Все элементы уникальныиз. Следует отметить, что выполнение Different обходится дорого, потому что оно выполняет операцию перемешивания для перемешивания всех данных, чтобы гарантировать, что существует только одна копия каждого элемента. Если вы не понимаете, что означает операция перемешивания, не беда, мы сосредоточимся на объяснении этого в следующих статьях. Только имейте в виду, что это дорого.

Вторая операция — объединение, тоже хорошо изученная — объединяет два СДР.Все элементы вместе. Вы можете думать об этом как об операции расширения в списке Python.Как и расширение, он не обнаруживает повторяющиеся элементы, поэтому, если два объединенных набора имеют одинаковые элементы, они не будут фильтроваться, а будут зарезервированы.

Третья операция — пересечение, т. е.перекресток, то есть часть, где два набора перекрываются. Это должно быть вполне понятно, давайте взглянем на следующий рисунок:

Синяя часть на рисунке ниже, то есть пересечение двух множеств A и B, является результатом A.intersection(B), который является общим элементом между двумя множествами. Так же эта операциятакже выполняет перемешивание, поэтому накладные расходы одинаковы, и эта операция удаляет повторяющиеся элементы.

Последнее — вычитание, т. е.разница, является элементом, принадлежащим A, но не B, и мы также можем использовать график для представления:

Часть, заштрихованная серым цветом на приведенном выше рисунке, представляет собой разницу между двумя наборами A и B. Точно так же эта операция также будет выполнять перемешивание, что требует очень много времени.

Помимо перечисленных, существуют декартовы, а именноДекартово произведение, выборочная выборка и другие операции над множествами, но, условно говоря, используется немного меньше, поэтому я не буду здесь слишком подробно о ней рассказывать, заинтересованные студенты могут узнать об этом, и это не сложно.

Операция действия

Наиболее часто используемой операцией действия в СДР должна быть операция получения результата, ведь мы давно вычисляем, чтобы получить результат, и только получение СДР явно не является нашей целью. RDD полученных результатов в основномвозьми, возьми и собери, У этих трех нет специального использования, поэтому я кратко представлю их.

Где collect должен получить все результаты и вернуть все элементы. И take, и top должны передавать параметрУкажите количество баров, take — вернуть указанное количество результатов из RDD, top — вернуть лучшие результаты из RDD, top и take используются точно так же, разница только в том, является ли результат лучшим.

В дополнение к этому, есть также очень часто используемое действие — подсчет, о котором не следует много говорить, операция подсчета количества фрагментов данных, вы можете узнать, сколько фрагментов данных имеется, путем подсчета.

reduce

В дополнение к этим относительно простым, давайте представим еще два более интересных.Во-первых, давайте введем сокращение. Уменьшение, как следует из названия, является сокращением в MapReduce, его использованиеПочти то же самое, что и сокращение в Python., который принимает функцию для выполнения операции слияния. Давайте посмотрим на пример:

В этом примере наша функция редукции состоит в том, чтобы добавить два целых числа, и механизм редукции повторит эту операцию, чтобы объединить все данные, так что окончательный результат будет 1 + 3 + 4 + 7 = 15.

fold

В дополнение к уменьшению есть действие, называемое сгибом, которое точно такое же, как и уменьшение, с той лишь разницей, чтоОн может настроить начальное значение, а для раздела также возьмем в качестве примера приведенный выше пример:

Может показаться немного запутанным, если посмотреть на этот пример напрямую, но простое объяснение сделает его понятным, но это не сложно. Мы заметили, что добавили дополнительный параметр 2, когда использовали распараллеливание для создания данных, это2 означает количество разделов. Несложно понять, что массив [1, 3, 4, 7] будет разделен на две части, но если собирать напрямую, то это все равно исходное значение.

Теперь мы используем fold, передавая два аргумента и начальное значение 2 в дополнение к функции. Таким образом, весь процесс расчета выглядит следующим образом:

Ответ для первого раздела 1 + 3 + 2 = 6, для второго раздела ответ 4 + 7 + 2 = 13, и, наконец, два раздела объединены: 6 + 13 + 2 = 21.

То есть мыНачальное значение присваивается результату каждого раздела, и присваивает начальное значение результату после слияния разделов.

aggregate

Честно говоря, это действие труднее всего понять, потому что оно довольно аномально. Во-первых, как для сокращения, так и для свертки требуется, чтобы тип возвращаемого значения был таким же, как тип данных rdd. Например, если тип данных — int, возвращаемый результат также должен быть int.

Но для некоторых сценариев это неприменимо.Например, если мы хотим усреднить, нам нужно знать сумму термина и количество вхождений термина, поэтому нам нужно вернуть два значения. В это время значение, которое мы инициализируем, должно быть 0, 0, то есть оно начинается с 0 для сложения и подсчета, а затем нам нужно передать две функции, например написать это:

nums.aggregate((0, 0), lambda x, y: (x[0] + y, x[1] + 1), lambda x, y: (x[0] + y[0], x[1] + y[1]))

Вы неизбежно будете сбиты с толку, когда увидите эту строку кода, не волнуйтесь, мы объясним ее по частям.

Сначала первая лямбда-функция,Здесь x не одно значение, а два значения, или 2-кортеж, то есть результат, который мы наконец вернули.В нашем ожидании возврата первое возвращаемое число является суммой nums, а второе возвращаемое число является количеством чисел в nums. И y здесь является результатом ввода nums.Очевидно, что результатом ввода nums является только целое число, поэтому y здесь одномерное. Затем мы требуем, чтобы сумма использовала, конечно, x[0] + y, то есть значение y добавляется к первому измерению, а второе измерение естественным образом добавляется на единицу, потому что каждый раз, когда мы читаем число, мы должны добавить один.

Это относительно легко понять, вторая функция может быть немного трудоемкой, вторая функция отличается от первой, она не используется для обработки данных nums, нодля обработки разделов. Когда мы выполняем агрегат, искра выполняется не в одном потоке, он разбивает данные в цифрах на множество партиций, после того как каждая партиция получит результат, ее нужно объединить, эта функция будет вызываться при слиянии.

Как и в случае с первой функцией, первый x — это окончательный результат, а y — это значение, которое необходимо объединить после завершения других операций с разделами. Итак, y здесь двумерный, первое измерение — это сумма определенного раздела, а второе измерение — это количество элементов в определенном разделе, тогда, конечно, мы должны добавить его к x.

На приведенном выше рисунке показан процесс расчета при наличии двух разделов, гдеlambda1 — это первая анонимная функция, которую мы передаем, по той же причине lambda2 — это вторая анонимная функция, которую мы передали. Я думаю, что комбинированная диаграмма должна быть легкой для понимания.

Помимо этих нескольких действий, мы не будем их повторять из соображений экономии места, а если они появятся в следующих статьях, мы подробно объясним их. Одна из основных причин, по которой новички более устойчивы к искре обучения, заключается в том, что они считают ее слишком сложной, и даже в операциях различают операции преобразования и операции действия. На самом деле все этоОптимизация производительности для ленивой оценки. Таким образом мы можем совмещать несколько операций и выполнять их вместе, тем самым снижая потребление вычислительных ресурсов.Для распределенных вычислительных фреймворков очень важным показателем является производительность.Понимая это,зачем искра делает такой дизайн?Разобраться несложно.

Не только искра, но и фреймворки глубокого обучения, такие как TensorFlow. По сути, многие, казалось бы, нелогичные конструкции имеют более глубокие причины. Поняв это, на самом деле легко догадаться. Все операции, которые получают конечный результат, часто являются действиями действия, если просто некоторые расчеты, девять из десяти конверсионных действий.

постоянная работа

RDD в Spark оцениваются лениво, и иногда мы хотимИспользование одного и того же RDD несколько раз. Если мы просто вызовем операцию действия, то spark многократно пересчитает RDD и все соответствующие ему данные и другие зависимости, что, очевидно, приносит много накладных расходов. Естественно, мы хотим кэшировать часто используемые RDD и использовать их всякий раз, когда они нам нужны, вместо того, чтобы перезапускать их каждый раз, когда мы их используем.

Чтобы решить эту проблему, spark предоставляетУпорствооперация. Так называемое постоянство можно просто понимать как кэширование. Использование также очень простое, нам нужно только сохранить RDD:

texts = sc.parallelize(['now test', 'hello world'])
split = texts.split(lambda x: x.split(' '))
split.persist()

После вызова постоянства RDD будет кэшироваться впамять или дискСреди них мы можем вызвать его и использовать в любое время, когда нам это нужно, поэтому нам не нужно запускать весь процесс перед ним снова и снова. И искра поддерживает несколько уровней персистентных операций, мы можем передатьStorageLevelпеременная для управления. Давайте посмотрим на значение этого StorageLevel:

Мы можем выбрать соответствующий уровень кэша в соответствии с нашими потребностями. Конечно, поскольку есть настойчивость, естественно, естьАнти-персистентность, для некоторых RDD, которые больше не нужно кэшировать, мы можем вызвать unpersist, чтобы удалить их из кэша.

Хотя сегодняшнее содержание, кажется, имеет различные операции, некоторые из них не часто используются Нам нужно только общее впечатление Детали конкретных операций можно внимательно изучить, когда они используются. Я надеюсь, что каждый можетигнорировать эти несущественные детали, чтобы понять суть ядра.

На сегодняшней статье все. Если вы чувствуете, что что-то приобрели, пожалуйста, нажмитеПодпишитесь или сделайте ретвитЧто ж, твое маленькое усилие много значит для меня.