搜索
写经验 领红包

fok面试题(fokjoipool详解)

导语:面试问ForkJoinPool底层原理?这篇文章轻松为你找到答案1

什么是ForkJoinPool

顾名思义,ForkJoinPool运用了Fork/Join原理,使用“分而治之”的思想,将大任务分拆成小任务分配给多个线程执行,最后合并得到最终结果,加快运算。

核心点

任务分解

任务偷取

但是这篇文章不讲forkjointask的使用,只讲任务偷取算法的实现

work-stealing

work-stealing

ForkJoinPool核心是work-stealing算法,翻译过来叫"工作窃取"算法,有点别扭,还是叫work-stealing吧。

ForkJoinPool里有三个重要的角色:

ForkJoinWorkerThread(下文简称worker):包装Thread;WorkQueue:任务队列,双向;ForkJoinTask:worker执行的对象,实现了Future。两种类型,一种叫submission,另一种就叫task。

ForkJoinPool使用数组保存所有WorkQueue(下文经常出现的WorkQueue[]),每个worker有属于自己的WorkQueue,但不是每个WorkQueue都有对应的worker。

没有worker的WorkQueue:保存的是submission,来自外部提交,在WorkQueue[]的下标是偶数;属于worker的WorkQueue:保存的是task,在WorkQueue[]的下标是奇数。

WorkQueue是一个双端队列,同时支持LIFO(last-in-first-out)的push和pop操作,和FIFO(first-in-first-out)的poll操作,分别操作top端和base端。worker操作自己的WorkQueue是LIFO操作(可选FIFO),除此之外,worker会尝试steal其他WorkQueue里的任务,这个时候执行的是FIFO操作。

分开两端取任务的好处:

LIFO操作只有对应的worker才能执行,push和pop不需要考虑并发;拆分时,越大的任务越在WorkQueue的base端,尽早分解,能够尽快进入计算。

光看概念一知半解,我们进入ForkJoinPool的代码。本文首先从构造函数和类开始了解ForkJoinPool的基本参数,下篇再详细过一遍流程。

ForkJoinPool 普通任务执行方式普通任务提交
public ForkJoinTask<?> submit(Runnable task) { 判断是否为ForkJoinTask,不是的话启动ForkJoinTask.AdaptedRunnableAction return externalSubmit((task instanceof ForkJoinTask<?>) ? (ForkJoinTask<Void>) task // avoid re-wrap : new ForkJoinTask.AdaptedRunnableAction(task));}

继续向下,包装成一个ForkJoinTask对象,用于适配ForkJoinPool

static final class AdaptedRunnableAction extends ForkJoinTask<Void> implements RunnableFuture<Void> { final Runnable runnable; AdaptedRunnableAction(Runnable runnable) { this.runnable = runnable; } public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { runnable.run(); return true; } public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L;}

如何提交?

private <T> ForkJoinTask<T> externalSubmit(ForkJoinTask<T> task) { Thread t; ForkJoinWorkerThread w; WorkQueue q; 如果当前线程是ForkJoinWorkerThread if (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) && (w = (ForkJoinWorkerThread)t).pool == this && (q = w.workQueue) != null) q.push(task); else externalPush(task); return task;}

理论上来说不是 ForkJoinWorkerThread,因为我们仅仅是提交普通任务,普通任务不实用forkjoin方式进行任务分解

final void externalPush(ForkJoinTask<?> task) { int r; // initialize caller's probe if ((r = ThreadLocalRandom.getProbe()) == 0) { ThreadLocalRandom.localInit(); r = ThreadLocalRandom.getProbe(); } for (;;) { WorkQueue q; int md = mode, n; WorkQueue[] ws = workQueues; // 如果条件成立,就说明当前ForkJoinPool类中,还没有任何队列,所以要进行队列初始化 if ((q = ws[(n - 1) & r & SQMASK]) == null) { // add queue int qid = (r | QUIET) & ~(FIFO | OWNED); 用workerNamePrefix 作为锁//final String workerNamePrefix; // for worker thread string; sync lock//this.workerNamePrefix = "ForkJoinPool-" + nextPoolId() + "-worker-"; 由此产生 Object lock = workerNamePrefix; //static final int INITIAL_QUEUE_CAPACITY = 1 << 13; 2的12次方,默认长度 ForkJoinTask<?>[] qa = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY]; //设置任务队列基本参数 q = new WorkQueue(this, null); q.array = qa; q.id = qid; q.source = QUIET; 理论上来说lock肯定不等于null if (lock != null) { // unless disabled, lock pool to install synchronized (lock) { WorkQueue[] vs; int i, vn; if ((vs = workQueues) != null && (vn = vs.length) > 0 && vs[i = qid & (vn - 1) & SQMASK] == null) vs[i] = q; // else another thread already installed } } } else if (!q.tryLockPhase()) // move if busy r = ThreadLocalRandom.advanceProbe(r); else { //只有加入了,才能退出 if (q.lockedPush(task)) signalWork(); return; } } }
推入任务
final boolean lockedPush(ForkJoinTask<?> task) { ForkJoinTask<?>[] a; boolean signal = false; int s = top, b = base, cap, d; if ((a = array) != null && (cap = a.length) > 0) { a[(cap - 1) & s] = task; top = s + 1; //是否需要任务队列增长 if (b - s + cap - 1 == 0) growArray(true); else { phase = 0; // full volatile unlock if (((s - base) & ~1) == 0) // size 0 or 1 signal = true; } } return signal;}

增长队列长度

final void growArray(boolean locked) { ForkJoinTask<?>[] newA = null; try { ForkJoinTask<?>[] oldA; int oldSize, newSize; if ((oldA = array) != null && (oldSize = oldA.length) > 0 && (newSize = oldSize << 1) <= MAXIMUM_QUEUE_CAPACITY && newSize > 0) { try { newA = new ForkJoinTask<?>[newSize]; } catch (OutOfMemoryError ex) { } if (newA != null) { // poll from old array, push to new int oldMask = oldSize - 1, newMask = newSize - 1; for (int s = top - 1, k = oldMask; k >= 0; --k) { ForkJoinTask<?> x = (ForkJoinTask<?>) QA.getAndSet(oldA, s & oldMask, null); if (x != null) newA[s-- & newMask] = x; else break; } array = newA; VarHandle.releaseFence(); } } } finally { if (locked) phase = 0; } if (newA == null) throw new RejectedExecutionException("Queue capacity exceeded");}

免责声明:本站部份内容由优秀作者和原创用户编辑投稿,本站仅提供存储服务,不拥有所有权,不承担法律责任。若涉嫌侵权/违法的,请反馈,一经查实立刻删除内容。本文内容由快快网络小心创作整理编辑!