https://blog.csdn.net/Carson073/article/details/126728411

Reference blogs:

CSDN: "DataX secondary-development plugin, detailed walkthrough" by 键盘上的艺术家w (datax kafkareader)

Jianshu: DataX kafkawriter

Background

Based on the open-source Alibaba DataX 3.0, this article develops Kafka reader and writer plugins, so that data can be extracted from MySQL or PostgreSQL into Kafka, and messages can be consumed from Kafka and written to HDFS, among other flows.

1. Overall module code structure

1. kafkareader

2. kafkawriter

3. package.xml: register both plugin outputs in the top-level package.xml by adding the fileSets below

            <fileSet>
                <directory>kafkareader/target/datax/</directory>
                <includes>
                    <include>**/*.*</include>
                </includes>
                <outputDirectory>datax</outputDirectory>
            </fileSet>        
            <fileSet>
                <directory>kafkawriter/target/datax/</directory>
                <includes>
                    <include>**/*.*</include>
                </includes>
                <outputDirectory>datax</outputDirectory>
            </fileSet>

4. pom.xml: add both modules to the parent pom.xml

<module>kafkareader</module>
<module>kafkawriter</module>

2. kafkareader module code

package.xml

    <assembly
            xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
        <id></id>
        <formats>
            <format>dir</format>
        </formats>
        <includeBaseDirectory>false</includeBaseDirectory>
        <fileSets>
            <fileSet>
                <directory>src/main/resources</directory>
                <includes>
                    <include>plugin.json</include>
                </includes>
                <outputDirectory>plugin/reader/kafkareader</outputDirectory>
            </fileSet>
            <fileSet>
                <directory>target/</directory>
                <includes>
                    <include>kafkareader-0.0.1-SNAPSHOT.jar</include>
                </includes>
                <outputDirectory>plugin/reader/kafkareader</outputDirectory>
            </fileSet>
        </fileSets>

        <dependencySets>
            <dependencySet>
                <useProjectArtifact>false</useProjectArtifact>
                <outputDirectory>plugin/reader/kafkareader/libs</outputDirectory>
                <scope>runtime</scope>
            </dependencySet>
        </dependencySets>
    </assembly>

DateUtil.java

    package com.alibaba.datax.plugin.reader.kafkareader;

    import java.text.SimpleDateFormat;
    import java.util.Date;

    /**
     * @Description: Date formatting utilities
     * @Author: chenweifeng
     * @Date: 2022-09-05 13:43
     **/
    public class DateUtil {

        /**
         * Format a Date as a "yyyy-MM-dd HH:mm:ss" string
         *
         * @param date
         * @return
         */
        public static String targetFormat(Date date) {
            String dateString = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
            return dateString;
        }

        /**
         * Format a Date using the given pattern
         *
         * @param date
         * @param pattern
         * @return
         */
        public static String targetFormat(Date date, String pattern) {
            String dateString = new SimpleDateFormat(pattern).format(date);
            return dateString;
        }
    }

JsonUtilJava.java

    package com.alibaba.datax.plugin.reader.kafkareader;

    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.fastjson.TypeReference;

    import java.util.HashMap;

    /**
     * @Description: JSON parsing helper based on fastjson
     * @Author: chenweifeng
     * @Date: 2022-09-05 13:47
     **/
    public class JsonUtilJava {

        /**
         * Parse a JSON string into a HashMap
         *
         * @param jsonData
         * @return
         */
        public static HashMap<String, Object> parseJsonStrToMap(String jsonData) {
            HashMap<String, Object> hashMap = JSONObject.parseObject(jsonData, new TypeReference<HashMap<String, Object>>() {
            });
            return hashMap;
        }
    }
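
The json parsing rule in KafkaReader (shown next) assumes each Kafka message carries a top-level `data` array whose entries expose a `rawData` field, one entry per column. The actual schema depends on whatever produces your messages; the standalone sketch below only illustrates that assumed shape and how `JsonUtilJava.parseJsonStrToMap` is applied to it (the sample message and the `main` harness are illustrative, not part of the plugin).

    import com.alibaba.datax.plugin.reader.kafkareader.JsonUtilJava;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class JsonUtilJavaDemo {

        public static void main(String[] args) {
            // Hypothetical message shape assumed by the "json" parsing rule:
            // a top-level "data" array, one element per column, each exposing "rawData".
            String message = "{\"data\":[{\"rawData\":\"1\"},{\"rawData\":\"alice\"},{\"rawData\":\"2022-09-05 13:47:00\"}]}";

            HashMap<String, Object> map = JsonUtilJava.parseJsonStrToMap(message);

            // fastjson materialises the array as a List of Map-like JSONObjects,
            // so the same cast used in KafkaReader.parseJson works here.
            @SuppressWarnings("unchecked")
            List<Map<String, Object>> data = (List<Map<String, Object>>) map.get("data");

            for (Map<String, Object> field : data) {
                System.out.println(field.get("rawData")); // 1, alice, 2022-09-05 13:47:00
            }
        }
    }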

KafkaReader.java

    package com.alibaba.datax.plugin.reader.kafkareader;

    import com.alibaba.datax.common.element.Record;
    import com.alibaba.datax.common.element.StringColumn;
    import com.alibaba.datax.common.exception.DataXException;
    import com.alibaba.datax.common.plugin.RecordSender;
    import com.alibaba.datax.common.spi.Reader;
    import com.alibaba.datax.common.util.Configuration;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.time.Duration;
    import java.util.*;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class KafkaReader extends Reader {

        public static class Job extends Reader.Job {
            private static final Logger LOG = LoggerFactory.getLogger(Job.class);

            private Configuration originalConfig = null;


            @Override
            public void init() {
                this.originalConfig = super.getPluginJobConf();
                // warn: ignore case

                String topic = this.originalConfig.getString(Key.TOPIC);
                Integer partitions = this.originalConfig.getInt(Key.KAFKA_PARTITIONS);
                String bootstrapServers = this.originalConfig.getString(Key.BOOTSTRAP_SERVERS);
                String groupId = this.originalConfig.getString(Key.GROUP_ID);
                Integer columnCount = this.originalConfig.getInt(Key.COLUMNCOUNT);
                String split = this.originalConfig.getString(Key.SPLIT);
                String filterContaintsStr = this.originalConfig.getString(Key.CONTAINTS_STR);
                String filterContaintsFlag = this.originalConfig.getString(Key.CONTAINTS_STR_FLAG);
                String conditionAllOrOne = this.originalConfig.getString(Key.CONDITION_ALL_OR_ONE);
                String parsingRules = this.originalConfig.getString(Key.PARSING_RULES);
                String writerOrder = this.originalConfig.getString(Key.WRITER_ORDER);
                String kafkaReaderColumnKey = this.originalConfig.getString(Key.KAFKA_READER_COLUMN_KEY);

                LOG.info("topic:{},partitions:{},bootstrapServers:{},groupId:{},columnCount:{},split:{},parsingRules:{}",
                        topic, partitions, bootstrapServers, groupId, columnCount, split, parsingRules);

                if (null == topic) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.TOPIC_ERROR,
                            "Missing required parameter [topic].");
                }
                if (partitions == null) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.PARTITION_ERROR,
                            "Missing required parameter [kafka.partitions].");
                } else if (partitions < 1) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.PARTITION_ERROR,
                            "[kafka.partitions] must not be less than 1.");
                }
                if (null == bootstrapServers) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.ADDRESS_ERROR,
                            "Missing required parameter [bootstrap.servers].");
                }
                if (null == groupId) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                            "Missing required parameter [groupId].");
                }
                if (columnCount == null) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.PARTITION_ERROR,
                            "Missing required parameter [columnCount].");
                } else if (columnCount < 1) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                            "[columnCount] must not be less than 1.");
                }
                if (null == split) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                            "[split] must not be empty.");
                }
                if (filterContaintsStr != null) {
                    if (conditionAllOrOne == null || filterContaintsFlag == null) {
                        throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                                "[filterContaintsStr] is set, but [conditionAllOrOne] or [filterContaintsFlag] is missing.");
                    }
                }
                if (parsingRules == null) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                            "Missing required parameter [parsingRules].");
                } else if (!parsingRules.equals("regex") && !parsingRules.equals("json") && !parsingRules.equals("split")) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                            "Invalid [parsingRules]; it must be one of regex, json or split.");
                }
                if (writerOrder == null) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                            "Missing required parameter [writerOrder].");
                }
                if (kafkaReaderColumnKey == null) {
                    throw DataXException.asDataXException(KafkaReaderErrorCode.KAFKA_READER_ERROR,
                            "Missing required parameter [kafkaReaderColumnKey].");
                }
            }

            @Override
            public void preCheck() {
                init();
            }

            @Override
            public List<Configuration> split(int adviceNumber) {
                List<Configuration> configurations = new ArrayList<Configuration>();

                Integer partitions = this.originalConfig.getInt(Key.KAFKA_PARTITIONS);
                for (int i = 0; i < partitions; i++) {
                    configurations.add(this.originalConfig.clone());
                }
                return configurations;
            }

            @Override
            public void post() {
            }

            @Override
            public void destroy() {

            }

        }

        public static class Task extends Reader.Task {

            private static final Logger LOG = LoggerFactory.getLogger(Task.class);
            // plugin job configuration
            private Configuration readerSliceConfig;
            // separator of the fields inside a kafka message
            private String split;
            // parsing rule: regex, json or split
            private String parsingRules;
            // whether to keep polling for data
            private boolean flag;
            // kafka address
            private String bootstrapServers;
            // kafka group id
            private String groupId;
            // kafka topic
            private String kafkaTopic;
            // total number of fields in each kafka message
            private int count;
            // the optional data_from column is composed of kafka host:port + topic
            // comma-separated substrings used to filter messages in or out
            private String filterContaintsStr;
            // whether messages containing the substrings are dropped or kept
            // 1 = drop messages that contain them, 0 = keep only messages that contain them
            private int filterContaintsStrFlag;
            // 1 = all substrings must match, 0 = a single match is enough
            private int conditionAllOrOne;
            // column order expected by the writer
            private String writerOrder;
            // keys of the fields inside the kafka message
            private String kafkaReaderColumnKey;
            // path for messages that fail to parse
            private String exceptionPath;
            // whether kafka offsets are committed automatically
            private Boolean enableAutoCommit;
            // maximum number of records returned by a single poll
            private Integer maxPollRecords;

            @Override
            public void init() {
                flag = true;
                this.readerSliceConfig = super.getPluginJobConf();
                split = this.readerSliceConfig.getString(Key.SPLIT);
                bootstrapServers = this.readerSliceConfig.getString(Key.BOOTSTRAP_SERVERS);
                groupId = this.readerSliceConfig.getString(Key.GROUP_ID);
                kafkaTopic = this.readerSliceConfig.getString(Key.TOPIC);
                count = this.readerSliceConfig.getInt(Key.COLUMNCOUNT);
                filterContaintsStr = this.readerSliceConfig.getString(Key.CONTAINTS_STR);
                filterContaintsStrFlag = this.readerSliceConfig.getInt(Key.CONTAINTS_STR_FLAG);
                conditionAllOrOne = this.readerSliceConfig.getInt(Key.CONDITION_ALL_OR_ONE);
                parsingRules = this.readerSliceConfig.getString(Key.PARSING_RULES);
                writerOrder = this.readerSliceConfig.getString(Key.WRITER_ORDER);
                kafkaReaderColumnKey = this.readerSliceConfig.getString(Key.KAFKA_READER_COLUMN_KEY);
                exceptionPath = this.readerSliceConfig.getString(Key.EXECPTION_PATH);
                enableAutoCommit = Objects.isNull(this.readerSliceConfig.getBool(Key.ENABLE_AUTO_COMMIT)) ? true : this.readerSliceConfig.getBool(Key.ENABLE_AUTO_COMMIT);
                maxPollRecords = Objects.isNull(this.readerSliceConfig.getInt(Key.MAX_POLL_RECORDS)) ? 200 : this.readerSliceConfig.getInt(Key.MAX_POLL_RECORDS);
                LOG.info("filterContaintsStr:{}", filterContaintsStr);
            }

            @Override
            public void startRead(RecordSender recordSender) {

                Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
                props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId != null ? groupId : UUID.randomUUID().toString());
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
                props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
                props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);


                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
                consumer.subscribe(Collections.singletonList(kafkaTopic));

                Record oneRecord = null;
                int commitSyncMaxNum = maxPollRecords;
                int commitSyncNum = 0;
                while (flag) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {

                        String value = record.value();
                        // filter flag
                        int ifNotContinue = filterMessage(value);
                        // if the flag was set to 1, skip this message
                        if (ifNotContinue == 1) {
                            LOG.info("Filtered out message: " + record.value());
                            continue;
                        }
                        oneRecord = buildOneRecord(recordSender, value);
                        // a non-null return value means the message is not malformed
                        if (oneRecord != null) {
                            LOG.info("oneRecord:{}", oneRecord.toString());
                            recordSender.sendToWriter(oneRecord);
                        }
                    }
                    recordSender.flush();

                    if (!enableAutoCommit) {
                        commitSyncNum++;
                        if (commitSyncNum >= commitSyncMaxNum) {
                            consumer.commitSync();
                            commitSyncNum = 0;
                        }
                    }

                    // exit around midnight: when the current hour is "00", stop consuming so the job can finish
                    Date date = new Date();
                    if (DateUtil.targetFormat(date).split(" ")[1].substring(0, 2).equals("00")) {
                        destroy();
                    }

                }
                consumer.close();
            }

            private int filterMessage(String value) {
                // only apply filtering when filterContaintsStr is configured
                int ifNotContinue = 0;

                if (filterContaintsStr != null) {
                    String[] filterStrs = filterContaintsStr.split(",");
                    // conditionAllOrOne == 1: all configured substrings are evaluated together
                    if (conditionAllOrOne == 1) {
                        // flag == 1: drop messages that contain ALL of the substrings
                        if (filterContaintsStrFlag == 1) {
                            int i = 0;
                            for (; i < filterStrs.length; i++) {
                                if (!value.contains(filterStrs[i])) break;
                            }
                            if (i >= filterStrs.length) ifNotContinue = 1;
                        } else {
                            // flag == 0: keep only messages that contain ALL of the substrings
                            int i = 0;
                            for (; i < filterStrs.length; i++) {
                                if (!value.contains(filterStrs[i])) break;
                            }
                            if (i < filterStrs.length) ifNotContinue = 1;
                        }

                    } else {
                        // conditionAllOrOne == 0, flag == 1: drop messages that contain ANY of the substrings
                        if (filterContaintsStrFlag == 1) {
                            int i = 0;
                            for (; i < filterStrs.length; i++) {
                                if (value.contains(filterStrs[i])) break;
                            }
                            if (i < filterStrs.length) ifNotContinue = 1;
                        }
                        // flag == 0: keep only messages that contain at least ONE of the substrings
                        else {
                            int i = 0;
                            for (; i < filterStrs.length; i++) {
                                if (value.contains(filterStrs[i])) break;
                            }
                            if (i >= filterStrs.length) ifNotContinue = 1;
                        }
                    }
                }
                return ifNotContinue;

            }

            private Record buildOneRecord(RecordSender recordSender, String value) {
                Record record = null;
                if (parsingRules.equals("regex")) {
                    record = parseRegex(value, recordSender);
                } else if (parsingRules.equals("json")) {
                    record = parseJson(value, recordSender);
                } else if (parsingRules.equals("split")) {
                    record = parseSplit(value, recordSender);
                }
                LOG.info("record:{}", record.toString());
                return record;
            }

            private Record parseSplit(String value, RecordSender recordSender) {
                Record record = recordSender.createRecord();
                String[] splits = value.split(this.split);
                if (splits.length != count) {
                    writerErrorPath(value);
                    return null;
                }
                return parseOrders(Arrays.asList(splits), record);
            }

            private Record parseJson(String value, RecordSender recordSender) {
                LOG.info("parseJson value :{}", value);
                Record record = recordSender.createRecord();
                HashMap<String, Object> map = JsonUtilJava.parseJsonStrToMap(value);
                LOG.info("map :{}", map);
                List<Map<String, Object>> mapData = (List<Map<String, Object>>) map.get("data");
                LOG.info("mapData :{}", mapData);

                LOG.info("parseJson kafkaReaderColumnKey :{}", kafkaReaderColumnKey);
                String[] columns = kafkaReaderColumnKey.split(",");

                if (mapData.size() != columns.length) {
                    throw new RuntimeException("The field count in the kafka message does not match kafkaReaderColumnKey; the data cannot be mapped");
                }

                ArrayList<String> datas = new ArrayList<>();
                for (int i = 0; i < columns.length; i++) {
                    datas.add(String.valueOf(mapData.get(i).get("rawData")));
                }

    //            for (String column : columns) {
    //                datas.add(map.get(column).toString());
    //            }
                if (datas.size() != count) {
                    writerErrorPath(value);
                    return null;
                }
                LOG.info("datas:{}", datas);
                return parseOrders(datas, record);
            }

            private Record parseRegex(String value, RecordSender recordSender) {
                Record record = recordSender.createRecord();
                ArrayList<String> datas = new ArrayList<String>();
                Pattern r = Pattern.compile(split);
                Matcher m = r.matcher(value);
                if (m.find()) {
                    if (m.groupCount() != count) {
                        writerErrorPath(value);
                        return null;
                    }
                    for (int i = 1; i <= count; i++) {
                        datas.add(m.group(i));
                    }
                } else {
                    writerErrorPath(value);
                    return null;
                }

                return parseOrders(datas, record);
            }

            private void writerErrorPath(String value) {
                if (exceptionPath == null) return;
                FileOutputStream fileOutputStream = null;
                try {
                    fileOutputStream = getFileOutputStream();
                    fileOutputStream.write((value + "\n").getBytes());
                    fileOutputStream.close();
                } catch (FileNotFoundException e) {
                    e.printStackTrace();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }

            private FileOutputStream getFileOutputStream() throws FileNotFoundException {
                return new FileOutputStream(exceptionPath + "/" + kafkaTopic + "errordata" + DateUtil.targetFormat(new Date(), "yyyyMMdd"), true);
            }

            private Record parseOrders(List<String> datas, Record record) {
                // writerOrder entries: "uuid", "null", "datax_time", "data_from", or a 1-based index into datas
                String[] orders = writerOrder.split(",");
                LOG.info("writerOrder:{}", writerOrder);
                for (String order : orders) {
                    if (order.equals("data_from")) {
                        record.addColumn(new StringColumn(bootstrapServers + "|" + kafkaTopic));
                    } else if (order.equals("uuid")) {
                        record.addColumn(new StringColumn(UUID.randomUUID().toString()));
                    } else if (order.equals("null")) {
                        record.addColumn(new StringColumn("null"));
                    } else if (order.equals("datax_time")) {
                        record.addColumn(new StringColumn(DateUtil.targetFormat(new Date())));
                    } else if (isNumeric(order)) {
                        record.addColumn(new StringColumn(datas.get(Integer.parseInt(order) - 1)));
                    }
                }
                return record;
            }

            public static boolean isNumeric(String str) {
                for (int i = 0; i < str.length(); i++) {
                    if (!Character.isDigit(str.charAt(i))) {
                        return false;
                    }
                }
                return true;
            }

            @Override
            public void post() {
            }

            @Override
            public void destroy() {
                flag = false;
            }


        }
    }
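
The writerOrder convention used by parseOrders is the least obvious part of the reader: each comma-separated entry is either the literal uuid, null, datax_time or data_from, or a 1-based index into the parsed fields. The following minimal sketch mirrors that mapping in plain Java, collecting strings instead of DataX StringColumns; the sample values are made up.

    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Date;
    import java.util.List;
    import java.util.UUID;

    public class WriterOrderDemo {

        public static void main(String[] args) {
            // parsed fields of one kafka message (sample values)
            List<String> datas = Arrays.asList("42", "alice", "2022-09-05");
            // same grammar as the reader's writerOrder parameter
            String writerOrder = "uuid,1,3,null,datax_time,data_from";

            List<String> columns = new ArrayList<String>();
            for (String order : writerOrder.split(",")) {
                if (order.equals("uuid")) {
                    columns.add(UUID.randomUUID().toString());           // generated row id
                } else if (order.equals("null")) {
                    columns.add("null");                                  // literal placeholder column
                } else if (order.equals("datax_time")) {
                    columns.add(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                } else if (order.equals("data_from")) {
                    columns.add("broker:9092|topic");                     // bootstrapServers + "|" + topic in the plugin
                } else {
                    columns.add(datas.get(Integer.parseInt(order) - 1));  // 1-based index into the parsed fields
                }
            }
            // e.g. [<uuid>, 42, 2022-09-05, null, <now>, broker:9092|topic]
            System.out.println(columns);
        }
    }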

KafkaReaderErrorCode.java

    package com.alibaba.datax.plugin.reader.kafkareader;

    import com.alibaba.datax.common.spi.ErrorCode;

    /**
     * @Description: Error codes for the kafka reader plugin
     * @Author: chenweifeng
     * @Date: 2022-09-05 11:14
     **/
    public enum KafkaReaderErrorCode implements ErrorCode {
        TOPIC_ERROR("KafkaReader-00", "The topic parameter is not set"),
        PARTITION_ERROR("KafkaReader-01", "The kafkaPartitions parameter is not set"),
        ADDRESS_ERROR("KafkaReader-02", "The bootstrapServers parameter is not set"),
        KAFKA_READER_ERROR("KafkaReader-03", "Invalid parameter"),
        ;


        private final String code;
        private final String description;

        private KafkaReaderErrorCode(String code, String description) {
            this.code = code;
            this.description = description;
        }

        @Override
        public String getCode() {
            return this.code;
        }

        @Override
        public String getDescription() {
            return this.description;
        }

        @Override
        public String toString() {
            return String.format("Code:[%s], Description:[%s].", this.code,
                    this.description);
        }
    }

Key.java

    package com.alibaba.datax.plugin.reader.kafkareader;

    /**
     * @Description: Parameter keys for the kafka reader plugin
     * @Author: chenweifeng
     * @Date: 2022-09-05 11:08
     **/
    public class Key {

        public final static String TOPIC = "topic";// topic name
        public final static String KAFKA_PARTITIONS = "kafkaPartitions";// number of partitions
        public final static String BOOTSTRAP_SERVERS = "bootstrapServers";// kafka bootstrap servers
        public final static String GROUP_ID = "groupId";// consumer group
        public final static String COLUMNCOUNT = "columnCount";// number of fields per message
        public final static String SPLIT = "split";// field separator (used as a regex when parsingRules=regex)
        public final static String CONTAINTS_STR = "filterContaints";// comma-separated substrings to filter on
        public final static String CONTAINTS_STR_FLAG = "filterContaintsFlag";// 1 = drop matching messages, 0 = keep only matching messages
        public final static String CONDITION_ALL_OR_ONE = "conditionAllOrOne";// 1 = all substrings must match, 0 = any single substring is enough
        public final static String PARSING_RULES = "parsingRules";// regex | json | split
        public final static String WRITER_ORDER = "writerOrder";// column order expected by the writer
        public final static String KAFKA_READER_COLUMN_KEY = "kafkaReaderColumnKey";// keys of the fields in the kafka message
        public final static String EXECPTION_PATH = "execptionPath";// path for messages that fail to parse
        public final static String ENABLE_AUTO_COMMIT = "enableAutoCommit";// whether offsets are committed automatically
        public final static String MAX_POLL_RECORDS = "maxPollRecords";// maximum records per poll

    }

plugin.json

    {
        "name": "kafkareader",
        "class": "com.alibaba.datax.plugin.reader.kafkareader.KafkaReader",
        "description": "kafka reader 插件",
        "developer": "chenweifeng"
    }

plugin_job_template.json

    {
      "name": "kafkareader",
      "parameter": {
        "topic": "Event",
        "bootstrapServers": "192.168.7.128:9092",
        "kafkaPartitions": "1",
        "columnCount": 11,
        "groupId": "ast",
        "filterContaints": "5^1,6^5",
        "filterContaintsFlag": 1,
        "conditionAllOrOne": 0,
        "parsingRules": "regex",
        "writerOrder": "uuid,1,3,6,4,8,9,10,11,5,7,2,null,datax_time,data_from",
        "kafkaReaderColumnKey": "a",
        "execptionPath": "/opt/module/datax/log/errorlog"
      }
    }
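
In the template above, filterContaints=5^1,6^5 together with filterContaintsFlag=1 and conditionAllOrOne=0 means: drop any message that contains at least one of the substrings 5^1 or 6^5. The tiny standalone check below mirrors that branch of filterMessage; the sample messages are made up.

    import java.util.Arrays;

    public class FilterDemo {

        // mirrors filterMessage for filterContaintsFlag=1, conditionAllOrOne=0:
        // drop the message if it contains ANY of the configured substrings
        static boolean dropped(String value, String filterContaints) {
            return Arrays.stream(filterContaints.split(",")).anyMatch(value::contains);
        }

        public static void main(String[] args) {
            String filterContaints = "5^1,6^5";
            System.out.println(dropped("row with 5^1 inside", filterContaints)); // true  -> filtered out
            System.out.println(dropped("ordinary row", filterContaints));        // false -> passed to the writer
        }
    }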

3. kafkawriter module code

package.xml

    <assembly
            xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
        <id></id>
        <formats>
            <format>dir</format>
        </formats>
        <includeBaseDirectory>false</includeBaseDirectory>
        <fileSets>
            <fileSet>
                <directory>src/main/resources</directory>
                <includes>
                    <include>plugin.json</include>
                </includes>
                <outputDirectory>plugin/writer/kafkawriter</outputDirectory>
            </fileSet>
            <fileSet>
                <directory>target/</directory>
                <includes>
                    <include>kafkawriter-0.0.1-SNAPSHOT.jar</include>
                </includes>
                <outputDirectory>plugin/writer/kafkawriter</outputDirectory>
            </fileSet>
        </fileSets>

        <dependencySets>
            <dependencySet>
                <useProjectArtifact>false</useProjectArtifact>
                <outputDirectory>plugin/writer/kafkawriter/libs</outputDirectory>
                <scope>runtime</scope>
            </dependencySet>
        </dependencySets>
    </assembly>

KafkaWriter.java

    package com.alibaba.datax.plugin.writer.kafkawriter;

    import com.alibaba.datax.common.element.Column;
    import com.alibaba.datax.common.element.Record;
    import com.alibaba.datax.common.exception.DataXException;
    import com.alibaba.datax.common.plugin.RecordReceiver;
    import com.alibaba.datax.common.spi.Writer;
    import com.alibaba.datax.common.util.Configuration;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.ListTopicsResult;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    /**
     * @ClassName: KafkaWriter
     * @Author: majun
     * @CreateDate: 2019/2/20 11:06
     * @Version: 1.0
     * @Description: datax kafkawriter
     */

    public class KafkaWriter extends Writer {

        public static class Job extends Writer.Job {

            private static final Logger logger = LoggerFactory.getLogger(Job.class);
            private Configuration conf = null;

            @Override
            public List<Configuration> split(int mandatoryNumber) {
                List<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
                for (int i = 0; i < mandatoryNumber; i++) {
                    configurations.add(conf);
                }
                return configurations;
            }

            private void validateParameter() {
                this.conf.getNecessaryValue(Key.BOOTSTRAP_SERVERS, KafkaWriterErrorCode.REQUIRED_VALUE);
                this.conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);
            }

            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                logger.info("kafka writer params:{}", conf.toJSON());
                this.validateParameter();
            }


            @Override
            public void destroy() {

            }
        }

        public static class Task extends Writer.Task {
            private static final Logger logger = LoggerFactory.getLogger(Task.class);
            private static final String NEWLINE_FLAG = System.getProperty("line.separator", "\n");

            private Producer<String, String> producer;
            private String fieldDelimiter;
            private Configuration conf;
            private Properties props;

            @Override
            public void init() {
                this.conf = super.getPluginJobConf();
                fieldDelimiter = conf.getUnnecessaryValue(Key.FIELD_DELIMITER, "\t", null);

                props = new Properties();
                props.put("bootstrap.servers", conf.getString(Key.BOOTSTRAP_SERVERS));
                props.put("acks", conf.getUnnecessaryValue(Key.ACK, "0", null));//这意味着leader需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。
                props.put("retries", conf.getUnnecessaryValue(Key.RETRIES, "0", null));
                // Controls how much bytes sender would wait to batch up before publishing to Kafka.
                //控制发送者在发布到kafka之前等待批处理的字节数。
                //控制发送者在发布到kafka之前等待批处理的字节数。 满足batch.size和ling.ms之一,producer便开始发送消息
                //默认16384   16kb
                props.put("batch.size", conf.getUnnecessaryValue(Key.BATCH_SIZE, "16384", null));
                props.put("linger.ms", 1);
                props.put("key.serializer", conf.getUnnecessaryValue(Key.KEYSERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
                props.put("value.serializer", conf.getUnnecessaryValue(Key.VALUESERIALIZER, "org.apache.kafka.common.serialization.StringSerializer", null));
                producer = new KafkaProducer<String, String>(props);
            }

            @Override
            public void prepare() {
                if (Boolean.valueOf(conf.getUnnecessaryValue(Key.NO_TOPIC_CREATE, "false", null))) {

                    ListTopicsResult topicsResult = AdminClient.create(props).listTopics();
                    String topic = conf.getNecessaryValue(Key.TOPIC, KafkaWriterErrorCode.REQUIRED_VALUE);

                    try {
                        if (!topicsResult.names().get().contains(topic)) {
                            NewTopic newTopic = new NewTopic(
                                    topic,
                                    Integer.valueOf(conf.getUnnecessaryValue(Key.TOPIC_NUM_PARTITION, "1", null)),
                                    Short.valueOf(conf.getUnnecessaryValue(Key.TOPIC_REPLICATION_FACTOR, "1", null))
                            );
                            List<NewTopic> newTopics = new ArrayList<NewTopic>();
                            newTopics.add(newTopic);
                            AdminClient.create(props).createTopics(newTopics);
                        }
                    } catch (Exception e) {
                        throw new DataXException(KafkaWriterErrorCode.CREATE_TOPIC, KafkaWriterErrorCode.CREATE_TOPIC.getDescription());
                    }
                }
            }

            @Override
            public void startWrite(RecordReceiver lineReceiver) {
                logger.info("start to writer kafka");
                Record record = null;
                while ((record = lineReceiver.getFromReader()) != null) {//说明还在读取数据,或者读取的数据没处理完
                    //获取一行数据,按照指定分隔符 拼成字符串 发送出去
                    if (conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null).toLowerCase()
                            .equals(WriteType.TEXT.name().toLowerCase())) {
                        producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                                recordToString(record),
                                recordToString(record))
                        );
                    } else if (conf.getUnnecessaryValue(Key.WRITE_TYPE, WriteType.TEXT.name(), null).toLowerCase()
                            .equals(WriteType.JSON.name().toLowerCase())) {
                        producer.send(new ProducerRecord<String, String>(this.conf.getString(Key.TOPIC),
                                recordToString(record),
                                record.toString())
                        );
                    }
                    logger.info("complete write " + record.toString());
                    producer.flush();
                }
            }

            @Override
            public void destroy() {
                if (producer != null) {
                    producer.close();
                }
            }

            /**
             * Format a record as a single delimited line.
             *
             * @param record
             * @return
             */
            private String recordToString(Record record) {
                int recordLength = record.getColumnNumber();
                if (0 == recordLength) {
                    return NEWLINE_FLAG;
                }
                Column column;
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < recordLength; i++) {
                    column = record.getColumn(i);
                    sb.append(column.asString()).append(fieldDelimiter);
                }

                sb.setLength(sb.length() - 1);
                sb.append(NEWLINE_FLAG);

                logger.info("recordToString:{}",sb.toString());
                return sb.toString();
            }
        }
    }
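
With writeType=text the writer joins all columns with fieldDelimiter and uses that line as both the Kafka key and value; with writeType=json it currently sends record.toString() as the value. The sketch below reproduces only the text formatting done by recordToString, using a plain String[] in place of a DataX Record; the sample values are made up.

    public class RecordToStringDemo {

        public static void main(String[] args) {
            // stands in for the columns of one DataX Record (sample values)
            String[] columns = {"42", "alice", "2022-09-05"};
            String fieldDelimiter = "\t"; // default used by the writer

            StringBuilder sb = new StringBuilder();
            for (String column : columns) {
                sb.append(column).append(fieldDelimiter);
            }
            sb.setLength(sb.length() - 1);                       // drop the trailing delimiter
            sb.append(System.getProperty("line.separator", "\n"));

            // this line becomes both the kafka key and value when writeType=text
            System.out.print(sb);                                // 42<TAB>alice<TAB>2022-09-05
        }
    }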

KafkaWriterErrorCode.java

    package com.alibaba.datax.plugin.writer.kafkawriter;

    import com.alibaba.datax.common.spi.ErrorCode;

    public enum KafkaWriterErrorCode implements ErrorCode {

        REQUIRED_VALUE("KafkaWriter-00", "A required parameter is missing."),
        CREATE_TOPIC("KafkaWriter-01", "Failed to check or create the topic before writing.");

        private final String code;
        private final String description;

        private KafkaWriterErrorCode(String code, String description) {
            this.code = code;
            this.description = description;
        }

        @Override
        public String getCode() {
            return this.code;
        }

        @Override
        public String getDescription() {
            return this.description;
        }

        @Override
        public String toString() {
            return String.format("Code:[%s], Description:[%s].", this.code,
                    this.description);
        }

    }

Key.java

    package com.alibaba.datax.plugin.writer.kafkawriter;

    /**
     * @ClassName: Key
     * @Author: majun
     * @CreateDate: 2019/2/20 11:17
     * @Version: 1.0
     * @Description: Parameter keys for the kafka writer plugin
     */

    public class Key {
    //
    //    "bootstrapServers": "",
    //            "topic": "",
    //            "ack": "all",
    //            "batchSize": 1000,
    //            "retries": 0,
    //            "keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
    //            "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer",
    //            "fieldDelimiter": ","

        public static final String BOOTSTRAP_SERVERS = "bootstrapServers";

        // must have
        public static final String TOPIC = "topic";

        public static final String ACK = "ack";

        public static final String BATCH_SIZE = "batchSize";

        public static final String RETRIES = "retries";

        public static final String KEYSERIALIZER = "keySerializer";

        public static final String VALUESERIALIZER = "valueSerializer";

        // not must , not default
        public static final String FIELD_DELIMITER = "fieldDelimiter";

        public static final String NO_TOPIC_CREATE = "noTopicCreate";

        public static final String TOPIC_NUM_PARTITION = "topicNumPartition";

        public static final String TOPIC_REPLICATION_FACTOR = "topicReplicationFactor";

        public static final String WRITE_TYPE = "writeType";
    }

WriteType.java

    package com.alibaba.datax.plugin.writer.kafkawriter;

    public enum WriteType {
        JSON("json"),
        TEXT("text");

        private String name;

        WriteType(String name) {
            this.name = name;
        }
    }

plugin.json

    {
      "name": "kafkawriter",
      "class": "com.alibaba.datax.plugin.writer.kafkawriter.KafkaWriter",
      "description": "kafka writer 插件",
      "developer": "chenweifeng"
    }

plugin_job_template.json

    {
      "name": "kafkawriter",
      "parameter": {
        "bootstrapServers": "10.1.20.150:9092",
        "topic": "test-topic",
        "ack": "all",
        "batchSize": 1000,
        "retries": 0,
        "keySerializer": "org.apache.kafka.common.serialization.StringSerializer",
        "valueSerializer": "org.apache.kafka.common.serialization.StringSerializer",
        "fieldFelimiter": ",",
        "writeType": "json",
        "topicNumPartition": 1,
        "topicReplicationFactor": 1
      }
    }

4. Example job: Kafka to Kafka

    {
        "job": {
            "setting": {
                "speed": {
                     "channel": 12
                },
                "errorLimit": {
                    "record": 0,
                    "percentage": 0.02
                }
            },
            "content": [
                {
                    "reader": {
                          "name": "kafkareader",
                          "parameter": {
                            "topic": "test_datax_kafka_read",
                            "bootstrapServers": "10.254.21.6:59292,10.254.21.1:59292,10.254.21.2:59292",
                            "kafkaPartitions": 12,
                            "columnCount": 34,
                            "groupId": "datax_kafka_kafka",
                            "filterContaints": "5^1,6^5",
                            "filterContaintsFlag": 1,
                            "conditionAllOrOne": 0,
                            "parsingRules": "json",
                            "writerOrder": "1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34",
                            "kafkaReaderColumnKey": "id,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12,c13,c14,c15,c16,c17,c18,c19,c20,c21,c22,c23,c24,c25,c26,c27,c28,c29,c30,c31,c32,crt_time",
                            "execptionPath": "/Users/chenweifeng/datax/log/errorlog",
                            "split":"\t"
                          }
                    },
                    "writer": {
                        "name": "kafkawriter",
                        "parameter": {
                            "print": true,
                            "topic": "test_datax_kafka_write",
                            "bootstrapServers": "10.254.21.6:59292,10.254.21.1:59292,10.254.21.2:59292",
                            "fieldDelimiter": "\t",
                            "batchSize": 20,
                            "writeType": "json",
                            "notTopicCreate": false,
                            "topicNumPartition": 12,
                            "topicReplicationFactor": 3
                  }
              }
                }
            ]
        }
    }