【Kafka与Spark集成系列三】 Spark编程模型

【Kafka与Spark集成系列三】 Spark编程模型

在Spark中,我们通过对分布式数据集的操作来表达我们的计算意图,这些计算会自动地在集群上并行进行。这样的数据集被称为弹性分布式数据集(Resilient Distributed Dataset),简称RDD。RDD是Spark对分布式数据和计算的基本抽象。在Spark中,对数据的所有操作不外乎创建RDD、转换已有RDD以及调用RDD操作进行求值。在《Spark的安装及简单应用》的单词统计示例中,rdd和wordmap都是MapPartitionsRDD类型的RDD,而wordreduce是ShuffledRDD类型的RDD。

RDD支持2种类型的操作:转换操作(Transformation Operation)和行动操作(Action Operation)。有些资料还会细分为创建操作、转换操作、控制操作以及行动操作等4种类型。转换操作会由一个RDD生成一个新的RDD。行动操作会对RDD计算出一个结果,并把结果返回到驱动器程序中,或者把结果存储到外部存储系统中。转换操作和行动操作的区别在于Spark计算RDD的方式不同。虽然你可以在任何时候定义新的RDD,但Spark只会惰性计算这些RDD。它们只有第一次在一个行动操作中用到时,才会真正计算。表中给出了转换操作和行动操作之间对比的更多细节。

类别 函数 区别
转换操作 map、filter、groupBy、join、union、reduce、sort、partitionBy等 返回值还是RDD,不会立马提交给Spark集群运行
行动操作 count、collect、take、save、show等 返回值不是RDD,会形成DAG图,提交给Spark集群运行并立即返回结果

通过转换操作,从已有的RDD中派生出新的RDD,Spark会使用谱系图(lineage graph,很多资料也会翻译为“血统”)来记录这些不同RDD之间的依赖关系。Spark需要用这些信息来按需计算每个RDD,也可以依赖谱系图在持久化的RDD丢失部分数据时恢复所丢失的数据。行动操作会把最终求得的结果返回到驱动器程序,或者写入外部存储系统中。由于行动操作需要生产实际的输出,它们会强制执行那些求值必须用到的RDD的转换操作。

Spark中RDD计算是以分区(Partition)为单位的,将RDD划分为很多个分区分布到集群的节点中,分区的多少涉及对这个RDD进行并行计算的粒度。如下图所示,实线方框A、B、C、D、E、F、G都表示的是RDD,阴影背景的矩形则表示分区。A、B、C、D、E、F、G之间的依赖关系构成整个应用的谱系图。


依赖关系还可以分为窄依赖和宽依赖。窄依赖(Narrow Dependencies)是指每个父RDD的分区都至多被一个子RDD的分区使用,而宽依赖(Wide Dependencies)是指多个子RDD的分区依赖一个父RDD的分区。图中,C和D之间是窄依赖,而A和B之间是宽依赖。RDD中行动操作的执行将会以宽依赖为分界来构建各个调度阶段,各个调度阶段内部的窄依赖则前后链接构成流水线。图中的3个虚线方框分别代表了3个不同的调度阶段。

对于执行失败的任务,只要它对应的调度阶段的父类信息仍然可用,该任务就会分散到其它节点重新执行。如果某些调度阶段不可用,则重新提交相应的任务,并以并行方式计算丢失的地方。在整个作业中如果某个任务执行缓慢,系统则会在其他节点上执行该任务的副本,并最终取最先得到的结果作为最终的结果。

下面就以与《Spark的安装及简单应用》中相同的单词统计程序来分析一下Spark的编程模型,与《Spark的安装及简单应用》中所不同的是,这里的是一个完整的Scala程序,程序所对应的Maven依赖如下:

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.3.1</version>
</dependency>

具体代码示例如下:

package scala.spark.demo
import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
def main(args: Array[String]): Unit ={
val conf = new SparkConf().setAppName("WordCount").setMaster("local")①
val sc = new SparkContext(conf)②
val rdd = sc.textFile("/opt/spark-2.3.1-bin-hadoop2.7/bin/spark-shell")③
val wordcount = rdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)④
val wordsort = wordcount.map(x=>(x._2,x._1))
.sortByKey(false).map(x=>(x._2,x._1))⑤
wordsort.saveAsTextFile("/tmp/spark")⑥
sc.stop()⑦
}
}

main()方法主体中第①和第②行中首先创建一个SparkConf对象来配置应用程序,然后基于这个SparkConf创建了一个SparkContext对象。一旦有了SparkContext,就可以用它来创建RDD,第③行代码中调用了sc.textFile()来创建一个代表文件中各行文本的RDD。第④行中rdd.flatMap(.split(“ “)).map(x=>(x,1))这一段内容的依赖关系是窄依赖,而reduceByKey(\+_)操作对单词进行计数时属于宽依赖。第⑥行中将排序后的结果存储起来。最后第⑦行中使用stop()方法来关闭应用。

在$SPARK_HOME/bin目录中还有一个spark-submit脚本,用于将应用快速部署到Spark集群中。比如这里的WordCount程序,当我们希望通过spark-submit部署时,只需要将应用打包成jar包(即下面示例中的wordcount.jar)并上传到Spark集群中,然后通过spark-submit进行部署,示例如下:

[root@node1  spark]# bin/spark-submit --class scala.spark.demo.WordCount wordcount.jar --executor-memory 1G --master spark://localhost:7077
2018-08-06 15:39:54 WARN NativeCodeLoader:62 - Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
2018-08-06 15:39:55 INFO SparkContext:54 - Running Spark version 2.3.1
2018-08-06 15:39:55 INFO SparkContext:54 - Submitted application: WordCount
2018-08-06 15:39:55 INFO SecurityManager:54 - Changing view acls to: root
2018-08-06 15:39:55 INFO SecurityManager:54 - Changing modify acls to: root
(....省略若干)
2018-08-07 12:25:47 INFO AbstractConnector:318 - Stopped
Spark@6299e2c1{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2018-08-07 12:25:47 INFO SparkUI:54 - Stopped Spark web UI at
http://10.199.172.111:4040
2018-08-07 12:25:47 INFO MapOutputTrackerMasterEndpoint:54 –
MapOutputTrackerMasterEndpoint stopped!
2018-08-07 12:25:47 INFO MemoryStore:54 - MemoryStore cleared
2018-08-07 12:25:47 INFO BlockManager:54 - BlockManager stopped
2018-08-07 12:25:47 INFO BlockManagerMaster:54 - BlockManagerMaster stopped
2018-08-07 12:25:47 INFO
OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 –
OutputCommitCoordinator stopped!
2018-08-06 15:46:57 INFO SparkContext:54 - Successfully stopped SparkContext
2018-08-06 15:46:57 INFO ShutdownHookManager:54 - Shutdown hook called
2018-08-06 15:46:57 INFO ShutdownHookManager:54 - Deleting directory
/tmp/spark-fa955139-270c-4899-82b7-4959983a1cb0
2018-08-06 15:46:57 INFO ShutdownHookManager:54 - Deleting directory
/tmp/spark-3f359966-2167-4bb9-863a-2d8a8d5e8fbe

示例中的–class用来指定应用程序的主类,这里为scala.spark.demo.WordCount;–executor-memory用来指定执行器节点的内容,这里设置为1G。最后得到的输出结果如下所示:

[root@node1 spark]# ls /tmp/spark
part-00000 _SUCCESS
[root@node1 spark]# cat /tmp/spark/part-00000
(,91)
(#,37)
(the,19)
(in,7)
(to,7)
(for,6)
(if,5)
(then,5)
(under,4)
(stty,4)
(not,4)

欢迎支持笔者的作品《深入理解Kafka: 核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客(ID: hiddenkafka)。
本文作者: 朱小厮

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×