本站提倡有节制游戏,合理安排游戏时间,注意劳逸结合。

【早晚安源码】【写jdk源码】【5050搭建源码】park 源码分析

2025-01-11 17:16:29 来源:焦点 分类:焦点

1.线程池中空闲的源码线程处于什么状态?
2.ListenableFuture源码解析
3.从HotSpot源码,深度解读 park 和 unpark
4.Java 中LockSupport类在C#中的实现
5.老生常谈线程基础的分析几个问题
6.LockSupport的park等待的底层实现

park 源码分析

线程池中空闲的线程处于什么状态?

       一:阻塞状态,线程并没有销毁,也没有得到CPU时间片执行;

       源码追踪:

       for (;;) {

       ...

        workQueue.take();

       ...

       }

       public E take()...{

       ...

       while (count.get() == 0) { / /这里就是任务队列中的消息数量

       notEmpty.await();

       }

       ...

       }

       public final void await()...{

       ...

       LockSupport.park(this);

       ...

       }

       继续往下:

       public static void park(Object blocker) {

       Thread t = Thread.currentThread();

       setBlocker(t, blocker);

       U.park(false, 0L);

       setBlocker(t, null);

       }

       private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();

       //线程调用该方法,线程将一直阻塞直到超时,源码或者是分析中断条件出现。

       public native void park(boolean isAbsolute,源码 long time);

       上面就是java线程池中阻塞的源码追踪;

       二.对比object的wait()方法:

       @FastNative

       public final native void wait(long timeout, int nanos) throws InterruptedException;

       还有Thread的sleep() 方法:

       @FastNative

       private static native void sleep(Object lock, long millis, int nanos)throws...;

       可见,线程池中使用的阻塞方式并不是Object中的wait(),也不是Thread.sleep() ;

       这3个方法最终实现都是通过c&c++实现的native方法.

       三.在<<Java虚拟机(第二版)>>中,对线程状态有以下介绍:

       .4.3 状态转换

       Java语言定义了5种线程状态,在任意一个时间点,分析早晚安源码一个线程只能有且只有其中的源码一种

       状态,这5种状态分别如下。分析

       1)新建(New):创建后尚未启动的源码线程处于这种状态。

       2)运行(Runable):Runable包括了操作系统线程状态中的分析Running和Ready,也就是源码处于此

       状态的线程有可能正在执行,也有可能正在等待着CPU为它分配执行时间。分析

       3)无限期等待(Waiting):处于这种状态的源码线程不会被分配CPU执行时间,它们要等待被

       其他线程显式地唤醒。分析以下方法会让线程陷入无限期的源码等待状态:

       ●没有设置Timeout参数的Object.wait()方法。

       ●没有设置Timeout参数的Thread.join()方法。

       ●LockSupport.park()方法。

       4)限期等待(Timed Waiting):处于这种状态的线程也不会被分配CPU执行时间,不过无

       须等待被其他线程显式地唤醒,在一定时间之后它们会由系统自动唤醒。以下方法会让线程

       进入限期等待状态:

       ●Thread.sleep()方法。

       ●设置了Timeout参数的Object.wait()方法。

       ●设置了Timeout参数的Thread.join()方法。

       ●LockSupport.parkNanos()方法。

       ●LockSupport.parkUntil()方法。

       5)阻塞(Blocked):线程被阻塞了,“阻塞状态”与“等待状态”的区别是:“阻塞状态”在等

       待着获取到一个排他锁,这个事件将在另外一个线程放弃这个锁的时候发生;而“等待状

       态”则是在等待一段时间,或者唤醒动作的发生。在程序等待进入同步区域的时候,线程将

       进入这种状态。

       结束(Terminated):已终止线程的线程状态,线程已经结束执行。

ListenableFuture源码解析

       ListenableFuture 是 spring 中对 JDK Future 接口的扩展,主要应用于解决在提交线程池的任务拿到 Future 后在 get 方法调用时会阻塞的问题。通过使用 ListenableFuture,可以向其注册回调函数(监听器),当任务完成时,写jdk源码触发回调。Promise 在 Netty 中也实现了类似的功能,用于处理类似 Future 的场景。

       实现 ListenableFuture 的关键在于 FutureTask 的源码解析。FutureTask 是实现 Future 接口的基础类,ListenableFutureTask 在其基础上做了扩展。其主要功能是在任务提交后,当调用 get 方法时能够阻塞当前业务线程,直到任务完成时唤醒。

       FutureTask 通过在内部实现一个轻量级的 Treiber stack 数据结构来管理等待任务完成的线程。这个数据结构由 WaitNode 节点组成,每个节点代表一个等待的线程。当业务线程调用 get 方法时,会将自己插入到 WaitNode 栈中,并且在插入的同时让当前线程进入等待状态。在任务执行完成后,会遍历 WaitNode 栈,唤醒等待的线程。

       为了确保并发安全,FutureTask 使用 CAS(Compare and Swap)操作来管理 WaitNode 栈。每个新插入的节点都会使用 CAS 操作与栈顶节点进行比较,并在满足条件时更新栈顶。这一过程保证了插入操作的原子性,防止了并发条件下的数据混乱。同时,插入操作与栈顶节点的更新操作相互交织,确保了数据的一致性和完整性。

       在 FutureTask 中,还利用了 LockSupport 类提供的 park 和 unpark 方法来实现线程的等待和唤醒。当线程插入到 WaitNode 栈中后,通过 park 方法将线程阻塞;任务执行完成后,通过 unpark 方法唤醒线程,完成等待与唤醒的流程。

       综上所述,ListenableFuture 通过扩展 FutureTask 的功能,实现了任务执行与线程等待的高效管理。通过注册监听器并利用 CAS 操作与 LockSupport 方法,5050搭建源码实现了在任务完成时通知回调,解决了异步任务执行时的线程阻塞问题,提高了程序的并发处理能力。

从HotSpot源码,深度解读 park 和 unpark

       我最近建立了一个在线自习室(App:番茄ToDO)用于相互监督学习,感兴趣的小伙伴可以加入。自习室加入码:D5A7A

       Java并发包下的类大多基于AQS(AbstractQueuedSynchronizer)框架实现,而AQS线程安全的实现依赖于两个关键类:Unsafe和LockSupport。

       其中,Unsafe主要提供CAS操作(关于CAS,在文章《读懂AtomicInteger源码(多线程专题)》中讲解过),LockSupport主要提供park/unpark操作。实际上,park/unpark操作的最终调用还是基于Unsafe类,因此Unsafe类才是核心。

       Unsafe类的实现是由native关键字说明的,这意味着这个方法是原生函数,是用C/C++语言实现的,并被编译成了DLL,由Java去调用。

       park函数的作用是将当前调用线程阻塞,而unpark函数则是唤醒指定线程。

       park是等待一个许可,unpark是为某线程提供一个许可。如果线程A调用park,除非另一个线程调用unpark(A)给A一个许可,否则线程A将阻塞在park操作上。每次调用一次park,需要有一个unpark来解锁。

       并且,unpark可以先于park调用,但不管unpark先调用多少次,都只提供一个许可,不可叠加。只需要一次park来消费掉unpark带来的许可,再次调用会阻塞。

       在Linux系统下,小黑鱼源码park和unpark是通过Posix线程库pthread中的mutex(互斥量)和condition(条件变量)来实现的。

       简单来说,mutex和condition保护了一个叫_counter的信号量。当park时,这个变量被设置为0,当unpark时,这个变量被设置为1。当_counter=0时线程阻塞,当_counter>0时直接设为0并返回。

       每个Java线程都有一个Parker实例,Parker类的部分源码如下:

       由源码可知,Parker类继承于PlatformParker,实际上是用Posix的mutex和condition来实现的。Parker类里的_counter字段,就是用来记录park和unpark是否需要阻塞的标识。

       具体的执行逻辑已经用注释标记在代码中,简要来说,就是检查_counter是不是大于0,如果是,则把_counter设置为0,返回。如果等于零,继续执行,阻塞等待。

       unpark直接设置_counter为1,再unlock mutex返回。如果_counter之前的值是0,则还要调用pthread_cond_signal唤醒在park中等待的线程。源码如下:

       (如果不会下载JVM源码可以后台回复“jdk”,获得下载压缩包)

Java 中LockSupport类在C#中的实现

          Java 之后提供优秀的并发库ncurrent Net中缺乏类似的功能 由于硬件体系发生了变化 多核时代来临 NET中缺乏并发类库显然不合时宜 缓解这一矛盾的其中一个办法就是在往C#中移植java的ncurrent

          java中的ncurrent包中提供了一个类LockSupport ncurrent包很多关键实现需要调用LockSupport 如果需要把java的ncurrent包迁移到C#中 LockSupport类的迁移是不可避免的问题

          在java中 LockSupport类有如下方法

          以下是引用片段

          

       

          public static void park(Object blocker) {   Thread t = Thread currentThread();   setBlocker(t  blocker);   unsafe park(false   L);   setBlocker(t  null);   }

          当一个线程调用LockSupport park之后 线程就会停下载 类似于Object wait 或者 NET中的System Threading Monitor Wait 但问题是java中的Object wait和 NET中的Monitor wait 都需要一个waitObject 这个问题曾经困扰我 为此翻了一遍JDK 实现源码 到最后发现的解决办法却是很简单 也无需了解JDK的底层实现源码

          以下是引用片段

          

          public class LockSupport   {   private static LocalDataStoreSlot slot = Thread GetNamedDataSlot ( LockSupport Park );   public static void Park(Object blocker)   {   Thread thread = Thread CurrentThread;   Thread SetData(slot  blocker);   lock (thread)   {   Monitor Wait(thread);   }   }   public static void Unpark(Thread thread)   {   if (thread == null) return;   lock (thread)   {   Monitor Pulse(thread);   }   }   }

lishixinzhi/Article/program/net//

老生常谈线程基础的几个问题

       实现线程只有一种方式

       我们知道启动线程至少可以通过以下四种方式:

       实现Runnable接口

       继承Thread类

       线程池创建线程

       带返回值的Callable创建线程

       但是看它们的底层就一种方式,就是通过newThread()实现,其他的只不过在它的上面做了层封装。

       实现Runnable接口要比继承Thread类的更好:

       结构上分工更明确,线程本身属性和任务逻辑解耦。

       某些情况下性能更好,直接把任务交给线程池执行,无需再次newThread()。typecho下载源码

       可拓展性更好:实现接口可以多个,而继承只能单继承。

       有的时候可能会问到启动线程为什么是start()方法,而不是run()方法,这个问题很简单,执行run()方法其实就是在执行一个类的普通方法,并没有启动一个线程,而start()方法点进去看是一个native方法。

       当我们在执行java中的start()方法的时候,它的底层会调JVM由c++编写的代码Thread::start,然后c++代码再调操作系统的create_thread创建线程,创建完线程以后并不会马上运行,要等待CPU的调度。CPU的调度算法有很多,比如先来先服务调度算法(FIFO),最短优先(就是对短作业的优先调度)、时间片轮转调度等。如下图所示:

线程的状态

       在Java中线程的生命周期中一共有6种状态。

       NEW:初始状态,线程被构建,但是还没有调用start方法

       RUNNABLE:运行状态,JAVA线程把操作系统中的就绪和运行两种状态统一称为运行中

       BLOCKED:阻塞状态,表示线程进入等待状态,也就是线程因为某种原因放弃了CPU使用权

       WAITING:等待状态

       TIMED_WAITING:超时等待状态,超时以后自动返回

       TERMINATED:终止状态,表示当前线程执行完毕

       当然这也不是我说的,源码中就是这么定义的:

publicenumState{ /***Threadstateforathreadwhichhasnotyetstarted.*/NEW,/***Threadstateforarunnablethread.Athreadintherunnable*stateisexecutingintheJavavirtualmachinebutitmay*bewaitingforotherresourcesfromtheoperatingsystem*suchasprocessor.*/RUNNABLE,/***Threadstateforathreadblockedwaitingforamonitorlock.*Athreadintheblockedstateiswaitingforamonitorlock*toenterasynchronizedblock/methodor*reenterasynchronizedblock/methodaftercalling*{ @linkObject#wait()Object.wait}.*/BLOCKED,/***Threadstateforawaitingthread.*Athreadisinthewaitingstateduetocallingoneofthe*followingmethods:*<ul>*<li>{ @linkObject#wait()Object.wait}withnotimeout</li>*<li>{ @link#join()Thread.join}withnotimeout</li>*<li>{ @linkLockSupport#park()LockSupport.park}</li>*</ul>**<p>Athreadinthewaitingstateiswaitingforanotherthreadto*performaparticularaction.**Forexample,athreadthathascalled<tt>Object.wait()</tt>*onanobjectiswaitingforanotherthreadtocall*<tt>Object.notify()</tt>or<tt>Object.notifyAll()</tt>on*thatobject.Athreadthathascalled<tt>Thread.join()</tt>*iswaitingforaspecifiedthreadtoterminate.*/WAITING,/***Threadstateforawaitingthreadwithaspecifiedwaitingtime.*Athreadisinthetimedwaitingstateduetocallingoneof*thefollowingmethodswithaspecifiedpositivewaitingtime:*<ul>*<li>{ @link#sleepThread.sleep}</li>*<li>{ @linkObject#wait(long)Object.wait}withtimeout</li>*<li>{ @link#join(long)Thread.join}withtimeout</li>*<li>{ @linkLockSupport#parkNanosLockSupport.parkNanos}</li>*<li>{ @linkLockSupport#parkUntilLockSupport.parkUntil}</li>*</ul>*/TIMED_WAITING,/***Threadstateforaterminatedthread.*Thethreadhascompletedexecution.*/TERMINATED;}

       下面是这六种状态的转换:

New新创建

       New表示线程被创建但尚未启动的状态:当我们用newThread()新建一个线程时,如果线程没有开始调用start()方法,那么此时它的状态就是New。而一旦线程调用了start(),它的状态就会从New变成Runnable。

Runnable运行状态

       Java中的Runable状态对应操作系统线程状态中的两种状态,分别是Running和Ready,也就是说,Java中处于Runnable状态的线程有可能正在执行,也有可能没有正在执行,正在等待被分配CPU资源。

       如果一个正在运行的线程是Runnable状态,当它运行到任务的一半时,执行该线程的CPU被调度去做其他事情,导致该线程暂时不运行,它的状态依然不变,还是Runnable,因为它有可能随时被调度回来继续执行任务。

       在Java中Blocked、Waiting、TimedWaiting,这三种状态统称为阻塞状态,下面分别来看下。

Blocked

       从上图可以看出,从Runnable状态进入Blocked状态只有一种可能,就是进入synchronized保护的代码时没有抢到monitor锁,jvm会把当前的线程放入到锁池中。当处于Blocked的线程抢到monitor锁,就会从Blocked状态回到Runnable状态。

Waiting状态

       我们看上图,线程进入Waiting状态有三种可能。

       没有设置Timeout参数的Object.wait()方法,jvm会把当前线程放入到等待队列。

       没有设置Timeout参数的Thread.join()方法。

       LockSupport.park()方法。

       Blocked与Waiting的区别是Blocked在等待其他线程释放monitor锁,而Waiting则是在等待某个条件,比如join的线程执行完毕,或者是notify()/notifyAll()。

       当执行了LockSupport.unpark(),或者join的线程运行结束,或者被中断时可以进入Runnable状态。当调用notify()或notifyAll()来唤醒它,它会直接进入Blocked状态,因为唤醒Waiting状态的线程能够调用notify()或notifyAll(),肯定是已经持有了monitor锁,这时候处于Waiting状态的线程没有拿到monitor锁,就会进入Blocked状态,直到执行了notify()/notifyAll()唤醒它的线程执行完毕并释放monitor锁,才可能轮到它去抢夺这把锁,如果它能抢到,就会从Blocked状态回到Runnable状态。

TimedWaiting状态

       在Waiting上面是TimedWaiting状态,这两个状态是非常相似的,区别仅在于有没有时间限制,TimedWaiting会等待超时,由系统自动唤醒,或者在超时前被唤醒信号唤醒。

       以下情况会让线程进入TimedWaiting状态。

       设置了时间参数的Thread.sleep(longmillis)方法。

       设置了时间参数的Object.wait(longtimeout)方法。

       设置了时间参数的Thread.join(longmillis)方法。

       设置了时间参数的LockSupport.parkNanos(longnanos)。

       LockSupport.parkUntil(longdeadline)方法。

       在TimedWaiting中执行notify()和notifyAll()也是一样的道理,它们会先进入Blocked状态,然后抢夺锁成功后,再回到Runnable状态。当然,如果它的超时时间到了且能直接获取到锁/join的线程运行结束/被中断/调用了LockSupport.unpark(),会直接恢复到Runnable状态,而无需经历Blocked状态。

Terminated终止

       Terminated终止状态,要想进入这个状态有两种可能。

       run()方法执行完毕,线程正常退出。

       出现一个没有捕获的异常,终止了run()方法,最终导致意外终止。

线程的停止interrupt

       我们知道Thread提供了线程的一些操作方法,比如stop(),suspend()和resume(),这些方法已经被Java直接标记为@Deprecated,这就说明这些方法是不建议大家使用的。

       因为stop()会直接把线程停止,这样就没有给线程足够的时间来处理想要在停止前保存数据的逻辑,任务戛然而止,会导致出现数据完整性等问题。这种行为类似于在linux系统中执行kill-9类似,它是一种不安全的操作。

       而对于suspend()和resume()而言,它们的问题在于如果线程调用suspend(),它并不会释放锁,就开始进入休眠,但此时有可能仍持有锁,这样就容易导致死锁问题,因为这把锁在线程被resume()之前,是不会被释放的。

interrupt

       最正确的停止线程的方式是使用interrupt,但interrupt仅仅起到通知被停止线程的作用。而对于被停止的线程而言,它拥有完全的自主权,它既可以选择立即停止,也可以选择一段时间后停止,也可以选择压根不停止。

       下面我们来看下例子:

publicclassInterruptExampleimplementsRunnable{ //interrupt相当于定义一个volatile的变量//volatilebooleanflag=false;publicstaticvoidmain(String[]args)throwsInterruptedException{ Threadt1=newThread(newInterruptExample());t1.start();Thread.sleep(5);//Main线程来决定t1线程的停止,发送一个中断信号,中断标记变为truet1.interrupt();}@Overridepublicvoidrun(){ while(!Thread.currentThread().isInterrupted()){ System.out.println(Thread.currentThread().getName()+"--");}}}

       执行一下,运行了一会就停止了

       主线程在调用t1的interrupt()之后,这个线程的中断标记位就会被设置成true。每个线程都有这样的标记位,当线程执行时,会定期检查这个标记位,如果标记位被设置成true,就说明有程序想终止该线程。在while循环体判断语句中,通过Thread.currentThread().isInterrupt()判断线程是否被中断,如果被置为true了,则跳出循环,线程就结束了,这个就是interrupt的简单用法。

阻塞状态下的线程中断

       下面来看第二个例子,在循环中加了Thread.sleep秒。

publicclassInterruptSleepExampleimplementsRunnable{ //interrupt相当于定义一个volatile的变量//volatilebooleanflag=false;publicstaticvoidmain(String[]args)throwsInterruptedException{ Threadt1=newThread(newInterruptSleepExample());t1.start();Thread.sleep(5);//Main线程来决定t1线程的停止,发送一个中断信号,中断标记变为truet1.interrupt();}@Overridepublicvoidrun(){ while(!Thread.currentThread().isInterrupted()){ try{ Thread.sleep();}catch(InterruptedExceptione){ //中断标记变为falsee.printStackTrace();}System.out.println(Thread.currentThread().getName()+"--");}}}

       再来看下运行结果,卡主了,并没有停止。这是因为main线程调用了t1.interrupt(),此时t1正在sleep中,这时候是接收不到中断信号的,要sleep结束以后才能收到。这样的中断太不及时了,我让你中断了,你缺还在傻傻的sleep中。

       Java开发的设计者已经考虑到了这一点,sleep、wait等方法可以让线程进入阻塞的方法使线程休眠了,而处于休眠中的线程被中断,那么线程是可以感受到中断信号的,并且会抛出一个InterruptedException异常,同时清除中断信号,将中断标记位设置成false。

       这时候有几种做法:

       直接捕获异常,不做处理,e.printStackTrace();打印下信息

       将异常往外抛出,即在方法上throwsInterruptedException

       再次中断,代码如下,加上Thread.currentThread().interrupt();

@Overridepublicvoidrun(){ while(!Thread.currentThread().isInterrupted()){ try{ Thread.sleep();}catch(InterruptedExceptione){ //中断标记变为falsee.printStackTrace();//把中断标记修改为trueThread.currentThread().interrupt();}System.out.println(Thread.currentThread().getName()+"--");}}

       这时候线程感受到了,我们人为的再把中断标记修改为true,线程就能停止了。一般情况下我们操作线程很少会用到interrupt,因为大多数情况下我们用的是线程池,线程池已经帮我封装好了,但是这方面的知识还是需要掌握的。感谢收看,多多点赞~

       作者:小杰博士

LockSupport的park等待的底层实现

       ä»Žä¸Šä¸€ç¯‡æ–‡ç« ä¸­çš„JDK的延迟队列中,最终是通过LockSupport.park实现线程的等待,那么底层是如何实现等待和超时等待的?本文我们来探讨一下。

LockSupport的park和unpark的方法publicstaticvoidpark(){ UNSAFE.park(false,0L);}publicstaticvoidparkNanos(longnanos){ if(nanos>0)UNSAFE.park(false,nanos);}publicstaticvoidunpark(Threadthread){ if(thread!=null)UNSAFE.unpark(thread);}

       ä»Žä¸Šé¢å¯ä»¥çœ‹åˆ°å®žé™…LockSupport.park是通过Unsafe的的park方法实现,从下面的方法可以看出这个是一个native方法.

/***Blockscurrentthread,returningwhenabalancing*{ @codeunpark}occurs,orabalancing{ @codeunpark}has*alreadyoccurred,orthethreadisinterrupted,or,ifnot*absoluteandtimeisnotzero,thegiventimenanosecondshave*elapsed,orifabsolute,thegivendeadlineinmilliseconds*sinceEpochhaspassed,orspuriously(i.e.,returningforno*"reason").Note:ThisoperationisintheUnsafeclassonly*because{ @codeunpark}is,soitwouldbestrangetoplaceit*elsewhere.*/publicnativevoidpark(booleanisAbsolute,longtime);JVM的Unsafe的park方法

       ä»Žä¸‹é¢JDK中代码中可以thread的Parker的对象的park方法进行一段时间的等待。

UNSAFE_ENTRY(void,Unsafe_Park(JNIEnv*env,jobjectunsafe,jbooleanisAbsolute,jlongtime)){ HOTSPOT_THREAD_PARK_BEGIN((uintptr_t)thread->parker(),(int)isAbsolute,time);EventThreadParkevent;JavaThreadParkedStatejtps(thread,time!=0);thread->parker()->park(isAbsolute!=0,time);if(event.should_commit()){ constoopobj=thread->current_park_blocker();if(time==0){ post_thread_park_event(&event,obj,min_jlong,min_jlong);}else{ if(isAbsolute!=0){ post_thread_park_event(&event,obj,min_jlong,time);}else{ post_thread_park_event(&event,obj,time,min_jlong);}}}HOTSPOT_THREAD_PARK_END((uintptr_t)thread->parker());}UNSAFE_END

       Thread.hpp的文件中内部定义的Park对象

private:Parker_parker;public:Parker*parker(){ return&_parker;}

       ä¸‹é¢æ˜¯Os_posix.cpp中是Linux中实现的Park的park的实现方式

       é¦–先将_counter的变量通过CAS设置为0,返回就旧的值,如果之前是大于0,则说明是允许访问,不用阻塞,直接返回。

       èŽ·å–当前线程。

       åˆ¤æ–­çº¿ç¨‹æ˜¯å¦æ˜¯ä¸­æ–­ä¸­ï¼Œå¦‚果是,则直接返回,(也就是说线程处于中断状态下会忽略park,不会阻塞等待)

       åˆ¤æ–­å¦‚果传入的time参数小于0 或者 是绝对时间并且time是0,则直接返回,(上面的Unsafe调用park传入的参数是 false、0,所以不满足这种情况)

       å¦‚æžœtime大于0,则转换成绝对时间。

       åˆ›å»ºThreadBlockInVM对象,并且调用pthread_mutex_trylock获取线程互斥锁,如果没有获取到锁,则直接返回,

       åˆ¤æ–­_counter变量是否大于0,如果是,则重置_counter为0,释放线程锁,直接返回。

       è°ƒç”¨ OrderAccess::fence(); 加入内存屏障,禁止指令重排序,确保加锁和释放锁的指令的顺序。

       åˆ›å»ºOSThreadWaitState对象,

       åˆ¤æ–­time是否大于0,如果是0,则调用pthread_cond_wait进行等待,如果不是0,然后调用pthread_cond_timedwait进行时间参数为absTime的等待,

       è°ƒç”¨pthread_mutex_unlock进行释放_mutex锁,

       å†æ¬¡è°ƒç”¨OrderAccess::fence()禁止指令重排序。

//Parker::parkdecrementscountif>0,elsedoesacondvarwait.Unpark//setscountto1andsignalscondvar.Onlyonethreadeverwaits//onthecondvar.Contentionseenwhentryingtoparkimpliesthatsomeone//isunparkingyou,sodon'twait.Andspuriousreturnsarefine,sothere//isnoneedtotracknotifications.voidParker::park(boolisAbsolute,jlongtime){ //Optionalfast-pathcheck://Returnimmediatelyifapermitisavailable.//WedependonAtomic::xchg()havingfullbarriersemantics//sincewearedoingalock-freeupdateto_counter.if(Atomic::xchg(&_counter,0)>0)return;JavaThread*jt=JavaThread::current();//Optionaloptimization--avoidstatetransitionsifthere's//aninterruptpending.if(jt->is_interrupted(false)){ return;}//Next,demultiplex/decodetimeargumentsstructtimespecabsTime;if(time<0||(isAbsolute&&time==0)){ //don'twaitatallreturn;}if(time>0){ to_abstime(&absTime,time,isAbsolute,false);}//Entersafepointregion//Bewareofdeadlockssuchas.//Theper-threadParker::mutexisaclassicleaf-lock.//InparticularathreadmustneverblockontheThreads_lockwhile//holdingtheParker::mutex.Ifsafepointsarependingboththe//theThreadBlockInVM()CTORandDTORmaygrabThreads_lock.ThreadBlockInVMtbivm(jt);//Can'taccessinterruptstatenowthatweare_thread_blocked.Ifwe've//beeninterruptedsincewecheckedabovethen_counterwillbe>0.//Don'twaitifcannotgetlocksinceinterferencearisesfrom//unparking.if(pthread_mutex_trylock(_mutex)!=0){ return;}intstatus;if(_counter>0){ //nowaitneeded_counter=0;status=pthread_mutex_unlock(_mutex);assert_status(status==0,status,"invariant");//Paranoiatoensureourlockedandlock-freepathsinteract//correctlywitheachotherandJava-levelaccesses.OrderAccess::fence();return;}OSThreadWaitStateosts(jt->osthread(),false/*notObject.wait()*/);assert(_cur_index==-1,"invariant");if(time==0){ _cur_index=REL_INDEX;//arbitrarychoicewhennottimedstatus=pthread_cond_wait(&_cond[_cur_index],_mutex);assert_status(status==0MACOS_ONLY(||status==ETIMEDOUT),status,"cond_wait");}else{ _cur_index=isAbsolute?ABS_INDEX:REL_INDEX;status=pthread_cond_timedwait(&_cond[_cur_index],_mutex,&absTime);assert_status(status==0||status==ETIMEDOUT,status,"cond_timedwait");}_cur_index=-1;_counter=0;status=pthread_mutex_unlock(_mutex);assert_status(status==0,status,"invariant");//Paranoiatoensureourlockedandlock-freepathsinteract//correctlywitheachotherandJava-levelaccesses.OrderAccess::fence();Linux操作系统是如何实现pthread_cond_timedwait进行时间等待的

       pthread_cond_timedwait函数位于glibc中pthread_cond_wait.c, 可以看到是调用__pthread_cond_wait_common实现

/*See__pthread_cond_wait_common.*/int___pthread_cond_timedwait(pthread_cond_t*cond,pthread_mutex_t*mutex,conststruct__timespec*abstime){ /*Checkparametervalidity.ThisshouldalsotellthecompilerthatitcanassumethatabstimeisnotNULL.*/if(!valid_nanoseconds(abstime->tv_nsec))returnEINVAL;/*RelaxedMOissufficebecauseclockIDbitisonlymodifiedinconditioncreation.*/unsignedintflags=atomic_load_relaxed(&cond->__data.__wrefs);clockid_tclockid=(flags&__PTHREAD_COND_CLOCK_MONOTONIC_MASK)?CLOCK_MONOTONIC:CLOCK_REALTIME;return__pthread_cond_wait_common(cond,mutex,clockid,abstime);}

       ä¸‹é¢__pthread_cond_wait_common是实现通过__futex_abstimed_wait_cancelable实现时间等待

static__always_inlineint__pthread_cond_wait_common(pthread_cond_t*cond,pthread_mutex_t*mutex,clockid_tclockid,conststruct__timespec*abstime){ ''省略''`err=__futex_abstimed_wait_cancelable(cond->__data.__g_signals+g,0,clockid,abstime,private);''省略''`}

       __futex_abstimed_wait_cancelable是调用__futex_abstimed_wait_common

int__futex_abstimed_wait_cancelable(unsignedint*futex_word,unsignedintexpected,clockid_tclockid,conststruct__timespec*abstime,intprivate){ return__futex_abstimed_wait_common(futex_word,expected,clockid,abstime,private,true);}

       __futex_abstimed_wait_common下面则是通过判断平台是位或者位,调用__futex_abstimed_wait_common或者__futex_abstimed_wait_common

staticint__futex_abstimed_wait_common(unsignedint*futex_word,unsignedintexpected,clockid_tclockid,conststruct__timespec*abstime,intprivate,boolcancel){ interr;unsignedintclockbit;/*Workaroundthefactthatthekernelrejectsnegativetimeoutvaluesdespitethembeingvalid.*/if(__glibc_unlikely((abstime!=NULL)&&(abstime->tv_sec<0)))returnETIMEDOUT;if(!lll_futex_supported_clockid(clockid))returnEINVAL;clockbit=(clockid==CLOCK_REALTIME)?FUTEX_CLOCK_REALTIME:0;intop=__lll_private_flag(FUTEX_WAIT_BITSET|clockbit,private);#ifdef__ASSUME_TIME_SYSCALLSerr=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);#elseboolneed_time=abstime!=NULL&&!in_time_t_range(abstime->tv_sec);if(need_time){ err=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);if(err==-ENOSYS)err=-EOVERFLOW;}elseerr=__futex_abstimed_wait_common(futex_word,expected,op,abstime,private,cancel);#endifswitch(err){ case0:case-EAGAIN:case-EINTR:case-ETIMEDOUT:case-EINVAL:case-EOVERFLOW:/*Passedabsolutetimeoutusesbittime_ttype,butunderlyingkerneldoesnotsupportbittime_tfutexsyscalls.*/return-err;case-EFAULT:/*Musthavebeencausedbyaglibcorapplicationbug.*/case-ENOSYS:/*Musthavebeencausedbyaglibcbug.*//*Noothererrorsaredocumentedatthistime.*/default:futex_fatal_error();}}

       __futex_abstimed_wait_common是调用INTERNAL_SYSCALL_CANCEL宏定义实现

staticint__futex_abstimed_wait_common(unsignedint*futex_word,unsignedintexpected,intop,conststruct__timespec*abstime,intprivate,boolcancel){ if(cancel)returnINTERNAL_SYSCALL_CANCEL(futex_time,futex_word,op,expected,abstime,NULL/*Unused.*/,FUTEX_BITSET_MATCH_ANY);elsereturnINTERNAL_SYSCALL_CALL(futex_time,futex_word,op,expected,abstime,NULL/*Ununsed.*/,FUTEX_BITSET_MATCH_ANY);}

       ç³»ç»Ÿè°ƒç”¨çš„的宏定义

/***Blockscurrentthread,returningwhenabalancing*{ @codeunpark}occurs,orabalancing{ @codeunpark}has*alreadyoccurred,orthethreadisinterrupted,or,ifnot*absoluteandtimeisnotzero,thegiventimenanosecondshave*elapsed,orifabsolute,thegivendeadlineinmilliseconds*sinceEpochhaspassed,orspuriously(i.e.,returningforno*"reason").Note:ThisoperationisintheUnsafeclassonly*because{ @codeunpark}is,soitwouldbestrangetoplaceit*elsewhere.*/publicnativevoidpark(booleanisAbsolute,longtime);0总结

       ä¸»è¦å¯¹LockSupport的park等待实现的底层实现的浅析,针对于Linux的系统调用还没有找到源码,后续会继续跟踪,希望有读者知道的满帆可以告知下,谢谢。

链接:/post/

Rust并发:标准库sync::Once源码分析

       一次初始化同步原语Once,其核心功能在于确保闭包仅被执行一次。常见应用包括FFI库初始化、静态变量延迟初始化等。

       标准库中的Once实现更为复杂,其关键在于如何高效地模拟Mutex阻塞与唤醒机制。这一机制依赖于线程暂停和唤醒原语thread::park/unpark,它们是实现多线程同步对象如Mutex、Condvar等的基础。

       具体实现中,Once维护四个内部状态,状态与等待队列头指针共同存储于AtomicUsize中,利用4字节对齐优化空间。

       构造Once实例时,初始化状态为Incomplete。调用Once::call_once或Once::call_once_force时,分别检查是否已完成初始化,未完成则执行闭包,闭包执行路径标记为冷路径以节省资源,同时避免泛型导致的代码膨胀。

       闭包执行逻辑由Once::call_inner负责,线程尝试获取执行权限,未能获取则进入等待状态,获取成功后执行闭包,结束后唤醒等待线程。

       等待队列通过无锁侵入式链表实现,节点在栈上分配,以优化内存使用。Once::wait函数实现等待线程逻辑,WaiterQueue的drop方法用于唤醒所有等待线程,需按特定顺序操作栈节点,以避免use after free等潜在问题。

       思考题:如何在实际项目中利用Once实现资源安全共享?如何评估Once与Mutex等同步原语在不同场景下的性能差异?

相关推荐
一周热点