package muduo.rpc;

import java.net.SocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

/**
 * Client side of an RPC connection built on a Netty {@link ClientBootstrap}.
 *
 * <p>The client keeps at most one {@link RpcChannel}: the first channel
 * reported through {@link #channelConnected(Channel)} is wrapped and stored,
 * and later notifications are ignored while that channel is set.
 */
public class RpcClient extends RpcPeer {

  /** Upper bound, in seconds, that {@link #blockingConnect} waits for the connection. */
  private static final long CONNECT_TIMEOUT_SECONDS = 5;

  ClientBootstrap bootstrap;

  /** Written from {@link #channelConnected(Channel)}, read by callers; hence volatile. */
  private volatile RpcChannel rpcChannel;

  /**
   * Creates a client backed by its own NIO channel factory, using two cached
   * thread pools as the factory's executors.
   */
  public RpcClient() {
    this(new NioClientSocketChannelFactory(
        Executors.newCachedThreadPool(),
        Executors.newCachedThreadPool()));
  }

  /**
   * Creates a client on top of a caller-supplied channel factory.
   *
   * @param channelFactory factory handed to the underlying {@link ClientBootstrap}
   */
  public RpcClient(ChannelFactory channelFactory) {
    bootstrap = new ClientBootstrap(channelFactory);
    bootstrap.setPipelineFactory(new RpcChannelPiplineFactory(this));
  }

  /**
   * Starts a connection to {@code addr} and waits up to
   * {@value #CONNECT_TIMEOUT_SECONDS} seconds for the new-channel callback to fire.
   *
   * <p>If the calling thread is interrupted while waiting, the wait ends early
   * and the thread's interrupt status is restored so the caller can observe it.
   *
   * @param addr remote address to connect to
   * @return the connected channel, or {@code null} if no channel was
   *         established before the wait ended (timeout or interruption)
   */
  public RpcChannel blockingConnect(SocketAddress addr) {
    final CountDownLatch connected = new CountDownLatch(1);
    startConnect(addr, new NewChannelCallback() {
      @Override
      public void run(RpcChannel channel) {
        assert channel == RpcClient.this.rpcChannel;
        connected.countDown();
      }
    });
    try {
      // The boolean result is deliberately unused: on timeout rpcChannel is
      // still null and that is what gets returned below.
      connected.await(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      // Do not swallow the interrupt; hand it back to the caller.
      Thread.currentThread().interrupt();
    }
    return rpcChannel;
  }

  /**
   * Starts an asynchronous connection to {@code addr}.
   *
   * @param addr remote address to connect to
   * @param newChannelCallback callback stored on this peer for the new channel
   *        (NOTE(review): presumably invoked by {@code setupNewChannel} in
   *        {@code RpcPeer} - confirm there)
   */
  public void startConnect(SocketAddress addr, NewChannelCallback newChannelCallback) {
    // Register the callback BEFORE initiating the connect: the connect is
    // asynchronous, so channelConnected() may run on another thread before
    // this method returns and must already see the callback.
    this.newChannelCallback = newChannelCallback;

    ChannelFuture future = bootstrap.connect(addr);
    future.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) throws Exception {
        // Intentionally empty: the outcome of the connect attempt is not
        // inspected here.
      }
    });
  }

  /** Releases the external resources held by the underlying bootstrap. */
  public void stop() {
    bootstrap.releaseExternalResources();
  }

  /**
   * Wraps the first connected {@code channel} in an {@link RpcChannel} and
   * passes it to {@code setupNewChannel}; does nothing if a channel is
   * already set.
   *
   * @param channel the Netty channel that has just connected
   */
  @Override
  public void channelConnected(Channel channel) {
    if (rpcChannel == null) {
      rpcChannel = new RpcChannel(channel);
      setupNewChannel(rpcChannel);
    }
  }

  /**
   * Returns the current channel.
   *
   * @return the channel set by {@link #channelConnected(Channel)}, or
   *         {@code null} if none has connected yet
   */
  public RpcChannel getChannel() {
    return rpcChannel;
  }

}