博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
FutureTask源码解读
阅读量:3577 次
发布时间:2019-05-20

本文共 9299 字,大约阅读时间需要 30 分钟。

一篇纯粹解读FutureTask的文章

public class FutureTask
implements RunnableFuture
{ /** * FutureTask的状态 从NEW-->COMPLETEING-->NORMAL */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** callable 对象,封装了Runnable任务 */ private Callable
callable; /** callabel 执行的结果 */ private Object outcome; // non-volatile, protected by state reads/writes /** 执行任务的线程 */ private volatile Thread runner; /** 获取结果阻塞的链条头结点 */ private volatile WaitNode waiters; /** * 获取执行结果,如果状态不为NORMAL 则抛异常 * * @param s completed state value */ @SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } /** * * 传入callable 构造方法 并初始化状态 * */ public FutureTask(Callable
callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } /** * 构造方法 将runnable及result 转换为callable * 同时初始化状态 */ public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } public boolean isCancelled() { return state >= CANCELLED; } public boolean isDone() { return state != NEW; } /** * 取消任务 若可中断,会将执行线程置为中断状态 */ public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; } /** * 获取结果,若状态未完成,将会进入阻塞不指定阻塞时间 */ public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } /** * 指定阻塞的时间,超过该时间,报超时。 */ public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); } /** * Protected method invoked when this task transitions to state * {@code isDone} (whether normally or via cancellation). The * default implementation does nothing. Subclasses may override * this method to invoke completion callbacks or perform * bookkeeping. Note that you can query status inside the * implementation of this method to determine whether this task * has been cancelled. */ protected void done() { } /** * * cas 设置状态 * 设置结果 */ protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } /** * 设置异常状态 */ protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } /** * 真正执行任务 */ public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable
c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); //设置结果 } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } } /** * 直接执行这个任务,但是并没有设置结果 */ protected boolean runAndReset() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable
c = callable; if (c != null && s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; } /** * Ensures that any interrupt from a possible cancel(true) is only * delivered to a task while in run or runAndReset. */ private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true). However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); } /** * 等待链表。 */ static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } } /** * 唤醒等待的列表 * */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } /** * 阻塞等待 */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { //发生中断 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // 完成了再等待一会 Thread.yield(); else if (q == null) q = new WaitNode(); //创建等待链表 else if (!queued) // 加入链接,并重置为waiters queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { //删除本线程在等待链表中 removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } } /** * 在等待链表中删除本节点并重置头节点 */ private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } } // cas 操作。查找指定对象的offset private static final sun.misc.Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class
k = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("state")); runnerOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("runner")); waitersOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("waiters")); } catch (Exception e) { throw new Error(e); } }}

 

转载地址:http://bhagj.baihongyu.com/

你可能感兴趣的文章
快速排序
查看>>
vue路由高亮的两种方式
查看>>
vue router 报错: Uncaught (in promise) NavigationDuplicated {_name:""NavigationDuplicated"... 的解决方法
查看>>
vue跳转页面的两种方式
查看>>
存储器题目解析(持续更新中....)
查看>>
存储器知识要点
查看>>
Cache模拟器的实现
查看>>
设计模式七大原则
查看>>
SpringBoot入门(二)场景启动器
查看>>
SpringBoot入门--自动配置
查看>>
自动配置原理
查看>>
TCP协议
查看>>
关于Linux系统使用遇到的问题-1:vi 打开只读(readonly)文件如何退出保存?
查看>>
spring注解版(一)
查看>>
SpringBoot中访问控制层(controller)得不到Json数据
查看>>
BFC(Block Formatting Context)
查看>>
什么是作用域,什么是闭包,什么是作用域链
查看>>
惰性求值,面向对象
查看>>
数据结构之列表
查看>>
es5中的arguments对象
查看>>