spark | Научить вас использовать spark для предварительной обработки данных

Spark

Эта статья возникла из личного публичного аккаунта:TechFlow, оригинальность это не просто, прошу внимания


Сегодняискровая темаВ седьмой статье рассмотрим анализ и обработку данных spark.

фильтр для дедупликации

В машинном обучении и анализе данных понимание и знакомство с данными являются самыми основными. Так называемую умную женщину трудно приготовить без риса.Если мы говорим, что использование данных для построения модели или поддержки сложного бизнеса верхнего уровня похоже на приготовление пищи. Тогда данные не являются «рисом», в лучшем случае его можно рассматривать только как нешелушенный рис. Чтобы приготовить вкусное блюдо, сырой рис необходимо обработать.

Однако обработка не должна вестись без разбора, многие занимаются обработкой данных, просто забивая себе голову.Удаление нулей, нормализация и горячее, этот процесс очень знаком. Чтобы, когда я это делаю, я не думал о том, в чем смысл этих процедур. Наша обработка данных также является целенаправленной, и мы используем разные стратегии для разных ситуаций. Итак, к этому моменту вы должны были понять, что первая задача — сначала получить базовое представление о данных.быть осведомленным.

Итак, откуда вы знаете, что делать? Давайте сначала рассмотрим конкретный пример, предположив, что у нас сейчас есть такой пакет данных:

df = spark.createDataFrame([
(1, 144.5, 5.9, 33, 'M'),
(2, 167.2, 5.4, 45, 'M'),
(3, 124.1, 5.2, 23, 'F'),
(4, 144.5, 5.9, 33, 'M'),
(5, 133.2, 5.7, 54, 'F'),
(3, 124.1, 5.2, 23, 'F'),
(5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])

Этот пакет данных на первый взгляд не кажется проблемой, но на самом деле в нем скрыто несколько ям.

Во-первых, есть две части данных с идентификатором 3. Не только это, но и характеристики этих двух частей данных абсолютно одинаковы. Во-вторых, объекты данных с id 1 и 4 тоже одинаковые, но id разные. Кроме того, есть два данных с id 5, но их характеристики разные. Очевидно, это не один и тот же фрагмент данных, это должна быть ошибка при записи.

Итак, для таких данных, как мы находим в них проблемы и как мы их исправляем?

Начнем с самого простого, сначалаНайти точно такие же данные. Мы можем использовать метод count, чтобы найти количество записей во всем наборе данных, и использовать Different().count() для получения количества данных после дедупликации. Используя эти два вместе, вы можете увидеть, есть ли полное дублирование данных.

Видно, что прямой счет равен 7, а если добавить различные, то получится 6, т. е.Полное дублирование данных. Тогда мы можем знать, что нам нужно сделать некоторую дедупликацию, чтобы удалить полностью дублированные строки.Это очень просто удалить.Фрейм данных поставляется с ним.dropDuplicatesметод, мы можем вызвать его напрямую:

Очевидно, только что отсутствовал один фрагмент данных с тем же идентификатором 3, и он был удален.

Далее продолжаем анализировать, как судить, есть ли ситуация, когда id другой, а остальные данные совпадают?

На самом деле, Different.count используется точно так же, но нам нужно убрать id из категории операции count Different. Мы можем получить имена столбцов в кадре данных через столбцы, мыПройдите список имен и отфильтруйте идентификатор.

Здесь мы по-прежнему применяем different.count, но ограничиваем область использования через select перед его использованием.Выполнять вычисления дедупликации только для столбцов, отличных от id.

DropDuplicate может не только отличаться, но и ограничивать сферу действия. Используемый метод также очень прост, мы управляем им через подмножество переменных, мыПередайте список, представляющий диапазон дубликатов.

Очевидно, что наши данные уменьшились еще на единицу. Это означает, что мы убрали случай, когда id другой, но контент тот же, и, наконец, оставили случай, когда id тот же, но контент другой. Эта ситуация обычно возникает из-за ошибки при записи, например из-за неправильной обработки параллелизма, в результате чего два разных сообщения используют один и тот же идентификатор.

Это очень просто, потому что мы прошли полную дедупликацию, поэтому обычно не должно быть записей с одинаковым идентификатором. Поэтому нам нужно только судить, повторяется ли идентификатор. Метод оценки также очень прост, мы считаем количество идентификаторов.

Здесь мы, как и раньше, можем судить по Different.count, а здесь мы вводим новый метод под названием agg. agg — это сокращение от агрегата, что буквально означает агрегирование. С агг мы можемАгрегированные вычисления по некоторым столбцам, такие как сумма, мин., макс. В этой задаче вычисления агрегации, которые мы хотим выполнить, являются count и count отличными.У этих двух также есть готовые функции, и мы можем их импортировать и использовать напрямую.

То есть через agg мыАгрегированные операции могут выполняться с разными столбцами одновременно., мы обнаружили, что после добавления различных осталось только 4, что указывает на то, что есть два разных идентификатора данных с одним и тем же идентификатором.

Следующее, что нам нужно сделать, это сгенерировать новый идентификатор для этих данных, чтобы убедиться, что идентификатор каждого фрагмента данных уникален. У этого также есть специальная функция, мы просто вызываем ее напрямую:

Метод monotonally_increasing_idбудет генерировать уникальный и увеличивающийся идентификатор, чтобы мы сгенерировали новый идентификатор и завершили фильтрацию дедупликации всех данных.

Нулевая обработка

Когда мы закончили фильтрацию и очистку данных и это еще не конец, нам также нужно разобраться с нулевыми значениями. Поскольку фактические данные часто не идеальны, могут быть случаи, когда данные не собираются для некоторых функций. Нулевые значения обычно не могут напрямую входить в модель, поэтому нам нужно иметь дело с нулевыми значениями.

Давайте создадим еще один пакет данных:

df_miss = spark.createDataFrame([
(1, 143.5, 5.6, 28, 'M', 100000),
(2, 167.2, 5.4, 45, 'M', None),
(3, None , 5.2, None, None, None),
(4, 144.5, 5.9, 33, 'M', None),
(5, 133.2, 5.7, 54, 'F', None),
(6, 124.1, 5.2, None, 'F', None),
(7, 129.2, 5.3, 42, 'M', 76000),
], ['id', 'weight', 'height', 'age', 'gender', 'income'])

По сравнению с предыдущими данными эти данные ближе к нашей реальной ситуации, например, есть несколько строк данных, которые в основном пусты, и есть некоторые столбцы, которые в основном пусты. Поскольку реальные данные часто распределяются неравномерно, некоторые признаки и выборки относительно редки. Например, некоторые теги или модели поведения являются узкоспециализированными, и у многих пользователей их нет, или некоторые пользователи имеют очень редкое поведение и используют продукт только изредка, поэтому большинство функций отсутствуют.

Таким образом, мы можем захотеть увидеть, какие образцы отсутствуют более серьезно, мы хотим получить пару идентификатора и количество отсутствующих функций. Эту операцию сложно реализовать через собственный API фрейма данных, нам нужноСначала преобразуйте фрейм данных в rdd, а затем сделайте это через MapReduce.:

image-20200525163206376
image-20200525163206376

Мы видим, что у 3 больше всего отсутствующих значений, поэтому мы можем посмотреть эти данные отдельно:

Мы также можем посмотреть на ситуацию с пропущенными значениями в каждом столбце и на то, какая доля отсутствует. Поскольку нам нужно агрегировать каждый столбец, здесь используется метод agg:

Этот код может выглядеть немного сложнее из-за операции *. Потому что, когда функция agg передает список, она может работать с несколькими столбцами. И здесь мы хотим посчитать каждый столбец. Из-за большого количества колонок вручную пронумеровать нам явно нереально. Таким образом, мы реализуем его с помощью цикла, а оператор * означает развертывание цикла.count('*') эквивалентен count(1) в операторе SQL, что означает вычисление общего количества баров.

Из результатов видно, что функция дохода отсутствует больше всего, при этом 71% данных пусты. Тогда очевидно, что эта функция нам мало полезна, потому что пропажа слишком серьезная, а возможности восполнения нет. Итак, мы удаляем эту строку:

После того, как мы удалили данные о доходах, мы обнаружили, что все еще есть несколько очень серьезных пропущенных строк.хочу установить порог, чтобы отфильтровать строки с более чем определенным количеством вакансий функций, потому что эффект также невелик.

Эту функцию не нужно разрабатывать самим, ее поддерживает нативное API в датафрейме.

После такой обработки оставшееся недостающее относительно невелико. В настоящее время мы не хотим его больше удалять, потому что только некоторые данные свободны, а другие данные все еще действуют.Если удалить, объем данных будет недостаточным. Итак, наш обычный способзаполните эти функции.

Заполнение пропущенных значений является очень распространенным методом обработки данных, и существует несколько способов его заполнения. НапримерВы можете заполнить среднее значение, или вы можете заполнить медиану или моду, и вы можете обучить другую модель прогнозировать на основе других функций.. Короче, методов еще много, здесь мы используем самый простой метод, то есть среднее значение для заполнения. Взгляните, как среднее заполнение работает в искре.

Поскольку он должен быть заполнен, очевидно, что сначала необходимо вычислить среднее значение. Итак, сначала нам нужно вычислить среднее значение каждой функции. Здесь пол следует исключить, потому что пол является категориальным признаком и не имеет среднего значения. Поэтому, если вы хотите указать пол, вы можете указать только режим или использовать модель для прогнозирования, а не напрямую использовать среднее значение.

Сам расчет среднего значения не сложен и похож на серию операций только что. Но одно замечание, мы получили результат здесь, ноно не может быть передан напрямую в качестве параметра. Поскольку метод fillna в кадре данных поддерживает только передачу целого числа, числа с плавающей запятой, строки или словаря. Итак, нам нужно преобразовать эти данные в dict. Преобразование здесь немного проблематично, потому что фрейм данных нельзя преобразовать напрямую, нам нужноСначала преобразуйте в панд, а затем вызовите метод to_dict в пандах.

У нас есть среднее значение типа dict, и мы можем использовать его для заполнения:

Суммировать

В реальной работе или соревнованиях kaggle процессы обработки и анализа данных намного сложнее, чем те, которые описаны в статье. Но дедупликация, фильтрация и заполнение — самые основные и важные части обработки данных. Можно даже сказать, что как бы ни менялись сценарии применения и обновлялись методы решения задач, это незаменимые части.

Если вам понравилась эта статья, пожалуйстаобращать внимание, подбодрите меня и облегчите доступ к другим статьям.

В этой статье используетсяmdniceнабор текста