300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > Spark大数据技术与应用

Spark大数据技术与应用

时间:2019-11-17 02:22:40

相关推荐

Spark大数据技术与应用

第一章

1.Spark是什么

概念

Spark是一个大规模数据处理的统一分析引擎。

特点

迅速、通用、易用、支持多种资源管理器

迅速

Spark用十分之一的计算资源,获得了比Hadoop快3倍的速度。

通用

可以用Spark进行sql查询、流式计算、机器学习、图计算。

易用

支持多种编程语言API,包括Java、Scala、Python、R

支持多种支援管理器

Spark可以使用单机集群模式来运行,也可以在Hadoop YARN、Apache Mesos、Kubernates上运行,或者在“云”里运行。

Spark可以访问HDFS、Alluxio、Apache Cassandra、Apache HBase、Apache Hive等上百种数据源。

Spark与Hadoop

区别与联系

解决问题的方式不一样

Hadoop是分布式数据设施。

Spark只是一个专门的工具,不会进行分布式数据的存储。

两者可合可分

Hadoop可用自身的MapReduce来代替Spark

Spark可不依赖Hadoop,而选择其他基于云的数据系统平台。

Spark相对于MapReduce的优势

中间结果输出

Hadoop:两步计算、磁盘存储

Spark:多步计算、内存存储

数据格式和内存布局

Hadoop:使用HDFS

Spark:使用RDD

误区!!!

1.Spark是基于内存的技术

大多数的人会认为Spark都是基于内存的计算的,但是基于如下两个情况,Spark会落地于磁盘

Spark避免不了shuffle

如果数据过大(比服务器的内存还大)也会落地于磁盘

参考链接

2.Spark要比Hadoop快 10x-100x

在比较短的作业确实能快上100倍,但是在真实的生产环境下,一般只会快 2.5x ~ 3x!

3.Spark的存在将代替Hadoop

目前备受追捧的Spark还有很多缺陷,比如:

稳定性方面,由于代码质量问题,Spark长时间运行会经常出错,在架构方面,由于大量数据被缓存在RAM中,Java回收垃圾缓慢的情况严重,导致Spark性能不稳定,在复杂场景中SQL的性能甚至不如现有的Map/Reduce。

不能处理大数据,单独机器处理数据过大,或者由于数据出现问题导致中间结果超过RAM的大小时,常常出现RAM空间不足或无法得出结果。然而,Map/Reduce运算框架可以处理大数据,在这方面,Spark不如Map/Reduce运算框架有效。

不能支持复杂的SQL统计;目前Spark支持的SQL语法完整程度还不能应用在复杂数据分析中。在可管理性方面,SparkYARN的结合不完善,这就为使用过程中埋下隐忧,容易出现各种难题。

参考链接

用途

推荐系统

实时日志系统

快速查询系统

定制广告系统

用户图计算系统

2.Spark的生态系统

生态系统

Spark Core

Spark Core提供Spark SQL、Spark Streaming、MLlib、GraphX四大模块,进行离线计算,产生RDD弹性分布式数据集。

Spark SQL && DataFrame

Spark SQL是一种结构化的数据处理模块

DataFrame是Spark SQL提供的一个编程抽象,相当于一个列数据的分布式的采集组织,在关系数据库或R/Python中的概念相当于一个

Spark Streaming

Spark Streaming处理实时数据流并容错

MLIib

MLlib是Spark提供的可扩展的机器学习

MLlib提供的API主要分为以下两类:

spark.mllib包提供主要APIspark.ml包提供构建机器学习工作流的高层次API

GraphX

GraphX是Spark面向图计算提供的框架与算法库

3.Spark的架构与原理

常见术语

架构设计

作业运行流程

核心原理

4.Spark 2.X新特性

2.x对比1.x

2.x基本上是基于1.x进行了更多的功能和模块的扩展以及性能的提升:

引入很多优秀特性,性能上有较大提升,API更易用实现了离线计算和流计算API的统一实现了Spark SQL和 Hive SQL操作API的统一

新特性

1.精简的API

统一DataFrame和Dataset接口为datasets新增SparkSession入口,统一旧的SQLContext与HiveContext支持SQL 标准,支持子查询,Spark SQL性能有2-10倍的提升

2.搭载了第二代引擎

主要思想:在运行时使用优化后的字节码,将整体查询合成为单个函数,不再使用虚拟函数调用,而是利用CPU来注册中间数据。

3.智能化程度

Structured Streaming引入了低延迟的连续处理通过改善Pandas UDFs的性能来提升PySpark支持第四种调度引擎 Kubernetes Clusters支持 Stream-to-stream Joins

第二章

1.Spark环境搭建

2.Spark集群启动与关闭

Spark运行模式

在mesos或者yarn集群管理器上部署运行在standalone和local的模式下部署运行

启动

start-all.sh(已设置好环境变量)

关闭

stop-all.sh(已设置好环境变量)

3.Spark应用提交到集群

spark-submit //提交任务命令--master spark://master:7077 //提交集群的地址--deploy-mode client //部署模式为client模式--executor-memory 512M //设置每个执行单元使用512Mb的内存空间--total-executor-cores 4//每个执行单元为4个核demo.py //实际提交的应用程序,具体以实际为准

第三章

1.Python编程语言

不用多说。。。

2.Pyspark启动与日志设置

PySpark启动

local、standalone、yarn、mesos

以local模式启动

pyspark --master local[4]

以Yarn模式启动

pyspark --master yarn-client

以Standalone模式启动

pyspark --master spark://Spark:7077

以Mesos模式启动

pyspark --master mesos://Mesos:7077

日志设置

日志级别包括:ALL,DEBUG,ERROR,FATAL,INFO,OFF,TRACE,WARN

控制日志输出内容的方式有两种:

修改log4j.properties,默认控制台输出INFO及以上级别信息

log4j.rooCategory=INFO,console

代码中使用setLogLevel(logLevel)控制日志输出

from pyspark import SparkContextsc = SparkContext("local", "First App")sc.setLogLevel("WARN")

3.PySpark开发

就是安装环境,编译器可以用Anaconda,Jupyter notebook,pycharm,pyspark是一个python的第三方库,可以通过pip安装,但是如果安装了Spark包,bin目录里会包含pyspark

第四章

1.RDD简介

几个问题

RDD是什么?

弹性分布式数据集只读的、分区记录的集合只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建

什么是弹性?

RDD可以在内存和磁盘之间手动或自动切换RDD可以通过转换成为其他的RDDRDD可以存储任意类型的数据

存储的内容?

初代RDD:真实数据的分区信息,单个分区的读取方法

子代RDD:初代RDD产生子代RDD的原因(动作),初代RDD的引用

数据读取发生在什么时候?

task在executor上运行的时候

五个主要属性

创建RDD

下面代码都是Python API,使用pyspark

基于外部数据源创建

distFile = sc.textFile("file:///FILE_TO_PATH")#textFile支持从多种源创建RDD,如hdfs://,s3n://distFile.count()#计算文本的行数

基于数据集合创建

data = [1, 2, 3, 4, 5]distData = sc.parallelize(data) #通过并行化创建RDD#parallelize可以传入分片个数参数,否则采用defaultParallelismdistData.count() #返回RDD中元素的个数

RDD操作

两种算子(Operation)

如何区分?

transformation算子一定会返回一个rddAction有的没有返回值,也可能有返回值,但是一定不是rdd

2.RDD算子

Transformation算子-Value型

map(f, preservesPartitioning=False)

通过对这个RDD的每个元素应用一个函数来返回一个新的RDD。

>>> rdd = sc.parallelize(['b', 'a', 'c'])>>> sorted(rdd.map(lambda x: (x, 1)).collect())[('a',1), ('b',1), ('c',1)]

flatMap(f, preservesPartition=False)

将函数应用于该RDD的所有元素,然后将结果平坦化(压扁),从而返回新的RDD。

>>> rdd = sc.parallelize([2, 3, 4])>>> rdd2 = rdd.map(lambda x: range(1, x))>>> rdd2.collect()[[1], [1, 2], [1, 2, 3]]>>> rdd1 = rdd.flatMap(lambda x: range(1, x))>>> rdd1.collect()[1, 1, 2, 1, 2, 3]

flatMap与map的区别:

mapPartitions(f, preservesPartitioning=False)

它的输入函数应用于每个分区,也就是把每个分区中的内容作为整体来处理的

>>> rdd = sc.parallelize([1, 2, 3, 4], 2)# 上面第二个参数是分区数,所以分成了[1, 2]和[3, 4]。# 不管分区数为多少,都是取下界。比如上面假如分区数为3,则界限分别在4/3和8/3,取下界则分成[1], [2], [3, 4]。>>> def f(iterator): yield sum(iterator)>>> rdd.mapPartitions(f).collect()[3, 7]

mapPartitions的性能比map要好很多,因为map必须一个元素一个元素操作,而mapPartitions是将一个分区中的所有元素拿到后再进行操作。mapParitions不适合在内存小、数据量大的环境下使用,因为它一次性获取一个分区所有元素,拿一个集合去引用所有元素,在所有元素未操作完之前,引用依然存在,已完成的数据不会释放,内存一直被占用(容易内存溢出)。因此map虽然慢,但是安全。

mapPartitionsWithIndex(f, preservesPartitioning=False)

与mapPartitions的区别:

mapPartitionsWithIndex中传入的函数要求接收两个参数第一个参数为分区编号第二个为对应分区的元素组成的迭代器 mapPartitionsWithIndex可以只对特定分区进行操作

>>> rdd = sc.parallelize([1, 2, 3, 4], 4) # [1] [2] [3] [4]>>> def f(splitIndex, iterator): yield splitIndex>>> rdd.mapPartitionsWithIndex(f).sum()6 # 0+1+2+3

filter(f)

对每个元素应用f函数,返回值为true的元素在RDD中保留,返回值为false的元素将被过滤掉。

>>> rdd = sc.parallelize([1, 2, 3, 4, 5])>>> rdd.filter(lambda x: x % 2 == 0).collect()[2, 4]

distinct(numPartitions=None)

将RDD中的元素进行去重操作

>>> rdd = sc.parallelize([1, 1, 2, 3])>>> rdd.distinct().collect()[1, 2, 3]

union(other)

合并两个RDD,结果中包含两个RDD中的所有元素

>>> rdd1 = sc.parallelize([1, 2, 3, 4])>>> rdd2 = sc.parallelize([5, 6, 7, 8])>>> rdd1.union(rdd2).collect()[1, 2, 3, 4, 5, 6, 7, 8]

intersection(other)

返回这个RDD和另一个RDD的交集,输出将不包含任何重复的元素

>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])>>> rdd1.intersectioni(rdd2).collect()[1, 2, 3]

subtract(other)

返回RDD1中出现,但是不在RDD2中出现的元素,不去重

>>> rdd1 = sc.parallelize([('a', 1), ('b', 4), ('b', 5), ('a', 3)])>>> rdd2 = sc.parallelize([('a', 3), ('c', None)])>>> rdd1.subtract(rdd2).collect()[('a', 1), ('b', 4), ('b', 5)]

sortBy(K, ascending=True, numPartitions=None)

根据指定的KeyFunc对标准RDD进行排序

>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

Transformation算子-Key-Value型

这类RDD称为PairRDD

mapValues(f)

针对(Key, Value)型数据中的Value进行Map操作,而不对Key进行处理。

>>> rdd = sc.parallelize([('a', 1), ('b', 2), ('c', 3)])>>> rdd.mapValues(lambda value: value + 2).glom().collect() # glom()将同一分区的元素合并到一个列表里[[('a', 3), ('b', 4), ('c', 5)]]

flatMapValues(f)

完成mapValues处理后,再对结果进行扁平化处理。

>>> rdd = sc.parallelize([('a', ['x', 'y']), ('b', ['p', 'r'])])>>> rdd.flatMapValues(lambda x: x).collect()[('a', 'x'), ('a', 'y'), ('b', 'p'), ('b', 'r')]

reduceByKey(func, numPartitions=None, partitionFunc=portable_hash)

相同Key值的value值进行对应函数运算,类似于hdp得combiner操作。

>>> from operator import add>>> rdd = sc.parallelize([('a', 1), ('b', 2), ('a', 3)])>>> rdd.reduceByKey(add).collect()[('a', 4), ('b', 2)]

groupByKey(numPartitions=None, partitionFunc=portable_hash)

将Pair RDD中相同Key的值放在一个序列中

>>> rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])>>> rdd.groupByKey().mapValues(len).collect()[('a', 2), ('b', 1)]>>> rdd.groupByKey().mapValues(list).collect()[('a', [1 1]), ('b', [1])]

:如果分组的目的是为了聚合(比如求综合或求平均),那还是建议使用reduceByKeyaggregateByKey,二者会提供更好的性能。另外groupBy系列算子涉及到一个将数据打乱而又重新组和的操作,这个操作我们称之为shuffle

sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x:x)

根据key值进行排序,默认升序

>>> tmp = [('a', 1), ('B', 2), ('1', 3), ('d', 4)]>>> sc.parallelize(tmp).sortByKey()[('1', 3), ('B', 2), ('a', 1), ('d', 4)]>>> sc.parallelize(tmp).sortByKey(True, None, keyfunc=lambda k: k.lower()).collect()[('1', 3), ('a', 1), ('B', 2), ('d', 4)]

:sortByKey只能操作PairRDD(由(key, value)对组成),而sortBy范围更广,操作标准RDD。查看sortBy源码可以发现,后者包含前者。

keys()

返回一个仅包含键的RDD

>>> m = sc.parallelize([(1, 2), (3, 4)]).keys()>>> m.collect()[1, 3]

values()

返回一个仅包含值的RDD

>>> m = sc.parallelize([(1, 2), (3, 4)]).values()>>> m.collect()[2, 4]

join(rdd)

可以将两个RDD按照相同的Key值join起来

>>> x = sc.parallelize([('a', 1), ('b', 4)])>>> y = sc.parallelize([('a', 2), ('a', 3)])>>> x.join(y).collect()[('a', (1, 2)), ('a', (1, 3))]

leftOuterJoin(rdd)

左外连接,与SQL中的左外连接一致

>>> x = sc.parallelize([('a', 1), ('b', 4)])>>> y = sc.parallelize([('a', 2)])>>> x.leftOuterJoin(y).collect()[('a', (1, 2)), ('b', (4, None))]

rightOuterJoin(rdd)

右外连接,与SQL中的右外连接一致

>>> x = sc.parallelize([('a', 1), ('b', 4)])>>> y = sc.parallelize([('a', 2)])>>> x.rightOuterJoin(y).collect()[('a', (1, 2))]

Action算子

collect()

返回RDD中的所有元素。

>>> sc.parallelize([1, 2]).collect()[1, 2]

count()

返回RDD中的所有元素的个数。

>>> sc.parallelize([1, 2]).count()2

reduce(f)

通过指定的聚合方法来对RDD中元素进行聚合。

>>> from operator import add>>> sc.parallelize([1, 2, 3 ,4 ,5]).reduce(add)15>>> sc.parallelize([]).reduce(add)Traceback (most recent call last):ValueError: Can not reduce() empty RDD

take(num)

从RDD中返回前num个元素的列表

>>> sc.parallelize([4, 6, 8, 2, 9]).take(2)[4, 6]>>> sc.parallelize([4, 6, 8, 2, 9]).take(10)[4, 6, 8, 2, 9]

takeOrdered(num)

从RDD中返回前num个最小的元素的列表,结果默认升序排列

>>> sc.parallelize([4,6,8,2,9]).takeOrdered(2)[2, 4]>>> sc.parallelize([4,6,8,2,9]).takeOrdered(10)[2, 4, 6, 8, 9]

first()

从RDD中返回第一个元素

>>> sc.parallelize([2,3,4,5,6]).first()2

top(num, key=None)

从RDD中返回最大的前num个元素列表,结果默认降序排列

如果Key参数有值,则先对各元素进行对应处理

注:会把所有数据都加载到内存,所以该方法只有在数据很小时使用

>>> sc.parallelize([10,4,2,12,3]).top(1)[12]>>> sc.parallelize([2,3,4,5,6],2).top(2)[6, 5)>>> sc.parallelize([10, 4, 2, 12, 3]).top(4, key=str)[4, 3, 2, 12] # 字符串的大小比较是从左往右

foreach(f)

遍历RDD的每个元素,并执行f函数操作,无返回值。

python API 中的foreach是相当特殊,有别于Scala API。

你若是习惯性地运行rdd.collect().foreach()必定会报错,因为collect()返回一个list,list怎么会有foreach方法?python API 不存在RDD集合对象的操作方法。

所以,只能使用rdd.foreach(f),通过运行,你会发现没有打印输出:

那输出在哪儿呢?

look,输出在我的pyspark服务器端,并且还是无序、不全的输出。

我们要知道rdd.foreach(f)是在Executor端进行分布式操作。由于Spark的分布式特性,不能保证打印命令将所有数据发送到Driver端的输出流中。自然也无法决定输出顺序。

另外,在Scala API 中,支持rdd.collect().foreach(println)循环遍历操作,并且是在Driver端输出,因此输出是完整而有序的。

foreachPartition(f)

对每个分区执行f函数操作,无返回值

>>> def f(iterator):...s = sum(iterator)...print(s)>>> sc.parallelize([1,2,3,4,5],3).foreachPartition(f) # 1 2+3+4 5195

saveAsTextFile(path, compressionCodecClass=None)

将RDD中的元素以字符串的格式存储在文件系统中。

>>> rdd = sc.parallelize(['foo', 'bar'], 2)>>> rdd.saveAsTextFile('/home/...')>>> rdd.saveAsTextFile('hdfs://host:8020/...')

collectAsMap()

以字典形式,返回PairRDD中的键值对。如果key重复,则后面的value覆盖前面的。

>>> rdd = sc.parallelize([(1, 2), (3, 4)])>>> rdd.collectAsMap(){1: 2, 3: 4}>>> rdd = sc.parallelize([(1, 2), (3, 4), (1, 4)]){1: 4, 3: 4}

countByKey()

以字典形式,返回PairRDD中key值出现的次数

>>> rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 1)])>>> rdd.countByKey()[('a', 2), ('b', 1)]

3.共享变量

累加器

为甚么要引入累加器?

看下面这段代码:

counter = 0rdd = sc.parallelize(range(10))def increment(x):global countercounter += xrdd.foreach(increment)print("Counter value: ", counter)# Counter value: 0

在输出之前,我们按惯性思维肯定会觉得答案应该是45,但结果却是0。

实际上Spark分配的多个Executor的确执行了累加操作,但是它们并没有返回值,即最后并没有把结果返回给Driver,如下图:

欲累加1,2,3,4,Executor1执行了1+2=3,Executor2执行了3+4=7,但是并没能把3和7返回给Driver。

于是引入了累加器,它告诉Spark,最后要把Executor的结果返回给Driver,并执行Merge合并操作,更新sum值。

引入accumulator,并对上面代码做如下修改:

accumulator:一个全局共享变量, 可以完成对信息进行聚合操作。

counter = sc.accumulator(0) #初始值为0的累加器rdd = sc.parallelize(range(10))def increment(x):global countercounter += xrdd.foreach(increment)print("Counter value: ", counter.value)# Counter value: 45

注意事项!!!

少加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行累加。

如上面改为rdd.map(increment)则结果为0多加:转换算子中调用累加器,如果没有行动算子的话,那么不会执行累加。

如上面改为

rdd = rdd.map(increment)rdd.collect() #计算元素总和rdd.collect()

结果为90,第二次累加第一次的结果。累加器在Driver端定义赋初始值,累加器只能在Driver端读取最后的值,在Excutor端更新。累加器不是一个调优的操作,因为如果不这样做,结果是错的。

广播变量

为什么要引入广播变量?

闭包数据,都是以Task为单位发送的,每个任务中包含闭包数据。这样可能会导致,一个Executor中含有大量重复的数据,并且占用大量的内存。

Executor其实就是一个JVM,所以在启动时,会自动分配内存。完全可以将任务中的闭包数据放置在Executor的内存中,达到共享的目的。

Spark中的广播变量就可以将闭包的数据保存到Executor的内存中。

Spark中的广播变量不能够更改:分布式共享只读变量。

概念

Spark1.x:HttpBroadcast、TorrentBroadcast

Spark2.x:TorrentBroadcast、TorrentBroadcast:点到点的传输,有效避免单点故障,提高网络利用率,减少节点压力。

Broadcast:

一个全局共享变量,可以广播只读变量。一般用于处理共享配置文件,通用的数据子,常用的数据结构等等;不适合存放太大的数据不会内存溢出,因为其数据的保存的 Storage Level 是 MEMORY_AND_DISK 的方式

#eg>>> b = sc.broadcast([1,2,3,4,5]) # b是广播变量>>> b.value[1, 2, 3, 4, 5]>>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()[1, 2, 3, 4, 5, 1, 2, 3, 4, 5]>>> b.unpersist()# 空

注意事项!!!

能不能将一个RDD使用广播变量广播出去?

不能,因为RDD是不存储数据的。可以将RDD的结果广播出去。广播变量只能在Driver端定义,不能再Executor端定义。在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。如果Executor端用到了Driver的变量,如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。如果Executor端用到了Driver的变量,如果使用广播变量在每个Executor中只有一份Driver端的变量副本。

4.RDD依赖关系

RDD只能基于在稳定物理存储中的数据集和其他已有的RDD上执行确定性操作来创建。能从其他RDD通过确定操作创建新的RDD的原因是RDD含有从其他RDD衍生(计算)出本RDD的相关信息(即血统,Lineage)Dependency代表了RDD之间的依赖关系,分为窄依赖和宽依赖

注意:一个RDD对不同的父节点可能有不同的依赖方式,可能对父节点1是宽依赖,对父节点2是窄依赖。

窄依赖

指每个父RDD的一个Partition最多被子RDD的一个Partition所使用。(独生子女)比如map、filter、union等;

宽依赖

指一个父RDD的Partition会被多个子RDD的Partition所使用;(超生)比如groupByKey、reduceByKey、sortByKey等

DAG的生成

DAG(有向无环图),原始的RDD通过一系列的转换就形成了DAG,在Spark里每一个操作生成了一个RDD,RDD之间连一条边,最后这些RDD和他们之间的边组成了一个有向无环图。

有了计算的DAG图,Spark内核下一步的任务就是根据DAG图将计算划分成任务集,也就是Stage,这样可以将任务提交到计算结点进行真正的计算。Stage划分的目的:把RDD生成一个个task提交到Executor中执行,所以需要把RDD先划分Stage再生成task。一个Stage生成n个分区个task。Stage划分依据:根据RDD之间的依赖关系将DAG划分成为不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算,对于宽依赖,由于有shuffle的存在,只能在parentRDD中处理完成后才开始接下来的计算,因此宽依赖是划分Stage的依据。Stage划分过程:找到最后的RDD,向前找,以宽依赖划分(宽依赖前的)为一个Stage,整体划分为一个Stage,直到所有RDD划分完。

5.RDD的持久化

对数据进行持久化,是为了避免多次计算同一个RDD。

如果一个RDD需要重复使用,那么需要从头再次执行来获取数据。RDD对象可以重用,但数据无法重用。持久化操作必须在行动算子执行时完成RDD对象的持久化操作既可为重用,也是因数据执行较长从而提高效率

持久化方法

持久化存储等级

持久化实例

>>> from pyspark import StorageLevel>>> rdd = sc.parallelize(['b', 'a', 'c'])>>> rdd.persist()ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:475>>> rdd.getStorageLevel() # 取得存储等级StorageLevel(False, True, False, False, 1)>>> rdd.unpersist() # 解持久化ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:475>>> rdd.persist(StorageLevel.DISK_ONLY)ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:475>>> rdd.getStorageLevel()StorageLevel(True, False, False, False, 1)

Spark RDD 结束,参考我的另一篇博客Spark RDD 编程指导进行编程练习。

第五章 DataFrame与Spark SQL

1.DataFrame

-----------------------------未完待续-------------------------------

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。