RpcClient.java revision 420c9859
1420c9859SShuo Chenpackage muduo.rpc;
2420c9859SShuo Chen
3420c9859SShuo Chenimport java.net.SocketAddress;
4420c9859SShuo Chenimport java.util.concurrent.Executors;
5420c9859SShuo Chen
6420c9859SShuo Chenimport org.jboss.netty.bootstrap.ClientBootstrap;
7420c9859SShuo Chenimport org.jboss.netty.channel.Channel;
8420c9859SShuo Chenimport org.jboss.netty.channel.ChannelFactory;
9420c9859SShuo Chenimport org.jboss.netty.channel.ChannelFuture;
10420c9859SShuo Chenimport org.jboss.netty.channel.ChannelFutureListener;
11420c9859SShuo Chenimport org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
12420c9859SShuo Chen
13420c9859SShuo Chenpublic class RpcClient extends RpcPeer {
14420c9859SShuo Chen
15420c9859SShuo Chen    ClientBootstrap bootstrap;
16420c9859SShuo Chen    private RpcChannel rpcChannel;
17420c9859SShuo Chen
18420c9859SShuo Chen    public RpcClient() {
19420c9859SShuo Chen        ChannelFactory channelFactory = new NioClientSocketChannelFactory(
20420c9859SShuo Chen                Executors.newCachedThreadPool(),
21420c9859SShuo Chen                Executors.newCachedThreadPool());
22420c9859SShuo Chen
23420c9859SShuo Chen        bootstrap = new ClientBootstrap(channelFactory);
24420c9859SShuo Chen        bootstrap.setPipelineFactory(new RpcChannelPiplineFactory(this));
25420c9859SShuo Chen    }
26420c9859SShuo Chen
27420c9859SShuo Chen    public RpcChannel blockingConnect(SocketAddress addr) {
28420c9859SShuo Chen        Channel channel = bootstrap.connect(addr).awaitUninterruptibly().getChannel();
29420c9859SShuo Chen        rpcChannel = new RpcChannel(channel);
30420c9859SShuo Chen        RpcMessageHandler handler = (RpcMessageHandler) channel.getPipeline().get("handler");
31420c9859SShuo Chen        handler.setChannel(rpcChannel);
32420c9859SShuo Chen        return rpcChannel;
33420c9859SShuo Chen    }
34420c9859SShuo Chen
35420c9859SShuo Chen    public void startConnect(SocketAddress addr, NewChannelCallback newChannelCallback) {
36420c9859SShuo Chen        ChannelFuture future = bootstrap.connect(addr);
37420c9859SShuo Chen        future.addListener(new ChannelFutureListener() {
38420c9859SShuo Chen            @Override
39420c9859SShuo Chen            public void operationComplete(ChannelFuture future) throws Exception {
40420c9859SShuo Chen                System.err.println("operationComplete");
41420c9859SShuo Chen                // channelConnected(future.getChannel());
42420c9859SShuo Chen            }
43420c9859SShuo Chen        });
44420c9859SShuo Chen        this.newChannelCallback = newChannelCallback;
45420c9859SShuo Chen    }
46420c9859SShuo Chen
47420c9859SShuo Chen    @Override
48420c9859SShuo Chen    public void channelConnected(Channel channel) {
49420c9859SShuo Chen        rpcChannel = new RpcChannel(channel);
50420c9859SShuo Chen        setupNewChannel(rpcChannel);
51420c9859SShuo Chen    }
52420c9859SShuo Chen
53420c9859SShuo Chen    public RpcChannel getChannel() {
54420c9859SShuo Chen        return rpcChannel;
55420c9859SShuo Chen    }
56420c9859SShuo Chen
57420c9859SShuo Chen}
58