Java AIO 实例(转)

论坛 期权论坛 脚本     
匿名技术用户   2021-1-4 00:26   226   0

转载:http://m.blog.csdn.net/article/details?id=51512200

AIO异步非阻塞IO实例:客户端发送数学表达式,经过服务端接收计算后返回客户端。

1、服务端

包括,Server、ServerHandler、ServerWriteHandler、ServerReadHandler、AcceptHandler、Calculator

1.1 启动程序:

public class Server {

 private static int DEFAULT_PORT = 12345;
 private static AsyncServerHandler serverHandle;
 public volatile static long clientCount = 0;
 public static void start(){
  start(DEFAULT_PORT);
 }
 public static synchronized void start(int port){
  if(serverHandle!=null)
   return;
  serverHandle = new AsyncServerHandler(port);
  new Thread(serverHandle,"Server").start();
 }
 public static void main(String[] args){
  Server.start();
 }

}

1.2 AsyncServerHandler

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;

public class AsyncServerHandler implements Runnable{
 public CountDownLatch latch;
 public AsynchronousServerSocketChannel channel;
 public AsyncServerHandler(int port) {
  try {
   //创建服务端通道
   channel = AsynchronousServerSocketChannel.open();
   //绑定端口
   channel.bind(new InetSocketAddress(port));
   System.out.println("服务器已启动,端口号:" + port);
  } catch (IOException e) {
   e.printStackTrace();
  }
 }
 @Override
 public void run() {
  //CountDownLatch初始化
  //它的作用:在完成一组正在执行的操作之前,允许当前的现场一直阻塞
  //此处,让现场在此阻塞,防止服务端执行完成后退出
  //也可以使用while(true)+sleep 
  //生成环境就不需要担心这个问题,以为服务端是不会退出的
  latch = new CountDownLatch(1);
  //用于接收客户端的连接
  channel.accept(this,new AcceptHandler());
  try {
   latch.await();
  } catch (InterruptedException e) {
   e.printStackTrace();
  }
 }
}

1.3 AcceptHandler

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
//作为handler接收客户端连接
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler>{

 @Override
 public void completed(AsynchronousSocketChannel channel, AsyncServerHandler serverHandler) {
  //继续接受其他客户端的请求
  Server.clientCount++;
  System.out.println("连接的客户端数:" + Server.clientCount);
  serverHandler.channel.accept(serverHandler, this);
  //创建新的Buffer
  ByteBuffer buffer = ByteBuffer.allocate(1024);
  //异步读  第三个参数为接收消息回调的业务Handler
  channel.read(buffer, buffer, new ServerReadHandler(channel));
 }

 @Override
 public void failed(Throwable exc, AsyncServerHandler serverHandler) {
  exc.printStackTrace();
  serverHandler.latch.countDown();
 }

}

1.4 ServerReadHandler

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class ServerReadHandler implements CompletionHandler<Integer, ByteBuffer> {
 //用于读取半包消息和发送应答
 private AsynchronousSocketChannel channel;
 public ServerReadHandler(AsynchronousSocketChannel channel) {
   this.channel = channel;
 }
 //读取到消息后的处理
 @Override
 public void completed(Integer result, ByteBuffer attachment) {
  //flip操作
  attachment.flip();
  //根据
  byte[] message = new byte[attachment.remaining()];
  attachment.get(message);
  try {
   String expression = new String(message, "UTF-8");
   System.out.println("服务器收到消息: " + expression);
   String calrResult = null;
   try{
    calrResult = Calculator.cal(expression).toString();
   }catch(Exception e){
    calrResult = "计算错误:" + e.getMessage();
   }
   //向客户端发送消息
   doWrite(calrResult);
  } catch (UnsupportedEncodingException e) {
   e.printStackTrace();
  }
 }
 //发送消息
 private void doWrite(String result) {
  byte[] bytes = result.getBytes();
  ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
  writeBuffer.put(bytes);
  writeBuffer.flip();
  //异步写数据 参数与前面的read一样
  channel.write(writeBuffer, writeBuffer,new ServerWriteHandler(channel));
 }
 @Override
 public void failed(Throwable exc, ByteBuffer attachment) {
  try {
   this.channel.close();
  } catch (IOException e) {
   e.printStackTrace();
  }
 }
}

1.5 ServerWriteHandler

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class ServerWriteHandler implements CompletionHandler<Integer, ByteBuffer>{

 private AsynchronousSocketChannel channel;
 
 public ServerWriteHandler(AsynchronousSocketChannel channel) {
   this.channel = channel;
 }
 
 @Override
 public void completed(Integer result, ByteBuffer buffer) {
  //如果没有发送完,就继续发送直到完成
  if (buffer.hasRemaining())
   channel.write(buffer, buffer, this);
  else{
   //创建新的Buffer
   ByteBuffer readBuffer = ByteBuffer.allocate(1024);
   //异步读  第三个参数为接收消息回调的业务Handler
   channel.read(readBuffer, readBuffer, new ServerReadHandler(channel));
  }
 }
 @Override
 public void failed(Throwable exc, ByteBuffer attachment) {
  try {
   channel.close();
  } catch (IOException e) {
  }
 }

}

1.6 Calculator

import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;

public class Calculator {
 private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
    public static Object cal(String expression) throws ScriptException{
        return jse.eval(expression);
    }
}

2、客服端

包括 Client、ClientHandler、ClientWriteHandler、ClientReadHandler

2.1 启动器

import java.util.Scanner;

public class Client {

 private static String DEFAULT_HOST = "127.0.0.1";
 private static int DEFAULT_PORT = 12345;
 private static AsyncClientHandler clientHandle;
 public static void start(){
  start(DEFAULT_HOST,DEFAULT_PORT);
 }
 public static synchronized void start(String ip,int port){
  if(clientHandle!=null)
   return;
  clientHandle = new AsyncClientHandler(ip,port);
  new Thread(clientHandle,"Client").start();
 }
 //向服务器发送消息
 public static boolean sendMsg(String msg) throws Exception{
  if(msg.equals("q")) return false;
  clientHandle.sendMsg(msg);
  return true;
 }
 @SuppressWarnings("resource")
 public static void main(String[] args) throws Exception{
  Client.start();
  System.out.println("请输入请求消息:");
  Scanner scanner = new Scanner(System.in);
  while(Client.sendMsg(scanner.nextLine()));
 }
}

2.2 AsyncClientHandler

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable{
 private AsynchronousSocketChannel clientChannel;
 private String host;
 private int port;
 private CountDownLatch latch;
 public AsyncClientHandler(String host, int port) {
  this.host = host;
  this.port = port;
  try {
   //创建异步的客户端通道
   clientChannel = AsynchronousSocketChannel.open();
  } catch (IOException e) {
   e.printStackTrace();
  }
 }
 @Override
 public void run() {
  //创建CountDownLatch等待
  latch = new CountDownLatch(1);
  //发起异步连接操作,回调参数就是这个类本身,如果连接成功会回调completed方法
  clientChannel.connect(new InetSocketAddress(host, port), this, this);
  try {
   latch.await();
  } catch (InterruptedException e1) {
   e1.printStackTrace();
  }
  try {
   clientChannel.close();
  } catch (IOException e) {
   e.printStackTrace();
  }
 }
 //连接服务器成功
 //意味着TCP三次握手完成
 @Override
 public void completed(Void result, AsyncClientHandler attachment) {
  System.out.println("客户端成功连接到服务器...");
 }
 //连接服务器失败
 @Override
 public void failed(Throwable exc, AsyncClientHandler attachment) {
  System.err.println("连接服务器失败...");
  exc.printStackTrace();
  try {
   clientChannel.close();
   latch.countDown();
  } catch (IOException e) {
   e.printStackTrace();
  }
 }
 //向服务器发送消息
 public void sendMsg(String msg){
  byte[] req = msg.getBytes();
  ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
  writeBuffer.put(req);
  writeBuffer.flip();
  //异步写
  clientChannel.write(writeBuffer, writeBuffer,new ClientWriteHandler(clientChannel, latch));
 }
}

2.3 ClientWriteHandler

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

public class ClientWriteHandler implements CompletionHandler<Integer, ByteBuffer>{
 private AsynchronousSocketChannel clientChannel;
 private CountDownLatch latch;
 public ClientWriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
  this.clientChannel = clientChannel;
  this.latch = latch;
 }
 @Override
 public void completed(Integer result, ByteBuffer buffer) {
  //完成全部数据的写入
  if (buffer.hasRemaining()) {
   clientChannel.write(buffer, buffer, this);
  }
  else {
   //读取数据
   ByteBuffer readBuffer = ByteBuffer.allocate(1024);
   clientChannel.read(readBuffer,readBuffer,new ClientReadHandler(clientChannel, latch));
  }
 }
 @Override
 public void failed(Throwable exc, ByteBuffer attachment) {
  System.err.println("数据发送失败...");
  try {
   clientChannel.close();
   latch.countDown();
  } catch (IOException e) {
  }
 }
}

2.4 ClientReadHandler

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

public class ClientReadHandler implements CompletionHandler<Integer, ByteBuffer> {
 private AsynchronousSocketChannel clientChannel;
 private CountDownLatch latch;
 public ClientReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
  this.clientChannel = clientChannel;
  this.latch = latch;
 }
 @Override
 public void completed(Integer result,ByteBuffer buffer) {
  buffer.flip();
  byte[] bytes = new byte[buffer.remaining()];
  buffer.get(bytes);
  String body;
  try {
   body = new String(bytes,"UTF-8");
   System.out.println("客户端收到结果:"+ body);
  } catch (UnsupportedEncodingException e) {
   e.printStackTrace();
  }
 }
 @Override
 public void failed(Throwable exc,ByteBuffer attachment) {
  System.err.println("数据读取失败...");
  try {
   clientChannel.close();
   latch.countDown();
  } catch (IOException e) {
  }
 }
}

3、执行结果

服务器已启动,端口号:12345
请输入请求消息:
客户端成功连接到服务器...
连接的客户端数:1
123456+789+456
服务器收到消息: 123456+789+456
客户端收到结果:124701
9526*56
服务器收到消息: 9526*56
客户端收到结果:533456

分享到 :
0 人收藏
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

积分:7942463
帖子:1588486
精华:0
期权论坛 期权论坛
发布
内容

下载期权论坛手机APP