// EchoClient.java revision 71f65b4d
1b5a588dfSShuo Chenpackage echo; 2b5a588dfSShuo Chen 3b5a588dfSShuo Chenimport java.net.InetSocketAddress; 471f65b4dSShuo Chenimport java.net.SocketAddress; 5c5b4b28fSShuo Chenimport java.util.concurrent.CountDownLatch; 6c5b4b28fSShuo Chenimport java.util.concurrent.Executors; 7c5b4b28fSShuo Chen 871f65b4dSShuo Chenimport muduo.rpc.NewChannelCallback; 971f65b4dSShuo Chenimport muduo.rpc.RpcChannel; 1071f65b4dSShuo Chenimport muduo.rpc.RpcClient; 1171f65b4dSShuo Chen 12c5b4b28fSShuo Chenimport org.jboss.netty.channel.ChannelFactory; 13c5b4b28fSShuo Chenimport org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; 14c5b4b28fSShuo Chen 1571f65b4dSShuo Chenimport com.google.protobuf.RpcCallback; 16c5b4b28fSShuo Chenimport com.google.protobuf.ServiceException; 17b5a588dfSShuo Chen 18b5a588dfSShuo Chenimport echo.EchoProto.EchoRequest; 19b5a588dfSShuo Chenimport echo.EchoProto.EchoResponse; 20b5a588dfSShuo Chenimport echo.EchoProto.EchoService; 21b5a588dfSShuo Chenimport echo.EchoProto.EchoService.BlockingInterface; 22b5a588dfSShuo Chen 23b5a588dfSShuo Chenpublic class EchoClient { 2471f65b4dSShuo Chen static final int kRequests = 50000; 2571f65b4dSShuo Chen static CountDownLatch allConnected, startLatch, allFinished; 26c5b4b28fSShuo Chen 2771f65b4dSShuo Chen public static class SyncClient implements Runnable { 28c5b4b28fSShuo Chen private ChannelFactory channelFactory; 2971f65b4dSShuo Chen private SocketAddress serverAddr; 30c5b4b28fSShuo Chen 3171f65b4dSShuo Chen public SyncClient(ChannelFactory channelFactory, SocketAddress server) { 32c5b4b28fSShuo Chen this.channelFactory = channelFactory; 33c5b4b28fSShuo Chen this.serverAddr = server; 34c5b4b28fSShuo Chen } 35c5b4b28fSShuo Chen 36c5b4b28fSShuo Chen @Override 37c5b4b28fSShuo Chen public void run() { 38c5b4b28fSShuo Chen System.out.println(Thread.currentThread()); 39c5b4b28fSShuo Chen RpcClient client = new RpcClient(channelFactory); 40c5b4b28fSShuo Chen RpcChannel channel = 
client.blockingConnect(serverAddr); 41c5b4b28fSShuo Chen BlockingInterface remoteService = EchoService.newBlockingStub(channel); 42c5b4b28fSShuo Chen String payload = new String(new byte[100]); 43c5b4b28fSShuo Chen payload = "Hello"; 44c5b4b28fSShuo Chen EchoRequest request = EchoRequest.newBuilder().setPayload(payload).build(); 45c5b4b28fSShuo Chen 4671f65b4dSShuo Chen allConnected.countDown(); 4771f65b4dSShuo Chen try { 4871f65b4dSShuo Chen startLatch.await(); 4971f65b4dSShuo Chen } catch (InterruptedException e) { 5071f65b4dSShuo Chen // TODO Auto-generated catch block 5171f65b4dSShuo Chen e.printStackTrace(); 5271f65b4dSShuo Chen } 53c5b4b28fSShuo Chen for (int i = 0; i < kRequests; ++i) { 54c5b4b28fSShuo Chen EchoResponse response; 55c5b4b28fSShuo Chen try { 56c5b4b28fSShuo Chen response = remoteService.echo(null, request); 57c5b4b28fSShuo Chen assert response.getPayload().equals(payload); 58c5b4b28fSShuo Chen } catch (ServiceException e) { 59c5b4b28fSShuo Chen // TODO Auto-generated catch block 60c5b4b28fSShuo Chen e.printStackTrace(); 61c5b4b28fSShuo Chen } 62c5b4b28fSShuo Chen // System.out.println(response); 63c5b4b28fSShuo Chen } 6471f65b4dSShuo Chen allFinished.countDown(); 65c5b4b28fSShuo Chen System.out.println(Thread.currentThread()); 66c5b4b28fSShuo Chen // System.out.println(response); 67c5b4b28fSShuo Chen channel.disconnect(); 68c5b4b28fSShuo Chen // client.stop(); 69c5b4b28fSShuo Chen } 70c5b4b28fSShuo Chen } 71b5a588dfSShuo Chen 7271f65b4dSShuo Chen @SuppressWarnings("unused") 7371f65b4dSShuo Chen private static void runSyncClients(InetSocketAddress server, int nClients, int nSelectors) 7471f65b4dSShuo Chen throws InterruptedException { 75c5b4b28fSShuo Chen ChannelFactory channelFactory = new NioClientSocketChannelFactory( 76c5b4b28fSShuo Chen Executors.newCachedThreadPool(), 7771f65b4dSShuo Chen Executors.newCachedThreadPool(), 7871f65b4dSShuo Chen nSelectors); 7971f65b4dSShuo Chen allConnected = new CountDownLatch(nClients); 8071f65b4dSShuo 
Chen startLatch = new CountDownLatch(1); 8171f65b4dSShuo Chen allFinished = new CountDownLatch(nClients); 8271f65b4dSShuo Chen Thread[] threads = new Thread[nClients]; 8371f65b4dSShuo Chen for (int i = 0; i < nClients; ++i) { 8471f65b4dSShuo Chen threads[i] = new Thread(new SyncClient(channelFactory, server)); 85c5b4b28fSShuo Chen threads[i].start(); 86b5a588dfSShuo Chen } 8771f65b4dSShuo Chen allConnected.await(); 8871f65b4dSShuo Chen long start = System.currentTimeMillis(); 8971f65b4dSShuo Chen startLatch.countDown(); 9071f65b4dSShuo Chen allFinished.await(); 91b5a588dfSShuo Chen long end = System.currentTimeMillis(); 92c5b4b28fSShuo Chen System.err.println(end - start); 9371f65b4dSShuo Chen System.err.println(nClients * kRequests * 1000L / (end - start)); 94b5a588dfSShuo Chen } 95b5a588dfSShuo Chen 9671f65b4dSShuo Chen public static class AsyncClient { 9771f65b4dSShuo Chen 9871f65b4dSShuo Chen private RpcClient client; 9971f65b4dSShuo Chen private SocketAddress serverAddr; 10071f65b4dSShuo Chen protected EchoService.Stub remoteService; 10171f65b4dSShuo Chen private EchoRequest request; 10271f65b4dSShuo Chen private String payload; 10371f65b4dSShuo Chen private int nPipelines = 1; 10471f65b4dSShuo Chen private int count = 0; 10571f65b4dSShuo Chen 10671f65b4dSShuo Chen public AsyncClient(ChannelFactory channelFactory, SocketAddress server) { 10771f65b4dSShuo Chen this.client = new RpcClient(channelFactory); 10871f65b4dSShuo Chen this.serverAddr = server; 10971f65b4dSShuo Chen 11071f65b4dSShuo Chen payload = new String(new byte[100]); 11171f65b4dSShuo Chen // payload = "Hello"; 11271f65b4dSShuo Chen request = EchoRequest.newBuilder().setPayload(payload).build(); 11371f65b4dSShuo Chen } 11471f65b4dSShuo Chen 11571f65b4dSShuo Chen public void connect() { 11671f65b4dSShuo Chen client.startConnect(serverAddr, new NewChannelCallback() { 11771f65b4dSShuo Chen @Override 11871f65b4dSShuo Chen public void run(RpcChannel channel) { 11971f65b4dSShuo Chen remoteService = 
EchoService.newStub(channel); 12071f65b4dSShuo Chen allConnected.countDown(); 12171f65b4dSShuo Chen } 12271f65b4dSShuo Chen }); 12371f65b4dSShuo Chen } 12471f65b4dSShuo Chen 12571f65b4dSShuo Chen public void start() { 12671f65b4dSShuo Chen for (int i = 0; i < nPipelines; ++i) { 12771f65b4dSShuo Chen sendAsyncRequest(); 12871f65b4dSShuo Chen } 12971f65b4dSShuo Chen } 13071f65b4dSShuo Chen 13171f65b4dSShuo Chen private void sendAsyncRequest() { 13271f65b4dSShuo Chen RpcCallback<EchoProto.EchoResponse> done = new RpcCallback<EchoProto.EchoResponse>() { 13371f65b4dSShuo Chen @Override 13471f65b4dSShuo Chen public void run(EchoResponse response) { 13571f65b4dSShuo Chen assert response.getPayload().equals(payload); 13671f65b4dSShuo Chen ++count; 13771f65b4dSShuo Chen if (count < kRequests) { 13871f65b4dSShuo Chen sendAsyncRequest(); 13971f65b4dSShuo Chen } else { 14071f65b4dSShuo Chen allFinished.countDown(); 14171f65b4dSShuo Chen } 14271f65b4dSShuo Chen } 14371f65b4dSShuo Chen }; 14471f65b4dSShuo Chen remoteService.echo(null, request, done); 14571f65b4dSShuo Chen } 14671f65b4dSShuo Chen } 14771f65b4dSShuo Chen 14871f65b4dSShuo Chen private static void runAsyncClients(SocketAddress server, int nClients, int nSelectors) 14971f65b4dSShuo Chen throws Exception { 15071f65b4dSShuo Chen ChannelFactory channelFactory = new NioClientSocketChannelFactory( 15171f65b4dSShuo Chen Executors.newCachedThreadPool(), 15271f65b4dSShuo Chen Executors.newCachedThreadPool(), 15371f65b4dSShuo Chen nSelectors); 15471f65b4dSShuo Chen allConnected = new CountDownLatch(nClients); 15571f65b4dSShuo Chen allFinished = new CountDownLatch(nClients); 15671f65b4dSShuo Chen 15771f65b4dSShuo Chen AsyncClient[] clients = new AsyncClient[nClients]; 15871f65b4dSShuo Chen for (int i = 0; i < nClients; ++i) { 15971f65b4dSShuo Chen clients[i] = new AsyncClient(channelFactory, server); 16071f65b4dSShuo Chen clients[i].connect(); 16171f65b4dSShuo Chen } 16271f65b4dSShuo Chen allConnected.await(); 16371f65b4dSShuo 
Chen long start = System.currentTimeMillis(); 16471f65b4dSShuo Chen for (AsyncClient client : clients) { 16571f65b4dSShuo Chen client.start(); 16671f65b4dSShuo Chen } 16771f65b4dSShuo Chen allFinished.await(); 16871f65b4dSShuo Chen long end = System.currentTimeMillis(); 16971f65b4dSShuo Chen System.err.println(end - start); 17071f65b4dSShuo Chen System.err.println(nClients * kRequests * 1000L / (end - start)); 17171f65b4dSShuo Chen System.exit(0); 17271f65b4dSShuo Chen } 17371f65b4dSShuo Chen 17471f65b4dSShuo Chen public static void main(String[] args) throws Exception { 17571f65b4dSShuo Chen InetSocketAddress server = new InetSocketAddress(args[0], 8888); 17671f65b4dSShuo Chen int nClients = 2; 17771f65b4dSShuo Chen int nSelectors = 2; 17871f65b4dSShuo Chen 17971f65b4dSShuo Chen // runSyncClients(server, nClients, nSelectors); 18071f65b4dSShuo Chen runAsyncClients(server, nClients, nSelectors); 18171f65b4dSShuo Chen } 182b5a588dfSShuo Chen} 183