Spark-Core

广播变量

本地对象被发送到同个Executor内每个分区的处理线程上使用,这样每个分区实际上存放了重复的数据。而Executor本质上是进程,进程内资源共享,没必要将本地对象分发给所有分区,造成内存浪费

解决方案:将本地对象设置为广播变量

1
2
3
4
5
# 1.将本地对象标记为广播变量
broadcast = sc.broadcast(var)
# 2.使用广播变量,从broadcast对象中取出本地对象
value = broadcast.value
# 当传输的是广播对象时,spark会只给每个Executor分发一份数据

当本地集合对象和分布式集合对象(RDD)进行关联时,需要将本地集合对象封装为广播变量

  • 节省网络IO次数
  • 降低Executor内存占用

累加器

当执行累加操作时,各个分区累加自身的内容

1
2
# spark提供累加器变量,参数是初始值
acmlt = sc.accumulator(0)

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

acmlt = sc.accumulator(0)


def counts(data):
global acmlt
acmlt += 1
return data


rdd1 = sc.parallelize([1, 2, 3], 3)
rdd2 = rdd1.map(counts)
print(rdd2.collect())
print(acmlt)
1
2
[1, 2, 3]
3

:累加器可能因血缘关系导致重复的累加,例如一个RDD被释放后累加已经完成,此时再使用该RDD将会导致重复累加。可通过cache缓存机制来解决

DAG

Spark的核心是根据RDD来实现的,Spark Scheduler(spark任务调度)是spark核心实现的重要一环,其功能是组织处理RDD中每个分区的数据,根据RDD的依赖关系构建DAG有向无环图,再基于DAG划分Stage,将每个Stage中的任务发送到指定节点运行,合理规划资源的利用

DAG标准定义

有向无环图:有方向而没有形成闭环的执行流程图

  • 有向:具有执行方向
  • 无环:没有闭环

Action算子和Job

一个Action会产生一个DAG,如果代码中存在3个Action则会产生3个DAG;

每个DAG在应用程序运行时产生一个Job(应用程序内的子任务)

1个Action = 1个DAG = 1个Job

这样的代码运行起来在spark中被称为Application

DAG和分区

DAG的最终作用是为了构建spark详细执行的物理计划,由于spark是分布式多分区的,所以DAG和分区间也具有关联

DAG的宽窄依赖和阶段划分

在Spark RDD前后之间的血缘关系,分为:

  • 窄依赖:父RDD的一个分区,将全部数据发送给子RDD的一个分区;
  • 宽依赖:父RDD的一个分区,将数据发送给子RDD的多个分区,别名:shuffle

对于spark,会根据DAG,按照宽依赖划分不同的DAG阶段。划分依据:从后向前,每遇到宽依赖就划分出一个阶段,称之为stage。在stage内部,一定是窄依赖

Spark的内存迭代计算

窄依赖同一线程内走管道交互,进入宽依赖走网络IO交互

Spark默认收到全局并行度的限制,除了个别算子有特殊分区的情况,大部分算子都会遵循全局并行度的要求来划分自己的分区数。例如全局并行度是3,大部分算子的默认分区都是3->不建议再独立通过arg来指定分区数

Spark并行度

Spark的并行:在同一时间内,有多少task在同时运行

Spark的并行度:并行能力,当设置为6,即共有6个task在并行运行,RDD的分区被规划为6个分区

Spark并行度设置(优先级由高到低):

  • 代码
  • 客户端参数
  • 配置文件
  • 默认值(1),并不会全部以1来运行,多数情况下基于读取文件的分片数量来作为默认并行度

全局并行度配置参数:

spark.default.parallelism

  • 代码中设置:
1
2
conf = SparkConf()
conf.set("spark.default.parallelism", "100")
  • 客户端提交参数中设置bin/spark-submit --conf "spark.default.parallelism=100"

  • 配置文件conf/spark-defaults.conf中设置spark.default.parallelism 100

:全局并行度是推荐设置,不要针对RDD更改分区,可能会影响内存迭代管道的构建,或者产生额外的shuffle

针对RDD并行度的设置(不推荐):

· repartition算子

· coalesce算子

· partitionBy算子

规划Spark集群并行度

设置为CPU总核心的2~10倍(或更高)*

比如集群可用的CPU核心数量为100个,建议并行度200~1000(确保是CPU核心的整数倍)

  • 设置为最少2倍:

    CPU的一个核心同一时间只能做一件事,当拥有100个核心的情况下,设置100并行度,能利用全部的CPU,但task的压力不均衡,一旦某个task先执行完毕,会导致某个CPU核心的空闲。所以将task并行分配的数量增多,例如设置1000并行度,同一时间内有100个task在运行,900个在等待,但可以确保某个task运行完毕后会不断有task补上,不让CPU处于空闲状态,最大程度利用集群的资源

Spark任务调度

Spark任务由Driver进行调度

包括:

  • 逻辑DAG产生
  • 分区DAG产生
  • 基于分区DAG构建线程task并划分
  • 将task分配给Executor并监控其工作

Spark程序调度流程

  • 构建Driver(Driver
  • 构建SparkContext执行环境入口对象(Driver
  • 基于DAG scheduler调度器构建逻辑task分配(Driver
  • 基于task scheduler调度器将逻辑task分配到各个Executor上执行并监控(Driver
  • Worker(Executor)被task scheduler管理监控,遵从指令干活并汇报进度(Worker

Driver内部组件

  • DAG调度器

    将逻辑DAG进行处理,最终得到逻辑上的task划分

  • Task调度器

    基于DAG调度器的产出,来规划这些逻辑的task应该在哪些物理的Executor上运行,以及监控它们

层级关系梳理

  • 1个spark环境可运行多个Application;
  • 1个代码成功运行生成一个Application;
  • 1个Application内部有多个Job;
  • 1个Action算子产生1个Job,每个Job有自己的DAG执行图;
  • 1个Job的DAG基于宽窄依赖划分不同的阶段;
  • 1个阶段里基于分区数量形成多个并行的内存迭代管道;
  • 1个内存迭代管道形成1个task(DAG调度器划分将Job内划分出具体的task任务,1个Job被划分出的task在逻辑上被称为这个job的taskset)

Powered by Hexo and Hexo-theme-hiker

Copyright © 2017 - 2024 青域 All Rights Reserved.

UV : | PV :