初探Netty

论坛 期权论坛 脚本     
匿名网站用户   2020-12-19 15:00   17   0

在开始了解Netty是什么之前,我们先来回顾一下,如果我们需要实现一个客户端与服务端通信的程序,使用传统的IO编程,应该如何实现?

IO编程

我们简化下场景:客户端每隔两秒发送一个带有时间戳的"hello world"给服务端,服务端收到之后打印.
为了方便演示,下面例子中,服务端与客户端各一个类,把这两个类拷贝到你的IDE中,先后运行IOServer.java和IOClientServer.java可看到效果.
下面是传统的IO编程中服务端实现

public class IOServer {

    public static void main(String[] args) throws Exception {


            ServerSocket serverSocket = new ServerSocket(8000);

            // (1) 接收新连接线程
            new Thread(() -> {
                while (true) {
                    try {
                        // (2) 阻塞方法获取新的连接
                        Socket socket = serverSocket.accept();

                        // (3) 每一个新的连接都创建一个线程,负责读取数据
                        new Thread(() -> {
                            try {
                                byte[] data = new byte[1024];
                                InputStream inputStream = socket.getInputStream();
                                while (true) {
                                    int len;
                                    // (3) 按字节流方式读取数据
                                    while ((len = inputStream.read(data)) != -1) {
                                        System.out.println(new String(data, 0, len));
                                    }
                                }
                            } catch (IOException e) {
                            }
                        }).start();

                    } catch (IOException e) {
                    }

                }
            }).start();
        }


}

server端首先创建了一个serverSocket来监听8000端口,然后创建一个线程,线程里面不断调用阻塞方法serverSocket.accept();获取新的连接,见(1),当获取到新的连接之后,给每条线程创建一个新的线程,这个线程负责从该连接中读取数据,见(2),然后读取数据是以字节流的方式,见(3).
下面是传统的IO编程中客户端实现

public class IOClient {
    public static void main(String[] args) {
        new Thread(() ->{
            try {
                Socket socket = new Socket("127.0.0.1",8000);
                while(true){
                    try{
                        socket.getOutputStream().write((new Date() + "hello world").getBytes());
                        socket.getOutputStream().flush();
                        Thread.sleep(2000);
                    }catch (Exception e){

                    }
                }
            }  catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

客户端的代码相对简单,连接上服务端8000端口之后,每隔2秒,我们向服务端写一个带有时间戳的"hello world"
IO编程模型在客户端较少的情况下运行良好,但是对客户端比较多的业务来说,单机服务端可能需要支撑成千上万的连接,IO模型可能就不太合适了,我们来分析一下原因.
上面的demo,从服务端代码中可以看到,在传统的IO模型中,每个连接创建成功之后都需要一个线程来维护,每个线程包含一个while死循环,那么1w个连接对应1w个线程,继而1w个死循环,这就带来了如下几个问题:
1.线程资源受限:线程是操作系统中非常宝贵的资源,同一时刻有大量的线程处于阻塞状态是非常严重的资源浪费,操作系统耗不起
2.线程切换效率低下:单机cpu核数固定,线程爆炸之后操作系统频换进行线程切换,应用性能急剧下降.
3.除了以上两个问题,IO编程中,我们看到数据读写是一字节流为单位,效率不高.
为了解决这三个问题,JDK在1.4之后提出了NIO

NIO编程

下面简单描述一下NIO是如何解决以上三个问题的.

线程资源受限

NIO编程模型中,新来一个连接不在创建一个新的线程,而是可以把这条连接直接绑定到某个固定的线程,然后这条连接所有的读写都由这个线程来负责,那么他是怎么做到的?我们用一副图来对比一下IO与NIO
在这里插入图片描述

如上图所示,IO模型中,一个连接来了,会创建一个线程,对应一个while死循环,死循环的目的就是不断监测这条连接上是否有数据可读,大多数情况下,1W个连接里面同一时刻只有少量的连接有数据可读,因此,很多个while死循环就能监测1w个连接是否有数据可读的呢?
这就是NIO模型中selector的作用,一条连接来了之后,现在不创建一个while死循环去监听是否有数据可读了,而是直接把这条连接注册到selector上,然后,监测这个selector,就可以批量监测出有数据可读的连接,进而读取数据,下面我在举个非常简单的生活中的例子说明IO与NIO的区别
在一家幼儿园里,小朋友有上厕所的需求,小朋友都太小以至于你要问他要不要上厕所,他才会告诉你.幼儿园一共有100个小朋友,有两种方案可以解决系小朋友上厕所的问题:
1.每个小朋友配一个老师.每个老师隔断时间询问小朋友是否要上厕所,如果要上,就领他去上厕所,100个小朋友就需要100个老师来徐闻,并且每个小朋友上厕所的时候都需要一个老师领着他去上,这就是IO模型,一个连接对应一个线程.
2.所有的小朋友都配同一个老师.这个老师隔断时间询问所有的小朋友是否有人要上厕所,然后每一个时刻把所有要上厕所的小朋友批量领到厕所,这就是NIO模型,所有小朋友都注册到同一个老师,对应的就是所有的连接注册到一个线程,然后批量轮询.
这就是NIO模型解决线程资源受限的方案,实际开发过程中,我们开发多个线程,每个线程都管理者一批连接,相对于IO模型中一个线程管理一条连接,消耗的线程资源大幅度减少.

线程切换效率低下

由于NIO模型中线程数量达达降低,线程切换效率因此也大幅度提高

IO读写以字节为单位

NIO解决这个问题的方式是数据读写不在以字节为单位,而是以字节块为单位.IO模型中,每次都是以操作系统底层一个字节一个字节地读取数据,而NIO维护一个缓冲区,每次可以从这个缓冲区里面读取一块的数据,这就好比一盘美味的豆子放在那你的面前,你用筷子一个一个夹(每次一个),肯定不如要勺子挖着吃(每次一批)效率来的高
简单讲完了JDK NIO的解决方案之后,我们接下来使用NIO的方案替换掉IO的方案,我们先来看看,如果用JDK原生的NIO来实现服务端,该怎么做
NIOServer.java

public class NioServer {
    public static void main(String[] args) throws IOException {
        Selector serverSelector = Selector.open();
        Selector clientSelector = Selector.open();

        new Thread(() ->{
            try {
                // 对应IO编程中的服务端启动
                ServerSocketChannel listenerChannel = ServerSocketChannel.open();
                listenerChannel.socket().bind(new InetSocketAddress(8000));
                listenerChannel.configureBlocking(false);
                listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
                while(true){
                    // 监测是否有新的连接,这里的1指的是阻塞的时间为1ms
                    if(serverSelector.select(1) > 0){
                        Set<SelectionKey> set = serverSelector.selectedKeys();
                        Iterator<SelectionKey> keyIterator = set.iterator();

                        while(keyIterator.hasNext()){
                            SelectionKey key = keyIterator.next();
                            if(key.isAcceptable()){
                                try{
                                    // (1)每来一个新连接,不需要创建一个线程,而是直接注册到clientSelector
                                    SocketChannel clientChannel = (SocketChannel) key.channel();
                                    clientChannel.configureBlocking(false);
                                    clientChannel.register(clientSelector,SelectionKey.OP_ACCEPT);
                                }finally {
                                    keyIterator.remove();
                                }
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() ->{
            try{
                while(true){
                    // (2) 批量轮询是否有哪些连接有数据可读,这里的1指的是阻塞的时间为1ms
                    if(clientSelector.select(1) > 0){
                        Set<SelectionKey> set = clientSelector.selectedKeys();
                        Iterator<SelectionKey> keyIterator = set.iterator();

                        while(keyIterator.hasNext()){
                            SelectionKey key = keyIterator.next();
                            if(key.isReadable()){
                                try{
                                   SocketChannel clientChannel = (SocketChannel) key.channel();
                                   ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                                   // (3)读取数据以块为单位批量读取
                                    clientChannel.read(byteBuffer);
                                    byteBuffer.flip();
                                    System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
                                            .toString());
                                }finally {
                                    keyIterator.remove();
                                    key.interestOps(SelectionKey.OP_READ);
                                }
                            }

                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }).start();
    }
}

相信大部分没有接触过NIO的同学应该会直接跳过代码来到这一行:原来使用JDK原生NIO的API实现一个简单的服务端通信程序是如此复杂.
我们先对照NIO来解释一下几个核心思路
1.NIO模型中通常会有两个线程,每个线程绑定一个轮询器selector,在我们这个例子中serverSelector负责轮询是否有新的连接,clientSelector负责轮询连接数是否有数据可读.
2.服务端监测到新的连接之后,不再创建一个新的线程,而是直接将连接绑定到clientSerlector上,这样就不用IO模型中1w个while循环在死等,参见(1)
3.clientSelector被一个while死循环包裹着,如果在某一个时刻有多条连接有数据可读,那么通过clientSelector(1)方法可以轮询出来,进而批量处理,参见(2)
4.数据的读写以内存块为单位,参见(3)
其他的细节部分,因为实在太复杂,所有也就不深究到底了.总之,不建议直接基于JDK原生NIO来进行网络开发,下面总结的原因:
1.JDK的NIO编程需要了解很多的概念,编程复杂,对NIO入门非常不友好,编程模型不友好,ByteBuffer的api简直反人类
2.对NIO编程来说,一个比较合适的线程模型能充分发挥它的优势,而JDK没有给你实现,你需要自己实现,就连简单的自定义协议拆包都要你自己实现
3.JDK的NIO底层由epoll实现,该实现饱受诟病的空轮询bug会导致cpu飙升100%
4.项目庞大之后,自行实现的NIO很容易出现各类bug,维护成本较高.
JDK的NIO犹如带刺的玫瑰,虽然美好,让人向往,但是使用不会让你抓耳挠腮,痛不欲生,正因为如此,Netty横空出世!

Netty编程

那么Netty到底是何方神圣?
用一句话简单的话来说就是:Netty封装了JDK的NIO,让你用的更爽,你不用在写一大堆复杂的代码了.
用官网正式的话来说就是:Netty是一个异步事件驱动的网络应用程序,用于快速开发可维护的高性能服务器和客户端.
下面是我总结的使用Netty不使用JDK原生NIO的原因
1.使用JDK自带的NIO需要理解太多的概念,编程复杂,一步小心bug横飞
2.Netty底层IO模型随意切换,而这一切只需要做微小的改动,改改参数,Netty可以直接从NIO模型变身为IO模型
3.Netty自带的拆包解包,异常检测等机制让你从NIO的繁重细节中脱离出来,让你只需要关心业务逻辑
4.Netty解决了JDK的很多包括空轮询在内的bug
5.Netty底层对线程,selector做了很多细小的优化,精心设计的reactor线程模型做到非常高效的并发处理
6.自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手
7.Netty社区活跃,遇到问题随时邮件列表或者issue
8.Netty已经历经各大rpc框架,消息中间件,分布式通信中间件线上的广泛验证,健壮性无比强大
接下我们用Netty的版本来实现一下本文开篇的功能吧
首先,引入Maven依赖

<dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.6.Final</version>
    </dependency>

这么一小段代码就实现了我们前面NIO编程中的所有的功能,包括服务端启动,接收新连接,打印客户端传来的数据,.
初学Netty的时候,由于大部分人对NIO编程缺乏经验,因此,将Netty里面的概念与IO模型结合起来可能更好理解
下面是Netty客户端的实现部分

public class TimeClient {
    public static void main(String[] args) throws Exception {
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        EventLoopGroup workerGroup = new NioEventLoopGroup(); // (1)
        
        try {
            Bootstrap b = new Bootstrap(); // (2)
            b.group(workerGroup); // (3)
            b.channel(NioSocketChannel.class); // (4)
            b.option(ChannelOption.SO_KEEPALIVE, true); // (5)
            b.handler(new ChannelInitializer<SocketChannel>() { // (6)
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            
            // Start the client.
            ChannelFuture f = b.connect(host, port).sync(); // (7)

            // Wait until the connection is closed.
            f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}

1.初始化用于连接即I/O工作的"线程池"
2.初始化BootStrap实例,此实例是netty客户端应用开发的入口
3.通过Bootstrap的group方法,设置(1)中初始化的"线程池"
4.指定通道Channel的类型,由于客户端,故而是NioSocketChannel
5.设置SocketChannel的选项
6.设置SocketChannel的处理器,其内部是实际业务开发的"主战场"
7.连接指定的服务地址

下面是Netty服务端实现部分:

EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
    ServerBootstrap b = new ServerBootstrap(); // (2)
    b.group(bossGroup, workerGroup)  // (3)
     .channel(NioServerSocketChannel.class) // (4)
     .handler(new LoggingHandler())    // (5)
     .childHandler(new ChannelInitializer<SocketChannel>() { // (6)
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ch.pipeline().addLast(new DiscardServerHandler());
         }
     })
     .option(ChannelOption.SO_BACKLOG, 128)          // (7)
     .childOption(ChannelOption.SO_KEEPALIVE, true); // (8)
    
     // Bind and start to accept incoming connections.
     ChannelFuture f = b.bind(port).sync(); // (9)
    
     // Wait until the server socket is closed.
     // In this example, this does not happen, but you can do that to gracefully
     // shut down your server.
     f.channel().closeFuture().sync();
} finally {
    workerGroup.shutdownGracefully();
    bossGroup.shutdownGracefully();
}

1.初始化用于Acceptor的主"线程池"以用于I/O工作的从"线程池"
2.初始化ServerBootstrap实例,此实例是netty服务端应用开发的入口
3.通过ServerBootstrap的group方法,设置(1)中初始化的主从"线程池"
4.指定通道channel的类型,由于服务端,故而是NioServerSocketChannel
5.设置ServerSocketChannel的处理器
6.设置子通道也就是SocketChannel的处理器,其内部是时机业务开发的"主战场"
7.配置ServerSocketChannel的选项
8.配置子通道也就是SocketChannel的选项
9.绑定并侦听某个端口

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:1136255
帖子:227251
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP