博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java异步非阻塞IO NIO使用与代码分析
阅读量:5933 次
发布时间:2019-06-19

本文共 13775 字,大约阅读时间需要 45 分钟。

[TOC]


Java异步非阻塞IO NIO使用与代码分析

TimeServer程序的NIO实现完整代码

TimeServer程序来自书本《Netty权威指南》,nio的代码确实有些难懂(这也是后面需要使用Netty的原因之一),不过我对代码加了注释,这样一来对nio的概念及基本的使用都会有一个非常清晰的认识:

服务端程序

TimeServer.java:

package cn.xpleaf.nio;public class TimeServer {    public static void main(String[] args) {        int port = 8080;        if (args != null && args.length > 0) {            try {                port = Integer.valueOf(port);            } catch (Exception e) {                // 采用默认值            }        }        new Thread(new MultiplexerTimeServer(port)).start();    }}

MultiplexerTimeServer.java:

package cn.xpleaf.nio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.sql.Date;import java.util.Iterator;import java.util.Set;public class MultiplexerTimeServer implements Runnable {    private Selector selector;    private ServerSocketChannel servChannel;    private volatile boolean stop;    /**     * 初始化多路复用器,绑定监听端口     */    public MultiplexerTimeServer(int port) {        try {            // 创建多路复用器Selector            selector = Selector.open();            // 创建ServerSocketChannel,它相当于是所有客户端连接的父管道            servChannel = ServerSocketChannel.open();            // 将ServerSocketChannel设置为异步非阻塞            servChannel.configureBlocking(false);            // 绑定侦听端口,backlog为1024,表示serverchannel容纳的最大的客户端数量为1024(个人查找资料得出的结果,不一定准确)            servChannel.socket().bind(new InetSocketAddress(port), 1024);            // 将ServerSocketChannel注册到selector上,并监听SelectionKey.OP_ACCEPT操作位            servChannel.register(selector, SelectionKey.OP_ACCEPT);            System.out.println("The time server is start in port : " + port);        } catch (IOException e) {            e.printStackTrace();            System.exit(1);        }    }    public void stop() {        this.stop = true;    }    @Override    public void run() {        while (!stop) {            try {                // timeout - 如果为正,则在等待某个通道准备就绪时最多阻塞 timeout 毫秒;如果为零,则无限期地阻塞;必须为非负数(API文档)                // 休眠时间为1s,无论是否有读写等事件发生,selector每隔1s都被唤醒一次                selector.select(1000);                Set
selectedKeys = selector.selectedKeys(); Iterator
it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { // 获取key值,通过对key进行操作,可以获取到其所对应的注册到selector上的channel // 最初是只有一个ServerSocketChannel所对应的key,也就是前面所创建的servChannel,它相当于是所有客户端连接的父管道 // nio的服务端就是通过它来创建与客户端的连接的,因为目前的代码就只有它监听了SelectionKey.OP_ACCEPT操作位 key = it.next(); // 同时把该key值从selectedKeys集合中移除 it.remove(); // 处理该key值 try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (Throwable t) { // TODO Auto-generated catch block t.printStackTrace(); } } // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 对key进行处理 * * @param key * @throws IOException */ public void handleInput(SelectionKey key) throws IOException { // 处理新接入的请求消息 if (key.isValid()) { // 连接建立时 if (key.isAcceptable()) { // 接收新的连接 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); // 设置SocketChannel为异步非阻塞 sc.configureBlocking(false); // 注册新的连接到多路复用器selector中,监听SelectionKey.OP_READ操作位 sc.register(selector, SelectionKey.OP_READ); } // 读数据 if (key.isReadable()) { // 通过key获取到其注册在Selector上的channel SocketChannel sc = (SocketChannel) key.channel(); // 分配一个新的字节缓冲区,大小为1024KB,即1MB ByteBuffer readBuffer = ByteBuffer.allocate(1024); // 由于前面已经将该SocketChannel设置为异步非阻塞模式,因此它的read是非阻塞的 // 返回值为读取到的字节数 // 返回值不同,意义不同: /** * 大小0:读到了字节,对字节进行编解码 等于0:没有读取到字节,属于正常场景,忽略 为-1:链路已经关闭,需要关闭SocketChannel,释放资源 */ int readBytes = sc.read(readBuffer); if (readBytes > 0) { // 读取到字节,进行解码操作 // 将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作(我想这是API中定义的吧) readBuffer.flip(); // 根据缓冲区可读的字节个数创建字节数组 byte[] bytes = new byte[readBuffer.remaining()]; // 将缓冲区可读的字节数组复制到新创建的字节数组中 readBuffer.get(bytes); // 将字节数组以utf-8方式转换为字符串 String body = new String(bytes, "utf-8"); System.out.println("The time server receive order : " + body); // 解析客户端发送的指令,同时构造返回结果 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; // 将应答消息异步发送给客户端 doWrite(sc, currentTime); } else if (readBytes < 0) { // 对端链路关闭 key.cancel(); sc.close(); } else { ; // 读到0字节忽略 } } } } /** * 将应答消息异步发送给客户端 * * @param channel * @param response * @throws IOException */ public void doWrite(SocketChannel channel, String response) throws IOException { if (response != null && response.trim().length() > 0) { // 将字符串编码为字节数组 byte[] bytes = response.getBytes(); // 根据字节数组的容量创建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); // 将字节数组复制到缓冲区 writeBuffer.put(bytes); // flip操作 writeBuffer.flip(); // 将缓冲区的字节数组发送出去 channel.write(writeBuffer); /** * 注意这里并没有处理半包问题,《Netty权威指南》中的说明如下(P35) * 需要指出的是,由于SocketChannel是异步非阻塞的,它并不保证一次能够把需要发送的字节数组发送完,此时会出现半包问题。 * 我们需要注册写操作,不断轮询Selector将没有发送完的ByteBuffer发送完毕,然后可以通过ByteBuffer的hasRemain()方法 * 判断消息是否发送完成。此处仅仅是个简单的入门级例程,没有演示如何处理“写半包”场景,后续的章节会有详细说明。 */ } }}

客户端程序

TimeClient.java:

package cn.xpleaf.nio;public class TimeClient {    public static void main(String[] args) {        int port = 8080;        if(args != null && args.length > 0) {            try {                port = Integer.valueOf(port);            } catch (Exception e) {                // 采用默认值            }        }        new Thread(new TimeClientHandle("127.0.0.1", port)).start();    }}

TimeClientHandle.java:

package cn.xpleaf.nio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;public class TimeClientHandle implements Runnable {    private String host;    private int port;    private Selector selector;    private SocketChannel socketChannel;    private volatile boolean stop;    /**     * 初始化多路复用器,设置连接的服务端地址和端口     *      * @param host     * @param port     */    public TimeClientHandle(String host, int port) {        this.host = host == null ? "127.0.0.1" : host;        this.port = port;        try {            // 创建多路复用器Selector            selector = Selector.open();            // 创建SocketChannel,用来连接服务端            socketChannel = SocketChannel.open();            // 将SocketChannel设置为异步非阻塞            socketChannel.configureBlocking(false);        } catch (IOException e) {            e.printStackTrace();            System.exit(1);        }    }    @Override    public void run() {        // 先尝试直接连接        try {            doConnect();        } catch (IOException e) {            e.printStackTrace();            System.exit(1);        }        // 当然这里也可以将上面的直接连接代码注释,然后使用下面这两行代码        // 只是需要注意的是,如果一开始没有尝试连接,那么即使后来注册侦听连接也是没有意义的        // 此时没有发送连接请求,服务端根本就不会响应        // socketChannel.connect(new InetSocketAddress(host, port));        // socketChannel.register(selector, SelectionKey.OP_CONNECT);        while (!stop) {            try {                // timeout - 如果为正,则在等待某个通道准备就绪时最多阻塞 timeout 毫秒;如果为零,则无限期地阻塞;必须为非负数(API文档)                // 休眠时间为1s,无论是否有读写等事件发生,selector每隔1s都被唤醒一次                selector.select(1000);                // 获取所有就绪的channel的key                Set
selectedKeys = selector.selectedKeys(); Iterator
it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { // 获取key值,通过对key进行操作,可以获取到其所对应的注册到selector上的channel // 最初是只有一个ServerSocketChannel所对应的key,也就是前面所创建的servChannel,它相当于是所有客户端连接的父管道 // nio的服务端就是通过它来创建与客户端的连接的,因为目前的代码就只有它监听了SelectionKey.OP_ACCEPT操作位 key = it.next(); // 同时把该key值从selectedKeys集合中移除 it.remove(); // 处理该key值 try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (Throwable t) { t.printStackTrace(); } } // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源 if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 对key进行处理 * * @param key * @throws IOException */ public void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 通过key获取到SocketChannel SocketChannel sc = (SocketChannel) key.channel(); // isConnectable是判断是否处于连接状态 // 如果是,说明服务端已经返回ACK应答消息,后面就需要对连接结果进行判断 if (key.isConnectable()) { // 对连接结果进行判断 if (sc.finishConnect()) { // 注册SocketChannel到多路复用器selector上,并监听SelectionKey.OP_READ操作位 sc.register(selector, SelectionKey.OP_READ); doWrite(sc); } else { // 连接失败,进程退出 System.exit(1); } } // 读数据 if (key.isReadable()) { // 分配一个新的字节缓冲区,大小为1024KB,即1MB ByteBuffer readBuffer = ByteBuffer.allocate(1024); // 由于前面已经将该SocketChannel设置为异步非阻塞模式,因此它的read是非阻塞的 // 返回值为读取到的字节数 // 返回值不同,意义不同: /** * 大小0:读到了字节,对字节进行编解码 等于0:没有读取到字节,属于正常场景,忽略 为-1:链路已经关闭,需要关闭SocketChannel,释放资源 */ int readBytes = sc.read(readBuffer); if (readBytes > 0) { // 读取到字节,进行解码操作 // 将缓冲区当前的limit设置为position,position设置为0,用于后续对缓冲区的读取操作(我想这是API中定义的吧) readBuffer.flip(); // 根据缓冲区可读的字节个数创建字节数组 byte[] bytes = new byte[readBuffer.remaining()]; // 将缓冲区可读的字节数组复制到新创建的字节数组中 readBuffer.get(bytes); // 将字节数组以utf-8方式转换为字符串 String body = new String(bytes, "utf-8"); System.out.println("Now : " + body); } else if (readBytes < 0) { // 对端链路关闭 key.cancel(); sc.close(); } else { ; // 读到0字节忽略 } } } } /** * 连接到服务端 * * @throws IOException */ private void doConnect() throws IOException { // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答 if (socketChannel.connect(new InetSocketAddress(host, port))) { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); } else { socketChannel.register(selector, SelectionKey.OP_CONNECT); } } /** * 写操作 * * @throws IOException */ private void doWrite(SocketChannel sc) throws IOException { // 将字符串编码为字节数组 byte[] req = "QUERY TIME ORDER".getBytes(); // 根据字节数组的容量创建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); // 将字节数组复制到缓冲区 writeBuffer.put(req); // flip操作 writeBuffer.flip(); // 将缓冲区的字节数组发送出去 sc.write(writeBuffer); if (!writeBuffer.hasRemaining()) { System.out.println("Send order 2 server succeesd."); } // 也是没有处理"半包写"的问题,可以查看服务端程序的代码注释说明 }}

程序测试

服务端执行:

The time server is start in port : 8080

客户端执行:

Send order 2 server succeesd.Now : 2018-02-10

此时再查看服务端的输出结果:

The time server is start in port : 8080The time server receive order : QUERY TIME ORDER

转载于:https://blog.51cto.com/xpleaf/2070942

你可能感兴趣的文章
webpack动态创建入口
查看>>
聊聊Go工作空间
查看>>
ReactNative学习笔记1
查看>>
vue 开发 2048/围住神经猫 小游戏
查看>>
Android:Fragment懒加载的实现以及自己的封装思路
查看>>
k8s使用kube-router暴露集群中的pod和svc到外部
查看>>
ServiceWorker入门
查看>>
『中级篇』在centos上安装docker(九)
查看>>
Nodejs阿里云OSS获取STS 授权
查看>>
JavaScript this 绑定规则
查看>>
Vue.js单向绑定和双向绑定实例
查看>>
每天一道leetcode88-合并两个有序数组
查看>>
Netty学习笔记
查看>>
java B2B2C 仿淘宝电子商城系统-Spring Cloud与Dubbo对比
查看>>
新氧大数据:2.8%城市占53.7%医美医生,咋找靠谱医生?
查看>>
双重夹击之下,优信如何走出困局?
查看>>
SpringMVC学习笔记1(整合Mybatis&参数绑定)
查看>>
Apple Pay接入详细教程(转)
查看>>
UWP 开发中阶 Chapter 2 - 通过 KeyFrame 实现更复杂的 Storyboard Animation
查看>>
企业级 SpringBoot 教程 (十七)上传文件
查看>>