RabbitMQ 队列创建
添加交换机 canal.exchange
添加队列 canal.queue
队列绑定交换机
Canal 配置和启动
Canal Server下载
- 官方文档:https://github.com/alibaba/canal/wiki
- 项目地址:https://github.com/alibaba/canal
- 下载地址:https://github.com/alibaba/canal/releases
进入下载地址,选择 canal.deployer-1.1.5.tar.gz
Canal Server配置
需要配置的东西就两项,一个是监听数据库配置,另一个是 RabbitMQ 连接配置。
改动的两个文件分别是 Canal 配置文件 canal.properties
和 实例配置文件 instance.properties
注意:一个 Server 可以配置多个实例监听 ,Canal 功能默认自带的有个 example 实例,本篇就用 example 实例 。如果增加实例,复制 example 文件夹内容到同级目录下,然后在 canal.properties 指定添加实例的名称。
配置 Canal 服务方式为 RabbitMQ 和 并配置RabbitMQ 连接信息:
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rabbitMQ
##################################################
######### RabbitMQ #############
##################################################
rabbitmq.host = 192.168.3.20
rabbitmq.virtual.host = /
rabbitmq.exchange = canal.exchange
rabbitmq.username = admin
rabbitmq.password = admin
rabbitmq.deliveryMode =
修改 instance 配置文件 conf/example/instance.properties ,配置监听数据库信息:
# MySQL 地址 + 端口
canal.instance.master.address=host:port
canal.instance.dbUsername=xxxx
canal.instance.dbPassword=xxxx
# 对应到 RabbitMQ 的话是 Routing key
canal.mq.topic=canal.routing.key
SpringBoot 整合 Canal + RabbitMQ
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQ 连接配置
spring:
rabbitmq:
host: x.youlai.tech
port: 5672
username: guest
password: guest
RabbitMQ 监听同步缓存
/**
* Canal + RabbitMQ 监听数据库数据变化
*/
@Component
@Slf4j
@RequiredArgsConstructor
public class CanalListener {
private final ISysPermissionService permissionService;
private final ISysOauthClientService oauthClientService;
private final ISysMenuService menuService;
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "canal.queue", durable = "true"),
exchange = @Exchange(value = "canal.exchange"),
key = "canal.routing.key"
)
})
public void handleDataChange(String message) {
CanalMessage canalMessage = JSONUtil.toBean(message, CanalMessage.class);
String tableName = canalMessage.getTable();
log.info("Canal 监听 {} 发生变化;明细:{}", tableName, message);
if ("sys_oauth_client".equals(tableName)) {
log.info("======== 清除客户端信息缓存 ========");
oauthClientService.cleanCache();
} else if (Arrays.asList("sys_permission", "sys_role", "sys_role_permission").contains(tableName)) {
log.info("======== 刷新角色权限缓存 ========");
permissionService.refreshPermRolesRules();
} else if (Arrays.asList("sys_menu", "sys_role", "sys_role_menu").contains(tableName)) {
log.info("======== 清理菜单路由缓存 ========");
menuService.cleanCache();
}
}
}
实战测试
如果使用有来项目线上 RabbitMQ 测试,记得需要新建队列,否者多人消费同一队列会让你觉得 Canal 监听数据有丢失的现象。
接下来模拟测试,当直接在数据库修改菜单数据,能否让 Redis 的菜单路由缓存失效。
启动 Canal
切换到项目的 cd ./middleware/canal/deployer/bin 目录下,输入 startup 启动 Canal
https://developer.aliyun.com/article/1268867
作者:Jeebiz 创建时间:2023-07-11 20:36
最后编辑:Jeebiz 更新时间:2024-07-10 22:56
最后编辑:Jeebiz 更新时间:2024-07-10 22:56