一、DataX-WEB 前端界面

部署完成后,在浏览器中输入 http://ip:port/index.html 就可以访问对应的主界面(ip为datax-admin部署所在服务器ip,port为为datax-admin 指定的运行端口9527)

输入用户名 admin 密码 123456 就可以直接访问系统

二、DataX-WEB API(可用于扩展)

datax-web部署成功后,可以了解datax-web API相关内容,网址: http://ip:port/doc.html

三、DataX-WEB 运行日志

部署完成之后,在modules/对应的项目/data/applogs下(用户也可以自己指定日志,修改application.yml 中的logpath地址即可),用户可以根据此日志跟踪项目实际启动情况

如果执行器启动比admin快,执行器会连接失败,日志报”拒绝连接”的错误,一般是先启动admin,再启动executor,30秒之后会重连,如果成功请忽略这个异常。

四、DataX-WEB 实操

1. 查看执行器

查看web界面是否有注册成功的执行器,另外执行器可以根据需要改名称。

2.创建项目

3.任务模板构建

3.2 路由策略:当执行器集群部署时,提供丰富的路由策略,包括:
FIRST(第一个):固定选择第一个机器;
LAST(最后一个):固定选择最后一个机器;
ROUND(轮询):依次分配任务;
RANDOM(随机):随机选择在线的机器;
CONSISTENT_HASH(一致性HASH):每个任务按照Hash算法固定选择某一台机器,且所有任务均匀散列在不同机器上。
LEAST_FREQUENTLY_USED(最不经常使用):使用频率最低的机器优先被选举;
LEAST_RECENTLY_USED(最近最久未使用):最久为使用的机器优先被选举;
FAILOVER(故障转移):按照顺序依次进行心跳检测,第一个心跳检测成功的机器选定为目标执行器并发起调度;
BUSYOVER(忙碌转移):按照顺序依次进行空闲检测,第一个空闲检测成功的机器选定为目标执行器并发起调度;
阻塞处理策略:调度过于密集执行器来不及处理时的处理策略;

  • 单机串行:调度请求进入单机执行器后,调度请求进入FIFO队列并以串行方式运行;
  • 丢弃后续调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,本次请求将会被丢弃并标记为失败;
  • 覆盖之前调度:调度请求进入单机执行器后,发现执行器存在运行的调度任务,将会终止运行中的调度任务并清空队列,然后运行本地调度任务;
    增量增新建议将阻塞策略设置为丢弃后续调度或者单机串行
    设置单机串行时应该注意合理设置重试次数(失败重试的次数*每次执行时间<任务的调度周期),重试的次数如果设置的过多会导致数据重复,例如任务30秒执行一次,每次执行时间需要20秒,设置重试三次,如果任务失败了,第一个重试的时间段为1577755680-1577756680,重试任务没结束,新任务又开启,那新任务的时间段会是1577755680-1577758680

3.3任务类型
先选择DataX任务,后续配置完详细任务后可以更改3.4其它
可以根据需求填写

4.数据源配置

根据不同数据源,配置参数。

5.任务构建

构建reader

这里没按上面操作生成映射,直接使用任务管理->添加手动配置

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader",
          "parameter": {
            "username": "root",
            "password": "root",
            "column": [
              "*"
            ],
            "where": " save_time >= FROM_UNIXTIME(${lastTime}) and save_time < FROM_UNIXTIME(${currentTime})",
            "splitPk": "id",
            "connection": [
              {
                "table": [
                  "uc_op_amazon_api_store_download"
                ],
                "jdbcUrl": [
                  "jdbc:mysql://x.x.x.210:3306/test_system"
                ]
              }
            ]
          }
        },
        "writer": {
          "name": "mysqlwriter",
          "parameter": {
            "writeMode": "insert",
            "username": "root",
            "password": "root",
            "column": [
              "*"
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://192.168.88.192:3306/mytest?useUnicode=true&characterEncoding=utf8",
                "table": [
                  "uc_op_amazon_api_store_download"
                ]
              }
            ]
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 6
      }
    }
  }
}
-DstartId='%s' -DendId='%s'

# 表名
uc_op_business_reports

#主键
id

#id自增配置条件

“where”: “ id>= ${startId} and id<= ${endId}”,

入库前 清表

"preSql": [
    "truncate table uc_op_dynnamic_ad_api_day"
],

异常情况

问题 1:

配置远程数据源过程中出现异常,提示远程210不能连接

Code:[MYSQLErrCode-02], Description:[数据库服务的IP地址或者Port错误,请检查填写的IP地址和Port或者联系DBA确认IP地址和Port是否正确。如果是同步中心用户请联系DBA确认idb上录入的IP和PORT信息和数据库的当前实际信息是一致的]. - 具体错误信息为:com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure The last packet successfully received from the server was 174 milliseconds ago. The last packet sent successfully to the server was 174 milliseconds ago.

把./datax-admin/lib/mysql-connector-java-5.1.47.jar替换成mysql-connector-java-5.1.49.jar好了

问题 2:

2022-04-28 16:00:08.872 [New I/O server worker #1-3] ERROR c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x31df79de, /211.103.136.38:44900 => /x.x.x.210:11111], exception=java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:192)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:322)
at org.jboss.netty.channel.socket.nio.NioWorker.processSelectedKeys(NioWorker.java:281)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:201)
at org.jboss.netty.util.internal.IoWorkerRunnable.run(IoWorkerRunnable.java:46)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

作者:Jeebiz  创建时间:2022-06-15 12:11
 更新时间:2024-07-10 22:56