作者微信 bishe2022

代码功能演示视频在页面下方,请先观看;如需定制开发,联系页面右侧客服
java 之异步套接字编程实例(AIO)

Custom Tab

  异步socket编程,一样分成客户端与服务端。 
   AsynchronousServerSocketChannel  -------服务端socket; 
   AsynchronousSocketChannel------客户端socket. 
   AsynchronousChannelGroup-----socket管理器。服务端socket与客户端socket都由它生成。它的管理需要线程池。它的工作方式之一是把必要的资源交给客户端与服务端的处理器,并调用该处理器进行工作。 
   ExecutorService-----线程池。是socket管理器需要的东西。 
   CompletionHandler-------处理器。它有两个泛型参数A、V。这是个回调函数所用的标准接口。Socket管理器 会把相关实参放到这个A,V的参数中,让用户处理后,然后调用这个处理器的方法进行执行。如果用户有一个方法中的参数的类型是该处理器,那么在其他地方再次调用这个方法,尽管方法不同,但是传给该方法的CompletionHandler的处理器的A、V参数 却是不相同的,不仅是值不同,类型也有可能完全不同。这是学习中的难点。 

   练习中总结:除了服务端与客户端初始化时差别很大,但是在各自与对方通信中,所使用的类都是 客户端socket类。 

  下面的例子,展示了异步方式的服务端与客户端相互通信的例子。客户端是swing程序。 
   调试使用方法:先运行服务器,再运行客服端。在客户端上的文本框中输入字符点“点我”按钮,立即通过0号套接字向服务器发送信息。在调试台中,可看到服务器与客户端的通信情况。 
下面是我测试时服务端的信息 

引用

debug: 
AioAcceptHandler.completed called 
有客户端连接:/127.0.0.1:1606 
AioAcceptHandler.completed called 
有客户端连接:/127.0.0.1:1607 
收到/127.0.0.1:1606的消息:0 
收到/127.0.0.1:1607的消息:1 
收到/127.0.0.1:1606的消息:sd 
收到/127.0.0.1:1606的消息:1111111111

下面是我测试时客户端的信息 

引用

debug: 
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:1 
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:0 
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:sd 
收到localhost/127.0.0.1:9008的消息:服务器回应,你输出的是:1111111111

例子说明: 
  服务端与客户端各有四个类。 
服务端: 
  AioTcpServer---服务器主类。它与客户端通信由回调连接处理器AioAcceptHandler完成。 
  AioAcceptHandler-----连接处理器。处理服务器与客户端的通信。具体的读操作,回调AioReadHandler处理器 
  AioReadHandler-----读处理器。处理连接处理器交给的 读任务。即具体的读客户端的信息由它完成。如果服务器要回应该客户端,需要回调AioWriteHandler写处理器。 
  AioWriteHandler----写处理器。完成读处理器交给的任务,即向客户端回应消息。 

客户端: 
   AioTcpClient-------客户端主类。它与服务器的通讯由回调AioConnectHandler连接处理器完成。具体写信息由客户端socket回调AioSendHandler完成。具体读信息由客户端socket回调AioReadHandler完成。 
   AioConnectHandler-------连接处理器。完成与服务器的通讯。具体的读信息由客户端socket回调读处理器AioReadHandler完成,即完成读取服务器的信息。 
   AioReadHandler-------读处理器,完成读取服务端信息。 
   AioSendHandler----写处理器,向服务端发送信息。 

1.9.3.1 服务端 
AioTcpServer.java 

Java代码  收藏代码

import java.net.InetSocketAddress;   
import java.nio.channels.AsynchronousChannelGroup;   
import java.nio.channels.AsynchronousServerSocketChannel;   
import java.nio.channels.AsynchronousSocketChannel;   
import java.util.concurrent.ExecutorService;   
import java.util.concurrent.Executors;   
import java.util.concurrent.Future;   
/** 
 *     AIO异步socket通讯,分成 用于服务端的socekt与用于客户端的socket,当然这两者都是<br> 
 * 异步的。两者使用时,都用到了同样的异步通道管理器,异步通道管理器通过线程池管理。<br> 
 *    异步通道管理器,可以生成服务端socket与客户端socket。 *  
 *    使用服务端socket或客户端socket都需要一个操作处理器(CompletionHandler),<br> 
 *当有信息时异步通道管理器会把 相关信息传递给操作作处理器。 *  
 *    操作处理器的方法是同一方法,但方法的参数是泛型,随着调用它的方法不同而改变。<br> *  
 *    在AIO中,CompletionHandler这个操作处理器方法,是个泛型接口,当回调函数用。<br> 
 * 使用CompletionHandler的方法,约定是把该方法前一个参数实例传递给A型参数<br> 
 * (attachment),CompletionHandler的另一个参数将是存有该方法的使用情况的实例。 
 *  
 */  
public class AioTcpServer implements Runnable {   
    private AsynchronousChannelGroup asyncChannelGroup;    
    private AsynchronousServerSocketChannel listener;    
    
    public AioTcpServer(int port) throws Exception {   
        //创建线程池  
        ExecutorService executor = Executors.newFixedThreadPool(20);   
        //异步通道管理器  
        asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor);   
        //创建 用在服务端的异步Socket.以下简称服务器socket。  
        //异步通道管理器,会把服务端所用到的相关参数  
        listener = AsynchronousServerSocketChannel.open(asyncChannelGroup).  
                bind(new InetSocketAddress(port));   
    }   
   
    public void run() {   
        try {   
  
            AioAcceptHandler acceptHandler = new AioAcceptHandler();  
            //为服务端socket指定接收操作对象.accept原型是:  
            //accept(A attachment, CompletionHandler<AsynchronousSocketChannel,  
            // ? super A> handler)  
            //也就是这里的CompletionHandler的A型参数是实际调用accept方法的第一个参数  
            //即是listener。另一个参数V,就是原型中的客户端socket  
            listener.accept(listener, new AioAcceptHandler());    
            Thread.sleep(400000);   
        } catch (Exception e) {   
            e.printStackTrace();   
        } finally {   
            System.out.println("finished server");  
        }   
    }   
   
    public static void main(String... args) throws Exception {   
        AioTcpServer server = new AioTcpServer(9008);   
        new Thread(server).start();   
    }   
}

AioAcceptHandler.java 

Java代码  收藏代码

import client.AioSendHandler;  
import java.io.IOException;   
import java.io.UnsupportedEncodingException;  
import java.nio.ByteBuffer;   
import java.nio.channels.AsynchronousServerSocketChannel;   
import java.nio.channels.AsynchronousSocketChannel;   
import java.nio.channels.CompletionHandler;   
import java.util.concurrent.ExecutionException;   
import java.util.concurrent.Future;   
   
//这里的参数受实际调用它的函数决定。本例是服务端socket.accetp调用决定  
public class AioAcceptHandler implements CompletionHandler  
        <AsynchronousSocketChannel, AsynchronousServerSocketChannel >   
{   
    private  AsynchronousSocketChannel socket;  
    @Override  
    public void completed(AsynchronousSocketChannel socket,   
        AsynchronousServerSocketChannel attachment)   
    { //注意第一个是客户端socket,第二个是服户端socket  
        try {   
            System.out.println("AioAcceptHandler.completed called");  
            attachment.accept(attachment, this);   
            System.out.println("有客户端连接:" +  
                socket.getRemoteAddress().toString()  
            );   
            startRead(socket);      
        } catch (IOException e) {   
            e.printStackTrace();   
        }   
    }   
   
    @Override  
    public void failed(Throwable exc, AsynchronousServerSocketChannel attachment)   
    {   
        exc.printStackTrace();   
    }   
      
    //不是CompletionHandler的方法  
    public void startRead(AsynchronousSocketChannel socket) {   
        ByteBuffer clientBuffer = ByteBuffer.allocate(1024);   
        //read的原型是  
        //read(ByteBuffer dst, A attachment,  
        //    CompletionHandler<Integer,? super A> handler)   
        //即它的操作处理器,的A型,是实际调用read的第二个参数,即clientBuffer。  
        // V型是存有read的连接情况的参数  
        AioReadHandler rd=new AioReadHandler(socket);  
        socket.read(clientBuffer, clientBuffer, rd);   
        try {               
        } catch (Exception e) {   
            e.printStackTrace();   
        }   
    }   
   
}

AioReadHandler.java 

Java代码  收藏代码

import java.io.UnsupportedEncodingException;  
import java.nio.ByteBuffer;   
import java.nio.channels.AsynchronousSocketChannel;   
import java.nio.channels.CompletionHandler;   
import java.nio.charset.CharacterCodingException;   
import java.nio.charset.Charset;   
import java.nio.charset.CharsetDecoder;   
import java.util.logging.Level;  
import java.util.logging.Logger;  
  
//这里的参数型号,受调用它的函数决定。这里是受客户端socket.read调用  
public class AioReadHandler implements CompletionHandler  
        <Integer,ByteBuffer>  
{   
    private AsynchronousSocketChannel socket;   
    public  String msg;  
   
    public AioReadHandler(AsynchronousSocketChannel socket) {   
        this.socket = socket;   
    }       
    private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();    
      
    @Override  
    public void completed(Integer i, ByteBuffer buf) {   
        if (i > 0) {   
            buf.flip();   
            try {   
                msg=decoder.decode(buf).toString();  
                System.out.println("收到" +   
                        socket.getRemoteAddress().toString() + "的消息:" + msg  
                );   
                buf.compact();   
            } catch (CharacterCodingException e) {   
                e.printStackTrace();   
            } catch (IOException e) {   
                e.printStackTrace();   
            }   
            socket.read(buf, buf, this);   
            try {  
                write(socket);  
            } catch (UnsupportedEncodingException ex) {  
                Logger.getLogger(AioReadHandler.class.getName()).log(Level.SEVERE, null, ex);  
            }  
        } else if (i == -1) {   
            try {   
                System.out.println("客户端断线:" + socket.getRemoteAddress().toString());   
                buf = null;   
            } catch (IOException e) {   
                e.printStackTrace();   
            }   
        }   
    }   
  
    @Override  
    public void failed(Throwable exc, ByteBuffer attachment) {  
         System.out.println("cancelled");   
    }  
   
     //不是CompletionHandler的方法  
    public void write(AsynchronousSocketChannel socket) throws UnsupportedEncodingException{  
        String sendString="服务器回应,你输出的是:"+msg;  
        ByteBuffer clientBuffer=ByteBuffer.wrap(sendString.getBytes("UTF-8"));          
        socket.write(clientBuffer, clientBuffer, new AioWriteHandler(socket));  
    }  
}


AioWriteHandler.java 

Java代码  收藏代码

import java.io.IOException;  
import java.nio.ByteBuffer;  
import java.nio.channels.AsynchronousSocketChannel;  
import java.nio.channels.CompletionHandler;  
  
public class AioWriteHandler implements CompletionHandler  
    <Integer,ByteBuffer>  
{   
    private AsynchronousSocketChannel socket;   
   
    public AioWriteHandler(AsynchronousSocketChannel socket) {   
        this.socket = socket;   
    }   
  
    @Override  
    public void completed(Integer i, ByteBuffer buf) {  
        if (i > 0) {   
            socket.write(buf, buf, this);   
        } else if (i == -1) {   
            try {   
                System.out.println("对端断线:" + socket.getRemoteAddress().toString());   
                buf = null;   
            } catch (IOException e) {   
                e.printStackTrace();   
            }   
        }   
          
    }  
  
    @Override  
    public void failed(Throwable exc, ByteBuffer attachment) {  
        System.out.println("cancelled");   
    }  
      
}

1.9.3.2 客户端 
AioTcpClient.java 

Java代码  收藏代码

import java.awt.BorderLayout;  
import java.awt.FlowLayout;  
import java.awt.event.ActionEvent;  
import java.awt.event.ActionListener;  
import java.io.IOException;  
import java.io.UnsupportedEncodingException;  
import java.net.InetSocketAddress;  
import java.nio.ByteBuffer;  
import java.net.StandardSocketOptions;  
import java.nio.channels.AsynchronousChannelGroup;  
import java.nio.channels.AsynchronousSocketChannel;  
import java.nio.channels.CompletionHandler;  
import java.nio.charset.CharacterCodingException;  
import java.nio.charset.Charset;  
import java.nio.charset.CharsetDecoder;  
import java.util.Timer;  
import java.util.TimerTask;  
import java.util.concurrent.ConcurrentHashMap;  
import java.util.concurrent.ExecutionException;  
import java.util.concurrent.ExecutorService;  
import java.util.concurrent.Executors;  
import java.util.concurrent.Future;  
import java.util.logging.Level;  
import java.util.logging.Logger;  
import javax.swing.JButton;  
import javax.swing.JFrame;  
import javax.swing.JPanel;  
import javax.swing.JTextField;  
  
public class AioTcpClient {  
    public static JTextField jt=new JTextField();  
    public static ConcurrentHashMap<String,AsynchronousSocketChannel>  
            sockets =new ConcurrentHashMap<>();  
    static AioTcpClient me;  
      
    private AsynchronousChannelGroup asyncChannelGroup;  
    public AioTcpClient() throws Exception {  
        //创建线程池  
        ExecutorService executor = Executors.newFixedThreadPool(20);  
        //创建异眇通道管理器     
        asyncChannelGroup = AsynchronousChannelGroup.withThreadPool(executor);  
    }  
      
    private final CharsetDecoder decoder = Charset.forName("GBK").newDecoder();  
      
    public void start(final String ip, final int port) throws Exception {  
        // 启动20000个并发连接,使用20个线程的池子  
        for (int i = 0; i < 2; i++) {  
            try {  
                //客户端socket.当然它是异步方式的。  
                AsynchronousSocketChannel connector = null;  
                if (connector == null || !connector.isOpen()) {  
                    //从异步通道管理器处得到客户端socket  
                    connector = AsynchronousSocketChannel.open(asyncChannelGroup);  
                    sockets.putIfAbsent(String.valueOf(i), connector);  
                      
                    connector.setOption(StandardSocketOptions.TCP_NODELAY,  
                        true);  
                    connector.setOption(StandardSocketOptions.SO_REUSEADDR,  
                        true);  
                    connector.setOption(StandardSocketOptions.SO_KEEPALIVE,  
                        true);  
                    //开始连接服务器。这里的的connect原型是  
                    // connect(SocketAddress remote, A attachment,   
                    //  CompletionHandler<Void,? super A> handler)  
                    // 也就是它的CompletionHandler 的A型参数是由这里的调用方法  
                    //的第二个参数决定。即是connector。客户端连接器。  
                    // V型为null  
                    connector.connect(new InetSocketAddress(ip, port),  
                            connector, new AioConnectHandler(i));  
                }  
            } catch (IOException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
      
    public void work() throws Exception{  
        AioTcpClient client = new AioTcpClient();  
        client.start("localhost", 9008);  
    }  
  
    public void send() throws UnsupportedEncodingException{  
        AsynchronousSocketChannel socket=sockets.get("0");  
        String sendString=jt.getText();  
        ByteBuffer clientBuffer=ByteBuffer.wrap(sendString.getBytes("UTF-8"));          
        socket.write(clientBuffer, clientBuffer, new AioSendHandler(socket));  
    }  
    public   void createPanel() {  
        me=this;  
        JFrame f = new JFrame("Wallpaper");  
        f.getContentPane().setLayout(new BorderLayout());         
          
        JPanel p=new JPanel(new FlowLayout(FlowLayout.LEFT));          
        JButton bt=new JButton("点我");  
        p.add(bt);  
        me=this;  
        bt.addActionListener(new ActionListener(){  
  
            @Override  
            public void actionPerformed(ActionEvent e) {  
               try {  
                    me.send();  
                   
                } catch (Exception ex) {  
                    Logger.getLogger(AioTcpClient.class.getName()).log(Level.SEVERE, null, ex);  
                }  
            }  
          
        });  
          
        bt=new JButton("结束");  
        p.add(bt);  
        me=this;  
        bt.addActionListener(new ActionListener(){  
            @Override  
            public void actionPerformed(ActionEvent e) {                   
            }  
          
        });  
   
        f.getContentPane().add(jt,BorderLayout.CENTER);  
        f.getContentPane().add(p, BorderLayout.EAST);  
          
        f.setSize(450, 300);  
        f.setDefaultCloseOperation (JFrame.EXIT_ON_CLOSE);  
        f.setLocationRelativeTo (null);  
        f.setVisible (true);  
    }  
        
    public static void main(String[] args) {  
        javax.swing.SwingUtilities.invokeLater(new Runnable() {  
            @Override  
            public void run() {       
                AioTcpClient d = null;  
                try {  
                    d = new AioTcpClient();  
                } catch (Exception ex) {  
                    Logger.getLogger(AioTcpClient.class.getName()).log(Level.SEVERE, null, ex);  
                }  
                  
                d.createPanel();  
                try {  
                    d.work();  
                } catch (Exception ex) {  
                    Logger.getLogger(AioTcpClient.class.getName()).log(Level.SEVERE, null, ex);  
                }  
                 
                   
            }  
        });  
    }   
}

AioConnectHandler.java 

Java代码  收藏代码

import java.util.concurrent.*;  
import java.nio.ByteBuffer;  
import java.nio.channels.AsynchronousSocketChannel;  
import java.nio.channels.CompletionHandler;  
  
public class AioConnectHandler implements CompletionHandler  
    <Void,AsynchronousSocketChannel>  
{  
    private Integer content = 0;  
      
    public AioConnectHandler(Integer value){  
        this.content = value;  
    }  
   
    @Override  
    public void completed(Void attachment,AsynchronousSocketChannel connector) {   
        try {    
         connector.write(ByteBuffer.wrap(String.valueOf(content).getBytes())).get();  
         startRead(connector);   
        } catch (ExecutionException e) {   
            e.printStackTrace();   
        } catch (InterruptedException ep) {   
            ep.printStackTrace();   
        }   
    }   
   
    @Override  
    public void failed(Throwable exc, AsynchronousSocketChannel attachment) {   
        exc.printStackTrace();   
    }   
      
    //这不是 CompletionHandler接口的方法。  
    public void startRead(AsynchronousSocketChannel socket) {   
        ByteBuffer clientBuffer = ByteBuffer.allocate(1024);   
        //read的原型是  
        //read(ByteBuffer dst, A attachment,  
        //    CompletionHandler<Integer,? super A> handler)   
        //即它的操作处理器,的A型,是实际调用read的第二个参数,即clientBuffer。  
        // V型是存有read的连接情况的参数  
        socket.read(clientBuffer, clientBuffer, new AioReadHandler(socket));   
        try {   
              
        } catch (Exception e) {   
            e.printStackTrace();   
        }   
    }  
   
}


AioReadHandler.java 

Java代码  收藏代码

import java.io.IOException;   
import java.nio.ByteBuffer;   
import java.nio.channels.AsynchronousSocketChannel;   
import java.nio.channels.CompletionHandler;   
import java.nio.charset.CharacterCodingException;   
import java.nio.charset.Charset;   
import java.nio.charset.CharsetDecoder;   
   
public class AioReadHandler implements CompletionHandler  
    <Integer,ByteBuffer>  
{   
    private AsynchronousSocketChannel socket;   
   
    public AioReadHandler(AsynchronousSocketChannel socket) {   
        this.socket = socket;   
    }   
   
    public void cancelled(ByteBuffer attachment) {   
        System.out.println("cancelled");   
    }   
   
    private CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();   
   
    @Override  
    public void completed(Integer i, ByteBuffer buf) {   
        if (i > 0) {   
            buf.flip();   
            try {   
             System.out.println("收到" + socket.getRemoteAddress().toString() + "的消息:" + decoder.decode(buf));   
                buf.compact();   
            } catch (CharacterCodingException e) {   
                e.printStackTrace();   
            } catch (IOException e) {   
                e.printStackTrace();   
            }   
            socket.read(buf, buf, this);   
        } else if (i == -1) {   
            try {   
                System.out.println("对端断线:" + socket.getRemoteAddress().toString());   
                buf = null;   
            } catch (IOException e) {   
                e.printStackTrace();   
            }   
        }   
    }   
   
    @Override  
    public void failed(Throwable exc, ByteBuffer buf) {   
        System.out.println(exc);   
    }   
  
       
}

AioSendHandler.java(与服务端的写相同) 

Java代码  收藏代码

import java.io.IOException;  
import java.nio.ByteBuffer;  
import java.nio.channels.AsynchronousSocketChannel;  
import java.nio.channels.CompletionHandler;  
import java.nio.charset.CharacterCodingException;  
  
  
public class AioSendHandler implements CompletionHandler  
    <Integer,ByteBuffer>  
{   
    private AsynchronousSocketChannel socket;   
   
    public AioSendHandler(AsynchronousSocketChannel socket) {   
        this.socket = socket;   
    }   
  
    @Override  
    public void completed(Integer i, ByteBuffer buf) {  
        if (i > 0) {   
            socket.write(buf, buf, this);   
        } else if (i == -1) {   
            try {   
                System.out.println("对端断线:" + socket.getRemoteAddress().toString());   
                buf = null;   
            } catch (IOException e) {   
                e.printStackTrace();   
            }   
        }   
          
    }  
  
    @Override  
    public void failed(Throwable exc, ByteBuffer attachment) {  
        System.out.println("cancelled");   
    }  
      
}

转载自:http://yunhaifeiwu.iteye.com/blog/1714664

Home