修改 canal.properties 文件,配置输出到 Kafka,有以下几处要改:

# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = kafka

##################################################
#########              Kafka              #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0

kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"

接下来修改 instance.properties 文件:

# MySQL 地址 + 端口
canal.instance.master.address=host:port
canal.instance.dbUsername=xxxx
canal.instance.dbPassword=xxxx
# mq config
# 对应到 RabbitMQ 的是 Routing key
canal.mq.topic=canal-routing-key
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\..*
作者:Jeebiz  创建时间:2023-07-11 20:40
最后编辑:Jeebiz  更新时间:2024-07-10 22:56