皮皮网

【线程池迁移源码】【数据挖掘源码大全】【多空波段源码】lJobHandler源码

来源:亳州麻将圈源码 时间:2024-12-23 20:15:51

1.一文带你搞懂xxl-job(分布式任务调度平台)
2.Springboot项目整合xxl -job
3.10. xxl-job 分布式任务调度
4.SpringBoot+XXL-JOB:高效定时任务管理
5.分布式任务调度平台xxl-job
6.JobScheduler的使用和原理

lJobHandler源码

一文带你搞懂xxl-job(分布式任务调度平台)

       本篇文章主要记录项目中遇到的 xxl-job 的实战,希望能通过这篇文章告诉读者们什么是 xxl-job 以及怎么使用 xxl-job 并分享一个实战案例。

       xxl-job 是一个分布式任务调度平台,其核心设计目标是开发迅速、学习简单、轻量级、线程池迁移源码易扩展。设计思想是将调度行为抽象形成 调度中心 平台,平台本身不承担业务逻辑,而是负责发起 调度请求 后,由 执行器 接收调度请求并执行 任务,这里的 任务 抽象为 分散的 JobHandler。通过这种方式即可实现 调度 与 任务 相互解耦,数据挖掘源码大全从而提高系统整体的稳定性和拓展性。

       任务调度指的是系统在约定的指定时间自动去执行指定的任务的过程。在开发项目时大家可能遇到过类似的场景问题,如系统需要定时在每天0点进行数据备份、活动开始前几小时预热执行一些前置业务、定时对 MQ 消息表的发送装填等。这些场景问题都可以通过任务调度 来解决。

       单体系统 中有许多实现 任务调度 的方式,如多线程方式、Timer 类、Spring Tasks 等等。这里比较常用的多空波段源码是 Spring Tasks(通过 @EnableScheduling + @Scheduled 的注解可以自定义定时任务,有兴趣的可以去了解一下)。

       在分布式系统下,每个服务都可以搭建为集群,这样的好处是可以将任务切片分给每一个服务从而实现并行执行,提高任务调度的处理效率。那么为什么分布式系统 不能使用 单体系统 的任务调度实现方式呢?在集群服务下,如果还是使用每台机器按照单体系统的任务调度实现方式实现的话,会出现下面这四个问题:怎么做到对任务的控制(如何避免任务重复执行)、如果某台机器宕机了,会不会存在任务丢失、如果要增加服务实例,怎么做到弹性扩容、阿奇源码资源如何做到对任务调度的执行情况统一监测。通过上面的问题可以了解到分布式系统下需要一个满足高可用、容错管理、负载均衡等功能的任务调度平台来实现任务调度。

       xxl-job 分布式任务调度系统是一个开源软件,可以在 github 或 gitee 上查看和下载 xxl-job 的源码。在 docker 下安装 xxl-job、创建映射容器的文件目录、在/mydata/xxl-job 的目录下创建 application.properties 文件、导入 tables_xxl-job.sql 文件到指定的数据库、配置参数如数据库位置、访问口令等。梅州陀螺世界源码

       在 Spring Boot 项目中,导入 xxl-job 的 maven 依赖,配置application.yml 文件指定调度中心地址、访问口令、执行器名称和端口等属性,编写配置类配置自定义任务和执行器,完成 SpringBoot 集成 xxl-job 实现分布式任务调度的全过程。

       实战案例:当前项目需要对上传到分布式文件系统 minio 中的视频文件进行统一格式的视频转码操作。利用 xxl-job 的方式以任务调度的方式定时处理视频转码操作,以任务调度的方式,可以使得视频转码操作不会阻塞主线程,避免影响主要业务的吞吐量;以集群服务分片接收任务的方式,可以将任务均分给每个机器使得任务调度可以并行执行,提高总任务处理时间以及降低单台机器 CPU 的开销。

       xxl-job 执行流程图:在集群部署时,配置路由策略中选择分片广播的方式,可以使一次任务调度会广播触发集群中所有的执行器执行一次任务,并且可以向系统传递分片参数。利用这一特性可以根据当前执行器的分片序号和分片总数来获取对应的任务记录。通过 Bean 模式(基于方法)获取分片序号和分片总数,编写 sql 获取任务记录,实现对集群服务均分任务的操作。

       确保任务不会被重复消费:通过幂等性实现,依靠任务的状态(未处理1;处理中2;处理失败3;处理成功4)通过比较和设置的方式只有在状态为未处理或处理失败时才能设置为处理中,避免多个执行器同时处理该任务。设置调度过期策略和阻塞处理策略保证真正的幂等性。

       编写完成所有任务:分片视频转码处理,通过分片广播拿到的参数以取模的方式获取当前执行器所属的任务记录集合,遍历集合并发执行任务,使用乐观锁抢占当前任务,执行任务过程包含分布式文件系统下载、视频转码、上传转码后的视频、更新任务状态(处理成功),使用 JUC 工具类 CountDownLatch 实现所有任务执行完后才退出方法,中间使用 xxl-job 的日志记录错误信息和执行结果。

       清理任务表中转码成功的任务的记录并将其插入任务历史表,视频补偿机制处理任务超时情况下的任务,做出补偿,处理失败次数大于3次的任务,做出补偿。测试并查看日志,准备好的任务表记录,启动三台媒资服务器,并开启任务,可以单独查看每个任务的日志,通过日志中的执行日志查看具体日志信息,可以看到直接为了测试改错的路径导致下载视频出错,查看数据库表的变化,核心的视频转码任务执行成功,并且逻辑正确,能够起到分布式任务调度的作用。

Springboot项目整合xxl -job

       搭建并启动xxl-job服务:

       前往github下载源码,选择与springboot版本匹配的分支,执行相关SQL至数据库。若在创建xxl_job_registry表时遇到长度限制错误,需调整索引长度或替换。

       配置数据库连接信息至application.properties文件,确保指定服务端口与上下文名称。

       启动项目,或打包成jar文件。

       访问后台管理页面,地址为/post/

JobScheduler的使用和原理

        JobScheduler主要用于在未来某个时间下满足一定条件时触发执行某项任务的情况,涉及的条件可以是网络、电量、时间等,例如执行特定的网络、是否只在充电时执行任务等。

        JobScheduler类负责将应用需要执行的任务发送给框架,以备对该应用Job的调度,是一个系统服务,可以通过如下方式获取:

        JobInfo是传递给JobScheduler类的数据容器,它封装了针对调用应用程序调度任务所需的各种约束,也可以认为一个JobInfo对象对应一个任务,JobInfo对象通过JobInfo.Builder创建。它将作为参数传递给JobScheduler:

        JobInfo.Builder是JobInfo的一个内部类,用来创建JobInfo的Builder类。

        JobService是JobScheduler最终回调的端点,JobScheduler将会回调该类中的onStartJob()开始执行异步任务。它是一个继承于JobService的抽象类,做为系统回调执行任务内容的终端,JobScheduler框架将通过bindService()方式来启动该服务。因此,用户必须在应用程序中创建一个JobService的子类,并实现其onStartJob()等回调方法,以及在AndroidManifest.xml中对它授予如下权限:

        注意在AndroidManifest.xml中添加权限

        当任务开始时会执行onStartJob(JobParameters params)方法,如果返回值是false,则系统认为这个方法返回时,任务已经执行完毕。如果返回值是true,那么系统认为这个任务正要被执行,执行任务的重担就落在了你的肩上。当任务执行完毕时你需要调用jobFinished(JobParameters params, boolean needsRescheduled)来通知系统。

        当系统接收到一个取消请求时,系统会调用onStopJob(JobParameters params)方法取消正在等待执行的任务。很重要的一点是如果onStartJob(JobParameters params)返回false,那么系统假定在接收到一个取消请求时已经没有正在运行的任务。换句话说,onStopJob(JobParameters params)在这种情况下不会被调用。

        需要注意的是这个Job Service运行在主线程,这意味着你需要使用子线程,handler,或者一个异步任务来运行耗时的操作以防止阻塞主线程。

        Google官方的Sample: /googlearchive/android-JobScheduler

        JobScheduler是一个抽象类,它在系统框架的实现类是android.app.JobSchedulerImpl

        执行的入口是JobScheduler.scheduler,其实是调了JobSchedulerImpl中的schedule方法;然后再调了mBinder.schedule(job)。这个mBinder就是JobSchedulerService,通过Binder跨进程调用JobSchedulerService。

        最后调用到JobSchedulerService中的schedule方法:

        接着发送MSG_CHECK_JOB消息,消息处理的地方是

        接着执行JobHandler中的 maybeRunPendingJobsH 方法,处理相应的任务

        availableContext是JobServiceContext,即ServiceConnection,这个是进程间通讯ServiceConnection,通过调用availableContext.executeRunnableJob(nextPending)方法,会触发调用onServiceConnected,看到这里应该明白了,onServiceConnected方法中的service就是Jobservice,里面还用了WakeLock锁,防止手机休眠。

        接着,通过Handler发消息,调用了handleServiceBoundH()方法。

        从上面源码可以看出,最终是触发调用了JobService中的startJob方法。

        从源码看,设置的内容应用于 JobStatus ,例如网络限制

        而在JobSchedulerService类,相关的状态控制在其构造函数里:

        例如网络控制类ConnectivityControllerç±»

        当网络发生改变时,会调用updateTrackedJobs(userid)方法,在updateTrackedJobs方法中,会判断网络是否有改变,有改变的会调mStateChangedListener.onControllerStateChanged()方法;然后调用了JobSchedulerService类中onControllerStateChanged方法:

        接着也是处理MSG_CHECK_JOB 消息,和上文一样,最终触发调用了JobService中的startJob方法。

        JobSchedulerService是一个系统服务,即应该在SystemServer启动的。阅读SystemServer的源码:

        run 方法如下:

        接着看 startOtherServices()

        因此,在这里就启动了JobSchedulerService服务。

        1. android 性能优化JobScheduler使用及源码分析

        2. Android 9.0 JobScheduler(一) JobScheduler的使用

        3. Android 9.0 JobScheduler(二) JobScheduler框架结构简述及JobSchedulerService的启动

        4. Android 9.0 JobScheduler(三) 从Job的创建到执行

        5. Android 9.0 JobScheduler(四) Job约束条件的控制

        6. 理解JobScheduler机制