RpcClient.java revision b5a588df
1420c9859SShuo Chenpackage muduo.rpc;
2420c9859SShuo Chen
3420c9859SShuo Chenimport java.net.SocketAddress;
4b5a588dfSShuo Chenimport java.util.concurrent.CountDownLatch;
5420c9859SShuo Chenimport java.util.concurrent.Executors;
6b5a588dfSShuo Chenimport java.util.concurrent.TimeUnit;
7420c9859SShuo Chen
8420c9859SShuo Chenimport org.jboss.netty.bootstrap.ClientBootstrap;
9420c9859SShuo Chenimport org.jboss.netty.channel.Channel;
10420c9859SShuo Chenimport org.jboss.netty.channel.ChannelFactory;
11420c9859SShuo Chenimport org.jboss.netty.channel.ChannelFuture;
12420c9859SShuo Chenimport org.jboss.netty.channel.ChannelFutureListener;
13420c9859SShuo Chenimport org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
14420c9859SShuo Chen
15420c9859SShuo Chenpublic class RpcClient extends RpcPeer {
16420c9859SShuo Chen
17420c9859SShuo Chen    ClientBootstrap bootstrap;
18b5a588dfSShuo Chen    private volatile RpcChannel rpcChannel;
19420c9859SShuo Chen
20420c9859SShuo Chen    public RpcClient() {
21420c9859SShuo Chen        ChannelFactory channelFactory = new NioClientSocketChannelFactory(
22420c9859SShuo Chen                Executors.newCachedThreadPool(),
23420c9859SShuo Chen                Executors.newCachedThreadPool());
24420c9859SShuo Chen
25420c9859SShuo Chen        bootstrap = new ClientBootstrap(channelFactory);
26420c9859SShuo Chen        bootstrap.setPipelineFactory(new RpcChannelPiplineFactory(this));
27420c9859SShuo Chen    }
28420c9859SShuo Chen
29420c9859SShuo Chen    public RpcChannel blockingConnect(SocketAddress addr) {
30b5a588dfSShuo Chen        final CountDownLatch latch = new CountDownLatch(1);
31b5a588dfSShuo Chen        startConnect(addr, new NewChannelCallback() {
32b5a588dfSShuo Chen            @Override
33b5a588dfSShuo Chen            public void run(RpcChannel channel) {
34b5a588dfSShuo Chen                assert channel == RpcClient.this.rpcChannel;
35b5a588dfSShuo Chen                latch.countDown();
36b5a588dfSShuo Chen            }
37b5a588dfSShuo Chen        });
38b5a588dfSShuo Chen        try {
39b5a588dfSShuo Chen            latch.await(5, TimeUnit.SECONDS);
40b5a588dfSShuo Chen        } catch (InterruptedException e) {
41b5a588dfSShuo Chen            // TODO Auto-generated catch block
42b5a588dfSShuo Chen            e.printStackTrace();
43b5a588dfSShuo Chen        }
44420c9859SShuo Chen        return rpcChannel;
45420c9859SShuo Chen    }
46420c9859SShuo Chen
47420c9859SShuo Chen    public void startConnect(SocketAddress addr, NewChannelCallback newChannelCallback) {
48420c9859SShuo Chen        ChannelFuture future = bootstrap.connect(addr);
49420c9859SShuo Chen        future.addListener(new ChannelFutureListener() {
50420c9859SShuo Chen            @Override
51420c9859SShuo Chen            public void operationComplete(ChannelFuture future) throws Exception {
52420c9859SShuo Chen                System.err.println("operationComplete");
53420c9859SShuo Chen                // channelConnected(future.getChannel());
54420c9859SShuo Chen            }
55420c9859SShuo Chen        });
56420c9859SShuo Chen        this.newChannelCallback = newChannelCallback;
57420c9859SShuo Chen    }
58420c9859SShuo Chen
59b5a588dfSShuo Chen    public void stop() {
60b5a588dfSShuo Chen        bootstrap.releaseExternalResources();
61b5a588dfSShuo Chen    }
62b5a588dfSShuo Chen
63420c9859SShuo Chen    @Override
64420c9859SShuo Chen    public void channelConnected(Channel channel) {
65b5a588dfSShuo Chen        if (rpcChannel == null) {
66b5a588dfSShuo Chen            rpcChannel = new RpcChannel(channel);
67b5a588dfSShuo Chen            setupNewChannel(rpcChannel);
68b5a588dfSShuo Chen        }
69420c9859SShuo Chen    }
70420c9859SShuo Chen
71420c9859SShuo Chen    public RpcChannel getChannel() {
72420c9859SShuo Chen        return rpcChannel;
73420c9859SShuo Chen    }
74420c9859SShuo Chen
75420c9859SShuo Chen}
76