Spark学习手册(二):Spark官方手册读摘

  本来想买本Spark的书的,无奈最近买的书有点多超预算了。再看看官方的开发文档写的还是挺全的,就想着自己边读文档边自己做一下记录吧。毕竟,别人网上写的东西都是提纲挈领的没有那么细,看英文虽然繁琐一些,但也好理解一些;但是搞一个完整的手册翻译也太费精力了,这个读摘的定位跟Pro Git速查笔记一样——适合回顾而不适合学习。
  Spark与HDFS环境的搭建,请参照HDFS支撑下的Spark搭建与尝试-construct-spark-hdfs.html)。个人感觉,理解Spark的核心,是理解其RDD(Resilient Distributed Dataset)的数据模型!同时,以下部分,会尽量用Python(Python3)来测试,不过有些特性或者功能官方还没有支持。
  还有,由于本人对Spark还不太理解,文中的很多术语直接英文,不敢造次。
Spark

一、常用术语解释

  Spark中一个Application由1个driver program构成,由程序中的SparkContext定位,SparkContext可以链接各种类型的Cluster Manager(Spark自身的standalone、Mesos、YARN),然后申请executors,申请到之后将程序代码发送到executors,最终由SparkContext发送tasks到executors以执行之。

术语 说明
application spark上的用户应用程序,由一个driver program和多个executors组成
driver program 运行main并创建SparkContext的进程
worker node 任何执行代码的集群节点
executor worker node启动的程序,执行tasks并在内存或者磁盘上保存数据
task 每一个executor的执行单元
job 由多个tasks构成的并行计算操作,一般由action操作催生
stage 并行计算的核心是计算过程中不会产生shuffle,所以stage就是在shuffle边界产生的tasks集合,一个job一般由多个stages组成

二、官方首页显示的Spark的优势

  • 速度快,当数据常驻内存会比Hadoop快100倍,即使硬盘访问,速度也会快10倍以上。这同在后面的文档中强调的,可以使用persist将Spark数据常驻内存,同时惰性计算加大了计算的效率和开销有关。将数据保存到内存里面,可以减少磁盘以及HDFS等操作,对于数据挖掘这种常常需要迭代计算的情形尤为适合。
  • 接口简单可用,提供Java/Scala/Python/R的接口,个人比较喜欢Python。
  • 包含Spark SQL,Spark Streaming,MLlib,GraphX模块,可以说从数据源、算法、生成显示的部分都囊括了。
  • 和Hadoop,Mesos等的结合。暂时还没体会,不过用HDFS是刚刚滴,Spark支持本机standalone模式,后面可能会测试跟Hadoop YARN进行集成。

    三、Spark启动和实用

  • bin/pyspark会启动一个交互式的shell,并初始化SparkContext和sqlContext(注意查看提示实用的python版本),比如PYSPARK_PYTHON=python3.4 bin/pyspark;
    –master local/local[4]/spark://host:port….,指定master的地址,当为local的时候在本机执行,不进行分布式操作;
  • bin/spark-submit提交一个脚本的任务。比如
    1
    ➜  ~  bin/spark-submit /root/wordc.py

四、RDD数据表示

  RDD数据模型会对数据进行分区,可以在各个集群节点上进行并行计算。RDD的数据来源既可以是程序中的数据,也可以是来自外部存储的或者是网路上的数据。

  • 程序中的数据可以使用sc的parallelize方法,对一个可迭代或者collection的数据进行并行化生成一个可以并行化计算的分布式数据集, 此函数接受一个分区数目参数,一个分区运行一个task,一般是一个CPU分配2~4个分区,系统一般会自动根据集群的规模设置分区数目。

    1
    2
    data = [1, 2, 3, 4, 5]
    distData = sc.parallelize(data)
  • 外部数据主要是hadoop支持的数据,形式有文本数据,序列文件(SequenceFiles)以及其他Hadoop支持的输入格式;
    文本文件主要用textFile函数,把文件读入,形成以行为单位的数据集,

    1
    2
    3
    distFile = sc.textFile("/root/data.txt")
    print(distFile.collect())
    distFile.map(lambda s: len(s)).reduce(lambda a, b: a + b) #计算总的词的个数

  注意:如果指示的文件路径是本地路径,那么在各个work节点上都必须要能在相同的路径访问该文件,比如拷贝过去,或者用网络文件系统共享等到各个work节点的相同路径处;对于输入的文件,可以是路径、含*的通配符、gz压缩文件;函数还支持partition数目的参数,HDFS会把文件分block,默认一个partition对应一个block,也可以设置partition数目大于block数目。
  wholeTextFiles 可以读取一个目录下的所有文件,然后保存成文件名—内容格式;
  RDD.saveAsPickleFile和SparkContext.pickleFile,可以保存RDD以及Python的数据;
  SequenceFiles,比较特殊的,为Hadoop所用二进制形式来存储key-value模式的平面文件,其数据的读写在pyspark中实际上是进行了Python--Java数据类型的底层转换的,但是对用户来说是透明的;

1
2
3
rdd = sc.parallelize(range(1, 4)).map(lambda x: (x, "a" * x ))
rdd.saveAsSequenceFile("root/aaaa.txt")
sc.sequenceFile("root/aaaa.txt").collect()

  • Spark SQL;
  • RDD支持两种操作:transformation和action,前者是将一个数据集转换成另外一个数据集,而后者让程序计算并将结果返回给driver程序。前者比如常见的map函数,会将数据集中的每一个元素通过某个函数转换成RDD表示的数据集,而reduce就是把RDD的所有元素通过某些函数聚集起来并返回给driver程序。
      Spark所有的计算都是惰性的,只有在driver程序需要返回而执行action操作时,才会进行真正的计算,这样可以增加Spark的计算效率,减少无用计算和数据传递。
      具体的Transformation函数和Action函数见下文的列表。
      一个map/reduce的例子程序

    1
    2
    3
    lines = sc.textFile("data.txt")
    lineLengths = lines.map(lambda s: len(s))
    totalLength = lineLengths.reduce(lambda a, b: a + b)
  • Spark依赖于传奇函数使其节点进行计算任务,函数包含:lamda表达式、文件中定义的局部函数、定义在模块中的函数。
    函数模式举例如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    #!/usr/bin/python3
    from pyspark import SparkConf, SparkContext

    if __name__ == "__main__":
    def word_c(string):
    words = string.split()
    return len(words)

    conf = SparkConf().setAppName("WORD_TEST").setMaster("local[2]")
    sc = SparkContext(conf=conf)
    distFile = sc.textFile("/root/wordc.py")
    rest = distFile.map(word_c).reduce(lambda a, b: a + b)
    print("Total:%d" %(rest))

  然后,使用” bin/spark-submit /root/wordc.py”提交任务。

  • Closures
      这个可能是Java中什么闭包的概念。总之,在Spark中被worker节点执行的函数中,不要使用全局变量,如果在本地非分布式环境的话,实际使用的一个JVM虚拟机结果可能是正常的,但是如果是分布式的,那么这个变量会被序列化并传递给每个worker节点一份拷贝,在worker节点上更新的都是自己本地副本而不会反馈回driver程序,所以最终driver程序中的变量压根就没被更新。
      如果要使用全局变量,请使用Broadcasting vars和Accumulators

    • Broadcasting vars是共享的只读变量,一般worker节点上执行计算的数据,既可以通过执行函数传递过去,也可以通过这种Broadcasting vars方式传递过去,Broadcasting vars让所有worker节点保持只读的数据拷贝,而且其使用的Broadcasting算法使得这种传递更为的有效率,在使用中一旦被创建就不应当改变其值,以保证各个节点的数据都是一致的,同时也不应当在各个执行节点上被更新。

      1
      2
      3
      4
      >>> arr = [2, 3, 5, 9, 10]
      >>> broadcastVar = sc.broadcast(arr)
      >>> broadcastVar.value
      [2, 3, 5, 9, 10]
    • Accumulators能够保证在各个worker节点对该变量的更新都能安全的实现,其默认支持数字类型,代码可以继承AccumulatorParam类扩充为其他类型(只需实现zero和addInPlace函数),同时也只能在driver程序中才能读到其值,worker节点只能进行更新操作。此处还需要注意:对于在actions中的更新,系统能保证变量只会被更新一次,所以对于重启的task不会被再次更新;但在transform中的更新操作没有这个保证,重启任务可能会被更新多次,或许可以这么理解:transform自己不会进行实际的计算操作,是由action驱动的计算规则,所以无法保存task计算状态(自己猜的~)。

      1
      2
      3
      4
      5
      6
      >>> accum = sc.accumulator(0)
      >>> distFile = sc.textFile("/root/wordc.py")
      >>> distFile.map(lambda s: (s,len(s))).foreach(print) #查看??
      >>> distFile.map(lambda s: len(s)).foreach(lambda x: accum.add(x))
      >>> accum.value
      396

  这里引申出打印数据,每个工作节点的print到的标准输出都是本地的,所以如果要在driver程序的节点上看到打印结果,必须将数据收集起来然后在本地打印显示:

1
2
3
distFile = sc.textFile("/root/wordc.py")
print(distFile.collect())
print(distFile.take(3))

五、RDD的Transformations和Actions函数

  • Transformations
函数 说明
map(func) 通过func将源数据的每一个元素进行转换生成RDD
filter(func) 通过func选择那些返回为true的数据组成新的数据集
flatMap(func) 同map,但是每一个输入元素可能会被映射成0或者多个items
mapPartitions(func) 类似map,不过是在RDD各个partition上单独运行的,所以func的参数都是迭代器的(同上面结果可能会有些差异)
mapPartitionsWithIndex(func)
sample(withReplacement, fraction, seed) 取样
union(otherDataset) 两个数据集的交集
intersection(otherDataset) 两个数据集的并集
distinct([numTasks])) 取出不含重复值的数据集
groupByKey([numTasks]) 集合数据集的(K, V)产生(K, Iterable) ,注意如果是如果可能,reduceByKey/aggregateByKey性能会更好
reduceByKey(func, [numTasks]) 将(K, V)中同K的V通过func方式集合起来
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
sortByKey([ascending], [numTasks]) 通过K进行排序
join(otherDataset, [numTasks]) 将(K, V)、(K, W)联合为(K, (V, W))
cogroup(otherDataset, [numTasks]) 将(K, V)和(K, W)返回(K, (Iterable, Iterable))元祖
cartesian(otherDataset) 笛卡尔,对于T、U数据集,返回(T, U)Pair
pipe(command, [envVars]) 对每一个partition,通过将每个元素作为command的stdin输入,然后将stdout输出定位到string类型的RDD数据集
coalesce(numPartitions) 合并,减少partition数目
repartition(numPartitions) 随机的Reshuffle数据
repartitionAndSortWithinPartitions(partitioner) repartition和sort集成的更高效实现
  • Actions
函数 说明
reduce(func) 集合
collect() 将数据集的所有元素以数组的方式返回给driver program
count() 返回数据集元素的个数
first() 返回第一个元素,类似take(1)
take(n) 返回前面的n个元素
takeSample(withReplacement, num, [seed]) 返回随机的num个元素
takeOrdered(n, [ordering]) 返回通过自然排序或者定制比较器的n个元素
saveAsTextFile(path) 将每个元素调用toString方法成为一行,然后保存写入到文件中
saveAsSequenceFile(path) (Java and Scala)
saveAsObjectFile(path) (Java and Scala)
countByKey() 只适用于(K,V)类型,返回(K, Int)计数,默认并行task数为2
foreach(func) 对每个元素执行func,通常用于Accumulators等操作

六、Shuffle

  主要是数据的重新分配,涉及到执行磁盘IO、数据序列化、网络IO等操作,是十分耗费资源的操作。比如对于reduceByKey操作,系统会读取所有的partition,收集keys形成一个元祖,然后再在所有的partitions中将这些key的值进行搜集整理而得到最终的答案。为了向shuffle组织数据,Spark会产生一些map任务集来负责组织数据,reduce任务集来聚集数据,两者中间会有交互的数据文件,因此会消耗大量的内存,如果内存不够甚至会写入磁盘导致磁盘IO。
  会导致Shuffl的操作有:repartiton(repartition, coalesce);ByKey(groupByKey, reduceByKey);join(cogroup, join)

七、RDD Persistence

  Spark的缓存是在action触发后,将指定缓存的数据结果保留在某些地方,同时缓存是容错的,当任何一个partition出错数据丢失时候,缓存的数据会重新按照原来的方式计算出来。
  通过persist传递各种Storage Level,可以让缓存在内存与磁盘、平坦与压缩、CPU计算量等方面做出各种妥协和权衡。默认的级别是StorageLevel.MEMORY_ONLY,cache()函数为其缩写。
  Spark会自动跟踪缓存的使用,必要的时候会根据LRU的原则丢弃旧的缓存,当然用户也可以调用 RDD.unpersist()显式地删除某些rdd的缓存。

Spark其他部分待续~

参考