Spark-SQL

SparkSQL

SparkSQL是spark的一个用于处理海量结构化数据的模块

  • 支持SQL语言
  • 自动优化
  • 性能强
  • 兼容HIVE
  • API流程简单
  • 支持标准化JDBC和ODBC连接

SparkSQL数据抽象

  • Pandas · DataFrame

    · 二维表数据结构

    · 单机(本地)集合

  • SparkCore · RDD

    · 无标准数据结构

    · 分布式(分区)集合

  • SparkSQL · DataFrame

    · 二维表数据结构

    · 分布式(分区)集合

SparkSession对象

RDD程序的执行入口对象:SparkContext

在Spark2.0以后,推出了SparkSession对象,来作为Spark编码的统一入口对象。SparkSession

  • 用于SparkSQL编程,作为入口对象
  • 用于SparkCore编程,通过SparkSession对象获取SparkContext

构建SparkSession对象:

1
2
3
4
5
6
7
from pyspark.sql import SparkSession

# 通过builder方法来构建SparkSession对象
# appName:设置程序名称
# config:配置常用属性
# getOrCreate:完成创建SparkSession对象
spark = SparkSession.builder.appName("test").master("local[*]").config("spark.sql.shuffle.partitions", "4").getOrCreate()

通过SparkSesion对象获取SparkContext对象:

1
2
3
4
5
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").master("local[*]").config("spark.sql.shuffle.partitions", "4").getOrCreate()

sc = spark.sparkContext

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").master("local[*]").config("spark.sql.shuffle.partitions", "4").getOrCreate()

sc = spark.sparkContext

df1 = spark.read.csv("hdfs://master:8020/input/pip_file.txt", sep=',', header=False)
df2 = df1.toDF("id", "package", "version")
df2.printSchema() # 表结构
df2.show() # 展示表
df2.createTempView("pip") # 创建"pip"表,保存在内存中
# SQL风格
spark.sql("""
SELECT * FROM pip WHERE package='pyspark'
""").show
# DSL风格
df2.where("package='pyspark'").limit(5).show()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
root
|-- id: string (nullable = true)
|-- package: string (nullable = true)
|-- version: string (nullable = true)

+---+-------------+--------+
| id| package| version|
+---+-------------+--------+
| 1| flask| 1.1.4|
| 2| fastapi| 0.78.0|
| 3| h5py| 2.10.0|
| 4| keras| 2.6.0|
| 5| jieba| 0.42.1|
| 6| numpy| 1.23.3|
| 7|opencv-python|4.6.0.66|
| 8| pandas| 1.4.3|
| 9| pillow| 9.2.0|
| 10| py4j|0.10.9.5|
| 11| pyspark| 3.3.1|
| 12| sklearn| 0.0|
| 13| tensorfolw| 2.6.2|
| 14| requests| 2.28.1|
| 15| redis| 3.5.3|
+---+-------------+--------+

+---+-------+-------+
| id|package|version|
+---+-------+-------+
| 11|pyspark| 3.3.1|
+---+-------+-------+

+---+-------+-------+
| id|package|version|
+---+-------+-------+
| 11|pyspark| 3.3.1|
+---+-------+-------+

DataFrame

DataFrame的组成

DataFrame是一个二维表结构,在结构层面:

  • StructureType对象描述整个DataFrame的表结构
  • StructField对象描述一个列的信息

在数据层面:

  • Row对象记录一行数据
  • Column对象记录一列数据并包含列的信息

DataFrame的构建

RDD转换

DataFrame对象可以从RDD转换而来

1
df = spark.createDataFrame(rdd, schema=['id', 'package'])

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext

rdd = sc.textFile("hdfs://master:8020/input/pip_file.txt").map(lambda x: x.split(",")).map(lambda x:(int(x[0]), x[1]))

df = spark.createDataFrame(rdd, schema=['id', 'package'])
df.printSchema()
df.show(10, False) # 输出前十行数据,要全部显示默认设置为True
df.createOrReplaceTempView("pip")
spark.sql("SELECT * FROM pip WHERE id < 5").show()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
root
|-- id: long (nullable = true)
|-- package: string (nullable = true)

+---+-------------+
|id |package |
+---+-------------+
|1 |flask |
|2 |fastapi |
|3 |h5py |
|4 |keras |
|5 |jieba |
|6 |numpy |
|7 |opencv-python|
|8 |pandas |
|9 |pillow |
|10 |py4j |
+---+-------------+
only showing top 10 rows

+---+-------+
| id|package|
+---+-------+
| 1| flask|
| 2|fastapi|
| 3| h5py|
| 4| keras|
+---+-------+

StructType

通过StructType对象来定义DataFrame的表结构,转换RDD

1
2
3
4
5
# 导入StructType对象和类型
from pyspark.sql.types import StructType, StringType, IntegerType

# 定义表结构
schema = StructType().add("id", IntegerType(), nullable=True).add("package", StringType(), nullable=True)

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext

rdd = sc.textFile("hdfs://master:8020/input/pip_file.txt").map(lambda x: x.split(",")).map(lambda x:(int(x[0]), x[1]))

schema = StructType().add("id", IntegerType(), nullable=True).add("package", StringType(), nullable=True)

df = spark.createDataFrame(rdd, schema=schema)
df.printSchema()
df.show()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
root
|-- id: integer (nullable = true)
|-- package: string (nullable = true)

+---+-------------+
| id| package|
+---+-------------+
| 1| flask|
| 2| fastapi|
| 3| h5py|
| 4| keras|
| 5| jieba|
| 6| numpy|
| 7|opencv-python|
| 8| pandas|
| 9| pillow|
| 10| py4j|
| 11| pyspark|
| 12| sklearn|
| 13| tensorfolw|
| 14| requests|
| 15| redis|
+---+-------------+

toDF

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext

rdd = sc.textFile("hdfs://master:8020/input/pip_file.txt").map(lambda x: x.split(",")).map(lambda x:(int(x[0]), x[1]))

# toDF快速构建DataFrame
# 对列类型不敏感,默认string类型
df1 = rdd.toDF(["id", "package"])
df1.printSchema()
df1.show(5, False)

# 设置schema通过toDF构建DataFrame
schema = StructType().add("id", IntegerType(), nullable=True).add("package", StringType(), nullable=True)
df2 = rdd.toDF(schema=schema)
df2.printSchema()
df2.show(5, False)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
root
|-- id: long (nullable = true)
|-- package: string (nullable = true)

+---+-------+
|id |package|
+---+-------+
|1 |flask |
|2 |fastapi|
|3 |h5py |
|4 |keras |
|5 |jieba |
+---+-------+
only showing top 5 rows

root
|-- id: integer (nullable = true)
|-- package: string (nullable = true)

+---+-------+
|id |package|
+---+-------+
|1 |flask |
|2 |fastapi|
|3 |h5py |
|4 |keras |
|5 |jieba |
+---+-------+
only showing top 5 rows

Pandas

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext

pddf = pd.DataFrame(
{
"id": [1, 2, 3],
"package": ["flask", "fastapi", "h5py"],
"version": ["1.1.4", "0.78.0", "2.10.0"]
}
)

df = spark.createDataFrame(pddf)
df.printSchema()
df.show()
1
2
3
4
5
6
7
8
9
10
11
12
root
|-- id: long (nullable = true)
|-- package: string (nullable = true)
|-- version: string (nullable = true)

+---+-------+-------+
| id|package|version|
+---+-------+-------+
| 1| flask| 1.1.4|
| 2|fastapi| 0.78.0|
| 3| h5py| 2.10.0|
+---+-------+-------+

通过文件构建DataFrame

text

1
sparksession.read.format("text|csv|json|jdbc|...").option("K", "V").schema(StructType|String).load("文件路径,支持本地文件系统和HDFS")

e.g.

1
2
3
4
5
6
7
8
9
10
11
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext

# 构建StructType,text数据源,读取数据的特点是将一整行当作一列来读取,默认列名为value,类型为string
schema = StructType().add("data", StringType(), nullable=True)
df = spark.read.format("text").schema(schema=schema).load("hdfs://master:8020/input/pip_file.txt")
df.printSchema()
df.show()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
root
|-- data: string (nullable = true)

+--------------------+
| data|
+--------------------+
| 1,flask,1.1.4|
| 2,fastapi,0.78.0|
| 3,h5py,2.10.0|
| 4,keras,2.6.0|
| 5,jieba,0.42.1|
| 6,numpy,1.23.3|
|7,opencv-python,4...|
| 8,pandas,1.4.3|
| 9,pillow,9.2.0|
| 10,py4j,0.10.9.5|
| 11,pyspark,3.3.1|
| 12,sklearn,0.0|
| 13,tensorfolw,2.6.2|
| 14,requests,2.28.1|
| 15,redis,3.5.3|
+--------------------+

json

json文件自带一定数据结构

1
2
3
4
5
6
7
8
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext

df = spark.read.format("json").load("hdfs://master:8020/input/pip_file.json")
df.printSchema()
df.show()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
root
|-- id: long (nullable = true)
|-- package: string (nullable = true)
|-- version: string (nullable = true)

+---+-------+-------+
| id|package|version|
+---+-------+-------+
| 1| flask| 1.1.4|
| 2|fastapi| 0.78.0|
| 3| h5py| 2.10.0|
| 4| keras| 2.6.0|
| 5| jieba| 0.42.1|
+---+-------+-------+

csv

1
2
3
4
5
6
7
8
9
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext

# 通过.option指定属性
df = spark.read.format("csv").option("sep", ";").option("header", True).option("encoding", "utf-8").schema("id INT, package STRING, version STRING").load("hdfs://master:8020/input/pip_file.csv")
df.printSchema()
df.show()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
root
|-- id: integer (nullable = true)
|-- package: string (nullable = true)
|-- version: string (nullable = true)

+---+-------------+--------+
| id| package| version|
+---+-------------+--------+
| 1| flask| 1.1.4|
| 2| fastapi| 0.78.0|
| 3| h5py| 2.10.0|
| 4| keras| 2.6.0|
| 5| jieba| 0.42.1|
| 6| numpy| 1.23.3|
| 7|opencv-python|4.6.0.66|
| 8| pandas| 1.4.3|
| 9| pillow| 9.2.0|
| 10| py4j|0.10.9.5|
| 11| pyspark| 3.3.1|
| 12| sklearn| 0.0|
| 13| tensorfolw| 2.6.2|
| 14| requests| 2.28.1|
| 15| redis| 3.5.3|
+---+-------------+--------+

parquet

parquet是spark中常用的一种列示存储文件格式

  • 内置schema(列名,列类型,是否为空)
  • 存储是以列作为存储格式
  • 存储是序列化存储在文件中(有压缩属性体积小)
1
2
3
4
5
6
7
8
9
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext

# 通过.option指定属性
df = spark.read.format("parquet").load("hdfs://master:8020/input/suers.parquet")
df.printSchema()
df.show()
1
2
3
4
5
6
7
8
9
10
11
12
root
|-- name: string (nullable = true)
|-- favorite_color: string (nullable = true)
|-- favorite_numbers: array (nullable = true)
| |-- element: integer (containsNull = true)

+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

DataFrame编程

DataFrame支持两种编程风格

  • DSL风格

    被称为领域特定语言,是DataFrame的特有API

  • SQL风格

    使用SQL语句来直接处理DataFrame

DSL风格

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext

df = spark.read.format("csv").schema("id INT, package STRING, version STRING").load("hdfs://master:8020/input/pip_file.txt")

# 获取Colum对象
id_colum = df['id']
package_colum = df['package']

# DSL风格
# select API
df.select(['id', 'package']).show() # list
df.select('id', 'package').show() # 可变参数
df.select(id_colum, package_colum).show() # Colum对象

# filter API
df.filter('id < 5').show()
df.filter(df['id'] < 5).show() # Colum对象

# where API
df.where('id < 5').show()
df.where(df['id'] < 5).show() # Colum对象

# groupBy API
df.groupBy('package').count().show()
df.groupBy(df['package']).count().show() # Colum对象

# 返回值不是DataFrame,而是GroupedData对象
# 是一个有分组关系的数据结构,提供API做分组聚合
# SQL:group by后接聚合:sum、avg、count、min、max
# GroupedData类似于SQL分组后的数据结构,同样拥有上述5种聚合方法
# GroupedData调用聚合方法后,返回值依旧是DataFrame对象
# GroupedData只是一个中转对象,最终还是要获得DataFrame对象
r = df.groupBy('package')
print(r.sum().show(), r.avg().show(), r.count().show(), r.min().show(), r.max().show())

SQL风格

使用SQL风格,需要将DataFrame提前注册成表

1
2
3
df.createTempView("pip")            # 注册一个临时视图(表)
df.createOrReplaceTempView("pip") # 注册一个临时表,如果已存在则进行替换
df.createGlobalTempView("pip") # 注册一个全局表
  • 全局表:可跨SparkSession对象使用,在一个程序内的多个SparkSession中均可调用,查询时带上前缀global_temp

  • 临时表:仅在当前SparkSession中可用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext

df = spark.read.format("csv").schema("id INT, package STRING, version STRING").load("hdfs://master:8020/input/pip_file.txt")

# 注册临时表
df.createTempView("pip")
df.createOrReplaceTempView("pip")
df.createGlobalTempView("pip_2")

# 通过SparkSession SQL API执行sql语句
spark.sql("SELECT package, COUNT(*) AS cnt FROM pip GROUP BY package").show()
spark.sql("SELECT package, COUNT(*) AS cnt FROM global_temp.pip_2 GROUP BY package").show()

pyspark.sql.functions

PySpark提供的pyspark.sql.functions包包含一系列可供SparkSQL使用的计算函数

1
from pyspark.sql import functions as F

即可调用F对象调用函数进行计算。这些功能函数的返回值大多是Colum对象

WordCount示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("test").master("local[*]").getOrCreate()
sc = spark.sparkContext

# SQL风格处理
rdd = sc.textFile("hdfs://master:8020/input/words.txt").flatMap(lambda x: x.split(" ")).map(lambda x: [x])
df = rdd.toDF(["word"])
df.createTempView("words")
spark.sql("SELECT word, COUNT(*) AS cnt FROM words GROUP BY word ORDER BY cnt DESC").show()

# DSL风格处理
df = spark.read.format("text").load("hdfs://master:8020/input/words.txt")
# withColumn方法
# :对已存在的列进行操作,返回与一个新的列,如果名字和之前相同则替换,否则新建列
df.withColumn("value", F.explode(F.split(df['value'], " "))).show()
df2 = df.withColumn("value", F.explode(F.split(df['value'], " "))).show()
df2.groupBy("value").count().show()
df2.groupBy("value").count().withColumnRenamed("value", "name").withColumnRenamed("count", "cnt").orderBy("cnt", ascending=False).show()

SparkSQL Shuffle

spark.sql.shuffle.partitions参数是在spark sql计算过程中,shuffle算子阶段默认的分区数是200个。对于集群模式,200较为合适,如果在local模式下运行,200较多,会在调度上带来额外的损耗,所以在local模式下建议修改较低,例如2/4/10。这个参数和RDD中设置并行度的参数相互独立

可以按优先级在三处设置:

  • 代码设置

    1
    spark = SparkSession.builder.appName("test").master("local[*]").config("spark.sql.shuffle.partitions", "2").getOrCreate()
  • 客户端参数设置

    bin/spark-submit --conf "spark.sql.shuffle.partitions=100"

  • 配置文件设置

    conf/spark-defaults.conf spark.sql.shuffle.partitions 100

SparkSQL数据清洗

数据去重API

dropDuplicates

1
2
3
4
# DataFrame API
# 不设置参数,对全局的列联合起来进行比较,去除重复值,只保留一条
df.dropDuplicates().show()
df.dropDuplicates(['id', 'version']).show()

缺失值处理API

dropna

1
2
3
4
5
6
7
# DataFrame API
# 对缺失值的数据进行删除
# 不设置参数,只要列中存在null即删除整行
df.dropna().show()
# tresh=3表示,最少满足3个有效列,不满足即删除当前数据
df.dropna(thresh=3).show()
df.dropna(tresh=2,subset['id', 'version']).show()

fillna

1
2
3
4
5
6
7
# DataFrame API
# 对缺失值的数据进行填充
df.fillna("loss").show()
# 对指定列进行填充
df.fillna("M/A", subset['version']).show()
# 指定一个字典,对所有的列提供填充依据
df.fillna({"id": "unknown", "package": "unknown", "version": "none"}).show()

DataFrame数据写出

1
2
3
4
5
6
# mode:传入模式,append-追加,overwrite-覆盖,ignore-忽略,error-重复异常(默认)
# format:传入格式:text/csv/json/parquet/orc/avro/jdbc
# (text源仅支持单列df写出)
# option:设置属性,如.option("sep", ",")
# save:保存路径,支持本地文件系统和HDFS
df.write.mode().format().option(K, V).save(PATH)

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType
from pyspark.sql import functions as F

# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder.\
appName("test").\
master("local[*]").\
config("spark.sql.shuffle.partitions", 2).\
getOrCreate()
sc = spark.sparkContext

# 1. 读取数据集
schema = StructType().add("user_id", StringType(), nullable=True). \
add("movie_id", IntegerType(), nullable=True). \
add("rank", IntegerType(), nullable=True). \
add("ts", StringType(), nullable=True)
df = spark.read.format("csv"). \
option("sep", "\t"). \
option("header", False). \
option("encoding", "utf-8"). \
schema(schema=schema). \
load("hdfs://master:8020/input/u.data")

# Write text 写出, 只能写出一个列的数据, 需要将df转换为单列df
df.select(F.concat_ws("---", "user_id", "movie_id", "rank", "ts")).\
write.\
mode("overwrite").\
format("text").\
save("hdfs://master:8020/output/sql/text")

# Write csv
df.write.mode("overwrite").\
format("csv").\
option("sep", ";").\
option("header", True).\
save("hdfs://master:8020/output/sql/csv")

# Write json
df.write.mode("overwrite").\
format("json").\
save("hdfs://master:8020/output/sql/json")

# Write parquet
df.write.mode("overwrite").\
format("parquet").\
save("hdfs://master:8020/output/sql/parquet")

DataFrame JDBC

将mysql包添加进pyspark

1
2
cd /usr/local/anaconda3/envs/pyspark/lib/python3.8/site-packages/pyspark/jars/
rz

DataFrame读写数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 将DataFrame通过JDBC写入mysql
df.write.mode("overwrite").\
format("jdbc").\
option("url", "jdbc:mysql://master:3306/...").\
option("dbtable", "test").\
option("user", "root").\
option("password", "123456").\
save()

# JDBC会自动建表,因为DataFrame中含有表结构的信息
# 读mysql
df = spark.read.format("jdbc").\
option("url", "jdbc:mysql://master:3306/...").\
option("dbtable", "test").\
option("user", "root").\
option("password", "123456").\
load()

df.printSchema()
df.show()

SparkSQL函数定义

SparkSQL定义UDF函数

SparkSQL模块自带实现公共方法的位置在pyspark.sql.functions中,同时SparkSQL和Hive一样支持自定义函数:UDF和UDAF

目前python仅支持SparkSQL UDF自定函数

1
2
3
4
5
6
7
8
9
10
11
# 注册的UDF可用于DSL和SQL风格
# 返回值用于DSL风格,传参内的名称用于SQL风格
# arg1:注册的UDF名称,仅可用于SQL风格
# arg2:UDF处理逻辑,是一个单独的方法
# arg3:声明UDF的返回值类型,UDF注册时,必须声明返回值类型,并且UDF的真实返回值一定要和声明的返回值一致
# 返回值对象:是一个UDF对象,仅可用于DSL语法
# 这种方式定义的UDF,可以通过arg1的名称用于SQL风格,通过返回值对象用于DSL风格
sparksession.udf.register(arg1, arg2, arg3)

# 仅能用于DSL风格
pyspark.sql.functions.udf

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("test").master("local[*]").config("spark.sql.shuffle.partitions", 2).getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]).map(lambda x: [x])
df = rdd.toDF(['num'])


# sparksession.udf.register()
def num_ride_10(num):
return num * 10


udf1 = spark.udf.register("udf1", num_ride_10, IntegerType())

# SQL风格使用
# selectExpr:以SELECT的表达式执行(SQL字符串)
# 普通select方法接受普通字符串字段名,或者返回值是Column对象的计算
df.selectExpr("udf1(num)").show()

# DSL风格
# 返回值UDF对象如果作为方法使用,传入的参数一定是Column对象
df.select(udf1(df['num'])).show()

# pyspark.sql.functions.udf
udf2 = F.udf(num_ride_10, IntegerType())
df.select(udf2(df['num'])).show()
df.selectExpr("udf2(num)").show()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
+---------+
|udf1(num)|
+---------+
| 10|
| 20|
| 30|
| 40|
| 50|
| 60|
| 70|
| 80|
| 90|
| 100|
+---------+

+---------+
|udf1(num)|
+---------+
| 10|
| 20|
| 30|
| 40|
| 50|
| 60|
| 70|
| 80|
| 90|
| 100|
+---------+

+----------------+
|num_ride_10(num)|
+----------------+
| 10|
| 20|
| 30|
| 40|
| 50|
| 60|
| 70|
+----------------+

注册返回值为数组类型的UDF

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType
import pandas as pd
from pyspark.sql import functions as F


# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder.appName("test").master("local[*]").config("spark.sql.shuffle.partitions", 2).getOrCreate()
sc = spark.sparkContext

# 构建一个RDD
rdd = sc.parallelize([["hadoop spark flink"], ["hadoop flink java"]])
df = rdd.toDF(["line"])


# 注册UDF, UDF的执行函数定义
def split_line(data):
return data.split(" ") # 返回值是一个Array对象
# TODO1 方式1 构建UDF
udf2 = spark.udf.register("udf1", split_line, ArrayType(StringType()))

# DLS风格
df.select(udf2(df['line'])).show()
# SQL风格
df.createTempView("lines")
spark.sql("SELECT udf1(line) FROM lines").show(truncate=False)

# TODO 2 方式2的形式构建UDF
udf3 = F.udf(split_line, ArrayType(StringType()))
df.select(udf3(df['line'])).show(truncate=False)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
+--------------------+
| udf1(line)|
+--------------------+
|[hadoop, spark, f...|
|[hadoop, flink, j...|
+--------------------+

+----------------------+
|udf1(line) |
+----------------------+
|[hadoop, spark, flink]|
|[hadoop, flink, java] |
+----------------------+

+----------------------+
|split_line(line) |
+----------------------+
|[hadoop, spark, flink]|
|[hadoop, flink, java] |
+----------------------+

注册返回值为字典类型的UDF

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import string
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType, ArrayType


# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder.master("local[*]").config("spark.sql.shuffle.partitions", 2).getOrCreate()
sc = spark.sparkContext

# 假设 有三个数字 1 2 3 我们传入数字 ,返回数字所在序号对应的 字母 然后和数字结合形成dict返回
# 比如传入1 我们返回 {"num":1, "letters": "a"}
rdd = sc.parallelize([[1], [2], [3]])
df = rdd.toDF(["num"])


# 注册UDF
def process(data):
return {"num": data, "letters": string.ascii_letters[data]}


"""
UDF的返回值是字典的话, 需要用StructType来接收
"""
udf1 = spark.udf.register("udf1", process, StructType().add("num", IntegerType(), nullable=True).\
add("letters", StringType(), nullable=True))

df.selectExpr("udf1(num)").show(truncate=False)
df.select(udf1(df['num'])).show(truncate=False)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
+---------+
|udf1(num)|
+---------+
|{1, b} |
|{2, c} |
|{3, d} |
+---------+

+---------+
|udf1(num)|
+---------+
|{1, b} |
|{2, c} |
|{3, d} |
+---------+

通过RDD模拟UDAF

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from pyspark.sql import SparkSession

# 0. 构建执行环境入口对象SparkSession
spark = SparkSession.builder.appName("test").master("local[*]").config("spark.sql.shuffle.partitions", 2).getOrCreate()
sc = spark.sparkContext

rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
df = rdd.map(lambda x: [x]).toDF(['num'])

# 折中的方式 就是使用RDD的mapPartitions 算子来完成聚合操作
# 如果用mapPartitions API 完成UDAF聚合, 一定要单分区
single_partition_rdd = df.rdd.repartition(1)


def process(iter):
sum = 0
for row in iter:
sum += row['num']

return [sum] # 一定要嵌套list, 因为mapPartitions方法要求的返回值是list对象


print(single_partition_rdd.mapPartitions(process).collect())
1
[15]

SparkSQL窗口函数

开窗函数

开窗函数的引入是为了既显示聚集前的数据又显示聚集后的数据,即在每一行的最后一列添加聚合函数的结果

开窗用于为行为定义一个窗口(运算将要操作的行为集合),对一组值进行操作,不需要使用GROUP BY字句对数据进行分组,能够在同一行中同时返回基础行的列和聚合列

聚合函数和开窗函数

  • 聚合函数是将多行变为一行,count、avg…;如果要显示其他的列必须将列加入到GROUP BY

  • 开窗函数是将一行变成多行,可以不使用GROUP BY直接显示所有数据

开窗函数分类

  • 聚合开窗函数
  • 排序开窗函数
  • 分区类型NTILE的窗口函数

Powered by Hexo and Hexo-theme-hiker

Copyright © 2017 - 2024 青域 All Rights Reserved.

UV : | PV :