Java多线程并发FutureTask

Dianthe ·
更新时间:2024-09-20
· 45 次阅读

本文基于最新的 OpenJDK 代码,预计发行版本为 19 。

Java 的多线程机制本质上能够完成两件事情,异步计算和并发。并发问题通过解决线程安全的一系列 API 来解决;而异步计算,常见的使用是 Runnable 和 Callable 配合线程使用。

FutureTask 是基于 Runnable 实现的一个可取消的异步调用 API 。

基本使用

Future 代表了异步计算的结果,通过 ExecutorService 的 Future<?> submit(Runnable task) 方法,作为返回值使用:

interface ArchiveSearcher { String search(String target); } class App { ExecutorService executor = ...; ArchiveSearcher searcher = ...; void showSearch(String target) throws InterruptedException { Callable&lt;String&gt; task = () -&gt; searcher.search(target); Future&lt;String&gt; future = executor.submit(task); // 获取执行结果 displayOtherThings(); // do other things while searching try { displayText(future.get()); // use future } catch (ExecutionException ex) { cleanup(); return; } } }

FutureTask类是实现了Runnable的Future的实现,因此可以由Executor执行。例如,上述带有submit的构造可以替换为:

class App { ExecutorService executor = ...; ArchiveSearcher searcher = ...; void showSearch(String target) throws InterruptedException { Callable&lt;String&gt; task = () -&gt; searcher.search(target); // 关键两行替换 FutureTask&lt;String&gt; future = new FutureTask&lt;&gt;(task); executor.execute(future); displayOtherThings(); // do other things while searching try { displayText(future.get()); // use future } catch (ExecutionException ex) { cleanup(); return; } } } 代码分析 继承关系

Future

Future 表示异步计算的结果。定义了用于检查计算是否完成、等待计算完成以及检索计算结果的能力。只有在计算完成后,才能使用 get 方法检索结果,在必要时会阻塞线程直到 Future 计算完成。取消是由 cancel 方法执行的。还提供了其他方法来确定任务是正常完成还是被取消。一旦计算完成,就不能取消计算。如果为了可取消性而使用 Future ,但又不想提供一个可用的结果,你可以声明形式 Future<?> 并返回 null 作为任务的结果。

在介绍 Future 中定义的能力之前,先了解一下它的用来表示 Future 状态内部类,和状态检索方法:

public interface Future&lt;V&gt; { enum State { // The task has not completed. RUNNING, // The task completed with a result. @see Future#resultNow() SUCCESS, //The task completed with an exception. @see Future#exceptionNow() FAILED, // The task was cancelled. @see #cancel(boolean) CANCELLED } default State state() { if (!isDone())// 根据 isDone() 判断运行中 return State.RUNNING; if (isCancelled()) // 根据 isCancelled() 判断已取消 return State.CANCELLED; boolean interrupted = false; try { while (true) { // 死循环轮询 try { get(); // may throw InterruptedException when done return State.SUCCESS; } catch (InterruptedException e) { interrupted = true; } catch (ExecutionException e) { return State.FAILED; } } } finally { if (interrupted) Thread.currentThread().interrupt(); } } }

Future 的状态检索的默认实现是根据 isDone()isCancelled() 和不断轮询 get() 方法获取到的返回值判断的。

get() 正常返回结果时, state() 返回 State.SUCCESS ; 当抛出 InterruptedException 时,最终会操作所在的线程执行尝试中断的方法;抛出其他异常时,则返回 State.FAILED

Future 中定义的其他方法包括:

package java.util.concurrent; public interface Future&lt;V&gt; { // 取消操作 boolean cancel(boolean mayInterruptIfRunning); // 检查是否取消 boolean isCancelled(); // 检查是否完成 boolean isDone(); // 获取计算结果的方法 V get() throws InterruptedException, ExecutionException; // 带有超时限制的获取计算结果的方法 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; // 立刻返回结果 default V resultNow() // 立刻抛出异常 default Throwable exceptionNow() }

其中 resultNow()exceptionNow() 是带有默认实现的:

default V resultNow() { if (!isDone()) throw new IllegalStateException("Task has not completed"); boolean interrupted = false; try { while (true) { try { return get(); } catch (InterruptedException e) { interrupted = true; } catch (ExecutionException e) { throw new IllegalStateException("Task completed with exception"); } catch (CancellationException e) { throw new IllegalStateException("Task was cancelled"); } } } finally { if (interrupted) Thread.currentThread().interrupt(); } }

Future 仍在运行中,直接抛出 IllegalStateException 。

执行一个轮询,调用 get() 尝试返回计算结果,如果 get() 抛出异常,则根据异常抛出不同消息的 IllegalStateException 或执行中断线程的操作。

default Throwable exceptionNow() { if (!isDone()) throw new IllegalStateException("Task has not completed"); if (isCancelled()) throw new IllegalStateException("Task was cancelled"); boolean interrupted = false; try { while (true) { try { get(); throw new IllegalStateException("Task completed with a result"); } catch (InterruptedException e) { interrupted = true; } catch (ExecutionException e) { return e.getCause(); } } } finally { if (interrupted) Thread.currentThread().interrupt(); } }

Future 仍在运行中,直接抛出 IllegalStateException 。

Future 检查是否已取消,如果取消了抛出 IllegalStateException 。

执行轮询,调用 get() 方法,如果能够正常执行结束,也抛出 IllegalStateException ,消息是 "Task completed with a result" ;get() 若抛出 InterruptedException ,则执行线程中断操作;其他异常正常抛出。

这就是 Future 的全貌了。

RunnableFuture

RunnableFuture 接口同时实现了 Runnable 和 Future 接口 :

public interface RunnableFuture&lt;V&gt; extends Runnable, Future&lt;V&gt; { /** * Sets this Future to the result of its computation * unless it has been cancelled. * 除非已取消,否则将此Future设置为其计算的结果。 */ void run(); }

Runnable 接口是我们常用的用来实现线程操作的,可以说是十分熟悉也十分简单了。

这个接口代表了一个可以 Runnable 的 Future ,run 方法的成功执行代表着 Future 执行完成,并可以获取它的计算结果。

这个接口是 JDK 1.6 后续才有的。

FutureTask

FutureTask 是 RunnableFuture 的直接实现类,它代表了一个可取消的异步计算任务。根据我们对 Future 的分析和 Runnable 的熟悉,就可以理解它的作用了:可取消并可以检索运行状态的一个 Runnable ,配合线程使用可以中断线程执行。当任务没有执行完成时会造成阻塞。并且它还可以配合 Executor 使用。

状态

FutureTask 内部也定义了自己的状态:

public class FutureTask&lt;V&gt; implements RunnableFuture&lt;V&gt; { private volatile int state; private static final int NEW = 0; // 新建 private static final int COMPLETING = 1; // 完成中 private static final int NORMAL = 2; // 正常完成 private static final int EXCEPTIONAL = 3; // 异常的 private static final int CANCELLED = 4; // 已取消 private static final int INTERRUPTING = 5; // 中断中 private static final int INTERRUPTED = 6; // 已中断 @Override public State state() { int s = state; while (s == COMPLETING) { // 等待过渡到 NORMAL 或 EXCEPTIONAL Thread.yield(); s = state; } switch (s) { case NORMAL: return State.SUCCESS; case EXCEPTIONAL: return State.FAILED; case CANCELLED: case INTERRUPTING: case INTERRUPTED: return State.CANCELLED; default: return State.RUNNING; } } }

FutureTask 的状态包括 7 种,最初为 NEW ,只有在 set、setException 和 cancel 方法中,运行状态才会转换为最终状态。在完成期间,状态可能为 COMPLETING (当结果正在设置时) 或 INTERRUPTING (仅当中断跑者以满足cancel(true) )的瞬态值。

可能存在的状态转换是:

NEW -&gt; COMPLETING -&gt; NORMAL // 正常完成 NEW -&gt; COMPLETING -&gt; EXCEPTIONAL // 抛出异常 NEW -&gt; CANCELLED // 取消 NEW -&gt; INTERRUPTING -&gt; INTERRUPTED // 中断 属性

下面分析一下它的属性:

/** 底层的调用;运行后为null */ private Callable&lt;V&gt; callable; /** get()返回的结果或抛出的异常 */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** 等待线程的 Treiber 堆栈 */ private volatile WaitNode waiters; 内部类

先看一看这个 WaitNode ,这是一个 FutureTask 的内部类:

static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }

一个链表结构,用来对等待线程进行排序。

构造方法

最后是方法的分析,首先是构造方法:

// Creates a {@code FutureTask} that will, upon running, execute the given {@code Callable}. public FutureTask(Callable&lt;V&gt; callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } /** * Creates a {@code FutureTask} that will, upon running, execute the * given {@code Runnable}, and arrange that {@code get} will return the * given result on successful completion. * Runnable 成功是返回给定的结果 result */ public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }

FutureTask 接收一个 Callable 或一个 Runnable 作为参数,Runnable 会封装一下都保存到属性 callable ,然后更新 FutureTask 的状态为 NEW

从 Future 接口中实现的方法逐个分析:

检索 FutureTask 状态 public boolean isCancelled() { return state &gt;= CANCELLED; // 大于等于 4, 已取消、中断中、已中断 } public boolean isDone() { return state != NEW; // 不是 new 就代表执行结束了 } 取消操作 // mayInterruptIfRunning 表示最终的取消是通过中断还是通过取消。 public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW &amp;&amp; STATE.compareAndSet(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) // 尝试设置 CANCELLED 或 INTERRUPTING return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); // 通过中断取消任务 } finally { // final state STATE.setRelease(this, INTERRUPTED); // 更新中断状态 } } } finally { finishCompletion(); } return true; }

这里的 finishCompletion() 的作用是通过 LockSupport 唤醒等待的全部线程并从等待列表中移除,然后调用done(),最后把 callable 置空。相当于取消成功后释放资源的操作。

private void finishCompletion() { // assert state &gt; COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (WAITERS.weakCompareAndSet(this, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }

done() 是个空实现,供子类去自定义的。

protected void done() { } 计算结果 public V get() throws InterruptedException, ExecutionException { int s = state; if (s &lt;= COMPLETING) s = awaitDone(false, 0L); // 异步结果 return report(s); } public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s &lt;= COMPLETING &amp;&amp; (s = awaitDone(true, unit.toNanos(timeout))) &lt;= COMPLETING) throw new TimeoutException(); return report(s); }

这里涉及两个方法:awaitDone 方法和 report 方法 。

awaitDone 方法:

private int awaitDone(boolean timed, long nanos) throws InterruptedException { // The code below is very delicate, to achieve these goals: // - if nanos &lt;= 0L, 及时返回,不需要 allocation 或 nanoTime // - if nanos == Long.MIN_VALUE, don't underflow // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic // and we suffer a spurious wakeup, we will do no worse than // to park-spin for a while long startTime = 0L; // Special value 0L means not yet parked WaitNode q = null; boolean queued = false; for (;;) { int s = state; if (s &gt; COMPLETING) { // COMPLETING = 1 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // 瞬时态,完成中 // We may have already promised (via isDone) that we are done // so never return empty-handed or throw InterruptedException Thread.yield(); else if (Thread.interrupted()) { removeWaiter(q); // 线程中断,移除等待的线程 throw new InterruptedException(); } else if (q == null) { if (timed &amp;&amp; nanos &lt;= 0L) return s; q = new WaitNode(); } else if (!queued) queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q); else if (timed) { // 设置超时时间的情况 final long parkNanos; if (startTime == 0L) { // first time startTime = System.nanoTime(); if (startTime == 0L) startTime = 1L; parkNanos = nanos; } else { long elapsed = System.nanoTime() - startTime; if (elapsed &gt;= nanos) { removeWaiter(q); return state; } parkNanos = nanos - elapsed; } // nanoTime may be slow; recheck before parking if (state &lt; COMPLETING) LockSupport.parkNanos(this, parkNanos); } else LockSupport.park(this); } }

通过 CAS 和 LockSupport 的挂起/唤醒操作配合,阻塞当前线程,异步地等待计算结果。

这里有个 removeWaiter 方法,内部就是遍历 waiters ,删除超时和中断的等待线程。

当异步逻辑执行完成后,调用 report 方法:

// 为完成的任务返回结果或抛出异常 private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s &gt;= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }

这里用到了一个 outcome ,它是一个 Object 类型,作为返回结果,通过 set 方法可以对它进行设置:

// 除非该 future 已被设置或取消,否则将该 future 的结果设置为给定值。 // 该方法在成功完成计算后由 run 方法在内部调用。 protected void set(V v) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { outcome = v; STATE.setRelease(this, NORMAL); // final state finishCompletion(); } } 立刻获取结果或异常

这两个方法都是通过 outcome 预设的返回值,返回预期的结果或异常。

public V resultNow() { switch (state()) { // Future.State case SUCCESS: @SuppressWarnings("unchecked") V result = (V) outcome; return result; case FAILED: throw new IllegalStateException("Task completed with exception"); case CANCELLED: throw new IllegalStateException("Task was cancelled"); default: throw new IllegalStateException("Task has not completed"); } } @Override public Throwable exceptionNow() { switch (state()) { // Future.State case SUCCESS: throw new IllegalStateException("Task completed with a result"); case FAILED: Object x = outcome; return (Throwable) x; case CANCELLED: throw new IllegalStateException("Task was cancelled"); default: throw new IllegalStateException("Task has not completed"); } } run 方法组

最后是实现了 Runnable 的 run 方法:

public void run() { // 保证 NEW 状态和 RUNNER 成功设置当前线程 if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) return; try { Callable&lt;V&gt; c = callable; // 待执行的 Callable if (c != null &amp;&amp; state == NEW) { V result; boolean ran; try { result = c.call(); // 执行 Callable ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // 为了防止并发调用 run ,直到 state 确定之前, runner 必须是非空的 runner = null; // 状态必须在 runner 置空后重新读取,以防止泄漏中断 int s = state; if (s &gt;= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }

这里涉及两个方法,第一个是 setException(ex) :

// 导致该future报告一个{@link ExecutionException},并将给定的可抛出对象作为其原因,除非该future已经被设置或取消。 protected void setException(Throwable t) { if (STATE.compareAndSet(this, NEW, COMPLETING)) { outcome = t; STATE.setRelease(this, EXCEPTIONAL); // final state finishCompletion(); } }

另一个是 handlePossibleCancellationInterrupt 方法:

/** * 确保任何来自可能的 cancel(true) 的中断只在 run 或 runAndReset 时交付给任务。 */ private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let's spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // 我们想清除可能从cancel(true)接收到的所有中断。 // 然而,允许使用中断作为任务与其调用者通信的独立机制,并没有办法只清除取消中断。 // Thread.interrupted(); }

最后是 runAndReset 方法:

protected boolean runAndReset() { if (state != NEW || !RUNNER.compareAndSet(this, null, Thread.currentThread())) return false; boolean ran = false; // flag 表示正常执行结束 int s = state; try { Callable&lt;V&gt; c = callable; if (c != null &amp;&amp; s == NEW) { try { c.call(); // don't set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; // if (s &gt;= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran &amp;&amp; s == NEW; // 当正常执行结束,且 state 一开始就是 NEW 时,表示可以运行并重置。 }

执行计算时不设置其结果,然后将该 future 重置为初始状态,如果计算遇到异常或被取消,则不这样做。这是为本质上执行多次的任务设计的。

run 和 runAndReset 都用到了一个 RUNNER , 最后就来揭秘一下这个东西:

private static final VarHandle STATE; private static final VarHandle RUNNER; private static final VarHandle WAITERS; static { try { MethodHandles.Lookup l = MethodHandles.lookup(); STATE = l.findVarHandle(FutureTask.class, "state", int.class); RUNNER = l.findVarHandle(FutureTask.class, "runner", Thread.class); WAITERS = l.findVarHandle(FutureTask.class, "waiters", WaitNode.class); } catch (ReflectiveOperationException e) { throw new ExceptionInInitializerError(e); } // Reduce the risk of rare disastrous classloading in first call to // LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773 Class&lt;?&gt; ensureLoaded = LockSupport.class; }

MethodHandles.lookup()创建一个MethodHandles.Lookup对象,该对象可以创建所有访问权限的方法,包括publicprotectedprivatedefault

VarHandle 主要用于动态操作数组的元素或对象的成员变量VarHandle通过 MethodHandles 来获取实例,然后调用 VarHandle 的方法即可动态操作指定数组的元素或指定对象的成员变量。


作者:自动化BUG制造器
链接:https://juejin.cn/post/7113990902602792968
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。



java多线程 JAVA 线程

需要 登录 后方可回复, 如果你还没有账号请 注册新账号