Netty Server Startup Flow



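  1. Calling ServerBootstrap's bind() creates the ParentChannel, registers it with the NIO selector, binds it to listen on the target port, and adds the AcceptorHandler to its pipeline. The event loop then starts polling for NIO events ->

  2. The ParentChannel sees a connect event (for UDP, the first read event from a sender), creates a childChannel, and passes it to the AcceptorHandler via Pipeline.fireChannelRead() ->

  3. The AcceptorHandler takes the childChannel configuration (childHandler, options, attributes) and applies it to the childChannel ->

  4. The packet and all subsequent IO events are delivered to the childChannel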

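In step 3, why does the ChildChannel have to be passed through the parentChannel to the AcceptorHandler? Couldn't it be configured directly?

Because the custom handler for the childChannel is handed to the AcceptorHandler through ServerBootstrap's childHandler().

In addition, when ServerBootstrap creates the channel it adds an AcceptorHandler to the parentChannel by default. That handler is the bridge between parent and child.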
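
The four steps, and the reason the child travels through the parent's pipeline, can be modeled in a few lines of plain Java. This is a simplified sketch and not Netty code: `SketchChannel`, `SketchBootstrap`, and `AcceptorFlowDemo` are hypothetical names, a pipeline is reduced to a list of callbacks, and there are no threads or sockets. It only illustrates that the acceptor is the one object holding the `childHandler` captured at bootstrap time, so a new child must reach it to be configured.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a channel: a name plus an ordered list of handlers.
final class SketchChannel {
    final String name;
    final List<Consumer<Object>> pipeline = new ArrayList<>();

    SketchChannel(String name) {
        this.name = name;
    }

    // Rough analogue of pipeline.fireChannelRead(msg): pass msg to each handler in order.
    void fireChannelRead(Object msg) {
        for (Consumer<Object> handler : new ArrayList<>(pipeline)) {
            handler.accept(msg);
        }
    }
}

// Hypothetical stand-in for the bootstrap: it remembers childHandler until bind().
final class SketchBootstrap {
    private Consumer<Object> childHandler;

    SketchBootstrap childHandler(Consumer<Object> handler) {
        this.childHandler = handler;
        return this;
    }

    // Step 1: create the parent and install an acceptor that captured childHandler.
    SketchChannel bind() {
        SketchChannel parent = new SketchChannel("parent");
        final Consumer<Object> captured = childHandler;
        parent.pipeline.add(msg -> {
            // Step 3: the message is the new child; apply the captured configuration.
            SketchChannel child = (SketchChannel) msg;
            child.pipeline.add(captured);
        });
        return parent;
    }
}

final class AcceptorFlowDemo {
    public static void main(String[] args) {
        List<Object> received = new ArrayList<>();
        SketchChannel parent = new SketchBootstrap().childHandler(received::add).bind();

        SketchChannel child = new SketchChannel("child-1");
        parent.fireChannelRead(child);    // step 2: hand the child to the acceptor
        child.fireChannelRead("packet");  // step 4: deliver data to the child

        System.out.println(received);     // prints [packet]
    }
}
```

Without the `parent.fireChannelRead(child)` call the child's pipeline stays empty and the packet is dropped, which is the situation the question above is asking about.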

Source code analysis follows. First, the server startup code:

     // Create the server bootstrap
     UdpServerBootstrap b = new UdpServerBootstrap();
     // Create the event loop (thread) group
     EventLoopGroup group = new NioEventLoopGroup();

     b.group(group) // assign the thread group to the server

     // Set the Channel type (a Channel wraps a socket); here it is UDP.
     // The Channel is instantiated reflectively through its no-arg constructor.
     .channel(UdpServerChannel.class)
     .option(ChannelOption.SO_BROADCAST, true) // configure the parent Channel
     .childHandler(new ChannelInitializer<UdpServerChildChannel>() { // handler initializer for each ChildChannel
                 @Override
                 protected void initChannel(UdpServerChildChannel ch) throws Exception {
                     ch.pipeline().addLast(
                               //new LoggingHandler(LogLevel.INFO),
                             new UdpServerHandler()); // custom handlers are added here
                 }
             });

     // Bind the port the server listens on and wait for the bind to complete
     b.bind(2555).sync()
     // Then block the calling thread until the server channel is closed
     .channel().closeFuture().await();

A Channel is a wrapper around a socket.


ParentChannel initialization


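Calling ServerBootstrap's bind() creates the ParentChannel and then initializes it: the ParentChannel is registered with the NIO selector, bound to the target listen port, and given an AcceptorHandler in its pipeline. The event loop then starts polling for NIO events.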

// Key code:
//ServerBootstrap.init(Channel channel)
        //...
        ChannelPipeline p = channel.pipeline();
        //...
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // The ServerUdpBootstrapAcceptor is appended (addLast) to the ParentChannel's pipeline
                        pipeline.addLast(new ServerUdpBootstrapAcceptor(
                                ch, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });

unsafe is the bridge between Netty and Java's native NIO.
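
To make the "native NIO" side of that bridge concrete, here is a minimal sketch of the raw `java.nio` calls that sit underneath a UDP channel: open a `DatagramChannel`, register it with a `Selector` for `OP_READ`, and call `receive` once the selector reports it readable. It does not use Netty at all. `RawNioUdpSketch` and `roundTrip` are hypothetical names, and the loopback address with an ephemeral port is an assumption chosen so the example is self-contained.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;

final class RawNioUdpSketch {
    // Sends one datagram to a selector-driven UDP socket on loopback and returns
    // the text that was received, or null if nothing arrived within about 2 seconds.
    static String roundTrip(String payload) {
        try (Selector selector = Selector.open();
             DatagramChannel server = DatagramChannel.open();
             DatagramChannel client = DatagramChannel.open()) {

            // Selectors only accept non-blocking channels.
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(selector, SelectionKey.OP_READ);

            client.send(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)),
                        server.getLocalAddress());

            ByteBuffer buf = ByteBuffer.allocate(1024);
            long deadline = System.currentTimeMillis() + 2000;
            while (System.currentTimeMillis() < deadline) {
                if (selector.select(200) == 0) {
                    continue; // nothing readable yet
                }
                selector.selectedKeys().clear();
                SocketAddress sender = server.receive(buf); // null if no datagram was ready
                if (sender != null) {
                    buf.flip();
                    return StandardCharsets.UTF_8.decode(buf).toString();
                }
            }
            return null;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("hello"));
    }
}
```

The select-then-receive loop here is the part an unsafe-style layer hides: the framework turns each readable event into a read on the underlying channel and forwards the result into the pipeline.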

How a ChildChannel is created

// Key code:
//UdpServerChannel.unsafe.read()

//...
     for (int i = 0; i < readBuf.size(); i++) {
         DatagramPacket packet = readBuf.get(i);
         UdpServerChildChannel childChannel = getOrCreateChildChannel(packet, UdpServerChannel.this);
         if (!childChannel.isActive()) {
             packet.release();
             continue;
         }
         childChannel.pipeline().fireChannelRead(packet);
     }
//...

How the ChildChannel constructor gets called

// UdpServerChannel.getOrCreateChildChannel()

 protected UdpServerChildChannel getOrCreateChildChannel(DatagramPacket datagramPacket, UdpServerChannel parent) {
               InetSocketAddress sender = datagramPacket.sender();
               if (connectingMap.containsKey(sender)) {
                   log.info("exist child");
                   return connectingMap.get(sender);
               }

               final UdpServerChildChannel udpChildChannel = new UdpServerChildChannel(parent, parent.javaChannel(), datagramPacket);
               // Pass the ChildChannel to the ParentChannel's handlers (in practice there is only one, the BootstrapAcceptor)
               parent.pipeline().fireChannelRead(udpChildChannel);
               parent.pipeline().fireChannelReadComplete();

               connectingMap.put(sender,udpChildChannel);

               return udpChildChannel;
           }
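
The lookup-or-create step can also be expressed with `Map.computeIfAbsent`. The following is a minimal sketch, assuming children are keyed by sender address as in `connectingMap`. `ChildRegistry` and its factory parameter are hypothetical and stand in for the `UdpServerChildChannel` constructor call. It is an alternative shape, not what the code above does: the original fires `channelRead` on the parent pipeline before putting the child into the map, and a `computeIfAbsent` factory must not modify the same map while it runs.

```java
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical registry: at most one child per remote sender address.
final class ChildRegistry<C> {
    private final Map<InetSocketAddress, C> children = new ConcurrentHashMap<>();
    private final Function<InetSocketAddress, C> factory;

    ChildRegistry(Function<InetSocketAddress, C> factory) {
        this.factory = factory;
    }

    // Returns the existing child for this sender, or creates and stores a new one.
    C getOrCreate(InetSocketAddress sender) {
        return children.computeIfAbsent(sender, factory);
    }

    int size() {
        return children.size();
    }
}

final class ChildRegistryDemo {
    public static void main(String[] args) {
        ChildRegistry<String> registry =
                new ChildRegistry<>(addr -> "child@" + addr.getPort());

        InetSocketAddress a = new InetSocketAddress("127.0.0.1", 40001);
        InetSocketAddress b = new InetSocketAddress("127.0.0.1", 40002);

        String first = registry.getOrCreate(a);
        // An equal address (same IP and port) maps to the same child.
        String again = registry.getOrCreate(new InetSocketAddress("127.0.0.1", 40001));
        registry.getOrCreate(b);

        System.out.println(first == again);  // prints true
        System.out.println(registry.size()); // prints 2
    }
}
```

Keying on `InetSocketAddress` works because its `equals` and `hashCode` compare the address and the port, so two datagrams from the same remote endpoint resolve to the same entry.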

How the ChildChannel is configured (how the ChildHandler is installed)

// UdpServerBootstrapAcceptorHandler.channelRead()

   public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final UdpServerChildChannel udpChildChannel = (UdpServerChildChannel) msg;
        log.info("Acceptor.initChildChannel:" + udpChildChannel.remoteAddress());
        udpChildChannel.pipeline().addLast(childHandler);
        UdpServerBootstrap.setChannelOptions(udpChildChannel, childOptions, logger);
        for (Map.Entry<AttributeKey<?>, Object> e : childAttrs) {
            udpChildChannel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
        try {
            ctx.channel().eventLoop().register(udpChildChannel).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(udpChildChannel, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(udpChildChannel, t);
        }

    }
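
The registration at the end of `channelRead` guards against two kinds of failure: the call can throw immediately, or the returned future can fail later. A minimal sketch of that shape in plain Java follows, using `CompletableFuture` in place of Netty's `ChannelFuture`. `RegisterOrCloseSketch`, `Child`, and `registerChild` are hypothetical names, and the `Supplier` parameter is an assumption standing in for the `eventLoop().register(...)` call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

final class RegisterOrCloseSketch {
    // Hypothetical stand-in for a child channel; it only tracks open or closed.
    static final class Child {
        final AtomicBoolean open = new AtomicBoolean(true);

        void forceClose() {
            open.set(false);
        }
    }

    // Starts a registration and closes the child if it fails, now or later.
    static void registerChild(Child child, Supplier<CompletableFuture<Void>> register) {
        try {
            register.get().whenComplete((ignored, cause) -> {
                if (cause != null) {
                    child.forceClose(); // asynchronous failure reported by the future
                }
            });
        } catch (Throwable t) {
            child.forceClose(); // synchronous failure while submitting the registration
        }
    }

    public static void main(String[] args) {
        Child ok = new Child();
        registerChild(ok, () -> CompletableFuture.completedFuture(null));

        Child failedLater = new Child();
        CompletableFuture<Void> pending = new CompletableFuture<>();
        registerChild(failedLater, () -> pending);
        pending.completeExceptionally(new IllegalStateException("registration failed"));

        System.out.println(ok.open.get());          // prints true
        System.out.println(failedLater.open.get()); // prints false
    }
}
```

Either path ends with the child closed, so a channel that never made it onto an event loop is not left half-configured.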

https://www.processon.com/diagraming/5c9335dce4b02b2ce4a149f8

Last updated on 10th May 2019