分析 rocketmq 的消息过滤机制以及如何实现消息过滤
前言
consumer 有 3 种方式订阅自己需要的消息
- subscribe(String topic, String subExpression) // 根据 tag 过滤
- subscribe(final String topic, final MessageSelector messageSelector) // 根据选择器过滤
- subscribe(String topic, String fullClassName, String filterClassSource) // 根据自定义的 class 过滤
按 tag 过滤
1 | subscribe(String topic, String subExpression) |
通过 tag 属性进行过滤,可以用 || 连接多个 tag, 例如:
subscribe(topic, “tag1 || tag2”) 表示订阅 tag 是 tag1 或 tag2 的消息
按 tag 过滤示例
producer 代码
1 |
|
consumer 代码
1 |
|
按 tag 过滤实现
订阅时,client 向 broker 发送订阅信息,然后拉取时,broker 会按照 client 的订阅信息过滤消息
consumer 注册订阅信息
1 |
|
broker 过滤消息
1 |
|
按表达式过滤
1 | subscribe(final String topic, final MessageSelector messageSelector) |
分两种 MessageSelector, MessageSelector.byTag 和 MessageSelector.bySql
- MessageSelector.byTag 是可以根据 tag 表达式进行过滤,实际上同按 tag 过滤
- MessageSelector.bySql 可以用 SQL 表达式实现消息过滤,broker 要将 enablePropertyFilter 设置为 true
按表达式过滤示例
表达式过滤 producer 示例代码
1 |
|
表达式过滤 consumer 示例代码
1 |
|
按表达式过滤实现
在 consumer 注册订阅信息
1 |
|
在 broker 过滤消息
同 tag 过滤的方式,区别是会走非 tag 类型的分支
1 |
|
表达式计算
consumer 在注册订阅信息时, broker 会将表达式编译好存放在 ConsumerFilterManager 中,当拉取消息时再从 ConsumerFilterManager 取出编译好的表达式进行计算和过滤。
目前只支持 SQL92,由类 SelectorParser 实现了 SQL 的解析。
1 |
|
SelectorParser 是用 JavaCC 做的编译器,语法在 SelectorParser.jj 文件中定义
表达式计算时,会取消息的 properties 作变量表进行计算,默认的变量有 UNIQ_KEY, WAIT 和 TAGS。如果用 putUserProperty 设置了新变量, 就会有新变量。
按用户定义的类过滤
1 | subscribe(String topic, String fullClassName, String filterClassSource) |
可以用一个用户定义的类进行消息过滤,需要用到 FilterSrv 服务器。
注:4.3.0 中去掉这种方式,原因是不完善。但相关的 API 还没删掉,以后可能会重新实现。
客户端过滤
由于在 broker 的过滤并不是精确的(例如布隆过滤器就可能误判),客户端在拉到消息后会再精确过滤一次。