CompletableFuture 不能被中断

原文链接 作者:Tomasz Nurkiewicz  译者:simonwang

我之前写过一篇关于InterruptedException and interrupting threads的文章。总之,如果你调用Future.cancel(),那么Future不仅会终止正在等待的get(),还会试图去中断底层的线程。这是个很重要的特征,它能够使线程池变得更加利于使用。我在之前的文章中也说过,相对于标准的Future,尽量使用CompletableFuture。但事实证明,Future的更加强大的兄弟-CompletableFuture并不能优雅地处理cancel()。

请思考下面的任务代码,在接下来的测试中会用到:

[code lang=”java”]
class InterruptibleTask implements Runnable {

private final CountDownLatch started = new CountDownLatch(1)
private final CountDownLatch interrupted = new CountDownLatch(1)

@Override
void run() {
started.countDown()
try {
Thread.sleep(10_000)
} catch (InterruptedException ignored) {
interrupted.countDown()
}
}

void blockUntilStarted() {
started.await()
}

void blockUntilInterrupted() {
assert interrupted.await(1, TimeUnit.SECONDS)
}

}
[/code]

客户端线程可以检查InterruptibleTask是否已经开始运行或者是被中断了。首先,我们可以从外部查看InterruptibleTask到底会对cancel()作出怎么样的反应:

[code lang=”python”]
def "Future is cancelled without exception"() {
given:
def task = new InterruptibleTask()
def future = myThreadPool.submit(task)
task.blockUntilStarted()
and:
future.cancel(true)
when:
future.get()
then:
thrown(CancellationException)
}

def "CompletableFuture is cancelled via CancellationException"() {
given:
def task = new InterruptibleTask()
def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)
task.blockUntilStarted()
and:
future.cancel(true)
when:
future.get()
then:
thrown(CancellationException)
}
[/code]

到目前为止一切顺利,Future和CompletableFuture都以几乎相同的方式工作着-在cancel之后取回结果会抛出CancellationException(这里要解释一下,Future.cancel()是不会抛出异常的,而CompletableFuture.cancel()则会以抛出CancellationException强行结束,上面的代码作者都手动抛出了CancellationException)。但在myThreadPool中的线程会怎样呢?我猜会被中断然后被线程池重新回收,我大错特错!

[code lang=”python”]
def "should cancel Future"() {
given:
def task = new InterruptibleTask()
def future = myThreadPool.submit(task)
task.blockUntilStarted()
when:
future.cancel(true)
then:
task.blockUntilInterrupted()
}

@Ignore("Fails with CompletableFuture")
def "should cancel CompletableFuture"() {
given:
def task = new InterruptibleTask()
def future = CompletableFuture.supplyAsync({task.run()} as Supplier, myThreadPool)
task.blockUntilStarted()
when:
future.cancel(true)
then:
task.blockUntilInterrupted()
}
[/code]

第一个测试提交普通的Runnable给ExecutorService然后等待直到它开始执行,接着我们取消Future等待直到抛出InterruptedException,当底层的线程被中断的时候blockUntilInterrupted()会返回。第二个测试失败了,CompletableFuture.cancel()不会中断线程,尽管Future看起来被取消了,但后台线程仍然在执行,sleep()不会抛出InterruptionException。这是一个bug还是这就是CompletableFuture的特点?你们可以查看此文档,不幸地是这就是它的特点:

Parameters: mayInterruptIfRunning – this value has no effect in this implementation because interrupts are not used to control processing.

RTFM(Read The Fucking Manual),但为什么CompletableFuture会以这样的方式工作?首先让我们检查一下“老的”Future的实现与CompletableFuture的有什么不同。FutureTask会在执行ExecutorService.submit()之后返回,而且它的cancel()有如下的实现(我移除了Unsafe以及相似的非线程安全的Java代码,所以仅仅把它当作伪代码看待):

[code lang=”java”]
public boolean cancel(boolean mayInterruptIfRunning) {
if (state != NEW)
return false;
state = mayInterruptIfRunning ? INTERRUPTING : CANCELLED;
try {
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
state = INTERRUPTED;
}
}
} finally {
finishCompletion();
}
return true;
}
[/code]

FutureTask的state变量状态如下图:
state变量的几种状态
万一执行cancel(),我们要么进入CANCELLED状态,要么通过INTERRUPTING进入INTERRUPTED。这里的核心部分是我们要获取runner线程(如果存在,例如如果task正在被执行)然后试着去中断它。这里要小心对于正在运行的线程的强制中断。最后在finishCompletion()中我们要通知所有阻塞在Future.get()的线程(这一步在这里无关痛痒可以忽略)。所以我们可以直观的看到老的Future是如何取消正在运行的tasks的。那CompletableFuture呢?它的cancel()伪代码如下:

[code lang=”java”]
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = false;
if (result == null) {
result = new AltResult(new CancellationException());
cancelled = true;
}
postComplete();
return cancelled || isCancelled();
}
[/code]

这相当令人失望,我们很少把result赋值为CancellationException而忽略mayInterruptIfRunning标志。postComplete()的作用和finishCompletion()的作用相似,通知所有注册在future下的正在等待的回调操作。这种实现相当让人不愉快(使用了非阻塞的Treiber stack),但它的确没有中断任何底层的线程。

Reasons and implications

CompletableFuture的这种cancel限制并不是bug,而是一种设计决定。CompletableFuture天生就没有和任何线程绑定在一起,但Future却几乎总是代表在后台运行的task。使用new关键字创造一个CompletableFuture(new CompletableFuture<>())就很好,这时没有任何底层的线程去取消。但是仍然有大部分的CompletableFuture和后台的task以及线程有联系,在这种情况下有问题的cancel()就是一个潜在的问题。我不建议盲目地用CompletableFuture替换Future,因为如果程序里面有cancel(),那么替换可能会改变程序的行为。这就意味着CompletableFuture有意地违背了里氏替换原则,我们要认真思考这样做的含义。

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: CompletableFuture 不能被中断

  • Trackback 关闭
  • 评论 (0)
  1. 暂无评论

return top