首页>>后端>>java->netty系列:一行简单的writeAndFlush都做了哪些事

netty系列:一行简单的writeAndFlush都做了哪些事

时间:2023-11-30 本站 点击:2

前言

对于使用Netty的小伙伴来说,我们想通过服务端往客户端发送数据,通常我们会调用ctx.writeAndFlush(数据)的方式。那么它都执行了那些行为呢,是怎么将消息发送出去的呢。

源码分析

下面的这个方法是用来接收客户端发送过来的数据,通常会使用ctx.writeAndFlush(数据)来向客户端发送数据。

@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{System.out.println("接收到消息:"+msg);Stringstr="服务端收到:"+newDate()+msg;ctx.writeAndFlush(str);}

ctx.writeAndFlush 的逻辑

privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){//...AbstractChannelHandlerContextnext=this.findContextOutbound(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}

从上述源码我们可以知道,WriteAndFlush()相对于Write(),它的flush字段是true。

write:将需要写的 ByteBuff 存储到 ChannelOutboundBuffer中。

flush:从ChannelOutboundBuffer中将需要发送的数据读出来,并通过 Channel 发送出去。

writeAndFlush 源码

publicChannelFuturewriteAndFlush(Objectmsg){returnthis.writeAndFlush(msg,this.newPromise());}publicChannelPromisenewPromise(){returnnewDefaultChannelPromise(this.channel(),this.executor());}

writeAndFlush方法里提供了一个默认的 newPromise()作为参数传递。在Netty中发送消息是一个异步操作,那么可以通过往hannelPromise中注册回调监听listener来得到该操作是否成功。

在发送消息时添加监听

ctx.writeAndFlush(str,ctx.newPromise().addListener(newChannelFutureListener(){@OverridepublicvoidoperationComplete(ChannelFuturechannelFuture)throwsException{channelFuture.isSuccess();}}));

继续向下一层跟进代码,AbstractChannelHandlerContext中的invokeWriteAndFlush的源码。

privatevoidinvokeWriteAndFlush(Objectmsg,ChannelPromisepromise){if(this.invokeHandler()){this.invokeWrite0(msg,promise);this.invokeFlush0();}else{this.writeAndFlush(msg,promise);}}

从上述源码我们可以能够知道:

1、首先通过invokeHandler()判断通道处理器是否已添加到管道中。

2、执行消息处理 invokeWrite0方法:

首先将消息内容放入输出缓冲区中 invokeFlush0;

然后将输出缓冲区中的数据通过socket发送到网络中。

分析invokeWrite0执行的内容,源码如下:

privatevoidinvokeWrite0(Objectmsg,ChannelPromisepromise){try{((ChannelOutboundHandler)this.handler()).write(this,msg,promise);}catch(Throwablevar4){notifyOutboundHandlerException(var4,promise);}}

((ChannelOutboundHandler)this.handler()).write是一个出站事件ChannelOutboundHandler,会由ChannelOutboundHandlerAdapter处理。

@Skippublicvoidwrite(ChannelHandlerContextctx,Objectmsg,ChannelPromisepromise)throwsException{ctx.write(msg,promise);}

接下来会走到ChannelPipeline中,来执行网络数据发送;我们来看DefaultChannelPipeline 中HeadContextwrite方法源码

publicvoidwrite(ChannelHandlerContextctx,Objectmsg,ChannelPromisepromise){this.unsafe.write(msg,promise);}

unsafe是构建NioServerSocketChannelNioSocketChannel对象时,一并构建一个成员属性,它会完成底层真正的网络操作等。

我们跟进HenderContext的write() ,而HenderContext的中依赖的是unsafe.wirte()。所以直接去 AbstractChannel的Unsafe 源码如下:

publicfinalvoidwrite(Objectmsg,ChannelPromisepromise){this.assertEventLoop();ChannelOutboundBufferoutboundBuffer=this.outboundBuffer;if(outboundBuffer==null){//缓存写进来的bufferthis.safeSetFailure(promise,this.newWriteException(AbstractChannel.this.initialCloseCause));ReferenceCountUtil.release(msg);}else{intsize;try{//bufferDirct化,(我们查看AbstractNioByteBuf的实现)msg=AbstractChannel.this.filterOutboundMessage(msg);size=AbstractChannel.this.pipeline.estimatorHandle().size(msg);if(size<0){size=0;}}catch(Throwablevar6){this.safeSetFailure(promise,var6);ReferenceCountUtil.release(msg);return;}//插入写队列将msg插入到outboundBuffer//outboundBuffer这个对象是ChannelOutBoundBuff类型的,它的作用就是起到一个容器的作用//下面看,是如何将msg添加进ChannelOutBoundBuff中的outboundBuffer.addMessage(msg,size,promise);}}

从上述源码中,我们可以看出,首先调用 assertEventLoop 确保该方法的调用是在reactor线程中;然后,调用 filterOutboundMessage() 方法,将待写入的对象过滤。下面我们来看看filterOutboundMessage方法的源码。

protectedfinalObjectfilterOutboundMessage(Objectmsg){if(msginstanceofByteBuf){ByteBufbuf=(ByteBuf)msg;returnbuf.isDirect()?msg:this.newDirectBuffer(buf);}elseif(msginstanceofFileRegion){returnmsg;}else{thrownewUnsupportedOperationException("unsupportedmessagetype:"+StringUtil.simpleClassName(msg)+EXPECTED_TYPES);}}

从上述源码可以看出,只有ByteBuf以及 FileRegion可以进行最终的Socket网络传输,其他类型的数据是不支持的,会抛UnsupportedOperationException异常。并且会把堆 ByteBuf 转换为一个非堆的 ByteBuf 返回。也就说,最后会通过socket传输的对象时非堆的 ByteBuf 和 FileRegion。

在发送数据时,我们需要估算出需要写入的 ByteBuf 的size,我们来看看 DefaultMessageSizeEstimator 的HandleImpl类中的size()方法。

privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){//...AbstractChannelHandlerContextnext=this.findContextOutbound(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}0

通过ByteBuf.readableBytes()判断消息内容大小,估计待发送消息数据的大小,如果是FileRegion的话直接返回0,否则返回ByteBuf中可读取字节数。

接下来我们来看看是如何将 msg 添加进 ChannelOutBoundBuff 中的。

ChannelOutBoundBuff 类

ChannelOutboundBuffer类主要用于存储其待处理的出站写请求的内部数据。当 Netty 调用 write时数据不会真正地去发送而是写入到ChannelOutboundBuffer 缓存队列,直到调用 flush方法 Netty 才会从ChannelOutboundBuffer取数据发送。每个 Unsafe 都会绑定一个ChannelOutboundBuffer,也就是说每个客户端连接上服务端都会创建一个 ChannelOutboundBuffer 绑定客户端 Channel。

观察 ChannelOutBoundBuff 源码,可以看到以下四个属性:

privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){//...AbstractChannelHandlerContextnext=this.findContextOutbound(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}1

flushedEntry :指针表示第一个被写到操作系统Socket缓冲区中的节点;

unFlushedEntry:指针表示第一个未被写入到操作系统Socket缓冲区中的节点;

tailEntry:指针表示ChannelOutboundBuffer缓冲区的最后一个节点。

flushed:表示待发送数据个数。

下面分别是三个指针的作用,示意图如下:

flushedEntry 指针表示第一个被写到操作系统Socket缓冲区中的节点;

unFlushedEntry指针表示第一个未被写入到操作系统Socket缓冲区中的节点;

tailEntry指针表示ChannelOutboundBuffer缓冲区的最后一个节点。

初次调用 addMessage 之后,各个指针的情况为:

fushedEntry指向空,unFushedEntrytailEntry 都指向新加入的节点。第二次调用 addMessage之后,各个指针的情况为:

第n次调用 addMessage之后,各个指针的情况为:

可以看到,调用n次addMessageflushedEntry指针一直指向NULL,表示现在还未有节点需要写出到Socket缓冲区。

ChannelOutboundBuffer 主要提供了以下方法:

addMessage方法:添加数据到对列的队尾;

addFlush方法:准备待发送的数据,在 flush 前需要调用;

nioBuffers方法:用于获取待发送的数据。在发送数据的时候,需要调用该方法以便拿到数据;

removeBytes方法:发送完成后需要调用该方法来删除已经成功写入TCP缓存的数据。

addMessage 方法

addMessage 方法是系统调用write方法时调用,源码如下。

privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){//...AbstractChannelHandlerContextnext=this.findContextOutbound(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}2

上述源码流程如下:

将消息数据包装成 Entry 对象;

如果对列为空,直接设置尾结点为当前节点,否则将新节点放尾部;

unflushedEntry为空说明不存在暂时不需要发送的节点,当前节点就是第一个暂时不需要发送的节点;

将消息添加到未刷新的数组后,增加挂起的节点。

这里需要重点看看第一步将消息数据包装成 Entry 对象的方法。

privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){//...AbstractChannelHandlerContextnext=this.findContextOutbound(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}3

其中Recycler类是基于线程本地堆栈的轻量级对象池。这意味着调用newInstance方法时 ,并不是直接创建了一个 Entry 实例,而是通过对象池获取的。

下面我们看看incrementPendingOutboundBytes方法的源码。

privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){//...AbstractChannelHandlerContextnext=this.findContextOutbound(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}4

在每次添加新的节点后都调用incrementPendingOutboundBytes((long)entry.pendingSize, false)方法,这个方法的作用是设置写状态,设置怎样的状态呢?我们看它的源码,可以看到,它会记录下累计的ByteBuf的容量,一旦超出了阈值,就会传播channel不可写的事件。

addFlush 方法

addFlush 方法是在系统调用 flush 方法时调用的,addFlush 方法的源码如下。

privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){//...AbstractChannelHandlerContextnext=this.findContextOutbound(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}5

以上方法的主要功能就是暂存数据节点变成待发送节点,即flushedEntry 指向的节点到unFlushedEntry指向的节点(不包含 unFlushedEntry)之间的数据。

上述源码的流程如下:

先获取unFlushedEntry指向的暂存数据的起始节点;

将待发送数据起始指针flushedEntry 指向暂存起始节点;

通过promise.setUncancellable()锁定待发送数据,并在发送过程中取消,如果锁定过程中发现其节点已经取消,则调用entry.cancel()取消节点发送,并减少待发送的总字节数。

下面我们看看decrementPendingOutboundBytes方法的源码。

privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){//...AbstractChannelHandlerContextnext=this.findContextOutbound(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}6

AbstractNioByteChannel 类

在这个类中,我们主要看doWrite(ChannelOutboundBuffer in)方法,源码如下。

privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){//...AbstractChannelHandlerContextnext=this.findContextOutbound(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}7

通过一个无限循环,保证可以拿到所有的节点上的ByteBuf,通过这个函数获取节点,Object msg = in.current(); 我们进一步看它的实现,如下,它只会取出我们标记的节点。

privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){//...AbstractChannelHandlerContextnext=this.findContextOutbound(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}8

下面我们看下doWriteInternal(in, msg)的方法源码。

privatevoidwrite(Objectmsg,booleanflush,ChannelPromisepromise){//...AbstractChannelHandlerContextnext=this.findContextOutbound(flush?98304:'耀');Objectm=this.pipeline.touch(msg,next);EventExecutorexecutor=next.executor();if(executor.inEventLoop()){if(flush){next.invokeWriteAndFlush(m,promise);}else{next.invokeWrite(m,promise);}}else{Objecttask;if(flush){task=AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next,m,promise);}else{task=AbstractChannelHandlerContext.WriteTask.newInstance(next,m,promise);}if(!safeExecute(executor,(Runnable)task,promise,m)){((AbstractChannelHandlerContext.AbstractWriteTask)task).cancel();}}}9

使用 jdk 的自旋锁,循环16次,尝试往 jdk 底层的ByteBuffer中写数据,调用函数doWriteBytes(buf);他具体的实现是客户端 channel 的封装类NioSocketChannel实现的源码如下:

publicChannelFuturewriteAndFlush(Objectmsg){returnthis.writeAndFlush(msg,this.newPromise());}publicChannelPromisenewPromise(){returnnewDefaultChannelPromise(this.channel(),this.executor());}0

这个readBytes()依然是抽象方法,因为前面我们曾经把从 ByteBuf 转化成了 Dirct 类型的,所以它的实现类是PooledDirctByteBuf 继续跟进如下:

publicChannelFuturewriteAndFlush(Objectmsg){returnthis.writeAndFlush(msg,this.newPromise());}publicChannelPromisenewPromise(){returnnewDefaultChannelPromise(this.channel(),this.executor());}1

被使用过的节点会被remove()掉, 源码如下:

publicChannelFuturewriteAndFlush(Objectmsg){returnthis.writeAndFlush(msg,this.newPromise());}publicChannelPromisenewPromise(){returnnewDefaultChannelPromise(this.channel(),this.executor());}2

总结

1、调用write方法并没有将数据写到 Socket 缓冲区中,而是写到了一个单向链表的数据结构中,flush才是真正的写出。

2、writeAndFlush等价于先将数据写到netty的缓冲区,再将netty缓冲区中的数据写到Socket缓冲区中,写的过程与并发编程类似,用自旋锁保证写成功。

作者:初念初恋


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/4940.html