在xxl-job中使用分片任务
本文介绍在如何在 xxl-job中使用创建并使用分片任务。
xxl-job
是国内开源的一款轻量级分布式任务调度平台,开发者是大众点评的工程师,其目前维护一个
开源社区,里面还有很多已经发布或尚在孵化的开源项目。
任务分片是一个以空间换时间的概念,旨在将耗时任务进行拆分,然后同时执行,拆分之后执行的结果对任务任务原来不分片执行的结果没有影响。
比如要核对id从1-1000的用户的邮箱信息,找出无效的邮箱信息。可以将id分成合适的多小段,1-100,101-200,...,901-1000,然后交给不同的任务去执行。这就是任务分片的简单模型。
在阅读此文之前,需要理解xxl-job的基本模型与工作流程,其核心概念有2:
调度中心
负责管理调度信息,按照调度配置发出调度请求,自身不承担业务代码。调度系统与任务解耦,提高了系统可用性和稳定性,同时调度系统性能不再受限于任务模块;
支持可视化、简单且动态的管理调度信息,包括任务新建,更新,删除,GLUE开发和任务报警等,所有上述操作都会实时生效,同时支持监控调度结果以及执行日志,支持执行器Failover。
执行器
负责接收调度请求并执行任务逻辑。任务模块专注于任务的执行等操作,开发和维护更加简单和高效;接收“调度中心”的执行请求、终止请求和日志请求等。
调度中心自动发现并注册执行器,并且通过执行器提供的api对任务进行调度(执行/终止等操作)。
本文测试所使用的xxl-job所有模块基于最新的迭代版本v2.3.0,此次迭代中配置分片任务的方式与之前版本有些许不同:
- 【新增】新增任务辅助工具
XxlJobHelper
:提供统一任务辅助能力,包括:任务上下文信息维护获取(任务参数、任务ID、分片参数)、日志输出、任务结果设置……等; ShardingUtil
组件废弃:改用XxlJobHelper.getShardIndex()/getShardTotal();
获取分片参数;XxlJobLogger
组件废弃:改用XxlJobHelper.log
进行日志输出;
其他更新日志可以查看该版本的 RELEASE NOTE
在xxl-job中,可以通过两种方式实现分片任务:
- 单实例多任务执行分片,等效于前文中的例子,将一个大任务拆分成多个小任务;
- 多实例单任务执行分片,将任务按照一定的方式分配到多个实例上去运行——以多个实例上配置相同的任务为前提;
模拟需要完成的工作 #
在开始之前,我们先模拟需要完成的工作:有100个账户,每个账户有随机1-10条数据需要处理。代码片段如下
1private final static Random RAND = new Random(47);
2
3/**
4 * 100 ids
5 */
6private static final List<Integer> CITY_ID_LIST = new ArrayList<Integer>() {{
7 for (int i = 1; i <= 100; i++) {
8 add(i);
9 }
10}};
11
12/**
13 * task num for each id
14 */
15private static int task_num_per_id;
16
17/**
18 * 任务数据库
19 */
20private static final Map<Integer, List<String>> TASKS;
21
22static {
23 TASKS = new HashMap<>();
24 CITY_ID_LIST.forEach(city -> {
25 task_num_per_id = RAND.nextInt(10);
26 List<String> cityTasks = new ArrayList<>(task_num_per_id);
27 IntStream.rangeClosed(1, task_num_per_id).forEach(index -> {
28 String orderInfo = city + "------NO." + index;
29 cityTasks.add(orderInfo);
30 });
31 TASKS.put(city, cityTasks);
32 });
33}
单实例多任务分片 #
当使用单执行器实例时,我们可以在调度中心创建多个任务,通过分配不同的任务参数来实现任务的分片。任务的实现代码如下所示:
1@XxlJob("singleExecutorMultiThreads")
2public void singleExecutorMultiThreadsCityJob() throws Exception {
3 // 当不配置参数时,此方法返回空字符串""
4 String shards = XxlJobContext.getXxlJobContext().getJobParam();
5 XxlJobHelper.log("XXL-JOB, 单机分片任务开始. 分片参数:{}", shards);
6
7 if (StringUtils.isEmpty(shards)) {
8// XxlJobHelper.handleFail("任务参数不能为空!");
9
10 // 不分片,全量执行
11 IntStream.range(0, CITY_ID_LIST.size())
12 .forEach(i -> {
13 int cityId = CITY_ID_LIST.get(i);
14 List<String> task = TASKS.get(cityId);
15 task.forEach(t -> XxlJobHelper.log("【{}】执行【{}】,
16 任务内容为:{}",
17 Thread.currentThread().getName(), cityId, t));
18 });
19 } else {
20 // 分片执行
21 Arrays.stream(shards.split(","))
22 .map(String::trim)
23 .filter(StringUtils::isNotBlank)
24 .map(Integer::parseInt)
25 .forEach(cityId -> {
26 List<String> task = TASKS.get(cityId);
27 Optional.ofNullable(task).ifPresent(todoTasks -> {
28 todoTasks.forEach(t ->
29 XxlJobHelper.log("【{}】执行【{}】,
30 任务内容为:{}",
31 Thread.currentThread().getName(), cityId, t));
32 });
33 });
34 }
35
36}
在调度中心,我们可以像这样创建分片任务:
可以根据情况创建任务数,来进行单实例复杂任务的分片。
配置完成后的任务列表看起来像这样:
这2个任务使用不同的任务参数,其他配置可以大体相同甚至完全一致。
这里需要说明的是,
当前版本对于任务参数的处理做了修改,JobHandler
中的方法不再直接直接调用含有参数的任务方法,而是通过XxlJobContext.getXxlJobContext().getJobParam()
直接在任务中获取分片参数。
配置完成之后,我们执行任务,即可以看到调度日志(部分):
可以看到,执行器开启了不同的线程分别执行分片任务,这样可以节省任务执行的时间开销。
这就是xxl-job的单实例分片任务创建方法。
需要说明的是,上面的任务如果不设置分片参数,那么将会执行全部的任务。
多实例分片广播 #
在多执行器实例的情况下,分片任务有多种路由策略,此处暂且不讨论路由策略,在分片广播的模式下进行测试。分片任务的实现代码如下:
1@XxlJob("multiExecutorsSharding")
2public void multiExecutorShardingCityJob() throws Exception {
3 XxlJobHelper.log("XXL-JOB, 多实例分片任务开始.");
4 int shardIndex = XxlJobHelper.getShardIndex();
5 int shardTotal = XxlJobHelper.getShardTotal();
6
7 IntStream.range(0, CITY_ID_LIST.size()).forEach(i -> {
8 if (i % shardTotal == shardIndex) {
9 int cityId = CITY_ID_LIST.get(i);
10 List<String> task = TASKS.get(cityId);
11 Optional.ofNullable(task).ifPresent(todoTasks -> {
12 todoTasks.forEach(t -> XxlJobHelper.log("实例【{}】执行【{}】,
13 任务内容为:{}",
14 shardIndex, cityId, t));
15 });
16 }
17 });
18}
可以看到,实际上是通过对所有执行器实例id取模的方式,将任务均匀地分配到所有的执行器上去执行。
调度中心创建任务像这样:
在这个模式下,任务并没有设置分片参数,不过我们需要额外启动一个执行器实例:
java -jar -Dserver.port=8082 -Dxxl.job.executor.port=9998
xxl-job-executor-sample-springboot-2.3.0-SNAPSHOT.jar
上述指定的端口需要和之前的实例不同即可。
运行任务成功之后,我们可以看到调度日志(部分):
从日志可以看出,任务被规律地分配到了2个执行器实例上。