《Apache RocketMQ用户指南》之过滤消息示例

消息过滤示例

原文链接        译者:小村长

在大多数情况下,tag是一种简单而有用的设计,用于选择所需的信息。 例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

消费者将收到包含TAGA或TAGB或TAGB的消息. 但限制是一条消息只能有一个标签,而这对于复杂的情况可能无效。 在这种情况下,您可以使用SQL表达式筛选出消息.

原理

SQL功能可以通过您在发送消息时放入的属性进行一些计算。 在RocketMQ定义的语法下,您可以实现一些有趣的逻辑。 这是一个例子:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------


语法

RocketMQ只定义了一些基本的语法来支持这个功能。 你也可以很容易地扩展它.

  1. 数字比较, 像 >, >=, <, <=, BETWEEN, =;
  2. 字符比较, 像 =, <>, IN;
  3. IS NULL 或者 IS NOT NULL;
  4. 逻辑运算AND, OR, NOT;

常量类型是:

  1. 数字, 像123, 3.1415;
  2. 字符串, 像‘abc’,必须使用单引号;
  3. NULL, 特殊常数;
  4. 布尔常量, TRUEFALSE;

使用限制

只有消费者可以通过SQL92选择消息。 示例:

public void subscribe(final String topic, final MessageSelector messageSelector)

生产者示例

发送时,您可以通过putUserProperty方法在消息中放置属性.

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();

Message msg = new Message("TopicTest",
    tag,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));

SendResult sendResult = producer.send(msg);
   
producer.shutdown()

消费者示例

消费时,使用Message Selector.by Sql通过SQL92选择消息.

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

// only subsribe messages have property a, also a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();

 

 

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 《Apache RocketMQ用户指南》之过滤消息示例


FavoriteLoading添加本文到我的收藏
  • Trackback 关闭
  • 评论 (2)
    • zzzaaa333444
    • 2018/06/20 1:21下午

    Hi,又遇到org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92吗?
    配置加了enablePropertyFilter=true后,也一样

    // Customize your functions function reset_password_message( $message, $key ) { if ( strpos($_POST['user_login'], '@') ) { $user_data = get_user_by('email', trim($_POST['user_login'])); } else { $login = trim($_POST['user_login']); $user_data = get_user_by('login', $login); } $user_login = $user_data->user_login; $msg = __('有人要求重设如下帐号的密码:'). "\r\n\r\n"; $msg .= network_site_url() . "\r\n\r\n"; $msg .= sprintf(__('用户名:%s'), $user_login) . "\r\n\r\n"; $msg .= __('若这不是您本人要求的,请忽略本邮件。') . "\r\n\r\n"; $msg .= __('要重置您的密码,请打开下面的链接:'). "\r\n\r\n"; $msg .= network_site_url("wp-login.php?action=rp&key=$key&login=" . rawurlencode($user_login), 'login') ; return $msg; } add_filter('retrieve_password_message', reset_password_message, null, 2);
      • spim
      • 2018/09/07 9:35上午

      我也遇到了相同的问题,有没有解决的办法。

      // Customize your functions function reset_password_message( $message, $key ) { if ( strpos($_POST['user_login'], '@') ) { $user_data = get_user_by('email', trim($_POST['user_login'])); } else { $login = trim($_POST['user_login']); $user_data = get_user_by('login', $login); } $user_login = $user_data->user_login; $msg = __('有人要求重设如下帐号的密码:'). "\r\n\r\n"; $msg .= network_site_url() . "\r\n\r\n"; $msg .= sprintf(__('用户名:%s'), $user_login) . "\r\n\r\n"; $msg .= __('若这不是您本人要求的,请忽略本邮件。') . "\r\n\r\n"; $msg .= __('要重置您的密码,请打开下面的链接:'). "\r\n\r\n"; $msg .= network_site_url("wp-login.php?action=rp&key=$key&login=" . rawurlencode($user_login), 'login') ; return $msg; } add_filter('retrieve_password_message', reset_password_message, null, 2);
您必须 登陆 后才能发表评论

return top