Introduction
mysql2ch is a tool for syncing MySQL to ClickHouse; it supports both full sync and incremental sync.
Features
Supports full sync and incremental sync.
Supports DDL and DML: currently DDL column add and drop, and all DML operations.
Rich configuration options.
Dependencies
kafka-2.1.0: the message queue used to buffer MySQL binlog events. Must be installed first.
redis-5.0.6: caches the MySQL binlog file and position. Must be installed first.
Python 3.6.8 runtime.
Installation
pip3 install mysql2ch
yum install postgresql-devel
[root@k8smaster ~]# mysql2ch --version
mysql2ch version, 0.5.3
This software has the following bugs, which must be patched before it can be used:
BUG1:
pyproject.toml error: the file cannot be found.
Fix: change the path to an absolute path.
[root@k8smaster mysql2ch-master]# vim /usr/local/lib/python3.6/site-packages/mysql2ch/cli.py
38 def version():
39 with open("/usr/local/lib/python3.6/site-packages/mysql2ch/pyproject.toml") as f:
40 ret = re.findall(r'version = "(\d+\.\d+\.\d+)"', f.read())
41 return ret[0]
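A less brittle alternative (a sketch, not part of the original fix; it assumes the package was installed with pip so its distribution metadata is available) is to read the version from package metadata instead of hard-coding a filesystem path:

# Replacement for version() in cli.py: ask setuptools for the installed
# version rather than parsing pyproject.toml off the filesystem.
import pkg_resources

def version():
    return pkg_resources.get_distribution("mysql2ch").version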
BUG2:
convert.py also needs a patch: comment out the line marked below.
cat /usr/local/lib/python3.6/site-packages/mysql2ch/convert.py
37 token_list.tokens.append(SQLToken(Keyword, "column"))
38 token_list.tokens.append(SQLToken(Whitespace, " "))
39 # tokens = parsed.token_next(i)[1].tokens  # comment this line out
41 token_list.tokens.append(tokens[0])
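For context, this code rewrites MySQL DDL for ClickHouse by manipulating sqlparse tokens (SQLToken above is presumably an alias for sqlparse.sql.Token). A minimal illustrative sketch of that mechanism, with an example statement that is not taken from the source:

# Tokenize a DDL fragment with sqlparse and append tokens, the same way
# the patched convert.py splices the "column" keyword into "add column".
import sqlparse
from sqlparse.sql import Token
from sqlparse.tokens import Keyword, Whitespace

parsed = sqlparse.parse("alter table test.rsc add name varchar(200)")[0]
for tok in parsed.tokens:
    print(repr(tok.ttype), repr(tok.value))   # inspect the token list

frag = sqlparse.parse("add")[0]
frag.tokens.append(Token(Whitespace, " "))
frag.tokens.append(Token(Keyword, "column"))
print(str(frag))   # -> "add column"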
Configuration file:
[root@k8smaster ~]# cat /etc/mysql2ch.ini
[core]
# when set True, will display sql information.
debug = True
# current support redis and kafka
broker_type = redis
mysql_server_id = 33
# optional, read from `show master status` result if empty
init_binlog_file = mysql-bin.000980
# optional, read from `show master status` result if empty
init_binlog_pos = 3492738
# these tables skip delete, multiple separated with comma, format with schema.table
skip_delete_tables = mysql.user
# these tables skip update, multiple separated with comma, format with schema.table
skip_update_tables = mysql.user
# skip delete or update dmls, multiple separated with comma, example: delete,update
skip_dmls =
# how many events to submit per batch; recommend 20000 in production
insert_num = 1
# how many seconds between submits; recommend 60 in production
insert_interval = 1
# auto do full etl at first when table not exists
auto_full_etl = True
[sentry]
# sentry environment
environment = development
# sentry dsn
dsn = https://xxxxxxxx@sentry.test.com/1
[redis]
host = 127.0.0.1
port = 6379
password =
db = 0
prefix = mysql2ch
# enable redis sentinel
sentinel = false
# redis sentinel hosts,multiple separated with comma
sentinel_hosts = 127.0.0.1:5000,127.0.0.1:5001,127.0.0.1:5002
sentinel_master = master
# stream max len, will delete redundant ones with FIFO
queue_max_len = 200000
[mysql]
host = 192.168.66.33
port = 3306
user = root
password = Hexin2007
# sync schema, format with mysql.schema, each schema for one section.
[mysql.test]
# multiple separated with comma
tables = rsc
# kafka partition, need when broker_type=kafka
kafka_partition = 0
[clickhouse]
host = localhost
port = 9000
user = default
password =
# need when broker_type=kafka
[kafka]
# kafka servers,multiple separated with comma
servers = 127.0.0.1:9092
topic = mysql2ch
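Before starting the services, it can be worth sanity-checking that the file parses and the key settings are what you expect. A small sketch using only the Python standard library (the checks are illustrative, not something mysql2ch itself provides):

# Validate /etc/mysql2ch.ini: broker type, server id, schemas to sync.
import configparser

cfg = configparser.ConfigParser()
cfg.read("/etc/mysql2ch.ini")
assert cfg.get("core", "broker_type") in ("redis", "kafka")
print("server_id:", cfg.getint("core", "mysql_server_id"))
print("schemas:", [s for s in cfg.sections() if s.startswith("mysql.")])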
Full sync
You may need to do a full import before starting incremental sync, or rerun the full import with --renew.
Note:
If a decimal column contains empty (NULL) values, the full ETL raises an error. In that case you can do the full sync manually as follows:
+++++++ begin +++++++++++
One-off single-table sync
Method 1:
First create the table structure in ClickHouse:
CREATE TABLE hexin.hexin_erp_order_goods
(
    `id` Int32, `order_id` Int32, `product_id` String, `variant_id` String,
    `product_name` String, `product_count` Int32, `product_price` Decimal(10, 2),
    `payment` Decimal(10, 2), `currency` String, `exchange_rate` Decimal(12, 4),
    `product_img_url` String, `product_unit` String, `product_attributes` String,
    `store_sku` String, `asin` String, `apply_cn` String, `apply_en` String,
    `apply_price` Decimal(10, 2), `apply_code` String, `discount` Decimal(10, 2),
    `apply_weight` Decimal(10, 2), `humanity_created` Int8, `storage_id` Nullable(Int32),
    `order_time` Int32, `is_less` Nullable(Int8), `less_num` Nullable(Int32)
)
ENGINE = MergeTree ORDER BY id SETTINGS index_granularity = 8192
Then load the MySQL data into ClickHouse with the following statement:
INSERT INTO hexin.hexin_erp_order_goods select * from mysql('192.168.66.33:3306','hexin','hexin_erp_order_goods','root','Hexin2007')
++++++++++++ end +++++++++++++++++++++
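The same manual import can be scripted. A minimal sketch with the clickhouse-driver package (an assumption; any ClickHouse client works), reusing the connection details from the config above:

# Run the manual one-table import against the local ClickHouse server.
from clickhouse_driver import Client

client = Client(host="localhost", port=9000, user="default", password="")
client.execute(
    "INSERT INTO hexin.hexin_erp_order_goods "
    "SELECT * FROM mysql('192.168.66.33:3306', 'hexin', "
    "'hexin_erp_order_goods', 'root', 'Hexin2007')"
)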
[root@k8smaster mysql2ch-master]# mysql2ch --config /etc/mysql2ch.ini etl --schema test --renew
Newer releases renamed the project to synch; the equivalent command is:
synch --config /etc/synch.ini etl --schema test --renew
2020-06-24 11:07:06 - mysql2ch.reader:45 - DEBUG - select COLUMN_NAME from information_schema.COLUMNS where TABLE_SCHEMA='test' and TABLE_NAME='rsc' and COLUMN_KEY='PRI'
2020-06-24 11:07:06 - mysql2ch.writer:28 - DEBUG - drop table test.rsc
2020-06-24 11:07:06 - mysql2ch.replication:37 - INFO - drop table success:test.rsc
2020-06-24 11:07:06 - mysql2ch.writer:28 - DEBUG - select count(*)from system.tables where database = 'test' and name = 'rsc'
2020-06-24 11:07:06 - mysql2ch.writer:28 - DEBUG - CREATE TABLE test.rsc ENGINE = MergeTree ORDER BY id AS SELECT * FROM mysql('192.168.66.33:3306', 'test', 'rsc', 'root', 'Hexin2007')
2020-06-24 11:07:07 - mysql2ch.writer:39 - DEBUG - select COLUMN_NAME, COLUMN_TYPE from information_schema.COLUMNS where TABLE_NAME = 'rsc' and COLUMN_TYPE like '%decimal%'and TABLE_SCHEMA = 'test'
2020-06-24 11:07:07 - mysql2ch.replication:44 - INFO - etl success:test.rsc
$ mysql2ch etl -h
usage: mysql2ch etl [-h] --schema SCHEMA [--tables TABLES] [--renew]
optional arguments:
-h, --help show this help message and exit
--schema SCHEMA Schema to full etl.
--tables TABLES Tables to full etl,multiple tables split with comma.
--renew Etl after try to drop the target tables.
Producer
Listen to the MySQL binlog and produce events to the broker (Kafka, or Redis as configured here). (There is a bug here; see the bug list above.)
mysql2ch --config /etc/mysql2ch.ini produce
/usr/local/bin/synch --config /etc/synch.ini produce
[root@k8smaster mysql2ch-master]# mysql2ch --config /etc/mysql2ch.ini produce
2020-06-24 11:44:18 - mysql2ch.producer:39 - INFO - start producer success
2020-06-24 11:44:18 - mysql2ch.reader:45 - DEBUG - select COLUMN_NAME from information_schema.COLUMNS where TABLE_SCHEMA='test' and TABLE_NAME='rsc' and COLUMN_KEY='PRI'
2020-06-24 11:44:18 - mysql2ch.reader:88 - INFO - start sync at 2020-06-24 11:44:18
2020-06-24 11:44:18 - mysql2ch.reader:89 - INFO - mysql binlog: mysql-bin.000980:3492738
2020-06-24 11:47:35 - mysql2ch.producer:67 - DEBUG - send to queue success: key:test,event:{'table': 'rsc', 'schema': 'test', 'action': 'insert', 'values': {'id': 7, 'name': '111'}, 'event_unixtime': 1592970455335448, 'action_core': '2'}
2020-06-24 11:47:35 - mysql2ch.producer:68 - DEBUG - success set binlog pos:mysql-bin.000980:5034453
2020-06-24 11:48:39 - mysql2ch.producer:67 - DEBUG - send to queue success: key:test,event:{'table': 'rsc', 'schema': 'test', 'action': 'delete', 'values': {'id': 7, 'name': '111'}, 'event_unixtime': 1592970519132017, 'action_core': '1'}
2020-06-24 11:48:39 - mysql2ch.producer:68 - DEBUG - success set binlog pos:mysql-bin.000980:5054489
2020-06-24 11:48:39 - mysql2ch.producer:76 - INFO - success send 2 events in 1 seconds
2020-06-24 11:48:39 - mysql2ch.producer:67 - DEBUG - send to queue success: key:test,event:{'table': 'rsc', 'schema': 'test', 'action': 'insert', 'values': {'id': 7, 'name': '3333'}, 'event_unixtime': 1592970519132849, 'action_core': '2'}
2020-06-24 11:48:39 - mysql2ch.producer:68 - DEBUG - success set binlog pos:mysql-bin.000980:5054489
2020-06-24 11:49:08 - mysql2ch.producer:67 - DEBUG - send to queue success: key:test,event:{'table': 'rsc', 'schema': 'test', 'action': 'delete', 'values': {'id': 7, 'name': '3333'}, 'event_unixtime': 1592970548923095, 'action_core': '1'}
2020-06-24 11:49:08 - mysql2ch.producer:68 - DEBUG - success set binlog pos:mysql-bin.000980:5087311
2020-06-24 11:49:08 - mysql2ch.producer:76 - INFO - success send 2 events in 1 seconds
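Under the hood the producer streams row events from the binlog (mysql2ch builds on python-mysql-replication). A stripped-down sketch of that loop, using the connection settings and binlog coordinates from the config above; this is illustrative, not the project's actual code:

# Stream insert/update/delete row events from the MySQL binlog.
from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

stream = BinLogStreamReader(
    connection_settings={"host": "192.168.66.33", "port": 3306,
                         "user": "root", "passwd": "Hexin2007"},
    server_id=33,                  # mysql_server_id from the config
    log_file="mysql-bin.000980",   # init_binlog_file
    log_pos=3492738,               # init_binlog_pos
    resume_stream=True,
    blocking=True,
    only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
)
for event in stream:
    for row in event.rows:         # each row would become one queue event
        print(event.schema, event.table, row)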
Consumer
Consume events from the queue and insert them into ClickHouse; use --skip-error to skip error rows.
mysql2ch --config /etc/mysql2ch.ini consume --schema test
/usr/local/bin/synch --config /etc/synch.ini consume --schema hexin
...on': 'delete', 'values': {'id': 7, 'name': '3333'}, 'event_unixtime': 1592970548923095, 'action_core': '1'}
2020-06-24 11:49:08 - mysql2ch.writer:28 - DEBUG - select count(*) from system.mutations where is_done=0 and database = 'test'
2020-06-24 11:49:09 - mysql2ch.writer:28 - DEBUG - alter table test.rsc delete where id in (7)
2020-06-24 11:49:09 - mysql2ch.writer:194 - INFO - test.rsc:success delete 1 rows
2020-06-24 11:49:09 - mysql2ch.consumer:121 - INFO - success commit 1 events
2020-06-24 11:49:10 - mysql2ch.consumer:54 - DEBUG - block timeout 1 seconds, len of events:0
2020-06-24 11:49:11 - mysql2ch.consumer:54 - DEBUG - block timeout 1 seconds, len of events:0
2020-06-24 11:49:12 - mysql2ch.consumer:54 - DEBUG - block timeout 1 seconds, len of events:0
2020-06-24 11:49:13 - mysql2ch.consumer:54 - DEBUG - block timeout 1 seconds, len of events:0
2020-06-24 11:49:14 - mysql2ch.consumer:54 - DEBUG - block timeout 1 seconds, len of events:0
2020-06-24 11:49:15 - mysql2ch.consumer:54 - DEBUG - block timeout 1 seconds, len of events:0
$ mysql2ch consume -h
usage: mysql2ch consume [-h] --schema SCHEMA [--skip-error] [--auto-offset-reset AUTO_OFFSET_RESET]
optional arguments:
-h, --help show this help message and exit
--schema SCHEMA Schema to consume.
--skip-error Skip error rows.
--auto-offset-reset AUTO_OFFSET_RESET
Kafka auto offset reset,default earliest.
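With broker_type = redis, the consumer is essentially reading a Redis stream and batching rows into ClickHouse. A rough sketch of that read loop with redis-py; the stream key and field names are guesses based on the prefix setting, not taken from the source:

# Read events from the Redis stream that the producer writes to.
import json
import redis

r = redis.Redis(host="127.0.0.1", port=6379, db=0)
last_id = "0"
while True:
    # block up to 1s, mirroring the "block timeout 1 seconds" log lines above
    batches = r.xread({"mysql2ch:queue:test": last_id}, count=100, block=1000)
    for _stream, entries in batches:
        for entry_id, fields in entries:
            last_id = entry_id
            event = json.loads(fields[b"event"])   # field name is assumed
            print(event["action"], event["table"], event["values"])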
Untested:
Using docker-compose (recommended)
version: "3"
services:
  producer:
    env_file:
      - .env
    depends_on:
      - redis
    image: long2ice/mysql2ch:latest
    command: mysql2ch produce
  # add more services if you need.
  consumer.test:
    env_file:
      - .env
    depends_on:
      - redis
      - producer
    image: long2ice/mysql2ch:latest
    # consume binlog of schema test
    command: mysql2ch consume --schema test
  redis:
    hostname: redis
    image: redis:latest
    volumes:
      - redis:/data
  ui:
    env_file:
      - .env
    ports:
      - "5000:5000"
    depends_on:
      - redis
      - producer
      - consumer.test
    image: long2ice/mysql2ch
    command: mysql2ch ui
volumes:
  redis:
Optional
Sentry: error reporting; enabled by setting SENTRY_DSN in .env.
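For reference, a minimal sketch of what enabling Sentry amounts to (mysql2ch wires this up itself from the config; the snippet assumes the sentry-sdk package and reuses the DSN and environment values from the ini file above):

# Initialize Sentry error reporting from the dsn and environment settings.
import sentry_sdk

sentry_sdk.init(
    dsn="https://xxxxxxxx@sentry.test.com/1",
    environment="development",
)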
License
This project is released under the MIT License.