https://blog.csdn.net/Carson073/article/details/124356702
1. Maven (pom.xml)
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-cdc-demo</artifactId>
    <version>2.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.13.6</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>
        <!--
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.23</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.2</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- When packaging, set this to the main class you want to run -->
                                    <!-- <mainClass>com.flink.cdc.demo.MysqlCdcMysql</mainClass> -->
                                    <mainClass>com.flink.cdc.demo.MySqlBinlogCdcMySql</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*:*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- Flink 1.13 requires Java 8 -->
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
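Build the fat jar with mvn clean package. The shade plugin writes the main class configured above into the manifest, merges reference.conf entries (which Flink's Akka-based runtime expects), and strips signature files, so the resulting flink-cdc-demo-2.0-SNAPSHOT.jar can be submitted to the cluster directly.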
2. Custom deserialization schema for the binlog
package com.flink.cdc.demo;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

public class CustomDebeziumDeserializationSchema implements DebeziumDeserializationSchema<JSONObject> {

    private static final Logger LOGGER = LoggerFactory.getLogger(CustomDebeziumDeserializationSchema.class);
    private static final long serialVersionUID = 7906905121308228264L;

    public CustomDebeziumDeserializationSchema() {
    }

    /**
     * Insert: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={file=mysql-bin.000220, pos=16692, row=1, snapshot=true}} ConnectRecord{topic='mysql_binlog_source.test_hqh.mysql_cdc_person', kafkaPartition=null, key=Struct{id=2}, keySchema=Schema{mysql_binlog_source.test_hqh.mysql_cdc_person.Key:STRUCT}, value=Struct{after=Struct{id=2,name=JIM,sex=male},source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=0,snapshot=true,db=test_hqh,table=mysql_cdc_person,server_id=0,file=mysql-bin.000220,pos=16692,row=0},op=c,ts_ms=1603357255749}, valueSchema=Schema{mysql_binlog_source.test_hqh.mysql_cdc_person.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
     * Update: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1603357705, file=mysql-bin.000220, pos=22964, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test_hqh.mysql_cdc_person', kafkaPartition=null, key=Struct{id=8}, keySchema=Schema{mysql_binlog_source.test_hqh.mysql_cdc_person.Key:STRUCT}, value=Struct{before=Struct{id=8,name=TOM,sex=male},after=Struct{id=8,name=Lucy,sex=female},source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1603357705000,db=test_hqh,table=mysql_cdc_person,server_id=1,file=mysql-bin.000220,pos=23109,row=0,thread=41},op=u,ts_ms=1603357705084}, valueSchema=Schema{mysql_binlog_source.test_hqh.mysql_cdc_person.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
     * Delete: SourceRecord{sourcePartition={server=mysql_binlog_source}, sourceOffset={ts_sec=1603357268, file=mysql-bin.000220, pos=18510, row=1, server_id=1, event=2}} ConnectRecord{topic='mysql_binlog_source.test_hqh.mysql_cdc_person', kafkaPartition=null, key=Struct{id=7}, keySchema=Schema{mysql_binlog_source.test_hqh.mysql_cdc_person.Key:STRUCT}, value=Struct{before=Struct{id=7,name=TOM,sex=male},source=Struct{version=1.2.1.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1603357268000,db=test_hqh,table=mysql_cdc_person,server_id=1,file=mysql-bin.000220,pos=18655,row=0,thread=41},op=d,ts_ms=1603357268728}, valueSchema=Schema{mysql_binlog_source.test_hqh.mysql_cdc_person.Envelope:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
     *
     * @param sourceRecord sourceRecord
     * @param collector out
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<JSONObject> collector) {
        JSONObject resJson = new JSONObject();
        try {
            Struct valueStruct = (Struct) sourceRecord.value();
            Struct afterStruct = valueStruct.getStruct("after");
            Struct beforeStruct = valueStruct.getStruct("before");
            // Note: if the value struct only contains "after", the record is an insert;
            // only "before" means a delete; both "before" and "after" means an update.
            if (afterStruct != null && beforeStruct != null) {
                // update
                LOGGER.info("Update, ignored ...");
            } else if (afterStruct != null) {
                // insert: copy every field of "after" into the JSON result
                List<Field> fields = afterStruct.schema().fields();
                String name;
                Object value;
                for (Field field : fields) {
                    name = field.name();
                    value = afterStruct.get(name);
                    resJson.put(name, value);
                }
            } else if (beforeStruct != null) {
                // delete
                LOGGER.info("Delete, ignored ...");
            } else {
                LOGGER.warn("Unknown operation ...");
            }
        } catch (Exception e) {
            LOGGER.error("Deserialize throws exception:", e);
        }
        // Only forward inserts; updates/deletes would otherwise emit empty JSON
        // objects and break the downstream sink.
        if (!resJson.isEmpty()) {
            collector.collect(resJson);
        }
    }

    @Override
    public TypeInformation<JSONObject> getProducedType() {
        return TypeInformation.of(JSONObject.class);
    }
}
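The schema above only forwards inserts. If a downstream sink needs to distinguish operations, the Debezium envelope also carries an "op" code and a "source" struct (visible in the sample records in the Javadoc above). A minimal sketch of reading that metadata; the helper class is hypothetical, not part of the original demo:

package com.flink.cdc.demo;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

public class EnvelopeMetadata {

    /**
     * Reads the operation code ("c" = create, "u" = update, "d" = delete)
     * and the originating database/table from a Debezium change record.
     * Assumes the record is a change event with a non-null envelope value.
     */
    public static JSONObject of(SourceRecord record) {
        Struct value = (Struct) record.value();
        Struct source = value.getStruct("source");
        JSONObject meta = new JSONObject();
        meta.put("op", value.getString("op"));        // operation code
        meta.put("db", source.getString("db"));       // source database
        meta.put("table", source.getString("table")); // source table
        return meta;
    }
}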
3. Writer
package com.flink.cdc.demo;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class MysqlBinlogWriter extends RichSinkFunction<JSONObject> {

    private static final Logger LOGGER = LoggerFactory.getLogger(MysqlBinlogWriter.class);

    private Connection connection = null;
    private PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (connection == null) {
            Class.forName("com.mysql.jdbc.Driver"); // load the JDBC driver
            connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root"); // obtain the connection
        }
        ps = connection.prepareStatement("insert into ods_flink_cdc_test.ods_t_test values (?,?,?)");
    }

    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        // Write the captured row to the target table
        try {
            Integer id = (Integer) value.get("id");
            String username = (String) value.get("username");
            String password = (String) value.get("password");
            ps.setInt(1, id);
            ps.setString(2, username);
            ps.setString(3, password);
            ps.executeUpdate();
            LOGGER.info(ps.toString());
        } catch (Exception e) {
            LOGGER.error("Failed to write record: " + value, e);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
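Because the sink blindly INSERTs, replaying the initial snapshot (or restarting the job) will hit duplicate-key errors. One common workaround, sketched below under the assumption that id is the primary key of ods_t_test, is to prepare an idempotent MySQL upsert in open() instead:

// Hypothetical variant of the statement in open(): an upsert, assuming `id` is the PK.
ps = connection.prepareStatement(
        "insert into ods_flink_cdc_test.ods_t_test (id, username, password) " +
        "values (?, ?, ?) " +
        "on duplicate key update username = values(username), password = values(password)");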
4. Main class
package com.flink.cdc.demo;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class MySqlBinlogCdcMySql {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SourceFunction<JSONObject> sourceFunction = MySQLSource.<JSONObject>builder()
                .hostname("10.252.92.4")
                .port(3306)
                .databaseList("flink_cdc_test") // database(s) to capture
                .username("root")
                .password("root")
                .deserializer(new CustomDebeziumDeserializationSchema())
                .build();
        DataStreamSource<JSONObject> dataStream = env.addSource(sourceFunction);
        dataStream.print();
        dataStream.addSink(new MysqlBinlogWriter());
        env.execute();
    }
}
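The CDC source stores its binlog offsets in Flink state, so without checkpointing a restarted job cannot resume where it left off. A minimal sketch of enabling it in main() before building the source (the 5s interval is an arbitrary example value):

import org.apache.flink.streaming.api.CheckpointingMode;
// ... inside main(), right after creating env:
env.enableCheckpointing(5000); // checkpoint every 5s so binlog offsets are persisted
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);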
5. Package and deploy to the Flink server
Success! As usual: double-check that the versions match, and do not set the parallelism inside the program.
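For reference, a typical submit from the Flink client would be: flink run -c com.flink.cdc.demo.MySqlBinlogCdcMySql flink-cdc-demo-2.0-SNAPSHOT.jar (the jar name follows the artifactId/version in the POM above); set the parallelism with -p on the command line rather than in code.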
Author: Jeebiz | Created: 2022-10-16 00:53 | Updated: 2024-07-10 22:56