package com.nio.aio.server;
import java.io.IOException;
public class AIOServer {
public static void main(String[] args)throws IOException {
// 设置要监听的端口
int port = 8788;
if (args != null && args.length > 0){
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
// 创建异步服务器处理类
AIOTimeServerHandler aioServer = new AIOTimeServerHandler(port);
// 启动线程
new Thread(aioServer, "AIO-Server-Handler").start();
}
}
package com.nio.aio.server;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
public class AcceptComplectionHandler implements CompletionHandler<AsynchronousSocketChannel, AIOTimeServerHandler> {
@Override
public void completed(AsynchronousSocketChannel result, AIOTimeServerHandler attachment) {
/**
* 为什么要再次调用accept方法呢?
* 原因: 调用该方法以后,如果有新的客户端连接接入,系统将回调我们传入的
*/
attachment.asynchronousServerSocketChannel.accept(attachment, this);
// 创建ByteBuffer对象,预分配1MB的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 进行异步读的操作
/**
* 参数的分析:
* 1 接收缓冲区用于从异步channel中读取数据包
* 2 异步channel携带的附件,通知回调的时候作为参数使用
* 3 接收通知回调的业务handler
*/
result.read(buffer,buffer,new ReadComplectionHandler(result));
}
@Override
public void failed(Throwable exc, AIOTimeServerHandler attachment) {
exc.printStackTrace();
attachment.latch.countDown();
}
}
package com.nio.aio.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;
public class AIOTimeServerHandler implements Runnable {
private int port;
CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public AIOTimeServerHandler(int port) {
this.port = port;
try {
// 创建异步服务通道
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
// 绑定监听端口号
asynchronousServerSocketChannel.bind(new InetSocketAddress(port));
System.out.println("服务端启动成功,端口号是:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
// 初始化CountDownLatch对象,作用:在完成一组正在执行的操作之前,允许当前的线程一直阻塞。
// 这个例子在这阻塞是防止服务端执行完成退出。
latch = new CountDownLatch(1);
// 用于接收客户端的连接,异步操作
doAccept();
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void doAccept() {
asynchronousServerSocketChannel.accept(this, new AcceptComplectionHandler());
}
}
package com.nio.aio.server;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Date;
public class ReadComplectionHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel channel;
public ReadComplectionHandler(AsynchronousSocketChannel channel) {
if (this.channel == null){
this.channel = channel;
}
}
@Override
public void completed(Integer result, ByteBuffer attachment) {
// 进行flip操作,为后续从缓冲区读取数据做准备
attachment.flip();
// 根据缓冲区可读的字节数创建字节数组
byte[] body = new byte[attachment.remaining()];
// 将缓冲区的可读的字节数组复制到新创建的字节数组中
attachment.get(body);
try {
// 编码
String req = new String(body, "utf-8");
// 打印服务端收到的客户端传来的消息
System.out.println("服务端收到的消息是:" + req);
// 如果收到的消息和服务端预设值的消息相同就返回给客户端当前时间
String currentTime = "AIO TIME TEST".equalsIgnoreCase(req) ? new Date(System.currentTimeMillis()).toString() : "客户端发送的消息不是AIO TIME TEST";
doWrite(currentTime);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
private void doWrite(String currentTime) {
// 非空和去除空格,校验
if (currentTime != null && currentTime.trim().length() > 0){
// 使用默认的编码方式对currentTime编码,将编码以后的结果放到byte数组
byte[] bytes = currentTime.getBytes();
// 分配一个新的字节缓冲区,根据上面的字节数组
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
// 将字节数组中的数据复制到缓冲区中
writeBuffer.put(bytes);
// 进行flip操作,为后续从缓冲区读取数据做准备
writeBuffer.flip();
/**
* 该方法发起一个异步写操作,从给定的缓冲区向该通道写入字节序列
* 参数的分析:
* 1 要检索字节的缓冲区
* 2 连接到I/O操作的对象
* 3 接收通知回调的业务handler
*/
channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
// 如果没有发送完成,继续发送
if (attachment.hasRemaining()){
channel.write(attachment, attachment, this);
}
}
/**
* 当发生异常的时候,对隐藏进行判断,如果是IO异常,关闭链路释放资源
* @param exc
* @param attachment
*/
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
【1】AIO入门案例客户端的代码具体实现以及详细注释 https://blog.csdn.net/wildwolf_001/article/details/81103006
【2】NIO入门案例使用netty最新版本框架代码实现及详细注释 https://blog.csdn.net/wildwolf_001/article/details/81132896
【3】NIO入门案例客户端的代码具体实现以及详细注释 https://blog.csdn.net/wildwolf_001/article/details/81102953
【4】NIO入门案例之分析NIO服务端序列图 https://blog.csdn.net/wildwolf_001/article/details/81069180 |