【阅读笔记】rocketmq 特性实现 —— 消息Hook

介绍 rocketmq 中的勾子

RPC hook

指 NettyRemotingServer 和 NettyRemotingClinet 中的 hook

client 中的 hook

message hook 可以在消息发送、消费前后执行用户自定义的代码

producer hook

可以在消息发送前后执行用户定义的代码

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

producer.getDefaultMQProducerImpl().registerSendMessageHook(new SendMessageHook() {
@Override
public String hookName() {
return "hook";
}

// 消息发送前执行代码
@Override
public void sendMessageBefore(SendMessageContext context) {
System.out.println(new String(context.getMessage().getBody()));
try {
// 可以修改消息的内容和属性
context.getMessage().setBody("update to something else".getBytes(RemotingHelper.DEFAULT_CHARSET));
} catch(Exception ignore) {
}
}

// 消息发送后执行代码
@Override
public void sendMessageAfter(SendMessageContext context) {
System.out.println(new String(context.getMessage().getBody()));
}
});

consumer hook

可以在消息消费前后执行用户定义的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new ConsumeMessageHook(){
@Override
public String hookName() {
return "hook";
}

// 消息消费前执行
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
}

// 消息消费后执行
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
}
});

filter hook

可以在拉回消息并且在执行过滤后,执行用户定义的代码

1
2
3
4
5
6
7
8
9
10
11

consumer.getDefaultMQPushConsumerImpl().registerFilterMessageHook(new FilterMessageHook() {
@Override
public String hookName() {
return null;
}

@Override
public void filterMessage(FilterMessageContext context) {
}
});