最近一个H5游戏上线,出现堆外内存暴涨问题,于是进行了简单的分析,发现暴涨原因竟是服务器消息发送不出去,导致Netty发送队列堆积,内存上涨,针对此次分析,做一些总结。

SO_SEND_BUF和SO_REC_BUFF

  • SO_SEND_BUF是操作系统内核的写缓冲区,所有应用程序需要发送到对端的信息,都会放到该缓冲区中,等待发往对端
  • SO_REC_BUFF是操作系统内核的读缓冲区,所有对端发过来的数据都会放到该缓冲区中,等待应用程序取走

ChannelOutboundBuffer

该buffer是Netty等待写入系统内核缓冲区的消息队列

Channel的高低水位线

netty中提供一种水位线的标志,提用户当前通道的消息堆积情况。
用下面的代码来判断是否达到高水位线

1
2
3
if (!ch.isWritable()) {
System.out.println("channel到达高水位.");
}

Netty发送消息的流程

  • 1.调用Channel的write方法,该方法会将消息加入ChannelOutboundBuffer,此时并没有实际发送.
    当消息调用channel的write以后,netty回增加该连接发送队列的水位线
    以下是AbstractChannel.java中的代码片段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
  • 2.调用Channel的flush方法,该方法将ChannelOutboundBuffer中的消息写入内核缓冲区
    AbstractChannelHandlerContext.flush方法,准备写入。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public ChannelHandlerContext flush() {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeFlush();
} else {
Runnable task = next.invokeFlushTask;
if (task == null) {
next.invokeFlushTask = task = new Runnable() {
@Override
public void run() {
next.invokeFlush();
}
};
}
safeExecute(executor, task, channel().voidPromise(), null);
}
return this;
}

NioSocketChannel.doWrite方法,该方法调用java原生nio将数据写入内核缓冲区。写入完毕,将消息从ChannelOutboundBuffer移除
并且减少ChannelOutboundBuffer的水位线。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
int writeSpinCount = config().getWriteSpinCount();
do {
if (in.isEmpty()) {
// All written so clear OP_WRITE
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
// Ensure the pending writes are made of ByteBufs only.
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
// Always us nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
// We have something else beside ByteBuffers to write so fallback to normal writes.
writeSpinCount -= doWrite0(in);
break;
case 1: {
// Only one ByteBuf so use non-gathering write
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
ByteBuffer buffer = nioBuffers[0];
int attemptedBytes = buffer.remaining();
final int localWrittenBytes = ch.write(buffer);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
// Zero length buffers are not added to nioBuffers by ChannelOutboundBuffer, so there is no need
// to check if the total size of all the buffers is non-zero.
// We limit the max amount to int above so cast is safe
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
// Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}

线上问题

由于有大量广播消息,导致逻辑消息大量堆积在ChannelOutboundBuffer中,本身Netty的ByteBuf默认是使用堆外内存,所以内存不断暴涨。

具体堆积过程:消息发送慢->内核缓存区满->netty缓存区满->内存暴涨。

在调试过程中,发现H5客户端是单线程,所以如果客户端渲染之类的卡的话,也会导致消息不能及时被消费。

预计解决思路

  • 1.全面降低功能系统的消息发送量,尽量精简消息结构
  • 2.根据逻辑合理调整高水位线
  • 3.当超过高水位线的时候,应该做一些策略,延缓消息发送,例如有条件的丢弃某些消息,防止内存暴涨