什么是Fork/Join框架
Fork/Join框架是一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+。。+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。Fork/Join的运行流程图如下:
涉及到的类主要有:
ForkJoinPool
:线程池,实现抽象类AbstractExecutorService
(实现了ExecutorService
)- 负责维护全部的工作线程
- 接收调用者分配的task
- 本身持有一个全局的task队列
- 实现任务窃取
ForkJoinWorkerThread
:ForkJoinPool线程池中的worker线程,具体执行task。其中保存着对所在线程池的引用。ForkJoinTask
接口。task的抽象。RecursiveTask
:task执行完成后带返回值的task。RecursiveAction
:不带返回值的task。
ForkJoin框架能满足的需求
如果一个任务的问题集能被拆分,并且组合多个子任务的结果就能获取结果,那么这个问题就适合使用ForkJoin框架解决问题。例如:从数组中查找最大数,划分为查找局部最大数;
工作窃取
ForkJoin核心点:工作窃取。
工作窃取使得较空闲的线程可以帮助繁忙线程,而不是在空闲等待状态,让整个系统更快的解决问题集合。特别是每个线程处理的问题子集的大小是无法预估的情况下(这种情况下可能出现有些线程很繁忙,而有些比较空闲,在等待其它子任务完成才能算出最终结果。)
每个工作线程都有自己的工作队列,这是使用双端队列(或者叫做 deque)来实现的(Java 6 在类库中添加了几种 deque 实现,包括 ArrayDeque
和 LinkedBlockingDeque
)。
标准队列和双端队列实现工作窃取对比: 可以使用标准队列实现工作窃取,但是与标准队列相比,deque 具有两方面的优势:减少争用和窃取。因为只有工作线程会访问自身的 deque 的头部,deque 头部永远不会发生争用;因为只有当一个线程空闲时才会访问 deque 的尾部,所以也很少存在线程的 deque 尾部的争用(在 fork-join 框架中结合 deque 实现会使这些访问模式进一步减少协调成本)。
跟传统的基于线程池的方法相比,减少争用会大大降低同步成本。此外,这种方法暗含的后进先出(last-in-first-out,LIFO)任务排队机制意味着最大的任务排在队列的尾部,当另一个线程需要窃取任务时,它将得到一个大任务(能够分解成多个小任务的任务),从而避免了在未来窃取任务。因此,工作窃取实现了合理的负载平衡,无需进行协调并且将同步成本降到了最小。
工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。
关键属性
ForkJoinTask [] submissionQueue; // java.util.concurrent.ForkJoinPool// Pool的task队列 初始容量为8192 由于submissionQueue是环形队列 而作者使用了特殊的求余算法 导致的容量必须为2的幂次方 // 通过`java.util.concurrent.ForkJoinPool#growSubmissionQueue()`方法拓展 调用者线程submit的task都会提交到这个队列// 然后唤醒worker去该队列steal task 如果在worker线程调用submit提交直接提交调用提交方法的worker线程的task队列中ForkJoinTask [] queue; // java.util.concurrent.ForkJoinWorkerThread // Worker的task队列 初始容量为8192 基本上与submissionQueue的维护方式一样 不过只能通过在worker线程调用fork()才能将// task添加到这个队列中volatile int queueBase; // java.util.concurrent.ForkJoinPool & java.util.concurrent.ForkJoinWorkerThread// 队列尾部索引 task窃取时会更改此值//由于几个线程可能同时访问 所以修饰符是volatileint queueTop; // java.util.concurrent.ForkJoinPool & java.util.concurrent.ForkJoinWorkerThread// task push与pop时会更改的索引 根据上面对队列属性的描述会发觉所有入队操作都是由"一个线程"来完成的(调用者线程往pool中// push task 而worker线程自己给自己fork task) 所以其是非线程安全的 另 submissionLock 保证了向pool中提交task的安全性// 但是这个保护只是防止调用者作死而存在的(比如并发往pool中提交task) 如果保证调用者单线程入数据 则不需要这个锁volatile int status; // java.util.concurrent.ForkJoinTask// task执行的状态 对应本类的四个状态常量 已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)
框架入口
提交任务
由于ForkJoinPool实现了ExecutorService,也就是支持通过submit
和execute
两种方法提交任务。
在submit
和execute
方法中,可能需要使用ForkJoinTask.adapt()
方法将Runnable
和Callable
方法包装成ForkJoinTask
类型的Job。 核型提交由forkOrSubmit
完成。
源码:
privatevoid forkOrSubmit(ForkJoinTask task) { ForkJoinWorkerThread w; Thread t = Thread.currentThread(); if (shutdown) throw new RejectedExecutionException(); // 已经被shutdown后则不再接收新的task 与传统线程池不同的是task最大数量是由ForkJoin自行管理的 外部不可更改 // 环形队列把这点限制死了 不过也不错 if ((t instanceof ForkJoinWorkerThread) && (w = (ForkJoinWorkerThread)t).pool == this) w.pushTask(task); // 如果提交task的线程是worker线程并且属于当前的pool 则直接将task添加到这个worker中 //(即:调用着直接将task提交到具体的worker线程,而不是提交给线程池) // 这个方法由于面向具体的线程 所以不需要锁 else addSubmission(task); // 否则将task添加到pool队列中}
如果当前线程是ForkJoinWorkerThread
类型的,则将任务追加到ForkJoinWorkerThread
对象中维护的队列上,否则将新的任务放入ForkJoinPool
的提交队列中,并通知线程工作。
pushTask方法实现原理在下面谈ForkJoinTask的fork方法实现原理时一起说。
简化提交方式
publicT invoke(ForkJoinTask task) { Thread t = Thread.currentThread(); if (task == null) throw new NullPointerException(); if (shutdown) throw new RejectedExecutionException(); if ((t instanceof ForkJoinWorkerThread) && ((ForkJoinWorkerThread)t).pool == this) return task.invoke(); // 如果提交task的线程对象是当前pool中的worker 则直接让当前worker自己处理task else { addSubmission(task); // 所有非worker提交的task全部由pool保存 return task.join(); //等待task执行完毕 }}
调用者线程流程
即调用了提交接口的线程,在task提交后的主要流程。 入口处不管是submit,execute方法提交task,还是使用invoke直接提交task,如果是提交给线程池,那么都会进入addSubmission
方法。
private void addSubmission(ForkJoinTask t) { final ReentrantLock lock = this.submissionLock; // 这里这个锁是防止调用者乱搞 task的生成大多都在worker中发生 //并且不会使用ForkJoinPool.addSubmission方法来实现生成task, 所以调用这个方法的只有调用者 lock.lock(); try { ForkJoinTask [] q; int s, m; if ((q = submissionQueue) != null) { long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE; // 获取top对应的数组位置(内存地址) UNSAFE.putOrderedObject(q, u, t); // 将新的task添加到队列中 queueTop = s + 1;//更新队列头的位置 if (s - queueBase == m) growSubmissionQueue(); // 在添加之后检查队列长度是否到极限 如果是则扩容 } } finally { lock.unlock(); } signalWork(); // 唤醒工作线程}
方法结尾处会唤醒(或创建) worker线程 (创建用addWorker()) 而worker线程被创建之后 就会不断的调用scan方法去窃取其他worker或pool中的task 直到全部task结束 这里需要特别说明的地方有2点:
-
(s = queueTop) & (m = q.length-1)
,相当于queueTop % (q.length - 1)
,返回的是队列下标(queueTop或queueBase)在环形队列数组中的真实位置,即数组中的Index。 -
(((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
通过直接访问指定内存地址来替换和获取元素。
查看ASHIFT和ABASE的来源:
static { int s; try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class a = ForkJoinTask[].class; ABASE = UNSAFE.arrayBaseOffset(a); s = UNSAFE.arrayIndexScale(a); } catch (Exception e) { throw new Error(e); } if ((s & (s-1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(s);}
此处的特殊的求余算法,就是为什么需要submissionQueue是2的整数幂次方 的原因!!!! 两个2的整数次幂方(x%y,其中x必须小余y) 余方法:
x&(y-1)
Java数组在实际存储时有一个对象头,后面才是实际的数组数据,而UNSAFE.arrayBaseOffset
就是用来获取实际数组数据的偏移量;UNSAFE.arrayIndexScale
则是获取对应数组元素占的字节数。这里的代码ABASE=16(数组对象头大小),s=4(ForkJoinTask对象引用占用字节数),ASIFT=2。
所以上面的Index << ASHIFT + ABASE合起来就是Index左移2位=Index*4,也就是算Index的在数组中的偏移量,再加上ABASE就是Index在对象中的偏移量。也就是那一行代码主要就是算出来queueTop在队列数组中的实际偏移量。
nativa函数:UNSAFE.putOrderedObject(q, u, t);
能够保证写写不会被重排序,但是不保证写会对其它线程可见;而volatile变量既保证写写不会被重排序,也保证写后对其它线程立即可见。可见Unsafe.putOrderedObject会比直接的volatile变量赋值速度会一点,(需要翻墙)则指出Unsafe.putOrderedObject会比volatile写快3倍。
为什么要保证写不会重排序?线程安全性由subbmissionLock保证 因为只有保证了写不重排序,才能使用上面基于偏移的方式寻找queue中的元素地址。 那么为什么需要基于偏移的方式需找地址?这需要对比不使用上述方式插入task到submissionQueue。 如果自己实现,那么先根据queueTop找到下一个数组index,然后在数组中放入task。 总体来说,就是为了高效的插入task到数组中。[感觉理解不到位啊~,还有别的原因?]
从addSubmission源码中不难发现,addSubmission的核心是:growSubmissionQueue
和signalWork
。
growSubmissionQueue
growSubmissionQueue主要是完成扩容功能(当容量为0或者对象为null,则创建)。
/** * Creates or doubles submissionQueue array. * Basically identical to ForkJoinWorkerThread version. */ /** * Creates or doubles submissionQueue array. * Basically identical to ForkJoinWorkerThread version. */private void growSubmissionQueue() { ForkJoinTask[] oldQ = submissionQueue; // 为null则初始化size,否则容量翻倍。 int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY; if (size > MAXIMUM_QUEUE_CAPACITY) throw new RejectedExecutionException("Queue capacity exceeded"); if (size < INITIAL_QUEUE_CAPACITY) size = INITIAL_QUEUE_CAPACITY; ForkJoinTask [] q = submissionQueue = new ForkJoinTask [size]; int mask = size - 1; int top = queueTop; int oldMask; if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) { // 如果旧队列中有数据 则将其数据填充到新的队列数组中 for (int b = queueBase; b != top; ++b) { long u = ((b & oldMask) << ASHIFT) + ABASE; // 根据旧的mask获取元素在旧队列中的位置 Object x = UNSAFE.getObjectVolatile(oldQ, u); // 获取元素 // 获取后判断旧的队列中是否存在此元素 如果不存在则取消将其加入新的队列 因为在两个步骤之间 // task可能已经被执行过了 (其他线程或许会持有旧队列数组的引用) 反之 如果存在 则加入到新的队列中 if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null)) UNSAFE.putObjectVolatile (q, ((b & mask) << ASHIFT) + ABASE, x); } }}
signalWork 唤醒线程
while为ture 的条件:(worker总数量少 | 有至少一个等待中)and (active状态的worker太少 | 线程池正在结束)。
e
是线程池控制字段,意义:
>0
:释放一个waiter,唤醒线程=0
:没有等待创建的worker<0
:线程池正在关闭
创建worker主要是addWorker
完成。
/** * Wakes up or creates a worker. */final void signalWork() { long c; int e, u; while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) & (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) { if (e > 0) { // release a waiting worker int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws; if ((ws = workers) == null || (i = ~e & SMASK) >= ws.length || (w = ws[i]) == null) break; long nc = (((long)(w.nextWait & E_MASK)) | ((long)(u + UAC_UNIT) << 32)); if (w.eventCount == e && UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) { w.eventCount = (e + EC_UNIT) & E_MASK; if (w.parked) UNSAFE.unpark(w); break; } } else if (UNSAFE.compareAndSwapLong (this, ctlOffset, c, (long)(((u + UTC_UNIT) & UTC_MASK) | ((u + UAC_UNIT) & UAC_MASK)) << 32)) { addWorker(); break; } }}
addWorker主要使用ForkJoinWorkerThreadFactory生成worker线程。 源码如下:
private void addWorker() { Throwable ex = null; ForkJoinWorkerThread t = null; try { t = factory.newThread(this); // 创建worker线程 下面的代码不解释了 只是根据发生异常的情况来决定是否告知调用者 } catch (Throwable e) { ex = e; } if (t == null) { // null or exceptional factory return long c; // adjust counts do {} while (!UNSAFE.compareAndSwapLong (this, ctlOffset, c = ctl, (((c - AC_UNIT) & AC_MASK) | ((c - TC_UNIT) & TC_MASK) | (c & ~(AC_MASK|TC_MASK))))); // Propagate exception if originating from an external caller if (!tryTerminate(false) && ex != null && !(Thread.currentThread() instanceof ForkJoinWorkerThread)) UNSAFE.throwException(ex); } else t.start(); // 启动线程}
worker线程流程
创建完线程后,线程直接启动了。Thread的start()
会调用run()。
public void run() { Throwable exception = null; try { onStart(); // 首先初始化当前worker线程 pool.work(this); // 调用pool将自己注册到pool中并表示自己可以开始工作 } catch (Throwable ex) { exception = ex; } finally { onTermination(exception); // 这个方法主要是将当前线程置为终结状态 在work方法种可以看到线程在获取task的时候是根据这个状态轮询的 // 一旦设置为false 就不再接收其他的task 另外也记录了这个线程发生的异常 }}
初始化工作,主要初始化窃取目标:
protected void onStart() { queue = new ForkJoinTask [INITIAL_QUEUE_CAPACITY]; // 初始化队列 int r = pool.workerSeedGenerator.nextInt(); // 初始化工作窃取的时候,要窃取的线程对象 seed = (r == 0) ? 1 : r; // must be nonzero}
每个worker线程基本上在调用到这个work()
方法后,就会一直循环,重复着 工作窃取、执行线程本身队列task的过程除非在执行task过程中发生异常或者pool被shutdown 或者按需调整工作线程总数,导致该线程被回收。
final void work(ForkJoinWorkerThread w) { boolean swept = false; // true on empty scans long c; while (!w.terminate && (int)(c = ctl) >= 0) { // 这个terminate就是上面onTermination(exeception)方法设置的状态位 int a; // active count if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0) swept = scan(w, a); // 扫描 else if (tryAwaitWork(w, c)) swept = false; }}
scan()
如果返回false,则会让work()
方法中的循环继续调用scan;返回true则会调用tryAwaitWork()
也就是等待task。迟早还会调用这个方法。
扫描发现其它任务, scan方法输入参数
w
:当前worker。a
:active状态的worker数量。
scan方法:
private boolean scan(ForkJoinWorkerThread w, int a) { int g = scanGuard; // mask 0 avoids useless scans if only one active int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK; ForkJoinWorkerThread[] ws = workers; if (ws == null || ws.length <= m) // staleness check return false; // 安全检查 for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) { ForkJoinTask t; ForkJoinTask [] q; int b, i; ForkJoinWorkerThread v = ws[k & m]; // 以下部分是task窃取的一部分 新启动的worker线程会通过work方法调用到scan方法并且传递自己的对象(也就是w) // 到方法中 在下面的代码中他会尝试从现存的worker的队列中窃取一个task ForkJoin框架是在运行的期间不断的分裂 // 在ForkJoinTask的实现类里调用fork()就可能会创建新的worker 并走到这个分支 所以 在worker数不足的情况下 // 窃取的几率很高 但是当worker数稳定后 每个worker会给自己分配task 而不是再这样窃取其他线程的task if (v != null && (b = v.queueBase) != v.queueTop && (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) { long u = (i << ASHIFT) + ABASE; if ((t = q[i]) != null && v.queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { // compareAndSwapObject将被窃取的位置置空,q的u位置上放入t,t原来的位置放入null int d = (v.queueBase = b + 1) - v.queueTop; // 看 他窃取了 他窃取了! v.stealHint = w.poolIndex; if (d != 0) signalWork();// d!=0说明有多个task没有完成,继续创建worker去窃取task w.execTask(t); } r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5); return false; // store next seed } else if (j < 0) { // xorshift r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5; } else ++k; } if (scanGuard != g) // staleness check return false; else { // try to take submission // 第一个task并不是直接被分配到worker的线程里(因为创建task的并不是worker本身) 而是直接进入pool的队列中 // 然后调用者线程会主动创建一个新的worker 在上面的逻辑(说实话我看不懂上边的逻辑) 中无法从其他worker中 // 窃取到task的时候 或者是其他worker分配的task已经执行完毕后 再从pool的队列中获取task ForkJoinTask t; ForkJoinTask [] q; int b, i; if ((b = queueBase) != queueTop && (q = submissionQueue) != null && (i = (q.length - 1) & b) >= 0) { long u = (i << ASHIFT) + ABASE; if ((t = q[i]) != null && queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { queueBase = b + 1; w.execTask(t); } return false; } return true; // all queues empty } // 无论走了哪个分支 最终都会调用 w.execTask(t); (没task的情况下除外)}
最后都会进入executeTask
。
final void execTask(ForkJoinTask t) { currentSteal = t; // 当前窃取到的task 在join的时候有用 for (;;) { if (t != null) t.doExec(); // 执行具体的task 通常在编写forkjoin的时候 都会在运行期间分裂出其他task if (queueTop == queueBase) // 判断自己的队列task是否已经全部完成 如果是则退出方法 // 这里需要特殊说明的是 只有当前线程会给自己的队列添加task 也就是说 当前线程如果不再fork task // queueTop就不会发生变化 所以这个方法的判断是安全的 break; t = locallyFifo ? locallyDeqTask() : popTask(); // 弹出task // 这里唯一需要注意的是 ForkJoin支持FIFO(在外面设置) 如果设置了FIFO 就会跟其它"steal task"的线程一起 // 从queueBase开始获取task 顺带一提 locallyDeqTask有两个版本 一个是针对其他线程的steal task实现 // 另外一个是当前线程的实现 } ++stealCount; currentSteal = null;}
doExec()是执行task的基本方法,调用JoinForkTask.exe()
实现具体的执行:
final void doExec() { if (status >= 0) { boolean completed; try { completed = exec(); } catch (Throwable rex) { setExceptionalCompletion(rex); return; } if (completed) setCompletion(NORMAL); // must be outside try block } }
具体task是怎么执行的得看JoinForkTask
的具体实现。在jdk中的默认抽象实现类的实现如下:
protected final boolean exec() { result = compute(); return true;}
调用了抽象的compute方法。该方法是一般使用jdk的ForkJoin框架的程序实现。
小结: 先尝试做任务窃取( Work Stealing ),如果不满足条件则到提交队列中获取任务。而ForkJoinWorkerThread线程本身也维护了线程内fork和join任务操作得到的队列,结合起来,总体执行任务的顺序就是:
- 线程会先执行ForkJoinWorkerThread对象内维护的任务队列中的任务,即
ForkJoinWorkerThread.execTask()
方法中的循环实现。通常是LIFO,即去最新的任务。也有特殊情况,这个根据变量locallyFifo的值来判断。 - 之后会尝试做任务窃取,尝试从其他线程中获取任务任务窃取条件不满足时,到提交队列中获取提交的任务
Task流程
最核心的方法是fork
和join
。
fork方法
ForkJoinTask的fork方法实现原理。当我们调用ForkJoinTask的fork方法时,程序会调用ForkJoinWorkerThread的pushTask
方法异步的执行这个任务,然后立即返回结果。 代码如下:
public final ForkJoinTaskfork() { ((ForkJoinWorkerThread) Thread.currentThread()) .pushTask(this); return this;}
pushTask方法:
final void pushTask(ForkJoinTask t) { ForkJoinTask[] q; int s, m; if ((q = queue) != null) { // ignore if queue removed long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE; UNSAFE.putOrderedObject(q, u, t); // 将task放入当前线程的队列中 queueTop = s + 1; // or use putOrderedInt if ((s -= queueBase) <= 2) pool.signalWork(); // 如果队列中的未处理task小于2 则唤醒新的worker // task分解会一分为二,此时如果task <= 2说明可以尝试继续分解,唤醒的线程继续分解task。 else if (s == m) growQueue(); // 扩容 } }
worker获取task
popTask: (当前worker从自己的队列获取任务时,)从队头获取task。
private ForkJoinTask popTask() { int m; ForkJoinTask [] q = queue; if (q != null && (m = q.length - 1) >= 0) { for (int s; (s = queueTop) != queueBase;) { // 轮循自己的队列 获取没有被窃取的task 是出栈的过程 // 通过工作窃取的索引和 当前worker自己弹出、压入队列的索引 来判断是否有剩余元素 int i = m & --s; long u = (i << ASHIFT) + ABASE; // raw offset ForkJoinTask t = q[i]; if (t == null) // 获取task的时候如果task已经为空 则被其他线程窃取掉 此时break 将剩下的判断操作委托给外围的execTask处理 break; if (UNSAFE.compareAndSwapObject(q, u, t, null)) { // t位置 置为null 防止steal task的线程重复执行task queueTop = s; // or putOrderedInt // 更新top return t; } } } return null;}
polltask:获取本地或者移除本地task | 窃取task
final ForkJoinTask pollTask() { ForkJoinWorkerThread[] ws; ForkJoinTask t = pollLocalTask(); // 尝试从自己的队列中获取task if (t != null || (ws = pool.workers) == null) return t; int n = ws.length; // cheap version of FJP.scan int steps = n << 1; // 这里限制了尝试从其他workersteal task的次数 看起来貌似是worker容器长度的2倍 int r = nextSeed(); int i = 0; while (i < steps) {// 从其他的worker的task队列中steal task ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)]; // 先判断具体的worker队列中是否存在task 不存在则换到其他的worker if (w != null && w.queueBase != w.queueTop && w.queue != null) { // 如果窃取到task则返回 否则重置steps if ((t = w.deqTask()) != null) return t; i = 0; } } return null;}
join方法与结果获取
join 方法:
public final V join() { if (doJoin() != NORMAL) // doJoin就是调用子类实现的exec方法然后根据运行状况 设置不同的状态位 // (比如发生异常设置一个状态 比如task取消是另外一个状态..) return reportResult(); else return getRawResult(); // 状态为NORMAL 的时候说明执行完成 直接返回结果即可}
它调用了doJoin()
方法,通过doJoin()方法得到当前任务的状态来判断任务完成情况,进而判断返回什么结果。 任务状态有四种:已完成(NORMAL),被取消(CANCELLED),信号(SIGNAL)和出现异常(EXCEPTIONAL)。
- 如果任务状态是已完成,则直接返回任务结果。
- 如果任务状态是被取消,则直接抛出
CancellationException
。 - 如果任务状态是抛出异常,则直接抛出对应的异常。
private int doJoin() { Thread t; ForkJoinWorkerThread w; int s; boolean completed; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) { // 如果调用join方法的线程为worker 则尝试让worker本身执行 if ((s = status) < 0) // 判断task是否已经得到结果 得到结果则直接返回 return s; if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) { // unpushTask会检查queueTop位置上是不是当前task 如果是则直接执行 try { completed = exec(); // 这里的流程基本上跟doExec()一样 } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) return setCompletion(NORMAL); } return w.joinTask(this); // 调用当前worker线程的joinTask等待其安排task执行 } else return externalAwaitDone(); // 否则object wait}
joinTask如下:
final int joinTask(ForkJoinTask joinMe) { // 这个替换有点类似方法调用 假设worker在执行task的时候执行了join() 被join的task同样也调用了join() 所以 像个栈.. ForkJoinTask prevJoin = currentJoin; currentJoin = joinMe; for (int s, retries = MAX_HELP;;) { if ((s = joinMe.status) < 0) { currentJoin = prevJoin; // 所以 当前task的join一旦有了结果 则将currentJoin替换回之前的task return s; } if (retries > 0) { if (queueTop != queueBase) { if (!localHelpJoinTask(joinMe)) retries = 0; // cannot help // 检查当前queueTop位置上的task是否是被join的task 如果不是 并且task已经完成 则直接将retries置为0 // 进入下一轮循环 如果queueTop位置就是当前task 则执行 返回true后会重新进入for循环并返回执行结果 } else if (retries == MAX_HELP >>> 1) { --retries; // check uncommon case if (tryDeqAndExec(joinMe) >= 0) // 当前task队列中已经没有task 明显要被join的task已经被窃取 所以去其他worker的将task窃取回来并执行 Thread.yield(); // for politeness } else retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1; // 帮助其他的worker完成task 走到这个分支只能去steal task了 // (steal task的代码已经示范过多次 这里就不示范了..) } else { retries = MAX_HELP; // restart if not done pool.tryAwaitJoin(joinMe); } } // 这里需要注意的是 如果当前worker的task没有执行完毕 绝对不会去其他worker steal task // 其次 每次循环的第一个判断是判断调用join()的task的状态是否为已完成 也就是说在这期间如果task已经被处理完了 // 则直接退出方法 不再尝试steal task 而这个task可能是由子task执行的 也可能是当前线程自己执行的 //(即完成了的task在join的时候是直接退出的)}
总结整体流程
- 客户端程序实现
ForkJoinTask
,主要是实现compute
方法。 - 使用
ForkJoinPool
提交任务。 - 首次需addSubmission,将task添加到
submissionQueue
。创建Worker进程,并启动。 - worker线程使用scan获取队列中task,并最终调用
execTask
执行task。 execTask
会使用popTask
将初始task弹出,然后调用doExec
方法执行。- 执行会最终调用自己实现的
compute
方法。 - 在compute方法会使用
fork
方法实现初始任务划分,将任务划分成小任务。 - 每个fork的任务会将自己放入当前线程的队列中,其实此时刚完成初始化分,两task在一个worker中,所以在同一个队列中。
- 由于队列的queue中,task数量小于2会执行
pool.singleWorker
创建/唤醒新的worker - 此时前一个worker fork完成,在work方法的while循环中,继续执行任务(即步骤4中,在execTask会将本worker线程中所有task挨个doExec。);后一个worker窃取前一个worker的任务,并执行。
- fork完成后,此时任务分工完成,达到预期细粒度。进入join。
- 进入join后,查询任务执行的情况,如果是NORMAL状态的,就调用
doJoin
,获取返回值,并且使用调用TaskJoin
将task与其它task合并(可能会出现工作窃取,并继续执行)。
ForkJoin框架反复递归执行自定义的compute方法,每次调用都会fork,不断划分task,就像一个二叉树不断往下划分子树,最终到达叶节点。然后打到叶节点的开始回溯,使用join合并结果。和最开始的图一样。
划分和合并规则
- 当一个任务划分一个新线程时,它将自己推到 deque 的头部。
- 当一个任务执行与另一个未完成任务的合并操作时,它会将另一个任务推到队列头部并执行,而不会休眠以等待另一任务完成(像 Thread.join() 的操作一样)。
- 当线程的任务队列为空,它将尝试从另一个线程的 deque 的尾部 窃取另一个任务。
参考
- jdk帮助文档