官方链接:
https://github.com/itinycheng/flink-connector-clickhouse
编译的前题是要安装好jdk和mvn,这里省略Flink ClickHouse Connector
Flink SQL connector for ClickHouse database, this project Powered by ClickHouse JDBC.
Currently, the project supports Source/Sink Table and Flink Catalog.
Please create issues if you encounter bugs and any help for the project is greatly appreciated.
Connector Options
Option Required Default Type Description
url required none String The ClickHouse jdbc url in format clickhouse://<host>:<port>.
username optional none String The 'username' and 'password' must both be specified if any of them is specified.
password optional none String The ClickHouse password.
database-name optional default String The ClickHouse database name.
table-name required none String The ClickHouse table name.
use-local optional false Boolean Directly read/write local tables in case of distributed table engine.
sink.batch-size optional 1000 Integer The max flush size, over this will flush data.
sink.flush-interval optional 1s Duration Over this flush interval mills, asynchronous threads will flush data.
sink.max-retries optional 3 Integer The max retry times when writing records to the database failed.
sink.write-local optional false Boolean Removed from version 1.15, use use-local instead.
sink.partition-strategy optional balanced String Partition strategy: balanced(round-robin), hash(partition key), shuffle(random).
sink.partition-key optional none String Partition key used for hash strategy.
sink.ignore-delete optional true Boolean Whether to ignore delete statements.
sink.parallelism optional none Integer Defines a custom parallelism for the sink.
scan.partition.column optional none String The column name used for partitioning the input.
scan.partition.num optional none Integer The number of partitions.
scan.partition.lower-bound optional none Long The smallest value of the first partition.
scan.partition.upper-bound optional none Long The largest value of the last partition.
catalog.ignore-primary-key optional true Boolean Whether to ignore primary keys when using ClickHouseCatalog to create table. defaults to true.
Upsert mode notice:
Distributed table don't support the update/delete statements, if you want to use the update/delete statements, please be sure to write records to local table or set use-local to true.
The data is updated and deleted by the primary key, please be aware of this when using it in the partition table.
Data Type Mapping
Flink Type ClickHouse Type
CHAR String
VARCHAR String / IP / UUID
STRING String / Enum
BOOLEAN UInt8
BYTES FixedString
DECIMAL Decimal / Int128 / Int256 / UInt64 / UInt128 / UInt256
TINYINT Int8
SMALLINT Int16 / UInt8
INTEGER Int32 / UInt16 / Interval
BIGINT Int64 / UInt32
FLOAT Float32
DOUBLE Float64
DATE Date
TIME DateTime
TIMESTAMP DateTime
TIMESTAMP_LTZ DateTime
INTERVAL_YEAR_MONTH Int32
INTERVAL_DAY_TIME Int64
ARRAY Array
MAP Map
ROW Not supported
MULTISET Not supported
RAW Not supported
编译安装:
Maven Dependency
The project isn't published to the maven central repository, we need to deploy/install to our own repository before use it, step as follows:
# clone the project
git clone https://github.com/itinycheng/flink-connector-clickhouse.git
# enter the project directory
cd flink-connector-clickhouse/
# display remote branches
git branch -r
# checkout the branch you need
git checkout $branch_name
这边编译的是1.13.2版本
# install or deploy the project to our own repository
mvn clean install -DskipTests
mvn clean deploy -DskipTests
报错没有关系,因为没有配置远程库,实际上文件已生成:
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 31.504 s
[INFO] Finished at: 2022-08-16T22:17:50+08:00
[INFO] Final Memory: 51M/288M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-deploy-plugin:2.7:deploy (default-deploy) on project flink-connector-clickhouse: Deployment failed: repository element was not specified in the POM inside distributionManagement element or in -DaltDeploymentRepository=id::layout::url parameter -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException
[root@node212 flink-connector-clickhouse]# ls
dependency-reduced-pom.xml LICENSE mvnw mvnw.cmd pom.xml README.md src target tools
[root@node212 flink-connector-clickhouse]# cd target/
[root@node212 target]# ls
checkstyle-checker.xml generated-test-sources
checkstyle-result.xml maven-archiver
checkstyle-suppressions.xml maven-status
classes original-flink-connector-clickhouse-1.13.2-SNAPSHOT.jar
flink-connector-clickhouse-1.13.2-SNAPSHOT.jar test-classes
generated-sources
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-clickhouse</artifactId>
<version>1.13.2-SNAPSHOT</version>
</dependency>
++++++++++++++++++++++++++++++++++++++++++++++++++++++++=
How to use
Create and read/write table
-- register a clickhouse table `t_user` in flink sql.
CREATE TABLE t_user (
`user_id` BIGINT,
`user_type` INTEGER,
`language` STRING,
`country` STRING,
`gender` STRING,
`score` DOUBLE,
`list` ARRAY<STRING>,
`map` Map<STRING, BIGINT>,
PRIMARY KEY (`user_id`) NOT ENFORCED
) WITH (
'connector' = 'clickhouse',
'url' = 'clickhouse://{ip}:{port}',
'database-name' = 'tutorial',
'table-name' = 'users',
'sink.batch-size' = '500',
'sink.flush-interval' = '1000',
'sink.max-retries' = '3'
);
-- read data from clickhouse
SELECT user_id, user_type from t_user;
-- write data into the clickhouse table from the table `T`
INSERT INTO t_user
SELECT cast(`user_id` as BIGINT), `user_type`, `lang`, `country`, `gender`, `score`, ARRAY['CODER', 'SPORTSMAN'], CAST(MAP['BABA', cast(10 as BIGINT), 'NIO', cast(8 as BIGINT)] AS MAP<STRING, BIGINT>) FROM T;
Create and use ClickHouseCatalog
Scala
val tEnv = TableEnvironment.create(setting)
val props = new util.HashMap[String, String]()
props.put(ClickHouseConfig.DATABASE_NAME, "default")
props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123")
props.put(ClickHouseConfig.USERNAME, "username")
props.put(ClickHouseConfig.PASSWORD, "password")
props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s")
val cHcatalog = new ClickHouseCatalog("clickhouse", props)
tEnv.registerCatalog("clickhouse", cHcatalog)
tEnv.useCatalog("clickhouse")
tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
Java
TableEnvironment tEnv = TableEnvironment.create(setting);
Map<String, String> props = new HashMap<>();
props.put(ClickHouseConfig.DATABASE_NAME, "default")
props.put(ClickHouseConfig.URL, "clickhouse://127.0.0.1:8123")
props.put(ClickHouseConfig.USERNAME, "username")
props.put(ClickHouseConfig.PASSWORD, "password")
props.put(ClickHouseConfig.SINK_FLUSH_INTERVAL, "30s");
Catalog cHcatalog = new ClickHouseCatalog("clickhouse", props);
tEnv.registerCatalog("clickhouse", cHcatalog);
tEnv.useCatalog("clickhouse");
tEnv.executeSql("insert into `clickhouse`.`default`.`t_table` select...");
SQL
> CREATE CATALOG clickhouse WITH (
'type' = 'clickhouse',
'url' = 'clickhouse://127.0.0.1:8123',
'username' = 'username',
'password' = 'password',
'database-name' = 'default',
'use-local' = 'false',
...
);
> USE CATALOG clickhouse;
> SELECT user_id, user_type FROM `default`.`t_user` limit 10;
> INSERT INTO `default`.`t_user` SELECT ...;
Roadmap
Implement the Flink SQL Sink function.
Support array and Map types.
Support ClickHouseCatalog.
Implement the Flink SQL Source function.