Apache Flink 视频系列 - 必修课!一小时吃透PyFlink

本文整理于 Apache Flink PMC/Apache Beam Committer 孙金城(金竹)的《PyFlink 架构、应用案例及未来规划》直播分享。详见:http://dwz.date/QFx

<center>图1</center>

开场

大家晚上好,很开心以直播的方式与大家进行第二次PyFlink的分享。这里可能有些朋友是第一次看我直播,所以还是先简单的自我介绍一下。我叫孙金城,来自阿里巴巴,淘宝花名金竹,平时喜欢写一些和Flink相关的技术博客。早在2016我就被Apache Flink的纯流式架构所吸引,目前是Apache Flink的PMC 成员。近年来各大互联网公司的大数据部门都有要建设统一用户接口,透明切换底层大数据计算引擎的平台需求,这一点上Apache Beam做的非常优秀,经过不断的贡献,目前我也是Apache Beam的Committer。随着5G的到来,IoT领域似乎要迎来一个崭新的春天,IoT与流计算的融合势在必行,目前我也是Apache IoTDB的Committer。作为一个开源爱好者,除了对各个开源项目进行贡献,我也喜欢参加ASF组织的各种活动,目前也是ASF Beijing的Member。那么在自我介绍的同时也与大家分享了我对开源项目选择性参与和对知识获取的思考。

<center>图2</center>

分享目标

我希望通过今天的分享让大家了解到Python用户如何使用Flink现有的所有功能,同时和大家介绍Python生态如何利用Flink具备分布式的数据处理能力。那么以怎样的方式让大家了解到这些呢?我将以4个方面进行今天的分享,第一部分 主要介绍PyFlink的深层含义,并探讨存在的意义。第二部分,将细致分析PyFlink现有架构和架构背后的思考。 第三部分 将介绍PyFlink能应用在怎样的业务场景中,并结合CDN的业务场景让大家对PyFlink的业务开发有很好的体感认知。最后一部分,我将与大家分享PyFlink未来的重点功能规划和承载的使命愿景。我认为任何事物长久的,可持续发展,都需要有自身灵魂所在,都需要树立正确的使命愿景,就像阿里巴巴以让天下没有难做的生意为使命,以活102年,并到2036年,服务20亿消费者,创造1亿就业机会和帮助1000万中小企业为愿景一样,定义和寻找PyFlink的使命愿景对PyFlink的持续发展至关重要,我们在最后大家一起讨论。

<center>图3</center>

PyFlink 是啥?必要性?

<center>图4</center>

Flink on Python and Python on Flink

那么PyFlink是什么?这个问题也许会让人感觉问题的答案太明显了,那就是Flink + Python,也就是Flink on Python。那么到底Flink on Python 意味着这什么呢?那么一个非常容易想到的方面就是能够让Python用户享受到Flink的所有功能。其实不仅如此,PyFlink的存在还有另外一个非常重要的意义就是,Python on Flink,我们可以将Python丰富的生态计算能力运行在Flink框架之上,这将极大的推动Python生态的发展。其实,如果你在仔细深究一下,你会发现这个结合并非偶然。

<center>图5</center>

Python生态和大数据生态

Pythoh生态与大数据生态有密不可分的关系,我们先看看大家都在用Python解决什么实际问题?通过一份用户调查我们发现,大多数Python用户正在解决 ”数据分析“,”机器学习“的问题,那么这些问题场景在大数据领域也有很好的解决方案。那么Python生态和大数据生态结合,抛开扩大大数据产品的受众用户之外,对Python生态一个特别重要到意义就是 单机到分布式的能力增强,我想,这也是大数据时代海量数据分析对Python生态的强需求。

<center>图6</center>

Why Flink and Why Python

好了,Python生态和大数据的结合是时代的要求,那么Flink为啥选择Python生态作为多语言支持的切入点,而不是Go或者R呢?作为用户的你,为啥选择PyFlink而不是PySpark或者PyHive呢?首先我们说说选择Flink的理由:

  • 第一,最主要的是架构优势,Flink是纯流架构的流批统一的计算引擎;
  • 第二,从ASF的客观统计看,Flink是2019年度最活跃的开源项目,这意味着Flink鲜活的生命力;
  • 第三,Flink不仅仅是开源项目而且也经历过无数次,各个大数据公司的生产环境的历练,值得信赖!

那么我再来看看Flink在选择多语言支持时候,为啥选择了Python而不是其他语言呢?我们还是看一下数据统计,如下:Python语言流行程度仅次于Java和C,其实我们发现自18年开始Python的发展非常迅速,并且还在持续。那么Java/Scala是Flink的默认语言,所以选择Python来进行Flink多语言的支持似乎很合理。这些权威的统计信息,大家可以在我提供的链接进行查看更详细的信息。

<center>图7</center>

目前看PyFlink的产生是时代的必然,但仅仅想清楚PyFlink存在的意义还远远不够,因为我们最终的目标是让Flink和Python用户受益,真真正正的解决实际的现实问题。所以,我们还需要继续深入,一起探究PyFlink该如何落地?

<center>图8</center>

PyFlink 架构,为什么?

任何事情在想清楚之后,还要做明白,要将PyFlink落地,首要解决的是分析清楚要达成的核心目标和要达成目标解决的核心问题。那么PyFlink的核心目标到底是什么呢?

<center>图9</center>

PyFlink的核心目标

我们在前面的分析过程中已经提到过,这里我们再具体化一下,PyFlink的核心目标就是:

  • 第一,将Flink能力输出到Python用户,进而可以让Python用户使用所有的Flink能力。
  • 第二,将Python生态现有的分析计算功能运行到Flink上,进而增强Python生态对大数据问题的解决能力。

围绕这2个核心的目标,我们再来分析,要达成这样的目标,需要解决的核心问题是什么?

<center>图10</center>

Flink功能Python化

为了PyFlink落地,我们需要在Flink上开发一套和现有Java一样的Python的引擎吗?答案是 NO,这在Flink1.8之前已经尝试过。我们做设计有一个很好的原则就是追求以最小的代价完成既定的目标,所以最好的方式是仅仅提供一层Python API 复用现有的计算引擎。那么对于Flink而言我们要提供怎样的Python API呢?那就是我们熟知的:

  • High-level的TableAPI/SQL;
  • 有状态的DataStream API;

好,我们现在的思考越来越切近Flink内部了,接踵而来的问题就是,我们如何提供一套Python的Table API和DataStream API呢?核心要解决的问题是什么呢?

<center>图11</center>

Flink功能Python化的核心问题

核心问题显而易见是Python VM和Java VM的握手,他们之间要建立通讯,这是Flink多语言支持的核心问题。好,面对核心问题我们要进行技术选型. Here we go...

<center>图12</center>

Flink功能Python化的VM通讯技术选型

就当前的Java VM和Python VM通讯的问题而言,目前比较显著的解决方案有Apache Beam,一个著名的多语言多引擎支持项目,另外一个专门解决Java VM和Python VM通讯问的的Py4J。我们从不同视角进行分析对比, 首先,Py4J和Beam对比,就好像有穿山功能的穿山甲和一个力量强大的大象,要穿越一到墙,我们可以打个洞,也可以推到整面墙。所以在当前VM通讯的场景,Beam显得有些复杂。因为Beam在通用性上做了很多的努力,在极端情况会丧失一定程度的灵活性。

<center>图13</center>

从另一个视角来看,Flink本身有交互式编程的需求,比如FLIP-36,同时还要在多语言支持的同时,确保各种语言的接口设计语义一致性,这些在Beam现有的架构下很难满足。所以在这样一种思考下,我们选择Py4J作为Java VM和Python VM之间通讯的桥梁。

<center>图14</center>

Flink功能Python化的技术架构

其实如果我们解决了Python VM和Java VM通讯的问题,本质上是在努力达成我们第一个目标,就是将现有Flink功能输出给Python用户,也就是我们Flink1.9所完成的工作,接下来我们看看Flink1.9 PyFlink API 的架构,如下:我们利用Py4J解决通讯问题,在PythonVM启动一个Gateway,并且Java VM启动一个Gateway Server用于接受Python 的请求,同时在Python API里面提供和Java API一样的对象,比如 TableENV, Table,等等。这样Python在写Python API的时候本质是在调用Java API。当然,在Flink 1.9中还解决了作业部署问题,我们可以用Python命令,Python shell和CLI等多种方式进行作业提交。

<center>图15</center>

那么基于这样的架构又怎样的优势呢?第一个就是简单,并确保Python API语义和Java API的一致性,第二点,Python 作业可以达到和Java一样的极致性能,那么Java的性能怎样呢?我想大家已经熟知,在去年双11 Flink Java API已经具备了每秒25.51亿次的数据处理的能力。

<center>图16</center>

Python生态分布化

OK,在完成了现有Flink功能向Python用户的输出之后,接下来我们继续探讨,如何将Python生态功能引入Flink中,进而将Python 功能分布式化。如何达成?通常我们可以有如下2种做法: 1. 选择有代表性的Python类库,将其API增加到PyFlink中,这种方式是一个漫长的过程,因为Python的生态库太多了,但无论如何,我们在引入这些APIs之前,首要解决的问题是,解决Python的执行问题。我们结合现有Flink Table API的现状和现有Python类库的特点,我们可以对现有所有的Python类库功能视为 用户自定义函数(UDF),集成到Flink中。这样我们就找到了集成Python生态到Flink中的手段是将其视为UDF,也就是我们Flink1.10中的工作。那么集成的核心问题是什么?没错,刚才说过,是Python UDF的执行问题。好,我们针对这个核心问题进行技术选型吧,Here we go...

<center>图17</center>

Python生态分布化的UDF执行技术选型

解决Python UDF执行问题可不仅仅是VM之间通讯的问题了,它涉及到Python执行环境的管理,业务数据在Java和Python之间的解析,Flink State Backend能力向Python的输出,Python UDF执行的监控等等,是一个非常复杂的问题。面对这样复杂的问题,前面我们介绍过Apache Beam,支持多引擎多语言,无所不能的大象可以出场了,我们来看一下Beam是怎么解决Python UDF执行问题的 :)Beam为了解决多语言和多引擎支持问题高度抽象了一个叫 Portability Framework 的架构,如下图,Beam目前可以支持Java/Go/Python等多种语言,其中图下方 Beam Fu Runners 和 Execution之间就解决了 引擎和UDF执行环境的问题。其核心是对利用Protobuf进行数据结构抽象,利用gRPC协议进行通讯,同时封装了核心的gRPC 服务。所以这时候Beam更像是一只萤火虫,照亮了PyFlink解决UDF执行问题之路。:)(多说一嘴,萤火虫已经成为了Aapche Beam的吉祥物)。我们接下来看看Beam到底提供了哪些gRPC服务。

<center>图18</center>

如图 Runner部分是Java的算子执行,SDK Worker部分是Python的执行环境,Beam已经抽象Control/Data/State/Logging等服务。并这些服务已经在Beam的Flink runner上稳定高效的运行了很久了。所以在PyFlink UDF执行上面我们可以站在巨人的肩膀上了:),这里我们发现Apache Beam 在API层面和在UDF的执行层面都有解决方案,而PyFlink在API层面采用了Py4J解决VM通讯问题,在UDF执行需求上采用了Beam的Protability Framework解决UDF执行环境问题。这也表明了PyFlink在技术选型上严格遵循以最小的代价达成既定目标的原则,在技术选型上永远会选择最合适的,最符合PyFlink长期发展的技术架构。(BTW,与Beam的合作过程中,我也向Beam社区提交了20+的优化patch)。

<center>图19</center>

Python生态分布化的UDF技术架构

OK,我们再整体看一下 PyFlink UDF的整体架构。在UDF的架构中我们我既要考虑Java VM和Python VM的通讯问题,又要考虑在编译阶段和在运行阶段的不同需求。图中我们以绿色表示Java VM的行为,蓝色表示Python VM的行为。首先我们看看编译阶段,也就是local的设计,在local的设计是纯API的mapping调用,我们仍然要过Py4J来解决通讯问题,也就是如图Python每执行一个API就会同步的调用Java所对应的API。对UDF的支持上,需要添加UDF注册的API,register_function,但仅仅是注册还不够,用户在自定义Python UDF的时候往往会依赖一些三方库,所以我们还需要增加添加依赖的方法,那就是一系列的add方法,比如add_Python_file()。再编写Python作业的同时,Java API也会同时被调用在提交作业之前,Java端会构建.

PyFlink_UDF.2020-03-10 15_01_30

<center>图20</center>

JobGraph。然后通过CLI等多种方式将作业提交到集群进行运行。我们再来看看运行时Python和Java的不同分工情况,首先在Java端与普通Java作业一样,JobMaster将作业分配给TaskManger,TaskManager会执行一个个Task,task里面就涉及到了Java和Python的算子执行。在Python UDF的算子中我们会设计各种gRPC服务来完成Java VM和Python VM的各种通讯,比如 DataService 完成业务数据通讯,StateService完成Python UDF对Java Statebackend的调用,当然还有Logging和Metrics等其他服务。这些服务都是基于Beam的Fn API来构建的,最终在Python的Worker里面运行用户的UDF,运行结束之后再利用对应的gRPC服务将结果返回给Java端的PythonUDF算子。当然Python的worker不仅仅是Process模式,可以是Docker模式甚至是External的服务集群。这种扩展机制,为后面PyFlink与Python生态的其他框架集成打下了坚实的基础,在后面我们介绍PyFlink大图的时候,我们会介绍这一点:)。好,这就是PyFlink 在1.10中引入Python UDF支持的架构。那么这样的架构有怎样的优势呢?

首先,Beam是一个成熟的多语言支持框架,基于Beam进行架构我们后面可以很容易进行其他语言的支持扩展。 同时Beam对State的服务抽象也方便PyFlink增加对Stateful UDF的支持。还有一个方面是方便维护,同一套框架由Apache Beam和Apache Flink两个非常活跃的社区共同维护和优化 ...

<center>图21</center>

PyFlink 场景,怎么用?

好了解了这么多关于PyFlink的架构和架构背后的思考,我们还是以一个具体场景案例,来增加一些对PyFlink的体感吧!

<center>图22</center>

PyFlink 适用的场景

好,在具体的案例之前我们先简单分享一些PyFlink所能适用的业务场景。首先PyFlink既然是Python+Flink,那其适用场景也可以从java和Python两方面去分析,第一个Java所适用的场景PyFlink都适用:

  • 第一个,事件驱动型,比如:刷单,监控等;
  • 第二个,数据分析型的,比如:库存,双11大屏等;
  • 第三个,数据管道,也就是ETL场景,比如一些日志的解析等;
  • 第四个,机器学习,比如个性推荐等;
  • 第五个,科学计算类;

这些都可以尝试使用PyFlink来完成。除此之外还有Python生态特有的场景,比如科学计算等。那么这么多的应用场景,PyFlink到底有哪些可用的API呢?

<center>图23</center>

PyFlink的安装

使用具体的API开发之前,首先要安装PyFlink,目前PyFlink支持pip install 进行安装,这里特别提醒一下具体命令是:pip install apache-Flink

幻灯片24

<center>图24</center>

PyFlink的APIs

目前PyFlink API完全与Java Table API对齐,各种关系操作都支持,同时对window也有很好的支持,并且这里稍微提一下就是PyFlink里面有些易用性API比SQL还要强大,比如:这些对columns进行操作的APIs。除了这些APIs,PyFlink还提供多种定义Python UDF的方式。

<center>图25</center>

PyFlink的UDF定义

首先,可以扩展ScalarFunction,这种方式可以提供更多的辅助功能,比如添加Metrics。除此之外Python语言所支持的任何方式的方法定义,在PyFlink UDF中都是支持的,比如:Lambda Function,Named Function和CallableFunction等。当定义完方法后,我们用PyFlink所提供的Decorators进行打标,并描述input和output的数据类型就可以了。当然后面版本我们也可以根据Python语言的type hint特性再进一步简化,进行类型推导。为了直观,我们看一个具体的UDF定义的例子:

<center>图26</center>

Python UDF定义示例

我们定义两个数相加的例子,首先导入必须的类,然后是刚才我们提到的几种定义方式。这个简单直接,我们闲话少叙,开始看看实际的案例吧:)

<center>图27</center>

PyFlink案例-阿里云CDN实时日志分析

我们这里以一个阿里云CDN实时日志分析的例子来介绍如何用PyFlink解决实际的业务问题。CDN我们都很熟悉,为了进行资源的下载加速。那么CDN日志的解析一般有一个通用的架构模式,就是首先要将各个边缘节点的日志数据进行采集,一般会采集到消息队列,然后将消息队列和实时计算集群进行集成进行实时的日志分析,最后将分析的结果写到存储系统里面。那么我今天的案例将架构实例化,消息队列采用Kafka,实时计算采用Flink,最终将数据存储到MySql中。

<center>图28</center>

阿里云CDN实时日志分析需求说明

我们在来看看业务统计的需求,为了介绍方便,我们将实际的统计需求进行简化,示例中只进行按地区分组,进行资源访问量,下载量和下载速度的统计。数据格式我们只选取核心的字段,比如:uuid,表示唯一的日志标示,client_ip表示访问来源,request_time表示资源下载耗时,response_size表示资源数据大小。其中我们发现我们需求是按地区分组,但是原始日志里面并没有地区的字段信息,所以我们需要定义一个Python UDF 根据 client_ip 来查询对应的地区。好,我们首先看如何定义这个UDF。

<center>图29</center>

阿里云CDN实时日志分析UDF定义

这里我们用了刚才提到的named function的方式定义一个 ip_to_province()的UDF,输入是ip地址,输出是地区名字字符串。我们这里描述了输入类型是一个字符串,输出类型也是一个字符串。当然这里面的查询服务仅供演示,大家在自己的生产环境要替换为可靠的地域查询服务。

<center>图30</center>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import re
import json
from pyFlink.table import DataTypes
from pyFlink.table.udf import udf
from urllib.parse import quote_plus
from urllib.request import urlopen

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def ip_to_province(ip):
"""
format:
{
'ip': '27.184.139.25',
'pro': '河北省',
'proCode': '130000',
'city': '石家庄市',
'cityCode': '130100',
'region': '灵寿县',
'regionCode': '130126',
'addr': '河北省石家庄市灵寿县 电信',
'regionNames': '',
'err': ''
}
"""
try:
urlobj = urlopen( \
'http://whois.pconline.com.cn/ipJson.jsp?ip=%s' % quote_plus(ip))
data = str(urlobj.read(), "gbk")
pos = re.search("{[^{}]+\}", data).span()
geo_data = json.loads(data[pos[0]:pos[1]])
if geo_data['pro']:
return geo_data['pro']
else:
return geo_data['err']
except:
return "UnKnow"

阿里云CDN实时日志分析Connector定义

我们完成了需求分析和UDF的定义,我们开始进行作业的开发了,按照通用的作业结构,需要定义Source connector来读取Kafka数据,定义Sink connector来将计算结果存储到MySQL。最后是编写统计逻辑。在这特别说明一下,在PyFlink中也支持SQL DDL的编写,我们用一个简单的DDL描述,就完成了Source Connector的开发。其中connector.type填写kafka。SinkConnector也一样,用一行DDL描述即可,其中connector.type填写jdbc。描述connector的逻辑非常简单,我们在看看核心统计逻辑是否也一样简单:)

<center>图31</center>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34

kafka_source_ddl = """
CREATE TABLE cdn_access_log (
uuid VARCHAR,
client_ip VARCHAR,
request_time BIGINT,
response_size BIGINT,
uri VARCHAR
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'access_log',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'csv',
'format.ignore-parse-errors' = 'true'
)
"""

mysql_sink_ddl = """
CREATE TABLE cdn_access_statistic (
province VARCHAR,
access_count BIGINT,
total_download BIGINT,
download_speed DOUBLE
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/Flink',
'connector.table' = 'access_statistic',
'connector.username' = 'root',
'connector.password' = 'root',
'connector.write.flush.interval' = '1s'
)
"""

阿里云CDN实时日志分析核心统计逻辑

首先从数据源读取数据,然后需要先将clien_ip 利用我们刚才定义的ip_to_province(ip)转换为具体的地区。之后,在进行按地区分组,统计访问量,下载量和资源下载速度。最后将统计结果存储到结果表中。这个统计逻辑中,我们不仅使用了Python UDF,而且还使用了Flink 内置的 Java AGG函数,sum和count。

<center>图32</center>

1
2
3
4
5
6
7
8
9
10
11
12
13
# 核心的统计逻辑
t_env.from_path("cdn_access_log")\
.select("uuid, "
"ip_to_province(client_ip) as province, " # IP 转换为地区名称
"response_size, request_time")\
.group_by("province")\
.select( # 计算访问量
"province, count(uuid) as access_count, "
# 计算下载总量
"sum(response_size) as total_download, "
# 计算下载速度
"sum(response_size) * 1.0 / sum(request_time) as download_speed") \
.insert_into("cdn_access_statistic")

阿里云CDN实时日志分析完整代码

我们在整体看一遍完整代码,首先是核心依赖的导入,然后是我们需要创建一个ENV,并设置采用的planner(目前Flink支持Flink和blink两套planner)建议大家采用 blink planner。 接下来将我们刚才描述的kafka和mysql的ddl进行表的注册。再将Python UDF进行注册,这里特别提醒一点,UDF所依赖的其他文件也可以在API里面进行制定,这样在job提交时候会一起提交到集群。然后是核心的统计逻辑,最后调用executre提交作业。这样一个实际的CDN日志实时分析的作业就开发完成了。我们再看一下实际的统计效果。

幻灯片33

<center>图33</center>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import os

from pyFlink.datastream import StreamExecutionEnvironment
from pyFlink.table import StreamTableEnvironment, EnvironmentSettings
from enjoyment.cdn.cdn_udf import ip_to_province
from enjoyment.cdn.cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl

# 创建Table Environment, 并选择使用的Planner
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
env,
environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

# 创建Kafka数据源表
t_env.sql_update(kafka_source_ddl)
# 创建MySql结果表
t_env.sql_update(mysql_sink_ddl)

# 注册IP转换地区名称的UDF
t_env.register_function("ip_to_province", ip_to_province)

# 添加依赖的Python文件
t_env.add_Python_file(
os.path.dirname(os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_udf.py")
t_env.add_Python_file(os.path.dirname(
os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_connector_ddl.py")

# 核心的统计逻辑
t_env.from_path("cdn_access_log")\
.select("uuid, "
"ip_to_province(client_ip) as province, " # IP 转换为地区名称
"response_size, request_time")\
.group_by("province")\
.select( # 计算访问量
"province, count(uuid) as access_count, "
# 计算下载总量
"sum(response_size) as total_download, "
# 计算下载速度
"sum(response_size) * 1.0 / sum(request_time) as download_speed") \
.insert_into("cdn_access_statistic")

# 执行作业
t_env.execute("pyFlink_parse_cdn_log")

### 阿里云CDN实时日志分析运行效果 我们采用mock的数据向kafka发送CDN日志数据,右边实时的按地区统计资源的访问量,下载量和下载速度。这个示例的mock数据工具,源代码和操作过程,在今天的直播后,会更新到我的博客当中。方便大家在自己的环境中进行体验。

<center>图34</center>

PyFlink 未来,会怎样?

总体来看PyFlink的业务开发还是非常简洁的,不用关心底层的实现细节,只需要按照SQL或者Table API的方式描述业务逻辑就行。那么,我们再整体看看PyFlink的未来会怎样呢?

幻灯片35

<center>图35</center>

PyFlink 本心驱动 Roadmap

PyFlink的发展始终要以本心驱动,我们要围绕将现有Flink功能输出到Python用户,将Python生态功能集成到Flink当中为目标。PyFlink的Roadmap如图所示:首先解决Python VM和Java VM的通讯问题,然后将现有的Table API功能暴露给Python用户,提供Python Table API,这也就是Flink 1.9中所进行的工作,接下来我们要为将Python功能集成到Flink做准备就是集成Apache Beam,提供Python UDF的执行环境,并增加Python 对其他类库依赖的管理功能,为用户提供User-defined-Funciton的接口定义,支持Python UDF,这就是Flink 1.10所做的工作。为了进一步扩大Python生态的分布式功能,PyFlink将提供Pandas的Series和DataFram的支持,也就是用户可以在PyFlink中直接使用Pandas的UDF。同时为增强用户的易用性,让用户有更多的方式使用PyFlink,后续增加在Sql Client中使用Python UDF。面对Python用户的机器学习问题,增加Python 的 ML pipeline API。监控Python UDF的执行情况对,对实际的生产业务非常关键,所以PyFlink会增加Python UDF的Metric管理. 这些在Flink1.11中将与用户见面。但这些功能只是PyFlink规划的冰山一角,后续我们还要进行性能优化,图计算API,Pandas on Flink的Pandas原生API 等等。。。进而完成不断将Flink现有功能推向Python生态,将Python 生态的强大功能不断集成到Flink当中,进而完成Python生态分布化的初衷。

<center>图36</center>

## PyFlink 1.11 预览 我们快速的预览一下即将与大家见面的Flink 1.11中的PyFlink的重点内容。

功能

好,我们将视角由远方拉近到Flink1.11版本PyFlink的核心功能,PyFlink会围绕着 功能,性能和易用性不断努力,在1.11在功能上会增加Pandas UDF的支持,这样Pandas生态的实用类库功能可以在PyFlink中直接使用,比如累积分布函数,CDF等。

<center>图37</center>

还会增加ML Pipeline API的支持,这样大家可以利用PyFlink完成一些机器学习场景的业务需求,我这里是一个使用pyFlink完成KMeans的示例。

<center>图38</center>

性能

在性能上PyFlink也会有更多的投入,我们利用Codegen,CPython,优化序列化和反序列化的方式提高PythonUDF的执行性能,目前我们初步对1.10和1.11进行性能对比来看,1.11将比1.10有近15倍的性能提升。

<center>图39</center>

易用性

在用户的易用性上PyFlink会在SQL DDL和SQL Client中增加对Python UDF的支持。让用户有更多的方式选择来使用PyFlink。

<center>图40</center>

PyFlink 大图(使命愿景)

今天已经介绍了很多,比如什么是PyFlink,PyFlink的存在的意义,PyFlink API架构,UDF架构,以及架构背后的取舍和现有架构的优势,并介绍了CDN的案例,介绍了PyFlink的Roadmap,预览了Flink 1.11版本中PyFlink的重点,那么接下来还有什么呢?

那么最后我们再来看看PyFlink的未来会怎样?在以“Flink功能Python化,Python生态分布化”的使命驱动下,PyFlink会有怎样的布局?我们快速分享一下:PyFlink是Apache Flink的一部分,涉及到Runtime层面和API层面。在这两个层面PyFlink会有怎样的发展?Runtime层面,PyFlink会构建解决Java VM和Python VM的通讯问题的gRPC通用服务,比如(Control/Data/State等)在这套框架之上会抽象出Java 的Python UDF算子,Python的执行容器构建,支持多种Python的Execution,比如 Process,Docker和External,尤其值得强调的是External以Socket的方式提供了无限的扩展能力,在后续的Python生态集成上至关重要。API层面,我们会使命驱动,将Flink上所以的API进行Python化,当然这也依托于引入Py4J的VM通讯框架之上,PyFlink会逐渐增加各种API的支持,Python Table API,UDX的接口API,ML Pipeline,DataStream,CEP,Gelly,State,等Flink所具备的Java APIs和Python生态用户的最爱Pandas APIs等。在这些API的基础之上,PyFlink还会不断的进行生态系统的集成,比如 方便用户开发的Notebook的集成,Zeppelin,Jupyter,并与阿里开源的Alink进行集成,目前PyAlink已经完全应用了PyFlink所提供的功能,后面还会和现有的AI系统平台进行集成,比如大家熟知的TensorFlow等等。所以此时我会发现使命驱动的力量会让PyFlink的生命线不断延续...当然这种生命的延续更需要更多的血液融入。这里再次强调一下PyFlink的使命:“Flink能力Python化,Python生态分布化”。目前PyFlink的核心贡献者们正以这样的使命而持续活跃在社区。

<center>图41 </center>

PyFlink核心贡献者及问题支持

在分享的最后,我想介绍一下目前PyFlink的核心贡献者。他们是:

  • 付典,目前付典是Flink以及另外两个Apache顶级项目的Committer,在PyFlink模块做了巨大的贡献。
  • 黄兴勃,目前专注PyFlink的UDF性能优化,曾经是阿里与安全算法挑战赛的冠军,在AI和中间件性能比赛中也有很好的成绩。
  • 程鹤群,为大家做过多次分享,相信大家还记得他为大家带来的《Flink知识图谱》分享。
  • 钟葳,关注PyFlink的UDF依赖管理和易用性工作,目前已经有很多的代码贡献。
  • 孙金城, 就是我自己,全力投入PyFlink的建设。

大家后续在使用PyFlink的时候,如果有什么问题都可以联系我们中的任何一位寻求支持。

<center>图42 </center>

当然遇到通用性问题还是建议大家邮件到Flink的用户列表和中文用户列表,这样能问题共享。当然如果你遇到特别急的个别问题,也非常欢迎您邮件到刚才介绍的小伙伴邮箱,同时,为了问题的积累和有效的分享,更期望大家遇到问题可以在Stackoverflow进行提问题。首先搜索你遇到问题是否已经被解答过,如果没有,请描述清楚,最后提醒大家要为问题打上 PyFlink的tags。这样我们及时订阅回复您问题。

<center>图43</center>

总结

也欢迎大家订阅我的博客,我们可以有更多的线下交流。那么在最后我也简单的总结一下今天的分享,今天对深入剖析了PyFlink深层含义;介绍了PyFlink API架构是核心采用Py4J框架进行VM之间的通讯,API的设计上Python API和Java API保持语义的一致;还介绍了Python UDF架构,以集成Apache Beam 的Portability Framework的方式获取高效稳定的Python UDF的支持,并且细致分析了架构背后思考,对技术选型的取舍和现有架构的优势;然后介绍了PyFlink所适用的业务场景,并以阿里云CDN日志实时分析的案例让大家对PyFlink的使用有一定的体感;最后介绍了PyFlink的Roadmap和预览了Flink 1.11版本中PyFlink的重点,预期PyFlink 1.11相对于1.10会有15倍以上的性能提升,最后和大家一起分享了PyFlink的使命,PyFlink的使命是”Flink能力Python化,Python生态分布化”。留在最后的是提供给大家一种更有效的问题求助的方式,大家有什么问题可以随时抛给刚才向大家介绍的PyFlink小伙伴,那么这些小伙伴已经在直播群里了,接下来有什么问题,我们可以一起探讨。:)再次感谢大家参加今天分享,谢谢大家!

<center>图44</center>