《Apache RocketMQ用户指南》之批量消息示例

批量消息示例

原文链接        译者:小村长

 

为什么选择批量消息?

批量发送消息可提高单次发送消息的性能.

使用限制

相同批次的消息应具有:相同的主题,相同的等待消息处理成功但是不支持定时处理. 此外,一个批量的消息的总大小不要错过1MB.

怎么使用批量消息

如果您一次只发送不超过1MB的消息,使用批量发送很方便:

[code lang=”java”]
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try {
producer.send(messages);
} catch (Exception e) {
e.printStackTrace();
//handle the error
}
[/code]

分割成列表

只有在发送大批量时才会增加复杂性,并且您可能不确定是否超出了大小限制(1MiB)。

目前,你最好分开列表:

[code lang=”java”]
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1000 * 1000;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override public boolean hasNext() {
return currIndex < messages.size();
}
@Override public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > SIZE_LIMIT) {
//it is unexpected that single message exceeds the SIZE_LIMIT
//here just let it go, otherwise it will block the splitting process
if (nextIndex – currIndex == 0) {
//if the next sublist has no element, add this one and then break, otherwise just break
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}

}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
//then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
//handle the error
}
}
[/code]

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 《Apache RocketMQ用户指南》之批量消息示例

  • Trackback 关闭
  • 评论 (0)
  1. 暂无评论

return top