Основное руководство по PySpark — первый шаг анализа больших данных (с реализацией кода)

глубокое обучение

Обзор

  • Данные растут с беспрецедентной скоростью
  • Как хранить, обрабатывать и использовать эти данные для машинного обучения?Spark может ответить на эти вопросы
  • Узнайте, что такое Spark, как он работает и какие компоненты задействованы.

Введение

Мы генерируем данные с беспрецедентной скоростью. Честно говоря, я не могу уследить за огромным объемом данных, генерируемых по всему миру! Я уверен, вы видели, сколько данных создается в наши дни. McKinsey, Gartner, IBM и другие компании предоставили данные о своих компаниях.

Вот некоторые невероятные цифры для справки. За один день отправлено более 500 миллионов твитов, 90 миллиардов электронных писем, 65 миллионов сообщений WhatsApp! Facebook генерирует 4 петабайта данных за 24 часа. Это невероятно!

Конечно, это также приносит проблемы. Как группа специалистов по обработке и анализу данных собирает так много данных? Как вы их обрабатываете и строите на их основе модели машинного обучения? Это волнующие вопросы, если вы специалист по данным или инженер данных.

Spark решает эти проблемы. Spark написан на языке Scala, который предоставляет интерфейсы для Scala, JAVA, Python и R. API, с которым работает PySpark. PySpark — это Python API, написанный на Python для поддержки Spark.

Традиционным способом работы с большими данными является использование распределенных сред, таких как Hadoop, но этим средам необходимо выполнять много операций чтения и записи на жестком диске. На самом деле время и скорость очень дороги. Вычислительная мощность также является важным препятствием.

PySpark справляется с этим эффективно и понятно. Итак, в этой статье мы начнем узнавать все об этом. Мы узнаем, что такое Spark, как установить его на свой компьютер, а затем углубимся в различные компоненты Spark. Код прикреплен к этой статье.

содержание

  1. Что такое Искра?
  2. Установите Apache Spark на свой компьютер
  3. Что такое Spark-приложение?
  4. Что такое сеанс Spark?
  5. Разделение в Spark
  6. конвертировать
  7. ленивое вычисление
  8. Типы данных в Spark

Что такое Искра?

Apache Spark — это среда распределенных кластерных вычислений с открытым исходным кодом для быстрой обработки, запросов и анализа больших данных.

На сегодняшний день это самая эффективная среда обработки данных на предприятии. Использование Spark стоит дорого, потому что требует много памяти для вычислений, но по-прежнему является фаворитом специалистов по данным и инженеров по работе с большими данными. В этой статье вы увидите, почему это так.

Организации, которые обычно полагаются на платформу Map-Reduce, теперь обращаются к среде Apache Spark. Spark выполняет вычисления в памяти и работает в 100 раз быстрее, чем фреймворки Map Reduce, такие как Hadoop. Spark популярен среди специалистов по данным, потому что он помещает распределение и кэширование данных в память и помогает им оптимизировать алгоритмы машинного обучения для больших данных.

Я рекомендую посетить официальную страницу Spark для более подробной информации. Он имеет обширную документацию и является хорошим справочным руководством по Spark: https://spark.apache.org/

Установите Apache Spark на свой компьютер

1. Загрузите Apache Spark

Самый простой способ установить Spark — через pip. Однако, согласно официальной документации Spark, это не рекомендуемый подход, поскольку пакет Spark Python не предназначен для замены всех других случаев.

Скорее всего, вы столкнетесь с множеством ошибок при реализации базового функционала. Он работает только для взаимодействия с существующими кластерами (автономными Spark, YARN или Mesos).

Итак, первый шаг — загрузить последнюю версию Apache Spark отсюда. Разархивируйте и переместите сжатый файл:

tar xzvf spark-2.4.4-bin-hadoop2.7.tgz 
mv spark-2.4.4-bin-hadoop2.7 spark
sudo mv spark/ /usr/lib/

2. Установите JAVA

Убедитесь, что в вашей системе установлена ​​JAVA. Я настоятельно рекомендую JAVA 8, поскольку известно, что у Spark2 есть проблемы с JAVA 9 и другими:

sudo apt install default-jre
sudo apt install openjdk-8-jdk

3. Установите инструменты сборки Scala (SBT)

Когда вы работаете над небольшим проектом с несколькими файлами исходного кода, проще скомпилировать их вручную. Но что если вы работаете над большим проектом с сотнями файлов исходного кода?В этом случае вам понадобится инструмент сборки.

SBT, сокращение от Scala Build Tool, управляет вашим проектом Spark и зависимостями библиотек, которые вы используете в своем коде.

Помните, что вам не нужно устанавливать его, если вы используете PySpark.. Но если вы используете JAVA или Scala для создания приложения Spark, вам необходимо установить SBT на свой компьютер. Выполните следующую команду для установки SBT:

echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo apt-key add
sudo apt-get update
sudo apt-get install sbt

4. Настройте искру

Затем откройте каталог конфигурации Spark и скопируйте шаблон среды Spark по умолчанию. это былоspark-env.sh.templateпоявилась форма. Открыть в редакторе:

cd /usr/lib/spark/conf/ 
cp spark-env.sh.template spark-env.sh 
sudo gedit spark-env.sh

Теперь в файлеspark-env.shсередина. Добавить кJAVA_HOME, и установите лимит памятиSPARKWORKERMEMORYСделайте задание. Здесь я выделил его как 4 ГБ:

## 添加变量
JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
SPARK_WORKER_MEMORY=4g

5. Установите переменные среды Spark

Откройте и отредактируйте с помощью следующей командыbashrcдокумент. этоbashrcФайл — это скрипт, который выполняется всякий раз, когда вы запускаете новый сеанс терминала:

## 打开bashrc
sudo gedit ~/bashrc

Добавьте в файл следующие переменные среды:

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64 
export SBT_HOME=/usr/share/sbt/bin/sbt-launch.jar 
export SPARK_HOME=/usr/lib/spark
export PATH=$PATH:$JAVA_HOME/bin
export PATH=$PATH:$SBT_HOME/bin:$SPARK_HOME/bin:$SPARK_HOME/sbin
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
export PYSPARK_PYTHON=python3
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH

Теперь обновитеbashrcдокумент. Это перезапустит сеанс терминала с обновленным скриптом:

source ~/.bashrc

Теперь в терминальном типеpyspark, он откроется в браузере по умолчаниюJupyterи автоматически инициализируемая переменная с именемscСреда Spark (это точка входа для службы Spark):

Что такое Spark-приложение?

Приложение Spark — это экземпляр контекста Spark. Он состоит из процесса-драйвера и набора процессов-исполнителей.

Процесс драйвера отвечает за хранение информации о приложении Spark, реагирование на коды, диспетчеризацию и планирование работы в исполнителях.. Процесс драйвера очень важен, он является ядром приложения Spark и поддерживает всю необходимую информацию в течение всего срока службы приложения.

Исполнители несут ответственность за фактическое выполнение работы, порученной им водителем.. Поэтому каждый исполнитель отвечает только за две вещи:

  • Выполнить задачу, поставленную перед ним водителем
  • Сообщите о состоянии вычислений на исполнителе обратно на узел драйвера.

Что такое сеанс Spark?

Мы знаем, что процесс драйвера управляет приложением Spark.Процесс драйвера представляется пользователю как объект, называемый сеансом Spark.

Экземпляры сеанса Spark могут использовать Spark для выполнения пользовательских операций в кластере. В Scala и Python переменные сеанса Spark доступны при запуске консоли:

Разделение в Spark

Разделение означает, что полные данные не появляются в одном месте. Он разделен на блоки, которые размещены на разных узлах.

Если есть только один раздел, даже с тысячами исполнителей, Spark будет иметь только одну степень параллелизма. Кроме того, если есть несколько разделов, но только один исполнитель, параллелизм Spark все равно будет только один, потому что есть только один вычислительный ресурс.

В Spark API более низкого уровня позволяют нам определять количество разделов.

Давайте рассмотрим простой пример, чтобы понять, как секционирование помогает нам быстрее получать результаты. Мы создадим список из 20 миллионов случайных чисел от 10 до 1000 и посчитаем числа больше 200.

Давайте посмотрим, как быстро мы можем сделать это всего с одним разделом:

from random import randint 

# 创建一个随机数字的列表在10到1000之间
my_large_list = [randint(10,1000) for x in range(0,20000000)]

# 创建一个分区的列表
my_large_list_one_partition = sc.parallelize(my_large_list,numSlices=1)

# 检查分区数量
print(my_large_list_one_partition.getNumPartitions())
# >> 1

# 筛选数量大于等于200的数字
my_large_list_one_partition = my_large_list_one_partition.filter(lambda x : x >= 200)

# 在jupyter中运行代码 
# 执行以下命令来计算时间
%%time

# 列表中元素的数量
print(my_large_list_one_partition.count())
# >> 16162207

one_partition_f

При использовании одного раздела на фильтрацию чисел ушло 34,5 мс:

Теперь увеличим количество разделов до 5 и проверим время выполнения:

# 创建五个分区
my_large_list_with_five_partition = sc.parallelize(my_large_list, numSlices=5)

# 筛选数量大于等于200的数字
my_large_list_with_five_partition = my_large_list_with_five_partition.filter(lambda x : x >= 200)

%%time 

# 列表中元素的数量
print(my_large_list_with_five_partition.count())
# >> 16162207

При использовании 5 разделов на фильтрацию чисел ушло 11,1 мс:

конвертировать

В Spark структуры данных неизменяемы. Это означает, что их нельзя изменить после создания. Но если мы не можем изменить его, как мы можем его использовать?

Поэтому, чтобы внести изменения, нам нужно указать Spark, как изменять данные. Эти инструкции называются преобразованиями.

Вспомните пример, который мы видели выше. Просим Spark отфильтровать числа больше 200 — это по сути преобразование. В Spark есть два типа преобразований:

  • узкий переход: в узком преобразовании все элементы, необходимые для вычисления результата одного раздела, находятся в одном разделе родительского СДР. Например, если вы хотите отфильтровать числа меньше 100, вы можете сделать это для каждого раздела отдельно. Преобразованный новый раздел зависит только от одного раздела для вычисления результата
  • широкое преобразование: при широком преобразовании все элементы, необходимые для вычисления результата одного раздела, могут находиться в нескольких разделах родительского RDD. Например, если вы хотите подсчитать количество цифр, ваше преобразование зависит от всех разделов для подсчета конечного результата.

ленивое вычисление

Предположим, у вас есть очень большой файл данных с миллионами строк. Вам необходимо выполнить анализ с помощью таких операций, как сопоставление, фильтрация, случайное разбиение и даже самые простые операции сложения и вычитания.

Теперь, с большими наборами данных, даже базовое преобразование требует миллионов операций.

Оптимизация этих операций имеет решающее значение при работе с большими данными, и Spark справляется с этим очень творчески. Все, что вам нужно сделать, это сообщить Spark, какие преобразования вы хотите применить к набору данных, и Spark выполнит ряд преобразований. Когда вы запросите у Spark результат, он определит лучший путь, выполнит необходимые преобразования и выдаст результат.

Теперь давайте возьмем пример. У вас есть текстовый файл размером 1 ГБ и создано 10 разделов. Вы также выполняете некоторые преобразования и, наконец, просите показать первую строку. В этом случае Spark будет читать файл только из первого раздела, предоставляя результаты без необходимости чтения всего файла.

Давайте рассмотрим несколько практических примеров, чтобы увидеть, как Spark выполняет ленивые вычисления. На первом этапе мы создали список из 10 миллионов номеров и создали RDD с 3 разделами:

# 创建一个样本列表
my_list = [i for i in range(1,10000000)]

# 并行处理数据
rdd_0 = sc.parallelize(my_list,3)

rdd_0

Далее мы выполним очень простое преобразование, например, добавим 4 к каждому числу. Обратите внимание, что на данный момент Spark не начал никаких преобразований. Он записывает только серию преобразований в виде графа операций RDD. Как видите, с помощью функцииtoDebugStringПосмотрите график работы RDD:


# 每个数增加4
rdd_1 = rdd_0.map(lambda x : x+4)

# RDD对象
print(rdd_1)

#获取RDD运算图
print(rdd_1.toDebugString())

Мы видим, что PythonRDD[1] подключен к ParallelCollectionRDD[0]. Теперь давайте добавим преобразования, которые добавляют 20 ко всем элементам списка.

Вы можете подумать, что добавление 24 напрямую было бы лучше, если сначала добавить 4, а затем добавить 20 за один шаг. Но проверьте график работы RDD после этого шага:

# 每个数增加20
rdd_2 = rdd_1.map(lambda x : x+20)

# RDD 对象
print(rdd_2)

#获取RDD运算图
print(rdd_2.toDebugString())

Как мы видим, он автоматически пропускает лишние шаги и добавит 24 за один шаг. Поэтому Spark автоматически определяет лучший путь для выполнения операций и выполняет преобразования только при необходимости.

Давайте возьмем еще один пример, чтобы понять процесс ленивой оценки.

Предположим, у нас есть текстовый файл и мы создаем RDD с 4 разделами. Теперь мы определяем некоторые преобразования, такие как преобразование текстовых данных в нижний регистр, разделение слов, добавление некоторых префиксов к словам и т. д.

Однако, когда мы выполняем такое действие, как получение первого элемента преобразованных данных, в этом случае нам не нужно видеть полные данные для выполнения результата запроса, поэтому Spark выполняет преобразование только в первом разделе.

# 创建一个文本文件的RDD,分区数量= 4
my_text_file = sc.textFile('tokens_spark.txt',minPartitions=4)

# RDD对象
print(my_text_file)

# 转换小写
my_text_file = my_text_file.map(lambda x : x.lower())

# 更新RDD对象
print(my_text_file)

print(my_text_file.toDebugString())

Здесь мы записываем слова в нижний регистр и берем первые два символа каждого слова.

# 分割单词
my_text_file = my_text_file.map(lambda x : x[:2])

# RDD对象
print(my_text_file)

print(my_text_file.toDebugString())

# 在所有的转换后得到第一个元素
print(my_text_file.first())

Мы создали 4 текстовых файла с разделами. Но в зависимости от нужного нам результата нет необходимости читать и выполнять преобразования на всех разделах, поэтому Spack выполняется только на первом разделе.

Что, если мы хотим подсчитать, сколько слов появляется, в этом случае нам нужно прочитать все разделы:

print(my_text_file.countApproxDistinct())

Типы данных Spark MLlib

MLlib — это масштабируемая библиотека машинного обучения Spark. Он включает в себя некоторые часто используемые алгоритмы машинного обучения, такие как регрессия, классификация, уменьшение размерности, а также некоторые инструменты для выполнения основных статистических операций с данными.

В этой статье мы подробно обсудим некоторые типы данных, предоставляемые MLlib. В будущих статьях мы обсудим такие темы, как извлечение признаков и построение конвейеров машинного обучения.

локальный вектор

MLlib поддерживает два типа локальных векторов: плотные и разреженные. Используйте разреженные векторы, когда большинство чисел равны нулю. Для создания разреженного вектора нужно указать длину вектора — индексы ненулевых значений, которые должны быть строго возрастающими и ненулевыми.

from pyspark.mllib.linalg import Vectors

## 稠密向量
print(Vectors.dense([1,2,3,4,5,6,0]))
# >> DenseVector([1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 0.0])

### 稠密向量
### Vectors.sparse( length, index_of_non_zero_values, non_zero_values)
### 索引应该严格递增且非零值

print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]))
# >> SparseVector(10, {0: 1.0, 1: 5.0, 2: 3.0, 4: 5.0, 5: 7.0})

print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]).toArray())
# >> array([1., 5., 3., 0., 5., 7., 0., 0., 0., 0.]) 

точка метки

Помеченная точка — это локальный вектор, где каждый вектор имеет метку. Это можно использовать в обучении с учителем, когда у вас есть некоторые функции цели и метки, соответствующие этим функциям.

from pyspark.mllib.regression import LabeledPoint

# 设置一个标签与一个稠密向量
point_1 = LabeledPoint(1,Vectors.dense([1,2,3,4,5]))

# 特征 
print(point_1.features)

# 标签
print(point_1.label)

локальная матрица

Локальные матрицы хранятся на машине. MLlib поддерживает как плотные, так и разреженные матрицы. В разреженной матрице ненулевые значения элементов хранятся в формате упакованного разреженного столбца (формат CSC) в порядке возрастания столбцов.

# 导入矩阵
from pyspark.mllib.linalg import Matrices

# 创建一个3行2列的稠密矩阵
matrix_1 = Matrices.dense(3, 2, [1,2,3,4,5,6])

print(matrix_1)
# >> DenseMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], False)

print(matrix_1.toArray())
"""
>> array([[1., 4.],
          [2., 5.],
          [3., 6.]])
"""

# 创建一个稀疏矩阵
matrix_2 = Matrices.sparse(3, 3, [0, 1, 2, 3], [0, 0, 2], [9, 6, 8])

print(matrix_2)
# SparseMatrix(3, 3, [0, 1, 2, 3], [0, 0, 2], [9.0, 6.0, 8.0], False)

print(matrix_2.toArray())
"""
>> array([[9., 6., 0.],
          [0., 0., 0.],
          [0., 0., 8.]])
"""

Распределенная матрица

Распределенные матрицы хранятся в одном или нескольких rds. Очень важно выбрать подходящий формат распределенной матрицы. На данный момент реализовано четыре типа распределенных матриц:

  • матрица строк
    • Каждая строка является локальным вектором. Строки могут храниться на нескольких разделах
    • Такие алгоритмы, как случайные леса, могут быть реализованы с использованием матриц строк, поскольку алгоритм делит строки на несколько деревьев. Результаты одного дерева не зависят от других деревьев. Таким образом, мы можем воспользоваться преимуществами распределенной архитектуры для распараллеливания таких алгоритмов, как случайные леса для больших данных.
# 分布式数据类型——行矩阵
from pyspark.mllib.linalg.distributed import RowMatrix

# 创建RDD
rows = sc.parallelize([[1,2,3], [4,5,6], [7,8,9], [10,11,12]])

# 创建一个分布式行矩阵
row_matrix = RowMatrix(rows)


print(row_matrix)
# >> <pyspark.mllib.linalg.distributed.RowMatrix at 0x7f425884d7f0> 

print(row_matrix.numRows())
# >> 4

print(row_matrix.numCols())
# >> 3

  • матрица строк индекса
    • Это похоже на матрицу строк, где строки хранятся в нескольких разделах в упорядоченном виде. Присвойте значение индекса каждой строке. Он используется для алгоритмов, где важна последовательность, таких как данные временных рядов.
    • Его можно создать из RDD IndexedRow.
# 索引行矩阵

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

#创建RDD
indexed_rows = sc.parallelize([
    IndexedRow(0, [0,1,2]),
    IndexedRow(1, [1,2,3]),
    IndexedRow(2, [3,4,5]),
    IndexedRow(3, [4,2,3]),
    IndexedRow(4, [2,2,5]),
    IndexedRow(5, [4,5,5])
])

# 创建IndexedRowMatrix
indexed_rows_matrix = IndexedRowMatrix(indexed_rows)

print(indexed_rows_matrix.numRows())
# >> 6

print(indexed_rows_matrix.numCols())
# >> 3
  • Координатная матрица
    • Координатная матрица может быть создана из RDD MatrixEntry.
    • Только когда размеры матрицы велики, мы используем координатную матрицу
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

# 用MatrixEntry创建
matrix_entries = sc.parallelize([MatrixEntry(0, 5, 2), MatrixEntry(1, 1, 1), MatrixEntry(1, 5, 4)])

# 创建坐标矩阵
c_matrix = CoordinateMatrix(matrix_entries)

# 列数
print(c_matrix.numCols())
# >> 6

# 行数
print(c_matrix.numRows())
# >> 2

  • блочная матрица
    • В блочной матрице мы можем хранить разные подматрицы большой матрицы на разных машинах.
    • Нам нужно указать размеры блока. Как и в приведенном ниже примере, у нас есть 3X3, для каждого квадрата мы можем указать матрицу, указав координаты
# 导入库
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix

# 创建子矩阵块的RDD
blocks = sc.parallelize([((0, 0), Matrices.dense(3, 3, [1, 2, 1, 2, 1, 2, 1, 2, 1])),
                         ((1, 1), Matrices.dense(3, 3, [3, 4, 5, 3, 4, 5, 3, 4, 5])),
                         ((2, 0), Matrices.dense(3, 3, [1, 1, 1, 1, 1, 1, 1, 1, 1]))])

# 从子矩阵块的RDD中创建矩阵块,大小为3X3
b_matrix = BlockMatrix(blocks, 3, 3) 

#每一块的列数
print(b_matrix.colsPerBlock)
# >> 3

#每一块的行数
print(b_matrix.rowsPerBlock)
# >> 3

# 把块矩阵转换为局部矩阵
local_mat = b_matrix.toLocalMatrix()

# 打印局部矩阵
print(local_mat.toArray())
"""
>> array([[1., 2., 1., 0., 0., 0.],
          [2., 1., 2., 0., 0., 0.],
          [1., 2., 1., 0., 0., 0.],
          [0., 0., 0., 3., 3., 3.],
          [0., 0., 0., 4., 4., 4.],
          [0., 0., 0., 5., 5., 5.],
          [1., 1., 1., 0., 0., 0.],
          [1., 1., 1., 0., 0., 0.],
          [1., 1., 1., 0., 0., 0.]])
"""

конец

Мы сегодня много говорили. Spark — один из самых увлекательных языков в науке о данных, и я чувствую, что с ним как минимум знаком.

Это только начало нашего пути обучения PySpark!Я планирую рассказать больше в этой серии, включая несколько статей о различных задачах машинного обучения.

В следующей статье PySpark мы увидим, как извлекать функции, создавать конвейеры машинного обучения и строить модели.