`

netty 源码分析之(二.2)Client启动连接

 
阅读更多

来自:http://fbi.taobao.org/?p=64

 

看的主要是Netty的3.X版本,先贴一段Netty初始化代码

  1. // ChannelFactory中主要是线程资源
  2. ClientBootstrap bootstrap = new ClientBootstrap(
  3.   new NioClientSocketChannelFactory(
  4.     Executors.newCachedThreadPool(),
  5.     Executors.newCachedThreadPool()));
  6.  
  7. // 设置业务处理Pipeline>Handler
  8. bootstrap.setPipelineFactory(new TelnetClientPipelineFactory());
  9.  
  10. // Start the connection attempt.
  11. ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
  12.     
  13. // 释放资源
  14. bootstrap.releaseExternalResources();
 
Nettycleint初始化以及连接的时候会做很多事情,bootstrap.connect每次创建一个连接,都会封装成ChannelChannel由以下部分组成

Pipeline就是这个Channel注册的管道,里面表示每一个Handler,是一种链式结构

SocketChannel就是NIO包中进行socket操作的通道

work的选择通过轮转的方式,Client端的bossExecuter主要是用来关注CONNECT事件的(这一部分的处理有很多重复的代码,在Netty4中统一了些)

 

 Worker

netty中真正干底层脏活累活的都是work类,他的结构如下:

WorkPool表示了一个缓冲池,这也限制了使用的线程数,下面是WorkPool的初始化代码

  1. AbstractNioWorkerPool(Executor workerExecutor, int workerCount, boolean allowShutDownOnIdle) {
  2.         if (workerExecutor == null) {
  3.             throw new NullPointerException(“workerExecutor”);
  4.         }
  5.         if (workerCount <= 0) {
  6.             throw new IllegalArgumentException(
  7.                     “workerCount (“ + workerCount + “) ” +
  8.                     “must be a positive integer.”);
  9.         }        
  10.     //创建指定数量的work数组,实例是NioWork,每个Work持有的资源见上图
  11.     //默认是Runtime.getRuntime().availableProcessors() * 2个Work
  12.         workers = new AbstractNioWorker[workerCount];
  13.             
  14.         for (int i = 0; i < workers.length; i++) {
  15.             workers[i] = createWorker(workerExecutor, 
  16.     allowShutDownOnIdle);
  17.         }
  18.         this.workerExecutor = workerExecutor;
  19.     }

可以看到会创建指定个数的work,这里的每个work其实表示的是SelectorThread一种组合,每次创建一个连接,都会选择一个work,并且注册到wokr里的Selectorwork会进行loop,不断地关注通道事件,也就是会说每个workselector)下会注册多个Channel,并且不断地去Select关注的Ops

 

连接过程

下面看下channel注册到workseelct)的代码,连接的时候是在内部的一个Boss类里处理的

所有的连接connect操作都被封装成一个RegisterTask对象,Boss类持有registerTask队列,在loop中不断的去进行select

  1. private static final class RegisterTask implements Runnable {
  2.         private final Boss boss;
  3.         private final NioClientSocketChannel channel;
  4.  
  5.         RegisterTask(Boss boss, NioClientSocketChannel channel) {
  6.             this.boss = boss;
  7.             this.channel = channel;
  8.         }
  9.  
  10.         public void run() {
  11.             try {
  12.            // 注册connect事件
  13.                 channel.channel.register(
  14.                         boss.selector, SelectionKey.OP_CONNECT, channel);
  15.             } catch (ClosedChannelException e) {
  16.                 channel.worker.close(channel, succeededFuture(channel));
  17.             }
  18.  
  19.             int connectTimeout = channel.getConfig().getConnectTimeoutMillis();
  20.             if (connectTimeout > 0) {
  21.                 channel.connectDeadlineNanos = System.nanoTime() + connectTimeout * 1000000L;
  22.             }
  23.         }
  24.     }
 
register方法
  1. void register(NioClientSocketChannel channel) {
  2.            // 封装一个RegisterTask
  3.           Runnable registerTask = new RegisterTask(this, channel);
  4.             Selector selector;
  5.  
  6.             synchronized (startStopLock) {
  7.                 if (!started) {
  8.                     // Open a selector if this worker didn’t start yet.
  9.                     try {
  10.                         this.selector = selector =  Selector.open();
  11.                     } catch (Throwable t) {
  12.                         throw new ChannelException(
  13.                                 “Failed to create a selector.”, t);
  14.                     }
  15.                   …..
  16.                 assert selector != null >> selector.isOpen();
  17.  
  18.                 started = true;
  19.                 boolean offered = registerTaskQueue.offer(registerTask);
  20.                 assert offered;
  21.             }
  22.  
  23.             if (wakenUp.compareAndSet(false, true)) {
  24.                 selector.wakeup();
  25.             }
  26.         }

RegisterTask,放到BossregisterTaskQueue之后,Boss会从boss executer线程池取出一个线程loop不断地处理队列、选择准备就绪的键等。

run方法不断处理感兴趣的事件

  1. public void run() {
  2.             boolean shutdown = false;
  3.             Selector selector = this.selector;
  4.             long lastConnectTimeoutCheckTimeNanos = System.nanoTime();
  5.             for (;;) {
  6.                
  7.                 try {
  8.                     int selectedKeyCount = selector.select(10);
  9.                    
  10.     //在网上有人提问为啥用timeout的select方法,
  11.     http://stackoverflow.com/questions/10189603/netty-architecture-questions-about-nioworker-loop 作者给出了回答,因为在loop中除了select感兴趣的事件还要处理一些TaskQueue,里面主要是注册感兴趣的事件,所以如果不是用timeout的方法,则有可能永远无法注册
  12.                     processRegisterTaskQueue();
  13.  
  14.                     if (selectedKeyCount > 0) {
  15.                         processSelectedKeys(selector.selectedKeys());
  16.                     }
  17.                    
  18.                 } catch (Throwable t) {
  19.                     
  20.                 }
  21.             }
  22.         }

在loop中,processRegisterTaskQueue会处理需要注册的任务,processSelectedKeys处理连接事件:

  1. private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
  2.             for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
  3.                 SelectionKey k = i.next();
  4.                 i.remove();
  5.  
  6.                 if (!k.isValid()) {
  7.                     close(k);
  8.                     continue;
  9.                 }
  10.  
  11.                 if (k.isConnectable()) {
  12.                     connect(k);
  13.                 }
  14.             }
  15.         }
  16. private void connect(SelectionKey k) {
  17.             NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
  18.             try {
  19.                 if (ch.channel.finishConnect()) {
  20.                     k.cancel();
  21.     //将连接上的通道注册到work中,交给work去注册read和write(注册reaad/write的模式是类似的),后面再详细说
  22.                     ch.worker.register(ch, ch.connectFuture);
  23.                 }
  24.             } catch (Throwable t) {
  25.                 ch.connectFuture.setFailure(t);
  26.                 fireExceptionCaught(ch, t);
  27.                 k.cancel(); // Some JDK implementations run into an infinite loop without this.
  28.                 ch.worker.close(ch, succeededFuture(ch));
  29.             }
  30.         }

现在总结下在Client连接过程中事件发生顺序:

UpStream.ChannelState.OPEN(已经open—–>DownStream.ChannelState.BOUND(需要绑定 localhost)——>DownStream.CONNECTED(需要连接,应该是注册Selector的意思)——–>UpStream.ChannelState.BOUND(已经绑定 localhost)——->UpStream.CONNECTED(连接成功)

在这一系列初始化都完成之后,channel就可以拿来write和接收read数据了,最后给client的模式来个总结

 

分享到:
评论

相关推荐

    netty-all-4.1.32.Final-sources.jar 最新版netty源码全部包

    netty-buffer-4.1.32.Final-sources.jar netty-buffer-4.1.32.Final.jar netty-build-22-sources.jar netty-build-22.jar netty-codec-4.1.32.Final-sources.jar netty-codec-4.1.32.Final.jar netty-codec-...

    netty-netty-4.1.36.Final.rar

    netty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-netty-4.1.36.Final.rarnetty源码netty-...

    netty-common-4.1.65.Final-API文档-中英对照版.zip

    赠送jar包:netty-common-4.1.65.Final.jar; 赠送原API文档:netty-common-4.1.65.Final-javadoc.jar; 赠送源代码:netty-common-4.1.65.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.65.Final....

    netty-buffer-4.1.68.Final-API文档-中文版.zip

    赠送jar包:netty-buffer-4.1.68.Final.jar; 赠送原API文档:netty-buffer-4.1.68.Final-javadoc.jar; 赠送源代码:netty-buffer-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-buffer-4.1.68.Final....

    netty-netty-4.1.79.Final.tar.gz

    netty-netty-4.1.79.Final.tar.gz

    netty-all-4.1.68.Final-API文档-中英对照版.zip

    赠送jar包:netty-all-4.1.68.Final.jar; 赠送原API文档:netty-all-4.1.68.Final-javadoc.jar; 赠送源代码:netty-all-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-all-4.1.68.Final.pom; 包含...

    netty-buffer-4.1.73.Final-API文档-中文版.zip

    赠送jar包:netty-buffer-4.1.73.Final.jar; 赠送原API文档:netty-buffer-4.1.73.Final-javadoc.jar; 赠送源代码:netty-buffer-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-buffer-4.1.73.Final....

    netty-netty-3.10.6.Final.tar.gz

    Netty (netty-netty-3.10.6.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...

    Netty (netty-netty-4.0.56.Final.tar.gz)

    Netty (netty-netty-4.0.56.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...

    netty-all-4.1.23.Final-API文档-中英对照版.zip

    赠送jar包:netty-all-4.1.23.Final.jar; 赠送原API文档:netty-all-4.1.23.Final-javadoc.jar; 赠送源代码:netty-all-4.1.23.Final-sources.jar; 赠送Maven依赖信息文件:netty-all-4.1.23.Final.pom; 包含...

    netty-common-4.1.68.Final-API文档-中文版.zip

    赠送jar包:netty-common-4.1.68.Final.jar; 赠送原API文档:netty-common-4.1.68.Final-javadoc.jar; 赠送源代码:netty-common-4.1.68.Final-sources.jar; 赠送Maven依赖信息文件:netty-common-4.1.68.Final....

    netty-handler-4.1.73.Final-API文档-中文版.zip

    赠送jar包:netty-handler-4.1.73.Final.jar; 赠送原API文档:netty-handler-4.1.73.Final-javadoc.jar; 赠送源代码:netty-handler-4.1.73.Final-sources.jar; 赠送Maven依赖信息文件:netty-handler-4.1.73....

    netty-all-4.1.27.Final-API文档-中文版.zip

    赠送jar包:netty-all-4.1.27.Final.jar; 赠送原API文档:netty-all-4.1.27.Final-javadoc.jar; 赠送源代码:netty-all-4.1.27.Final-sources.jar; 赠送Maven依赖信息文件:netty-all-4.1.27.Final.pom; 包含...

    netty-4.1.48.Final.tar.bz2

    netty-4.1.48.Final.jar包

    netty-all-4.1.29.Final-sources.jar 最新版netty源码

    Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programm ...

    netty-all-4.1.52.Final.jar

    亲自测试可以用的netty-all-4.1.52.Final.jar包,当前最新的稳定版本。替换原来的旧版本,运行稳定。

    Netty (netty-netty-4.1.77.Final.tar.gz)

    Netty (netty-netty-4.1.77.Final.tar.gz)是一个 NIO 客户端服务器框架,可以快速轻松地开发协议服务器和客户端等网络应用程序。它极大地简化和流线了网络编程,例如 TCP 和 UDP 套接字服务器。 “快速和简单”并...

Global site tag (gtag.js) - Google Analytics