1package muduo.rpc; 2 3import java.net.SocketAddress; 4import java.util.concurrent.CountDownLatch; 5import java.util.concurrent.Executors; 6import java.util.concurrent.TimeUnit; 7 8import org.jboss.netty.bootstrap.ClientBootstrap; 9import org.jboss.netty.channel.Channel; 10import org.jboss.netty.channel.ChannelFactory; 11import org.jboss.netty.channel.ChannelFuture; 12import org.jboss.netty.channel.ChannelFutureListener; 13import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; 14 15public class RpcClient extends RpcPeer { 16 17 ClientBootstrap bootstrap; 18 private volatile RpcChannel rpcChannel; 19 20 public RpcClient() { 21 ChannelFactory channelFactory = new NioClientSocketChannelFactory( 22 Executors.newCachedThreadPool(), 23 Executors.newCachedThreadPool()); 24 25 bootstrap = new ClientBootstrap(channelFactory); 26 bootstrap.setPipelineFactory(new RpcChannelPiplineFactory(this)); 27 } 28 29 public RpcClient(ChannelFactory channelFactory) { 30 bootstrap = new ClientBootstrap(channelFactory); 31 bootstrap.setPipelineFactory(new RpcChannelPiplineFactory(this)); 32 } 33 34 public RpcChannel blockingConnect(SocketAddress addr) { 35 final CountDownLatch latch = new CountDownLatch(1); 36 startConnect(addr, new NewChannelCallback() { 37 @Override 38 public void run(RpcChannel channel) { 39 assert channel == RpcClient.this.rpcChannel; 40 latch.countDown(); 41 } 42 }); 43 try { 44 latch.await(5, TimeUnit.SECONDS); 45 } catch (InterruptedException e) { 46 // TODO Auto-generated catch block 47 e.printStackTrace(); 48 } 49 return rpcChannel; 50 } 51 52 public void startConnect(SocketAddress addr, NewChannelCallback newChannelCallback) { 53 ChannelFuture future = bootstrap.connect(addr); 54 future.addListener(new ChannelFutureListener() { 55 @Override 56 public void operationComplete(ChannelFuture future) throws Exception { 57 //System.err.println("operationComplete"); 58 } 59 }); 60 this.newChannelCallback = newChannelCallback; 61 } 62 63 public void stop() { 64 bootstrap.releaseExternalResources(); 65 } 66 67 @Override 68 public void channelConnected(Channel channel) { 69 if (rpcChannel == null) { 70 rpcChannel = new RpcChannel(channel); 71 setupNewChannel(rpcChannel); 72 } 73 } 74 75 public RpcChannel getChannel() { 76 return rpcChannel; 77 } 78 79} 80