前言

Elastic-Job-Lite 是一个分布式调度解决方案,定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务。

本项目将 Elastic-Job-Lite 封装为基于注解形式的使用,即 @ElasticJobLite(),去掉了 默认的 xml 配置文件。

使用方式

创建你的Spring Boot项目,并在 pom.xml 中添加依赖:

<dependency>
    <groupId>org.apache.shardingsphere.elasticjob</groupId>
    <artifactId>elasticjob-lite-spring-boot-starter</artifactId>
    <version>3.0.0-beta</version>
</dependency>

创建作业任务服务类:

1、Simple类型的任务 (监听器可选)
@ElasticJobLite(cron = "0/5 * * * * ?", jobName = "MySimpleJob", shardingTotalCount = 3, 
  shardingItemParameters = "0=B,1=S,2=G", listenerName = "mySimpleJobListener")
public class MySimpleJob implements SimpleJob {
  @Autowired
    private MyJobService myJobService;
    @Override
    public void execute(ShardingContext shardingContext) {
        //调用业务逻辑实现
        myJobService.doSomething();
    }
}

@Component("mySimpleJobListener")
public class MySimpleJobListener implements ElasticJobListener {
    private static final Logger log = LoggerFactory.getLogger(MySimpleJobListener.class);
    @Override
    public void afterJobExecuted(ShardingContexts arg0) {
        //任务执行后处理
        //...
    }
    @Override
    public void beforeJobExecuted(ShardingContexts arg0) {
        //任务执行前处理
        //...
    }
}

2、Dataflow类型的任务 (监听器可选)

@ElasticJobLite(jobType = JobType.DATAFLOW, cron = "0/10 * * * * ?", jobName = "myDataflowJob", 
  shardingTotalCount = 2, shardingItemParameters = "0=A,1=B", listenerName = "myDataflowJobListener")
public class MyDataflowJob implements DataflowJob<Student> {
    private static final Logger log = LoggerFactory.getLogger(MyDataflowJob.class);
    @Override
    public List<Student> fetchData(ShardingContext paramShardingContext) {
        List<Student> list = new ArrayList<Student>();
        Student s = new Student();
        s.setUserName("zhangshan");
        list.add(s);
        return list;
    }
    @Override
    public void processData(ShardingContext paramShardingContext, List<Student> paramList) {
        for (Student s: paramList) {
            log.info(s.getUserName());
        }
    }
}

@Component("myDataflowJobListener")
public class MyDataflowJobListener implements ElasticJobListener {
    private static final Logger log = LoggerFactory.getLogger(MyDataflowJobListener.class);
    @Override
    public void afterJobExecuted(ShardingContexts arg0) {
        //任务执行后处理
        //...
    }
    @Override
    public void beforeJobExecuted(ShardingContexts arg0) {
        //任务执行前处理
        //...
    }
}

配置文件 application.yml

spring:
  application:
     name: demoElasticJob
  elasticjob:
    #注册中心配置
    zookeeper:
      serverList: 192.168.20.81:2181
      namespace: namespace-demoElasticJob
#  datasource:
#    url: jdbc:mysql://192.168.20.81:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=false
#    username: root
#    password: root@echinacoop123
#    druid:
#      initial-size: 5 #连接池初始化大小
#      min-idle: 10 #最小空闲连接数
#      max-active: 20 #最大连接数
注解属性说明 @ElasticJobLite(),详细说明请参考:Elastic Job Lite 官网
属性 类型 必须 默认值 描述
jobType String JobType.SIMPLE JobType.SIMPLE 或 JobType.DATAFLOW
cron String cron表达式,用于控制作业触发时间
jobName String 任务名称
shardingTotalCount int 1 作业分片总数
shardingItemParameters String 分片序列号和参数用等号分隔,多个键值对用逗号分隔 分片序列号从0开始,不可大于或等于作业分片总数 如:0=a,1=b,2=c
jobParameter String 作业自定义参数 作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业。例:每次获取的数据量、作业实例从数据库读取的主键等
failover boolean false 是否开启失效转移
misfire boolean true 是否开启错过任务重新执行
description String 作业描述信息
jobExceptionHandler String 扩展异常处理类
executorServiceHandler String 扩展作业处理线程池类
disabled boolean false 作业是否禁止启动 可用于部署作业时,先禁止启动,部署结束后统一启动
overwrite boolean true 本地配置是否可覆盖注册中心配置 如果可覆盖,每次启动作业都以本地配置为准
monitorExecution boolean true 监控作业运行时状态 每次作业执行时间和间隔时间均非常短的情况,建议不监控作业运行时状态以提升效率。因为是瞬时状态,所以无必要监控。请用户自行增加数据堆积监控。并且不能保证数据重复选取,应在作业中实现幂等性。 每次作业执行时间和间隔时间均较长的情况,建议监控作业运行时状态,可保证数据不会重复选取。
monitorPort int -1 作业监控端口 建议配置作业监控端口, 方便开发者dump作业信息。
maxTimeDiffSeconds int -1 最大允许的本机与注册中心的时间误差秒数 如果时间误差超过配置秒数则作业启动时将抛异常 配置为-1表示不校验时间误差
jobShardingStrategyClass String 作业分片策略实现类全路径 默认使用平均分配策略 详情参见:作业分片策略 http://elasticjob.io/docs/elastic-job-lite/02-guide/job-sharding-strategy
eventTraceRdbDataSource String 作业事件追踪的数据源Bean引用
reconcileIntervalMinutes int 10 修复作业服务器不一致状态服务调度间隔时间,配置为小于1的任意值表示不执行修复 单位:分钟
listenerName String 前置后置任务监听实现类Bean引用,需实现ElasticJobListener接口
distributedListenerName String 前置后置任务分布式监听实现类Bean引用,需继承AbstractDistributeOnceElasticJobListener类
streamingProcess boolean false 是否流式处理数据 (DataflowJob类型的作业专有属性)

https://blog.csdn.net/shujianhua/article/details/93594203

作者:Jeebiz  创建时间:2020-11-25 10:39
 更新时间:2023-12-22 21:08