讨喜的隔离可变性(三)创建角色

声明:本文是《Java虚拟机并发编程》的第五章,感谢华章出版社授权并发编程网站发布此文,禁止以任何形式转载此文。

正如前面曾经提到过的那样,虽然我们有很多支持角色的类库可供选择,但是在本书中我们将使用Akka。这是一个基于Scala的类库,该类库拥有非常好的性能和可扩展性、并同时支持角色和STM。此外,该类库还可以被用于多种JVM上的语言中。在本章中,我们将注意力集中在Java和Scala身上。而在下一章,我们将会学习如何在其他语言中使用Akka的角色。

 某个角色的生命周期

图 8‑2 某个角色的生存周期

由于Akka是用Scala实现的,所以在Scala中创建和使用角色非常简单并且更加自然,从Akka API的实现里我们也可以看到Scala简约而不简单的风格闪耀其中。除此之外,Akka的开发者们还设计了一套相当出色的传统Java API,可以使我们在Java代码中很方便地创建和使用角色。下面我们将先学习如何在Java中使用这套API,然后再体验一下用Scala时将有着怎样的简化和改变。

用Java创建角色

在Akka中,抽象类akka.actor.UntypedActor用于表示一个角色的抽象表示,而具体的角色定义则只需简单继承这个抽象类并实现其onReceive()函数就可以了——每当有消息到达此角色时该函数将被调用。下面让我们通过一个简单的实例来对上述过程建立一个直观感受。下面我们将会创建一个角色(actor)…不如就写一个可以对扮演不同荧幕人物(role)的请求进行响应的HollywoodActor咋样?

[code lang=”java”]<br />
public class HollywoodActor extends UntypedActor {<br />
public void onReceive(final Object role) {<br />
System.out.println(&quot;Playing &quot; + role +<br />
&quot; from Thread &quot; + Thread.currentThread().getName());<br />
}<br />
}<br />
[/code]

如上所示,onReceive()函数接受一个Object对象作为其参数。在本例中,我们只是简单地将该参数以及负责处理消息的线程的详情打印出来。稍后我们将会学习如何处理不同类型的消息。

在完成了角色(actor)的定义之后,我们还需要创建一个角色的实例,并将该角色(actor)曾经演过的荧幕人物(role)以消息的形式发送给它,下面让我们来实现这部分内容:

[code]<br />
public class UseHollywoodActor {<br />
public static void main(final String[] args) throws InterruptedException {<br />
final ActorRef johnnyDepp = Actors.actorOf(HollywoodActor.class).start();<br />
johnnyDepp.sendOneWay(&quot;Jack Sparrow&quot;);<br />
Thread.sleep(100);<br />
johnnyDepp.sendOneWay(&quot;Edward Scissorhands&quot;);<br />
Thread.sleep(100);<br />
johnnyDepp.sendOneWay(&quot;Willy Wonka&quot;);<br />
Actors.registry().shutdownAll();<br />
}<br />
}<br />
[/code]

在Java中我们通常都是用new来创建对象的,但由于Akka的角色并非简单对象而是活动对象(active objects),所以我们需要用一个特殊函数actorOf()来完成创建动作。此外,我们还可以先用new生成一个实例,然后再调用actorOf()对该实例进行封装以获得一个角色的引用,关于这种创建方式我们稍后会再研究具体细节。当我们创建好了角色之后,就可以通过调用其start()函数来启动该角色。而当我们启动一个角色时,Akka会将其写入一个注册表(registry)中,于是在这个角色停止运行之前我们都可以通过注册表来访问它。在本例中,johnnyDeep即为角色实例的引用,其类型为ActorRef。

接下来,我们通过sendOneWay()函数向johnnyDeep发送了一些附带着我们希望其扮演的荧幕人物(role)的消息。当消息发出之后,其实我们本不用加入那几个100毫秒等待时间的,但插入延时将有助于我们更好地学习角色如何进行线程切换的运作细节。在代码的结尾处,我们关闭了所有运行中的角色。除了代码示例中所使用的shutdownAll()之外,我们还可以逐个调用每个角色的stop()函数或给所有角色发送kill消息的方式来达到关停所有角色的目的。

为了能够运行上面的实例,我们需要先把Akka的库文件都添加到classpath中,然后通过javac对代码进行编译。编译完成之后,我们就可以像运行其他常规Java程序一样运行本节的示例程序。需要再次提醒你的是,请务必记得将所有相关的JAR都添加到classpath中。下面就是我在我的系统上所使用的编译和运行指令:

[code]<br />
javac -d . -classpath $AKKA_JARS HollywoodActor.java UseHollywoodActor.java<br />
java -classpath $AKKA_JARS com.agiledeveloper.pcj.UseHollywoodActor<br />
[/code]

其中AKKA_JARS的定义如下所示:

[code]<br />
export AKKA_JARS=&quot;$AKKA_HOME/lib/scala-library.jar:\<br />
$AKKA_HOME/lib/akka/akka-stm-1.1.3.jar:\<br />
$AKKA_HOME/lib/akka/akka-actor-1.1.3.jar:\<br />
$AKKA_HOME/lib/akka/multiverse-alpha-0.6.2.jar:\<br />
$AKKA_HOME/lib/akka/akka-typed-actor-1.1.3.jar:\<br />
$AKKA_HOME/lib/akka/aspectwerkz-2.2.3.jar:\<br />
$AKKA_HOME/config:\<br />
.&quot;<br />
[/code]

为了使实例代码能否顺利地编译运行,请根据你所使用的操作系统来定义AKKA_JARS环境变量,以便编译器能够正确定位到Scala和Akka的安装路径。其中,scala-library.jar是scala相关的功能集合,而我们既可以使用Akka自带的jar,也可以使用Scala安装路径下的那一份。

默认情况下Akka会将额外的日志消息输出到控制台,关于如何对这一行为进行配置请参阅6.8节。

下面让我们编译并运行示例代码,并观察角色对于消息的响应情况:

[code lang=”java”]<br />
Playing Jack Sparrow from Thread akka:event-driven:dispatcher:global-1<br />
Playing Edward Scissorhands from Thread akka:event-driven:dispatcher:global-2<br />
Playing Willy Wonka from Thread akka:event-driven:dispatcher:global-3<br />
[/code]

通过输出结果我们可以看到,示例角色每次只响应一个消息,并且每次运行角色的线程都是不同的。对于消息处理的过程而言,既可以一个线程处理多个消息,也可以像本例这样由不同线程处理不同的消息——但无论是哪种处理模式,在任意时刻都只能有一个消息被处理。该模式的关键点在于,所有角色都是单线程的,但是在陷入等待状态时角色会优雅地将线程释放而不是抓住线程不撒手。我们在发送消息之后插入的sleep语句的目的就是为了将actor引入等待状态以便更清晰地演示这一运作细节。

上例中,我们创建角色时没有带任何构造函参。而如果需要的话,我们可以在角色的创建过程中引入一些参数。例如,我们可以用好莱坞演员的名字来初始化之前的HollywoodActor:

[code]<br />
public class UseHollywoodActor {<br />
public static void main(final String[] args) throws InterruptedException {<br />
final ActorRef tomHanks = Actors.actorOf(new UntypedActorFactory() {<br />
public UntypedActor create() { return new HollywoodActor(&quot;Hanks&quot;); }<br />
}).start();<br />
tomHanks.sendOneWay(&quot;James Lovell&quot;);<br />
tomHanks.sendOneWay(new StringBuilder(&quot;Politics&quot;));<br />
tomHanks.sendOneWay(&quot;Forrest Gump&quot;);<br />
Thread.sleep(1000);<br />
tomHanks.stop();<br />
}<br />
}<br />
[/code]

新版的HollywoodActor类的构造函数定义了一个名为name的String类型参数。而在onReceive()函数中,我们对于不能识别的消息进行了专门的处理,即简单地在屏幕输出该好莱坞演员未曾饰演过那个未识别的消息所代表的荧幕人物(role)。当然我们也可以采取其他动作,比如返回一个错误码、打日志、向上层调用者抛异常等等。下面让我们看看如何将给这个构造函数传递参数:

[code]<br />
public class HollywoodActor extends UntypedActor {<br />
private final String name;<br />
public HollywoodActor(final String theName) { name = theName; }<br />
public void onReceive(final Object role) {<br />
if(role instanceof String)<br />
System.out.println(String.format(&quot;%s playing %s&quot;, name, role));<br />
else<br />
System.out.println(name + &quot; plays no &quot; + role);<br />
}<br />
}<br />
[/code]

一般情况下,我们都是通过发送消息而不是直接调用函数的方式与角色进行交互的。Akka不希望我们拿到角色的直接引用,而是希望我们只针对ActorRef的引用进行操作。这样一来,Akka就可以确保我们不会往角色里添加其他函数,并且也不会与角色实例进行直接的交互。直接操纵角色实例的行为会将我们带回到共享可变性的泥淖中,而这正是我们极力想要避免。此外,这种受控的角色创建方式也便于Akka更好地回收废弃的角色。所以如果我们试图直接创建一个角色类的实例,Akka将抛出一个内容为“请不要用’new’操作符显示地创建角色实例”的akka.actor.ActorInitializationException异常。

Akka允许我们以一种受控的方式创建角色实例,即我们可以在一个匿名类中实现UntypedActorFactory接口,并在其create()函数中实现创建角色实例的逻辑。而接下来的actorOf()则把一个继承自UntypedActor的普通对象转换为为一个Akka角色。随后,我们和之前一样向这个actor发送几条消息并观察输出结果。

在本例中,HollywoodActor只接受String类型的消息,但我们在测试用例中向其发送了一条值为Politics、类型为StringBuilder的消息。而我们在onReceive()函数中设计的检查逻辑将会发现并处理这一情况。最后,我们会调用stop()函数来终止角色的运行。代码结尾处插入sleep(1000)的目的是为了让角色在结束之前有机会响应所有未处理的消息。最终的输出结果如下所示:

[code]<br />
Hanks playing James Lovell<br />
Hanks plays no Politics<br />
Hanks playing Forrest Gump<br />
[/code]

用Scala创建角色

在Scala中创建Akka角色时,我们没有像在Java版本中那样继承UntypedActor类,而是要继承Actor trait并实现receive()函数。下面让我们用Scala来实现之前刚刚用Java写过的HollywoodActor类:

[code lang=”scala”]<br />
class HollywoodActor extends Actor {<br />
def receive = {<br />
case role =&gt;<br />
println(&quot;Playing &quot; + role +<br />
&quot; from Thread &quot; + Thread.currentThread().getName())<br />
}<br />
}<br />
[/code]

在上面的代码中,receive()函数实现了一个PartialFunction并采用了Scala模式匹配的形式,但为了避免分散注意力我们现在先忽略这些细节。当有消息到达时,receive()函数将被调用;如果对Scala语法还不熟悉的话,你可以暂时先把receive()函数想象成一个大的switch语句,其实现的功能与Java版本是完全相同的。

至此我们已经看到了如何定义一个角色,下面让我们把注意力集中到角色的使用上面:

[code lang=”scala”]<br />
object UseHollywoodActor {<br />
def main(args : Array[String]) :Unit = {<br />
val johnnyDepp = Actor.actorOf[HollywoodActor].start()<br />
johnnyDepp ! &quot;Jack Sparrow&quot;<br />
Thread.sleep(100)<br />
johnnyDepp ! &quot;Edward Scissorhands&quot;<br />
Thread.sleep(100)<br />
johnnyDepp ! &quot;Willy Wonka&quot;<br />
Actors.registry.shutdownAll<br />
}<br />
}<br />
[/code]

Actor类的actorOf()函数有多个重载定义,这里我们所采用的是接受一个角色类名(即代码中的 [HollywoodActor])作为其参数的版本。在角色被创建出来之后,我们随即通过调用start()函数将其启动。在本例中,ActorRef类型的变量johnnyDepp即为我们所创建的角色实例的引用。由于Scala可以进行类型推断,所以我们可以不必在代码中明确指定johnnyDepp的类型。

接下来,我们给johnnyDepp发送了3个附带着我们希望其扮演的荧幕人物的消息。噢,稍等一下,这里有一个细节请你注意,即我们是通过特殊函数!来发送消息的。当你见到actor!message时,请从右向左阅读这个语句,就能明白这条语句的意思是把消息发送给指定的角色。这处细节再次展现了Scala在语法方面的简洁与优雅。通过这种方式,我们就无需再将发送消息的语句写成actor.!(message),而是简单地将句点和括号拿掉,简写成actor!message就行了。如果我们更喜欢Java里发送消息的那个函数,那么我们也可以把Scala简洁的语法用在Java风格的函数上,即把语句写成actor sendOneWay message。上面示例中余下的代码与之前Java版本的示例完全相同,这里就不再赘述。

下面我们将通过scalac编译器对上述代码进行编译,但首先请务必记住要把Akka库文件添加到classpath中。编译完成后,我们就可以像之前运行普通Java程序那样运行上面的scala示例程序。需要再次提醒你的是,请务必记得将所需的JARs加入到你系统的classpath中。下面是我在我的系统上所使用的编译和运行指令,请你根据你系统中Scala和Akka的安装目录来自行调整classpath中相关的路径信息:

[code]<br />
scalac -classpath $AKKA_JARS HollywoodActor.scala UseHollywoodActor.scala<br />
java -classpath $AKKA_JARS com.agiledeveloper.pcj.UseHollywoodActor<br />
[/code]

如果我们想要禁止日志消息输出到控制台的话,请参阅6.8节中的相关内容。在将上述示例代码编译并运行之后,我们可以看到其输出结果与之前的Java版本是非常相似的:

[code]<br />
class HollywoodActor(val name : String) extends Actor {<br />
def receive = {<br />
case role : String =&gt; println(String.format(&quot;%s playing %s&quot;, name, role))<br />
case msg =&gt; println(name + &quot; plays no &quot; + msg)<br />
}<br />
}<br />
[/code]

如果想在创建角色时传些参数给它,如好莱坞演员的名字等,你会发现用Scala来实现会比之前的Java版本简单很多。下面让我们先对HollywoodActor类进行改造,以使其可以接受构造函参:

[code]<br />
class HollywoodActor(val name : String) extends Actor {<br />
def receive = {<br />
case role : String =&gt; println(String.format(&quot;%s playing %s&quot;, name, role))<br />
case msg =&gt; println(name + &quot; plays no &quot; + msg)<br />
}<br />
}<br />
[/code]

如上所示,新版本的HollywoodActor类接受一个名为name的String类型的构造函参。而在receive()函数中,我们对于格式无法识别的消息做了专门的处理。在Scala中我们无需再使用instanceof,receive()函数中的case语句即可实现消息与各种模式之间的匹配——在本例中特指消息类型的匹配。

我们用Java创建接受一个构造函参的角色时还是花了不少力气的,但在Scala中一切变得如此简单:

[code]<br />
object UseHollywoodActor {<br />
def main(args : Array[String]) : Unit = {<br />
val tomHanks = Actor.actorOf(new HollywoodActor(&quot;Hanks&quot;)).start()<br />
tomHanks ! &quot;James Lovell&quot;<br />
tomHanks ! new StringBuilder(&quot;Politics&quot;)<br />
tomHanks ! &quot;Forrest Gump&quot;<br />
Thread.sleep(1000)<br />
tomHanks.stop()<br />
}<br />
}<br />
[/code]

在上面的代码中,我们先用new关键字对角色进行初始化,随后又将实例化好的对象传给actorOf()函数(这是由于Akka禁止在actorOf()函数之外随意地创建actor实例)。通过这一动作,我们就将一个继承自Actor的普通对象转换成了一个Akka角色。接下来,我们同样会给新创建的角色发送3条消息。剩下的代码与Java版本非常相似,这里就不再赘述。最后让我们运行上述示例代码,并确认其输出与Java版本是否相同:

[code]<br />
Hanks playing James Lovell<br />
Hanks plays no Politics<br />
Hanks playing Forrest Gump<br />
[/code]

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 讨喜的隔离可变性(三)创建角色

  • Trackback 关闭
  • 评论 (2)
    • vavio99
    • 2013/08/25 4:12下午

    这个排版也太乱了点吧…

    • 匿名
    • 2013/10/31 6:01下午

    感觉actor的模式就是这样一个情况:一个线程池+队列(负责接收不变的message),线程池中 的线程相当于actor,处理queue中的message,不知道这样类比如何???

return top