欢迎访问皮皮网官网
皮皮网

【快手名片源码】【问答库app源码】【福利图app源码】synchronousqueue源码

时间:2025-01-19 23:12:30 分类:知识 来源:weixin电影源码

1.java线程池(一):java线程池基本使用及Executors
2.还不了解Java的源码5大BlockingQueue阻塞队列源码,看这篇文章就够了
3.可动态配置的源码Schedule设计
4.线程池newCachedThreadPool
5.如何评价synchronousqueue?

synchronousqueue源码

java线程池(一):java线程池基本使用及Executors

       @[toc] 在前面学习线程组的时候就提到过线程池。实际上线程组在我们的源码日常工作中已经不太会用到,但是源码线程池恰恰相反,是源码我们日常工作中必不可少的工具之一。现在开始对线程池的源码快手名片源码使用,以及底层ThreadPoolExecutor的源码源码进行分析。

1.为什么需要线程池

       我们在前面对线程基础以及线程的源码生命周期有过详细介绍。一个基本的源码常识就是,线程是源码一个特殊的对象,其底层是源码依赖于JVM的native方法,在jvm虚拟机内部实现的源码。线程与普通对象不一样的源码地方在于,除了需要在堆上分配对象之外,源码还需要给每个线程分配一个线程栈、源码以及本地方法栈、程序计数器等线程的私有空间。线程的初始化工作相对于线程执行的大多数任务而言,都是一个耗时比较长的工作。这与数据库使用一样。有时候我们连接数据库,仅仅只是为了执行一条很小的sql语句。但是在我们日常的开发工作中,我们的绝大部分工作内容,都会分解为一个个短小的执行任务来执行。这样才能更加合理的复用资源。这种思想就与我们之前提到的协程一样。任务要尽可能的小。但是在java中,任务不可能像协程那样拆分得那么细。那么试想,如果说,有一个已经初始化好的很多线程,在随时待命,那么当我们有任务提交的时候,这些线程就可以立即工作,无缝接管我们的任务请求。那么效率就会大大增加。这些个线程可以处理任何任务。这样一来我们就把实际的任务与线程本身进行了解耦。从而将这些线程实现了复用。 这种复用的一次创建,可以重复使用的池化的线程对象就被成为线程池。 在线程池中,我们的线程是可以复用的,不用每次都创建一个新的线程。减少了创建和销毁线程的时间开销。 同时,线程池还具有队列缓冲策略,拒绝机制和动态线程管理。可以实现线程环境的问答库app源码隔离。当一个线程有问题的时候,也不会对其他的线程造成影响。 以上就是我们使用线程池的原因。一句话来概括就是资源复用,降低开销。

2.java中线程池的实现

       在java中,线程池的主要接口是Executor和ExecutorService在这两个接口中分别对线程池的行为进行了约束,最主要的是在ExecutorService。之后,线程池的实际实现类是AbstractExecutorService类。这个类有三个主要的实现类,ThreadpoolExecutorService、ForkJoinPool以及DelegatedExecutorService。

       后面我们将对这三种最主要的实现类的源码以及实现机制进行分析。

3.创建线程的工厂方法Executors

       在java中, 已经给我们提供了创建线程池的工厂方法类Executors。通过这个类以静态方法的模式可以为我们创建大多数线程池。Executors提供了5种创建线程池的方式,我们先来看看这个类提供的工厂方法。

3.1 newFixedThreadPool/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue.At any point, at most * { @code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks.The threads in the pool will exist * until it is explicitly { @link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}

       这个方法能够创建一个固定线程数量的无界队列的线程池。参数nthreads是最多可同时处理的活动的线程数。如果在所有线程都在处理任务的情况下,提交了其他的任务,那么这些任务将处于等待队列中。直到有一个线程可用为止。如果任何线程在关闭之前的执行过程中,由于失败而终止,则需要在执行后续任务的时候,创建一个新的线程来替换。线程池中的所有线程都将一直存在,直到显示的调用了shutdown方法。 上述方法能创建一个固定线程数量的线程池。内部默认的是使用LinkedBlockingQueue。但是需要注意的是,这个LinkedBlockingQueue底层是链表结构,其允许的最大队列长度为Integer.MAX_VALUE。

public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}

       这样在使用的过程中如果我们没有很好的控制,那么就可能导致内存溢出,出现OOM异常。因此这种方式实际上已经不被提倡。我们在使用的过程中应该谨慎使用。 newFixedThreadPool(int nThreads, ThreadFactory threadFactory)方法:

/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue, using the provided * ThreadFactory to create new threads when needed.At any point, * at most { @code nThreads} threads will be active processing * tasks.If additional tasks are submitted when all threads are * active, they will wait in the queue until a thread is * available.If any thread terminates due to a failure during * execution prior to shutdown, a new one will take its place if * needed to execute subsequent tasks.The threads in the pool will * exist until it is explicitly { @link ExecutorService#shutdown * shutdown}. * * @param nThreads the number of threads in the pool * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null * @throws IllegalArgumentException if { @code nThreads <= 0} */public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory);}

       这个方法与3.1中newFixedThreadPool(int nThreads)的方法的唯一区别就是,增加了threadFactory参数。在前面方法中,对于线程的创建是采用的默认实现Executors.defaultThreadFactory()。而在此方法中,可以根据需要自行定制。

3.2 newSingleThreadExecutor/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.)Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * { @code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}

       此方法将会创建指有一个线程和一个无届队列的线程池。需要注意的是,如果这个执行线程在执行过程中由于失败而终止,福利图app源码那么需要在执行后续任务的时候,用一个新的线程来替换。 那么这样一来,上述线程池就能确保任务的顺序性,并且在任何时间都不会有多个线程处于活动状态。与newFixedThreadPool(1)不同的是,使用newSingleThreadExecutor返回的ExecutorService不能被重新分配线程数量。而使用newFixExecutor(1)返回的ExecutorService,其活动的线程的数量可以重新分配。后面专门对这个问题进行详细分析。 newSingleThreadExecutor(ThreadFactory threadFactory) 方法:

/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue, and uses the provided ThreadFactory to * create a new thread when needed. Unlike the otherwise * equivalent { @code newFixedThreadPool(1, threadFactory)} the * returned executor is guaranteed not to be reconfigurable to use * additional threads. * * @param threadFactory the factory to use when creating new * threads * * @return the newly created single-threaded Executor * @throws NullPointerException if threadFactory is null */public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),threadFactory));}

       这个方法与3.3中newSingleThreadExecutor的区别就在于增加了一个threadFactory。可以自定义创建线程的方法。

3.3 newCachedThreadPool/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available.These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to { @code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using { @link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}

       这个方法用来创建一个线程池,该线程池可以根据需要自动增加线程。以前的线程也可以复用。这个线程池通常可以提高很多执行周期短的异步任务的性能。对于execute将重用以前的构造线程。如果没有可用的线程,就创建一个 新的线程添加到pool中。秒内,如果该线程没有被使用,则该线程将会终止,并从缓存中删除。因此,在足够长的时间内,这个线程池不会消耗任何资源。可以使用ThreadPoolExecutor构造函数创建具有类似属性但是详细信息不同的线程池。 ?需要注意的是,这个方法创建的线程池,虽然队列的长度可控,但是线程的数量的范围是Integer.MAX_VALUE。这样的话,如果使用不当,同样存在OOM的风险。比如说,我们使用的每个任务的耗时比较长,任务的请求又非常快,那么这样势必会造成在单位时间内创建了大量的线程。从而造成内存溢出。 newCachedThreadPool(ThreadFactory threadFactory)方法:

/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available, and uses the provided * ThreadFactory to create new threads when needed. * @param threadFactory the factory to use when creating new threads * @return the newly created thread pool * @throws NullPointerException if threadFactory is null */public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE,L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}

       这个方法区别同样也是在于,增加了threadFactory可以自行指定线程的创建方式。

2.4 newScheduledThreadPool/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize);}

       创建一个线程池,该线程池可以将任务在指定的延迟时间之后运行。或者定期运行。这个方法返回的是ScheduledThreadPoolExecutor。这个类是ThreadPoolExecutor的子类。在原有线程池的的基础之上,增加了延迟和定时功能。我们在后面分析了ThreadPoolExecutor源码之后,再来分析这个类的贪婪指数公式源码源码。 与之类似的方法:

/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @param threadFactory the factory to use when the executor * creates a new thread * @return a newly created scheduled thread pool * @throws IllegalArgumentException if { @code corePoolSize < 0} * @throws NullPointerException if threadFactory is null */public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);}

       通过这个方法,我们可以指定threadFactory。自定义线程创建的方式。 同样,我们还可以只指定一个线程:

public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1));}public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService(new ScheduledThreadPoolExecutor(1, threadFactory));}

       上述两个方法都可以实现这个功能,但是需要注意的是,这两个方法的返回在外层包裹了一个包装类。

3.5 newWorkStealingPool

       这种方式是在jdk1.8之后新增的。我们先来看看其源码:

public LinkedBlockingQueue() { this(Integer.MAX_VALUE);}0

       这个方法实际上返回的是ForkJoinPool。该方法创建了一

还不了解Java的5大BlockingQueue阻塞队列源码,看这篇文章就够了

       引言

       本文将详细解读Java中常见的5种BlockingQueue阻塞队列,包括它们的优缺点、区别以及典型应用场景,以帮助深入理解这5种队列的独特性质和使用场合。

       常见的BlockingQueue有以下5种:

       1. **基于数组实现的阻塞队列**:创建时需指定容量大小,是有限队列。

       2. **基于链表实现的阻塞队列**:默认无界,可自定义容量。

       3. **无缓冲阻塞队列**:生产的数据需立即被消费,无缓冲。

       4. **优先级阻塞队列**:支持元素按照大小排序,无界。

       5. **延迟阻塞队列**:基于PriorityQueue实现,无界。

       **BlockingQueue简介

**

       BlockingQueue作为接口,定义了放数据和取数据的多组方法,适用于并发多线程环境,特别适合生产者-消费者模式。

       **应用场景

**

       BlockingQueue的作用类似于消息队列,用于解耦、异步处理和削峰,适用于线程池的核心功能实现。

       **区别与比较

**

       - **ArrayBlockingQueue**:基于数组实现,容量可自定义。

       - **LinkedBlockingQueue**:基于链表实现,无界或自定义容量。

       - **SynchronousQueue**:同步队列,生产者和消费者直接交互,无需缓冲。

       - **PriorityBlockingQueue**:实现优先级排序,无界队列。

       - **DelayQueue**:本地延迟队列,支持元素延迟执行。

       在选择使用哪种队列时,需考虑具体任务的特性、吞吐量需求以及是否需要优先级排序或延迟执行。

       本文旨在提供全面理解Java中BlockingQueue的指南,从源码剖析到应用场景,帮助开发者更好地应用这些工具于实际项目中。

可动态配置的OBV合集指标源码Schedule设计

       1.背景

       定时任务是实际开发中常见的一类功能,例如每天早上凌晨对前一天的注册用户数量、渠道来源进行统计,并以邮件报表的方式发送给相关人员。相信这样的需求,每个开发伙伴都处理过。

       你可以使用Linux的Crontab启动应用程序进行处理,或者直接使用Spring的Schedule对任务进行调度,还可以使用分布式调度系统,如果xxl-job等。相信你已经轻车熟路、习以为常。直到有一天你接到了一个新需求:

       1.新建一组任务,周期性的执行指定SQL并将结果以邮件的方式发送给特定人群;2.比较方便的对任务进行管理,比如启动、停止,修改调度周期等;3.动态添加、移除任务,不需要频繁的修改、发布程序;

       停顿几分钟,简单思考一下,有哪几种实现思路呢?

       本篇文章将从以下几部分进行讨论:

       1.SpringSchedule配置和使用。首先我们将介绍Demo的骨架,并基于Spring-Boot完成Schedule的配置;2.数据库定时轮询方案。使用SpringSchedule定时轮询数据库,并执行相应任务。在执行任务策略中,我们将尝试同步和异步执行两种方案,并对其优缺点进行分析;3.基于TaskScheduler动态配置方案。基于数据库轮询或配置中心两种方案动态的对SpringTaskScheduler进行配置,以实现动态管理任务的目的;4.我们进入分布式环境,利用多个冗余节点解决系统高可用问题,同时使用分布式锁保障只会有一个任务同时执行;

2.SpringSchedule

       SpringBoot上的Schedule的使用非常简单,无需增加新的依赖,只需简单配置即可。

       1.使用@EnableScheduling启用Schedule;2.在要调度的方法上增加@Scheduled;

       首先,我们需要在启动类上添加@EnableScheduling注解,该注解将启用SchedulingConfiguration配置类帮我们完成最基本的配置。

@SpringBootApplication@EnableSchedulingpublicclassConfigurableScheduleDemoApplication{ publicstaticvoidmain(String[]args){ SpringApplication.run(ConfigurableScheduleDemoApplication.class,args);}}

       启用Schedule配置之后,在需要被调度的方法上增加@Scheduled注解。

@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}

       runTask任务延迟1s进行初始化,并以5s为间隔进行调度。

       Scheduled注解类的详细配置如下:

配置含义样例cronlinuxcrontab表达式@Scheduled(cron="*/5****MON-FRI")工作日,每5s调度一次fixedDelay固定间隔,上次运行结束,与下次启动运行,相隔固定时长@Scheduled(fixedDelay=)运行结束后,5S后启动一次调度fixedDelayString与fixedDelay一致fixedRate固定周期,前后两次运行相隔固定的时长@Scheduled(fixedRate=)前后两个任务,间隔5秒fixedRateString与fixedRate一致initialDelay第一次执行,间隔时间@Scheduled(initialDelay=,fixedRate=)第一次执行,延时1秒,以后以5秒为周期进行调度initialDelayString与initialDelay一致

       环境搭建完成,让我们开始第一个方案。

3.数据库定时轮询

       使用数据库来管理任务,通过轮询的方案,进行动态调度。首先,我们看下最简单的方案:串行执行方案。

3.1.串行执行方案

       整体思路非常简单,流程如下:

       主要分如下几步:

       1.在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);2.根据数据库的任务配置信息,依次遍历并执行任务;3.任务执行完成后,经过计算获得下一次调度时间,将其写回到数据库;4.等待下一次任务调度。

       核心代码如下:

@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndRunTask(){ Datenow=newDate();//加载需要运行的任务://1.状态为ENABLE//2.下一次运行时间小于当前时间List<TaskDefinitionV2>shouldRunTasks=loadShouldRunTasks(now);//依次遍历待运行任务,执行对于的任务for(TaskDefinitionV2task:shouldRunTasks){ //DoubleCheckif(task.shouldRun(now)){ //执行任务runTask(task);//更新任务的下一次运行时间updateNextRunTime(task,now);}}}

       方案简单但非常有效,那该方案存在哪些问题呢?最主要的问题就是:任务串行执行,会导致后面任务出现延时运行;同时,下一轮检查也会被delay。

       例如,依次加载了待执行任务task1、task2、task3。其中task1耗时5秒,task2耗时5秒,task3耗时1秒,由于三个任务串行执行,task2将延时5秒,task3延时秒;下一轮检查距上次启动相差秒。

       究其根本,核心问题是调度线程和运行线程是同一个线程,调度的运行和任务的运行相互影响。

       让我们看一个改进方案:并行执行方案。

3.2.并行执行方案

       整体执行流程如下:

       相比之前的方案,新方案引入了线程池,每一个任务对应一个线程池,避免任务间的相互影响;任务在线程池中异步处理,避免了调度线程的延时。具体流程如下:

       1.步骤一不变,在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取待执行的任务(状态为可用,下一次执行时间小于当前时间);2.依次遍历任务,将任务提交到专有线程池中异步执行,调度线程直接返回;3.任务在线程池中运行,结束后更新下一次的运行时间;4.调度线程重新从数据库中获取待执行任务,在将任务提交至线程池中,如果有任务正在执行,使用线程池拒绝策略,抛弃最老的任务;

       核心代码如下:

       Spring调度任务,每1秒运行一次:

@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndRunTask(){ Datenow=newDate();//加载所有待运行的任务//1.状态为ENABLE//2.下一次运行时间小于当前时间List<TaskDefinitionV2>shouldRunTasks=loadShouldRunTasks(now);//遍历待运行任务for(TaskDefinitionV2task:shouldRunTasks){ //1.根据TaskId获取任务对应的线程池//2.将任务提交至线程池中this.executorServiceForTask(task.getId()).submit(newTaskRunner(task.getId()));}}

       自定义线程池,每个线程池最多只有一个线程,空闲超过秒后,线程自动回收,线程饱和时,直接丢弃最老的任务:

privateExecutorServiceexecutorServiceForTask(LongtaskId){ returnthis.executorServiceRegistry.computeIfAbsent(taskId,id->{ BasicThreadFactorythreadFactory=newBasicThreadFactory.Builder()//指定线程池名称.namingPattern("Async-Task-"+taskId+"-Thread-%d")//设置线程为后台线程.daemon(true).build();//线程池核心配置://1.每个线程池最多只有一个线程//2.线程空闲超过秒进行自动回收//3.直接使用交互器,线程空闲进行任务交互//4.使用指定的线程工厂,设置线性名称//5.线程池饱和,自动丢弃最老的任务returnnewThreadPoolExecutor(0,1,L,TimeUnit.SECONDS,newSynchronousQueue<>(),threadFactory,newThreadPoolExecutor.DiscardOldestPolicy());});}

       最后,在线程池中运行的Task如下:

privateclassTaskRunnerimplementsRunnable{ privatefinalDatenow=newDate();privatefinalLongtaskId;publicTaskRunner(LongtaskId){ this.taskId=taskId;}@Overridepublicvoidrun(){ //重新加载任务,保持最新的任务状态TaskDefinitionV2task=definitionV2Repository.findById(this.taskId).orElse(null);if(task!=null&&task.shouldRun(now)){ //运行任务runTask(task);//更新任务的下一次运行时间updateNextRunTime(task,now);}}}4.TaskScheduler配置方案

       该方案的核心为:绕过@Schedule注解,直接对Spring底层核心类TaskScheduler进行配置。

       TaskScheduler接口是Spring对调度任务的一个抽象,更是@Schedule背后默默的支持者,首先我们看下这个接口定义。

publicinterfaceTaskScheduler{ ScheduledFutureschedule(Runnabletask,Triggertrigger);ScheduledFutureschedule(Runnabletask,InstantstartTime);ScheduledFutureschedule(Runnabletask,DatestartTime);ScheduledFuturescheduleAtFixedRate(Runnabletask,InstantstartTime,Durationperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,DatestartTime,longperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,Durationperiod);ScheduledFuturescheduleAtFixedRate(Runnabletask,longperiod);ScheduledFuturescheduleWithFixedDelay(Runnabletask,InstantstartTime,Durationdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,DatestartTime,longdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,Durationdelay);ScheduledFuturescheduleWithFixedDelay(Runnabletask,longdelay);}

       满满的都是schedule接口,其他的比较简单就不过多叙述了,重点说下Trigger这个接口,首先看下这个接口的定义:

publicinterfaceTrigger{ DatenextExecutionTime(TriggerContexttriggerContext);}

       只有一个方法,获取下次执行的时间。在任务执行完成后,会调用Trigger的nextExecutionTime获取下一次运行时间,从而实现周期性调度。

       CronTrigger是Trigger的最常见实现,以linuxcrontab的方式配置调度任务,如:

scheduler.schedule(task,newCronTrigger("-**MON-FRI"));

       基础部分简单介绍到这,让我们看下数据库动态配置方案。

4.1数据库动态配置方案

       整体设计如下:

       仍旧是轮询数据库方式,详细流程如下:

       1.在应用中启动一个Schedule任务(每1秒调度一次),定时从数据库中获取所有任务;2.依次遍历任务,与内存中的TaskEntry(任务与状态)进行比对,动态的向TaskScheduler中添加或取消调度任务;3.由TaskScheduler负责实际的任务调度;

       核心代码如下:

@Scheduled(fixedDelay=,initialDelay=)publicvoidloadAndConfig(){ //加载所有的任务信息List<TaskDefinitionV3>tasks=repository.findAll();//遍历任务进行任务检查for(TaskDefinitionV3task:tasks){ //获取内存任务状态TaskEntrytaskEntry=this.taskEntry.computeIfAbsent(task.getId(),TaskEntry::new);if(task.isEnable()&&taskEntry.isStop()){ //任务为可用,运行状态为停止,则重新进行schedule注册ScheduledFuture<?>scheduledFuture=this.taskScheduler.scheduleWithFixedDelay(newTaskRunner(task),task.getDelay()*);taskEntry.setScheduledFuture(scheduledFuture);log.info("successtostartscheduletaskfor{ }",task);}elseif(task.isDisable()&&taskEntry.isRunning()){ //任务为禁用,运行状态为运行中,停止正在运行在任务taskEntry.stop();log.info("successtostopscheduletaskfor{ }",task);}}}

       核心辅助类:

@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}0

       有没有发现,以上方案都有一个共同的缺陷:基于数据库轮询获取任务,加大了数据库压力。理论上,只有在配置发生变化时才有必要对任务进行更新,接下来让我们看下改进方案:基于配置中心的方案。

4.2配置中心通知方案

       整体设计如下:

       核心流程如下:

       1.应用启动时,从配置中心中获取调度的配置信息,并完成对TaskScheduler的配置;2.当配置发送变化时,配置中心会主动将配置推送到应用程序,应用程序在接收到变化通知时,动态的增加或取消调度任务;3.任务的实际调度仍旧由TaskScheduler完成。

       由于手底下没有配置中心,暂时没有coding,思路很简单,有条件的同学可以自行完成。

5.分布式环境下应用

       以上方案,都是在单机环境下运行,如果应用程序挂掉了,任务调度也就停止了,为了避免这种情况的发生,需要提升系统的可用性,实现冗余部署和自动化容灾。

       以上方案,如果部署多个节点会发生什么?是的,会出现任务被多次调度的问题,为了保障在同一时刻只有一个任务在运行,需要为任务增加一个排他锁。同时,由于排他锁的存在,当一个节点处问题后,另一个节点在调度时会自动获取锁,从而解系统的单点问题。

       为了简单,我们使用Redis的分布式锁。

5.1.环境搭建

       Redisson是Redis的一个富客户端,提供了很多高级的数据结构。本次,我们将使用RLock对应用进行保护。

       首先,在pom中引入RedissonStarter。

@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}1

       然后,在application.properties文件中增加Redis配置,具体如下:

@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}.2引入分布式锁

       最后,就可以直接使用分布式锁对任务执行进行保护了,代码如下:

@ServicepublicclassSpringScheduleService{ @AutowiredprivateTaskServicetaskService;@Scheduled(fixedDelay=5*,initialDelay=)publicvoidrunTask(){ TaskConfigtaskConfig=TaskConfig.builder().name("SpringDefaultSchedule").build();this.taskService.runTask(taskConfig);}}3

       备注:

       Redis是典型的AP应用,而分布式锁严格意义上来说是CP。所以基于Redis的分布式锁只能使用在非严格环境中,比如我们的数据报表需求。如果设计金钱,需要使用CP实现,如Zookeeper或etcd等。

6.小结

       本文从Spring的Schedule出发,依次对数据库轮询方案、TaskScheduler配置方案进行详细讲解,以实现对调度任务的可配置化。最后,使用Redis分布式锁有效解决了分布式环境下任务重复调度和自动容灾问题。

       仍旧是那句话,架构设计没有更好,只有最适合。同学们可以根据自己的需求自取。

References

       [1]源码:/litao/books/tree/master/configurable-schedule

线程池newCachedThreadPool

       新线程池newCachedThreadPool的源码揭示了其独特设计和功能。它的核心特点在于动态创建和重用线程,以提高执行短暂异步任务的程序性能。此池允许在先前构造的线程可用时重复使用它们,且最大线程数为Integer.MAX_VALUE,意味着资源使用相对灵活。

       在newCachedThreadPool中,线程的存活时间设置为秒,超过此时间未使用的线程将被终止并从池中移除。这一特性有助于避免资源浪费,保持空闲时间足够长的池不会消耗任何资源。此外,新线程池不包含核心线程,其操作基于SynchronousQueue队列,确保线程间高效同步。

       使用newCachedThreadPool时,程序执行到大约秒后自动终止,因为线程池已完成所有任务。存活线程在超过秒的闲置后被终止和移除,这体现了其设计原理。

       为何newCachedThreadPool选择SynchronousQueue而不是其他线程池通常采用的LinkedBlockQueue?SynchronousQueue是一个特殊的阻塞队列,旨在实现线程间高效同步。它没有内部容量,且插入操作需等待相应的删除操作。此特性使其成为切换设计的理想选择,允许线程在需要时安全地传递信息、事件或任务,尤其适用于需要多线程间同步的应用场景。

       SynchronousQueue通过实现Collection和Iterator接口支持所有可选方法,包括支持可选的公平性策略。默认情况下,不保证生产者和使用者线程的FIFO顺序访问,但通过将公平性策略设置为true,可以确保按此顺序授予访问权限。

       总之,newCachedThreadPool通过动态线程重用和SynchronousQueue的高效同步机制,提供了一种灵活且高效的处理短暂异步任务的方法。其设计旨在优化资源使用,通过在任务完成后的秒内自动清理资源,保持系统性能高效。

如何评价synchronousqueue?

       《解读Java源码专栏》深入解析Java核心组件源码,内容覆盖集合、线程、线程池、并发、队列等,旨在剖析设计思想与实现细节,助你轻松应对工作面试。

       本文聚焦于Java中的SynchronousQueue阻塞队列源码解析,前文已讲解了ArrayBlockingQueue和LinkedBlockingQueue,那么SynchronousQueue又是基于什么实现的呢?答案是:它基于一种特殊的机制,使得生产者与消费者之间的操作同步进行,互不影响。

       SynchronousQueue的用途在于快速执行任务,避免任务积压在队列中。与前文所学的队列不同,它并不具备缓冲功能,而是充当了生产者与消费者之间的桥梁。生产者将元素放入队列后必须等待消费者取走,反之亦然,形成一种同步操作机制。

       作为BlockingQueue接口的实现类,SynchronousQueue提供了放数据与取数据的多种方法,以适应不同场景。其底层机制基于Transferer抽象类,进一步细化为栈与队列两种实现方式。

       本文详细解析了SynchronousQueue的类结构、初始化方法、栈实现与队列实现等关键点,展示了其基于公平策略的不同构造方法。同时,通过源码分析,深入理解了放数据与弹出数据的具体实现逻辑,以及如何通过工具方法调用底层操作。

       放数据方面,包括offer、add、put、offer(e, time, unit)等方法,它们分别根据是否匹配到合适节点而决定插入成功与否,或进行阻塞等待。弹出数据则通过poll、remove、take、poll(time, unit)等方法实现,同样基于匹配节点与否的逻辑进行操作。

       此外,文章还涉及了查看数据的peek与element方法,但由于SynchronousQueue不支持查看数据的操作,故返回null。至此,对SynchronousQueue的源码解析全面展开。

       总结,本文通过详细解读SynchronousQueue的源码,揭示了其作为阻塞队列的独特机制与功能实现。未来文章将继续深入探讨其他阻塞队列的源码解析,敬请期待。

copyright © 2016 powered by 皮皮网   sitemap