位置:首页 > 大数据在线教程 > Spark在线教程 > Spark核心编程

Spark核心编程

Spark 核心是整个项目的基础。它提供了分布式任务调度,调度和基本的 I/O 功能。Spark 使用一种称为RDD(弹性分布式数据集)一个专门的基础数据结构,是整个机器分区数据的逻辑集合。RDDS可以用两种方法来创建的;一个是在外部存储系统引用的数据集,第二个是通过应用转换(如map, filter, reducer, join)在现有RDDS。

RDD抽象通过语言集成API公开。这简化了编程的复杂性,因为应用程序的处理RDDS方式类似于操纵的本地集合数据。

Spark Shell

Spark提供了一个交互的shell − 一个强大的工具,以交互方式分析数据。 这是在 Scala或Python语言。Spark主要抽象称为弹性分布式数据集(RDD)项目的分布式采集。RDDS可以从Hadoop的输入格式来创建(如HDFS文件)或通过转化其他RDDS。

打开 Spark Shell

下面的命令用来打开Spark shell。
$ spark-shell

创建简单RDD

让我们从文本文件中创建一个简单的 RDD。使用下面的命令来创建一个简单的 RDD。
scala> val inputfile = sc.textFile(“input.txt”)
对上述命令的输出为:
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at <console>:12
Spark RDD API引入了一些变革和一些动作来操纵RDD。

RDD 转换

RDD转换返回指向新的RDD,并允许创建RDDS之间的依赖关系。 在依赖关系链中的每个RDD(依赖关系的字串)具有这样的功能,用于计算其数据并具有一个指针(依赖性)到其父RDD。

Spark是懒惰的,所以什么都不会被执行,除非调用一些改造或行动将触发作业创建和执行。看单词计数示例,如下面的代码片段。

因此,RDD转型不是一组数据而是在程序中的一个步骤(可能是唯一的步骤)告诉Spark如何获取数据以及如何使用它。
下面给出是RDD转换的列表。
S.No
转换&含义
1

map(func)

返回一个新的分布式数据集,传递源的每个元素形成通过一个函数 func

2

filter(func)

返回由选择在func返回true,源元素组成了一个新的数据集
3

flatMap(func)

类似映射,但每个输入项目可以被映射到0以上输出项目(所以func应返回seq而不是单一的项目)
4

mapPartitions(func)

类似映射,只不过是单独的每个分区(块)上运行RDD,因此 func 的类型必须是Iterator<T> ⇒ Iterator<U> 对类型T在RDD上运行时

5

mapPartitionsWithIndex(func)

类似映射分区,而且还提供func 来表示分区的索引的整数值,因此 func 必须是类型 (Int, Iterator<T>) ⇒ Iterator<U> 当类型T在RDD上运行时

6

sample(withReplacement, fraction, seed)

采样数据的一小部分,有或没有更换,利用给定的随机数发生器的种子
7

union(otherDataset)

返回一个新的数据集,其中包含源数据和参数元素的结合
8

intersection(otherDataset)

返回包含在源数据和参数元素的新RDD交集
9

distinct([numTasks])

返回一个新的数据集包含源数据集的不同元素
10

groupByKey([numTasks])

当调用(K,V)数据集,返回(K, Iterable<V>) 对数据集

11

reduceByKey(func, [numTasks])

12

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

13

sortByKey([ascending], [numTasks])

14

join(otherDataset, [numTasks])

15

cogroup(otherDataset, [numTasks])

16

cartesian(otherDataset)

当上调用类型T和U的数据集,返回(T,U)对数据集(所有元素对)
17

pipe(command, [envVars])

RDD通过shell命令每个分区,例如:一个Perl或bash脚本。RDD元素被写入到进程的标准输入和线路输出,标准输出形式返回一个字符串RDD

18

coalesce(numPartitions)

减少RDD到numPartitions分区的数量。过滤大型数据集后,更高效地运行的操作
19

repartition(numPartitions)

打乱RDD数据随机创造更多或更少的分区,并在它们之间平衡。这总是打乱的所有数据在网络上
20

repartitionAndSortWithinPartitions(partitioner)

根据给定的分区重新分区RDD及在每个结果分区,排序键记录。这是调用重新分配排序在每个分区内,因为它可以推动分拣向下进入混洗机制效率更高。

动作

下表给出了操作,及其返回值的列表。
S.No 操作 & 含义
1

reduce(func)

合计数据集的元素,使用函数 func (其中有两个参数和返回一行). 该函数应该是可交换和可结合,以便它可以正确地在并行计算。

2

collect()

返回数据集的所有作为数组在驱动程序的元素。这是一个过滤器或其它操作之后返回数据的一个足够小的子集,通常是有用的

3

count()

返回该数据集的元素数
4

first()

返回的数据集的第一个元素(类似于使用(1))
5

take(n)

返回与该数据集的前n个元素的阵列。
6

takeSample (withReplacement,num, [seed])

返回数组的数据集num个元素,有或没有更换随机抽样,预指定的随机数发生器的种子可选

7

takeOrdered(n, [ordering])

返回RDD使用或者按其自然顺序或自定义比较的前第n个元素
8

saveAsTextFile(path)

写入数据集是一个文本文件中的元素(或一组文本文件),在给定的目录的本地文件系统,HDFS或任何其他的Hadoop支持的文件系统。Spark调用每个元素的 toString,将其转换为文件中的文本行

9

saveAsSequenceFile(path) (Java and Scala)

写入数据集,为Hadoop SequenceFile元素在给定的路径写入在本地文件系统,HDFS或任何其他Hadoop支持的文件系统。 这是适用于实现Hadoop可写接口RDDS的键 - 值对。在Scala中,它也可以在属于隐式转换为可写(Spark包括转换为基本类型,如 Int, Double, String 等等)类型。

10

saveAsObjectFile(path) (Java and Scala)

写入数据集的内容使用Java序列化为一个简单的格式,然后可以使用SparkContext.objectFile()加载。

11

countByKey()

仅适用于RDDS的类型 (K, V). 返回(K, Int)对与每个键的次数的一个HashMap。

12

foreach(func)

数据集的每个元素上运行函数func。这通常对于不良反应,例如更新累加器或与外部存储系统进行交互进行。

 − 在 foreach()以外修改变量,其他累加器可能会导致不确定的行为。请参阅了解闭包的更多细节。

RDD编程

让我们来看看几个RDD转换和操作RDD编程实现,用一个例子的协助说明。

示例

考虑一个单词计数的例子 − 它计算出现在文档中的每个单词。请看下面的文字为输入并保存在主目录中的 input.txt 文件。

input.txt − 作为输入文件

people are not as beautiful as they look, 
as they walk or as they talk.
they are only as beautiful  as they love, 
as they care as they share.
按照下面给出命令执行示例程序。

打开Spark-Shell

下面的命令用来打开spark shell. 通常情况下,spark 使用Scala构建。因此,Spark 程序需要在 Scala 环境中运行。

$ spark-shell 

如果Spark shell 成功打开,会发现下面的输出。看看输出“Spark 上下文可作为sc” 的最后一行表示Spark容器会自动创建Spark 上下文对象名为sc。启动程序的第一步骤之前,SparkContext 对象应该被创建。

Spark assembly has been built with Hive, including Datanucleus jars on classpath 
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 
15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled;
   ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 
15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 
15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. 
Welcome to 
      ____              __ 
     / __/__  ___ _____/ /__ 
    _\ \/ _ \/ _ `/ __/  '_/ 
   /___/ .__/\_,_/_/ /_/\_\   version 1.4.0 
      /_/  
		
Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) 
Type in expressions to have them evaluated. 
Spark context available as sc 
scala>

创建一个RDD

首先,我们必须使用 Spark-Scala API 读取输入文件,并创建一个RDD。

下面的命令被用于从给定位置读出的文件。这里,新的 RDD 使用输入文件名创建。这是在 textFile(“”)方法的参数字符串是用于输入文件名的绝对路径。然而,如果仅给出文件名,那么它输入文件则在当前位置。

scala> val inputfile = sc.textFile("input.txt")

执行字数转换

我们的目标是计算一个文件中的字数。分裂每一行成词创建一个平面地图(flatMap(line ⇒ line.split(“ ”)).

接下来,读每个词作为一个键和值 ‘1’ (<key, value> = <word,1>) 使用映射函数 (map(word ⇒ (word, 1)).

最后,加入类似的键值降低这些键 (reduceByKey(_+_)).

下面的命令用于执行字数统计逻辑。执行此操作后,不会有任何输出,因为这不是一个动作,这是一个转换; 指向一个新的RDD或告诉spark,用给定的数据来做什么)。

scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);

当前RDD

同时用RDD工作,如果想了解当前的RDD,那么可使用下面的命令。 它会告诉你关于当前RDD及其依赖调试的描述。

scala> counts.toDebugString

缓存转换

可以使用 persist() 或 cache() 方法标记一个RDD。在第一次计算的操作,这将被保存在存储器中的节点上。使用下面的命令来存储中间转换在内存中。

scala> counts.cache()

应用动作

应用动作(操作),比如存储所有的转换结果到一个文本文件中。saveAsTextFile(“”)方法字符串参数是输出文件夹的绝对路径。试试下面的命令来保存输出文本文件。在下面的例子中, ‘output’ 的文件夹为当前位置。

scala> counts.saveAsTextFile("output")

检查输出

打开另一个终端进入主目录(其中spark 在其他终端中执行)。下面的命令用于检查输出目录。

[hadoop@localhost ~]$ cd output/ 
[hadoop@localhost output]$ ls -1 
 
part-00000 
part-00001 
_SUCCESS

下面的命令是用来查看输出的 Part-00001 文件。

[hadoop@localhost output]$ cat part-00000

输出

(people,1) 
(are,2) 
(not,1) 
(as,8) 
(beautiful,2) 
(they, 7) 
(look,1) 
下面的命令是用来查看输出的 Part-00001 文件。
[hadoop@localhost output]$ cat part-00001 

输出

(walk, 1) 
(or, 1) 
(talk, 1) 
(only, 1) 
(love, 1) 
(care, 1) 
(share, 1) 

UN持久存储

UN持续存在之前,如果想看到用于该应用程序的存储空间,可使用下面的URL在浏览器中查看。
http://localhost:4040
这将会看到下面的屏幕,该屏幕显示用于应用程序,这些都在 Spark shell 运行的存储空间。
storage space
如果想特别的RDD存储空间,然后使用下面的命令。
Scala> counts.unpersist() 

将看到如下输出 −

15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 
15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 
15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 
15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) 
res7: cou.type = ShuffledRDD[9] at reduceByKey at <console>:14
为了验证在浏览器中的存储空间,使用下面的URL。
http://localhost:4040/
会看到下面的画面。它用于应用程序,这是在Spark shell运行存储空间。
Storage space for application