各种定时任务快速入门
2024-09-30 08:00:49 # Technical # Notes

最近一个轻量级的项目中涉及到了自定义定时任务调度的需求,借此熟悉各种定时任务实现方式

crontab

Linux 中定时任务由 cron(crond)这个系统服务来控制,这个系统服务事默认启动的。用户可以通过 crontgab 命令设置自己的定时任务

crontab 命令格式:

1
crontab [参数]

相关参数:

参数 功能
-u 指定用户(没指定时默认当前用户)
-e 编辑某个用户的 crontab 文件内容
-l 显示某个用户的 crontab 文件内容
-r 删除某个用户的 crontab 文件
-i 删除某个用户的 crontab 文件时需确认

服务启停

1
2
3
4
5
6
7
8
9
10
#查看运行状态
service crond status
#启动服务
service crond start
#关闭服务
service crond stop
#重启服务
service crond restart
#重新载入配置
service crond reload

定时任务格式

使用 crontab -e 进入定时任务编辑页后,定义定时任务的执行周期与执行命令

1
2
3
4
5
6
7
8
9
10
.---------------------表示分钟 00~59 每分钟用 * 或者 */1 表示
| .------------------表示小时 00~23
| | .---------------表示日期 01~31
| | | .------------表示月份 01 ~12
| | | | .---------表示星期 0~6(从星期天开始)
| | | | | .----表示要运行的命令
| | | | | |
| | | | | |
* * * * * command
分 时 日 月 周 命令

除了数字外,还有一个符号

  • * :表示任意时间
  • , :表示不连续的时间,如:0 8,12,16 * * * command 表示每天的 8、12、16时0分都执行一次
  • - :表示连续的范围,如:0 5 * * 1-5 command 表示周一到周五的凌晨 5 点执行一次
  • */n :表示每隔多久执行一次,如:*/10 * * * * command 表示每隔 10 分钟执行一次

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#每1分钟执行一次myCommand
* * * * * myCommand
#每小时的第3和第15分钟执行
3,15 * * * * myCommand
#在上午8点到11点的第3和第15分钟执行
3,15 8-11 * * * myCommand
#每隔两天的上午8点到11点的第3和第15分钟执行
3,15 8-11 */2 * * myCommand
#每周一上午8点到11点的第3和第15分钟执行
3,15 8-11 * * 1 myCommand
#每晚的21:30重启smb
30 21 * * * /etc/init.d/smb restart
#每月1、10、22日的4 : 45重启smb
45 4 1,10,22 * * /etc/init.d/smb restart
#每周六、周日的1 : 10重启smb
10 1 * * 6,0 /etc/init.d/smb restart
#每天18 : 00至23 : 00之间每隔30分钟重启smb
0,30 18-23 * * * /etc/init.d/smb restart
#每星期六的晚上11 : 00 pm重启smb
0 23 * * 6 /etc/init.d/smb restart
#每一小时重启smb
0 */1 * * * /etc/init.d/smb restart
#晚上11点到早上7点之间,每隔一小时重启smb
0 23-7/1 * * * /etc/init.d/smb restart

特殊用法

crontab 除了使用 -e 的方式来编辑外,还可以通过管道符追加的方式来编辑

1
2
#当前用户的crontab内容输出并追加一行指令,并将修改后的内容重新加载到crontab中
(crontab -l; echo "0 4 * * * /backups.sh") | crontab -

crontab - 是一个特殊的语法,用于将标准输入中的内容作为新的 crontab 内容进行替换。通常,crontab 命令用于编辑、查看和管理用户的定时任务列表。当使用 crontab - 时,它会读取标准输入中的内容,并将其作为新的 crontab 内容进行替换,从而更新用户的定时任务列表。在给定的命令中,crontab -l 用于列出当前用户的 crontab 内容,然后通过管道操作符 | 将其输出作为标准输入传递给 crontab - 命令,以实现替换操作

还或者:

1
2
3
4
5
crontab - <<EOF
0 4 * * * /backups.sh
* * * * * /path/to/script1.sh
30 8 * * * /path/to/script2.sh
EOF

Timer

Timer 是 jdk 中提供的定时器工具,用于在后台执行指定任务

Timer 类就相当于一个任务调度器,里面包含了一个 TimerThread 线程,这个线程无限循环从 TaskQueue 中获取 TimerTask

6 中 TimerTask

  • **schedule(TimerTask task, Date time) **:指定时间点 time 执行
  • **schedule(TimerTask task, long delay)**:延迟 delay 后执行(毫秒)
  • **schedule(TimerTask task, Date firstTime,long period)**:指定时间点 firstTime 执行后,以固定频率 period 执行
  • **schedule(TimerTask task, long delay, long period)**:延迟 delay 后,以固定频率 period 执行
  • **scheduleAtFixedRate(TimerTask task,Date firstTime,long period)**:指定时间 firstTime 执行后,以固定频率 period 执行
  • **scheduleAtFixedRate(TimerTask task, long delay, long period)**:延迟 delay 后,以固定频率 period 执行

调度方式

上面提到,一个 Timer 实例就相当于一个调度器,而 Timer 只有一个执行线程,所以只要在一个 Timer 内的任务,就会相互影响

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":" + 1111);
}
}, 1000, 3000);
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ":" + 222);
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}, 1000, 5000);
}

out:

1
2
3
Timer-0:1111
Timer-0:222
...

这里两个定时任务都在同一个 timer 中,一个执行快,一个执行慢,二者都是同一个执行线程,这时就会出现执行慢的阻塞执行快的

如果第二个执行慢的使用另一个 timer 则不会影响第一个任务

schedule 与 scheduleAtFixedRate

上面 6 中 TimerTask 中,3-4 和 5-6 看似相同,它们的主要差别在于延迟的处理

还是以上面的例子说明,将 schedule 换成 scheduleAtFixedRate

第二个任务依然会阻塞第一个任务,但不同的是,当到第一个任务执行时,被阻塞的任务会立马执行,保证任务的整体执行次数

为方便观察,将第二个任务的阻塞改为只阻塞 3 次:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public static void main(String[] args) {
int i = 0;
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
log.info("{}:1111", Thread.currentThread().getName());
}
}, 1000, 3000);
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
log.info("{}:222", Thread.currentThread().getName());
if (i < 3) {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
i++;
}
}, 1000, 5000);
}

out:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
15:54:02.167 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111
15:54:02.172 INFO --- [Timer-0] net.venom.core.Test: Timer-0:222
15:54:12.180 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111
15:54:12.183 INFO --- [Timer-0] net.venom.core.Test: Timer-0:222
15:54:22.200 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111
15:54:22.203 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111
15:54:22.204 INFO --- [Timer-0] net.venom.core.Test: Timer-0:222
------------------------------------------------------------------
15:54:32.217 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111
15:54:32.219 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111
15:54:32.219 INFO --- [Timer-0] net.venom.core.Test: Timer-0:222
15:54:32.220 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111
15:54:32.220 INFO --- [Timer-0] net.venom.core.Test: Timer-0:222
15:54:32.221 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111
15:54:32.221 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111
15:54:32.222 INFO --- [Timer-0] net.venom.core.Test: Timer-0:222
15:54:32.222 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111
15:54:32.222 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111
15:54:32.222 INFO --- [Timer-0] net.venom.core.Test: Timer-0:222

可以看出,第三个 222 打印出来后,一瞬间就执行了超多任务,这些都是前面被阻塞的任务

优缺点

  • 使用非常方便,jdk 原生提供,不用引入第三方依赖

  • Timer 是单线程执行,如果某个任务执行过长,会出现阻塞

  • 如果 TimerTask 抛出 RuntimeException,整个 Timer 都会停掉

ScheduleExecutorService

ScheduledExecutorService 是 JDK1.5 版本引进的定时任务,该类位于 java.util.concurrent 并发包下

ScheduledExecutorService 是基于多线程的,设计的初衷是为了解决 Timer 单线程执行,多个任务之间会互相影响的问题

主要包含 4 个方法:

  • **schedule(Runnable command,long delay,TimeUnit unit)**:延迟时间的调度,只执行一次,调度之后可通过 Future.get() 阻塞直至任务执行完毕
  • **schedule(Callable callable,long delay,TimeUnit unit)**:延迟时间的调度,只执行一次,调度之后可通过 Future.get() 阻塞直至任务执行完毕,并且可以获取执行结果。
  • scheduleAtFixedRate:以固定频率执行的任务
  • scheduleWithFixedDelay:以固定延时执行任务,延时是相对当前任务结束为起点计算开始时间

调度方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
pool.scheduleAtFixedRate(() -> {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("1 do...");
}, 2, 3, TimeUnit.SECONDS);
pool.scheduleAtFixedRate(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
log.info("2 do...");
}, 2, 3, TimeUnit.SECONDS);
pool.scheduleAtFixedRate(() -> {
log.info("3 do...");
}, 2, 3, TimeUnit.SECONDS);
}

out:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
16:23:09.920 INFO --- [pool-1-thread-3] net.venom.core.Test: 3 do...
16:23:11.927 INFO --- [pool-1-thread-2] net.venom.core.Test: 2 do...
16:23:12.921 INFO --- [pool-1-thread-3] net.venom.core.Test: 3 do...
16:23:14.926 INFO --- [pool-1-thread-2] net.venom.core.Test: 2 do...
16:23:15.910 INFO --- [pool-1-thread-3] net.venom.core.Test: 3 do...
16:23:17.926 INFO --- [pool-1-thread-2] net.venom.core.Test: 2 do...
16:23:18.921 INFO --- [pool-1-thread-3] net.venom.core.Test: 3 do...
16:23:19.931 INFO --- [pool-1-thread-1] net.venom.core.Test: 1 do...
16:23:20.926 INFO --- [pool-1-thread-2] net.venom.core.Test: 2 do...
16:23:21.918 INFO --- [pool-1-thread-3] net.venom.core.Test: 3 do...
16:23:23.921 INFO --- [pool-1-thread-2] net.venom.core.Test: 2 do...
16:23:24.917 INFO --- [pool-1-thread-3] net.venom.core.Test: 3 do...
16:23:26.921 INFO --- [pool-1-thread-2] net.venom.core.Test: 2 do...
16:23:27.914 INFO --- [pool-1-thread-3] net.venom.core.Test: 3 do...
16:23:29.927 INFO --- [pool-1-thread-2] net.venom.core.Test: 2 do...
16:23:29.943 INFO --- [pool-1-thread-1] net.venom.core.Test: 1 do...

这里多线程虽然可以避免多个任务间的影响,但如果是任务自身的执行缓慢,那么还是存在 上次任务对下次任务的阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {
int i = 0;
ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
pool.scheduleAtFixedRate(() -> {
if (i < 3) {
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
log.info("do...");
i++;
}, 10, 3, TimeUnit.SECONDS);
}

out:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16:16:08.894 INFO --- [pool-1-thread-1] net.venom.core.Test: do...
16:16:18.907 INFO --- [pool-1-thread-1] net.venom.core.Test: do...
16:16:28.912 INFO --- [pool-1-thread-2] net.venom.core.Test: do...
-----------------------------------------------------------------
16:16:28.915 INFO --- [pool-1-thread-2] net.venom.core.Test: do...
16:16:28.916 INFO --- [pool-1-thread-3] net.venom.core.Test: do...
16:16:28.916 INFO --- [pool-1-thread-3] net.venom.core.Test: do...
16:16:28.917 INFO --- [pool-1-thread-3] net.venom.core.Test: do...
16:16:28.917 INFO --- [pool-1-thread-3] net.venom.core.Test: do...
16:16:28.918 INFO --- [pool-1-thread-3] net.venom.core.Test: do...
16:16:28.919 INFO --- [pool-1-thread-3] net.venom.core.Test: do...
16:16:28.919 INFO --- [pool-1-thread-3] net.venom.core.Test: do...
16:16:31.876 INFO --- [pool-1-thread-3] net.venom.core.Test: do...
16:16:34.879 INFO --- [pool-1-thread-3] net.venom.core.Test: do...
16:16:37.873 INFO --- [pool-1-thread-3] net.venom.core.Test: do...

可以明显发现,第三次执行完后,瞬间就有很多任务执行

优缺点

  • 基于线程池的定时任务,多个任务之间不会相互影响
  • 不支持复杂的定时规则

DelayQueue

DelayQueue 是 JUC 下提供的延迟队列,用于实现延时任务。如订单下单 15 分钟未付款则直接取消订单。DelayQueue 是 BlockingQueue 的一种,底层是一个基于 PriorityQueue 实现的一个无界队列。DelayQueue 通过 ReentrantLock 实现了互斥访问,通过 Condition 实现了线程间的等待和唤醒,所以它是 线程安全

调度方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedTask> delayedQueue = new DelayQueue<>();
delayedQueue.add(new DelayedTask(System.currentTimeMillis() + 20000, () -> log.info("延迟 20 秒")));
delayedQueue.add(new DelayedTask(System.currentTimeMillis() + 10000, () -> log.info("延迟 10 秒")));
delayedQueue.add(new DelayedTask(System.currentTimeMillis() + 30000, () -> log.info("延迟 30 秒")));

while (!delayedQueue.isEmpty()) {
delayedQueue.take().run();
}
}

@AllArgsConstructor
public static class DelayedTask implements Delayed {

/**
* 延迟时间
*/
private long delayTime;

/**
* 执行任务
*/
private Runnable task;

/**
* Description:获取延迟时间
* @param unit 时间单位
* @return long 剩余延迟时间
*/
@Override
public long getDelay(@NonNull TimeUnit unit) {
return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}

/**
* Description:比较延迟时间,将任务按升序排序
* @param o 任务
* @return int
*/
@Override
public int compareTo(@NonNull Delayed o) {
return Long.compare(this.delayTime, ((DelayedTask) o).delayTime);
}

public void run() {
task.run();
}
}

out:

1
2
3
15:50:58.933 INFO --- [main] net.venom.core.Test: 延迟 10 秒
15:51:08.933 INFO --- [main] net.venom.core.Test: 延迟 20 秒
15:51:18.935 INFO --- [main] net.venom.core.Test: 延迟 30 秒

可以看出 DelayQueue 会按照延迟时间排序后的顺序取出任务进行执行

那么如果一个大的任务执行超时,是否会阻塞后面的任务呢?

1
2
3
4
5
6
7
8
delayedQueue.add(new DelayedTask(System.currentTimeMillis() + 10000, () -> {
log.info("延迟 10 秒");
try {
TimeUnit.SECONDS.sleep(30);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}));

out:

1
2
3
15:58:57.545 INFO --- [main] net.venom.core.Test: 延迟 10 秒
15:59:27.563 INFO --- [main] net.venom.core.Test: 延迟 20 秒
15:59:27.563 INFO --- [main] net.venom.core.Test: 延迟 30 秒

可以看出,延迟 10 秒的任务严重阻塞了其他任务,当延迟 10 秒的任务执行完毕后,其他任务立即执行

但是 DelayQueue 最大的优势就是它是 线程安全 的,这意味着我们可以使用多线程从队列中获取任务来执行,从而避免某个任务对其他任务的阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
ExecutorService pool = Executors.newFixedThreadPool(3);
pool.submit(() -> {
while (!delayedQueue.isEmpty()) {
try {
delayedQueue.take().run();
} catch (InterruptedException e) {
}
}
});
pool.submit(() -> {
while (!delayedQueue.isEmpty()) {
try {
delayedQueue.take().run();
} catch (InterruptedException e) {
}
}
});
pool.submit(() -> {
while (!delayedQueue.isEmpty()) {
try {
delayedQueue.take().run();
} catch (InterruptedException e) {
}
}
});

out:

1
2
3
16:10:25.134 INFO --- [pool-1-thread-1] net.venom.core.Test: 延迟 10 秒
16:10:35.121 INFO --- [pool-1-thread-2] net.venom.core.Test: 延迟 20 秒
16:10:45.127 INFO --- [pool-1-thread-3] net.venom.core.Test: 延迟 30 秒

优缺点

  • 简单易用:DelayQueue 在 JUC 下,不需要引入第三方依赖,使用简单

  • 线程安全:多线程可以安全访问和操作其中的元素

  • 场景局限:DelayQueue 仅支持延迟执行任务,无法实现周期性执行任务

  • 内存占用:DelayQueue 使用优先级队列来实现,需要维护内部的任务队列,当任务数量较多时,会占用一定内存

  • 无法动态调整时间:一旦任务被添加到 DelayQueue 中,其执行时间就固定了,无法动态调整

  • 不适用于大规模任务:DelayQueue 内部使用了🔒来保证线程安全,当任务量较大时,可能会出现性能瓶颈

TimmingWheel

时间轮(TimmingWheel)是一种 实现延迟功能 的巧妙算法。如果一个系统存在大量的任务调度,时间轮可以高效的利用线程资源来进行批量化调度。相较于 jdk 自带的 TimerScheduledExecutorServiceDelayQueue 而言,时间轮是一种更加高效的调度模型

调度方式

时间轮是一个存储定时任务的 环形队列,底层采用数组实现,数组中每个元素可以放一个定时任务列表(TimerTaskList)。TimerTaskList 是一个 环形双向链表,链表中每一项都是一个定时任务实体(TimerTaskEntry),其中封装了具体的定时任务(TimerTask)

时间轮结构

  • tickMs:基本时间跨度,时间轮由多个时间格组成,每个格子代表时间轮的基本时间跨度
  • wheelSize:时间格数量,时间轮的格子数量是固定的
  • startMs:时间轮起始时间
  • Interval:时间轮跨度,可以由 tickMs * wheelSize 计算得出

处理流程

  1. 若时间轮的 tickMs=1ms,wheelSize=20,那么可以计算得出 interval 为 20ms
  2. 初始情况下表盘指针 currentTime 指向时间格 0,此时有一个定时为 2ms 的任务插入进来会存放到时间格为 2 的 TimerTaskList 中
  3. 随着时间的不断推移,指针 currentTime 不断向前推进,过了 2ms 之后,当到达时间格 2 时,就需要将时间格 2 所对应的 TimeTaskList 中的任务做相应的到期操作
  4. 此时若又有一个定时为 8ms 的任务插入进来,则会存放到时间格 10 中,currentTime 再过 8ms 后会指向时间格 10
  5. 如果同时有一个定时为 19ms 的任务插入进来,新来的 TimerTaskEntry 会复用原来的 TimerTaskList,所以它会插入到原本已经到期的时间格 1 中

多重时间轮

如果此时有个定时为 350ms 的任务该如何处理?直接扩充 wheelSize 的大小么?

很多业务场景不乏几万甚至几十万毫秒的定时任务,这个 wheelSize 的扩充没有底线,就算将所有的定时任务的到期时间都设定一个上限,比如 100 万毫秒,那么这个 wheelSize 为 100 万毫秒的时间轮不仅占用很大的内存空间,而且效率也会拉低。所以 层级时间轮(类似十进制/二进制的计数方式) 的概念应运而生,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中

层级时间轮

复用之前的案例,第一层的时间轮 tickMs=1ms, wheelSize=20, interval=20ms。第二层的时间轮的 tickMs 为第一层时间轮的 interval,即为 20ms。每一层时间轮的 wheelSize 是固定的,都是 20,那么第二层的时间轮的总体时间跨度 interval 为 400ms。以此类推,这个 400ms 也是第三层的 tickMs 的大小,第三层的时间轮的总体时间跨度为 8000ms

  1. 当到达时间格 2 时,如果此时有个定时为 350ms 的任务,显然第一层时间轮不能满足条件,所以就 时间轮升级 到第二层时间轮中,最终被插入到第二层时间轮中时间格 17 所对应的 TimerTaskList 中;
  2. 如果此时又有一个定时为 450ms 的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入到第三层时间轮中时间格 1 的 TimerTaskList 中;
  3. 注意到在到期时间在 [400ms,800ms) 区间的多个任务(比如446ms、455ms以及473ms的定时任务)都会被放入到第三层时间轮的时间格 1 中,时间格 1 对应的TimerTaskList的超时时间为400ms;
  4. 随着时间的流逝,当次 TimerTaskList 到期之时,原本定时为 450ms 的任务还剩下 50ms 的时间,还不能执行这个任务的到期操作。这里就有一个 时间轮降级 的操作,会将这个剩余时间为 50ms 的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为 [40ms,60ms) 的时间格中;
  5. 再经历了 40ms 之后,此时这个任务又被“察觉”到,不过还剩余 10ms,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为 [10ms,11ms) 的时间格中,之后再经历 10ms 后,此任务真正到期,最终执行相应的到期操作

具体代码实现

优缺点

  • 高效的任务调度:时间轮算法通过将任务按照延迟时间划分到不同的时间槽中,实现了高效的任务调度,能够在 O(1) 的时间复杂度内找到需要执行的任务
  • 适用于大量短期任务:时间轮算法适用于大量短期任务的调度
  • 需要手动实现:jdk 中没有提供具体实现,需要自己手动实现或引入第三方工具
  • 无法动态调整执行时间:一旦任务被添加到时间轮中,其执行时间就是固定的,无法动态调整
  • 不支持周期性任务:时间轮算法主要用于执行一次性任务,不适用于周期性任务的调度
  • 内存消耗较大:如果时间轮的槽位数量过多,或者每个槽位中的任务数量较多,可能会占用较多的内存空间
  • 不适于长时间延迟的任务:时间轮算法适用于短期任务的调度,对于长时间延迟的任务,可能需要其他算法来处理

Spring Task

Spring Task 是 Spring3 以上版本自带的定时任务

调用方式

  1. SpringBoot 启动类上加上 @EnableScheduling
  2. 使用 @Scheduled 定义定时规则

Spring Cron 规则

1
2
3
4
5
6
7
8
9
10
.------------------------表示秒:0~59,支持 * , - /
| .---------------------表示分:0~59,支持 * , - /
| | .------------------表示时:0~23,支持 * , - /
| | | .---------------表示日期:1~31,支持 * , - / ? L W C
| | | | .-----------表示月:1~12,支持 * , - /
| | | | | .-------表示星期:1~7(从星期天开始),支持 * , - / ? L C #
| | | | | | .---表示年:1970~2099(可选的),支持 * , - /
| | | | | | |
* * * * * * *
秒 分 时 日期 月 星期 年

需要注意的是,「日期」和「星期」都多了一个特殊符「?」,

符号含义:

  • *****:任意值,在月域中,* 表示每个月,在星期域中,* 表示星期的每一天
  • **,**:枚举值,在分钟域中,5,20 表示分别在 5 分钟和 20 分钟触发一次
  • **-**:范围,在分钟域中,5-20 表示从 5 分钟到 20 分钟之间 每隔一分钟触发一次
  • **/**:指定数值的增量,在分钟域中,0/15 表示从第 0 分钟开始,每 15 分钟,3/20 表示从第 3 分钟开始,每 20 分钟
  • **?**:不指定值,仅 日期和星期 支持该字符, 当 日期星期 其中之一被指定了值以后,为了避免冲突,需要将另一个域的值设为 ?
  • L:Last,表示 最后一天,仅 日期和星期域 支持该字符,指定 L 字符时,避免指定列表或者范围,否则,会导致逻辑问题
    • 日期域 中,L 表示 某个月的最后一天。在 星期域 中,L 表示一个星期的最后一天,也就是星期日SUN
    • 如果在 L 前有具体的内容,例如,在星期域中的 6L 表示这个月的最后一个星期六
  • W:除周末以外的有效工作日,在离指定日期的最近的有效工作日触发事件。W 字符寻找最近有效工作日时不会跨过当前月份,连用字符 LW 时表示为指定月份的最后一个工作日。在日期域中 5W,如果 5日 是星期六,则将在最近的工作日星期五,即 4日 触发。如果 5日 是星期天,则将在最近的工作日星期一,即 6日 触发;如果 5日 在星期一到星期五中的一天,则就在 5日 触发
  • C:允许在日期域和星期域出现,这个字符依靠一个指定的「日历」, 也就是说这个表达式的值依赖于相关的「日历」的计算结果,如果没有「日历」关联,则等价于所有包含的「日历」。如:日期域是 5C 表示关联「日历」中第一天,或者这个月开始的第一天的后 5 天。星期域是 1C 表示关联「日历」中第一天,或者星期的第一天的后 1 天,也就是周日的后一天(周一)
  • **#**:确定 每个月第几个星期几,仅 星期域 支持该字符,在星期域中,4#2 表示某月的第二个星期四

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 每天上午10:15执行任务
0 15 10 ? * *
# 每天上午10:15执行任务
0 15 10 * * ?
# 每天中午12:00执行任务
0 0 12 * * ?
# 每天上午10:00点、下午14:00以及下午16:00执行任务
0 0 10,14,16 * * ?
# 每天上午09:00到下午17:00时间段内每隔半小时执行任务
0 0/30 9-17 * * ?
# 每天下午14:00到下午14:59时间段内每隔1分钟执行任务
0 * 14 * * ?
# 每天下午14:00到下午14:05时间段内每隔1分钟执行任务
0 0-5 14 * * ?
# 每天下午14:00到下午14:55时间段内每隔5分钟执行任务
0 0/5 14 * * ?
# 每天下午14:00到下午14:55、下午18:00到下午18:55时间段内每隔5分钟执行任务
0 0/5 14,18 * * ?
# 每个星期三中午12:00执行任务
0 0 12 ? * WED
# 每月15日上午10:15执行任务
0 15 10 15 * ?
# 每月最后一日上午10:15执行任务
0 15 10 L * ?
# 每月最后一个星期六上午10:15执行任务
0 15 10 ? * 6L
# 每月第三个星期六上午10:15执行任务
0 15 10 ? * 6#3
# 每年3月的每个星期三下午14:10和14:44执行任务
0 10,44 14 ? 3 WED

优缺点

  • 使用简单,Spring 做了很好的封装,适用于绝大多数单机版的业务场景
  • 支持复杂 cron 表达式
  • 默认 单线程,需要手动配置线程池
  • 不支持分布式场景
  • 定时任务不能持久化

Quartz

Quartz 是一个功能强大、灵活、可靠的作业调度开源框架,用于在 Java 应用程序中实现任务调度和定时任务执行。它提供了丰富的功能,能够满足各种复杂的调度需求,包括简单的定时任务、复杂的作业调度、集群环境下的任务调度等。

Quartz 的主要特点和功能包括:

  • 灵活的任务调度:Quartz 支持基于时间、间隔、表达式等多种方式的任务调度,能够满足各种复杂的调度需求。
  • 作业管理:Quartz 允许用户定义和管理各种类型的作业(Job),包括普通的任务、持久化的任务、集群环境下的任务等。
  • 可靠性和容错性:Quartz 提供了可靠的任务执行机制和容错机制,能够保证任务按时执行,即使在系统故障或者异常情况下也能够保证任务的执行。
  • 集群支持:Quartz 提供了集群支持功能,多个 Quartz 实例可以组成一个集群,实现任务的负载均衡和高可用性。
  • 持久化支持:Quartz 支持将任务调度信息持久化到数据库中,确保任务信息不会丢失,即使系统重启也能够恢复任务调度状态。
  • 监听器和触发器:Quartz 提供了丰富的监听器和触发器机制,能够对任务的执行过程进行监控和控制。
  • 插件机制:Quartz 提供了插件机制,用户可以根据自己的需求扩展和定制 Quartz 的功能

调用方式

引入相关依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-quartz</artifactId>
</dependency>

创建定时任务执行类

1
2
3
4
5
6
7
public class QuartzTestJob extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
String userName = (String) context.getJobDetail().getJobDataMap().get("userName");
System.out.println("userName:" + userName);
}
}

创建调度程序 JobDetail 和调度器 Trigger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Configuration
public class QuartzConfig {
@Value("${test.cron}")
private String testCron;

/**
* 创建定时任务
*/
@Bean
public JobDetail quartzTestDetail() {
JobDetail jobDetail = JobBuilder.newJob(QuartzTestJob.class)
.withIdentity("quartzTestDetail", "QUARTZ_TEST")
.usingJobData("userName", "susan")
.storeDurably()
.build();
return jobDetail;
}

/**
* 创建触发器
*/
@Bean
public Trigger quartzTestJobTrigger() {
//每隔5秒执行一次
CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(testCron);

//创建触发器
Trigger trigger = TriggerBuilder.newTrigger()
.forJob(quartzTestDetail())
.withIdentity("quartzTestJobTrigger", "QUARTZ_TEST_JOB_TRIGGER")
.withSchedule(cronScheduleBuilder)
.build();
return trigger;
}
}

camel + quartz2

引入依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring-boot-starter</artifactId>
<version>2.20.1</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-quartz2</artifactId>
<version>2.20.1</version>
</dependency>

新建任务

1
2
3
4
5
6
7
8
9
10
11
12
@Slf4j
public class QuartzTestJob {

@Value(value = "${test.switch:false}")
private boolean testSwitch;

public void execJob() {
if (testSwitch) {
log.info("Start QuartzTestJob!");
}
}
}

新建路由

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration
public class TestSchedule extends RouteBuilder {

@Value(value = "${testJob.cron:0 0 9 * * ?}")
private String testJobCron;

@Override
public void configure() throws Exception {
from("quartz2://group/QuartzTestJob?cron=" + testJobCron + "&stateful=true")
.routeId("QuartzTestJobTimer").setHeader("ROUTING_KEY", simple("'QuartzTestJob'"))
.to("class:com.venom.job.QuartzTestJob?method=execJob");
}
}

Quartz Cron 的格式与 Spring Task 相同

优点

  • 灵活性高:Quartz 提供了丰富的任务调度功能,支持多种触发器类型、灵活的任务调度配置,能够满足各种复杂的调度需求。
  • 可靠性强:Quartz 具有良好的容错机制和任务状态管理,能够保证任务的可靠执行,即使在系统异常或者故障情况下也能够确保任务的执行。
  • 集群支持:Quartz 支持集群部署,多个 Quartz 实例可以组成一个集群,实现任务的负载均衡和高可用性。
  • 持久化支持:Quartz 提供了持久化支持,可以将任务调度信息持久化到数据库中,确保任务信息不会丢失,即使系统重启也能够恢复任务调度状态。
  • 丰富的监听器机制:Quartz 提供了丰富的监听器机制,能够对任务的执行过程进行监控和控制,实现任务执行过程的定制化管理。
  • 易于集成:Quartz 是一个开源框架,易于集成到各种 Java 应用程序中,可以与 Spring、JEE 等技术栈无缝集成,提高了开发效率。

缺点

  • 学习曲线较陡:Quartz 的功能较为丰富,对于新手来说学习曲线可能较陡,需要一定的时间和精力去理解和掌握。
  • 配置复杂:Quartz 的配置相对复杂,尤其是在集群部署和持久化配置方面,需要仔细配置才能保证系统的稳定性和可靠性。
  • 资源占用较高:Quartz 的集群模式下,需要占用一定的系统资源和数据库资源,特别是在任务调度频繁的情况下,可能会占用较多的资源。
  • 不适用于实时任务:Quartz 是一个基于时间的作业调度框架,不适用于实时任务的调度,如果需要实时任务调度,可能需要其他更适合的技术方案。
  • 文档和社区支持有限:相对于一些其他流行的开源框架,Quartz 的文档和社区支持相对有限,可能会在使用过程中遇到一些问题

xxl-job

xxl-job 是大众点评(许雪里)开发的一个分布式任务调度平台,专注于解决大数据场景下的任务调度问题。它是一个开源项目,提供了任务调度和管理、任务执行器、任务日志查看等功能,适用于各种规模的企业应用场景。

xxl-job 的主要特点和功能包括:

  • 分布式任务调度:xxl-job 提供了分布式任务调度功能,支持在多台服务器上进行任务调度和执行,能够满足大规模任务调度的需求。
  • Web 控制台:xxl-job 提供了 Web 控制台,用于任务的管理、监控和调度配置,管理员可以通过 Web 控制台方便地管理任务和查看任务执行情况。
  • 任务执行器:xxl-job 提供了任务执行器,用于执行具体的任务逻辑,任务执行器可以在多台服务器上部署,通过注册到调度中心进行任务调度和执行。
  • 任务分片:xxl-job 支持任务分片功能,可以将一个任务分成多个子任务,每个子任务由不同的执行器执行,提高了任务的执行效率和并发能力。
  • 任务调度策略:xxl-job 支持多种任务调度策略,包括固定频率调度、固定延迟调度、Cron 表达式调度等,能够满足各种复杂的调度需求。
  • 任务日志查看:xxl-job 提供了任务日志查看功能,可以方便地查看任务的执行日志和执行状态,帮助管理员快速定位和排查问题。
  • 报警机制:xxl-job 支持报警机制,可以通过邮件、短信等方式发送任务执行结果和异常信息,及时通知管理员处理。

调用方式

引入依赖

1
2
3
4
5
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.3.1</version>
</dependency>

添加配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
# xxl-job, access token
xxl.job.accessToken=default_token
# xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample
# xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
# xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9999
# xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
# xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30

配置 xxl-job bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Configuration
public class XxlJobConfig {
private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);

@Value("${xxl.job.admin.addresses}")
private String adminAddresses;

@Value("${xxl.job.accessToken}")
private String accessToken;

@Value("${xxl.job.executor.appname}")
private String appname;

@Value("${xxl.job.executor.address}")
private String address;

@Value("${xxl.job.executor.ip}")
private String ip;

@Value("${xxl.job.executor.port}")
private int port;

@Value("${xxl.job.executor.logpath}")
private String logPath;

@Value("${xxl.job.executor.logretentiondays}")
private int logRetentionDays;


@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);

// Bean方法模式
// 通过扫描@XxlJob方式注册

// 注册Bean类模式
XxlJobExecutor.registJobHandler("beanClassDemoJobHandler", new BeanClassDemoJob());

return xxlJobSpringExecutor;
}
}

新建 Job

  1. 在Spring Bean实例中,开发Job方法
  2. 为 Job 方法添加注解 @XxlJob(value="job's name", init = "job 初始化方法", destroy = "job 销毁方法")
  3. 需要通过 “XxlJobHelper.log” 打印执行日志
  4. 默认任务结果为「成功」状态,不需要主动设置,如有诉求,比如设置任务结果为失败,可以通过 XxlJobHelper.handleFail/handleSuccess 自主设置任务结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
@Slf4j
@Component
public class BeanMethodDemoJob {

/**
* 1、简单任务示例(Bean模式)
*/
@XxlJob("demoJobHandler")
public void demoJobHandler() {
XxlJobHelper.log("demoJobHandler execute...");
}

/**
* 2、分片广播任务
*/
@XxlJob("shardingJobHandler")
public void shardingJobHandler() throws Exception {
// logback console日志
log.info("shardingJobHandler execute...");

// 通过xxl记录到DB中的日志
XxlJobHelper.log("shardingJobHandler execute...");

// 分片参数
int shardIndex = XxlJobHelper.getShardIndex();
int shardTotal = XxlJobHelper.getShardTotal();

XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);

// 业务逻辑
for (int i = 0; i < shardTotal; i++) {
if (i==shardIndex) {
XxlJobHelper.log("第 {} 片, 命中分片开始处理", i);
} else {
XxlJobHelper.log("第 {} 片, 忽略", i);
}
}

}


/**
* 3、命令行任务
*/
@XxlJob("commandJobHandler")
public void commandJobHandler() throws Exception {
XxlJobHelper.log("commandJobHandler execute...");

String command = XxlJobHelper.getJobParam();
int exitValue = -1;

BufferedReader bufferedReader = null;
try {
// command process
ProcessBuilder processBuilder = new ProcessBuilder();
processBuilder.command(command);
processBuilder.redirectErrorStream(true);

Process process = processBuilder.start();
//Process process = Runtime.getRuntime().exec(command);

BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));

// command log
String line;
while ((line = bufferedReader.readLine())!=null) {
XxlJobHelper.log(line);
}

// command exit
process.waitFor();
exitValue = process.exitValue();
} catch (Exception e) {
XxlJobHelper.log(e);
} finally {
if (bufferedReader!=null) {
bufferedReader.close();
}
}

if (exitValue==0) {
// default success
} else {
XxlJobHelper.handleFail("command exit value(" + exitValue + ") is failed");
}

}


/**
* 4、跨平台Http任务
* 参数示例:
* "url: http://www.baidu.com\n" +
* "method: get\n" +
* "data: content\n";
*/
@XxlJob("httpJobHandler")
public void httpJobHandler() throws Exception {
XxlJobHelper.log("httpJobHandler execute...");

// param parse
String param = XxlJobHelper.getJobParam();
if (param==null || param.trim().length()==0) {
XxlJobHelper.log("param[" + param + "] invalid.");

XxlJobHelper.handleFail();
return;
}

String[] httpParams = param.split("\n");
String url = null;
String method = null;
String data = null;
for (String httpParam : httpParams) {
if (httpParam.startsWith("url:")) {
url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
}
if (httpParam.startsWith("method:")) {
method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
}
if (httpParam.startsWith("data:")) {
data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
}
}

// param valid
if (url==null || url.trim().length()==0) {
XxlJobHelper.log("url[" + url + "] invalid.");

XxlJobHelper.handleFail();
return;
}
if (method==null || !Arrays.asList("GET", "POST").contains(method)) {
XxlJobHelper.log("method[" + method + "] invalid.");

XxlJobHelper.handleFail();
return;
}
boolean isPostMethod = method.equals("POST");

// request
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
// connection
URL realUrl = new URL(url);
connection = (HttpURLConnection) realUrl.openConnection();

// connection setting
connection.setRequestMethod(method);
connection.setDoOutput(isPostMethod);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(5 * 1000);
connection.setConnectTimeout(3 * 1000);
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");

// do connection
connection.connect();

// data
if (isPostMethod && data!=null && data.trim().length() > 0) {
DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
dataOutputStream.write(data.getBytes("UTF-8"));
dataOutputStream.flush();
dataOutputStream.close();
}

// valid StatusCode
int statusCode = connection.getResponseCode();
if (statusCode!=200) {
throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
}

// result
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
StringBuilder result = new StringBuilder();
String line;
while ((line = bufferedReader.readLine())!=null) {
result.append(line);
}
String responseMsg = result.toString();

XxlJobHelper.log(responseMsg);

return;
} catch (Exception e) {
XxlJobHelper.log(e);

XxlJobHelper.handleFail();
return;
} finally {
try {
if (bufferedReader!=null) {
bufferedReader.close();
}
if (connection!=null) {
connection.disconnect();
}
} catch (Exception e2) {
XxlJobHelper.log(e2);
}
}

}

/**
* 5、生命周期任务示例:任务初始化与销毁时,支持自定义相关逻辑;
*/
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy")
public void demoJobHandler2() throws Exception {
XxlJobHelper.log("demoJobHandler2, execute...");
}

public void init() {
log.info("init");
}

public void destroy() {
log.info("destroy");
}
}

优缺点

  • 有界面管理定时任务,支持弹性扩容缩容、动态分片、故障转移、失败报警等功能。它的功能非常强大,很多大厂在用,可以满足绝大多数业务场景
  • 和 quartz 一样,通过数据库分布式锁,来控制任务不能重复执行。在任务非常多的情况下,有一些性能问题

Redis

Redis 可以用来处理延迟类的任务,主要基于两种方案:Redis 过期事件监听和 Redisson 的延迟队列

Redis 过期事件监听

Spring Data Redis 专门提供了一个监听 Redis Key 过期事件的监听器 KeyExpirationEventMessageListener

继承它,并且重写 doHandleMessage(Message message) 方法即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Component
public class KeyExpireListener extends KeyExpirationEventMessageListener {

final static Logger logger = LoggerFactory.getLogger(KeyExpireListener.class);

// 通过构造函数注入 RedisMessageListenerContainer 给 KeyExpirationEventMessageListener
public KeyExpireListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}

@Override
public void doHandleMessage(Message message) {

// 过期的 key
byte[] body = message.getBody();

// 消息通道
byte[] channel = message.getChannel();

logger.info("message = {}, channel = {}", new String(body), new String(channel));
}
}

redis 默认不开启过期事件提醒,需要修改 Redis 配置文件中 notify-keyspace-events Ex 开启提醒

可以通过 config get notify-keyspace-events 获取配置信息

Redisson 延迟队列

先看 Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
// 初始化 Redisson
Config config = new Config();
config.useSingleServer().setAddress("redis://172.29.2.10:7000");
RedissonClient redisson = Redisson.create(config);
// 获取队列
RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("test_queue");
RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue);

// 模拟消费者
new Thread() {
public void run() {
while(true) {
try {
System.err.println( blockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
}.start();

// 模拟生产者
for(int i=1;i<=5;i++) {
// 向阻塞队列放入数据
delayedQueue.offer("fffffffff"+i, 13, TimeUnit.SECONDS);
}
}

大致原理:

Demo 运行后会发现 Redis 中存在两个列队 redisson_delay_queue_timeout:test_queueredisson_delay_queue:test_queue

这两个队列中,带有timeout关键字的那条是一个ZSet集合,另一个是普通的List

但是为什么要设计两条队列呢,后边我查阅了相关资料,大概整理了下Redisson的延迟队列原理如下:

  • 客户端启动,redisson 先订阅一个 key,同时 BLPOP key 0 无限监听一个阻塞队列(等里面有数据了就返回)
  • 当有数据 put 时,redisson 先把数据放到一个 zset 集合(按延时到期时间的时间戳为分数排序),同时发布上面订阅的 key,发布内容为数据到期的 timeout,此时客户端进程开启一个延时任务,延时时间为发布的 timeout
  • 客户端进程的延时任务到了时间执行,从 zset 分页取出过了当前时间的数据,然后将数据 rpush 到第一步的阻塞队列里。然后将当前数据从 zset 移除,取完之后,又执行 BLPOP key 0 无限监听一个阻塞队列。这一部分的逻辑,客户端会发送一个 lua 脚本给到服务端去操作:org.redisson.RedissonDelayedQueue 源码里面的 pushTaskAsync 函数有 lua 脚本内容
  • 上一步客户端监听的阻塞队列返回取到数据,回调到 RBlockingQueue 的 take 方法。于是,我们就收到了数据

这里并未在生产中使用过 Redisson 的延迟队列,网上对此的问题也颇多,需谨慎使用

相关详细原理参靠: 关于Redisson延迟队列的一些思考 (qq.com) Redisson 延时队列 原理 详解 - 知乎 (zhihu.com)

MQ

大部分消息队列,例如 RocketMQ、RabbitMQ,都支持定时/延时消息。定时消息和延时消息本质其实是相同的,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。

不过,在使用 MQ 定时消息之前一定要看清楚其使用限制,以免不适合项目需求,例如 RocketMQ 定时时长最大值默认为 24 小时且不支持自定义修改、只支持 18 个 Level 的延时并不支持任意时间

Thanks