1/ Обработать кадр? Вычислительный движок?
什么是处理框架?什么是计算引擎?二者之间又有什么关系呢?
“处理框架”和“计算引擎”之间的区别没有什么权威的定义,
但大多数我们把处理框架定义为承担类似作用的`一系列组件`,
这其中的组件包括负责数据存储的组件,比如hdfd等分布式文件存储系统。
负责计算的组件,比如mapreduce和spark,后者storm等,
负责资源管理和任务调度的组建,比如yarn
例如Apache hadoop可以看作一种以hdfs作为数据存储和以mapreduce作为计算引擎的处理框架,更准确一点叫做批处理框架。
spark是一种内存计算引擎,是一种可以进行流式计算的计算引擎。
例如apache spark可以纳入到hadoop框架中,并取代MapReduce计算引擎,这样构成的框架就是流式处理框架。
也就是说处理框架的范围大于计算引擎,及计算引擎是处理框架的一部分,只负责数据的计算。
spark是继mapreduce之后的新一代分布式计算引擎,是基于内存计算的,所有比较快,是流式的。
2 / схема архитектуры искры
spark的架构图如下所示:
spark主要有4种工具,spark core
spark sql
spark streaming
spark mllib(spark ml)
spark graphx
<1>Spark Core:
包含Spark的基本功能;尤其是定义RDD这种数据结构的API、操作以及这两者上的动作。
其他Spark的库(工具)都是构建在RDD和Spark Core之上的。
<2>Spark SQL:
提供通过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。
每个数据库表被当做一个RDD,Spark SQL查询被转换为Spark操作。
<3>Spark Streaming:
对实时数据流进行处理和控制。
Spark Streaming允许程序能够像普通RDD一样处理实时数据。
<4>spark MLlib:
一个常用机器学习算法库,算法被实现为对RDD的Spark操作。
这个库包含可扩展的学习算法,比如分类、回归等需要对大量数据集进行迭代的操作。
<5>GraphX:
控制图、并行图操作和计算的一组算法和工具的集合。
GraphX扩展了RDD API,包含控制图、创建子图、访问路径上所有顶点的操作
3/Что такое искра
spark是一种可以快速处理数据的计算引擎。
spark是加州大学伯克利分校的AMP实验室所开源的类似Hadoop MapReduce的通用并行计算框架,
spark拥有mapreduce批计算引擎所具有的优点,同时比mapreduce批计算引擎更好的是:
spark Job的中间输出结果可以保存在内存中,从而不再需要反复读写HDFS,减少了I/O。
因此Spark能更好地适用于数据挖掘与机器学习等需要`迭代`的算法。
在某些场景下,spark处理速度更快,可以达到`低延迟`的目的。
在spark计算引擎之前,大家用的是mapreduce批计算引擎。
mapreduce批计算引擎,适用于处理离线数据.
mapreduce具有一定的延迟性,但是当今社会,一切都是快节奏,很多场景都是不允许延迟的,
spark计算引擎的出现,就是解决这个问题的。
4/Разница между искрой и картой
Прошлое и настоящее Спарка
一提到大数据,相信很多人第一时间想到的是 Hadoop MapReduce。
没错,Hadoop MapReduce 为大数据处理技术奠定了基础,搭建了一个最基础的大数据处理框架。
近年来,spark发展速度很快。Spark与Hadoop MapReduce在业界有两种说法:
(1)Spark将代替HadoopMapReduce,成为未来大数据处理发展的方向;
(2)Spark将会和Hadoop结合,形成更大的生态圈。
其实 Spark 和 Hadoop MapReduce 的重点应用场合有所不同。
相对于 Hadoop MapReduce 来说,Spark 有点“青出于蓝而胜于蓝”的感觉,
Spark 是在Hadoop MapReduce 框架上发展起来的,在它的身上我们能明显看到 MapReduce 的影子,
所以Spark 并非从头创新,而是站在了巨人“MapReduce”的肩膀上。
千秋功罪,留于日后评说,我们暂且搁下争议。
Каковы преимущества mapreduce и spark
1) Скорость расчета высокая
大数据处理首先追求的是速度。
Spark到底有多快?
用官方的话说,“Spark允许Hadoop集群中的应用程序在内存中以100倍的速度运行,
即使在磁盘上运行也能快10倍”。
可能有的读者看到这里会大为感叹,的确如此,在有迭代计算的领域,Spark的计算速度远远超过MapReduce,
并且迭代次数越多,Spark的优势越明显。
这是因为Spark很好地利用了目前服务器内存越来越大这一优点,通过减少磁盘I/O来达到性能提升。
它们将中间处理数据的中间结果全部放到了内存中,仅在必要时才批量存入硬盘中。
2) Гибкое приложение, простое в использовании
知道 AMPLab 的 Lester 为什么放弃 MapReduce 吗?
因为他需要把很多精力放到Map和Reduce的编程模型上,极为不便。
Spark在简单的Map及Reduce操作之外,还支持SQL查询、流式查询及复杂查询,比如开箱即用的机器学习算法。
同时,用户可以在同一个工作流中无缝地搭配这些能力,应用十分灵活。
Spark核心部分的代码为63个Scala文件,非常的轻量级。
并且允许 Java、Scala、Python 开发者在自己熟悉的语言环境下进行工作,通过建立在Java、Scala、Python、SQL(应对交互式查询)的标准API以方便各行各业使用,同时还包括大量开箱即用的机器学习库。
它自带80多个高等级操作符,允许在Shell中进行交互式查询。即使是新手,也能轻松上手应用。
3) Совместимость с конкурентами
Spark可以独立运行,除了可以运行在当下的YARN集群资源管理外,还可以读取已有的任何Hadoop数据。
它可以运行在任何Hadoop数据源上,比如HBase、HDFS等。
有了这个特性,让那些想从Hadoop应用迁移到Spark上的用户方便了很多。
Spark有兼容竞争对手的胸襟,何愁大事不成?
4) Исключительная производительность обработки в реальном времени
MapReduce更加适合处理离线数据,
而Spark很好地支持实时的流计算,依赖Spark Streaming对数据进行实时处理。
Spark Streaming 具备功能强大的API,允许用户快速开发流应用程序。
Spark Streaming 无须额外的代码和配置,就可以做大量的恢复和交付工作。
5)/другое
1/mapreduce和Spark都是大数据计算引擎,数据量最起码是GB和TB级别的。
2/Hadoop包括hdfs分布式文件存储系统和mapreduce分布式计算引擎这些组件。
3/spark只是一个分布式计算引擎,它自身没有数据存储系统
所以如果想使用spark来处理数据,还需要依托其他的分布式文件存储系统来作为spark的数据输入和存储。
大家最常用的还是hdfs,所以hdfs和spark的组合还是很受欢迎的。
4/Hadoop mapreduce实质上更多是一个分布式数据基础设施: 它将巨大的数据集分派到一个由多台计算机组成的集群namenode datanode中,在集群中的多个节点上进行存储,
意味着您不需要购买和维护昂贵的硬件,Hadoop还会索引和跟踪这些数据,让大数据处理和分析效率达到前所未有的高度。
5/Hadoop除了提供了一个HDFS分布式数据存储功能之外,还提供了叫做MapReduce的大数据计算引擎
所以我们完全可以抛开Spark,使用Hadoop自身的MapReduce计算引擎来完成数据的处理,
所以如果你处理的的是离线数据,或者你可以容忍延迟,你完全可以使用mapreduce,而不使用spark。
6/spark必须依附其他的分布式文件存储系统,除了hdfs,还可以是其他的。
但Spark默认来说还是被用在Hadoop上面的,毕竟大家都认为它们的结合是最好的。
8/spark数据处理速度秒杀mapreduce
spark因为其处理数据的方式不一样,spark有DAG,RDD等机制,会比mr快很多。
MapReduce是分步对数据进行处理的:
“从集群中读取数据,进行一次计算,得到中间结果,将中间结果写到集群,
然后再从集群中读取中间结果,进行下一次的计算,将结果写到集群,等等”
反观Spark:
它会在内存中以接近“实时”的时间完成所有的数据计算,
从集群中读取数据,完成所有必须的分析处理,将结果写回集群,最终完成,中间不回写hdfs。
Spark的批处理速度比MapReduce快近10倍,内存中的数据分析速度则快近100倍,
如果需要处理的数据和结果需求大部分情况下是静态的,且你也有耐心等待批处理的完成的话,MapReduce的处理方式也是完全可以接受的,
但如果你需要对流数据进行分析,比如那些来自于工厂的传感器收集回来的数据,又或者说你的应用是需要多重数据处理的,那么你也许更应该使用Spark进行处理,
大部分机器学习算法都是需要多重数据处理的,此外,通常会用到Spark的应用场景有以下方面:
实时的市场活动,
在线产品推荐,
网络安全分析,
机器日记监控等。
9/灾难恢复
两者的灾难恢复方式迥异,但是都很不错。
因为Hadoop将每次处理后的数据都写入到磁盘上,所以其天生就能很有弹性的对系统错误进行处理;
Spark的数据对象存储在分布于数据集群中的叫做弹性分布式数据集(RDD: Resilient Distributed Dataset)中,这些数据对象既可以放在内存,也可以放在磁盘,
所以RDD同样也可以提供完成的灾难恢复功能。
3/ Важные понятия в искре
<1>RDD
RDD:即弹性分布式数据集
Resilient(弹性的) Distributed(分布式) Datasets(数据集)
弹性可理解为:数据存储是弹性的,可存储在内存,也可存储在磁盘,可以自由切换存储的模式。
是优先存储在内存中的,如果存不下了,就写入到本地磁盘中,这叫做溢写,或者溢出
RDD中的partition大小不同,数量不定。
分布式可理解为:数据分布在集群的不同的节点上。
RDD是弹性分布式数据集的逻辑抽象概念,物理数据存储在不同的节点上,但对用户透明,用户不需要知道数据实际存在哪台机器。
RDD包含的内容下图所示:
RDD为啥设置为只读的?
只能读,这表示数据是不变的,这保证了RDD的数据一致性。
在计算过程中更安全可靠,此外RDD可能包含多个分区,这些分区可能在不同的机器上。
对数据的计算函数:RDD包含了对数据的计算函数,也就是得到这个RDD所要经过的计算。
计算数据的位置:对用户而言不需要知道数据在哪里,这些信息隐含在RDD的结构中。
分区器:对数据分区依赖的分区算法,如hash分区器
依赖的RDD信息:该RDD可能依赖的父RDD信息,用于失败重算或计算的DAG划分。
RDD是Spark数据结构最基本的抽象化概念之一。
本质上 RDD是不存数据的,存的是计算逻辑
根据上图,我们可以打个形象的比方:
有一个流水线工厂里面有工人RDD1、RDD2、RDD3,
当一个工件下来的时候,RDD1做的是flatMap()加工,
做完之后,被加工后的工件传到RDD2,然后RDD2做map()加工,
之后再传给RDD3做一些其他加工。
最后,直到工件加工完成出货为止。。。
RDD采用这样的一种弹性分布式数据集当作临时结果,达到对数据的计算优化和高效处理
注:从前到后的加工顺序可以理解为 DAG有向无环图
根据上图,我们可以总结出RDD的特性:
①一个RDD可以有多个分区,这些分区可以在不同的节点上。
②对RDD进行一个函数操作,会对该RDD内的所有分区中的所有数据都执行相同的函数操作。
③一个RDD依赖于其他RDD,RDD1->RDD2->RDD3->RDD4->RDD5,
若RDD1中某节点数据丢失,
后面的RDD会根据前面的信息进行重新计算
④对于Key-Value的RDD可以制定一个partitioner,告诉他如何分片。常用hash/range
⑤移动数据不如移动计算,注:移动数据,不如移动计算。考虑磁盘IO和网络资源传输等
RDD之间的依赖关系?
前面的RDD是后面的RDD的父RDD,后面的RDD是前面的RDD的子RDD。
依赖关系有2种,窄依赖narrow dependence,宽依赖wide dependence。
窄依赖:父rdd的一个分区,只能被子rdd的一个分区使用
宽依赖:父rdd的一个分区,可以被子rdd的多个分区使用。
一般来说,窄依赖不会发生洗牌(shuller),宽依赖会发生洗牌,shuller。
RDD之间的依赖有血统lineage这样一个概念。回记录rdd的元数据和转换行为,以便恢复丢失的分区数据。
<2>DAG
有向无环图。
有方向的,没有闭环的,
相对于mapreduce计算引擎,spark支持DAG,能缓存中间数据,
减少数据落盘次数,不用频繁对磁盘进行IO。
<3>RDD和DAG之间的关系
多个RDD之间相连,最后这些RDD和他们之间的边组成一个有向无环图,就是这个DAG。
最终构成了一个spark job的拓扑结构,有向无环图。
不只是spark,现在很多计算引擎都是dag模型的,例如tez就是mr的dag改进。
<4>partition分区
partition是弹性分布式数据集RDD中的最小单元,
RDD是由分布在集群中各个节点上的各个partition组成的
同一个RDD中的partition大小不一(它不像hdfd中的数据块block,大小是一样的128M),
而且不同rdd中的partition的数量也是不同的。
是根据application里的算子和最初从分布式文件存储系统(比如hdfs)读入的数块数量决定的,
同一个RDD中,为什么要分区?
RDD是一种弹性分布式的数据集,
由于数据量很大,因此要它被切分并存储在各个节点的分区partition当中。
从而当我们对RDD进行操作时,实际上是对每个分区中的数据并行操作。
把数据分配在多个partition中,这样并行的更多,速度更快。
RDD中是如何进行分区的?
有3种方法:
1)HashPartitioner
HashPartitioner确定分区的方式:
partition = key.hashCode () % numPartitions
弊端:弊端是数据不均匀,容易导致数据倾斜,极端情况下某几个分区会拥有rdd的所有数据。
2)RangePartitioner
RangePartitioner会对key值进行排序,然后将key值被划分成分区份数key值集合。
特点:RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,
也就是说一个分区中的元素肯定都是比另一个分区内的元素小或者大;
但是分区内的元素是不能保证顺序的。
简单的说就是将一定范围内的数映射到某一个分区内。
3)CustomPartitioner
CustomPartitioner可以根据自己具体的应用需求,自定义分区。
spark从hdfs读取数据文件,默认是怎么分区的?
Spark从HDFS读入文件的分区数默认等于HDFS文件的块数(blocks),
HDFS中的block是分布式存储的最小单元。
如果我们上传一个30GB的非压缩的文件到HDFS,HDFS默认的块容量大小128MB,因此该文件在HDFS上会被分为235块(30GB/128MB);
Spark读取SparkContext.textFile()读取该文件,默认分区数等于块数即235。
分区数越多越好吗?
不是的,分区数太多意味着任务task数太多,
每次调度任务也是很耗时的,所以分区数太多会导致总体耗时增多。
分区数太少会有什么影响?
分区数太少的话,会导致一些节点(集群中的节点)没有分配到任务;
另一方面,分区数少则每个分区要处理的数据量就会增大,从而对每个节点的内存要求就会提高;
还有分区数不合理,会导致数据倾斜问题。
合理的分区数是多少?如何设置?
总核数=executor-cores * num-executor
一般合理的分区数设置为总核数的2~3倍
<5>算子
有2种算子:转换算子,action算子(动作算子)
<6>SparkContext类
这是连接spark集群的类,通过这个类的实例化对象,我们可以创建连接到spark集群,
这就相当于创建了一个环境。
sc = pyspark.SparkContext( conf )
sc是一个实例化对象,说白了,就是一个环境(从字面上理解,上下文环境),spark环境。
有了这样一个环境,我们才能开发spark应用程序。
SparkContext意义:主入口点
SparkContext作用:连接Spark集群
SparkConf作用:创建SparkContext前得使用SparkConf进行配置,以键值对形式进行
①创建SparkContext
- 连接到Spark“集群”:local、standalone、yarn、mesos
- 通过SparkContext来创建RDD、广播变量到集群
②创建SparkContext前还需要创建SparkConf对象
③SparkConf().setAppName(appname).setMaster('local')这个设置高于系统设置
④pyspark.SparkContext连接到Spark‘集群’即master(local[单机]、Standalone[标准]
、yarn(hadoop上)、mesos),创建RDD,广播变量到集群
⑤代码:
conf这个变量,其就是就是一些配置。比如名称,集群模式等设置。
conf = SparkConf().setAppName(‘xxxxx’).setMaster('local')
sc = SparkContext(Conf=Conf)
<7>SparkConf
是在创建sc环境的时候,指定的一些参数。
比如appname,集群模式等。
4/ Как spark выполняет распределенные вычисления
上图是一个spark的wordcount例子,根据上述stage划分原则,这个spark job划分为2个stage,
stage1中包括3个rdd.
rdd1通过flatmap()得到rdd2,
rdd2通过map()得到rdd3.
然后rdd3通过reducebykey()得到rdd4.
最后写入到hdfs中。
有三行,分别是数据读取、计算和存储过程。
仅看代码,用户根本体会不到背后,数据是并行计算的。
从图中能看出数据分布在不同分区(也可以理解不同机器节点上),
数据经过flatMap、map和reduceByKey算子在不同RDD的分区中流转,
这些算子(action)就是上面所说对RDD对数据进行计算的函数.
下图从更高角度看:
spark的运行架构由Driver(可理解为master)和Executor(可理解为worker或slave)组成,
Driver负责把用户代码进行DAG切分,划分为不同的Stage,
然后把每个Stage对应的task调度提交到Executor进行计算,
这样Executor就并行执行同一个Stage的task。
5/искровой перекос данных
数据倾斜,简单来说就是数据分配不均匀,集群中有的节点分的多,有的节点分的少。
从而导致分的多的节点需要更多的时间才能完成task,所以整个job的计算时间也就随着增多。
执行spark job时,数据倾斜一般发生在shuffle过程中,因为spark的shuffle过程需要进行数据的重新划分
在执行shuffle过程中,spark把各个节点上相同key的数据拉取到同一个处理节点的task中进行处理,
具体来说,就是把各个节点中的各个partition中的相同key的数据拉到一个节点上进行处理。
比如对数据按照某个维度key进行aggr集合或者join等shuffle操作。
在此过程中,如果不同的key的数据量差别很大,恰巧这样的key分到了不同的节点上,则会导致数据倾斜。
例如一个聚合作业中,大部分key对应100条数据,但是少数个别key却对应了100万条左右的数据,
那么在执行时若一个task处理一个key对应的数据,则大部分task节点很快计算完成,个别task要处理100万条数据,所以需要花费较多的时间才能计算完成。
而整个spark作业的运行速度是由运行时间最长的task决定的,这里整个作业的运行时间变得很长,不能充分利用spark的并行能力,极大地影响作业的处理性能。
如上图,是一个简单的数据倾斜示意图,在shuffle过程中对数据按照key进行重新划分,
其中一个key("hello")对应的数据量远大于其他key("world","start")的数据量,
从而出现了数据倾斜。
这就导致后续的task处理任务中,task1的运行时间肯定远远高于其他的task2、task3的运行时间。
该spark job的运行时间也由task1的运行时间决定。
因此,在处理过程中出现数据倾斜时,spark作业的运行会非常缓慢,无法体现出spark处理大数据的高效并行优势,
甚至可能因为某些task处理的数据量过大导致内存(OOM,out of memory)溢出,使得整个作业无法正常执行。
6 / феномен наклона искровых данных
数据倾斜发生后,会有2种现象
<1>大部份的task执行的时候会很快,当发生数据倾斜的task会执行很长时间。
<2>有时候数据倾斜直接报OOM即:JVM Out Of Memory内存溢出的错误。
这就不是简单的计算速度慢的问题了,而是根本没有内存去处理了,导致spark job任务挂掉。
7/Spark решение для искажения данных
Метод оптимизации фильтра с наклонным ключом
在spark job作业中发生数据倾斜,
如果我们能够确定只是少数个别的key数据量过大,导致数据倾斜,
而这些key又对计算结果影响不大,在这种情况下,我们可以采用这种过滤key的方法。
针对发生倾斜的key,预先直接过滤掉它们,使得后面的计算能够高效地快速完成。
也就是说:在执行spark作业的时候,先对数据做一个调研,看看数据量比较大的key都是什么样的数据,
如果都是一些空值,或者是一些不重要的key,则可以直接过滤掉。
Улучшите метод оптимизации параллелизма для операций в случайном порядке, искусственно указав параллелизм.
在spark的action算子中,有一些常用的action算子会触发shuffle操作,
如reduceByKey、aggregateByKey、groupByKey、cogroup、join、repartition等。
spark job出现数据倾斜时,很可能就是我们的开发代码中使用的这些算子导致的,
因为这些算子的执行会触发shuffle过程,进而引起数据的重新划分,导致数据倾斜的产生。
默认情况下,我们不会考虑shuffle操作的并行度分配,而是交由spark控制。
但是spark的默认shuffle并行度(即shuffle read task的数量)为200,这对于很多场景都有点过小。
在出现数据倾斜时,我们可以显式提高shuffle操作的并行度,以缓解某些一般的数据倾斜情况。
这种方法需要在使用shuffle类算子时,自己指定并行参数。
在开发中我们可以对使用到的shuffle类算子预先传入一个并行参数,
如aggregateByKey(1000),显式设置该算子执行时shuffle read task的数量为1000,
增加实际执行的并行度到1000。
这样增加shuffle read task的数量后,可以让原本分配给一个task的多个key数据重新划分给多个task,通过提高并行度再次分散数据,从而减少每个task的处理数据量,有效缓解和减轻数据倾斜的影响。
上图展示了一个并行度优化的简单情形。
如上图所示,由于低并行度发生数据倾斜问题时,在提高了shuffle操作的并行度之后,
之前由一个task处理的数据量被重新分散到不同的多个task(task1、task2、task3)中进行处理,
这样原来的各个task每次处理的数据量减少很多,从而缓解了存在的数据倾斜问题,能够明显提高处理速度。
显式提高shuffle操作的并行度的方法也是一种易于实现的优化方法。
这种方法可以有效缓解数据量较大而并行度较低的数据倾斜问题,但是它并不能彻底根除倾斜问题。
如果数据倾斜的原因是某些(个)key数据量较大,则这时提高并行度并不能改善数据倾斜的低性能。
同样的这种方法适用场景也很有限,只适用于一些普通场景。
Добавьте случайные числа и агрегируйте один раз, а затем снова агрегируйте.
在spark执行作业中,涉及聚合aggr操作时往往会比较容易出现数据倾斜现象。
如果在spark作业执行聚合类操作算子如reduceByKey、aggregateByKey等的过程中,发生数据倾斜时,
即出现某些key要聚合的数据量较大的情况下,我们可以采用两阶段聚合优化方法,先将相同的数据打乱成不同的局部数据,进行局部聚合后再统一合并聚合。
7/искровой оператор
<1>transform,转换算子
1) Оператор преобразования типа значения, это преобразование не запускает отправку задания, а обрабатываемые элементы данных являются данными типа значения. 2) Оператор преобразования типа «ключ-значение», это преобразование не запускает отправку задания, а обрабатываемые элементы данных являются данными типа «ключ-значение». распараллелить ([х, х, х, х, х, х]) Эта функция является оператором преобразования, который просто распределяет данные по разным разделам (узлам), а не выполняет операций.
<2>action,动作算子
Такие операторы инициируют SparkContext для отправки заданий. Простейший оператор действия collect() rdd.map(функция) rdd.filter(lmabda x: x > 1) rdd.distinct() rdd.randomSplit([0.4,0.6]) разделить набор данных, 40%, 60%
8/Основное различие и связь между spark ml и spark mlib
ml和mllib都是spark中的机器学习库(工具),目前常用的机器学习功能2个库都能满足需求。
但是spark官方推荐使用spark ml, 因为spark ml功能更全面更灵活,未来会主要支持spark ml
mllib很有可能会被废弃(据说可能是在spark3.0中deprecated)。
ml主要操作的是DataFrame, 而mllib操作的是RDD,也就是说二者面向的数据集不一样。
相比于mllib在RDD提供的基础操作,ml在DataFrame上的抽象级别更高,数据和操作耦合度更低。
ml中的操作可以使用pipeline, 跟sklearn一样,可以把很多操作(比如特征缩放/特征编码/模型训练等)以管道的形式串起来,然后让数据在这个管道中流动。大家可以脑补一下Linux管道在做任务组合时有多么方便。
ml中无论是什么模型,都提供了统一的算法操作接口,比如模型训练都是`fit`;不像mllib中不同模型会有各种各样的`trainXXX`。
mllib在spark2.0之后进入`仅维护状态`, 这个状态通常只修复BUG不增加新功能。
以上就是ml和mllib的主要异同点。下面是ml和mllib逻辑回归的例子,可以对比看一下,虽然都是模型训练和预测,但是画风很不一样。
менеджер кластера управления искровым кластером
当前支持三种集群管理器:
<1>spark独立集群管理器(Standalone)
一种简单的Spark集群管理器,很容易建立集群,基于Spark自带的的Master-Worker集群
<2>Apache Mesos
一种能够运行Haoop MapReduce和服务应用的集群管理器
<3>Hadoop YARN(集群资源的管理,包括资源管理和任务调度)
Spark可以和Hadoop集成,利用Yarn进行资源调度
如果在集群中仅有Spark程序,那么可以使用Spark独立的集群管理器。
如果有其他的mapreduce程序,那么需要使用Yarn或者Mesos管理器。
其中基于Yarn有两种提交模式,一种是基于Yarn的yarn-cluster模式,一种是基于Yarn的yarn-client模式。
искра и хауп
Hadoop框架有两个核心组件,分布式文件存储模块HDFS和分布式计算引擎Mapreduce
spark本身只是一个分布式计算引擎,并没有自己的分布式文件存储系统,因此spark的分析大多依赖于Hadoop的分布式文件存储系统HDFS
Hadoop的Mapreduce计算引擎与spark计算引擎都可以进行数据计算,
而相比于Mapreduce适合处理离线数据(允许一定的延迟),spark的速度更快并且提供的功能更加丰富。
二者的关系图如下所示:
Широкие и узкие зависимости
窄依赖(后面变窄了)
窄依赖是指1个父RDD的一个分区对应1个子RDD的一个分区。
换句话说,一个父RDD的一个分区对应于一个子RDD的一个分区,或者多个父RDD的分区对应于一个子RDD的分区。
所以窄依赖又可以分为两种情况:
1个子RDD的分区对应于1个父RDD的分区,比如map,filter,union等算子
1个子RDD的分区对应于N个父RDD的分区,比如co-partioned join
宽依赖(后面变宽了)
宽依赖是指1个父RDD分区对应多个子RDD分区。宽依赖有分为两种情况
1个父RDD对应非全部多个子RDD分区,比如groupByKey,reduceByKey,sortByKey
1个父RDD对应所有子RDD分区,比如未经协同划分的join
总结:如果父RDD分区对应1个子RDD的分区就是窄依赖,否则就是宽依赖。
spark的这种依赖关系的设置,使得spark天然具有容错的优势。
сценическое разделение
因为shuffle必须等到所有的父rdd的分区数据都准备就绪了,后面的子rdd才能开始读取数据。
因此spark的设计是让父rdd把结果写在本地磁盘,完全写完之后(准备好之后),再通知后面的rdd。
后面的rdd则去读取父rdd存在本地的数据作为自己的输入,然后进行计算。
由于上述特征,使得shuffle过程必须分为2个阶段stage才能完成。
stage1:父rdd把结果写到本地磁盘,例如reducebykey,根据key进行聚合。
stage2:后面的rdd(子rdd)读取所有父rdd的数据作为输入数据,开始计算。
Почему данные родительского rdd записываются на локальный диск, а не помещаются в память
后面的rdd(也就是子rdd)要读取这个数据,如果是存储在内存中,假如出现数据丢失的情况,则后面所有的步骤都不能进行了,这违背了父rdd需要全部ready的原则。
同一个stage中的task是并行计算的,下一个stage要等到前一个stage ready之后才会进行。
spark是根据是否有宽依赖作为依据来划分stage的。
最后的结果阶段叫做resultstage,之前的结果阶段是shufflemapstage,如下图所示:
从上图可以知道:
<1>遇到宽依赖就断开,遇到窄依赖就把当前的rdd加入到该stage
<2>每个stage中的task的数据由该stage中最后一个rdd的分区的数量决定
<3>最后一个stage里面的任务类型是resulttask,前面所有的stage的任务类型是shufflemaptask
<4>