
1 Quick Introduction

The GreenplumWriter plugin implements writing data into target tables in a Greenplum Database master instance. Under the hood, GreenplumWriter connects to the remote Greenplum database via JDBC and executes a COPY ... FROM STDIN statement to stream the data into Greenplum.

GreenplumWriter is aimed at ETL engineers, who use it to load data from a data warehouse into Greenplum. It can also serve DBAs and other users as a data-migration tool.

2 How It Works

GreenplumWriter obtains the protocol data produced by the Reader through the DataX framework and, based on your configuration, generates the corresponding COPY statement:

    COPY ... FROM STDIN ...

Notes:
1. Unlike MysqlWriter, GreenplumWriter does not support the writeMode parameter.
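
For reference, the statement assembled by getCopySql() in CopyWriterTask.java (shown later) has the following shape; the table and column names are placeholders, and the LOG ERRORS clause is only appended when segment_reject_limit is 2 or greater:

    COPY "my_table"("id","c1","create_time") FROM STDIN WITH DELIMITER '|' NULL '' CSV QUOTE '"' ESCAPE E'\\' LOG ERRORS SEGMENT REJECT LIMIT 10;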

3 Code Implementation
1. Code in the root directory

First, add the following packaging fileSet to package.xml in the root directory:

        <fileSet>
            <directory>greenplumwriter/target/datax/</directory>
            <includes>
                <include>**/*.*</include>
            </includes>
            <outputDirectory>datax</outputDirectory>
        </fileSet>

Then, register the new module in the root pom.xml:

    <module>greenplumwriter</module>

2. Code under the greenplumwriter module

package.xml

    <assembly
            xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
        <id></id>
        <formats>
            <format>dir</format>
        </formats>
        <includeBaseDirectory>false</includeBaseDirectory>
        <fileSets>
            <fileSet>
                <directory>src/main/resources</directory>
                <includes>
                    <include>plugin.json</include>
                </includes>
                <outputDirectory>plugin/writer/greenplumwriter</outputDirectory>
            </fileSet>
            <fileSet>
                <directory>target/</directory>
                <includes>
                    <include>greenplumwriter-0.0.1-SNAPSHOT.jar</include>
                </includes>
                <outputDirectory>plugin/writer/greenplumwriter</outputDirectory>
            </fileSet>
        </fileSets>

        <dependencySets>
            <dependencySet>
                <useProjectArtifact>false</useProjectArtifact>
                <outputDirectory>plugin/writer/greenplumwriter/libs</outputDirectory>
                <scope>runtime</scope>
            </dependencySet>
        </dependencySets>
    </assembly>

CopyProcessor.java

    package com.alibaba.datax.plugin.writer.greenplumwriter;

    import com.alibaba.datax.common.element.Column;
    import com.alibaba.datax.common.element.Record;
    import com.alibaba.datax.common.exception.DataXException;
    import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
    import org.apache.commons.lang3.tuple.Triple;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.UnsupportedEncodingException;
    import java.sql.Types;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class CopyProcessor implements Callable<Long> {
        private static final char FIELD_DELIMITER = '|';
        private static final char NEWLINE = '\n';
        private static final char QUOTE = '"';
        private static final char ESCAPE = '\\';
        private static final int MAX_CSV_SIZE = 4194304; // 4 MB cap on a single serialized row
        private static final Logger LOG = LoggerFactory.getLogger(CopyProcessor.class);

        private int columnNumber;
        private CopyWriterTask task;
        private LinkedBlockingQueue<Record> queueIn;
        private LinkedBlockingQueue<byte[]> queueOut;
        private Triple<List<String>, List<Integer>, List<String>> resultSetMetaData;

        public CopyProcessor(CopyWriterTask task, int columnNumber,
                             Triple<List<String>, List<Integer>, List<String>> resultSetMetaData, LinkedBlockingQueue<Record> queueIn,
                             LinkedBlockingQueue<byte[]> queueOut) {
            this.task = task;
            this.columnNumber = columnNumber;
            this.resultSetMetaData = resultSetMetaData;
            this.queueIn = queueIn;
            this.queueOut = queueOut;
        }

        @Override
        public Long call() throws Exception {
            Thread.currentThread().setName("CopyProcessor");
            Record record = null;

            while (true) {
                record = queueIn.poll(1000L, TimeUnit.MILLISECONDS);

                if (record == null && !task.moreRecord()) {
                    break;
                } else if (record == null) {
                    continue;
                }

                if (record.getColumnNumber() != this.columnNumber) {
                    // The column count read from the source differs from the column count to write; fail immediately
                    throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
                            String.format("列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 您可能配置了错误的表名称, 请检查您的配置并作出修改.",
                                    record.getColumnNumber(), this.columnNumber));
                }

                byte[] data = serializeRecord(record);

                if (data.length > MAX_CSV_SIZE) {
                    String preview = new String(data, "UTF-8").substring(0, 100) + "...";
                    LOG.warn("数据元组超过 {} 字节长度限制被忽略: {}", MAX_CSV_SIZE, preview);
                } else {
                    queueOut.put(data);
                }
            }

            return 0L;
        }

        /**
         * Any occurrence within the value of a QUOTE character or the ESCAPE
         * character is preceded by the escape character.
         */
        protected String escapeString(String data) {
            StringBuilder sb = new StringBuilder();

            for (int i = 0; i < data.length(); ++i) {
                char c = data.charAt(i);
                switch (c) {
                case 0x00:
                    LOG.warn("字符串中发现非法字符 0x00,已经将其删除");
                    continue;
                case QUOTE:
                case ESCAPE:
                    sb.append(ESCAPE);
                }

                sb.append(c);
            }
            return sb.toString();
        }

        /**
         * Non-printable characters are inserted as '\nnn' (octal) and '\' as '\\'.
         */
        protected String escapeBinary(byte[] data) {
            StringBuilder sb = new StringBuilder();

            for (int i = 0; i < data.length; ++i) {
                if (data[i] == '\\') {
                    sb.append('\\');
                    sb.append('\\');
                } else if (data[i] < 0x20 || data[i] > 0x7e) {
                    byte b = data[i];
                    char[] val = new char[3];
                    val[2] = (char) ((b & 07) + '0');
                    b >>= 3;
                    val[1] = (char) ((b & 07) + '0');
                    b >>= 3;
                    val[0] = (char) ((b & 03) + '0');
                    sb.append('\\');
                    sb.append(val);
                } else {
                    sb.append((char) (data[i]));
                }
            }

            return sb.toString();
        }

        protected byte[] serializeRecord(Record record) throws UnsupportedEncodingException {
            StringBuilder sb = new StringBuilder();
            Column column;
            for (int i = 0; i < this.columnNumber; i++) {
                column = record.getColumn(i);
                int columnSqltype = this.resultSetMetaData.getMiddle().get(i);

                switch (columnSqltype) {
                case Types.CHAR:
                case Types.NCHAR:
                case Types.VARCHAR:
                case Types.LONGVARCHAR:
                case Types.NVARCHAR:
                case Types.LONGNVARCHAR: {
                    String data = column.asString();

                    if (data != null) {
                        sb.append(QUOTE);
                        sb.append(escapeString(data));
                        sb.append(QUOTE);
                    }

                    break;
                }
                case Types.BINARY:
                case Types.BLOB:
                case Types.CLOB:
                case Types.LONGVARBINARY:
                case Types.NCLOB:
                case Types.VARBINARY: {
                    byte[] data = column.asBytes();

                    if (data != null) {
                        sb.append(escapeBinary(data));
                    }

                    break;
                }
                default: {
                    String data = column.asString();

                    if (data != null) {
                        sb.append(data);
                    }

                    break;
                }
                }

                if (i + 1 < this.columnNumber) {
                    sb.append(FIELD_DELIMITER);
                }
            }
            sb.append(NEWLINE);
            return sb.toString().getBytes("UTF-8");
        }
    }
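
The escaping above matches the COPY options used later (CSV QUOTE '"' ESCAPE E'\\'). As a quick sanity check, here is a minimal standalone sketch of the same string-escaping rule; the demo class and sample value are illustrative and not part of the plugin:

    public class EscapeDemo {
        private static final char QUOTE = '"';
        private static final char ESCAPE = '\\';

        // Same rule as CopyProcessor.escapeString: prefix QUOTE/ESCAPE with ESCAPE, drop 0x00.
        static String escapeString(String data) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < data.length(); ++i) {
                char c = data.charAt(i);
                if (c == 0x00) {
                    continue; // NUL is illegal in COPY text, dropped just like the plugin does
                }
                if (c == QUOTE || c == ESCAPE) {
                    sb.append(ESCAPE);
                }
                sb.append(c);
            }
            return sb.toString();
        }

        public static void main(String[] args) {
            // Quote the field the same way serializeRecord does for character types.
            System.out.println(QUOTE + escapeString("say \"hi\" to c:\\tmp") + QUOTE);
            // prints: "say \"hi\" to c:\\tmp"
        }
    }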

CopyWorker.java

    package com.alibaba.datax.plugin.writer.greenplumwriter;

    import com.alibaba.datax.common.exception.DataXException;
    import com.alibaba.datax.plugin.rdbms.util.DBUtil;
    import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
    import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
    import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
    import org.postgresql.copy.CopyManager;
    import org.postgresql.core.BaseConnection;
    import org.postgresql.util.PSQLException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.IOException;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;
    import java.sql.Connection;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;

    public class CopyWorker implements Callable<Long> {
        private static final Logger LOG = LoggerFactory.getLogger(CopyWorker.class);

        private CopyWriterTask task = null;
        private Connection connection;
        private LinkedBlockingQueue<byte[]> queue = null;
        private FutureTask<Long> copyResult = null;
        private String sql = null;
        private PipedInputStream pipeIn = null;
        private PipedOutputStream pipeOut = null;
        private Thread copyBackendThread = null;

        public CopyWorker(CopyWriterTask task, String copySql, LinkedBlockingQueue<byte[]> queue) throws IOException {
            this.task = task;
            this.connection = task.createConnection();
            this.queue = queue;
            this.pipeOut = new PipedOutputStream();
            this.pipeIn = new PipedInputStream(pipeOut);
            this.sql = copySql;

            if (task.getMaxCsvLineSize() >= 1024) {
                changeCsvSizelimit(connection);
            }

            this.copyResult = new FutureTask<Long>(new Callable<Long>() {

                @Override
                public Long call() throws Exception {
                    try {
                        CopyManager mgr = new CopyManager((BaseConnection) connection);
                        return mgr.copyIn(sql, pipeIn);
                    } finally {
                        try {
                            pipeIn.close();
                        } catch (Exception ignore) {
                        }
                    }
                }
            });

            copyBackendThread = new Thread(copyResult);
            copyBackendThread.setName(sql);
            copyBackendThread.setDaemon(true);
            copyBackendThread.start();
        }

        @Override
        public Long call() throws Exception {
            Thread.currentThread().setName("CopyWorker");

            byte[] data = null;
            try {
                while (true) {
                    data = queue.poll(1000L, TimeUnit.MILLISECONDS);

                    if (data == null && !task.moreData()) {
                        break;
                    } else if (data == null) {
                        continue;
                    }

                    pipeOut.write(data);
                }

                pipeOut.flush();
                pipeOut.close();
            } catch (Exception e) {
                try {
                    ((BaseConnection) connection).cancelQuery();
                } catch (SQLException ignore) {
                    // ignore if failed to cancel query
                }

                try {
                    copyBackendThread.interrupt();
                } catch (SecurityException ignore) {
                }

                try {
                    copyResult.get();
                } catch (ExecutionException exec) {
                    if (exec.getCause() instanceof PSQLException) {
                        throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, exec.getCause());
                    }
                    // ignore others
                } catch (Exception ignore) {
                }

                throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
            } finally {
                try {
                    pipeOut.close();
                } catch (Exception e) {
                    // ignore if failed to close pipe
                }

                try {
                    copyBackendThread.join(0);
                } catch (Exception e) {
                    // ignore if thread is interrupted
                }

                DBUtil.closeDBResources(null, null, connection);
            }

            try {
                Long count = copyResult.get();
                return count;
            } catch (Exception e) {
                throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
            }
        }

        private void changeCsvSizelimit(Connection conn) {
            List<String> sqls = new ArrayList<String>();
            sqls.add("set gp_max_csv_line_length = " + Integer.toString(task.getMaxCsvLineSize()));

            try {
                WriterUtil.executeSqls(conn, sqls, task.getJdbcUrl(), DataBaseType.PostgreSQL);
            } catch (Exception e) {
                LOG.warn("Cannot set gp_max_csv_line_length to " + task.getMaxCsvLineSize());
            }
        }
    }
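
The core technique in CopyWorker is pairing a PipedOutputStream with a PipedInputStream and handing the input end to the PostgreSQL JDBC driver's CopyManager, so COPY ... FROM STDIN runs on a background thread while the worker pushes serialized rows into the pipe. A minimal standalone sketch of the same pattern follows; the connection URL, credentials, and table name are placeholders, not part of the plugin:

    import org.postgresql.copy.CopyManager;
    import org.postgresql.core.BaseConnection;

    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class CopyInDemo {
        public static void main(String[] args) throws Exception {
            // Placeholder endpoint; point this at a real Greenplum master.
            Connection conn = DriverManager.getConnection(
                    "jdbc:postgresql://localhost:5432/testdb", "gpadmin", "gpadmin");

            PipedOutputStream pipeOut = new PipedOutputStream();
            PipedInputStream pipeIn = new PipedInputStream(pipeOut);

            ExecutorService pool = Executors.newSingleThreadExecutor();
            // COPY runs on its own thread, consuming whatever is written into pipeOut.
            Future<Long> rows = pool.submit(() -> new CopyManager((BaseConnection) conn)
                    .copyIn("COPY t(id, name) FROM STDIN WITH DELIMITER '|' CSV", pipeIn));

            pipeOut.write("1|\"alice\"\n2|\"bob\"\n".getBytes(StandardCharsets.UTF_8));
            pipeOut.close(); // closing the pipe signals end-of-input to COPY

            System.out.println("rows copied: " + rows.get());
            pool.shutdown();
            conn.close();
        }
    }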

CopyWriterJob.java

    package com.alibaba.datax.plugin.writer.greenplumwriter;

    import com.alibaba.datax.common.util.Configuration;
    import com.alibaba.datax.plugin.rdbms.util.DBUtil;
    import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
    import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
    import com.alibaba.datax.plugin.rdbms.writer.Constant;
    import com.alibaba.datax.plugin.rdbms.writer.Key;
    import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.util.ArrayList;
    import java.util.List;

    public class CopyWriterJob extends CommonRdbmsWriter.Job {
        private static final Logger LOG = LoggerFactory.getLogger(CopyWriterJob.class);
        private List<String> tables = null;

        public CopyWriterJob() {
            super(DataBaseType.PostgreSQL);
        }

        @Override
        public void init(Configuration originalConfig) {
            super.init(originalConfig);
        }

        @Override
        public void prepare(Configuration originalConfig) {
            String username = originalConfig.getString(Key.USERNAME);
            String password = originalConfig.getString(Key.PASSWORD);

            List<Object> conns = originalConfig.getList(Constant.CONN_MARK, Object.class);
            Configuration connConf = Configuration.from(conns.get(0).toString());

            // the jdbcUrl here already has the appropriate suffix parameters appended
            String jdbcUrl = connConf.getString(Key.JDBC_URL);
            tables = connConf.getList(Key.TABLE, String.class);

            Connection conn = DBUtil.getConnection(DataBaseType.PostgreSQL, jdbcUrl, username, password);

            List<String> sqls = new ArrayList<String>();

            for (String table : tables) {
                sqls.add("SELECT gp_truncate_error_log('" + table + "');");
                LOG.info("为 {} 清理 ERROR LOG. context info:{}.", table, jdbcUrl);
            }

            WriterUtil.executeSqls(conn, sqls, jdbcUrl, DataBaseType.PostgreSQL);
            DBUtil.closeDBResources(null, null, conn);

            super.prepare(originalConfig);
        }

        @Override
        public void post(Configuration originalConfig) {
            super.post(originalConfig);

            String username = originalConfig.getString(Key.USERNAME);
            String password = originalConfig.getString(Key.PASSWORD);

            // appendJDBCSuffix has already been applied in prepare
            String jdbcUrl = originalConfig.getString(Key.JDBC_URL);

            Connection conn = DBUtil.getConnection(DataBaseType.PostgreSQL, jdbcUrl, username, password);

            for (String table : tables) {
                int errors = 0;
                ResultSet res = null;
                String sql = "SELECT count(*) from gp_read_error_log('" + table + "');";

                try {
                    res = DBUtil.query(conn, sql, 10);
                    if (res.next()) {
                        errors = res.getInt(1);
                    }
                    res.close();
                    conn.commit();
                } catch (SQLException e) {
                    LOG.debug("Fail to get error log info:" + e.getMessage());
                }

                if (errors > 0) {
                    LOG.warn("加载表 {} 时发现 {} 条错误数据, 使用 \"SELECT * from gp_read_error_log('{}');\" 查看详情", table,
                            errors, table);
                }
            }

            DBUtil.closeDBResources(null, null, conn);
        }
    }
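
prepare() clears the Greenplum error log for each target table before loading, and post() reports how many rows were rejected. You can run the same built-in functions manually to inspect and then clear the rejected rows; my_table is a placeholder:

    SELECT * FROM gp_read_error_log('my_table');
    SELECT gp_truncate_error_log('my_table');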

CopyWriterTask.java

    package com.alibaba.datax.plugin.writer.greenplumwriter;

    import com.alibaba.datax.common.element.Record;
    import com.alibaba.datax.common.exception.DataXException;
    import com.alibaba.datax.common.plugin.RecordReceiver;
    import com.alibaba.datax.common.plugin.TaskPluginCollector;
    import com.alibaba.datax.common.util.Configuration;
    import com.alibaba.datax.plugin.rdbms.util.DBUtil;
    import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
    import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
    import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.sql.Connection;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.*;

    public class CopyWriterTask extends CommonRdbmsWriter.Task {
        private static final Logger LOG = LoggerFactory.getLogger(CopyWriterTask.class);
        private Configuration writerSliceConfig = null;
        private int numProcessor;
        private int maxCsvLineSize;
        private int numWriter;
        private int queueSize;
        private volatile boolean stopProcessor = false;
        private volatile boolean stopWriter = false;

        private CompletionService<Long> cs = null;

        public CopyWriterTask() {
            super(DataBaseType.PostgreSQL);
        }

        public String getJdbcUrl() {
            return this.jdbcUrl;
        }

        public Connection createConnection() {
            Connection connection = DBUtil.getConnection(this.dataBaseType, this.jdbcUrl, username, password);
            DBUtil.dealWithSessionConfig(connection, writerSliceConfig, this.dataBaseType, BASIC_MESSAGE);
            return connection;
        }

        private String constructColumnNameList(List<String> columnList) {
            List<String> columns = new ArrayList<String>();

            for (String column : columnList) {
                if (column.endsWith("\"") && column.startsWith("\"")) {
                    columns.add(column);
                } else {
                    columns.add("\"" + column + "\"");
                }
            }

            return StringUtils.join(columns, ",");
        }

        public String getCopySql(String tableName, List<String> columnList, int segment_reject_limit) {
            StringBuilder sb = new StringBuilder().append("COPY ").append(tableName).append("(")
                    .append(constructColumnNameList(columnList))
                    .append(") FROM STDIN WITH DELIMITER '|' NULL '' CSV QUOTE '\"' ESCAPE E'\\\\'");

            if (segment_reject_limit >= 2) {
                sb.append(" LOG ERRORS SEGMENT REJECT LIMIT ").append(segment_reject_limit).append(";");
            } else {
                sb.append(";");
            }

            return sb.toString();
        }

        private void send(Record record, LinkedBlockingQueue<Record> queue)
                throws InterruptedException, ExecutionException {
            while (!queue.offer(record, 1000, TimeUnit.MILLISECONDS)) {
                LOG.debug("Record queue is full, increase num_copy_processor for performance.");
                Future<Long> result = cs.poll();

                if (result != null) {
                    result.get();
                }
            }
        }

        public boolean moreRecord() {
            return !stopProcessor;
        }

        public boolean moreData() {
            return !stopWriter;
        }

        public int getMaxCsvLineSize() {
            return maxCsvLineSize;
        }

        @Override
        public void startWrite(RecordReceiver recordReceiver, Configuration writerSliceConfig,
                               TaskPluginCollector taskPluginCollector) {
            this.writerSliceConfig = writerSliceConfig;
            int segment_reject_limit = writerSliceConfig.getInt("segment_reject_limit", 0);
            this.queueSize = writerSliceConfig.getInt("copy_queue_size", 1000);
            this.queueSize = Math.max(this.queueSize, 10);
            this.numProcessor = writerSliceConfig.getInt("num_copy_processor", 4);
            this.numProcessor = Math.max(this.numProcessor, 1);
            this.numWriter = writerSliceConfig.getInt("num_copy_writer", 1);
            this.numWriter = Math.max(this.numWriter, 1);
            this.maxCsvLineSize = writerSliceConfig.getInt("max_csv_line_size", 0);

            String sql = getCopySql(this.table, this.columns, segment_reject_limit);
            LinkedBlockingQueue<Record> recordQueue = new LinkedBlockingQueue<Record>(queueSize);
            LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>(queueSize);
            ExecutorService threadPool;

            threadPool = Executors.newFixedThreadPool(this.numProcessor + this.numWriter);
            cs = new ExecutorCompletionService<Long>(threadPool);
            Connection connection = createConnection();

            try {

                this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table,
                        constructColumnNameList(this.columns));
                for (int i = 0; i < numProcessor; i++) {
                    cs.submit(new CopyProcessor(this, this.columnNumber, resultSetMetaData, recordQueue, dataQueue));
                }

                for (int i = 0; i < numWriter; i++) {
                    cs.submit(new CopyWorker(this, sql, dataQueue));
                }

                Record record;
                while ((record = recordReceiver.getFromReader()) != null) {
                    send(record, recordQueue);
                    Future<Long> result = cs.poll();

                    if (result != null) {
                        result.get();
                    }
                }

                stopProcessor = true;
                for (int i = 0; i < numProcessor; i++) {
                    cs.take().get();
                }

                stopWriter = true;
                for (int i = 0; i < numWriter; i++) {
                    cs.take().get();
                }
            } catch (ExecutionException e) {
                throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e.getCause());
            } catch (Exception e) {
                throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
            } finally {
                threadPool.shutdownNow();
                DBUtil.closeDBResources(null, null, connection);
            }
        }
    }

GreenplumWriter.java

    package com.alibaba.datax.plugin.writer.greenplumwriter;

    import com.alibaba.datax.common.exception.DataXException;
    import com.alibaba.datax.common.plugin.RecordReceiver;
    import com.alibaba.datax.common.spi.Writer;
    import com.alibaba.datax.common.util.Configuration;
    import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
    import com.alibaba.datax.plugin.rdbms.writer.Key;

    import java.util.List;

    /**
     * @Description: GreenplumWriter
     * @Author: chenweifeng
     * @Date: 2022年08月11日 下午4:26
     **/
    public class GreenplumWriter extends Writer {

        public static class Job extends Writer.Job {
            private Configuration originalConfig = null;
            private CopyWriterJob copyJob;

            @Override
            public void init() {
                this.originalConfig = super.getPluginJobConf();

                // greenplumwriter
                String writeMode = this.originalConfig.getString(Key.WRITE_MODE);
                if (null != writeMode) {
                    throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
                            String.format("写入模式(writeMode)配置有误. 因为Greenplum Database不支持配置参数项 writeMode: %s, GreenplumWriter仅使用 COPY 方式写入数据. 请检查您的配置并作出修改.", writeMode));
                }

                int segment_reject_limit = this.originalConfig.getInt("segment_reject_limit", 0);

                if (segment_reject_limit != 0 && segment_reject_limit < 2) {
                    throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "segment_reject_limit 必须为0或者大于等于2");
                }

                this.copyJob = new CopyWriterJob();
                this.copyJob.init(this.originalConfig);
            }

            @Override
            public void prepare() {
                this.copyJob.prepare(this.originalConfig);
            }

            @Override
            public List<Configuration> split(int mandatoryNumber) {
                return this.copyJob.split(this.originalConfig, mandatoryNumber);
            }

            @Override
            public void post() {
                this.copyJob.post(this.originalConfig);
            }

            @Override
            public void destroy() {
                this.copyJob.destroy(this.originalConfig);
            }

        }

        public static class Task extends Writer.Task {
            private Configuration writerSliceConfig;
            private CopyWriterTask copyTask;

            @Override
            public void init() {
                this.writerSliceConfig = super.getPluginJobConf();
                this.copyTask = new CopyWriterTask();
                this.copyTask.init(this.writerSliceConfig);
            }

            @Override
            public void prepare() {
                this.copyTask.prepare(this.writerSliceConfig);
            }

            @Override
            public void startWrite(RecordReceiver recordReceiver) {
                this.copyTask.startWrite(recordReceiver, this.writerSliceConfig,
                        super.getTaskPluginCollector());
            }

            @Override
            public void post() {
                this.copyTask.post(this.writerSliceConfig);
            }

            @Override
            public void destroy() {
                this.copyTask.destroy(this.writerSliceConfig);
            }
        }
    }

plugin.json

    {
        "name": "greenplumwriter",
        "class": "com.alibaba.datax.plugin.writer.greenplumwriter.GreenplumWriter",
        "description": "简单插件,有待测试验证.  原理: copy in",
        "developer": "Carson"
    }

plugin_template_job.json

   {
      "name": "greenplumwriter",
      "parameter": {
        "username": "",
        "password": "",
        "segment_reject_limit": 0,
        "column": [],
        "copy_queue_size": [],
        "num_copy_processor": 1,
        "num_copy_writer": 1,
        "connection": [
          {
            "jdbcUrl": "",
            "table": []
          }
        ],
        "preSql": [],
        "postSql": []
      }
    }
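
Putting the template together, a filled-in job for the MySQL-to-Greenplum test in the next section might look like this; all connection details, table names, and columns are placeholders, and the reader side uses the stock mysqlreader:

    {
      "job": {
        "setting": { "speed": { "channel": 1 } },
        "content": [
          {
            "reader": {
              "name": "mysqlreader",
              "parameter": {
                "username": "root",
                "password": "******",
                "column": ["id", "c1", "create_time"],
                "connection": [
                  { "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/testdb"], "table": ["t_source"] }
                ]
              }
            },
            "writer": {
              "name": "greenplumwriter",
              "parameter": {
                "username": "gpadmin",
                "password": "******",
                "segment_reject_limit": 0,
                "column": ["id", "c1", "create_time"],
                "copy_queue_size": 1000,
                "num_copy_processor": 4,
                "num_copy_writer": 1,
                "connection": [
                  { "jdbcUrl": "jdbc:postgresql://127.0.0.1:5432/testdb", "table": ["t_target"] }
                ]
              }
            }
          }
        ]
      }
    }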

pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>datax-all</artifactId>
            <groupId>com.alibaba.datax</groupId>
            <version>0.0.1-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>

        <artifactId>greenplumwriter</artifactId>

        <properties>

        </properties>

        <dependencies>
            <dependency>
                <groupId>com.alibaba.datax</groupId>
                <artifactId>datax-common</artifactId>
                <version>${datax-project-version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>slf4j-log4j12</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>com.alibaba.datax</groupId>
                <artifactId>plugin-rdbms-util</artifactId>
                <version>${datax-project-version}</version>
            </dependency>
            <dependency>
                <groupId>com.pivotal</groupId>
                <artifactId>greenplum-jdbc</artifactId>
                <version>5.1.4</version>
            </dependency>
            <dependency>
                <groupId>org.postgresql</groupId>
                <artifactId>postgresql</artifactId>
                <version>42.2.23</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-dbcp2</artifactId>
                <version>2.7.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.11</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
            </dependency>
        </dependencies>

        <build>
            <plugins>
                <!-- compiler plugin -->
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>${project-sourceEncoding}</encoding>
                    </configuration>
                </plugin>
                <!-- assembly plugin -->
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptors>
                            <descriptor>src/main/assembly/package.xml</descriptor>
                        </descriptors>
                        <finalName>datax</finalName>
                    </configuration>
                    <executions>
                        <execution>
                            <id>dwzip</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>


    </project>
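
With the module registered, rebuild from the repository root so the plugin assembly lands under target/datax; assuming the standard DataX Maven setup, a typical build command is:

    mvn -U clean package assembly:assembly -Dmaven.test.skip=true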

4 Running the Test

For the test, create a table with 34 columns (id, c1 through c32, create_time) and load roughly 3.2 million rows. This run moves data from MySQL to Greenplum, about 300 MB in total.
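
The job is then launched with the standard DataX entry script; the job file path below is a placeholder:

    python bin/datax.py job/mysql2gp.json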

Local test results: (screenshot in the original post)

Test results on servers in the same cluster: (screenshot in the original post)
