1、在core模块下新增AESEncryptTransformer

package com.alibaba.datax.core.transport.transformer;

import cn.hutool.core.util.StrUtil;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.core.util.AESUtil;
import com.alibaba.datax.transformer.Transformer;

import java.util.Arrays;



/**
 * DataX transformer registered under the name {@code dx_aes_encrypt}.
 *
 * <p>Replaces the value of one column with the result of
 * {@link AESUtil#encrypt(String, String)} applied to the column's string form.
 *
 * <p>Accepted {@code paras}:
 * <ul>
 *   <li>{@code paras[0]} (required) - the column index, an {@link Integer};</li>
 *   <li>{@code paras[1]} (optional) - the key string; when absent or blank,
 *       {@code AESUtil} uses its built-in default key.</li>
 * </ul>
 */
public class AESEncryptTransformer extends Transformer {
    public AESEncryptTransformer() {
        super.setTransformerName("dx_aes_encrypt");
    }

    /**
     * Encrypts the configured column of {@code record} in place.
     *
     * @param record the record being transformed
     * @param paras  column index, optionally followed by the key string
     * @return the same record; unchanged when the column's string value is {@code null}
     * @throws DataXException with {@code TRANSFORMER_ILLEGAL_PARAMETER} when the parameter
     *                        count or types are wrong, or {@code TRANSFORMER_RUN_EXCEPTION}
     *                        when reading or encrypting the column fails
     */
    @Override
    public Record evaluate(Record record, Object... paras) {
        int columnIndex;
        String aesKey = null;

        try {
            if (paras.length != 1 && paras.length != 2) {
                throw new RuntimeException(getTransformerName() + " paras must be 1 or 2");
            }
            columnIndex = (Integer) paras[0];
            if (paras.length == 2) {
                aesKey = (String) paras[1];
            }
        } catch (Exception e) {
            // Keep the original exception as cause so the stack trace is not lost.
            // NOTE(review): the message echoes paras, which may include the key - consider masking.
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_ILLEGAL_PARAMETER,
                    "paras:" + Arrays.asList(paras).toString() + " => " + e.getMessage(), e);
        }

        Column column = record.getColumn(columnIndex);

        try {
            // Fail with a readable message instead of a bare NullPointerException
            // (whose getMessage() is null) if no column was returned for this index.
            if (column == null) {
                throw new IllegalArgumentException(
                        getTransformerName() + " column not found at index " + columnIndex);
            }

            String oriValue = column.asString();
            // Null field: nothing to encrypt, pass the record through unchanged.
            if (oriValue == null) {
                return record;
            }

            // AESUtil itself falls back to its default key when aesKey is null or blank,
            // so no separate branch is needed here.
            String newValue = AESUtil.encrypt(oriValue, aesKey);

            record.setColumn(columnIndex, new StringColumn(newValue));

        } catch (Exception e) {
            throw DataXException.asDataXException(TransformerErrorCode.TRANSFORMER_RUN_EXCEPTION, e.getMessage(), e);
        }

        return record;
    }
}

2、把转换器注册进去:在 core 模块的 TransformerRegistry 静态代码块中添加 registTransformer(new AESEncryptTransformer());

3、Pom文件新增依赖

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.6.3</version>
</dependency>

4、新增加密工具类

package com.alibaba.datax.core.util;

import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.crypto.SecureUtil;

import java.nio.charset.StandardCharsets;

/**
 * AES encrypt/decrypt helpers built on Hutool's {@code SecureUtil.aes(byte[])}.
 *
 * <p>Any key string is folded into a 16-byte AES key by XOR-ing its UTF-8 bytes
 * into a 16-byte buffer (see {@link #getKeyBytes(String)}). A null or blank key
 * selects the built-in default key.
 *
 * <p>NOTE(review): the cipher is whatever mode/padding {@code SecureUtil.aes(byte[])}
 * defaults to (presumably AES/ECB/PKCS5Padding - confirm against the Hutool version in use).
 */
public class AESUtil {

    /** Length in bytes of the derived AES key (AES-128). */
    private static final int KEY_LENGTH = 16;

    // NOTE(review): hard-coded fallback key; anyone with the source can decrypt data
    // encrypted without an explicit key. Consider loading it from configuration.
    private static final String DEFAULT_KEY = "ad1725339b2dd0a68903c57b6uyt23ca";

    /**
     * Encrypts {@code content} and returns the ciphertext as an upper-case hex string.
     *
     * @param content plain text; returned as-is when null or blank
     * @param key     key string; null or blank selects the default key
     * @return upper-case hex ciphertext, or {@code content} itself when it is null or blank
     */
    public static String encrypt(String content, String key) {
        if (StrUtil.isBlank(content)) {
            return content;
        }
        byte[] keyBytes = getKeyBytes(key);
        // Second argument false = do not lower-case, i.e. emit upper-case hex.
        return HexUtil.encodeHexStr(SecureUtil.aes(keyBytes).encrypt(content), false);
    }

    /**
     * Folds a key string of any length into a {@value #KEY_LENGTH}-byte AES key by
     * XOR-ing each byte of the string into position {@code i % KEY_LENGTH}.
     *
     * <p>The string is always encoded as UTF-8 so the derived key does not depend on
     * the platform default charset.
     *
     * @param key key string; null or blank selects the default key
     * @return the derived key bytes
     */
    private static byte[] getKeyBytes(String key) {
        String effectiveKey = StrUtil.isBlank(key) ? DEFAULT_KEY : key;
        byte[] keyBytes = new byte[KEY_LENGTH];
        int i = 0;
        for (byte b : effectiveKey.getBytes(StandardCharsets.UTF_8)) {
            keyBytes[i++ % KEY_LENGTH] ^= b;
        }
        return keyBytes;
    }

    /**
     * Decrypts {@code content} with the default key.
     *
     * @param content ciphertext as produced by {@link #encrypt(String, String)} with a
     *                null or blank key; returned as-is when null or blank
     * @return the plain text
     */
    public static String decrypt(String content) {
        return decrypt(content, null);
    }

    /**
     * Decrypts {@code content} with the given key, mirroring
     * {@link #encrypt(String, String)} so data encrypted with a custom key can be read back.
     *
     * @param content ciphertext; returned as-is when null or blank
     * @param key     key string used for encryption; null or blank selects the default key
     * @return the plain text
     */
    public static String decrypt(String content, String key) {
        if (StrUtil.isBlank(content)) {
            return content;
        }
        return SecureUtil.aes(getKeyBytes(key)).decryptStr(content);
    }

    /** Manual smoke check: decrypts sample ciphertexts with the default key and prints them. */
    public static void main(String[] args) {
        System.out.println(decrypt("37C612CC32BE40154FA19886CE56765D"));
        System.out.println(decrypt(decrypt("979AD63F61607260F4ABBAC41C211ADA0240070159598AEF532436DEF406560982172EE0194424B544A75E3555B0FA14")));
    }

}

5、打包后运行,Job模板如下

{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.02
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "root",
                        "password": "root",
                        "column": [
                            "姓名",
                            "年龄",
                            "专业",
                            "部门"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "人员"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://100.89.9.20:3306/test?useSSL=false&allowPublicKeyRetrieval=true&useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai"
                                ]
                            }
                        ]
                    }
                },
                "transformer":[
                    {
                        "name":"dx_aes_encrypt",
                        "parameter":{
                            "columnIndex":0,
                            "paras":[
                                "ad17ooiu9b2dd0a68903c57b6uy093ca"
                            ]
                        }
                    }
                ],
                "writer": {
                    "name": "postgresqlwriter",
                    "parameter": {
                        "print":true,
                        "column": [
                            "姓名",
                            "年龄",
                            "专业",
                            "部门"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:postgresql://100.89.9.6:5432/人力数据0727",
                                "table": ["人员"]
                            }
                        ],
                        "password": "123456",
                        "username": "postgres"
                    }
                }
            }
        ]
    }
}

6、执行结果

作者:Jeebiz  创建时间:2022-10-16 01:26
 更新时间:2024-07-10 22:56