Эта статья возникла из личного публичного аккаунта:TechFlow, оригинальность это не просто, прошу внимания
СегодняЧетвертая часть искровой темыСтатья, давайте взглянем на Pair RDD.
определение
В предыдущих статьях мы познакомились с соответствующими концепциями RDD, а также поняли основные операции преобразования и операции действия RDD. Сегодня давайте рассмотрим самые распространенные среди RDDPairRDD, также известный как пара ключ-значение RDD, можно понимать как KVRDD.
КВ хорошо понял, т.е.сочетание ключа и значения, например dict в Python или map в C++ и Java, базовыми элементами являются пары ключ-значение. По сравнению с предыдущим базовым RDD, pariRDD может поддерживать больше операций, является относительно более гибким и может выполнять более сложные функции. Например, мы можем агрегировать по ключу или вычислять пересечение и т. д.
Таким образом, pairRDD — это просто RDD, тип данных которого — структура KV, он не имеет большого значения, так что вам не нужно об этом беспокоиться.
Парные операции преобразования RDD
Пара RDD также является RDD, поэтому операция преобразования RDD, представленная перед Pair RDD, также может использоваться естественным образом. Они чем-то похожинаследование классовОтношения, RDD — это родительский класс, а Pair RDD — это подкласс, который реализует некоторые новые функции. Подкласс может вызывать все методы суперкласса, но суперкласс не может вызывать методы подкласса.
При вызове следует отметить, что, поскольку формат данных в нашей Pair RDD представляет собой кортеж KV, функция, которую мы передаем, должна быть длябиграммаданные, иначе могут быть проблемы с результатом операции. Ниже мы перечисляем некоторые из наиболее часто используемых конверсионных действий.
Для удобства демонстрации мы используем фиксированный RDD для выполнения различных операций преобразования, чтобы интуитивно понять, что делают эти операции преобразования.
ex1 = sc.parallelize([[1, 2], [3, 4], [3, 5]])
ключи, значения и sortByKey
Эти три операции преобразования должны быть наиболее часто используемыми и самыми простыми, настолько простыми, что мы можем буквально догадываться об их значении.
Давайте сначала посмотрим на ключи и значения:
Первый элемент двойки в нашем RDD будет использоваться в качестве ключа, а второй элемент будет использоваться в качестве значения.Следует отметить, что это не карта или дикт, поэтомуИ ключ, и значение могут повторяться.
sortByKey также очень интуитивно понятен. Из буквального значения мы можем видеть, что данные в RDD сортируются в соответствии со значением ключа. Аналогично, давайте посмотрим на результаты:
mapValues и плоскиеMapValues
mapValues нельзя использовать напрямую, в то время какФункция должна быть передана в качестве параметра. Это означает выполнить эту функцию для всех значений, Например, если мы хотим преобразовать все значения в строки, мы можем сделать это:
Операция flatMapValuesВопреки нашему восприятию, все мы знаем, что операция flatMap может разбить вложенный массив, но как разбить вложенность значения? В конце концов, наше значение не обязательно является массивом, который относится к функции, которую мы передали. Работа этого flatMap на самом деле предназначена для результата, возвращаемого функцией, то есть функция вернет итератор, а затем сломает вверх по содержимому На самом деле это значение в этом итераторе.
Мне может быть немного скучно это выражать, давайте посмотрим на пример, чтобы понять:
Я не знаю, является ли этот результат неожиданным.Весь процесс такой.После того, как мы вызываем операцию flatMapValues, мы возвращаем итератор, и содержимое итератора является диапазоном(x,x+3). ФактическиКаждому ключу соответствует один такой итератор, а затем разбить содержимое итератора и сформировать новую пару с ключом.
groupByKey, reduceByKey и foldByKey
Эти две функции также относительно близки.Давайте поговорим о первой.Если вы изучали SQL, вы должны быть хорошо знакомы со значением группировки по операции. Неважно, если вы этого не знаете, сгруппируйте поЭто может быть просто понято как слияние или группировка. То есть значения с одним и тем же значением ключа объединяются вместе, и результатом является пара RDD списка ключей, то есть мы помещаем значения с одним и тем же значением ключа в список.
Давайте также рассмотрим пример:
Результат, который мы получаем после вызова groupby, — это объект, поэтому нам нужно вызвать mapValues, чтобы преобразовать его в список, прежде чем мы сможем его использовать, иначе мы не можем использовать collect для его получения.
ReducebyKey и GroupByKey аналогичны, но GroupByKey возвращается только вместе, а редьюцебук является результатом входящей функции Reduce и выполняет сокращение после REDUCE. Давайте рассмотрим пример:
В этом примере мы выполняем накопление, складывая значения с одинаковым значением ключа.
Использование foldByKey и fold не сильно отличается, единственная разница в том, что мыДобавлена логика агрегации на основе значения ключа. Если мы установим начальное значение раздела равным 0, то оно работает почти так же, как и reduceByKey:
Нам нужно только очистить foldByKeyНачальное значение для разделаВот и все.
combineByKey
Это также очень важная и непростая для понимания операция преобразования. Давайте сначала посмотрим на ее параметры. Всего она принимает 5 параметров. Давайте пройдемся по ним один за другим, сначала первый параметр, этоcreateCombiner.
что он делаетинициализация, инициализировать значение в соответствии с нашими потребностями, например, преобразовать строковый тип в int или выполнить другие операции. Мы можем записать это в виде V => C, где V — значение, а C — новое значение после нашей инициализации.
Он будет передан во вторую функцию как новая пара вместе со значением, поэтому вторая функцияПринимает двойку с аргументами (C, V). Что мы хотим сделать, так это определить объединение этих двух кортежей, чтобы вторую функцию можно было записать как (C, V) => C. Комментарии в исходном коде и онлайн-туториалах написаны таким образом, но я думаю, что это может быть трудно понять из-за появления двух C. Я думаю, что это можно записать как (C, V) => D, что лучше .
Последняя функция — объединить D, поэтому ее можно записать как (D, D) => D.
На данный момент мы, кажется, понимаем его принцип, но кажется, что есть много вопросительных знаков, и мы всегда чувствуем, что что-то не так. Я долго думал об этом, прежде чем нашел источник проблемы, где слияние. Вы обнаружили, что вторая функция и третья функция используются для слияния,Почему мы сливаем дважды, какая между ними разница? Если эта проблема не понята, то ее использование должно быть неправильным.Я лично считаю, что эта проблема является ядром этой операции преобразования, и блоги, которые не объясняют эту проблему четко, недостаточно ясны.
По сути, логика этих двух слияний схожа, ноСфера консолидацииНе то же самое, первое слияние для раздела, а второе слияние для ключа. Поскольку в spark данные могут храниться более чем в одном разделе, мы должны объединить их дважды: первый раз для интеграции данных внутри раздела и второй раз для объединения между разделами. Поскольку данные разных разделов могут быть далеко друг от друга, это приведет к тому, что время передачи по сети будет слишком большим, поэтому мы надеемся, что передаваемые данные будут как можно меньше, что является причиной двойного группирования.
Давайте посмотрим на другой пример:
В этом примере мы рассчиталиСреднее количество вхождений каждого слова, давайте рассмотрим его по частям. Во-первых, наша первая функция преобразует значение в кортеж (1, значение).0-й элемент кортежа представляет количество документов, в которых встречается слово, а 1-й элемент представляет количество вхождений в документе. Так что для второй функции, то есть функции, которая агрегирует внутри группы, нам нужно только добавить единицу к количеству появляющихся документов и накопить количество вхождений. Потому что объектами этой агрегации являются все элементы типа (1, значение), то есть до агрегации результата нет.
В третьей функции мы также накапливаем общее количество вхождений, потому что результат, обработанный этой функцией, — это результат того, что каждый раздел был агрегирован один раз. Например, яблоко появляется в двух документах в одном разделе, всего 20 раз, и в трех документах в одном разделе, всего 30 раз, тогда, очевидно, мы появляемся в 5 документах всего, всего 30 раз, появляется 50 раз.
Так как мы хотим рассчитать среднее, поэтому мы используемОбщее количество вхождений, деленное на количество отображаемых документов. Наконец, после карты, поскольку то, что мы получаем, все еще является 2-кортежом, мы не можем собирать напрямую, нам нужно использовать collectAsMap.
Приведенный выше пример покажем диаграммой, будет несложно понять:
подключить операцию
В Spark, помимо основных операций преобразования, Spark также предоставляет дополнительныеподключить операциюДайте пару РДД. Благодаря соединению мы можем легко управлять RDD как коллекцией. Метод работы также очень прост, что очень похоже на форму операционных таблиц данных в SQL, то есть на операцию соединения. Операцию соединения можно разделить на соединение (внутреннее соединение), левое соединение и правое соединение.
Если вы знакомы с SQL, разница между этими тремя должна быть очень ясна, это то же самое, что и соединение в SQL. Ничего страшного, если вы не знакомы, это не сложно объяснить. При объединении мы часто используем одну таблицу для присоединения к другой таблице, точно так же, как при вычитании двух чисел, мы используем одно число для вычитания другого числа. Например, A.join(B), мы ставимA называется левой таблицей, B называется правой таблицей. Так называемое объединение — это соединение строк с одним и тем же полем или некоторыми значениями поля в двух таблицах.
Например, стол — это стол для студентов, а стол — это стол для посещаемости. Как только наши две таблицы связаны с идентификатором студента, мы получаем запись о посещаемости студента. Но так как это установленная ассоциация,Возникнет ситуация, когда данные не могут быть сопоставлены.. Например, учащийся не посещал занятия или его студенческий билет был неправильно записан в табеле посещаемости. Для ситуации, когда данные не связаны, у нас есть четыре способа справиться с этим. Первый — отбросить все данные, которые нельзя с ним связать. Второй — сохранить все поля, а поля, которые не могут быть связаны, записываются как NULL. В-третьих, левый стол не связан с резервированием, а правый стол отбрасывается. Четвертый - сохранить правую таблицу и отказаться от левой таблицы.
На следующем рисунке показаны эти четыре присоединения, которые очень ярки.
Давайте рассмотрим несколько практических примеров.
Сначала создайте набор данных:
ex1 = sc.parallelize([['frank', 30], ['bob', 9], ['silly', 3]])
ex2 = sc.parallelize([['frank', 80], ['bob', 12], ['marry', 22], ['frank', 21], ['bob', 22]])
Затем мы запускаем эти четыре объединения по отдельности и наблюдаем за результатами после объединений.
Из результатов видно, что если в обоих наборах данных есть несколько данных с одинаковым значением ключа, искра умножит их и сопоставит вместе.
Операция действия
Наконец, мы рассмотрим операцию действия пары RDD. пара RDD также является rdd, поэтомуОперации действия, применимые к обычным rdd, также применимы к парным rdd. Но вдобавок к этому Spark разработал для него уникальные действия.
countByKey
Операция countByKey, как следует из ее названия, вычисляет количество вхождений каждого значения ключа на основе значения ключа, что эквивалентно оператору SQL count groupby. Давайте рассмотрим конкретный пример:
collectAsMap
Это тоже очень понятно, на самом деле речь идет о конечном результатевывод в виде карты:
Как вы можете видеть из возвращенных результатов, вывод имеет тип dict. То есть «карта» в Python.
lookup
Это слово кажется относительно редким, но на самом деле оно обозначаетНайдите значение соответствующего значения по ключевому значению. То есть обычно используемая функция get, когда мы передаем значение ключа, автоматически возвращает все значения, соответствующие значению ключа. Если значений несколько, будет возвращен список.
Суммировать
На данный момент введены все парные операции, связанные с RDD. Пара rdd очень часто встречается в нашем повседневном использовании, и ее очень удобно использовать для реализации некоторых более сложных операций.
Кроме того, сегодняшняя статья содержит много контента, и чтобы его полностью понять, нужно приложить немного усилий. Это не то, чего можно достичь, прочитав статью, но это не имеет значения. Когда мы впервые изучаем, нам нужно только иметь общее впечатление об этих API и методах использования. Конкретные детали использования можно проверить позже, когда они используются данные о.
На сегодняшней статье все. Если вы чувствуете, что что-то приобрели, пожалуйста, нажмитеПодпишитесь или сделайте ретвитЧто ж, твое маленькое усилие много значит для меня.