Pyspark-RDD

RDD

RDD(Resilient Distributed Dataset)弹性分布式数据集,是spark中最基本的数据抽象,代表一个不可变、可分区、其中元素可并行计算的集合

  • Resilient:RDD中的数据可存储再内存或磁盘中
  • Distributed:分布式存储数据(跨机器/跨进程),用于分布式计算
  • Dataset:一个用于存放数据的数据集合

特性

分区

RDD分区是RDD数据存储的最小单位,一份RDD数据本质上分隔成了多个分区

1
2
3
# 存储9个数字,设立三个分区
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
rdd.glom().collect()
1
[[1,2,3],[4,5,6],[7,8,9]]

RDD方法会作用在其所有方法上

1
rdd.map(lambda x: x * 10).collect()
1
[10,20,30,40,50,60,70,80,90]

RDD之间具有依赖关系

1
2
3
4
5
6
sc = SparkContext(conf=conf)
rdd1 = sc.textFile("../test.text")
rdd2 = rdd1.flatMap(lambda x: x.split(' '))
rdd3 = rdd2.map(lambda x: (x, 1))
rdd4 = rdd3.reduceByKey(lambda a, b: a+b)
print(rdd4.collect())

Key-Value型RDD可以有分区器

KV型RDD:RDD内存储的数据是只有两个元素的二元元组

默认分区器:Hash分区规则,也可手动设置分区器:rdd.partitionBy()方法

:不是所有RDD都是KV型

RDD的分区规划:会尽量靠近数据所在的服务器

在初始RDD读取数据规划阶段,分区会尽量规划到存储数据所在服务器,直接读取本地数据,避免从网络读取数据

Spark会在确保并行计算能力的前提下,尽量确保本地读取

RDD创建

  • 通过并行化集合创建(本地对象转化为分布式RDD)
  • 读取外部数据源(读文件)

并行化创建

1
2
3
# arg1: 集合对象,如:list
# arg2:可选,指定分区数量
rdd = SparkContext.parallelize(arg1, arg2)

读取文件

通过textFile API来读取本地或者hdfs的数据

1
2
3
4
# arg1: 文件路径
# arg2:可选,最小分区数量
# 当arg2超出spark允许范围,参数失效
SparkContext.textFile(arg1, arg2)

通过wholeTextFile API来读取小文件,这个api偏向于少量分区读取数据,是pyspark基于小文件的优化

1
2
3
4
# arg1:文件路径
# arg2:可选,最小分区数量
# 当arg2超出spark允许范围,参数失效
SparkContext.wholeTextFiles(arg1, arg2)

RDD算子

方法、函数:本地对象的API

算子:分布式集合对象的API

RDD的算子分为两类:

  • Transformation:转换算子

    返回值仍旧是RDD的算子,构建执行计划

  • Action:行动算子

    返回值不再是RDD,使执行计划开始工作

Transformation算子

map

将RDD的数据一条条处理(处理逻辑基于map算子接收的处理函数),返回新的RDD

1
rdd.map(func)

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("TEST").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6], 3)


def add(data):
return data * 10


print(rdd.map(add).collect())
1
[10, 20, 30, 40, 50, 60]

flatMap

对RDD执行map操作,接着进行解除嵌套:

1
2
3
4
# 嵌套
lst = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
# 解除嵌套
lst = [1, 2, 3, 4, 5, 6, 7, 8, 9]

e.g.

1
2
3
4
5
6
7
8
9
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize(["hadoop spark hadoop", "spark hadoop hadoop", "hadoop flink spark"])
# 得到所有的单词, 组成RDD, flatMap的传入参数 和map一致, 就是给map逻辑用的, 解除嵌套无需逻辑(传参)
rdd2 = rdd.flatMap(lambda line: line.split(" "))
print(rdd2.collect())
1
['hadoop', 'spark', 'hadoop', 'spark', 'hadoop', 'hadoop', 'hadoop', 'flink', 'spark']

reduceByKey

针对KV型RDD自动按照key进行分组,然后根据提供的聚合逻辑完成组内数据(value)聚合操作

1
2
# 接受两个类型一致的传入参数,返回聚合值
rdd.reduceByKey(func)

e.g.

1
2
3
4
5
6
7
8
9
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('a', 1)])

# reduceByKey 对相同key 的数据执行聚合相加
print(rdd.reduceByKey(lambda a, b: a + b).collect())
1
[('b', 2), ('a', 3)]

mapValues

针对二元元组RDD,对其内部的二元元组value值进行map

1
2
# 传入二元元组的value值,func只对value进行处理
rdd.mapValues(func)

e.g.

1
2
3
4
5
6
7
8
9
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('a', 1)])

# reduceByKey 对相同key 的数据执行聚合相加
print(rdd.mapValues(lambda x: x * 10).collect())
1
[('a', 10), ('a', 10), ('b', 10), ('b', 10), ('a', 10)]

groupBy

将RDD的数据进行分组

1
2
3
# func要求传入一个参数,返回一个值,类型不做要求。相同的返回值将被放入同一个组中。
# 分组完成后,每一个组是一个二元元组,key就是返回值,所有同组数据放入一个迭代器对象中作为value
rdd.groupBy(func)

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 2), ('b', 3)])

# 通过groupBy对数据进行分组
# groupBy传入的函数的 意思是: 通过这个函数, 确定按照谁来分组(返回谁即可)
# 分组规则 和SQL是一致的, 也就是相同的在一个组(Hash分组)
result = rdd.groupBy(lambda t: t[0])
print(result.map(lambda t:(t[0], list(t[1]))).collect())
1
[('b', [('b', 1), ('b', 2), ('b', 3)]), ('a', [('a', 1), ('a', 1)])]

Filter

过滤数据

1
2
# func返回值为True的参数保留,False丢弃
rdd.filter(func)

e.g.

1
2
3
4
5
6
7
8
9
10
11
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6])

# 通过Filter算子, 过滤奇数
result = rdd.filter(lambda x: x % 2 == 1)

print(result.collect())
1
[1, 3, 5]

distinct

对RDD数据进行去重

1
2
# arg:去重分区数量,一般省略
rdd.distinct(arg)

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 1, 1, 2, 2, 2, 3, 3, 3])

# distinct 进行RDD数据去重操作
print(rdd.distinct().collect())

rdd2 = sc.parallelize([('a', 1), ('a', 1), ('a', 3)])
print(rdd2.distinct().collect())
1
2
[2, 1, 3]
[('a', 1), ('a', 3)]

union

将两个RDD合并成一个RDD返回

1
2
3
# 仅合并,不会去重
# 可以合并不同类型的RDD
rdd.union(other_rdd)

e.g.

1
2
3
4
5
6
7
8
9
10
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd1 = sc.parallelize([1, 1, 3, 3])
rdd2 = sc.parallelize(["a", "b", "a"])

rdd3 = rdd1.union(rdd2)
print(rdd3.collect())
1
[1, 1, 3, 3, 'a', 'b', 'a']

join

对两个RDD执行JOIN操作(可实现SQL的内/外连接)

1
2
3
4
# jion算子只能用于二元元组
rdd.join(other_rdd) # 内连接
rdd.leftOuterJoin(other_rdd) # 左外连接
rdd.rightOuterJoin(other_rdd) # 右外连接

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd1 = sc.parallelize([ (1001, "zhangsan"), (1002, "lisi"), (1003, "wangwu"), (1004, "zhaoliu") ])
rdd2 = sc.parallelize([ (1001, "销售部"), (1002, "科技部")])

# 通过join算子来进行rdd之间的关联
# 对于join算子来说 关联条件 按照二元元组的key来进行关联
print(rdd1.join(rdd2).collect())

# 左外连接, 右外连接 可以更换一下rdd的顺序 或者调用rightOuterJoin即可
print(rdd1.leftOuterJoin(rdd2).collect())
1
2
[(1001, ('zhangsan', '销售部')), (1002, ('lisi', '科技部'))]
[(1004, ('zhaoliu', None)), (1001, ('zhangsan', '销售部')), (1002, ('lisi', '科技部')), (1003, ('wangwu', None))]

intersection

求两个RDD的交集,返回一个新RDD

1
rdd.intersection(other_rdd)

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd1 = sc.parallelize([('a', 1), ('a', 3)])
rdd2 = sc.parallelize([('a', 1), ('b', 3)])

# 通过intersection算子求RDD之间的交集, 将交集取出 返回新RDD
rdd3 = rdd1.intersection(rdd2)

print(rdd3.collect())
1
[('a', 1)]

glom

将RDD的数据按照分区加上嵌套

例如RDD数据[1,2,3,4,5]有两个分区,经过glom处理后变成:[[1,2,3]],[4,5]]

1
rdd.glom()

e.g.

1
2
3
4
5
6
7
8
9
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2)

print(rdd.glom().collect())
print(rdd.glom().flatMap(lambda x: x).collect()) # tips:解嵌套
1
2
[[1, 2, 3, 4], [5, 6, 7, 8, 9]]
[1, 2, 3, 4, 5, 6, 7, 8, 9]

groupByKey

针对KV型RDD自动按key分组

1
rdd.groupByKey()

e.g.

1
2
3
4
5
6
7
8
9
10
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([('a', 1), ('a', 1), ('b', 1), ('b', 1), ('b', 1)])

rdd2 = rdd.groupByKey()

print(rdd2.map(lambda x: (x[0], list(x[1]))).collect())
1
[('b', [1, 1, 1]), ('a', [1, 1])]

sortBy

基于指定的排序函数对RDD数据进行排序

1
2
3
# ascending:True-升序,False-降序
# numPartitions:用于排序的分区数量,要进行全局排序,设置为1
rdd.sortBy(func, ascending=False, numPartitions=1)

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([('c', 3), ('f', 1), ('b', 11), ('c', 3), ('a', 1), ('c', 5), ('e', 1), ('n', 9), ('a', 1)], 3)

# 使用sortBy对rdd执行排序

# 按照value 数字进行排序
# 参数1函数,告知Spark 按照数据的哪个列进行排序
# 参数2: True表示升序 False表示降序
# 参数3: 排序的分区数
"""注意: 如果要全局有序, 排序分区数需设置为1"""
print(rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=1).collect())

# 按照key来进行排序
print(rdd.sortBy(lambda x: x[0], ascending=False, numPartitions=1).collect())
1
2
[('f', 1), ('a', 1), ('e', 1), ('a', 1), ('c', 3), ('c', 3), ('c', 5), ('n', 9), ('b', 11)]
[('n', 9), ('f', 1), ('e', 1), ('c', 3), ('c', 3), ('c', 5), ('b', 11), ('a', 1), ('a', 1)]

sortByKey

针对KV型RDD按照key进行排序

1
2
3
4
# ascending:True-升序,False-降序
# numPartitions:用于排序的分区数量,要进行全局排序,设置为1
# keyfunc:在排序前对key进行的处理
rdd.sortByKey(ascending=True, numPartitions=None, keyfunc=<function RDD.<lambda>>)

e.g.

1
2
3
4
5
6
7
8
9
10
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([('a', 1), ('E', 1), ('C', 1), ('D', 1), ('b', 1), ('g', 1), ('f', 1),
('y', 1), ('u', 1), ('i', 1), ('o', 1), ('p', 1),
('m', 1), ('n', 1), ('j', 1), ('k', 1), ('l', 1)], 3)

print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: str(key).lower()).collect())
1
[('a', 1), ('b', 1), ('C', 1), ('D', 1), ('E', 1), ('f', 1), ('g', 1), ('i', 1), ('j', 1), ('k', 1), ('l', 1), ('m', 1), ('n', 1), ('o', 1), ('p', 1), ('u', 1), ('y', 1)]

repartition & coalesce

对RDD的分区执行重新分区(仅数量)

:尽量避免使用,影响并行计算性能。在合并到1个分区进行全局排序等场景下使用,尽可能避免增加分区,可能破坏内存迭代的计算管道

1
2
3
4
# n:决定新的分区数量
# coalesce中增加分区必须指定shuffle=True
rdd.repartition(n)
rdd.coalesce(n, shuffle)

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5], 3)

# repartition 修改分区
print(rdd.repartition(1).getNumPartitions())

print(rdd.repartition(5).getNumPartitions())

# coalesce 修改分区
print(rdd.coalesce(1).getNumPartitions())

print(rdd.coalesce(5, shuffle=True).getNumPartitions())
1
2
3
4
1
5
1
5

Action算子

countByKey

用于统计key出现的次数(一般适用于KV型RDD)

1
rdd.countByKey()

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.textFile("hdfs://master:8020/input/words.txt")
rdd2 = rdd.flatMap(lambda x: x.split(" ")).map(lambda x: (x, 1))

# 通过countByKey来对key进行计数, 这是一个Action算子
result = rdd2.countByKey()

print(result)
print(type(result))
1
2
defaultdict(<class 'int'>, {'hadoop': 7, 'spark': 5, 'flink': 3})
<class 'collections.defaultdict'>

collect

将RDD各个分区内的数据统一收集到Driver中,形成一个list对象

1
rdd.collect()

:数据集大小不能超过Driver内存

reduce

对RDD数据集按照func逻辑进行聚合

1
2
# 对func:传入2个参数得到1个返回值,要求返回值和参数的类型保持一致
rdd.reduce(func)

e.g.

1
2
3
4
5
6
7
8
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5])

print(rdd.reduce(lambda a, b: a + b))
1
15

fold

同reduce,接受传入逻辑进行聚合,但是聚合是带有初始值的。这个初始值的聚合作用在:

  • 分区内聚合
  • 分区间聚合

例如:[[1, 2, 3], [4, 5, 6], [7, 8, 9]]

数据分布在三个分区

分区1:123聚合时带上10作为初始值得到16

分区2:456聚合时带上10作为初始值得到25

分区3:789聚合时带上10作为初始值得到34

最后再做3个分区间的聚合:16+25+34得到85

1
rdd.fold(src, func)

e.g.

1
2
3
4
5
6
7
8
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)

print(rdd.fold(10, lambda a, b: a + b))
1
85

first·take·count·top

first

取出RDD的第一个元素

1
2
>>>sc.parallelize([3, 2, 1]).first()
3

take

取出RDD的前n个元素,组合成list返回

1
2
>>>sc.parallelize([1, 2, 3, 4, 5, 6]).take(5)
[1, 2, 3, 4, 5]

count

计算RDD有多少条数据,返回值是一个数字

1
2
>>>sc.parallelize([3, 2, 1, 4, 5, 6]).count()
6

top

对RDD数据集进行降序排序,取结果前n个

1
2
>>>sc.parallelize([3, 2, 1, 4, 5, 6]).top(3)
[6, 5, 4]

takeSample

随机抽样RDD数据,可用于数据检查

1
2
3
4
# arg1:True表示运行取同一个数据,False表示不允许取同一个数据(和数据内容无关,是否重复表示的是同一个位置的数据)
# arg2:抽样数目
# arg3:可选,随机数种子,随意传进一个数字
takeSample(arg1:True/False, arg2:采样数, arg3:随机数种子)

e.g.

1
2
3
4
5
6
7
8
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 3, 5, 3, 1, 3, 2, 6, 7, 8, 6], 1)

print(rdd.takeSample(False, 5, 1))
1
[2, 7, 6, 6, 3]

takeOrdered

对RDD进行排序,取结果前n个

1
2
3
# arg1:需要几个数据
# arg2:对排序的数据进行更改(不会更改数据本身,仅在排序时使用)
rdd.takeOrdered(arg1, arg2)

e.g.

1
2
3
4
5
6
7
8
9
10
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1)

print(rdd.takeOrdered(3))

print(rdd.takeOrdered(3, lambda x: -x))
1
2
[1, 2, 3]
[9, 7, 6]

foreach

对RDD的每一个元素执行提供的逻辑操作(同map),无返回值

1
rdd.foreach(func)

e.g.

1
2
3
4
5
6
7
8
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 1)

result = rdd.foreach(lambda x: print(x * 10))
1
2
3
4
5
6
7
10
30
20
40
70
90
60

特性:由Executor直接输出

saveAsTextFile

将RDD数据写入文本文件

支持:本地写出或hdfs等文件系统

1
rdd.saveAsTextFile()

e.g.

1
2
3
4
5
6
7
8
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 3)

rdd.saveAsTextFile("hdfs://master:8020/output/out_test1")
1
2
hadoop fs -ls /output/out_test1
hadoop fs -cat /output/out_test1/*
1
2
3
4
5
Found 4 items
-rw-r--r-- 3 root supergroup 0 2022-12-06 17:44 /output/out_test1/_SUCCESS
-rw-r--r-- 3 root supergroup 4 2022-12-06 17:44 /output/out_test1/part-00000
-rw-r--r-- 3 root supergroup 4 2022-12-06 17:44 /output/out_test1/part-00001
-rw-r--r-- 3 root supergroup 6 2022-12-06 17:44 /output/out_test1/part-00002
1
2
3
4
5
6
7
1
3
2
4
7
9
6

特性:由Executor直接写入文件

mapPartitions

不同于map每次操作一个分区的单一对象,mapPartitions一次操作一整个分区的数据,作为一个迭代器对象传入进来

1
rdd.mapPartitions(func)

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 3)


def process(iter):
result = list()
for it in iter:
result.append(it * 10)

return result


print(rdd.mapPartitions(process).collect())
1
[10, 20, 70]

foreachPartition

和foreach一致,foreach一条条处理,而foreachPartition一次处理一整个分区的数据,类似于没有返回值的mapPartitions

1
rdd.foreachPartition(func)

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 3, 2, 4, 7, 9, 6], 3)


def process(iter):
result = list()
for it in iter:
result.append(it * 10)

print(result)


rdd.foreachPartition(process)
1
2
3
4
5
6
7
[20]
[20, 40]
[10]
[10, 30]
[70]
[70, 90]
[70, 90, 60]

partitionBy

对RDD进行自定义分区操作

1
2
3
# arg1:重新分区后的分区数量
# arg2:自定义分区规则,通过函数传入,函数返回值必须位int型(分区编号从0开始,不得超过分区数-1)
rdd.partitionBy(arg1, arg2)

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([('hadoop', 1), ('spark', 1), ('hello', 1), ('flink', 1), ('hadoop', 1), ('spark', 1)])


# 使用partitionBy 自定义 分区
def process(k):
if 'hadoop' == k or 'hello' == k: return 0
if 'spark' == k: return 1
return 2


print(rdd.partitionBy(3, process).glom().collect())
1
[[('hadoop', 1), ('hello', 1), ('hadoop', 1)], [('spark', 1), ('spark', 1)], [('flink', 1)]]

RDD的数据是过程数据

RDD之间进行相互迭代计算(Transformation的转换),当执行开启以后,新RDD生成,旧RDD消失。所以RDD的数据是过程数据,仅在处理的过程中存在,一旦处理完成便会被释放,旨在最大化的合理利用系统资源

当RDD被释放后需要被重新使用,会从头开始执行

RDD缓存

防止当RDD被释放而又要被重新调用的情况下,避免从头执行代码,使用RDD缓存API

1
2
3
4
5
6
7
8
9
10
rdd.cache()                                  # 缓存到内存中
rdd.persist(StorageLevel.MEMORY_ONLY) # 仅在内存缓存
rdd.persist(StorageLevel.MEMORY_ONLY_2) # 仅在内存缓存,生成2个副本
rdd.persist(StorageLevel.DISK_ONLY) # 仅缓存到硬盘
rdd.persist(StorageLevel.DISK_ONLY_2) # 仅缓存到硬盘,生成2个副本
rdd.persist(StorageLevel.DISK_ONLY_3) # 仅缓存到硬盘,生成3个副本
rdd.persist(StorageLevel.MEMORY_AND_DISK) # 先在内存缓存,内存不够缓存到硬盘
rdd.persist(StorageLevel.MEMORY_AND_DISK_2) # 先在内存缓存,内存不够缓存到硬盘,生成2个副本
rdd.persist(StorageLevel.OFF_HEAP) # 堆外内存
rdd.unpersist() # 主动清理缓存

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd1 = sc.textFile("hdfs://master:8020/input/words.txt")
rdd2 = rdd1.flatMap(lambda x: x.split(" "))
rdd3 = rdd2.map(lambda x: (x, 1))

rdd3.cache()
rdd3.persist(StorageLevel.MEMORY_AND_DISK_2)

rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
print(rdd4.collect())

rdd5 = rdd3.groupByKey()
rdd6 = rdd5.mapValues(lambda x: sum(x))
print(rdd6.collect())

rdd3.unpersist()
time.sleep(100000)
1
2
[('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]
[('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]

:缓存分散存储在各Executor所在服务器中。缓存从设计上来说是不安全的,缓存一旦丢失,需要重新计算缓存,必须保留被缓存RDD的前置血缘关系

RDD CheckPoint

用于保存RDD数据,仅支持硬盘存储,且可以写入HDFS(cache不行),从设计上来说是安全的,不保留RDD的前置血缘关系

ChickPoint集中收集各个分区的数据进行存储,而非cache的分散存储

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import time
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

# 1. 告知spark, 开启CheckPoint功能
sc.setCheckpointDir("hdfs://master:8020/output/ckp")
rdd1 = sc.textFile("hdfs://master:8020/input/words.txt")
rdd2 = rdd1.flatMap(lambda x: x.split(" "))
rdd3 = rdd2.map(lambda x: (x, 1))

# 调用checkpoint API 保存数据即可
rdd3.checkpoint()

rdd4 = rdd3.reduceByKey(lambda a, b: a + b)
print(rdd4.collect())

rdd5 = rdd3.groupByKey()
rdd6 = rdd5.mapValues(lambda x: sum(x))
print(rdd6.collect())

rdd3.unpersist()
time.sleep(100000)
1
2
[('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]
[('hadoop', 1), ('hello', 3), ('spark', 1), ('flink', 1)]

Powered by Hexo and Hexo-theme-hiker

Copyright © 2017 - 2024 青域 All Rights Reserved.

UV : | PV :