本文共 3653 字,大约阅读时间需要 12 分钟。
package com.clj;import java.io.IOException;import java.io.UnsupportedEncodingException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;/** * NIO客户端线程类 * * @author chenlujun * @version [版本号, 2014年12月18日] * @see [相关类/方法] * @since [产品/模块版本] */public class NIOSocketClient extends Thread { private String IP="192.168.1.85";//服务器IP private int PORT=8989;//服务器端口 private SocketChannel socketChannel; private Selector selector; /** 不带参数构造函数 */ NIOSocketClient() { try { initClient(); } catch (ClosedChannelException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } /** 带参数构造函数 * @param url 服务器IP * @param port 客户端端口 */ NIOSocketClient(String serverIp,int port) { this.IP=serverIp; this.PORT=port; try { initClient(); } catch (ClosedChannelException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } public void run() { while (true) { try { // 写消息到服务器端 writeMessage(); int select = selector.select(); if (select > 0) { Setkeys = selector.selectedKeys(); Iterator iter = keys.iterator(); while (iter.hasNext()) { SelectionKey sk = iter.next(); if (sk.isReadable()) { readMessage(sk); } iter.remove(); } } } catch (Exception e) { e.printStackTrace(); } } } //从服务端接收消息 public void readMessage(SelectionKey sk) throws IOException, UnsupportedEncodingException { SocketChannel curSc = (SocketChannel) sk.channel(); ByteBuffer buffer = ByteBuffer.allocate(8); while (curSc.read(buffer) > 0) { buffer.flip(); System.out.println("Receive from server:" + new String(buffer.array(), "UTF-8")); buffer.clear(); } } //向服务端发送消息 public void writeMessage() throws IOException { while(true) { //每隔1秒向服务端发送一次数据 try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } try { String ss = "Server,how are you?"; ByteBuffer buffer = ByteBuffer.wrap(ss.getBytes("UTF-8")); while (buffer.hasRemaining()) { System.out.println("buffer.hasRemaining() is true."); socketChannel.write(buffer); } } catch (IOException e) { if (socketChannel.isOpen()) { socketChannel.close(); } e.printStackTrace(); } } } //初始化客户端 public void initClient() throws IOException, ClosedChannelException { InetSocketAddress addr = new InetSocketAddress(IP,PORT); socketChannel = SocketChannel.open(); selector = Selector.open(); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); // 连接到server socketChannel.connect(addr); while (!socketChannel.finishConnect()) { System.out.println("check finish connection"); } } /** * 停止客户端 */ public void stopServer() { try { if (selector != null && selector.isOpen()) { selector.close(); } if (socketChannel != null && socketChannel.isOpen()) { socketChannel.close(); } } catch (IOException e) { e.printStackTrace(); } }}
package com.clj;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class Main { public static void main(String[] args) { //线程数 int worker_num = 1000; //服务器IP String serverIp="192.168.1.85"; //服务器端口 int port=8989; //线程池 ExecutorService threadPool = Executors.newFixedThreadPool(worker_num); for (int n = 0; n < worker_num; n++) { NIOSocketClient client=new NIOSocketClient(serverIp,port); threadPool.execute(client); } }}
转载地址:http://qzvmb.baihongyu.com/