https://blog.csdn.net/Carson073/article/details/126156443
1. Modify the root pom.xml
Add the hivereader and hivewriter modules to the <modules> list:
<modules>
<module>common</module>
<module>core</module>
<module>transformer</module>
<!-- reader -->
<module>mysqlreader</module>
<module>drdsreader</module>
<module>sqlserverreader</module>
<module>postgresqlreader</module>
<module>kingbaseesreader</module>
<module>oraclereader</module>
<module>odpsreader</module>
<module>otsreader</module>
<module>otsstreamreader</module>
<module>txtfilereader</module>
<module>hdfsreader</module>
<module>streamreader</module>
<module>ossreader</module>
<module>ftpreader</module>
<module>mongodbreader</module>
<module>rdbmsreader</module>
<module>hbase11xreader</module>
<module>hbase094xreader</module>
<module>tsdbreader</module>
<module>opentsdbreader</module>
<module>cassandrareader</module>
<module>gdbreader</module>
<module>oceanbasev10reader</module>
<module>hivereader</module>
<!-- writer -->
<module>mysqlwriter</module>
<module>tdenginewriter</module>
<module>drdswriter</module>
<module>odpswriter</module>
<module>txtfilewriter</module>
<module>ftpwriter</module>
<module>hdfswriter</module>
<module>streamwriter</module>
<module>otswriter</module>
<module>oraclewriter</module>
<module>sqlserverwriter</module>
<module>postgresqlwriter</module>
<module>kingbaseeswriter</module>
<module>osswriter</module>
<module>mongodbwriter</module>
<module>adswriter</module>
<module>ocswriter</module>
<module>rdbmswriter</module>
<module>hbase11xwriter</module>
<module>hbase094xwriter</module>
<module>hbase11xsqlwriter</module>
<module>hbase11xsqlreader</module>
<module>elasticsearchwriter</module>
<module>tsdbwriter</module>
<module>adbpgwriter</module>
<module>gdbwriter</module>
<module>cassandrawriter</module>
<module>clickhousewriter</module>
<module>oscarwriter</module>
<module>oceanbasev10writer</module>
<!-- common support module -->
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>
<module>hbase20xsqlreader</module>
<module>hbase20xsqlwriter</module>
<module>kuduwriter</module>
<module>tdenginereader</module>
<module>hivewriter</module>
</modules>
2. Modify the root package.xml
Add fileSet entries so that the hivereader and hivewriter build output is included in the packaged distribution:
<fileSet>
<directory>hivereader/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>hivewriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
3. Create the hivereader module
Project structure
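The original post shows the module layout as a screenshot. Based only on the files listed in this section, it looks roughly like this (a sketch, not an authoritative listing):
hivereader
├── pom.xml
└── src/main
    ├── assembly/package.xml
    ├── java/com/alibaba/datax/plugin/reader/hivereader/
    │   ├── Constant.java
    │   ├── DFSUtil.java
    │   ├── HiveReader.java
    │   ├── HiveReaderErrorCode.java
    │   ├── HiveServer2ConnectUtil.java
    │   └── Key.java
    └── resources/
        ├── plugin.json
        └── plugin_job_template.json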
package.xml
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/hivereader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>hivereader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/hivereader</outputDirectory>
</fileSet>
<!--<fileSet>-->
<!--<directory>src/main/cpp</directory>-->
<!--<includes>-->
<!--<include>libhadoop.a</include>-->
<!--<include>libhadoop.so</include>-->
<!--<include>libhadoop.so.1.0.0</include>-->
<!--<include>libhadooppipes.a</include>-->
<!--<include>libhadooputils.a</include>-->
<!--<include>libhdfs.a</include>-->
<!--<include>libhdfs.so</include>-->
<!--<include>libhdfs.so.0.0.0</include>-->
<!--</includes>-->
<!--<outputDirectory>plugin/reader/hdfsreader/libs</outputDirectory>-->
<!--</fileSet>-->
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/hivereader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
Constant.class
package com.alibaba.datax.plugin.reader.hivereader;
public class Constant {
public final static String TEMP_DATABASE_DEFAULT = "default"; // 参考CDH的default库
public static final String TEMP_DATABSE_HDFS_LOCATION_DEFAULT = "/user/{username}/warehouse/";// 参考CDH的default库的路径
public static final String TEMP_TABLE_NAME_PREFIX="tmp_datax_hivereader_";
// public final static String HIVE_CMD_DEFAULT = "hive"; //
public final static String HIVE_SQL_SET_DEFAULT = ""; //
public final static String FIELDDELIMITER_DEFAULT = "\\u0001"; //
public final static String NULL_FORMAT_DEFAULT="\\N" ;
public static final String TEXT = "TEXT";
public static final String ORC = "ORC";
public static final String CSV = "CSV";
public static final String SEQ = "SEQ";
public static final String RC = "RC";
}
DFSUtil.class
package com.alibaba.datax.plugin.reader.hivereader;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.unstructuredstorage.reader.ColumnEntry;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderErrorCode;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.RCFileRecordReader;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable;
import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.*;
public class DFSUtil {
private static final Logger LOG = LoggerFactory.getLogger(HiveReader.Job.class);
private org.apache.hadoop.conf.Configuration hadoopConf = null;
private String username = null;
private String specifiedFileType = null;
private Boolean haveKerberos = false;
private String kerberosKeytabFilePath;
private String kerberosPrincipal;
private static final int DIRECTORY_SIZE_GUESS = 16 * 1024;
public static final String HDFS_DEFAULTFS_KEY = "fs.defaultFS";
public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
public DFSUtil(Configuration taskConfig) {
hadoopConf = new org.apache.hadoop.conf.Configuration();
//io.file.buffer.size 性能参数
//http://blog.csdn.net/yangjl38/article/details/7583374
Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);
JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));
if (null != hadoopSiteParams) {
Set<String> paramKeys = hadoopSiteParams.getKeys();
for (String each : paramKeys) {
hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
}
}
hadoopConf.set(HDFS_DEFAULTFS_KEY, taskConfig.getString(Key.DEFAULT_FS));
this.username = taskConfig.getString(Key.USERNAME);
System.setProperty("HADOOP_USER_NAME", this.username);
//是否有Kerberos认证
this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);
if (haveKerberos) {
this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);
this.hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
}
this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath);
LOG.info(String.format("hadoopConfig details:%s", JSON.toJSONString(this.hadoopConf)));
}
private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath) {
if (haveKerberos && StringUtils.isNotBlank(this.kerberosPrincipal) && StringUtils.isNotBlank(this.kerberosKeytabFilePath)) {
UserGroupInformation.setConfiguration(this.hadoopConf);
try {
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
} catch (Exception e) {
String message = String.format("kerberos认证失败,请确定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填写正确",
kerberosKeytabFilePath, kerberosPrincipal);
throw DataXException.asDataXException(HiveReaderErrorCode.KERBEROS_LOGIN_ERROR, message, e);
}
}
}
/**
* 获取指定路径列表下符合条件的所有文件的绝对路径
*
* @param srcPaths 路径列表
* @param specifiedFileType 指定文件类型
*/
public HashSet<String> getAllFiles(List<String> srcPaths, String specifiedFileType) {
this.specifiedFileType = specifiedFileType;
if (!srcPaths.isEmpty()) {
for (String eachPath : srcPaths) {
LOG.info(String.format("get HDFS all files in path = [%s]", eachPath));
getHDFSAllFiles(eachPath);
}
}
return sourceHDFSAllFilesList;
}
private HashSet<String> sourceHDFSAllFilesList = new HashSet<String>();
public HashSet<String> getHDFSAllFiles(String hdfsPath) {
try {
FileSystem hdfs = FileSystem.get(new URI(hadoopConf.get(HDFS_DEFAULTFS_KEY)),hadoopConf,username);
//判断hdfsPath是否包含正则符号
if (hdfsPath.contains("*") || hdfsPath.contains("?")) {
Path path = new Path(hdfsPath);
FileStatus stats[] = hdfs.globStatus(path);
for (FileStatus f : stats) {
if (f.isFile()) {
if (f.getLen() == 0) {
String message = String.format("文件[%s]长度为0,将会跳过不作处理!", hdfsPath);
LOG.warn(message);
} else {
addSourceFileByType(f.getPath().toString());
}
} else if (f.isDirectory()) {
getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
}
}
} else {
getHDFSAllFilesNORegex(hdfsPath, hdfs);
}
return sourceHDFSAllFilesList;
} catch (IOException | InterruptedException | URISyntaxException e) {
String message = String.format("无法读取路径[%s]下的所有文件,请确认您的配置项fs.defaultFS, path的值是否正确," +
"是否有读写权限,网络是否已断开!", hdfsPath);
LOG.error(message);
throw DataXException.asDataXException(HiveReaderErrorCode.PATH_CONFIG_ERROR, e);
}
}
private HashSet<String> getHDFSAllFilesNORegex(String path, FileSystem hdfs) throws IOException {
// 获取要读取的文件的根目录
Path listFiles = new Path(path);
// If the network disconnected, this method will retry 45 times
// each time the retry interval for 20 seconds
// 获取要读取的文件的根目录的所有二级子文件目录
FileStatus stats[] = hdfs.listStatus(listFiles);
for (FileStatus f : stats) {
// 判断是不是目录,如果是目录,递归调用
if (f.isDirectory()) {
LOG.info(String.format("[%s] 是目录, 递归获取该目录下的文件", f.getPath().toString()));
getHDFSAllFilesNORegex(f.getPath().toString(), hdfs);
} else if (f.isFile()) {
addSourceFileByType(f.getPath().toString());
} else {
String message = String.format("该路径[%s]文件类型既不是目录也不是文件,插件自动忽略。",
f.getPath().toString());
LOG.info(message);
}
}
return sourceHDFSAllFilesList;
}
// 根据用户指定的文件类型,将指定的文件类型的路径加入sourceHDFSAllFilesList
private void addSourceFileByType(String filePath) {
// 检查file的类型和用户配置的fileType类型是否一致
boolean isMatchedFileType = checkHdfsFileType(filePath, this.specifiedFileType);
if (isMatchedFileType) {
LOG.info(String.format("[%s]是[%s]类型的文件, 将该文件加入source files列表", filePath, this.specifiedFileType));
sourceHDFSAllFilesList.add(filePath);
} else {
String message = String.format("文件[%s]的类型与用户配置的fileType类型不一致," +
"请确认您配置的目录下面所有文件的类型均为[%s]"
, filePath, this.specifiedFileType);
LOG.error(message);
throw DataXException.asDataXException(
HiveReaderErrorCode.FILE_TYPE_UNSUPPORT, message);
}
}
public InputStream getInputStream(String filepath) {
InputStream inputStream;
Path path = new Path(filepath);
try {
FileSystem fs = FileSystem.get(new URI(hadoopConf.get(HDFS_DEFAULTFS_KEY)),hadoopConf,username);
//If the network disconnected, this method will retry 45 times
//each time the retry interval for 20 seconds
inputStream = fs.open(path);
return inputStream;
} catch (IOException | URISyntaxException | InterruptedException e) {
String message = String.format("读取文件 : [%s] 时出错,请确认文件:[%s]存在且配置的用户有权限读取", filepath, filepath);
throw DataXException.asDataXException(HiveReaderErrorCode.READ_FILE_ERROR, message, e);
}
}
public void sequenceFileStartRead(String sourceSequenceFilePath, Configuration readerSliceConfig,
RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
LOG.info(String.format("Start Read sequence file [%s].", sourceSequenceFilePath));
Path seqFilePath = new Path(sourceSequenceFilePath);
SequenceFile.Reader reader = null;
try {
//获取SequenceFile.Reader实例
reader = new SequenceFile.Reader(this.hadoopConf,
SequenceFile.Reader.file(seqFilePath));
//获取key 与 value
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), this.hadoopConf);
Text value = new Text();
while (reader.next(key, value)) {
if (StringUtils.isNotBlank(value.toString())) {
UnstructuredStorageReaderUtil.transportOneRecord(recordSender,
readerSliceConfig, taskPluginCollector, value.toString());
}
}
} catch (Exception e) {
String message = String.format("SequenceFile.Reader读取文件[%s]时出错", sourceSequenceFilePath);
LOG.error(message);
throw DataXException.asDataXException(HiveReaderErrorCode.READ_SEQUENCEFILE_ERROR, message, e);
} finally {
IOUtils.closeStream(reader);
LOG.info("Finally, Close stream SequenceFile.Reader.");
}
}
public void rcFileStartRead(String sourceRcFilePath, Configuration readerSliceConfig,
RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
LOG.info(String.format("Start Read rcfile [%s].", sourceRcFilePath));
List<ColumnEntry> column = UnstructuredStorageReaderUtil
.getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
// warn: no default value '\N'
String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
Path rcFilePath = new Path(sourceRcFilePath);
FileSystem fs = null;
RCFileRecordReader recordReader = null;
try {
fs = FileSystem.get(rcFilePath.toUri(), hadoopConf,username);
long fileLen = fs.getFileStatus(rcFilePath).getLen();
FileSplit split = new FileSplit(rcFilePath, 0, fileLen, (String[]) null);
recordReader = new RCFileRecordReader(hadoopConf, split);
LongWritable key = new LongWritable();
BytesRefArrayWritable value = new BytesRefArrayWritable();
Text txt = new Text();
while (recordReader.next(key, value)) {
String[] sourceLine = new String[value.size()];
txt.clear();
for (int i = 0; i < value.size(); i++) {
BytesRefWritable v = value.get(i);
txt.set(v.getData(), v.getStart(), v.getLength());
sourceLine[i] = txt.toString();
}
UnstructuredStorageReaderUtil.transportOneRecord(recordSender,
column, sourceLine, nullFormat, taskPluginCollector);
}
} catch (IOException | InterruptedException e) {
String message = String.format("读取文件[%s]时出错", sourceRcFilePath);
LOG.error(message);
throw DataXException.asDataXException(HiveReaderErrorCode.READ_RCFILE_ERROR, message, e);
} finally {
try {
if (recordReader != null) {
recordReader.close();
LOG.info("Finally, Close RCFileRecordReader.");
}
} catch (IOException e) {
LOG.warn(String.format("finally: 关闭RCFileRecordReader失败, %s", e.getMessage()));
}
}
}
public void orcFileStartRead(String sourceOrcFilePath, Configuration readerSliceConfig,
RecordSender recordSender, TaskPluginCollector taskPluginCollector) {
LOG.info(String.format("Start Read orcfile [%s].", sourceOrcFilePath));
List<ColumnEntry> column = UnstructuredStorageReaderUtil
.getListColumnEntry(readerSliceConfig, com.alibaba.datax.plugin.unstructuredstorage.reader.Key.COLUMN);
String nullFormat = readerSliceConfig.getString(com.alibaba.datax.plugin.unstructuredstorage.reader.Key.NULL_FORMAT);
StringBuilder allColumns = new StringBuilder();
StringBuilder allColumnTypes = new StringBuilder();
boolean isReadAllColumns = false;
int columnIndexMax = -1;
// 判断是否读取所有列
if (null == column || column.size() == 0) {
int allColumnsCount = getAllColumnsCount(sourceOrcFilePath);
columnIndexMax = allColumnsCount - 1;
isReadAllColumns = true;
} else {
columnIndexMax = getMaxIndex(column);
}
for (int i = 0; i <= columnIndexMax; i++) {
allColumns.append("col");
allColumnTypes.append("string");
if (i != columnIndexMax) {
allColumns.append(",");
allColumnTypes.append(":");
}
}
if (columnIndexMax >= 0) {
JobConf conf = new JobConf(hadoopConf);
Path orcFilePath = new Path(sourceOrcFilePath);
Properties p = new Properties();
p.setProperty("columns", allColumns.toString());
p.setProperty("columns.types", allColumnTypes.toString());
try {
OrcSerde serde = new OrcSerde();
serde.initialize(conf, p);
StructObjectInspector inspector = (StructObjectInspector) serde.getObjectInspector();
InputFormat<?, ?> in = new OrcInputFormat();
FileInputFormat.setInputPaths(conf, orcFilePath.toString());
//If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
//Each file as a split
//TODO multy threads
InputSplit[] splits = in.getSplits(conf, 1);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);//获取reader
Object key = reader.createKey();
Object value = reader.createValue();// OrcStruct
// 获取列信息
List<? extends StructField> fields = inspector.getAllStructFieldRefs();
List<Object> recordFields;
while (reader.next(key, value)) {//next 读取数据到 value(OrcStruct)
recordFields = new ArrayList<Object>();
for (int i = 0; i <= columnIndexMax; i++) {
Object field = inspector.getStructFieldData(value, fields.get(i));//从 OrcStruct 数组中 返回对应列 数据
recordFields.add(field);
}
transportOneRecord(column, recordFields, recordSender,
taskPluginCollector, isReadAllColumns, nullFormat);
}
reader.close();
} catch (Exception e) {
String message = String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。"
, sourceOrcFilePath);
LOG.error(message);
throw DataXException.asDataXException(HiveReaderErrorCode.READ_FILE_ERROR, message);
}
} else {
String message = String.format("请确认您所读取的列配置正确!columnIndexMax 小于0,column:%s", JSON.toJSONString(column));
throw DataXException.asDataXException(HiveReaderErrorCode.BAD_CONFIG_VALUE, message);
}
}
private Record transportOneRecord(List<ColumnEntry> columnConfigs, List<Object> recordFields
, RecordSender recordSender, TaskPluginCollector taskPluginCollector, boolean isReadAllColumns, String nullFormat) {
Record record = recordSender.createRecord();
Column columnGenerated;
try {
if (isReadAllColumns) {
// 读取所有列,创建都为String类型的column
for (Object recordField : recordFields) {
String columnValue = null;
if (recordField != null) {
columnValue = recordField.toString();
}
columnGenerated = new StringColumn(columnValue);
record.addColumn(columnGenerated);
}
} else {
for (ColumnEntry columnConfig : columnConfigs) {
String columnType = columnConfig.getType();
Integer columnIndex = columnConfig.getIndex();
String columnConst = columnConfig.getValue();
String columnValue = null;
if (null != columnIndex) {
if (null != recordFields.get(columnIndex))
columnValue = recordFields.get(columnIndex).toString();
} else {
columnValue = columnConst;
}
Type type = Type.valueOf(columnType.toUpperCase());
// it's all ok if nullFormat is null
if (StringUtils.equals(columnValue, nullFormat)) {
columnValue = null;
}
switch (type) {
case STRING:
columnGenerated = new StringColumn(columnValue);
break;
case LONG:
try {
columnGenerated = new LongColumn(columnValue);
} catch (Exception e) {
throw new IllegalArgumentException(String.format(
"类型转换错误, 无法将[%s] 转换为[%s]", columnValue,
"LONG"));
}
break;
case DOUBLE:
try {
columnGenerated = new DoubleColumn(columnValue);
} catch (Exception e) {
throw new IllegalArgumentException(String.format(
"类型转换错误, 无法将[%s] 转换为[%s]", columnValue,
"DOUBLE"));
}
break;
case BOOLEAN:
try {
columnGenerated = new BoolColumn(columnValue);
} catch (Exception e) {
throw new IllegalArgumentException(String.format(
"类型转换错误, 无法将[%s] 转换为[%s]", columnValue,
"BOOLEAN"));
}
break;
case DATE:
try {
if (columnValue == null) {
columnGenerated = new DateColumn((Date) null);
} else {
String formatString = columnConfig.getFormat();
if (StringUtils.isNotBlank(formatString)) {
// 用户自己配置的格式转换
SimpleDateFormat format = new SimpleDateFormat(
formatString);
columnGenerated = new DateColumn(
format.parse(columnValue));
} else {
// 框架尝试转换
columnGenerated = new DateColumn(
new StringColumn(columnValue)
.asDate());
}
}
} catch (Exception e) {
throw new IllegalArgumentException(String.format(
"类型转换错误, 无法将[%s] 转换为[%s]", columnValue,
"DATE"));
}
break;
default:
String errorMessage = String.format(
"您配置的列类型暂不支持 : [%s]", columnType);
LOG.error(errorMessage);
throw DataXException
.asDataXException(
UnstructuredStorageReaderErrorCode.NOT_SUPPORT_TYPE,
errorMessage);
}
record.addColumn(columnGenerated);
}
}
recordSender.sendToWriter(record);
} catch (IllegalArgumentException iae) {
taskPluginCollector
.collectDirtyRecord(record, iae.getMessage());
} catch (IndexOutOfBoundsException ioe) {
taskPluginCollector
.collectDirtyRecord(record, ioe.getMessage());
} catch (Exception e) {
if (e instanceof DataXException) {
throw (DataXException) e;
}
// 每一种转换失败都是脏数据处理,包括数字格式 & 日期格式
taskPluginCollector.collectDirtyRecord(record, e.getMessage());
}
return record;
}
private int getAllColumnsCount(String filePath) {
Path path = new Path(filePath);
try {
Reader reader = OrcFile.createReader(path, OrcFile.readerOptions(hadoopConf));
return reader.getTypes().get(0).getSubtypesCount();
} catch (IOException e) {
String message = "读取orcfile column列数失败,请联系系统管理员";
throw DataXException.asDataXException(HiveReaderErrorCode.READ_FILE_ERROR, message);
}
}
private int getMaxIndex(List<ColumnEntry> columnConfigs) {
int maxIndex = -1;
for (ColumnEntry columnConfig : columnConfigs) {
Integer columnIndex = columnConfig.getIndex();
if (columnIndex != null && columnIndex < 0) {
String message = String.format("您column中配置的index不能小于0,请修改为正确的index,column配置:%s",
JSON.toJSONString(columnConfigs));
LOG.error(message);
throw DataXException.asDataXException(HiveReaderErrorCode.CONFIG_INVALID_EXCEPTION, message);
} else if (columnIndex != null && columnIndex > maxIndex) {
maxIndex = columnIndex;
}
}
return maxIndex;
}
private enum Type {
STRING, LONG, BOOLEAN, DOUBLE, DATE,
}
public boolean checkHdfsFileType(String filepath, String specifiedFileType) {
Path file = new Path(filepath);
try {
FileSystem fs = FileSystem.get(new URI(hadoopConf.get(HDFS_DEFAULTFS_KEY)),hadoopConf,username);
FSDataInputStream in = fs.open(file);
if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.CSV)
|| StringUtils.equalsIgnoreCase(specifiedFileType, Constant.TEXT)) {
boolean isORC = isORCFile(file, fs, in);// 判断是否是 ORC File
if (isORC) {
return false;
}
boolean isRC = isRCFile(filepath, in);// 判断是否是 RC File
if (isRC) {
return false;
}
boolean isSEQ = isSequenceFile(filepath, in);// 判断是否是 Sequence File
if (isSEQ) {
return false;
}
// 如果不是ORC,RC和SEQ,则默认为是TEXT或CSV类型
return !isORC && !isRC && !isSEQ;
} else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.ORC)) {
return isORCFile(file, fs, in);
} else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.RC)) {
return isRCFile(filepath, in);
} else if (StringUtils.equalsIgnoreCase(specifiedFileType, Constant.SEQ)) {
return isSequenceFile(filepath, in);
}
} catch (Exception e) {
String message = String.format("检查文件[%s]类型失败,目前支持ORC,SEQUENCE,RCFile,TEXT,CSV五种格式的文件," +
"请检查您文件类型和文件是否正确。", filepath);
LOG.error(message);
throw DataXException.asDataXException(HiveReaderErrorCode.READ_FILE_ERROR, message, e);
}
return false;
}
// 判断file是否是ORC File
private boolean isORCFile(Path file, FileSystem fs, FSDataInputStream in) {
try {
// figure out the size of the file using the option or filesystem
long size = fs.getFileStatus(file).getLen();
//read last bytes into buffer to get PostScript
int readSize = (int) Math.min(size, DIRECTORY_SIZE_GUESS);
in.seek(size - readSize);
ByteBuffer buffer = ByteBuffer.allocate(readSize);
in.readFully(buffer.array(), buffer.arrayOffset() + buffer.position(),
buffer.remaining());
//read the PostScript
//get length of PostScript
int psLen = buffer.get(readSize - 1) & 0xff;
int len = OrcFile.MAGIC.length();
if (psLen < len + 1) {
return false;
}
int offset = buffer.arrayOffset() + buffer.position() + buffer.limit() - 1
- len;
byte[] array = buffer.array();
// now look for the magic string at the end of the postscript.
if (Text.decode(array, offset, len).equals(OrcFile.MAGIC)) {
return true;
} else {
// If it isn't there, this may be the 0.11.0 version of ORC.
// Read the first 3 bytes of the file to check for the header
in.seek(0);
byte[] header = new byte[len];
in.readFully(header, 0, len);
// if it isn't there, this isn't an ORC file
if (Text.decode(header, 0, len).equals(OrcFile.MAGIC)) {
return true;
}
}
} catch (IOException e) {
LOG.info(String.format("检查文件类型: [%s] 不是ORC File.", file.toString()));
}
return false;
}
// 判断file是否是RC file
private boolean isRCFile(String filepath, FSDataInputStream in) {
// The first version of RCFile used the sequence file header.
final byte[] ORIGINAL_MAGIC = new byte[]{(byte) 'S', (byte) 'E', (byte) 'Q'};
// The 'magic' bytes at the beginning of the RCFile
final byte[] RC_MAGIC = new byte[]{(byte) 'R', (byte) 'C', (byte) 'F'};
// the version that was included with the original magic, which is mapped
// into ORIGINAL_VERSION
final byte ORIGINAL_MAGIC_VERSION_WITH_METADATA = 6;
// All of the versions should be place in this list.
final int ORIGINAL_VERSION = 0; // version with SEQ
final int NEW_MAGIC_VERSION = 1; // version with RCF
final int CURRENT_VERSION = NEW_MAGIC_VERSION;
byte version;
byte[] magic = new byte[RC_MAGIC.length];
try {
in.seek(0);
in.readFully(magic);
if (Arrays.equals(magic, ORIGINAL_MAGIC)) {
byte vers = in.readByte();
if (vers != ORIGINAL_MAGIC_VERSION_WITH_METADATA) {
return false;
}
version = ORIGINAL_VERSION;
} else {
if (!Arrays.equals(magic, RC_MAGIC)) {
return false;
}
// Set 'version'
version = in.readByte();
if (version > CURRENT_VERSION) {
return false;
}
}
if (version == ORIGINAL_VERSION) {
try {
Class<?> keyCls = hadoopConf.getClassByName(Text.readString(in));
Class<?> valCls = hadoopConf.getClassByName(Text.readString(in));
if (!keyCls.equals(RCFile.KeyBuffer.class)
|| !valCls.equals(RCFile.ValueBuffer.class)) {
return false;
}
} catch (ClassNotFoundException e) {
return false;
}
}
boolean decompress = in.readBoolean(); // is compressed?
if (version == ORIGINAL_VERSION) {
// is block-compressed? it should be always false.
boolean blkCompressed = in.readBoolean();
if (blkCompressed) {
return false;
}
}
return true;
} catch (IOException e) {
LOG.info(String.format("检查文件类型: [%s] 不是RC File.", filepath));
}
return false;
}
// 判断file是否是Sequence file
private boolean isSequenceFile(String filepath, FSDataInputStream in) {
byte[] SEQ_MAGIC = new byte[]{(byte) 'S', (byte) 'E', (byte) 'Q'};
byte[] magic = new byte[SEQ_MAGIC.length];
try {
in.seek(0);
in.readFully(magic);
if (Arrays.equals(magic, SEQ_MAGIC)) {
return true;
} else {
return false;
}
} catch (IOException e) {
LOG.info(String.format("检查文件类型: [%s] 不是Sequence File.", filepath));
}
return false;
}
}
HiveReader.class
package com.alibaba.datax.plugin.reader.hivereader;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.unstructuredstorage.reader.UnstructuredStorageReaderUtil;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.FastDateFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
public class HiveReader extends Reader {
/**
 * Methods on Job run only once; methods on Task are executed in parallel by the Task threads the framework starts.
 * <p/>
 * The overall Reader execution flow is:
 * <pre>
 * Job:  init --> prepare --> split
 *
 * Task: init --> prepare --> startRead --> post --> destroy
 * Task: init --> prepare --> startRead --> post --> destroy
 *
 * Job:  post --> destroy
 * </pre>
 */
public static class Job extends Reader.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private Configuration readerOriginConfig = null;
@Override
public void init() {
LOG.info("init() begin...");
this.readerOriginConfig = super.getPluginJobConf();//获取配置文件信息{parameter 里面的参数}
this.validate();
LOG.info("init() ok and end...");
LOG.info("HiveReader流程说明[1:Reader的HiveQL导入临时表(TextFile无压缩的HDFS) ;2:临时表的HDFS到目标Writer;3:删除临时表]");
}
private void validate() {
this.readerOriginConfig.getNecessaryValue(Key.DEFAULT_FS,
HiveReaderErrorCode.DEFAULT_FS_NOT_FIND_ERROR);
List<String> sqls = this.readerOriginConfig.getList(Key.HIVE_SQL, String.class);
if (null == sqls || sqls.size() == 0) {
throw DataXException.asDataXException(
HiveReaderErrorCode.SQL_NOT_FIND_ERROR,
"您未配置hive sql");
}
//check Kerberos
Boolean haveKerberos = this.readerOriginConfig.getBool(Key.HAVE_KERBEROS, false);
if (haveKerberos) {
this.readerOriginConfig.getNecessaryValue(Key.KERBEROS_KEYTAB_FILE_PATH, HiveReaderErrorCode.REQUIRED_VALUE);
this.readerOriginConfig.getNecessaryValue(Key.KERBEROS_PRINCIPAL, HiveReaderErrorCode.REQUIRED_VALUE);
}
}
@Override
public List<Configuration> split(int adviceNumber) {
//按照Hive sql的个数 获取配置文件的个数
LOG.info("split() begin...");
List<String> sqls = this.readerOriginConfig.getList(Key.HIVE_SQL, String.class);
List<Configuration> readerSplitConfigs = new ArrayList<Configuration>();
Configuration splitedConfig = null;
for (String querySql : sqls) {
splitedConfig = this.readerOriginConfig.clone();
splitedConfig.set(Key.HIVE_SQL, querySql);
readerSplitConfigs.add(splitedConfig);
}
return readerSplitConfigs;
}
//全局post
@Override
public void post() {
LOG.info("任务执行完毕,hive reader post");
}
@Override
public void destroy() {
}
}
public static class Task extends Reader.Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration taskConfig;
private String hiveSql;
private String hiveJdbcUrl;
private String username;
private String password;
private String tmpPath;
private String tableName;
private String tempDatabase;
private String tempHdfsLocation;
// private String hive_cmd;
private String hive_sql_set;
private String fieldDelimiter;
private String nullFormat;
private String hive_fieldDelimiter;
private DFSUtil dfsUtil = null;
private HashSet<String> sourceFiles;
@Override
public void init() {
this.tableName = hiveTableName();
//获取配置
this.taskConfig = super.getPluginJobConf();//获取job 分割后的每一个任务单独的配置文件
this.hiveSql = taskConfig.getString(Key.HIVE_SQL);//获取hive sql
this.hiveJdbcUrl = taskConfig.getString(Key.HIVE_JDBC_URL);//获取hive jdbcUrl
this.username = taskConfig.getString(Key.USERNAME);//获取hive 用户名
this.password = taskConfig.getString(Key.PASSWORD);//获取hive 密码
this.tempDatabase = taskConfig.getString(Key.TEMP_DATABASE, Constant.TEMP_DATABASE_DEFAULT);// 临时表的数据库
this.tempHdfsLocation = taskConfig.getString(Key.TEMP_DATABASE_HDFS_LOCATION,
Constant.TEMP_DATABSE_HDFS_LOCATION_DEFAULT.replace("{username}", this.username));// 临时表的数据库路径
// this.hive_cmd = taskConfig.getString(Key.HIVE_CMD, Constant.HIVE_CMD_DEFAULT);
this.hive_sql_set = taskConfig.getString(Key.HIVE_SQL_SET, Constant.HIVE_SQL_SET_DEFAULT);
//判断set语句的结尾是否是分号,不是给加一个
if (!this.hive_sql_set.trim().endsWith(";")) {
this.hive_sql_set = this.hive_sql_set + ";";
}
this.fieldDelimiter = taskConfig.getString(Key.FIELDDELIMITER, Constant.FIELDDELIMITER_DEFAULT);
this.hive_fieldDelimiter = this.fieldDelimiter;
this.fieldDelimiter = StringEscapeUtils.unescapeJava(this.fieldDelimiter);
this.taskConfig.set(Key.FIELDDELIMITER, this.fieldDelimiter);//设置hive 存储文件 hdfs默认的分隔符,传输时候会分隔
this.nullFormat = taskConfig.getString(Key.NULL_FORMAT, Constant.NULL_FORMAT_DEFAULT);
this.taskConfig.set(Key.NULL_FORMAT, this.nullFormat);
//判断临时表路径的结尾是否是/,不是给加一个
if (!this.tempHdfsLocation.trim().endsWith("/")) {
this.tempHdfsLocation = this.tempHdfsLocation + "/";
}
this.tmpPath = this.tempHdfsLocation + this.tableName;//创建临时Hive表 存储地址
LOG.info("配置分隔符后:" + this.taskConfig.toJSON());
this.dfsUtil = new DFSUtil(this.taskConfig);//初始化工具类
}
@Override
public void prepare() {
//创建临时Hive表,指定存储地址
String hiveQueryCmd = this.hive_sql_set + " use " + this.tempDatabase + "; create table "
+ this.tableName + " ROW FORMAT DELIMITED FIELDS TERMINATED BY '" + this.hive_fieldDelimiter
+ "' STORED AS TEXTFILE "
+ " as " + this.hiveSql;
LOG.info("hiveCmd ----> :" + hiveQueryCmd);
// String[] cmd = new String[]{this.hive_cmd, "-e", "\"" + hiveQueryCmd + " \""};
// LOG.info(cmd.toString());
//执行脚本,创建临时表
if (!HiveServer2ConnectUtil.execHiveSql(this.username, this.password, hiveQueryCmd, this.hiveJdbcUrl)) {
throw DataXException.asDataXException(
HiveReaderErrorCode.SHELL_ERROR,
"创建hive临时表脚本执行失败");
}
// if (!ShellUtil.exec(new String[]{this.hive_cmd, " -e", "\"" + hiveQueryCmd + " \""})) {
// throw DataXException.asDataXException(
// HiveReaderErrorCode.SHELL_ERROR,
// "创建hive临时表脚本执行失败");
// }
LOG.info("创建hive 临时表结束 end!!!");
LOG.info("prepare(), start to getAllFiles...");
List<String> path = new ArrayList<String>();
path.add(tmpPath);
this.sourceFiles = dfsUtil.getAllFiles(path, Constant.TEXT);
LOG.info(String.format("您即将读取的文件数为: [%s], 列表为: [%s]",
this.sourceFiles.size(),
StringUtils.join(this.sourceFiles, ",")));
}
@Override
public void startRead(RecordSender recordSender) {
//读取临时hive表的hdfs文件
LOG.info("read start");
for (String sourceFile : this.sourceFiles) {
LOG.info(String.format("reading file : [%s]", sourceFile));
//默认读取的是TEXT文件格式
InputStream inputStream = dfsUtil.getInputStream(sourceFile);
UnstructuredStorageReaderUtil.readFromStream(inputStream, sourceFile, this.taskConfig,
recordSender, this.getTaskPluginCollector());
if (recordSender != null) {
recordSender.flush();
}
}
LOG.info("end read source files...");
}
//只是局部post 属于每个task
@Override
public void post() {
LOG.info("one task hive read post...");
deleteTmpTable();
}
private void deleteTmpTable() {
String hiveCmd = this.hive_sql_set + " use " + this.tempDatabase + "; drop table if exists " + this.tableName;
LOG.info("清空数据:hiveCmd ----> :" + hiveCmd);
//执行脚本,删除临时表
if (!HiveServer2ConnectUtil.execHiveSql(this.username, this.password, hiveCmd, this.hiveJdbcUrl)) {
throw DataXException.asDataXException(
HiveReaderErrorCode.SHELL_ERROR,
"删除hive临时表脚本执行失败");
}
// if (!ShellUtil.exec(new String[]{this.hive_cmd, "-e", "\"" + hiveCmd + "\""})) {
// throw DataXException.asDataXException(
// HiveReaderErrorCode.SHELL_ERROR,
// "删除hive临时表脚本执行失败");
// }
}
@Override
public void destroy() {
LOG.info("hive read destroy...");
}
//创建hive临时表名称
private String hiveTableName() {
StringBuilder str = new StringBuilder();
FastDateFormat fdf = FastDateFormat.getInstance("yyyyMMdd");
str.append(Constant.TEMP_TABLE_NAME_PREFIX)
.append(fdf.format(new Date()))
.append("_")
.append(System.currentTimeMillis());
// .append("_").append(KeyUtil.genUniqueKey());
return str.toString().toLowerCase();
}
}
}
HiveReaderErrorCode.class
package com.alibaba.datax.plugin.reader.hivereader;
import com.alibaba.datax.common.spi.ErrorCode;
public enum HiveReaderErrorCode implements ErrorCode {
BAD_CONFIG_VALUE("HiveReader-00", "您配置的值不合法."),
SQL_NOT_FIND_ERROR("HiveReader-01", "您未配置hive sql"),
DEFAULT_FS_NOT_FIND_ERROR("HiveReader-02", "您未配置defaultFS值"),
ILLEGAL_VALUE("HiveReader-03", "值错误"),
CONFIG_INVALID_EXCEPTION("HiveReader-04", "参数配置错误"),
REQUIRED_VALUE("HiveReader-05", "您缺失了必须填写的参数值."),
SHELL_ERROR("HiveReader-06", "hive 脚本执行失败."),
PATH_CONFIG_ERROR("HdfsReader-09", "您配置的path格式有误"),
READ_FILE_ERROR("HdfsReader-10", "读取文件出错"),
FILE_TYPE_UNSUPPORT("HdfsReader-12", "文件类型目前不支持"),
KERBEROS_LOGIN_ERROR("HdfsReader-13", "KERBEROS认证失败"),
READ_SEQUENCEFILE_ERROR("HdfsReader-14", "读取SequenceFile文件出错"),
READ_RCFILE_ERROR("HdfsReader-15", "读取RCFile文件出错");
private final String code;
private final String description;
private HiveReaderErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s]. ", this.code,
this.description);
}
}
HiveServer2ConnectUtil.class
package com.alibaba.datax.plugin.reader.hivereader;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class HiveServer2ConnectUtil {
private static final Logger LOG = LoggerFactory.getLogger(HiveServer2ConnectUtil.class);
/**
* @param args
* @throws SQLException
*/
public static void main(String[] args) {
execHiveSql("hive", null,
"; use default; create table tmp_datax_hivereader_20220808_1659953092709 ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\u0001' STORED AS TEXTFILE as select id,username,password from default.t_user;",
"jdbc:hive2://10.252.92.4:10000");
}
/**
* hive执行多个sql
*
* @param username
* @param password
* @param hiveSql
* @param hiveJdbcUrl
* @return
*/
public static boolean execHiveSql(String username, String password, String hiveSql, String hiveJdbcUrl) {
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
System.exit(1);
}
try {
LOG.info("hiveJdbcUrl:{}", hiveJdbcUrl);
LOG.info("username:{}", username);
LOG.info("password:{}", password);
Connection conn = DriverManager.getConnection(hiveJdbcUrl, username, password);
Statement stmt = conn.createStatement();
String[] hiveSqls = hiveSql.split(";");
for (int i = 0; i < hiveSqls.length; i++) {
if (StringUtils.isNotEmpty(hiveSqls[i])) {
stmt.execute(hiveSqls[i]);
}
}
return true;
} catch (SQLException sqlException) {
LOG.error(sqlException.getMessage(), sqlException);
return false;
}
}
}
Key.class
package com.alibaba.datax.plugin.reader.hivereader;
public class Key {
/**
 * 1. Required: hiveSql, defaultFS (hiveJdbcUrl, username and password are also needed for the HiveServer2 connection)
 * 2. Optional (with defaults):
 *    tempDatabase (default)
 *    tempDatabasePath (/user/{username}/warehouse/)
 *    fieldDelimiter (\u0001)
 *    hive_cmd (hive, currently unused: the plugin runs SQL through HiveServer2 JDBC instead)
 * 3. Optional (no default): hive_sql_set
 */
public final static String DEFAULT_FS = "defaultFS";
// reader执行的hiveSql语句
public final static String HIVE_SQL = "hiveSql";
// hive的Jdbc链接
public final static String HIVE_JDBC_URL = "hiveJdbcUrl";
// hive的用户名
public final static String USERNAME = "username";
// hive的密码
public final static String PASSWORD = "password";
// 临时表所在的数据库名称
public final static String TEMP_DATABASE = "tempDatabase";
// 临时表存放的HDFS目录
public final static String TEMP_DATABASE_HDFS_LOCATION = "tempDatabasePath";
// hive -e命令
public final static String HIVE_CMD = "hive_cmd";
public final static String HIVE_SQL_SET = "hive_sql_set";
// 存储文件 hdfs默认的分隔符
public final static String FIELDDELIMITER = "fieldDelimiter";
public static final String NULL_FORMAT = "nullFormat";
public static final String HADOOP_CONFIG = "hadoopConfig";
public static final String HAVE_KERBEROS = "haveKerberos";
public static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath";
public static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
}
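Putting these keys together, a hivereader job's parameter block might look like the sketch below. The host names are placeholders, the sample query is the one used elsewhere in this post, and any key that is omitted falls back to the defaults defined in Constant (tempDatabase "default", fieldDelimiter "\u0001", nullFormat "\N", temp-table path /user/{username}/warehouse/):
{
  "name": "hivereader",
  "parameter": {
    "defaultFS": "hdfs://<namenode-host>:<port>",
    "hiveJdbcUrl": "jdbc:hive2://<hiveserver2-host>:<port>",
    "username": "hive",
    "password": "",
    "hiveSql": [
      "select id,username,password from default.t_user;"
    ],
    "tempDatabase": "default",
    "tempDatabasePath": "/user/hive/warehouse/",
    "fieldDelimiter": "\\u0001",
    "nullFormat": "\\N",
    "haveKerberos": false
  }
}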
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hivereader</artifactId>
<properties>
<hive.version>2.1.1</hive.version>
<hadoop.version>2.7.1</hadoop.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-serde</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-service</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive.hcatalog</groupId>
<artifactId>hive-hcatalog-core</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>${hive.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-unstructured-storage-util</artifactId>
<version>${datax-project-version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.pentaho/pentaho-aggdesigner-algorithm -->
<!--<dependency>-->
<!--<groupId>org.pentaho</groupId>-->
<!--<artifactId>pentaho-aggdesigner-algorithm</artifactId>-->
<!--<version>5.1.5-jhyde</version>-->
<!--<scope>test</scope>-->
<!--</dependency>-->
<!-- https://mvnrepository.com/artifact/eigenbase/eigenbase-properties -->
<!--<dependency>-->
<!--<groupId>com.grgbanking.eigenbase</groupId>-->
<!--<artifactId>eigenbase</artifactId>-->
<!--<version>1.1.7</version>-->
<!--</dependency>-->
</dependencies>
<build>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<!-- assembly plugin -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
plugin_job_template.json
{
"name": "hivereader",
"parameter": {
"defaultFS": "hdfs://:",
"hiveJdbcUrl": "jdbc:hive2://",
"username": "hive",
"hiveSql": [
"select id,username,password from default.t_user;"
]
}
}
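The assembly descriptor above also packages a plugin.json, which the original post does not show. For a DataX plugin this file normally carries just the plugin name, its entry class and a short description; a minimal sketch (the description and developer values here are placeholders):
{
  "name": "hivereader",
  "class": "com.alibaba.datax.plugin.reader.hivereader.HiveReader",
  "description": "read data from Hive by exporting a query into a temporary TEXTFILE table on HDFS",
  "developer": "alibaba"
}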
4. Create the hivewriter module
Project structure
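The writer module mirrors the reader; the original post again shows the layout as a screenshot. A sketch based only on the files shown or referenced in this section (HiveWriter, HiveWriterErrorCode and Key appear as references inside HdfsHelper):
hivewriter
├── pom.xml
└── src/main
    ├── assembly/package.xml
    ├── java/com/alibaba/datax/plugin/writer/hivewriter/
    │   ├── Constants.java
    │   ├── HdfsHelper.java
    │   ├── HiveWriter.java
    │   ├── HiveWriterErrorCode.java
    │   └── Key.java
    └── resources/plugin.json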
package.xml
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
</includes>
<outputDirectory>plugin/writer/hivewriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>hivewriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/hivewriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/hivewriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
Constants.class
package com.alibaba.datax.plugin.writer.hivewriter;
public class Constants {
public static final String TEMP_TABLE_NAME_PREFIX_DEFAULT="tmp_datax_hivewriter_";
public final static String HIVE_CMD_DEFAULT = "hive";
public final static String HIVE_SQL_SET_DEFAULT = "";
public final static String HIVE_TARGET_TABLE_COMPRESS_SQL= "";
public static final String WRITE_MODE_DEFAULT="insert";
public final static String HIVE_PRESQL_DEFAULT = "";
public final static String HIVE_POSTSQL_DEFAULT = "";
public static final String INSERT_PRE_SQL="SET hive.exec.dynamic.partition=true;"
+"SET hive.exec.dynamic.partition.mode=nonstrict;"
+"SET hive.exec.max.dynamic.partitions.pernode=100000;"
+"SET hive.exec.max.dynamic.partitions=100000;";
public final static String FIELDDELIMITER_DEFAULT = "\\u0001";
public final static String COMPRESS_DEFAULT="gzip";
// 此默认值,暂无使用
public static final String DEFAULT_NULL_FORMAT = "\\N";
}
HdfsHelper.class
package com.alibaba.datax.plugin.writer.hivewriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat;
import java.util.*;
public class HdfsHelper {
public static final Logger LOG = LoggerFactory.getLogger(HiveWriter.Job.class);
public FileSystem fileSystem = null;
public JobConf conf = null;
public org.apache.hadoop.conf.Configuration hadoopConf = null;
public static final String HADOOP_SECURITY_AUTHENTICATION_KEY = "hadoop.security.authentication";
public static final String HDFS_DEFAULTFS_KEY = "fs.defaultFS";
private String username = null;
// Kerberos
private Boolean haveKerberos = false;
private String kerberosKeytabFilePath;
private String kerberosPrincipal;
public void getFileSystem(String defaultFS, Configuration taskConfig) {
hadoopConf = new org.apache.hadoop.conf.Configuration();
Configuration hadoopSiteParams = taskConfig.getConfiguration(Key.HADOOP_CONFIG);
JSONObject hadoopSiteParamsAsJsonObject = JSON.parseObject(taskConfig.getString(Key.HADOOP_CONFIG));
if (null != hadoopSiteParams) {
Set<String> paramKeys = hadoopSiteParams.getKeys();
for (String each : paramKeys) {
hadoopConf.set(each, hadoopSiteParamsAsJsonObject.getString(each));
}
}
hadoopConf.set(HDFS_DEFAULTFS_KEY, defaultFS);
this.username = taskConfig.getString(Key.USERNAME);
System.setProperty("HADOOP_USER_NAME", this.username);
//是否有Kerberos认证
this.haveKerberos = taskConfig.getBool(Key.HAVE_KERBEROS, false);
if (haveKerberos) {
this.kerberosKeytabFilePath = taskConfig.getString(Key.KERBEROS_KEYTAB_FILE_PATH);
this.kerberosPrincipal = taskConfig.getString(Key.KERBEROS_PRINCIPAL);
hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, "kerberos");
}
this.kerberosAuthentication(this.kerberosPrincipal, this.kerberosKeytabFilePath);
conf = new JobConf(hadoopConf);
conf.setUser(this.username);
try {
LOG.info("defaultFS:{},user:{}", defaultFS, this.username);
fileSystem = FileSystem.get(new URI(hadoopConf.get(HDFS_DEFAULTFS_KEY)), conf, this.username);
} catch (IOException e) {
String message = String.format("获取FileSystem时发生网络IO异常,请检查您的网络是否正常!HDFS地址:[%s]",
"message:defaultFS =" + defaultFS);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
} catch (Exception e) {
String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: [%s]",
"message:defaultFS =" + defaultFS);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
if (null == fileSystem || null == conf) {
String message = String.format("获取FileSystem失败,请检查HDFS地址是否正确: [%s]",
"message:defaultFS =" + defaultFS);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, message);
}
}
private void kerberosAuthentication(String kerberosPrincipal, String kerberosKeytabFilePath) {
if (haveKerberos && StringUtils.isNotBlank(this.kerberosPrincipal) && StringUtils.isNotBlank(this.kerberosKeytabFilePath)) {
UserGroupInformation.setConfiguration(this.hadoopConf);
try {
UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
} catch (Exception e) {
String message = String.format("kerberos认证失败,请确定kerberosKeytabFilePath[%s]和kerberosPrincipal[%s]填写正确",
kerberosKeytabFilePath, kerberosPrincipal);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.KERBEROS_LOGIN_ERROR, e);
}
}
}
/**
* 获取指定目录下的文件列表
*
* @param dir
* @return 拿到的是文件全路径,
*/
public String[] hdfsDirList(String dir) {
Path path = new Path(dir);
String[] files = null;
try {
FileStatus[] status = fileSystem.listStatus(path);
files = new String[status.length];
for(int i=0;i<status.length;i++){
files[i] = status[i].getPath().toString();
}
} catch (IOException e) {
String message = String.format("获取目录[%s]文件列表时发生网络IO异常,请检查您的网络是否正常!", dir);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
return files;
}
/**
* 获取以fileName__ 开头的文件列表
*
* @param dir
* @param fileName
* @return
*/
public Path[] hdfsDirList(String dir, String fileName) {
Path path = new Path(dir);
Path[] files = null;
String filterFileName = fileName + "__*";
try {
PathFilter pathFilter = new GlobFilter(filterFileName);
FileStatus[] status = fileSystem.listStatus(path,pathFilter);
files = new Path[status.length];
for(int i=0;i<status.length;i++){
files[i] = status[i].getPath();
}
} catch (IOException e) {
String message = String.format("获取目录[%s]下文件名以[%s]开头的文件列表时发生网络IO异常,请检查您的网络是否正常!",
dir,fileName);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
return files;
}
public boolean isPathexists(String filePath) {
Path path = new Path(filePath);
boolean exist = false;
try {
exist = fileSystem.exists(path);
} catch (IOException e) {
String message = String.format("判断文件路径[%s]是否存在时发生网络IO异常,请检查您的网络是否正常!",
"message:filePath =" + filePath);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
return exist;
}
public boolean isPathDir(String filePath) {
Path path = new Path(filePath);
boolean isDir = false;
try {
isDir = fileSystem.isDirectory(path);
} catch (IOException e) {
String message = String.format("判断路径[%s]是否是目录时发生网络IO异常,请检查您的网络是否正常!", filePath);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
return isDir;
}
public void deleteFiles(Path[] paths){
for(int i=0;i<paths.length;i++){
LOG.info(String.format("delete file [%s].", paths[i].toString()));
try {
fileSystem.delete(paths[i],true);
} catch (IOException e) {
String message = String.format("删除文件[%s]时发生IO异常,请检查您的网络是否正常!",
paths[i].toString());
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
}
}
public void deleteDir(Path path){
LOG.info(String.format("start delete tmp dir [%s] .",path.toString()));
try {
if(isPathexists(path.toString())) {
fileSystem.delete(path, true);
}
} catch (Exception e) {
String message = String.format("删除临时目录[%s]时发生IO异常,请检查您的网络是否正常!", path.toString());
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
LOG.info(String.format("finish delete tmp dir [%s] .",path.toString()));
}
public void renameFile(HashSet<String> tmpFiles, HashSet<String> endFiles) {
Path tmpFilesParent = null;
if (tmpFiles.size() != endFiles.size()) {
String message = String.format("临时目录下文件名个数与目标文件名个数不一致!");
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.HDFS_RENAME_FILE_ERROR, message);
} else {
try {
for (Iterator it1 = tmpFiles.iterator(), it2 = endFiles.iterator(); it1.hasNext() && it2.hasNext(); ) {
String srcFile = it1.next().toString();
String dstFile = it2.next().toString();
Path srcFilePah = new Path(srcFile);
Path dstFilePah = new Path(dstFile);
if (tmpFilesParent == null) {
tmpFilesParent = srcFilePah.getParent();
}
LOG.info(String.format("start rename file [%s] to file [%s].", srcFile, dstFile));
boolean renameTag = false;
long fileLen = fileSystem.getFileStatus(srcFilePah).getLen();
if (fileLen > 0) {
renameTag = fileSystem.rename(srcFilePah, dstFilePah);
if (!renameTag) {
String message = String.format("重命名文件[%s]失败,请检查您的网络是否正常!", srcFile);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.HDFS_RENAME_FILE_ERROR, message);
}
LOG.info(String.format("finish rename file [%s] to file [%s].", srcFile, dstFile));
} else {
LOG.info(String.format("文件[%s]内容为空,请检查写入是否正常!", srcFile));
}
}
} catch (Exception e) {
String message = String.format("重命名文件时发生异常,请检查您的网络是否正常!");
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
} finally {
deleteDir(tmpFilesParent);
}
}
}
//关闭FileSystem
public void closeFileSystem(){
try {
fileSystem.close();
} catch (IOException e) {
String message = String.format("关闭FileSystem时发生IO异常,请检查您的网络是否正常!");
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.CONNECT_HDFS_IO_ERROR, e);
}
}
//textfile格式文件
public FSDataOutputStream getOutputStream(String path){
Path storePath = new Path(path);
FSDataOutputStream fSDataOutputStream = null;
try {
fSDataOutputStream = fileSystem.create(storePath);
} catch (IOException e) {
String message = String.format("Create an FSDataOutputStream at the indicated Path[%s] failed: [%s]",
"message:path =" + path);
LOG.error(message);
throw DataXException.asDataXException(HiveWriterErrorCode.Write_FILE_IO_ERROR, e);
}
return fSDataOutputStream;
}
/**
* 写textfile类型文件
*
* @param lineReceiver
* @param config
* @param fileName
* @param taskPluginCollector
*/
public void textFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
TaskPluginCollector taskPluginCollector) {
char fieldDelimiter = config.getChar(Key.FIELD_DELIMITER);
List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
String compress = config.getString(Key.COMPRESS, null);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
String attempt = "attempt_" + dateFormat.format(new Date()) + "_0001_m_000000_0";
Path outputPath = new Path(fileName);
//todo 需要进一步确定TASK_ATTEMPT_ID
conf.set(JobContext.TASK_ATTEMPT_ID, attempt);
FileOutputFormat outFormat = new TextOutputFormat();
outFormat.setOutputPath(conf, outputPath);
outFormat.setWorkOutputPath(conf, outputPath);
if (null != compress) {
Class<? extends CompressionCodec> codecClass = getCompressCodec(compress);
if (null != codecClass) {
outFormat.setOutputCompressorClass(conf, codecClass);
}
}
try {
RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, outputPath.toString(), Reporter.NULL);
Record record = null;
while ((record = lineReceiver.getFromReader()) != null) {
MutablePair<Text, Boolean> transportResult = transportOneRecord(record, fieldDelimiter, columns, taskPluginCollector,config);
if (!transportResult.getRight()) {
writer.write(NullWritable.get(),transportResult.getLeft());
}
}
writer.close(Reporter.NULL);
} catch (Exception e) {
String message = String.format("写文件文件[%s]时发生IO异常,请检查您的网络是否正常!", fileName);
LOG.error(message);
Path path = new Path(fileName);
deleteDir(path.getParent());
throw DataXException.asDataXException(HiveWriterErrorCode.Write_FILE_IO_ERROR, e);
}
}
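// Illustrative note only (not part of the plugin's fixed contract): the text path above pulls its
// settings from the writer slice config via the Key.* constants used in this class. The sample
// values here are assumptions for the sake of example:
//   fieldDelimiter : e.g. "\t"   (Key.FIELD_DELIMITER, joins the column values into one line)
//   compress       : e.g. "GZIP" (Key.COMPRESS, resolved to a codec by getCompressCodec)
//   nullFormat     : e.g. "\\N"  (Key.NULL_FORMAT, written for null columns in transportOneRecord)
//   column         : name/type list (Key.COLUMN, see the sketch after getColumnTypeInspectors below)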
public static MutablePair<Text, Boolean> transportOneRecord(
Record record, char fieldDelimiter, List<Configuration> columnsConfiguration, TaskPluginCollector taskPluginCollector, Configuration config) {
MutablePair<List<Object>, Boolean> transportResultList = transportOneRecord(record, columnsConfiguration, taskPluginCollector, config);
// holds <converted record, whether it is dirty data>
MutablePair<Text, Boolean> transportResult = new MutablePair<Text, Boolean>();
transportResult.setRight(false);
if (null != transportResultList) {
Text recordResult = new Text(StringUtils.join(transportResultList.getLeft(), fieldDelimiter));
transportResult.setRight(transportResultList.getRight());
transportResult.setLeft(recordResult);
}
return transportResult;
}
public Class<? extends CompressionCodec> getCompressCodec(String compress) {
Class<? extends CompressionCodec> codecClass = null;
if (null == compress) {
codecClass = null;
} else if ("GZIP".equalsIgnoreCase(compress)) {
codecClass = org.apache.hadoop.io.compress.GzipCodec.class;
} else if ("BZIP2".equalsIgnoreCase(compress)) {
codecClass = org.apache.hadoop.io.compress.BZip2Codec.class;
} else if ("SNAPPY".equalsIgnoreCase(compress)) {
// TODO: support once the requirement is clear; requires the user to install SnappyCodec
codecClass = org.apache.hadoop.io.compress.SnappyCodec.class;
// org.apache.hadoop.hive.ql.io.orc.ZlibCodec.class not public
//codecClass = org.apache.hadoop.hive.ql.io.orc.ZlibCodec.class;
} else {
throw DataXException.asDataXException(HiveWriterErrorCode.ILLEGAL_VALUE,
String.format("目前不支持您配置的 compress 模式 : [%s]", compress));
}
return codecClass;
}
/**
* Write an orcfile-type file
*
* @param lineReceiver
* @param config
* @param fileName
* @param taskPluginCollector
*/
public void orcFileStartWrite(RecordReceiver lineReceiver, Configuration config, String fileName,
TaskPluginCollector taskPluginCollector) {
List<Configuration> columns = config.getListConfiguration(Key.COLUMN);
String compress = config.getString(Key.COMPRESS, null);
List<String> columnNames = getColumnNames(columns);
List<ObjectInspector> columnTypeInspectors = getColumnTypeInspectors(columns);
StructObjectInspector inspector = (StructObjectInspector) ObjectInspectorFactory
.getStandardStructObjectInspector(columnNames, columnTypeInspectors);
OrcSerde orcSerde = new OrcSerde();
FileOutputFormat outFormat = new OrcOutputFormat();
if (!"NONE".equalsIgnoreCase(compress) && null != compress) {
Class<? extends CompressionCodec> codecClass = getCompressCodec(compress);
if (null != codecClass) {
outFormat.setOutputCompressorClass(conf, codecClass);
}
}
try {
RecordWriter writer = outFormat.getRecordWriter(fileSystem, conf, fileName, Reporter.NULL);
Record record = null;
while ((record = lineReceiver.getFromReader()) != null) {
MutablePair<List<Object>, Boolean> transportResult = transportOneRecord(record, columns, taskPluginCollector, config);
if (!transportResult.getRight()) {
writer.write(NullWritable.get(), orcSerde.serialize(transportResult.getLeft(), inspector));
}
}
writer.close(Reporter.NULL);
} catch (Exception e) {
String message = String.format("写文件文件[%s]时发生IO异常,请检查您的网络是否正常!", fileName);
LOG.error(message);
Path path = new Path(fileName);
deleteDir(path.getParent());
throw DataXException.asDataXException(HiveWriterErrorCode.Write_FILE_IO_ERROR, e);
}
}
public List<String> getColumnNames(List<Configuration> columns) {
List<String> columnNames = Lists.newArrayList();
for (Configuration eachColumnConf : columns) {
columnNames.add(eachColumnConf.getString(Key.NAME));
}
return columnNames;
}
/**
* id int,
* username string,
* telephone string,
* mail string,
* day string
*/
public String getColumnInfo(List<Configuration> columns) {
StringBuilder str = new StringBuilder();
for (int i = 0; i < columns.size(); i++) {
Configuration eachColumnConf = columns.get(i);
String name = eachColumnConf.getString(Key.NAME); // column name
String type = eachColumnConf.getString(Key.TYPE); // column type
str.append(name).append(" ").append(type);
if (i != (columns.size() - 1)) {
str.append(",");
}
}
return str.toString();
}
/**
* By LingZhy on 2021/4/29 16:58
*
* @param columns
* @return java.lang.String
*/
public String getColumnName(List<Configuration> columns) {
List<String> list = Lists.newArrayList();
for (int i = 0; i < columns.size(); i++) {
Configuration eachColumnConf = columns.get(i);
String name = eachColumnConf.getString(Key.NAME).toLowerCase();
list.add(name);
}
return String.join(",", list);
}
/**
* Build the ObjectInspectors from the column types configured on the writer
*
* @param columns
* @return
*/
public List<ObjectInspector> getColumnTypeInspectors(List<Configuration> columns) {
List<ObjectInspector> columnTypeInspectors = Lists.newArrayList();
for (Configuration eachColumnConf : columns) {
SupportHiveDataType columnType = SupportHiveDataType.valueOf(eachColumnConf.getString(Key.TYPE).toUpperCase());
ObjectInspector objectInspector = null;
switch (columnType) {
case TINYINT:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Byte.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case SMALLINT:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Short.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case INT:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Integer.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case BIGINT:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Long.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case FLOAT:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Float.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case DOUBLE:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Double.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case TIMESTAMP:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(java.sql.Timestamp.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case DATE:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(java.sql.Date.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case STRING:
case VARCHAR:
case CHAR:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(String.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
case BOOLEAN:
objectInspector = ObjectInspectorFactory.getReflectionObjectInspector(Boolean.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
break;
default:
throw DataXException
.asDataXException(
HiveWriterErrorCode.ILLEGAL_VALUE,
String.format(
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d]. 请修改表中该字段的类型或者不同步该字段.",
eachColumnConf.getString(Key.NAME),
eachColumnConf.getString(Key.TYPE)));
}
columnTypeInspectors.add(objectInspector);
}
return columnTypeInspectors;
}
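// Illustrative only: getColumnNames(), getColumnInfo(), getColumnName() and getColumnTypeInspectors()
// all walk a "column" list of {name, type} entries read via Key.NAME / Key.TYPE. Using the example
// columns from the comment above getColumnInfo(), that list would look roughly like:
//   "column": [
//     {"name": "id",        "type": "int"},
//     {"name": "username",  "type": "string"},
//     {"name": "telephone", "type": "string"},
//     {"name": "mail",      "type": "string"},
//     {"name": "day",       "type": "string"}
//   ]
// The exact JSON layout of the job file is an assumption here; only the name/type keys come from the code.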
public OrcSerde getOrcSerde(Configuration config) {
String fieldDelimiter = config.getString(Key.FIELD_DELIMITER);
String compress = config.getString(Key.COMPRESS);
String encoding = config.getString(Key.ENCODING);
OrcSerde orcSerde = new OrcSerde();
Properties properties = new Properties();
properties.setProperty("orc.bloom.filter.columns", fieldDelimiter);
properties.setProperty("orc.compress", compress);
properties.setProperty("orc.encoding.strategy", encoding);
orcSerde.initialize(conf, properties);
return orcSerde;
}
public static MutablePair<List<Object>, Boolean> transportOneRecord(
Record record, List<Configuration> columnsConfiguration,
TaskPluginCollector taskPluginCollector, Configuration config) {
MutablePair<List<Object>, Boolean> transportResult = new MutablePair<List<Object>, Boolean>();
transportResult.setRight(false);
List<Object> recordList = Lists.newArrayList();
int recordLength = record.getColumnNumber();
if (0 != recordLength) {
Column column;
for (int i = 0; i < recordLength; i++) {
column = record.getColumn(i);
//todo as method
if (null != column.getRawData()) {
String rowData = column.getRawData().toString();
SupportHiveDataType columnType = SupportHiveDataType.valueOf(
columnsConfiguration.get(i).getString(Key.TYPE).toUpperCase());
// convert the value according to the type configured on the writer side
try {
switch (columnType) {
case TINYINT:
recordList.add(Byte.valueOf(rowData));
break;
case SMALLINT:
recordList.add(Short.valueOf(rowData));
break;
case INT:
recordList.add(Integer.valueOf(rowData));
break;
case BIGINT:
recordList.add(column.asLong());
break;
case FLOAT:
recordList.add(Float.valueOf(rowData));
break;
case DOUBLE:
recordList.add(column.asDouble());
break;
case STRING:
case VARCHAR:
case CHAR:
recordList.add(column.asString());
break;
case BOOLEAN:
recordList.add(column.asBoolean());
break;
case DATE:
recordList.add(new java.sql.Date(column.asDate().getTime()));
break;
case TIMESTAMP:
recordList.add(new java.sql.Timestamp(column.asDate().getTime()));
break;
default:
throw DataXException
.asDataXException(
HiveWriterErrorCode.ILLEGAL_VALUE,
String.format(
"您的配置文件中的列配置信息有误. 因为DataX 不支持数据库写入这种字段类型. 字段名:[%s], 字段类型:[%d]. 请修改表中该字段的类型或者不同步该字段.",
columnsConfiguration.get(i).getString(Key.NAME),
columnsConfiguration.get(i).getString(Key.TYPE)));
}
} catch (Exception e) {
// warn: treated as dirty data here
String message = String.format(
"Field type conversion error: the target field type is [%s], but the actual value is [%s].",
columnsConfiguration.get(i).getString(Key.TYPE), column.getRawData().toString());
taskPluginCollector.collectDirtyRecord(record, message);
transportResult.setRight(true);
break;
}
} else {
// warn: it's all ok if nullFormat is null
//recordList.add(null);
// fix: when writing text-format files to HDFS, NULL needs to be written as \N
String nullFormat = config.getString(Key.NULL_FORMAT);
if (nullFormat == null) {
recordList.add(null);
} else {
recordList.add(nullFormat);
}
}
}
}
transportResult.setLeft(recordList);
return transportResult;
}
}
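To make it easier to see how the helper methods above fit together, here is a minimal sketch of the writer's Task side. It is not the plugin's actual Task code: the helper class name HdfsHelper, the Key.FILE_TYPE / Key.FILE_NAME constants, and the way the target file path is obtained are all assumptions for illustration; only the calls to textFileStartWrite, orcFileStartWrite and closeFileSystem come from the listing above.
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;

public class HiveWriter extends Writer {
    // The Job class (splitting, pre/post processing, etc.) is omitted in this sketch.
    public static class Task extends Writer.Task {
        private Configuration writerSliceConfig;
        private HdfsHelper hdfsHelper;   // assumed name of the helper class shown above
        private String fileType;         // "text" or "orc"
        private String fileName;         // full HDFS path of the temporary file to write

        @Override
        public void init() {
            this.writerSliceConfig = this.getPluginJobConf();
            this.fileType = this.writerSliceConfig.getString(Key.FILE_TYPE);  // hypothetical key
            this.fileName = this.writerSliceConfig.getString(Key.FILE_NAME);  // hypothetical key
            this.hdfsHelper = new HdfsHelper();
            // connecting the helper to HDFS (the getFileSystem-style call) is omitted here,
            // since that part of the helper is not shown in the listing above
        }

        @Override
        public void startWrite(RecordReceiver lineReceiver) {
            if ("text".equalsIgnoreCase(fileType)) {
                hdfsHelper.textFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector());
            } else if ("orc".equalsIgnoreCase(fileType)) {
                hdfsHelper.orcFileStartWrite(lineReceiver, writerSliceConfig, fileName, getTaskPluginCollector());
            }
        }

        @Override
        public void destroy() {
            hdfsHelper.closeFileSystem();
        }
    }
}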
Author: Jeebiz  Created: 2022-10-16 01:30
Updated: 2024-07-10 22:56