Java的并行世界-3.0 线程池与拒绝策略
Jdk并发相关核心类:java.util.concurrent
java.util.concurrent.Executors提供了一些静态工厂方法,用于创建不同类型的源码线程池,例如:
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool();
可以通过new ThreadPoolExecutor()方法手动创建线程池,码深该方法需要传入四个参数,度解分别是线程线程析asp成绩统计 源码核心线程数、最大线程数、池的池源线程保活时间和任务队列。源码其中,码深核心线程数和最大线程数是度解必填参数,线程保活时间和任务队列是线程线程析可选参数。
Java中的池的池源Executors共有四种创建方式,这些方式包括使用newFixedThreadPool、源码newCachedThreadPool、码深newSingleThreadExecutor和newScheduledThreadPool。度解在使用这些方法时,可以根据实际需求选择最适合的方式来创建线程池。无论哪种方式,线程池都可以有效地管理和控制线程,提高程序的执行效率。
新FixedThreadPool创建一个固定大小的线程池。
以下是一个Java中创建newFixedThreadPool的代码例子:
新CachedThreadPool创建一个根据需要自动扩展的线程池,线程数根据任务数量动态调整。
以下是一个newCachedThreadPool的Java代码示例:
新SingleThreadExecutor创建一个只有一个线程的线程池。
新ScheduledThreadPool创建一个支持定时任务的线程池。
ForkJoinPool是一个用于执行分而治之任务的线程池,特别适用于递归分解的问题,例如并行归并排序、并行求和等。
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinPool在java.util.concurrent包下,从Java 7开始引入,专门用于处理需要递归分解的任务。它利用工作窃取(Work-Stealing)算法来实现高效的任务调度和线程利用,能够充分利用多核处理器的优势。
ForkJoinPool的主要特点包括:
下面是一个简单的使用ForkJoinPool的示例,计算斐波那契数列的值:
虽然上面5个线程池看上去功能特点不同,但是其内部实现原理都调用了JDK的ThreadPoolExecutor线程池类。
ThreadPoolExecutor的构造函数有多个参数,允许根据实际需求配置线程池的行为,主要包括:
corePoolSize:线程池的核心线程数,即线程池中保持的最小线程数量。即使线程处于空闲状态,核心线程也不会被销毁。
maximumPoolSize:线程池的最大线程数,即线程池中允许的最大线程数量。当任务数量超过核心线程数时,线程池会根据实际情况动态地创建新的线程,但不会超过最大线程数。
keepAliveTime:非核心线程的壹脉源码空闲时间超过这个时间后,会被销毁,从而控制线程池的大小。时间单位可以通过指定的TimeUnit来定义。
workQueue:任务队列,用于存放等待执行的任务。ThreadPoolExecutor支持多种类型的任务队列,如ArrayBlockingQueue、LinkedBlockingQueue等。
threadFactory:用于创建线程的工厂,可以自定义线程的创建方式。
handler:拒绝策略,当线程池和队列都满了,无法继续接受新的任务时,会触发拒绝策略来处理新的任务。常见的拒绝策略包括AbortPolicy(默认策略,直接抛出异常)、CallerRunsPolicy(由调用线程来执行被拒绝的任务)、DiscardPolicy(丢弃被拒绝的任务)、DiscardOldestPolicy(丢弃队列中最老的任务)。
ThreadPoolExecutor提供了submit()和execute()等方法来向线程池提交任务,其中:
submit()方法可以接受Callable和Runnable类型的任务,并返回一个Future对象,通过这个对象可以获取任务的执行结果或者取消任务。
execute()方法只能接受Runnable类型的任务,无法获取任务的返回结果。
ThreadPoolExecutor还提供了一些方法来管理和监控线程池的状态,如getActiveCount()、getCompletedTaskCount()、getTaskCount()等。
workQueue:任务队列,其中的任务是被提交但尚未执行的任务。其类型是:BlockingQueue接口,只能用于存放Runable对象。
有界队列可以通过ArrayBlockingQueue实现,当有新任务到来时,如果线程池的实际线程数量小于corePoolSize,则会优先创建新的线程;如果大于corePoolSize,则会将新任务加入等待队列;若队列已满,无法加入,则在总线程数量不大于maximumPoolSize的前提下,创建新的线程执行任务;若大于maximumPoolSize,则执行拒绝策略。
无界队列可以通过LinkedBlockingQueue实现,与有界队列相比,除非系统资源耗尽,不然当有新的任务到来,并且线程数量小于corePoolSize时,线程池就会创建新的线程执行任务。但当线程数量大于corePoolSize后,信用账户源码就不会继续创建了。若还有任务进来,系统CPU没那么忙,还有线程资源,则任务直接进入队列等待。
优先任务队列可以通过PriorityBlockingQueue实现,可以根据任务的优先级执行任务,这是一种特殊的无界队列。
有界队列和无界队列需要做demo测试。新CachedThreadPool内部使用的是SynchronousQueue队列,这是一个直接提交队列,系统会增加新的线程执行任务,当任务执行完毕后,线程会被回收;如果开启大量任务提交,每个任务执行慢,系统会开启等量的线程处理,直到系统资源耗尽。
ThreadPoolExecutor的核心调度代码包括workerCountOf(c)来获取当前工作线程池的总数,addWorker(command)用于提交任务或创建线程池,以及reject(command)来执行拒绝策略。
Handler是RejectedExecutionHandler接口类型,代表了不同的拒绝策略。常见的拒绝策略包括:
CallerRunsPolicy:如果线程池未关闭,会直接在调用者当前线程执行等待队列放不下的任务。
AbortPolicy:会直接抛出异常,阻止系统正常运行。
DiscardPolicy:直接丢失无法放入等待队列的任务,不做异常抛出。
DiscardOldestPolicy:丢弃最老的一个请求,然后尝试把当前请求任务加入到等待队列。
注意:在创建ThreadPoolExecutor时需要指定拒绝策略,如果以上拒绝策略无法满足,可以继承RejectedExecutionHandler接口来实现自定义的拒绝策略。
最常用的是升级DiscardPolicy策略,但需要在放弃前记录请求;示例如下:
以下是RejectedExecutionHandler接口代码,可以重新实现该方法以满足自定义需求。
熟悉拒绝策略后,在线程池中还有重要参数ThreadFactory,用于控制线程的创建。通过ThreadFactory,可以实现以下功能:
命名线程:通过为线程指定有意义的名称,便于跟踪日志和调试信息。
设置线程属性:根据需要设置线程的优先级、守护状态、异常处理器等。
定制化线程创建逻辑:添加自定义逻辑来创建线程,如记录线程的创建次数、设置线程组等。团队系统源码
以下是简单的ThreadFactory示例:
Java多线程——singleThreadExecutor
singleThreadExecutor,Java中Executors类的一个静态方法,创建了一个线程池,该线程池仅包含一个核心线程。这意味着所有任务将由这一单一线程执行,形成单线程执行模式。若核心线程因异常停止,则将启动新的线程替代,确保服务不中断。此线程池特别设计确保任务执行顺序与提交顺序一致,提升程序执行流程的可预测性与稳定性。
创建singleThreadExecutor的代码示例如下:
在这个例子中,ThreadPoolExecutor的corePoolSize和maximumPoolSize的值均为1,明确指出线程池仅包含一个核心线程,且最大线程数同样为1,保证了线程的高效利用。缓冲队列采用的是LinkedBlockingQueue,这是一个无边界队列,用于存储等待执行的任务。
线程池的实现原理 Java线程池实现原理
1、java线程池的实现原理很简单,说白了就是一个线程集合workerSet和一个阻塞队列workQueue。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue中。workerSet中的线程会不断的从workQueue中获取线程然后执行。当workQueue中没有任务的时候,worker就会阻塞,直到队列中有任务了就取出来继续执行。
2、线程池的几个主要参数的作用
corePoolSize: 规定线程池有几个线程(worker)在运行。
maximumPoolSize: 当workQueue满了,不能添加任务的时候,这个参数才会生效。规定线程池最多只能有多少个线程(worker)在执行。
keepAliveTime: 超出corePoolSize大小的那些线程的生存时间,这些线程如果长时间没有执行任务并且超过了keepAliveTime设定的时间,就会消亡。
unit: 生存时间对于的单位
workQueue: 存放任务的队列
threadFactory: 创建线程的工厂
handler: 当workQueue已经满了,并且线程池线程数已经达到maximumPoolSize,将执行拒绝策略。
Java线程池实现原理及其在美团业务中的实践
随着计算机行业的飞速发展,摩尔定律逐渐失效,多核CPU成为主流。使用多线程并行计算逐渐成为开发人员提升服务器性能的基本武器。J.U.C提供的线程池ThreadPoolExecutor类,帮助开发人员管理线程并方便地执行并行任务。了解并合理使用线程池,是一个开发人员必修的基本功。本文开篇简述了线程池概念和用途,接着结合线程池的房源码查询源码,帮助大家领略线程池的设计思路,最后回归实践,通过案例讲述使用线程池遇到的问题,并给出了一种动态化线程池解决方案。一、写在前面
1.1 线程池是什么
线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。线程过多会带来额外的开销,包括创建销毁线程的开销、调度线程的开销等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。本文描述的线程池是JDK中提供的ThreadPoolExecutor类。
1.2 线程池解决的问题是什么
线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能确定在任意时刻有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下问题:资源分配问题、线程调度问题等。线程池采用了“池化”思想来解决这些问题。Pooling是将资源统一管理的一种思想,不仅能应用在计算机领域,还在金融、设备、人员管理、工作管理等领域有相关应用。在计算机领域,表现为统一管理IT资源,包括服务器、存储、网络等,通过共享资源在低投入中获益。
二、线程池核心设计与实现
Java中的线程池核心实现类是ThreadPoolExecutor,本文基于JDK 1.8的源码来分析线程池的核心设计与实现。首先,我们通过ThreadPoolExecutor的UML类图了解其继承关系,然后深入探讨其设计与实现。
2.1 总体设计
ThreadPoolExecutor实现的顶层接口是Executor,提供了一种思想:将任务提交和任务执行进行解耦。用户只需提供Runnable对象,将任务的运行逻辑提交到执行器(Executor)中,由Executor框架完成线程的调配和任务的执行。ExecutorService接口增加了能力,如补充可以为一个或一批异步任务生成Future的方法以及提供管控线程池的方法,如停止线程池运行。
AbstractExecutorService是上层的抽象类,将执行任务的流程串联起来,保证下层实现只需关注执行任务的方法。ThreadPoolExecutor作为最下层的实现类,实现最复杂的运行部分,负责维护自身的生命周期和管理线程与任务,使两者结合执行并行任务。
ThreadPoolExecutor运行机制分为任务管理和线程管理两部分。任务管理充当生产者的角色,线程池会根据任务的流转决定执行流程。线程管理是消费者,维护线程池内的线程,根据任务请求进行线程分配。
2.2 生命周期管理
线程池运行状态由内部维护,使用变量控制线程池的运行状态和有效线程数量。线程池内部使用AtomicInteger存储关键参数,实现线程池运行状态和线程数量的高效管理。线程池提供方法供用户获取当前运行状态和线程数量,通过位运算实现快速计算。
ThreadPoolExecutor的运行状态有五种,包含生命周期转换。
2.3 任务执行机制
2.3.1 任务调度
任务调度是线程池核心入口,用户提交任务后,决定任务执行流程。通过execute方法完成检查线程池状态、运行线程数和运行策略,决定执行流程,如直接申请线程执行或缓冲到队列执行,或直接拒绝任务。执行流程如下。
2.3.2 任务缓冲
任务缓冲模块实现任务和线程的管理,通过生产者消费者模式和阻塞队列实现。阻塞队列缓存任务,工作线程从队列中获取任务。
2.3.3 任务申请
任务执行有两种可能:直接由新创建的线程执行或从队列中获取任务执行。线程从任务缓存模块不断获取任务,通过getTask方法实现线程管理和任务管理之间的通信。
2.3.4 任务拒绝
任务拒绝策略保护线程池,实现拒绝策略接口定制策略或选择JDK提供的四种已有策略。拒绝策略特点如下。
2.4 Worker线程管理
2.4.1 Worker线程
Worker线程实现Runnable接口,持有线程和任务,通过构造方法创建。Worker线程执行任务模型如下,线程池通过AQS实现独占锁,控制线程生命周期,回收线程。
2.4.2 Worker线程增加
Worker线程增加通过addWorker方法实现,增加线程时考虑线程池状态,策略在上一步完成,仅完成增加线程并运行,最后返回成功结果。方法参数包括firstTask和core,用于指定任务和线程策略。
2.4.3 Worker线程回收
Worker线程回收依赖JVM自动回收,线程池维护线程引用,通过添加和移除引用控制线程生命周期。Worker被创建后,不断获取任务执行,核心线程无限等待,非核心线程限时获取。当无法获取任务时,循环结束,Worker主动移除自身引用。
2.4.4 Worker线程执行任务
Worker线程执行任务通过runWorker方法实现,执行流程如下。
三、线程池在业务中的实践
业务实践中,线程池用于获取并发性,提供典型场景和问题解决方案。
3.1 业务背景
互联网业界追求CPU多核性能,通过线程池管理线程获取并发性。常见场景包括快速响应用户请求和快速处理批量任务。
3.2 实际问题及方案思考
线程池使用面临核心问题:参数配置困难。调研替代方案、参数设置合理性以及线程池参数动态化,动态化线程池提供简单有效的方法解决参数修改成本问题。
3.3 动态化线程池
动态化线程池设计包括整体设计、功能架构,提供参数动态化、监控和告警能力。动态化线程池允许用户在管理平台上修改参数,实时生效,并监控线程池负载、任务执行情况,提供任务级别监控和运行时状态查看。
3.4 实践总结
面对使用线程池的实际问题,动态化线程池提供成本效益平衡的解决方案,降低故障发生的概率,适用于业务需求。
四、参考资料
1. JDK 1.8 源码
2. 维基百科-线程池
3. 更好的使用Java线程池
4. 维基百科Pooling(Resource Management)
5. 深入理解Java线程池:ThreadPoolExecutor
6. 《Java并发编程实践》
java线程池之ScheduledThreadPoolExecutor实现原理
java中异步周期任务调度有Timer,ScheduledThreadPoolExecutor等实现,目前单机版的定时调度都是使用ScheduledThreadPoolExecutor去实现,那么它是如何实现周期执行任务的呢?其实它还是利用ThreadPoolExecutor线程池去执行任务,这一点从它是继承自ThreadPoolExecutor救可以看的出来,其实关键在于如何实现任务的周期性调度,ScheduledThreadPoolExecutor类以及核心函数首先ScheduledThreadPoolExecutor是实现ScheduledExecutorService接口,它主要定义了四个方法:
周期调度一个Runnable的对象
周期调度一个Callable的对象
固定周期调度Runnable对象 (不管上一次Runnable执行结束的时间,总是以固定延迟时间执行 即 上一个Runnable执行开始时候 + 延时时间 = 下一个Runnable执行的时间点)
以固定延迟调度unnable对象(当上一个Runnable执行结束后+固定延迟 = 下一个Runnable执行的时间点)
publicinterfaceScheduledExecutorServiceextendsExecutorService{ publicScheduledFuture<?>schedule(Runnablecommand,longdelay,TimeUnitunit);public<V>ScheduledFuture<V>schedule(Callable<V>callable,longdelay,TimeUnitunit);publicScheduledFuture<?>scheduleAtFixedRate(Runnablecommand,longinitialDelay,longperiod,TimeUnitunit);publicScheduledFuture<?>scheduleWithFixedDelay(Runnablecommand,longinitialDelay,longdelay,TimeUnitunit);}其次,ScheduledThreadPoolExecutor是继承ThreadPoolExecutor,所以它是借助线程池的能力去执行任务,然后自身去实现周期性调度。从构造方法调用父类的线程池的构造方法,核心线程数是构造方法传入,这里可以看到最大线程数是Integer的最大值即, 还有等待队列是DelayedWorkQueue,它是实现延时的关键.
/***Createsanew{ @codeScheduledThreadPoolExecutor}withthe*givencorepoolsize.**@paramcorePoolSizethenumberofthreadstokeepinthepool,even*iftheyareidle,unless{ @codeallowCoreThreadTimeOut}isset*@throwsIllegalArgumentExceptionif{ @codecorePoolSize<0}*/publicScheduledThreadPoolExecutor(intcorePoolSize){ super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}scheduleAtFixedRate是实现周期性调度的方法,调度任务就是实现Runnable对象, 以及系统的开始延时时间,周期的调度的间隔时间。
计算初始触发时间和执行周期,并和传入的Runnable对象作为参数封装成 ScheduledFutureTask,然后调用decorateTask装饰Tas(默认实现为空)。
设置ScheduledFutureTask对象outerTask为t(默认就是它自己)。
调用delayedExecute延迟执行任务。
publicScheduledFuture<?>scheduleAtFixedRate(Runnablecommand,longinitialDelay,longperiod,**TimeUnitunit){ if(command==null||unit==null)thrownewNullPointerException();if(period<=0)thrownewIllegalArgumentException();ScheduledFutureTask<Void>sft=newScheduledFutureTask<Void>(command,null,triggerTime(initialDelay,unit),unit.toNanos(period));RunnableScheduledFuture<Void>t=decorateTask(command,sft);sft.outerTask=t;delayedExecute(t);returnt;}判断线程池状态,如果不是处于running状态,则拒绝该任务。
将该任务加入父类的延迟队列(实际为初始化的DelayedWorkQueue对象)
再次判断线程池不是处于running状态,并且,判断是否是处于shutdown状态并且continueExistingPeriodicTasksAfterShutdown标志是否是true(默认是false,表示是否线程次处于shutdown状态下是否继续执行周期性任务),若果为true,则从队列删除任务,false,则确保启动线程来执行周期性任务
privatevoiddelayedExecute(RunnableScheduledFuture<?>task){ if(isShutdown())reject(task);else{ super.getQueue().add(task);if(isShutdown()&&!canRunInCurrentRunState(task.isPeriodic())&&remove(task))task.cancel(false);elseensurePrestart();}}获取线程池数量
如果小于核心线程数,则启动核心线程执行任务,如果线程数为空,则启动非核心线程
voidensurePrestart(){ intwc=workerCountOf(ctl.get());if(wc<corePoolSize)addWorker(null,true);elseif(wc==0)addWorker(null,false);}ScheduledFutureTask的run函数获取是否是周期性任务
判断是否线程池状态是否可以执行任务,如果为true,则取消任务 3 如果是非周期性任务,则直接调用父类FutureTask的run方法, 4 如果是周期性任务,则调用FutureTask的runAndReset函数, 如果该函数返回为true,则调用setNextRunTime设置下一次运行的时间, 并且还行reExecutePeriodic再次执行周期性任务。
publicvoidrun(){ booleanperiodic=isPeriodic();if(!canRunInCurrentRunState(periodic))cancel(false);elseif(!periodic)ScheduledFutureTask.super.run();elseif(ScheduledFutureTask.super.runAndReset()){ setNextRunTime();reExecutePeriodic(outerTask);}}判断线程池是否处于可执行任务的状态,如果为true,则重新将设置下一次运行时间的任务加入父类的等待队列,
如果线程池处于不可运行任务的状态,则并且从等待队列中移除成功, 调用任务的取消操作,否则调用ensurePrestart确保启动线程执行任务
voidreExecutePeriodic(RunnableScheduledFuture<?>task){ if(canRunInCurrentRunState(true)){ super.getQueue().add(task);if(!canRunInCurrentRunState(true)&&remove(task))task.cancel(false);elseensurePrestart();}}DelayedWorkQueue类核心函数DelayedWorkQueue是继承AbstractQueue,并实现BlockingQueue接口
staticclassDelayedWorkQueueextendsAbstractQueue<Runnable>implementsBlockingQueue<Runnable>{核心字段
//初始容量为privatestaticfinalintINITIAL_CAPACITY=;//等待队列,只能保存RunnableScheduledFuture对象privateRunnableScheduledFuture<?>[]queue=newRunnableScheduledFuture<?>[INITIAL_CAPACITY];//锁privatefinalReentrantLocklock=newReentrantLock();//对俄大小privateintsize=0;//leader线程,表示最近需要执行的任务的线程。privateThreadleader=null;//条件锁privatefinalConditionavailable=lock.newCondition();offer函数:
将添加的参数转换成RunnableScheduledFuture对象。
加全局锁。
获取当前队列的size,如果等于队列的长度,则嗲用grow扩容,增加%的数组长度。
size加1。
如果数组为0,则将加入的对象放在索引为0的位置,然后设置ScheduledFutureTask的heapIndex的索引(便于后续快速删除)。
调用siftUp做堆的上浮操作,这里是小根堆的操作。
如果队列中第一个元素是传入的对象,则将laader设置null
释放锁
返回true
publicbooleanoffer(Runnablex){ if(x==null)thrownewNullPointerException();RunnableScheduledFuture<?>e=(RunnableScheduledFuture<?>)x;finalReentrantLocklock=this.lock;lock.lock();try{ inti=size;if(i>=queue.length)grow();size=i+1;if(i==0){ queue[0]=e;setIndex(e,0);}else{ siftUp(i,e);}if(queue[0]==e){ leader=null;available.signal();}}finally{ lock.unlock();}returntrue;}siftUp主要就是做小根堆的上移操作,从if (key.compareTo(e) >= 0) 看出,如果key大于parent索引的元素,则停止。
/***Createsanew{ @codeScheduledThreadPoolExecutor}withthe*givencorepoolsize.**@paramcorePoolSizethenumberofthreadstokeepinthepool,even*iftheyareidle,unless{ @codeallowCoreThreadTimeOut}isset*@throwsIllegalArgumentExceptionif{ @codecorePoolSize<0}*/publicScheduledThreadPoolExecutor(intcorePoolSize){ super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}0poll函数
加锁
获取队列中索引为0的云元素,若果为null或者第一个元素的执行时间戳时间大于当前时间则直接返回null,否则调用finishPoll将第一个元素返回.
释放锁
/***Createsanew{ @codeScheduledThreadPoolExecutor}withthe*givencorepoolsize.**@paramcorePoolSizethenumberofthreadstokeepinthepool,even*iftheyareidle,unless{ @codeallowCoreThreadTimeOut}isset*@throwsIllegalArgumentExceptionif{ @codecorePoolSize<0}*/publicScheduledThreadPoolExecutor(intcorePoolSize){ super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}1将队列size 减 1
获取队列中队列中最后一个元素,并且设置队列最后一个为null
最后一个元素不为null,则调用sfitdown进行,将最后一个元素设置到索引为0的位置,将下移操作,重新调整小根堆。
ScheduledFutureTask的heapIndex为-1
/***Createsanew{ @codeScheduledThreadPoolExecutor}withthe*givencorepoolsize.**@paramcorePoolSizethenumberofthreadstokeepinthepool,even*iftheyareidle,unless{ @codeallowCoreThreadTimeOut}isset*@throwsIllegalArgumentExceptionif{ @codecorePoolSize<0}*/publicScheduledThreadPoolExecutor(intcorePoolSize){ super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}2ScheduledFutureTask的compareTo函数ScheduledFutureTask实现compareTo方法逻辑
首先比较是否是同一个对象
若果是ScheduledFutureTask对象,则比较time的大小,time是下一次执行的任务的时间戳,如果不是,则比较 getDelay的时间大小
/***Createsanew{ @codeScheduledThreadPoolExecutor}withthe*givencorepoolsize.**@paramcorePoolSizethenumberofthreadstokeepinthepool,even*iftheyareidle,unless{ @codeallowCoreThreadTimeOut}isset*@throwsIllegalArgumentExceptionif{ @codecorePoolSize<0}*/publicScheduledThreadPoolExecutor(intcorePoolSize){ super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}3ScheduledThreadPoolExecutor的take函数就是ThreadPoolExecutor的从任务队列中获取任务,没有任务则一直等待(这里是线程数小于核心线程数的情况)
加可中断锁
获取队列中第一个元素的任务,从前面可以知道此任务执行的时间戳最小的任务
如果第一个任务为空,则再全局的锁的条件锁上等待,
如果第一个任务不为空,则获取延迟时间,如果延时时间小于0,说明第一个任务已经到时间了,则返回第一个任务。
如果leader线程不为空,则让线程在全局锁的条件锁上等待
如果leader为空,则将获取第一个任务的当前线程赋值为leader变量。
在全局锁的条件锁上等待delay纳秒, 等待结束后,如果当前线程还是等于leader线程,则重置leader为空
最后判断 leader为空并且第一个任务不为空,则唤醒全局锁上条件锁的等待的线程。
释放全局锁。
/***Createsanew{ @codeScheduledThreadPoolExecutor}withthe*givencorepoolsize.**@paramcorePoolSizethenumberofthreadstokeepinthepool,even*iftheyareidle,unless{ @codeallowCoreThreadTimeOut}isset*@throwsIllegalArgumentExceptionif{ @codecorePoolSize<0}*/publicScheduledThreadPoolExecutor(intcorePoolSize){ super(corePoolSize,Integer.MAX_VALUE,0,NANOSECONDS,newDelayedWorkQueue());}4总结\ 综合前面所述,线程池从DelayedWorkQueue每次取出的任务就是延迟时间最小的任务, 若果到达时间的任务,则执行任务,否则则用条件锁Conditon的wait进行等待,执行完后,则用signal进行唤醒下一个任务的执行。
Java高并发编程实战5,异步注解@Async自定义线程池
@Async注解的作用是异步处理任务。
在使用@Async时,如果不指定线程池的名称,默认线程池是Spring默认的线程池SimpleAsyncTaskExecutor。
默认线程池的配置如下:
从最大线程数可以看出,在并发情况下,会无限制地创建线程。
也可以通过yml重新配置:
也可以自定义线程池,下面通过简单的代码来实现@Async自定义线程池。
二、代码实例
导入POM
配置类AsyncTaskConfig
UserController
UserService
UserServiceImpl
三、为什么在文件内执行异步任务,还是一个线程,没有实现@Async效果?
在众多尝试中,找到了@Async失效的几个原因:
四、配置中使用了ThreadPoolTaskExecutor和ThreadPoolExecutor,这两个有什么区别?
ThreadPoolTaskExecutor是spring core包中的,而ThreadPoolExecutor是JDK中的JUC。
1、initialize()
查看ThreadPoolTaskExecutor的initialize()方法
2、initializeExecutor抽象方法
再查看initializeExecutor抽象方法的具体实现类,其中有一个就是ThreadPoolTaskExecutor类,查看它的initializeExecutor方法,使用的就是ThreadPoolExecutor。
因此可以了解到ThreadPoolTaskExecutor是对ThreadPoolExecutor进行了封装。
五、核心线程数
配置文件中的线程池核心线程数为何配置为Runtime.getRuntime().availableProcessors()?
获取的是CPU核心线程数,也就是计算资源。
在实际中,需要对具体的线程池大小进行调整,可以通过压测及机器设备现状,进行调整大小。如果线程池太大,则会造成CPU不断的切换,对整个系统性能也不会有太大的提升,反而会导致系统缓慢。
六、线程池执行流程
2025-01-13 10:42
2025-01-13 10:41
2025-01-13 10:33
2025-01-13 09:22
2025-01-13 08:53