RpcClient.java revision b5a588df
1420c9859SShuo Chenpackage muduo.rpc; 2420c9859SShuo Chen 3420c9859SShuo Chenimport java.net.SocketAddress; 4b5a588dfSShuo Chenimport java.util.concurrent.CountDownLatch; 5420c9859SShuo Chenimport java.util.concurrent.Executors; 6b5a588dfSShuo Chenimport java.util.concurrent.TimeUnit; 7420c9859SShuo Chen 8420c9859SShuo Chenimport org.jboss.netty.bootstrap.ClientBootstrap; 9420c9859SShuo Chenimport org.jboss.netty.channel.Channel; 10420c9859SShuo Chenimport org.jboss.netty.channel.ChannelFactory; 11420c9859SShuo Chenimport org.jboss.netty.channel.ChannelFuture; 12420c9859SShuo Chenimport org.jboss.netty.channel.ChannelFutureListener; 13420c9859SShuo Chenimport org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; 14420c9859SShuo Chen 15420c9859SShuo Chenpublic class RpcClient extends RpcPeer { 16420c9859SShuo Chen 17420c9859SShuo Chen ClientBootstrap bootstrap; 18b5a588dfSShuo Chen private volatile RpcChannel rpcChannel; 19420c9859SShuo Chen 20420c9859SShuo Chen public RpcClient() { 21420c9859SShuo Chen ChannelFactory channelFactory = new NioClientSocketChannelFactory( 22420c9859SShuo Chen Executors.newCachedThreadPool(), 23420c9859SShuo Chen Executors.newCachedThreadPool()); 24420c9859SShuo Chen 25420c9859SShuo Chen bootstrap = new ClientBootstrap(channelFactory); 26420c9859SShuo Chen bootstrap.setPipelineFactory(new RpcChannelPiplineFactory(this)); 27420c9859SShuo Chen } 28420c9859SShuo Chen 29420c9859SShuo Chen public RpcChannel blockingConnect(SocketAddress addr) { 30b5a588dfSShuo Chen final CountDownLatch latch = new CountDownLatch(1); 31b5a588dfSShuo Chen startConnect(addr, new NewChannelCallback() { 32b5a588dfSShuo Chen @Override 33b5a588dfSShuo Chen public void run(RpcChannel channel) { 34b5a588dfSShuo Chen assert channel == RpcClient.this.rpcChannel; 35b5a588dfSShuo Chen latch.countDown(); 36b5a588dfSShuo Chen } 37b5a588dfSShuo Chen }); 38b5a588dfSShuo Chen try { 39b5a588dfSShuo Chen latch.await(5, TimeUnit.SECONDS); 40b5a588dfSShuo Chen } catch (InterruptedException e) { 41b5a588dfSShuo Chen // TODO Auto-generated catch block 42b5a588dfSShuo Chen e.printStackTrace(); 43b5a588dfSShuo Chen } 44420c9859SShuo Chen return rpcChannel; 45420c9859SShuo Chen } 46420c9859SShuo Chen 47420c9859SShuo Chen public void startConnect(SocketAddress addr, NewChannelCallback newChannelCallback) { 48420c9859SShuo Chen ChannelFuture future = bootstrap.connect(addr); 49420c9859SShuo Chen future.addListener(new ChannelFutureListener() { 50420c9859SShuo Chen @Override 51420c9859SShuo Chen public void operationComplete(ChannelFuture future) throws Exception { 52420c9859SShuo Chen System.err.println("operationComplete"); 53420c9859SShuo Chen // channelConnected(future.getChannel()); 54420c9859SShuo Chen } 55420c9859SShuo Chen }); 56420c9859SShuo Chen this.newChannelCallback = newChannelCallback; 57420c9859SShuo Chen } 58420c9859SShuo Chen 59b5a588dfSShuo Chen public void stop() { 60b5a588dfSShuo Chen bootstrap.releaseExternalResources(); 61b5a588dfSShuo Chen } 62b5a588dfSShuo Chen 63420c9859SShuo Chen @Override 64420c9859SShuo Chen public void channelConnected(Channel channel) { 65b5a588dfSShuo Chen if (rpcChannel == null) { 66b5a588dfSShuo Chen rpcChannel = new RpcChannel(channel); 67b5a588dfSShuo Chen setupNewChannel(rpcChannel); 68b5a588dfSShuo Chen } 69420c9859SShuo Chen } 70420c9859SShuo Chen 71420c9859SShuo Chen public RpcChannel getChannel() { 72420c9859SShuo Chen return rpcChannel; 73420c9859SShuo Chen } 74420c9859SShuo Chen 75420c9859SShuo Chen} 76