предисловие
Подробнее см.:Полное руководство Spark Полное руководство План изучения Spark
Глава 6: Обработка различных типов данных
В этой главе рассказывается, как использовать методы, связанные с DataFrame, для работы с различными типами данных, в частности: логическими, числовыми, строковыми, датой и временем, нулевыми значениями, сложными массивами, картами, типами структур, определяемыми пользователем функциями (UDF).
Где найти правильный путь
Метод DataFrame (или DataSet), потому что DataFrame — это DataSet типа Row, так что в итоге это все равно метод DataSet, где его найти? Только официальный сайт,ссылка здесь
DataSet имеет множество подмодулей, таких как DataFrameStatFunctions, содержащие различные функции, связанные со статистикой, DataFrameNaFunctions, работающие с нулевыми данными (null).
Методы, связанные со столбцами, находятся здесь:ссылка здесь
Есть также некоторые методы, связанные с SQL:ссылка здесь
Обработка логических данных
На этот раз используется файл данныхdata/retail-data/by-day/2010-12-01.csv
scala> val df = spark.read.format("csv").option("header","true").option("inferSchema","true").load("data/2010-12-01.csv")
df: org.apache.spark.sql.DataFrame = [InvoiceNo: string, StockCode: string ... 6 more fields]
scala> df.printSchema
root
|-- InvoiceNo: string (nullable = true)
|-- StockCode: string (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: timestamp (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: double (nullable = true)
|-- Country: string (nullable = true)
По сути, говорить не о чем, когда речь идет о булевых типах, это не что иное, как истинное, ложное, логическое сравнение (равно, не равно, больше или меньше и т. д.) и операторы или не. приложения в искре следующие:
# 等于
df.where(col("InvoiceNo").equalTo(536365)).show()
df.where(expr("InvoiceNo=536365")).show()
df.where("InvoiceNo=536365").show()
df.where(col("InvoiceNo")===536365).show()
# 不等于
df.where(not(col("InvoiceNo").equalTo(536365))).show()
df.where(!col("InvoiceNo").equalTo(536365)).show()
df.where(col("InvoiceNo")=!=536365).show()
df.where(expr("InvoiceNo!=536365")).show()
df.where("InvoiceNo!=536365").show()
# scala和python中还可以
df.where("InvoiceNo <> 536365").show()
И (и) или (или) не (не) проблема, как упоминалось ранее, последовательные фильтры (один за другим) и соединение также будут преобразованы искрой в один оператор для одновременного выполнения этих фильтров, и или соединение должно быть написано в том же заявлении, а не для того, чтобы свести на нет приведенный выше код
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.where(col("StockCode").isin("DOT")).where(priceFilter.or(descripFilter)).show()
Логические выражения также можно использовать в других местах, например, при добавлении столбцов.
val DOTCodeFilter = col("StockCode") === "DOT"
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.withColumn("isExpensive", DOTCodeFilter.and(priceFilter.or(descripFilter)))
.as("isExpensive") #重命名这里没必要
.select("unitPrice", "isExpensive").show(5)
df.withColumn("isExpensive",filter.and(price.or(descript))).where("isExpensive=true").show()
Лучше всего использовать этот метод, если сравниваемые поля пусты (null)eqNullSafe
scala> df.where(col("Description").equalTo("LOVE BUILDING BLOCK WORD")).show
scala> df.where(col("Description").eqNullSafe("LOVE BUILDING BLOCK WORD")).show
Дополнительные записи
- Как избавиться от веса?
df.distinct() #整体去重 df.dropDuplicates("InvoiceNo","InvoiceDate") #根据某些列去重
- Как узнать, пусто ли оно (null)?
# 具体就是isNull、isNotNull、isNaN(这个也不能叫空) df.where(col("Description").isNull).show
- Разница между NaN и NULL? null — это нулевое значение, а nan — это «не число», результат бессмысленной математической операции, например 0/0. Создание нана, как в искровой банке
float("nan")
Работа с числовыми данными
Это обычные операции сложения, вычитания, умножения и деления, а затем некоторые функции, такие как pow. Здесь также упоминаются две функции: одна — округление, а другая — коэффициент корреляции Пирсона для расчета корреляции.
Операция round() округляет. Операция round() округляет десятичные дроби в меньшую сторону.
# 一个是3.0,一个是2.0 df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)
Обработка строковых данных
Это обычные строковые операции, такие как преобразование регистра, удаление начальных и конечных пробелов, разделение, получение подстрок и т. д., см.Ссылка на сайтвнизString functions
Обработка данных о дате и времени
Открыть поиск по ссылке:Date time functions
Обработка нулевых данных
Возвращаясь к основам, каковы методы работы с нулевыми данными в DataFrame в пандах, такие как fillna, dropna, isNull, isNaN и т. д., и есть соответствующие методы в spark sql, в подпакете na DataFrame (df.na._
) и чтоsql.functions._
Вниз.
Ранее упоминались методы isNull (isNaN) и isNotNull, а также несколько других методов для определения того, пусто ли оно.Методы оценки null в SQLметоды ifnull, nullif, nvl, nvl2
-
ifnull(expr1, expr2)
иnvl(expr1, expr2)
, если expr1 равно null, вернуть expr2, иначе вернуть expr1 -
nullif(expr1, expr2)
, если expr1 равно expr2, вернуть null, иначе вернуть expr1 -
nvl2(expr1, expr2, expr3)
, если expr1 равно null, вернуть expr3, иначе вернуть expr2
Затем drop удаляет строки, содержащие нули, fill заполняет один или несколько столбцов,Ссылка на документацию здесь
# 默认删除任何值为null的行
df.na.drop() # df.na.drop("any")
df.na.drop("all") # 所有列都为null才删除
df.na.drop("all", Seq("col1","col2")) # 也可以指定特定的列
# drop也可以删除像这种低于10的
df.na.drop(10,Seq("col1","col2")) # col1、col2中值小于10的(非)
# 可以指定对于什么类型的类填充什么值
df.na.fill(5:Integer)
df.na.fill(5:Double)
# 也可以针对特定的列填充特定的值
df.na.fill(5,Seq("col1","col2")) # 当然col1是Integer类型的
df.na.fill(Map("col1"->5,"col2"->"null")) # col1填充5,col2填充"null"
replace также может выполнять функцию заполнения нулевого значения, напримерdf.na.replace(Seq("col1","col2"),Map(""->"UNKNOWN"))
, что больше касается замены старых значений новыми, а не нулями
Существует также сортировка, упомянутая в главе 5, независимо от того, появляются ли нулевые данные до или после asc_nulls_first, desc_nulls_first и т. д.
Вот статья:Dealing with null in Spark
Обработка сложных типов данных
Я чувствую, что эту часть абсолютно необходимо освоить.Когда я впервые столкнулся с такого рода данными, мне потребовалось много времени, чтобы проверить данные. Этой части книги недостаточно, в ней рассказывается только об обработке запросов, позвольте мне добавить.
Методы обработки структур
Эта структура данных аналогична структуре языка C, которая может содержать различные типы данных. Или используйте приведенные выше данные, сначала создайте DataFrame, содержащую структуру
scala> val complexDF = df.selectExpr("struct(Description,InvoiceNo) as complex","Description","InvoiceNo")
scala> complexDF.printSchema
root
|-- complex: struct (nullable = false)
| |-- Description: string (nullable = true)
| |-- InvoiceNo: string (nullable = true)
|-- Description: string (nullable = true)
|-- InvoiceNo: string (nullable = true)
содержащие сложные типы данныхcomplexDF
Используется так же, как и предыдущий DataFrame, отличие заключается в том, как получить данные поля в структуре комплекса, существуют следующие способы:
complexDF.select(col("complex").getField("Description")).show(5,false) # getField方法/getItem方法也OK,二者有区别的
complexDF.select("complex.Description").show(5,false) # 或者直接dot [`.`],全选的话是`.*`
# sql
complexDF.createOrReplaceTempView("complex_df")
spark.sql("select complex.* from complex_df").show(5,false)
spark.sql("select complex.Description from complex_df").show(5,false)
Методы работы с массивами
В качестве имени массива, как и одного из свойств массивов, он может содержать данные только одного типа.Сначала создайте DataFrame, который содержит поля типа Array.Здесь в книге упоминается строка.split
Метод, который делит строку на второй обычный параметр, возвращает Column типа Array
def split(str: Column, pattern: String): Column
, Splits str around pattern (pattern is a regular expression).
# scala
scala> import org.apache.spark.sql.functions.split
import org.apache.spark.sql.functions.split
# 将Description通过空格分割
scala> df.select(split(col("Description")," ")).printSchema
root
|-- split(Description, ): array (nullable = true)
| |-- element: string (containsNull = true)
scala> df.select(split(col("Description")," ")).show(2)
+---------------------+
|split(Description, )|
+---------------------+
| [WHITE, HANGING, ...|
| [WHITE, METAL, LA...|
+---------------------+
# SQL做法,SELECT split(Description, ' ') FROM dfTable
Spark может преобразовывать такие сложные типы данных в другой столбец и запрашивать массив аналогично тому, как Python манипулирует массивами.
scala> df.select(split(col("Description")," ").alias("array_col")).select(expr("array_col[0]")).show(2)
+------------+
|array_col[0]|
+------------+
| WHITE|
| WHITE|
+------------+
# sql写法,SELECT split(Description, ' ')[0] FROM dfTable
# 当然还可以用getItem
scala> df.select(split(col("Description")," ").alias("array_col")).select(col("array_col").getItem(0)).show(2)
Чтобы получить длину массива, вы можете использоватьsize
метод (также подходит для карты)
def size(e: Column): Column
, Returns length of array or map.
scala> import org.apache.spark.sql.functions.size
import org.apache.spark.sql.functions.size
# 我这里Column是用$方式写的
scala> df.select(split($"Description", " ").alias("array_col")).withColumn("no_of_array",size($"array_col")).show(2,false)
+----------------------------------------+-----------+
|array_col |no_of_array|
+----------------------------------------+-----------+
|[WHITE, HANGING, HEART, T-LIGHT, HOLDER]|5 |
|[WHITE, METAL, LANTERN] |3 |
+----------------------------------------+-----------+
Проверить, содержит ли массив элементarray_contains
метод
def array_contains(column: Column, value: Any): Column
, Returns null if the array is null, true if the array contains value, and false otherwise.
Он в основном используется для оценки того, где условия
scala> import org.apache.spark.sql.functions.array_contains
import org.apache.spark.sql.functions.array_contains
scala> df.select(split(col("Description"), " ").alias("array_col")).withColumn("contains_WHITE",array_contains($"array_col","WHITE")).show(5,false)
+------------------------------------------+--------------+
|array_col |contains_WHITE|
+------------------------------------------+--------------+
|[WHITE, HANGING, HEART, T-LIGHT, HOLDER] |true |
|[WHITE, METAL, LANTERN] |true |
|[CREAM, CUPID, HEARTS, COAT, HANGER] |false |
|[KNITTED, UNION, FLAG, HOT, WATER, BOTTLE]|false |
|[RED, WOOLLY, HOTTIE, WHITE, HEART.] |true |
+------------------------------------------+--------------+
# sql中一样的
scala> val df1 = df.select(split(col("Description"), " ").alias("array_col"))
df1: org.apache.spark.sql.DataFrame = [array_col: array<string>]
scala> df1.createOrReplaceTempView("array_df")
scala> spark.sql("select *, array_contains(array_col,'WHITE') from array_df").show(5,false)
+------------------------------------------+--------------------------------+
|array_col |array_contains(array_col, WHITE)|
+------------------------------------------+--------------------------------+
|[WHITE, HANGING, HEART, T-LIGHT, HOLDER] |true |
|[WHITE, METAL, LANTERN] |true |
|[CREAM, CUPID, HEARTS, COAT, HANGER] |false |
|[KNITTED, UNION, FLAG, HOT, WATER, BOTTLE]|false |
|[RED, WOOLLY, HOTTIE, WHITE, HEART.] |true |
+------------------------------------------+--------------------------------+
# 多还是用来作为where条件的判断,这里随便举个例子
val df2 = df.select(split(col("Description"), " ").alias("array_col")).withColumn("item",$"array_col".getItem(0))
# 第二个参数也能传Column,判断是否包含对应位置的元素
df2.where("array_contains(array_col,item)").show(2) # 这样写实际是expr
df2.where(array_contains($"array_col",$"item")).show(2)
Стоит отметить, что метод записи Column в SQL,не ставить кавычки, с кавычками, он обрабатывается как строка, и его легко забыть, когда он написан.
также можно использоватьexplode
Метод преобразует сложные типы данных в набор строк (то есть каждый элемент в массиве/карте расширяется, чтобы сформировать новый столбец, соответствующий другим столбцам), как показано ниже.
def explode(e: Column): Column
, Creates a new row for each element in the given array or map column.
scala> import org.apache.spark.sql.functions.explode
scala> df.withColumn("splitted", split(col("Description"), " "))
.withColumn("exploded", explode(col("splitted")))
.select("Description", "InvoiceNo", "exploded").show(2)
+--------------------+---------+--------+
| Description|InvoiceNo|exploded|
+--------------------+---------+--------+
|WHITE HANGING HEA...| 536365| WHITE|
|WHITE HANGING HEA...| 536365| HANGING|
+--------------------+---------+--------+
# 我这里写了个简单点的
scala> val df4 = Seq((Seq(1,1,2),2),(Seq(1,2,3),3)).toDF("item","id")
df4: org.apache.spark.sql.DataFrame = [item: array<int>, id: int]
scala> df4.printSchema
root
|-- item: array (nullable = true)
| |-- element: integer (containsNull = false)
|-- id: integer (nullable = false)
scala> df4.show()
+---------+---+
| item| id|
+---------+---+
|[1, 1, 2]| 2|
|[1, 2, 3]| 3|
+---------+---+
# 就是展开了Array,然后对应其他列构成新的列
scala> df4.withColumn("exploded",explode($"item")).show
+---------+---+--------+
| item| id|exploded|
+---------+---+--------+
|[1, 1, 2]| 2| 1|
|[1, 1, 2]| 2| 1|
|[1, 1, 2]| 2| 2|
|[1, 2, 3]| 3| 1|
|[1, 2, 3]| 3| 2|
|[1, 2, 3]| 3| 3|
+---------+---+--------+
-
explode_outer
,такой жеexplode
, но когда массив или карта пусты или нулевые, они расширятся до нулевых -
arrays_overlap(a1,a2)
- Возвращает true, если массив a1 содержит хотя бы один непустой элемент массива a2.
- Возвращает null, если какой-либо массив содержит null
spark.sql("select arrays_overlap(array(1,2,3),array(3,4,5))").show
true
spark.sql("select arrays_overlap(array(1,2,3),array(4,5))").show
false
spark.sql("select arrays_overlap(array(1,2,3),array(4,5,null))").show
null
-
arrays_zip(array<T>, array<U>, ...):array<struct<T, U, ...>>
- Объединить n массивов в массив структур
- N-я структура (struct) содержит n-е значение всех входных массивов или null, если нет
scala> val df = spark.sql("select arrays_zip(array(1,2,3),array('4','5')) as array_zip")
scala> df.printSchema
root
|-- array_zip: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- 0: integer (nullable = true)
| | |-- 1: string (nullable = true)
scala> df.select(col("array_zip").getItem(0)).show
+------------+
|array_zip[0]|
+------------+
| [1, 4]|
+------------+
-
element_at(array<T>, Int):T
иelement_at(map<K, V>, K):V
- Также подходит для карты, возвращает значение, соответствующее ключу, или ноль, если ключ не включен
scala> spark.sql("select element_at(array(1,2,3),-1)").show
+------------------------------+
|element_at(array(1, 2, 3), -1)|
+------------------------------+
| 3|
+------------------------------+
scala> spark.sql("select element_at(array(1,2,3),4)").show
+-----------------------------+
|element_at(array(1, 2, 3), 4)|
+-----------------------------+
| null|
+-----------------------------+
scala> spark.sql("select element_at(array(1,2,3),0)").show
java.lang.ArrayIndexOutOfBoundsException: SQL array indices start at 1
Есть также некоторые методы, применимые к Array, которые не подходят для скриншотов, перечисленные здесь:
-
reverse(e: Column): Column
, переворачивает строку или элементы массива- Примечание. Перевернутые строки типа «abc def» означают «fed cba».
-
flatten(array<array<T>>): array<T>
, преобразовать вложенный массив в массив, но если уровень структуры вложенного массива превышает 2, удаляется только один уровень вложенности
spark.sql("select flatten(array(array(1,2),array(3,4)))").show
[1, 2, 3, 4]
spark.sql("select flatten(array(array(array(1,2),array(3,4)),array(array(5,6))))").show(false)
[[1, 2], [3, 4], [5, 6]]
-
shuffle(e: Column): Column
, перемешать массив случайным образом -
slice(x: Column, start: Int, length: Int): Column
, то есть перехватывать массив аналогично python, но здесь перехватывать массив x из индекса start для перехвата массива элементов длины и возвращать его- еслиначало отрицательное число,ноОбрезать назад с конца, вроде не понятно объяснил, см. пример
- индекс начинается с 1
scala> spark.sql("select slice(array(1,2,3),1,2)").show
+---------------------------+
|slice(array(1, 2, 3), 1, 2)|
+---------------------------+
| [1, 2]|
+---------------------------+
scala> spark.sql("select slice(array(1,2,3),-2,2)").show # slice(array(1,2,3),-2,3)也是返回这个,length超过数组长也只是返回xxx。。。,就这个意思,我叙述不清
+----------------------------+
|slice(array(1, 2, 3), -2, 2)|
+----------------------------+
| [2, 3]|
+----------------------------+
-
sort_array(e: Column, asc: Boolean): Column
, что также является сортировкой массива, отличной от сортировки на рисунке выше.Можно указать порядок убывания
Методы работы с картами
Карта представляет собой данные в формате пары ключ-значение, spark sql предоставляетmap
Метод может преобразовать два столбца в столбцы карты,ключ не может быть нулевым, значение может быть
scala> df.select(map(col("Description"),col("InvoiceNo")).alias("complex_map")).show(2,false)
+----------------------------------------------+
|complex_map |
+----------------------------------------------+
|[WHITE HANGING HEART T-LIGHT HOLDER -> 536365]|
|[WHITE METAL LANTERN -> 536365] |
+----------------------------------------------+
# SQL写法,SELECT map(Description, InvoiceNo) as complex_map FROM dfTable
WHERE Description IS NOT NULL
Можно запрашивать как словарь в python
scala> val df1 = df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))
scala> df1.printSchema
root
|-- complex_map: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
scala> df1.select(expr("complex_map['WHITE METAL LANTERN']")).show(2)
+--------------------------------+
|complex_map[WHITE METAL LANTERN]|
+--------------------------------+
| null|
| 536365|
+--------------------------------+
упоминалось ранееexplode
Метод работает на карте
scala> df1.select($"complex_map",explode($"complex_map")).show(5,false)
+-----------------------------------------------+-----------------------------------+------+
|complex_map |key |value |
+-----------------------------------------------+-----------------------------------+------+
|[WHITE HANGING HEART T-LIGHT HOLDER -> 536365] |WHITE HANGING HEART T-LIGHT HOLDER |536365|
|[WHITE METAL LANTERN -> 536365] |WHITE METAL LANTERN |536365|
|[CREAM CUPID HEARTS COAT HANGER -> 536365] |CREAM CUPID HEARTS COAT HANGER |536365|
|[KNITTED UNION FLAG HOT WATER BOTTLE -> 536365]|KNITTED UNION FLAG HOT WATER BOTTLE|536365|
|[RED WOOLLY HOTTIE WHITE HEART. -> 536365] |RED WOOLLY HOTTIE WHITE HEART. |536365|
+-----------------------------------------------+-----------------------------------+------+
-
map_form_arrays(array<K>, array<V>): map<K, V>
, который объединяет заданные массивы в Map,Массив ключей не должен содержать null
-
map_from_entries(array<struct<K, V>>): map<K, V>
, который возвращает Map из заданного массива структур -
map_concat(map<K, V>, ...): map<K, V>
, который возвращает объединение нескольких Карт -
map_keys/values
, возвращает соответствующий ключ/значение столбца Map в виде массива - Также, как упоминалось выше
element_at
scala> val df2 = spark.sql("SELECT map(1, 'a', 2, 'b') as aMap, map(2, 'c', 3, 'd') as bMap")
scala> df2.printSchema # 进一步说明key不能为null
root
|-- aMap: map (nullable = false)
| |-- key: integer
| |-- value: string (valueContainsNull = false)
|-- bMap: map (nullable = false)
| |-- key: integer
| |-- value: string (valueContainsNull = false)
scala> df2.select(map_concat($"aMap",$"bMap")).show(false)
+--------------------------------+
|map_concat(aMap, bMap) |
+--------------------------------+
|[1 -> a, 2 -> b, 2 -> c, 3 -> d]|
+--------------------------------+
# keys
scala> df2.select(map_keys($"aMap")).show
+--------------+
|map_keys(aMap)|
+--------------+
| [1, 2]|
+--------------+
# values
scala> df2.select(map_values($"aMap")).show
+----------------+
|map_values(aMap)|
+----------------+
| [a, b]|
+----------------+
# map_keys($"aMap")(0)返回的是1
scala> df2.select(element_at($"aMap",map_keys($"aMap")(0))).show
+-----------------------------------+
|element_at(aMap, map_keys(aMap)[0])|
+-----------------------------------+
| a|
+-----------------------------------+
Способы работы с JSON
Данные в формате JSON очень распространены, Spark также предоставляет ряд методов для разбора или извлечения объектов JSON, но важно знать, что данные в этом формате хранятся в виде строк, типа JSON нет.
-
get_json_object(e: Column, path: String): Column
, извлечь объект json из строки json в соответствии с заданным путем json- e также является строкой в формате json,
spark.sql("""select get_json_object('{"key1":{"key2":[1,2,3]}}','$.key1.key2')""")
, просто пойми
- e также является строкой в формате json,
-
json_tuple(json: Column, fields: String*): Column
, если строка json имеет только один уровень, вы можете использовать этот метод для извлечения объекта json. -
from_json
, проанализируйте столбец столбца строки json в соответствующий столбец в соответствии с заданной схемой -
to_json
, преобразовать несколько столбцов в столбцы строк json
Сначала создайте df, содержащий столбец строки типа json.
# spark.range(1)是为了创建一个df
# 直接spark.sql("""select '{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""") 也是OK的
scala> val jsonDF = spark.range(1).selectExpr("""'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")
jsonDF: org.apache.spark.sql.DataFrame = [jsonString: string]
# jsonString是string类型
scala> jsonDF.show(false)
+-------------------------------------------+
|jsonString |
+-------------------------------------------+
|{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}|
+-------------------------------------------+
Посмотрите на использование get_json_object и json_tuple.
scala> jsonDF.select(get_json_object($"jsonString","$.myJSONKey")).show(false)
# 输出{"myJSONValue":[1,2,3]}
scala> jsonDF.select(get_json_object($"jsonString","$.myJSONKey.myJSONValue")).show(false)
# 输出[1,2,3] ,还是字符串,不是什么Array
scala> jsonDF.select(get_json_object($"jsonString","$.myJSONKey.myJSONValue[0]")).show(false)
# 输出1
scala> jsonDF.select(json_tuple($"jsonString","myJSONKey")).show
# 输出{"myJSONValue":[1,2,3]}
# 无法解析更深的层次,即提不出myJSONValue对应的
# 但json_tuple可以同时提取多个json对象出来
# 这里再创建一个
scala> val test = spark.sql("""select '{"key" : "value","key2" : "value2"}' as jsonString""")
scala> test.select(json_tuple($"jsonString","key","key2")).show
+-----+------+
| c0| c1|
+-----+------+
|value|value2|
+-----+------+
Затем посмотрите на методы from_json и to_json. Эти два метода имеют несколько перегрузок. Выберите правильный.
# 创建一个df,json_col对应的就是json字符串
scala> val df = Seq (
(0, """{"device_id": 0, "device_type": "sensor-ipad", "ip": "68.161.225.1", "cn": "United States", "timestamp" :1475600496 }"""),
(1, """{"device_id": 1, "device_type": "sensor-igauge", "ip": "213.161.254.1", "cn": "Norway", "timestamp" :1475600498 }""")).toDF("id","json_col")
df: org.apache.spark.sql.DataFrame = [id: int, json_col: string]
# 对应创建一个schema,可以mySchema.treeString查看
scala> val mySchema = new StructType().add("device_id",IntegerType).add("device_type",StringType).add("ip",StringType).add("cn",StringType).add("timestamp",TimestampType)
# from_json简单使用,会解析成一个Struct类型的列col(数据类型一样的话也可以是Array类型)
# 可以查看col的Schema,所以可以根据col.*查询全部,也可以col.属性查询特定属性
scala> df.select(from_json($"json_col",mySchema) as "col").select(expr("col.*")).show
+---------+-------------+-------------+-------------+-------------------+
|device_id| device_type| ip| cn| timestamp|
+---------+-------------+-------------+-------------+-------------------+
| 0| sensor-ipad| 68.161.225.1|United States|2016-10-05 01:01:36|
| 1|sensor-igauge|213.161.254.1| Norway|2016-10-05 01:01:38|
+---------+-------------+-------------+-------------+-------------------+
scala> df.select(from_json($"json_col",mySchema) as "col").select($"col.*").where($"col.cn"==="Norway").show
+---------+-------------+-------------+------+-------------------+
|device_id| device_type| ip| cn| timestamp|
+---------+-------------+-------------+------+-------------------+
| 1|sensor-igauge|213.161.254.1|Norway|2016-10-05 01:01:38|
+---------+-------------+-------------+------+-------------------+
Как видно из документацииto_json
Он заключается в преобразовании столбца, содержащего StructType, ArrayType или MapType, в строковый столбец JSON с указанным режимом (представленным в типе), поэтому сначала инкапсулируйте столбец, который будет преобразован в формат StructType, ArrayType или MapType.
# to_json 简单使用
scala> val df1 = df.select(from_json($"json_col",mySchema) as "col").select($"col.*")
# df1.printSchema
# 再把device_id、ip、timestamp 三列转为json字符串列
# 如果是所有列的化,这样写struct($"*")
scala> df1.select(to_json(struct($"device_id",$"ip",$"timestamp")).alias("json_col")).show(false)
+--------------------------------------------------------------------------------+
|json_col |
+--------------------------------------------------------------------------------+
|{"device_id":0,"ip":"68.161.225.1","timestamp":"2016-10-05T01:01:36.000+08:00"} |
|{"device_id":1,"ip":"213.161.254.1","timestamp":"2016-10-05T01:01:38.000+08:00"}|
+--------------------------------------------------------------------------------+
Использование пользовательских функций (UDF)
Одной из самых сильных возможностей Spark является определение собственных функций (UDF), которые позволяют выполнять необходимые операции преобразования с помощью Scala, Python или с использованием внешних библиотек (библиотек). Пользовательские функции могут вводить и возвращать один или несколько столбцов. Во-вторых, сила Spark UDF заключается в том, что вы можете писать их на многих разных языках программирования, но вам не нужно создавать их в эзотерических форматах или предметно-ориентированных языках, они просто работают, записывают данные. По умолчанию эти определяемые пользователем функции регистрируются как временные функции и используются в определенных SparkSession и Context, то есть создаются и используются по запросу.
Хотя вы можете писать UDF на Scala, Python или Java, есть некоторые последствия для производительности, о которых вы должны знать. Чтобы проиллюстрировать это, следующее прямо расскажет вам, что происходит, когда вы создаете пользовательскую функцию, а затем выполняете код в Spark, используя созданную пользовательскую функцию.
Первая — это фактическая функция, которая создаст простую функцию, решающую куб числа.power3
val df = spark.range(5)
def power3(number:Double):Double = number*number*number
power3
Также существует требование, чтобы вы не могли ввести нулевое значение
Хорошо, теперь, чтобы протестировать эту функцию, нам нужно зарегистрировать их в Spark, чтобы мы могли использовать их на всех рабочих машинах. Spark сериализует функцию в драйвере Driver и распространяет ее на все процессы Executor в сети. Конечно, это не имеет никакого отношения к языку.
При использовании этой функции также возможны две разные ситуации. Если эта функция написана на Scala, Java, вы можете использовать ее в JVM. Это означает, что у вас будет небольшая потеря производительности, за исключением того, что вы не сможете воспользоваться возможностями генерации кода Spark для встроенных функций. Но когда вы создаете или используете большое количество объектов, могут возникнуть проблемы с производительностью, которые будут оптимизированы в главе 19. Если бы эта функция была написана на Python, разница была бы. Spark запустит процесс Python в Worker, затем сериализует все данные в формате, который понимает Python (ранее данные были в JVM), затем выполнит функцию для данных построчно с помощью процесса Python и, наконец, вернет все данные. строки Результат выполнения отдается JVM и Spark. Диаграмма ниже отражает этот процесс
Уведомление:Стоимость запуска этого процесса Python высока, но реальная стоимость — это процесс сериализации данных в формат, который может обрабатывать Python. Поскольку это высокозатратный расчет, и после того, как данные поступают в Python, вычисления выполняет процесс Python, а Spark не может управлять памятью воркера. Если ресурсы рабочего ограничены, рабочий потерпит неудачу. Потому что процесс Java (JVM) будет конкурировать за ресурсы памяти на той же машине, что и процесс Python. Автор рекомендует использовать Scala для написания функций, и я также согласен с тем, что изучение Scala действительно экономит время и усилия при написании, но если вы не аннотируете его хорошо, то потом будет трудно понять. Конечно, это также можно написать на Python.
Это весь процесс создания, а затем регистрации этой функции, чтобы сделать ее доступной для DataFrame.
import org.apache.spark.sql.functions.udf
# 直接这样 udf(power3 _) 就行了
val power3udf = udf(power3(_:Double):Double)
Затем вы можете использовать его как любой другой метод DataFrame.
scala> df.select(power3udf($"num")).show
+--------+
|UDF(num)|
+--------+
| 0.0|
| 1.0|
| 8.0|
| 27.0|
| 64.0|
+--------+
Но это можно использовать только как метод на DataFrame.Его можно использовать только в выражениях, а не в строковых выражениях.Запутайтесь, см. ошибку ниже, что не разрешено в строковых выражениях.
scala> df.selectExpr("power3udf(num)").show
org.apache.spark.sql.AnalysisException: Undefined function: 'power3udf'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0
Поэтому также необходимо зарегистрировать его как функцию Spark SQL, чтобы его можно было легко использовать.
# spark 2.x
spark.udf.register("power3",power3 _)
# spark 1.x 使用,sqlContext.udf.register("power3",power3 _)
# 再次查询
scala> df.selectExpr("power3(num)").show
+-------------------------------+
|UDF:power3(cast(num as double))|
+-------------------------------+
| 0.0|
| 1.0|
| 8.0|
| 27.0|
| 64.0|
+-------------------------------+
Видно, что эти дваudf
Хотя у него то же имя, но это метод другого класса. В любом случае, это зависит от ситуации. Я протестировал и обнаружил, что метод, зарегистрированный как Spark SQL, не может быть напрямую использован в операциях выражений DataFrame.
Эта серия примечаний к переводу исследования «Spark The Definitive Guide Learning» (авторитетное руководство Spark) включена сюда:josonle/Spark-The-Definitive-Guide-Learning
больше рекомендаций:Coding Now
Некоторые заметки, сделанные в ходе исследования, а также некоторые электронные книги, видеоресурсы и некоторые блоги, веб-сайты и инструменты, которые я считаю лучшими, я обычно собираю. Включает несколько основных компонентов больших данных, машинное обучение Python и анализ данных, Linux, операционные системы, алгоритмы, сети и т. д.