Canal 配置和启动

Canal Server下载

进入下载地址,选择 canal.deployer-1.1.5.tar.gz

Canal Server配置

需要配置的东西就两项,一个是监听数据库配置,另一个是 RocketMQ 连接配置。
改动的两个文件分别是 Canal 配置文件 canal.properties 和 实例配置文件 instance.properties

注意:一个 Server 可以配置多个实例监听 ,Canal 功能默认自带的有个 example 实例,本篇就用 example 实例 。如果增加实例,复制 example 文件夹内容到同级目录下,然后在 canal.properties 指定添加实例的名称。

配置 Canal 服务方式为 RocketMQ 和 并配置 RocketMQ 连接信息:

# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rocketMQ

##################################################
#########             RocketMQ         #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 192.168.3.64:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =

修改 instance 配置文件 conf/example/instance.properties ,配置监听数据库信息:

# MySQL 地址 + 端口
canal.instance.master.address=host:port
canal.instance.dbUsername=xxxx
canal.instance.dbPassword=xxxx
# 对应到 RocketMQ 的话是 Topic
canal.mq.topic=example

SpringBoot 整合 Canal + RocketMQ

引入依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
     <artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>

RabbitMQ 连接配置

rocketmq:
  name-server: 127.0.0.1:9876
  # 是否开启自动配置
  producer:
    enable-msg-trace: true
    group: "service-producer"
    # 消息最大长度 默认 1024 * 4 (4M)
    max-message-size: 4096
    # 发送消息超时时间,默认 3000
    send-message-timeout: 3000
    # 发送消息失败重试次数,默认2
    retry-times-when-send-failed: 2
    retry-times-when-send-async-failed: 2

RabbitMQ 监听同步缓存


@Data
@NoArgsConstructor
@AllArgsConstructor
public class Employee {
    private Long id;
    private String username;
}


@AllArgsConstructor
@NoArgsConstructor
@Data
public class CanalSynDto {
    private List<Employee> data;
    private String database;
    private String table;
    private String type;
    //省略了一些不重要的内容
}

@Slf4j
@Component
//对应了canal的instance.properties 中的canal.mq.topic=example
@RocketMQMessageListener(topic = "example", //TOPIC主题,
        selectorExpression="*" //tag标签
        ,consumerGroup = "canal-syn-consumer"
        ,messageModel = MessageModel.CLUSTERING
)
public class CanalSynListenner  implements RocketMQListener<MessageExt> {

    //注入Redis API
    @Autowired
    private RedisTemplate<Object,Object> redisTemplate;

    @Override
    public void onMessage(MessageExt message) {
        try {
            //拿到MQ中的消息内容
            String json = new String(message.getBody(), "utf-8");
            //把数据转为实体类
            CanalSynDto canalSynDto = JSON.parseObject(json, CanalSynDto.class);
            log.info("canal同步 {}", canalSynDto);
            //如果是INSERT或者UPDATE,直接往Redis添加
            if(canalSynDto.getType().equals("INSERT") || canalSynDto.getType().equals("UPDATE")){
                //insert就添加,update就覆盖
                canalSynDto.getData().forEach(employee -> {
                    //以  ID为key,把对象存储到Redis中
                    redisTemplate.opsForValue().set("ID:"+employee.getId(),employee);
                });
             //删除命令
            }else if (canalSynDto.getType().equals("DELETE")){
                canalSynDto.getData().forEach(employee -> {
                    //以  ID为key,把对象从Redis中删除
                    redisTemplate.delete("ID:"+employee.getId());
                });
            }

        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
}

https://blog.csdn.net/u014494148/article/details/123336787

作者:Jeebiz  创建时间:2023-07-11 20:41
最后编辑:Jeebiz  更新时间:2024-07-10 22:56