本文以springboot中cron表达式配置的定时任务为例子。
在springboot中的启动类中添加@EnableScheduling注解,在beanFactory中添加ScheduledAnnotationBeanPostProcessor作为bean初始化完毕后的后置处理器来添加关于spring的定时任务处理支持。
在ScheduledAnnotationBeanPostProcessor类中,含有定时任务的注册器ScheduleTaskRegistrar来负责管理定时任务。
private final ScheduledTaskRegistrar registrar = new ScheduledTaskRegistrar();
在定时任务处理器在beanFactory中作为bean创建完毕之后,将会调用finishRegistration()方法来完成相关注册器的一系列配置。
public void onApplicationEvent(ContextRefreshedEvent event) {if (event.getApplicationContext() == this.applicationContext) {finishRegistration();}
}private void finishRegistration() {if (this.scheduler != null) {this.registrar.setScheduler(this.scheduler);}if (this.beanFactory instanceof ListableBeanFactory) {Map<String, SchedulingConfigurer> configurers =((ListableBeanFactory) this.beanFactory).getBeansOfType(SchedulingConfigurer.class);for (SchedulingConfigurer configurer : configurers.values()) {configurer.configureTasks(this.registrar);}}if (this.registrar.hasTasks() && this.registrar.getScheduler() == null) {Assert.state(this.beanFactory != null, "BeanFactory must be set to find scheduler by type");try { this.registrar.setTaskScheduler(this.beanFactory.getBean(TaskScheduler.class));}catch (NoUniqueBeanDefinitionException ex) {try {this.registrar.setTaskScheduler(this.beanFactory.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, TaskScheduler.class));}catch (NoSuchBeanDefinitionException ex2) {if (logger.isInfoEnabled()) {logger.info("More than one TaskScheduler bean exists within the context, and " +"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +ex.getBeanNamesFound());}}}catch (NoSuchBeanDefinitionException ex) {logger.debug("Could not find default TaskScheduler bean", ex);try {this.registrar.setScheduler(this.beanFactory.getBean(ScheduledExecutorService.class));}catch (NoUniqueBeanDefinitionException ex2) {try {this.registrar.setScheduler(this.beanFactory.getBean(DEFAULT_TASK_SCHEDULER_BEAN_NAME, ScheduledExecutorService.class));}catch (NoSuchBeanDefinitionException ex3) {if (logger.isInfoEnabled()) {logger.info("More than one ScheduledExecutorService bean exists within the context, and " +"none is named 'taskScheduler'. Mark one of them as primary or name it 'taskScheduler' " +"(possibly as an alias); or implement the SchedulingConfigurer interface and call " +"ScheduledTaskRegistrar#setScheduler explicitly within the configureTasks() callback: " +ex2.getBeanNamesFound());}}}catch (NoSuchBeanDefinitionException ex2) {logger.debug("Could not find default ScheduledExecutorService bean", ex2); logger.info("No TaskScheduler/ScheduledExecutorService bean found for scheduled processing");}}}this.registrar.afterPropertiesSet();
}
在此处,如果已经在注册器当中已经注册有相关的任务,但是注册器当中的定时任务管理器如果不存在,将会试着从beanFactory将任务管理器设置相应的管理器。在这之后,将会调用注册器的afterPropertiesSet()方法,来进行注册器的相关配置。
public void afterPropertiesSet() {scheduleTasks();
}
protected void scheduleTasks() {if (this.taskScheduler == null) {this.localExecutor = Executors.newSingleThreadScheduledExecutor();this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);}if (this.triggerTasks != null) {for (TriggerTask task : this.triggerTasks) {addScheduledTask(scheduleTriggerTask(task));}}if (this.cronTasks != null) {for (CronTask task : this.cronTasks) {addScheduledTask(scheduleCronTask(task));}}if (this.fixedRateTasks != null) {for (IntervalTask task : this.fixedRateTasks) {addScheduledTask(scheduleFixedRateTask(task));}}if (this.fixedDelayTasks != null) {for (IntervalTask task : this.fixedDelayTasks) {addScheduledTask(scheduleFixedDelayTask(task));}}
}
在afterPropertiesSet方法中直接调用scheduleTask()方法进行相关注册器配置。
在这里,我们可以看一下注册器的成员。
private TaskScheduler taskScheduler;private ScheduledExecutorService localExecutor;private List<TriggerTask> triggerTasks;private List<CronTask> cronTasks;private List<IntervalTask> fixedRateTasks;private List<IntervalTask> fixedDelayTasks;private final Map<Task, ScheduledTask> unresolvedTasks = new HashMap<Task, ScheduledTask>(16);
成员相当直观。
其中四个list分别存储还没有加进线程池四种方式配置的定时任务任务,在不存在管理器和线程池的情况下缓存在其中,一旦建立起两者,现试图将这四个list存储的任务加进管理器和线程池。本文主要解析cron表达式配置的定时任务。
unresolvedTasks主要存储还没有正式进入线程池的定时任务配置类与该配置类具体的定时任务实例的键值对。
taskSceduler作为注册器当中的任务管理器,localExcutor作为注册器的线程池。可以剖析一下两者的关系。
先看线程池ScheduledExecutorService类。
ScheduledExecutorService作为接口直接继承了jdk的线程池ExecutorService接口。可见,spring的定时任务线程池是完全基于jdk的线程池的实现的。而我们可以在scheduleTasks()方法里看到默认采用的线程池的实现。Executors.newSingleThreadScheduledExecutor()方法直接申请了线程池(Executors是jdk给出的直接申请线程池的工具类)。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));
}public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,new DelayedWorkQueue());
}
可以让人欣喜的发现,ScheduledThreadPoolExecutor作为线程池直接继承自ThreadPoolExecutor线程池,主要来支持周期性方法的调度,即使是在构造方法,也是直接super()了ThreadPoolExecutor的构造方法。 值得注意的是,corePoolSize为1,这代表着这个线程池的工作线程在一般情况下只有一条。
我们可以看相比父类,ScheduledThreadPoolExecutor最核心的schedule()方法。
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));delayedExecute(t);return t;
}
这一方法需要三个参数,分别是所要执行执行的具体任务,后两者分别是每两次执行相隔的时间与时间单位。但是,spring中的定时执行周期任务并不是依靠这里来实现的。这里我们将时间和任务作为参数来生成内部任务类,ScheduledFutureTask类(继承自FutureTask类),但是我们可以看相关的构造方法。
ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);this.time = ns;this.period = 0;this.sequenceNumber = sequencer.getAndIncrement();
}
相关周期执行的参数:
private final long period;
0代表该任务不是周期执行的方法。
public void run() {boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)ScheduledFutureTask.super.run();else if (ScheduledFutureTask.super.runAndReset()) {setNextRunTime();reExecutePeriodic(outerTask);}
}
在ScheduledFutureTask类中,如果period如果为0,只是简单调用父类(FutureTask)的run方法来执行所需要的任务,并没有周期执行。
在线程池schedule()方法的最后,调用delayedExecute()方法加入延时队列以确保任务的准时进行。
以上,是spring定时任务注册器当中的定时任务线程池。
接下来是定时任务注册器的定时任务管理器。
定时任务管理器在scheduleTasks()方法,默认ConcurrentTaskScheduler类来实现。
可以明显看到直接在ConcurrentTaskScheduler的构造方法中将现有的定时任务线程池传入作为管理器的线程池。具体使用,接下里再说。
在scheduleTasks()方法的最后,将现有的未执行的定时任务放入刚刚初始化完毕的管理器执行任务。
以上是定时任务后置处理器与注册器初始化的具体流程。
下面,具体的定时任务装载与执行。
public Object postProcessAfterInitialization(final Object bean, String beanName) {Class<?> targetClass = AopUtils.getTargetClass(bean);if (!this.nonAnnotatedClasses.contains(targetClass)) {Map<Method, Set<Scheduled>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,new MethodIntrospector.MetadataLookup<Set<Scheduled>>() {@Overridepublic Set<Scheduled> inspect(Method method) {Set<Scheduled> scheduledMethods = AnnotatedElementUtils.getMergedRepeatableAnnotations(method, Scheduled.class, Schedules.class);return (!scheduledMethods.isEmpty() ? scheduledMethods : null);}});if (annotatedMethods.isEmpty()) {this.nonAnnotatedClasses.add(targetClass);if (logger.isTraceEnabled()) {logger.trace("No @Scheduled annotations found on bean class: " + bean.getClass());}}else {for (Map.Entry<Method, Set<Scheduled>> entry : annotatedMethods.entrySet()) {Method method = entry.getKey();for (Scheduled scheduled : entry.getValue()) {processScheduled(scheduled, method, bean);}}if (logger.isDebugEnabled()) {logger.debug(annotatedMethods.size() + " @Scheduled methods processed on bean '" + beanName +"': " + annotatedMethods);}}}return bean;
}
在beanFacctory中bean装载完毕后,调用后置处理器的postProcessAfterInitialization()方法。
首先,遍历bean的方法,找到所有经过@scheduled注解的方法。如果有,说明该bean有需要被执行的定时任务。那么,遍历bean中所有被注解的方法,依次调用processScheduled()方法完成定时任务的装载注册与执行。
protected void processScheduled(Scheduled scheduled, Method method, Object bean) {try {Assert.isTrue(method.getParameterTypes().length == 0,"Only no-arg methods may be annotated with @Scheduled");Method invocableMethod = AopUtils.selectInvocableMethod(method, bean.getClass());Runnable runnable = new ScheduledMethodRunnable(bean, invocableMethod);boolean processedSchedule = false;String errorMessage ="Exactly one of the 'cron', 'fixedDelay(String)', or 'fixedRate(String)' attributes is required";Set<ScheduledTask> tasks = new LinkedHashSet<ScheduledTask>(4);long initialDelay = scheduled.initialDelay();String initialDelayString = scheduled.initialDelayString();if (StringUtils.hasText(initialDelayString)) {Assert.isTrue(initialDelay < 0, "Specify 'initialDelay' or 'initialDelayString', not both");if (this.embeddedValueResolver != null) {initialDelayString = this.embeddedValueResolver.resolveStringValue(initialDelayString);}try {initialDelay = Long.parseLong(initialDelayString);}catch (NumberFormatException ex) {throw new IllegalArgumentException("Invalid initialDelayString value \"" + initialDelayString + "\" - cannot parse into integer");}}String cron = scheduled.cron();if (StringUtils.hasText(cron)) {Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");processedSchedule = true;String zone = scheduled.zone();if (this.embeddedValueResolver != null) {cron = this.embeddedValueResolver.resolveStringValue(cron);zone = this.embeddedValueResolver.resolveStringValue(zone);}TimeZone timeZone;if (StringUtils.hasText(zone)) {timeZone = StringUtils.parseTimeZoneString(zone);}else {timeZone = TimeZone.getDefault();}tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));}if (initialDelay < 0) {initialDelay = 0;}long fixedDelay = scheduled.fixedDelay();if (fixedDelay >= 0) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;tasks.add(this.registrar.scheduleFixedDelayTask(new IntervalTask(runnable, fixedDelay, initialDelay)));}String fixedDelayString = scheduled.fixedDelayString();if (StringUtils.hasText(fixedDelayString)) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;if (this.embeddedValueResolver != null) {fixedDelayString = this.embeddedValueResolver.resolveStringValue(fixedDelayString);}try {fixedDelay = Long.parseLong(fixedDelayString);}catch (NumberFormatException ex) {throw new IllegalArgumentException("Invalid fixedDelayString value \"" + fixedDelayString + "\" - cannot parse into integer");}tasks.add(this.registrar.scheduleFixedDelayTask(new IntervalTask(runnable, fixedDelay, initialDelay)));}long fixedRate = scheduled.fixedRate();if (fixedRate >= 0) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;tasks.add(this.registrar.scheduleFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay)));}String fixedRateString = scheduled.fixedRateString();if (StringUtils.hasText(fixedRateString)) {Assert.isTrue(!processedSchedule, errorMessage);processedSchedule = true;if (this.embeddedValueResolver != null) {fixedRateString = this.embeddedValueResolver.resolveStringValue(fixedRateString);}try {fixedRate = Long.parseLong(fixedRateString);}catch (NumberFormatException ex) {throw new IllegalArgumentException("Invalid fixedRateString value \"" + fixedRateString + "\" - cannot parse into integer");}tasks.add(this.registrar.scheduleFixedRateTask(new IntervalTask(runnable, fixedRate, initialDelay)));}Assert.isTrue(processedSchedule, errorMessage);synchronized (this.scheduledTasks) {Set<ScheduledTask> registeredTasks = this.scheduledTasks.get(bean);if (registeredTasks == null) {registeredTasks = new LinkedHashSet<ScheduledTask>(4);this.scheduledTasks.put(bean, registeredTasks);}registeredTasks.addAll(tasks);}}catch (IllegalArgumentException ex) {throw new IllegalStateException("Encountered invalid @Scheduled method '" + method.getName() + "': " + ex.getMessage());}
}
首先通过反射机制找到具体要执行的方法(确保该方法是具体可以访问的)。
接下来则是具体的参数解析,逻辑很清楚。我们可以很简单的发现具体配置方式的解析,我们可以直接看这段cron表达式的解析。
String cron = scheduled.cron();
if (StringUtils.hasText(cron)) {Assert.isTrue(initialDelay == -1, "'initialDelay' not supported for cron triggers");processedSchedule = true;String zone = scheduled.zone();if (this.embeddedValueResolver != null) {cron = this.embeddedValueResolver.resolveStringValue(cron);zone = this.embeddedValueResolver.resolveStringValue(zone);}TimeZone timeZone;if (StringUtils.hasText(zone)) {timeZone = StringUtils.parseTimeZoneString(zone);}else {timeZone = TimeZone.getDefault();}tasks.add(this.registrar.scheduleCronTask(new CronTask(runnable, new CronTrigger(cron, timeZone))));
}
得到注解中的cron表达式,以及时区信息(如果没有则采用默认时区)。如果存在数据解释器,则通过数据解释器得到具体的表达式。在获取成功之后,首先根据cron表达式建立cron触发器cronTrigger
private final CronSequenceGenerator sequenceGenerator;
触发器只有一个成员,是具体的cron表达式解析的工具与存放结果,以及时间操作。
public CronTrigger(String expression, TimeZone timeZone) {this.sequenceGenerator = new CronSequenceGenerator(expression, timeZone);
}
在构造方法中根据cron表达式与时区,解析并存放结果。
下面是CronmSequenceGenerator的成员。
private final String expression;private final TimeZone timeZone;private final BitSet months = new BitSet(12);private final BitSet daysOfMonth = new BitSet(31);private final BitSet daysOfWeek = new BitSet(7);private final BitSet hours = new BitSet(24);private final BitSet minutes = new BitSet(60);private final BitSet seconds = new BitSet(60);
太直观了,从上往下,表达式,时区,月,月的某一天,周的某一天,时分秒。
在构造方法根据时区与cron表达式解析。
public CronSequenceGenerator(String expression, TimeZone timeZone) {this.expression = expression;this.timeZone = timeZone;parse(expression);
}
private void parse(String expression) throws IllegalArgumentException {String[] fields = StringUtils.tokenizeToStringArray(expression, " ");if (!areValidCronFields(fields)) {throw new IllegalArgumentException(String.format("Cron expression must consist of 6 fields (found %d in \"%s\")", fields.length, expression));}setNumberHits(this.seconds, fields[0], 0, 60);setNumberHits(this.minutes, fields[1], 0, 60);setNumberHits(this.hours, fields[2], 0, 24);setDaysOfMonth(this.daysOfMonth, fields[3]);setMonths(this.months, fields[4]);setDays(this.daysOfWeek, replaceOrdinals(fields[5], "SUN,MON,TUE,WED,THU,FRI,SAT"), 8);if (this.daysOfWeek.get(7)) {this.daysOfWeek.set(0);this.daysOfWeek.clear(7);}
}
只要了解了cron的表达式的具体构造,解析方式相当直观而且简单。
在触发器中有直接返回下一次执行时间的方法。
public Date nextExecutionTime(TriggerContext triggerContext) {Date date = triggerContext.lastCompletionTime();if (date != null) {Date scheduled = triggerContext.lastScheduledExecutionTime();if (scheduled != null && date.before(scheduled)) {date = scheduled;}}else {date = new Date();}return this.sequenceGenerator.next(date);
}
很简单,根据传入的时间或者当前时间,直接返回下一次的调用时间。
具体下一次的调用时间通过CronmSequenceGenerator的next()方法来返回。
public Date next(Date date) {Calendar calendar = new GregorianCalendar();calendar.setTimeZone(this.timeZone);calendar.setTime(date);calendar.set(Calendar.MILLISECOND, 0);long originalTimestamp = calendar.getTimeInMillis();doNext(calendar, calendar.get(Calendar.YEAR));if (calendar.getTimeInMillis() == originalTimestamp) {calendar.add(Calendar.SECOND, 1);doNext(calendar, calendar.get(Calendar.YEAR));}return calendar.getTime();
}
private void doNext(Calendar calendar, int dot) {List<Integer> resets = new ArrayList<Integer>();int second = calendar.get(Calendar.SECOND);List<Integer> emptyList = Collections.emptyList();int updateSecond = findNext(this.seconds, second, calendar, Calendar.SECOND, Calendar.MINUTE, emptyList);if (second == updateSecond) {resets.add(Calendar.SECOND);}int minute = calendar.get(Calendar.MINUTE);int updateMinute = findNext(this.minutes, minute, calendar, Calendar.MINUTE, Calendar.HOUR_OF_DAY, resets);if (minute == updateMinute) {resets.add(Calendar.MINUTE);}else {doNext(calendar, dot);}int hour = calendar.get(Calendar.HOUR_OF_DAY);int updateHour = findNext(this.hours, hour, calendar, Calendar.HOUR_OF_DAY, Calendar.DAY_OF_WEEK, resets);if (hour == updateHour) {resets.add(Calendar.HOUR_OF_DAY);}else {doNext(calendar, dot);}int dayOfWeek = calendar.get(Calendar.DAY_OF_WEEK);int dayOfMonth = calendar.get(Calendar.DAY_OF_MONTH);int updateDayOfMonth = findNextDay(calendar, this.daysOfMonth, dayOfMonth, daysOfWeek, dayOfWeek, resets);if (dayOfMonth == updateDayOfMonth) {resets.add(Calendar.DAY_OF_MONTH);}else {doNext(calendar, dot);}int month = calendar.get(Calendar.MONTH);int updateMonth = findNext(this.months, month, calendar, Calendar.MONTH, Calendar.YEAR, resets);if (month != updateMonth) {if (calendar.get(Calendar.YEAR) - dot > 4) {throw new IllegalArgumentException("Invalid cron expression \"" + this.expression +"\" led to runaway search for next trigger");}doNext(calendar, dot);}}
具体的cron表达式的处理这里就暂且不展开了。
在生成完cron触发器之后,生成cronTask,cron任务。CronTask继承自TriggerTask,在后者的基础上除了触发器和具体的线程任务,添加了表达式的存放。
在cronTask建立完毕后,通过注册器调用scheduleCronTask()方法在注册器中准备调用。
public ScheduledTask scheduleCronTask(CronTask task) {ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);boolean newTask = false;if (scheduledTask == null) {scheduledTask = new ScheduledTask();newTask = true;}if (this.taskScheduler != null) {scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());}else {addCronTask(task);this.unresolvedTasks.put(task, scheduledTask);}return (newTask ? scheduledTask : null);
}
首先判断是不是已经注册过,如果注册过就不用从新建立具体的scheduledTask任务了。
之后如果已经存在线程池直接调用注册器的任务管理器的schedule方法将抽象的任务变成具体的定时任务,否则放入等待数组并建立抽象任务与具体任务的键值对。
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {try {if (this.enterpriseConcurrentScheduler) {return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);}else {ErrorHandler errorHandler = (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();}}catch (RejectedExecutionException ex) {throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);}
}
这里的scheduledExecutor就是注册器在构造方法传入的线程池。这里将会生成一个新的ReschedulingRunnable定时任务返回给注册器。
ReschedulingRunnable继承自DelegatingErrorHandlingRunnable类。
DelegatingErrorHandlingRunnable实现了Runnable的接口,自然就有run()方法。两个成员。
private final Runnable delegate;private final ErrorHandler errorHandler;
所要执行的线程和错误处理器。
public void run() {try {this.delegate.run();}catch (UndeclaredThrowableException ex) {this.errorHandler.handleError(ex.getUndeclaredThrowable());}catch (Throwable ex) {this.errorHandler.handleError(ex);}
}
run()方法一目了然。
那么就是定时任务的核心所在了。
public ReschedulingRunnable(Runnable delegate, Trigger trigger, ScheduledExecutorService executor, ErrorHandler errorHandler) {super(delegate, errorHandler);this.trigger = trigger;this.executor = executor;
}
ReschedulingRunnable的狗仔方法一目了然。在接下来管理器会调用ReschedulingRunnable的schedule()方法。
public ScheduledFuture<?> schedule() {synchronized (this.triggerContextMonitor) {this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);if (this.scheduledExecutionTime == null) {return null;}long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);return this;}
}
直接通过触发器得到下次的执行时间,计算当前时间距离下次调用时间的具体数字,调用我们一开始就提到过的线程池的schedule方法来将该线程丢入线程池。
既然我们将其丢入了线程池,那一定会执行run()方法。
public void run() {Date actualExecutionTime = new Date();super.run();Date completionTime = new Date();synchronized (this.triggerContextMonitor) {this.triggerContext.update(this.scheduledExecutionTime, actualExecutionTime, completionTime);if (!this.currentFuture.isCancelled()) {schedule();}}
}
在run()方法中调用了父类的run()方法来执行具体所要执行的任务。之后更新起止时间,如果当前任务没有被取消,就再一次调用schedule()方法重复上一次操作,继续把自己试着扔进线程池,已完成定期周期执行任务的目的!
在完成了上述步骤后,根据是否是第一次任务返回具体的任务或者是null。
在后置处理器依次完成了其他方式配置的任务后,将所有完成新注册的任务存放在map中,定时任务的建立宣告完毕。