`
finux
  • 浏览: 200172 次
  • 性别: Icon_minigender_1
  • 来自: 深圳
社区版块
存档分类
最新评论

使用nio实现简单Echo服务

    博客分类:
  • Java
阅读更多

最近温习下nio API,无聊写个Echo服务,直接上代码了,有点懒,具体细节不解释,查阅API或源码吧,没有公开的src就反编译下。

 

EchoServer.java

 

package com.iteye.finux.echo;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/**
 * 有关Selector的一些操作尽可能放同一个线程处理
 * @author zhu
 *
 */
public class EchoServer implements Runnable {
	private int port;
	private InetAddress host;
	private ServerSocketChannel channel;
	private Selector selector;
	private ByteBuffer buffer = ByteBuffer.allocate(1024);
	/**
	 * 若要保证所有信息都传回client, 这里可以把byte[]改为队列或其他容器
	 */
	private Map<SocketChannel, byte[]> messages = new HashMap<SocketChannel, byte[]>();
	
	public EchoServer(InetAddress host, int port) throws Exception {
		this.host = host;
		this.port = port;
		channel = ServerSocketChannel.open();
		channel.configureBlocking(false);
		selector = Selector.open();
	}
	
	/**
	 * @see java.lang.Runnable#run()
	 */
	@Override
	public void run() {
		try {
			channel.socket().bind(new InetSocketAddress(host, port));
			channel.register(selector, SelectionKey.OP_ACCEPT);
			while (true) {
				int count = selector.select();
				if (count < 1) {
					continue;
				}
				Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
				while (iterator.hasNext()) {
					SelectionKey key = iterator.next();
					iterator.remove();
					if (!key.isValid()) {
						continue;
					}
					if (key.isAcceptable()) {
						SocketChannel ch = ((ServerSocketChannel)key.channel()).accept();
						ch.configureBlocking(false);
						ch.register(selector, SelectionKey.OP_READ);
					} else if (key.isReadable()) {
						read(key);
					} else if (key.isWritable()) {
						write(key);
					}
				}
			}
		} catch(Exception ex) {
			ex.printStackTrace();
		}
	}
	
	/**
	 * 读操作
	 * @param key
	 * @throws IOException
	 */
	private void read(SelectionKey key) throws IOException {
		SocketChannel aChannel = (SocketChannel)key.channel();
		buffer.clear();
		int num = aChannel.read(buffer);
		if (num == -1) {
			key.cancel();
		} else if (num > 0) {
			buffer.flip();
			byte[] buf = Arrays.copyOfRange(buffer.array(), 0, num);
			messages.put(aChannel, buf);
			//将对应Channel注册写事件
			key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
			System.out.println("read from: " + aChannel.socket().getRemoteSocketAddress() + 
							   "; message: " + new String(buf));
		}
	}
	
	/**
	 * 写操作
	 * @param key
	 * @throws IOException
	 */
	private void write(SelectionKey key) throws IOException {
		SocketChannel aChannel = (SocketChannel)key.channel();
		byte[] buf = messages.get(aChannel);
		if (buf != null) {
			messages.remove(aChannel);
			key.interestOps(SelectionKey.OP_READ);
			buffer.clear();
			buffer.put(buf);
			buffer.flip();
			aChannel.write(buffer);
			System.out.println("write to: " + aChannel.socket().getRemoteSocketAddress() + 
							   "; message: " + new String(buf));
		}
	}
	
	public static void main(String[] args) throws Exception {
		EchoServer server = new EchoServer(InetAddress.getLocalHost(), 8080);
		EchoClient.threadStart(server);
	}
}

 

EchoClient.java

 

package com.iteye.finux.echo;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;

/**
 * @author zhu
 *
 */
public class EchoClient implements Runnable {
	private int port;
	private InetAddress host;
	private SocketChannel channel;
	private Selector selector;
	private ByteBuffer buffer = ByteBuffer.allocate(1024);
	private LinkedList<byte[]> messages = new LinkedList<byte[]>();
	
	public EchoClient(InetAddress host, int port) throws Exception {
		this.host = host;
		this.port = port;
		channel = SocketChannel.open();
		channel.configureBlocking(false);
		selector = Selector.open();
	}
	
	@Override
	public void run() {
		try {
			channel.connect(new InetSocketAddress(host, port));
			channel.register(selector, SelectionKey.OP_CONNECT);
			while (true) {
				synchronized (messages) {
					//检查队列是否有可写的数据
					if (!messages.isEmpty()) {
						SelectionKey key = channel.keyFor(selector);
						key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
					}
				}
				int count = selector.select();
				if (count > 0) {
					Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
					while (iterator.hasNext()) {
						SelectionKey key = iterator.next();
						iterator.remove();
						if (!key.isValid()) {
							continue;
						}
						
						if (key.isConnectable()) {
							channel.finishConnect();
							channel.register(selector, SelectionKey.OP_READ);
						} else if (key.isWritable()) {
							write(key);
						} else if (key.isReadable()) {
							read(key);
						}
					}
				}
			}
		} catch(Exception ex) {
			ex.printStackTrace();
		}
	}
	
	/**
	 * 向服务端发送数据
	 * @param msg
	 */
	public void send(byte[] msg) {
		synchronized(messages) {
			messages.addLast(msg);
			//将阻塞中的select调用直接返回
			selector.wakeup();
		}
	}

	/**
	 * 写操作
	 * @param key
	 * @throws IOException
	 */
	private void write(SelectionKey key) throws IOException {
		SocketChannel aChannel = (SocketChannel)key.channel();
		synchronized(messages) {
			if (!messages.isEmpty()) {
				byte[] buf = messages.getFirst();
				messages.removeFirst();
				buffer.clear();
				buffer.put(buf);
				buffer.flip();
				aChannel.write(buffer);
				key.interestOps(SelectionKey.OP_READ);
				System.out.println("write to: " + aChannel.socket().getRemoteSocketAddress() + 
								   "; message: " + new String(buf));
			}
		}
	}
	
	/**
	 * 读操作
	 * @param key
	 * @throws IOException
	 */
	private void read(SelectionKey key) throws IOException {
		SocketChannel aChannel = (SocketChannel)key.channel();
		buffer.clear();
		int len = aChannel.read(buffer);
		if (len > 0) {
			byte[] buf = Arrays.copyOfRange(buffer.array(), 0, len);
			System.out.println("read from: " + aChannel.socket().getRemoteSocketAddress() + 
							   "; message: " + new String(buf));
		}
	}
	
	static void threadStart(Runnable runnable) {
		Thread thread = new Thread(runnable);
		thread.setName(runnable.getClass().getSimpleName());
		thread.start();
	}
	
	public static void main(String[] args) throws Exception {
		EchoClient client = new EchoClient(InetAddress.getLocalHost(), 8080);
		threadStart(client);
		CommandReader reader = new CommandReader(client);
		threadStart(reader);
	}
}

/**
 * 标准输入读取数据
 * @author zhu
 *
 */
class CommandReader implements Runnable {
	private EchoClient client;
	private byte[] buffer = new byte[1024];
	
	public CommandReader(EchoClient client) {
		this.client = client;
	}
	
	/**
	 * @see java.lang.Runnable#run()
	 */
	@Override
	public void run() {
		try {
			while (true) {
				int len = System.in.read(buffer);
				if (len == -1) {
					break;
				} else if (len > 0) {
					byte[] buf = Arrays.copyOfRange(buffer, 0, len);
					client.send(buf);
				}
			}
		} catch(IOException ex) {
			ex.printStackTrace();
		}
	}
}
0
2
分享到:
评论
1 楼 albrich 2012-01-17  
不说话,不评论

相关推荐

Global site tag (gtag.js) - Google Analytics