Введение
Вот как использовать PythonХадуп (Часть 1 серии статей о распределенных вычислительных платформах).
Цель этой серии статей состоит в том, чтобы сосредоточиться на обсужденииконкретные инструменты и рецепты дляРешить повторяющиеся проблемы, с которыми сталкиваются многие эксперты по данным, например.
-
Использование PythonМобильная HDFS(распределенная файловая система Hadoop)документ.
-
Загрузить данные из HDFSв структуру данных, напримерSparkилиpandasDataFrame для вычислений.
-
будетанализРезультат **** отпишитесьHDFS.
Первым инструментом в этой серии является Spark..Фреймворк, который определяет себя как унифицированный аналитический механизм для крупномасштабной обработки данных.
Apache Spark
Установка PySpark и findspark
Я рекомендую вам использовать виртуальную среду conda__. Если вы не знаете, как настроить conda, прочтите этоСтатья.
Сначала установитеfindspark
, эта библиотека поможет вам интегрировать Spark в рабочий процесс Python, а также установитьpyspark
, если вы работаете на своем локальном компьютере, а не в правильном кластере Hadoop.
Если вы следуете этому руководству в кластере Hadoop, вы можете пропустить установку PySpark..
conda install -c conda-forge findspark -y
# optional, for local setup
conda install -c conda-forge pyspark openjdk -y
Настройка Spark с помощью findspark
После установки findspark вы можете настроить использование Spark в своем коде Python.
местныйиКод в режиме кластера, распакуйте нужные вам строки и настройте пути в соответствии с вашей конкретной инфраструктурой и версиями библиотек (пути для cloudera Spark должны быть аналогичны приведенным здесь).
import findspark
# Local Spark
findspark.init(‘/home/cloudera/miniconda3/envs/<your_environment_name>/lib/python3.7/site-packages/pyspark/’)
# Cloudera Cluster Spark
findspark.init(spark_home=’/opt/cloudera/parcels/SPARK2–2.3.0.cloudera4–1.cdh5.13.3.p0.611179/lib/spark2/’)
В этом руководстве используется виртуальная машина Cloudera Quickstart (дистрибутив Linux CentOS, имя пользователя
_cloudera_
), не забудьте настроить путь в соответствии с вашей инфраструктурой!
Создайте приложение Spark
После инициализации Spark мы должны создать приложение Spark, выполнить следующий код и убедиться, что вы **указываете нужный мастер,** как'yarn'
, в случае соответствующего кластера Hadoop, или'local[*]'
, в случае полностью локальной установки.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(‘example_app’).master(‘yarn’).getOrCreate()
Рецепты и варианты использования PySpark
Когда у нас будет Spark на работе, давайте начнем взаимодействовать с Hadoop, используя некоторые из его распространенных вариантов использования.
Получение списка баз данных Hive
Давайте получим существующую базу данных. Я предполагаю, что вы уже знакомы с API Spark DataFrame и его методами.
spark.sql("show databases").show()
У вас должно получиться что-то вроде этого.
+------------+
|databaseName|
+------------+
| db1|
| default|
| fhadoop|
+------------+
Преобразование pandas DataFrame в Spark DataFrame
Первая интеграция посвящена перемещению данных из библиотеки pandas (которая является стандартной библиотекой Python для выполнения операций с данными в памяти) в Spark.
Во-первых, давайте загрузим pandas DataFrame. Это касается качества воздуха в **Мадриде (** просто для удовлетворения вашего любопытства, но не важно для переноса данных из одного места в другое). Вы можете скачать это здесь. убедитесь, что у вас установлена библиотекаpytables
, читатьhdf5
форматировать данные.
import pandas as pd
air_quality_df = pd.read_hdf(‘data/air_quality/air-quality-madrid/madrid.h5’, key=’28079008')
air_quality_df.head()
Эти данные представляют собой временной ряд многих известных загрязняющих веществ, таких как оксиды азота, озон и другие.
Давайте внесем некоторые изменения в этот DataFrame, например сброс индекса даты и времени, чтобы избежать потери информации при загрузке в Spark. Столбцы даты также будут преобразованы в строки, поскольку у Spark есть некоторые проблемы с обработкой дат (связанные с языковым стандартом системы, часовым поясом и т. д.), если они не настроены в соответствии с вашим языковым стандартом.
air_quality_df.reset_index(inplace=True)
air_quality_df[‘date’] = air_quality_df[‘date’].dt.strftime(‘%Y-%m-%d %H:%M:%S’)
Мы можем просто использоватьcreateDataFrame
Загрузите из pandas в Spark.
air_quality_sdf = spark.createDataFrame(air_quality_df)
Как только DataFrame загружается в Spark (например,air_quality_sdf
), им можно легко управлять с помощью API PySpark DataFrame.
air_quality_sdf.select('date', 'NOx').show(5)
Вывод должен быть примерно таким.
+— — — — — — — — — -+ — — — — — — — — — +
| date| NOx|
+ — — — — — — — — — + — — — — — — — — — +
|2001–07–01 01:00:00| 1017.0|
|2001–07–01 02:00:00| 409.20001220703125|
|2001–07–01 03:00:00| 143.39999389648438|
|2001–07–01 04:00:00| 149.3000030517578|
|2001–07–01 05:00:00| 124.80000305175781|
+ — — — — — — — — — + — — — — — — — — — +
only showing top 5 rows
Создать таблицу Hive из Spark DataFrame
Чтобы сохранить Spark DataFrame в HDFS, где его можно запрашивать с помощью ядра Hadoop SQL по умолчанию (Hive), простой стратегией (не единственной стратегией) является создание временного представления из этого DataFrame.
air_quality_sdf.createOrReplaceTempView("air_quality_sdf")
После создания представления времени его можно использовать в движке Spark SQL.create table as select
создать настоящую таблицу. Перед созданием этой таблицы я создам таблицу с именемanalytics
новую базу данных для его хранения.
sql_create_database = """
create database if not exists analytics
location '/user/cloudera/analytics/'
"""
result_create_db = spark.sql(sql_create_database)
Затем мы можем создать новую таблицу там.
sql_create_table = """
create table if not exists analytics.pandas_spark_hive
using parquet as select
to_timestamp(date) as date_parsed, *
from air_quality_sdf
"""
result_create_table = spark.sql(sql_create_table)
Чтение данных из таблицы Hive с помощью PySpark
После того, как мы создали нашу таблицу Hive, результаты можно проверить с помощью механизма Spark SQL, загрузив результаты обратно, например, выбрав изменение концентрации загрязнителя озона с течением времени.
spark.sql("select * from analytics.pandas_spark_hive") \ .select("date_parsed", "O_3").show(5)
Вывод :
+ — — — — — — — — — + — — — — — — — — — +
| date_parsed | O_3|
+ — — — — — — — — — + — — — — — — — — — +
|2001–07–01 01:00:00| 9.010000228881836|
|2001–07–01 02:00:00| 23.81999969482422|
|2001–07–01 03:00:00| 31.059999465942383|
|2001–07–01 04:00:00| 23.780000686645508|
|2001–07–01 05:00:00| 29.530000686645508|
+ — — — — — — — — — + — — — — — — — — — +
only showing top 5 rows
Надеюсь, вам понравится этот пост. В течение следующих нескольких недель мы опубликуем серию статей о других инструментах, которые вы можете использовать для освоения Hadoop с Python.