1package muduo.rpc;
2
3import java.net.SocketAddress;
4import java.util.concurrent.CountDownLatch;
5import java.util.concurrent.Executors;
6import java.util.concurrent.TimeUnit;
7
8import org.jboss.netty.bootstrap.ClientBootstrap;
9import org.jboss.netty.channel.Channel;
10import org.jboss.netty.channel.ChannelFactory;
11import org.jboss.netty.channel.ChannelFuture;
12import org.jboss.netty.channel.ChannelFutureListener;
13import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
14
15public class RpcClient extends RpcPeer {
16
17    ClientBootstrap bootstrap;
18    private volatile RpcChannel rpcChannel;
19
20    public RpcClient() {
21        ChannelFactory channelFactory = new NioClientSocketChannelFactory(
22                Executors.newCachedThreadPool(),
23                Executors.newCachedThreadPool());
24
25        bootstrap = new ClientBootstrap(channelFactory);
26        bootstrap.setPipelineFactory(new RpcChannelPiplineFactory(this));
27    }
28
29    public RpcClient(ChannelFactory channelFactory) {
30        bootstrap = new ClientBootstrap(channelFactory);
31        bootstrap.setPipelineFactory(new RpcChannelPiplineFactory(this));
32    }
33
34    public RpcChannel blockingConnect(SocketAddress addr) {
35        final CountDownLatch latch = new CountDownLatch(1);
36        startConnect(addr, new NewChannelCallback() {
37            @Override
38            public void run(RpcChannel channel) {
39                assert channel == RpcClient.this.rpcChannel;
40                latch.countDown();
41            }
42        });
43        try {
44            latch.await(5, TimeUnit.SECONDS);
45        } catch (InterruptedException e) {
46            // TODO Auto-generated catch block
47            e.printStackTrace();
48        }
49        return rpcChannel;
50    }
51
52    public void startConnect(SocketAddress addr, NewChannelCallback newChannelCallback) {
53        ChannelFuture future = bootstrap.connect(addr);
54        future.addListener(new ChannelFutureListener() {
55            @Override
56            public void operationComplete(ChannelFuture future) throws Exception {
57                //System.err.println("operationComplete");
58            }
59        });
60        this.newChannelCallback = newChannelCallback;
61    }
62
63    public void stop() {
64        bootstrap.releaseExternalResources();
65    }
66
67    @Override
68    public void channelConnected(Channel channel) {
69        if (rpcChannel == null) {
70            rpcChannel = new RpcChannel(channel);
71            setupNewChannel(rpcChannel);
72        }
73    }
74
75    public RpcChannel getChannel() {
76        return rpcChannel;
77    }
78
79}
80