Three Min Series - How PyFlink does ETL

Motivation

Apache Flink's Python API (PyFlink) is a new feature introduced in Flink 1.9. Many users are unsure which business scenarios PyFlink is suited to. This article introduces how PyFlink can be applied to ETL.

What's ETL

ETL is a type of data integration that refers to the three steps (extract, transform, load) used to blend data from multiple sources.

PyFlink is well suited to ETL: data is extracted by defining a source connector, transformed through various operators, and then loaded into another store by defining a sink connector. The full program in the Source Code section below follows exactly these three steps.

Use Case

Data & Req / 数据和ETL需求

The data is as follows; each row is a province followed by its population:

BeiJing,7355291
HeBei,20813492
ShanXi,10654162
NeiMeng,8470472
LiaoNing,15334912
JiLin,9162183
HeLongJiang,13192935
ShangHai,8893483
JiangSu,25635291
ZheJiang,20060115
AnHui,19322432
FuJian,11971873
JiangXi,11847841
ShanDong,30794664
HeNan,26404973
HuBei,17253385
HuNan,19029894
GuangDong,32222752
GuangXi,13467663
ChongQing,10272559
SiChuang,26383458
GuiZhou,10745630
YunNan,12695396
ShanXi,11084516
GanSu,7113833
XinJiang,6902850
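
The ETL job below reads these rows from Population_information.csv (the source_file variable in the Source Code section). Before running the job, you can check that the file parses cleanly with a few lines of plain Python; this is just a sanity-check sketch using the standard library, not part of the Flink job itself:

import csv

# Parse the input file and make sure every row is a (province, population) pair.
with open('Population_information.csv') as f:
    rows = [(province, int(population)) for province, population in csv.reader(f)]

print(len(rows))  # 26 rows in the sample data above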

Source Code

# -*- coding: utf-8 -*-

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem

import os
import time

# Data source file (we assume the data is stored in a CSV file).
source_file = 'Population_information.csv'
# Result file (the filtered data will be stored in this CSV file).
sink_file = 'Population_information_More_Than_5_Million.csv'

# Create an execution environment.
exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)

t_env = BatchTableEnvironment.create(exec_env)

# Delete the result file if it already exists.
if os.path.exists(sink_file):
    os.remove(sink_file)

# Create the source connector with the `connect` method.
t_env.connect(FileSystem().path(source_file)) \
    .with_format(OldCsv()
                 .field('Province', DataTypes.STRING())
                 .field('Population', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('Province', DataTypes.STRING())
                 .field('Population', DataTypes.BIGINT())) \
    .register_table_source('source_tab')

# Create the sink connector with the `connect` method.
t_env.connect(FileSystem().path(sink_file)) \
    .with_format(OldCsv()
                 .field_delimiter(',')
                 .field('Province', DataTypes.STRING())
                 .field('Population', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('Province', DataTypes.STRING())
                 .field('Population', DataTypes.BIGINT())) \
    .register_table_sink('sink_tab')

# Transform: keep the provinces with a population of at least 5 million
# and load them into the sink table.
t_env.from_path('source_tab') \
    .select('Province, Population') \
    .where('Population >= 5000000') \
    .insert_into('sink_tab')

# Execute the job.
print("run: " + str(time.time()))
t_env.execute("ETL_Extract_provinces_with_population_over_5_million")
print("finish: " + str(time.time()))

Video presentation

<iframe src="//player.bilibili.com/player.html?aid=84500579&cid=144530466&page=1" scrolling="no" border="0" frameborder="no" framespacing="0" allowfullscreen="true" > </iframe>

Q&A

If you have questions, you can leave a comment or send an email to sunjincheng121@gmail.com.
