I. Synchronization via JDBC
1. The Canal component handles binlog parsing and data synchronization.
2. The canal-server process masquerades as a MySQL slave and uses MySQL's binlog replication protocol to pull change data.
3. The canal-adapter process obtains the parsed binlog events from canal-server (in this setup, via Kafka) and writes them to MySQL through a JDBC interface.
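Putting the pieces together, the end-to-end data path in kafka mode is:

source MySQL (binlog) -> canal-server (fake slave) -> Kafka topic -> canal-adapter (Kafka consumer) -> target MySQL (JDBC)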
Host topology:
ip: 172.16.1.222 — canal-admin, canal-server, canal-adapter, kafka, zookeeper (all on one host)
Goal: use canal-adapter to synchronize data from MySQL to MySQL, across different databases and different tables.
Example:
Using canal-adapter, synchronize one table's incremental changes from source.order_test to target.order_test.
MySQL installation is omitted.
The relevant databases and tables in MySQL are as follows:
mysql> desc source.order_test;
+-------------+---------------+------+-----+-------------------+----------------+
| Field       | Type          | Null | Key | Default           | Extra          |
+-------------+---------------+------+-----+-------------------+----------------+
| id          | bigint(20)    | NO   | PRI | NULL              | auto_increment |
| order_id    | varchar(64)   | NO   | UNI | NULL              |                |
| amount      | decimal(10,2) | NO   |     | 0.00              |                |
| create_time | datetime      | NO   |     | CURRENT_TIMESTAMP |                |
+-------------+---------------+------+-----+-------------------+----------------+
4 rows in set (0.00 sec)
mysql> desc target.order_test;
+-------------+---------------+------+-----+-------------------+----------------+
| Field       | Type          | Null | Key | Default           | Extra          |
+-------------+---------------+------+-----+-------------------+----------------+
| id          | bigint(20)    | NO   | PRI | NULL              | auto_increment |
| order_id    | varchar(64)   | NO   | UNI | NULL              |                |
| amount      | decimal(10,2) | NO   |     | 0.00              |                |
| create_time | datetime      | NO   |     | CURRENT_TIMESTAMP |                |
+-------------+---------------+------+-----+-------------------+----------------+
4 rows in set (0.00 sec)
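For reference, a CREATE TABLE statement consistent with the structures above (a sketch derived from the desc output; the index name, engine, and charset options are assumptions):

CREATE TABLE source.order_test (
  id          BIGINT(20)    NOT NULL AUTO_INCREMENT,
  order_id    VARCHAR(64)   NOT NULL,
  amount      DECIMAL(10,2) NOT NULL DEFAULT 0.00,
  create_time DATETIME      NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (id),
  UNIQUE KEY uk_order_id (order_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
-- target.order_test uses the same definition, just in the target database.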
Prerequisite: canal-server must already be installed and running, configured in kafka mode (so that messages are sent to Kafka).
canal-server installation and configuration are omitted; all components are installed on the same host.
Components canal-server depends on:
zookeeper
kafka
canal-admin installation and configuration are omitted.
In the canal-admin web UI, modify the canal-server configuration as follows:
#################################################
######### destinations #############
#################################################
canal.destinations = example # add "example" here
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false
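Because the server runs in kafka mode, canal.properties must also point at Kafka. In canal 1.1.5 the relevant keys look like the following (key names vary between canal versions; verify against your release):

canal.serverMode = kafka
kafka.bootstrap.servers = 127.0.0.1:9092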
In the canal-admin web UI, add instance management for the source table order_test, creating a table listener whose topic is source_order_test.
The instance configuration is as follows:
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=172.16.1.222:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=QWqw12!@
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName=source
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=source.order_test
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=source_order_test
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
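Once the instance is running, you can verify that change messages actually reach Kafka with the stock Kafka CLI tools (assuming they are on the PATH; the --bootstrap-server form requires Kafka 2.2+):

# the topic should appear after the first captured change
kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
# inspect the flat JSON messages produced by canal-server
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic source_order_test --from-beginning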
1. Install canal.adapter
Download the canal.adapter-1.1.5.tar.gz package from GitHub and extract it.
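A minimal sketch of the download and extraction steps (the URL follows canal's usual GitHub release pattern; verify it on the project's releases page):

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.adapter-1.1.5.tar.gz
mkdir -p /usr/local/canal-adapter
tar -xzf canal.adapter-1.1.5.tar.gz -C /usr/local/canal-adapter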
[root@node222 canal-adapter]# pwd
/usr/local/canal-adapter
[root@node222 canal-adapter]# ll
total 12
drwxr-xr-x 2 root root   95 Oct 24 22:17 bin
drwxrwxrwx 8 root root  287 Oct 24 22:15 conf
drwxr-xr-x 2 root root 4096 Oct 24 15:53 lib
drwxrwxrwx 3 root root   21 Oct 24 16:23 logs
drwxrwxrwx 2 root root 4096 Apr 19  2021 plugin
2. Configure canal.adapter
Modify conf/application.yml:
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: kafka # tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # kafka consumer (consumes the parsed binlog messages from Kafka)
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://172.16.1.222:3306/source?useUnicode=true
      username: root
      password: Rscpass123.
  canalAdapters:
  - instance: source_order_test # canal instance name or MQ topic name; here it is the source table's topic
    groups:
    - groupId: g1
      outerAdapters:
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://172.16.1.222:3306/target?useUnicode=true
          jdbc.username: root
          jdbc.password: Rscpass123.
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: es
#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
#        properties:
#          mode: transport # or rest
#          # security.auth: test:123456 # only used for rest mode
#          cluster.name: elasticsearch
#      - name: kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
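One caveat: flatMessage: true on the adapter must match the message format canal-server produces; in canal 1.1.x the server-side switch in canal.properties is the following key (name taken from the 1.1.x defaults; verify for your version):

canal.mq.flatMessage = true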
3. Modify conf/rdb/mytest_user.yml
[root@node222 conf]# ls
application.yml application.yml.20221024 bootstrap.yml es6 es7 hbase kudu logback.xml META-INF rdb
[root@node222 conf]# cd rdb
[root@node222 rdb]# cat mytest_user.yml
dataSourceKey: defaultDS
destination: source_order_test
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: source
  table: order_test
  targetDb: target
  targetTable: order_test
  targetPk:
    id: id
  mapAll: true
#  targetColumns:
#    id:
#    name:
#    role_id:
#    c_time:
#    test1:
  etlCondition: "where create_time>={}" # the column must exist in the source table (the template default is c_time)
  commitBatch: 1 # batch commit size

## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
#  mirrorDb: true
#  database: mytest
If the two tables have identical structures, simply set mapAll: true.
If the structures differ, map the columns with targetColumns, in the form targetColumn: sourceColumn.
Once the configuration is complete, start the adapter:
/usr/local/canal-adapter/bin/startup.sh
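After startup, tail the adapter log to confirm it is consuming and syncing (log path assumed from the default directory layout shown above):

tail -f /usr/local/canal-adapter/logs/adapter/adapter.log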
4. Test
Source side:
mysql> use source;
Database changed
mysql> desc order_test;
+-------------+---------------+------+-----+-------------------+----------------+
| Field       | Type          | Null | Key | Default           | Extra          |
+-------------+---------------+------+-----+-------------------+----------------+
| id          | bigint(20)    | NO   | PRI | NULL              | auto_increment |
| order_id    | varchar(64)   | NO   | UNI | NULL              |                |
| amount      | decimal(10,2) | NO   |     | 0.00              |                |
| create_time | datetime      | NO   |     | CURRENT_TIMESTAMP |                |
+-------------+---------------+------+-----+-------------------+----------------+
4 rows in set (0.01 sec)
mysql> insert into source.order_test(id,order_id,amount,create_time) values(1,'rsc',10.00,now());
Query OK, 1 row affected (0.01 sec)
mysql> insert into source.order_test(id,order_id,amount,create_time) values(2,'gmy',20.00,now());
Query OK, 1 row affected (0.00 sec)
mysql> insert into source.order_test(id,order_id,amount,create_time) values(3,'ryf',20.00,now());
Query OK, 1 row affected (0.01 sec)
mysql> insert into source.order_test(id,order_id,amount,create_time) values(4,'ryx',20.00,now());
Query OK, 1 row affected (0.01 sec)
mysql> delete from order_test where id is not null;
Query OK, 4 rows affected (0.02 sec)
Target side (queried once after the inserts, and again after the delete):
mysql> select * from order_test;
+----+----------+--------+---------------------+
| id | order_id | amount | create_time         |
+----+----------+--------+---------------------+
|  1 | rsc      |  10.00 | 2022-10-24 22:41:00 |
|  2 | gmy      |  20.00 | 2022-10-24 22:41:21 |
|  3 | ryf      |  20.00 | 2022-10-24 22:41:27 |
|  4 | ryx      |  20.00 | 2022-10-24 22:41:35 |
+----+----------+--------+---------------------+
4 rows in set (0.00 sec)
mysql> select * from order_test;
Empty set (0.00 sec)
The corresponding adapter log entries:
2022-10-24 22:18:33.129 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"20.0","create_time":"2022-10-24 22:18:32","id":"4","order_id":"gmy2"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"INSERT"}
2022-10-24 22:41:00.189 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"10.0","create_time":"2022-10-24 22:41:00","id":"1","order_id":"rsc"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"INSERT"}
2022-10-24 22:41:21.995 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"20.0","create_time":"2022-10-24 22:41:21","id":"2","order_id":"gmy"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"INSERT"}
2022-10-24 22:41:28.054 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"20.0","create_time":"2022-10-24 22:41:27","id":"3","order_id":"ryf"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"INSERT"}
2022-10-24 22:41:35.345 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"20.0","create_time":"2022-10-24 22:41:35","id":"4","order_id":"ryx"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"INSERT"}
2022-10-24 22:42:30.299 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"10.0","create_time":"2022-10-24 22:41:00","id":"1","order_id":"rsc"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"DELETE"}
2022-10-24 22:42:30.304 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"20.0","create_time":"2022-10-24 22:41:35","id":"4","order_id":"ryx"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"DELETE"}
2022-10-24 22:42:30.398 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"20.0","create_time":"2022-10-24 22:41:27","id":"3","order_id":"ryf"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"DELETE"}
2022-10-24 22:42:30.480 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"20.0","create_time":"2022-10-24 22:41:21","id":"2","order_id":"gmy"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"DELETE"}
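If existing rows need to be backfilled (a full ETL) rather than only incremental changes, canal-adapter exposes an ETL REST endpoint that uses the mapping file's etlCondition; per the canal ClientAdapter docs the call takes the form below (verify the path against your version):

curl -X POST http://127.0.0.1:8081/etl/rdb/mysql1/mytest_user.yml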
Other notes
# mapAll: true
targetColumns:
  c1: c1
  c2: c2
  c3: c3
  c5: c4
Column c1 is already specified under targetPk, so it may be omitted here. The other columns must still be listed even when their names are unchanged (c2 and c3 above); otherwise no data is captured for them.
If you only need to synchronize a subset of columns, you do not have to list mappings for every column.
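A minimal sketch of a partial-column mapping for order_test (illustrative; with targetColumns set and mapAll omitted, only the listed columns are written):

dbMapping:
  database: source
  table: order_test
  targetDb: target
  targetTable: order_test
  targetPk:
    id: id
  targetColumns:
    order_id: order_id
    amount: amount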
Restart the adapter for mapping changes to take effect:
sh restart.sh
The groupId in canal-adapter/conf/rdb/mytest_user.yml must match the one in canal-adapter/conf/application.yml. To synchronize more tables or additional data sources, just add another groupId (with its own mapping file) in both canal-adapter/conf/rdb/ and canal-adapter/conf/application.yml, as sketched below.
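For example, a second table could be wired up like this (the topic source_user_test, key mysql2, and file user_test.yml are hypothetical names for illustration):

# conf/application.yml — append a second instance under canalAdapters:
  - instance: source_user_test # hypothetical second topic
    groups:
    - groupId: g2
      outerAdapters:
      - name: rdb
        key: mysql2
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://172.16.1.222:3306/target?useUnicode=true
          jdbc.username: root
          jdbc.password: Rscpass123.

# conf/rdb/user_test.yml — the matching mapping file:
dataSourceKey: defaultDS
destination: source_user_test
groupId: g2
outerAdapterKey: mysql2
concurrent: true
dbMapping:
  database: source
  table: user_test
  targetDb: target
  targetTable: user_test
  targetPk:
    id: id
  mapAll: true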