皮皮网

【压力支撑主源码】【登入器源码】【手册源码】forkjoin 源码

2025-01-11 22:19:53 来源:盒子桌面源码

1.forkjoin Դ?源码?
2.ForkjoinPool -1
3.解析Stream foreach源码

forkjoin 源码

forkjoin Դ??

       fork/join框架在Java并发包中扮演着重要角色,尤其在Java 8的源码并行流中。本文将深入剖析其设计思路、源码核心角色和实现机制。源码

       首先,源码fork/join的源码压力支撑主源码工作原理是将大任务分解成小任务,并利用多核处理。源码其特殊之处在于运用了work-stealing算法,源码通过双端队列分配任务,源码即使线程处理完一个任务,源码也能从其他未完成的源码任务中“窃取”以提高效率。

       核心角色包括ForkJoinPool,源码作为任务的源码管理者和线程容器,负责任务的源码提交和workerThread的管理。ForkJoinWorkerThread则是源码登入器源码实际执行任务的“工人”,处理队列中的任务,并通过work-stealing机制优化资源利用。WorkQueue是存放任务的双端队列,ForkJoinTask则定义了任务类型,分为有返回值和无返回值两种。

       在初始化阶段,ForkJoinPool通过ForkJoinWorkerThreadFactory创建线程,手册源码任务的提交逻辑分为首次提交和任务切分后提交。首次提交会确保队列的创建和加锁,任务切分则在workerThread中进行。任务的消费则由workerThread或非workerThread线程根据任务状态进行处理。

       至于任务的窃取,工作线程在run()方法中通过scan(WorkQueue, int r)函数实现,不断尝试从队列中“窃取”任务,minitools 源码直到找到或者遍历完所有队列。

       尽管文章只是概述,深入研究fork/join的源码是理解其内在机制的关键,这将有助于在实际开发中更有效地利用并发框架。

ForkjoinPool -1

        ForkJoin是用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。

        下面是一个是一个简单的Join/Fork计算过程,将1—数字相加

        通常这样个模型,你们会想到什么?

        Release Framework ? 常见的处理模型是什么? task pool - worker pool的模型。 但是Forkjoinpool 采取了完全不同的模型。

        ForkJoinPool一种ExecutorService的实现,运行ForkJoinTask任务。ForkJoinPool区别于其它ExecutorService,主要是因为它采用了一种工作窃取(work-stealing)的机制。所有被ForkJoinPool管理的线程尝试窃取提交到池子里的任务来执行,执行中又可产生子任务提交到池子中。

        ForkJoinPool维护了一个WorkQueue的数组(数组长度是2的整数次方,自动增长)。每个workQueue都有任务队列(ForkJoinTask的数组),并且用base、top指向任务队列队尾和队头。work-stealing机制就是工作线程挨个扫描任务队列,如果队列不为空则取队尾的任务并执行。示意图如下

        流程图:

        pool属性

        workQueues是pool的属性,它是WorkQueue类型的数组。externalPush和externalSubmit所创建的workQueue没有owner(即不是worker),且会被放到workQueues的偶数位置;而createWorker创建的workQueue(即worker)有owner,且会被放到workQueues的奇数位置。

        WorkQueue的几个重要成员变量说明如下:

        这是WorkQueue的config,高位跟pool的config值保持一致,而低位则是workQueue在workQueues数组的位置。

        从workQueues属性的介绍中,我们知道,不是所有workQueue都有worker,没有worker的workQueue称为公共队列(shared queue),config的第位就是用来判断是否是公共队列的。在externalSubmit创建工作队列时,有:

        q.config = k | SHARED_QUEUE;

        其中q是新创建的workQueue,k就是q在workQueues数组中的位置,SHARED_QUEUE=1<<,注意这里config没有保留mode的信息。

        而在registerWorker中,则是这样给workQueue的config赋值的:

        w.config = i | mode;

        w是新创建的workQueue,i是其在workQueues数组中的位置,没有设置SHARED_QUEUE标记位

        scanState是workQueue的属性,是int类型的。scanState的低位可以用来定位当前worker处于workQueues数组的哪个位置。每个worker在被创建时会在其构造函数中调用pool的registerWorker,而registerWorker会给scanState赋一个初始值,这个值是奇数,因为worker是由createWorker创建,并会被放到WorkQueues的奇数位置,而createWorker创建worker时会调用registerWorker。

        简言之,worker的scanState初始值是奇数,非worker的scanstate初始值=INACTIVE=1<<,小于0(非worker的workQueue在externalSubmit中创建)。

        当每次调用signalWork(或tryRelease)唤醒worker时,worker的高位就会加1

        另外,scanState<0表示worker未激活,当worker调用runtask执行任务时,scanState会被置为偶数,即设置scanState的最右边一位为0。

        worker休眠时,是这样存储的

        worker的唤醒类似这样:

        在worker休眠的4行伪码中,让ctl的低位的值变为worker.scanState,这样下次就可以通过scanState唤醒该worker。唤醒该worker时,把该worker的preStack设置为ctl低位的值,这样下下次唤醒的worker就是scanState等于该preStack的worker。

        这里通过preStack保存下一个worker,这个worker比当前worker更早地在等待,所以形成一个后进先出的栈。

        runState是int类型的值,控制整个pool的运行状态和生命周期,有下面几个值(可以好几个值同时存在):

        如果runState值为0,表示pool尚未初始化。

        RSLOCK表示锁定pool,当添加worker和pool终止时,就要使用RSLOCK锁定整个pool。如果由于runState被锁定,导致其他操作等待runState解锁(通常用wait进行等待),当runState设置了RSIGNAL,表示runState解锁,并通知(notifyAll)等待的操作。

        剩下4个值都跟runState生命周期有关,都可以顾名思义:

        当需要停止时,设置runState的STOP值,表示准备关闭,这样其他操作看到这个标记位,就不会继续操作,比如tryAddWorker看到STOP就不会再创建worker:

        而tryTerminate对这些生命周期状态的处理则是这样的:

        当前top和base的初始值为 INITIAL_QUEUE_CAPACITY >>>1= (1 << )>>>1 = /2。然后push一个task之后,top+=1,也就是说,top对应的位置是没有task的,最近push进来的task在top-1的位置。而base的位置则能对应到task,base对应最先放进队列的task,top-1对应最后放进队列的task。

        qlock值含义:1: locked, < 0: terminate; else 0

        即当qlock值位0时,可以正常操作,值=1时,表示锁定

        int SQMASK=0xe,则任何整数跟SQMASK位与后,得到的数就是偶数。

        证明:

        注意这里化为二进制是 ,尤其注意最右边第一位是0,任何数跟最右边第一位是0的数位与后,得到的数就是偶数,因为位与之后,第一位就是0,比如s=A&SQMASK,A可以是任意整数,然后把s按二进制进行多项式展开,则有s=2 n1+2 n2 ……+2^nn,这里n≥1,所以s可以被2整除,即s是偶数。

        所以一个数是奇数还是偶数,看其最右边第一位即可。

        我们知道workQueue有externalPush创建的和createWorker创建的worker,两种方式创建的workQueue,其放置到workQueues的位置是不同的,前者放到workQueue的偶数位置,而后者则放到奇数位置。不同workQueue找到自己在workQueues的位置的算法有点不同。

        下面看一下forkjoin框架获取workQueues中的偶数位置的workQueue的算法:

        这样就能获取workQueues的偶数位置的workQueue。m保证m & r & SQMASK这整个运算结果不会超出workQueues的下标,SQMASK保证取到的是偶数位置的workQueue。这里有一个有趣的现象,假设0到workQueues.length-1之间有n个偶数,m & r & SQMASK每次都能取到其中一个偶数,而且连续n次取到的偶数不会出现重复值,散列性非常好。而且是循环的,即1到n次取n个不同偶数,n+1到2n也是取n次不同偶数,此时n个偶数每个都被重新取一次。下面分析下r值有什么秘密,为何能保证这样的散列性

        ThreadLocalRandom内有一常量PROBE_INCREMENT = 0x9eb9,以及一个静态的probeGenerator =new AtomicInteger() ,然后每个线程的probe= probeGenerator.addAndGet(PROBE_INCREMENT)所以第一个线程的probe值是0x9eb9,第二个线程的值就是0x9eb9+0x9eb9,第三个线程的值就是0x9eb9+0x9eb9+0x9eb9以此类推,整个值是线性的,可以用y=kx表示,其中k=0x9eb9,x表示第几个线程。这样每个线程的probe可以保证不一样,而且具有很好的离散性。

        实际上,可以不用0x9eb9这个值,用任意一个奇数都是可以的,比如1。如果用1的话,probe+=1,这样每个线程的probe就都是不同的,而且具有很好的离散性。也就是说,假设有限制条件probe<n,超过n则产生溢出。则probe自加n次后才会开始出现重复值,n次前probe每次自加的值都不同。实际上用任意一个奇数,都可以保证probe自加n次后才会开始出现重复值,有兴趣可看本文最后附录部分。由于奇数的离散性,所以只要线程数小于m或者SQMASK两者中的最小值,则每个线程都能唯一地占据一个ws中的一个位置

        当一个操作是在非ForkjoinThread的线程中进行的,则称该操作为外部操作。比如我们前面执行pool.invoke,invoke内又执行externalPush。由于invoke是在非ForkjoinThread线程中进行的(这里是在main线程中进行),所以是一个外部操作,调用的是externalPush。之后task的执行是通过ForkJoinThread来执行的,所以task中的fork就是内部操作,调用的是push,把任务提交到工作队列。其实fork的实现是类似下面这样的:

        即fork会根据执行自身的线程是否是ForkJoinThread的实例来判断是处于外部还是内部。那为何要区分内外部?

        任何线程都可以使用ForkJoin框架,但是对于非ForkJoinThread的线程,它到底是怎样的,ForkJoin无法控制,也无法对其优化。因此区分出内外部,这样方便ForkJoin框架对任务的执行进行控制和优化

        forkJoinPool.invoke(task)是把任务放入工作队列,并等待任务执行。源码如下

        这里externalPush负责任务提交,externalPush源码如下:

解析Stream foreach源码

       本文深入解析Stream的foreach操作源码,主要关注串行流和并行流的区别,特别是源码168并行流背后的ForkJoin框架。

       在Stream中,操作可分为中间操作和结束操作,其中foreach属于结束操作。串行流与并行流的主要区别在于实现方式,串行流是线性执行,而并行流则利用了ForkJoin框架的分治策略。

       对于串行流(如`stream`),其执行过程如下:

       获取ReferencePipeline.Head的Stream实现,内部包含ArrayListSpliterator对象。

       通过ArrayListSpliterator的forEachRemaining方法逐一执行元素操作。

       而并行流(如`parallelStream`)则更为复杂:

       同样获取ReferencePipeline.Head的Stream实现,内部有ArrayListSpliterator。

       调用父类的forEach方法,构建一个ForEachTask。

       在ForEachTask的invoke方法中,调用compute方法,利用ForkJoin框架的分治策略将任务拆分到commonPool中的线程池执行。

       子任务通过拆分器的forEachRemaining方法,最终执行用户定义的action.accept(e)回调。

       ForkJoin框架是JDK7新增的,它通过线程池执行任务,尤其适用于并行处理。在并行流中,任务会分配到Java 8中预定义的commonPool,该线程池基于计算机处理器数量进行配置,以实现高效的并行计算。