1/искра обзор
Apache Spark是用 Scala 编程语言编写的。
Apache Spark是一个实时处理计算引擎,它对标的是mapreduce批处理计算引擎。
它在内存内计算,不把中间结果写入磁盘,从而减少了io的次数,从而实现了可以实时分析数据。
因为spark可以实时执行流处理,也可以处理批处理。
2/обзор pyspark
Apache Spark是用 Scala 编程语言编写的。
为了python开发者可以使用spark,体会到spark的牛逼之处,
所以,Apache spark社区发布了一个工具pyspark,其实可以理解为开发了一个扩展包,放在了pipy中,
因此,python开发者就可以通过pyspark这个扩展包来操作spark了。
之所以能把python和spark连接起来,正是由于一个名为Py4j的库,他们才能实现这一目标。
pyspark这个扩展包目前已经在pypi中了,可以直接安装,pip install pyspark
3/В каталоге bin каталога установки spark есть команды pyspark и команды spark-shell.
Если вы запускаете pyspark
所对应的图如下,因为pyspark是python和spark的结合,所以我们看到解释器是python
Если вы выполняете spark-shell
因为spark是用scala写的,所以解释器是scala
Создание РДД:
from pyspark import SparkContext
# 先初始化一个spark环境
# 有了环境。我们才可以干事
sc = SparkContext()
①集合转RDD
data = [1,2,3]
distData = sc.parallelize(data,3) #这行代码可以将列表转为RDD数据集
distData.collect() #这行代码可以打印输出RDD数据集#【触发一个job】
distData.reduce(lambda a,b :a+b) #【触发一个job】
注意:一个CPU可以设置2~4个partition
②外部数据集转RDD
distFile = sc.textFile("hello.txt") #将外部数据转换为RDD对象
distFile.collect()
Отправить искровое задание для запуска на сервере
./spark-submit --master local[2] --name spark0301 /home/hadoop/scrip
t/spark0301.py
Общие операторы ядра RDD Spark Core в бою PySpark:
【两个算子】
①transfermation: map、filter【过滤】、group by、distinct
map()是将传入的函数依次作用到序列的每个元素,每个元素都是独自被函数“作用”一次
②action: count, reduce, collect
注意:(1)
1)All transformations in Spark are lazy,in that they do not
compute their results right away.
-- Spark的transformations很懒,因为他们没有马上计算出结果
2)Instead they just remember the transformations applied to some
base dataset
-- 相反,他们只记得应用于基本数据集
(2)
1)action triggers the computation
-- 动作触发计算
2) action returns values to driver or writes data to external storage
-- action将返回值数据写入外部存储
【单词记忆】
applied to:施加到
Instead:相反
in that:因为
external storage:外部存储
map()是将传入的函数依次作用到序列的每个元素,每个元素都是独自被函数“作用”一次
(1)map
map(func)
#将func函数作用到数据集每一个元素上,生成一个新的分布式【数据集】返回
(2)filter
filter(func)
返回所有func返回值为true的元素,生成一个新的分布式【数据集】返回
(3)flatMap #flat压扁以后做map
flatMap(func)
输入的item能够被map或0或多个items输出,返回值是一个【Sequence】
(4)groupByKey:把相同的key的数据分发到一起
['hello', 'spark', 'hello', 'world', 'hello', 'world']
('hello',1) ('spark',1)........
(5)reduceByKey: 把相同的key的数据分发到一起并进行相应的计算
mapRdd.reduceByKey(lambda a,b:a+b)
[1,1] 1+1
[1,1,1] 1+1=2+1=3
[1] 1
(6)左连接:以左表为基准
右连接:以右表为基准
全连接:以左右都为基准
Упражнение 1: Программирование оператора преобразования
from pyspark import SparkConf, SparkContext
# 准备好连接spark环境的配置文件
conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
# 连接spark,相当于创建了一个spark环境
sc = SparkContext(conf=conf)
'''
map:
map(func)
将func函数作用到分布式数据集rdd的每个元素上,生成一个新的分布式数据集rdd返回
'''
print("***************************map***************************")
def my_map1():
# 创建一个序列
data = [1,2,3,4,5]
# 将序列转换为RDD
# 把数据均匀得分在rdd1中的不同分区partition上(不同节点)
rdd1 = sc.parallelize(data)
# 使用函数对RDD进行作用,生成RDD2
rdd2 = rdd1.map(lambda x:x*2)
# 使用collect()讲结果输出
print(rdd2.collect())
my_map1()
def my_map2():
data = ["dog","tiger","lion","cat","panter","eagle"]
rdd1 = sc.parallelize(data)
rdd2 = rdd1.map(lambda x:(x,1)) #进来一个x,返回一个(x,1)的形式
print(rdd2.collect())
my_map2()
print("***************************filter***************************")
def my_filter():
#给一个数据
data = [1,2,3,4,5]
rdd1 = sc.parallelize(data)
mapRdd = rdd1.map(lambda x:x**2)
filterRdd = mapRdd.filter(lambda x:x>5)
print(filterRdd.collect())
'''
filter:
filter(func)
返回所有func返回值为true的元素,生成一个新的分布式数据集返回
'''
def my_filter():
data = [1,2,3,4,5]
rdd1 = sc.parallelize(data)
mapRdd = rdd1.map(lambda x:x*2)
filterRdd = mapRdd.filter(lambda x:x > 5)
print(filterRdd.collect())
print(sc.parallelize(data).map(lambda x:x*2).filter(lambda x:x>5).collect())
my_filter()
print("***************************flatMap()***************************")
# Wordcount第一步:
def my_flatMap():
#flatMap,将东西压扁/拆开 后做map
data = ["hello spark","hello world","hello world"]
rdd1 = sc.parallelize(data)
rdd2 = rdd1.flatMap(lambda line:line.split(" "))
print( rdd2.collect() )
my_flatMap()
print("***************************groupBy()***************************")
def my_groupBy():
data = ["hello spark","hello world","hello world"]
rdd = sc.parallelize(data)
mapRdd = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))
groupByRdd = mapRdd.groupByKey()
print(groupByRdd.collect())
print(groupByRdd.map(lambda x:{x[0]:list(x[1])}).collect())
my_groupBy()
print("***************************reduceByKey()***************************")
#出现Wordcount结果
def my_reduceByKey():
data = ["hello spark", "hello world", "hello world"]
rdd = sc.parallelize( data )
mapRdd = rdd.flatMap( lambda line: line.split(" ") ).map(lambda x: (x, 1))
reduceByKeyRdd = mapRdd.reduceByKey(lambda a,b:a+b)
print(reduceByKeyRdd.collect())
my_reduceByKey()
print("***************************sortByKey()***************************")
# 将Wordcount结果中数字出现的次数进行降序排列
def my_sort():
data = ["hello spark", "hello world", "hello world"]
rdd = sc.parallelize(data)
mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
reduceByKeyRdd = mapRdd.reduceByKey(lambda a, b: a + b)
#reduceByKeyRdd.sortByKey().collect() 此时是按照字典在排序
#reduceByKeyRdd.sortByKey(False).collect()
#先对对键与值互换位置,再排序,再换位置回来
reduceByKey=reduceByKeyRdd.map(lambda x:(x[1],x[0])).sortByKey(False).map(lambda x:(x[1],x[0])).collect()
print(reduceByKey)
my_sort()
print("***************************union()***************************")
def my_union():
a = sc.parallelize([1,2,3])
b = sc.parallelize([3,4,5])
U = a.union(b).collect()
print(U)
my_union()
print("***************************union_distinct()***************************")
def my_distinct():
#这个和数学并集一样了
a = sc.parallelize([1, 2, 3])
b = sc.parallelize([3, 4, 2])
D = a.union(b).distinct().collect()
print(D)
my_distinct()
print("***************************join()***************************")
def my_join():
a = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")])
b = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")])
J = a.fullOuterJoin(b).collect
print(J)
my_join()
sc.stop()
'''
Spark Core核心算子回顾
-- Transformation算子编程:
map、filter、groupByKey、flatMap、reduceByKey、sortByKey、join等
'''
Упражнение 2. Программирование оператора действия
from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
sc = SparkContext(conf=conf)
def my_action():
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data)
rdd.collect()
rdd.count()
rdd.take(3) #取前三个
rdd.max() #最大值
rdd.min() #最小值
rdd.sum() #求和
rdd.reduce(lambda x, y: x + y) #相邻两个相加
rdd.foreach(lambda x: print(x))
my_action()
sc.stop()
4. Режим работы PySpark [код может работать в нескольких режимах]:
(1)local模式:主要是开发和测试时使用
--master 集群
--name 应用程序名称
--py-file
例子:
./spark-submit --master local[2] --name spark-local /home/hadoop/
script/spark0402.py file:///home/hadoop/data/hello.txt file:///home/hadoop/
wc/output
注意:
local:运行在一个线程上
local[k]:运行在k个线程上
local[K,F]:运行在K线程上,和最大错误设置
local[*]:用本地尽可能多的线程运行
将上述例子中的local[2]改为其他模式名即可在对应模式上运行
(2)standalone模式
hdfs: NameNode DataNode
yarn: ResourceManager NodeManager
master:
worker:
$SPARK_HOME/conf/slaves
hadoop000
假设你有5台机器,就应该进行如下slaves的配置
hadoop000
hadoop001
hadoop002
hadoop003
hadoop005
如果是多台机器,那么每台机器都在相同的路径下部署spark
启动spark集群
$SPARK_HOME/sbin/start-all.sh
ps: 要在spark-env.sh中添加JAVA_HOME,否则会报错
检测:jps: Master和Worker进程,就说明我们的standalone模式安装成功
(3)yarn模式:
- spark作为作业客户端而已,他需要做的事情就是提交作业到yarn上去执行
- yarn vs standalone
yarn: 你只需要一个节点,然后提交作业即可
这时是不需要spark集群的(不需要启动master和worker的)
standalone:你的spark集群上每个节点都需要部署spark
然后需要启动spark集群(需要master和worker)
- 例子:
./spark-submit --master yarn --name spark-yarn /home/hadoop/
script/spark0402.py hdfs://hadoop000:8020/wc.txt hdfs://hadoop000:8020/
wc/output
- 指定hadoop_conf或者yarn_conf_dir是为了指定加载其路径下面的配置文件,spark 想要跑在yarn
上势必要知道HDFS 和 yarn 的信息,不然 spark怎么找到yarn
- yarn支持client和cluster模式:那么driver运行在哪里呢?
本地时是client【默认】:提交作业的进程是不能停止的,否则作业就挂了
集群时是cluster:提交完作业,那么提交作业端就可以断开了,因为driver是运行在
am(application master)里面的
- yarn相关的报错信息:
Error: Cluster deploy mode is not applicable to Spark shells
pyspark/spark-shell : 交互式运行程序 client
如何查看已经运行完的yarn的日志信息: yarn logs -applicationId <applicationId>
Пять, обзор ядра Spark
1.名词解析:
Application :基于Spark的应用程序 = 1 driver + executors
pyspark、spark-shell都是应用程序
Driver program :在py文件的主方法__main__下创建一个SparkContext
Cluster manager :从外部去获取资源,同时可以设置申请多少资源
spark-submit --master local[2]/spark://hadoop000:7077/yarn
Deploy mode :区分driver在什么地方启动
In "cluster" mode, the framework launches the driver inside of the cluster.
In "client" mode, the submitter launches the driver outside of the cluster.
Worker node :工作节点,就像manage节点
Any node that can run application code in the cluster
standalone: slave节点 slaves配置文件
yarn: nodemanager
Executor :为工作节点上的应用程序启动的进程
- runs tasks
- keeps data in memory or disk storage across them
- Each application has its own executors.
Task :一个工作单元,将被发送给一个执行者
Job :一个action对应一个job
①Spark=a driver + executors
②driver = main方法 + sparkContext
③executors是一个进程启动在worknode上,能够运行任务能够缓存数据,而且每个应用程序有一组独立的executor
④申请资源时是通过Cluster manager去申请的,可以自定义本地或集群
⑤自定义运行时,Deploy mode可以跑在cluster上也可以跑在client上
⑥Executor运行在worknode上,task运行在Executor上,task(map、flatMap等属于task)从driver上发起
⑦Job是一个并行的计算,由多个①Spark=a driver + executors
②driver = main方法 + sparkContext
③executors是一个进程启动在worknode上,能够运行任务能够缓存数据,而且每个应用程序有一组独立的executor
④申请资源时是通过Cluster manager去申请的,可以自定义本地或集群
⑤自定义运行时,Deploy mode可以跑在cluster上也可以跑在client上组成,spark中一个action(save,collect)对应一个job
⑥Stage:一个job会被拆分为一个小的任务集,一个stage的边界往往是从某个地方取数据开始,到shuffle结束
6. Искра SQL
(1)Spark SQL前世今生:
SQL:MySQL、Oracle、DB2、SQLService
- 我们很熟悉的数据处理语言是SQL
- 但是数据量越来越大 ==> 大数据(Hive、Spark Core)
-- hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,
并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。
-- Spark Core:得熟悉Java、Python等语言
- 综上:能通过SQL语言处理大数据问题是人们最喜欢的啦
出现了:SQL on Hadoop
Hive on Map Reduce
Shark【没有了。。。】
Impala:比较吃内存,Cloudera
Presto:京东再用
发展:
Hive on Map Reduce
Shark on Spark
Spark SQL:on Spark:Spark社区的
共同点:metastore mysql,基于源数据建表
Hive on Spark:Hive社区的不同于Spark SQL,在Hive能运行的SparkSQL不一定可以
(2)官方描述:Spark SQL是Apache Spark的一个模块,是用来处理结构化数据的
①编程和SQL可以无缝对接:
支持SQL和DATa Frame API(Java、Scala、Python、R)
代码示例:results = spark.sql("SELCET * FROM people")
names = results.map(lambda p:p.name)
②统一的数据访问:可以直接将Hive、ORC、JSON、JDBC结果做连接
spark.read.json("s3n://...").registerTempTable("json")
results = spark.sql(
"""SELECT *
FROM people
JOIN hson ...""")
查询和连接不同数据源【Spark SQL不仅仅是SQL】
③Spark SQL 可以使用已经存在的Hive仓库matastores,UDFs等
④提供了标准的JDBC、ODBC接口,外部工具可以直接访问Spark
结:Spark SQL 强调的是“结构化数据”而非“SQL”