// RpcClient.java (revision 420c9859)
1420c9859SShuo Chenpackage muduo.rpc; 2420c9859SShuo Chen 3420c9859SShuo Chenimport java.net.SocketAddress; 4420c9859SShuo Chenimport java.util.concurrent.Executors; 5420c9859SShuo Chen 6420c9859SShuo Chenimport org.jboss.netty.bootstrap.ClientBootstrap; 7420c9859SShuo Chenimport org.jboss.netty.channel.Channel; 8420c9859SShuo Chenimport org.jboss.netty.channel.ChannelFactory; 9420c9859SShuo Chenimport org.jboss.netty.channel.ChannelFuture; 10420c9859SShuo Chenimport org.jboss.netty.channel.ChannelFutureListener; 11420c9859SShuo Chenimport org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; 12420c9859SShuo Chen 13420c9859SShuo Chenpublic class RpcClient extends RpcPeer { 14420c9859SShuo Chen 15420c9859SShuo Chen ClientBootstrap bootstrap; 16420c9859SShuo Chen private RpcChannel rpcChannel; 17420c9859SShuo Chen 18420c9859SShuo Chen public RpcClient() { 19420c9859SShuo Chen ChannelFactory channelFactory = new NioClientSocketChannelFactory( 20420c9859SShuo Chen Executors.newCachedThreadPool(), 21420c9859SShuo Chen Executors.newCachedThreadPool()); 22420c9859SShuo Chen 23420c9859SShuo Chen bootstrap = new ClientBootstrap(channelFactory); 24420c9859SShuo Chen bootstrap.setPipelineFactory(new RpcChannelPiplineFactory(this)); 25420c9859SShuo Chen } 26420c9859SShuo Chen 27420c9859SShuo Chen public RpcChannel blockingConnect(SocketAddress addr) { 28420c9859SShuo Chen Channel channel = bootstrap.connect(addr).awaitUninterruptibly().getChannel(); 29420c9859SShuo Chen rpcChannel = new RpcChannel(channel); 30420c9859SShuo Chen RpcMessageHandler handler = (RpcMessageHandler) channel.getPipeline().get("handler"); 31420c9859SShuo Chen handler.setChannel(rpcChannel); 32420c9859SShuo Chen return rpcChannel; 33420c9859SShuo Chen } 34420c9859SShuo Chen 35420c9859SShuo Chen public void startConnect(SocketAddress addr, NewChannelCallback newChannelCallback) { 36420c9859SShuo Chen ChannelFuture future = bootstrap.connect(addr); 37420c9859SShuo Chen future.addListener(new 
ChannelFutureListener() { 38420c9859SShuo Chen @Override 39420c9859SShuo Chen public void operationComplete(ChannelFuture future) throws Exception { 40420c9859SShuo Chen System.err.println("operationComplete"); 41420c9859SShuo Chen // channelConnected(future.getChannel()); 42420c9859SShuo Chen } 43420c9859SShuo Chen }); 44420c9859SShuo Chen this.newChannelCallback = newChannelCallback; 45420c9859SShuo Chen } 46420c9859SShuo Chen 47420c9859SShuo Chen @Override 48420c9859SShuo Chen public void channelConnected(Channel channel) { 49420c9859SShuo Chen rpcChannel = new RpcChannel(channel); 50420c9859SShuo Chen setupNewChannel(rpcChannel); 51420c9859SShuo Chen } 52420c9859SShuo Chen 53420c9859SShuo Chen public RpcChannel getChannel() { 54420c9859SShuo Chen return rpcChannel; 55420c9859SShuo Chen } 56420c9859SShuo Chen 57420c9859SShuo Chen} 58