【Kafka与Spark集成系列二】 Spark的安装及简单应用

【Kafka与Spark集成系列二】 Spark的安装及简单应用

下载Spark安装包是安装的第一步,下载地址为http://spark.apache.org/downloads.html。截止撰稿之时,Spark最新版本为2.3.1,如下图所示,我们可以从官网中选择spark-2.3.1-bin-hadoop2.7.tgz进行下载。

在下载过后,笔者是先将安装包拷贝至/opt目录下,然后执行相应的解压缩动作,示例如下:

[root@node1 opt]# tar zxvf spark-2.3.1-bin-hadoop2.7.tgz 
[root@node1 opt]# mv spark-2.3.1-bin-hadoop2.7 spark
[root@node1 opt]# cd spark
[root@node1 spark]#

在解压缩之后可以直接运行Spark,当然前提是要安装好JDK,并设置好环境变量JAVA_HOME。进入$SPARK_HOME/sbin目录下执行start-all.sh脚本启动Spark。脚本执行后,可以通过jps -l命令查看当前运行的进程信息,示例如下:

[root@node1 spark]# jps -l
23353 org.apache.spark.deploy.master.Master
23452 org.apache.spark.deploy.worker.Worker

可以看到Spark启动后多了Master和Worker进程,分别代表主节点和工作节点。我们还可以通过Spark提供的Web界面来查看Spark的运行情况,比如可以通过http://localhost:8080来查看Master的运行情况。

Spark中带有交互式的shell,可以用作即时数据分析。现在我们通过spark-shell来运行一个简单但又非常经典的单词统计的程序,以便可以简单的了解一下Spark的使用。首先是进入$SPARK_HOME/bin目录下(SPARK_HOME表示Spark安装的根目录,即本例中的/opt/spark)执行spark-shell命令来进行启动,可以通过–master参数来指定所需要连接的集群。spark-shell启动时,你会看到一些启动日志,示例如下:

[root@node1 spark]# bin/spark-shell --master spark://localhost:7077
2018-08-07 11:02:04 WARN Utils:66 - Your hostname, hidden.zzh.com resolves to
a loopback address: 127.0.0.1; using 10.xxx.xxx.xxx instead (on interface
eth0)
2018-08-07 11:02:04 WARN Utils:66 - Set SPARK_LOCAL_IP if you need to bind to
another address
2018-08-07 11:02:04 WARN NativeCodeLoader:62 - Unable to load native-hadoop
library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use
setLogLevel(newLevel).
Spark context Web UI available at http:// 10.xxx.xxx.xxx:4040
Spark context available as 'sc' (master = spark://localhost:7077, app id =
app-20180807110212-0000).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 2.3.1
/_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.

scala>

如此,我们便可以在“scala>”处键入我们想要输入的程序。

在将要演示的示例程序中,我们就近取材的以bin/spark-shell文件中的内容来进行单词统计。程序首先读取这个文件的内容,然后进行分词,在这里的分词方法是使用空格进行分割,最后统计单词出现的次数,下面就将这些步骤进行拆分,一步一步来讲解其中的细节。如无特殊说明,本章节的示例均以Scala语言进行编写。

首先是通过SparkContext(Spark在启动是已经自动创建了一个SparkContext对象,是一个叫做sc的变量)的textFile()方法读取bin/spark-shell文件,参考如下:

scala> val rdd = sc.textFile("/opt/spark/bin/spark-shell")
rdd: org.apache.spark.rdd.RDD[String] = /opt/spark/bin/spark-shell
MapPartitionsRDD[3] at textFile at <console>:24

然后使用split()方法按照空格进行分词,之后又通过flatMap()方法对处理后的单词进行展平,展平完毕之后使用map(x=>(x,1))对每个单词计数1,参考如下:

scala> val wordmap = rdd.flatMap(_.split(" ")).map(x=>(x,1))
wordmap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at
<console>:25

最后使用reduceByKey(_+_)根据key也就是单词进行计数,这个过程是一个混洗(Shuffle)的过程,参考如下:

scala> val wordreduce = wordmap.reduceByKey(_+_)
wordreduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[6] at
reduceByKey at <console>:25

到这里我们便完成了单词统计,进一步的我们使用take(10)方法来获取前面10个单词统计的结果,参考如下:

scala> wordreduce.take(10)
res3: Array[(String, Int)] = Array((scala,2), (!=,1), (Unless,1), (this,4),
(starting,1), (under,4), (its,1), (reenable,2), (-Djline.terminal=unix",1),
(CYGWIN*),1))

我们发现结果并没有按照某种顺序进行排序,如果要看到诸如单词出现次数前10内容的话,还需要对统计后的结果进行排序。

scala> val wordsort = 
wordreduce.map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
wordsort: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map
at <console>:25

scala> wordsort.take(10)
res2: Array[(String, Int)] = Array(("",91), (#,37), (the,19), (in,7), (to,7),
(for,6), (if,5), (then,5), (this,4), (under,4))

上面的代码中首先使用map(x=>(x._2,x._1)对单词统计结果的键和值进行互换,然后通过sortByKey(false)方法对值进行降序排序,然后再次通过map(x=>(x._2,x._1)将键和值进行互换,最终的结果按照降序排序。


欢迎支持笔者的作品《深入理解Kafka: 核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客(ID: hiddenkafka)。
本文作者: 朱小厮

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×