1.park Դ?源码?
2.Golang GMP 原理
3.Java ä¸LockSupportç±»å¨C#ä¸çå®ç°
4.Rust并发:标准库sync::Once源码分析
5.LockSupportçparkçå¾
çåºå±å®ç°
6.线程池中空闲的线程处于什么状态?
park Դ??
我最近建立了一个在线自习室(App:番茄ToDO)用于相互监督学习,感兴趣的源码小伙伴可以加入。自习室加入码:D5A7A
Java并发包下的源码类大多基于AQS(AbstractQueuedSynchronizer)框架实现,而AQS线程安全的源码实现依赖于两个关键类:Unsafe和LockSupport。
其中,源码Unsafe主要提供CAS操作(关于CAS,源码撮合源码在文章《读懂AtomicInteger源码(多线程专题)》中讲解过),源码LockSupport主要提供park/unpark操作。源码实际上,源码park/unpark操作的源码最终调用还是基于Unsafe类,因此Unsafe类才是源码核心。
Unsafe类的源码实现是由native关键字说明的,这意味着这个方法是源码原生函数,是源码用C/C++语言实现的,并被编译成了DLL,源码由Java去调用。
park函数的作用是将当前调用线程阻塞,而unpark函数则是唤醒指定线程。
park是等待一个许可,unpark是为某线程提供一个许可。如果线程A调用park,除非另一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。每次调用一次park,需要有一个unpark来解锁。
并且,unpark可以先于park调用,但不管unpark先调用多少次,都只提供一个许可,不可叠加。只需要一次park来消费掉unpark带来的许可,再次调用会阻塞。
在Linux系统下,park和unpark是通过Posix线程库pthread中的mutex(互斥量)和condition(条件变量)来实现的。
简单来说,mutex和condition保护了一个叫_counter的信号量。当park时,电脑好玩的源码这个变量被设置为0,当unpark时,这个变量被设置为1。当_counter=0时线程阻塞,当_counter>0时直接设为0并返回。
每个Java线程都有一个Parker实例,Parker类的部分源码如下:
由源码可知,Parker类继承于PlatformParker,实际上是用Posix的mutex和condition来实现的。Parker类里的_counter字段,就是用来记录park和unpark是否需要阻塞的标识。
具体的执行逻辑已经用注释标记在代码中,简要来说,就是检查_counter是不是大于0,如果是,则把_counter设置为0,返回。如果等于零,继续执行,阻塞等待。
unpark直接设置_counter为1,再unlock mutex返回。如果_counter之前的值是0,则还要调用pthread_cond_signal唤醒在park中等待的线程。源码如下:
(如果不会下载JVM源码可以后台回复“jdk”,获得下载压缩包)
Golang GMP 原理
通常语义中的线程,指的是内核级线程,核心点包括:(1)它是操作系统最小调度单元;(2)创建、销毁、调度交由内核完成,cpu 需完成用户态与内核态间的切换;(3)可充分利用多核,实现并行。
协程,又称为用户级线程,核心点如下:(1)与线程存在映射关系,为 M:1;(2)创建、各种支付系统源码销毁、调度在用户态完成,对内核透明,所以更轻;(3)从属同一个内核级线程,无法并行;一个协程阻塞会导致从属同一线程的所有协程无法执行。
Goroutine,经 Golang 优化后的特殊“协程”,核心点包括:(1)与线程存在映射关系,为 M:N;(2)创建、销毁、调度在用户态完成,对内核透明,足够轻便;(3)可利用多个线程,实现并行;(4)通过调度器的斡旋,实现和线程间的动态绑定和灵活调度;(5)栈空间大小可动态扩缩,因地制宜。
对比三个模型的各项能力:综上,goroutine 可说是博采众长之物。
实际上,“灵活调度” 一词概括得实在过于简要,Golang 在调度 goroutine 时,针对“如何减少加锁行为”,“如何避免资源不均”等问题都给出了精彩的解决方案,这一切都得益于经典的 “gmp” 模型。
GMP = goroutine + machine + processor(+ 一套有机组合的机制),下面先单独拆出每个组件进行介绍,最后再总览全局,对 GMP 进行总述。
G = goroutine,是 Golang 中对协程的抽象;(2)g 有自己的运行栈、状态、以及执行的任务函数(用户通过 go func 指定);(3)g 需要绑定到 p 才能执行,在 g 的视角中,p 就是它的 cpu。
P = processor,是springmvc自动配置源码 Golang 中的调度器;(2)p 是 gmp 的中枢,借由 p 承上启下,实现 g 和 m 之间的动态有机结合;(3)对 g 而言,p 是其 cpu,g 只有被 p 调度,才得以执行;(4)对 m 而言,p 是其执行代理,为其提供必要信息的同时(可执行的 g、内存分配情况等),并隐藏了繁杂的调度细节;(5)p 的数量决定了 g 最大并行数量,可由用户通过 GOMAXPROCS 进行设定(超过 CPU 核数时无意义)。
M = machine,是 Golang 中对线程的抽象;(1)m 不直接执行 g,而是先和 p 绑定,由其实现代理;(3)借由 p 的存在,m 无需和 g 绑死,也无需记录 g 的状态信息,因此 g 在全生命周期中可以实现跨 m 执行。
全局有多个 M 和多个 P,但同时并行的 G 的最大数量等于 P 的数量。G 的存放队列有三类:P 的本地队列;全局队列;和 wait 队列(图中未展示,为 io 阻塞就绪态 goroutine 队列)。
M 调度 G 时,优先取 P 本地队列,其次取全局队列,最后取 wait 队列。这样的好处是,取本地队列时,可以接近于无锁化,减少全局锁竞争。为防止不同 P 的闲忙差异过大,设立 work-stealing 机制,本地队列为空的 P 可以尝试从其他 P 本地队列偷取一半的 G 补充到自身队列。
核心数据结构定义于 runtime/runtime2.go 文件中,各个类的成员属性较多,这里只摘取核心字段进行介绍:g 的生命周期由以下几种状态组成:_Gidle(值为 0,为协程开始创建时的javaee的考试源码状态,此时尚未初始化完成);_Grunnable(值为 1,协程在待执行队列中,等待被执行);_Grunning(值为 2,协程正在执行,同一时刻一个 p 中只有一个 g 处于此状态);_Gsyscall(值为 3,协程正在执行系统调用);_Gwaiting(值为 4,协程处于挂起态,需要等待被唤醒. gc、channel 通信或者锁操作时经常会进入这种状态);_Gdead(值为 6,协程刚初始化完成或者已经被销毁,会处于此状态);_Gcopystack(值为 8,协程正在栈扩容流程中);_Greempted(值为 9,协程被抢占后的状态)。
文字性总结难免有些过于含糊和空洞,对一些细节的描述总是不够精确的。下面照旧开启源码走读流程,从代码中寻求理论证明和细节补充。
gmp 数据结构定义为 runtime/runtime2.go 文件中,由于各个类的成员属性较多,那么只摘取核心字段进行介绍:(1)m:在 p 的代理,负责执行当前 g 的 m;(2)sched.sp:保存 CPU 的 rsp 寄存器的值,指向函数调用栈栈顶;(3)sched.pc:保存 CPU 的 rip 寄存器的值,指向程序下一条执行指令的地址;(4)sched.ret:保存系统调用的返回值;(5)sched.bp:保存 CPU 的 rbp 寄存器的值,存储函数栈帧的起始位置。其中 g 的生命周期由以下几种状态组成:(1)_Gidle(值为 0,为协程开始创建时的状态,此时尚未初始化完成);(2)_Grunnable(值为 1,协程在待执行队列中,等待被执行);(3)_Grunning(值为 2,协程正在执行,同一时刻一个 p 中只有一个 g 处于此状态);(4)_Gsyscall(值为 3,协程正在执行系统调用);(5)_Gwaiting(值为 4,协程处于挂起态,需要等待被唤醒. gc、channel 通信或者锁操作时经常会进入这种状态);(6)_Gdead(值为 6,协程刚初始化完成或者已经被销毁,会处于此状态);(7)_Gcopystack(值为 8,协程正在栈扩容流程中);(8)_Greempted(值为 9,协程被抢占后的状态)。
其中,goroutine 的类型可分为两类:(1)I 负责调度普通 g 的 g0,执行固定的调度流程,与 m 的关系为一对一;(2)II 负责执行用户函数的普通 g。m 通过 p 调度执行的 goroutine 永远在普通 g 和 g0 之间进行切换。
主动调度是用户主动执行让渡的方式,主要方式是,用户在执行代码中调用了 runtime.Gosched 方法,此时当前 g 会当让出执行权,主动进行队列等待下次被调度执行。被动调度因当前不满足某种执行条件,g 可能会陷入阻塞态无法被调度,直到关注的条件达成后,g 才从阻塞中被唤醒,重新进入可执行队列等待被调度。
正常调度指的是 g 中的执行任务已完成,g0 会将当前 g 置为死亡状态,发起新一轮调度。抢占调度指的是 g 执行系统调用超过指定的时长,且全局的 p 资源比较紧缺,此时将 p 和 g 解绑,抢占出来用于其他 g 的调度。
调度流程的主干方法是位于 runtime/proc.go 中的 schedule 函数。在宏观调度流程中,我们可以尝试对 gmp 的宏观调度流程进行整体串联,包括:(1)以 g0 -> g -> g0 的一轮循环为例进行串联;(2)g0 执行 schedule() 函数,寻找到用于执行的 g;(3)g0 执行 execute() 方法,更新当前 g、p 的状态信息,并调用 gogo() 方法,将执行权交给 g;(4)g 因主动让渡(gosche_m())、被动调度(park_m())、正常结束(goexit0())等原因,调用 m_call 函数,执行权重新回到 g0 手中;(5)g0 执行 schedule() 函数,开启新一轮循环。
在 Golang 中,调度流程的主干方法是位于 runtime/proc.go 中的 schedule 函数,此时的执行权位于 g0 手中。在 findRunnable 方法中,调度流程中,一个非常核心的步骤就是为 m 寻找到下一个执行的 g。在 execute 方法中,当 g0 为 m 寻找到可执行的 g 之后,接下来就开始执行 g。
在 g 执行主动让渡时,会调用 mcall 方法将执行权归还给 g0,并由 g0 调用 gosched_m 方法。在 g 需要被动调度时,会调用 mcall 方法切换至 g0,并调用 park_m 方法将 g 置为阻塞态。当 g 执行完成时,会先执行 mcall 方法切换至 g0,然后调用 goexit0 方法。与 g 的系统调用有关的,视角切换回发生系统调用前,与 g 绑定的原 m 当中,此时执行权同样位于 m 的 g0 手中。在 m 需要执行系统调用前,会先执行位于 runtime/proc.go 的 reentersyscall 的方法。当 m 完成了内核态的系统调用之后,此时会步入位于 runtime/proc.go 的 exitsyscall 函数中。
与 g 的系统调用有关的,视角切换回发生系统调用前,与 g 绑定的原 m 当中,在 m 需要执行系统调用前,会先执行位于 runtime/proc.go 的 reentersyscall 的方法。当 m 完成了内核态的系统调用之后,此时会步入位于 runtime/proc.go 的 exitsyscall 函数中。
当 g 执行完成时,会先执行 mcall 方法切换至 g0,然后调用 goexit0 方法。当 m 完成了内核态的系统调用之后,此时会步入位于 runtime/proc.go 的 exitsyscall 函数中。
对于抢占调度的执行者,不是 g0,而是一个全局的 monitor g,代码位于 runtime/proc.go 的 retake 方法中。与 g 的系统调用有关的,视角切换回发生系统调用前,与 g 绑定的原 m 当中,在 m 需要执行系统调用前,会先执行位于 runtime/proc.go 的 reentersyscall 的方法。当 m 完成了内核态的系统调用之后,此时会步入位于 runtime/proc.go 的 exitsyscall 函数中。
在 Golang 中,调度流程的主干方法是位于 runtime/proc.go 中的 schedule 函数,此时的执行权位于 g0 手中。在 findRunnable 方法中,调度流程中,一个非常核心的步骤就是为 m 寻找到下一个执行的 g。在 execute 方法中,当 g0 为 m 寻找到可执行的 g 之后,接下来就开始执行 g。
在 g 执行主动让渡时,会调用 mcall 方法将执行权归还给 g0,并由 g0 调用 gosched_m 方法。在 g 需要被动调度时,会调用 mcall 方法切换至 g0,并调用 park_m 方法将 g 置为阻塞态。当 g 执行完成时,会先执行 mcall 方法切换至 g0,然后调用 goexit0 方法。当 m 完成了内核态的系统调用之后,此时会步入位于
Java ä¸LockSupportç±»å¨C#ä¸çå®ç°
ããJava ä¹åæä¾ä¼ç§ç并ååºncurrent Netä¸ç¼ºä¹ç±»ä¼¼çåè½ ç±äºç¡¬ä»¶ä½ç³»åçäºåå å¤æ ¸æ¶ä»£æ¥ä¸´ NETä¸ç¼ºä¹å¹¶åç±»åºæ¾ç¶ä¸åæ¶å® ç¼è§£è¿ä¸çç¾çå ¶ä¸ä¸ä¸ªåæ³å°±æ¯å¨å¾C#ä¸ç§»æ¤javaçncurrentããjavaä¸çncurrentå ä¸æä¾äºä¸ä¸ªç±»LockSupport ncurrentå å¾å¤å ³é®å®ç°éè¦è°ç¨LockSupport å¦æéè¦æjavaçncurrentå è¿ç§»å°C#ä¸ LockSupportç±»çè¿ç§»æ¯ä¸å¯é¿å çé®é¢
ããå¨javaä¸ LockSupportç±»æå¦ä¸æ¹æ³
ãã以ä¸æ¯å¼ç¨ç段
ãã
ããpublic static void park(Object blocker) { ããThread t = Thread currentThread(); ããsetBlocker(t blocker); ããunsafe park(false L); ããsetBlocker(t null); ãã}
ããå½ä¸ä¸ªçº¿ç¨è°ç¨LockSupport parkä¹å 线ç¨å°±ä¼åä¸è½½ 类似äºObject wait æè NETä¸çSystem Threading Monitor Wait ä½é®é¢æ¯javaä¸çObject waitå NETä¸çMonitor wait é½éè¦ä¸ä¸ªwaitObject è¿ä¸ªé®é¢æ¾ç»å°æ°æ 为æ¤ç¿»äºä¸éJDK å®ç°æºç å°æååç°ç解å³åæ³å´æ¯å¾ç®å ä¹æ éäºè§£JDKçåºå±å®ç°æºç
ãã以ä¸æ¯å¼ç¨ç段
ãã
ããpublic class LockSupport ãã{ ããprivate static LocalDataStoreSlot slot = Thread GetNamedDataSlot ( LockSupport Park ); ããpublic static void Park(Object blocker) ãã{ ããThread thread = Thread CurrentThread; ããThread SetData(slot blocker); ããlock (thread) ãã{ ããMonitor Wait(thread); ãã} ãã} ããpublic static void Unpark(Thread thread) ãã{ ããif (thread == null) return; ããlock (thread) ãã{ ããMonitor Pulse(thread); ãã} ãã} ãã}
lishixinzhi/Article/program/net//Rust并发:标准库sync::Once源码分析
一次初始化同步原语Once,其核心功能在于确保闭包仅被执行一次。常见应用包括FFI库初始化、静态变量延迟初始化等。
标准库中的Once实现更为复杂,其关键在于如何高效地模拟Mutex阻塞与唤醒机制。这一机制依赖于线程暂停和唤醒原语thread::park/unpark,它们是实现多线程同步对象如Mutex、Condvar等的基础。
具体实现中,Once维护四个内部状态,状态与等待队列头指针共同存储于AtomicUsize中,利用4字节对齐优化空间。
构造Once实例时,初始化状态为Incomplete。调用Once::call_once或Once::call_once_force时,分别检查是否已完成初始化,未完成则执行闭包,闭包执行路径标记为冷路径以节省资源,同时避免泛型导致的代码膨胀。
闭包执行逻辑由Once::call_inner负责,线程尝试获取执行权限,未能获取则进入等待状态,获取成功后执行闭包,结束后唤醒等待线程。
等待队列通过无锁侵入式链表实现,节点在栈上分配,以优化内存使用。Once::wait函数实现等待线程逻辑,WaiterQueue的drop方法用于唤醒所有等待线程,需按特定顺序操作栈节点,以避免use after free等潜在问题。
思考题:如何在实际项目中利用Once实现资源安全共享?如何评估Once与Mutex等同步原语在不同场景下的性能差异?
LockSupportçparkçå¾ çåºå±å®ç°
ä»ä¸ä¸ç¯æç« ä¸çJDKç延è¿éåä¸,æç»æ¯éè¿LockSupport.parkå®ç°çº¿ç¨ççå¾ ï¼é£ä¹åºå±æ¯å¦ä½å®ç°çå¾ åè¶ æ¶çå¾ çï¼æ¬ææ们æ¥æ¢è®¨ä¸ä¸ãLockSupportçparkåunparkçæ¹æ³publicstaticvoidpark(){ UNSAFE.park(false,0L);}publicstaticvoidparkNanos(longnanos){ if(nanos>0)UNSAFE.park(false,nanos);}publicstaticvoidunpark(Threadthread){ if(thread!=null)UNSAFE.unpark(thread);}ä»ä¸é¢å¯ä»¥çå°å®é LockSupport.parkæ¯éè¿Unsafeççparkæ¹æ³å®ç°ï¼ä»ä¸é¢çæ¹æ³å¯ä»¥çåºè¿ä¸ªæ¯ä¸ä¸ªnativeæ¹æ³.
/***Blockscurrentthread,returningwhenabalancing*{ @codeunpark}occurs,orabalancing{ @codeunpark}has*alreadyoccurred,orthethreadisinterrupted,or,ifnot*absoluteandtimeisnotzero,thegiventimenanosecondshave*elapsed,orifabsolute,thegivendeadlineinmilliseconds*sinceEpochhaspassed,orspuriously(i.e.,returningforno*"reason").Note:ThisoperationisintheUnsafeclassonly*because{ @codeunpark}is,soitwouldbestrangetoplaceit*elsewhere.*/publicnativevoidpark(booleanisAbsolute,longtime);JVMçUnsafeçparkæ¹æ³ä»ä¸é¢JDKä¸ä»£ç ä¸å¯ä»¥threadçParkerç对象çparkæ¹æ³è¿è¡ä¸æ®µæ¶é´ççå¾ ã
UNSAFE_ENTRY(void,Unsafe_Park(JNIEnv*env,jobjectunsafe,jbooleanisAbsolute,jlongtime)){ HOTSPOT_THREAD_PARK_BEGIN((uintptr_t)thread->parker(),(int)isAbsolute,time);EventThreadParkevent;JavaThreadParkedStatejtps(thread,time!=0);thread->parker()->park(isAbsolute!=0,time);if(event.should_commit()){ constoopobj=thread->current_park_blocker();if(time==0){ post_thread_park_event(&event,obj,min_jlong,min_jlong);}else{ if(isAbsolute!=0){ post_thread_park_event(&event,obj,min_jlong,time);}else{ post_thread_park_event(&event,obj,time,min_jlong);}}}HOTSPOT_THREAD_PARK_END((uintptr_t)thread->parker());}UNSAFE_ENDThread.hppçæ件ä¸å é¨å®ä¹çPark对象
private:Parker_parker;public:Parker*parker(){ return&_parker;}ä¸é¢æ¯Os_posix.cppä¸æ¯Linuxä¸å®ç°çParkçparkçå®ç°æ¹å¼
é¦å å°_counterçåééè¿CAS设置为0ï¼è¿åå°±æ§çå¼ï¼å¦æä¹åæ¯å¤§äº0ï¼å说ææ¯å 许访é®ï¼ä¸ç¨é»å¡ï¼ç´æ¥è¿åã
è·åå½å线ç¨ã
å¤æ线ç¨æ¯å¦æ¯ä¸æä¸ï¼å¦ææ¯ï¼åç´æ¥è¿åï¼(ä¹å°±æ¯è¯´çº¿ç¨å¤äºä¸æç¶æä¸ä¼å¿½ç¥parkï¼ä¸ä¼é»å¡çå¾ )
å¤æå¦æä¼ å ¥çtimeåæ°å°äº0 æè æ¯ç»å¯¹æ¶é´å¹¶ä¸timeæ¯0,åç´æ¥è¿å,(ä¸é¢çUnsafeè°ç¨parkä¼ å ¥çåæ°æ¯ falseã0ï¼æ以ä¸æ»¡è¶³è¿ç§æ åµ)
å¦ætime大äº0ï¼å转æ¢æç»å¯¹æ¶é´ã
å建ThreadBlockInVM对象ï¼å¹¶ä¸è°ç¨pthread_mutex_trylockè·å线ç¨äºæ¥éï¼å¦æ没æè·åå°éï¼åç´æ¥è¿åï¼
å¤æ_counteråéæ¯å¦å¤§äº0ï¼å¦ææ¯ï¼åéç½®_counter为0ï¼éæ¾çº¿ç¨éï¼ç´æ¥è¿åã
è°ç¨ OrderAccess::fence(); å å ¥å åå±éï¼ç¦æ¢æ令éæåºï¼ç¡®ä¿å éåéæ¾éçæ令ç顺åºã
å建OSThreadWaitState对象ï¼
å¤ætimeæ¯å¦å¤§äº0ï¼å¦ææ¯0ï¼åè°ç¨pthread_cond_waitè¿è¡çå¾ ï¼å¦æä¸æ¯0ï¼ç¶åè°ç¨pthread_cond_timedwaitè¿è¡æ¶é´åæ°ä¸ºabsTimeççå¾ ï¼
è°ç¨pthread_mutex_unlockè¿è¡éæ¾_mutexéï¼
å次è°ç¨OrderAccess::fence()ç¦æ¢æ令éæåºã
//Parker::parkdecrementscountif>0,elsedoesacondvarwait.Unpark//setscountto1andsignalscondvar.Onlyonethreadeverwaits//onthecondvar.Contentionseenwhentryingtoparkimpliesthatsomeone//isunparkingyou,sodon'twait.Andspuriousreturnsarefine,sothere//isnoneedtotracknotifications.voidParker::park(boolisAbsolute,jlongtime){ //Optionalfast-pathcheck://Returnimmediatelyifapermitisavailable.//WedependonAtomic::xchg()havingfullbarriersemantics//sincewearedoingalock-freeupdateto_counter.if(Atomic::xchg(&_counter,0)>0)return;JavaThread*jt=JavaThread::current();//Optionaloptimization--avoidstatetransitionsifthere's//aninterruptpending.if(jt->is_interrupted(false)){ return;}//Next,demultiplex/decodetimeargumentsstructtimespecabsTime;if(time<0||(isAbsolute&&time==0)){ //don'twaitatallreturn;}if(time>0){ to_abstime(&absTime,time,isAbsolute,false);}//Entersafepointregion//Bewareofdeadlockssuchas.//Theper-threadParker::mutexisaclassicleaf-lock.//InparticularathreadmustneverblockontheThreads_lockwhile//holdingtheParker::mutex.Ifsafepointsarependingboththe//theThreadBlockInVM()CTORandDTORmaygrabThreads_lock.ThreadBlockInVMtbivm(jt);//Can'taccessinterruptstatenowthatweare_thread_blocked.Ifwe've//beeninterruptedsincewecheckedabovethen_counterwillbe>0.//Don'twaitifcannotgetlocksinceinterferencearisesfrom//unparking.if(pthread_mutex_trylock(_mutex)!=0){ return;}intstatus;if(_counter>0){ //nowaitneeded_counter=0;status=pthread_mutex_unlock(_mutex);assert_status(status==0,status,"invariant");//Paranoiatoensureourlockedandlock-freepathsinteract//correctlywitheachotherandJava-levelaccesses.OrderAccess::fence();return;}OSThreadWaitStateosts(jt->osthread(),false/*notObject.wait()*/);assert(_cur_index==-1,"invariant");if(time==0){ _cur_index=REL_INDEX;//arbitrarychoicewhennottimedstatus=pthread_cond_wait(&_cond[_cur_index],_mutex);assert_status(status==0MACOS_ONLY(||status==ETIMEDOUT),status,"cond_wait");}else{ _cur_index=isAbsolute?ABS_INDEX:REL_INDEX;status=pthread_cond_timedwait(&_cond[_cur_index],_mutex,&absTime);assert_status(status==0||status==ETIMEDOUT,status,"cond_timedwait");}_cur_index=-1;_counter=0;status=pthread_mutex_unlock(_mutex);assert_status(status==0,status,"invariant");//Paranoiatoensureourlockedandlock-freepathsinteract//correctlywitheachotherandJava-levelaccesses.OrderAccess::fence();Linuxæä½ç³»ç»æ¯å¦ä½å®ç°pthread_cond_timedwaitè¿è¡æ¶é´çå¾ çpthread_cond_timedwaitå½æ°ä½äºglibcä¸pthread_cond_wait.c, å¯ä»¥çå°æ¯è°ç¨__pthread_cond_wait_commonå®ç°
/*See__pthread_cond_wait_common.*/int___pthread_cond_timedwait(pthread_cond_t*cond,pthread_mutex_t*mutex,conststruct__timespec*abstime){ /*Checkparametervalidity.ThisshouldalsotellthecompilerthatitcanassumethatabstimeisnotNULL.*/if(!valid_nanoseconds(abstime->tv_nsec))returnEINVAL;/*RelaxedMOissufficebecauseclockIDbitisonlymodifiedinconditioncreation.*/unsignedintflags=atomic_load_relaxed(&cond->__data.__wrefs);clockid_tclockid=(flags&__PTHREAD_COND_CLOCK_MONOTONIC_MASK)?CLOCK_MONOTONIC:CLOCK_REALTIME;return__pthread_cond_wait_common(cond,mutex,clockid,abstime);}ä¸é¢__pthread_cond_wait_commonæ¯å®ç°éè¿__futex_abstimed_wait_cancelableå®ç°æ¶é´çå¾
static__always_inlineint__pthread_cond_wait_common(pthread_cond_t*cond,pthread_mutex_t*mutex,clockid_tclockid,conststruct__timespec*abstime){ ''çç¥''`err=__futex_abstimed_wait_cancelable(cond->__data.__g_signals+g,0,clockid,abstime,private);''çç¥''`}__futex_abstimed_wait_cancelableæ¯è°ç¨__futex_abstimed_wait_common
int__futex_abstimed_wait_cancelable(unsignedint*futex_word,unsignedintexpected,clockid_tclockid,conststruct__timespec*abstime,intprivate){ return__futex_abstimed_wait_common(futex_word,expected,clockid,abstime,private,true);}__futex_abstimed_wait_commonä¸é¢åæ¯éè¿å¤æå¹³å°æ¯ä½æè ä½,è°ç¨__futex_abstimed_wait_commonæè __futex_abstimed_wait_common
staticint__futex_abstimed_wait_common(unsignedint*futex_word,unsignedintexpected,clockid_tclockid,conststruct__timespec*abstime,intprivate,boolcancel){ interr;unsignedintclockbit;/*Workaroundthefactthatthekernelrejectsnegativetimeoutvaluesdespitethembeingvalid.*/if(__glibc_unlikely((abstime!=NULL)&&(abstime->tv_sec<0)))returnETIMEDOUT;if(!lll_futex_supported_clockid(clockid))returnEINVAL;clockbit=(clockid==CLOCK_REALTIME)?FUTEX_CLOCK_REALTIME:0;intop=__lll_private_flag(FUTEX_WAIT_BITSET|clockbit,private);#ifdef__ASSUME_TIME_SYSCALLSerr=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);#elseboolneed_time=abstime!=NULL&&!in_time_t_range(abstime->tv_sec);if(need_time){ err=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);if(err==-ENOSYS)err=-EOVERFLOW;}elseerr=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);#endifswitch(err){ case0:case-EAGAIN:case-EINTR:case-ETIMEDOUT:case-EINVAL:case-EOVERFLOW:/*Passedabsolutetimeoutusesbittime_ttype,butunderlyingkerneldoesnotsupportbittime_tfutexsyscalls.*/return-err;case-EFAULT:/*Musthavebeencausedbyaglibcorapplicationbug.*/case-ENOSYS:/*Musthavebeencausedbyaglibcbug.*//*Noothererrorsaredocumentedatthistime.*/default:futex_fatal_error();}}__futex_abstimed_wait_commonæ¯è°ç¨INTERNAL_SYSCALL_CANCELå®å®ä¹å®ç°
staticint__futex_abstimed_wait_common(unsignedint*futex_word,unsignedintexpected,intop,conststruct__timespec*abstime,intprivate,boolcancel){ if(cancel)returnINTERNAL_SYSCALL_CANCEL(futex_time,futex_word,op,expected,abstime,NULL/*Unused.*/,FUTEX_BITSET_MATCH_ANY);elsereturnINTERNAL_SYSCALL_CALL(futex_time,futex_word,op,expected,abstime,NULL/*Ununsed.*/,FUTEX_BITSET_MATCH_ANY);}ç³»ç»è°ç¨ççå®å®ä¹
/***Blockscurrentthread,returningwhenabalancing*{ @codeunpark}occurs,orabalancing{ @codeunpark}has*alreadyoccurred,orthethreadisinterrupted,or,ifnot*absoluteandtimeisnotzero,thegiventimenanosecondshave*elapsed,orifabsolute,thegivendeadlineinmilliseconds*sinceEpochhaspassed,orspuriously(i.e.,returningforno*"reason").Note:ThisoperationisintheUnsafeclassonly*because{ @codeunpark}is,soitwouldbestrangetoplaceit*elsewhere.*/publicnativevoidpark(booleanisAbsolute,longtime);0æ»ç»ä¸»è¦å¯¹LockSupportçparkçå¾ å®ç°çåºå±å®ç°çæµ æï¼é对äºLinuxçç³»ç»è°ç¨è¿æ²¡ææ¾å°æºç ï¼åç»ä¼ç»§ç»è·è¸ªï¼å¸ææ读è ç¥éç满å¸å¯ä»¥åç¥ä¸ï¼è°¢è°¢ã
é¾æ¥ï¼/post/线程池中空闲的线程处于什么状态?
一:阻塞状态,线程并没有销毁,也没有得到CPU时间片执行;
源码追踪:
for (;;) {
...
workQueue.take();
...
}
public E take()...{
...
while (count.get() == 0) { / /这里就是任务队列中的消息数量
notEmpty.await();
}
...
}
public final void await()...{
...
LockSupport.park(this);
...
}
继续往下:
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
U.park(false, 0L);
setBlocker(t, null);
}
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
//线程调用该方法,线程将一直阻塞直到超时,或者是中断条件出现。
public native void park(boolean isAbsolute, long time);
上面就是java线程池中阻塞的源码追踪;
二.对比object的wait()方法:
@FastNative
public final native void wait(long timeout, int nanos) throws InterruptedException;
还有Thread的sleep() 方法:
@FastNative
private static native void sleep(Object lock, long millis, int nanos)throws...;
可见,线程池中使用的阻塞方式并不是Object中的wait(),也不是Thread.sleep() ;
这3个方法最终实现都是通过c&c++实现的native方法.
三.在<<Java虚拟机(第二版)>>中,对线程状态有以下介绍:
.4.3 状态转换
Java语言定义了5种线程状态,在任意一个时间点,一个线程只能有且只有其中的一种
状态,这5种状态分别如下。
1)新建(New):创建后尚未启动的线程处于这种状态。
2)运行(Runable):Runable包括了操作系统线程状态中的Running和Ready,也就是处于此
状态的线程有可能正在执行,也有可能正在等待着CPU为它分配执行时间。
3)无限期等待(Waiting):处于这种状态的线程不会被分配CPU执行时间,它们要等待被
其他线程显式地唤醒。以下方法会让线程陷入无限期的等待状态:
●没有设置Timeout参数的Object.wait()方法。
●没有设置Timeout参数的Thread.join()方法。
●LockSupport.park()方法。
4)限期等待(Timed Waiting):处于这种状态的线程也不会被分配CPU执行时间,不过无
须等待被其他线程显式地唤醒,在一定时间之后它们会由系统自动唤醒。以下方法会让线程
进入限期等待状态:
●Thread.sleep()方法。
●设置了Timeout参数的Object.wait()方法。
●设置了Timeout参数的Thread.join()方法。
●LockSupport.parkNanos()方法。
●LockSupport.parkUntil()方法。
5)阻塞(Blocked):线程被阻塞了,“阻塞状态”与“等待状态”的区别是:“阻塞状态”在等
待着获取到一个排他锁,这个事件将在另外一个线程放弃这个锁的时候发生;而“等待状
态”则是在等待一段时间,或者唤醒动作的发生。在程序等待进入同步区域的时候,线程将
进入这种状态。
结束(Terminated):已终止线程的线程状态,线程已经结束执行。