最近一个轻量级的项目中涉及到了自定义定时任务调度的需求,借此熟悉各种定时任务实现方式
crontab
Linux 中定时任务由 cron(crond)这个系统服务来控制,这个系统服务事默认启动的。用户可以通过 crontgab
命令设置自己的定时任务
crontab 命令格式:
相关参数:
参数 |
功能 |
-u |
指定用户(没指定时默认当前用户) |
-e |
编辑某个用户的 crontab 文件内容 |
-l |
显示某个用户的 crontab 文件内容 |
-r |
删除某个用户的 crontab 文件 |
-i |
删除某个用户的 crontab 文件时需确认 |
服务启停
1 2 3 4 5 6 7 8 9 10
| #查看运行状态 service crond status #启动服务 service crond start #关闭服务 service crond stop #重启服务 service crond restart #重新载入配置 service crond reload
|
定时任务格式
使用 crontab -e
进入定时任务编辑页后,定义定时任务的执行周期与执行命令
1 2 3 4 5 6 7 8 9 10
| .---------------------表示分钟 00~59 每分钟用 * 或者 */1 表示 | .------------------表示小时 00~23 | | .---------------表示日期 01~31 | | | .------------表示月份 01 ~12 | | | | .---------表示星期 0~6(从星期天开始) | | | | | .----表示要运行的命令 | | | | | | | | | | | | * * * * * command 分 时 日 月 周 命令
|
除了数字外,还有一个符号
- * :表示任意时间
- , :表示不连续的时间,如:
0 8,12,16 * * * command
表示每天的 8、12、16时0分都执行一次
- - :表示连续的范围,如:
0 5 * * 1-5 command
表示周一到周五的凌晨 5 点执行一次
- */n :表示每隔多久执行一次,如:
*/10 * * * * command
表示每隔 10 分钟执行一次
实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| #每1分钟执行一次myCommand * * * * * myCommand #每小时的第3和第15分钟执行 3,15 * * * * myCommand #在上午8点到11点的第3和第15分钟执行 3,15 8-11 * * * myCommand #每隔两天的上午8点到11点的第3和第15分钟执行 3,15 8-11 */2 * * myCommand #每周一上午8点到11点的第3和第15分钟执行 3,15 8-11 * * 1 myCommand #每晚的21:30重启smb 30 21 * * * /etc/init.d/smb restart #每月1、10、22日的4 : 45重启smb 45 4 1,10,22 * * /etc/init.d/smb restart #每周六、周日的1 : 10重启smb 10 1 * * 6,0 /etc/init.d/smb restart #每天18 : 00至23 : 00之间每隔30分钟重启smb 0,30 18-23 * * * /etc/init.d/smb restart #每星期六的晚上11 : 00 pm重启smb 0 23 * * 6 /etc/init.d/smb restart #每一小时重启smb 0 */1 * * * /etc/init.d/smb restart #晚上11点到早上7点之间,每隔一小时重启smb 0 23-7/1 * * * /etc/init.d/smb restart
|
特殊用法
crontab 除了使用 -e
的方式来编辑外,还可以通过管道符追加的方式来编辑
1 2
| #当前用户的crontab内容输出并追加一行指令,并将修改后的内容重新加载到crontab中 (crontab -l; echo "0 4 * * * /backups.sh") | crontab -
|
crontab -
是一个特殊的语法,用于将标准输入中的内容作为新的 crontab 内容进行替换。通常,crontab
命令用于编辑、查看和管理用户的定时任务列表。当使用 crontab -
时,它会读取标准输入中的内容,并将其作为新的 crontab 内容进行替换,从而更新用户的定时任务列表。在给定的命令中,crontab -l
用于列出当前用户的 crontab 内容,然后通过管道操作符 |
将其输出作为标准输入传递给 crontab -
命令,以实现替换操作
还或者:
1 2 3 4 5
| crontab - <<EOF 0 4 * * * /backups.sh * * * * * /path/to/script1.sh 30 8 * * * /path/to/script2.sh EOF
|
Timer
Timer 是 jdk 中提供的定时器工具,用于在后台执行指定任务
Timer 类就相当于一个任务调度器,里面包含了一个 TimerThread 线程,这个线程无限循环从 TaskQueue 中获取 TimerTask
6 中 TimerTask
- **schedule(TimerTask task, Date time) **:指定时间点 time 执行
- **schedule(TimerTask task, long delay)**:延迟 delay 后执行(毫秒)
- **schedule(TimerTask task, Date firstTime,long period)**:指定时间点 firstTime 执行后,以固定频率 period 执行
- **schedule(TimerTask task, long delay, long period)**:延迟 delay 后,以固定频率 period 执行
- **scheduleAtFixedRate(TimerTask task,Date firstTime,long period)**:指定时间 firstTime 执行后,以固定频率 period 执行
- **scheduleAtFixedRate(TimerTask task, long delay, long period)**:延迟 delay 后,以固定频率 period 执行
调度方式
上面提到,一个 Timer 实例就相当于一个调度器,而 Timer 只有一个执行线程,所以只要在一个 Timer 内的任务,就会相互影响
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public static void main(String[] args) { Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { System.out.println(Thread.currentThread().getName() + ":" + 1111); } }, 1000, 3000); timer.schedule(new TimerTask() { @Override public void run() { System.out.println(Thread.currentThread().getName() + ":" + 222); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { throw new RuntimeException(e); } } }, 1000, 5000); }
|
out:
1 2 3
| Timer-0:1111 Timer-0:222 ...
|
这里两个定时任务都在同一个 timer 中,一个执行快,一个执行慢,二者都是同一个执行线程,这时就会出现执行慢的阻塞执行快的
如果第二个执行慢的使用另一个 timer 则不会影响第一个任务
schedule 与 scheduleAtFixedRate
上面 6 中 TimerTask 中,3-4 和 5-6 看似相同,它们的主要差别在于延迟的处理
还是以上面的例子说明,将 schedule 换成 scheduleAtFixedRate
第二个任务依然会阻塞第一个任务,但不同的是,当到第一个任务执行时,被阻塞的任务会立马执行,保证任务的整体执行次数
为方便观察,将第二个任务的阻塞改为只阻塞 3 次:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public static void main(String[] args) { int i = 0; Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { log.info("{}:1111", Thread.currentThread().getName()); } }, 1000, 3000); timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { log.info("{}:222", Thread.currentThread().getName()); if (i < 3) { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { throw new RuntimeException(e); } } i++; } }, 1000, 5000); }
|
out:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| 15:54:02.167 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111 15:54:02.172 INFO --- [Timer-0] net.venom.core.Test: Timer-0:222 15:54:12.180 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111 15:54:12.183 INFO --- [Timer-0] net.venom.core.Test: Timer-0:222 15:54:22.200 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111 15:54:22.203 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111 15:54:22.204 INFO --- [Timer-0] net.venom.core.Test: Timer-0:222 ------------------------------------------------------------------ 15:54:32.217 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111 15:54:32.219 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111 15:54:32.219 INFO --- [Timer-0] net.venom.core.Test: Timer-0:222 15:54:32.220 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111 15:54:32.220 INFO --- [Timer-0] net.venom.core.Test: Timer-0:222 15:54:32.221 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111 15:54:32.221 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111 15:54:32.222 INFO --- [Timer-0] net.venom.core.Test: Timer-0:222 15:54:32.222 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111 15:54:32.222 INFO --- [Timer-0] net.venom.core.Test: Timer-0:1111 15:54:32.222 INFO --- [Timer-0] net.venom.core.Test: Timer-0:222
|
可以看出,第三个 222 打印出来后,一瞬间就执行了超多任务,这些都是前面被阻塞的任务
优缺点
使用非常方便,jdk 原生提供,不用引入第三方依赖
Timer 是单线程执行,如果某个任务执行过长,会出现阻塞
如果 TimerTask 抛出 RuntimeException,整个 Timer 都会停掉
ScheduleExecutorService
ScheduledExecutorService 是 JDK1.5 版本引进的定时任务,该类位于 java.util.concurrent 并发包下
ScheduledExecutorService 是基于多线程的,设计的初衷是为了解决 Timer 单线程执行,多个任务之间会互相影响的问题
主要包含 4 个方法:
- **schedule(Runnable command,long delay,TimeUnit unit)**:延迟时间的调度,只执行一次,调度之后可通过 Future.get() 阻塞直至任务执行完毕
- **schedule(Callable callable,long delay,TimeUnit unit)**:延迟时间的调度,只执行一次,调度之后可通过 Future.get() 阻塞直至任务执行完毕,并且可以获取执行结果。
- scheduleAtFixedRate:以固定频率执行的任务
- scheduleWithFixedDelay:以固定延时执行任务,延时是相对当前任务结束为起点计算开始时间
调度方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| public static void main(String[] args) { ScheduledExecutorService pool = Executors.newScheduledThreadPool(3); pool.scheduleAtFixedRate(() -> { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { throw new RuntimeException(e); } log.info("1 do..."); }, 2, 3, TimeUnit.SECONDS); pool.scheduleAtFixedRate(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { throw new RuntimeException(e); } log.info("2 do..."); }, 2, 3, TimeUnit.SECONDS); pool.scheduleAtFixedRate(() -> { log.info("3 do..."); }, 2, 3, TimeUnit.SECONDS); }
|
out:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| 16:23:09.920 INFO --- [pool-1-thread-3] net.venom.core.Test: 3 do... 16:23:11.927 INFO --- [pool-1-thread-2] net.venom.core.Test: 2 do... 16:23:12.921 INFO --- [pool-1-thread-3] net.venom.core.Test: 3 do... 16:23:14.926 INFO --- [pool-1-thread-2] net.venom.core.Test: 2 do... 16:23:15.910 INFO --- [pool-1-thread-3] net.venom.core.Test: 3 do... 16:23:17.926 INFO --- [pool-1-thread-2] net.venom.core.Test: 2 do... 16:23:18.921 INFO --- [pool-1-thread-3] net.venom.core.Test: 3 do... 16:23:19.931 INFO --- [pool-1-thread-1] net.venom.core.Test: 1 do... 16:23:20.926 INFO --- [pool-1-thread-2] net.venom.core.Test: 2 do... 16:23:21.918 INFO --- [pool-1-thread-3] net.venom.core.Test: 3 do... 16:23:23.921 INFO --- [pool-1-thread-2] net.venom.core.Test: 2 do... 16:23:24.917 INFO --- [pool-1-thread-3] net.venom.core.Test: 3 do... 16:23:26.921 INFO --- [pool-1-thread-2] net.venom.core.Test: 2 do... 16:23:27.914 INFO --- [pool-1-thread-3] net.venom.core.Test: 3 do... 16:23:29.927 INFO --- [pool-1-thread-2] net.venom.core.Test: 2 do... 16:23:29.943 INFO --- [pool-1-thread-1] net.venom.core.Test: 1 do...
|
这里多线程虽然可以避免多个任务间的影响,但如果是任务自身的执行缓慢,那么还是存在 上次任务对下次任务的阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public static void main(String[] args) { int i = 0; ScheduledExecutorService pool = Executors.newScheduledThreadPool(3); pool.scheduleAtFixedRate(() -> { if (i < 3) { try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { throw new RuntimeException(e); } } log.info("do..."); i++; }, 10, 3, TimeUnit.SECONDS); }
|
out:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| 16:16:08.894 INFO --- [pool-1-thread-1] net.venom.core.Test: do... 16:16:18.907 INFO --- [pool-1-thread-1] net.venom.core.Test: do... 16:16:28.912 INFO --- [pool-1-thread-2] net.venom.core.Test: do... ----------------------------------------------------------------- 16:16:28.915 INFO --- [pool-1-thread-2] net.venom.core.Test: do... 16:16:28.916 INFO --- [pool-1-thread-3] net.venom.core.Test: do... 16:16:28.916 INFO --- [pool-1-thread-3] net.venom.core.Test: do... 16:16:28.917 INFO --- [pool-1-thread-3] net.venom.core.Test: do... 16:16:28.917 INFO --- [pool-1-thread-3] net.venom.core.Test: do... 16:16:28.918 INFO --- [pool-1-thread-3] net.venom.core.Test: do... 16:16:28.919 INFO --- [pool-1-thread-3] net.venom.core.Test: do... 16:16:28.919 INFO --- [pool-1-thread-3] net.venom.core.Test: do... 16:16:31.876 INFO --- [pool-1-thread-3] net.venom.core.Test: do... 16:16:34.879 INFO --- [pool-1-thread-3] net.venom.core.Test: do... 16:16:37.873 INFO --- [pool-1-thread-3] net.venom.core.Test: do...
|
可以明显发现,第三次执行完后,瞬间就有很多任务执行
优缺点
- 基于线程池的定时任务,多个任务之间不会相互影响
- 不支持复杂的定时规则
DelayQueue
DelayQueue 是 JUC 下提供的延迟队列,用于实现延时任务。如订单下单 15 分钟未付款则直接取消订单。DelayQueue 是 BlockingQueue
的一种,底层是一个基于 PriorityQueue
实现的一个无界队列。DelayQueue 通过 ReentrantLock
实现了互斥访问,通过 Condition
实现了线程间的等待和唤醒,所以它是 线程安全
的
调度方式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| public static void main(String[] args) throws InterruptedException { DelayQueue<DelayedTask> delayedQueue = new DelayQueue<>(); delayedQueue.add(new DelayedTask(System.currentTimeMillis() + 20000, () -> log.info("延迟 20 秒"))); delayedQueue.add(new DelayedTask(System.currentTimeMillis() + 10000, () -> log.info("延迟 10 秒"))); delayedQueue.add(new DelayedTask(System.currentTimeMillis() + 30000, () -> log.info("延迟 30 秒")));
while (!delayedQueue.isEmpty()) { delayedQueue.take().run(); } }
@AllArgsConstructor public static class DelayedTask implements Delayed {
private long delayTime;
private Runnable task;
@Override public long getDelay(@NonNull TimeUnit unit) { return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); }
@Override public int compareTo(@NonNull Delayed o) { return Long.compare(this.delayTime, ((DelayedTask) o).delayTime); }
public void run() { task.run(); } }
|
out:
1 2 3
| 15:50:58.933 INFO --- [main] net.venom.core.Test: 延迟 10 秒 15:51:08.933 INFO --- [main] net.venom.core.Test: 延迟 20 秒 15:51:18.935 INFO --- [main] net.venom.core.Test: 延迟 30 秒
|
可以看出 DelayQueue 会按照延迟时间排序后的顺序取出任务进行执行
那么如果一个大的任务执行超时,是否会阻塞后面的任务呢?
1 2 3 4 5 6 7 8
| delayedQueue.add(new DelayedTask(System.currentTimeMillis() + 10000, () -> { log.info("延迟 10 秒"); try { TimeUnit.SECONDS.sleep(30); } catch (InterruptedException e) { throw new RuntimeException(e); } }));
|
out:
1 2 3
| 15:58:57.545 INFO --- [main] net.venom.core.Test: 延迟 10 秒 15:59:27.563 INFO --- [main] net.venom.core.Test: 延迟 20 秒 15:59:27.563 INFO --- [main] net.venom.core.Test: 延迟 30 秒
|
可以看出,延迟 10 秒的任务严重阻塞了其他任务,当延迟 10 秒的任务执行完毕后,其他任务立即执行
但是 DelayQueue 最大的优势就是它是 线程安全
的,这意味着我们可以使用多线程从队列中获取任务来执行,从而避免某个任务对其他任务的阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| ExecutorService pool = Executors.newFixedThreadPool(3); pool.submit(() -> { while (!delayedQueue.isEmpty()) { try { delayedQueue.take().run(); } catch (InterruptedException e) { } } }); pool.submit(() -> { while (!delayedQueue.isEmpty()) { try { delayedQueue.take().run(); } catch (InterruptedException e) { } } }); pool.submit(() -> { while (!delayedQueue.isEmpty()) { try { delayedQueue.take().run(); } catch (InterruptedException e) { } } });
|
out:
1 2 3
| 16:10:25.134 INFO --- [pool-1-thread-1] net.venom.core.Test: 延迟 10 秒 16:10:35.121 INFO --- [pool-1-thread-2] net.venom.core.Test: 延迟 20 秒 16:10:45.127 INFO --- [pool-1-thread-3] net.venom.core.Test: 延迟 30 秒
|
优缺点
简单易用:DelayQueue 在 JUC 下,不需要引入第三方依赖,使用简单
线程安全:多线程可以安全访问和操作其中的元素
场景局限:DelayQueue 仅支持延迟执行任务,无法实现周期性执行任务
内存占用:DelayQueue 使用优先级队列来实现,需要维护内部的任务队列,当任务数量较多时,会占用一定内存
无法动态调整时间:一旦任务被添加到 DelayQueue 中,其执行时间就固定了,无法动态调整
不适用于大规模任务:DelayQueue 内部使用了🔒来保证线程安全,当任务量较大时,可能会出现性能瓶颈
TimmingWheel
时间轮(TimmingWheel)是一种 实现延迟功能 的巧妙算法。如果一个系统存在大量的任务调度,时间轮可以高效的利用线程资源来进行批量化调度。相较于 jdk 自带的 Timer、ScheduledExecutorService 和 DelayQueue 而言,时间轮是一种更加高效的调度模型
调度方式
时间轮是一个存储定时任务的 环形队列,底层采用数组实现,数组中每个元素可以放一个定时任务列表(TimerTaskList)。TimerTaskList 是一个 环形双向链表,链表中每一项都是一个定时任务实体(TimerTaskEntry),其中封装了具体的定时任务(TimerTask)
- tickMs:基本时间跨度,时间轮由多个时间格组成,每个格子代表时间轮的基本时间跨度
- wheelSize:时间格数量,时间轮的格子数量是固定的
- startMs:时间轮起始时间
- Interval:时间轮跨度,可以由
tickMs * wheelSize
计算得出
处理流程
- 若时间轮的 tickMs=1ms,wheelSize=20,那么可以计算得出 interval 为 20ms
- 初始情况下表盘指针 currentTime 指向时间格 0,此时有一个定时为 2ms 的任务插入进来会存放到时间格为 2 的 TimerTaskList 中
- 随着时间的不断推移,指针 currentTime 不断向前推进,过了 2ms 之后,当到达时间格 2 时,就需要将时间格 2 所对应的 TimeTaskList 中的任务做相应的到期操作
- 此时若又有一个定时为 8ms 的任务插入进来,则会存放到时间格 10 中,currentTime 再过 8ms 后会指向时间格 10
- 如果同时有一个定时为 19ms 的任务插入进来,新来的 TimerTaskEntry 会复用原来的 TimerTaskList,所以它会插入到原本已经到期的时间格 1 中
多重时间轮
如果此时有个定时为 350ms 的任务该如何处理?直接扩充 wheelSize 的大小么?
很多业务场景不乏几万甚至几十万毫秒的定时任务,这个 wheelSize 的扩充没有底线,就算将所有的定时任务的到期时间都设定一个上限,比如 100 万毫秒,那么这个 wheelSize 为 100 万毫秒的时间轮不仅占用很大的内存空间,而且效率也会拉低。所以 层级时间轮(类似十进制/二进制的计数方式) 的概念应运而生,当任务的到期时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中
复用之前的案例,第一层的时间轮 tickMs=1ms, wheelSize=20, interval=20ms。第二层的时间轮的 tickMs 为第一层时间轮的 interval,即为 20ms。每一层时间轮的 wheelSize 是固定的,都是 20,那么第二层的时间轮的总体时间跨度 interval 为 400ms。以此类推,这个 400ms 也是第三层的 tickMs 的大小,第三层的时间轮的总体时间跨度为 8000ms
- 当到达时间格 2 时,如果此时有个定时为 350ms 的任务,显然第一层时间轮不能满足条件,所以就 时间轮升级 到第二层时间轮中,最终被插入到第二层时间轮中时间格 17 所对应的 TimerTaskList 中;
- 如果此时又有一个定时为 450ms 的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中,最终被插入到第三层时间轮中时间格 1 的 TimerTaskList 中;
- 注意到在到期时间在 [400ms,800ms) 区间的多个任务(比如446ms、455ms以及473ms的定时任务)都会被放入到第三层时间轮的时间格 1 中,时间格 1 对应的TimerTaskList的超时时间为400ms;
- 随着时间的流逝,当次 TimerTaskList 到期之时,原本定时为 450ms 的任务还剩下 50ms 的时间,还不能执行这个任务的到期操作。这里就有一个 时间轮降级 的操作,会将这个剩余时间为 50ms 的定时任务重新提交到层级时间轮中,此时第一层时间轮的总体时间跨度不够,而第二层足够,所以该任务被放到第二层时间轮到期时间为 [40ms,60ms) 的时间格中;
- 再经历了 40ms 之后,此时这个任务又被“察觉”到,不过还剩余 10ms,还是不能立即执行到期操作。所以还要再有一次时间轮的降级,此任务被添加到第一层时间轮到期时间为 [10ms,11ms) 的时间格中,之后再经历 10ms 后,此任务真正到期,最终执行相应的到期操作
具体代码实现
优缺点
- 高效的任务调度:时间轮算法通过将任务按照延迟时间划分到不同的时间槽中,实现了高效的任务调度,能够在 O(1) 的时间复杂度内找到需要执行的任务
- 适用于大量短期任务:时间轮算法适用于大量短期任务的调度
- 需要手动实现:jdk 中没有提供具体实现,需要自己手动实现或引入第三方工具
- 无法动态调整执行时间:一旦任务被添加到时间轮中,其执行时间就是固定的,无法动态调整
- 不支持周期性任务:时间轮算法主要用于执行一次性任务,不适用于周期性任务的调度
- 内存消耗较大:如果时间轮的槽位数量过多,或者每个槽位中的任务数量较多,可能会占用较多的内存空间
- 不适于长时间延迟的任务:时间轮算法适用于短期任务的调度,对于长时间延迟的任务,可能需要其他算法来处理
Spring Task
Spring Task 是 Spring3 以上版本自带的定时任务
调用方式
- SpringBoot 启动类上加上
@EnableScheduling
- 使用
@Scheduled
定义定时规则
Spring Cron 规则
1 2 3 4 5 6 7 8 9 10
| .------------------------表示秒:0~59,支持 * , - / | .---------------------表示分:0~59,支持 * , - / | | .------------------表示时:0~23,支持 * , - / | | | .---------------表示日期:1~31,支持 * , - / ? L W C | | | | .-----------表示月:1~12,支持 * , - / | | | | | .-------表示星期:1~7(从星期天开始),支持 * , - / ? L C # | | | | | | .---表示年:1970~2099(可选的),支持 * , - / | | | | | | | * * * * * * * 秒 分 时 日期 月 星期 年
|
需要注意的是,「日期」和「星期」都多了一个特殊符「?」,
符号含义:
- *****:任意值,在月域中,
*
表示每个月,在星期域中,*
表示星期的每一天
- **,**:枚举值,在分钟域中,
5,20
表示分别在 5 分钟和 20 分钟触发一次
- **-**:范围,在分钟域中,
5-20
表示从 5 分钟到 20 分钟之间 每隔一分钟触发一次
- **/**:指定数值的增量,在分钟域中,
0/15
表示从第 0 分钟开始,每 15 分钟,3/20
表示从第 3 分钟开始,每 20 分钟
- **?**:不指定值,仅 日期和星期 支持该字符, 当 日期 或 星期 其中之一被指定了值以后,为了避免冲突,需要将另一个域的值设为
?
- L:Last,表示 最后一天,仅 日期和星期域 支持该字符,指定
L
字符时,避免指定列表或者范围,否则,会导致逻辑问题
- 在 日期域 中,
L
表示 某个月的最后一天。在 星期域 中,L
表示一个星期的最后一天,也就是星期日(SUN
)
- 如果在
L
前有具体的内容,例如,在星期域中的 6L
表示这个月的最后一个星期六
- W:除周末以外的有效工作日,在离指定日期的最近的有效工作日触发事件。
W
字符寻找最近有效工作日时不会跨过当前月份,连用字符 LW
时表示为指定月份的最后一个工作日。在日期域中 5W
,如果 5日 是星期六,则将在最近的工作日星期五,即 4日 触发。如果 5日 是星期天,则将在最近的工作日星期一,即 6日 触发;如果 5日 在星期一到星期五中的一天,则就在 5日 触发
- C:允许在日期域和星期域出现,这个字符依靠一个指定的「日历」, 也就是说这个表达式的值依赖于相关的「日历」的计算结果,如果没有「日历」关联,则等价于所有包含的「日历」。如:日期域是
5C
表示关联「日历」中第一天,或者这个月开始的第一天的后 5 天。星期域是 1C
表示关联「日历」中第一天,或者星期的第一天的后 1 天,也就是周日的后一天(周一)
- **#**:确定 每个月第几个星期几,仅 星期域 支持该字符,在星期域中,
4#2
表示某月的第二个星期四
实例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| # 每天上午10:15执行任务 0 15 10 ? * * # 每天上午10:15执行任务 0 15 10 * * ? # 每天中午12:00执行任务 0 0 12 * * ? # 每天上午10:00点、下午14:00以及下午16:00执行任务 0 0 10,14,16 * * ? # 每天上午09:00到下午17:00时间段内每隔半小时执行任务 0 0/30 9-17 * * ? # 每天下午14:00到下午14:59时间段内每隔1分钟执行任务 0 * 14 * * ? # 每天下午14:00到下午14:05时间段内每隔1分钟执行任务 0 0-5 14 * * ? # 每天下午14:00到下午14:55时间段内每隔5分钟执行任务 0 0/5 14 * * ? # 每天下午14:00到下午14:55、下午18:00到下午18:55时间段内每隔5分钟执行任务 0 0/5 14,18 * * ? # 每个星期三中午12:00执行任务 0 0 12 ? * WED # 每月15日上午10:15执行任务 0 15 10 15 * ? # 每月最后一日上午10:15执行任务 0 15 10 L * ? # 每月最后一个星期六上午10:15执行任务 0 15 10 ? * 6L # 每月第三个星期六上午10:15执行任务 0 15 10 ? * 6#3 # 每年3月的每个星期三下午14:10和14:44执行任务 0 10,44 14 ? 3 WED
|
优缺点
- 使用简单,Spring 做了很好的封装,适用于绝大多数单机版的业务场景
- 支持复杂 cron 表达式
- 默认 单线程,需要手动配置线程池
- 不支持分布式场景
- 定时任务不能持久化
Quartz
Quartz 是一个功能强大、灵活、可靠的作业调度开源框架,用于在 Java 应用程序中实现任务调度和定时任务执行。它提供了丰富的功能,能够满足各种复杂的调度需求,包括简单的定时任务、复杂的作业调度、集群环境下的任务调度等。
Quartz 的主要特点和功能包括:
- 灵活的任务调度:Quartz 支持基于时间、间隔、表达式等多种方式的任务调度,能够满足各种复杂的调度需求。
- 作业管理:Quartz 允许用户定义和管理各种类型的作业(Job),包括普通的任务、持久化的任务、集群环境下的任务等。
- 可靠性和容错性:Quartz 提供了可靠的任务执行机制和容错机制,能够保证任务按时执行,即使在系统故障或者异常情况下也能够保证任务的执行。
- 集群支持:Quartz 提供了集群支持功能,多个 Quartz 实例可以组成一个集群,实现任务的负载均衡和高可用性。
- 持久化支持:Quartz 支持将任务调度信息持久化到数据库中,确保任务信息不会丢失,即使系统重启也能够恢复任务调度状态。
- 监听器和触发器:Quartz 提供了丰富的监听器和触发器机制,能够对任务的执行过程进行监控和控制。
- 插件机制:Quartz 提供了插件机制,用户可以根据自己的需求扩展和定制 Quartz 的功能
调用方式
引入相关依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-quartz</artifactId> </dependency>
|
创建定时任务执行类
1 2 3 4 5 6 7
| public class QuartzTestJob extends QuartzJobBean { @Override protected void executeInternal(JobExecutionContext context) throws JobExecutionException { String userName = (String) context.getJobDetail().getJobDataMap().get("userName"); System.out.println("userName:" + userName); } }
|
创建调度程序 JobDetail 和调度器 Trigger
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| @Configuration public class QuartzConfig { @Value("${test.cron}") private String testCron;
@Bean public JobDetail quartzTestDetail() { JobDetail jobDetail = JobBuilder.newJob(QuartzTestJob.class) .withIdentity("quartzTestDetail", "QUARTZ_TEST") .usingJobData("userName", "susan") .storeDurably() .build(); return jobDetail; }
@Bean public Trigger quartzTestJobTrigger() { CronScheduleBuilder cronScheduleBuilder = CronScheduleBuilder.cronSchedule(testCron);
Trigger trigger = TriggerBuilder.newTrigger() .forJob(quartzTestDetail()) .withIdentity("quartzTestJobTrigger", "QUARTZ_TEST_JOB_TRIGGER") .withSchedule(cronScheduleBuilder) .build(); return trigger; } }
|
camel + quartz2
引入依赖
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-spring-boot-starter</artifactId> <version>2.20.1</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-quartz2</artifactId> <version>2.20.1</version> </dependency>
|
新建任务
1 2 3 4 5 6 7 8 9 10 11 12
| @Slf4j public class QuartzTestJob {
@Value(value = "${test.switch:false}") private boolean testSwitch;
public void execJob() { if (testSwitch) { log.info("Start QuartzTestJob!"); } } }
|
新建路由
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Configuration public class TestSchedule extends RouteBuilder {
@Value(value = "${testJob.cron:0 0 9 * * ?}") private String testJobCron;
@Override public void configure() throws Exception { from("quartz2://group/QuartzTestJob?cron=" + testJobCron + "&stateful=true") .routeId("QuartzTestJobTimer").setHeader("ROUTING_KEY", simple("'QuartzTestJob'")) .to("class:com.venom.job.QuartzTestJob?method=execJob"); } }
|
Quartz Cron 的格式与 Spring Task 相同
优点
- 灵活性高:Quartz 提供了丰富的任务调度功能,支持多种触发器类型、灵活的任务调度配置,能够满足各种复杂的调度需求。
- 可靠性强:Quartz 具有良好的容错机制和任务状态管理,能够保证任务的可靠执行,即使在系统异常或者故障情况下也能够确保任务的执行。
- 集群支持:Quartz 支持集群部署,多个 Quartz 实例可以组成一个集群,实现任务的负载均衡和高可用性。
- 持久化支持:Quartz 提供了持久化支持,可以将任务调度信息持久化到数据库中,确保任务信息不会丢失,即使系统重启也能够恢复任务调度状态。
- 丰富的监听器机制:Quartz 提供了丰富的监听器机制,能够对任务的执行过程进行监控和控制,实现任务执行过程的定制化管理。
- 易于集成:Quartz 是一个开源框架,易于集成到各种 Java 应用程序中,可以与 Spring、JEE 等技术栈无缝集成,提高了开发效率。
缺点
- 学习曲线较陡:Quartz 的功能较为丰富,对于新手来说学习曲线可能较陡,需要一定的时间和精力去理解和掌握。
- 配置复杂:Quartz 的配置相对复杂,尤其是在集群部署和持久化配置方面,需要仔细配置才能保证系统的稳定性和可靠性。
- 资源占用较高:Quartz 的集群模式下,需要占用一定的系统资源和数据库资源,特别是在任务调度频繁的情况下,可能会占用较多的资源。
- 不适用于实时任务:Quartz 是一个基于时间的作业调度框架,不适用于实时任务的调度,如果需要实时任务调度,可能需要其他更适合的技术方案。
- 文档和社区支持有限:相对于一些其他流行的开源框架,Quartz 的文档和社区支持相对有限,可能会在使用过程中遇到一些问题
xxl-job
xxl-job 是大众点评(许雪里)开发的一个分布式任务调度平台,专注于解决大数据场景下的任务调度问题。它是一个开源项目,提供了任务调度和管理、任务执行器、任务日志查看等功能,适用于各种规模的企业应用场景。
xxl-job 的主要特点和功能包括:
- 分布式任务调度:xxl-job 提供了分布式任务调度功能,支持在多台服务器上进行任务调度和执行,能够满足大规模任务调度的需求。
- Web 控制台:xxl-job 提供了 Web 控制台,用于任务的管理、监控和调度配置,管理员可以通过 Web 控制台方便地管理任务和查看任务执行情况。
- 任务执行器:xxl-job 提供了任务执行器,用于执行具体的任务逻辑,任务执行器可以在多台服务器上部署,通过注册到调度中心进行任务调度和执行。
- 任务分片:xxl-job 支持任务分片功能,可以将一个任务分成多个子任务,每个子任务由不同的执行器执行,提高了任务的执行效率和并发能力。
- 任务调度策略:xxl-job 支持多种任务调度策略,包括固定频率调度、固定延迟调度、Cron 表达式调度等,能够满足各种复杂的调度需求。
- 任务日志查看:xxl-job 提供了任务日志查看功能,可以方便地查看任务的执行日志和执行状态,帮助管理员快速定位和排查问题。
- 报警机制:xxl-job 支持报警机制,可以通过邮件、短信等方式发送任务执行结果和异常信息,及时通知管理员处理。
调用方式
引入依赖
1 2 3 4 5
| <dependency> <groupId>com.xuxueli</groupId> <artifactId>xxl-job-core</artifactId> <version>2.3.1</version> </dependency>
|
添加配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
xxl.job.accessToken=default_token
xxl.job.executor.appname=xxl-job-executor-sample
xxl.job.executor.address=
xxl.job.executor.ip= xxl.job.executor.port=9999
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
xxl.job.executor.logretentiondays=30
|
配置 xxl-job bean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| @Configuration public class XxlJobConfig { private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
@Value("${xxl.job.admin.addresses}") private String adminAddresses;
@Value("${xxl.job.accessToken}") private String accessToken;
@Value("${xxl.job.executor.appname}") private String appname;
@Value("${xxl.job.executor.address}") private String address;
@Value("${xxl.job.executor.ip}") private String ip;
@Value("${xxl.job.executor.port}") private int port;
@Value("${xxl.job.executor.logpath}") private String logPath;
@Value("${xxl.job.executor.logretentiondays}") private int logRetentionDays;
@Bean public XxlJobSpringExecutor xxlJobExecutor() { logger.info(">>>>>>>>>>> xxl-job config init."); XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor(); xxlJobSpringExecutor.setAdminAddresses(adminAddresses); xxlJobSpringExecutor.setAppname(appname); xxlJobSpringExecutor.setAddress(address); xxlJobSpringExecutor.setIp(ip); xxlJobSpringExecutor.setPort(port); xxlJobSpringExecutor.setAccessToken(accessToken); xxlJobSpringExecutor.setLogPath(logPath); xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
XxlJobExecutor.registJobHandler("beanClassDemoJobHandler", new BeanClassDemoJob());
return xxlJobSpringExecutor; } }
|
新建 Job
- 在Spring Bean实例中,开发Job方法
- 为 Job 方法添加注解
@XxlJob(value="job's name", init = "job 初始化方法", destroy = "job 销毁方法")
- 需要通过 “XxlJobHelper.log” 打印执行日志
- 默认任务结果为「成功」状态,不需要主动设置,如有诉求,比如设置任务结果为失败,可以通过
XxlJobHelper.handleFail/handleSuccess
自主设置任务结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
| @Slf4j @Component public class BeanMethodDemoJob {
@XxlJob("demoJobHandler") public void demoJobHandler() { XxlJobHelper.log("demoJobHandler execute..."); }
@XxlJob("shardingJobHandler") public void shardingJobHandler() throws Exception { log.info("shardingJobHandler execute...");
XxlJobHelper.log("shardingJobHandler execute...");
int shardIndex = XxlJobHelper.getShardIndex(); int shardTotal = XxlJobHelper.getShardTotal();
XxlJobHelper.log("分片参数:当前分片序号 = {}, 总分片数 = {}", shardIndex, shardTotal);
for (int i = 0; i < shardTotal; i++) { if (i==shardIndex) { XxlJobHelper.log("第 {} 片, 命中分片开始处理", i); } else { XxlJobHelper.log("第 {} 片, 忽略", i); } }
}
@XxlJob("commandJobHandler") public void commandJobHandler() throws Exception { XxlJobHelper.log("commandJobHandler execute...");
String command = XxlJobHelper.getJobParam(); int exitValue = -1;
BufferedReader bufferedReader = null; try { ProcessBuilder processBuilder = new ProcessBuilder(); processBuilder.command(command); processBuilder.redirectErrorStream(true);
Process process = processBuilder.start();
BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream()); bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));
String line; while ((line = bufferedReader.readLine())!=null) { XxlJobHelper.log(line); }
process.waitFor(); exitValue = process.exitValue(); } catch (Exception e) { XxlJobHelper.log(e); } finally { if (bufferedReader!=null) { bufferedReader.close(); } }
if (exitValue==0) { } else { XxlJobHelper.handleFail("command exit value(" + exitValue + ") is failed"); }
}
@XxlJob("httpJobHandler") public void httpJobHandler() throws Exception { XxlJobHelper.log("httpJobHandler execute...");
String param = XxlJobHelper.getJobParam(); if (param==null || param.trim().length()==0) { XxlJobHelper.log("param[" + param + "] invalid.");
XxlJobHelper.handleFail(); return; }
String[] httpParams = param.split("\n"); String url = null; String method = null; String data = null; for (String httpParam : httpParams) { if (httpParam.startsWith("url:")) { url = httpParam.substring(httpParam.indexOf("url:") + 4).trim(); } if (httpParam.startsWith("method:")) { method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase(); } if (httpParam.startsWith("data:")) { data = httpParam.substring(httpParam.indexOf("data:") + 5).trim(); } }
if (url==null || url.trim().length()==0) { XxlJobHelper.log("url[" + url + "] invalid.");
XxlJobHelper.handleFail(); return; } if (method==null || !Arrays.asList("GET", "POST").contains(method)) { XxlJobHelper.log("method[" + method + "] invalid.");
XxlJobHelper.handleFail(); return; } boolean isPostMethod = method.equals("POST");
HttpURLConnection connection = null; BufferedReader bufferedReader = null; try { URL realUrl = new URL(url); connection = (HttpURLConnection) realUrl.openConnection();
connection.setRequestMethod(method); connection.setDoOutput(isPostMethod); connection.setDoInput(true); connection.setUseCaches(false); connection.setReadTimeout(5 * 1000); connection.setConnectTimeout(3 * 1000); connection.setRequestProperty("connection", "Keep-Alive"); connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8"); connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
connection.connect();
if (isPostMethod && data!=null && data.trim().length() > 0) { DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream()); dataOutputStream.write(data.getBytes("UTF-8")); dataOutputStream.flush(); dataOutputStream.close(); }
int statusCode = connection.getResponseCode(); if (statusCode!=200) { throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid."); }
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8")); StringBuilder result = new StringBuilder(); String line; while ((line = bufferedReader.readLine())!=null) { result.append(line); } String responseMsg = result.toString();
XxlJobHelper.log(responseMsg);
return; } catch (Exception e) { XxlJobHelper.log(e);
XxlJobHelper.handleFail(); return; } finally { try { if (bufferedReader!=null) { bufferedReader.close(); } if (connection!=null) { connection.disconnect(); } } catch (Exception e2) { XxlJobHelper.log(e2); } }
}
@XxlJob(value = "demoJobHandler2", init = "init", destroy = "destroy") public void demoJobHandler2() throws Exception { XxlJobHelper.log("demoJobHandler2, execute..."); }
public void init() { log.info("init"); }
public void destroy() { log.info("destroy"); } }
|
优缺点
- 有界面管理定时任务,支持弹性扩容缩容、动态分片、故障转移、失败报警等功能。它的功能非常强大,很多大厂在用,可以满足绝大多数业务场景
- 和 quartz 一样,通过数据库分布式锁,来控制任务不能重复执行。在任务非常多的情况下,有一些性能问题
Redis
Redis 可以用来处理延迟类的任务,主要基于两种方案:Redis 过期事件监听和 Redisson 的延迟队列
Redis 过期事件监听
Spring Data Redis 专门提供了一个监听 Redis Key 过期事件的监听器 KeyExpirationEventMessageListener
继承它,并且重写 doHandleMessage(Message message)
方法即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Component public class KeyExpireListener extends KeyExpirationEventMessageListener {
final static Logger logger = LoggerFactory.getLogger(KeyExpireListener.class);
public KeyExpireListener(RedisMessageListenerContainer listenerContainer) { super(listenerContainer); }
@Override public void doHandleMessage(Message message) { byte[] body = message.getBody(); byte[] channel = message.getChannel(); logger.info("message = {}, channel = {}", new String(body), new String(channel)); } }
|
redis 默认不开启过期事件提醒,需要修改 Redis 配置文件中 notify-keyspace-events Ex
开启提醒
可以通过 config get notify-keyspace-events
获取配置信息
Redisson 延迟队列
先看 Demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException { Config config = new Config(); config.useSingleServer().setAddress("redis://172.29.2.10:7000"); RedissonClient redisson = Redisson.create(config); RBlockingQueue<String> blockingQueue = redisson.getBlockingQueue("test_queue"); RDelayedQueue<String> delayedQueue = redisson.getDelayedQueue(blockingQueue); new Thread() { public void run() { while(true) { try { System.err.println( blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }; }.start(); for(int i=1;i<=5;i++) { delayedQueue.offer("fffffffff"+i, 13, TimeUnit.SECONDS); } }
|
大致原理:
Demo 运行后会发现 Redis 中存在两个列队 redisson_delay_queue_timeout:test_queue
和 redisson_delay_queue:test_queue
这两个队列中,带有timeout关键字的那条是一个ZSet集合,另一个是普通的List
但是为什么要设计两条队列呢,后边我查阅了相关资料,大概整理了下Redisson的延迟队列原理如下:
- 客户端启动,redisson 先订阅一个 key,同时
BLPOP key 0
无限监听一个阻塞队列(等里面有数据了就返回)
- 当有数据 put 时,redisson 先把数据放到一个 zset 集合(按延时到期时间的时间戳为分数排序),同时发布上面订阅的 key,发布内容为数据到期的 timeout,此时客户端进程开启一个延时任务,延时时间为发布的 timeout
- 客户端进程的延时任务到了时间执行,从 zset 分页取出过了当前时间的数据,然后将数据
rpush
到第一步的阻塞队列里。然后将当前数据从 zset 移除,取完之后,又执行 BLPOP key 0
无限监听一个阻塞队列。这一部分的逻辑,客户端会发送一个 lua 脚本给到服务端去操作:org.redisson.RedissonDelayedQueue
源码里面的 pushTaskAsync
函数有 lua 脚本内容
- 上一步客户端监听的阻塞队列返回取到数据,回调到 RBlockingQueue 的 take 方法。于是,我们就收到了数据
这里并未在生产中使用过 Redisson 的延迟队列,网上对此的问题也颇多,需谨慎使用
相关详细原理参靠: 关于Redisson延迟队列的一些思考 (qq.com) Redisson 延时队列 原理 详解 - 知乎 (zhihu.com)
MQ
大部分消息队列,例如 RocketMQ、RabbitMQ,都支持定时/延时消息。定时消息和延时消息本质其实是相同的,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。
不过,在使用 MQ 定时消息之前一定要看清楚其使用限制,以免不适合项目需求,例如 RocketMQ 定时时长最大值默认为 24 小时且不支持自定义修改、只支持 18 个 Level 的延时并不支持任意时间
Thanks