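[Reading notes] RocketMQ feature implementation: message filtering

An analysis of RocketMQ's message filtering mechanism and how it is implemented.

Preface

A consumer can subscribe to the messages it needs in three ways:

  • subscribe(String topic, String subExpression) // filter by tag
  • subscribe(final String topic, final MessageSelector messageSelector) // filter by selector
  • subscribe(String topic, String fullClassName, String filterClassSource) // filter by a user-defined class

Filtering by tag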

subscribe(String topic, String subExpression)

Messages are filtered on their tag property. Multiple tags can be joined with ||, for example:
subscribe(topic, "tag1 || tag2") subscribes to messages whose tag is tag1 or tag2.
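
To make the expression format concrete, here is a minimal sketch of how a "tag1 || tag2" expression could be split into a tag set and a set of tag hash codes (the two sets that SubscriptionData holds). The class TagExpressionSketch is hypothetical and not part of RocketMQ, and it assumes the hash code is simply String.hashCode().

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch, not a RocketMQ class. */
public class TagExpressionSketch {
    public final Set<String> tags = new HashSet<>();    // the tag strings
    public final Set<Integer> codes = new HashSet<>();  // assumed: String.hashCode() of each tag

    /** Splits an expression such as "tag1 || tag2" into tags and hash codes. */
    public static TagExpressionSketch parse(String subExpression) {
        TagExpressionSketch sub = new TagExpressionSketch();
        if (subExpression == null || subExpression.isEmpty() || subExpression.equals("*")) {
            return sub; // "*" means subscribe to everything: both sets stay empty
        }
        for (String part : subExpression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                sub.tags.add(tag);
                sub.codes.add(tag.hashCode());
            }
        }
        return sub;
    }

    public static void main(String[] args) {
        TagExpressionSketch sub = parse("tag1 || tag2");
        System.out.println(sub.tags + " " + sub.codes);
    }
}
```

Keeping the hash codes next to the tag strings lets one side compare cheap integers while the other compares the exact strings.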

Tag filtering example

Producer code


public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("producer_filter");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();

    // Send 3 messages tagged filter_tag_a, filter_tag_b, and filter_tag_c
    producer.send(buildMsg("filter_tag_a", "filter_a_message"));
    producer.send(buildMsg("filter_tag_b", "filter_b_message"));
    producer.send(buildMsg("filter_tag_c", "filter_c_message"));

    producer.shutdown();
}

private static Message buildMsg(String tag, String body) throws Exception {
    Message msg = new Message(
        "TopicTest", // topic
        tag, // tag
        body.getBytes(RemotingHelper.DEFAULT_CHARSET) // body
    );

    return msg;
}

Consumer code


public static void main(String[] argv) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    // Filter by tag:
    // only the filter_tag_a and filter_tag_b messages are consumed by this consumer; filter_tag_c is ignored
    consumer.subscribe("TopicTest", "filter_tag_a || filter_tag_b");

    consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
        System.out.println(argv[0] + " received messages");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });

    consumer.start();
}

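Tag filtering implementation

When subscribing, the client sends its subscription information to the broker. Later, when the client pulls messages, the broker filters them according to that subscription.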
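
The flow above can be sketched roughly as follows: the broker remembers the latest subscription per consumer group and topic when a heartbeat arrives, and looks it up again on each pull, rejecting the pull if its stored copy is older than what the client reports. SubscriptionRegistrySketch and its methods are hypothetical names for illustration only; the real broker code is quoted in the sections that follow.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of a broker-side subscription table, not RocketMQ code. */
public class SubscriptionRegistrySketch {

    /** Simplified stand-in for the subscription carried in a heartbeat. */
    public static final class Subscription {
        public final String topic;
        public final String expression;
        public final long version;

        public Subscription(String topic, String expression, long version) {
            this.topic = topic;
            this.expression = expression;
            this.version = version;
        }
    }

    // consumer group -> (topic -> latest subscription)
    private final Map<String, Map<String, Subscription>> table = new ConcurrentHashMap<>();

    /** Heartbeat received: remember what the group subscribed to. */
    public void onHeartbeat(String group, Subscription sub) {
        table.computeIfAbsent(group, g -> new ConcurrentHashMap<>()).put(sub.topic, sub);
    }

    /** Pull received: return the stored subscription, or null if it is unknown or stale. */
    public Subscription findForPull(String group, String topic, long clientVersion) {
        Map<String, Subscription> byTopic = table.get(group);
        if (byTopic == null) {
            return null; // the group never sent a heartbeat
        }
        Subscription sub = byTopic.get(topic);
        if (sub == null || sub.version < clientVersion) {
            return null; // not subscribed, or the broker copy is older than the client's
        }
        return sub;
    }

    public static void main(String[] args) {
        SubscriptionRegistrySketch registry = new SubscriptionRegistrySketch();
        registry.onHeartbeat("consumer1", new Subscription("TopicTest", "filter_tag_a || filter_tag_b", 10L));
        System.out.println(registry.findForPull("consumer1", "TopicTest", 10L) != null);
    }
}
```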

Consumer registers the subscription


public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

...

@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultMQPushConsumerImpl.subscribe(topic, subExpression);
}

...
}

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
...

public void subscribe(String topic, String subExpression) throws MQClientException {
try {
// Build the subscription data
SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
topic, subExpression);

// Store it locally; RebalanceImpl uses it to generate PullRequests
this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);

// Send the subscription data to the brokers
if (this.mQClientFactory != null) {
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
}
} catch (Exception e) {
throw new MQClientException("subscription exception", e);
}
}
...
}

public class MQClientInstance {
...

// Send a heartbeat to the brokers; the heartbeat carries the subscription data
public void sendHeartbeatToAllBrokerWithLock() {
if (this.lockHeartbeat.tryLock()) {
try {
this.sendHeartbeatToAllBroker();
this.uploadFilterClassSource();
} catch (final Exception e) {
log.error("sendHeartbeatToAllBroker exception", e);
} finally {
this.lockHeartbeat.unlock();
}
} else {
log.warn("lock heartBeat, but failed.");
}
}

private void sendHeartbeatToAllBroker() {
// HeartbeatData carries the SubscriptionData
final HeartbeatData heartbeatData = this.prepareHeartbeatData();

...

if (!this.brokerAddrTable.isEmpty()) {
long times = this.sendHeartbeatTimesTotal.getAndIncrement();
Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();

... // iterate over every broker; for each master

try {
// Send HEART_BEAT to the broker
int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
if (!this.brokerVersionTable.containsKey(brokerName)) {
this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
}
this.brokerVersionTable.get(brokerName).put(addr, version);
if (times % 20 == 0) {
log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
log.info(heartbeatData.toString());
}
} catch (Exception e) {
if (this.isBrokerInNameServer(addr)) {
log.info("...");
} else {
log.info("...");
}
}

...
}

...
}


// SubscriptionData holds the subscription information
public class SubscriptionData implements Comparable<SubscriptionData> {
public final static String SUB_ALL = "*";

private boolean classFilterMode = false;
private String topic; // the subscribed topic
private String subString; // the subscription expression
private Set<String> tagsSet = new HashSet<String>(); // the tags to match
private Set<Integer> codeSet = new HashSet<Integer>(); // hash codes of the subscribed tags
private long subVersion = System.currentTimeMillis();
private String expressionType = ExpressionType.TAG;

// Holds the source code of the filter class
@JSONField(serialize = false)
private String filterClassSource;

...
}

Broker filters messages


public class PullMessageProcessor implements NettyRequestProcessor {

private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend) {
...

// The request controls where the subscription data comes from (the requestHeader or the broker; the flag defaults to false)
final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());

if (hasSubscriptionFlag) {
try {
/* Build subscriptionData from the request; this is a rather unusual usage
*/
subscriptionData = FilterAPI.build(requestHeader.getTopic(),
requestHeader.getSubscription(), requestHeader.getExpressionType()
);

/* Build consumerFilterData from the request
*/
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {

consumerFilterData = ConsumerFilterManager.build(
requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),
requestHeader.getExpressionType(), requestHeader.getSubVersion()
);

assert consumerFilterData != null;
}
} catch (Exception e) {
... // report an error
}
} else {
ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());
if (null == consumerGroupInfo) {
... // report an error
}

if (!subscriptionGroupConfig.isConsumeBroadcastEnable() && consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {
... // report an error
}

/* Look up subscriptionData locally on the broker
*/
subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
... // report an error
}

if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {
... // report an error
}

/* Fetch consumerFilterData locally from the broker
*/
if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {
consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(), requestHeader.getConsumerGroup());
if (consumerFilterData == null) {
... // report an error
}

if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {
... // report an error
}
}
}

if (!ExpressionType.isTagType(subscriptionData.getExpressionType()) && !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {
... // report an error if the expression is not the tag type and the enablePropertyFilter switch is off
}

/* Create a MessageFilter object
*/
MessageFilter messageFilter;
if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {
// ExpressionForRetryMessageFilter handles the RETRY queue
// It filters messages with the filtering rules of the original queue
messageFilter = new ExpressionForRetryMessageFilter(
subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
} else {
messageFilter = new ExpressionMessageFilter(
subscriptionData, consumerFilterData, this.brokerController.getConsumerFilterManager());
}

final GetMessageResult getMessageResult =
brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),
requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(),
messageFilter); // used to filter the messages
...
}

...
}


public class DefaultMessageStore implements MessageStore {
public GetMessageResult getMessage(final String group, final String topic, final int queueId,
final long offset,
final int maxMsgNums,
final MessageFilter messageFilter) {

...

/* The tag code is stored in the consume queue, so filter by tag here first.
* tagsCode is not the tag itself; it is a Long obtained by hashing the tag.
*/
if (messageFilter != null &&
!messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {

if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
continue; // we are inside a loop: skip the entry if it does not match
}

SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);

...

/* After reading the message, filter once more by its content
*/
if (messageFilter != null &&
!messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {

if (getResult.getBufferTotalSize() == 0) {
status = GetMessageStatus.NO_MATCHED_MESSAGE;
}
// release...
selectResult.release();
continue; // we are inside a loop: skip the message if it does not match
}

...
}
}

public class ExpressionMessageFilter implements MessageFilter {

// Filters using the information stored in the ConsumeQueue
@Override
public boolean isMatchedByConsumeQueue(Long tagsCode, ConsumeQueueExt.CqExtUnit cqExtUnit) {
if (null == subscriptionData) {
return true;
}

if (subscriptionData.isClassFilterMode()) {
return true;
}

// For the tag type, check the tag code directly
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
if (tagsCode == null) {
return true;
}

// If the expression is *, every message matches
if (subscriptionData.getSubString().equals(SubscriptionData.SUB_ALL)) {
return true;
}

// Check whether the subscription contains this tag code
return subscriptionData.getCodeSet().contains(tagsCode.intValue());
} else {
// For the expression type, use the Bloom filter
if (consumerFilterData == null || consumerFilterData.getExpression() == null
|| consumerFilterData.getCompiledExpression() == null || consumerFilterData.getBloomFilterData() == null) {
return true;
}

// If the message was stored before the bitmap was calculated, let it pass
if (cqExtUnit == null || !consumerFilterData.isMsgInLive(cqExtUnit.getMsgStoreTime())) {
log.debug("Pull matched because not in live: {}, {}", consumerFilterData, cqExtUnit);
return true;
}

// Get the bit array stored for the Bloom filter
// It is calculated in CommitLogDispatcherCalcBitMap
byte[] filterBitMap = cqExtUnit.getFilterBitMap();

// BloomFilter implements a Bloom filter; it does not store the bit array internally, so a BitsArray must be passed in for every insert or check
BloomFilter bloomFilter = this.consumerFilterManager.getBloomFilter();
if (filterBitMap == null || !this.bloomDataValid
|| filterBitMap.length * Byte.SIZE != consumerFilterData.getBloomFilterData().getBitNum()) {

return true;
}

// Filter with the Bloom filter
BitsArray bitsArray = null;
try {
bitsArray = BitsArray.create(filterBitMap);
// consumerFilterData.getBloomFilterData(): the data to check
// bitsArray: the bit array already present in the Bloom filter
boolean ret = bloomFilter.isHit(consumerFilterData.getBloomFilterData(), bitsArray);
log.debug("Pull {} by bit map:{}, {}, {}", ret, consumerFilterData, bitsArray, cqExtUnit);
return ret;
} catch (Throwable e) {
log.error(...);
}
}

return true;
} // End of isMatchedByConsumeQueue

...
}
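
To illustrate the Bloom filter branch above, here is a minimal sketch of a stateless filter in the same spirit: the filter object holds no bits itself, and the caller passes in the bit array for both insert and check. BloomHitSketch, its double-hashing scheme, and the "group#topic" key are assumptions made for this example; they are not taken from RocketMQ's BloomFilter.

```java
import java.util.BitSet;

/** Hypothetical sketch of a stateless Bloom filter, not RocketMQ's implementation. */
public class BloomHitSketch {
    private final int bitNum;     // size of the bit array
    private final int hashCount;  // number of bit positions per key

    public BloomHitSketch(int bitNum, int hashCount) {
        this.bitNum = bitNum;
        this.hashCount = hashCount;
    }

    /** Computes the bit positions for a key (assumed scheme: simple double hashing). */
    public int[] positions(String key) {
        int h1 = key.hashCode();
        int h2 = (h1 >>> 16) | 1; // odd step so successive positions differ
        int[] result = new int[hashCount];
        for (int i = 0; i < hashCount; i++) {
            result[i] = Math.floorMod(h1 + i * h2, bitNum);
        }
        return result;
    }

    /** Insert: set every position of the key in the caller's bit array. */
    public void put(int[] positions, BitSet bits) {
        for (int p : positions) {
            bits.set(p);
        }
    }

    /** Check: a hit requires every position to be set; false positives are possible. */
    public boolean isHit(int[] positions, BitSet bits) {
        for (int p : positions) {
            if (!bits.get(p)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        BloomHitSketch filter = new BloomHitSketch(64, 3);
        int[] consumer = filter.positions("consumer1#TopicTest");
        BitSet messageBits = new BitSet(64);   // bitmap stored alongside one message
        filter.put(consumer, messageBits);     // done at store time when the expression matched
        System.out.println(filter.isHit(consumer, messageBits));
    }
}
```

A miss is definitive, so the broker can skip the message without reading it from the commit log; a hit only means "possibly matches", which is why a second, exact check is still needed.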

Filtering by expression

subscribe(final String topic, final MessageSelector messageSelector)

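There are two kinds of MessageSelector: MessageSelector.byTag and MessageSelector.bySql.

  • MessageSelector.byTag filters by a tag expression and is effectively the same as filtering by tag.
  • MessageSelector.bySql filters messages with an SQL expression; the broker must set enablePropertyFilter to true.

Expression filtering example

Producer example code for expression filtering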


public static void main(String[] args) throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("producer_filter");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();

    Message msg = new Message(
        "TopicTest", // topic
        "filter_tag_a", // tag
        ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET) // body
    );
    // Custom properties can be set on the message and then used for filtering
    msg.putUserProperty("user_property", "test");

    producer.send(msg);

    producer.shutdown();
}

Consumer example code for expression filtering


public static void main(String[] argv) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
    consumer.setInstanceName(argv[0]);
    consumer.setNamesrvAddr("127.0.0.1:9876");

    consumer.subscribe("TopicTest", MessageSelector.bySql("TAGS = 'filter_tag_a' and user_property = 'test'"));

    consumer.registerMessageListener((List<MessageExt> msgs, ConsumeConcurrentlyContext context) -> {
        System.out.println(argv[0] + " received messages");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });

    consumer.start();
}

Expression filtering implementation

Registering the subscription on the consumer


public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    @Override
    public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
        this.defaultMQPushConsumerImpl.subscribe(topic, messageSelector);
    }
}

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    public void subscribe(final String topic, final MessageSelector messageSelector) throws MQClientException {
        try {
            if (messageSelector == null) {
                subscribe(topic, SubscriptionData.SUB_ALL);
                return;
            }

            SubscriptionData subscriptionData = FilterAPI.build(topic,
                messageSelector.getExpression(), messageSelector.getExpressionType());

            // Store the subscription data locally
            this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);

            // Send the subscription data to the brokers
            if (this.mQClientFactory != null) {
                this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }
}

Filtering messages on the broker

This works the same way as tag filtering, except that the non-tag branch is taken.


public class ExpressionMessageFilter implements MessageFilter {

// For non-tag filtering, messages can be filtered by their content
@Override
public boolean isMatchedByCommitLog(ByteBuffer msgBuffer, Map<String, String> properties) {
if (subscriptionData == null) {
return true;
}

if (subscriptionData.isClassFilterMode()) {
return true;
}

// For the tag type there is no need to look at the message content
if (ExpressionType.isTagType(subscriptionData.getExpressionType())) {
return true;
}

ConsumerFilterData realFilterData = this.consumerFilterData;

// properties are the message's basic properties,
// including TAGS, UNIQ_KEY, and those set with Message.putUserProperty
Map<String, String> tempProperties = properties;

// no expression
if (realFilterData == null || realFilterData.getExpression() == null
|| realFilterData.getCompiledExpression() == null) {

return true;
}

// Decode the properties field of the message
if (tempProperties == null && msgBuffer != null) {
tempProperties = MessageDecoder.decodeProperties(msgBuffer);
}

/* Filter by expression
*/
Object ret = null;
try {
MessageEvaluationContext context = new MessageEvaluationContext(tempProperties);

// Evaluate the expression
ret = realFilterData.getCompiledExpression().evaluate(context);
} catch (Throwable e) {
log.error("Message Filter error, " + realFilterData + ", " + tempProperties, e);
}

log.debug("Pull eval result: {}, {}, {}", ret, realFilterData, tempProperties);

if (ret == null || !(ret instanceof Boolean)) {
return false;
}

return (Boolean) ret;

} // End of isMatchedByCommitLog
}

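Expression evaluation

When a consumer registers its subscription, the broker compiles the expression and stores it in ConsumerFilterManager. When messages are pulled, the compiled expression is fetched from ConsumerFilterManager and evaluated to filter them.

Currently only SQL92 is supported; the class SelectorParser implements the SQL parsing.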


public class ConsumerFilterManager extends ConfigManager {

    public static ConsumerFilterData build(final String topic, final String consumerGroup,
                                           final String expression, final String type,
                                           final long clientVersion) {
        ...
        // Compile the expression; the only type currently supported is SQL92
        consumerFilterData.setCompiledExpression(
            FilterFactory.INSTANCE.get(type).compile(expression)
        );
        ...
    }
}

public class SqlFilter implements FilterSpi {
    @Override
    public Expression compile(final String expr) throws MQFilterException {
        return SelectorParser.parse(expr);
    }
}

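SelectorParser is a parser generated with JavaCC; its grammar is defined in the file SelectorParser.jj.

When an expression is evaluated, the message's properties serve as the variable table. The default variables are UNIQ_KEY, WAIT, and TAGS; any property set with putUserProperty becomes an additional variable.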
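
The idea of evaluating a compiled expression against the properties map can be sketched as below. This is only an illustration: ExpressionSketch, eq, and and are hypothetical helpers that build the expression tree by hand, whereas the real broker obtains it by parsing the SQL text. The result handling (only a Boolean true counts as a match) mirrors isMatchedByCommitLog as quoted earlier.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of a compiled expression evaluated over message properties. */
public class ExpressionSketch {

    /** A compiled expression; the properties map acts as the variable table. */
    public interface Expression {
        Object evaluate(Map<String, String> properties);
    }

    /** name = 'literal' */
    public static Expression eq(String name, String literal) {
        return props -> Boolean.valueOf(literal.equals(props.get(name)));
    }

    /** left AND right */
    public static Expression and(Expression left, Expression right) {
        return props -> Boolean.valueOf(
            Boolean.TRUE.equals(left.evaluate(props)) && Boolean.TRUE.equals(right.evaluate(props)));
    }

    /** Only a Boolean true result counts as a match. */
    public static boolean matches(Expression expression, Map<String, String> properties) {
        Object ret = expression.evaluate(properties);
        return ret instanceof Boolean && (Boolean) ret;
    }

    public static void main(String[] args) {
        // Hand-built equivalent of: TAGS = 'filter_tag_a' and user_property = 'test'
        Expression compiled = and(eq("TAGS", "filter_tag_a"), eq("user_property", "test"));

        Map<String, String> properties = new HashMap<>();
        properties.put("TAGS", "filter_tag_a");
        properties.put("user_property", "test");

        System.out.println(matches(compiled, properties));
    }
}
```

Compiling once and evaluating many times keeps the per-message cost down to a few map lookups.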

Filtering by a user-defined class

subscribe(String topic, String fullClassName, String filterClassSource)

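A user-defined class can be used to filter messages; this requires the FilterSrv server.

Note: this approach was removed in 4.3.0 because it was incomplete. The related API has not been deleted yet, and it may be reimplemented in the future.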

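Client-side filtering

Filtering on the broker is not exact: tag filtering on the consume queue compares only hash codes, which can collide, and the Bloom filter can report false positives. After pulling the messages, the client therefore filters them once more, this time exactly.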
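
A minimal sketch of such a client-side pass for tag subscriptions is shown below. ClientTagFilterSketch and Msg are hypothetical names, and the example assumes tag codes are String.hashCode() values; "Aa" and "BB" are used because those two strings have the same hash code, so a hash-only check on the broker could not tell them apart.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of exact tag filtering on the client, not RocketMQ code. */
public class ClientTagFilterSketch {

    /** Simplified stand-in for a pulled message. */
    public static final class Msg {
        public final String tag;
        public final String body;

        public Msg(String tag, String body) {
            this.tag = tag;
            this.body = body;
        }
    }

    /** Keeps only messages whose tag string is really in the subscribed set. */
    public static List<Msg> filterByTag(List<Msg> pulled, Set<String> subscribedTags) {
        if (subscribedTags.isEmpty()) {
            return pulled; // an empty set is treated as "subscribe to all"
        }
        List<Msg> kept = new ArrayList<>();
        for (Msg msg : pulled) {
            if (msg.tag != null && subscribedTags.contains(msg.tag)) {
                kept.add(msg);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Set<String> subscribed = new HashSet<>(Arrays.asList("Aa"));
        // Both tags hash to the same value, so a hash-only filter would pass both.
        List<Msg> pulled = Arrays.asList(new Msg("Aa", "wanted"), new Msg("BB", "hash collision"));
        for (Msg msg : filterByTag(pulled, subscribed)) {
            System.out.println(msg.tag + ": " + msg.body);
        }
    }
}
```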