xxl-job任务调度中间件解决定时任务的调度问题

xxl-job是一款非常优秀的任务调度中间件,轻量级、使用简单、支持分布式等优点,让它广泛应用在我们的项目中,解决了不少定时任务的调度问题。
我们都知道,在使用过程中需要先到xxl-job的任务调度中心页面上,配置执行器executor 和具体的任务job ,这一过程如果项目中的定时任务数量不多还好说,如果任务多了的话还是挺费工夫的。
假设项目中有上百个这样的定时任务,那么每个任务都需要走一遍绑定jobhander后端接口,填写cron表达式这个流程…
我就想问问,填多了谁能不迷糊?
于是出于功能优化(偷懒 )这一动机,前几天我萌生了一个想法,有没有什么方法能够告别xxl-job的管理页面,能够让我不再需要到页面上去手动注册执行器和任务,实现让它们自动注册到调度中心呢。
分析
分析一下,其实我们要做的很简单,只要在项目启动时主动注册executor和各个jobhandler到调度中心就可以了,流程如下:
有的小伙伴们可能要问了,我在页面上创建执行器 的时候,不是有一个选项叫做自动注册 吗,为什么我们这里还要自己添加新执行器?
其实这里有个误区,这里的自动注册指的是会根据项目中配置的xxl.job.executor.appname,将配置的机器地址自动注册到这个执行器的地址列表中。但是如果你之前没有手动创建过执行器,那么是不会给你自动添加一个新执行器到调度中心的。
既然有了想法咱们就直接开干,先到github上拉一份xxl-job的源码下来
整个项目导入idea后,先看一下结构:
结合着文档和代码,先梳理一下各个模块都是干什么的:
xxl-job-admin:任务调度中心,启动后就可以访问管理页面,进行执行器和任务的注册、以及任务调用等功能了
xxl-job-core:公共依赖,项目中使用到xxl-job时要引入的依赖包
xxl-job-executor-samples:执行示例,分别包含了springboot版本和不使用框架的版本
为了弄清楚注册和查询executor和jobhandler调用的是哪些接口,我们先从页面上去抓一个请求看看:
好了,这样就能定位到xxl-job-admin模块中/jobgroup/save这个接口,接下来可以很容易地找到源码位置:
按照这个思路,可以找到下面这几个关键接口:
/jobgroup/pagelist:执行器列表的条件查询
/jobgroup/save:添加执行器
/jobinfo/pagelist:任务列表的条件查询
/jobinfo/add:添加任务
但是如果直接调用这些接口,那么就会发现它会跳转到xxl-job-admin的的登录页面:
其实想想也明白,出于安全性考虑,调度中心的接口也不可能允许裸调的。那么再回头看一下刚才页面上的请求就会发现,它在headers中添加了一条名为xxl_job_login_identity的cookie:
至于这条cookie,则是在通过用户名和密码调用调度中心的/login接口时返回的,在返回的response可以直接拿到。只要保存下来,并在之后每次请求时携带,就能够正常访问其他接口了。
到这里,我们需要的5个接口就基本准备齐了,接下来准备开始正式的改造工作。
基于 spring boot + mybatis plus + vue & element 实现的后台管理系统 + 用户小程序,支持 rbac 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
改造
我们改造的目的是实现一个starter,以后只要引入这个starter就能实现executor和jobhandler的自动注册,要引入的关键依赖有下面两个:
com.xuxueli    xxl-job-core    2.3.0    org.springframework.boot    spring-boot-autoconfigure  
1、接口调用
在调用调度中心的接口前,先把xxl-job-admin模块中的xxljobinfo和xxljobgroup这两个类拿到我们的starter项目中,用于接收接口调用的结果。
登录接口
创建一个jobloginservice,在调用业务接口前,需要通过登录接口获取cookie,并在获取到cookie后,缓存到本地的map中。
private final map logincookie=new hashmap();public void login() {    string url=adminaddresses+/login;    httpresponse response = httprequest.post(url)            .form(username,username)            .form(password,password)            .execute();    list cookies = response.getcookies();    optional cookieopt = cookies.stream()            .filter(cookie -> cookie.getname().equals(xxl_job_login_identity)).findfirst();    if (!cookieopt.ispresent())        throw new runtimeexception(get xxl-job cookie error!);    string value = cookieopt.get().getvalue();    logincookie.put(xxl_job_login_identity,value);}  
其他接口在调用时,直接从缓存中获取cookie,如果缓存中不存在则调用/login接口,为了避免这一过程失败,允许最多重试3次。
public string getcookie() {    for (int i = 0; i  jsonutil.tobean((jsonobject) o, xxljobgroup.class))            .collect(collectors.tolist());    return list;}  
我们在后面要根据配置文件中的appname和title判断当前执行器是否已经被注册到调度中心过,如果已经注册过那么则跳过,而/jobgroup/pagelist接口是一个模糊查询接口,所以在查询列表的结果列表中,还需要再进行一次精确匹配。
public boolean preciselycheck() {    list jobgroup = getjobgroup();    optional has = jobgroup.stream()            .filter(xxljobgroup -> xxljobgroup.getappname().equals(appname)                    && xxljobgroup.gettitle().equals(title))            .findany();    return has.ispresent();}  
注册新executor到调度中心:
public boolean autoregistergroup() {    string url=adminaddresses+/jobgroup/save;    httpresponse response = httprequest.post(url)            .form(appname, appname)            .form(title, title)            .cookie(jobloginservice.getcookie())            .execute();    object code = jsonutil.parse(response.body()).getbypath(code);    return code.equals(200);}  
任务接口
创建一个jobinfoservice,根据执行器id,jobhandler名称查询任务列表,和上面一样,也是模糊查询:
public list getjobinfo(integer jobgroupid,string executorhandler) {    string url=adminaddresses+/jobinfo/pagelist;    httpresponse response = httprequest.post(url)            .form(jobgroup, jobgroupid)            .form(executorhandler, executorhandler)            .form(triggerstatus, -1)            .cookie(jobloginservice.getcookie())            .execute();    string body = response.body();    jsonarray array = jsonutil.parse(body).getbypath(data, jsonarray.class);    list list = array.stream()            .map(o -> jsonutil.tobean((jsonobject) o, xxljobinfo.class))            .collect(collectors.tolist());    return list;}  
注册一个新任务,最终返回创建的新任务的id:
public integer addjobinfo(xxljobinfo xxljobinfo) {    string url=adminaddresses+/jobinfo/add;    map parammap = beanutil.beantomap(xxljobinfo);    httpresponse response = httprequest.post(url)            .form(parammap)            .cookie(jobloginservice.getcookie())            .execute();    json json = jsonutil.parse(response.body());    object code = json.getbypath(code);    if (code.equals(200)){        return convert.toint(json.getbypath(content));    }    throw new runtimeexception(add jobinfo error!);}  
2、创建新注解
在创建任务时,必填字段除了执行器和jobhandler之外,还有任务描述 、负责人 、cron表达式 、调度类型 、运行模式 。在这里,我们默认调度类型为cron、运行模式为bean,另外的3个字段的信息需要用户指定。
因此我们需要创建一个新注解@xxlregister,来配合原生的@xxljob注解进行使用,填写这几个字段的信息:
@target({elementtype.method})@retention(retentionpolicy.runtime)public @interface xxlregister {    string cron();    string jobdesc() default default jobdesc;    string author() default default author;    int triggerstatus() default 0;}  
最后,额外添加了一个triggerstatus属性,表示任务的默认调度状态,0为停止状态,1为运行状态。
3、自动注册核心
基本准备工作做完后,下面实现自动注册执行器和jobhandler的核心代码。核心类实现applicationlistener接口,在接收到applicationreadyevent事件后开始执行自动注册逻辑。
@componentpublic class xxljobautoregister implements applicationlistener,         applicationcontextaware {    private static final log log =logfactory.get();    private applicationcontext applicationcontext;    @autowired    private jobgroupservice jobgroupservice;    @autowired    private jobinfoservice jobinfoservice;    @override    public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception {        this.applicationcontext=applicationcontext;    }    @override    public void onapplicationevent(applicationreadyevent event) {        addjobgroup();//注册执行器        addjobinfo();//注册任务    }}  
自动注册执行器的代码非常简单,根据配置文件中的appname和title精确匹配查看调度中心是否已有执行器被注册过了,如果存在则跳过,不存在则新注册一个:
private void addjobgroup() {    if (jobgroupservice.preciselycheck())        return;    if(jobgroupservice.autoregistergroup())        log.info(auto register xxl-job group success!);}  
自动注册任务的逻辑则相对复杂一些,需要完成:
通过applicationcontext拿到spring容器中的所有bean,再拿到这些bean中所有添加了@xxljob注解的方法
对上面获取到的方法进行检查,是否添加了我们自定义的@xxlregister注解,如果没有则跳过,不进行自动注册
对同时添加了@xxljob和@xxlregister的方法,通过执行器id和jobhandler的值判断是否已经在调度中心注册过了,如果已存在则跳过
对于满足注解条件且没有注册过的jobhandler,调用接口注册到调度中心
具体代码如下:
private void addjobinfo() {    list jobgroups = jobgroupservice.getjobgroup();    xxljobgroup xxljobgroup = jobgroups.get(0);    string[] beandefinitionnames = applicationcontext.getbeannamesfortype(object.class, false, true);    for (string beandefinitionname : beandefinitionnames) {        object bean = applicationcontext.getbean(beandefinitionname);        map annotatedmethods  = methodintrospector.selectmethods(bean.getclass(),                new methodintrospector.metadatalookup() {                    @override                    public xxljob inspect(method method) {                        return annotatedelementutils.findmergedannotation(method, xxljob.class);                    }                });        for (map.entry methodxxljobentry : annotatedmethods.entryset()) {            method executemethod = methodxxljobentry.getkey();            xxljob xxljob = methodxxljobentry.getvalue();            //自动注册            if (executemethod.isannotationpresent(xxlregister.class)) {                xxlregister xxlregister = executemethod.getannotation(xxlregister.class);                list jobinfo = jobinfoservice.getjobinfo(xxljobgroup.getid(), xxljob.value());                if (!jobinfo.isempty()){                    //因为是模糊查询,需要再判断一次                    optional first = jobinfo.stream()                            .filter(xxljobinfo -> xxljobinfo.getexecutorhandler().equals(xxljob.value()))                            .findfirst();                    if (first.ispresent())                        continue;                }                xxljobinfo xxljobinfo = createxxljobinfo(xxljobgroup, xxljob, xxlregister);                integer jobinfoid = jobinfoservice.addjobinfo(xxljobinfo);            }        }    }}  
4、自动装配
创建一个配置类,用于扫描bean:
@configuration@componentscan(basepackages = com.xxl.job.plus.executor)public class xxljobplusconfig {}  
将它添加到meta-inf/spring.factories文件:
org.springframework.boot.autoconfigure.enableautoconfiguration=  com.xxl.job.plus.executor.config.xxljobplusconfig  
到这里starter的编写就完成了,可以通过maven发布jar包到本地或者私服:
mvn clean install/deploy  
基于 spring cloud alibaba + gateway + nacos + rocketmq + vue & element 实现的后台管理系统 + 用户小程序,支持 rbac 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/yunaiv/yudao-cloud
视频教程:https://doc.iocoder.cn/video/
测试
新建一个springboot项目,引入我们在上面打好的包:
com.cn.hydra    xxljob-autoregister-spring-boot-starter    0.0.1  
在application.properties中配置xxl-job的信息,首先是原生的配置内容:
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-adminxxl.job.accesstoken=default_tokenxxl.job.executor.appname=xxl-job-executor-testxxl.job.executor.address=xxl.job.executor.ip=127.0.0.1xxl.job.executor.port=9999xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandlerxxl.job.executor.logretentiondays=30  
此外还要额外添加我们自己的starter要求的新配置内容:
# admin用户名xxl.job.admin.username=admin# admin 密码xxl.job.admin.password=123456# 执行器名称xxl.job.executor.title=test-title  
完成后在代码中配置一下xxljobspringexecutor,然后在测试接口上添加原生@xxljob注解和我们自定义的@xxlregister注解:
@xxljob(value = testjob)@xxlregister(cron = 0 0 0 * * ? *,        author = hydra,        jobdesc = 测试job)public void testjob(){    system.out.println(#码农参上);}@xxljob(value = testjob222)@xxlregister(cron = 59 1-2 0 * * ?,        triggerstatus = 1)public void testjob2(){    system.out.println(#作者:hydra);}@xxljob(value = testjob444)@xxlregister(cron = 59 59 23 * * ?)public void testjob4(){    system.out.println(hello xxl job);}  
启动项目,可以看到执行器自动注册成功:
再打开调度中心的任务管理页面,可以看到同时添加了两个注解的任务也已经自动完成了注册:
从页面上手动执行任务进行测试,可以执行成功:
到这里,starter的编写和测试过程就算基本完成了,项目中引入后,以后也能省出更多的时间来摸鱼学习了~


2009年法兰克福(莫斯科)国际汽配展
二极管用作收音机全波检波
介绍一种用于人体健康监测的全自动可穿戴腿部运动传感系统
对话CEO:用高性价比AI视觉检测系统做客户坚盾,迎光伏行业新洗牌
RFID智慧仓库管理系统的作用是什么
xxl-job任务调度中间件解决定时任务的调度问题
如何在手机应用的高通平台上使用TAS2560
如何在FPGA中实现系统设计
AMD京东巅峰对决 7nm锐龙笔记本新品来袭
氢燃料电池寿命有多长_氢燃料电池汽车优缺点
全球首创!高分五号搭载的全谱段高光谱卫星对大气和陆地进行综合观测
【节能学院】浅谈电能质量问题与滤波器的发展
用新颖的物理设计制造更好的电池
AGMX2手机评测 一款应运而生的产品
Intel大连公司破记录 QLC 3D NAND存储芯片产量突破1000万
BK3296,蓝牙音频soc单芯片应用介绍
多元共进|2023 Google 谷歌开发者大会回看视频陆续上线
Altium Designer对象智能粘贴功能详解
电磁炉故障判别方法_电磁炉七大常见故障与维修方法
长虹率先提出基于终端的物联网用户运营,为用户提供零干扰服务