分析 rocketmq 如何实现基本的远程通信
协议
rocketmq-remoting 工程实现 rocketmq 的远程通信,组件间大部分的通信都是通过这个模块实现。
rocketmq-remoting 使用 netty 实现的,基于 TCP 的应用层协议。协议格式在 RemotingCommand 定义,一个数据包由数据包长度,header 长度,header,body 四部分组成,如下:
1 | +--------+---------------+-------+------+ |
header 包含几个部分
域 | 作用 |
---|---|
code | 表示命令的类型,在 RequestCode 定义 |
language | 表示编写的语言,如 java、C、go |
version | rocketmq 的版本 |
opaque | 在一个 tcp channel 中唯一,用于标识 request 和 response 的对应关系 |
flag | 保存多个信息;RPC类型,如 1 表示 RPC_ONEWAY;命令类型, 包含两类数据包 REQUEST_COMMAND (请求命令)和 RESPONSE_COMMAND(应答命令) |
remark | 描述信息 |
extFields | 用户定义部分,长度可变 |
body 就是要发送的命令数据
实现
概述
关键的类层次结构如下图
remoting 主要由 NettyRemotingServer 和 NettyRemotingClient 分别实现 server,clinet。
server 和 clinet 的相同点
- server 可以向 clinet 发送 request 命令, client 也可以向 server 发送命令
- 都有 3 种调用方法 sync, async 和 one way,但 server 只能用已经有的 channel 发送, client 可以创建新的 channel 然后发送
- 都可以通过 registerProcessor 注册一个命令处理器,用于命令特定命令
- 可以更新 namesrv 列表
不同点
- server 有侦听端口(这个当然了)
- server 可以设定默认的 processor, clinet 不能
NettyRemotingAbstract
server 和 client 的公共部分由 NettyRemotingAbstract 实现,包括:
- request processor 的注册以及命令的处理, response 命令的处理
- sync, async, one way 三种调用方法的实现
- NettyEvent 的处理。 NettyEvent 是 remoting 定义的,在 netty 连接有变化时触发的事件
NettyEventExecutor 可以执行注册了的 ChannelEventListener - async 和 one way 调用的流量控制
request & response 的处理
当子类收到 RemotingCommand 数据时,调用 processMessageReceived 方法,processMessageReceived 根据命令的类型是(type)决定如果是 REQUEST_COMMAND,则调用 processRequestCommand,如果是 RESPONSE_COMMAND 则调用 processResponseCommand
request
1 | HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable |
成员 processorTable 保存着注册的 request code 和 processor 的对应关系,ExecutorService 是执行的线程池
当 processRequestCommand 就是用 processorTable 中找到对应的 processor,然后构造一个 Runnable 提交到指定的线程池
这个 Runnable 调用 processRequest 方法得到 response 类型的 RemotingCommand,如果是 sync 或 async 调用方法,就
调用 writeAndFlush 方法将 response 写回调用方
response
1 | ConcurrentMap<Integer /* opaque */, ResponseFuture> responseTable |
成员 responseTable 保存返回的 response 类型 RemotingCommand
当向远端发送一个 request 时,NettyRemotingAbstract 生成一个 opaque 和一个 ResponseFuture 保存到 responseTable
当远端发送的 response 到达时,由 processResponseCommand 方法处理,processResponseCommand 方法根据返回的 opaque
找到对应的 ResponseFuture,设置返回结果,然后调用回调(async)或唤醒等待 ResponseFuture 的线程(sync)
sync, async, one way 三种调用方法的实现
sync 同步调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public RemotingCommand invokeSyncImpl(final Channel channel,
final RemotingCommand request,
final long timeoutMillis) {
...
// 生成 future 并放到 responseTable
ResponseFuture responseFuture = new ResponseFuture
responseTable.put(opaque, responseFuture)
// 向远端发送命令
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
...
});
// 等待写 responseFuture,
// 等收到对应的 Response 时,唤醒这个线程
RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);
...
}async 异步调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void invokeAsyncImpl(final Channel channel,
final RemotingCommand request,
final long timeoutMillis,
// 多了个 callback
final InvokeCallback invokeCallback)
...
// 调用次数控制
boolean acquired = this.semaphoreAsync.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)
if (acquired) {
// 生成 future 并放到 responseTable
// 这个 future 设置了 callback
final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis - costTime, invokeCallback, once);
this.responseTable.put(opaque, responseFuture);
// 向远端发送命令
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
...
});
// 直接返回
// 等收到对应的 Response 时,会调用对应的 callback
}
...
}one way
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20public void invokeOnewayImpl(final Channel channel,
final RemotingCommand request,
final long timeoutMillis) {
...
boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
// 调用次数控制
if (acquired) {
// 向远端发送命令
channel.writeAndFlush(request).addListener(new ChannelFutureListener() {
...
});
// 直接返回,连 response future 都不需要
}
...
}
ResponseFuture
保存异步调用的结果。异步调用时远程命令时,先 new 一个 ResponseFuture 对象保存到 responseTable
待远程调用返回时,将结果写到 ResponseFuture
ResponseFuture 通过成员 opaque 和 processChannel 标识出唯一的请求,
countDownLatch 成员用于实现同步调用,waitResponse 时 await,有结果时 countDown
responseCommand 和 cause 保存调用的结果
NettyLogger
桥接 netty 的 log 到 rocketmq。
netty 没有默认支持一种日志框架,而是自动检测用户使用什么日志框架,它就用什么日志
见 https://segmentfault.com/a/1190000005797595
netty 使用 InternalLoggerFactory 实现日志框架检测并生成 logger,而 NettyLogger 就是将 InternalLoggerFactory 的 logger 工厂设置成 NettyBridgeLoggerFactory
这个工厂生成的 Logger 就是 InternalLogger,即 rocketmq-logging 模块的 logger
NettyRemotingServer
用 netty 实现一个 server
server 创建时会检测能不能用 epool,如果可以就用 EpollEventLoopGroup, 否则用就 NioEventLoopGroup
start 方法启动 netty
1 | public void start() { |
server 有一个 Timer 定时调用 scanResponseTable 清理过期的 response future
NettyRemotingClient
用 netty 实现一个 client
start 方法启动 netty
1 | public void start() { |
与 server 一样, client 也有一个 Timer 定时调用 scanResponseTable 清理过期的 response future
序列化
远程通信需要只能传送字节流,所以在发送 request 或 response 前,需要先将对象转换成字节流,而接受到远端的数据时,需要将字节流转换成数据,这就是序列化和反序列化的职责
所有的需要传输的实体类都在 rocketmq-common 工程的 org.apache.rocketmq.common.protocol 的子包中定义,他们之中需要序列化的都会继承 rocketmq-remoting 工程下的 RemotingSerializable
示例 Register Broker
1 |
|