Deep Dive: How Python UDFs Are Supported in Apache Flink 1.10

Background

Apache Flink 1.9 introduced the Python Table API, allowing developers and data engineers to write Python Table API jobs for table transformations and analysis, such as Python ETL or aggregate jobs. However, Python UDFs were not yet supported in Apache Flink 1.9, which prevented Python users from extending the system’s built-in functionality.

In Apache Flink 1.10, the community further extended the support for Python by adding Python UDFs in PyFlink. Additionally, both the Python UDF environment and dependency management are now supported, allowing users to import third-party libraries in their UDFs and leverage Python’s rich set of third-party libraries.

Architecture of Python UDF Support in PyFlink

Before diving into how you can define and use Python UDFs, we explain the architecture and background behind how UDFs work in PyFlink and provide some additional context about the implementation of our approach.

Apache Beam is an advanced unified programming model that makes it possible to implement batch and streaming data processing jobs in any supported language and run them on any execution engine. This is made possible by the Beam Portability Framework, as shown in the following figure:


Figure 1: Portability Framework

The figure above shows the architecture of the Beam Portability Framework. It describes how Beam supports multiple languages and multiple runners. Regarding the Flink runner, we can say that it is Beam on Flink. So what does this have to do with Python UDF support in PyFlink? This is covered in the next section, Flink on Beam.

Apache Flink is an open source project, so its community also embraces other open source technologies. For example, Python UDF support in PyFlink was built on top of Apache Beam (a luxury sports car). :)


Figure 2: Flink on Beam

In PyFlink’s support for Python UDFs, the management of the Python runtime environment and the communication between the Python runtime environment (Python VM) and the Java runtime environment (JVM) are critical. Fortunately, Apache Beam’s Portability Framework solves this problem perfectly. Therefore, we have the following architecture: PyFlink on Beam Portability Framework.


Figure 3: PyFlink on Beam Portability Framework

The Beam Portability Framework is a mature multi-language support framework. It highly abstracts the communication protocol between languages (gRPC), defines the data transmission format (Protobuf), and abstracts the services required by a general-purpose stream computing framework, such as DataService, StateService, and MetricsService. On top of such a mature framework, PyFlink can quickly build its own Python operators while reusing the existing SDK harness components of the Apache Beam Portability Framework, and it can support multiple Python execution modes, such as Process, Docker, etc. This made supporting Python UDFs in PyFlink much easier, and the functionality in Apache Flink 1.10 is both stable and complete. So why do we say it was built jointly by Apache Flink and Apache Beam? Because there was still a lot of room for optimization in the Apache Beam Portability Framework, so I started optimization discussions in the Beam community and contributed 30+ optimization patches to Beam.

Communication between JVM and Python VM

Since Python UDFs cannot run directly in the JVM, they are executed in a Python process that the Apache Flink operator starts during initialization. The Python environment service is responsible for launching, managing, and tearing down this Python process. As illustrated in Figure 4 below, several components are involved in the communication between the Apache Flink operator and the Python execution environment:


Figure 4: Communication between JVM and Python VM

  • Environment Service: Responsible for launching and destroying the Python execution environment.
  • Data Service: Responsible for transferring the input data and the user-defined function execution results between the Apache Flink operator and the Python execution environment.
  • Logging Service: Mechanism for logging support in user-defined functions. It allows transferring log entries produced by user-defined functions to the Apache Flink operator and integrates with Apache Flink’s own logging system (a short sketch follows after the note below).

NOTE: Supporting metrics is currently planned for Apache Flink 1.11.
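
To make the Logging Service above more concrete, here is a minimal sketch of what it enables from a user's point of view. It assumes that records emitted through Python's standard logging module inside a UDF are forwarded to the operator's log via the gRPC logging service; the function name add_with_logging is purely illustrative:

import logging

from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add_with_logging(i, j):
    # Assumption: this log entry is shipped to the Flink operator's log
    # through the gRPC logging service described above.
    logging.info("adding %s and %s", i, j)
    return i + j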

Figure 5 below describes the high-level flow of initializing and executing UDFs, from the Java operator to the Python process.


Figure 5: High-level flow between Python VM and JVM
The high-level flow can be summarized in two parts:

  • Initialization of the Python execution environment.

    • The Python UDF Runner starts the gRPC services, such as the data service, logging service, etc.
    • The Python UDF Runner launches the Python execution environment in a separate process.
    • The Python worker registers to PythonUserDefinedFunctionRunner.
    • The Python UDF Runner sends the user defined functions to be executed in the Python worker.
    • The Python worker transforms the user defined functions to Beam operations. (Note that we leverage the power of Beam’s Portability Framework to execute the Python UDF.)
    • The Python worker establishes the gRPC connections, such as the data connection, logging connection, etc.
  • Processing of the input elements.

    • The Python UDF Runner sends the input elements via the gRPC data service to the Python worker for execution.
    • The Python user-defined function can access state via the gRPC state service during execution.
    • The Python user-defined function can also report logs and metrics to the Python UDF Runner via the gRPC logging service and the metrics service during execution.
    • The execution results are finally sent to the Python UDF Runner via the gRPC data service.

How to use PyFlink with UDFs in Apache Flink 1.10

This section provides some Python user-defined function (UDF) examples, including how to install PyFlink, how to define/register/invoke UDFs in PyFlink, and how to execute the job.

Using Python in Apache Flink requires installing PyFlink. PyFlink is available through PyPI and can be easily installed using pip:

$ python -m pip install apache-flink

Please note that Python 3.5 or higher is required to install and run PyFlink.
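
If you are unsure which interpreter your environment uses, a quick check looks like the following (the reported version will of course vary by machine):

$ python --version
Python 3.7.6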

Define a Python UDF

There are many ways to define a Python scalar function, besides extending the base class ScalarFunction. The following example shows the different ways of defining a Python scalar function that takes two columns of BIGINT as input parameters and returns the sum of them as the result.

  • Option 1: extending the base class ScalarFunction
class Add(ScalarFunction):
    def eval(self, i, j):
        return i + j

add = udf(Add(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
  • Option 2: Python function
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    return i + j
  • Option 3: lambda function
add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
  • Option 4: callable function
class CallableAdd(object):
    def __call__(self, i, j):
        return i + j

add = udf(CallableAdd(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
  • Option 5: partial function
import functools

def partial_add(i, j, k):
    return i + j + k

add = udf(functools.partial(partial_add, k=1), [DataTypes.BIGINT(), DataTypes.BIGINT()],
          DataTypes.BIGINT())

Register a Python UDF

  • Register the Python function

table_env.register_function("add", add)

  • Invoke a Python UDF

my_table.select("add(a, b)")

  • Example Code

Below, you can find a complete example of using a Python UDF.

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

t_env.register_function("add", udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT()))

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource') \
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")
  • Submit the job

Firstly, you need to prepare the input data in the “/tmp/input” file. For example,

$ echo "1,2" > /tmp/input

Next, you can run this example on the command line,

$ python python_udf_sum.py

The command builds and runs the Python Table API program in a local mini-cluster. You can also submit the Python Table API program to a remote cluster using different command lines (see more details here).
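
As a rough sketch of remote submission (not shown in the example above), submitting the same script with the flink CLI might look like the following, assuming a Flink 1.10 distribution is available on the client machine and the JobManager REST endpoint is reachable at localhost:8081:

$ cd /path/of/flink-1.10.0
$ ./bin/flink run -m localhost:8081 -py python_udf_sum.py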

Finally, you can see the execution result on the command line:

$ cat /tmp/output
3

Python UDF dependency management

In many cases, you would like to import third-party dependencies in the Python UDF. The example below provides detailed guidance on how to manage such dependencies.

Suppose you want to use mpmath to perform the sum in the example above. The Python UDF may look like:

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    from mpmath import fadd  # add third-party dependency
    return int(fadd(1, 2))

To make it available on the worker node that does not contain the dependency, you can specify the dependencies with the following API:

# echo mpmath==1.1.0 > requirements.txt
# pip download -d cached_dir -r requirements.txt --no-binary :all:
t_env.set_python_requirements("/path/of/requirements.txt", "/path/of/cached_dir")

A requirements.txt file declaring the third-party dependencies is used. If the dependencies cannot be installed in the cluster (for example, because of network issues), you can specify a directory containing the installation packages of these dependencies using the parameter “requirements_cached_dir”, as illustrated in the example above. The dependencies will be uploaded to the cluster and installed offline.

Below, we showcase a complete example of using dependency management for Python UDFs in Apache Flink 1.10:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    from mpmath import fadd
    return int(fadd(1, 2))

t_env.set_python_requirements("/tmp/requirements.txt", "/tmp/cached_dir")
t_env.register_function("add", add)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource') \
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")

Submit the job

Firstly, you need to prepare input data in the “/tmp/input” file. For example,

echo "1,2" >link
1
2


Secondly, you can prepare the dependency requirements file and the cache dir,

$ echo "mpmath==1.1.0" > /tmp/requirements.txt
$ pip download -d /tmp/cached_dir -r /tmp/requirements.txt --no-binary :all:

Next, you can run this example on the command line,

$ python python_udf_sum.py

Finally, you can see the execution result on the command line:

$ cat /tmp/output
3

Quick start

PyFlink provides a very convenient way to get started: the PyFlink Shell. After successfully executing python -m pip install apache-flink, you can directly start a PyFlink shell in local mode with pyflink-shell.sh local, as shown below:
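
A minimal interactive session might look like the sketch below. It assumes the shell pre-creates a streaming table environment named st_env, as in the 1.10 shell; the variable names may differ between versions:

$ pyflink-shell.sh local
>>> from pyflink.table import DataTypes
>>> from pyflink.table.udf import udf
>>> add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
>>> st_env.register_function("add", add)  # st_env: pre-created streaming TableEnvironment (assumed)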

More

Beyond simple ETL scenarios, PyFlink can fulfill the requirements of many complex business scenarios, such as the familiar Double 11 real-time dashboard scenario, as follows:

More details about the above example can be found here.

Conclusion & Upcoming work

In this blog post, we introduced the architecture of Python UDFs in PyFlink and provided some examples of how to define, register and invoke UDFs. Apache Flink 1.10 brings Python support in the framework to new levels, allowing Python users to write even more magic with their preferred language. The community is actively working towards continuously improving the functionality and performance of PyFlink. Future work in upcoming releases will introduce support for Pandas UDFs in scalar and aggregate functions, add support for using Python UDFs through the SQL Client to further expand their usage scope, and work towards even more performance improvements. To find out more about the upcoming work with Python in Apache Flink, you can join the discussion on the Apache Flink mailing list and share your suggestions and thoughts with the community.

With the continuous efforts of community contributors, PyFlink can quickly grow from a seedling into a tree, as shown in the following figure:

PyFlink Want You

PyFlink is a new component and still needs a lot of work. Therefore, we sincerely welcome everyone to join us in contributing to PyFlink, including asking questions, submitting bug reports, proposing new features, joining discussions, and contributing code or documentation…

Hope to see you in PyFlink!