1.简介
flink-streaming-platform-web系统是基于Apache Flink 封装的一个可视化的、轻量级的flink web客户端系统,用户只需在web 界面进行sql配置就能完成流计算任务。
主要功能:包含任务配置、启/停任务、告警、日志等功能,支持sql语法提示,格式化、sql语句校验。
目的:减少开发、降低成本 完全实现sql化 流计算任务。
1、主要功能
[1] 任务支持单流 、双流、 单流与维表等。
[2] 支持本地模式、yarn-per模式、STANDALONE模式。
[3] 支持catalog、hive。
[4] 支持自定义udf、连接器等,完全兼容官方连接器。
[5] 支持sql的在线开发,语法提示,格式化。
[6] 支持钉钉告警、自定义回调告警、自动拉起任务。
[7] 支持自定义Jar提交任务。
[8] 支持多版本flink版本(需要用户编译对应flink版本)。
[9] 支持自动、手动savepoint备份,并且从savepoint恢复任务。
[10] 支持批任务如:hive。
目前flink版本已经升级到1.13
2、流程说明
2.1 软件版本列表
flink-streaming-platform-web.tar.gz
flink-streaming-platform-webtagV20220120(flink1.13.2) (下载地址)
flink-1.13.2
下载地址:https://repo.huaweicloud.com/apache/flink/flink-1.13.2/flink-1.13.2-bin-scala_2.12.tgz
依赖的jar包
操作mysql需要的jar包(flink jdbc connector下载地址 )
flink-connector-jdbc_2.11-1.13.2.jar
mysql-connector-java-8.0.30.jar(根据使用的mysql版本来选择,各版本下载地址 )
操作kafka需要的jar包(flink kafka connector下载地址)
这里只需要下载这一个jar包即可:flink-sql-connector-kafka_2.11-1.13.6.jar
将上述3个jar包,拷贝到flink的lib目录下,如下图所示:
flink-connector-clickhouse-1.13.2-SNAPSHOT.jar flink-sql-connector-kafka_2.11-1.13.6.jar log4j-api-2.12.1.jar
flink-connector-jdbc_2.11-1.13.2.jar flink-sql-connector-mysql-cdc-2.2.0.jar log4j-core-2.12.1.jar
flink-csv-1.13.2.jar flink-sql-connector-tidb-cdc-2.2.1.jar log4j-slf4j-impl-2.12.1.jar
flink-dist_2.12-1.13.2.jar flink-table_2.12-1.13.2.jar mysql-connector-java-8.0.30.jar
flink-json-1.13.2.jar flink-table-blink_2.12-1.13.2.jar
flink-shaded-zookeeper-3.4.14.jar log4j-1.2-api-2.12.1.jar
2.2 添加第3放依赖jar包的方法
第一种方法:下载连接器Jar包后后直接放到 flink/lib/目录下就可以使用了,其缺点是:
1、该方案存在jar冲突可能,特别是连接器多了以后
2、在非yarn模式下每次新增jar需要重启flink集群服务器
第二种方法:配置每一个flink任务时,放到http的服务下填写到三方地址,例如设置内部的http repo下载源(公司内部建议放到内网的某个http服务)
http://abc.com/jars/flink-connector-jdbc_2.11-1.12.0.jar
http://abc.com/jars/flink-sql-connector-kafka_2.11-1.12.0.jar
http://abc.com/jars/flink-streaming-udf.jar
http://ccblog.cn/jars/mysql-connector-java-5.1.25.jar
如下图所示:
3. 启动
3.1 启动flink
cd /work/flink/flink-1.12.0/bin
./start-cluster.sh
3.2 启动flink-streaming-platform-web
从git上下载完,进行解压,记住,要和之前的Flink在一个目录
[root@node212 local]# pwd
/usr/local
[root@node212 local]# ls
bin etc flink-1.13.2 flink-streaming-platform-web-1.13.2 games include lib lib64 libexec sbin share src
安装mysql 5.7.36环境(略):
修改配置文件,主要修改数据库地址,在config的application.properties里
####jdbc信息
server.port=9084
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/flink_web?characterEncoding=UTF-8&useSSL=false
spring.datasource.username=root
spring.datasource.password=123456
useSSL=false 注意这个,如果你的MySQL支持ssl那就不用加,不支持,记得加上,防止报错
创建数据表脚本,依然是在flink_web下,脚本在 https://github.com/zhp8341/flink-streaming-platform-web/blob/master/docs/sql/flink_web.sql,请自行下载和创建。
修改flink-streaming-platform-web占用的内存:
[root@node212 bin]# vim /usr/local/flink-streaming-platform-web-1.13.2/bin/deploy.sh
##JAVA_OPTS设置
JAVA_OPTS="-Xmx512M -Xms512M -Xmn128M -XX:MaxMetaspaceSize=512M -XX:MetaspaceSize=512M -XX:+UseConcMarkSweepGC -Xdebug -Xrunjdwp:transport=dt_socket,address=9901,server=y,suspend=n -XX:+UseCMSInitiatingOccupancyOnly -XX:CMSInitiatingOccupancyFraction=70 -Dcom.sun.management.jmxremote.port=8999 -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -XX:+ExplicitGCInvokesConcurrentAndUnloadsClasses -XX:+CMSClassUnloadingEnabled -XX:+ParallelRefProcEnabled -XX:+CMSScavengeBeforeRemark -XX:ErrorFile=../logs/hs_err_pid%p.log -XX:HeapDumpPath=../logs -XX:+HeapDumpOnOutOfMemoryError"
启动进程:
cd /usr/local/flink-streaming-platform-web-1.13.2/bin
sh deploy.sh start
//停止命令
sh deploy.sh stop
解密,为啥在进入bin里面,不是在其他目录启动呢,看启动脚本,你就明白了,主要是目录层级的问题,仔细看看吧。
4. 配置flink-streaming-platform-web
4.1 登录页面
启动后,访问web页面
打开页面查看
http://172.16.1.212:9084
登录号:admin 密码 123456。
4.2 系统配置
如下图,在下拉框中选择参数,在变量值文本框中填入对应的键值,点击保存即可。配置好的参数会出现在下面的列表中
4.3 任务配置
· 新增配置
在配置管理下,新增一个配置,即新建一个flink sql任务,并提交保存。
CREATE TABLE ck_rsc (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
)
WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://172.16.1.220:8123',
'database-name' = 'aaaaa',
'table-name' = 'rsc',
'sink.batch-size' = '500',
'sink.flush-interval' = '1000',
'sink.max-retries' = '3',
'username'='default',
'password'=''
);
CREATE TABLE source (
id INT,
name STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '172.16.1.221',
'port' = '3306',
'username' = 'root',
'password' = 'Hexin123.',
'database-name' = 'source',
'table-name' = 'source'
);
insert into ck_rsc(id,name) select id,name from source;
数据库建表语句
mysql:
mysql> use source;
Database changed
mysql> show create table source;
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| source | CREATE TABLE `source` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(20) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 |
+--------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
clickhouse:
node220 :) use aaaaa;
USE aaaaa
Query id: ea03f620-d45b-4b0b-b649-250e929396fc
Ok.
0 rows in set. Elapsed: 0.003 sec.
node220 :) desc rsc;
DESCRIBE TABLE rsc
Query id: 9f85e006-4787-4644-90b4-31885bea372b
┌─name─┬─type─────────────┬─default_type─┬─default_expression─┬─comment─┬─codec_expression─┬─ttl_expression─┐
│ id │ Int32 │ │ │ │ │ │
│ name │ Nullable(String) │ │ │ │ │ │
└──────┴──────────────────┴──────────────┴────────────────────┴─────────┴──────────────────┴────────────────┘
2 rows in set. Elapsed: 0.021 sec.
· 依次点击 开启配置 -> 提交任务
5. 观察运行结果
5.1 观察任务提交结果
点解日志详情,观察已提交任务的运行状况,如下图则表示任务提交成功,并且任务正在成功运行。
如果提交失败,请排查flink的log/下日志文件,和flink-streaming-platform-web的日志文件
5.2 观察数据变化
5.3 在flink的ui上观察任务
访问flink的UI地址 http://172.16.1.212:8081/#/overview
观察正在运行的任务
结束
整体就是这个样子,其实这些使用sql-client也可以实现,不过有了可视化页面,更方便人们操作,并且web也提供了很多其他的功能,并且也在长期迭代中,希望大家多多支持(还有我)
另外如果有问题,多看看日志。
下面给几个主要查资料的网站
Flink-streaming-platform-web
https://github.com/zhp8341/flink-streaming-platform-web
Flink相关jar的repo
https://repo1.maven.org/maven2/org/apache/flink/
Flink官网
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/