https://blog.csdn.net/Carson073/article/details/124356702

1、maven

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-cdc-demo</artifactId>
    <version>2.0-SNAPSHOT</version>

    <properties>
        <flink.version>1.13.6</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>
        <!-- Required at runtime by MysqlBinlogWriter (Class.forName("com.mysql.jdbc.Driver")).
             Must NOT be scope "provided": the driver has to end up inside the shaded jar. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <!--        <dependency>-->
        <!--            <groupId>org.postgresql</groupId>-->
        <!--            <artifactId>postgresql</artifactId>-->
        <!--            <version>42.2.23</version>-->
        <!--        </dependency>-->
        <!--        <dependency>-->
        <!--            <groupId>org.apache.flink</groupId>-->
        <!--            <artifactId>flink-table-planner-blink_2.12</artifactId>-->
        <!--            <version>${flink.version}</version>-->
        <!--        </dependency>-->
        <!--<dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.0.2</version>
        </dependency>-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>

                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- When packaging, set this to the actual main class. -->
                                    <mainClass>com.flink.cdc.demo.MySqlBinlogCdcMySql</mainClass>
                                </transformer>
                                <!-- Merge Akka/Flink reference.conf files instead of overwriting them. -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <!-- Strip signature files from all artifacts so the shaded jar
                                         does not fail with "Invalid signature file digest". -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- Flink 1.13.x requires Java 8; source/target 6 cannot compile
                         against its APIs (the original pom had 6 here). -->
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2、binlog的自定义序列化类

package com.flink.cdc.demo;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;


/**
 * Debezium deserialization schema that converts MySQL binlog {@link SourceRecord}s into
 * fastjson {@link JSONObject}s containing the "after" image of INSERT events.
 *
 * <p>Record shape (see Debezium envelope): the value {@link Struct} carries an optional
 * "before" and "after" struct. Only "after" present = insert; only "before" = delete;
 * both present = update. Updates and deletes are deliberately logged and skipped here.
 */
public class CustomDebeziumDeserializationSchema implements DebeziumDeserializationSchema<JSONObject> {

    private static final Logger LOGGER = LoggerFactory.getLogger(CustomDebeziumDeserializationSchema.class);

    private static final long serialVersionUID = 7906905121308228264L;

    public CustomDebeziumDeserializationSchema() {
    }

    /**
     * Deserializes one change event. Only INSERT events ("after" without "before") are
     * converted to JSON and emitted; updates/deletes are ignored by design.
     *
     * @param sourceRecord the raw Debezium change record
     * @param collector    downstream collector; receives one JSONObject per insert event
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<JSONObject> collector) {
        JSONObject resJson = new JSONObject();
        try {
            Struct valueStruct = (Struct) sourceRecord.value();
            Struct afterStruct = valueStruct.getStruct("after");
            Struct beforeStruct = valueStruct.getStruct("before");
            // Only "after" => insert; only "before" => delete; both => update.
            if (afterStruct != null && beforeStruct != null) {
                LOGGER.info("Updated, ignored ...");
            } else if (afterStruct != null) {
                // Insert: copy every field of the "after" image into the JSON result.
                for (Field field : afterStruct.schema().fields()) {
                    resJson.put(field.name(), afterStruct.get(field));
                }
            } else if (beforeStruct != null) {
                LOGGER.info("Deleted, ignored ...");
            } else {
                LOGGER.warn("No this operation ...");
            }
        } catch (Exception e) {
            LOGGER.error("Deserialize throws exception:", e);
        }
        // BUG FIX: the original emitted resJson unconditionally, so update/delete/error
        // paths pushed an EMPTY JSONObject downstream, where the sink then unboxed a
        // null "id" and threw NullPointerException. Emit only populated records.
        if (!resJson.isEmpty()) {
            collector.collect(resJson);
        }
    }

    @Override
    public TypeInformation<JSONObject> getProducedType() {
        return BasicTypeInfo.of(JSONObject.class);
    }
}

3、Writer

package com.flink.cdc.demo;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Flink sink that writes CDC JSON records into MySQL table
 * {@code ods_flink_cdc_test.ods_t_test} via plain JDBC.
 *
 * <p>Expects each incoming {@link JSONObject} to carry "id", "username" and "password".
 * NOTE(review): connection parameters are hard-coded; consider externalizing them.
 */
public class MysqlBinlogWriter extends RichSinkFunction<JSONObject> {
    private static final Logger LOGGER = LoggerFactory.getLogger(MysqlBinlogWriter.class);

    private Connection connection = null;
    private PreparedStatement ps = null;

    /**
     * Opens the JDBC connection and prepares the insert statement once per task.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (connection == null) {
            Class.forName("com.mysql.jdbc.Driver"); // load the JDBC driver
            connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root"); // obtain connection
        }
        ps = connection.prepareStatement("insert into ods_flink_cdc_test.ods_t_test values (?,?,?)");
    }

    /**
     * Writes one record. Records without an "id" are skipped instead of throwing
     * NullPointerException on unboxing (a bug in the original implementation).
     */
    @Override
    public void invoke(JSONObject value, Context context) throws Exception {
        try {
            Integer id = value.getInteger("id");
            if (id == null) {
                // Guard: the original cast-and-unbox ((Integer) value.get("id")) NPE'd here.
                LOGGER.warn("Skipping record without id: {}", value);
                return;
            }
            ps.setInt(1, id);
            ps.setString(2, value.getString("username"));
            ps.setString(3, value.getString("password"));
            ps.executeUpdate();
            LOGGER.info("Executed: {}", ps);
        } catch (Exception e) {
            // Log with the full stack trace instead of printStackTrace() (original behavior:
            // errors are swallowed so a single bad record does not fail the job).
            LOGGER.error("Failed to write record: {}", value, e);
        }
    }

    /**
     * Releases JDBC resources. The original called {@code super.close()} twice and would
     * leak the connection if closing the statement threw; try/finally fixes both.
     */
    @Override
    public void close() throws Exception {
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            try {
                if (connection != null) {
                    connection.close();
                }
            } finally {
                super.close();
            }
        }
    }
}

4、主类

package com.flink.cdc.demo;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Job entry point: streams MySQL binlog change events through the CDC source,
 * prints each record, and mirrors inserts into another MySQL table.
 */
public class MySqlBinlogCdcMySql {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Build the CDC source that tails the binlog of the subscribed database.
        final SourceFunction<JSONObject> cdcSource = MySQLSource.<JSONObject>builder()
                .hostname("10.252.92.4")
                .port(3306)
                .databaseList("flink_cdc_test") // database(s) to subscribe to
                .username("root")
                .password("root")
                .deserializer(new CustomDebeziumDeserializationSchema())
                .build();

        final DataStreamSource<JSONObject> changeStream = env.addSource(cdcSource);
        changeStream.print();
        changeStream.addSink(new MysqlBinlogWriter());
        env.execute();
    }
}

5、打包到Flink服务器

成功!老样子:各依赖版本要与集群 Flink 版本对应好,并行度不要在程序里写死,提交作业时再指定。

作者:Jeebiz  创建时间:2022-10-16 00:53
 更新时间:2024-07-10 22:56