Spark Basics

Definition: Apache Spark is a unified analytics engine for large-scale data processing. Its core data structure, the Resilient Distributed Dataset (RDD), supports in-memory computation across large clusters and provides a degree of fault tolerance.
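As a minimal sketch of the RDD API (assuming a working PySpark installation; the names and values are illustrative, not part of the original post):

from pyspark import SparkConf, SparkContext

# Build a local SparkContext and run a simple in-memory RDD computation.
conf = SparkConf().setAppName("rdd-demo").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize([1, 2, 3, 4, 5])      # distribute a Python list as an RDD
squares = rdd.map(lambda x: x * x)         # lazy transformation
print(squares.collect())                   # the action triggers the job: [1, 4, 9, 16, 25]

sc.stop()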

The Spark Framework

Components

· Spark Core: uses the RDD as its data abstraction, provides APIs for Python, Java, Scala and R along with Spark's core functionality, and is the foundation on which the rest of Spark runs;

· Spark SQL: built on Spark Core, provides a structured data processing module that supports querying data with SQL; it also serves as the foundation of the Structured Streaming module for stream processing (see the sketch after this list);

· Spark Streaming: built on Spark Core, provides stream processing;

· MLlib: built on Spark Core, ships a large set of machine learning algorithms and APIs for machine learning workloads;

· GraphX: built on Spark Core, provides a rich set of graph-computation APIs for distributed graph processing
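As an illustrative sketch of the Spark SQL module described above (a local example with made-up data, not part of the original setup):

from pyspark.sql import SparkSession

# Create a SparkSession, register a small DataFrame as a view, and query it with SQL.
spark = SparkSession.builder.appName("sql-demo").master("local[*]").getOrCreate()

df = spark.createDataFrame([("hadoop", 7), ("spark", 5), ("flink", 3)], ["word", "cnt"])
df.createOrReplaceTempView("word_counts")
spark.sql("SELECT word, cnt FROM word_counts WHERE cnt > 3").show()

spark.stop()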

Deployment Modes

· Local mode (single machine / local): a single process that simulates the Spark runtime with multiple internal threads (see the master-URL sketch after this list)

· Standalone mode (cluster): each Spark role runs as a separate process, and together they form the cluster

· Hadoop YARN mode (cluster): the Spark roles run inside YARN containers, which together form the cluster

· Kubernetes mode (container cluster): the Spark roles run inside Kubernetes containers, which together form the cluster

· Cloud-service mode
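In practice the mode is selected through the master URL passed when the application is created. A hedged sketch (the host name and ports match the cluster built later in this post; the Kubernetes URL is a placeholder):

from pyspark.sql import SparkSession

# Local mode: one process simulating the cluster with multiple threads.
spark = SparkSession.builder.master("local[*]").appName("mode-demo").getOrCreate()
# Standalone mode would use:  .master("spark://master:7077")
# YARN mode would use:        .master("yarn")   (requires HADOOP_CONF_DIR to be set)
# Kubernetes mode would use:  .master("k8s://https://<api-server>:6443")
spark.stop()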

Roles

  • Resource management

· Master role: cluster-level resource management

· Worker role: single-node resource management (manages the resources of the server it runs on)

  • Task execution

· Driver role: manages a single job

· Executor role: performs the computation of a single job

Environment Setup

Hadoop Pseudo-Distributed Setup

Prepare the master virtual machine

  • Configure a static IP
vim /etc/sysconfig/network-scripts/ifcfg-ens33
# Modify
BOOTPROTO="static"
ONBOOT="yes"

# Add
IPADDR="192.168.80.129"
GATEWAY="192.168.80.2"
NETMASK="255.255.255.0"
DNS1="8.8.8.8"
DNS2="114.114.114.114"
IPV6_PRIVACY="no"
  • Restart the network service
service network restart
  • Stop the firewall and disable it from starting on boot
systemctl stop firewalld
systemctl disable firewalld
  • Disable SELinux
vim /etc/selinux/config
# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
# enforcing - SELinux security policy is enforced.
# permissive - SELinux prints warnings instead of enforcing.
# disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of three values:
# targeted - Targeted processes are protected,
# minimum - Modification of targeted policy. Only selected processes are protected.
# mls - Multi Level Security protection.
SELINUXTYPE=targeted
  • Reboot master
reboot

After the reboot, copy the required packages (JDK, Hadoop, Spark, etc.) into /usr/local and extract them

  • Extract the JDK
tar -zxvf jdk-8u211-linux-x64.tar.gz -C /usr/local

Configure the JDK environment variables

vim /etc/profile
#set java environment 
JAVA_HOME=/usr/local/jdk1.8.0_211
CLASSPATH=.:$JAVA_HOME/lib
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME CLASSPATH PATH

Reload the environment variables

source /etc/profile

Verify the installation

java -version
java version "1.8.0_211"
Java(TM) SE Runtime Environment (build 1.8.0_211-b12)
Java HotSpot(TM) 64-Bit Server VM (build 25.211-b12, mixed mode)

Configure the Linux Cluster

Fully clone master into two more machines, node1 and node2, and configure static IPs for them

Configure the hostnames; on each machine run:

vim /etc/hostname

Set the hostnames of the three machines to master, node1, and node2

Configure the hostname mappings on all three virtual machines

vim /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.80.129 master
192.168.80.130 node1
192.168.80.131 node2

At this point the three machines can ping each other

ping master
ping node1
ping node2

Reboot all three machines

Generate a public/private key pair on each of the three machines; on each one run:

ssh-keygen -t rsa

Press Enter at every prompt; the private key id_rsa and the public key id_rsa.pub are generated in the hidden directory /root/.ssh

Generating public/private rsa key pair.
Enter file in which to save the key (/root/.ssh/id_rsa):
Created directory '/root/.ssh'.
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /root/.ssh/id_rsa.
Your public key has been saved in /root/.ssh/id_rsa.pub.
The key fingerprint is:
SHA256:/Vd3sEsHhz3CcxGwbRp5lj6+srZ7OpwAvyKS+qoTi9Q root@master
The key's randomart image is:
+---[RSA 2048]----+
| ..o.|
| . +oo|
| B+Oo|
| o @=.|
| . S + .oo=|
| o E + ..++|
|o o . = +. |
|.o o . . . B ..|
| .oo+.. . . .=O. |
+----[SHA256]-----+

On all three virtual machines, run the following to copy the public key to master

ssh-copy-id master
The authenticity of host 'master (192.168.80.129)' can't be established.
ECDSA key fingerprint is SHA256:9UGNQgh5SWXh/1Z9iWTOzBSbqXf8kjbxc5SC73j9ct4.
ECDSA key fingerprint is MD5:ee:b1:5d:3c:a5:2b:2e:08:cd:85:44:68:fe:c7:29:d9.
Are you sure you want to continue connecting (yes/no)? yes
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
root@master's password:

Number of key(s) added: 1

Now try logging into the machine, with: "ssh 'master'"
and check to make sure that only the key(s) you wanted were added.

Copy the authorized_keys file from master to the nodes

scp /root/.ssh/authorized_keys node1:/root/.ssh
scp /root/.ssh/authorized_keys node2:/root/.ssh
The authenticity of host 'node1 (192.168.80.130)' can't be established.
ECDSA key fingerprint is SHA256:9UGNQgh5SWXh/1Z9iWTOzBSbqXf8kjbxc5SC73j9ct4.
ECDSA key fingerprint is MD5:ee:b1:5d:3c:a5:2b:2e:08:cd:85:44:68:fe:c7:29:d9.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added 'node1,192.168.80.130' (ECDSA) to the list of known hosts.
root@node1's password:
authorized_keys 100% 1177 1.2MB/s 00:00

Test passwordless SSH login

ssh node1
ssh node2
exit

Install Hadoop

Extract the package and rename the directory

tar -zxvf hadoop-3.3.1.tar.gz -C /usr/local
mv /usr/local/hadoop-3.3.1 /usr/local/hadoop

Edit the configuration files

cd /usr/local/hadoop/etc/hadoop

hadoop-env.sh

vim hadoop-env.sh
export JAVA_HOME=/usr/local/jdk1.8.0_211
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

core-site.xml

vim core-site.xml
<configuration>
<!-- Default file system; Hadoop supports file, HDFS, GFS, Ali/Amazon cloud storage, etc. -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://master:8020</value>
</property>

<!-- Local directory where Hadoop stores its data -->
<property>
<name>hadoop.tmp.dir</name>
<value>/usr/local/data/hadoop</value>
</property>

<!-- User identity for the HDFS web UI -->
<property>
<name>hadoop.http.staticuser.user</name>
<value>root</value>
</property>

<!-- Proxy-user settings for Hive integration -->
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>

<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
</configuration>

hdfs-site.xml

vim hdfs-site.xml
<configuration>
<!-- Host where the SecondaryNameNode (SNN) process runs -->
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>node1:9868</value>
</property>
</configuration>

mapred-site.xml

vim mapred-site.xml
<configuration>
<!-- Default execution mode for MapReduce jobs: yarn (cluster mode) or local -->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>

<!-- MapReduce JobHistory server address -->
<property>
<name>mapreduce.jobhistory.address</name>
<value>master:10020</value>
</property>

<!-- JobHistory server web UI address -->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>master:19888</value>
</property>

<property>
<name>yarn.app.mapreduce.am.env</name>
<value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
</property>

<property>
<name>mapreduce.map.env</name>
<value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
</property>

<property>
<name>mapreduce.reduce.env</name>
<value>HADOOP_MAPRED_HOME=${HADOOP_HOME}</value>
</property>
</configuration>

yarn-site.xml

vim yarn-site.xml
<configuration>
<!-- Host where the YARN ResourceManager runs -->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>master</value>
</property>

<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>

<!-- Whether to enforce physical memory limits on containers -->
<property>
<name>yarn.nodemanager.pmem-check-enabled</name>
<value>false</value>
</property>

<!-- Whether to enforce virtual memory limits on containers -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

<!-- Enable log aggregation -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

<!-- YARN log server URL (points at the JobHistory server) -->
<property>
<name>yarn.log.server.url</name>
<value>http://master:19888/jobhistory/logs</value>
</property>

<!-- Retain aggregated logs for 7 days -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
</configuration>

workers

vim workers
master
node1
node2

slaves

In Hadoop 3.x the old slaves file has been replaced by workers, so no separate slaves file needs to be configured here.

Distribute the installation to the other machines

cd /usr/local
scp -r hadoop root@node1:/usr/local
scp -r hadoop root@node2:/usr/local

Configure the Hadoop environment variables and distribute them to the other machines

vim /etc/profile
# set hadoop env
export HADOOP_HOME=/usr/local/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
scp /etc/profile node1:/etc/
scp /etc/profile node2:/etc/

Reload the environment variables on all three machines

source /etc/profile

Before the first start, format the NameNode

hdfs namenode -format

Start

start-dfs.sh
start-yarn.sh
mapred --daemon start historyserver

Or:

start-all.sh
mapred --daemon start historyserver

Check the running processes

jps
61488 Jps
60417 NameNode
60597 DataNode
60981 ResourceManager
61133 NodeManager
61567 JobHistoryServer

Open the web UI (since Hadoop 3.0 the NameNode web port has changed to 9870)

master:9870

Stop

stop-dfs.sh
stop-yarn.sh
mapred --daemon stop historyserver

Or:

stop-all.sh
mapred --daemon stop historyserver

Install Anaconda3

sh ./Anaconda3-2021.05-Linux-x86_64.sh

Configure the Tsinghua mirror

vim ~/.condarc
channels:
- defaults
show_channel_urls: true
default_channels:
- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/main
- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/r
- https://mirrors.tuna.tsinghua.edu.cn/anaconda/pkgs/msys2
custom_channels:
conda-forge: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
msys2: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
bioconda: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
menpo: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
pytorch: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud
simpleitk: https://mirrors.tuna.tsinghua.edu.cn/anaconda/cloud

Create the pyspark virtual environment

conda create -n pyspark python=3.8

Spark Local Mode Setup

Installation

Extract the Spark package and rename the directory

tar -zxvf spark-3.2.0-bin-hadoop3.2.tgz -C /usr/local
mv /usr/local/spark-3.2.0-bin-hadoop3.2 /usr/local/spark

Configure the environment variables

vim /etc/profile
export SPARK_HOME=/usr/local/spark
export PYSPARK_PYTHON=/usr/local/anaconda3/envs/pyspark/bin/python3.8
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
source /etc/profile

Locate the pyspark virtual environment:

cd /usr/local/anaconda3/envs/pyspark/bin
vim ~/.bashrc
export JAVA_HOME=/usr/local/jdk1.8.0_211
export PYSPARK_PYTHON=/usr/local/anaconda3/envs/pyspark/bin/python3.8

Start the interactive pyspark shell

./bin/pyspark
Python 3.8.8 (default, Apr 13 2021, 19:58:26) 
[GCC 7.3.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/02 17:38:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.2.0
/_/

Using Python version 3.8.8 (default, Apr 13 2021 19:58:26)
Spark context Web UI available at http://master:4040
Spark context available as 'sc' (master = local[*], app id = local-1669973937905).
SparkSession available as 'spark'.
>>>

Open master:4040 in a browser to view the job information

After executing a pyspark statement, the corresponding job appears on this page.
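For instance, a simple statement to try in the shell (an illustrative example, not from the original post):

>>> sc.parallelize(range(10)).filter(lambda x: x % 2 == 0).collect()
[0, 2, 4, 6, 8]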

Spark Standalone Mode Setup

Standalone

Standalone mode is Spark's built-in cluster mode: the Master role exists as a Master process and the Worker role as Worker processes, with the Driver role running inside the Master process and the Executor role inside the Worker processes. In addition, a third process can be started, the HistoryServer, which stores the event logs of Spark applications after they finish running.

Distribute the environment

scp Anaconda3-2021.05-Linux-x86_64.sh node1:`pwd`/
scp Anaconda3-2021.05-Linux-x86_64.sh node2:`pwd`/

Log in to node1 and node2 and install Anaconda; then, as on master, configure the conda mirror and create the virtual environment

Copy the environment variables in /etc/profile and ~/.bashrc from master to node1 and node2

scp /etc/profile node1:/etc/profile
scp /etc/profile node2:/etc/profile
scp ~/.bashrc node1:~/
scp ~/.bashrc node2:~/

Create a hadoop user (skip this step if you only use the root user)

The hadoop user holds the highest privileges over YARN

Create the user

sudo adduser hadoop
passwd hadoop

Add it to the user group

sudo usermod -a -G hadoop hadoop

Grant root (sudo) privileges

vim /etc/sudoers
## Allow root to run any commands anywhere 
root ALL=(ALL) ALL
hadoop ALL=(ALL) ALL

Change ownership of the installation directories

cd /usr/local/
chown -R hadoop:hadoop hadoop*
chown -R hadoop:hadoop spark*

Edit the Spark configuration files

su - hadoop
cd /usr/local/spark/conf
mv workers.template workers
vim workers
master
node1
node2
mv spark-env.sh.template spark-env.sh
vim spark-env.sh
## Java installation directory
JAVA_HOME=/usr/local/jdk1.8.0_211

## Hadoop configuration directory, needed to read files on HDFS and to run on the YARN cluster
HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop
YARN_CONF_DIR=/usr/local/hadoop/etc/hadoop

## Spark master host and the port used when submitting jobs
# Host on which the Spark master runs
export SPARK_MASTER_HOST=master
# Spark master communication port
export SPARK_MASTER_PORT=7077
# Spark master web UI port
SPARK_MASTER_WEBUI_PORT=8080

# CPU cores available to each worker
SPARK_WORKER_CORES=1
# Memory available to each worker
SPARK_WORKER_MEMORY=1g
# Worker communication port
SPARK_WORKER_PORT=7078
# Worker web UI port
SPARK_WORKER_WEBUI_PORT=8081

## History server settings
# Store the event logs of Spark applications in the /sparklog directory on HDFS and enable log cleanup
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://master:8020/sparklog/ -Dspark.history.fs.cleaner.enabled=true"

Start Hadoop and check HDFS

hadoop fs -ls /

There is no sparklog directory on HDFS yet. Create it and grant permissions

hadoop fs -mkdir /sparklog
hadoop fs -chmod 777 /sparklog

Continue with the configuration

mv spark-defaults.conf.template spark-defaults.conf
vim spark-defaults.conf
# Enable Spark event logging
spark.eventLog.enabled true
# Path where the event logs are written
spark.eventLog.dir hdfs://master:8020/sparklog/
# Whether to compress the event logs
spark.eventLog.compress true
mv log4j.properties.template log4j.properties
vim log4j.properties

Change INFO to WARN to reduce log noise

# Set everything to be logged to the console
log4j.rootCategory=WARN, console

Distribute the Spark configuration to the other nodes

cd /usr/local
scp -r spark node1:`pwd`/
scp -r spark node2:`pwd`/

Start the Spark cluster

Start the history server process

cd spark
sbin/start-history-server.sh
jps
49297 JobHistoryServer
49778 HistoryServer
48296 DataNode
49835 Jps
48734 ResourceManager
48910 NodeManager
48127 NameNode

Start the cluster

sbin/start-all.sh
jps
49297 JobHistoryServer
50162 Jps
50020 Master
48296 DataNode
50107 Worker
48734 ResourceManager
48910 NodeManager
48127 NameNode
49903 HistoryServer

Open master:8080 in a browser to see the Spark cluster web UI

Testing the Standalone Cluster

cd /usr/local/spark/bin
./pyspark --master spark://master:7077
Python 3.8.15 (default, Nov 24 2022, 15:19:38) 
[GCC 11.2.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
22/12/05 11:54:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.2.0
/_/

Using Python version 3.8.15 (default, Nov 24 2022 15:19:38)
Spark context Web UI available at http://master:4040
Spark context available as 'sc' (master = spark://master:7077, app id = app-20221205115438-0000).
SparkSession available as 'spark'.
>>>

Create a words.txt file under /usr/local with the following contents

hadoop spark flink
hadoop spark flink hadoop hadoop
hadoop spark flink hadoop hadoop spark spark

Create an input directory and upload the file to HDFS

hadoop fs -mkdir /input/
hdfs dfs -put /usr/local/words.txt /input/
hadoop fs -ls /input
hadoop fs -cat /input/words.txt

Run:

sc.textFile("hdfs://master:8020/input/words.txt").flatMap(lambda line: line.split(" ")).map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b).collect()
[('hadoop', 7), ('spark', 5), ('flink', 3)]
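For readability, the same word-count job can be split into steps (an equivalent sketch using the same file and the sc object provided by the shell):

rdd = sc.textFile("hdfs://master:8020/input/words.txt")    # read the lines from HDFS
words = rdd.flatMap(lambda line: line.split(" "))          # split each line into words
pairs = words.map(lambda x: (x, 1))                        # pair every word with the count 1
counts = pairs.reduceByKey(lambda a, b: a + b)             # sum the counts per word
print(counts.collect())                                    # [('hadoop', 7), ('spark', 5), ('flink', 3)]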

Spark on YARN

  • Deploying Spark on a YARN cluster improves resource utilization: no separate Spark cluster is needed, and a single server acting as the Spark client is enough to submit jobs to the YARN cluster

  • The Master role is taken by YARN's ResourceManager

  • The Worker role is taken by YARN's NodeManager
  • The Driver runs either inside a YARN container or in the client process that submits the job
  • Executors run inside containers provided by YARN

Spark computation tasks thus run inside YARN containers, with resource management handed over to YARN's ResourceManager and NodeManager

Start Spark on YARN

cd /usr/local/spark
./sbin/stop-all.sh  # stop the standalone cluster
bin/pyspark --master yarn
Python 3.8.15 (default, Nov 24 2022, 15:19:38) 
[GCC 11.2.0] :: Anaconda, Inc. on linux
Type "help", "copyright", "credits" or "license" for more information.
22/12/05 15:27:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/12/05 15:27:49 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.2.0
/_/

Using Python version 3.8.15 (default, Nov 24 2022 15:19:38)
Spark context Web UI available at http://master:4040
Spark context available as 'sc' (master = yarn, app id = application_1670210110128_0001).
SparkSession available as 'spark'.
>>> sc.parallelize([1,2,3,4,5]).map(lambda x:x*10).collect()
[10, 20, 30, 40, 50]
>>>

Run a test program, or submit code through the spark-submit client; the Spark job will run inside YARN containers

./bin/spark-submit --master yarn /usr/local/spark/examples/src/main/python/pi.py 100

Spark on YARN Deploy Modes

  • Cluster (cluster mode)

    The Driver runs inside a YARN container, in the same container as the ApplicationMaster

  • Client (client mode)

    The Driver runs in the client process, e.g. inside the spark-submit process on the client machine

In cluster mode, communication stays inside the YARN containers, which is more efficient, but the logs are likewise stored inside the containers

Client

bin/spark-submit --master yarn --deploy-mode client /usr/local/spark/examples/src/main/python/pi.py 100

Cluster

bin/spark-submit --master yarn --deploy-mode cluster /usr/local/spark/examples/src/main/python/pi.py 100
