博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java 中的Fork/Join框架
阅读量:6424 次
发布时间:2019-06-23

本文共 21914 字,大约阅读时间需要 73 分钟。

hot3.png

什么是Fork/Join框架

Fork/Join框架是一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+。。+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。Fork/Join的运行流程图如下: ForkJoin框架流程

涉及到的类主要有:

  • ForkJoinPool:线程池,实现抽象类AbstractExecutorService(实现了ExecutorService
    • 负责维护全部的工作线程
    • 接收调用者分配的task
    • 本身持有一个全局的task队列
    • 实现任务窃取
  • ForkJoinWorkerThread:ForkJoinPool线程池中的worker线程,具体执行task。其中保存着对所在线程池的引用。
  • ForkJoinTask接口。task的抽象。
    • RecursiveTask:task执行完成后带返回值的task。
    • RecursiveAction:不带返回值的task。

ForkJoin框架能满足的需求

如果一个任务的问题集能被拆分,并且组合多个子任务的结果就能获取结果,那么这个问题就适合使用ForkJoin框架解决问题。例如:从数组中查找最大数,划分为查找局部最大数;

工作窃取

ForkJoin核心点:工作窃取工作窃取

工作窃取使得较空闲的线程可以帮助繁忙线程,而不是在空闲等待状态,让整个系统更快的解决问题集合。特别是每个线程处理的问题子集的大小是无法预估的情况下(这种情况下可能出现有些线程很繁忙,而有些比较空闲,在等待其它子任务完成才能算出最终结果。)

每个工作线程都有自己的工作队列,这是使用双端队列(或者叫做 deque)来实现的(Java 6 在类库中添加了几种 deque 实现,包括 ArrayDequeLinkedBlockingDeque)。

标准队列和双端队列实现工作窃取对比: 可以使用标准队列实现工作窃取,但是与标准队列相比,deque 具有两方面的优势:减少争用和窃取。因为只有工作线程会访问自身的 deque 的头部,deque 头部永远不会发生争用;因为只有当一个线程空闲时才会访问 deque 的尾部,所以也很少存在线程的 deque 尾部的争用(在 fork-join 框架中结合 deque 实现会使这些访问模式进一步减少协调成本)。

跟传统的基于线程池的方法相比,减少争用会大大降低同步成本。此外,这种方法暗含的后进先出(last-in-first-out,LIFO)任务排队机制意味着最大的任务排在队列的尾部,当另一个线程需要窃取任务时,它将得到一个大任务(能够分解成多个小任务的任务),从而避免了在未来窃取任务。因此,工作窃取实现了合理的负载平衡,无需进行协调并且将同步成本降到了最小。

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

关键属性

ForkJoinTask
[] submissionQueue; // java.util.concurrent.ForkJoinPool// Pool的task队列 初始容量为8192 由于submissionQueue是环形队列 而作者使用了特殊的求余算法 导致的容量必须为2的幂次方 // 通过`java.util.concurrent.ForkJoinPool#growSubmissionQueue()`方法拓展 调用者线程submit的task都会提交到这个队列// 然后唤醒worker去该队列steal task 如果在worker线程调用submit提交直接提交调用提交方法的worker线程的task队列中ForkJoinTask
[] queue; // java.util.concurrent.ForkJoinWorkerThread // Worker的task队列 初始容量为8192 基本上与submissionQueue的维护方式一样 不过只能通过在worker线程调用fork()才能将// task添加到这个队列中volatile int queueBase; // java.util.concurrent.ForkJoinPool & java.util.concurrent.ForkJoinWorkerThread// 队列尾部索引 task窃取时会更改此值//由于几个线程可能同时访问 所以修饰符是volatileint queueTop; // java.util.concurrent.ForkJoinPool & java.util.concurrent.ForkJoinWorkerThread// task push与pop时会更改的索引 根据上面对队列属性的描述会发觉所有入队操作都是由"一个线程"来完成的(调用者线程往pool中// push task 而worker线程自己给自己fork task) 所以其是非线程安全的 另 submissionLock 保证了向pool中提交task的安全性// 但是这个保护只是防止调用者作死而存在的(比如并发往pool中提交task) 如果保证调用者单线程入数据 则不需要这个锁volatile int status; // java.util.concurrent.ForkJoinTask// task执行的状态 对应本类的四个状态常量 已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)

框架入口

提交任务

由于ForkJoinPool实现了ExecutorService,也就是支持通过submitexecute两种方法提交任务。

submitexecute方法中,可能需要使用ForkJoinTask.adapt()方法将RunnableCallable方法包装成ForkJoinTask类型的Job。 核型提交由forkOrSubmit完成。

源码:

private 
void forkOrSubmit(ForkJoinTask
task) { ForkJoinWorkerThread w; Thread t = Thread.currentThread(); if (shutdown) throw new RejectedExecutionException(); // 已经被shutdown后则不再接收新的task 与传统线程池不同的是task最大数量是由ForkJoin自行管理的 外部不可更改 // 环形队列把这点限制死了 不过也不错 if ((t instanceof ForkJoinWorkerThread) && (w = (ForkJoinWorkerThread)t).pool == this) w.pushTask(task); // 如果提交task的线程是worker线程并且属于当前的pool 则直接将task添加到这个worker中 //(即:调用着直接将task提交到具体的worker线程,而不是提交给线程池) // 这个方法由于面向具体的线程 所以不需要锁 else addSubmission(task); // 否则将task添加到pool队列中}

如果当前线程是ForkJoinWorkerThread类型的,则将任务追加到ForkJoinWorkerThread对象中维护的队列上,否则将新的任务放入ForkJoinPool的提交队列中,并通知线程工作。

pushTask方法实现原理在下面谈ForkJoinTask的fork方法实现原理时一起说。

简化提交方式

public 
T invoke(ForkJoinTask
task) { Thread t = Thread.currentThread(); if (task == null) throw new NullPointerException(); if (shutdown) throw new RejectedExecutionException(); if ((t instanceof ForkJoinWorkerThread) && ((ForkJoinWorkerThread)t).pool == this) return task.invoke(); // 如果提交task的线程对象是当前pool中的worker 则直接让当前worker自己处理task else { addSubmission(task); // 所有非worker提交的task全部由pool保存 return task.join(); //等待task执行完毕 }}

调用者线程流程

即调用了提交接口的线程,在task提交后的主要流程。 入口处不管是submit,execute方法提交task,还是使用invoke直接提交task,如果是提交给线程池,那么都会进入addSubmission方法。

private void addSubmission(ForkJoinTask
t) { final ReentrantLock lock = this.submissionLock; // 这里这个锁是防止调用者乱搞 task的生成大多都在worker中发生 //并且不会使用ForkJoinPool.addSubmission方法来实现生成task, 所以调用这个方法的只有调用者 lock.lock(); try { ForkJoinTask
[] q; int s, m; if ((q = submissionQueue) != null) { long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE; // 获取top对应的数组位置(内存地址) UNSAFE.putOrderedObject(q, u, t); // 将新的task添加到队列中 queueTop = s + 1;//更新队列头的位置 if (s - queueBase == m) growSubmissionQueue(); // 在添加之后检查队列长度是否到极限 如果是则扩容 } } finally { lock.unlock(); } signalWork(); // 唤醒工作线程}

方法结尾处会唤醒(或创建) worker线程 (创建用addWorker()) 而worker线程被创建之后 就会不断的调用scan方法去窃取其他worker或pool中的task 直到全部task结束 这里需要特别说明的地方有2点:

  1. (s = queueTop) & (m = q.length-1) ,相当于queueTop % (q.length - 1),返回的是队列下标(queueTop或queueBase)在环形队列数组中的真实位置,即数组中的Index。

  2. (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE; 通过直接访问指定内存地址来替换和获取元素。

查看ASHIFT和ABASE的来源:

static {    int s;    try {        UNSAFE = sun.misc.Unsafe.getUnsafe();        Class a = ForkJoinTask[].class;        ABASE = UNSAFE.arrayBaseOffset(a);        s = UNSAFE.arrayIndexScale(a);    } catch (Exception e) {        throw new Error(e);    }    if ((s & (s-1)) != 0)        throw new Error("data type scale not a power of two");    ASHIFT = 31 - Integer.numberOfLeadingZeros(s);}

此处的特殊的求余算法,就是为什么需要submissionQueue是2的整数幂次方 的原因!!!! 两个2的整数次幂方(x%y,其中x必须小余y) 余方法:x&(y-1)

Java数组在实际存储时有一个对象头,后面才是实际的数组数据,而UNSAFE.arrayBaseOffset就是用来获取实际数组数据的偏移量UNSAFE.arrayIndexScale则是获取对应数组元素占的字节数。这里的代码ABASE=16(数组对象头大小),s=4(ForkJoinTask对象引用占用字节数),ASIFT=2。

所以上面的Index << ASHIFT + ABASE合起来就是Index左移2位=Index*4,也就是算Index的在数组中的偏移量,再加上ABASE就是Index在对象中的偏移量。也就是那一行代码主要就是算出来queueTop在队列数组中的实际偏移量。

nativa函数:UNSAFE.putOrderedObject(q, u, t); 能够保证写写不会被重排序,但是不保证写会对其它线程可见;而volatile变量既保证写写不会被重排序,也保证写后对其它线程立即可见。可见Unsafe.putOrderedObject会比直接的volatile变量赋值速度会一点,(需要翻墙)则指出Unsafe.putOrderedObject会比volatile写快3倍。

为什么要保证写不会重排序?线程安全性由subbmissionLock保证 因为只有保证了写不重排序,才能使用上面基于偏移的方式寻找queue中的元素地址。 那么为什么需要基于偏移的方式需找地址?这需要对比不使用上述方式插入task到submissionQueue。 如果自己实现,那么先根据queueTop找到下一个数组index,然后在数组中放入task。 总体来说,就是为了高效的插入task到数组中。[感觉理解不到位啊~,还有别的原因?]

从addSubmission源码中不难发现,addSubmission的核心是:growSubmissionQueuesignalWork

growSubmissionQueue

growSubmissionQueue主要是完成扩容功能(当容量为0或者对象为null,则创建)。

/** * Creates or doubles submissionQueue array. * Basically identical to ForkJoinWorkerThread version. */     /**     * Creates or doubles submissionQueue array.     * Basically identical to ForkJoinWorkerThread version.     */private void growSubmissionQueue() {    ForkJoinTask[] oldQ = submissionQueue;    // 为null则初始化size,否则容量翻倍。    int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;     if (size > MAXIMUM_QUEUE_CAPACITY)        throw new RejectedExecutionException("Queue capacity exceeded");     if (size < INITIAL_QUEUE_CAPACITY)        size = INITIAL_QUEUE_CAPACITY;    ForkJoinTask
[] q = submissionQueue = new ForkJoinTask
[size]; int mask = size - 1; int top = queueTop; int oldMask; if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) { // 如果旧队列中有数据 则将其数据填充到新的队列数组中 for (int b = queueBase; b != top; ++b) { long u = ((b & oldMask) << ASHIFT) + ABASE; // 根据旧的mask获取元素在旧队列中的位置 Object x = UNSAFE.getObjectVolatile(oldQ, u); // 获取元素 // 获取后判断旧的队列中是否存在此元素 如果不存在则取消将其加入新的队列 因为在两个步骤之间 // task可能已经被执行过了 (其他线程或许会持有旧队列数组的引用) 反之 如果存在 则加入到新的队列中 if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null)) UNSAFE.putObjectVolatile (q, ((b & mask) << ASHIFT) + ABASE, x); } }}

signalWork 唤醒线程

while为ture 的条件:(worker总数量少 | 有至少一个等待中)and (active状态的worker太少 | 线程池正在结束)。

e是线程池控制字段,意义:

  • >0:释放一个waiter,唤醒线程
  • =0:没有等待创建的worker
  • <0:线程池正在关闭

创建worker主要是addWorker完成。

/** * Wakes up or creates a worker. */final void signalWork() {    long c; int e, u;    while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) &            (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) {        if (e > 0) {                         // release a waiting worker            int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;            if ((ws = workers) == null ||                (i = ~e & SMASK) >= ws.length ||                (w = ws[i]) == null)                break;            long nc = (((long)(w.nextWait & E_MASK)) |                       ((long)(u + UAC_UNIT) << 32));            if (w.eventCount == e &&                UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {                w.eventCount = (e + EC_UNIT) & E_MASK;                if (w.parked)                    UNSAFE.unpark(w);                break;            }        }        else if (UNSAFE.compareAndSwapLong                 (this, ctlOffset, c,                  (long)(((u + UTC_UNIT) & UTC_MASK) |                         ((u + UAC_UNIT) & UAC_MASK)) << 32)) {            addWorker();            break;        }    }}

addWorker主要使用ForkJoinWorkerThreadFactory生成worker线程。 源码如下:

private void addWorker() {    Throwable ex = null;    ForkJoinWorkerThread t = null;    try {        t = factory.newThread(this);         // 创建worker线程 下面的代码不解释了 只是根据发生异常的情况来决定是否告知调用者    } catch (Throwable e) {        ex = e;    }    if (t == null) {  // null or exceptional factory return        long c;       // adjust counts        do {} while (!UNSAFE.compareAndSwapLong                     (this, ctlOffset, c = ctl,                      (((c - AC_UNIT) & AC_MASK) |                       ((c - TC_UNIT) & TC_MASK) |                       (c & ~(AC_MASK|TC_MASK)))));        // Propagate exception if originating from an external caller        if (!tryTerminate(false) && ex != null &&            !(Thread.currentThread() instanceof ForkJoinWorkerThread))            UNSAFE.throwException(ex);    }    else        t.start(); // 启动线程}

worker线程流程

创建完线程后,线程直接启动了。Thread的start()会调用run()。

public void run() {    Throwable exception = null;    try {        onStart(); // 首先初始化当前worker线程        pool.work(this); // 调用pool将自己注册到pool中并表示自己可以开始工作    } catch (Throwable ex) {        exception = ex;    } finally {        onTermination(exception);         // 这个方法主要是将当前线程置为终结状态 在work方法种可以看到线程在获取task的时候是根据这个状态轮询的         // 一旦设置为false 就不再接收其他的task 另外也记录了这个线程发生的异常    }}

初始化工作,主要初始化窃取目标:

protected void onStart() {    queue = new ForkJoinTask
[INITIAL_QUEUE_CAPACITY]; // 初始化队列 int r = pool.workerSeedGenerator.nextInt(); // 初始化工作窃取的时候,要窃取的线程对象 seed = (r == 0) ? 1 : r; // must be nonzero}

每个worker线程基本上在调用到这个work()方法后,就会一直循环,重复着 工作窃取、执行线程本身队列task的过程除非在执行task过程中发生异常或者pool被shutdown 或者按需调整工作线程总数,导致该线程被回收。

final void work(ForkJoinWorkerThread w) {    boolean swept = false;                // true on empty scans    long c;    while (!w.terminate && (int)(c = ctl) >= 0) {         // 这个terminate就是上面onTermination(exeception)方法设置的状态位        int a;                            // active count        if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)            swept = scan(w, a); // 扫描        else if (tryAwaitWork(w, c))            swept = false;    }}

scan()如果返回false,则会让work()方法中的循环继续调用scan;返回true则会调用tryAwaitWork() 也就是等待task。迟早还会调用这个方法。

扫描发现其它任务, scan方法输入参数

  • w:当前worker。
  • a:active状态的worker数量。

scan方法:

private boolean scan(ForkJoinWorkerThread w, int a) {    int g = scanGuard; // mask 0 avoids useless scans if only one active    int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;    ForkJoinWorkerThread[] ws = workers;    if (ws == null || ws.length <= m)         // staleness check        return false;           // 安全检查     for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {        ForkJoinTask
t; ForkJoinTask
[] q; int b, i; ForkJoinWorkerThread v = ws[k & m]; // 以下部分是task窃取的一部分 新启动的worker线程会通过work方法调用到scan方法并且传递自己的对象(也就是w) // 到方法中 在下面的代码中他会尝试从现存的worker的队列中窃取一个task ForkJoin框架是在运行的期间不断的分裂 // 在ForkJoinTask的实现类里调用fork()就可能会创建新的worker 并走到这个分支 所以 在worker数不足的情况下 // 窃取的几率很高 但是当worker数稳定后 每个worker会给自己分配task 而不是再这样窃取其他线程的task if (v != null && (b = v.queueBase) != v.queueTop && (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) { long u = (i << ASHIFT) + ABASE; if ((t = q[i]) != null && v.queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { // compareAndSwapObject将被窃取的位置置空,q的u位置上放入t,t原来的位置放入null int d = (v.queueBase = b + 1) - v.queueTop; // 看 他窃取了 他窃取了! v.stealHint = w.poolIndex; if (d != 0) signalWork();// d!=0说明有多个task没有完成,继续创建worker去窃取task w.execTask(t); } r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5); return false; // store next seed } else if (j < 0) { // xorshift r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5; } else ++k; } if (scanGuard != g) // staleness check return false; else { // try to take submission // 第一个task并不是直接被分配到worker的线程里(因为创建task的并不是worker本身) 而是直接进入pool的队列中 // 然后调用者线程会主动创建一个新的worker 在上面的逻辑(说实话我看不懂上边的逻辑) 中无法从其他worker中 // 窃取到task的时候 或者是其他worker分配的task已经执行完毕后 再从pool的队列中获取task ForkJoinTask
t; ForkJoinTask
[] q; int b, i; if ((b = queueBase) != queueTop && (q = submissionQueue) != null && (i = (q.length - 1) & b) >= 0) { long u = (i << ASHIFT) + ABASE; if ((t = q[i]) != null && queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { queueBase = b + 1; w.execTask(t); } return false; } return true; // all queues empty } // 无论走了哪个分支 最终都会调用 w.execTask(t); (没task的情况下除外)}

最后都会进入executeTask

final void execTask(ForkJoinTask
t) { currentSteal = t; // 当前窃取到的task 在join的时候有用 for (;;) { if (t != null) t.doExec(); // 执行具体的task 通常在编写forkjoin的时候 都会在运行期间分裂出其他task if (queueTop == queueBase) // 判断自己的队列task是否已经全部完成 如果是则退出方法 // 这里需要特殊说明的是 只有当前线程会给自己的队列添加task 也就是说 当前线程如果不再fork task // queueTop就不会发生变化 所以这个方法的判断是安全的 break; t = locallyFifo ? locallyDeqTask() : popTask(); // 弹出task // 这里唯一需要注意的是 ForkJoin支持FIFO(在外面设置) 如果设置了FIFO 就会跟其它"steal task"的线程一起 // 从queueBase开始获取task 顺带一提 locallyDeqTask有两个版本 一个是针对其他线程的steal task实现 // 另外一个是当前线程的实现 } ++stealCount; currentSteal = null;}

doExec()是执行task的基本方法,调用JoinForkTask.exe()实现具体的执行:

final void doExec() {        if (status >= 0) {            boolean completed;            try {                completed = exec();            } catch (Throwable rex) {                setExceptionalCompletion(rex);                return;            }            if (completed)                setCompletion(NORMAL); // must be outside try block        }    }

具体task是怎么执行的得看JoinForkTask的具体实现。在jdk中的默认抽象实现类的实现如下:

protected final boolean exec() {    result = compute();    return true;}

调用了抽象的compute方法。该方法是一般使用jdk的ForkJoin框架的程序实现。

小结: 先尝试做任务窃取( Work Stealing ),如果不满足条件则到提交队列中获取任务。而ForkJoinWorkerThread线程本身也维护了线程内fork和join任务操作得到的队列,结合起来,总体执行任务的顺序就是:

  1. 线程会先执行ForkJoinWorkerThread对象内维护的任务队列中的任务,即ForkJoinWorkerThread.execTask()方法中的循环实现。通常是LIFO,即去最新的任务。也有特殊情况,这个根据变量locallyFifo的值来判断。
  2. 之后会尝试做任务窃取,尝试从其他线程中获取任务任务窃取条件不满足时,到提交队列中获取提交的任务

Task流程

最核心的方法是forkjoin

fork方法

ForkJoinTask的fork方法实现原理。当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask方法异步的执行这个任务,然后立即返回结果。 代码如下:

public final ForkJoinTask
fork() { ((ForkJoinWorkerThread) Thread.currentThread()) .pushTask(this); return this;}

pushTask方法:

final void pushTask(ForkJoinTask t) {     ForkJoinTask[] q; int s, m;     if ((q = queue) != null) { // ignore if queue removed         long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE; UNSAFE.putOrderedObject(q, u, t);                     // 将task放入当前线程的队列中         queueTop = s + 1; // or use putOrderedInt        if ((s -= queueBase) <= 2)             pool.signalWork(); // 如果队列中的未处理task小于2 则唤醒新的worker            // task分解会一分为二,此时如果task <= 2说明可以尝试继续分解,唤醒的线程继续分解task。        else if (s == m)             growQueue(); // 扩容     } }

worker获取task

popTask: (当前worker从自己的队列获取任务时,)从队头获取task。

private ForkJoinTask
popTask() { int m; ForkJoinTask
[] q = queue; if (q != null && (m = q.length - 1) >= 0) { for (int s; (s = queueTop) != queueBase;) { // 轮循自己的队列 获取没有被窃取的task 是出栈的过程 // 通过工作窃取的索引和 当前worker自己弹出、压入队列的索引 来判断是否有剩余元素 int i = m & --s; long u = (i << ASHIFT) + ABASE; // raw offset ForkJoinTask
t = q[i]; if (t == null) // 获取task的时候如果task已经为空 则被其他线程窃取掉 此时break 将剩下的判断操作委托给外围的execTask处理 break; if (UNSAFE.compareAndSwapObject(q, u, t, null)) { // t位置 置为null 防止steal task的线程重复执行task queueTop = s; // or putOrderedInt // 更新top return t; } } } return null;}

polltask:获取本地或者移除本地task | 窃取task

final ForkJoinTask
pollTask() { ForkJoinWorkerThread[] ws; ForkJoinTask
t = pollLocalTask(); // 尝试从自己的队列中获取task if (t != null || (ws = pool.workers) == null) return t; int n = ws.length; // cheap version of FJP.scan int steps = n << 1; // 这里限制了尝试从其他workersteal task的次数 看起来貌似是worker容器长度的2倍 int r = nextSeed(); int i = 0; while (i < steps) {// 从其他的worker的task队列中steal task ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)]; // 先判断具体的worker队列中是否存在task 不存在则换到其他的worker if (w != null && w.queueBase != w.queueTop && w.queue != null) { // 如果窃取到task则返回 否则重置steps if ((t = w.deqTask()) != null) return t; i = 0; } } return null;}

join方法与结果获取

join 方法:

public final V join() {    if (doJoin() != NORMAL)         // doJoin就是调用子类实现的exec方法然后根据运行状况 设置不同的状态位        // (比如发生异常设置一个状态 比如task取消是另外一个状态..)        return reportResult();    else        return getRawResult(); // 状态为NORMAL 的时候说明执行完成 直接返回结果即可}

它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断任务完成情况,进而判断返回什么结果。 任务状态有四种:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)。

  • 如果任务状态是已完成,则直接返回任务结果。
  • 如果任务状态是被取消,则直接抛出CancellationException
  • 如果任务状态是抛出异常,则直接抛出对应的异常。
private int doJoin() {    Thread t; ForkJoinWorkerThread w; int s; boolean completed;    if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {         // 如果调用join方法的线程为worker 则尝试让worker本身执行        if ((s = status) < 0) // 判断task是否已经得到结果 得到结果则直接返回            return s;        if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {             // unpushTask会检查queueTop位置上是不是当前task 如果是则直接执行            try {                completed = exec(); // 这里的流程基本上跟doExec()一样            } catch (Throwable rex) {                return setExceptionalCompletion(rex);            }            if (completed)                return setCompletion(NORMAL);        }        return w.joinTask(this); // 调用当前worker线程的joinTask等待其安排task执行    }    else        return externalAwaitDone(); // 否则object wait}

joinTask如下:

final int joinTask(ForkJoinTask
joinMe) { // 这个替换有点类似方法调用 假设worker在执行task的时候执行了join() 被join的task同样也调用了join() 所以 像个栈.. ForkJoinTask
prevJoin = currentJoin; currentJoin = joinMe; for (int s, retries = MAX_HELP;;) { if ((s = joinMe.status) < 0) { currentJoin = prevJoin; // 所以 当前task的join一旦有了结果 则将currentJoin替换回之前的task return s; } if (retries > 0) { if (queueTop != queueBase) { if (!localHelpJoinTask(joinMe)) retries = 0; // cannot help // 检查当前queueTop位置上的task是否是被join的task 如果不是 并且task已经完成 则直接将retries置为0 // 进入下一轮循环 如果queueTop位置就是当前task 则执行 返回true后会重新进入for循环并返回执行结果 } else if (retries == MAX_HELP >>> 1) { --retries; // check uncommon case if (tryDeqAndExec(joinMe) >= 0) // 当前task队列中已经没有task 明显要被join的task已经被窃取 所以去其他worker的将task窃取回来并执行 Thread.yield(); // for politeness } else retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1; // 帮助其他的worker完成task 走到这个分支只能去steal task了 // (steal task的代码已经示范过多次 这里就不示范了..) } else { retries = MAX_HELP; // restart if not done pool.tryAwaitJoin(joinMe); } } // 这里需要注意的是 如果当前worker的task没有执行完毕 绝对不会去其他worker steal task // 其次 每次循环的第一个判断是判断调用join()的task的状态是否为已完成 也就是说在这期间如果task已经被处理完了 // 则直接退出方法 不再尝试steal task 而这个task可能是由子task执行的 也可能是当前线程自己执行的 //(即完成了的task在join的时候是直接退出的)}

总结整体流程

  1. 客户端程序实现ForkJoinTask,主要是实现compute方法。
  2. 使用ForkJoinPool提交任务。
  3. 首次需addSubmission,将task添加到submissionQueue。创建Worker进程,并启动。
  4. worker线程使用scan获取队列中task,并最终调用execTask执行task。
  5. execTask会使用popTask将初始task弹出,然后调用doExec方法执行。
  6. 执行会最终调用自己实现的compute方法。
  7. 在compute方法会使用fork方法实现初始任务划分,将任务划分成小任务。
  8. 每个fork的任务会将自己放入当前线程的队列中,其实此时刚完成初始化分,两task在一个worker中,所以在同一个队列中。
  9. 由于队列的queue中,task数量小于2会执行pool.singleWorker创建/唤醒新的worker
  10. 此时前一个worker fork完成,在work方法的while循环中,继续执行任务(即步骤4中,在execTask会将本worker线程中所有task挨个doExec。);后一个worker窃取前一个worker的任务,并执行。
  11. fork完成后,此时任务分工完成,达到预期细粒度。进入join。
  12. 进入join后,查询任务执行的情况,如果是NORMAL状态的,就调用doJoin,获取返回值,并且使用调用TaskJoin将task与其它task合并(可能会出现工作窃取,并继续执行)。

ForkJoin框架反复递归执行自定义的compute方法,每次调用都会fork,不断划分task,就像一个二叉树不断往下划分子树,最终到达叶节点。然后打到叶节点的开始回溯,使用join合并结果。和最开始的图一样。

划分和合并规则

  1. 当一个任务划分一个新线程时,它将自己推到 deque 的头部。
  2. 当一个任务执行与另一个未完成任务的合并操作时,它会将另一个任务推到队列头部并执行,而不会休眠以等待另一任务完成(像 Thread.join() 的操作一样)。
  3. 当线程的任务队列为空,它将尝试从另一个线程的 deque 的尾部 窃取另一个任务。

参考

  1. jdk帮助文档

转载于:https://my.oschina.net/hgfdoing/blog/704819

你可能感兴趣的文章
Windows2008server R2 组策略批量更改本地管理员密码
查看>>
ubutnu安装geany
查看>>
webservice 之 Java CXF实战效果 RS WS(一)
查看>>
我的友情链接
查看>>
Repository 与 DAO
查看>>
Zabbix监控Windows主机
查看>>
IBM x3850 RAID5数据恢复方案及过程
查看>>
移动计算领域五大机遇:交通运输优势待挖掘
查看>>
如何把win7 旗舰版升级到sp1最新版本
查看>>
android 调用系统界面
查看>>
Software Enginering-------using git
查看>>
浅谈IP地址-1
查看>>
我的友情链接
查看>>
C#中的线程池使用(一)
查看>>
利用Windows Server Backup功能备份活动目录
查看>>
RAC维护手记08-ASM磁盘组信息查看常用命令
查看>>
实验08 磁盘和文件系统管理
查看>>
我的友情链接
查看>>
我的友情链接
查看>>
FastDFS整合nginx后,nginx一直报错
查看>>