Canal将mysql变化数据投送(同步)到MQ 清单 1 2 3 4 5 6 192.168.8.221 bigdata1 192.168.8.222 bigdata2 192.168.8.223 bigdata3 canal.admin-1.1.5.tar.gz canal.deployer-1.1.5.tar.gz
集群安装前提 java 1.8 (安装略)
zookeeper (安装略)
mysql(安装略)
mysql配置 1 2 3 4 5 6 7 mysql - u root - p < / data/ canal/ admin/ conf/ canal_manager.sql create user 'canal' @'%' IDENTIFIED BY 'Abc123!!!' ;grant ALL PRIVILEGES ON canal_manager.* TO 'canal' @'%' ;flush privileges;
需复制的mysql配置 1 2 3 4 需要给canal用户赋予复制的权限 create user 'canal' @'%' identified WITH mysql_native_password by 'Abc123@!!!' ;GRANT SELECT , REPLICATION SLAVE, REPLICATION CLIENT ON * .* TO 'canal' @'%' ;flush privileges;
安装admin 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 vi conf/application.yml server: port: 18089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: address: 192.168.8.211:3306 database: canal_manager username: canal password: Abc123!!! driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address} /${spring.datasource.database} ?useUnicode=true &characterEncoding=UTF-8&useSSL=false &allowPublicKeyRetrieval=true hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin adminPasswd: 123456 ./startup_admin.sh cat startup_admin.sh export JAVA_HOME=/data/soft/jdk1.8.0_301export PATH=$JAVA_HOME /bin:$PATH export CLASSPATH=.:$JAVA_HOME /lib/dt.jar:$JAVA_HOME /lib/tools.jar/data/canal/admin/bin/startup.sh
登录控制台 http://192.168.8.211 admin/123456
新建一个集群
配置并启动deployer 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 vi deployer/conf/canal_local.properties canal.register.ip = canal.admin.manager = 192.168.8.221:18089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9 canal.admin.register.auto = true canal.admin.register.cluster = canal-cluster canal.admin.register.name = export JAVA_HOME=/data/soft/jdk1.8.0_301export PATH=$JAVA_HOME /bin:$PATH export CLASSPATH=.:$JAVA_HOME /lib/dt.jar:$JAVA_HOME /lib/tools.jar/data/canal/deployer/bin/startup.sh local
控制台配置server(集群配置) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 canal.ip =canal.register.ip = canal.port = 11111 canal.metrics.pull.port = 11112 canal.admin.manager = 192.168.8.221:18089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9 canal.admin.register.auto = true canal.admin.register.cluster = canal-cluster canal.zkServers = 192.168.8.221:12181,192.168.8.222:12181,192.168.8.223:12181 canal.zookeeper.flush.period = 1000 canal.withoutNetty = false canal.serverMode = rocketMQ canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 canal.instance.memory.buffer.size = 16384 canal.instance.memory.buffer.memunit = 1024 canal.instance.memory.batch.mode = MEMSIZE canal.instance.memory.rawEntry = true canal.instance.detecting.enable = false canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false canal.instance.transaction.size = 1024 canal.instance.fallbackIntervalInSeconds = 60 canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 canal.instance.filter.druid.ddl = true canal.instance.filter.query.dcl = true canal.instance.filter.query.dml = true canal.instance.filter.query.ddl = true canal.instance.filter.table.error = true canal.instance.filter.rows = false canal.instance.filter.transaction.entry = true canal.instance.filter.dml.insert = false canal.instance.filter.dml.update = false canal.instance.filter.dml.delete = false canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB canal.instance.get.ddl.isolation = false canal.instance.parser.parallel = true canal.instance.parser.parallelBufferSize = 256 canal.instance.tsdb.enable = false canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:} canal.instance.tsdb.dbUsername = canal canal.instance.tsdb.dbPassword = canal canal.instance.tsdb.snapshot.interval = 24 canal.instance.tsdb.snapshot.expire = 360 canal.destinations = canal.conf.dir = ../conf canal.auto.scan = true canal.auto.scan.interval = 5 canal.auto.reset.latest.pos.mode = false canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml canal.instance.global.mode = spring canal.instance.global.lazy = false canal.instance.global.manager.address = ${canal.admin.manager} canal.instance.global.spring.xml = classpath:spring/default-instance.xml canal.aliyun.accessKey =canal.aliyun.secretKey =canal.aliyun.uid =canal.mq.flatMessage = true canal.mq.canalBatchSize = 50 canal.mq.canalGetTimeout = 100 canal.mq.accessChannel = local canal.mq.database.hash = true canal.mq.send.thread.size = 30 canal.mq.build.thread.size = 8 rocketmq.producer.group = mysql-canal-mq rocketmq.enable.message.trace = false rocketmq.customized.trace.topic =rocketmq.namespace =rocketmq.namesrv.addr = hxkj07:9876 rocketmq.retry.times.when.send.failed = 0 rocketmq.vip.channel.enabled = false rocketmq.tag =
创建并配置instance 控制台新建一个instance,配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 ################################################# ## mysql serverId , v1.0.26+ will autoGen # canal.instance.mysql.slaveId=0 # enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=hxkj07:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=false #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=Hxkj2022!!! canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex canal.instance.filter.regex=front_telematics.machinery_implement_combination_record,baseinfo.t_implement_sku,baseinfo.t_device_sku,iot.t_device,front_telematics.t_bg_agronomy_operate,baseinfo.t_fault_definittion_description # table black regex canal.instance.filter.black.regex= # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config canal.mq.topic=mysql_canal_mq # dynamic topic route by schema or table regex #canal.mq.dynamicTopic= canal.mq.partition=0 # hash partition config canal.mq.partitionsNum=4 canal.mq.partitionHash=.*\\..*:$pk$,.*\\..*:id #canal.mq.partitionHash=test.table:id^name,.*\\..* #################################################
检查mq 所有配置正常情况下,修改目标mysql,就能在rocketmq看到相应的message
错误解决 1 2 Caused by: java.io.IOException: caching_sha2_password Auth failed alter user 'canal' @'%' identified with mysql_native_password by 'Abc123!!!' ;
老年佛系运维 | biglovewheat@126.com