Akka笔记之消息传递

英文原文链接译文链接,原文作者:Arun Manivannan ,译者:有孚

在Akka笔记第一篇的介绍中,我们大致介绍了下Akka工具箱中的Actor。在第二篇当中,我们来看一下Actor消息传递的功能。这里还是延用之前使用的那个学生-老师的例子。

在Actor消息的第一部分中,我们会创建一个老师的Actor,但学生Actor则先不创建,而是使用一个叫做StudentSimulatorApp的主程序。

仔细回顾下学生-老师模型

我们现在只考虑StudentSimulatorApp发送给TeacherActor的消息。这里我所说的StudentSimulatorApp指的是一个正常的主程序。

从图中可以看到:
(如果有陌生的术语,没关系,后面我们会详细解释的)

1. 学生创建了一个叫ActorSystem的东西。
2. 他通过ActorSystem来创建了一个叫ActorRef的对象。QuoteRequest消息就是发送给ActorRef的(它是TeacherActor的一个代理)
3. ActorRef将消息发送给Dispatcher
4. Dispatcher将消息投递到目标Actor的邮箱中。
5. 随后Dispatcher将Mailbox扔给一个线程去执行(这点下节会重点讲到)
6. MailBox将消息出队并最终将其委托给真实的Teacher Actor的接收方法去处理。

正如我所说的,看不懂也别担心。现在我们来一步步地详细地分析下。全部讲完后你可以再回过头来看下这五个步骤。

STUDENTSIMULATORAPP程序

我们用这个STUDENTSIMULATORAPP来启动JVM并初始化ActorSystem。

从图中可以看到,StudentSimulatorApp

1. 创建了一个ActorSystem
2. 通过ActorSystem创建了一个Teacher Actor的代理(ActorRef)
3. 将QuoteRequest消息发送给代理

我们现在只关注这三点。

1. 创建ActorSystem

ActorSystem是进入到Actor的世界的一扇大门。通过它你可以创建或中止Actor。甚至还可以把整个Actor环境给关闭掉。

另一方面来说,Actor是一个分层的结构,ActorSystem之于Actor有点类似于java.lang.Object或者scala.Any的角色——也就是说,它是所有Actor的根对象。当你通过ActorSystem的actorOf方法创建了一个Actor时,你其实创建的是ActorSystem下面的一个Actor。

初始化ActorSystem的代码是这样的:

[code lang=”java”]
val system=ActorSystem("UniversityMessageSystem")

[/code]

UniversityMessageSystem只是你给ActorSystem起的一个可爱的名字而已。

2. 创建一个TeacherActor的代理?

我们来看下下面这段代码:

[code lang=”java”]
val teacherActorRef:ActorRef=actorSystem.actorOf(Props[TeacherActor])

[/code]
actorOf是ActorSystem中创建Actor的方法。但是正如你所看到的,它并不会返回我们所需要的TeacherActor。它返回的是一个ActorRef。

这个ActorRef扮演了真实的Actor的一个代理的角色。客户端并不会直接和Actor通信。这也正是Actor模型中避免直接访问TeacherActor中任何的自定义/私有方法或者变量的一种方式。

再重复一遍,消息只会发送给ActorRef,最终才会到达真正的Actor。你是绝对无法直接和Actor进行通信的。如果你真的找到了什么拙劣的方式来直接通信,大家会恨你入骨的。

image

将消息发送给代理

还是只有一行代码。你只需告诉说把QuoteRequest消息发送到ActorRef就好了。Actor中的这个告诉的方式就是一个!号。(ActorRef中确实也有一个tell方法,不过它只是把这个调用委托给了!号)

[code lang=”java”]
//send a message to the Teacher Actor
teacherActorRef!QuoteRequest

[/code]
这就可以了!

如果你认为我在骗你的话,看一下下面StudentSimulatorApp的完整代码:

STUDENTSIMULATORAPP.SCALA

[code lang=”java”]
package me.rerun.akkanotes.messaging.actormsg1

import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.actorRef2Scala
import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._

object StudentSimulatorApp extends App{

//Initialize the ActorSystem
val actorSystem=ActorSystem("UniversityMessageSystem")

//construct the Teacher Actor Ref
val teacherActorRef=actorSystem.actorOf(Props[TeacherActor])

//send a message to the Teacher Actor
teacherActorRef!QuoteRequest

//Let’s wait for a couple of seconds before we shut down the system
Thread.sleep (2000)

//Shut down the ActorSystem.
actorSystem.shutdown()

}

[/code]
好吧,我承认我撒了点小谎。你还得关掉ActorSystem,不然JVM会一直运行下去的。我还让主线程睡眠了一小会儿,以便给点时间让TeacherActor去完成它的任务。我知道这听起来很愚蠢。别担心。后面我们会通过些优雅的测试用例来替换掉这种取巧的方式。

消息

我们刚发送了一个QuoteMessage给ActorRef,但是,还压根儿没看着过这个消息类呢!

说曹操,曹操到:

(实践中推荐你把消息封装成一个好点的对象,这样维护起来容易些)

TeacherProtocol

[code lang=”java”]
package me.rerun.akkanotes.messaging.protocols

object TeacherProtocol{

case class QuoteRequest()
case class QuoteResponse(quoteString:String)

}

[/code]

正如你所想的那样,QuoteRequest就是发送给TeacherActor的那个消息。Actor会回复一个QuoteResponse。

分发器及邮箱

ActorRef把消息处理功能委托给了Dispatcher。实际上,当我们创建ActorSystem和ActorRef的时候,就已经创建了一个Dispatcher和MailBox了。我们来看下它们是干什么的。

邮箱

每个Actor都有一个MailBox(后面会介绍一种特殊的情况)。在我们这个比喻当中,每个老师也有一个邮箱。老师得去检查邮箱并处理消息。在Actor的世界中,则是另一种形式——邮箱一有机会就会要求Actor去完成自己的任务。

同样的,邮箱里也有一个队列来以FIFO的方式来存储并处理消息——它和实际的邮箱还有点不同,真实的邮箱新的信总是在最上面的。

现在讲到分发器了

Dispatcher会完成一些很酷的事。从它的角度来看,它只是从ActorRef中取出一条消息然后将它传给了MailBox。但是,在这后面发生了一件不可意义的事情:

Dispatcher会封装一个ExecutorService(ForkJoinPoll或者ThreadPoolExecutor)。它把MailBox扔到ExecutorService中去运行。

看下Dispatcher里面的一段代码:

[code lang=”java”]
protected[akka] override def registerForExecution(mbox: Mailbox, …): Boolean = {

try {
executorService execute mbox

}

[/code]

什么,你是说要执行一下邮箱?

是的。我们看到MailBox中包含了队列里面的消息。由于Executor得去执行MailBox,所以它得是一个Thread类型。是的没错。MailBox的声明及构造器就是这样的。

下面是MailBox的签名信息。

[code lang=”java”]
private[akka] abstract class Mailbox(val messageQueue: MessageQueue) extends SystemMessageQueue with Runnable

[/code]

TeacherActor

当MailBox的run方法运行的时候,它会从队列中取出一条消息,然后将它传给Actor去处理。

当你把消息传给ActorRef的时候,最终调用的实际是目标Actor里面的一个receive方法。

TeacherActor只是一个很简单的类,它有一个名言的列表,而receive方法很明显就是用来处理消息的。

来看下代码:

TeacherActor.scala

[code lang=”java”]
package me.rerun.akkanotes.messaging.actormsg1

import scala.util.Random

import akka.actor.Actor
import me.rerun.akkanotes.messaging.protocols.TeacherProtocol._

/*
* Your Teacher Actor class.
*
* The class could use refinement by way of
* using ActorLogging which uses the EventBus of the Actor framework
* instead of the plain old System out
*
*/

class TeacherActor extends Actor {

val quotes = List(
"Moderation is for cowards",
"Anything worth doing is worth overdoing",
"The trouble is you think you have time",
"You never gonna know if you never even try")

def receive = {

case QuoteRequest => {

import util.Random

//Get a random Quote from the list and construct a response
val quoteResponse=QuoteResponse(quotes(Random.nextInt(quotes.size)))

println (quoteResponse)

}

}

}

[/code]
TeacherActor的receive方法的模式匹配只会匹配一种消息——QuoteRequest (事实上,模式匹配中最好匹配下默认的情况,不过这个就说来话长了)

receive方法做的就是

1. 匹配QuoteRequest的模式
2. 从名言列表中随机选取一条
3. 构造出一个QuoteResponse
4. 将QuoteResponse打印到控制台上

代码

整个项目的代码可以从Github中下载到。

本文最早发布于我的个人博客: Java译站

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: Akka笔记之消息传递

  • Trackback 关闭
  • 评论 (0)
  1. 暂无评论

return top