聪明文档网

聪明文档网

最新最全的文档下载
当前位置: 首页> netty 4.0.9final版本

netty 4.0.9final版本

时间:2014-08-11 11:19:29    下载该word文档

作者:pingnanlee

博客http://blog.csdn.net/pingnanlee/article/details/11929009

备注:本文的分析基于netty 4.0.9final版本,仅对Nio进行分析,因为本人对Socket编程比较感兴趣。

1channel总体机构图

nio channel的总体结构图如下:

2、关键类和接口分析

2.1  基于NioServerSocketChannel进行分析

1Channel

Channel是顶层接口,继承了AttributeMap, ChannelOutboundInvoker, ChannelPropertyAccess, Comparable,它作为一个具体IO能力的组件提供给开发者,包括read, write, connect, and bind等操作。另外还提供了Channel配置的功能,以及获取Channel所在的eventloop的功能。

2AbstractChannel

AbstractChannel实现Channel接口,关键代码如下:

[java] view plaincopy

1. private final Channel parent;   

2. private final long hashCode = ThreadLocalRandom.current().nextLong();  

3. private final Unsafe unsafe;  

4. private final DefaultChannelPipeline pipeline;  

5. private final ChannelFuture succeededFuture = new SucceededChannelFuture(thisnull);  

6. private final VoidChannelPromise voidPromise = new VoidChannelPromise(thistrue);  

7. private final VoidChannelPromise unsafeVoidPromise = new VoidChannelPromise(thisfalse);  

8. private final CloseFuture closeFuture = new CloseFuture(this);  

9.   

10. private volatile SocketAddress localAddress;  

11. private volatile SocketAddress remoteAddress;  

12. private volatile EventLoop eventLoop;  

13. private volatile boolean registered;  

14.   

15. /** Cache for the string representation of this channel */  

16. private boolean strValActive;  

17. private String strVal;"code" class="java">    /** 

18.  * Creates a new instance. 

19.  * 

20.  * @param parent 

21.  *        the parent of this channel. {@code null} if there's no parent. 

22.  */  

23. protected AbstractChannel(Channel parent) {  

24.     this.parent = parent;  

25.     unsafe = newUnsafe();  

26.     pipeline = new DefaultChannelPipeline(this);  

27. }  

比较重要的对象是pipelineunsafe,它们提供对readwritebind等操作的具体实现。

3AbstractNioChannel

AbstractNioChannel继承AbstractChannel,从这个类开始涉及到JDKsocket,参考如下关键代码:

[java] view plaincopy

1.     private final SelectableChannel ch;  

2.     protected final int readInterestOp;  

3.     private volatile SelectionKey selectionKey;  

4.     private volatile boolean inputShutdown;  

5. "code" class="java">    @Override  

6.     protected void doRegister() throws Exception {  

7.         boolean selected = false;  

8.         for (;;) {  

9.             try {  

10.                 selectionKey = javaChannel().register(eventLoop().selector, 0this);  

11.                 return;  

12.             } catch (CancelledKeyException e) {  

13.                 if (!selected) {  

14.                     // Force the Selector to select now as the "canceled" SelectionKey may still be  

15.                     // cached and not removed because no Select.select(..) operation was called yet.  

16.                     eventLoop().selectNow();  

17.                     selected = true;  

18.                 } else {  

19.                     // We forced a select operation on the selector before but the SelectionKey is still cached  

20.                     // for whatever reason. JDK bug ?  

21.                     throw e;  

22.                 }  

23.             }  

24.         }  

25.     }     /**     * Create a new instance     *     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}     * @param ch                the underlying {@link SelectableChannel} on which it operates     * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}     */    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {        super(parent);        this.ch = ch;        this.readInterestOp = readInterestOp;        try {            ch.configureBlocking(false);        } catch (IOException e) {            try {                ch.close();            } catch (IOException e2) {                if (logger.isWarnEnabled()) {                    logger.warn(                            "Failed to close a partially initialized socket.", e2);                }            }            throw new ChannelException("Failed to enter non-blocking mode.", e);        }    }  

从上面的代码可以看出,这里定义真正的Socket Channel(SelectableChannel),关心的事件,注册后的key。将Socket设置为非阻塞,这是所有异步IO的关键,也就是说不管多么好的框架,底层基础还是不会变,可见学好基础的重要性啊,^_^。这里重点要关注一下register函数,这个函数是将Channel和事件循环进行关联的关键。每个事件循环都有一个自己的selectorchannel实际上是注册到了相应eventloopselector中,这也是Nio Socket编程的基础。

从这个类中已经可以看到nettychannel是如何和socket nio channel关联的了,以及channel是如何和eventloop关联的了。

4AbstractNioMessageChannel

这个类继承AbstractNioChannel,主要是提供了一个newUnsafe方法返回NioMessageUnsafe对象的实例(实现read方法)。另外还定义doReadMessagesdoWriteMessage两个抽象方法。

5ServerSocketChannelServerChannel

这两个接口主要是定义了一个config方法,以及获取网络地址的方法。

6NioServerSocketChannel

NioServerSocketChannel继承AbstractNioMessageChannel,实现ServerSocketChannel,它是一个具体类,提供给开发者使用。

[java] view plaincopy

1. /** 

2.  * A {@link io.netty.channel.socket.ServerSocketChannel} implementation which uses 

3.  * NIO selector based implementation to accept new connections. 

4.  */  

5. public class NioServerSocketChannel extends AbstractNioMessageChannel  

6.                              implements io.netty.channel.socket.ServerSocketChannel {  

7.   

8.     private static final ChannelMetadata METADATA = new ChannelMetadata(false);  

9.   

10.     private static final InternalLogger logger = InternalLoggerFactory.getInstance(NioServerSocketChannel.class);  

11.   

12.     private static ServerSocketChannel newSocket() {  

13.         try {  

14.             return ServerSocketChannel.open();  

15.         } catch (IOException e) {  

16.             throw new ChannelException(  

17.                     "Failed to open a server socket.", e);  

18.         }  

19.     }  

20.   

21.     private final ServerSocketChannelConfig config;  

22.   

23.     /** 

24.      * Create a new instance 

25.      */  

26.     public NioServerSocketChannel() {  

27.         super(null, newSocket(), SelectionKey.OP_ACCEPT);  

28.         config = new DefaultServerSocketChannelConfig(this, javaChannel().socket());  

29.     }  

30.   

31.     @Override  

32.     protected ServerSocketChannel javaChannel() {  

33.         return (ServerSocketChannel) super.javaChannel();  

34.     }  

35.   

36.     @Override  

37.     protected void doBind(SocketAddress localAddress) throws Exception {  

38.         javaChannel().socket().bind(localAddress, config.getBacklog());  

39.     }  

40.   

41.     @Override  

42.     protected void doClose() throws Exception {  

43.         javaChannel().close();  

44.     }  

45.   

46.     @Override  

47.     protected int doReadMessages(List buf) throws Exception {  

48.         SocketChannel ch = javaChannel().accept();  

49.   

50.         try {  

51.             if (ch != null) {  

52.                 buf.add(new NioSocketChannel(this, ch));  

53.                 return 1;  

54.             }  

55.         } catch (Throwable t) {  

56.             logger.warn("Failed to create a new channel from an accepted socket.", t);  

57.   

58.             try {  

59.                 ch.close();  

60.             } catch (Throwable t2) {  

61.                 logger.warn("Failed to close a socket.", t2);  

62.             }  

63.         }  

64.   

65.         return 0;  

66.     }  

67.   

68.     // Unnecessary stuff  

69.     @Override  

70.     protected boolean doConnect(  

71.             SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {  

72.         throw new UnsupportedOperationException();  

73.     }  

74.   

75.     @Override  

76.     protected void doFinishConnect() throws Exception {  

77.         throw new UnsupportedOperationException();  

78.     }  

79.   

80.     @Override  

81.     protected SocketAddress remoteAddress0() {  

82.         return null;  

83.     }  

84.   

85.     @Override  

86.     protected void doDisconnect() throws Exception {  

87.         throw new UnsupportedOperationException();  

88.     }  

89.   

90.     @Override  

91.     protected boolean doWriteMessage(Object msg, ChannelOutboundBuffer in) throws Exception {  

92.         throw new UnsupportedOperationException();  

93.     }  

94. }  

从这个具体类中,我们可以看到,调用JDK函数ServerSocketChannel.open();生成了底层ServerSocketChannel对象,将NioServerSocketChannelServerSocketChannel相关联,并且传递了感兴趣的事件OP_ACCEPT给父类。实现了doReadMessage函数,实际上就是accept一个SocketChanel

2.2  基于NioSocketChannel进行分析

NioServerSocketChannel中介绍过的类和接口,这里不再介绍。其实和NioServerSocketChannel差不多,只是它是基于Byte的。

1AbstractNioByteChannel

这个类继承AbstractNioChannel,主要也是提供了一个newUnsafe方法返回NioByteUnsafe对象的实例(实现read方法)。另外还定义doReadBytesdoWriteBytes两个抽象方法。

2SocketChannel

这个接口继承了Channel接口,定义了多个shutdown方法,以及一个parent方法,返回该SocketChannel相应的ServerSocketChannel

3NioSocketChannel

这个类继承AbstractNioByteChannel,并且实现SocketChannel接口,是一个具体类,提供给开发者使用。

[java] view plaincopy

1. /** 

2.  * {@link io.netty.channel.socket.SocketChannel} which uses NIO selector based implementation. 

3.  */  

4. public class NioSocketChannel extends AbstractNioByteChannel implements io.netty.channel.socket.SocketChannel {  

5.   

6.     private static final ChannelMetadata METADATA = new ChannelMetadata(false);  

7.   

8.     private static SocketChannel newSocket() {  

9.         try {  

10.             return SocketChannel.open();  

11.         } catch (IOException e) {  

12.             throw new ChannelException("Failed to open a socket.", e);  

13.         }  

14.     }  

15.   

16.     private final SocketChannelConfig config;  

17.   

18.     /** 

19.      * Create a new instance 

20.      */  

21.     public NioSocketChannel() {  

22.         this(newSocket());  

23.     }  

24.   

25.     /** 

26.      * Create a new instance using the given {@link SocketChannel}. 

27.      */  

28.     public NioSocketChannel(SocketChannel socket) {  

29.         this(null, socket);  

30.     }  

31.   

32.     /** 

33.      * Create a new instance 

34.      * 

35.      * @param parent    the {@link Channel} which created this instance or {@code null} if it was created by the user 

36.      * @param socket    the {@link SocketChannel} which will be used 

37.      */  

38.     public NioSocketChannel(Channel parent, SocketChannel socket) {  

39.         super(parent, socket);  

40.         config = new DefaultSocketChannelConfig(this, socket.socket());  

41.     }  

42.   

43.     @Override  

44.     protected SocketChannel javaChannel() {  

45.         return (SocketChannel) super.javaChannel();  

46.     }  

47.   

48.     @Override  

49.     public boolean isActive() {  

50.         SocketChannel ch = javaChannel();  

51.         return ch.isOpen() && ch.isConnected();  

52.     }  

53.   

54.     @Override  

55.     public boolean isInputShutdown() {  

56.         return super.isInputShutdown();  

57.     }  

58.   

59.     @Override  

60.     public InetSocketAddress localAddress() {  

61.         return (InetSocketAddress) super.localAddress();  

62.     }  

63.   

64.     @Override  

65.     public InetSocketAddress remoteAddress() {  

66.         return (InetSocketAddress) super.remoteAddress();  

67.     }  

68.   

69.     @Override  

70.     public boolean isOutputShutdown() {  

71.         return javaChannel().socket().isOutputShutdown() || !isActive();  

72.     }  

73.   

74.     @Override  

75.     public ChannelFuture shutdownOutput() {  

76.         return shutdownOutput(newPromise());  

77.     }  

78.   

79.     @Override  

80.     public ChannelFuture shutdownOutput(final ChannelPromise promise) {  

81.         EventLoop loop = eventLoop();  

82.         if (loop.inEventLoop()) {  

83.             try {  

84.                 javaChannel().socket().shutdownOutput();  

85.                 promise.setSuccess();  

86.             } catch (Throwable t) {  

87.                 promise.setFailure(t);  

88.             }  

89.         } else {  

90.             loop.execute(new Runnable() {  

91.                 @Override  

92.                 public void run() {  

93.                     shutdownOutput(promise);  

94.                 }  

95.             });  

96.         }  

97.         return promise;  

98.     }  

99.   

100.     @Override  

101.     protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {  

102.         if (localAddress != null) {  

103.             javaChannel().socket().bind(localAddress);  

104.         }  

105.   

106.         boolean success = false;  

107.         try {  

108.             boolean connected = javaChannel().connect(remoteAddress);  

109.             if (!connected) {  

110.                 selectionKey().interestOps(SelectionKey.OP_CONNECT);  

111.             }  

112.             success = true;  

113.             return connected;  

114.         } finally {  

115.             if (!success) {  

116.                 doClose();  

117.             }  

118.         }  

119.     }  

120.   

121.     @Override  

122.     protected void doFinishConnect() throws Exception {  

123.         if (!javaChannel().finishConnect()) {  

124.             throw new Error();  

125.         }  

126.     }  

127.   

128.     @Override  

129.     protected void doDisconnect() throws Exception {  

130.         doClose();  

131.     }  

132.   

133.     @Override  

134.     protected void doClose() throws Exception {  

135.         javaChannel().close();  

136.     }  

137.   

138.     @Override  

139.     protected int doReadBytes(ByteBuf byteBuf) throws Exception {  

140.         return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());  

141.     }  

142.   

143.     @Override  

144.     protected int doWriteBytes(ByteBuf buf) throws Exception {  

145.         final int expectedWrittenBytes = buf.readableBytes();  

146.         final int writtenBytes = buf.readBytes(javaChannel(), expectedWrittenBytes);  

147.         return writtenBytes;  

148.     }  

149.   

150.     @Override  

151.     protected long doWriteFileRegion(FileRegion region) throws Exception {  

152.         final long position = region.transfered();  

153.         final long writtenBytes = region.transferTo(javaChannel(), position);  

154.         return writtenBytes;  

155.     }  

156.   

157.     @Override  

158.     protected void doWrite(ChannelOutboundBuffer in) throws Exception {  

159.         for (;;) {  

160.             // Do non-gathering write for a single buffer case.  

161.             final int msgCount = in.size();  

162.             if (msgCount <= 1) {  

163.                 super.doWrite(in);  

164.                 return;  

165.             }  

166.   

167.             // Ensure the pending writes are made of ByteBufs only.  

168.             ByteBuffer[] nioBuffers = in.nioBuffers();  

169.             if (nioBuffers == null) {  

170.                 super.doWrite(in);  

171.                 return;  

172.             }  

173.   

174.             int nioBufferCnt = in.nioBufferCount();  

175.             long expectedWrittenBytes = in.nioBufferSize();  

176.   

177.             final SocketChannel ch = javaChannel();  

178.             long writtenBytes = 0;  

179.             boolean done = false;  

180.             for (int i = config().getWriteSpinCount() - 1; i >= 0; i --) {  

181.                 final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);  

182.                 if (localWrittenBytes == 0) {  

183.                     break;  

184.                 }  

185.                 expectedWrittenBytes -= localWrittenBytes;  

186.                 writtenBytes += localWrittenBytes;  

187.                 if (expectedWrittenBytes == 0) {  

188.                     done = true;  

189.                     break;  

190.                 }  

191.             }  

192.   

193.             if (done) {  

194.                 // Release all buffers  

195.                 for (int i = msgCount; i > 0; i --) {  

196.                     in.remove();  

197.                 }  

198.   

199.                 // Finish the write loop if no new messages were flushed by in.remove().  

200.                 if (in.isEmpty()) {  

201.                     clearOpWrite();  

202.                     break;  

203.                 }  

204.             } else {  

205.                 // Did not write all buffers completely.  

206.                 // Release the fully written buffers and update the indexes of the partially written buffer.  

207.   

208.                 for (int i = msgCount; i > 0; i --) {  

209.                     final ByteBuf buf = (ByteBuf) in.current();  

210.                     final int readerIndex = buf.readerIndex();  

211.                     final int readableBytes = buf.writerIndex() - readerIndex;  

212.   

213.                     if (readableBytes < writtenBytes) {  

214.                         in.progress(readableBytes);  

215.                         in.remove();  

216.                         writtenBytes -= readableBytes;  

217.                     } else if (readableBytes > writtenBytes) {  

218.                         buf.readerIndex(readerIndex + (int) writtenBytes);  

219.                         in.progress(writtenBytes);  

220.                         break;  

221.                     } else { // readableBytes == writtenBytes  

222.                         in.progress(readableBytes);  

223.                         in.remove();  

224.                         break;  

225.                     }  

226.                 }  

227.   

228.                 setOpWrite();  

229.                 break;  

230.             }  

231.         }  

232.     }  

233. }  

从代码中可以看出,调用了SocketChannel.open();创建SocketChannel对象,将NioSocketChannelSocketChannel关联。主要是实现了发送数据的doWrite函数。

3、总结

NioSocketChannelNioServerSocketChannel这两个具体类是提供给开发者使用的。从上面的分析可以看出,实际上他们底层关联的还是JDKSocketChannelServerSocketChannelnettySocket Channel是对JDKSocket Channel的封装,它将Channelloop关联,在loop中处理Channel的事件通知。

备注:Channelnetty的核心数据结构,这篇文章只是对ChannelSocket部分进行简单分析,不过通过它基本上已经能够了解netty是如何将它的Channel和上一篇的event关联的,以及它是如何将channelJDKchannel关联的。

免费下载 Word文档免费下载: netty 4.0.9final版本

  • 29.8

    ¥45 每天只需1.0元
    1个月 推荐
  • 9.9

    ¥15
    1天
  • 59.8

    ¥90
    3个月

选择支付方式

  • 微信付款
郑重提醒:支付后,系统自动为您完成注册

请使用微信扫码支付(元)

订单号:
支付后,系统自动为您完成注册
遇到问题请联系 在线客服