Java AIO-异步通信
客户端
package com.test.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.log4j.Logger;

import com.test.handler.client.ConnectCompleteHandler;

/**
 * AIO client: opens an asynchronous socket channel bound to a channel group
 * backed by a fixed thread pool and starts a non-blocking connect to the
 * server. Everything after the connect is driven by completion handlers.
 */
public class Client {

    private static final Logger logger = Logger.getLogger(Client.class);

    /** Keeps the main thread parked; nothing in this class ever counts it down. */
    private static final CountDownLatch serverStatus = new CountDownLatch(1);

    private final String host = "127.0.0.1";
    private final int port = 9999;
    private final int poolSize = 10;

    /**
     * Creates the thread pool, the channel group and the channel, then issues
     * the asynchronous connect. The call returns before the connection is
     * established; the result is delivered to {@link ConnectCompleteHandler}.
     *
     * @throws Exception if the channel group or the channel cannot be set up
     */
    public Client() throws Exception {
        ExecutorService threadPool = null;
        AsynchronousChannelGroup asyncChannelGroup = null;
        try {
            // Completion handlers are invoked on threads of this pool once an
            // I/O operation has finished.
            threadPool = Executors.newFixedThreadPool(poolSize);
            asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(threadPool);
            AsynchronousSocketChannel asyncSocketChannel = AsynchronousSocketChannel.open(asyncChannelGroup);
            asyncSocketChannel.setOption(StandardSocketOptions.TCP_NODELAY, true);
            asyncSocketChannel.connect(new InetSocketAddress(host, port), null,
                    new ConnectCompleteHandler(asyncSocketChannel));
        } catch (IOException e) {
            logger.error("Client socket establish failed!", e);
            // Without this the non-daemon pool threads would keep the JVM alive
            // after the constructor has failed.
            releaseThreads(asyncChannelGroup, threadPool);
            throw e;
        }
    }

    /** Best-effort shutdown of the channel group, or of the bare pool if no group exists. */
    private static void releaseThreads(AsynchronousChannelGroup group, ExecutorService pool) {
        try {
            if (group != null) {
                // Closes every channel in the group and shuts the group down.
                group.shutdownNow();
                return;
            }
        } catch (IOException closeError) {
            logger.warn("Failed to shut down channel group", closeError);
        }
        if (pool != null) {
            pool.shutdownNow();
        }
    }

    /**
     * Starts the client and blocks the main thread, because every AIO call
     * returns immediately and the work happens on the pool threads.
     */
    public static void main(String[] args) throws Exception {
        new Client();
        serverStatus.await();
    }
}
客户端处理器
package com.test.handler.client;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

import org.apache.log4j.Logger;

/**
 * Client-side handler for the completion of an asynchronous connect.
 *
 * <p>{@code CompletionHandler<V, A>}: {@code V} is the result of the I/O
 * operation and {@code A} is the attachment. A connect produces no value, so
 * the result type is {@code Void}; the attachment is not used here.
 */
public class ConnectCompleteHandler implements CompletionHandler<Void, Object> {

    private static final Logger logger = Logger.getLogger(ConnectCompleteHandler.class);

    AsynchronousSocketChannel asyncSocketChannel;

    /**
     * @param asyncSocketChannel the channel whose connect this handler observes
     */
    public ConnectCompleteHandler(AsynchronousSocketChannel asyncSocketChannel) {
        this.asyncSocketChannel = asyncSocketChannel;
    }

    /**
     * Sends the greeting request once the connection is established. The
     * channel is passed as the attachment of the write so that the write
     * handler can go on to read the response.
     */
    @Override
    public void completed(Void result, Object attachment) {
        // Runs on a thread of the pool held by the channel group.
        logger.info("Deal thread of [ConnectCompleteHandler] : " + Thread.currentThread().getName());
        String request = "Hi, this is client!";
        logger.info("The request sent by client is : " + request);

        // StandardCharsets avoids the checked exception of the by-name lookup.
        ByteBuffer writeByteBuffer = ByteBuffer.wrap(request.getBytes(StandardCharsets.UTF_8));
        asyncSocketChannel.write(writeByteBuffer, asyncSocketChannel, new WriteCompleteHandler());
    }

    /**
     * Logs the connect failure with its cause and closes the channel, which
     * is of no further use.
     */
    @Override
    public void failed(Throwable exc, Object attachment) {
        logger.error("Connection error!", exc);
        try {
            asyncSocketChannel.close();
        } catch (IOException closeError) {
            logger.warn("Failed to close client channel after connection error", closeError);
        }
    }
}
package com.test.handler.client;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

import org.apache.log4j.Logger;

/**
 * Client-side handler for the completion of an asynchronous write.
 *
 * <p>{@code CompletionHandler<V, A>}: {@code V} is the number of bytes
 * written; {@code A} is the channel itself, passed along so the server's
 * response can be read from it.
 */
public class WriteCompleteHandler implements CompletionHandler<Integer, AsynchronousSocketChannel> {

    private static final Logger logger = Logger.getLogger(WriteCompleteHandler.class);

    /**
     * Logs the byte count and, unless the result is -1, starts an asynchronous
     * read for the response. The read buffer doubles as the read attachment.
     */
    @Override
    public void completed(Integer result, AsynchronousSocketChannel asyncSocketChannel) {
        logger.info("Deal thread of [WriteCompleteHandler] : " + Thread.currentThread().getName());
        logger.info("Write bytes : " + result.intValue());
        if (result.intValue() == -1) {
            logger.error("Send request to server error!");
            return;
        }
        ByteBuffer readByteBuffer = ByteBuffer.allocate(100);
        // Fetch the response sent by the server.
        asyncSocketChannel.read(readByteBuffer, readByteBuffer, new ReadCompleteHandler());
    }

    /**
     * Logs the write failure with its cause and closes the channel so the
     * socket is not leaked.
     */
    @Override
    public void failed(Throwable exc, AsynchronousSocketChannel asyncSocketChannel) {
        logger.error("Write message error!", exc);
        if (asyncSocketChannel == null) {
            return;
        }
        try {
            asyncSocketChannel.close();
        } catch (IOException closeError) {
            logger.warn("Failed to close client channel after write error", closeError);
        }
    }
}
package com.test.handler.client;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

import org.apache.log4j.Logger;

/**
 * Client-side handler for the completion of an asynchronous read.
 *
 * <p>{@code CompletionHandler<V, A>}: {@code V} is the number of bytes read;
 * {@code A} is the {@link ByteBuffer} that was passed as the attachment when
 * the read was issued, i.e. the buffer holding the received data.
 */
public class ReadCompleteHandler implements CompletionHandler<Integer, ByteBuffer> {

    private static final Logger logger = Logger.getLogger(ReadCompleteHandler.class);

    /**
     * Decodes the received bytes as UTF-8 and logs the response. A result of
     * -1 means the channel reached end-of-stream before any data arrived.
     */
    @Override
    public void completed(Integer result, ByteBuffer respByteBuffer) {
        logger.info("Deal thread of [ReadCompleteHandler] : " + Thread.currentThread().getName());
        logger.info("Read bytes : " + result.intValue());
        if (result.intValue() == -1) {
            logger.error("Get response from server error!");
            return;
        }
        // Switch the buffer from filling to draining before copying it out.
        respByteBuffer.flip();
        byte[] respBytes = new byte[respByteBuffer.remaining()];
        respByteBuffer.get(respBytes);
        // StandardCharsets avoids the checked exception of the by-name lookup.
        String response = new String(respBytes, StandardCharsets.UTF_8);
        logger.info("The response sent by server is : " + response);
    }

    /** Logs the read failure together with its cause. */
    @Override
    public void failed(Throwable exc, ByteBuffer readByteBuffer) {
        logger.error("Read message error!", exc);
    }
}
服务端
package com.test.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.log4j.Logger;

import com.test.handler.server.ConnectCompleteHandler;

/**
 * AIO server: binds an asynchronous server socket channel that belongs to a
 * channel group backed by a fixed thread pool, and accepts connections through
 * completion handlers.
 */
public class Server {

    private static final Logger logger = Logger.getLogger(Server.class);

    /** Keeps the main thread parked; nothing in this class ever counts it down. */
    private static final CountDownLatch serverStatus = new CountDownLatch(1);

    private final String host = "127.0.0.1";
    private final int port = 9999;
    private final int poolSize = 10;

    private AsynchronousServerSocketChannel asyncServerSocketChannel;

    /**
     * Creates the thread pool and channel group, then opens and binds the
     * server channel.
     *
     * @throws Exception if the server channel cannot be opened or bound
     */
    public Server() throws Exception {
        ExecutorService threadPool = null;
        AsynchronousChannelGroup asyncChannelGroup = null;
        try {
            // Completion handlers are invoked on threads of this pool once an
            // I/O operation has finished.
            threadPool = Executors.newFixedThreadPool(poolSize);
            asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(threadPool);
            asyncServerSocketChannel = AsynchronousServerSocketChannel.open(asyncChannelGroup);
            asyncServerSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);
            asyncServerSocketChannel.bind(new InetSocketAddress(host, port));
            logger.info("Server start up!");
        } catch (Exception e) {
            logger.error("Server establish error!", e);
            // Without this the non-daemon pool threads would keep the JVM alive
            // after the constructor has failed (e.g. port already in use).
            releaseThreads(asyncChannelGroup, threadPool);
            throw e;
        }
    }

    /** Best-effort shutdown of the channel group, or of the bare pool if no group exists. */
    private static void releaseThreads(AsynchronousChannelGroup group, ExecutorService pool) {
        try {
            if (group != null) {
                // Closes every channel in the group and shuts the group down.
                group.shutdownNow();
                return;
            }
        } catch (IOException closeError) {
            logger.warn("Failed to shut down channel group", closeError);
        }
        if (pool != null) {
            pool.shutdownNow();
        }
    }

    /**
     * Issues the first asynchronous accept.
     *
     * <p>Two alternatives are deliberately not used:
     * <ul>
     *   <li>Calling {@code accept()} in a tight {@code while (true)} loop throws
     *       {@code java.nio.channels.AcceptPendingException}, because a new
     *       accept may only be issued after the previous one has completed.</li>
     *   <li>Looping over the {@code Future}-returning {@code accept()} and
     *       blocking on {@code get()} works, but the call has not completed
     *       when the future is returned, so the thread simply waits.</li>
     * </ul>
     * Since a channel group with a thread pool is available, the callback form
     * is the better fit; the handler itself issues the next accept.
     */
    public void service() {
        asyncServerSocketChannel.accept(asyncServerSocketChannel, new ConnectCompleteHandler());
    }

    /**
     * Starts the server and blocks the main thread. All AIO calls return
     * immediately, so without the latch the main thread would fall off the end
     * of {@code main}; the latch keeps it parked while the pool threads serve
     * requests.
     */
    public static void main(String[] args) throws Exception {
        Server server = new Server();
        server.service();
        serverStatus.await();
    }
}
服务端处理器
package com.test.handler.server;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

import org.apache.log4j.Logger;

/**
 * Server-side handler for the completion of an asynchronous accept.
 *
 * <p>{@code CompletionHandler<V, A>}: {@code V} is the newly accepted
 * {@link AsynchronousSocketChannel}; {@code A} is the
 * {@link AsynchronousServerSocketChannel}, passed along so that the next
 * accept can be issued from inside the handler.
 *
 * <p>The handler holds no per-connection state, so the same instance is reused
 * for every accept.
 */
public class ConnectCompleteHandler
        implements CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel> {

    private static final Logger logger = Logger.getLogger(ConnectCompleteHandler.class);

    /**
     * Re-arms the accept for the next client, then starts reading the request
     * of the connection that was just established.
     */
    @Override
    public void completed(AsynchronousSocketChannel asyncSocketChannel,
            AsynchronousServerSocketChannel asyncServerSocketChannel) {
        // Runs on a thread of the pool held by the channel group.
        logger.info("Deal thread of [ConnectCompleteHandler] : " + Thread.currentThread().getName());
        // The current connection is established; accept the next one.
        asyncServerSocketChannel.accept(asyncServerSocketChannel, this);

        // ByteBuffer is not thread-safe, so each connection gets its own buffer.
        ByteBuffer readByteBuffer = ByteBuffer.allocate(100);
        // Fetch the request sent by the client.
        asyncSocketChannel.read(readByteBuffer, readByteBuffer, new ReadCompleteHandler(asyncSocketChannel));
    }

    /**
     * Logs the accept failure with its cause. If the server channel is still
     * open, a new accept is issued so that one failed connection attempt does
     * not stop the server from accepting further clients.
     */
    @Override
    public void failed(Throwable exc, AsynchronousServerSocketChannel attachment) {
        logger.error("Connection error!", exc);
        // A closed channel would fail every accept immediately, so only
        // re-arm while it is open.
        if (attachment != null && attachment.isOpen()) {
            attachment.accept(attachment, this);
        }
    }
}
package com.test.handler.server;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

import org.apache.log4j.Logger;

/**
 * Server-side handler for the completion of an asynchronous read.
 *
 * <p>{@code CompletionHandler<V, A>}: {@code V} is the number of bytes read;
 * {@code A} is the {@link ByteBuffer} that was passed as the attachment when
 * the read was issued, i.e. the buffer holding the received data.
 */
public class ReadCompleteHandler implements CompletionHandler<Integer, ByteBuffer> {

    private static final Logger logger = Logger.getLogger(ReadCompleteHandler.class);

    private AsynchronousSocketChannel asyncSocketChannel;

    /**
     * @param asyncSocketChannel the client connection the response is written to
     */
    public ReadCompleteHandler(AsynchronousSocketChannel asyncSocketChannel) {
        this.asyncSocketChannel = asyncSocketChannel;
    }

    /**
     * Decodes the request as UTF-8, logs it and writes the fixed greeting back
     * to the client. A result of -1 means the client closed the connection
     * before sending data; the channel is closed in that case.
     */
    @Override
    public void completed(Integer result, ByteBuffer readByteBuffer) {
        logger.info("Deal thread of [ReadCompleteHandler] : " + Thread.currentThread().getName());
        logger.info("Read bytes : " + result.intValue());
        if (result.intValue() == -1) {
            logger.error("Get request from client error!");
            closeChannel();
            return;
        }

        // Switch the buffer from filling to draining before copying it out.
        readByteBuffer.flip();
        byte[] reqBytes = new byte[readByteBuffer.remaining()];
        readByteBuffer.get(reqBytes);
        // StandardCharsets avoids the checked exception of the by-name lookup.
        String request = new String(reqBytes, StandardCharsets.UTF_8);
        logger.info("The request sent by client is : " + request);

        String response = "Hi, this is server!";
        logger.info("The response has been sent back to client is : " + response);
        ByteBuffer writeByteBuffer = ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8));
        asyncSocketChannel.write(writeByteBuffer, null, new WriteCompleteHandler());
    }

    /**
     * Logs the read failure with its cause and closes the connection so the
     * socket is not leaked.
     */
    @Override
    public void failed(Throwable exc, ByteBuffer readByteBuffer) {
        logger.error("Read message error!", exc);
        closeChannel();
    }

    /** Closes the client connection, logging rather than propagating a close error. */
    private void closeChannel() {
        if (asyncSocketChannel == null) {
            return;
        }
        try {
            asyncSocketChannel.close();
        } catch (IOException closeError) {
            logger.warn("Failed to close client connection", closeError);
        }
    }
}
package com.test.handler.server;

import java.nio.channels.CompletionHandler;

import org.apache.log4j.Logger;

/**
 * Server-side handler for the completion of an asynchronous write.
 *
 * <p>{@code CompletionHandler<V, A>}: {@code V} is the number of bytes
 * written; the attachment {@code A} is not used by this handler.
 */
public class WriteCompleteHandler implements CompletionHandler<Integer, Object> {

    private static final Logger logger = Logger.getLogger(WriteCompleteHandler.class);

    /** Logs the number of bytes written and whether the response went out. */
    @Override
    public void completed(Integer result, Object attachment) {
        logger.info("Deal thread of [WriteCompleteHandler] : " + Thread.currentThread().getName());
        logger.info("Write bytes : " + result.intValue());
        if (result.intValue() == -1) {
            // This is a failure report, so it belongs at ERROR level.
            logger.error("Send response to client error!");
        } else {
            logger.info("The response has been sent back to client successfully!");
        }
    }

    /** Logs the write failure together with its cause. */
    @Override
    public void failed(Throwable exc, Object attachment) {
        logger.error("Write message error!", exc);
    }
}
log4j日志配置文件
# Root logger: INFO level, written to the "logOutput" appender.
log4j.rootLogger=info,logOutput

# Console appender; each record is printed as LEVEL[date][logger] -> message.
log4j.appender.logOutput=org.apache.log4j.ConsoleAppender
log4j.appender.logOutput.layout=org.apache.log4j.PatternLayout
log4j.appender.logOutput.layout.ConversionPattern=%p%d{[yy-MM-dd HH:mm:ss]}[%c] -> %m%n
Server端运行结果
INFO[16-08-04 01:20:22][com.test.server.Server] -> Server start up!
INFO[16-08-04 01:20:26][com.test.handler.server.ConnectCompleteHandler] -> Deal thread of [ConnectCompleteHandler] : pool-1-thread-1
INFO[16-08-04 01:20:26][com.test.handler.server.ReadCompleteHandler] -> Deal thread of [ReadCompleteHandler] : pool-1-thread-2
INFO[16-08-04 01:20:26][com.test.handler.server.ReadCompleteHandler] -> Read bytes : 19
INFO[16-08-04 01:20:26][com.test.handler.server.ReadCompleteHandler] -> The request sent by client is : Hi, this is client!
INFO[16-08-04 01:20:26][com.test.handler.server.ReadCompleteHandler] -> The response has been sent back to client is : Hi, this is server!
INFO[16-08-04 01:20:26][com.test.handler.server.WriteCompleteHandler] -> Deal thread of [WriteCompleteHandler] : pool-1-thread-3
INFO[16-08-04 01:20:26][com.test.handler.server.WriteCompleteHandler] -> Write bytes : 19
INFO[16-08-04 01:20:26][com.test.handler.server.WriteCompleteHandler] -> The response has been sent back to client successfully!
Client端运行结果
INFO[16-08-04 01:20:26][com.test.handler.client.ConnectCompleteHandler] -> Deal thread of [ConnectCompleteHandler] : pool-1-thread-1
INFO[16-08-04 01:20:26][com.test.handler.client.ConnectCompleteHandler] -> The request sent by client is : Hi, this is client!
INFO[16-08-04 01:20:26][com.test.handler.client.WriteCompleteHandler] -> Deal thread of [WriteCompleteHandler] : pool-1-thread-2
INFO[16-08-04 01:20:26][com.test.handler.client.WriteCompleteHandler] -> Write bytes : 19
INFO[16-08-04 01:20:26][com.test.handler.client.ReadCompleteHandler] -> Deal thread of [ReadCompleteHandler] : pool-1-thread-3
INFO[16-08-04 01:20:26][com.test.handler.client.ReadCompleteHandler] -> Read bytes : 19
INFO[16-08-04 01:20:26][com.test.handler.client.ReadCompleteHandler] -> The response sent by server is : Hi, this is server!