《Apache RocketMQ用户指南》之过滤消息示例
消息过滤示例
原文链接 译者:小村长
[code lang=”java”]
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
[/code]
消费者将收到包含TAGA或TAGB或TAGB的消息. 但限制是一条消息只能有一个标签,而这对于复杂的情况可能无效。 在这种情况下,您可以使用SQL表达式筛选出消息.
原理
SQL功能可以通过您在发送消息时放入的属性进行一些计算。 在RocketMQ定义的语法下,您可以实现一些有趣的逻辑。 这是一个例子:
------------ | message | |----------| a > 5 AND b = 'abc' | a = 10 | --------------------> Gotten | b = 'abc'| | c = true | ------------ ------------ | message | |----------| a > 5 AND b = 'abc' | a = 1 | --------------------> Missed | b = 'abc'| | c = true | ------------
语法
RocketMQ只定义了一些基本的语法来支持这个功能。 你也可以很容易地扩展它.
- 数字比较, 像
>
,>=
,<
,<=
,BETWEEN
,=
; - 字符比较, 像
=
,<>
,IN
; IS NULL
或者IS NOT NULL
;- 逻辑运算
AND
,OR
,NOT
;
常量类型是:
- 数字, 像123, 3.1415;
- 字符串, 像‘abc’,必须使用单引号;
NULL
, 特殊常数;- 布尔常量,
TRUE
或FALSE
;
使用限制
只有消费者可以通过SQL92选择消息。 示例:
[code lang=”java”]
public void subscribe(final String topic, final MessageSelector messageSelector)
[/code]
生产者示例
发送时,您可以通过putUserProperty方法在消息中放置属性.
[code lang=”java”]
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();
Message msg = new Message("TopicTest",
tag,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));
SendResult sendResult = producer.send(msg);
producer.shutdown()
[/code]
消费者示例
消费时,使用Message Selector.by Sql通过SQL92选择消息.
[code lang=”java”]
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
// only subsribe messages have property a, also a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
[/code]
原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 《Apache RocketMQ用户指南》之过滤消息示例
Hi,又遇到org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92吗?
配置加了enablePropertyFilter=true后,也一样
我也遇到了相同的问题,有没有解决的办法。