Spark-SQL

SparkSQL

SparkSQL是spark的一个用于处理海量结构化数据的模块

  • 支持SQL语言
  • 自动优化
  • 性能强
  • 兼容HIVE
  • API流程简单
  • 支持标准化JDBC和ODBC连接

SparkSQL数据抽象

  • Pandas · DataFrame

    · 二维表数据结构

    · 单机(本地)集合

  • SparkCore · RDD

    · 无标准数据结构

    · 分布式(分区)集合

  • SparkSQL · DataFrame

    · 二维表数据结构

    · 分布式(分区)集合

SparkSession对象

RDD程序的执行入口对象:SparkContext

在Spark2.0以后,推出了SparkSession对象,来作为Spark编码的统一入口对象。SparkSession

  • 用于SparkSQL编程,作为入口对象
  • 用于SparkCore编程,通过SparkSession对象获取SparkContext

构建SparkSession对象:

1
2
3
4
5
6
7
from pyspark.sql import SparkSession

# 通过builder方法来构建SparkSession对象
# appName:设置程序名称
# config:配置常用属性
# getOrCreate:完成创建SparkSession对象
spark = SparkSession.builder.appName("test").master("local[*]").config("spark.sql.shuffle.partitions", "4").getOrCreate()

通过SparkSesion对象获取SparkContext对象:

1
2
3
4
5
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test").master("local[*]").config("spark.sql.shuffle.partitions", "4").getOrCreate()

sc = spark.sparkContext

Read More

Spark-Core

广播变量

本地对象被发送到同个Executor内每个分区的处理线程上使用,这样每个分区实际上存放了重复的数据。而Executor本质上是进程,进程内资源共享,没必要将本地对象分发给所有分区,造成内存浪费

解决方案:将本地对象设置为广播变量

1
2
3
4
5
# 1.将本地对象标记为广播变量
broadcast = sc.broadcast(var)
# 2.使用广播变量,从broadcast对象中取出本地对象
value = broadcast.value
# 当传输的是广播对象时,spark会只给每个Executor分发一份数据

当本地集合对象和分布式集合对象(RDD)进行关联时,需要将本地集合对象封装为广播变量

  • 节省网络IO次数
  • 降低Executor内存占用

累加器

当执行累加操作时,各个分区累加自身的内容

1
2
# spark提供累加器变量,参数是初始值
acmlt = sc.accumulator(0)

e.g.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from pyspark import SparkConf, SparkContext

conf = SparkConf().setAppName("test").setMaster("local[*]")
sc = SparkContext(conf=conf)

acmlt = sc.accumulator(0)


def counts(data):
global acmlt
acmlt += 1
return data


rdd1 = sc.parallelize([1, 2, 3], 3)
rdd2 = rdd1.map(counts)
print(rdd2.collect())
print(acmlt)
1
2
[1, 2, 3]
3

:累加器可能因血缘关系导致重复的累加,例如一个RDD被释放后累加已经完成,此时再使用该RDD将会导致重复累加。可通过cache缓存机制来解决

Read More

Pyspark-RDD

RDD

RDD(Resilient Distributed Dataset)弹性分布式数据集,是spark中最基本的数据抽象,代表一个不可变、可分区、其中元素可并行计算的集合

  • Resilient:RDD中的数据可存储再内存或磁盘中
  • Distributed:分布式存储数据(跨机器/跨进程),用于分布式计算
  • Dataset:一个用于存放数据的数据集合

特性

分区

RDD分区是RDD数据存储的最小单位,一份RDD数据本质上分隔成了多个分区

1
2
3
# 存储9个数字,设立三个分区
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 3)
rdd.glom().collect()
1
[[1,2,3],[4,5,6],[7,8,9]]

RDD方法会作用在其所有方法上

1
rdd.map(lambda x: x * 10).collect()
1
[10,20,30,40,50,60,70,80,90]

RDD之间具有依赖关系

1
2
3
4
5
6
sc = SparkContext(conf=conf)
rdd1 = sc.textFile("../test.text")
rdd2 = rdd1.flatMap(lambda x: x.split(' '))
rdd3 = rdd2.map(lambda x: (x, 1))
rdd4 = rdd3.reduceByKey(lambda a, b: a+b)
print(rdd4.collect())

Key-Value型RDD可以有分区器

KV型RDD:RDD内存储的数据是只有两个元素的二元元组

默认分区器:Hash分区规则,也可手动设置分区器:rdd.partitionBy()方法

:不是所有RDD都是KV型

RDD的分区规划:会尽量靠近数据所在的服务器

在初始RDD读取数据规划阶段,分区会尽量规划到存储数据所在服务器,直接读取本地数据,避免从网络读取数据

Spark会在确保并行计算能力的前提下,尽量确保本地读取

RDD创建

  • 通过并行化集合创建(本地对象转化为分布式RDD)
  • 读取外部数据源(读文件)

并行化创建

1
2
3
# arg1: 集合对象,如:list
# arg2:可选,指定分区数量
rdd = SparkContext.parallelize(arg1, arg2)

读取文件

通过textFile API来读取本地或者hdfs的数据

1
2
3
4
# arg1: 文件路径
# arg2:可选,最小分区数量
# 当arg2超出spark允许范围,参数失效
SparkContext.textFile(arg1, arg2)

通过wholeTextFile API来读取小文件,这个api偏向于少量分区读取数据,是pyspark基于小文件的优化

1
2
3
4
# arg1:文件路径
# arg2:可选,最小分区数量
# 当arg2超出spark允许范围,参数失效
SparkContext.wholeTextFiles(arg1, arg2)

Read More

Pyspark

Pyspark库安装

(本文基于上文spark基础)

Python库安装

在三台机器分别安装pyspark

1
2
conda activate pyspark
pip install pyspark -i https://mirror.baidu.com/pypi/simple

Windows补丁

将hadoop.dll置于C:/windows/system32/目录下,然后配置hadoop工具包的环境变量

安装相关python库

1
pip install pyspark pyhive pymysql jieba -i https://mirror.baidu.com/pypi/simple

为pycharm添加ssh解释器环境

Read More

Spark基础

定义Apache Spark是用于大规模数据处理的统一分析引擎。其核心数据结构:弹性分布式数据集(RDD)能够在大规模集群中做内存运算,且具有一定容错方式。

Spark框架

组成

· Spark Core:以RDD为数据抽象,提供Python、Java、Scala、R语言的API和Spark的核心功能,是Spark运行的基础;

· SparkSQL:基于SparkCore,提供机构化数据处理模块,支持以SQL语言对数据进行处理;同时可以作为StructuredStreaming模块的基础,进行数据流式计算;

· SparkStreaming:基于SparkCore,提供数据流式计算;

· MLlib:基于SparkCore,内置大量机器学习库和算法API,进行机器学习计算;

· GraphX:基于SparkCore,提供了大量图计算API,用于分布式图计算

运行模式

· 本地模式(单机/local):以一个独立进程,通过内部的多线程模拟Spark运行环境

· Standalone模式(集群):Spark的各个角色以独立进程形式存在,组成集群环境

· Hadoop YARN模式(集群):Spark的各个角色运行在YARN容器内部,组成集群环境

· Kubernetes模式(容器集群):Spark的各个角色运行在Kubernetes容器内部,组成集群环境

· 云服务模式

角色

  • 资源

· Master角色:集群资源管理

· Worker角色:单机资源管理(所在服务器资源管理)

  • 任务

· Driver角色:单个任务管理

· Executor角色:单个任务计算

Read More

Nebula-全文检索

连接ES

添加Nebula全文索引模板文件

http://localhost:5601/app/dev_tools#/console

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
curl -H "Content-Type: application/json; charset=utf-8" -XPUT http://localhost:9200/_template/nebula_index_template -d '
{
"template": "nebula*",
"settings": {
"index": {
"number_of_shards": 3,
"number_of_replicas": 1
}
},
"mappings": {
"properties" : {
"tag_id" : { "type" : "long" },
"column_id" : { "type" : "text" },
"value" :{ "type" : "keyword"}
}
}
}'

登录ES客户端

SIGN IN TEXT SERVICE (localhost:9200,"admin","123456")

查看ES客户端详情

SHOW TEXT SEARCH CLIENTS;

退出ES客户端

SIGN OUT TEXT SERVICE

配置Nebula

安装storage服务

进入/usr/local/nebula根目录下,进入/etc子目录,找到nebula-storaged-listener.conf.production文件,复制一份并去除.production后缀。将文件中的listenr地址改为真实地址。

随后启动listener:./bin/nebula-storaged --flagfile etc/nebula-storaged-listener.conf

添加listener

ADD LISTENER ELASTICSEARCH 192.168.100.1:9789,192.168.100.2:9789;(注:如果有多台图库集群,都要配置)

进入nebula-console,执行SHOW LISTENER可以查看图空间的所有listener。

要删除所有listener,执行:REMOVE LISTENER ELASTICSEARCH(注:一个图空间仅可执行一次)

Read More

Docker

CentOS Docker安装

CentOS7要求64位,且内核版本高于3.10

查看CentOS内核版本:uname -r

· 安装Docker:卸载旧版本

1
2
3
4
5
6
7
8
$ sudo yum remove docker \
docker-client \
docker-client-latest \
docker-common \
docker-latest \
docker-latest-logrotate \
docker-logrotate \
docker-engine

· 安装所需软件包

1
2
3
$ sudo yum install -y yum-utils \
device-mapper-persistent-data \
lvm2

· 设置Docker仓库(清华源)

1
2
3
$ sudo yum-config-manager \
--add-repo \
https://mirrors.tuna.tsinghua.edu.cn/docker-ce/linux/centos/docker-ce.repo

· 安装Docker Engine-Community

1
$ sudo yum install docker-ce docker-ce-cli containerd.io

Read More

Nebula-fastapi接口维护文档

普通接口

1.导入fastapi、连接nebula连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from fastapi import FastAPI
from nebula2.gclient.net import ConnectionPool
from nebula2.Config import Config


# 关闭在线文档,防止攻击
app_router = FastAPI(docs_url=None, redoc_url=None)

config = Config()
config.max_connection_pool_size = 10
# 连接超时时间
config.timeout = 60000
# 关闭空闲连接时间
config.idle_time = 0
# 检查空闲连接时间间隔
config.interval_check = -1
# 初始化连接池
connection_pool = ConnectionPool()
# 如果给定的服务器正常,则返回true,否则返回false
ok = connection_pool.init([('host', 9669)], config)

if __name__ == "__main__":
import uvicorn
uvicorn.run(app="nebula_api:app_router", reload=True, debug=True, host=host, port=port)

2.CORS跨域访问设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from fastapi.middleware.cors import CORSMiddleware


# CORS
origins = [
"*"
]

app_router.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"]
)

Read More

PaddleOCR表格识别

PaddleOCR2.5根目录下的ppstructure文件模块是PaddleOCR提供的一个可用于复杂文档结构分析处理的OCR工具包
github文档页面:https://github.com/PaddlePaddle/PaddleOCR/blob/release/2.5/ppstructure/README_ch.md

安装依赖

· 安装paddleocr version>=2.5

1
pip install "paddleocr>=2.5"

· 安装版面分析依赖包layoutparser

1
pip install -U https://paddleocr.bj.bcebos.com/whl/layoutparser-0.0.0-py3-none-any.whl

· 安装DocVQA依赖包paddlenlp(DocVQA功能,选装)

1
pip install paddlenlp

快速开始

在PaddleOCR/ppstructure目录下进入CMD命令行,或者创建python脚本启动
· 表格识别

1
paddleocr --image_dir=docs/table/table.jpg --type=structure --layout=false

python脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import os
import cv2
from paddleocr import PPStructure,save_structure_res

table_engine = PPStructure(layout=False, show_log=True)

save_folder = './output'
img_path = 'PaddleOCR/ppstructure/docs/table/table.jpg'
img = cv2.imread(img_path)
result = table_engine(img)
save_structure_res(result, save_folder, os.path.basename(img_path).split('.')[0])

for line in result:
line.pop('img')
print(line)

· 版面分析

1
paddleocr --image_dir=docs/table/1.png --type=structure --table=false --ocr=false

python脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import os
import cv2
from paddleocr import PPStructure,save_structure_res

table_engine = PPStructure(table=False, ocr=False, show_log=True)

save_folder = './output'
img_path = 'PaddleOCR/ppstructure/docs/table/1.png'
img = cv2.imread(img_path)
result = table_engine(img)
save_structure_res(result, save_folder, os.path.basename(img_path).split('.')[0])

for line in result:
line.pop('img')
print(line)

· 版面分析+表格识别

1
paddleocr --image_dir=docs/table/1.png --type=structure

python脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import os
import cv2
from paddleocr import PPStructure, draw_structure_result, save_structure_res
from PIL import Image

table_engine = PPStructure(show_log=True)

save_folder = './output'
img_path = './ppstructure/docs/table/table.jpg'
img = cv2.imread(img_path)
result = table_engine(img)
save_structure_res(result, save_folder, os.path.basename(img_path).split('.')[0])

for line in result:
line.pop('img')
print(line)


font_path = './doc/fonts/simfang.ttf'
image = Image.open(img_path).convert('RGB')
im_show = draw_structure_result(image, result, font_path=font_path)
im_show = Image.fromarray(im_show)
im_show.save('result.jpg')

Read More

PaddleOCR

安装

进入PaddleOCR的github页面(https://github.com/PaddlePaddle/PaddleOCR),进行下载和解压

使用pip进行安装,这里因为速度很慢推荐使用百度源

1
pip install paddlepaddle -i https://mirror.baidu.com/pypi/simple

安装shapely(https://www.lfd.uci.edu/~gohlke/pythonlibs/),下载shapely对应python和系统版本的安装包(我使用的是py39,windows_64),python版本目前不能超过3.9

将下载好的Shapely-1.8.2-cp39-cp39-win_amd64.whl放进python根目录下的libs文件夹内,通过cmd或pycharm终端使用pip执行安装

1
pip install Shapely-1.8.2-cp39-cp39-win_amd64.whl

完成以后接着来到PaddleOCR目录下,通过终端安装依赖,这里同样推荐使用百度源

1
pip install -r requirements.txt -i https://mirror.baidu.com/pypi/simple

到这里,PaddleOCR的安装完成了

如果再执行pip install -r requirements.txt -i https://mirror.baidu.com/pypi/simple到安装opencv4.4.0.46包时报错:

error: subprocess-exited-with-error

说明python的版本可能存在问题,需要切换虚拟环境或回退python版本,因为opencv-python目前仅支持py3.6-3.9版本

opencv-python镜像:https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple/opencv-python/

Read More


Powered by Hexo and Hexo-theme-hiker

Copyright © 2017 - 2025 青域 All Rights Reserved.

UV : | PV :