Apache Flink 说道系列 - 如何在PyFlink 1.10中自定义Python UDF

开篇说道

老子在《道德经》第七章中说:"天长地久,天地所以能长且久者,以其不自生,故能长生。是以圣人后其身而身先,外其身而身存。以其无私,故能成其私。"。其核心内容简单来说就是 "利他",即:人为自己照亮别人,外其身而身存!这里我也分享一个真实的事情和一个我不记得在哪里读到的故事:

前些时我回老家发现村里给每个盲人发了一个拐杖,这个拐杖竟然还有手电筒,我第一反应很是诧异:"盲人要手电筒干什么?", 不过我想聪明的你一定在笑我傻,盲人的手电筒当然是为了照亮行路。没错,盲人打手电筒照亮漆黑夜路,方便其他行人的同时,也保护了自己!

还有一个故事,那是在战争年代,一个班长带队去打仗,当一个炮弹飞来那一刹那,他刚要卧倒,却看见一个新兵还在傻傻的站着,没有丝毫避让的意识,他立马扑过去,用身体保护着新兵,当一声巨响之后,他刚要训斥新兵为啥不懂得卧倒自保的时候,后头发现,炮弹正将他最初要卧倒的地方炸的很大的坑。

“天长地久,天地所以能长且久者,以其不自生,故能长生”,天地之所以长久是因为他们的从不考虑自身的得失,而是默默无闻的滋养着万物,万物自然割舍不下天地,天地自会伴随生生不息的万物长存。“是以圣人后其身而身先,外其身而身存。以其无私,故能成其私” 品行高尚的人总是先考虑别人的得失,当然人人都感激其德行,自然也就尊为圣人。

我们都是普通人,但我们始终要 相信小的伟大,为别人,也为自己,始终保持“利他”的精神。

这篇博客的意义

每篇博客都有其预期的目标,本篇希望再Apache Flink 1.10发布之前提前给大家介绍一下PyFlink的Python UDF的功能,让Apche Fink的Python用户可以轻松的应用PyFlink开发自己的业务需求。

Python UDF的发展趋势

我们知道PyFlink是在Apache Flink 1.9版新增的,那么在Apache Flink 1.10中Python UDF功能支持的速度是否能够满足用户的急切需求呢?

直观的判断,PyFlink Python UDF的功能可以如上图一样可以迅速从幼苗变成大树,为啥有此判断,请继续往下看...

Flink on Beam

我们都知道有Beam on Flink的场景,就是Beam支持多种Runner,也就是说Beam SDK 编写的Job可以运行在Flink之上。如下图所示:

上面这图是Beam Portability Framework的架构图,他描述了Beam如何支持多语言,如何支持多Runner,单独说Apache Flink的时候我们就可以说是Beam on Flink,那么怎么解释 Flink on Beam呢?

在Apache Flink 1.10中我们所说的 Flink on Beam更精确的说是PyFlink on Beam Portability Framework。我们看一下简单的架构图,如下: Beam Portability Framework 是一个成熟的多语言支持框架,框架高度抽象了语言之间的通信协议(gRPC),定义了数据的传输格式(Protobuf),并且根据通用流计算框架所需要的组件,抽象个各种服务,比如DataService,StateService,MetricsService等。在这样一个成熟的框架下,PyFlink可以快速的构建自己的Python算子,同时重用Apache Beam Portability Framework中现有SDK harness组件,可以支持多种Python运行模式,如:Process,Docker,etc.,这使得PyFlink对Python UDF的支持变得非常容易,在Apache Flink 1.10中的功能也非常的稳定和完整。那么为啥说是Apache Flink和Apache Beam 共同打造呢,是因为我发现目前Apache Beam Portability Framework的框架也存在很多优化的空间,所以我在Beam社区进行了优化讨论,详情,并且在Beam 社区也贡献了20+的优化补丁,详情。 概要了解了Apache Flink 1.10中Python UDF的架构之后,我们还是切入的代码部分,看看如何开发和使用Python UDF...

如何定义Python UDF

在Apache Flink 1.10中我们有多种方式进行UDF的定义,比如:

  • Extend ScalarFunction, e.g.:

1
2
3
class HashCodeMean(ScalarFunction):
def eval(self, i, j):
return (hash(i) + hash(j)) / 2

  • Lambda Function

1
lambda i, j: (hash(i) + hash(j)) / 2

  • Named Function

1
2
def hash_code_mean(i, j):
return (hash(i) + hash(j)) / 2

  • Callable Function

1
2
3
class CallableHashCodeMean(object):
def __call__(self, i, j):
return (hash(i) + hash(j)) / 2

我们发现上面定义函数除了第一个扩展ScalaFunction的方式是PyFlink特有的,其他方式都是Python语言本身就支持的,也就是说,在Apache Flink 1.10中PyFlink允许以任何Python语言所支持的方式定义UDF。

如何使用Python UDF

那么定义完UDF我们应该怎样使用呢?Apache Flink 1.10中提供了2种Decorators,如下:

  • Decorators - udf(), e.g. :

1
2
udf(lambda i, j: (hash(i) + hash(j)) / 2,
[for input types], [for result types])

  • Decorators - @udf, e.g. :

1
2
3
@udf(input_types=..., result_type=...) 
def hash_code_mean(…):
return …

然后在使用之前进行注册,如下:

1
st_env.register_function("hash_code", hash_code_mean)

接下来就可以在Table API/SQL中进行使用了,如下:

1
my_table.select("hash_code_mean(a, b)").insert_into("Results")

目前为止,我们已经完成了Python UDF的定义,声明和注册了。接下来我们还是看一个完整的势力吧:)

案例描述

  • 需求: 假设 苹果 公司要统计该公司产品在双11期间各城市的销售数量和销售金额分布情况。
  • 数据格式 每一笔订单是一个字符串,字段用逗号分隔, 例如:

1
2
3
4
ItemName, OrderCount, Price, City
-------------------------------------------
iPhone 11, 30, 5499, Beijing\n
iPhone 11 Pro,20,8699,Guangzhou\n

案例分析

根据案例的需求和数据结构分析,我们需要对原始字符串进行结构化解析,那么需要一个按“,”号分隔的UDF(split)和一个能够将各个列信息展平的DUF(get)。同时我们需要根据城市进行分组统计。

核心实现

UDF定义

  • Split UDF

1
2
3
4
@udf(input_types=[DataTypes.STRING()],
result_type=DataTypes.ARRAY(DataTypes.STRING()))
def split(line):
return line.split(",")

  • Get UDF

1
2
3
@udf(input_types=[DataTypes.ARRAY(DataTypes.STRING()), DataTypes.INT()], result_type=DataTypes.STRING())
def get(array, index):
return array[index]

注册UDF

  • 注册 Split UDF

    1
    t_env.register_function("split", split)

  • 注册 Get UDF

1
t_env.register_function("get", get)

核心实现逻辑

如下代码我们发现核心实现逻辑非常简单,只需要对数据进行解析和对数据进行集合计算:

1
2
3
4
5
t_env.from_table_source(SocketTableSource(port=9999))\        .alias("line")\        .select("split(line) as str_array")\        .select("get(str_array, 3) as city, "                     "get(str_array, 1).cast(LONG) as count, "                     "get(str_array, 2).cast(LONG) as unit_price")\        .select("city, count, count * unit_price as total_price")\       
.group_by("city")\ .select("city, sum(count) as sales_volume, sum(total_price)
as sales")\
.insert_into("sink")
t_env.execute("Sales Statistic")

上面的代码我们假设是一个Socket的Source,Sink是一个Chart Sink,那么最终运行效果图,如下: -w1087

我总是认为在博客中只是文本描述而不能让读者真正的在自己的机器上运行起来的博客,不是好博客,所以接下来我们看看按照我们下面的操作,是否能在你的机器上也运行起来? :)

环境

因为目前PyFlink还没有部署到PyPI上面,在Apache Flink 1.10发布之前,我们需要通过构建flink的master分支源码来构建运行我们Python UDF的PyFlink版本。

源代码编译

在进行编译代码之前,我们需要你已经安装了JDK8Maven3x

  • 下载解压

1
2
tar -xvf apache-maven-3.6.1-bin.tar.gz
mv -rf apache-maven-3.6.1 /usr/local/

  • 修改环境变量(~/.bashrc)

1
2
3
MAVEN_HOME=/usr/local/apache-maven-3.6.1
export MAVEN_HOME
export PATH=${PATH}:${MAVEN_HOME}/bin

除了JDK和MAVEN完整的环境依赖性如下:

  • JDK 1.8+ (1.8.0_211)
  • Maven 3.x (3.2.5)
  • Scala 2.11+ (2.12.0)
  • Python 3.6+ (3.7.3)
  • Git 2.20+ (2.20.1)
  • Pip3 19+ (19.1.1)

我们看到基础环境安装比较简单,我这里就不每一个都贴出来了。如果大家有问题欢迎邮件或者博客留言。

  • 下载flink源代码:

1
git clone https://github.com/apache/flink.git

  • 编译

1
2
3
4
5
6
7
8
9
10
11
cd flink
mvn clean install -DskipTests
...
...
[INFO] flink-walkthrough-datastream-scala ................. SUCCESS [ 0.192 s]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 18:34 min
[INFO] Finished at: 2019-12-04T23:03:25+08:00
[INFO] ------------------------------------------------------------------------

  • 构建PyFlink发布包

1
2
3
4
5
6
7
cd flink-python; python3 setup.py sdist bdist_wheel
...
...
adding 'apache_flink-1.10.dev0.dist-info/WHEEL'
adding 'apache_flink-1.10.dev0.dist-info/top_level.txt'
adding 'apache_flink-1.10.dev0.dist-info/RECORD'
removing build/bdist.macosx-10.14-x86_64/wheel

  • 安装PyFlink(PyFlink 1.10 需要Python3.6+)

1
2
3
4
pip3 install dist/*.tar.gz
...
...
Successfully installed apache-beam-2.15.0 apache-flink-1.10.dev0 avro-python3-1.9.1 cloudpickle-1.2.2 crcmod-1.7 dill-0.2.9 docopt-0.6.2 fastavro-0.21.24 future-0.18.2 grpcio-1.25.0 hdfs-2.5.8 httplib2-0.12.0 mock-2.0.0 numpy-1.17.4 oauth2client-3.0.0 pbr-5.4.4 protobuf-3.11.1 pyarrow-0.14.1 pyasn1-0.4.8 pyasn1-modules-0.2.7 pydot-1.4.1 pymongo-3.9.0 pyyaml-3.13 rsa-4.0

也可以查看一下,我们核心需要apache-beam和apache-flink,如下命令:

1
2
3
4
5
6
7
jincheng:flink-python jincheng.sunjc$ pip3 list
Package Version
----------------------------- ---------
alabaster 0.7.12
apache-beam 2.15.0
apache-flink 1.10.dev0
atomicwrites 1.3.0

如上信息证明你我们所需的Python依赖已经okay了,接下来回过头来在看看如何进行业务需求的开发。

PyFlinlk的Job结构

一个完成的PyFlink的Job需要有外部数据源的定义,有业务逻辑的定义和最终计算结果输出的定义。也就是 Source connector, Transformations, Sink connector,接下来我们根据这个三个部分进行介绍来完成我们的需求。

Source Connector

我们需要实现一个Socket Connector,首先要实现一个StreamTableSource, 核心代码是实现getDataStream,代码如下:

1
2
3
4
5
6
@Override
public DataStream<Row> getDataStream(StreamExecutionEnvironment env) {
return env.socketTextStream(hostname, port, lineDelimiter, MAX_RETRY)
.flatMap(new Spliter(fieldNames.length, fieldDelimiter, appendProctime))
.returns(getReturnType());
}

上面代码利用了StreamExecutionEnvironment中现有socketTextStream方法接收数据,然后将业务订单数据传个一个FlatMapFunction, FlatMapFunction主要实现将数据类型封装为 Row,详细代码查阅 Spliter

同时,我们还需要在Python封装一个SocketTableSource,详情查阅 socket_table_source.py

Sink Connector

我们预期要得到的一个效果是能够将结果数据进行图形化展示,简单的思路是将数据写到一个本地的文件,然后在写一个HTML页面,使其能够自动更新结果文件,并展示结果。所以我们还需要自定义一个Sink来完成该功能,我们的需求计算结果是会不断的更新的,也就是涉及到Retraction(如果大家不理解这个概念,可以查阅我以前的博客),目前在Flink里面还没有默认支持Retract的Sink,所以我们需要自定义一个RetractSink,比如我们实现一下CsvRetractTableSink. CsvRetractTableSink的核心逻辑是缓冲计算结果,每次更新进行一次全量(这是个纯demo,不能用于生产环境)文件输出。源代码查阅CsvRetractTableSink

同时我们还需要利用Python进行封装,详见 chart_table_sink.py

chart_table_sink.py我们封装了一个http server,这样我们可以在浏览器中查阅我们的统计结果。

业务逻辑

完成自定义的 Source 和 Sink之后我们终于可以进行业务逻辑的开发了,其实整个过程自定义 Source和Sink是最麻烦的,核心计算逻辑似乎要简单的多。

  • 设置Python版本(很重要) 如果你本地环境 python 命令版本是2.x,那么需要对Python版本进行设置,如下:

1
t_env.get_config().set_python_executable("python3")

PyFlink 1.10之后支持Python3.6+版本。

  • 读取数据源 PyFlink读取数据源非常简单,如下:

1
2
3
...
...
t_env.from_table_source(SocketTableSource(port=9999)).alias("line")

上面这一行代码定义了监听端口 9999 的数据源,同时结构化Table只有一个名为line的列。

  • 解析原始数据 我们需要对上面列进行分析,为了演示 Python UDF,我们在SocketTableSource中并没有对数据进行预处理,所以我们利用上面 UDF定义 一节定义的UDF,来对原始数据进行预处理。

1
2
3
4
5
...
...
.select("split(line) as str_array")
.select("get(str_array, 3) as city, " "get(str_array, 1).cast(LONG) as count, " "get(str_array, 2).cast(LONG) as unit_price")
.select("city, count, count * unit_price as total_price")

  • 统计分析 核心的统计逻辑是根据 city 进行分组,然后对 销售数量和销售金额进行求和,如下:

1
2
3
4
5
...
...
.group_by("city")
.select("city, sum(count) as sales_volume, sum(total_price)
as sales")\

  • 计算结果输出 计算结果写入到我们自定义的Sink中,如下:

1
2
3
...
...
.insert_into("sink")

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.demo import ChartConnector, SocketTableSource
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.descriptors import Schema
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
env,
environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
t_env.connect(ChartConnector())\
.with_schema(Schema()
.field("city", DataTypes.STRING())
.field("sales_volume", DataTypes.BIGINT())
.field("sales", DataTypes.BIGINT()))\
.register_table_sink("sink")


@udf(input_types=[DataTypes.STRING()],
result_type=DataTypes.ARRAY(DataTypes.STRING()))
def split(line):
return line.split(",")


@udf(input_types=[DataTypes.ARRAY(DataTypes.STRING()), DataTypes.INT()],
result_type=DataTypes.STRING())
def get(array, index):
return array[index]

t_env.get_config().set_python_executable("python3")

t_env.register_function("split", split)
t_env.register_function("get", get)
t_env.from_table_source(SocketTableSource(port=6666))\
.alias("line")\
.select("split(line) as str_array")\
.select("get(str_array, 3) as city, "
"get(str_array, 1).cast(LONG) as count, "
"get(str_array, 2).cast(LONG) as unit_price")\
.select("city, count, count * unit_price as total_price")\
.group_by("city")\
.select("city, "
"sum(count) as sales_volume, "
"sum(total_price) as sales")\
.insert_into("sink")

t_env.execute("Sales Statistic")

上面代码中大家会发现一个陌生的部分,就是 from pyflink.demo import ChartConnector, SocketTableSource. 其中 pyflink.demo是哪里来的呢?其实就是包含了上面我们介绍的 自定义Source/Sink(Java&Python)。 下面我们来介绍如何增加这个 pyflink.demo模块。

安装 pyflink.demo

为了大家方便我把自定义Source/Sink(Java&Python)的源代码放到了 这里 ,大家可以进行如下操作:

  • 下载源码

1
git clone https://github.com/sunjincheng121/enjoyment.code.git

  • 编译源码

1
cd enjoyment.code/PyUDFDemoConnector/; mvn clean install

  • 构建发布包

1
2
3
4
5
6
7
python3 setup.py sdist bdist_wheel
...
...
adding 'pyflink_demo_connector-0.1.dist-info/WHEEL'
adding 'pyflink_demo_connector-0.1.dist-info/top_level.txt'
adding 'pyflink_demo_connector-0.1.dist-info/RECORD'
removing build/bdist.macosx-10.14-x86_64/wheel

  • 安装Pyflink.demo

1
2
3
4
5
6
pip3 install dist/pyflink-demo-connector-0.1.tar.gz
...
...
Successfully built pyflink-demo-connector
Installing collected packages: pyflink-demo-connector
Successfully installed pyflink-demo-connector-0.1

出现上面信息证明已经将PyFlink.demo模块成功安装。接下来我们可以运行我们的示例了 :)

运行示例

示例的代码在上面下载的源代码里面已经包含了,为了简单,我们利用PyCharm打开enjoyment.code/myPyFlink。 同时在Terminal 启动一个端口:

1
nc -l 6666

启动blog_demo,如果一切顺利,启动之后,控制台会输出一个web地址,如下所示: 我们打开这个页面,开始是一个空白页面,如下:

我们尝试将下面的数据,一条,一条的发送给Source Connector:

1
2
3
4
5
6
7
8
9
10
11
12
13
iPhone 11,30,5499,Beijing
iPhone 11 Pro,20,8699,Guangzhou
MacBook Pro,10,9999,Beijing
AirPods Pro,50,1999,Beijing
MacBook Pro,10,11499,Shanghai
iPhone 11,30,5999,Shanghai
iPhone 11 Pro,20,9999,Shenzhen
MacBook Pro,10,13899,Hangzhou
iPhone 11,10,6799,Beijing
MacBook Pro,10,18999,Beijing
iPhone 11 Pro,10,11799,Shenzhen
MacBook Pro,10,22199,Shanghai
AirPods Pro,40,1999,Shanghai

当输入第一条订单 iPhone 11,30,5499,Beijing,之后,页面变化如下:

随之订单数据的不断输入,统计图不断变化。一个完整的gif演示如下:

小结

本篇从架构到UDF接口定义,在到具体的实例,向大家介绍了在Apache Flink 1.10 发布之后,如何利用PyFlink进行业务开发,其中 用户自定义 Source和Sink部分比较复杂,这也是目前社区需要进行改进的部分(Java/Scala)。真正的核心逻辑部分其实比较简单,为了大家按照本篇进行实战操作有些成就感,所以我增加了自定义Source/Sink和图形化部分。但如果大家想简化实例的实现也可以利用Kafka作为Source和Sink,这样就可以省去自定义的部分,做起来也会简单一些。 同时该篇的 开篇说道 部分重点分享了老子 "利他" 的思想,希望我们大家共勉,做一个有助于他人的人。谢谢大家!