писпарк: Введение

Python Spark
писпарк: Введение

1/искра обзор

Apache Spark是用 Scala 编程语言编写的。
Apache Spark是一个实时处理计算引擎,它对标的是mapreduce批处理计算引擎。
它在内存内计算,不把中间结果写入磁盘,从而减少了io的次数,从而实现了可以实时分析数据。
因为spark可以实时执行流处理,也可以处理批处理。

2/обзор pyspark

Apache Spark是用 Scala 编程语言编写的。
为了python开发者可以使用spark,体会到spark的牛逼之处,
所以,Apache spark社区发布了一个工具pyspark,其实可以理解为开发了一个扩展包,放在了pipy中,
因此,python开发者就可以通过pyspark这个扩展包来操作spark了。

之所以能把python和spark连接起来,正是由于一个名为Py4j的库,他们才能实现这一目标。

pyspark这个扩展包目前已经在pypi中了,可以直接安装,pip install pyspark

3/В каталоге bin каталога установки spark есть команды pyspark и команды spark-shell.

Если вы запускаете pyspark

所对应的图如下,因为pyspark是python和spark的结合,所以我们看到解释器是python

image.png

Если вы выполняете spark-shell

因为spark是用scala写的,所以解释器是scala

image.png

Создание РДД:

from pyspark import SparkContext

# 先初始化一个spark环境
# 有了环境。我们才可以干事
sc = SparkContext() 

①集合转RDD
   data = [1,2,3]
   distData = sc.parallelize(data,3) #这行代码可以将列表转为RDD数据集
   distData.collect() #这行代码可以打印输出RDD数据集#【触发一个job】
   distData.reduce(lambda a,b :a+b) #【触发一个job】

   注意:一个CPU可以设置2~4个partition

②外部数据集转RDD
   distFile = sc.textFile("hello.txt") #将外部数据转换为RDD对象
   distFile.collect()

Отправить искровое задание для запуска на сервере

   ./spark-submit --master local[2] --name spark0301 /home/hadoop/scrip
   t/spark0301.py

Общие операторы ядра RDD Spark Core в бою PySpark:

  【两个算子】
   ①transfermation: map、filter【过滤】、group by、distinct
          map()是将传入的函数依次作用到序列的每个元素,每个元素都是独自被函数“作用”一次
   ②action: count, reduce, collect
   注意:(1)
          1)All transformations in Spark are lazy,in that they do not
            compute their results right away.
            -- Spark的transformations很懒,因为他们没有马上计算出结果
          2)Instead they just remember the transformations applied to some 
            base dataset
            -- 相反,他们只记得应用于基本数据集
        (2)
          1)action triggers the computation
            -- 动作触发计算
          2) action returns values to driver or writes data to external storage
            --  action将返回值数据写入外部存储
  【单词记忆】
           applied to:施加到
           Instead:相反
           in that:因为
           external storage:外部存储


map()是将传入的函数依次作用到序列的每个元素,每个元素都是独自被函数“作用”一次

image.png

(1)map
  map(func) 
  #将func函数作用到数据集每一个元素上,生成一个新的分布式【数据集】返回
(2)filter
  filter(func)
  返回所有func返回值为true的元素,生成一个新的分布式【数据集】返回
(3)flatMap  #flat压扁以后做map
  flatMap(func)
  输入的item能够被map或0或多个items输出,返回值是一个【Sequence】
(4)groupByKey:把相同的key的数据分发到一起
  ['hello', 'spark', 'hello', 'world', 'hello', 'world']
  ('hello',1) ('spark',1)........
(5)reduceByKey: 把相同的key的数据分发到一起并进行相应的计算
  mapRdd.reduceByKey(lambda a,b:a+b)
  [1,1]  1+1
  [1,1,1]  1+1=2+1=3
  [1]    1
(6)左连接:以左表为基准
   右连接:以右表为基准
   全连接:以左右都为基准

Упражнение 1: Программирование оператора преобразования

    from pyspark import SparkConf, SparkContext
    
    # 准备好连接spark环境的配置文件
    conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
    
    # 连接spark,相当于创建了一个spark环境
    sc = SparkContext(conf=conf)
    '''
    map:
        map(func)
        将func函数作用到分布式数据集rdd的每个元素上,生成一个新的分布式数据集rdd返回
    '''
    print("***************************map***************************")
    def my_map1():
        # 创建一个序列
        data = [1,2,3,4,5]
        
        # 将序列转换为RDD
        # 把数据均匀得分在rdd1中的不同分区partition上(不同节点)
        rdd1 = sc.parallelize(data)
        
        # 使用函数对RDD进行作用,生成RDD2
        rdd2 = rdd1.map(lambda x:x*2)
        
        # 使用collect()讲结果输出
        print(rdd2.collect())

    my_map1()

    def my_map2():
        data = ["dog","tiger","lion","cat","panter","eagle"]
        rdd1 = sc.parallelize(data)
        
        rdd2 = rdd1.map(lambda x:(x,1)) #进来一个x,返回一个(x,1)的形式
        print(rdd2.collect())
        
    my_map2()
    
    
    print("***************************filter***************************")
    
    def my_filter():
        #给一个数据
        data = [1,2,3,4,5]
        rdd1 = sc.parallelize(data)
        mapRdd = rdd1.map(lambda x:x**2)
        filterRdd = mapRdd.filter(lambda x:x>5)
        print(filterRdd.collect())
    '''
    filter:
        filter(func)
        返回所有func返回值为true的元素,生成一个新的分布式数据集返回
    '''
    def my_filter():
        data = [1,2,3,4,5]
        rdd1 = sc.parallelize(data)
        mapRdd = rdd1.map(lambda x:x*2)
        filterRdd = mapRdd.filter(lambda x:x > 5)
        print(filterRdd.collect())
        print(sc.parallelize(data).map(lambda x:x*2).filter(lambda x:x>5).collect())
        
    my_filter()
    
    print("***************************flatMap()***************************")
    
    # Wordcount第一步:
    
    def my_flatMap():
        #flatMap,将东西压扁/拆开 后做map
        data = ["hello spark","hello world","hello world"]
        rdd1 = sc.parallelize(data)
        rdd2 = rdd1.flatMap(lambda line:line.split(" "))
        print( rdd2.collect() )
        
    my_flatMap()
    
    print("***************************groupBy()***************************")
    
    def my_groupBy():
        data = ["hello spark","hello world","hello world"]
        rdd = sc.parallelize(data)
        mapRdd = rdd.flatMap(lambda line:line.split(" ")).map(lambda x:(x,1))
        groupByRdd = mapRdd.groupByKey()
        print(groupByRdd.collect())
        print(groupByRdd.map(lambda x:{x[0]:list(x[1])}).collect())

    my_groupBy()

    print("***************************reduceByKey()***************************")
    
    #出现Wordcount结果
    def my_reduceByKey():
        data = ["hello spark", "hello world", "hello world"]
        rdd = sc.parallelize( data )
        mapRdd = rdd.flatMap( lambda line: line.split(" ") ).map(lambda x: (x, 1))
        reduceByKeyRdd = mapRdd.reduceByKey(lambda a,b:a+b)
        
        print(reduceByKeyRdd.collect())
    my_reduceByKey()

    print("***************************sortByKey()***************************")
    
    # 将Wordcount结果中数字出现的次数进行降序排列
    def my_sort():
        data = ["hello spark", "hello world", "hello world"]
        rdd = sc.parallelize(data)
        mapRdd = rdd.flatMap(lambda line: line.split(" ")).map(lambda x: (x, 1))
        reduceByKeyRdd = mapRdd.reduceByKey(lambda a, b: a + b)
        #reduceByKeyRdd.sortByKey().collect() 此时是按照字典在排序
        #reduceByKeyRdd.sortByKey(False).collect()
        #先对对键与值互换位置,再排序,再换位置回来
        reduceByKey=reduceByKeyRdd.map(lambda x:(x[1],x[0])).sortByKey(False).map(lambda x:(x[1],x[0])).collect()
        print(reduceByKey)
    my_sort()

    print("***************************union()***************************")
    
    def my_union():
        a = sc.parallelize([1,2,3])
        b = sc.parallelize([3,4,5])
        U = a.union(b).collect()
        print(U)
    my_union()

    print("***************************union_distinct()***************************")
    def my_distinct():
        #这个和数学并集一样了
        a = sc.parallelize([1, 2, 3])
        b = sc.parallelize([3, 4, 2])
        D = a.union(b).distinct().collect()
        print(D)
    my_distinct()

    print("***************************join()***************************")
    def my_join():
        a = sc.parallelize([("A", "a1"), ("C", "c1"), ("D", "d1"), ("F", "f1"), ("F", "f2")])
        b = sc.parallelize([("A", "a2"), ("C", "c2"), ("C", "c3"), ("E", "e1")])
        J = a.fullOuterJoin(b).collect
        print(J)
    my_join()

    sc.stop()

'''
Spark Core核心算子回顾
 -- Transformation算子编程:
   map、filter、groupByKey、flatMap、reduceByKey、sortByKey、join等
'''

Упражнение 2. Программирование оператора действия

from pyspark import SparkConf, SparkContext
if __name__ == '__main__':
    conf = SparkConf().setMaster("local[2]").setAppName("spark0401")
    sc = SparkContext(conf=conf)
    def my_action():
        data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        rdd = sc.parallelize(data)
        rdd.collect()
        rdd.count()
        rdd.take(3)  #取前三个
        rdd.max()    #最大值
        rdd.min()    #最小值
        rdd.sum()    #求和
        rdd.reduce(lambda x, y: x + y) #相邻两个相加
        rdd.foreach(lambda x: print(x))
    my_action()
    sc.stop()

4. Режим работы PySpark [код может работать в нескольких режимах]:

(1)local模式:主要是开发和测试时使用
  --master     集群
  --name       应用程序名称
  --py-file

  例子:
     ./spark-submit --master local[2] --name spark-local /home/hadoop/
     script/spark0402.py file:///home/hadoop/data/hello.txt file:///home/hadoop/
     wc/output
  注意:
     local:运行在一个线程上
     local[k]:运行在k个线程上
     local[K,F]:运行在K线程上,和最大错误设置
     local[*]:用本地尽可能多的线程运行
     将上述例子中的local[2]改为其他模式名即可在对应模式上运行
(2)standalone模式
	hdfs: NameNode  DataNode
	yarn: ResourceManager NodeManager

	master:
	worker:

	$SPARK_HOME/conf/slaves
		hadoop000

		假设你有5台机器,就应该进行如下slaves的配置
		hadoop000
		hadoop001
		hadoop002
		hadoop003
		hadoop005
		如果是多台机器,那么每台机器都在相同的路径下部署spark
	启动spark集群
		$SPARK_HOME/sbin/start-all.sh
		ps: 要在spark-env.sh中添加JAVA_HOME,否则会报错
                检测:jps: Master和Worker进程,就说明我们的standalone模式安装成功
(3)yarn模式:
  - spark作为作业客户端而已,他需要做的事情就是提交作业到yarn上去执行
  - yarn vs standalone
    yarn: 你只需要一个节点,然后提交作业即可   
           这时是不需要spark集群的(不需要启动master和worker的)
    standalone:你的spark集群上每个节点都需要部署spark
                然后需要启动spark集群(需要master和worker)
  - 例子:
    ./spark-submit --master yarn --name spark-yarn /home/hadoop/
     script/spark0402.py hdfs://hadoop000:8020/wc.txt hdfs://hadoop000:8020/
     wc/output
  - 指定hadoop_conf或者yarn_conf_dir是为了指定加载其路径下面的配置文件,spark 想要跑在yarn
    上势必要知道HDFS 和 yarn 的信息,不然 spark怎么找到yarn 
  - yarn支持client和cluster模式:那么driver运行在哪里呢?
	本地时是client【默认】:提交作业的进程是不能停止的,否则作业就挂了
	集群时是cluster:提交完作业,那么提交作业端就可以断开了,因为driver是运行在
                       am(application master)里面的
  - yarn相关的报错信息:
    Error: Cluster deploy mode is not applicable to Spark shells
           pyspark/spark-shell : 交互式运行程序  client
    如何查看已经运行完的yarn的日志信息: yarn logs -applicationId <applicationId>

Пять, обзор ядра Spark

1.名词解析:
Application	:基于Spark的应用程序 =  1 driver + executors
  pyspark、spark-shell都是应用程序
Driver program	:在py文件的主方法__main__下创建一个SparkContext	
Cluster manager :从外部去获取资源,同时可以设置申请多少资源
  spark-submit --master local[2]/spark://hadoop000:7077/yarn
Deploy mode	:区分driver在什么地方启动
  In "cluster" mode, the framework launches the driver inside of the cluster. 
  In "client" mode, the submitter launches the driver outside of the cluster.
Worker node	:工作节点,就像manage节点
	Any node that can run application code in the cluster
	standalone: slave节点 slaves配置文件
	yarn: nodemanager
Executor	:为工作节点上的应用程序启动的进程
	- runs tasks 
	- keeps data in memory or disk storage across them
	- Each application has its own executors.	

Task	       :一个工作单元,将被发送给一个执行者
Job	       :一个action对应一个job
①Spark=a driver + executors
②driver = main方法 + sparkContext
③executors是一个进程启动在worknode上,能够运行任务能够缓存数据,而且每个应用程序有一组独立的executor
④申请资源时是通过Cluster manager去申请的,可以自定义本地或集群
⑤自定义运行时,Deploy mode可以跑在cluster上也可以跑在client上
⑥Executor运行在worknode上,task运行在Executor上,task(map、flatMap等属于task)从driver上发起
⑦Job是一个并行的计算,由多个①Spark=a driver + executors
②driver = main方法 + sparkContext
③executors是一个进程启动在worknode上,能够运行任务能够缓存数据,而且每个应用程序有一组独立的executor
④申请资源时是通过Cluster manager去申请的,可以自定义本地或集群
⑤自定义运行时,Deploy mode可以跑在cluster上也可以跑在client上组成,spark中一个action(save,collect)对应一个job
⑥Stage:一个job会被拆分为一个小的任务集,一个stage的边界往往是从某个地方取数据开始,到shuffle结束

6. Искра SQL

(1)Spark SQL前世今生:
  SQL:MySQL、Oracle、DB2、SQLService
       - 我们很熟悉的数据处理语言是SQL
       - 但是数据量越来越大 ==> 大数据(Hive、Spark Core)
          -- hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,
            并提供简单的sql查询功能,可以将sql语句转换为MapReduce任务进行运行。
          -- Spark Core:得熟悉Java、Python等语言
      - 综上:能通过SQL语言处理大数据问题是人们最喜欢的啦
出现了:SQL on Hadoop
  Hive on Map Reduce 
  Shark【没有了。。。】
  Impala:比较吃内存,Cloudera
  Presto:京东再用
发展:
Hive on Map Reduce 
Shark on Spark
Spark SQL:on Spark:Spark社区的
共同点:metastore mysql,基于源数据建表
Hive on Spark:Hive社区的不同于Spark SQL,在Hive能运行的SparkSQL不一定可以
(2)官方描述:Spark SQL是Apache Spark的一个模块,是用来处理结构化数据的
①编程和SQL可以无缝对接:
  支持SQL和DATa Frame API(Java、Scala、Python、R)
  代码示例:results = spark.sql("SELCET * FROM people")
            names = results.map(lambda p:p.name)
②统一的数据访问:可以直接将Hive、ORC、JSON、JDBC结果做连接
   spark.read.json("s3n://...").registerTempTable("json")
   results = spark.sql(
   """SELECT *
   FROM people
   JOIN hson ...""")
 查询和连接不同数据源【Spark SQL不仅仅是SQL】
③Spark SQL 可以使用已经存在的Hive仓库matastores,UDFs等
④提供了标准的JDBC、ODBC接口,外部工具可以直接访问Spark
结:Spark SQL 强调的是“结构化数据”而非“SQL”