《JAVA多线程编程实战指南》之Active Object(主动对象)模式

本文是《JAVA多线程编程实战指南》的样章,感谢作者授权并发网(ifeve.com)发表此文。感谢demochen整理此文。

8.1 Active Object

Active Object模式是一种异步编程模式。它通过对方法的调用(Method Invocation)与方法的执行(Method Execution)进行解耦(Decoupling)来提高并发性。若以任务的概念来说,Active Object模式的核心则是它允许任务的提交(相当于对异步方法的调用)和任务的执行(相当于异步方法的真正执行)分离。这有点类似于System.gc()这个方法:客户端代码调用完gc()后,一个进行垃圾回收的任务被提交,但此时JVM并不一定进行了垃圾回收,而可能是在gc()方法调用返回后的某段时间才开始执行任务-回收垃圾。我们知道,System.gc()的调用方代码是运行在自己的线程上(通常是main线程派生的子线程),而JVM的垃圾回收这个动作则由专门的工作者线程(垃圾回收线程)来执行。换而言之,System.gc()这个方法所代表的动作(其所定义的功能)的调用方法和执行方法是运行在不同的线程中的,从而提高了并发性。

在进一步介绍Active Object模式之前,我们可先简单地将其核心理解为一个名为ActiveObject的类,该类对外暴露了一些异步方法,如图8-1所示。
asyncService方法的调用方和执行方运行在个子的线程上。在多线程环境下,asyncService方法会被多个线程调用。这时所需的线程安全控制被封装在asyncService方法背后,因此调用方代码无需关心这点,从而简化了调用方代码:从调用方代码来看,调用一个Active Object对象的方法和调用普通Java对象的方法并无实质性差别,如清单8-1所示。

Snip20151213_313

8-1ActiveObject

[code lang=”java”]
ActiveObject ao=…;
Future future = ao.asyncService(“data”);
//执行其他操作
String result = future.get();
System.out.println(result);
[/code]

8.2 Active Object模式的架构

当ActiveObject 模式对外暴露的异步方法被调用时,与该方法调用相关的上下文信息,包括被调用的异步方法名(或其代表的操作)、客户端代码所传递的参数等,会被封装成一个对象。该对象被称为方法请求(Method Request)。方法请求对象会被存入Active Object模式所维护的缓冲区(Activation Queue)中,并由专门的工作者线程负责根据其包含的上下文信息执行相应的操作。也就是说,方法请求所代表的操作则由专门的工作者线程来执行,从而实现了方法的调用和执行的分离,产生了并发。

Active Object模式的主要参与者有以下几种。其类图如图8-2所示。

Snip20151213_314

8-2Active Object

 

  • Proxy:负责对外暴露异步方法接口。其主要方法及职责如下。
  1. asyncServic:该异步方法负责创建于该方法相应的MethodRequest参与者实例,并将其提交给Scheduler参与者实例。该方法的返回值是一个Future参与者实例,客户端代码可以通过它获取异步方法对应的任务的执行结果。
  • MethodRequest:负责将客户端代码对Proxy实例的异步方法的调用封装为一个对象。该对象保留了异步方法的名称及客户端代码传递的参数等上下文信息。它使得Proxy的异步方法的调用和执行分离成为可能。其主要方法及职责如下。
    1.call:根据其所属MethodRequest实例所包含的上下文信息调用Servant实例的相应方法。
  • ActivationQueu:缓冲区,用于临时存储由Proxy的异步方法被调用时所创建的MethodRequest实例。其主要方法及职责如下。
    enqueue:将MethodRequest实例放入缓冲区
    dequeue:从缓冲区中取出一个MethodRequest实例
  • Schedule:负责将Proxy的异步方法所创建的MethodRequest实例存入其维护的缓冲区中,并根据一定的调度策略,对其维护的缓冲区中的MethodRequest实例进行执行。其调度策略可以根据实际需要来定,如FIFO、LIFO和根据MethodRequest中包含的信息所定的优先级等。其主要方法及职责如下。
    enqueu:接受一个MethodRequest实例,并将其存入缓冲区。
    dispatch:反复地从缓冲区中取出MethodRequest实例进行执行。
  • Servant:负责Proxy所暴露的异步方法的具体实现。其主要方法及职责如下。
    doSomething:执行Proxy所暴露的异步方法对应的任务。
  • Future:负责存储和获取Active Object异步方法的执行结果。其主要方法及职责如下:
    get:获取异步方法对应的任务的执行结果。
    set:设置异步方法对应的任务的执行结果。

Active Object模式的序列图如图8-3所示。

Snip20151213_315

8-3Active Object

 

第1步:客户端代码调用Proxy的异步方法asyncService。

第2-7步:asyncService方法创建Future实例作为该方法的返回值,并将客户端代码对该方法的调用封装为MethodRequest对象。然后以所创建的MethodRequest对象作为参数调用Scheduler的enqueue方法,以将MethodRequest对象存入缓冲区。Scheduler的enqueue方法会调用Scheduler所维护的ActivationQueue实例MethodRequest对象存入缓冲区。

第8步:asyncService返回其所创建的Future实例。

第9步:Schedule实例采用专门的工作者线程运行dispatch方法。

第10-12步:dispatch方法调用ActivationQueue实例的dequeue方法,获取一个MethodRequest对象。然后调用MethodRequest对象的call方法。

第13-16步:MethodRequest对象的call方法调用与其相关联的Servant实例的相应方法doSomethin,并将Servant.doSomething方法的返回值设置到Future实例上。

第17步:MethodRequest对象的call方法返回。

上述步骤中,第1-8步是运行在Active Object的客户端线程中的,这几个步骤实现了将客户端代码对Active Object所提供的异步方法的调用封装成对象(MethodRequest),并将其存入缓冲区(ActivationQueue)。这几个步骤实现了任务的提交。第9-17步是运行在ActiveObject的工作者线程中,这些步骤实现从缓冲区中读取MethonRequest,并对其进行执行,实现了任务的执行。从而实现了Active Objec对外暴露的异步方法的调用与执行的分离。

如果客户端代码关系Active Object的异步方法的返回值,则可以在其需要时,调用Future实例的get方法来获取异步方法的真正执行结果。

8.3 Active Object模式实战案例解析

某电信软件有一个彩信短号模块,其主要功能是实现手机用户给其他手机用户发送彩信时,接收方号码可以填写对方的短号。例如13612345678给其同事13787654321发送彩信时,可以将接收方号码填写为对方的短号,如776而非真是的号码。

该模块处理接收到的下发彩信请求的一个关键操作是,查询数据库以获得接收方短号对应的真实号码(长号)。该操作可能因为数据库故障而失败,从而使整个请求无法继续被处理。而数据库故障时可恢复的故障,因此在短号转换为长号的过程中如果出现数据库异常,可以先将整个下方彩信请求消息缓存到磁盘中,等到数据库恢复后,再从磁盘中读取请求信息,进行重试。为方便起见,我们可以通过java的对象序列化API,将表示下方彩信的对象序列化到磁盘文件中从而实现请求缓存。下面我们讨论这个请求缓存操作还需要考虑的其他因素,以及ActiveObject模式如何帮助我们满足这些考虑。

首先,请求消息缓存到磁盘中设计文件I/O这种慢的操作,我们不希望它在请求处理的主线程(即Web服务器的工作者线程)中执行。因为这样会使该模块的响应延时增大,降低系统的响应性,并使得Web服务器的工作者线程因等待I/O而降低了系统的吞吐量。这时异步处理就派上用场了。Active Object模式可以帮助我们实现请求缓存这个任务的提交和执行分离:任务的提交是在web服务器的工作者线程中完成的,而任务的执行(包括序列化对象到磁盘文件中等操作)则是在Active Object工作者线程中执行的。这样,请求处理的主线程在侦测到短号转长号失败时即可触发对当前彩信下发请求就行缓存,接着继续其请求处理,如给客户端响应。而此时,当前请求消息可能正在被Active Object线程缓存到文件中,如图8-4所示。

Snip20151213_316

 

8-4异步实现缓存

其次,每个短号转长号失败的彩信下发请求消息会被缓存为一个磁盘文件,但我们不希望这些缓存文件被存在同一个子目录下,而是希望多个缓存文件会被存储到多个子目录中。每个子目录最多可以存储指定个数(如2000个)的缓存文件。若当前子目录已存满,则新建一个子目录存放新的缓存文件,直到该子目录叶存满,以此类推。当这些子目录的个数到达指定数量(如100和)时,最老的子目录(连同其下的缓存文件,如果有的话)会被删除,从而保证子目录的个数也是固定的。显然,在并发环境下,实现这种控制需要一些并发访问控制(如通过锁来控制),但是我们不希望这种控制暴露给处理请求的其他代码。而Active Object模式中的Proxy参与者可以帮助我们封装并发访问控制。

下面,我们看该案例的相关代码通过应用Active Object模式在实现缓存功能时满足上述两个目标。首先看请求处理的入口类,该类就是本案例的Active Object模式的客户端代码,如清单8-2所示。

清单8-2.彩信下发请求处理的入口类

[code lang=”java”]
public class MMSDeliveryServlet extends HttpServlet {

private static final long serialVersionUID = 5886933373599895099L;

public void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
// 将请求中的数据解析为内部对象
MMSDeliverRequest mmsDeliverReq = this.parseRequest(req.getInputStream());
Recipient shortNumberRecipient = mmsDeliverReq.getRecipient();
Recipient originalNumberRecipient = null;
try {
// 将对方短号转为长号
originalNumberRecipient = convertShortNumber(shortNumberRecipient);
} catch (Exception e) {
// 接收方短号转为长号时发生数据库异常,处罚请求消息的缓存
AsyncRequestPersistence.getInstance().store(mmsDeliverReq);
// 省略其他代码
resp.setStatus(202);
}
}

private MMSDeliverRequest parseRequest(InputStream reqInputStream) {
MMSDeliverRequest mmsDeliverReq = new MMSDeliverRequest();
// 省略其他代码
return mmsDeliverReq;
}

private Recipient convertShortNumber(Recipient shortNumberRecipient) throws SQLException {
Recipient recipent = null;
// 省略其他代码
return recipent;
}

}
[/code]
清单8-2中的doPost方法在侦测到短号转换过程中发生的数据库异常后,通过调用AsyncRequestPersistence类的store方法触发对彩信下发请求消息的缓存。这里,AsyncRequestPersistence类是彩信下发请求缓存入口类,它相当于Active Object模式中的Proxy参与者。尽管本案例涉及的是一个并发环境,但从清单8-2中的代码可见,AsyncRequestPersistence类的客户端代码无须处理多线程同步问题。这是因为多线程同步问题被封装在AsyncRequestPersistence类之后。

AsyncRequestPersistence类的代码如清单8-3所示。

清单8-3.AsyncRequestPersistence类源码。

[code lang=”java”]
//模式角色:ActiveObject.Proxy
public class AsyncRequestPersistence implements RequestPersistence {

private static final long ONE_MINUTE_IN_SECONDS = 60;
private final Logger logger;
private final AtomicLong taskTimeConsumedPerInterval = new AtomicLong(0);
private final AtomicInteger requestSubmittedPerIterval = new AtomicInteger(0);
// 模式角色:ActiveObject.Servant
private final DiskbasedRequestPersistence delegate = new DiskbasedRequestPersistence();
// 模式角色:ActiveObject.Scheduler
private final ThreadPoolExecutor scheduler;

// 用于保存AsyncRequestPersistence的唯一实例
private static class InstanceHolder {

final static RequestPersistence INSTANCE = new AsyncRequestPersistence();
}

// 私有构造器
private AsyncRequestPersistence(){
logger = Logger.getLogger(AsyncRequestPersistence.class);
scheduler = new ThreadPoolExecutor(1, 30, ONE_MINUTE_IN_SECONDS, TimeUnit.SECONDS,
// 模式角色:ActiveObject.ActivationQueue
new ArrayBlockingQueue(200), new ThreadFactory() {

public Thread newThread(Runnable r) {
Thread t;
t = new Thread(r, “AsyncRequestPersistence”);
return t;
}
});
scheduler.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 启动队列监控定时任务
Timer monitorTimer = new Timer(true);
monitorTimer.scheduleAtFixedRate(new TimerTask() {

@Override
public void run() {
if (logger.isInfoEnabled()) {
logger.info(“task count:” + requestSubmittedPerIterval + “,Queue size:”
+ scheduler.getQueue().size() + “,taskTimeConsumedPerInterval:”
+ taskTimeConsumedPerInterval.get() + ” ms”);
}
taskTimeConsumedPerInterval.set(0);
requestSubmittedPerIterval.set(0);
}
}, 0, ONE_MINUTE_IN_SECONDS * 1000);
}

// 获取类AsyncRequestPersistence的唯一实例
@Override
public static RequestPersistence getInstance() {
return InstanceHolder.INSTANCE;
}

@Override
public void store(final MMSDeliverRequest request) {
/**
* 将store方法的调用封装成MethodRequest对象,并存入缓冲区。
*/
Callable methodRequest = new Callable() {

@Override
public Boolean call() throws Exception {
long start = System.currentTimeMillis();
try {
delegate.store(request);
} finally {
taskTimeConsumedPerInterval.addAndGet(System.currentTimeMillis() – start);
}
return Boolean.TRUE;
}
};
scheduler.submit(methodRequest);
requestSubmittedPerIterval.incrementAndGet();
}

}
[/code]
AsyncRequestPersistence类所实现的接口RequestPersistence定义了Active Object对外暴露的异步方法:store方法。由于本案例不关心请求缓存的结果,故该方法没有返回值。其代码如清单8-4所示。

清单8-4.RequestPersistence接口源码

[code lang=”java”]
public interface RequestPersistence {

void store(MMSDeliverRequest request);
}
[/code]
AsyncRequestPersistence类的实例变量scheduler相当于Active Object模式中的Scheduler参与者实例,这里我们直接使用了JDK1.5引入了Executor Framework中的ThreadPoolExecutor。在ThreadPoolExecutor类实例化时,其构造器的第5个参数(BlockingQueue<Runnable>workQueue )我们指定了一个有界阻塞队列:new ArrayBlockingQueue<Runnable>(200)。该队列相当于Active Object模式中的ActivationQueue参与者实例。

AsyncRequestPersistence类的实例变量delegate相当于Active Object模式中的Servant参与者实例。

AsyncRequestPersistence类的store方法利用匿名类生成一个java.util.concurrent.Callable实例methodReques。该实例相当于Active Object模式中的MethodRequest参与者实例。利用闭包(Closure),该实例封装了对Store方法调用的上下文信息(包括调用参数、所调用的方法对应的操作信息)。AsyncRequestPersistence类的store方法通过调用schedule的submit方法,将methodRequest送入ThreadPoolExecutor所维护的缓冲区(阻塞队列)中。确切的说,ThreadPoolExecutor是Scheduler参与者的一个“近似实现”。ThreadPoolExecutor的submit方法相对于Scheduler的enqueue方法,该方法用于接纳MethodRequest对象,以将其存入缓冲区。当ThreadPoolExecutor当前使用的线程数量小于其核心线程数量时,submit方法所接收的任务会直接被新建的线程执行。当ThreadPoolExecutor当前使用的线程数量大于其核心线程数时,submit方法所接收的任务才会被存入其维护的阻塞队列中。不过ThreadPoolExecutor的这种任务处理机制,并不妨碍我们将它用作Scheduler的实现。

methodRequest的call方法会调用delegate的store方法来真正实现请求缓存功能。delegate实例对应的类DiskbasedRequestPersistence是请求消息缓存功能的真正实现者。其代码如清单8-5所示。

清单8-5.DiskbasedRequestPersistence类的源码
[code lang=”java”]
public class DiskbasedRequestPersistence implements RequestPersistence {

// 负责缓存文件的存储管理
private final SectionBasedDiskStorage storage = new SectionBasedDiskStorage();
private final Logger logger = Logger.getLogger(DiskbasedRequestPersistence.class);

@Override
public void store(MMSDeliverRequest request) {
// 申请缓存文件的文件名
String[] fileNameParts = storage.apply4Filename(request);
File file = new File(fileNameParts[0]);
try {
ObjectOutputStream objOut = new ObjectOutputStream(new FileOutputStream(file));
try {
objOut.writeObject(request);
} finally {
objOut.close();
}
} catch (FileNotFoundException e) {
storage.decrementSectionFileCount(fileNameParts[1]);
logger.error(“Failed to store request”, e);
} catch (IOException e) {
storage.decrementSectionFileCount(fileNameParts[1]);
logger.error(“Failed to store request”, e);
}

}

class SectionBasedDiskStorage {

private Deque sectionNames = new LinkedList();
/**
* Key->value:存储目录名->子目录下缓存文件计数器
*/
private Map<String, AtomicInteger> sectionFileCountMap = new HashMap<String, AtomicInteger>();

private int maxFilesPerSection = 2000;
private int maxSectionCount = 100;
private String storageBaseDir = System.getProperty(“user.dir”) + “/vpn”;
private final Object sectionLock = new Object();

public String[] apply4Filename(MMSDeliverRequest request) {
String sectionName;
int iFileCount;
boolean need2RemoveSection = false;
String[] fileName = new String[2];
synchronized (sectionLock) {
// 获取当前的存储子目录名
sectionName = this.getSectionName();
AtomicInteger fileCount;
fileCount = sectionFileCountMap.get(sectionName);
iFileCount = fileCount.get();
// 如果当前存储子目录已满
if (iFileCount >= maxFilesPerSection) {
if (sectionNames.size() >= maxSectionCount) {
need2RemoveSection = true;
}
sectionName = this.makeNewSectionDir();
fileCount = sectionFileCountMap.get(sectionName);

}
iFileCount = fileCount.addAndGet(1);
}

fileName[0] = storageBaseDir + “/” + sectionName + “/” + new DecimalFormat(“0000”).format(iFileCount) + “-”
+ request.getTimeStamp().getTime() / 1000 + “-” + request.getExpiry() + “.rq”;
fileName[1] = sectionName;
if (need2RemoveSection) {
// 删除最老的存储子目录
String oldestSectionName = sectionNames.removeFirst();
this.removeSection(oldestSectionName);
}
return fileName;
}

public void decrementSectionFileCount(String sectionName) {
AtomicInteger fileCount = sectionFileCountMap.get(sectionName);
if (null != fileCount) {
fileCount.decrementAndGet();
}
}

private boolean removeSection(String sectionName) {
boolean result = true;
File dir = new File(storageBaseDir + “/” + sectionName);
for (File file : dir.listFiles()) {
result = result && file.delete();
}
result = result && dir.delete();
return result;
}

private String getSectionName() {
String sectionName;
if (sectionNames.isEmpty()) {
sectionName = this.makeNewSectionDir();
} else {
sectionName = sectionNames.getLast();
}
return sectionName;
}

private String makeNewSectionDir() {
String sectionName;
SimpleDateFormat sdf = new SimpleDateFormat(“MMddHHmmss”);
sectionName = sdf.format(new Date());
File dir = new File(storageBaseDir + “/” + sectionName);
if (dir.mkdir()) {
sectionNames.addLast(sectionName);
sectionFileCountMap.put(sectionName, new AtomicInteger(0));
} else {
throw new RuntimeException(“Cannot create section dir ” + sectionName);
}
return sectionName;
}
}

}
[/code]
methodRequest的call方法的调用者代码是运行在ThreadPoolExecutor所维护的工作者线程中的,这就保证了store方法的客户端和真正的执行方是分别运行在不同的线程中的:服务器工作者线程负责触发请求消息缓存,ThreadPoolExecutor所维护的工作者线程负责将请求消息序列化到磁盘文件中。
DiskbasedRequestPersistence类的store方法中调用的SectionBasedDiskStorage类的apply4Filename方法中包含了一些多线程同步控制代码(见清单8-5所示)。这部分控制由于是封装在DiskbasedRequestPersistence的内部类中,对于该类外的代码是不可见的。因此,AsyncRequestPersistence的客户端代码无法知道该细节,这体现了Active Object模式对并发访问控制的封装。

8.4Active Object模式的评价与实现考量

Active Object模式通过将方法的调用与执行分离,实现了异步编程。有利于提高并发性,从而提高了系统的吞吐率。

Active Object模式还有个好处是它可以将任务(MethodRequest)的提交(调用异步烦方法)和任务的执行策略(Execution Policy)分离。任务的执行策略被封装在Scheduler的实现类之内,因此它对外是“不可见”的,一旦需要变动也不会影响其他代码,从而降低了系统的耦合性。任务的执行策略可以反映以下一些问题。

  • 采用什么顺序去执行任务,如FIFO、LIFO,或者基于任务中包含的信所定的优先级?
  • 多少个任务可以并发执行?
  • 多少个任务可以被排队等待执行?
  • 如果有任务由于系统过载被拒绝,此时哪个任务该被选中作为牺牲品,应用程序该如何被通知到?
  • 任务执行前、执行后需要执行哪些操作?

这意味着,任务的执行顺序可以和任务的提交顺序不同,可以采用单线程也可以采用多线程去执行任务等。
当然,好处的背后总是隐藏着代价,Active Object模式实现异步编程也有其代价。该模式的参与者有6个之多,其实现过程也包含了不少中间的处理:MethodRequest对象的生成、MethodRequest对象的移动(进出缓冲区)、MethodRequest对象的运行调度和线程上下文切换等。这些处理都有其空间和时间的代价。因此,Active Object模式适合于分解一个比较耗时的任务(如涉及I/O操作的任务):将任务的发起和执行进行分离,以减少不必要的等待时间。

虽然模式的参与者较多,但正如本章案例的实现代码所展示的,其中大部分的参与者我们可以利用JDK自身提供的类来实现,以节省编码时间,如表8-1所示。

表8-1 使用JDK 现有类实现Active Object的一些参与者

 

参与者名称 可以借用的JDK类 备注
Scheduler Java Executor Framework中的java.util.
concurrent.ExecutorService接口的相关实现类,如java.util.concurrent.ThreadPool Executor
ExecutorService接口所定义的submit(Callable task)方法相当于图8-2中的enqueue方法
ActivationQueue java.util.concurrent.LinkedBlockingQueue 若Scheduler采用java.util. concurrent.
ThreadPoolExecutor,则java.util.concurrent.
LinkedBlocking Queue实例作为ThreadPool Executor构造器的参数传入即可
MethodRequest java.util.concurrent.Callable接口的实现类 Callable接口比起Runnable接口的优势在于它定义的call方法有返回值,便于将该返回值传递给Future实例。通常使用callable接口的匿名实现类即可
Future java.util.concurrent.Future ExecutorService接口所定义的submit(Callable task)方法的返回值类型就是java.util.concurrent. Future

8.4.1错误隔离

错误隔离指一个任务的处理失败不影响其他任务的处理。每个MethodRequest实例可以看作一个任务。那么,Scheduler的实现类在执行MethodRequest时需要注意错误隔离。选用JDK中现成的类(如ThreadPoolExecutor)来实现Scheduler的一个好处就是这些类可能已经实现了错误隔离。而如果自己编写代码实现Scheduler,用单个Active Object工作者线程逐一执行所以任务,则需要特别注意线程的run方法的异常处理,确保不会因为个别任务执行时遇到一些运行时异常而导致整个线程终止。如清单8-6所示的实例代码。

清单8-6.自己动手实现Scheduler的错误隔离实例代码
[code lang=”java”]
public class CustomScheduler implements Runnable {

private LinkedBlockingQueue activationQueue = new LinkedBlockingQueue();

@Override
public void run() {
dispatch();
}

public Future enqueue(Callable methodRequest) {
final FutureTask task = new FutureTask(methodRequest) {

public void run() {
try {
super.run();
// 捕获所有可能抛出的对象,避免该任务运行失败而导致其所在的线程终止
} catch (Throwable t) {
this.setException(t);
}
}
};
try {
activationQueue.put(task);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return task;
}

public void dispatch() {
while (true) {
Runnable methodRequest;
try {
methodRequest = activationQueue.take();
// 防止个别任务执行失败导致线程终止的代码在run方法中
methodRequest.run();
} catch (InterruptedException e) {
// 处理该异常
e.printStackTrace();
}
}
}
}
[/code]

  8.4.2缓冲区监控

如果Active Object是有界缓冲区,则对缓冲区的当前大小进行监控无论是对于运维还是测试来说都是有其意义。从测试的角度来看,监控缓冲区有助于确定区容量的建议值(合理值)。如清单8-3所示的代码,即通过定时任务周期性地调用ThreadPoolExecutor的getQueue方法对缓冲区的大小进行监控。当然,在监控缓冲区的时候,往往只需要大致的值,因此在监控代码中需要避免不必要的锁。

8.4.3缓冲区饱和处理策略

当任务的提交速率大衣任务的执行速率时,缓冲区可能逐渐积压到满。这时新提交的任务会被拒绝。无论是自己编写代码还是利用JDK现有类来实现Scheduler,对于缓冲区满时新任务提交失败,我们需要一个处理策略用于决定此时哪个任务会成为“牺牲品”,若使用ThreadPoolExecutor来实现Scheduler有个好处,是它已经提供了几个缓冲区饱和处理策略的实现代码,应用代码可以直接调用。如清单8-3所示的代码,本章案例中我们选择了在任务的提交方线程中执行被拒绝的任务作为处理策略。

java.util.concurrent.RejectedExecutionHandler接口是ThreadPoolExecutor对缓冲区饱和处理策略的抽象,JDK中提供的具体实现类如表8-2所示。

 

实现类 所实现的处理策略
ThreadPoolExecutor.AbortPolicy 直接抛出异常
ThreadPoolExecutor.DiscardPolicy 丢弃当前被拒绝的任务(而不抛出任何异常)
ThreadPoolExecutor.DiscardOldestPolicy 将缓冲区中最老的任务丢弃,然后重新尝试接纳被拒绝的任务
ThreadPoolExecutor.CallerRunsPolicy 在任务的提交方线程中运行被拒绝的任务

当然,对于ThreadPoolExecutor而言,其工作队列满不移地就意味着新提交的任务会被拒绝。当其最大线程池大小大于其核心线程池大小时,工作队列满的情况下,新提交的任务会用所有核心线程之外的新增线程来执行,直到工作者线程数达到最大线程数时,新提交的任务才会被拒绝。

8.4.4 Scheduler空闲工作者线程清理

如果Scheduler采用多个工作者线程(如采用ThreadPoolExecutor这样的线程池)来执行任务,则可能需要清理空闲的线程以节约资源。如清单8-3的代码就是直接使用ThreadPoolExecutor的现有功能,在初始化实例时通过指定其构造器的第3,4个参数(long keepAliveTime、TimeUnit unit),告诉ThreadPoolExecutor对于核心工作者线程以外的线程,若已经空闲了指定时间,则将其清理掉。

8.5 Active Object模式的可复用实现代码

尽管利用JDK中的现成类可以极大地简化Active Object模式的实现,但如果需要频繁地在不同场景下使用ActiveObject 模式,则需要一套更利于复用的代码,以节约编码的时间和使代码更加易于理解。清单8-7展示了一段基于Java动态代理的可复用的Active Object模式Proxy参与者实现代码。

清单8-7.可复用的Active Object 模式Proxy参与者实现
[code lang=”java”]
/**
* Active Object模式Proxy参与者的可复用表现 模式角色:ActiveObject.Proxy
*/
public class ActiveObjectProxy {

private static class DispatchInvocationHandler implements InvocationHandler {

private final Object delegate;
private final ExecutorService scheduler;

public DispatchInvocationHandler(Object delegate, ExecutorService executorService){
this.delegate = delegate;
this.scheduler = executorService;
}

private String makeDelegateMethodName(final Method method, final Object[] arg) {
String name = method.getName();
name = “do” + Character.toUpperCase(name.charAt(0)) + name.substring(1);
return name;
}

public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
Object returnValue = null;
final Object delegate = this.delegate;
final Method delegateMethod;
if (Future.class.isAssignableFrom(method.getReturnType())) {
delegateMethod = delegate.getClass().getMethod(makeDelegateMethodName(method, args),
method.getParameterTypes());
final ExecutorService scheduler = this.scheduler;

Callable methodRequest = new Callable() {

@Override
public Object call() throws Exception {
Object rv = null;
try {
rv = delegateMethod.invoke(delegate, args);
} catch (IllegalArgumentException e) {
throw new Exception(e);
} catch (IllegalAccessException e) {
throw new Exception(e);
} catch (InvocationTargetException e) {
throw new Exception(e);
}
return rv;
}

};
Future future = scheduler.submit(methodRequest);
returnValue = future;
} else {
// 若拦截到的方法调用不是异步方法则直接转发
delegateMethod = delegate.getClass().getMethod(method.getName(), method.getParameterTypes());
returnValue = delegateMethod.invoke(delegate, args);
}
return returnValue;
}
}

/**
* 生成一个实现指定接口的Active Object proxy实例 对interf所定义的异步方法的调用会被转发到servant的相应的doxxx方法。
*
* @param interf 要实现的Active Object接口
* @param servant Active Object的servant 参与者实例
* @param scheduler Active Object的scheduler 参与者实例
* @return Active Object的Proxy 参与者实例
*/
public static T newInstance(Class interf, Object servant, ExecutorService scheduler) {
@SuppressWarnings(“unchecked”)
T f = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class[] { interf },
new DispatchInvocationHandler(servant, scheduler));

return f;
}
}

[/code]

清单8-7的代码实现了可复用的Active Object模式的Proxy参与者ActiveObjectProxy。ActiveObjectProxy通过使用Java动态代理,动态生成指定接口的代理对象。对该代理对象的异步方法(即返回值类型为java.util.concurrent.Future的方法)的调用会被ActiveObjectProxy实现InvocationHandler(DispatchInvocationHandler)所拦截,并转发给ActiveObjectProxy的newInstance方法中指定的Servant处理。
使用ActiveObjectProxy 实现Active Object模式,应用代码只需要调用ActiveObjectProxy的静态方法new Instance即可。应用代码调用newInstance方法需要指定以下参数:

1)指定Active Object模式对外暴露的接口,该接口作为第1个参数传入。

2)创建Active Object模式对外暴露的接口的实现类。该类的实例作为第2个参数传入。

3)指定一个java.util.concurrent.ExecutorService实例。该实例作为第3个参数传入。

如清单8-8 基于可复用的API快速实现Active Object模式
[code lang=”java”]
public static void main(String[] args) throws InterruptedException, ExecutionException {
SampleActiveObject sao = ActiveObjectProxy.newInstance(SampleActiveObject.class, new SampleActiveObjectImpl(),
Executors.newCachedThreadPool());
Future ft = sao.process(“Something”, 1);
Thread.sleep(50);
System.out.println(ft.get());
}
[/code]

8.6 JAVA 标准库实例

类java.util.concurrent.ThreadPoolExecutor可以看成是Actiev Object模式的一个通用实例。ThreadPoolExecutor自身相当于Active Object模式的Proxy和Scheduler参与者实例。ThreadPoolExecutor的submit方法相当于Active Object模式对外暴露的异步方法。该方法的唯一参数(java.util.concurrent.Callable或者java.lang.Runnable)可以看作是MethodRequest参与者实例。该方法的返回值(java.util.concurrent.Future)相当于Future参与者实例,而ThreadPoolExecutor的构造方法中需要传入的BlockingQueue相当于ActivationQueue参与者实例。

8.7相关模式

8.7.1 Promise模式(第6章)

Active Object 模式的Proxy参与者相当于Promise模式中的Promisor参与者,其asyncService异步方法的返回值类型Future相当于Promise模式中的Promise参与者。

8.7.2 Producer-Consumer 模式(第7章)

整体上看,Active Object模式可以看作Producer-Consumer模式的一个实例:Active Object模式的Proxy参与者可以看成Producer-Consumer模式中的Producer参与者,它“生产”了MethodRequest参与者这种“产品”;Scheduler参与者可以看成Producer-Consumer模式中的Consumer参与者,它“消费”了Proxy参与者所“生产”的MethodRequest。

8.8参考资源

1.维基百科 Active Object模式词条.http://en.wikipedia.org/wiki/Active_object.

2.Douglas C. Schmidt对Active Object模式的定义.http://www.laputan.org/pub/sag/act-obj.pdf.

3.Schmidt, Douglas et al. Pattern-Oriented Software Architecture, Volume 2: Patterns for
Concurrent and Networked Objects. Wiley, 2000.

4.Java theory and practice: Decorating with dynamic proxies. http://www.ibm.com/
developerworks/java/library/j-jtp08305/index.html.

  • Trackback 关闭
  • 评论 (5)
  1. 赞一个,涉及到了异步,容错处理,学习了

    • 180CM
    • 2016/01/05 1:00下午

    这本书是抄袭的 看目录 感觉根《java多线程设计模式》内容基本一样

    • 180CM
    • 2016/01/05 1:12下午

    180CM :
    这本书是抄袭的 看目录 感觉根《java多线程设计模式》内容基本一样

    感觉是抄袭的

  2. 180CM :

    180CM :
    这本书是抄袭的 看目录 感觉根《java多线程设计模式》内容基本一样

    感觉是抄袭的

    先弄清设计模式的概念,设计模式不是一个人总结的,外国人能够介绍,国人也能介绍。《java多线程设计模式》目录里列的哪些设计模式也是国外人总结出来的,并不是其作者自造的。看下具体目录和内容再说吧。现在抄袭的书太多了,有这种怀疑精神是好的,对原创作者有利,但也请慎言。
    感兴趣的不妨下载试读章节看看:http://download.csdn.net/detail/broadview2006/9254525
    csdn上有本书的答疑活动,虽然活动已过期,但是我不忙的时候还会回复:http://bbs.csdn.net/topics/391863274

    • vincentff7
    • 2016/02/22 4:23下午

    内容充实,感谢作者!

return top