《Apache RocketMQ用户指南》之定时消息示例
定时消息示例
原文链接 译者:小村长
什么是定时消息?
定时消息与正常消息的不同之处在于,它是在指定的时间后执行。
应用
- 启动消费者等待传入的订阅消息
[code lang=”java”]
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class ScheduledMessageConsumer {
public static void main(String[] args) throws Exception {
// Instantiate message consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
// Subscribe topics
consumer.subscribe("TestTopic", "*");
// Register message listener
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ (System.currentTimeMillis() – message.getStoreTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// Launch consumer
consumer.start();
}
}
[/code]
2. 发送定时消息
[code lang=”java”]
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
public class ScheduledMessageProducer {
public static void main(String[] args) throws Exception {
// Instantiate a producer to send scheduled messages
DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
// Launch producer
producer.start();
int totalMessagesToSend = 100;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// This message will be delivered to consumer 10 seconds later.
message.setDelayTimeLevel(3);
// Send the message
producer.send(message);
}
// Shutdown producer after use.
producer.shutdown();
}
}
[/code]
3. 核实
您应该看到消息比其存储时间晚约10秒钟。
原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 《Apache RocketMQ用户指南》之定时消息示例
暂无评论