Real-time incremental data synchronization from MySQL to MySQL with canal-adapter 1.1.5

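How it works:
I. Synchronization via JDBC
1. The Canal components parse the binlog and synchronize the data;
2. The canal-server process masquerades as a MySQL slave and uses the MySQL binlog replication protocol to pull the changes;
3. The canal-adapter process fetches the parsed binlog from canal-server (in this setup, through Kafka) and writes it to MySQL through the JDBC interface.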
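
Step 3 above can be sketched as follows. This is only an illustration of the idea, not canal-adapter's actual code: the function name build_statement is hypothetical, the event shape is taken from the adapter DEBUG log shown later in this post, and the UPDATE branch assumes that "old" carries the previous values of the changed columns only.

```python
from typing import Any, Dict, List, Tuple


def build_statement(event: Dict[str, Any], target_db: str, target_table: str,
                    pk: str = "id") -> Tuple[str, List[Any]]:
    """Turn one row-level DML event into a parameterized SQL statement.

    Hypothetical helper: it mirrors what a JDBC-based sink has to do,
    it is not the adapter's real implementation.
    """
    row = event["data"]
    table = f"`{target_db}`.`{target_table}`"
    kind = event["type"]

    if kind == "INSERT":
        cols = list(row)
        col_list = ", ".join(f"`{c}`" for c in cols)
        marks = ", ".join(["%s"] * len(cols))
        sql = f"INSERT INTO {table} ({col_list}) VALUES ({marks})"
        return sql, [row[c] for c in cols]

    if kind == "UPDATE":
        # Assumption: "old" lists only the columns whose value changed.
        old = event.get("old") or {}
        changed = list(old)
        if not changed:
            raise ValueError("UPDATE event without changed columns")
        assignments = ", ".join(f"`{c}` = %s" for c in changed)
        # If the primary key itself changed, locate the row by its old value.
        pk_value = old.get(pk, row[pk])
        sql = f"UPDATE {table} SET {assignments} WHERE `{pk}` = %s"
        return sql, [row[c] for c in changed] + [pk_value]

    if kind == "DELETE":
        return f"DELETE FROM {table} WHERE `{pk}` = %s", [row[pk]]

    raise ValueError(f"unsupported event type: {kind}")
```

The returned pair can be handed to any DB-API cursor that uses the %s parameter style, for example cursor.execute(sql, params).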

Host topology:

ip:
172.16.1.222       canal-admin   canal-server  canal-adapter   kafka  zookeeper
 

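Use canal.adapter to synchronize data from MySQL to MySQL, into a different database and a different table.

Example:
Use canal-adapter to synchronize the incremental data of one table from source.order_test to target.order_test.

MySQL installation is omitted.
The tables in the MySQL databases are as follows: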
mysql> desc source.order_test;
+-------------+---------------+------+-----+-------------------+----------------+
| Field       | Type          | Null | Key | Default           | Extra          |
+-------------+---------------+------+-----+-------------------+----------------+
| id          | bigint(20)    | NO   | PRI | NULL              | auto_increment |
| order_id    | varchar(64)   | NO   | UNI | NULL              |                |
| amount      | decimal(10,2) | NO   |     | 0.00              |                |
| create_time | datetime      | NO   |     | CURRENT_TIMESTAMP |                |
+-------------+---------------+------+-----+-------------------+----------------+
4 rows in set (0.00 sec)


mysql> desc target.order_test;
+-------------+---------------+------+-----+-------------------+----------------+
| Field       | Type          | Null | Key | Default           | Extra          |
+-------------+---------------+------+-----+-------------------+----------------+
| id          | bigint(20)    | NO   | PRI | NULL              | auto_increment |
| order_id    | varchar(64)   | NO   | UNI | NULL              |                |
| amount      | decimal(10,2) | NO   |     | 0.00              |                |
| create_time | datetime      | NO   |     | CURRENT_TIMESTAMP |                |
+-------------+---------------+------+-----+-------------------+----------------+
4 rows in set (0.00 sec)


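Prerequisite: canal-server must be installed and running, configured in kafka mode (so that messages are sent to Kafka).
canal-server installation and configuration are omitted; everything is installed on the same host.
        Components that canal-server depends on:
                zookeeper
                kafka
canal-admin installation and configuration are omitted.

In the canal-admin web UI, modify the canal-server configuration as follows: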
#################################################
#########                 destinations                #############
#################################################
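# add the instance name (example) here; the note sits on its own line because a properties file only treats # as a comment at the start of a line
canal.destinations = example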
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = false


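In the canal-admin web UI, add an instance for the source table order_test that monitors the table and publishes its changes to the topic source_order_test.
The configuration is as follows: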
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0


# enable gtid use true/false
canal.instance.gtidon=false


# position info
canal.instance.master.address=172.16.1.222:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=


# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=


# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal


#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=


# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=QWqw12!@
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName=source
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==


# table regex
canal.instance.filter.regex=source.order_test
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch


# mq config
canal.mq.topic=source_order_test
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
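
To make the effect of canal.instance.filter.regex easier to picture, here is a small sketch of how a comma-separated list of regular expressions can select schema.table names. The function matches_filter is hypothetical, and it assumes that each expression must match the whole schema.table name; canal's own matcher may differ in details. Note that a pattern written as source\\..* in the properties file reaches the regex engine as source\..*, which is what the raw strings below contain.

```python
import re


def matches_filter(filter_regex: str, schema: str, table: str) -> bool:
    """Return True if schema.table is selected by a comma-separated regex list.

    Hypothetical helper that assumes full-match semantics for each pattern.
    """
    full_name = f"{schema}.{table}"
    patterns = [p.strip() for p in filter_regex.split(",") if p.strip()]
    return any(re.fullmatch(p, full_name) for p in patterns)


# The filter used in this post selects exactly one table.
print(matches_filter(r"source.order_test", "source", "order_test"))
# A schema-wide pattern selects every table in the source schema.
print(matches_filter(r"source\..*", "source", "any_table"))
```

With the filter used here, only source.order_test is captured; other tables in the same schema are ignored.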
 
1. Install canal.adapter
Download the canal.adapter-1.1.5.tar.gz package from GitHub and extract it.
[root@node222 canal-adapter]# pwd
/usr/local/canal-adapter
[root@node222 canal-adapter]# ll
总用量 12
drwxr-xr-x 2 root root   95 10月 24 22:17 bin
drwxrwxrwx 8 root root  287 10月 24 22:15 conf
drwxr-xr-x 2 root root 4096 10月 24 15:53 lib
drwxrwxrwx 3 root root   21 10月 24 16:23 logs
drwxrwxrwx 2 root root 4096 4月  19 2021 plugin

2. Configure and start canal.adapter
Edit conf/application.yml:
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: kafka #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # kafka consumer: the adapter consumes the messages from Kafka here
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://172.16.1.222:3306/source?useUnicode=true
      username: root
      password: Rscpass123.
  canalAdapters:
  - instance: source_order_test # canal instance name or MQ topic name; here it is the topic that carries the source table's changes
    groups:
    - groupId: g1
      outerAdapters:
      - name: rdb
        key: mysql1
        properties:
          jdbc.driverClassName: com.mysql.jdbc.Driver
          jdbc.url: jdbc:mysql://172.16.1.222:3306/target?useUnicode=true
          jdbc.username: root
          jdbc.password: Rscpass123.
#      - name: rdb
#        key: oracle1
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: es
#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
#        properties:
#          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
#          cluster.name: elasticsearch
#        - name: kudu
#          key: kudu
#          properties:
#            kudu.master.address: 127.0.0.1 # ',' split multi address


Edit conf/rdb/mytest_user.yml:
[root@node222 conf]# ls
application.yml  application.yml.20221024  bootstrap.yml  es6  es7  hbase  kudu  logback.xml  META-INF  rdb
[root@node222 conf]# cd rdb
[root@node222 rdb]# cat mytest_user.yml
dataSourceKey: defaultDS
destination: source_order_test
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
  database: source
  table: order_test
  targetDb: target
  targetTable: order_test
  targetPk:
    id: id
  mapAll: true
#  targetColumns:
#    id:
#    name:
#    role_id:
#    c_time:
#    test1:
  etlCondition: "where create_time>={}"
  commitBatch: 1 # batch commit size
## Mirror schema synchronize config
#dataSourceKey: defaultDS
#destination: example
#groupId: g1
#outerAdapterKey: mysql1
#concurrent: true
#dbMapping:
#  mirrorDb: true
#  database: mytest
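If the two tables have the same structure, simply set mapAll to true.
If the structures differ, use targetColumns to map the columns, in the form target column name: source column name.

Once the configuration is complete, start the adapter:
  /usr/local/canal-adapter/bin/startup.sh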
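
A quick way to confirm that the adapter came up is to check that something is listening on the port set in application.yml (8081 in this post). The helper below is a minimal sketch using only the standard library; the name is_port_open is hypothetical, and an open port only shows that the process is accepting connections, not that synchronization is working.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and unreachable hosts.
        return False
```

For example, is_port_open("127.0.0.1", 8081) can be called a few seconds after startup.sh returns; if it stays False, the files under logs are the next place to look.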


3. Test
Source side:
mysql> use source;
Database changed
mysql> desc order_test;
+-------------+---------------+------+-----+-------------------+----------------+
| Field       | Type          | Null | Key | Default           | Extra          |
+-------------+---------------+------+-----+-------------------+----------------+
| id          | bigint(20)    | NO   | PRI | NULL              | auto_increment |
| order_id    | varchar(64)   | NO   | UNI | NULL              |                |
| amount      | decimal(10,2) | NO   |     | 0.00              |                |
| create_time | datetime      | NO   |     | CURRENT_TIMESTAMP |                |
+-------------+---------------+------+-----+-------------------+----------------+
4 rows in set (0.01 sec)


mysql> insert into source.order_test(id,order_id,amount,create_time) values(1,'rsc',10.00,now());
Query OK, 1 row affected (0.01 sec)


mysql> insert into source.order_test(id,order_id,amount,create_time) values(2,'gmy',20.00,now());
Query OK, 1 row affected (0.00 sec)


mysql> insert into source.order_test(id,order_id,amount,create_time) values(3,'ryf',20.00,now());
Query OK, 1 row affected (0.01 sec)


mysql> insert into source.order_test(id,order_id,amount,create_time) values(4,'ryx',20.00,now());
Query OK, 1 row affected (0.01 sec)




mysql> delete from order_test where id is not null;
Query OK, 4 rows affected (0.02 sec)


Target side (the first query was run after the inserts, the second after the delete):
mysql> select * from order_test;
+----+----------+--------+---------------------+
| id | order_id | amount | create_time         |
+----+----------+--------+---------------------+
|  1 | rsc      |  10.00 | 2022-10-24 22:41:00 |
|  2 | gmy      |  20.00 | 2022-10-24 22:41:21 |
|  3 | ryf      |  20.00 | 2022-10-24 22:41:27 |
|  4 | ryx      |  20.00 | 2022-10-24 22:41:35 |
+----+----------+--------+---------------------+
4 rows in set (0.00 sec)


mysql> select * from order_test;
Empty set (0.00 sec)

Corresponding adapter log:

2022-10-24 22:18:33.129 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"20.0","create_time":"2022-10-24 22:18:32","id":"4","order_id":"gmy2"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"INSERT"}
2022-10-24 22:41:00.189 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"10.0","create_time":"2022-10-24 22:41:00","id":"1","order_id":"rsc"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"INSERT"}
2022-10-24 22:41:21.995 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"20.0","create_time":"2022-10-24 22:41:21","id":"2","order_id":"gmy"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"INSERT"}
2022-10-24 22:41:28.054 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"20.0","create_time":"2022-10-24 22:41:27","id":"3","order_id":"ryf"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"INSERT"}
2022-10-24 22:41:35.345 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"20.0","create_time":"2022-10-24 22:41:35","id":"4","order_id":"ryx"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"INSERT"}
2022-10-24 22:42:30.299 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"10.0","create_time":"2022-10-24 22:41:00","id":"1","order_id":"rsc"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"DELETE"}
2022-10-24 22:42:30.304 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"20.0","create_time":"2022-10-24 22:41:35","id":"4","order_id":"ryx"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"DELETE"}
2022-10-24 22:42:30.398 [pool-1-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"20.0","create_time":"2022-10-24 22:41:27","id":"3","order_id":"ryf"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"DELETE"}
2022-10-24 22:42:30.480 [pool-3-thread-1] DEBUG c.a.o.canal.client.adapter.rdb.service.RdbSyncService - DML: {"data":{"amount":"20.0","create_time":"2022-10-24 22:41:21","id":"2","order_id":"gmy"},"database":"source","destination":"source_order_test","old":null,"table":"order_test","type":"DELETE"}
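
When the log grows, it can help to pull the DML events out of it programmatically. The sketch below assumes the log line layout shown above (a JSON object following "RdbSyncService - DML: "); the names DML_PATTERN, iter_dml_events and count_by_type are introduced here for illustration only.

```python
import json
import re
from collections import Counter
from typing import Any, Dict, Iterable, Iterator

# Captures the JSON payload at the end of an RdbSyncService DEBUG line.
DML_PATTERN = re.compile(r"RdbSyncService - DML: (\{.*\})\s*$")


def iter_dml_events(lines: Iterable[str]) -> Iterator[Dict[str, Any]]:
    """Yield the decoded DML event from every matching log line."""
    for line in lines:
        match = DML_PATTERN.search(line)
        if match:
            yield json.loads(match.group(1))


def count_by_type(lines: Iterable[str]) -> Counter:
    """Count events per DML type (INSERT, UPDATE, DELETE)."""
    return Counter(event["type"] for event in iter_dml_events(lines))
```

Passing an open log file to count_by_type gives a quick summary to compare against the statements executed on the source side. The exact log file path depends on the installation, so it is left to the reader.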


Other notes

Column mapping
#  mapAll: true
targetColumns:
   c1: c1
   c2: c2
   c3: c3
   c5: c4
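Column c1 is already specified in targetPk, so it can be omitted here. Every other column must be listed even if its name is unchanged (c2 and c3, for example); otherwise those columns receive no data.

If only some of the columns need to be synchronized, list only those columns; the full set of column mappings is not required.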
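
The mapping rules described above can be illustrated with a short sketch. It is not the adapter's code: map_row is a hypothetical function, and treating an empty mapping value as "same name as the target column" is an assumption made here for illustration.

```python
from typing import Any, Dict, Optional


def map_row(source_row: Dict[str, Any],
            target_pk: Optional[Dict[str, str]] = None,
            target_columns: Optional[Dict[str, str]] = None,
            map_all: bool = False) -> Dict[str, Any]:
    """Build the target row from a source row.

    Both mappings use the form {target column: source column}.
    """
    if map_all:
        # Same structure on both sides: copy every column unchanged.
        return dict(source_row)
    # Primary-key columns are always carried over, then the listed columns.
    columns = {**(target_pk or {}), **(target_columns or {})}
    mapped = {}
    for target_col, source_col in columns.items():
        # Assumption: an empty value means the source column has the same name.
        mapped[target_col] = source_row[source_col or target_col]
    return mapped


row = {"c1": 1, "c2": "a", "c3": "b", "c4": "d"}
# Source column c4 lands in target column c5; c2 and c3 keep their names.
print(map_row(row, target_pk={"c1": "c1"},
              target_columns={"c2": "c2", "c3": "c3", "c5": "c4"}))
```

Columns that appear in neither targetPk nor targetColumns are simply absent from the result, which corresponds to the "no data" behaviour described above.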

Restart

sh restart.sh

The groupId in canal-adapter/conf/rdb/mytest_user.yml must match the one in canal-adapter/conf/application.yml.
To synchronize multiple tables or multiple data sources, add another groupId to both canal-adapter/conf/rdb/mytest_user.yml and canal-adapter/conf/application.yml.
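
Because a mismatch between the two files tends to fail silently, a small consistency check can be useful. The sketch below works on already-parsed configuration (plain dictionaries with the same keys as the YAML shown in this post); find_mismatches is a hypothetical helper, and loading the YAML files is left out to keep the example free of third-party dependencies.

```python
from typing import Any, Dict, List


def find_mismatches(app_conf: Dict[str, Any], mapping: Dict[str, Any]) -> List[str]:
    """Compare an rdb mapping file against application.yml; return problems found."""
    conf = app_conf["canal.conf"]
    problems: List[str] = []

    if mapping["dataSourceKey"] not in conf.get("srcDataSources", {}):
        problems.append(f"dataSourceKey {mapping['dataSourceKey']!r} not in srcDataSources")

    instance = next((a for a in conf["canalAdapters"]
                     if a["instance"] == mapping["destination"]), None)
    if instance is None:
        problems.append(f"destination {mapping['destination']!r} matches no instance")
        return problems

    group = next((g for g in instance["groups"]
                  if g["groupId"] == mapping["groupId"]), None)
    if group is None:
        problems.append(f"groupId {mapping['groupId']!r} not found under the instance")
        return problems

    keys = [adapter.get("key") for adapter in group["outerAdapters"]]
    if mapping["outerAdapterKey"] not in keys:
        problems.append(f"outerAdapterKey {mapping['outerAdapterKey']!r} not in group")
    return problems


# Values copied from the two configuration files in this post.
app_conf = {"canal.conf": {
    "srcDataSources": {"defaultDS": {}},
    "canalAdapters": [{"instance": "source_order_test",
                       "groups": [{"groupId": "g1",
                                   "outerAdapters": [{"name": "rdb", "key": "mysql1"}]}]}],
}}
mapping = {"dataSourceKey": "defaultDS", "destination": "source_order_test",
           "groupId": "g1", "outerAdapterKey": "mysql1"}
print(find_mismatches(app_conf, mapping))  # an empty list means the files agree
```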