https://blog.csdn.net/Carson073/article/details/124344699

背景

基于select语句的Flink-CDC 适用于数据同步的全量同步的场景,可以结合 Azkaban 或者dolphin scheduler 做定时调度 T+1 数据同步。
1、maven

   <properties>
        <flink.version>1.13.6</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.75</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>

                        <configuration>
                            <transformers>

                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!--如果要打包的话,这里要换成对应的 main class-->
                                    <mainClass>com.flink.cdc.demo.MysqlCdcMysql</mainClass>
                                </transformer>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*:*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>6</source>
                    <target>6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

2、MysqlReader

package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class MysqlReader extends RichSourceFunction<Tuple3<Integer, String, String>> {

    private Connection connection = null;
    private PreparedStatement ps = null;


    //该方法主要用于打开数据库连接,下面的ConfigKeys类是获取配置的类
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver");//加载数据库驱动
        connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root");//获取连接
        ps = connection.prepareStatement("select id,username,password from flink_cdc_test.t_test");
    }


    @Override
    public void run(SourceContext<Tuple3<Integer, String, String>> sourceContext) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            Tuple3<Integer, String, String> tuple = new Tuple3<Integer, String, String>();
            tuple.setFields(resultSet.getInt(1), resultSet.getString(2), resultSet.getString(3));
            sourceContext.collect(tuple);
        }
    }

    @Override
    public void cancel() {
        try {
            super.close();
            if (connection != null) {
                connection.close();
            }
            if (ps != null) {
                ps.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3、MysqlWriter

package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;



public class MysqlWriter extends RichSinkFunction<Tuple3<Integer, String, String>> {
    private Connection connection = null;
    private PreparedStatement ps = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        if (connection == null) {
            Class.forName("com.mysql.jdbc.Driver");//加载数据库驱动
            connection = DriverManager.getConnection("jdbc:mysql://10.252.92.4:3306", "root", "root");//获取连接
        }

        ps = connection.prepareStatement("insert into ods_flink_cdc_test.ods_t_test values (?,?,?)");
        System.out.println("完成");
    }

    @Override
    public void invoke(Tuple3<Integer, String, String> value, Context context) throws Exception {
        //获取JdbcReader发送过来的结果
        try {
            ps.setInt(1, value.f0);
            ps.setString(2, value.f1);
            ps.setString(3, value.f2);
            ps.executeUpdate();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}

4、主类MysqlCdcMysql

package com.flink.cdc.demo;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MysqlCdcMysql {
    public static void main(String[] args) throws Exception {
//        ExecutionEnvironment env  =  ExecutionEnvironment.createRemoteEnvironment("localhost",8081,"D:\\flink-steven\\target\\flink-0.0.1-SNAPSHOT.jar");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 8081, "D:\\flink-steven\\target\\flink-0.0.1-SNAPSHOT.jar");
// 最好不要在程序设置并行度 如果设置了8 要保证安装的flink配置里面的parallelism这个参数大于8 不然会导致资源异常
//        env.setParallelism(8);
        DataStreamSource<Tuple3<Integer, String, String>> dataStream = env.addSource(new MysqlReader());
        dataStream.print();
        dataStream.addSink(new MysqlWriter());
        env.execute("Flink cost MySQL data to write MySQL");
    }
}

5、本地运行

6、打成jar包进行上传

注意:flink版本要和maven里的版本一致 scala版本也要保持一致

7、运行

作者:Jeebiz  创建时间:2022-10-16 00:48
 更新时间:2024-07-10 22:56