Spark学习手册(三):Spark模块学习摘读

  Spark在其基础架构之上支持四大模块,分别是SparkSQL、SparkStreaming、MLlib和GraphX,本文将对这几个模块的手册进行阅读摘录。

Spark 模块

一、Spark SQL

1.1 简介

  同Spark SQL的交互方式包括SQL、DataFrames API和Datasets API,但是其内部的执行引擎是一样的,只是对外表现的接口不一样而已。

  • SQL:可以使用基础的SQL语法或HiveQL,当在别的编程语言中执行SQL,返回的是DataFrame格式的结果。SQL的交互方式包括命令行方式,以及JDBC、ODBC对数据库的访问接口。
  • DataFrame:一种通过命名列组织的分布式数据存储,概念上和关系数据库的表等价,可以接受文件、数据库、RDD等数据源来创建。
  • Datasets:暂不支持Python,没研究。

1.2 从文件创建DataFrame

  基础SQLContext环境创建,并从json文件创建DataFrame

1
2
3
4
5
6
7
8
9
10
11
>>> from pyspark.sql import SQLContext
>>> sqlContext = SQLContext(sc)
>>> df = sqlContext.read.json("/root/spark-1.6.1-bin-hadoop2.6/examples/src/main/resources/people.json")
>>> df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+

DataFrame的常用操作接口(看意思就明白的)

1
2
3
4
5
>>> df.printSchema()
>>> df.select("name").show()
>>> df.select(df['name'], df['age'] + 1).show()
>>> df.filter(df['age'] > 21).show()
>>> df.groupBy("age").count().show()

1.3 从RDD创建DataFrame

  将已有的RDD转换为DataFrame有两种方法:一种是映射(reflection);另外一种是通过变成接口创建表,然后将RDD数据应用到表上面去

  • Reflection:通过创建(table_column_name, type)的Row对象,然后通过map将RDD每一行转换成Row,简单但是不够灵活

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    from pyspark.sql import SQLContext, Row
    sqlContext = SQLContext(sc)

    >>> lines = sc.textFile("/root/spark-1.6.1-bin-hadoop2.6/examples/src/main/resources/people.txt")
    >>> parts = lines.map(lambda l: l.split(","))
    >>> print(people.collect())
    [['Michael', ' 29'], ['Andy', ' 30'], ['Justin', ' 19']]
    >>> people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))
    >>> print(people.collect())
    [Row(age=29, name='Michael'), Row(age=30, name='Andy'), Row(age=19, name='Justin')]

    # 创建schema,并注册成表
    >>> schemaPeople = sqlContext.createDataFrame(people)
    >>> schemaPeople.registerTempTable("people")
    >>> schemaPeople.printSchema()
    root
    |-- age: long (nullable = true)
    |-- name: string (nullable = true)

    # 访问表内容
    >>> teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    >>> print(teenagers.collect()) # teenagers 是RDD的类型,可以常规的访问
    [Row(name='Justin')]
  • 编程的方式创建
      这种一般是事先不知道表结构,比如传递过来动态解析的表结构的情况下,就只能这么操作了。
    下面的例子可以显示出来,其中列名是schemaString是通过动态的字符串创建的,所以在实际使用中可以各种方式指定,灵活性比较的强

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    from pyspark.sql import SQLContext, Row
    sqlContext = SQLContext(sc)

    >>> lines = sc.textFile("/root/spark-1.6.1-bin-hadoop2.6/examples/src/main/resources/people.txt")
    >>> parts = lines.map(lambda l: l.split(","))
    >>> people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

    # 创建schema string
    schemaString = "name age"
    >>> fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
    >>> schema = StructType(fields)
    >>> print(fields)
    [StructField(name,StringType,true), StructField(age,StringType,true)]

    # 将创建的schema应用到people数据上去
    >>> schemaPeople = sqlContext.createDataFrame(people, schema)
    >>> schemaPeople.registerTempTable("people")
    >>> schemaPeople.printSchema()
    root
    |-- name: string (nullable = true)
    |-- age: string (nullable = true)
    >>> results = sqlContext.sql("SELECT name FROM people")
    >>> print(results.collect())
    [Row(name='29'), Row(name='30'), Row(name='19')]
    >>> df2 = sqlContext.read.load("/root/namesAndAgesData.parquet") #default is parquet

其中,save支持额外的SaveMode参数,可选的值为”error”、”append”、”overwrite”、”ignore”,类似于普通文件接口,他们在目标文件存在的时候,表现为不同的形式。

1.4 Data Sources

  • 文件的加载和保存
    这里是通用方式的文件加载和保存,意味着parquet和json都能使用。

    1
    2
    3
    4
    >>> df = sqlContext.read.load("/root/spark-1.6.1-bin-hadoop2.6/examples/src/main/resources/people.json", format="json")
    >>> print(df.collect())
    [Row(age=None, name='Michael'), Row(age=30, name='Andy'), Row(age=19, name='Justin')]
    >>> df.select("name", "age").write.save("namesAndAgesData.parquet", format="parquet")
  • 编程方式加载和访问parquet格式文件的例子

    1
    2
    3
    4
    5
    >>> parquetFile = sqlContext.read.parquet("/root/namesAndAgesData.parquet")
    >>> parquetFile.registerTempTable("parquetFile"); #可以注册为临时表
    >>> teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 39")
    >>> print(teenagers.collect())
    [Row(name='Justin'), Row(name='Andy')]
  • Json文件格式
    同上面的parquet一样的,只是对应的接口改成了sqlContext.read.json。同时,json还支持如下方式创建RDD:

    1
    2
    3
    4
    5
    6
    7
    >>> anotherPeopleRDD = sc.parallelize([\
    ... '{"name":"Yin1","address":{"city1":"Columbus","state":"Ohios"}}',\
    ... '{"name":"Yin2","address":{"city2":"Columbus","state":"Ohiot"}}'])
    >>> print(anotherPeopleRDD.collect())
    ['{"name":"Yin1","address":{"city1":"Columbus","state":"Ohios"}}', '{"name":"Yin2","address":{"city2":"Columbus","state":"Ohiot"}}']
    >>> anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
    [Row(address=Row(city1='Columbus', city2=None, city3=None, state='Ohios'), name='Yin1'), Row(address=Row(city1=None, city2='Columbus', city3=None, state='Ohiot'), name='Yin2')]
  • JDBC/ODBC访问
    启动的时候,需要指定JDBC的驱动,这个驱动可以在mysql的官方网站下载

    1
    root@ubuntu1404-node01:~/spark-1.6.1-bin-hadoop2.6# PYSPARK_PYTHON=python3.4 SPARK_CLASSPATH=mysql-connector-java-5.1.38-bin.jar bin/pyspark

然后,就可以操作数据库了

1
2
3
4
>>> from pyspark.sql import *
>>> sqlContext = SQLContext(sc)
>>> df = sqlContext.read.format('jdbc').options(url='jdbc:mysql://113.106.94.201:3306/n_xxxxx', dbtable='v5_xxxxxx',user='xxxxx', password='xxxxxx').load()
>>> df.take(3)

二、Spark Streaming

  Spark Streaming是Spark核心组件对在线流数据处理的扩展。在内部,Spark接受在线输入流数据,将其分成batches,然后再将结果输出,其提供了一个高层次抽象的DStream,表示为一种持续的输入流,其输入可以为(Kafka、Flume、Kinesis……反正我也不懂……)以及其他的DStream,在内部,DStream表示为一系列的RDDs构成。

2.1 一个简单的例子,统计词的个数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":
sc = SparkContext("local[2]", appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 1) # 1s batch interval,为应用需求延迟和系统节点资源的权衡

lines = ssc.socketTextStream("localhost",9999) # 建立Dstream,从TCP获得数据
counts = lines.flatMap(lambda line: line.split(" "))\ #flatMap,一对多,一个句子分多个词
.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a+b)
counts.pprint() # 打印最开始的10个元素

ssc.start() # 真正从此开始,开始接受输入并计算输出
ssc.awaitTermination() # 等待结束(手动停止或者出错,调用streamingContext.stop())

从两外一个终端开启”nc -lk 9999”不断输入作为数据源;另外一个终端提交任务”PYSPARK_PYTHON=python3.4 bin/spark-submit ../wordc_net.py “。
注意:
默认的Spark打印的是INFO,这时候屏幕会不断的滚动消息,将LogLevel设置为WARN才能看的清楚。
一旦sc调用start(),就不能添加或者设置新的流计算;
一旦sc停止,就不能被重新启动了;
同一时刻一个JVM只能有一个活动的sc;
当StreamingContext调用stop()的时候,默认也会将SparkContext停止掉,要想保持SparkContext活着,那么stop()的参数请将stopSparkContext设置为false;
一个SparkContext可以重用被创建多个StreamingContext,只要之前的StreamingContext关闭停止就可以了。

2.2 Discretized Streams (DStreams,离散流)

  DStream作为Spark Streaming的抽象数据类型,其内部原理就是对于采集到的每隔一个batch interval时间间隔的数据,生成一个该阶段的RDD,然后,上层对于DStream的操作,实际上就映射到底层的RDD的操作了。
  在一个程序中,可以创建多个input DStream,这些数据都会被同时接收和处理,但是需要考虑到实际executor的执行能力。还需要特别注意的是,一个input DStream接收数据是需要占用一个thread的,所以如果有n个输入,需要>2n个执行单元(local[2n]),否则就只能收数据,而收到的数据没有线程来处理了。
  除了上面的TCP输入,还支持textFileStream输入,Spark Streaming会监测指定的文件目录,对于目录中新增的文件(嵌套目录不支持),会被当作新的数据源被处理,但是不支持文件内容的修改和新增等,这就意味着Spark Streaming一旦监测到该新增文件,就只接受一次该文件,因此一般都是别的地方将文件生成后,移动到监测的目录中。由于这种方式不需要使用receiver,所以这种情况不用增加额外的线程数目。

1
streamingContext.textFileStream(dataDirectory)

2.3 Transformations

  和之前的支持差不多,这里只列出差异的函数。由于DStream底层就是RDD支撑的,所以这里函数和RDD函数之间的关系还是比较微妙的

术语 说明
countByValue() 计数,对于每个元素K,得到(K, Long)
transform(func) 对源DStream的每个RDD使用func转换,生成新的DStream,比如增加过滤器等,十分的强大
updateStateByKey(func) 通过func将上一个时刻DStream每个key的状态,更新为新的状态,如果上个状态没有该key,其值为None

这里的例子,是动态的对输入流的词频进行累计统计操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def updateFunction(newValues, runningCount):
if runningCount is None: #如果之前不存在,初始值为0
runningCount = 0
return sum(newValues, runningCount)

if __name__ == "__main__":

sc = SparkContext(appName="PythonStreamingNetworkWordCount")
ssc = StreamingContext(sc, 1)
ssc.checkpoint("checkpoint") #必须的

lines = ssc.socketTextStream("localhost",9999)
counts = lines.flatMap(lambda line: line.split(" "))\
.map(lambda word: (word, 1))\
.updateStateByKey(updateFunction) #更新在此
counts.pprint()

ssc.start()
ssc.awaitTermination()

2.4 窗口操作

  需要两个参数:窗口尺寸、滑动间隔,参数值都必须是batch interval的整数倍。其对应的接口函数在下面列出,其中第一个函数用于生成一个新的DStream,其余的都是原有RDD的聚合函数,只是作用于windows窗口模式下的多个RDD而已。

术语 说明
window(windowLength, slideInterval) 形成一个新的DStream
countByWindow(windowLength, slideInterval)
reduceByWindow(func, windowLength, slideInterval)
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 这个函数比较的厉害,有个反向函数invFunc,整个函数的意思就是对进入窗口的新数据用func,而对离开窗口的数据用invFunc
countByValueAndWindow(windowLength, slideInterval, [numTasks])

  还有,默认窗口操作的数据都是persist缓存的,而默认的StorageLevel是MEMORY_ONLY_SER,不需要开发者手动调用。对于网络传递进来的数据流,默认的存储是复制到两个不同的节点上以实现容错。

2.5 DStream的输出操作

  实际类似RDD的actions,一般是现实、保存数据到网络或者文件系统上,从而驱动了真正计算的执行(最后foreachRDD实际是func的操作驱动真正计算)
| 术语 | 说明 |
| — |:—–:|
| pprint() | 打印出最开始的10个元素,主要用于调试 |
| saveAsTextFiles(prefix, [suffix]) | 将DStream的数据保存为prefix-TIME_IN_MS[.suffix] |
| foreachRDD(func) | 针对每个RDD的操作,这个func是在driver中执行的~rdd.foreachPartition |

2.6 Checkpointing

  既然是流数据的,那么程序就理应全天候跑的,Checkpointing就是设计出来应对这种应用的容错机制(系统出错\JVM崩溃等),用于带有容错机制的文件系统(如HDFS)中。Spark支持的Checkpointing包括

  • Metadata Checkpointing
    保存了包括创建streaming应用的配置、DStream的操作、正在排队还未完成计算的batches等。这是用于driver programer节点出错时候的恢复。
  • Data Checkpoint
    保存生成的RDDs到存储系统中,主要应对的是有状态的transformation,因为要生成新的RDD需要之前的RDD。当然,随着系统的执行,这样的依赖链会越来愈长,所以系统会周期性的将带有状态的transformation的中间RDDs保存到存储系统中。
    对于程序中用到updateStateByKey/reduceByKeyAndWindow(with inverse function)这类有状态的transformation,以及期望driver programer能从错误中恢复出来,那么就需要使用checkpointing机制。

通过

1
streamingContext.checkpoint(checkpointDirectory)

设置Checkpointing目录,可以开启checkpointing机制。然后通常是调用

1
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)

这个函数,当检测checkpointDirectory不存在的时候,说明是第一次执行,functionToCreateContext被调用,当driver programer出错而自动重新启动(由部署的deployment infrastructure支持)的时候,这个目录存在了,就不会再次调用functionToCreateContext函数来创建SparkContext和StreamingContext了。

2.7 实时性

  总结来说,Spark Stream实际就是一个时间窗口内的RDD操作,然后通过增加各种函数来关联之前的数据,从本质上来说,算是一个大颗粒的周期性任务,如果时间间隔太大,延迟就严重;间隔太小,反复的提交调度任务,系统的吞吐量降低,负载也会加重。

三、MLlib

  包括常用的分类、聚类、回归等算法的实现。

四、GraphX

  这个可不是图形化现实,而是基于图的算法,比如大名鼎鼎的PageRank,此处暂且不论了。

参考