作者微信 bishe2022

代码功能演示视频在页面下方,请先观看;如需定制开发,联系页面右侧客服
Java AIO-异步通信

Custom Tab

zzzzzzzzzzzzz.png

客户端

package com.test.client;  
  
import java.io.IOException;  
import java.net.InetSocketAddress;  
import java.net.StandardSocketOptions;  
import java.nio.channels.AsynchronousChannelGroup;  
import java.nio.channels.AsynchronousSocketChannel;  
import java.util.concurrent.CountDownLatch;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
  
import org.apache.log4j.Logger;  
  
import com.test.handler.client.ConnectCompleteHandler;  
  
public class Client {  
    private Logger logger = Logger.getLogger(Client.class);  
  
    private String host = "127.0.0.1";  
    private int port = 9999;  
    private int poolSize = 10;  
      
    private static CountDownLatch serverStatus = new CountDownLatch(1);  
      
    public Client() throws Exception {  
        try {  
            //池中的每个线程都在等待IO事件,当IO操作完成后,调用池中的线程处理CompleteHandler  
            ExecutorService threadPool = Executors.newFixedThreadPool(poolSize);  
            AsynchronousChannelGroup asyncChannelGroup   
                    = AsynchronousChannelGroup.withThreadPool(threadPool);  
              
            AsynchronousSocketChannel asyncSocketChannel =   
                    AsynchronousSocketChannel.open(asyncChannelGroup);  
            asyncSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);  
            asyncSocketChannel.connect(new InetSocketAddress(host, port),   
                    null, new ConnectCompleteHandler(asyncSocketChannel));  
        } catch (IOException e) {  
            logger.error("Cilent socket establish failed!");  
            throw e;  
        }  
    }  
      
    public static void main(String[] args) throws Exception {  
        Client client = new Client();  
          
        serverStatus.await();  
    }  
}

客户端处理器

package com.test.handler.client;  
  
import java.io.UnsupportedEncodingException;  
import java.nio.ByteBuffer;  
import java.nio.channels.AsynchronousSocketChannel;  
import java.nio.channels.CompletionHandler;  
  
import org.apache.log4j.Logger;  
  
//CompletionHandler<V,A>  
//V-IO操作的结果,AsynchronousSocketChannel.open创建的异步连接,  
//  asyncSocketChannel.connect实际没有IO操作,因此IO操作的结果为Void  
//A-IO操作附件,  
public class ConnectCompleteHandler   
        implements CompletionHandler<Void, Object> {  
    private Logger logger = Logger.getLogger(ConnectCompleteHandler.class);  
      
    AsynchronousSocketChannel asyncSocketChannel;  
      
    public ConnectCompleteHandler(  
            AsynchronousSocketChannel asyncSocketChannel){  
        this.asyncSocketChannel = asyncSocketChannel;  
    }  
      
    @Override  
    public void completed(Void result, Object attachment) {  
        //使用asyncChannelGroup中保存的线程池中的线程进行处理  
        logger.info("Deal thread of [ConnectCompleteHandler] : "   
                + Thread.currentThread().getName());  
      
        String request = "Hi, this is client!";  
        logger.info("The request sent by client is : " + request);  
        try {  
            byte[] reqBytes = request.getBytes("utf-8");  
            ByteBuffer writeByteBuffer = ByteBuffer.allocate(reqBytes.length);  
            writeByteBuffer.put(reqBytes);  
            writeByteBuffer.flip();  
            asyncSocketChannel.write(writeByteBuffer, asyncSocketChannel,  
                    new WriteCompleteHandler());  
        } catch (UnsupportedEncodingException e) {  
            e.printStackTrace();  
        }  
    }  
  
    @Override  
    public void failed(Throwable exc, Object attachment) {  
        logger.error("Connection error!");  
    }  
}
package com.test.handler.client;  
  
import java.nio.ByteBuffer;  
import java.nio.channels.AsynchronousSocketChannel;  
import java.nio.channels.CompletionHandler;  
  
import org.apache.log4j.Logger;  
  
//CompletionHandler<V,A>  
//V-IO操作的结果,这里是write操作写成功的字节数  
//A-IO操作附件,这里传入AsynchronousSocketChannel便于获得服务端响应  
public class WriteCompleteHandler   
        implements CompletionHandler<Integer, AsynchronousSocketChannel> {  
    private Logger logger = Logger.getLogger(WriteCompleteHandler.class);  
      
    @Override  
    public void completed(Integer result,   
            AsynchronousSocketChannel asyncSocketChannel) {  
        logger.info("Deal thread of [WriteCompleteHandler] : "   
                + Thread.currentThread().getName());  
        logger.info("Write bytes : " + result.intValue());  
        if(result.intValue() == -1){  
            logger.error("Send request to server error!");  
        }else{  
            ByteBuffer readByteBuffer = ByteBuffer.allocate(100);  
            //获取服务端发送的响应  
            asyncSocketChannel.read(readByteBuffer, readByteBuffer,   
                    new ReadCompleteHandler());  
        }  
    }  
  
    @Override  
    public void failed(Throwable exc,   
            AsynchronousSocketChannel asyncSocketChannel) {  
        logger.error("Write message error!");  
        exc.printStackTrace();  
    }  
}
package com.test.handler.client;  
  
import java.io.UnsupportedEncodingException;  
import java.nio.ByteBuffer;  
import java.nio.channels.CompletionHandler;  
  
import org.apache.log4j.Logger;  
  
//CompletionHandler<V,A>  
//V-IO操作的结果,这里是read操作成功读取的字节数  
//A-IO操作附件,由于WriteCompleteHandler中调用asyncSocketChannel.read方法时  
//  传入了ByteBuffer,所以这里是ByteBuffer  
public class ReadCompleteHandler   
        implements CompletionHandler<Integer, ByteBuffer> {  
    private Logger logger = Logger.getLogger(ReadCompleteHandler.class);  
      
    @Override  
    public void completed(Integer result, ByteBuffer respByteBuffer) {  
        logger.info("Deal thread of [ReadCompleteHandler] : "   
                + Thread.currentThread().getName());  
        logger.info("Read bytes : " + result.intValue());  
        if(result.intValue() == -1){  
            logger.error("Get response from server error!");  
        }else{  
            respByteBuffer.flip();  
            byte[] respBytes = new byte[respByteBuffer.remaining()];  
            respByteBuffer.get(respBytes);  
              
            try {  
                String response = new String(respBytes, "utf-8");  
                logger.info("The response sent by server is : " + response);  
            } catch (UnsupportedEncodingException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
  
    @Override  
    public void failed(Throwable exc, ByteBuffer readByteBuffer) {  
        logger.error("Read message error!");  
        exc.printStackTrace();  
    }  
}

服务端

package com.test.server;  
  
import java.net.InetSocketAddress;  
import java.net.StandardSocketOptions;  
import java.nio.channels.AsynchronousChannelGroup;  
import java.nio.channels.AsynchronousServerSocketChannel;  
import java.nio.channels.AsynchronousSocketChannel;  
import java.util.concurrent.CountDownLatch;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
import java.util.concurrent.Future;  
  
import org.apache.log4j.Logger;  
  
import com.test.handler.server.ConnectCompleteHandler;  
  
public class Server {  
    private Logger logger = Logger.getLogger(Server.class);  
      
    private String host = "127.0.0.1";  
    private int port = 9999;  
    private int poolSize = 10;  
      
    private static CountDownLatch serverStatus = new CountDownLatch(1);  
    private AsynchronousServerSocketChannel asyncServerSocketChannel;  
      
    public Server() throws Exception {  
        try{  
            //池中的每个线程都在等待IO事件,当IO操作完成后,触发相应的IO时间,调用池中的线程IO回调函数(CompleteHandler)  
            ExecutorService threadPool = Executors.newFixedThreadPool(poolSize);  
            AsynchronousChannelGroup asyncChannelGroup   
                    = AsynchronousChannelGroup.withThreadPool(threadPool);  
              
            asyncServerSocketChannel = AsynchronousServerSocketChannel.open(asyncChannelGroup);  
            asyncServerSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);  
            asyncServerSocketChannel.bind(new InetSocketAddress(host, port));  
            logger.info("Server start up!");  
        }catch(Exception e){  
            logger.error("Server establish error!");  
            throw e;  
        }  
    }  
  
    public void service(){  
//      这种写法将抛出java.nio.channels.AcceptPendingException异常  
//      只有一个连接建立成功之后,才能再建立下一个连接  
//      while(true){  
//          asyncServerSocketChannel.accept();  
//      }  
//      AIO支持直接返回Future对象,但其实此刻调用并未完成,  
//      while(true){  
//          try {  
//              Future<AsynchronousSocketChannel> acceptFuture = asyncServerSocketChannel.accept();  
////                wait直到调用完成  
//              AsynchronousSocketChannel asyncSocketChannel = acceptFuture.get();  
//              logger.info("Connection complete!");  
//          } catch (Exception e) {  
//              e.printStackTrace();  
//          }  
//      }  
//      由于asyncChannelGroup的存在,回调是更好的实现方式  
        asyncServerSocketChannel.accept(asyncServerSocketChannel, new ConnectCompleteHandler());  
    }  
      
    public static void main(String[] args) throws Exception {  
        Server server = new Server();  
        server.service();  
          
        //由于AIO的方法都是直接返回的,这里必须使用锁以避免线程退出,服务停止  
        //所谓AIO,既发起请求之后,当前线程可以去干别的事,当请求完成后会得到通知  
        //作为一个全职服务端,main线程其实也没什么别的事情可干,也许还是客户端更加适合使用AIO  
        serverStatus.await();  
    }  
}

服务端处理器

package com.test.handler.server;  
  
import java.nio.ByteBuffer;  
import java.nio.channels.AsynchronousServerSocketChannel;  
import java.nio.channels.AsynchronousSocketChannel;  
import java.nio.channels.CompletionHandler;  
  
import org.apache.log4j.Logger;  
  
// CompletionHandler<V,A>  
// V-IO操作的结果,这里是成功建立的连接,AsynchronousSocketChannel  
// A-IO操作附件,这里传入AsynchronousServerSocketChannel便于继续接收请求建立新连接  
public class ConnectCompleteHandler   
        implements CompletionHandler<AsynchronousSocketChannel,   
                AsynchronousServerSocketChannel> {  
      
    private Logger logger = Logger.getLogger(ConnectCompleteHandler.class);  
      
    @Override  
    public void completed(AsynchronousSocketChannel asyncSocketChannel,   
            AsynchronousServerSocketChannel asyncServerSocketChannel) {  
        //使用asyncChannelGroup中保存的线程池中的线程进行处理  
        logger.info("Deal thread of [ConnectCompleteHandler] : "   
                + Thread.currentThread().getName());  
        //当前连接建立成功后,接收下一个请求建立新的连接  
        asyncServerSocketChannel.accept(asyncServerSocketChannel,  
                new ConnectCompleteHandler());  
          
        //ByteBuffer是非线程安全的,如果要在多个线程间共享同一个ByteBuffer,需要考虑线程安全性问题  
        ByteBuffer readByteBuffer = ByteBuffer.allocate(100);  
        //获取客户端发送的请求  
        asyncSocketChannel.read(readByteBuffer, readByteBuffer,   
                new ReadCompleteHandler(asyncSocketChannel));  
    }  
  
    @Override  
    public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {  
        logger.error("Connection error!");  
    }  
}
package com.test.handler.server;  
  
import java.io.UnsupportedEncodingException;  
import java.nio.ByteBuffer;  
import java.nio.channels.AsynchronousSocketChannel;  
import java.nio.channels.CompletionHandler;  
  
import org.apache.log4j.Logger;  
  
//CompletionHandler<V,A>  
//V-IO操作的结果,这里是read操作成功读取的字节数  
//A-IO操作附件,由于ConnectCompleteHandler中调用asyncSocketChannel.read方法时  
//  传入了ByteBuffer,所以这里为ByteBuffer  
public class ReadCompleteHandler   
        implements CompletionHandler<Integer, ByteBuffer> {  
      
    private Logger logger = Logger.getLogger(ReadCompleteHandler.class);  
    private AsynchronousSocketChannel asyncSocketChannel;  
      
    public ReadCompleteHandler(AsynchronousSocketChannel asyncSocketChannel){  
        this.asyncSocketChannel = asyncSocketChannel;  
    }  
      
    @Override  
    public void completed(Integer result, ByteBuffer readByteBuffer) {  
        logger.info("Deal thread of [ReadCompleteHandler] : "   
                + Thread.currentThread().getName());  
        logger.info("Read bytes : " + result.intValue());  
        if(result.intValue() == -1){  
            logger.error("Get request from client error!");  
        }else{  
            readByteBuffer.flip();  
            byte[] reqBytes = new byte[readByteBuffer.remaining()];  
            readByteBuffer.get(reqBytes);  
              
            try {  
                String request = new String(reqBytes, "utf-8");  
                logger.info("The request sent by client is : " + request);  
              
              
                String response = "Hi, this is server!";  
                logger.info("The response has been sent back to client is : "   
                        + response);  
                byte[] respBytes = response.getBytes("utf-8");  
                ByteBuffer writeByteBuffer = ByteBuffer.allocate(respBytes.length);  
                writeByteBuffer.put(respBytes);  
                writeByteBuffer.flip();  
//              asyncSocketChannel.write(writeByteBuffer);  
                asyncSocketChannel.write(writeByteBuffer, null, new WriteCompleteHandler());  
            } catch (UnsupportedEncodingException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
  
    @Override  
    public void failed(Throwable exc, ByteBuffer readByteBuffer) {  
        logger.error("Read message error!");  
        exc.printStackTrace();  
    }  
}
package com.test.handler.server;  
  
import java.nio.channels.CompletionHandler;  
  
import org.apache.log4j.Logger;  
  
//CompletionHandler<V,A>  
//V-IO操作的结果,这里是write操作写成功的字节数  
//A-IO操作附件,  
public class WriteCompleteHandler   
        implements CompletionHandler<Integer, Object> {  
      
    private Logger logger = Logger.getLogger(WriteCompleteHandler.class);  
      
    @Override  
    public void completed(Integer result, Object attachment) {  
        logger.info("Deal thread of [WriteCompleteHandler] : "   
                + Thread.currentThread().getName());  
        logger.info("Write bytes : " + result.intValue());  
        if(result.intValue() == -1)  
            logger.info("Send response to client error!" );  
        else  
            logger.info("The response has been sent back to client successfully!" );  
    }  
  
    @Override  
    public void failed(Throwable exc, Object attachment) {  
        logger.error("Write message error!");  
        exc.printStackTrace();  
    }  
}

log4j日志配置文件

log4j.rootLogger=info,logOutput  
  
# log output to console
log4j.appender.logOutput=org.apache.log4j.ConsoleAppender  
log4j.appender.logOutput.layout=org.apache.log4j.PatternLayout  
log4j.appender.logOutput.layout.ConversionPattern=%p%d{[yy-MM-dd HH:mm:ss]}[%c] -> %m%n

Server端运行结果

INFO[16-08-04 01:20:22][com.test.server.Server] -> Server start up!  
INFO[16-08-04 01:20:26][com.test.handler.server.ConnectCompleteHandler] -> 
Deal thread of [ConnectCompleteHandler] : pool-1-thread-1  
INFO[16-08-04 01:20:26][com.test.handler.server.ReadCompleteHandler] -> 
Deal thread of [ReadCompleteHandler] : pool-1-thread-2  
INFO[16-08-04 01:20:26][com.test.handler.server.ReadCompleteHandler] -> 
Read bytes : 19  
INFO[16-08-04 01:20:26][com.test.handler.server.ReadCompleteHandler] -> 
The request sent by client is : Hi, this is client!  
INFO[16-08-04 01:20:26][com.test.handler.server.ReadCompleteHandler] -> 
The response has been sent back to client is : Hi, this is server!  
INFO[16-08-04 01:20:26][com.test.handler.server.WriteCompleteHandler] -> 
Deal thread of [WriteCompleteHandler] : pool-1-thread-3  
INFO[16-08-04 01:20:26][com.test.handler.server.WriteCompleteHandler] -> 
Write bytes : 19  
INFO[16-08-04 01:20:26][com.test.handler.server.WriteCompleteHandler] -> 
The response has been sent back to client successfully!

Client端运行结果

INFO[16-08-04 01:20:26][com.test.handler.client.ConnectCompleteHandler] -> 
Deal thread of [ConnectCompleteHandler] : pool-1-thread-1  
INFO[16-08-04 01:20:26][com.test.handler.client.ConnectCompleteHandler] -> 
The request sent by client is : Hi, this is client!  
INFO[16-08-04 01:20:26][com.test.handler.client.WriteCompleteHandler] -> 
Deal thread of [WriteCompleteHandler] : pool-1-thread-2  
INFO[16-08-04 01:20:26][com.test.handler.client.WriteCompleteHandler] -> 
Write bytes : 19  
INFO[16-08-04 01:20:26][com.test.handler.client.ReadCompleteHandler] -> 
Deal thread of [ReadCompleteHandler] : pool-1-thread-3  
INFO[16-08-04 01:20:26][com.test.handler.client.ReadCompleteHandler] -> 
Read bytes : 19  
INFO[16-08-04 01:20:26][com.test.handler.client.ReadCompleteHandler] -> 
The response sent by server is : Hi, this is server!

转载自:http://blog.csdn.net/a19881029/article/details/52099795

Home