(1). 概述
在前面有剖析过,XxlJobSpringExecutor实现了:SmartInitializingSingleton,所以Spring初始化之后,会回调:SmartInitializingSingleton.afterSingletonsInstantiated方法. 所以:XxlJobSpringExecutor类的入口在:afterSingletonsInstantiated方法上.
(2). 看下XxlJobSpringExecutor类结构图
(3). XxlJobSpringExecutor.afterSingletonsInstantiated
public void afterSingletonsInstantiated() {
// ***********************************************************
// 4. init JobHandler Repository (for method)
// 根据方法上的注解,初始化:JobHandler
// ***********************************************************
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
// *********************************************************
// 6. 调用父类:XxlJobExecutor.start
// *********************************************************
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
(4). XxlJobSpringExecutor.initJobHandlerMethodRepository
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
// 从Spring容器里获得所有的Bean的名称
// init job handler from method
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
// 遍历Bean名称,获得Bean信息
for (String beanDefinitionName : beanDefinitionNames) {
// 获得Bean对象
Object bean = applicationContext.getBean(beanDefinitionName);
Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
try {
// 获取方法上:@XxlJob注解
annotatedMethods = MethodIntrospector.selectMethods(
bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
if (annotatedMethods==null || annotatedMethods.isEmpty()) {
// 不存在注解,则跳过
continue;
}
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method method = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
if (xxlJob == null) {
continue;
}
// @XxlJob(name = "")
// 注解必须要配置名称
String name = xxlJob.value();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
// ********************************************************
// 5. XxlJobSpringExecutor.registJobHandler/loadJobHandler
// @XxlJob配置的名称不能有相同的,否则抛出异常
// ********************************************************
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
// 标注@XxlJob(name = "")的方法,入参是有要求的
// 这个方法,必须要有一个参数,而且入参必须是String,否则抛出异常.
// execute method
if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {
throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}
// 标注@XxlJob(name = "")的方法,返回值是有要求的
// 这个方法,必须要有返回值,而且返回值必须要是:ReturnT
if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {
throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +
"The correct method format like \" public ReturnT<String> execute(String param) \" .");
}
// 设置方法可以访问
method.setAccessible(true);
// init and destory
Method initMethod = null;
Method destroyMethod = null;
// 注解上有配置init方法
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
}
// 注解上有配置destory方法
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + method.getName() + "] .");
}
}
// ******************************************************************
// 5. XxlJobSpringExecutor.registJobHandler/loadJobHandler
// 5.1. 把@XxlJob注解信息,转换成:MethodJobHandler(属于:IJobHandler的实现)对象.
// 5.2. 把MethodJobHandler向MAP注册.
// ******************************************************************
// registry jobhandler
registJobHandler(name, new MethodJobHandler(bean, method, initMethod, destroyMethod));
} // end for
}// end for
}//end initJobHandlerMethodRepository
(5). XxlJobSpringExecutor.registJobHandler/loadJobHandler
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
// 注册IJobHandler
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}// end registJobHandler
// 根据名称获取:IJobHandler
public static IJobHandler loadJobHandler(String name){
return jobHandlerRepository.get(name);
} // end loadJobHandler
(6). XxlJobExecutor.start
public void start() throws Exception {
// init logpath
// 初始化日志路径
XxlJobFileAppender.initLogPath(logPath);
// 7. init invoker, admin-client
// 初始化与admin的交互客户端.
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
// 日志轮询线程
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
// 触发器回调线程启动,当任务处理完之后,通过独立的线程向admin汇报处理结果.
TriggerCallbackThread.getInstance().start();
// ********************************************************
// init executor-server
// 8. XxlJobExecutor.initEmbedServer
// 初始化内嵌的Web服务器,基于Netty开发.
// ********************************************************
initEmbedServer(address, ip, port, appname, accessToken);
}
(7). XxlJobExecutor.initAdminBizList
private static List<AdminBiz> adminBizList;
// adminAddresses = "http://127.0.0.1:8080/xxl-job-admin"
private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
if (adminAddresses!=null && adminAddresses.trim().length()>0) {
// adminAddresses按照"逗号"分隔,创建:AdminBiz对象.
for (String address: adminAddresses.trim().split(",")) {
if (address!=null && address.trim().length()>0) {
AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);
if (adminBizList == null) {
adminBizList = new ArrayList<AdminBiz>();
}
// 添加到集合
adminBizList.add(adminBiz);
}
}
}
}// end initAdminBizList
public static List<AdminBiz> getAdminBizList(){
return adminBizList;
}// end getAdminBizList
(8). XxlJobExecutor.initEmbedServer
private EmbedServer embedServer = null;
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
// fill ip port
// 如果没有指定port,则使用9999 ~ 65535之间的端口
port = port > 0 ? port : NetUtil.findAvailablePort(9999);
// 没有指定IP,则获取本机IP地址,注意,多网卡,如果是外网,建议配置某个网卡的IP
ip = (ip != null && ip.trim().length() > 0 ) ? ip: IpUtil.getIp();
// 没有指定address的情况下
// generate address
if (address==null || address.trim().length()==0) {
String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null
// 产生一个address
// 比如: http://192.168.1.100:9999
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}
// *******************************************************************
// 创建:EmbedServer并启动
// *******************************************************************
// start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}// end initEmbedServer
(9). 总结
其实,上面一张UML图之后,XXL-Job基本功能尽收眼底,XxlJobSpringExecutor的职责如下:
- 把Method上的@XxlJob(name=”helloJob”)转换成:MethodJobHandler对象,通过Map(key=”helloJob”,value=MethodJobHandler) Hold住这些信息.
- 初始化日志路径.
- 创建AdminBiz(该对象主要是与admin进行交互).
- 初始化日志轮询线程.
- 初始化回调线程(用于向admin汇报jobId执行状态).
- 创建内嵌Web Server,用于接受来自于:admin的请求信息.
- 看完UML图后感觉,基本功能已经很清楚了,可以把重心可以放到xxl-job-admin了.