讨喜的隔离可变性(十)使用Transactor

声明:本文是《Java虚拟机并发编程》的第五章,感谢华章出版社授权并发编程网站发布此文,禁止以任何形式转载此文。

Akka transactor或事务角色为我们提供了一种将多个角色的执行过程合并到一个事务中的方法。顾名思义,transactor可以将多个角色对于托管STM Ref对象的更改变成原子操作,即仅当外围事务提交成功之后,对于那些托管对象的变更才能生效,否则所有的变更都会被丢弃。

Transactor提供了三种处理消息的方法:

  • 默认情况下,Transactor会在其自己的事务中处理消息。
  • 实现normally()函数。该函数不属于任何事物,其主要功能是独立地处理我们所选择的消息。
  • 申请让消息被协调处理,即使其作为总控事务的一部分来执行。

总体而言,Transactor为我们提供了将其他角色链接到我们的协调事务里的弹性。此外,transactor还提供了前置和后置于事务的可选函数,以便于我们可以提前为事务做好准备或执行某些后置提交操作。

还是老规矩,我们先用Java创建一个transactor,然后再用Scala实现一遍。

在Java中使用Transactor

为了能够在Java中使用transactor,我们需要继承UntypedTransactor类并实现atomically()函数。除此之外,如果我们想要在事务中包含其他角色,则还需要实现coordinate()函数。下面就让我们用transactor来重新实现账户转账的例子。首先还是从我们将会用到的消息类开始说起。

在新版的示例中,我们定义了一个含有存款金额字段的不可变类Deposit,并将该类作为驱动存款操作执行的消息。

[code lang=”java”]
public class Deposit {
public final int amount;
public Deposit(final int theAmount) { amount = theAmount; }
}
[/code]

接下来,我们还需要定义一个与Deposit结构完全相同的消息类Withdraw:

[code]
public class Withdraw {
public final int amount;
public Withdraw(final int theAmount) { amount = theAmount; }
}
[/code]

对于获取账户余额的请求消息而言,由于本身不需要携带任何数据,所以我们只需定义一个空类即可:

[code]
public class FetchBalance {}
[/code]

与此相对地,针对上述请求消息的响应消息Balance则是一个含有有实际账户余额的不可变类:

[code lang=”java”]
public class Balance {
public final int amount;
public Balance(final int theBalance) { amount = theBalance; }
}
[/code]

最后,我们还需要定义Transfer消息,该消息将会包含转账操作的源账户和目的账户以及待转金额:

[code lang=”java”]
public class Transfer {
public final ActorRef from;
public final ActorRef to;
public final int amount;
public Transfer(final ActorRef fromAccount,
final ActorRef toAccount, final int theAmount) {
from = fromAccount;
to = toAccount;
amount = theAmount;
}
}
[/code]

在本例中,AccountService tranactor将会用到Transfer消息,而Account transactor则会使用我们刚才定义的其他消息。下面让我们先来看一下Account transactor的实现代码:

[code]
public class Account extends UntypedTransactor {
private final Ref<Integer> balance = new Ref<Integer>(0);
public void atomically(final Object message) {
if(message instanceof Deposit) {
int amount = ((Deposit)(message)).amount;
if (amount > 0) {
balance.swap(balance.get() + amount);
System.out.println("Received Deposit request " + amount);
Using Transactors • 203
}
}
if(message instanceof Withdraw) {
int amount = ((Withdraw)(message)).amount;
System.out.println("Received Withdraw request " + amount);
if (amount > 0 && balance.get() >= amount)
balance.swap(balance.get() – amount);
else {
System.out.println("…insufficient funds…");
throw new RuntimeException("Insufficient fund");
}
}
if(message instanceof FetchBalance) {
getContext().replySafe(new Balance(balance.get()));
}
}
}
[/code]

Account类继承自UntypedTransactor并且实现了atomically()函数。该函数将会运行在一个给定事务的上下文环境中——这里的事务可以是调用方所在的事务,如果没显式给出的话也可能是一个独立的事务。在atomically()函数中,如果接收到的消息类型为Deposit,我们就会把存款的数额与当前余额相加之后保存在STM托管的Ref对象中。如果接收到的消息类型为Withdraw并且当前余额大于取款金额时,我们才会在balance中减去取款金额,否则我们会抛出一个异常。而一旦抛出异常,该行为将会触发外围事务的回滚。最后,如果收到的消息类型为FetchBalance,我们只需把当前账户余额balance的值返回给发送方即可。由于整个函数都是在一个事务中运行的,所以在一个transactor中对于Ref对象进行多次访问是没关系的。而仅当外围事务被提交之后,我们对Ref对象所做的变更才能生效,请记住,示例中所涉及的不可变状态是需要我们人工维护的。

下面我们将实现AccountService transactor,其主要功能就是负责协调目标账户(transactor)上的存款操作和源账户(另一个transactor)上的取款操作,实现代码如下所示:

[code lang=”java”]

public class AccountService extends UntypedTransactor {
@Override public Set<SendTo> coordinate(final Object message) {
if(message instanceof Transfer) {
Set<SendTo> coordinations = new java.util.HashSet<SendTo>();
Transfer transfer = (Transfer) message;
coordinations.add(sendTo(transfer.to, new Deposit(transfer.amount)));
204 • Chapter 8. Favoring Isolated Mutability
coordinations.add(sendTo(transfer.from,
new Withdraw(transfer.amount)));
return java.util.Collections.unmodifiableSet(coordinations);
}
return nobody();
}
public void atomically(final Object message) {}
}

[/code]

由于AccountService的唯一职责就是协调存取款操作,所以在coordinate()函数中,我们需要将合适的消息分别发送给源账户和目标账户。为了实现这一目的,我们需要将角色以及每个角色所对应的消息都聚集在一个集合中。当我们将该集合自coordinate()函数返回给调用方时,AccountService transactor的父类将会把合适的消息发往集合中的每一个transactor。而一旦消息被发出,则其自身的automically()实现将会被调用。但由于这里我们没有额外的事情要做,所以就只写了一个空的atomically()函数。

下面让我们写一些的测试代码来检验上述这些transactor的功能:

[code lang=”java”]

public class UseAccountService {
public static void printBalance(
final String accountName, final ActorRef account) {
Balance balance =
(Balance)(account.sendRequestReply(new FetchBalance()));
System.out.println(accountName + " balance is " + balance.amount);
}
public static void main(final String[] args)
throws InterruptedException {
final ActorRef account1 = Actors.actorOf(Account.class).start();
final ActorRef account2 = Actors.actorOf(Account.class).start();
final ActorRef accountService =
Actors.actorOf(AccountService.class).start();
account1.sendOneWay(new Deposit(1000));
account2.sendOneWay(new Deposit(1000));
Thread.sleep(1000);
printBalance("Account1", account1);
printBalance("Account2", account2);
System.out.println("Let’s transfer $20… should succeed");
accountService.sendOneWay(new Transfer(account1, account2, 20));
Using Transactors • 205
Thread.sleep(1000);
printBalance("Account1", account1);
printBalance("Account2", account2);
System.out.println("Let’s transfer $2000… should not succeed");
accountService.sendOneWay(new Transfer(account1, account2, 2000));
Thread.sleep(6000);
printBalance("Account1", account1);
printBalance("Account2", account2);
Actors.registry().shutdownAll();
}
}

[/code]

就交互和使用方法而言,角色和transactor实际是没什么区别的。如果我们将一个普通的消息(如new Deposit(1000))发送给transactor,则该消息将会自动被包裹到一个事务中。此外,我们也可以通过创建akka.transactor.Coordinated实例并把消息包裹进去(例如,new Coordinated(new Deposit(1000)))的方法来构建我们自己的协调事务。在本例中,由于我们只处理单向消息,所以在执行下一步查询之前我们都会插入一些延时以便使消息处理能够彻底完成。这种做法为协调事务成功执行或失败回滚提供了时间,同时也便于我们从随后的打印函数中观察到事务执行的效果。

只有在相关transactor的消息全部成功处理完之后,协调事务才能提交,其中协调请求的等待时间至多为事务超时时间(可配置)。上述测试代码的输出结果如下所示:

[code]
Received Deposit request 1000
Received Deposit request 1000
Account1 balance is 1000
Account2 balance is 1000
Let’s transfer $20… should succeed
Received Deposit request 20
Received Withdraw request 20
Account1 balance is 980
Account2 balance is 1020
Let’s transfer $2000… should not succeed
Received Withdraw request 2000
…insufficient funds…
Received Deposit request 2000
Account1 balance is 980
Account2 balance is 1020

[/code]

从输出结果中我们可以看到,前两次转存款操作和第一次转账操作都干脆利落地完成,而第二次转账操作则由于待转金额大于源账户的当前余额而失败。所以虽然转账操作的存款步骤顺利完成(由于存款和取款动作是并发执行的,所以我们从输出结果中所看到的最后一次转账操作的执行步骤可能每次都不一样),但取款步骤却没有成功,从而导致整个转账操作失败并回滚。从最后两条输出结果我们看出,第二次转账的存款步骤所产生的变更被丢弃,两个账户的余额又恢复到第二次转账之前的状态。

在Scala中使用Transactor

为了在Java中使用transactor,我们需要继承UntypedTransactor类并实现其atomically()函数。而如果我们想要在事务中包含其他角色,则还需要实现coordinate()函数。接下来我们会将上面的示例从Java翻译成Scala,首先我们还是从一些消息类的定义开始入手,如下所示,这些消息类用Scala的case类实现起来非常简洁。

[code lang=”scala”]

case class Deposit(val amount : Int)
case class Withdraw(val amount : Int)
case class FetchBalance()
case class Balance(val amount : Int)
case class Transfer(val from : ActorRef, val to : ActorRef, val amount : Int)
Next let’s translate the Account transactor to Scala. We can use pattern
matching to handle the three messages.
class Account extends Transactor {
val balance = Ref(0)
def atomically = {
case Deposit(amount) =>
if (amount > 0) {
balance.swap(balance.get() + amount)
println("Received Deposit request " + amount)
}
case Withdraw(amount) =>
println("Received Withdraw request " + amount)
if (amount > 0 && balance.get() >= amount)
balance.swap(balance.get() – amount)
else {
println("…insufficient funds…")
Using Transactors • 207
throw new RuntimeException("Insufficient fund")
}
case FetchBalance =>
self.replySafe(Balance(balance.get()))
}
}

[/code]

接下来,我们需要把Account transactor翻译成Scala的实现方式,这里我们可以使用模式匹配来处理上面定义的三种消息。

[code]
class Account extends Transactor {
val balance = Ref(0)
def atomically = {
case Deposit(amount) =>
if (amount > 0) {
balance.swap(balance.get() + amount)
println("Received Deposit request " + amount)
}
case Withdraw(amount) =>
println("Received Withdraw request " + amount)
if (amount > 0 && balance.get() >= amount)
balance.swap(balance.get() – amount)
else {
println("…insufficient funds…")
Using Transactors • 207
throw new RuntimeException("Insufficient fund")
}
case FetchBalance =>
self.replySafe(Balance(balance.get()))
}
}
[/code]

下面让我们一起来翻译AccountService transactor。这里我们仍然将atomically()函数置空,同时我们在coordinate()函数中指定了哪些对象是需要参与到事务执行过程中的。与Java版的代码相比,Scala这边的实现代码在语法上要更加简洁:

[code]
class AccountService extends Transactor {
override def coordinate = {
case Transfer(from, to, amount) =>
sendTo(to -> Deposit(amount), from -> Withdraw(amount))
}
def atomically = { case message => }
}
[/code]

最后,我们用下面的测试代码来检验这些transactor的运行情况:

[code lang=”scala”]
object UseAccountService {
def printBalance(accountName : String, account : ActorRef) = {
(account !! FetchBalance) match {
case Some(Balance(amount)) =>
println(accountName + " balance is " + amount)
case None =>
println("Error getting balance for " + accountName)
}
}
def main(args : Array[String]) = {
val account1 = Actor.actorOf[Account].start()
val account2 = Actor.actorOf[Account].start()
val accountService = Actor.actorOf[AccountService].start()
account1 ! Deposit(1000)
account2 ! Deposit(1000)
Thread.sleep(1000)
printBalance("Account1", account1)
printBalance("Account2", account2)
208 • Chapter 8. Favoring Isolated Mutability

[/code]

虽然上述代码只是Java版本对应代码的直译,但在这里我们再次见证了Scala在语法简洁方面的优势。通过观察下面的输出结果我们可以看到,Scala版示例的行为与Java版本是完全相同的。

[code]
Received Deposit request 1000
Received Deposit request 1000
Account1 balance is 1000
Account2 balance is 1000
Let’s transfer $20… should succeed
Received Deposit request 20
Received Withdraw request 20
Account1 balance is 980
Account2 balance is 1020
Let’s transfer $2000… should not succeed
Received Deposit request 2000
Received Withdraw request 2000
…insufficient funds…
Account1 balance is 980
Account2 balance is 1020
[/code]

通过上面两个示例,我们学习了如何在Java和Scala中实现transactor。Transactor集角色与STM的优点于一身,并支持多个独立运行角色之间的一致性。与STM的使用场景类似,transactor非常适用于那些写冲突非常不频繁的应用程序。理想情况下,如果多个角色需要进行某种形式的投票以作出某项决定,则用transactor来实现将会非常方便。

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 讨喜的隔离可变性(十)使用Transactor

  • Trackback 关闭
  • 评论 (0)
  1. 暂无评论

return top