
Real-Time MySQL Data Synchronization with Canal

2024-11-29 · Database

Deploying Canal-Admin

1. Pull the Canal-Admin image

For compatibility with MySQL 8.0+, pull the v1.1.7 image.

docker pull canal/canal-admin:v1.1.7

2. Create the directory

mkdir -p /data/canal-admin/conf/

3. Create the canal_manager database

CREATE DATABASE /*!32312 IF NOT EXISTS*/ `canal_manager` /*!40100 DEFAULT CHARACTER SET utf8 COLLATE utf8_bin */;

USE `canal_manager`;

SET NAMES utf8;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for canal_adapter_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_adapter_config`;
CREATE TABLE `canal_adapter_config` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `category` varchar(45) NOT NULL,
  `name` varchar(45) NOT NULL,
  `status` varchar(45) DEFAULT NULL,
  `content` text NOT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for canal_cluster
-- ----------------------------
DROP TABLE IF EXISTS `canal_cluster`;
CREATE TABLE `canal_cluster` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `name` varchar(63) NOT NULL,
  `zk_hosts` varchar(255) NOT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for canal_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_config`;
CREATE TABLE `canal_config` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `cluster_id` bigint(20) DEFAULT NULL,
  `server_id` bigint(20) DEFAULT NULL,
  `name` varchar(45) NOT NULL,
  `status` varchar(45) DEFAULT NULL,
  `content` text NOT NULL,
  `content_md5` varchar(128) NOT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `sid_UNIQUE` (`server_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for canal_instance_config
-- ----------------------------
DROP TABLE IF EXISTS `canal_instance_config`;
CREATE TABLE `canal_instance_config` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `cluster_id` bigint(20) DEFAULT NULL,
  `server_id` bigint(20) DEFAULT NULL,
  `name` varchar(45) NOT NULL,
  `status` varchar(45) DEFAULT NULL,
  `content` text NOT NULL,
  `content_md5` varchar(128) DEFAULT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `name_UNIQUE` (`name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for canal_node_server
-- ----------------------------
DROP TABLE IF EXISTS `canal_node_server`;
CREATE TABLE `canal_node_server` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `cluster_id` bigint(20) DEFAULT NULL,
  `name` varchar(63) NOT NULL,
  `ip` varchar(63) NOT NULL,
  `admin_port` int(11) DEFAULT NULL,
  `tcp_port` int(11) DEFAULT NULL,
  `metric_port` int(11) DEFAULT NULL,
  `status` varchar(45) DEFAULT NULL,
  `modified_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

-- ----------------------------
-- Table structure for canal_user
-- ----------------------------
DROP TABLE IF EXISTS `canal_user`;
CREATE TABLE `canal_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `username` varchar(31) NOT NULL,
  `password` varchar(128) NOT NULL,
  `name` varchar(31) NOT NULL,
  `roles` varchar(31) NOT NULL,
  `introduction` varchar(255) DEFAULT NULL,
  `avatar` varchar(255) DEFAULT NULL,
  `creation_date` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

SET FOREIGN_KEY_CHECKS = 1;

-- ----------------------------
-- Records of canal_user
-- ----------------------------
BEGIN;
INSERT INTO `canal_user` VALUES (1, 'admin', '6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9', 'Canal Manager', 'admin', NULL, NULL, '2019-07-14 00:05:28');
COMMIT;

SET FOREIGN_KEY_CHECKS = 1;

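4. Start Canal-Admin

If you are deploying on a server, remember to open the corresponding port, 8089 (mapped as 8089:8089).

docker run -d --name canal-admin -p 8089:8089 canal/canal-admin:v1.1.7

5. Copy the configuration file

docker cp canal-admin:/home/admin/canal-admin/conf/application.yml /data/canal-admin/conf/

6. Remove the Canal-Admin container

docker rm -f canal-admin

7. Edit the configuration file

(Pay attention to the values marked with comments.) Three values need to change in total.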

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: xxx.xxx.xxx.xxx:3306     # address and port of the MySQL server hosting canal_manager
  database: canal_manager
  username: root                    # database account
  password: xxxxxx                  # database password
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
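The file above still carries xxx placeholders for the address and password, and the container will start even if one is left unreplaced. A quick check before deploying can catch that. The following is a minimal sketch: `find_placeholders` is a hypothetical helper name, not part of Canal, and it simply treats any line containing `xxx` as unfinished.

```shell
# find_placeholders FILE (hypothetical helper, not part of Canal):
# print every line that still contains an "xxx" placeholder, with its
# line number, and return 1 if any were found.
find_placeholders() {
    if grep -n 'xxx' "$1"; then
        echo "placeholders remain in $1" >&2
        return 1
    fi
    return 0
}

# Usage, assuming the path used in the steps above:
# find_placeholders /data/canal-admin/conf/application.yml
```

The same check applies unchanged to canal.properties, instance.properties and the adapter's YAML files later in this guide.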

8. Deploy Canal-Admin

docker run --name canal-admin -p 8089:8089 \
-v /data/canal-admin/conf/application.yml:/home/admin/canal-admin/conf/application.yml \
-v /data/canal-admin/logs/:/home/admin/canal-admin/logs/ \
-d canal/canal-admin:v1.1.7

9. Access Canal-Admin

Visit xxx.xxx.xxx.xxx:8089, replacing the address with that of your own server or virtual machine.
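The web UI can take a little while to come up after `docker run` returns. Rather than refreshing the browser, the wait can be scripted. This is a sketch under assumptions: it relies on `curl` being installed, `wait_for_url` is a hypothetical helper name, and the retry count and delay are arbitrary defaults.

```shell
# wait_for_url URL [TRIES] [DELAY] (hypothetical helper):
# poll URL until curl receives any HTTP response, or give up after TRIES attempts.
wait_for_url() {
    url=$1
    tries=${2:-30}
    delay=${3:-2}
    i=0
    while [ "$i" -lt "$tries" ]; do
        # Without -f, curl exits 0 on any HTTP status, so this only tests reachability.
        if curl -s -o /dev/null --max-time 3 "$url"; then
            echo "up: $url"
            return 0
        fi
        i=$((i + 1))
        sleep "$delay"
    done
    echo "timed out: $url" >&2
    return 1
}

# Usage (replace the address with your own host):
# wait_for_url http://xxx.xxx.xxx.xxx:8089/
```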

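Deploying Canal-Server


0. Prerequisites

Check that MySQL has log_bin enabled and that the binlog format is ROW.
Both are usually already set; you can confirm with the following statements in Navicat or another database tool.

show variables like 'log_bin';
show variables like 'binlog_format';

Grant the account its privileges by running the following statements.

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;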
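The two prerequisite checks can also be run from a terminal instead of a GUI tool. The sketch below assumes the `mysql` command-line client is available; `check_binlog` is a hypothetical helper that reads the name/value rows printed by `mysql -N -B` and reports whether both settings have the values this guide expects.

```shell
# check_binlog (hypothetical helper): read "name value" rows on stdin, as
# printed by `mysql -N -B -e "SHOW VARIABLES ..."`, and verify that
# log_bin is ON and binlog_format is ROW.
check_binlog() {
    awk '
        $1 == "log_bin"       { log_bin = toupper($2) }
        $1 == "binlog_format" { fmt = toupper($2) }
        END {
            if (log_bin != "ON") { print "log_bin is not ON";        bad = 1 }
            if (fmt != "ROW")    { print "binlog_format is not ROW"; bad = 1 }
            if (!bad)            { print "binlog settings OK" }
            exit bad
        }'
}

# Usage against a real server (host and account are placeholders):
# mysql -h xxx.xxx.xxx.xxx -u root -p -N -B \
#   -e "SHOW VARIABLES WHERE Variable_name IN ('log_bin','binlog_format')" | check_binlog
```

The function exits non-zero when either setting is off, so it can gate the rest of a deployment script.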

1. Pull the Canal-Server image

docker pull canal/canal-server:v1.1.7

2. Create the directory

mkdir -p /data/canal-server/conf/

3. Start Canal-Server

If you are deploying on a server, remember to open the corresponding port, 11111 (mapped as 11111:11111).

docker run -d --name canal-server -p 11111:11111 canal/canal-server:v1.1.7

4. Copy the configuration files

docker cp canal-server:/home/admin/canal-server/conf/canal.properties /data/canal-server/conf/
docker cp canal-server:/home/admin/canal-server/conf/example/instance.properties /data/canal-server/conf/

5. Remove the Canal-Server container

docker rm -f canal-server

6. Edit the configuration files

canal.properties: one setting to change

# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112

# canal admin config
# Change to your own Canal-Admin address (kept on its own line: a trailing # in a .properties value is not a comment)
canal.admin.manager = xxx.xxx.xxx.xxx:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024
## memory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecting config
canal.instance.detecting.enable = false
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

#################################################
#########       destinations        #############
#################################################
canal.destinations = example
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
canal.auto.reset.latest.pos.mode = false

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml

canal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
canal.instance.global.spring.xml = classpath:spring/file-instance.xml

# canal admin config
# Same setting as above: change to your own Canal-Admin address
canal.admin.manager = xxx.xxx.xxx.xxx:8089
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
canal.admin.register.name =
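One value in this file is easy to misread: canal.admin.passwd is not plaintext. It is the uppercase hex of SHA1(SHA1(password)), the same digest MySQL's legacy PASSWORD() function produced, without the leading `*`. The value shown corresponds to the `adminPasswd: admin` set in Canal-Admin's application.yml, so if that password is changed, this digest has to change with it. The sketch below assumes the `openssl` command-line tool is installed; `mysql_native_hash` is a hypothetical helper name.

```shell
# mysql_native_hash PASSWORD (hypothetical helper): print the uppercase hex
# of SHA1(SHA1(PASSWORD)). Assumes the openssl CLI is available.
mysql_native_hash() {
    printf '%s' "$1" \
        | openssl dgst -sha1 -binary \
        | openssl dgst -sha1 -r \
        | awk '{ print toupper($1) }'
}

# Hash the default admin password and compare it with canal.admin.passwd.
mysql_native_hash admin
```

The same function applied to `123456` should reproduce the password column of the admin row inserted into canal_user earlier.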

instance.properties: two settings to change

#################################################
## mysql serverId , v1.0.26+ will autoGen
# Must not collide with the MySQL server's own server ID; 10 is fine
canal.instance.mysql.slaveId=10

# enable gtid use true/false
canal.instance.gtidon=false

# position info
# Change to the address of the database you want to monitor
canal.instance.master.address=xxx.xxx.xxx.xxx:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.enableDruid=false

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*

# mq config
canal.mq.topic=example
canal.mq.partition=0
canal.instance.multi.stream.on=false
#################################################
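The two filter settings decide which tables are captured: a `schema.table` name must match canal.instance.filter.regex and must not match canal.instance.filter.black.regex. In the .properties file each backslash is doubled, so `.*\\..*` is read as the pattern `.*\..*` (any schema, a literal dot, any table). The sketch below illustrates that logic with `grep -E` rather than Canal's own matcher, which is an assumption that holds for simple patterns like these. `table_is_captured` and the schema name `shop` are hypothetical.

```shell
# table_is_captured NAME WHITE BLACK (hypothetical helper): succeed when the
# schema.table NAME fully matches WHITE and does not fully match BLACK.
# Patterns are written as they look after .properties unescaping (\\ -> \).
table_is_captured() {
    printf '%s\n' "$1" | grep -Eqx -e "$2" || return 1
    if printf '%s\n' "$1" | grep -Eqx -e "$3"; then
        return 1
    fi
    return 0
}

for name in shop.sys_coupon_record mysql.slave_master_info; do
    if table_is_captured "$name" '.*\..*' 'mysql\.slave_.*'; then
        echo "$name: captured"
    else
        echo "$name: filtered out"
    fi
done
```

Narrowing the first pattern, for example to `shop\..*`, would restrict capture to a single schema.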

7. Deploy Canal-Server

docker run --name canal-server -p 11111:11111 \
-v /data/canal-server/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
-v /data/canal-server/conf/canal.properties:/home/admin/canal-server/conf/canal.properties \
-v /data/canal-server/logs/:/home/admin/canal-server/logs/ \
-d canal/canal-server:v1.1.7

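Deploying Canal-Adapter


The Canal authors have not published a Docker image for Canal-Adapter,
but they do publish a release tar archive, so we can build the image ourselves.
Canal-Adapter download link: Canal-Adapter

1. Create the required files (the Dockerfile and startup.sh shown in the next step, alongside the downloaded canal.adapter-*.tar.gz archive)

Keep the path the same as mine, /opt/canal/, because all the later files live under it.

2. The file contents are as follows

Dockerfile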

FROM openjdk:11
COPY canal.adapter-*.tar.gz /tmp/

RUN \
    mkdir -p /opt/canal-adapter && \
    tar -zxf /tmp/canal.adapter-*.tar.gz -C /opt/canal-adapter && \
    rm -r /tmp/canal.adapter-*.tar.gz

COPY startup.sh /opt/canal-adapter/bin/startup.sh

WORKDIR /opt/canal-adapter

CMD ["sh", "-c", "sh /opt/canal-adapter/bin/startup.sh && tail -F logs/adapter/adapter.log"]

startup.sh

#!/bin/bash

current_path=`pwd`
case "`uname`" in
    Linux)
        bin_abs_path=$(readlink -f $(dirname $0))
        ;;
    *)
        bin_abs_path=`cd $(dirname $0); pwd`
        ;;
esac
base=${bin_abs_path}/..
export LANG=en_US.UTF-8
export BASE=$base

if [ -f $base/bin/adapter.pid ] ; then
    echo "found adapter.pid , Please run stop.sh first ,then startup.sh" >&2
    exit 1
fi

if [ ! -d $base/logs ] ; then
    mkdir -p $base/logs
fi

## set java path
if [ -z "$JAVA" ] ; then
  JAVA=$(which java)
fi

ALIBABA_JAVA="/usr/alibaba/java/bin/java"
TAOBAO_JAVA="/opt/taobao/java/bin/java"
if [ -z "$JAVA" ]; then
  if [ -f $ALIBABA_JAVA ] ; then
    JAVA=$ALIBABA_JAVA
  elif [ -f $TAOBAO_JAVA ] ; then
    JAVA=$TAOBAO_JAVA
  else
    echo "Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH." >&2
    exit 1
  fi
fi

case "$#"
in
0 )
  ;;
2 )
  if [ "$1" = "debug" ]; then
    DEBUG_PORT=$2
    DEBUG_SUSPEND="n"
    JAVA_DEBUG_OPT="-Xdebug -Xnoagent -Djava.compiler=NONE -Xrunjdwp:transport=dt_socket,address=$DEBUG_PORT,server=y,suspend=$DEBUG_SUSPEND"
  fi
  ;;
* )
  echo "THE PARAMETERS MUST BE TWO OR LESS.PLEASE CHECK AGAIN."
  exit;;
esac

str=`file -L $JAVA | grep 64-bit`
if [ -n "$str" ]; then
    JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -Xss256k -XX:+DisableExplicitGC -XX:+HeapDumpOnOutOfMemoryError"
else
    JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "
fi

JAVA_OPTS=" $JAVA_OPTS -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true -Dfile.encoding=UTF-8"
ADAPTER_OPTS="-DappName=canal-adapter"

for i in $base/lib/*;
    do CLASSPATH=$i:"$CLASSPATH";
done

CLASSPATH="$base/conf:$CLASSPATH";

echo "cd to $bin_abs_path for workaround relative path"
cd $bin_abs_path

echo CLASSPATH :$CLASSPATH
exec $JAVA $JAVA_OPTS $JAVA_DEBUG_OPT $ADAPTER_OPTS -classpath .:$CLASSPATH com.alibaba.otter.canal.adapter.launcher.CanalAdapterApplication
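The Dockerfile copies two things from the build directory: a canal.adapter-*.tar.gz archive and startup.sh. If either is missing, `docker build` fails partway through. A small pre-flight check can confirm the directory is complete first. This is a sketch: `check_build_context` is a hypothetical helper name, and the default path is the /opt/canal directory used in this guide.

```shell
# check_build_context [DIR] (hypothetical helper): verify that DIR holds the
# files the Dockerfile above expects. Returns 1 if anything is missing.
check_build_context() {
    dir=${1:-/opt/canal}
    missing=0
    for f in Dockerfile startup.sh; do
        [ -f "$dir/$f" ] || { echo "missing: $dir/$f" >&2; missing=1; }
    done
    # Expand the archive glob; if nothing matches, $1 stays the literal pattern.
    set -- "$dir"/canal.adapter-*.tar.gz
    [ -f "$1" ] || { echo "missing: $dir/canal.adapter-*.tar.gz" >&2; missing=1; }
    return "$missing"
}

# Usage:
# check_build_context /opt/canal && echo "build context looks complete"
```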

3. Build the Canal-Adapter image

cd /opt/canal
docker build -t canal/canal-adapter:v1.1.7 .

4. Create the directory

mkdir -p /data/canal-adapter/conf/es7

5. Start Canal-Adapter

docker run -d --name canal-adapter -p 8081:8081 canal/canal-adapter:v1.1.7

6. Copy the configuration files

docker cp canal-adapter:/opt/canal-adapter/conf/application.yml /data/canal-adapter/conf/
docker cp canal-adapter:/opt/canal-adapter/conf/bootstrap.yml /data/canal-adapter/conf/
docker cp canal-adapter:/opt/canal-adapter/conf/es7/mytest_user.yml /data/canal-adapter/conf/es7

7. Remove the Canal-Adapter container

docker rm -f canal-adapter

8. Edit the configuration files

application.yml: seven places to change

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: xxx.xxx.xxx.xxx:11111 # change to the address where canal-server is deployed
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://xxx.xxx.xxx:3341/infusion-xxxxx?useUnicode=true # address of the database being monitored
      username: root                           # database account
      password: xxxxxxx                        # database password
  canalAdapters:
  - instance: example                          # keep the default unless you changed it in canal-server
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: http://xxx.xxx.xxx:9200         # address of the server running Elasticsearch
        properties:
          mode: rest # transport or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: es                      # Elasticsearch cluster name (the author uses the ES container name)

bootstrap.yml

canal:
  manager:
    jdbc:
      url: jdbc:mysql://xxxxxx:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
      username: root
      password: xxxxxxx

coupon_record.yml (placed under /data/canal-adapter/conf/es7): three places to change

dataSourceKey: defaultDS
destination: example        # keep the default "example" unless you changed it in canal-server
groupId: g1
esMapping:
  _index: coupon_record         # name of the target index
  _id: _id
  sql: "select id as _id,coupon_id,create_time,use_state,openid,user_type,user_name,coupon_title,start_time,end_time,order_id,price,condition_price,del_flag from sys_coupon_record"
  commitBatch: 3000

9. Deploy Canal-Adapter

docker run --name canal-adapter -p 8081:8081 \
-v /data/canal-adapter/conf/application.yml:/opt/canal-adapter/conf/application.yml \
-v /data/canal-adapter/conf/bootstrap.yml:/opt/canal-adapter/conf/bootstrap.yml \
-v /data/canal-adapter/conf/es7:/opt/canal-adapter/conf/es7 \
-v /data/canal-adapter/logs:/opt/canal-adapter/logs \
-d canal/canal-adapter:v1.1.7
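Once the adapter is running, a rough way to confirm that rows are reaching Elasticsearch is to compare the document count of the coupon_record index with the row count of sys_coupon_record. Elasticsearch's `_count` endpoint returns a small JSON object with a `count` field. The sketch below pulls that number out with `sed`; it assumes `curl` is installed, and `extract_count` is a hypothetical helper name.

```shell
# extract_count (hypothetical helper): read an Elasticsearch _count response
# on stdin and print the integer value of its "count" field.
extract_count() {
    sed -n 's/.*"count"[[:space:]]*:[[:space:]]*\([0-9][0-9]*\).*/\1/p'
}

# Usage against a live cluster (the host is a placeholder, as in application.yml):
# curl -s "http://xxx.xxx.xxx:9200/coupon_record/_count" | extract_count
# Compare the result with: SELECT COUNT(*) FROM sys_coupon_record;
```

The two numbers may differ briefly while changes are in flight, so a persistent gap is the signal worth investigating, starting with the adapter log under /data/canal-adapter/logs.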

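That completes the deployment of the three Canal components. There is a lot of material here, so take the time to understand each step.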