RocketMQ 消息过滤
在大多数情况下,tag(标记)是一种简单而有用的设计,可以选择你想要的消息。例如:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_EXAMPLE"); consumer.subscribe("TOPIC", "TAGA || TAGB || TAGC");
消费者将收到包含TAGA或TAGB或TAGC的消息。但是限制是一条消息只能有一个标签,这可能不适用于复杂的场景。在这种情况下,您可以使用SQL表达式过滤掉消息。
原则
SQL功能可以将发送消息时输入的属性进行一些计算。在RocketMQ定义的语法下,你可以实现一些有趣的逻辑。请看下面的例子:
------------ | message | |----------| a > 5 AND b = 'abc' | a = 10 | --------------------> Gotten | b = 'abc'| | c = true | ------------ ------------ | message | |----------| a > 5 AND b = 'abc' | a = 1 | --------------------> Missed | b = 'abc'| | c = true | ------------
语法
RocketMQ只定义了一些基本语法来支持这个特性,但是你可以很容易地扩展它。
Numeric comparison, like >, >=, <, <=, BETWEEN, =; Character comparison, like =, <>, IN; IS NULL or IS NOT NULL; Logical AND, OR, NOT;
常量类型为:
Numeric, like 123, 3.1415; Character, like ‘abc’, must be made with single quotes; NULL, special constant; Boolean, TRUE or FALSE;
使用限制
只有推送用户可以通过SQL92选择消息,接口是:
public void subscribe(final String topic, final MessageSelector messageSelector)
生产者例子
发送时,可以通过putUserProperty方法将属性放入消息中。
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name"); producer.start(); Message msg = new Message("TopicTest", tag, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) ); // Set some properties. msg.putUserProperty("a", String.valueOf(i)); SendResult sendResult = producer.send(msg); producer.shutdown();
消费者例子
使用MessageSelector.bySql在消费时通过SQL92选择消息。
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4"); // only subsribe messages have property a, also a >=0 and a <= 3 consumer.subscribe("TopicTest", MessageSelector.bySql("a between 0 and 3"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start();