Three Min Series - How to Use the PyFlink Shell

Motivation

Apache Flink 1.10 was released recently, and PyFlink now provides users with the most convenient way to try it out: the PyFlink Shell. This article takes 3 minutes to show you how to quickly experience PyFlink.

How to Deploy a PyFlink Job

As shown in the figure above, we can deploy PyFlink jobs to MiniCluster / Standalone / Yarn or Cloud clusters.

Steps

  • Check Python Version

    jincheng:videos jincheng.sunjc$ python --version
    Python 3.7.6

It's better to use Python 3.5+.
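Since PyFlink requires Python 3.5 or newer, you can also verify the interpreter version programmatically instead of reading the `python --version` output by eye. A minimal sketch:

```python
import sys

# PyFlink 1.10 requires Python 3.5+; fail fast with a clear message otherwise.
if sys.version_info < (3, 5):
    raise RuntimeError("PyFlink requires Python 3.5+, found %d.%d"
                       % (sys.version_info[0], sys.version_info[1]))
print("Python version OK: %d.%d" % (sys.version_info[0], sys.version_info[1]))
```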

  • Install PyFlink

python -m pip install apache-flink

  • Start PyFlink Shell

pyflink-shell.sh local

  • Python UDF example

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(1)
t_env = StreamTableEnvironment.create(env)

@udf(input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
def add(i, j):
    # mpmath is a third-party dependency, shipped to the workers via requirements.txt
    from mpmath import fadd
    return int(fadd(i, j))

t_env.set_python_requirements("/tmp/requirements.txt", "/tmp/cached_dir")
t_env.register_function("add", add)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('a', DataTypes.BIGINT())
                 .field('b', DataTypes.BIGINT())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field('sum', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('sum', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

t_env.from_path('mySource') \
    .select("add(a, b)") \
    .insert_into('mySink')

t_env.execute("tutorial_job")
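The example reads from /tmp/input and pulls in mpmath through /tmp/requirements.txt, so both files must exist before the job runs. A minimal sketch for preparing them (the mpmath version pin is an assumption, any version providing fadd will do):

```shell
# Declare the UDF's third-party dependency; Flink ships it to the Python workers.
echo "mpmath==1.1.0" > /tmp/requirements.txt

# Two BIGINT columns per row, matching the OldCsv source schema above.
printf '1,2\n3,4\n' > /tmp/input

cat /tmp/requirements.txt
cat /tmp/input
```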

  • Submit the PyFlink Job to a Cluster

We can submit the job to a standalone cluster with the following command; see the video demonstration for details.

pyflink-shell.sh remote <hostname> <portnumber>

Video Presentation

(video)

Q&A

If you have questions, you can leave a comment or send an email to sunjincheng121@gmail.com.
