Pyspark

Pyspark库安装

(本文基于上文spark基础)

Python库安装

在三台机器分别安装pyspark

1
2
conda activate pyspark
pip install pyspark -i https://mirror.baidu.com/pypi/simple

Windows补丁

将hadoop.dll置于C:/windows/system32/目录下,然后配置hadoop工具包的环境变量

安装相关python库

1
pip install pyspark pyhive pymysql jieba -i https://mirror.baidu.com/pypi/simple

为pycharm添加ssh解释器环境

SparkContext对象

Spark Application程序的入口为SparkContext。任何一个spark应用都要先构建SparkContext对象:

  • 创建SparkConf对象
  • 基于SparkConf创建SparkContext
1
2
conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)

WorldCount测试程序

Windows

在pycharm中新建python脚本,通过解释器执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# coding:utf8
from pyspark import SparkConf, SparkContext
# 提交到yarn集群执行时,需配置环境变量
# import os
# os.environ["HADOOP_CONF_DIR"] = "/usr/local/hadoop/etc/hadoop"
# os.environ["YARN_CONF_DIR"] = "/usr/local/hadoop/etc/hadoop"

if __name__ == '__main__':
# 通过脚本执行时无需在代码中指定Master
# conf = SparkConf().setAppName("WordCountHelloWorld")
# 直接在pycharm执行
conf = SparkConf().setAppName("WordCountHelloWorld").setMaster("local[*]")
# 通过SparkConf对象构建SparkContext对象
sc = SparkContext(conf=conf)

# 需求 : wordcount单词计数, 读取HDFS上的words.txt文件, 对其内部的单词统计出现 的数量
# 读取hdfs文件
file_rdd = sc.textFile("hdfs://master:8020/input/words.txt")
# 读取本地文件
# file_rdd = sc.textFile("/usr/local/words.txt")

# 将单词进行切割, 得到一个存储全部单词的集合对象
words_rdd = file_rdd.flatMap(lambda line: line.split(" "))

# 将单词转换为元组对象, key是单词, value是数字1
words_with_one_rdd = words_rdd.map(lambda x: (x, 1))

# 将元组的value 按照key来分组, 对所有的value执行聚合操作(相加)
result_rdd = words_with_one_rdd.reduceByKey(lambda a, b: a + b)

# 通过collect方法收集RDD的数据打印输出结果
print(result_rdd.collect())

CentOS

在根目录创建一份py脚本,通过spark客户端执行

1
2
/usr/local/spark/bin/spark-submit --master local[*] /root/helloworld.py
/usr/local/spark/bin/spark-submit --master yarn /root/helloworld.py

基本原理

Code

构建SparkContext对象等非任务处理由Driver执行,RDD数据任务处理由Executor执行,再由Driver处理分布式计算结果

Master Node

spark自身的JVM框架JVM Driver和JVM Executor之间可以相互通讯,Python通过构建SparkContext对象与JVM Driver进行连接(Python的Driver代码翻译成JVM代码-py4j库,变成JVM Driver)

Worker Node

Driver的操作指令发送给JVM Executor(RPC),JVM Executor再通过pyspark守护进程将指令发送给pyspark守护进程,pyspark守护进程将指令调度到运行的python进程中去。Executor端本质上是由python进程再工作

Driver段是直接由py4j直接翻译过去,Executor端则是转发

Powered by Hexo and Hexo-theme-hiker

Copyright © 2017 - 2024 青域 All Rights Reserved.

UV : | PV :