《Netty 权威指南》—— AIO创建的TimeClient源码分析

声明:本文是《Netty 权威指南》的样章,感谢博文视点授权并发编程网站发布样章,禁止以任何形式转载此文。

异步非阻塞IO版本的时间服务器服务端已经介绍完毕,下面我们继续看客户端的实现。

首先看下客户端主函数的实现,AIO时间服务器客户端  TimeClient:

public class TimeClient {

    /**
     * @param args
     */
    public static void main(String[] args) {
	int port = 8080;
	if (args != null && args.length > 0) {
	    try {
		port = Integer.valueOf(args[0]);
	    } catch (NumberFormatException e) {
		// 采用默认值
	    }
	}
	new Thread(new AsyncTimeClientHandler("127.0.0.1", port),
		"AIO-AsyncTimeClientHandler-001").start();

    }
}

第15行我们通过一个独立的IO线程创建异步时间服务器客户端handler,在实际项目中,我们不需要独立的线程创建异步连接对象,因为底层都是通过JDK的系统回调实现的,在后面运行时间服务器程序的时候,我们会抓取线程调用堆栈给大家展示。继续看代码, AsyncTimeClientHandler的实现类源码如下:

public class AsyncTimeClientHandler implements
	CompletionHandler<Void, AsyncTimeClientHandler>, Runnable {

    private AsynchronousSocketChannel client;
    private String host;
    private int port;
    private CountDownLatch latch;

    public AsyncTimeClientHandler(String host, int port) {
	this.host = host;
	this.port = port;
	try {
	    client = AsynchronousSocketChannel.open();
	} catch (IOException e) {
	    e.printStackTrace();
	}
    }

    @Override
    public void run() {
	latch = new CountDownLatch(1);
	client.connect(new InetSocketAddress(host, port), this, this);
	try {
	    latch.await();
	} catch (InterruptedException e1) {
	    e1.printStackTrace();
	}
	try {
	    client.close();
	} catch (IOException e) {
	    e.printStackTrace();
	}
    }

    @Override
    public void completed(Void result, AsyncTimeClientHandler attachment) {
	byte[] req = "QUERY TIME ORDER".getBytes();
	ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
	writeBuffer.put(req);
	writeBuffer.flip();
	client.write(writeBuffer, writeBuffer,
		new CompletionHandler<Integer, ByteBuffer>() {
		    @Override
		    public void completed(Integer result, ByteBuffer buffer) {
			if (buffer.hasRemaining()) {
			    client.write(buffer, buffer, this);
			} else {
			    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
			    client.read(
				    readBuffer,
				    readBuffer,
				    new CompletionHandler<Integer, ByteBuffer>() {
					@Override
					public void completed(Integer result,
						ByteBuffer buffer) {
					    buffer.flip();
					    byte[] bytes = new byte[buffer
						    .remaining()];
					    buffer.get(bytes);
					    String body;
					    try {
						body = new String(bytes,
							"UTF-8");
						System.out.println("Now is : "
							+ body);
						latch.countDown();
					    } catch (UnsupportedEncodingException e) {
						e.printStackTrace();
					    }
					}

					@Override
					public void failed(Throwable exc,
						ByteBuffer attachment) {
					    try {
						client.close();
						latch.countDown();
					    } catch (IOException e) {
						// ingnore on close
					    }
					}
				    });
			}
		    }

		    @Override
		    public void failed(Throwable exc, ByteBuffer attachment) {
			try {
			    client.close();
			    latch.countDown();
			} catch (IOException e) {
			    // ingnore on close
			}
		    }
		});
    }

    @Override
    public void failed(Throwable exc, AsyncTimeClientHandler attachment) {
	exc.printStackTrace();
	try {
	    client.close();
	    latch.countDown();
	} catch (IOException e) {
	    e.printStackTrace();
	}
    }
}

由于在AsyncTimeClientHandler中大量使用了内部匿名类,所以代码看起来稍微有些复杂,下面我们就对主要代码进行详细解说。

9-17行是构造方法,首先通过AsynchronousSocketChannel的open方法创建一个新的AsynchronousSocketChannel对象。然后跳到第36行,创建CountDownLatch进行等待,防止异步操作没有执行完成线程就退出。第37行通过connect方法发起异步操作,它有两个参数,分别如下:

1)      A attachment : AsynchronousSocketChannel的附件,用于回调通知时作为入参被传递,调用者可以自定义;

2)      CompletionHandler<Void,? super A> handler:异步操作回调通知接口,由调用者实现。

在本例程中,我们的两个参数都使用AsyncTimeClientHandler类本身,因为它实现了CompletionHandler接口。

接下来我们看异步连接成功之后的方法回调completed方法,代码第39行,我们创建请求消息体,对其进行编码,然后拷贝到发送缓冲区writeBuffer中,调用AsynchronousSocketChannel的write方法进行异步写,与服务端类似,我们可以实现CompletionHandler<Integer, ByteBuffer>接口用于写操作完成后的回调,代码第45-47行,如果发送缓冲区中仍有尚未发送的字节,我们继续异步发送,如果已经发送完成,则执行异步读取操作。

代码第64-97行是客户端异步读取时间服务器服务端应答消息的处理逻辑,代码第49行我们调用AsynchronousSocketChannel的read方法异步读取服务端的响应消息,由于read操作是异步的,所以我们通过内部匿名类实现CompletionHandler<Integer, ByteBuffer>接口,当读取完成被JDK回调时,我们构造应答消息。第56-63行我们从CompletionHandler的ByteBuffer中读取应答消息,然后打印结果。

第197-96行,当读取发生异常时,我们关闭链路,同时调用CountDownLatch的countDown方法让AsyncTimeClientHandler线程执行完毕,客户端退出执行。

需要指出的是,正如之前的NIO例程,我们并没有完整的处理网络的半包读写,当对例程进行功能测试的时候没有问题,但是,如果对代码稍加改造,进行压力或者性能测试,就会发现输出结果存在问题。

由于半包的读写会作为专门的小节在Netty的应用和源码分析章节进行详细讲解,在NIO的入门章节我们就不详细展开介绍,以便读者能够将注意力集中在NIO的入门知识上来。

下面的小节我们运行AIO版本的时间服务器程序,并通过打印线程堆栈的方式看下JDK回调异步Channel CompletionHandler的调用情况。

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 《Netty 权威指南》—— AIO创建的TimeClient源码分析


李 林锋

《Netty权威指南》作者,高级架构师。
FavoriteLoading添加本文到我的收藏
  • Trackback 关闭
  • 评论 (0)
  1. 暂无评论

您必须 登陆 后才能发表评论

return top