PyFlink 场景案例 - PyFlink实现CDN日志实时分析

放空自己

三十辐共一毂,当其无,有车之用。埏埴以为器,当其无,有器之用。凿户牖以为室,当其无,有室之用。— 深悟 “空” 之道。

CDN 日志实时分析综述

CDN将源站资源缓存至遍布全球的加速节点上,当终端用户请求获取该资源时,无需回源,系统自动调用离终端用户最近的CDN节点上已缓存的资源,那么如何进行实时日志分析呢?

架构

CDN日志的解析一般有一个通用的架构模式,就是首先要将各个边缘节点的日志数据进行采集,一般会采集到消息队列,然后将消息队列和实时计算集群进行集成进行实时的日志分析,最后将分析的结果写到存储系统里面。那么我今天的案例将架构实例化,消息队列采用Kafka,实时计算采用Flink,最终将数据存储到MySql中。如下图所示:

需求说明

阿里云实际的CDN日志数据结构如下(可能会不断丰富字段信息):

为了介绍方便,我们将实际的统计需求进行简化,示例将从CDN访问日志中,根据IP解析出其所属的地区,统计指标:

  • 按地区统计资源访问量
  • 按地区统计资源下载总量
  • 按地区统计资源平均下载速度

CDN实时日志分析UDF定义

这里我们需要定义一个 ip_to_province()的UDF,输入是ip地址,输出是地区名字字符串。UDF的输入类型是一个字符串,输出类型也是一个字符串。同时我们会用到地理区域查询服务(http://whois.pconline.com.cn/ipJson.jsp?ip=27.184.139.25),大家在自己的生产环境要替换为可靠的地域查询服务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import re
import json
from pyflink.table import DataTypes
from pyflink.table.udf import udf
from urllib.parse import quote_plus
from urllib.request import urlopen

@udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
def ip_to_province(ip):
"""
format:
{
'ip': '27.184.139.25',
'pro': '河北省',
'proCode': '130000',
'city': '石家庄市',
'cityCode': '130100',
'region': '灵寿县',
'regionCode': '130126',
'addr': '河北省石家庄市灵寿县 电信',
'regionNames': '',
'err': ''
}
"""
try:
urlobj = urlopen( \
'http://whois.pconline.com.cn/ipJson.jsp?ip=%s' % quote_plus(ip))
data = str(urlobj.read(), "gbk")
pos = re.search("{[^{}]+\}", data).span()
geo_data = json.loads(data[pos[0]:pos[1]])
if geo_data['pro']:
return geo_data['pro']
else:
return geo_data['err']
except:
return "UnKnow"

数据读取和结果写入定义

按照通用的作业结构,需要定义Source connector来读取Kafka数据,定义Sink connector来将计算结果存储到MySQL。最后是编写统计逻辑。在这特别说明一下,在PyFlink中也支持SQL DDL的编写,我们用一个简单的DDL描述,就完成了Source Connector的开发。其中connector.type填写kafka。SinkConnector也一样,用一行DDL描述即可,其中connector.type填写jdbc。

Kafka 数据源读取DDL定义

数据统计需求我们只选取核心的字段,比如:uuid,表示唯一的日志标示,client_ip表示访问来源,request_time表示资源下载耗时,response_size表示资源数据大小。Kafka数据字段进行简化如下:
-w344

DDL定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
kafka_source_ddl = """
CREATE TABLE cdn_access_log (
uuid VARCHAR,
client_ip VARCHAR,
request_time BIGINT,
response_size BIGINT,
uri VARCHAR
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'access_log',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'csv',
'format.ignore-parse-errors' = 'true'
)
"""

MySql 数据写入DDL定义

其中我们发现我们需求是按地区分组,但是原始日志里面并没有地区的字段信息,所以我们需要定义一个Python UDF 更具 client_ip 来查询对应的地区。所以我们对应的MySqL统计结果表如下:
-w475
DDL定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
mysql_sink_ddl = """
CREATE TABLE cdn_access_statistic (
province VARCHAR,
access_count BIGINT,
total_download BIGINT,
download_speed DOUBLE
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/Flink',
'connector.table' = 'access_statistic',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.interval' = '1s'
)
"""

核心统计逻辑

我们首先要将client_ip转换为地区名字,然后在做数据统计,如下核心统计逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
# 核心的统计逻辑
t_env.from_path("cdn_access_log")\
.select("uuid, "
"ip_to_province(client_ip) as province, " # IP 转换为地区名称
"response_size, request_time")\
.group_by("province")\
.select( # 计算访问量
"province, count(uuid) as access_count, "
# 计算下载总量
"sum(response_size) as total_download, "
# 计算下载速度
"sum(response_size) * 1.0 / sum(request_time) as download_speed") \
.insert_into("cdn_access_statistic")

完整的作业代码

我们整体看一遍完整代码结构,首先是核心依赖的导入,然后是我们需要创建一个ENV,并设置采用的planner(目前Flink支持Flink和blink两套planner)建议大家采用 blink planner。 接下来将我们刚才描述的kafka和mysql的ddl进行表的注册。再将Python UDF进行注册,这里特别提醒一点,UDF所依赖的其他文件也可以在API里面进行制定,这样在job提交时候会一起提交到集群。然后是核心的统计逻辑,最后调用executre提交作业。这样一个实际的CDN日志实时分析的作业就开发完成了。具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import os

from pyFlink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
from enjoyment.cdn.cdn_udf import ip_to_province
from enjoyment.cdn.cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl

# 创建Table Environment, 并选择使用的Planner
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(
env,
environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())

# 创建Kafka数据源表
t_env.sql_update(kafka_source_ddl)
# 创建MySql结果表
t_env.sql_update(mysql_sink_ddl)

# 注册IP转换地区名称的UDF
t_env.register_function("ip_to_province", ip_to_province)

# 添加依赖的Python文件
t_env.add_Python_file(
os.path.dirname(os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_udf.py")
t_env.add_Python_file(os.path.dirname(
os.path.abspath(__file__)) + "/enjoyment/cdn/cdn_connector_ddl.py")

# 核心的统计逻辑
t_env.from_path("cdn_access_log")\
.select("uuid, "
"ip_to_province(client_ip) as province, " # IP 转换为地区名称
"response_size, request_time")\
.group_by("province")\
.select( # 计算访问量
"province, count(uuid) as access_count, "
# 计算下载总量
"sum(response_size) as total_download, "
# 计算下载速度
"sum(response_size) * 1.0 / sum(request_time) as download_speed") \
.insert_into("cdn_access_statistic")

# 执行作业
t_env.execute("pyFlink_parse_cdn_log")

环境搭建(MacOS)

安装MySQL

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$  brew install mysql
Updating Homebrew...
==> Auto-updated Homebrew!
...
...
MySQL is configured to only allow connections from localhost by default

To connect run:
mysql -uroot

To have launchd start mysql now and restart at login:
brew services start mysql
Or, if you don't want/need a background service you can just run:
mysql.server start

如果没有安装brew,执行以下指令安装:

1
$ /usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

安装完成后,执行安全配置,将root用户的密码设置为JxXX&&l2j2#:

1
$ mysql_secure_installation

启动mysql服务:

1
2
3
4
5
6
7
$ brew services start mysql

==> Tapping homebrew/services
Cloning into '/usr/local/Homebrew/Library/Taps/homebrew/homebrew-services'...
...
...
==> Successfully started `mysql` (label: homebrew.mxcl.mysql)

登录mysql并创建flink数据库和cdn_access_statistic表,并确认开放端口号为默认的3306:

1
2
3
4
5
6
7
8
9
10
11
12
$ mysql -uroot -p
Enter password: JxXX&&l2j2#
mysql> use flink;
Database changed
mysql> CREATE TABLE cdn_access_statistic(
-> province VARCHAR(255) PRIMARY KEY,
-> access_count BIGINT,
-> total_download BIGINT,
-> download_speed DOUBLE
-> ) ENGINE=INNODB CHARSET=utf8mb4;
Query OK, 0 rows affected (0.01 sec)
exit

这样mysql的环境就准备好了。

安装Kafka

在Mac系统安装Kafka也非常方便,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
brew install kafka
Updating Homebrew...
==> Installing dependencies for kafka: zookeeper
...
...
==> Summary
/usr/local/Cellar/zookeeper/3.5.7: 394 files, 11.3MB
==> Installing kafka
...
...
==> kafka
To have launchd start kafka now and restart at login:
brew services start kafka
Or, if you don't want/need a background service you can just run:
zookeeper-server-start /usr/local/etc/kafka/zookeeper.properties & kafka-server-start /usr/local/etc/kafka/server.properties

安装完成后执行以下指令启动zookeeper和kafka:

1
2
3
4
5
$ cd /usr/local/opt/kafka/libexec/bin/
$ ./zookeeper-server-start.sh ../config/zookeeper.properties &
$ (回车)
$ ./kafka-server-start.sh ../config/server.properties &
$ (回车)

然后在kafka中创建名为cdn_access_log的topic:

1
2
3
$ kafka-topics --create --replication-factor 1 --partitions 1 --topic cdn_access_log --zookeeper localhost:2181
...
Created topic cdn_access_log.

这样kafka的环境也准备好了.

确保使用Python3.5+的版本,检验如下:

1
2
$ python --version
Python 3.7.6

说过pip install安装PyFlink

1
2
3
4
python -m pip install apache-flink==1.10.0
...
...
Successfully installed apache-beam-2.15.0 dill-0.2.9 pyarrow-0.14.1

检查我们是否成功安装了PyFlink和依赖的Beam,如下:

1
2
3
$ python -m pip list |grep apache-
apache-beam 2.15.0
apache-flink 1.10.0

如上显示说明已经完成安装。

Copy使用的Connector的JAR包

Flink默认是没有打包connector的,所以我们需要下载各个connector所需的jar包并放入PyFlink的lib目录。首先拿到PyFlink的lib目录的路径:

1
2
3
$ PYFLINK_LIB=`python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/lib')"`
$ echo $PYFLINK_LIB
/usr/local/lib/python3.7/site-packages/pyflink/lib

然后想需要的jar包下载到lib目录中去:

1
2
3
4
5
$ cd $PYFLINK_LIB
$ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.11/1.10.0/flink-sql-connector-kafka_2.11-1.10.0.jar
$ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-jdbc_2.11/1.10.0/flink-jdbc_2.11-1.10.0.jar
$ curl -O https://repo1.maven.org/maven2/org/apache/flink/flink-csv/1.10.0/flink-csv-1.10.0-sql-jar.jar
$ curl -O https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.19/mysql-connector-java-8.0.19.jar

最终lib的JARs如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
$ ls -la
total 310448
drwxr-xr-x 12 jincheng.sunjc admin 384 3 27 15:48 .
drwxr-xr-x 28 jincheng.sunjc admin 896 3 24 11:44 ..
-rw-r--r-- 1 jincheng.sunjc admin 36695 3 27 15:48 flink-csv-1.10.0-sql-jar.jar
-rw-r--r-- 1 jincheng.sunjc admin 110055308 2 27 13:33 flink-dist_2.11-1.10.0.jar
-rw-r--r-- 1 jincheng.sunjc admin 89695 3 27 15:48 flink-jdbc_2.11-1.10.0.jar
-rw-r--r-- 1 jincheng.sunjc admin 2885638 3 27 15:49 flink-sql-connector-kafka_2.11-1.10.0.jar
-rw-r--r-- 1 jincheng.sunjc admin 22520058 2 27 13:33 flink-table-blink_2.11-1.10.0.jar
-rw-r--r-- 1 jincheng.sunjc admin 19301237 2 27 13:33 flink-table_2.11-1.10.0.jar
-rw-r--r-- 1 jincheng.sunjc admin 489884 2 27 13:33 log4j-1.2.17.jar
-rw-r--r-- 1 jincheng.sunjc admin 2356711 3 27 15:48 mysql-connector-java-8.0.19.jar
-rw-r--r-- 1 jincheng.sunjc admin 20275 12 5 17:01 pyflink-demo-connector-0.1.jar
-rw-r--r-- 1 jincheng.sunjc admin 9931 2 27 13:33 slf4j-log4j12-1.7.15.jar

这样所有的环境就准备好了。

本地运行作业

  • 启动本地集群
    1
    $PYFLINK_LIB/../bin/start-cluster.sh local

启动成功后可以打开web界面(默认8081端口,我更改为4000)

  • 提交作业
    示例涉及到如下三个py文件:
  1. cdn_demo.py
  2. cdn_connector_ddl.py
  3. cdn_udf.py

我们在作业文件所在目录进行执行:

1
2
$ $PYFLINK_LIB/../bin/flink run -m localhost:4000 -py cdn_demo.py
Job has been submitted with JobID e71de2db44fd6cfb9bae93bd04ee2ec9

查看控制台如下:
-w1408

目前我们已经成功的将作业启动起来了,接下来我们向Kafka里面进行数据的写入。

Mock 数据

我们对CDN数据进行Mock,并进行统计可视化,工具代码请下载

  • 进行源码编译

    1
    2
    $ python setup.py sdist
    $ sudo python -m pip install dist/*
  • 运行工具

    1
    2
    3
    4
    5
    6
    7
    8
    9
    $ start_dashboard.py 
    --------------------------------------environment config--------------------------------------
    Target kafka port: localhost:9092
    Target kafka topic: cdn_access_log
    Target mysql://localhost:3306/flink
    Target mysql table: cdn_access_statistic
    Listen at: http://localhost:64731
    ----------------------------------------------------------------------------------------------
    To change above environment config, edit this file: /usr/local/bin/start_dashboard.py

打开可视化界面: (根据命令行提示的 Listen at:后面的地址)
-w1212

小结

本篇是场景案例系列,开篇分享了老子教导我们要学会放空自己,空杯才能注水。全篇围绕阿里云CDN日志实时分析需求进行展开,描述了任何利用PyFlink解决实际业务问题。最后又为大家提供了示例的数据的Mock工具和可视化统计页面,希望对大家有所帮助。