个人博客


前面介绍通过使用@Scheduled注解来实现定时任务,这种方式虽然很简便,但是还不够灵活。比如在一些场景下,我们需要动态地去改变定时任务周期、实时启停定时任务。这就需要动态地去更改定时任务的相关参数配置。

此文介绍的方式只负责定时任务调度(非分布式调度),不管具体业务的执行结果,也不对多个定时任务做业务上前后顺序关联。

1、Maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
</parent>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>
<dependency>
<groupId>com.oracle</groupId>
<artifactId>ojdbc6</artifactId>
<version>11.2.0.3</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
<dependency>
<groupId>org.apache.karaf.scheduler</groupId>
<artifactId>org.apache.karaf.scheduler.core</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
</dependency>

2、创建定时任务管理表

2.1、MySQL

1
2
3
4
5
6
7
8
9
10
CREATE TABLE spring_schedule_cron (
id bigint(20) NOT NULL AUTO_INCREMENT,
bean_name varchar(128) NOT NULL COMMENT '定时任务beanName',
cron_expression varchar(16) NOT NULL COMMENT 'cron表达式',
task_desc varchar(128) NOT NULL COMMENT '任务描述',
enable tinyint DEFAULT 1 COMMENT '状态 0-禁用 1-启用',
app_id varchar(16) COMMENT '应用id',
PRIMARY KEY (id)
)COMMENT = '定时任务表';
CREATE UNIQUE INDEX spring_schedule_cron_index ON spring_schedule_cron(bean_name, app_id);

2.2、Oracle

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
CREATE TABLE spring_schedule_cron (
id NUMBER NOT NULL,
bean_name VARCHAR2(128) NOT NULL,--COMMENT '定时任务beanName'
cron_expression VARCHAR2(16) NOT NULL,--COMMENT 'cron表达式'
task_desc VARCHAR2(128) NOT NULL, --COMMENT '任务描述',
enable INTEGER DEFAULT 1, --COMMENT '状态 0-禁用 1-启用'
app_id VARCHAR2(16), --COMMENT '应用id'
PRIMARY KEY (id)
);
COMMENT ON TABLE spring_schedule_cron IS '定时任务表';
CREATE UNIQUE INDEX spring_schedule_cron_index ON spring_schedule_cron(bean_name, app_id);

-- 主键序列
DROP sequence seq_spring_schedule_cron;
CREATE sequence seq_spring_schedule_cron
INCREMENT BY 1
START WITH 1
ORDER
CACHE 20;

数据访问dao层可以参考文末的代码链接,这里就不再展示了。

3、启动类配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@SpringBootApplication
@MapperScan("net.zhaoxiaobin.task.dao")
@EnableAsync
@EnableScheduling
@Slf4j
public class TaskApplication {

public static void main(String[] args) {
SpringApplication.run(TaskApplication.class, args);
}

@Bean
@ConditionalOnMissingBean(name = "scheduleTaskExecutor")
public Executor scheduleTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20); // 核心线程数量,线程池创建时候初始化的线程数
executor.setMaxPoolSize(50); // 最大线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
executor.setQueueCapacity(200); // 缓冲队列,用来缓冲执行任务的队列
executor.setKeepAliveSeconds(60); // 当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
executor.setThreadNamePrefix("scheduleTask-"); // 线程名前缀
executor.setWaitForTasksToCompleteOnShutdown(true); // 用来设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
executor.setAwaitTerminationSeconds(60 * 5); // 该方法用来设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。
/*
1.AbortPolicy,ThreadPoolExecutor中默认的拒绝策略就是AbortPolicy,直接抛出异常。
2.CallerRunsPolicy,CallerRunsPolicy在任务被拒绝添加后,会调用当前线程池的所在的线程去执行被拒绝的任务。
3.DiscardPolicy,采用这个拒绝策略,会让被线程池拒绝的任务直接抛弃,不会抛异常也不会执行。
4.DiscardOldestPolicy,当任务被拒绝添加时,会抛弃任务队列中最旧的任务也就是最先加入队列的,再把这个新任务添加进去。
*/
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
return executor;
}
}
  1. 启动类上需要添加@EnableAsync@EnableScheduling用来启用异步功能和定时任务。
  2. 创建线程池并注入Spring容器用以定时任务的调度执行,这里使用条件注解@ConditionalOnMissingBean是为了此简易定时任务调度工程可以在被其它工程集成依赖的时候,能够在自己的工程中重新定义线程池Executor,用来覆盖默认的线程池配置。

4、动态定时任务配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Configuration
@Slf4j
public class ScheduledConfig implements SchedulingConfigurer {
@Autowired
private ApplicationContext context;

@Autowired
private SpringScheduleCronDao dao;

@Value("${scheduleSwitch:false}")
private boolean scheduleSwitch;

@Value("${spring.application.name:appId}")
private String appId;

@Value("${server.port}")
private String port;

@Override
@SneakyThrows
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
// 定时任务开关
if (!scheduleSwitch) {
return;
}
// 查询所有定时任务
List<SpringScheduleCron> cronList = dao.findByAppId(appId);
for (SpringScheduleCron originSpringScheduleCron : cronList) {
ScheduleService scheduleBean = context.getBean(originSpringScheduleCron.getBeanName(), ScheduleService.class);
// 注册定时任务
/*
每次定时任务执行时,都会按顺序执行以下代码
1.执行scheduleBean的run函数(在ScheduleService接口实现的Runnable方法),可以异步执行
2.从表中查询当前定时任务最新的配置参数,更新下一次的执行周期
*/
taskRegistrar.addTriggerTask(scheduleBean, triggerContext -> {
try {
SpringScheduleCron springScheduleCron = dao.findByBeanName(appId, originSpringScheduleCron.getBeanName());
String originalCronExpression = originSpringScheduleCron.getCronExpression();
String currentCronExpression = springScheduleCron.getCronExpression();
if (!originalCronExpression.equals(currentCronExpression) && CronExpression.isValidExpression(currentCronExpression)) {
log.info("=====更新[{}]最新cron表达式[{}]=====", originSpringScheduleCron.getBeanName(), currentCronExpression);
originSpringScheduleCron.setCronExpression(currentCronExpression);
}
} catch (Exception e) {
log.error("=====更新cron表达式异常=====", e);
}
return new CronTrigger(originSpringScheduleCron.getCronExpression()).nextExecutionTime(triggerContext);
}
);
}
// 定时任务管理界面
InetAddress localHost = Inet4Address.getLocalHost();
String contextPath = "http://".concat(localHost.getHostAddress()).concat(":").concat(port);
log.info("定时任务管理页面:{}", contextPath.concat("/scheduleManagement/taskList"));
}
}
  1. 通过自定义配置类在服务启动时从数据库读取所有定时任务配置,将定时任务的每个实现类通过addTriggerTask方法注册到taskRegistrar对象中。
  2. 执行定时任务具体的业务逻辑,可以异步线程执行。
  3. 每次执行定时任务后,会根据beanName再次查库获取该定时任务的最新配置参数,更新下一次的执行周期。

因为是非分布式调度,集群环境可以通过定时任务开关只让一台运行定时任务调度,或者多台运行在具体实现上自行加锁。

5、定时任务接口和具体实现

5.1、接口定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public interface ScheduleService extends Runnable {
Logger log = LoggerFactory.getLogger(ScheduleService.class);

void execute();

@Override
default void run() {
// 获取当前执行的定时任务beanName
String classPath = this.getClass().getName();
String className = classPath.substring(classPath.lastIndexOf(".") + 1);
String beanName = (className.charAt(0) + "").toLowerCase().concat(className.substring(1));
// 查询是否启用
Environment environment = SpringUtil.getBean(Environment.class);
String appId = environment.getProperty("spring.application.name");
SpringScheduleCronDao dao = SpringUtil.getBean(SpringScheduleCronDao.class);
SpringScheduleCron springScheduleCron = dao.findByBeanName(appId, beanName);
if (springScheduleCron.getEnable() != 1) {
log.debug("=====[{}]不可用=====", beanName);
return;
}
execute();
}
}

每次执行定时任务时,先查表判断该定时任务是否启用,启用则再调用execute方法执行具体的业务逻辑。

5.2、定时任务具体实现

  1. 需要实现ScheduleService接口。
  2. beanName需要使用默认的规则(类名首字母小写),不要自定义beanName
  3. 为了防止定时任务异常导致定时任务调度的主线程挂掉或者定时任务执行耗时过长导致其它定时任务阻塞未被调度,最好需要以异步方式执行定时任务的具体业务逻辑。这里用到了我们之前创建的线程池来执行异步任务。

定时任务1

1
2
3
4
5
6
7
8
9
10
@Service
@Slf4j
@Async
public class ScheduleTask1 implements ScheduleService {
@Override
public void execute() {
String now = DateUtil.now();
log.info("ScheduleTask1:[{}]", now);
}
}

定时任务2

1
2
3
4
5
6
7
8
9
10
@Service
@Slf4j
@Async
public class ScheduleTask2 implements ScheduleService {
@Override
public void execute() {
String now = DateUtil.now();
log.info("ScheduleTask2:[{}]", now);
}
}

定时任务3

1
2
3
4
5
6
7
8
9
10
@Service
@Slf4j
@Async
public class ScheduleTask3 implements ScheduleService {
@Override
public void execute() {
String now = DateUtil.now();
log.info("ScheduleTask3:[{}]", now);
}
}

5.3、定时任务配置入库

1
2
3
4
5
6
7
8
9
-- MySQL
insert into spring_schedule_cron values (1, 'scheduleTask1', '*/5 * * * * ?', '定时任务描述1', 1, 'task');
insert into spring_schedule_cron values (2, 'scheduleTask2', '*/6 * * * * ?', '定时任务描述2', 1, 'task');
insert into spring_schedule_cron values (3, 'scheduleTask3', '*/7 * * * * ?', '定时任务描述3', 1, 'task');

--Oracle
insert into spring_schedule_cron values (seq_spring_schedule_cron.NEXTVAL, 'scheduleTask1', '*/3 * * * * ?', '定时任务描述1', 1, 'task');
insert into spring_schedule_cron values (seq_spring_schedule_cron.NEXTVAL, 'scheduleTask2', '*/5 * * * * ?', '定时任务描述2', 1, 'task');
insert into spring_schedule_cron values (seq_spring_schedule_cron.NEXTVAL, 'scheduleTask3', '*/7 * * * * ?', '定时任务描述3', 1, 'task');

6、测试定时任务

在配置文件application.yml中开启定时任务开关。

1
scheduleSwitch: true

启动服务后可以看到,执行定时任务输出的日志。

1
2
3
2020-07-28 17:53:54,241 [INFO] [scheduleTask-2] [net.zhaoxiaobin.task.service.impl.ScheduleTask2:24] [] ScheduleTask2:[2020-07-28 17:53:54]
2020-07-28 17:53:55,185 [INFO] [scheduleTask-3] [net.zhaoxiaobin.task.service.impl.ScheduleTask1:27] [] ScheduleTask1:[2020-07-28 17:53:55]
2020-07-28 17:53:56,119 [INFO] [scheduleTask-4] [net.zhaoxiaobin.task.service.impl.ScheduleTask3:24] [] ScheduleTask3:[2020-07-28 17:53:56]

7、定时任务管理页面

采用thymeleaf模板实现任务调度中心的管理页面。具有以下功能

  1. 实时启停定时任务。
  2. 更改cron表达式(下一次执行才会更新开始生效)。
  3. 手动执行定时任务。
  4. 由于在服务启动时需要查库并注册所有定时任务,服务启动后添加并不生效,所以页面没有新增定时任务的功能。

-w1361

参考链接

代码地址