Nebula-Spark and Graph Algorithms

Nebula Spark Connector

Download & official documentation: https://github.com/vesoft-inc/nebula-spark-connector

Environment

· nebula:2.6.1
· hadoop:2.7
· spark:2.4.7
· pyspark:2.4.7
· python:3.7.16
· nebula-spark-connector:2.6.1

Building and packaging nebula-spark-connector

$ cd nebula-spark-connector-2.6.1/nebula-spark-connector
$ mvn clean package -Dmaven.test.skip=true -Dgpg.skip -Dmaven.javadoc.skip=true

On success, the nebula-spark-connector-2.6.1.jar file is produced under the nebula-spark-connector/target/ directory:

(base) [root@root target]# ll
total 106792
drwxr-xr-x 3 root root 17 Mar 11 14:14 classes
-rw-r--r-- 1 root root 1 Mar 11 14:14 classes.-497386701.timestamp
-rw-r--r-- 1 root root 1 Mar 11 14:14 classes.timestamp
-rw-r--r-- 1 root root 30701 Mar 11 14:15 jacoco.exec
drwxr-xr-x 2 root root 28 Mar 11 14:15 maven-archiver
-rw-r--r-- 1 root root 108375457 Mar 11 14:16 nebula-spark-connector-2.6.1.jar
-rw-r--r-- 1 root root 583482 Mar 11 14:16 nebula-spark-connector-2.6.1-javadoc.jar
-rw-r--r-- 1 root root 36358 Mar 11 14:16 nebula-spark-connector-2.6.1-sources.jar
-rw-r--r-- 1 root root 315392 Mar 11 14:15 original-nebula-spark-connector-2.6.1.jar
drwxr-xr-x 4 root root 37 Mar 11 14:15 site

Reading NebulaGraph Data with PySpark

Read all the data under a tag from the NebulaGraph cluster whose metaAddress is "metad0:9559" into a DataFrame:

df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"returnCols", "name,age").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()

Then the DataFrame can be shown like this:

>>> df.show(n=2)
+---------+--------------+---+
|_vertexId| name|age|
+---------+--------------+---+
|player105| Danny Green| 31|
|player109|Tiago Splitter| 34|
+---------+--------------+---+
only showing top 2 rows
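
Edges can be read the same way by switching type to edge and pointing label at an edge type. A minimal sketch, assuming the basketballplayer space has a follow edge type with a degree property (as in the official sample dataset):

df_edge = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "edge").option(
"spaceName", "basketballplayer").option(
"label", "follow").option(
"returnCols", "degree").option(
"metaAddress", "metad0:9559").option(
"partitionNumber", 1).load()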

Writing NebulaGraph Data with PySpark

If writeMode is not specified, it defaults to insert.

Writing vertices

df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"vidPolicy", "").option(
"vertexField", "_vertexId").option(
"batch", 1).option(
"metaAddress", "metad0:9559").option(
"graphAddress", "graphd1:9669").option(
"passwd", "nebula").option(
"user", "root").save()

Deleting vertices

To use a non-default write mode such as delete or update, add the writeMode option. For example, delete:

df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"vidPolicy", "").option(
"vertexField", "_vertexId").option(
"batch", 1).option(
"metaAddress", "metad0:9559").option(
"graphAddress", "graphd1:9669").option(
"passwd", "nebula").option(
"writeMode", "delete").option(
"user", "root").save()

Writing edges

df.write.format("com.vesoft.nebula.connector.NebulaDataSource")\
.mode("overwrite")\
.option("srcPolicy", "")\
.option("dstPolicy", "")\
.option("metaAddress", "metad0:9559")\
.option("graphAddress", "graphd:9669")\
.option("user", "root")\
.option("passwd", "nebula")\
.option("type", "edge")\
.option("spaceName", "basketballplayer")\
.option("label", "server")\
.option("srcVertexField", "srcid")\
.option("dstVertexField", "dstid")\
.option("rankField", "")\
.option("batch", 100)\
.option("writeMode", "insert").save() # delete to delete edge, update to update edge

Deleting edges

df.write.format("com.vesoft.nebula.connector.NebulaDataSource")\
.mode("overwrite")\
.option("srcPolicy", "")\
.option("dstPolicy", "")\
.option("metaAddress", "metad0:9559")\
.option("graphAddress", "graphd:9669")\
.option("user", "root")\
.option("passwd", "nebula")\
.option("type", "edge")\
.option("spaceName", "basketballplayer")\
.option("label", "server")\
.option("srcVertexField", "srcid")\
.option("dstVertexField", "dstid")\
.option("randkField", "")\
.option("batch", 100)\
.option("writeMode", "delete").save() # delete to delete edge, update to update edge

About the options for PySpark reads and writes

For other options, such as withDeleteEdge when deleting vertices, refer to the string option keys defined in [nebula/connector/NebulaOptions.scala](https://github.com/vesoft-inc/nebula-spark-connector/blob/master/nebula-spark-connector/src/main/scala/com/vesoft/nebula/connector/NebulaOptions.scala); there we can see that its string key is deleteEdge:

/** write config */
val RATE_LIMIT: String = "rateLimit"
val VID_POLICY: String = "vidPolicy"
val SRC_POLICY: String = "srcPolicy"
val DST_POLICY: String = "dstPolicy"
val VERTEX_FIELD = "vertexField"
val SRC_VERTEX_FIELD = "srcVertexField"
val DST_VERTEX_FIELD = "dstVertexField"
val RANK_FIELD = "randkField"
val BATCH: String = "batch"
val VID_AS_PROP: String = "vidAsProp"
val SRC_AS_PROP: String = "srcAsProp"
val DST_AS_PROP: String = "dstAsProp"
val RANK_AS_PROP: String = "rankAsProp"
val WRITE_MODE: String = "writeMode"
val DELETE_EDGE: String = "deleteEdge"
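
So, for example, to also remove the edges attached to the vertices being deleted, the deleteEdge key can be added to the delete example above. A minimal sketch, assuming the option accepts a boolean-like string value:

df.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "basketballplayer").option(
"label", "player").option(
"vidPolicy", "").option(
"vertexField", "_vertexId").option(
"batch", 1).option(
"metaAddress", "metad0:9559").option(
"graphAddress", "graphd1:9669").option(
"passwd", "nebula").option(
"user", "root").option(
"writeMode", "delete").option(
"deleteEdge", "true").save()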

Calling Nebula Spark Connector from PySpark

PyCharm

# Import SparkSession; findspark picks up ${SPARK_HOME} automatically; point to the virtual environment
import os
import findspark

from pyspark.sql import SparkSession

os.environ["HADOOP_CONF_DIR"] = "/exp/server/hadoop-2.7.7/etc/hadoop"
os.environ["YARN_CONF_DIR"] = "/exp/server/hadoop-2.7.7/etc/hadoop"
os.environ["SPARK_HOME"] = "/exp/server/spark-2.4.7-bin-hadoop2.7"
os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/envs/pyspark38/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/envs/pyspark38/bin/python"
findspark.init()

# Build the SparkSession and load the jar from the target directory
spark = SparkSession.builder.config(
"spark.jars",
"/exp/work/pyspark/nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-2.6.1.jar").config(
"spark.driver.extraClassPath",
"/exp/work/pyspark/nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-2.6.1.jar").appName(
"nebula-connector").getOrCreate()

# Read data from NebulaGraph
# read vertex
df_tag = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"sep", "\t").option(
"type", "vertex").option(
"spaceName", "space").option(
"label", "tag").option(
"returnCols", "").option(
"metaAddress", "metahost:9559").option(
"partitionNumber", 1).load()

# read edge
df_edge = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"sep", "\t").option(
"type", "edge").option(
"spaceName", "space").option(
"label", "edge").option(
"returnCols", "").option(
"metaAddress", "metahost:9559").option(
"partitionNumber", 1).load()

# Write back to NebulaGraph
# write vertex
df_tag.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "space").option(
"label", "tag").option(
"vidPolicy", "").option(
"vertexField", "_vertexId").option(
"batch", 1).option(
"metaAddress", "metahost:9559").option(
"graphAddress", "graphhost:9669").option(
"passwd", "nebula").option(
"writeMode", "update").option(
"user", "root").save()

df_tag.show()
df_edge.show()

Spark-submit

${SPARK_HOME}/bin/spark-submit --master local[*] \
--deploy-mode client \
--jars file:/exp/work/pyspark/nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-2.6.1.jar \
/exp/work/pyspark/nebula/nebula_reader.py
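
The nebula_reader.py script itself is not reproduced in this post; a minimal sketch of what it might look like, based on the PyCharm example above (when the connector jar is supplied through --jars, the spark.jars config in the builder can be omitted):

# nebula_reader.py (hypothetical layout, mirroring the PyCharm example)
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("nebula-reader").getOrCreate()

# read one tag into a DataFrame
df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
"type", "vertex").option(
"spaceName", "space").option(
"label", "tag").option(
"returnCols", "").option(
"metaAddress", "metahost:9559").option(
"partitionNumber", 1).load()

df.show()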

NebulaAlgorithm

Download & official documentation: https://github.com/vesoft-inc/nebula-algorithm

Environment

· nebula:2.6.1
· hadoop:2.7
· spark:2.4.7
· pyspark:2.4.7
· python:3.7.16
· nebula-spark-connector:2.6.1
· nebula-algorithm:2.6.1

Building and packaging nebula-algorithm

$ cd nebula-algorithm-2.6.1/nebula-algorithm
$ mvn clean package -Dgpg.skip -Dmaven.javadoc.skip=true -Dmaven.test.skip=true

On success, the nebula-algorithm-2.6.1.jar file is produced under the nebula-algorithm/target/ directory.

Calling Nebula Algorithm from PySpark

PyCharm

import os

import findspark
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from py4j.java_gateway import java_import


os.environ["HADOOP_CONF_DIR"] = "/exp/server/hadoop-2.7.7/etc/hadoop"
os.environ["YARN_CONF_DIR"] = "/exp/server/hadoop-2.7.7/etc/hadoop"
os.environ["SPARK_HOME"] = "/exp/server/spark-2.4.7-bin-hadoop2.7"
os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/envs/pyspark37/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/envs/pyspark37/bin/python"
findspark.init()


spark = SparkSession.builder.config(
"spark.jars",
"/exp/work/pyspark/nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-2.6.1.jar").config(
"spark.driver.extraClassPath",
"/exp/work/pyspark/nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-2.6.1.jar").config(
"spark.jars",
"/exp/work/pyspark/nebula-algorithm/nebula-algorithm/target/nebula-algorithm-2.6.1.jar").config(
"spark.driver.extraClassPath",
"/exp/work/pyspark/nebula-algorithm/nebula-algorithm/target/nebula-algorithm-2.6.1.jar").appName(
"nebula-connector").getOrCreate()


# spark = SparkSession.builder.appName("PageRankExample").getOrCreate()
jspark = spark._jsparkSession

# import "com.vesoft.nebula.algorithm.config.SparkConfig"
java_import(spark._jvm, "com.vesoft.nebula.algorithm.config.SparkConfig")

# import "com.vesoft.nebula.algorithm.config.PRConfig"
java_import(spark._jvm, "com.vesoft.nebula.algorithm.config.PRConfig")

# import "com.vesoft.nebula.algorithm.lib.PageRankAlgo"
java_import(spark._jvm, "com.vesoft.nebula.algorithm.lib.PageRankAlgo")


# Convert string-typed vids to integer-typed vids
def convert_string_id_to_long_id(df):
    df = df.drop("display_desc").drop("creation_time")
    src_id_df = df.select("_srcId").withColumnRenamed("_srcId", "id")
    dst_id_df = df.select("_dstId").withColumnRenamed("_dstId", "id")
    id_df = src_id_df.union(dst_id_df).distinct()
    encode_id = id_df.withColumn("encodedId", dense_rank().over(Window.orderBy("id")))
    # encode_id.write.option("header", True).csv("file:///tmp/encodeId.csv")
    src_join_df = df.join(encode_id, df._srcId == encode_id.id) \
        .drop("_srcId") \
        .drop("id") \
        .withColumnRenamed("encodedId", "_srcId")

    df_sv = df.join(encode_id, df._srcId == encode_id.id) \
        .drop("_srcId") \
        .drop("_rank") \
        .drop("_dstId") \
        .withColumnRenamed("id", "src_vid") \
        .withColumnRenamed("encodedId", "_srcId")

    df_dv = src_join_df.join(encode_id, src_join_df._dstId == encode_id.id) \
        .drop("_dstId") \
        .drop("_rank") \
        .drop("degree") \
        .withColumnRenamed("encodedId", "_dstId") \
        .withColumnRenamed("_srcID", "_src") \
        .withColumnRenamed("id", "dst_vid")

    dst_join_df = src_join_df.join(encode_id, src_join_df._dstId == encode_id.id) \
        .drop("_dstId") \
        .drop("_rank") \
        .drop("degree") \
        .withColumnRenamed("encodedId", "_dstId").drop("id")

    df_v = df_dv.join(df_sv, df_dv._src == df_sv._srcId).drop("_src")
    return dst_join_df, df_v


df = spark.read.format(
"com.vesoft.nebula.connector.NebulaDataSource").option(
# "sep", "\t").option(
"type", "edge").option(
"spaceName", "DWD_GRAPH_LY_V3_2023").option(
"label", "edge_phone").option(
"returnCols", "").option(
"metaAddress", "192.168.100.45:9559").option(
"partitionNumber", 1).load()

df.orderBy("creation_time").show()


df_int, df_v = convert_string_id_to_long_id(df)
prConfig = spark._jvm.PRConfig(1, 0.8)
prResult = spark._jvm.PageRankAlgo.apply(jspark, df_int._jdf, prConfig, False)
df_v.show(20, False)
prResult.show(20, False)

# Convert the Java DataFrame (jdf) back to a PySpark DataFrame
prResult = prResult.toDF()
pagerank_df = DataFrame(prResult, spark)
pagerank_df.sort(col("pagerank").desc()).show(60, False)


# TODO: PYSPARK (pyspark=2.4.7, python=3.7)
# conda activate pyspark37
# TODO: SPARK-SUBMIT
# ${SPARK_HOME}/bin/spark-submit --master local[*] \
# --deploy-mode client \
# --driver-class-path file:/exp/work/pyspark/nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-2.6.1.jar \
# --driver-class-path file:/exp/work/pyspark/nebula-algorithm/nebula-algorithm/target/nebula-algorithm-2.6.1.jar \
# --jars file:/exp/work/pyspark/nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-2.6.1.jar \
# --jars file:/exp/work/pyspark/nebula-algorithm/nebula-algorithm/target/nebula-algorithm-2.6.1.jar \
# /exp/work/pyspark/nebula/pagerank.py
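
For intuition, convert_string_id_to_long_id simply assigns every distinct string vid a dense rank starting at 1; a tiny illustrative sketch on toy data (not taken from the graph above):

from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window

toy = spark.createDataFrame([("a",), ("c",), ("a",), ("b",)], ["id"]).distinct()
toy.withColumn("encodedId", dense_rank().over(Window.orderBy("id"))).orderBy("id").show()
# "a" -> 1, "b" -> 2, "c" -> 3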

Spark-submit

${SPARK_HOME}/bin/spark-submit --master local[*] \
--deploy-mode client \
--driver-class-path file:/exp/work/pyspark/nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-2.6.1.jar \
--driver-class-path file:/exp/work/pyspark/nebula-algorithm/nebula-algorithm/target/nebula-algorithm-2.6.1.jar \
--jars file:/exp/work/pyspark/nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-2.6.1.jar \
--jars file:/exp/work/pyspark/nebula-algorithm/nebula-algorithm/target/nebula-algorithm-2.6.1.jar \
/exp/work/pyspark/nebula/pagerank.py

Result

ssh://root@192.168.100.43:22/root/anaconda3/envs/pyspark37/bin/python -u /exp/work/pyspark/nebula/pagerank.py
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/exp/work/pyspark/nebula-algorithm/nebula-algorithm/target/nebula-algorithm-2.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/exp/server/spark-2.4.7-bin-hadoop2.7/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
23/03/24 14:42:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
+--------------------+--------------------+-----+-------------+------------+
| _srcId| _dstId|_rank|creation_time|display_desc|
+--------------------+--------------------+-----+-------------+------------+
|dwd_accounttelegr...|dwd_phone66802018560| 0| 1678692201| 注册手机号|
|dwd_accounttelegr...|dwd_phone12366827389| 0| 1678692201| 注册手机号|
|dwd_accounttelegr...|dwd_phone86151162...| 0| 1678692201| 注册手机号|
|dwd_accounttelegr...|dwd_phone79653470394| 0| 1678692210| 注册手机号|
|dwd_accounttelegr...|dwd_phone99890918...| 0| 1678692210| 注册手机号|
|dwd_accounttelegr...|dwd_phone99890535...| 0| 1678692210| 注册手机号|
|dwd_accounttelegr...|dwd_phone85620578...| 0| 1678692210| 注册手机号|
|dwd_accounttelegr...|dwd_phone99899666...| 0| 1678692210| 注册手机号|
|dwd_accounttelegr...|dwd_phone88802607465| 0| 1678692219| 注册手机号|
|dwd_accounttelegr...|dwd_phone62831556...| 0| 1678692228| 注册手机号|
|dwd_accounttelegr...|dwd_phone95965067...| 0| 1678692228| 注册手机号|
|dwd_accounttelegr...|dwd_phone85595212324| 0| 1678692228| 注册手机号|
|dwd_accounttelegr...|dwd_phone86132873...| 0| 1678692279| 注册手机号|
|dwd_accounttelegr...|dwd_phone63948774...| 0| 1678692279| 注册手机号|
|dwd_accounttelegr...|dwd_phone85590645678| 0| 1678692279| 注册手机号|
|dwd_accounttelegr...|dwd_phone63948229...| 0| 1678692288| 注册手机号|
|dwd_accounttelegr...|dwd_phone99899913...| 0| 1678692288| 注册手机号|
|dwd_accounttelegr...|dwd_phone99890302...| 0| 1678692288| 注册手机号|
|dwd_accounttelegr...|dwd_phone99890986...| 0| 1678692288| 注册手机号|
|dwd_accounttelegr...|dwd_phone16728042478| 0| 1678692297| 注册手机号|
+--------------------+--------------------+-----+-------------+------------+
only showing top 20 rows

23/03/24 14:42:39 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/24 14:42:39 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
[Stage 3:> (0 + 1) / 1][Stage 4:> (0 + 1) / 1]23/03/24 14:42:49 WARN storage.BlockManager: Block rdd_29_0 already exists on this machine; not re-adding it
23/03/24 14:42:50 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/24 14:42:50 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
23/03/24 14:42:50 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+----------------------+------+---------------------------------+------+
|dst_vid |_dstId|src_vid |_srcId|
+----------------------+------+---------------------------------+------+
|dwd_phone8613951884225|18141 |dwd_accountctrip1187326021 |1 |
|dwd_phone8618705166775|19486 |dwd_accountctrip18705166775 |2 |
|dwd_phone8613006584273|17582 |dwd_accountctripM2295153681 |3 |
|dwd_phone8615668271999|18526 |dwd_accountctripM2494074526 |4 |
|dwd_phone8616653556969|18757 |dwd_accountctripM2682271769 |5 |
|dwd_phone8613562835888|17938 |dwd_accountctripM3011519721 |6 |
|dwd_phone8618506413343|19344 |dwd_accountctripM4761127290 |7 |
|dwd_phone8618112957512|19178 |dwd_accountctripM4915141465 |8 |
|dwd_phone8615119464037|18222 |dwd_accountctripM548327259 |9 |
|dwd_phone8613123368388|17719 |dwd_accountctripM601536478 |10 |
|dwd_phone8617605442050|18905 |dwd_accountctrip_WeChat2269232661|11 |
|dwd_phone8613123368388|17719 |dwd_accountdidi101108284 |12 |
|dwd_phone8616653556969|18757 |dwd_accountdidi1641492054156 |13 |
|dwd_phone8615851895366|18609 |dwd_accountdidi1746082933402 |14 |
|dwd_phone8617561929739|18889 |dwd_accountdidi17598416475664 |15 |
|dwd_phone8615119464037|18222 |dwd_accountdidi25441524 |16 |
|dwd_phone8613006584273|17582 |dwd_accountdidi2881218873540 |17 |
|dwd_phone8613951884225|18141 |dwd_accountdidi3099925 |18 |
|dwd_phone8618600764544|19397 |dwd_accountdidi486186360832 |19 |
|dwd_phone8618705166775|19486 |dwd_accountdidi772722 |20 |
+----------------------+------+---------------------------------+------+
only showing top 20 rows

+-----+------------------+
|_id |pagerank |
+-----+------------------+
|19021|1.1105466561585526|
|9831 |0.8884373249268421|
|5354 |0.8884373249268421|
|4926 |0.8884373249268421|
|21377|1.1105466561585526|
|14609|1.1105466561585526|
|11852|1.1105466561585526|
|8390 |0.8884373249268421|
|10837|0.8884373249268421|
|4992 |0.8884373249268421|
|20894|1.1105466561585526|
|21780|1.1105466561585526|
|1813 |0.8884373249268421|
|9025 |0.8884373249268421|
|14554|1.1105466561585526|
|1780 |0.8884373249268421|
|16132|1.1105466561585526|
|22467|1.1105466561585526|
|2117 |0.8884373249268421|
|16321|1.1105466561585526|
+-----+------------------+
only showing top 20 rows

+-----+------------------+
|_id |pagerank |
+-----+------------------+
|18222|3.331639968475657 |
|19178|2.8874213060122367|
|19486|2.8874213060122367|
|12765|1.9989839810853947|
|13332|1.9989839810853947|
|13505|1.9989839810853947|
|17582|1.9989839810853947|
|18889|1.776874649853684 |
|19176|1.776874649853684 |
|12470|1.776874649853684 |
|19397|1.5547653186219736|
|18141|1.5547653186219736|
|11750|1.5547653186219736|
|18555|1.5547653186219736|
|13216|1.5547653186219736|
|18905|1.5547653186219736|
|17688|1.5547653186219736|
|19734|1.5547653186219736|
|12768|1.5547653186219736|
|18609|1.5547653186219736|
|18347|1.332655987390263 |
|19473|1.332655987390263 |
|12007|1.332655987390263 |
|12100|1.332655987390263 |
|16268|1.332655987390263 |
|19298|1.332655987390263 |
|23258|1.332655987390263 |
|17938|1.332655987390263 |
|20042|1.332655987390263 |
|12639|1.332655987390263 |
|13309|1.332655987390263 |
|12739|1.332655987390263 |
|18037|1.332655987390263 |
|18757|1.332655987390263 |
|18344|1.332655987390263 |
|12461|1.332655987390263 |
|20037|1.332655987390263 |
|13754|1.332655987390263 |
|18864|1.332655987390263 |
|21713|1.332655987390263 |
|16872|1.332655987390263 |
|15710|1.332655987390263 |
|12229|1.332655987390263 |
|19183|1.332655987390263 |
|17928|1.332655987390263 |
|19118|1.332655987390263 |
|20243|1.332655987390263 |
|12223|1.332655987390263 |
|23219|1.332655987390263 |
|12529|1.332655987390263 |
|20865|1.332655987390263 |
|16538|1.332655987390263 |
|13253|1.332655987390263 |
|13329|1.332655987390263 |
|13198|1.332655987390263 |
|17719|1.332655987390263 |
|14661|1.1105466561585526|
|18584|1.1105466561585526|
|12532|1.1105466561585526|
|20466|1.1105466561585526|
+-----+------------------+
only showing top 60 rows

Example: wrapping it up and writing back to NebulaGraph

import os

import findspark
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from py4j.java_gateway import java_import

from config import nebula_config

# TODO: environment variables
os.environ["HADOOP_CONF_DIR"] = "/exp/server/hadoop-2.7.7/etc/hadoop"
os.environ["YARN_CONF_DIR"] = "/exp/server/hadoop-2.7.7/etc/hadoop"
os.environ["SPARK_HOME"] = "/exp/server/spark-2.4.7-bin-hadoop2.7"
os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/envs/pyspark37/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/envs/pyspark37/bin/python"
findspark.init()


class PageRank:
    """
    pagerank
    """
    def __init__(self):
        self.df = None
        self.dfc = None
        self.tag = None
        self.id_df = None
        self.df_int = None
        self.encode_id = None
        self.src_id_df = None
        self.dst_id_df = None
        self.src_join_df = None
        self.dst_join_df = None

        # TODO: spark object with nebula-spark-connector & nebula-algorithm
        self.spark = SparkSession.builder.appName(
            "pagerank")\
            .master(
            "local[*]")\
            .config(
            "spark.jars",
            "/exp/work/pyspark/nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-2.6.1.jar")\
            .config(
            "spark.driver.extraClassPath",
            "/exp/work/pyspark/nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-2.6.1.jar")\
            .config(
            "spark.jars",
            "/exp/work/pyspark/nebula-algorithm/nebula-algorithm/target/nebula-algorithm-2.6.1.jar")\
            .config(
            "spark.driver.extraClassPath",
            "/exp/work/pyspark/nebula-algorithm/nebula-algorithm/target/nebula-algorithm-2.6.1.jar")\
            .getOrCreate()

        # TODO: java import
        self.jspark = self.spark._jsparkSession

        # import "com.vesoft.nebula.algorithm.config.SparkConfig"
        java_import(self.spark._jvm, "com.vesoft.nebula.algorithm.config.SparkConfig")

        # import "com.vesoft.nebula.algorithm.config.PRConfig"
        java_import(self.spark._jvm, "com.vesoft.nebula.algorithm.config.PRConfig")

        # import "com.vesoft.nebula.algorithm.lib.PageRankAlgo"
        java_import(self.spark._jvm, "com.vesoft.nebula.algorithm.lib.PageRankAlgo")

    def set_tag(self, tag: list):
        self.tag = tag

    def exe(self):
        self.sdf_create()

    # TODO: convert fixed_string vid 2 long vid
    def convert(self, df):
        self.df = df.drop("creation_time")
        self.src_id_df = self.df.select("_srcId").withColumnRenamed("_srcId", "id")
        self.dst_id_df = self.df.select("_dstId").withColumnRenamed("_dstId", "id")
        self.id_df = self.src_id_df.union(self.dst_id_df).distinct()
        self.encode_id = self.id_df.withColumn("encodedId", dense_rank().over(Window.orderBy("id")))
        self.src_join_df = self.df.join(self.encode_id, self.df._srcId == self.encode_id.id) \
            .drop("_srcId") \
            .drop("id") \
            .withColumnRenamed("encodedId", "_srcId")

        self.dst_join_df = self.src_join_df.join(self.encode_id, self.src_join_df._dstId == self.encode_id.id) \
            .drop("_dstId") \
            .drop("_rank") \
            .drop("degree") \
            .withColumnRenamed("encodedId", "_dstId").drop("id")
        # self.dst_join_df.write.option("header", True).csv("file:/exp/work/pyspark/nebula/pr_file/1.csv")

    # TODO: sdf build by nebula_reader
    def sdf_create(self):
        for i in range(len(self.tag)):
            self.dfc = self.spark.read.format(
                "com.vesoft.nebula.connector.NebulaDataSource").option(
                "type", "edge").option(
                "spaceName", nebula_config.get("space")).option(
                "label", f"{self.tag[i]}").option(
                "returnCols", "creation_time").option(
                "metaAddress", nebula_config.get("metaAddress")).option(
                "partitionNumber", nebula_config.get("partitionNumber")).load()
            if i == 0:
                self.df = self.dfc
            else:
                self.df = self.df.unionByName(self.dfc)

        self.convert(self.df)


# TODO: NEBULA-READER
def nebula_reader(sth, tag):
    dataframe = sth.spark.read.format(
        "com.vesoft.nebula.connector.NebulaDataSource").option(
        "type", "vertex").option(
        "spaceName", nebula_config.get("space")).option(
        "label", f"{tag}").option(
        "returnCols", "pagerank").option(
        "metaAddress", nebula_config.get("metaAddress")).option(
        "partitionNumber", nebula_config.get("partitionNumber")).load().drop("pagerank")
    return dataframe


# TODO: UPDATE NEBULA-GRAPH
def nebula_writer(tag, dataframe):
    dataframe.write.format("com.vesoft.nebula.connector.NebulaDataSource").option(
        "type", "vertex").option(
        "spaceName", nebula_config.get("space")).option(
        "label", f"{tag}").option(
        "vidPolicy", "").option(
        "vertexField", "_vertexId").option(
        "writeMode", "update").option(
        "batch", nebula_config.get("batch")).option(
        "metaAddress", nebula_config.get("metaAddress")).option(
        "graphAddress", nebula_config.get("graphAddress")).option(
        "passwd", nebula_config.get("passwd")).option(
        "user", nebula_config.get("user")).save()


# TODO: process data
def process(tag: list, *args):
    # TODO: algorithm object
    obj = PageRank()
    obj.set_tag(tag)
    obj.exe()

    encode_id = obj.encode_id
    df_int = obj.dst_join_df
    df_spark = df_int

    # TODO: nebula-algorithm
    config = obj.spark._jvm.PRConfig(1, 0.8)
    result = obj.spark._jvm.PageRankAlgo.apply(obj.jspark, df_spark._jdf, config, False)

    # TODO: jdf to sdf
    result = result.toDF()
    algo_df = DataFrame(result, obj.spark)

    # TODO: long vid mapping fixed_string vid
    df = encode_id.join(algo_df, encode_id.encodedId == algo_df._id) \
        .drop("encodedId") \
        .drop("_id") \
        .withColumnRenamed("id", "_vertexId")

    for arg in args:
        df_ = nebula_reader(obj, arg)
        df_ = df_.join(df, df_._vertexId == df._vertexId, "leftsemi")
        df__ = df.join(df_, df._vertexId == df_._vertexId, "leftsemi")
        nebula_writer(arg, df__)


if __name__ == '__main__':
    process(["edge_phone"], "dwd_phone", "dwd_account")


# TODO: PYSPARK (pyspark=2.4.7, python=3.7)
# conda activate pyspark37
# TODO: SPARK-SUBMIT
# ${SPARK_HOME}/bin/spark-submit --master local[*] \
# --deploy-mode client \
# --driver-class-path file:/exp/work/pyspark/nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-2.6.1.jar \
# --driver-class-path file:/exp/work/pyspark/nebula-algorithm/nebula-algorithm/target/nebula-algorithm-2.6.1.jar \
# --jars file:/exp/work/pyspark/nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-2.6.1.jar \
# --jars file:/exp/work/pyspark/nebula-algorithm/nebula-algorithm/target/nebula-algorithm-2.6.1.jar \
# /exp/work/pyspark/nebula/pagerank.py
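
The config module imported at the top (nebula_config, pagerank_dict) is not shown in the post; a hypothetical config.py sketch with placeholder values, kept consistent with the options used above:

# config.py (hypothetical example; all values are placeholders)
nebula_config = {
    "space": "DWD_GRAPH_LY_V3_2023",
    "metaAddress": "192.168.100.45:9559",
    "graphAddress": "192.168.100.45:9669",   # assumed graphd address
    "user": "root",
    "passwd": "nebula",
    "partitionNumber": 1,
    "batch": 1,
}

# maps a request tag to the edge types fed into the algorithm (used by the FastAPI example below)
pagerank_dict = {
    "edge_phone": ["edge_phone"],
}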

FastAPI interface example

import os

import findspark

from pyspark.sql import SparkSession

from pyspark.sql import DataFrame
from pyspark.sql.functions import *
from typing import List
from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

from config import nebula_config, pagerank_dict
from algorithm.pagerank import PageRank
from algorithm.louvain import Louvain

app_router = FastAPI(docs_url=None, redoc_url=None)

# CORS
origins = [
    "*"
]

app_router.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"]
)


class NebulaAlgorithm(BaseModel):
    algo: str
    tag_list: List[str]


algorithm_dict = {
    "pagerank": PageRank(),
    "louvain": Louvain()
}


@app_router.post("/nebula/algo")
async def nebula_algorithm(
    args: NebulaAlgorithm
):
    # TODO: algorithm object
    tags = []
    for line in args.tag_list:
        for item in pagerank_dict.get(line):
            tags.append(item)

    algo = algorithm_dict.get(args.algo)
    obj = algo
    obj.set_tag(tags)
    obj.exe()

    # TODO: nebula-algorithm
    encode_id = obj.encode_id
    df_int = obj.dst_join_df

    if algo.__doc__.strip() == 'pagerank':
        if len(tags) == 1:
            df_spark = df_int
        else:
            df_pd = df_int.toPandas()
            values = df_pd.values.tolist()
            columns = df_pd.columns.tolist()
            df_spark = obj.spark.createDataFrame(values, columns)

        config = obj.spark._jvm.PRConfig(1, 0.8)
        result = obj.spark._jvm.PageRankAlgo.apply(obj.jspark, df_spark._jdf, config, False)

    elif algo.__doc__.strip() == 'louvain':
        if len(tags) == 1:
            df_spark = df_int
        else:
            df_pd = df_int.toPandas()
            values = df_pd.values.tolist()
            columns = df_pd.columns.tolist()
            df_spark = obj.spark.createDataFrame(values, columns)

        config = obj.spark._jvm.LouvainConfig(20, 10, 0.5)
        result = obj.spark._jvm.LouvainAlgo.apply(obj.jspark, df_spark._jdf, config, False)

    else:
        return

    # TODO: jdf to sdf
    result = result.toDF()
    algo_df = DataFrame(result, obj.spark)

    # TODO: long vid mapping fixed_string vid
    df = encode_id.join(algo_df, encode_id.encodedId == algo_df._id) \
        .drop("encodedId") \
        .drop("_id") \
        .withColumnRenamed("id", "_vertexId")

    df.show()


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app="main:app_router", reload=True, host="0.0.0.0", port=7792)

# TODO: PYSPARK (pyspark=2.4.7, python=3.7)
# conda activate pyspark37
# TODO: SPARK-SUBMIT
# ${SPARK_HOME}/bin/spark-submit --master local[*] \
# --deploy-mode client \
# --driver-class-path file:/exp/work/pyspark/nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-2.6.1.jar \
# --driver-class-path file:/exp/work/pyspark/nebula-algorithm/nebula-algorithm/target/nebula-algorithm-2.6.1.jar \
# --jars file:/exp/work/pyspark/nebula-spark-connector/nebula-spark-connector/target/nebula-spark-connector-2.6.1.jar \
# --jars file:/exp/work/pyspark/nebula-algorithm/nebula-algorithm/target/nebula-algorithm-2.6.1.jar \
# /exp/work/pyspark/nebula/main.py
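
Once the service is up, the endpoint can be exercised with any HTTP client; a minimal sketch using requests (the payload fields follow the NebulaAlgorithm model above, and the tag name is only an example):

import requests

# body matches the NebulaAlgorithm pydantic model: algo + tag_list
payload = {"algo": "pagerank", "tag_list": ["edge_phone"]}
resp = requests.post("http://127.0.0.1:7792/nebula/algo", json=payload)
print(resp.status_code)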
