监听 MySQL 数据库数据变更

1、mysql 需要开启 bin_log 且 binlog_format=row

log-bin=mysql-bin
binlog_format=row
server-id = 1
binlog_row_image=FULL

#账户授权
grant process on *.* to 数据库用户名;
GRANT SELECT, SHOW VIEW, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '数据库用户名'@'%';
flush privileges;

2、sqlserver 需要开启cdc模式

-- 开启cdc
USE YSPT  
GO  
EXEC sys.sp_cdc_enable_db  
GO 
--  配置具体的表
USE YSPT
GO
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name   = 'xuenian_info',
@role_name     = NULL,
@supports_net_changes = 1
GO
--  查询映射表
EXEC sys.sp_cdc_help_change_data_capture
-- 删除表映射
EXEC sys.sp_cdc_disable_table
    @source_schema = 'dbo',
    @source_name   =  'xueqi_info',
    @capture_instance = 'dbo.xueqi_info_CT'
    GO
-- 删除所有映射
USE YSPT
GO  
EXEC sys.sp_cdc_disable_db  
GO  
-- 查询开启cdc 的表
SELECT name,is_tracked_by_cdc FROM sys.tables WHERE is_tracked_by_cdc = 1;
-- 查询开启cdc 的库
SELECT name,is_cdc_enabled FROM sys.databases WHERE is_cdc_enabled = 1;

安装 kafka 3.6.0 新版本的kafka 已包含zookeeper 直接启动(已安装,可忽略)

下载连接器插件 https://repo1.maven.org/maven2/io/debezium/ 2.4.1 版本 解压至 kafka/plugins 下

新增plugins 文件夹,修改 connect-distributed.properties plugin.path

zookeeper-server-start ..\..\config\zookeeper.properties
kafka-server-start ..\..\config\server.properties
connect-distributed ..\..\config\connect-distributed.properties

创建连接器

启动 connect-distributed 会开启对外rest服务端口8083,可动态注册connectors

GET /connectors – 返回所有正在运行的connector名。
POST /connectors – 新建一个connector; 请求体必须是json格式并且需要包含name字段和config字段,name是connector的名字,config是json格式,必须包含你的connector的配置信息。
GET /connectors/{name} – 获取指定connetor的信息。
GET /connectors/{name}/config – 获取指定connector的配置信息。
PUT /connectors/{name}/config – 更新指定connector的配置信息。
GET /connectors/{name}/status – 获取指定connector的状态,包括它是否在运行、停止、或者失败,如果发生错误,还会列出错误的具体信息。
GET /connectors/{name}/tasks – 获取指定connector正在运行的task。
GET /connectors/{name}/tasks/{taskid}/status – 获取指定connector的task的状态信息。
PUT /connectors/{name}/pause – 暂停connector和它的task,停止数据处理知道它被恢复。
PUT /connectors/{name}/resume – 恢复一个被暂停的connector。
POST /connectors/{name}/restart – 重启一个connector,尤其是在一个connector运行失败的情况下比较常用
POST /connectors/{name}/tasks/{taskId}/restart – 重启一个task,一般是因为它运行失败才这样做。
DELETE /connectors/{name} – 删除一个connector,停止它的所有task并删除配置。
//源文地址: https://www.cnblogs.com/EminemJK/p/14688907.html

sqlserver 示例 https://debezium.io/documentation/reference/2.4/connectors/sqlserver.html

通过 POST 请求 /connectors 接口,请求内容如下,请根据你的实际情况进行调整。

{
    "name": "sst-sqlserver-connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "database.hostname": "192.168.3.194",
        "database.port": "1433",
        "database.user": "hzty_web",
        "database.password": "hztychenminghua88865747",
        "database.names": "YSPT",
        "topic.prefix": "sst.table.name",
                "table.include.list": "dbo.yonghu_school,dbo.yonghu_info,dbo.jiaoshi_info,dbo.xuesheng_info,dbo.jiazhang_info,dbo.nianji_info,dbo.xuenianbanji_info,dbo.xueshengxuenian_info,dbo.xuenian_info",
          "skip.messages.without.change":true,
          "include.schema.changes": "false",
        "schema.history.internal.kafka.bootstrap.servers": "192.168.3.73:9092,192.168.3.74:9092,192.168.3.75:9092",
        "schema.history.internal.kafka.topic": "sst.schemahistory.fullfillment",
        "database.encrypt":false
    }
}

mysql示例 https://debezium.io/documentation/reference/2.4/connectors/mysql.html

通过 POST 请求 /connectors 接口,请求内容如下,请根据你的实际情况进行调整。

{
    "name": "standard-mysql-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "192.168.3.91",
        "database.port": "13306",
        "database.user": "kafka",
        "database.password": "123456",
        "database.server.id": "184054",
        "database.server.name": "数据库名称",
        "topic.prefix": "standard.table.name",
        "database.include.list": "kafka_test",
        "schema.history.internal.kafka.bootstrap.servers": "192.168.3.73:9092,192.168.3.74:9092,192.168.3.75:9092",
        "schema.history.internal.kafka.topic": "standard.schemahistory.fullfillment",
        "include.schema.changes": "false",
        "table.include.list": "bmp_standard.standard_school,bmp_standard.standard_area,bmp_standard.standard_teacher,bmp_standard.standard_student,bmp_standard.standard_user,bmp_standard.standard_class_student,bmp_standard.standard_grade,bmp_standard.standard_class,bmp_standard.standard_term",
        "skip.messages.without.change": "false"
    }
}

监听成功后会自动创建每张表变更的 topic topic.prefix+表名

kafka-topics --list --bootstrap-server localhost:9092

查看客户端消息

kafka-console-consumer --bootstrap-server localhost:9092 --consumer-property group.id=group1 --consumer-property client.id=consumer-1  --topic sst.table.name.YSPT.dbo.yonghu_school
作者:Jeebiz  创建时间:2024-09-23 09:58
最后编辑:Jeebiz  更新时间:2024-11-01 10:06