1package echo;
2
3import java.net.InetSocketAddress;
4import java.net.SocketAddress;
5import java.util.concurrent.CountDownLatch;
6import java.util.concurrent.Executors;
7
8import muduo.rpc.NewChannelCallback;
9import muduo.rpc.RpcChannel;
10import muduo.rpc.RpcClient;
11
12import org.jboss.netty.channel.ChannelFactory;
13import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
14
15import com.google.protobuf.RpcCallback;
16import com.google.protobuf.ServiceException;
17
18import echo.EchoProto.EchoRequest;
19import echo.EchoProto.EchoResponse;
20import echo.EchoProto.EchoService;
21import echo.EchoProto.EchoService.BlockingInterface;
22
23public class EchoClient {
24    static final int kRequests = 50000;
25    static CountDownLatch allConnected, startLatch, allFinished;
26
27    public static class SyncClient implements Runnable {
28        private ChannelFactory channelFactory;
29        private SocketAddress serverAddr;
30
31        public SyncClient(ChannelFactory channelFactory, SocketAddress server) {
32            this.channelFactory = channelFactory;
33            this.serverAddr = server;
34        }
35
36        @Override
37        public void run() {
38            System.out.println(Thread.currentThread());
39            RpcClient client = new RpcClient(channelFactory);
40            RpcChannel channel = client.blockingConnect(serverAddr);
41            BlockingInterface remoteService = EchoService.newBlockingStub(channel);
42            String payload = new String(new byte[100]);
43            payload = "Hello";
44            EchoRequest request = EchoRequest.newBuilder().setPayload(payload).build();
45
46            allConnected.countDown();
47            try {
48                startLatch.await();
49            } catch (InterruptedException e) {
50                // TODO Auto-generated catch block
51                e.printStackTrace();
52            }
53            for (int i = 0; i < kRequests; ++i) {
54                EchoResponse response;
55                try {
56                    response = remoteService.echo(null, request);
57                    assert response.getPayload().equals(payload);
58                } catch (ServiceException e) {
59                    // TODO Auto-generated catch block
60                    e.printStackTrace();
61                }
62                // System.out.println(response);
63            }
64            allFinished.countDown();
65            System.out.println(Thread.currentThread());
66            // System.out.println(response);
67            channel.disconnect();
68            // client.stop();
69        }
70    }
71
72    @SuppressWarnings("unused")
73    private static void runSyncClients(InetSocketAddress server, int nClients, int nSelectors)
74            throws InterruptedException {
75        ChannelFactory channelFactory = new NioClientSocketChannelFactory(
76                Executors.newCachedThreadPool(),
77                Executors.newCachedThreadPool(),
78                nSelectors);
79        allConnected = new CountDownLatch(nClients);
80        startLatch = new CountDownLatch(1);
81        allFinished = new CountDownLatch(nClients);
82        Thread[] threads = new Thread[nClients];
83        for (int i = 0; i < nClients; ++i) {
84            threads[i] = new Thread(new SyncClient(channelFactory, server));
85            threads[i].start();
86        }
87        allConnected.await();
88        long start = System.currentTimeMillis();
89        startLatch.countDown();
90        allFinished.await();
91        long end = System.currentTimeMillis();
92        System.err.println(end - start);
93        System.err.println(nClients * kRequests * 1000L / (end - start));
94    }
95
96    public static class AsyncClient {
97
98        private RpcClient client;
99        private SocketAddress serverAddr;
100        protected EchoService.Stub remoteService;
101        private EchoRequest request;
102        private String payload;
103        private int nPipelines = 1;
104        private int count = 0;
105
106        public AsyncClient(ChannelFactory channelFactory, SocketAddress server) {
107            this.client = new RpcClient(channelFactory);
108            this.serverAddr = server;
109
110            payload = new String(new byte[100]);
111            // payload = "Hello";
112            request = EchoRequest.newBuilder().setPayload(payload).build();
113        }
114
115        public void connect() {
116            client.startConnect(serverAddr, new NewChannelCallback() {
117                @Override
118                public void run(RpcChannel channel) {
119                    remoteService = EchoService.newStub(channel);
120                    allConnected.countDown();
121                }
122            });
123        }
124
125        public void start() {
126            for (int i = 0; i < nPipelines; ++i) {
127                sendAsyncRequest();
128            }
129        }
130
131        private void sendAsyncRequest() {
132            RpcCallback<EchoProto.EchoResponse> done = new RpcCallback<EchoProto.EchoResponse>() {
133                @Override
134                public void run(EchoResponse response) {
135                    assert response.getPayload().equals(payload);
136                    ++count;
137                    if (count < kRequests) {
138                        sendAsyncRequest();
139                    } else {
140                        allFinished.countDown();
141                    }
142                }
143            };
144            remoteService.echo(null, request, done);
145        }
146    }
147
148    private static void runAsyncClients(SocketAddress server, int nClients, int nSelectors)
149            throws Exception {
150        ChannelFactory channelFactory = new NioClientSocketChannelFactory(
151                Executors.newCachedThreadPool(),
152                Executors.newCachedThreadPool(),
153                nSelectors);
154        allConnected = new CountDownLatch(nClients);
155        allFinished = new CountDownLatch(nClients);
156
157        AsyncClient[] clients = new AsyncClient[nClients];
158        for (int i = 0; i < nClients; ++i) {
159            clients[i] = new AsyncClient(channelFactory, server);
160            clients[i].connect();
161        }
162        allConnected.await();
163        long start = System.currentTimeMillis();
164        for (AsyncClient client : clients) {
165            client.start();
166        }
167        allFinished.await();
168        long end = System.currentTimeMillis();
169        System.err.println(end - start);
170        System.err.println(nClients * kRequests * 1000L / (end - start));
171        System.exit(0);
172    }
173
174    public static void main(String[] args) throws Exception {
175        InetSocketAddress server = new InetSocketAddress(args[0], 8888);
176        int nClients = 2;
177        int nSelectors = 2;
178
179        // runSyncClients(server, nClients, nSelectors);
180        runAsyncClients(server, nClients, nSelectors);
181    }
182}
183