Hadoop с Python, часть первая. Простенький "PySpark

Большие данные
Hadoop с Python, часть первая. Простенький "PySpark

Введение

Вот как использовать PythonХадуп (Часть 1 серии статей о распределенных вычислительных платформах).

Цель этой серии статей состоит в том, чтобы сосредоточиться на обсужденииконкретные инструменты и рецепты дляРешить повторяющиеся проблемы, с которыми сталкиваются многие эксперты по данным, например.

  • Использование PythonМобильная HDFS(распределенная файловая система Hadoop)документ.

  • Загрузить данные из HDFSв структуру данных, напримерSparkилиpandasDataFrame для вычислений.

  • будетанализРезультат **** отпишитесьHDFS.

Первым инструментом в этой серии является Spark..Фреймворк, который определяет себя как унифицированный аналитический механизм для крупномасштабной обработки данных.

Apache Spark

Apache Spark

Установка PySpark и findspark

Я рекомендую вам использовать виртуальную среду conda__. Если вы не знаете, как настроить conda, прочтите этоСтатья.

Сначала установитеfindspark, эта библиотека поможет вам интегрировать Spark в рабочий процесс Python, а также установитьpyspark, если вы работаете на своем локальном компьютере, а не в правильном кластере Hadoop.

Если вы следуете этому руководству в кластере Hadoop, вы можете пропустить установку PySpark..

conda install -c conda-forge findspark -y
# optional, for local setup
conda install -c conda-forge pyspark openjdk -y

Настройка Spark с помощью findspark

После установки findspark вы можете настроить использование Spark в своем коде Python.

местныйиКод в режиме кластера, распакуйте нужные вам строки и настройте пути в соответствии с вашей конкретной инфраструктурой и версиями библиотек (пути для cloudera Spark должны быть аналогичны приведенным здесь).

import findspark
# Local Spark
findspark.init(‘/home/cloudera/miniconda3/envs/<your_environment_name>/lib/python3.7/site-packages/pyspark/’)
# Cloudera Cluster Spark
findspark.init(spark_home=’/opt/cloudera/parcels/SPARK2–2.3.0.cloudera4–1.cdh5.13.3.p0.611179/lib/spark2/’)

В этом руководстве используется виртуальная машина Cloudera Quickstart (дистрибутив Linux CentOS, имя пользователя_cloudera_), не забудьте настроить путь в соответствии с вашей инфраструктурой!

Создайте приложение Spark

После инициализации Spark мы должны создать приложение Spark, выполнить следующий код и убедиться, что вы **указываете нужный мастер,** как'yarn', в случае соответствующего кластера Hadoop, или'local[*]', в случае полностью локальной установки.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(‘example_app’).master(‘yarn’).getOrCreate()

Рецепты и варианты использования PySpark

Когда у нас будет Spark на работе, давайте начнем взаимодействовать с Hadoop, используя некоторые из его распространенных вариантов использования.

Получение списка баз данных Hive

Давайте получим существующую базу данных. Я предполагаю, что вы уже знакомы с API Spark DataFrame и его методами.

spark.sql("show databases").show()

У вас должно получиться что-то вроде этого.

+------------+ 
|databaseName| 
+------------+ 
|         db1| 
|     default| 
|     fhadoop| 
+------------+

Преобразование pandas DataFrame в Spark DataFrame

Первая интеграция посвящена перемещению данных из библиотеки pandas (которая является стандартной библиотекой Python для выполнения операций с данными в памяти) в Spark.

Во-первых, давайте загрузим pandas DataFrame. Это касается качества воздуха в **Мадриде (** просто для удовлетворения вашего любопытства, но не важно для переноса данных из одного места в другое). Вы можете скачать это здесь. убедитесь, что у вас установлена ​​библиотекаpytables, читатьhdf5форматировать данные.

import pandas as pd
air_quality_df = pd.read_hdf(‘data/air_quality/air-quality-madrid/madrid.h5’, key=’28079008') 
air_quality_df.head()

Эти данные представляют собой временной ряд многих известных загрязняющих веществ, таких как оксиды азота, озон и другие.

1_QerdAmxGouvRd-H4eaOD0g.png

Давайте внесем некоторые изменения в этот DataFrame, например сброс индекса даты и времени, чтобы избежать потери информации при загрузке в Spark. Столбцы даты также будут преобразованы в строки, поскольку у Spark есть некоторые проблемы с обработкой дат (связанные с языковым стандартом системы, часовым поясом и т. д.), если они не настроены в соответствии с вашим языковым стандартом.

air_quality_df.reset_index(inplace=True) 
air_quality_df[‘date’] = air_quality_df[‘date’].dt.strftime(‘%Y-%m-%d %H:%M:%S’)

Мы можем просто использоватьcreateDataFrameЗагрузите из pandas в Spark.

air_quality_sdf = spark.createDataFrame(air_quality_df)

Как только DataFrame загружается в Spark (например,air_quality_sdf), им можно легко управлять с помощью API PySpark DataFrame.

air_quality_sdf.select('date', 'NOx').show(5)

Вывод должен быть примерно таким.

+— — — — — — — — — -+ — — — — — — — — — + 
|               date|                NOx| 
+ — — — — — — — — — + — — — — — — — — — + 
|2001–07–01 01:00:00|             1017.0| 
|2001–07–01 02:00:00| 409.20001220703125| 
|2001–07–01 03:00:00| 143.39999389648438| 
|2001–07–01 04:00:00|  149.3000030517578| 
|2001–07–01 05:00:00| 124.80000305175781| 
+ — — — — — — — — — + — — — — — — — — — + 
only showing top 5 rows

Создать таблицу Hive из Spark DataFrame

Чтобы сохранить Spark DataFrame в HDFS, где его можно запрашивать с помощью ядра Hadoop SQL по умолчанию (Hive), простой стратегией (не единственной стратегией) является создание временного представления из этого DataFrame.

air_quality_sdf.createOrReplaceTempView("air_quality_sdf")

После создания представления времени его можно использовать в движке Spark SQL.create table as selectсоздать настоящую таблицу. Перед созданием этой таблицы я создам таблицу с именемanalyticsновую базу данных для его хранения.

sql_create_database = """
create database if not exists analytics
location '/user/cloudera/analytics/'
"""
result_create_db = spark.sql(sql_create_database)

Затем мы можем создать новую таблицу там.

sql_create_table = """ 
create table if not exists analytics.pandas_spark_hive 
using parquet as select 
to_timestamp(date) as date_parsed, * 
from air_quality_sdf
"""
result_create_table = spark.sql(sql_create_table)

Чтение данных из таблицы Hive с помощью PySpark

После того, как мы создали нашу таблицу Hive, результаты можно проверить с помощью механизма Spark SQL, загрузив результаты обратно, например, выбрав изменение концентрации загрязнителя озона с течением времени.

spark.sql("select * from analytics.pandas_spark_hive") \ .select("date_parsed", "O_3").show(5)

Вывод :

+ — — — — — — — — — + — — — — — — — — — + 
| date_parsed       |                O_3| 
+ — — — — — — — — — + — — — — — — — — — + 
|2001–07–01 01:00:00|  9.010000228881836| 
|2001–07–01 02:00:00|  23.81999969482422| 
|2001–07–01 03:00:00| 31.059999465942383| 
|2001–07–01 04:00:00| 23.780000686645508| 
|2001–07–01 05:00:00| 29.530000686645508| 
+ — — — — — — — — — + — — — — — — — — — + 
only showing top 5 rows

Надеюсь, вам понравится этот пост. В течение следующих нескольких недель мы опубликуем серию статей о других инструментах, которые вы можете использовать для освоения Hadoop с Python.