1.java 线程池 工作队列是线程如何工作的
2.redis7.0源码阅读:Redis中的IO多线程(线程池)
3.Java并发必会,深入剖析Semaphore源码
java 线程池 工作队列是池源程池如何工作的
使用线程池的好处1、降低资源消耗
可以重复利用已创建的码线线程降低线程创建和销毁造成的消耗。
2、源码提高响应速度
当任务到达时,解析任务可以不需要等到线程创建就能立即执行。线程优品指标源码
3、池源程池提高线程的码线可管理性
线程是稀缺资源,如果无限制地创建,源码不仅会消耗系统资源,解析还会降低系统的线程稳定性,使用线程池可以进行统一分配、池源程池调优和监控
线程池的码线工作原理
首先我们看下当一个新的任务提交到线程池之后,线程池是源码如何处理的
1、线程池判断核心线程池里的解析线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则执行第二步。
2、搜索引擎 源码线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里进行等待。如果工作队列满了,则执行第三步
3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务
线程池饱和策略
这里提到了线程池的饱和策略,那我们就简单介绍下有哪些饱和策略:
AbortPolicy
为Java线程池默认的阻塞策略,不执行此任务,而且直接抛出一个运行时异常,切记ThreadPoolExecutor.execute需要try catch,否则程序会直接退出。
DiscardPolicy
直接抛弃,任务不执行,空方法
DiscardOldestPolicy
从队列里面抛弃head的一个任务,并再次execute 此task。拦截封包发送封包源码
CallerRunsPolicy
在调用execute的线程里面执行此command,会阻塞入口
用户自定义拒绝策略(最常用)
实现RejectedExecutionHandler,并自己定义策略模式
下我们以ThreadPoolExecutor为例展示下线程池的工作流程图
1.jpg
2.jpg
1、如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
2、如果运行的线程等于或多于corePoolSize,则将任务加入BlockingQueue。
3、如果无法将任务加入BlockingQueue(队列已满),则在非corePool中创建新的线程来处理任务(注意,执行这一步骤需要获取全局锁)。
4、如果创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
ThreadPoolExecutor采取上述步骤的总体设计思路,是聚合服务商源码为了在执行execute()方法时,尽可能地避免获取全局锁(那将会是一个严重的可伸缩瓶颈)。在ThreadPoolExecutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都是执行步骤2,而步骤2不需要获取全局锁。
线程池只是并发编程中的一小部分,下图是史上最全面的Java的并发编程学习技术总汇
3.jpg
关键方法源码分析
我们看看核心方法添加到线程池方法execute的源码如下:
// //Executes the given task sometime in the future. The task //may execute in a new thread or in an existing pooled thread. // // If the task cannot be submitted for execution, either because this // executor has been shutdown or because its capacity has been reached, // the task is handled by the current { @code RejectedExecutionHandler}. // // @param command the task to execute // @throws RejectedExecutionException at discretion of // { @code RejectedExecutionHandler}, if the task // cannot be accepted for execution // @throws NullPointerException if { @code command} is null // public void execute(Runnable command) { if (command == null) throw new NullPointerException(); // // Proceed in 3 steps: // // 1. If fewer than corePoolSize threads are running, try to // start a new thread with the given command as its first // task. The call to addWorker atomically checks runState and // workerCount, and so prevents false alarms that would add // threads when it shouldn't, by returning false. // 翻译如下: // 判断当前的线程数是否小于corePoolSize如果是,使用入参任务通过addWord方法创建一个新的线程, // 如果能完成新线程创建exexute方法结束,成功提交任务 // 2. If a task can be successfully queued, then we still need // to double-check whether we should have added a thread // (because existing ones died since last checking) or that // the pool shut down since entry into this method. So we // recheck state and if necessary roll back the enqueuing if // stopped, or start a new thread if there are none. // 翻译如下: // 在第一步没有完成任务提交;状态为运行并且能否成功加入任务到工作队列后,再进行一次check,如果状态 // 在任务加入队列后变为了非运行(有可能是在执行到这里线程池shutdown了),非运行状态下当然是需要 // reject;然后再判断当前线程数是否为0(有可能这个时候线程数变为了0),如是,新增一个线程; // 3. If we cannot queue task, then we try to add a new // thread. If it fails, we know we are shut down or saturated // and so reject the task. // 翻译如下: // 如果不能加入任务到工作队列,将尝试使用任务新增一个线程,如果失败,则是线程池已经shutdown或者线程池 // 已经达到饱和状态,所以reject这个他任务 // int c = ctl.get(); // 工作线程数小于核心线程数 if (workerCountOf(c) < corePoolSize) { // 直接启动新线程,true表示会再次检查workerCount是宝龙互娱 源码否小于corePoolSize if (addWorker(command, true)) return; c = ctl.get(); } // 如果工作线程数大于等于核心线程数 // 线程的的状态未RUNNING并且队列notfull if (isRunning(c) && workQueue.offer(command)) { // 再次检查线程的运行状态,如果不是RUNNING直接从队列中移除 int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 移除成功,拒绝该非运行的任务 reject(command); else if (workerCountOf(recheck) == 0) // 防止了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。 // 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务 addWorker(null, false); } // 如果队列满了或者是非运行的任务都拒绝执行 else if (!addWorker(command, false)) reject(command); }
redis7.0源码阅读:Redis中的IO多线程(线程池)
Redis服务端处理客户端请求时,采用单线程模型执行逻辑操作,然而读取和写入数据的操作则可在IO多线程模型中进行。在Redis中,命令执行发生在单线程环境中,而数据的读取与写入则通过线程池进行。一个命令从客户端接收,解码成具体命令,根据该命令生成结果后编码并回传至客户端。 Redis配置文件redis.conf中可设置开启IO多线程。通过设置`io-threads-do-reads yes`开启多线程,同时配置`io-threads 2`来创建两个线程,其中一个是主线程,另一个为IO线程。在网络处理文件networking.c中,`stopThreadedIOIfNeeded`函数会判断当前需要执行的命令数是否超过线程数,若少于线程数,则不开启多线程模式,便于调试。 要进入IO多线程模式,运行redis-server命令,然后在调试界面设置断点在networking.c的`readQueryFromClient`函数中。使用redis-cli输入命令时,可以观察到两个线程在运行,一个为主线程,另一个为IO线程。 相关视频推荐帮助理解线程池在Redis中的应用,包括手写线程池及线程池在后端开发中的实际应用。学习资源包括C/C++ Linux服务器开发、后台架构师技术等领域,需要相关资料可加入交流群获取免费分享。 在Redis中,IO线程池实现中,主要包括以下步骤:读取任务的处理通过`postponeClientRead`函数,判断是否启用IO多线程模式,将任务加入到待执行任务队列。
主线程执行`postponeClientRead`函数,将待读客户端任务加入到读取任务队列。在多线程模式下,任务被添加至队列中,由IO线程后续执行。
多线程读取IO任务`handleClientsWithPendingReadsUsingThreads`通过解析协议进行数据读取,与写入任务的多线程处理机制相似。
多线程写入IO任务`handleClientsWithPendingWritesUsingThreads`包括判断是否需要启动IO多线程、负载均衡分配任务到不同IO线程、启动IO子线程执行写入操作、等待IO线程完成写入任务等步骤。负载均衡通过将任务队列中的任务均匀分配至不同的线程消费队列中,实现无锁化操作。
线程调度部分包含开启和关闭IO线程的功能。在`startThreadedIO`中,每个IO线程持有锁,若主线程释放锁,线程开始工作,IO线程标识设置为活跃状态。而在`stopThreadedIO`中,若主线程获取锁,则IO线程等待并停止,IO线程标识设置为非活跃状态。Java并发必会,深入剖析Semaphore源码
在深入理解Java并发编程时,必不可少的是对Semaphore源码的剖析。本文将带你探索这一核心组件,通过实践和源码解析,掌握其限流和共享锁的本质。Semaphore,中文名信号量,就像一个令牌桶,任务执行前需要获取令牌,处理完毕后归还,确保资源访问的有序进行。
首先,Semaphore主要有acquire()和release()两个方法。acquire()负责获取许可,若许可不足,任务会被阻塞,直到有许可可用。release()用于释放并归还许可,确保资源释放后,其他任务可以继续执行。一个典型的例子是,如果一个线程池接受个任务,但Semaphore限制为3,那么任务将按每3个一组执行,确保系统稳定性。
Semaphore的源码实现巧妙地结合了AQS(AbstractQueuedSynchronizer)框架,通过Sync同步变量管理许可数量,公平锁和非公平锁的实现方式有所不同。公平锁会优先处理队列中的任务,而非公平锁则按照获取许可的顺序进行。
acquire()方法主要调用AQS中的acquireSharedInterruptibly(),并进一步通过tryReleaseShared()进行许可更新,公平锁与非公平锁的区别在于判断队列中是否有前置节点。release()方法则调用releaseShared(),更新许可数量。
Semaphore的简洁逻辑在于,AQS框架负责大部分并发控制,子类只需实现tryReleaseShared()和tryAcquireShared(),专注于许可数量的管理。欲了解AQS的详细流程,可参考之前的文章。
最后,了解了Semaphore后,我们还将继续探索共享锁CyclicBarrier的实现,敬请期待下篇文章。