1 Quick Introduction
The GreenplumWriter plugin writes data into target tables on the Greenplum Database master. Under the hood, GreenplumWriter connects to the remote Greenplum database over JDBC and executes the corresponding COPY ... FROM statement to load the data into Greenplum.
GreenplumWriter is aimed at ETL engineers, who use it to load data from a data warehouse into Greenplum. It can also serve DBAs and other users as a data migration tool.
2 Implementation Principle
GreenplumWriter receives the protocol data produced by the Reader through the DataX framework and, based on your configuration, builds the corresponding COPY load statement:
copy from ...
Note:
1. Unlike MysqlWriter, GreenplumWriter does not support the writeMode parameter.
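For example, for a hypothetical target table public.t_demo with columns id and name, and segment_reject_limit left at 0, the statement built by getCopySql() (see CopyWriterTask.java below) is:
COPY public.t_demo("id","name") FROM STDIN WITH DELIMITER '|' NULL '' CSV QUOTE '"' ESCAPE E'\\';
If segment_reject_limit is set to 2 or more, LOG ERRORS SEGMENT REJECT LIMIT <n> is appended so that rejected rows are logged instead of aborting the load.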
3 Code Implementation
1. Code in the root directory
1. Add the following packaging entry to package.xml in the root directory
<fileSet>
<directory>greenplumwriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
2. Add the new module to pom.xml in the root directory
<module>greenplumwriter</module>
2. Code under the greenplumwriter module
package.xml
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
</includes>
<outputDirectory>plugin/writer/greenplumwriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>greenplumwriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/greenplumwriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/greenplumwriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
CopyProcessor.java
package com.alibaba.datax.plugin.writer.greenplumwriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import org.apache.commons.lang3.tuple.Triple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.sql.Types;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
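/**
 * CopyProcessor: takes Records off queueIn, serializes each one into a line of COPY text
 * (fields joined by '|', string columns quoted with '"' and escaped with '\', binary columns
 * octal-escaped) and puts the UTF-8 bytes on queueOut for the CopyWorker threads.
 * Rows larger than MaxCsvSize bytes are skipped with a warning.
 */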
public class CopyProcessor implements Callable<Long> {
private static final char FIELD_DELIMITER = '|';
private static final char NEWLINE = '\n';
private static final char QUOTE = '"';
private static final char ESCAPE = '\\';
private static int MaxCsvSize = 4194304;
private static final Logger LOG = LoggerFactory.getLogger(CopyProcessor.class);
private int columnNumber;
private CopyWriterTask task;
private LinkedBlockingQueue<Record> queueIn;
private LinkedBlockingQueue<byte[]> queueOut;
private Triple<List<String>, List<Integer>, List<String>> resultSetMetaData;
public CopyProcessor(CopyWriterTask task, int columnNumber,
Triple<List<String>, List<Integer>, List<String>> resultSetMetaData, LinkedBlockingQueue<Record> queueIn,
LinkedBlockingQueue<byte[]> queueOut) {
this.task = task;
this.columnNumber = columnNumber;
this.resultSetMetaData = resultSetMetaData;
this.queueIn = queueIn;
this.queueOut = queueOut;
}
@Override
public Long call() throws Exception {
Thread.currentThread().setName("CopyProcessor");
Record record = null;
while (true) {
record = queueIn.poll(1000L, TimeUnit.MILLISECONDS);
if (record == null && false == task.moreRecord()) {
break;
} else if (record == null) {
continue;
}
if (record.getColumnNumber() != this.columnNumber) {
// The number of columns read from the source differs from the number of columns to write to the target table: fail immediately
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
String.format("列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 您可能配置了错误的表名称, 请检查您的配置并作出修改.",
record.getColumnNumber(), this.columnNumber));
}
byte[] data = serializeRecord(record);
if (data.length > MaxCsvSize) {
String s = new String(data).substring(0, 100) + "...";
LOG.warn("数据元组超过 {} 字节长度限制被忽略。" + s, MaxCsvSize);
} else {
queueOut.put(data);
}
}
return 0L;
}
/**
* Any occurrence within the value of a QUOTE character or the ESCAPE
* character is preceded by the escape character.
*/
protected String escapeString(String data) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < data.length(); ++i) {
char c = data.charAt(i);
switch (c) {
case 0x00:
LOG.warn("字符串中发现非法字符 0x00,已经将其删除");
continue;
case QUOTE:
case ESCAPE:
sb.append(ESCAPE);
}
sb.append(c);
}
return sb.toString();
}
/**
* Non-printable characters are inserted as '\nnn' (octal) and '\' as '\\'.
*/
protected String escapeBinary(byte[] data) {
StringBuilder sb = new StringBuilder();
for (int i = 0; i < data.length; ++i) {
if (data[i] == '\\') {
sb.append('\\');
sb.append('\\');
} else if (data[i] < 0x20 || data[i] > 0x7e) {
byte b = data[i];
char[] val = new char[3];
val[2] = (char) ((b & 07) + '0');
b >>= 3;
val[1] = (char) ((b & 07) + '0');
b >>= 3;
val[0] = (char) ((b & 03) + '0');
sb.append('\\');
sb.append(val);
} else {
sb.append((char) (data[i]));
}
}
return sb.toString();
}
protected byte[] serializeRecord(Record record) throws UnsupportedEncodingException {
StringBuilder sb = new StringBuilder();
Column column;
for (int i = 0; i < this.columnNumber; i++) {
column = record.getColumn(i);
int columnSqltype = this.resultSetMetaData.getMiddle().get(i);
switch (columnSqltype) {
case Types.CHAR:
case Types.NCHAR:
case Types.VARCHAR:
case Types.LONGVARCHAR:
case Types.NVARCHAR:
case Types.LONGNVARCHAR: {
String data = column.asString();
if (data != null) {
sb.append(QUOTE);
sb.append(escapeString(data));
sb.append(QUOTE);
}
break;
}
case Types.BINARY:
case Types.BLOB:
case Types.CLOB:
case Types.LONGVARBINARY:
case Types.NCLOB:
case Types.VARBINARY: {
byte[] data = column.asBytes();
if (data != null) {
sb.append(escapeBinary(data));
}
break;
}
default: {
String data = column.asString();
if (data != null) {
sb.append(data);
}
break;
}
}
if (i + 1 < this.columnNumber) {
sb.append(FIELD_DELIMITER);
}
}
sb.append(NEWLINE);
return sb.toString().getBytes("UTF-8");
}
}
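To make the line format handed to COPY concrete, below is a minimal standalone sketch (not part of the plugin; the class name and sample values are made up for illustration) that applies the same quote/escape rules as serializeRecord() to one hypothetical string column. It deliberately omits the 0x00 stripping and the octal escaping of binary columns shown above.
import java.nio.charset.StandardCharsets;
/**
 * Standalone illustration of the text format CopyProcessor produces:
 * string fields are wrapped in '"', with '"' and '\' prefixed by '\';
 * fields are joined with '|' and each record ends with '\n'.
 */
public class CopyFormatDemo {
    private static final char FIELD_DELIMITER = '|';
    private static final char QUOTE = '"';
    private static final char ESCAPE = '\\';

    static String escapeString(String data) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < data.length(); ++i) {
            char c = data.charAt(i);
            if (c == QUOTE || c == ESCAPE) {
                sb.append(ESCAPE);
            }
            sb.append(c);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Hypothetical row: an integer id plus a VARCHAR containing a quote and a backslash.
        String id = "42";
        String name = "a \"quoted\" value with a \\ backslash";
        String line = id + FIELD_DELIMITER + QUOTE + escapeString(name) + QUOTE + '\n';
        // serializeRecord() would return these UTF-8 bytes and put them on queueOut.
        byte[] data = line.getBytes(StandardCharsets.UTF_8);
        System.out.print(new String(data, StandardCharsets.UTF_8));
        // Prints: 42|"a \"quoted\" value with a \\ backslash"
    }
}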
CopyWorker.java
package com.alibaba.datax.plugin.writer.greenplumwriter;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import org.postgresql.util.PSQLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
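/**
 * CopyWorker: drains serialized rows from the byte[] queue and writes them to a PipedOutputStream,
 * while a daemon thread streams the paired PipedInputStream into Greenplum via
 * CopyManager.copyIn(sql, pipeIn). On failure it cancels the running COPY and rethrows the
 * underlying PSQLException as a DataX WRITE_DATA_ERROR.
 */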
public class CopyWorker implements Callable<Long> {
private static final Logger LOG = LoggerFactory.getLogger(CopyWorker.class);
private CopyWriterTask task = null;
private Connection connection;
private LinkedBlockingQueue<byte[]> queue = null;
private FutureTask<Long> copyResult = null;
private String sql = null;
private PipedInputStream pipeIn = null;
private PipedOutputStream pipeOut = null;
private Thread copyBackendThread = null;
public CopyWorker(CopyWriterTask task, String copySql, LinkedBlockingQueue<byte[]> queue) throws IOException {
this.task = task;
this.connection = task.createConnection();
this.queue = queue;
this.pipeOut = new PipedOutputStream();
this.pipeIn = new PipedInputStream(pipeOut);
this.sql = copySql;
if (task.getMaxCsvLineSize() >= 1024) {
changeCsvSizelimit(connection);
}
this.copyResult = new FutureTask<Long>(new Callable<Long>() {
@Override
public Long call() throws Exception {
try {
CopyManager mgr = new CopyManager((BaseConnection) connection);
return mgr.copyIn(sql, pipeIn);
} finally {
try {
pipeIn.close();
} catch (Exception ignore) {
}
}
}
});
copyBackendThread = new Thread(copyResult);
copyBackendThread.setName(sql);
copyBackendThread.setDaemon(true);
copyBackendThread.start();
}
@Override
public Long call() throws Exception {
Thread.currentThread().setName("CopyWorker");
byte[] data = null;
try {
while (true) {
data = queue.poll(1000L, TimeUnit.MILLISECONDS);
if (data == null && false == task.moreData()) {
break;
} else if (data == null) {
continue;
}
pipeOut.write(data);
}
pipeOut.flush();
pipeOut.close();
} catch (Exception e) {
try {
((BaseConnection) connection).cancelQuery();
} catch (SQLException ignore) {
// ignore if failed to cancel query
}
try {
copyBackendThread.interrupt();
} catch (SecurityException ignore) {
}
try {
copyResult.get();
} catch (ExecutionException exec) {
if (exec.getCause() instanceof PSQLException) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, exec.getCause());
}
// ignore others
} catch (Exception ignore) {
}
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
} finally {
try {
pipeOut.close();
} catch (Exception e) {
// ignore if failed to close pipe
}
try {
copyBackendThread.join(0);
} catch (Exception e) {
// ignore if thread is interrupted
}
DBUtil.closeDBResources(null, null, connection);
}
try {
Long count = copyResult.get();
return count;
} catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
}
}
private void changeCsvSizelimit(Connection conn) {
List<String> sqls = new ArrayList<String>();
sqls.add("set gp_max_csv_line_length = " + Integer.toString(task.getMaxCsvLineSize()));
try {
WriterUtil.executeSqls(conn, sqls, task.getJdbcUrl(), DataBaseType.PostgreSQL);
} catch (Exception e) {
LOG.warn("Cannot set gp_max_csv_line_length to " + task.getMaxCsvLineSize());
}
}
}
CopyWriterJob.java
package com.alibaba.datax.plugin.writer.greenplumwriter;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
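/**
 * CopyWriterJob: job-level wrapper around CommonRdbmsWriter.Job. prepare() clears the Greenplum
 * error log for every target table via gp_truncate_error_log(); post() counts rows returned by
 * gp_read_error_log() and warns if any rows were rejected during the load.
 */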
public class CopyWriterJob extends CommonRdbmsWriter.Job {
private static final Logger LOG = LoggerFactory.getLogger(CopyWriterJob.class);
private List<String> tables = null;
public CopyWriterJob() {
super(DataBaseType.PostgreSQL);
}
@Override
public void init(Configuration originalConfig) {
super.init(originalConfig);
}
@Override
public void prepare(Configuration originalConfig) {
String username = originalConfig.getString(Key.USERNAME);
String password = originalConfig.getString(Key.PASSWORD);
List<Object> conns = originalConfig.getList(Constant.CONN_MARK, Object.class);
Configuration connConf = Configuration.from(conns.get(0).toString());
// The jdbcUrl here already has the appropriate suffix parameters appended
String jdbcUrl = connConf.getString(Key.JDBC_URL);
tables = connConf.getList(Key.TABLE, String.class);
Connection conn = DBUtil.getConnection(DataBaseType.PostgreSQL, jdbcUrl, username, password);
List<String> sqls = new ArrayList<String>();
for (String table : tables) {
sqls.add("SELECT gp_truncate_error_log('" + table + "');");
LOG.info("为 {} 清理 ERROR LOG. context info:{}.", table, jdbcUrl);
}
WriterUtil.executeSqls(conn, sqls, jdbcUrl, DataBaseType.PostgreSQL);
DBUtil.closeDBResources(null, null, conn);
super.prepare(originalConfig);
}
@Override
public void post(Configuration originalConfig) {
super.post(originalConfig);
String username = originalConfig.getString(Key.USERNAME);
String password = originalConfig.getString(Key.PASSWORD);
// appendJDBCSuffix has already been applied by prepare
String jdbcUrl = originalConfig.getString(Key.JDBC_URL);
Connection conn = DBUtil.getConnection(DataBaseType.PostgreSQL, jdbcUrl, username, password);
for (String table : tables) {
int errors = 0;
ResultSet res = null;
String sql = "SELECT count(*) from gp_read_error_log('" + table + "');";
try {
res = DBUtil.query(conn, sql, 10);
if (res.next()) {
errors = res.getInt(1);
}
res.close();
conn.commit();
} catch (SQLException e) {
LOG.debug("Fail to get error log info:" + e.getMessage());
}
if (errors > 0) {
LOG.warn("加载表 {} 时发现 {} 条错误数据, 使用 \"SELECT * from gp_read_error_log('{}');\" 查看详情", table,
errors, table);
}
}
DBUtil.closeDBResources(null, null, conn);
}
}
CopyWriterTask.java
package com.alibaba.datax.plugin.writer.greenplumwriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
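/**
 * CopyWriterTask: builds the COPY statement with getCopySql(), then wires up the pipeline
 * RecordReceiver -> recordQueue -> CopyProcessor(s) -> dataQueue -> CopyWorker(s), using two
 * bounded queues of size copy_queue_size and thread counts from num_copy_processor / num_copy_writer.
 */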
public class CopyWriterTask extends CommonRdbmsWriter.Task {
private static final Logger LOG = LoggerFactory.getLogger(CopyWriterTask.class);
private Configuration writerSliceConfig = null;
private int numProcessor;
private int maxCsvLineSize;
private int numWriter;
private int queueSize;
private volatile boolean stopProcessor = false;
private volatile boolean stopWriter = false;
private CompletionService<Long> cs = null;
public CopyWriterTask() {
super(DataBaseType.PostgreSQL);
}
public String getJdbcUrl() {
return this.jdbcUrl;
}
public Connection createConnection() {
Connection connection = DBUtil.getConnection(this.dataBaseType, this.jdbcUrl, username, password);
DBUtil.dealWithSessionConfig(connection, writerSliceConfig, this.dataBaseType, BASIC_MESSAGE);
return connection;
}
private String constructColumnNameList(List<String> columnList) {
List<String> columns = new ArrayList<String>();
for (String column : columnList) {
if (column.endsWith("\"") && column.startsWith("\"")) {
columns.add(column);
} else {
columns.add("\"" + column + "\"");
}
}
return StringUtils.join(columns, ",");
}
public String getCopySql(String tableName, List<String> columnList, int segment_reject_limit) {
StringBuilder sb = new StringBuilder().append("COPY ").append(tableName).append("(")
.append(constructColumnNameList(columnList))
.append(") FROM STDIN WITH DELIMITER '|' NULL '' CSV QUOTE '\"' ESCAPE E'\\\\'");
if (segment_reject_limit >= 2) {
sb.append(" LOG ERRORS SEGMENT REJECT LIMIT ").append(segment_reject_limit).append(";");
} else {
sb.append(";");
}
String sql = sb.toString();
return sql;
}
private void send(Record record, LinkedBlockingQueue<Record> queue)
throws InterruptedException, ExecutionException {
while (queue.offer(record, 1000, TimeUnit.MILLISECONDS) == false) {
LOG.debug("Record queue is full, increase num_copy_processor for performance.");
Future<Long> result = cs.poll();
if (result != null) {
result.get();
}
}
}
public boolean moreRecord() {
return !stopProcessor;
}
public boolean moreData() {
return !stopWriter;
}
public int getMaxCsvLineSize() {
return maxCsvLineSize;
}
@Override
public void startWrite(RecordReceiver recordReceiver, Configuration writerSliceConfig,
TaskPluginCollector taskPluginCollector) {
this.writerSliceConfig = writerSliceConfig;
int segment_reject_limit = writerSliceConfig.getInt("segment_reject_limit", 0);
this.queueSize = writerSliceConfig.getInt("copy_queue_size", 1000);
this.queueSize = Math.max(this.queueSize, 10);
this.numProcessor = writerSliceConfig.getInt("num_copy_processor", 4);
this.numProcessor = Math.max(this.numProcessor, 1);
this.numWriter = writerSliceConfig.getInt("num_copy_writer", 1);
this.numWriter = Math.max(this.numWriter, 1);
this.maxCsvLineSize = writerSliceConfig.getInt("max_csv_line_size", 0);
String sql = getCopySql(this.table, this.columns, segment_reject_limit);
LinkedBlockingQueue<Record> recordQueue = new LinkedBlockingQueue<Record>(queueSize);
LinkedBlockingQueue<byte[]> dataQueue = new LinkedBlockingQueue<byte[]>(queueSize);
ExecutorService threadPool;
threadPool = Executors.newFixedThreadPool(this.numProcessor + this.numWriter);
cs = new ExecutorCompletionService<Long>(threadPool);
Connection connection = createConnection();
try {
this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table,
constructColumnNameList(this.columns));
for (int i = 0; i < numProcessor; i++) {
cs.submit(new CopyProcessor(this, this.columnNumber, resultSetMetaData, recordQueue, dataQueue));
}
for (int i = 0; i < numWriter; i++) {
cs.submit(new CopyWorker(this, sql, dataQueue));
}
Record record;
while ((record = recordReceiver.getFromReader()) != null) {
send(record, recordQueue);
Future<Long> result = cs.poll();
if (result != null) {
result.get();
}
}
stopProcessor = true;
for (int i = 0; i < numProcessor; i++) {
cs.take().get();
}
stopWriter = true;
for (int i = 0; i < numWriter; i++) {
cs.take().get();
}
} catch (ExecutionException e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e.getCause());
} catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
} finally {
threadPool.shutdownNow();
DBUtil.closeDBResources(null, null, connection);
}
}
}
GreenplumWriter.java
package com.alibaba.datax.plugin.writer.greenplumwriter;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import java.util.List;
/**
* @Description: GreenplumWriter
* @Author: chenweifeng
* @Date: 2022-08-11 16:26
**/
public class GreenplumWriter extends Writer {
public static class Job extends Writer.Job {
private Configuration originalConfig = null;
private CopyWriterJob copyJob;
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
// greenplumwriter
String writeMode = this.originalConfig.getString(Key.WRITE_MODE);
if (null != writeMode) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
String.format("写入模式(writeMode)配置有误. 因为Greenplum Database不支持配置参数项 writeMode: %s, Greenplum Database仅使用insert sql 插入数据. 请检查您的配置并作出修改.", writeMode));
}
int segment_reject_limit = this.originalConfig.getInt("segment_reject_limit", 0);
if (segment_reject_limit != 0 && segment_reject_limit < 2) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "segment_reject_limit 必须为0或者大于等于2");
}
this.copyJob = new CopyWriterJob();
this.copyJob.init(this.originalConfig);
}
@Override
public void prepare() {
this.copyJob.prepare(this.originalConfig);
}
@Override
public List<Configuration> split(int mandatoryNumber) {
return this.copyJob.split(this.originalConfig, mandatoryNumber);
}
@Override
public void post() {
this.copyJob.post(this.originalConfig);
}
@Override
public void destroy() {
this.copyJob.destroy(this.originalConfig);
}
}
public static class Task extends Writer.Task {
private Configuration writerSliceConfig;
private CopyWriterTask copyTask;
@Override
public void init() {
this.writerSliceConfig = super.getPluginJobConf();
this.copyTask = new CopyWriterTask();
this.copyTask.init(this.writerSliceConfig);
}
@Override
public void prepare() {
this.copyTask.prepare(this.writerSliceConfig);
}
public void startWrite(RecordReceiver recordReceiver) {
this.copyTask.startWrite(recordReceiver, this.writerSliceConfig,
super.getTaskPluginCollector());
}
@Override
public void post() {
this.copyTask.post(this.writerSliceConfig);
}
@Override
public void destroy() {
this.copyTask.destroy(this.writerSliceConfig);
}
}
}
plugin.json
{
"name": "greenplumwriter",
"class": "com.alibaba.datax.plugin.writer.greenplumwriter.GreenplumWriter",
"description": "简单插件,有待测试验证. 原理: copy in",
"developer": "Carson"
}
plugin_template_job.json
{
"name": "greenplumwriter",
"parameter": {
"username": "",
"password": "",
"segment_reject_limit": 0,
"column": [],
"copy_queue_size": [],
"num_copy_processor": 1,
"num_copy_writer": 1,
"connection": [
{
"jdbcUrl": "",
"table": []
}
],
"preSql": [],
"postSql": []
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>greenplumwriter</artifactId>
<properties>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>${datax-project-version}</version>
</dependency>
<dependency>
<groupId>com.pivotal</groupId>
<artifactId>greenplum-jdbc</artifactId>
<version>5.1.4</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.23</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>2.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.11</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
4 Running the Test
Create a table with 34 columns (id, c1 … c32, create_time) and load roughly 3.2 million rows. This test moves data from MySQL to Greenplum; the total data size is about 300 MB.
Local test results:
Test results on a server in the same cluster: