Three Min Series - How to Create a UDF in PyFlink 1.10

Motivation

Apache Flink 1.10 has good support for Python UDFs. This three-minute article shows how to use Python UDFs in PyFlink.

How to define a Python UDF in PyFlink

Steps

  • Check Python Version
    jincheng:videos jincheng.sunjc$ python --version
    Python 3.7.6

Python 3.5 or later is recommended.
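
The same check can be done from inside Python, which is handy when the `python` on the PATH is not the interpreter that will run the job. This is a minimal sketch: the helper name `is_supported` and the constant `MIN_PYTHON` are illustrative, and the minimum of 3.5 is simply taken from the recommendation above.

```python
import sys

# Minimum version taken from the recommendation above (assumption: 3.5)
MIN_PYTHON = (3, 5)


def is_supported(version_info=sys.version_info, minimum=MIN_PYTHON):
    """Return True if the (major, minor) pair meets the minimum."""
    return tuple(version_info[:2]) >= minimum


if __name__ == '__main__':
    major, minor = sys.version_info[:2]
    if is_supported():
        print('Python %d.%d meets the recommended minimum' % (major, minor))
    else:
        print('Python %d.%d is older than the recommended 3.5' % (major, minor))
```
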

  • Install PyFlink
python -m pip install apache-flink
  • Define Python UDFs
# -*- coding: utf-8 -*-
# Saved as myudfs.py; the example job imports it as enjoyment.three_minutes.myudfs

from pyflink.table import ScalarFunction, DataTypes
from pyflink.table.udf import udf


# Extend ScalarFunction
class Add(ScalarFunction):
    def eval(self, i, j):
        return i + j


add1 = udf(Add(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())


# Named Function
@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add2(i, j):
    return i + j


# Lambda Function
add3 = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())


# Callable Function
class CallableAdd(object):
    def __call__(self, i, j):
        return i + j


add4 = udf(CallableAdd(), [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())
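
All four variants wrap ordinary Python logic, so that logic can be checked on its own before it is handed to `udf(...)`. The sketch below is one way to do that without a Flink installation. It is an illustration only: `AddLogic`, `add_named`, `add_lambda`, and `check_all` are hypothetical names, and `AddLogic` deliberately does not extend `ScalarFunction`.

```python
# Plain-Python versions of the four UDF bodies; no Flink installation is needed.


class AddLogic(object):
    """Stand-in for the body of Add.eval (does not extend ScalarFunction)."""

    def eval(self, i, j):
        return i + j


def add_named(i, j):
    """Same body as the named-function variant."""
    return i + j


# Same body as the lambda variant
add_lambda = lambda i, j: i + j


class CallableAdd(object):
    """Same body as the callable-object variant."""

    def __call__(self, i, j):
        return i + j


def check_all(i, j):
    """Return the results of the four variants for one input pair."""
    return [AddLogic().eval(i, j), add_named(i, j), add_lambda(i, j), CallableAdd()(i, j)]


if __name__ == '__main__':
    # Pairs taken from the (a, b) columns used in the example job
    for a, b in [(1, 2), (2, 3), (3, 4)]:
        results = check_all(a, b)
        assert len(set(results)) == 1, 'variants disagree'
        print(a, b, results)
```

Once the bodies agree, wrapping them with `udf(...)` only adds the input and result type declarations.
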
  • Python UDF example
# -*- coding: utf-8 -*-
import tempfile

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import FileSystem, OldCsv, Schema
from enjoyment.three_minutes.myudfs import add1, add2, add3, add4

# The result is written as CSV into the system temp directory
sink_path = tempfile.gettempdir() + '/streaming.csv'

# init env
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

# register function
t_env.register_function("add1", add1)
t_env.register_function("add2", add2)
t_env.register_function("add3", add3)
t_env.register_function("add4", add4)

# source table with columns a, b, c
t = t_env.from_elements([(1, 2, 'Welcome'), (2, 3, 'To'), (3, 4, 'PyFlink')], ['a', 'b', 'c'])

# register the CSV sink; format and schema list the same six fields
t_env.connect(FileSystem().path(sink_path)) \
    .with_format(OldCsv()
                 .field_delimiter(',')
                 .field("add1", DataTypes.BIGINT())
                 .field("add2", DataTypes.BIGINT())
                 .field("add3", DataTypes.BIGINT())
                 .field("add4", DataTypes.BIGINT())
                 .field("b", DataTypes.BIGINT())
                 .field("c", DataTypes.STRING())) \
    .with_schema(Schema()
                 .field("add1", DataTypes.BIGINT())
                 .field("add2", DataTypes.BIGINT())
                 .field("add3", DataTypes.BIGINT())
                 .field("add4", DataTypes.BIGINT())
                 .field("b", DataTypes.BIGINT())
                 .field("c", DataTypes.STRING())) \
    .register_table_sink("pyflink_sink")

# apply the four UDFs to (a, b) and write the rows to the sink
t.select("add1(a, b), add2(a, b), add3(a, b), add4(a, b), b, c").insert_into("pyflink_sink")

# submit the job
t_env.execute("pyflink_udf")
  • Run and print the result
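
After the job has finished, the sink file can be read back with the standard library. The sketch below assumes the job ran with parallelism 1, so that `streaming.csv` is a single file rather than a directory; `read_sink_rows` is a hypothetical helper name. With the three input rows from the example, each UDF column should hold `a + b` (3, 5, and 7), followed by the `b` and `c` columns.

```python
import csv
import os
import tempfile


def read_sink_rows(path):
    """Return the CSV rows written by the sink as lists of strings."""
    with open(path, newline='') as f:
        return [row for row in csv.reader(f)]


if __name__ == '__main__':
    # Same location the example job writes to
    sink_path = tempfile.gettempdir() + '/streaming.csv'
    if os.path.isfile(sink_path):
        for row in read_sink_rows(sink_path):
            print(row)
    else:
        print('No result file found at', sink_path)
```
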

Q&A

If you have questions, leave a comment or send an email to sunjincheng121@gmail.com.