1b5a588dfSShuo Chenpackage echo;
2b5a588dfSShuo Chen
3b5a588dfSShuo Chenimport java.net.InetSocketAddress;
471f65b4dSShuo Chenimport java.net.SocketAddress;
5c5b4b28fSShuo Chenimport java.util.concurrent.CountDownLatch;
6c5b4b28fSShuo Chenimport java.util.concurrent.Executors;
7c5b4b28fSShuo Chen
871f65b4dSShuo Chenimport muduo.rpc.NewChannelCallback;
971f65b4dSShuo Chenimport muduo.rpc.RpcChannel;
1071f65b4dSShuo Chenimport muduo.rpc.RpcClient;
1171f65b4dSShuo Chen
12c5b4b28fSShuo Chenimport org.jboss.netty.channel.ChannelFactory;
13c5b4b28fSShuo Chenimport org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
14c5b4b28fSShuo Chen
1571f65b4dSShuo Chenimport com.google.protobuf.RpcCallback;
16c5b4b28fSShuo Chenimport com.google.protobuf.ServiceException;
17b5a588dfSShuo Chen
18b5a588dfSShuo Chenimport echo.EchoProto.EchoRequest;
19b5a588dfSShuo Chenimport echo.EchoProto.EchoResponse;
20b5a588dfSShuo Chenimport echo.EchoProto.EchoService;
21b5a588dfSShuo Chenimport echo.EchoProto.EchoService.BlockingInterface;
22b5a588dfSShuo Chen
23b5a588dfSShuo Chenpublic class EchoClient {
2471f65b4dSShuo Chen    static final int kRequests = 50000;
2571f65b4dSShuo Chen    static CountDownLatch allConnected, startLatch, allFinished;
26c5b4b28fSShuo Chen
2771f65b4dSShuo Chen    public static class SyncClient implements Runnable {
28c5b4b28fSShuo Chen        private ChannelFactory channelFactory;
2971f65b4dSShuo Chen        private SocketAddress serverAddr;
30c5b4b28fSShuo Chen
3171f65b4dSShuo Chen        public SyncClient(ChannelFactory channelFactory, SocketAddress server) {
32c5b4b28fSShuo Chen            this.channelFactory = channelFactory;
33c5b4b28fSShuo Chen            this.serverAddr = server;
34c5b4b28fSShuo Chen        }
35c5b4b28fSShuo Chen
36c5b4b28fSShuo Chen        @Override
37c5b4b28fSShuo Chen        public void run() {
38c5b4b28fSShuo Chen            System.out.println(Thread.currentThread());
39c5b4b28fSShuo Chen            RpcClient client = new RpcClient(channelFactory);
40c5b4b28fSShuo Chen            RpcChannel channel = client.blockingConnect(serverAddr);
41c5b4b28fSShuo Chen            BlockingInterface remoteService = EchoService.newBlockingStub(channel);
42c5b4b28fSShuo Chen            String payload = new String(new byte[100]);
43c5b4b28fSShuo Chen            payload = "Hello";
44c5b4b28fSShuo Chen            EchoRequest request = EchoRequest.newBuilder().setPayload(payload).build();
45c5b4b28fSShuo Chen
4671f65b4dSShuo Chen            allConnected.countDown();
4771f65b4dSShuo Chen            try {
4871f65b4dSShuo Chen                startLatch.await();
4971f65b4dSShuo Chen            } catch (InterruptedException e) {
5071f65b4dSShuo Chen                // TODO Auto-generated catch block
5171f65b4dSShuo Chen                e.printStackTrace();
5271f65b4dSShuo Chen            }
53c5b4b28fSShuo Chen            for (int i = 0; i < kRequests; ++i) {
54c5b4b28fSShuo Chen                EchoResponse response;
55c5b4b28fSShuo Chen                try {
56c5b4b28fSShuo Chen                    response = remoteService.echo(null, request);
57c5b4b28fSShuo Chen                    assert response.getPayload().equals(payload);
58c5b4b28fSShuo Chen                } catch (ServiceException e) {
59c5b4b28fSShuo Chen                    // TODO Auto-generated catch block
60c5b4b28fSShuo Chen                    e.printStackTrace();
61c5b4b28fSShuo Chen                }
62c5b4b28fSShuo Chen                // System.out.println(response);
63c5b4b28fSShuo Chen            }
6471f65b4dSShuo Chen            allFinished.countDown();
65c5b4b28fSShuo Chen            System.out.println(Thread.currentThread());
66c5b4b28fSShuo Chen            // System.out.println(response);
67c5b4b28fSShuo Chen            channel.disconnect();
68c5b4b28fSShuo Chen            // client.stop();
69c5b4b28fSShuo Chen        }
70c5b4b28fSShuo Chen    }
71b5a588dfSShuo Chen
7271f65b4dSShuo Chen    @SuppressWarnings("unused")
7371f65b4dSShuo Chen    private static void runSyncClients(InetSocketAddress server, int nClients, int nSelectors)
7471f65b4dSShuo Chen            throws InterruptedException {
75c5b4b28fSShuo Chen        ChannelFactory channelFactory = new NioClientSocketChannelFactory(
76c5b4b28fSShuo Chen                Executors.newCachedThreadPool(),
7771f65b4dSShuo Chen                Executors.newCachedThreadPool(),
7871f65b4dSShuo Chen                nSelectors);
7971f65b4dSShuo Chen        allConnected = new CountDownLatch(nClients);
8071f65b4dSShuo Chen        startLatch = new CountDownLatch(1);
8171f65b4dSShuo Chen        allFinished = new CountDownLatch(nClients);
8271f65b4dSShuo Chen        Thread[] threads = new Thread[nClients];
8371f65b4dSShuo Chen        for (int i = 0; i < nClients; ++i) {
8471f65b4dSShuo Chen            threads[i] = new Thread(new SyncClient(channelFactory, server));
85c5b4b28fSShuo Chen            threads[i].start();
86b5a588dfSShuo Chen        }
8771f65b4dSShuo Chen        allConnected.await();
8871f65b4dSShuo Chen        long start = System.currentTimeMillis();
8971f65b4dSShuo Chen        startLatch.countDown();
9071f65b4dSShuo Chen        allFinished.await();
91b5a588dfSShuo Chen        long end = System.currentTimeMillis();
92c5b4b28fSShuo Chen        System.err.println(end - start);
9371f65b4dSShuo Chen        System.err.println(nClients * kRequests * 1000L / (end - start));
94b5a588dfSShuo Chen    }
95b5a588dfSShuo Chen
9671f65b4dSShuo Chen    public static class AsyncClient {
9771f65b4dSShuo Chen
9871f65b4dSShuo Chen        private RpcClient client;
9971f65b4dSShuo Chen        private SocketAddress serverAddr;
10071f65b4dSShuo Chen        protected EchoService.Stub remoteService;
10171f65b4dSShuo Chen        private EchoRequest request;
10271f65b4dSShuo Chen        private String payload;
10371f65b4dSShuo Chen        private int nPipelines = 1;
10471f65b4dSShuo Chen        private int count = 0;
10571f65b4dSShuo Chen
10671f65b4dSShuo Chen        public AsyncClient(ChannelFactory channelFactory, SocketAddress server) {
10771f65b4dSShuo Chen            this.client = new RpcClient(channelFactory);
10871f65b4dSShuo Chen            this.serverAddr = server;
10971f65b4dSShuo Chen
11071f65b4dSShuo Chen            payload = new String(new byte[100]);
11171f65b4dSShuo Chen            // payload = "Hello";
11271f65b4dSShuo Chen            request = EchoRequest.newBuilder().setPayload(payload).build();
11371f65b4dSShuo Chen        }
11471f65b4dSShuo Chen
11571f65b4dSShuo Chen        public void connect() {
11671f65b4dSShuo Chen            client.startConnect(serverAddr, new NewChannelCallback() {
11771f65b4dSShuo Chen                @Override
11871f65b4dSShuo Chen                public void run(RpcChannel channel) {
11971f65b4dSShuo Chen                    remoteService = EchoService.newStub(channel);
12071f65b4dSShuo Chen                    allConnected.countDown();
12171f65b4dSShuo Chen                }
12271f65b4dSShuo Chen            });
12371f65b4dSShuo Chen        }
12471f65b4dSShuo Chen
12571f65b4dSShuo Chen        public void start() {
12671f65b4dSShuo Chen            for (int i = 0; i < nPipelines; ++i) {
12771f65b4dSShuo Chen                sendAsyncRequest();
12871f65b4dSShuo Chen            }
12971f65b4dSShuo Chen        }
13071f65b4dSShuo Chen
13171f65b4dSShuo Chen        private void sendAsyncRequest() {
13271f65b4dSShuo Chen            RpcCallback<EchoProto.EchoResponse> done = new RpcCallback<EchoProto.EchoResponse>() {
13371f65b4dSShuo Chen                @Override
13471f65b4dSShuo Chen                public void run(EchoResponse response) {
13571f65b4dSShuo Chen                    assert response.getPayload().equals(payload);
13671f65b4dSShuo Chen                    ++count;
13771f65b4dSShuo Chen                    if (count < kRequests) {
13871f65b4dSShuo Chen                        sendAsyncRequest();
13971f65b4dSShuo Chen                    } else {
14071f65b4dSShuo Chen                        allFinished.countDown();
14171f65b4dSShuo Chen                    }
14271f65b4dSShuo Chen                }
14371f65b4dSShuo Chen            };
14471f65b4dSShuo Chen            remoteService.echo(null, request, done);
14571f65b4dSShuo Chen        }
14671f65b4dSShuo Chen    }
14771f65b4dSShuo Chen
14871f65b4dSShuo Chen    private static void runAsyncClients(SocketAddress server, int nClients, int nSelectors)
14971f65b4dSShuo Chen            throws Exception {
15071f65b4dSShuo Chen        ChannelFactory channelFactory = new NioClientSocketChannelFactory(
15171f65b4dSShuo Chen                Executors.newCachedThreadPool(),
15271f65b4dSShuo Chen                Executors.newCachedThreadPool(),
15371f65b4dSShuo Chen                nSelectors);
15471f65b4dSShuo Chen        allConnected = new CountDownLatch(nClients);
15571f65b4dSShuo Chen        allFinished = new CountDownLatch(nClients);
15671f65b4dSShuo Chen
15771f65b4dSShuo Chen        AsyncClient[] clients = new AsyncClient[nClients];
15871f65b4dSShuo Chen        for (int i = 0; i < nClients; ++i) {
15971f65b4dSShuo Chen            clients[i] = new AsyncClient(channelFactory, server);
16071f65b4dSShuo Chen            clients[i].connect();
16171f65b4dSShuo Chen        }
16271f65b4dSShuo Chen        allConnected.await();
16371f65b4dSShuo Chen        long start = System.currentTimeMillis();
16471f65b4dSShuo Chen        for (AsyncClient client : clients) {
16571f65b4dSShuo Chen            client.start();
16671f65b4dSShuo Chen        }
16771f65b4dSShuo Chen        allFinished.await();
16871f65b4dSShuo Chen        long end = System.currentTimeMillis();
16971f65b4dSShuo Chen        System.err.println(end - start);
17071f65b4dSShuo Chen        System.err.println(nClients * kRequests * 1000L / (end - start));
17171f65b4dSShuo Chen        System.exit(0);
17271f65b4dSShuo Chen    }
17371f65b4dSShuo Chen
17471f65b4dSShuo Chen    public static void main(String[] args) throws Exception {
17571f65b4dSShuo Chen        InetSocketAddress server = new InetSocketAddress(args[0], 8888);
17671f65b4dSShuo Chen        int nClients = 2;
17771f65b4dSShuo Chen        int nSelectors = 2;
17871f65b4dSShuo Chen
17971f65b4dSShuo Chen        // runSyncClients(server, nClients, nSelectors);
18071f65b4dSShuo Chen        runAsyncClients(server, nClients, nSelectors);
18171f65b4dSShuo Chen    }
182b5a588dfSShuo Chen}
183