讨喜的隔离可变性(五)同时使用多个角色

声明:本文是《Java虚拟机并发编程》的第五章,感谢华章出版社授权并发编程网站发布此文,禁止以任何形式转载此文。

通过前面的学习,我们已经了解了如何创建角色以及如何给角色发送消息,下面让我们来一起学习如何让多个角色协同工作。在第2章中,我们创建了一个统计给定区间内所有素数的程序。在该程序中,我们使用了ExecutorService、Callable、Future以及其他差不多超过一页纸那么多代码。本节我们将会学习如何用Akka角色对该示例进行重构,并且根据之前的惯例我们的介绍顺序还是先Java后Scala。

在Java中同时使用多个角色

假定待统计数字集合中的数字是1千万个,为了统计其中的素数数量,之前我们是将数字集合划分为若干个不相交的子集合,并将这些子集合丢给一些线程去执行统计操作。但这里我们将使用角色来完成同样的功能,下面就让我们从角色的onRecevie()函数开始说起吧:

public class Primes extends UntypedActor {
public void onReceive(final Object boundsList) {
final List<Integer> bounds = (List<Integer>) boundsList;
final int count =
PrimeFinder.countPrimesInRange(bounds.get(0), bounds.get(1));
getContext().replySafe(count);
}

为了统计给定区间内的素数数量,我们需要指定区间的上下限。在本例中,onReceive()函数的参数是一个List,其中前两个元素即为区间的上下限。在onReceive()函数内部,我们调用了PrimeFinder类的countPrimesInRage()函数来统计区间内的素数数量,最后又使用replySafe()函数将统计结果返回给调用者。

在给定了待统计的数字集合之后,我们需要将其划分成若干个不相交的子集合并将统计这些子集合中素数数量的任务委托给各个不同的角色来执行。下面就让我们在静态方法countPrimes()中实现这些逻辑:

public static int countPrimes(
final int number, final int numberOfParts) {
final int chunksPerPartition = number / numberOfParts;
final List<Future<?>> results = new ArrayList<Future<?>>();
for(int index = 0; index < numberOfParts; index++) {
final int lower = index * chunksPerPartition + 1;
final int upper = (index == numberOfParts - 1) ? number :
lower + chunksPerPartition - 1;
final List<Integer> bounds = Collections.unmodifiableList(
Arrays.asList(lower, upper));
final ActorRef primeFinder = Actors.actorOf(Primes.class).start();
results.add(primeFinder.sendRequestReplyFuture(bounds));
}
int count = 0;
for(Future<?> result : results)
count += (Integer)(result.await().result().get());
Actors.registry().shutdownAll();
return count;
}

在确定了每个子集合的范围之后,我们会将其包装在一个不可变集合里——请记住,所有的消息都必须是不可变的。接下来,我们调用sendRequestReplyFuture()这个非阻塞函数来将统计请求发送给各个角色进行处理。在把请求发送出去之后,我们将sendRequestReplyFuture()返回的Future对象(注意这里是akka.dispatch.Future而不是JDK中的java.util.concurrent.Future)保存在一个数组中以便稍后从其中取回各个子集合的统计结果。在任务分派完毕之后,我们就可以循环查询每个Future,即先调用Future的await()函数,待await()函数返回之后再调用其返回值的result()函数来获取一个Scala的Option实例——你可以将其假想为一个包含统计结果的数据单元(如果数据存在的话)。最后我们可以通过调用该实例对象的get()函数来得到一个Integer类型的统计值。

OK,下面就让我们写一个用来检验上述代码的测试用例,其中的待统计数字和子集合划分数是通过命令行传给程序的:

public static void main(final String[] args) {
if (args.length < 2)
System.out.println("Usage: number numberOfParts");
else {
final long start = System.nanoTime();
final int count = countPrimes(
Integer.parseInt(args[0]), Integer.parseInt(args[1]));
Working with Multiple Actors • 179
final long end = System.nanoTime();
System.out.println("Number of primes is " + count);
System.out.println("Time taken " + (end - start)/1.0e9);
}
}
}

main()函数主要负责对上面的统计代码进行测试并记录执行耗时。最后我们还需要实现PrimeFinder这个真正负责统计工作的类:

public class PrimeFinder {
public static boolean isPrime(final int number) {
if (number <= 1) return false;
final int limit = (int) Math.sqrt(number);
for(int i = 2; i <= limit; i++) if(number % i == 0) return false;
return true;
}
public static int countPrimesInRange(final int lower, final int upper) {
int count = 0;
for(int index = lower; index <= upper; index++)
if(isPrime(index)) count += 1;
return count;
}
}

令待统计区间为[1, 1000w]、划分的子区间为100个,则上述示例程序的输出结果如下所示:

Number of primes is 664579
Time taken 3.890996

下面让我们将本节的代码和输出结果与第2.4节的示例代码和输出结果进行比较。虽然两个版本都将子集合数设为100,但Akka版本的示例代码无需显式设定线程池大小。此外,由于这是一个计算密集型问题,所以对于使用ExecutorService的版本而言,其线程池大小的设定是需要随机器CPU核数计算而定的,所以两个版本的性能都差不多,而Akka版本在代码的形式上要比使用ExecutorServer的版本简洁一些。但正如我们在本章后面将会看到的那样,当我们需要让多个线程/角色相互协作的时候,这些区别将会愈发明显。

在Scala中同时使用多角色

如果用Scala来实现这个统计素数数量的程序,那么我们就可以深切体会到Scala在角色的实现以及与角色交互方面的简洁和优雅。下面让我们来看看Scala版本的Primes类是如何实现的:

class Primes extends Actor {
def receive = {
case (lower : Int, upper : Int) =>
val count = PrimeFinder.countPrimesInRange(lower, upper)
self.replySafe(new Integer(count))
}
}
object Primes {
def countPrimes(number : Int, numberOfParts : Int) = {
val chunksPerPartition : Int = number / numberOfParts
val results = new Array[Future[Integer]](numberOfParts)
var index = 0
while(index < numberOfParts) {
val lower = index * chunksPerPartition + 1
val upper = if (index == numberOfParts - 1)
number else lower + chunksPerPartition - 1
val bounds = (lower, upper)
val primeFinder = Actor.actorOf[Primes].start()
results(index) = (primeFinder !!! bounds).asInstanceOf[Future[Integer]]
index += 1
}
var count = 0
index = 0
while(index < numberOfParts) {
count += results(index).await.result.get.intValue()
index += 1
}
Actors.registry.shutdownAll
count
}
def main(args : Array[String]) : Unit = {
if (args.length < 2)
println("Usage: number numberOfParts")
else {
val start = System.nanoTime
val count = countPrimes(args(0).toInt, args(1).toInt)
val end = System.nanoTime
println("Number of primes is " + count)
println("Time taken " + (end - start)/1.0e9)
}
}
}

Scala版本的代码与Java版本有几点不同。首先,Scala版本所使用的消息格式是简单的元组而不是一个不可变列表。其次,receive()函数中的case语句与应用场景十分契合。第三,Java版本中countPrimes()函数里的for循环在这里变成了一个while循环。其原因是,虽然Scala的for循环表达式十分优雅,但会增加Object到基本类型之间的转换开销。为了能够得到比较真实的性能对比,我在这里放弃了优雅。

类似地,在PrimeFinder中,我们也用while循环代替了for循环。

object PrimeFinder {
def isPrime(number : Int) : Boolean = {
if (number <= 1) return false
var limit = scala.math.sqrt(number).toInt
var i = 2
while(i <= limit) {
if(number % i == 0) return false
i += 1
}
return true
}
def countPrimesInRange(lower : Int, upper : Int) : Int = {
var count = 0
var index = lower
while(index <= upper) {
if(isPrime(index)) count += 1
index += 1
}
count
}
}

令待统计区间为[1,1000w]、划分的子区间为100个,则Scala版示例程序的性能如下所示:

Number of primes is 664579
Time taken 3.88375

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 讨喜的隔离可变性(五)同时使用多个角色


方 腾飞

花名清英,并发网(ifeve.com)创始人,畅销书《Java并发编程的艺术》作者,蚂蚁金服技术专家。目前工作于支付宝微贷事业部,关注互联网金融,并发编程和敏捷实践。微信公众号aliqinying。

Latest posts by 方 腾飞 (see all)

FavoriteLoading添加本文到我的收藏
  • Trackback 关闭
  • 评论 (0)
  1. 暂无评论

您必须 登陆 后才能发表评论

return top