线程池运用实例——一次错误的多线程程序设计以及修复过程

写在前面的话 

写下这篇文章只为了回顾之前在实际工作中犯的一个极其二逼的错误,用我的经历来提示后来者,诸位程序大神,大牛,小牛们看到此文笑笑即可,轻拍轻拍。。。

1 背景

有这么一个需求,我们的系统(后面简称:A系统)需要在后台执行一个报表导出任务,在这个任务的执行过程中需要通过CORBA调用其他系统(后面简称:B系统)的一个(也有可能是多个)接口去查询报表,待结果返回后,将这些结果写入Excel。这个需求是不是很简单?套用网上一些FutureTask或者线程池的例子一两小时就能搞定这个需求。当时我也是这样认为的,可谁想,这是一个巨大的坑….

2 初始设计

用过CORBA的同学会知道,如同数据库连接一样,CORBA的连接数也是是有限的,如果一个接口调用的时间过长,就会长时间占用CORBA有限的连接数,当这种长时间的同步调用过多时就会造成整个系统CORBA调用的阻塞,进而造成系统停止响应。由于查询操作很耗时,为了避免这种情况的发生,这个接口被设计成了一个异步接口。任务的执行流程就会是这样:任务开始执行,接着调用这个接口并且通过CORBA向B系统订阅一个事件,然后任务进入等待状态,当B系统执行完成后,会向A系统发送一个事件告知执行的结果,任务收到事件后重新开始执行直到结束,如图:

既然说到了事件,那么很自然而然的就想到了使用回调的方式去响应事件,并且为了避免事件超时(也就是长时间没有接收到事件)导致任务长时间等待,我还使用了一个定时的任务去检查任务的状态。所以我的程序看起来就像这样:

IEventFuture.java

[code lang=”java”]
public interface IEventFuture {
void onEventReceived(Event event);
}
[/code]

ExportRptTask.java

[code lang=”java”]
public class ExportRptTask implements Callable<Void>, IEventFuture {
private static final int INITIALIZED = 0;
private static final int RUNNING = 1;
private static final int COMPLETED = 2;
private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
private Date lastUpdate = new Date();
private volatile int state = INITIALIZED;

private Timer timer = new Timer();
private SystemBSer systemBSer = new SystemBSer();

private int eventId = -1;

@Override
public Void call() throws Exception {
this.state = RUNNING;
try {
systemBSer.doQuery();
subscribeEvent();
startTaskTimeoutMonitorTask();
Future future = createEventFuture();
future.get();
} catch (Throwable t) {
onTaskError(t);
} finally {
EventManager.unsubscribe(this.eventId);
timer.cancel();
}
return null;
}

@Override
public void onEventReceived(Event event) {
this.lastUpdate = new Date();
// start to write excel
// …..
// end to write excel
this.state = COMPLETED;
}

private void subscribeEvent() {
this.eventId = EventManager.subscribe(this);
}

private Future createEventFuture() {
FutureTask<Void> listenFuture = new FutureTask<Void>(new Callable<Void>() {

@Override
public Void call() throws Exception {
while (state != COMPLETED) {

}
return null;
}
});

new Thread(listenFuture).start();
return listenFuture;
}

private void startTaskTimeoutMonitorTask() {
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {

if (state != COMPLETED || new Date().getTime() – lastUpdate.getTime() > TASK_TIME_OUT_TIME) {
onTaskTimeout();
}
}
}, 0, 15 * 60 * 1000);
}

private void onTaskTimeout() {
// do something on task timeout.
//   ….
// end

// set task to completed to end task.
this.state = COMPLETED;
}

private void onTaskError(Throwable t) {
// do something to handle error.
}
}
[/code]

3 升级改进

由于做这个需求的关系,我开始阅读一些关于JAVA多线程编程的一下教程,在阅读到关于闭锁的内容时,我突然灵光一现,这玩意不正好可以代替我那个丑陋的使用循环来让任务进入等待状态的实现么?然后我的程序就变成了这样:

ExportRptTask.java

[code lang=”java”]
public class ExportRptTask implements Callable<Void>, IEventFuture {
private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;
private Date lastUpdate = new Date();

private CountDownLatch endGate = new CountDownLatch(1);
private Timer timer = new Timer();
private SystemBSer systemBSer = new SystemBSer();

private int eventId = -1;

@Override
public Void call() throws Exception {
try {
systemBSer.doQuery();
subscribeEvent();
endGate.await();
startTaskTimeoutMonitorTask();
} catch (Throwable t) {
onTaskError(t);
} finally {
EventManager.unsubscribe(this.eventId);
timer.cancel();
}
return null;
}

@Override
public void onEventReceived(Event event) {
this.lastUpdate = new Date();
// start to write excel
// …..
// end to write excel
this.endGate.countDown();
}

private void subscribeEvent() {
this.eventId = EventManager.subscribe(this);
}

private void startTaskTimeoutMonitorTask() {
timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {

if (new Date().getTime() – lastUpdate.getTime() > TASK_TIME_OUT_TIME) {
onTaskTimeout();
}
}
}, 0, 15 * 60 * 1000);
}

private void onTaskTimeout() {
// do something on task timeout.
//   ….
// end

// set task to completed to end task.
this.endGate.countDown();
}

private void onTaskError(Throwable t) {
// do something to handle error.
}
}
[/code]

4 问题浮现

正在我为我使用高大上的闭锁代替循环沾沾自喜的时候,测试大爷告诉我,任务经常莫名其妙的失败,并且日志中没有任何异常。开始,这让我觉得很不可思议,因为我已经在call()方法处处理了所有的异常,任务失败时至少也应该有个日志啥的吧。这个问题一直困扰着我,直到有一天分析日志我突然发现任务执行的工作线程(也就是call()方法所在的线程)和接收到事件后的回调并不是同一个线程。这就意味着在查询到报表结果后,所有写Excel,分发结果等等的操作都是在事件回调的线程中执行的,那么一旦这里发生异常原来call()中的catch块自然无法捕获,然后异常就被莫名其妙的吞掉了。好吧,我承认我之前对线程池也就了解点皮毛,对多线程也仅仅是有个概念,想当然的认为在线程池中可以Hold住任务的一切,包括响应这个任务在执行过程中创建的其他线程运行时发生的异常。而且更严重的是按照原来的实现,只有当整个任务执行完成(包括写完Excel)后,才会释放那个闭锁,所以一旦事件回调发生异常,那么整个任务都无法终止。在线程池中发生一个任务永远无法终止的后果,你懂的。

5 重新设计

痛定思痛,我决定重新梳理这个任务的流程。这个需求的难点就是在如何监听并响应B系统给我们发送的事件,实际上,这是一个很经典的生产者–消费者问题,而阻塞队列正好是解决这类问题的利器。重新设计的事件响应流程就变成:当B系统发送事件的时候,事件回调线程会往阻塞队列里面填充一个事件。在另一方面,任务调用完B系统的查询接口后,就开始从阻塞队列中取事件,当事件队列为空的时候,取事件的线程(也就是线程池执行任务的工作线程)会被阻塞。并且,阻塞队列的取操作可以设置超时时间,所以当取到的事件对象为空时,就意味着事件超时了,这样就省去了使用定时任务定时检查任务状态的工作。重新设计的程序是这样的:

EventProxy.java

[code lang=”java”]
public class EventProxy implements IEventFuture {
private static final BlockingQueue<Event> eventQueue = new ArrayBlockingQueue<Event>(10);
private static final long TASK_TIME_OUT_TIME = 15 * 60 * 1000L;

@Override
public void onEventReceived(Event event) {
eventQueue.offer(event);
}

public Event getEvent() throws InterruptedException {
return eventQueue.poll(TASK_TIME_OUT_TIME, TimeUnit.MILLISECONDS);
}
}
[/code]

ExportRptTask.java

[code lang=”java”]
public class ExportRptTask3 implements Callable<Void> {

private SystemBSer systemBSer = new SystemBSer();
private EventProxy eventProxy = new EventProxy();

private int eventId = -1;

@Override
public Void call() throws Exception {
try {
systemBSer.doQuery();
subscribeEvent();

Event event = eventProxy.getEvent();
if (event != null) {
processEvent(event);
} else {
onTaskTimeout();
}
} catch (Throwable t) {
onTaskError(t);
} finally {
EventManager.unsubscribe(this.eventId);
}
return null;
}

private void subscribeEvent() {
this.eventId = EventManager.subscribe(eventProxy);
}

private void processEvent(Event event) {
// do something on receive event.
}

private void onTaskTimeout() {
// do something on task timeout.
//   ….
// end
}

private void onTaskError(Throwable t) {
// do something to handle error.
}
}
[/code]

6 总结

相信各位并发编程的大牛们能在一瞬间就可以把我的程序(包括改进后的)批得体无完肤,不过我还是想分享下我在这个过程中的收获。

  • 在动手写程序前,请先理解你的需求,特别是要注意用已有的模型去识别问题,在本例中,我就是没有识别响应事件的流程其实是个生产者–消费者问题导致了后面的错误
  • 请充分的了解你需要使用的技术和工具。比如,使用线程池你就要了解线程池的工作原理,这样你才能正确的使用这些技术。做技术切忌想当然。
  • 在使用线程池时,重要的操作尽量放在任务的主线程中执行(也就是call()/run()方法所在的线程),否则线程池本身难以对任务进行控制。
  • 如果一定要在任务中再创建新的线程,请确保任务主线程是任务最后退出的线程。切忌不要使用外部线程直接调用任务类的方法,在本例中我就犯了这样的错误。
  • Trackback 关闭
  • 评论 (14)
  1. 把自己的实战写出来就很有价值。
    充分的了解你需要使用的技术。这句话我非常认同,特别是做并发编程时,需要做到知其然,知其所以然,不能想当然。

  2. 1. BlockingQueue 不能保证事件的顺序
    2. 第一段代码是不是最简单的 notify 和 wait 就能解决问题。这里不是适合使用 CountDownLatch 的

    • 愿意听您高见,请详细的说下你的看法

      • 1. 如果两个线程调用 ExportRptTask3.call,那第一个线程是不是有可能得到第二个线程订阅的事件。如果事件只起到通知作用,里面没有任何与订阅相关的数据,这样也没问题。否则便会产生混乱
        2. CountDownLatch 主要用于一个线程与另外一组线程之间的任务协作。你在这里用 CountDownLatch,也能满足的需要,但是有点大材小用

      • 恩,谢谢大侠指点,对于你说的第一点,我觉得在我的场景上应该不会发生,因为我每个任务都是独立的一个实例(就是放在线程池里运行的,应该不会出现多个线程调用同一个实例的call方法),而且我在初始化任务的时候都会创建一个新的EventProxy对象,这样的话会访问事件队列的线程应该就有两个:任务的工作线程(取事件)和事件分发线程(填充事件),应该不会出现你说的那种情况吧,不知道我理解的对不对,请指点。

  3. 还有,Timer 是绝对不推荐使用的

      • 梁海舰
      • 2014/02/08 3:36下午

      不推荐使用的原因是什么呢?我的项目中有一部分定时清理工作是使用Timer来做的.

      • 主要有几点:

        1.Timer的调度是基于绝对时间的,所以当系统时间改变时会影响Timer。
        2.Timer只有一个工作线程,所以当一个任务执行时间很长的时候,会影响后续任务的调度。而ScheduledThreadPoolExecutor不会,除非你创建了一个单线程的ScheduledThreadPoolExecutor
        3.如果任务抛出了一个未检查的异常,将会导致Timer的工作现场被终止,使Timer无法在继续运行。

        所以推荐使用ScheduledThreadExecutor 代替Timer

    • teasp
    • 2014/02/07 5:04下午

    “切忌”应为“切记”

  4. 排版能不能整理下?看着脑袋都大

    • 恩,不好意思,我回头整理下

      • 码农
      • 2014/09/01 11:54上午

      哈哈, 我也想说这个排版问题, 根本没信息看完了

  5. 个人感觉性能上不太好,因为线程在等待任务收到没有放弃CPU ,用wait可以实现,future和CountDownLatch 其实本身提供了超时处理措施,
    很感谢楼主对开发过程中的心路历程分享,并且给予了解决,好文推荐

    • Janle
    • 2016/11/25 4:58下午

    实战写的这么好已经很牛逼了。。

return top