开发学院

您的位置:首页>教程>正文

教程正文

RocketMQ 消息过滤

  在大多数情况下,tag(标记)是一种简单而有用的设计,可以选择你想要的消息。例如:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE");
consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");

  消费者将收到包含TAGA或TAGB或TAGC的消息。但是限制是一条消息只能有一个标签,这可能不适用于复杂的场景。在这种情况下,您可以使用SQL表达式过滤掉消息。

原则

  SQL功能可以将发送消息时输入的属性进行一些计算。在RocketMQ定义的语法下,你可以实现一些有趣的逻辑。请看下面的例子:

------------
| message  |
|----------|  a > 5 AND b = 'abc'
| a = 10   |  --------------------> Gotten
| b = 'abc'|
| c = true |
------------
------------
| message  |
|----------|   a > 5 AND b = 'abc'
| a = 1    |  --------------------> Missed
| b = 'abc'|
| c = true |
------------

语法

  RocketMQ只定义了一些基本语法来支持这个特性,但是你可以很容易地扩展它。

Numeric comparison, like >, >=, <, <=, BETWEEN, =;
Character comparison, like =, <>, IN;
IS NULL or IS NOT NULL;
Logical AND, OR, NOT;

  常量类型为:

Numeric, like 123, 3.1415;
Character, like ‘abc’, must be made with single quotes;
NULL, special constant;
Boolean, TRUE or FALSE;

使用限制

  只有推送用户可以通过SQL92选择消息,接口是:

public void subscribe(final String topic, final MessageSelector messageSelector)

生产者例子

  发送时,可以通过putUserProperty方法将属性放入消息中。

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.start();

Message msg = new Message("TopicTest",
    tag,
    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
// Set some properties.
msg.putUserProperty("a", String.valueOf(i));

SendResult sendResult = producer.send(msg);
   
producer.shutdown();

消费者例子

  使用MessageSelector.bySql在消费时通过SQL92选择消息。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");

// only subsribe messages have property a, also a >=0 and a <= 3
consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3");

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();