《RabbitMQ官方文档》订阅与发布

之前的教程中,我们创建了一个工作队列。在一个工作队列背后的假设是将每个任务都准确地交付给一个工作人员。在这个环节我们要做些完全不同的事情—我们将要把一个消息传递给多个消费者。这种模式被称为“发布/订阅”。

为了阐述这种模式,我们打算构建一个简单的日志系统。它由两个程序组成—第一个发出日志消息,第二个接收消息并打印出来。

在我们的日志系统中,每次运行接收者程序的副本都将收到消息。这使我们能够运行一个接收者程序,将日志导向磁盘;同时我们可以运行另一个接收者程序查看屏幕上的日志。

实际上,已发布的日志消息将被广播到所有接收者。

交换

在教程的前面部分中,我们发送和接受消息都来自一个队列,现在是时候引进一个完整的Rabbit消息模型了。

让我们快速地回顾一下在之前的教程里介绍的内容:

生产者是发送消息的用户程序。

队列是储存消息的缓存。

消费者是接收消息的用户程序。

在Rabbit消息模型中核心的思想是,生产者从不直接向一个队列发送任何消息。事实上,通常生产者甚至不知道将一个消息是否传递到了某一队列。

相反的,生产者只能向交换器(exchange)发送消息。交换器是一个非常简单的东西。它在一端从生产者接收消息,在另一端将消息发布到队列。交换器必须准确地知道它收到的消息要做什么。它应该被添加到一个特殊的队列?它应该被添加到多个队列?或者它应该被废弃?这些规则由交换类型定义。

有许多交换类型是可用的:direct,tpic,headers和fanout。我们来关注最后一种–fanout,我们来创建一个这种类型的交换器命名为logs:

[code lang=”java”]

channel.exchangeDeclare("logs", "fanout");

[/code]

这个fanout交换器非常简单。就像你可以从它的名字中猜到的,它仅仅是将它收到的所有消息传递到它所知的队列中去。这正是我们的日志系统所需要的。

列出交换器

你可以用有用的rabbitmqctl列出交换器:

sudo rabbitmqctl list_exchanges
在列表里会有一些名字为 amq.*的默认(未命名的),但是现在你可能不需要使用它们。

匿名交换器

在教程的前面部分中,我们还不知道交换器,但仍能够将消息发送到对了,这是因为我们可能用了一个默认的交换器,我们用空字符串(””)来识别它。

想想我们之前是怎样发布一条消息的:

[code lang=”java”]
channel.basicPublish("", "hello", null, message.getBytes());
[/code]

第一个参数是交换器的名字。空字符串指明是默认或匿名的交换器:消息通过“routingKey”选择指定的名称路由到队列,如果它存在的话。

现在我们可以发布到我们命名的交换器了。

[code language=”lang=”]

channel.basicPublish( "logs", "", null, message.getBytes());

[/code]

临时队列

也许你记得我们之前使用的队列都有一个指定的名字(记得hello和task——queue吗?)。能够为一个队列指定名字对于我们来说是很重要的—我们需要将工作者指向相同的队列。当你想要在生产者和消费者之间同用一个队列的时候,给一个队列赋予名字是很重要的。

但这不适用于我们的日志系统。我们想要监听所有的消息,而不只是其中的一部分。我们也仅只对当前活动的消息感兴趣而不是旧的。为了解决这个问题我们需要两个东西。

首先,每当我们连接Rabbit我们需要一个刷新过的空队列。我们可以创建一个队列并使用一个随机的名字,甚至更好的做法是让服务器为我们选择一个随机的名字:

其次,一旦我们使消费者从队列断开后,队列应该自动被删掉。
在Java客户端,当我们使用一个不带参数的queueDeclare()时,我们使用自动生成的名字创建了一个,不持久的,专用的,自动删除的队列。

[code lang=”java”]
String queueName = channel.queueDeclare().getQueue();
[/code]

此时queueName包含一个随机的队列名字。例如它可能是amq.gen-JzTY20BRgKO-HjmUJj0wLg.

绑定

我们已经创建了一个fanout交换器和一个队列。现在我们需要让交换器将消息发送到我们的队列里。交换器和一个队列之间的关系称为一个绑定。

http://www.rabbitmq.com/img/tutorials/bindings.png

[code lang=”java”]
channel.queueBind(queueName, "logs", "");
[/code]

现在开始logs交换器会添加消息到我们的队列。

列出绑定

你可以列出所有存在的绑定,你可以猜到这条命令:
rabbitmqctl list_bindings

将他们联系在一起

http://www.rabbitmq.com/img/tutorials/python-three-overall.png

用于发出消息的生产者程序,看起来和之前的教程里没有太大差异。最重要的改变是现在我们要发布消息到我们的logs交换器而不是匿名的。我们需要在发送的时候提供一个routingKey,但对于fanout交换器来说,这个值会被忽略掉。

以下是EmitLog.java程序的代码:

[code lang=”java”]</pre>
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] argv)
throws java.io.IOException {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

String message = getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent ‘" + message + "’");

channel.close();
connection.close();
}
//…
}
[/code]

(https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/EmitLog.java)

正如你所见,在建立一个连接后我们声明了一个交换器,这个步骤是必须的因为发布到一个不存在的交换器是禁止的。

如果没有队列绑定到交换器上消息将丢失,但这对于我们来说没有影响;如果没有消费者在监听,我们可以安全地丢弃消息。

这是ReceiveLogs.java:的代码:

[code lang=”java”]

import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received ‘" + message + "’");
}
};
channel.basicConsume(queueName, true, consumer);
}
}

[/code]

https://github.com/rabbitmq/rabbitmq-tutorials/blob/master/java/ReceiveLogs.java

像我们之前那样编译它。

javac -cp $CP EmitLog.java ReceiveLogs.java

 

如果你想要把日志保存到文件,只需打开控制台并输入:

java -cp $CP ReceiveLogs > logs_from_rabbit.log

如果你想要在屏幕里看到日志,spawn一个新的终端并且运行:

java -cp $CP ReceiveLogs

当然,要发出日志类型:

java -cp $CP EmitLog

使用rabbitmqctl list_bindings 你可以验证代码实际上是根据需要创建绑定和队列。在两个ReceiveLogs.java程序中你应该可以看到这些东西:

sudo rabbitmqctl list_bindings
# => Listing bindings …
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => …done.

结果解释很直接:数据从交换器logs而来去到两个系统分配名字的队列。这正是我们想要的。

 

要了解如何监听消息的一个子集,我们继续阅读教程4。

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 《RabbitMQ官方文档》订阅与发布

  • Trackback 关闭
  • 评论 (0)
  1. 暂无评论

return top