RabbitMQ 队列创建

添加交换机 canal.exchange

添加队列 canal.queue

队列绑定交换机


Canal 配置和启动

Canal Server下载

进入下载地址,选择 canal.deployer-1.1.5.tar.gz

Canal Server配置

需要配置的东西就两项,一个是监听数据库配置,另一个是 RabbitMQ 连接配置。
改动的两个文件分别是 Canal 配置文件 canal.properties 和 实例配置文件 instance.properties

注意:一个 Server 可以配置多个实例监听 ,Canal 功能默认自带的有个 example 实例,本篇就用 example 实例 。如果增加实例,复制 example 文件夹内容到同级目录下,然后在 canal.properties 指定添加实例的名称。

配置 Canal 服务方式为 RabbitMQ 和 并配置RabbitMQ 连接信息:

# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rabbitMQ

##################################################
#########             RabbitMQ         #############
##################################################
rabbitmq.host = 192.168.3.20
rabbitmq.virtual.host = /
rabbitmq.exchange = canal.exchange
rabbitmq.username = admin
rabbitmq.password = admin
rabbitmq.deliveryMode =

修改 instance 配置文件 conf/example/instance.properties ,配置监听数据库信息:

# MySQL 地址 + 端口
canal.instance.master.address=host:port
canal.instance.dbUsername=xxxx
canal.instance.dbPassword=xxxx
# 对应到 RabbitMQ 的话是 Routing key
canal.mq.topic=canal.routing.key

SpringBoot 整合 Canal + RabbitMQ

引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

RabbitMQ 连接配置

spring:
  rabbitmq:
    host: x.youlai.tech
    port: 5672
    username: guest
    password: guest

RabbitMQ 监听同步缓存

/**
 * Canal + RabbitMQ 监听数据库数据变化
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class CanalListener {
    private final ISysPermissionService permissionService;
    private final ISysOauthClientService oauthClientService;
    private final ISysMenuService menuService;
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "canal.queue", durable = "true"),
                    exchange = @Exchange(value = "canal.exchange"),
                    key = "canal.routing.key"
            )
    })
    public void handleDataChange(String message) {
        CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
        String tableName = canalMessage.getTable();
        log.info("Canal 监听 {} 发生变化;明细:{}", tableName, message);
        if ("sys_oauth_client".equals(tableName)) {
            log.info("======== 清除客户端信息缓存 ========");
            oauthClientService.cleanCache();
        } else if (Arrays.asList("sys_permission", "sys_role", "sys_role_permission").contains(tableName)) {
            log.info("======== 刷新角色权限缓存 ========");
            permissionService.refreshPermRolesRules();
        } else if (Arrays.asList("sys_menu", "sys_role", "sys_role_menu").contains(tableName)) {
            log.info("======== 清理菜单路由缓存 ========");
            menuService.cleanCache();
        }
    }
}

实战测试

如果使用有来项目线上 RabbitMQ 测试,记得需要新建队列,否者多人消费同一队列会让你觉得 Canal 监听数据有丢失的现象。

接下来模拟测试,当直接在数据库修改菜单数据,能否让 Redis 的菜单路由缓存失效。

启动 Canal
切换到项目的 cd ./middleware/canal/deployer/bin 目录下,输入 startup 启动 Canal

https://developer.aliyun.com/article/1268867

作者:Jeebiz  创建时间:2023-07-11 20:36
最后编辑:Jeebiz  更新时间:2024-07-10 22:56