Canal 配置和启动
Canal Server下载
- 官方文档:https://github.com/alibaba/canal/wiki
- 项目地址:https://github.com/alibaba/canal
- 下载地址:https://github.com/alibaba/canal/releases
进入下载地址,选择 canal.deployer-1.1.5.tar.gz
Canal Server配置
需要配置的东西就两项,一个是监听数据库配置,另一个是 RocketMQ 连接配置。
改动的两个文件分别是 Canal 配置文件 canal.properties
和 实例配置文件 instance.properties
注意:一个 Server 可以配置多个实例监听 ,Canal 功能默认自带的有个 example 实例,本篇就用 example 实例 。如果增加实例,复制 example 文件夹内容到同级目录下,然后在 canal.properties 指定添加实例的名称。
配置 Canal 服务方式为 RocketMQ 和 并配置 RocketMQ 连接信息:
# tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rocketMQ
##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 192.168.3.64:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag =
修改 instance 配置文件 conf/example/instance.properties ,配置监听数据库信息:
# MySQL 地址 + 端口
canal.instance.master.address=host:port
canal.instance.dbUsername=xxxx
canal.instance.dbPassword=xxxx
# 对应到 RocketMQ 的话是 Topic
canal.mq.topic=example
SpringBoot 整合 Canal + RocketMQ
引入依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
RabbitMQ 连接配置
rocketmq:
name-server: 127.0.0.1:9876
# 是否开启自动配置
producer:
enable-msg-trace: true
group: "service-producer"
# 消息最大长度 默认 1024 * 4 (4M)
max-message-size: 4096
# 发送消息超时时间,默认 3000
send-message-timeout: 3000
# 发送消息失败重试次数,默认2
retry-times-when-send-failed: 2
retry-times-when-send-async-failed: 2
RabbitMQ 监听同步缓存
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Employee {
private Long id;
private String username;
}
@AllArgsConstructor
@NoArgsConstructor
@Data
public class CanalSynDto {
private List<Employee> data;
private String database;
private String table;
private String type;
//省略了一些不重要的内容
}
@Slf4j
@Component
//对应了canal的instance.properties 中的canal.mq.topic=example
@RocketMQMessageListener(topic = "example", //TOPIC主题,
selectorExpression="*" //tag标签
,consumerGroup = "canal-syn-consumer"
,messageModel = MessageModel.CLUSTERING
)
public class CanalSynListenner implements RocketMQListener<MessageExt> {
//注入Redis API
@Autowired
private RedisTemplate<Object,Object> redisTemplate;
@Override
public void onMessage(MessageExt message) {
try {
//拿到MQ中的消息内容
String json = new String(message.getBody(), "utf-8");
//把数据转为实体类
CanalSynDto canalSynDto = JSON.parseObject(json, CanalSynDto.class);
log.info("canal同步 {}", canalSynDto);
//如果是INSERT或者UPDATE,直接往Redis添加
if(canalSynDto.getType().equals("INSERT") || canalSynDto.getType().equals("UPDATE")){
//insert就添加,update就覆盖
canalSynDto.getData().forEach(employee -> {
//以 ID为key,把对象存储到Redis中
redisTemplate.opsForValue().set("ID:"+employee.getId(),employee);
});
//删除命令
}else if (canalSynDto.getType().equals("DELETE")){
canalSynDto.getData().forEach(employee -> {
//以 ID为key,把对象从Redis中删除
redisTemplate.delete("ID:"+employee.getId());
});
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
https://blog.csdn.net/u014494148/article/details/123336787
作者:Jeebiz 创建时间:2023-07-11 20:41
最后编辑:Jeebiz 更新时间:2024-07-10 22:56
最后编辑:Jeebiz 更新时间:2024-07-10 22:56