1package echo; 2 3import java.net.InetSocketAddress; 4import java.net.SocketAddress; 5import java.util.concurrent.CountDownLatch; 6import java.util.concurrent.Executors; 7 8import muduo.rpc.NewChannelCallback; 9import muduo.rpc.RpcChannel; 10import muduo.rpc.RpcClient; 11 12import org.jboss.netty.channel.ChannelFactory; 13import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; 14 15import com.google.protobuf.RpcCallback; 16import com.google.protobuf.ServiceException; 17 18import echo.EchoProto.EchoRequest; 19import echo.EchoProto.EchoResponse; 20import echo.EchoProto.EchoService; 21import echo.EchoProto.EchoService.BlockingInterface; 22 23public class EchoClient { 24 static final int kRequests = 50000; 25 static CountDownLatch allConnected, startLatch, allFinished; 26 27 public static class SyncClient implements Runnable { 28 private ChannelFactory channelFactory; 29 private SocketAddress serverAddr; 30 31 public SyncClient(ChannelFactory channelFactory, SocketAddress server) { 32 this.channelFactory = channelFactory; 33 this.serverAddr = server; 34 } 35 36 @Override 37 public void run() { 38 System.out.println(Thread.currentThread()); 39 RpcClient client = new RpcClient(channelFactory); 40 RpcChannel channel = client.blockingConnect(serverAddr); 41 BlockingInterface remoteService = EchoService.newBlockingStub(channel); 42 String payload = new String(new byte[100]); 43 payload = "Hello"; 44 EchoRequest request = EchoRequest.newBuilder().setPayload(payload).build(); 45 46 allConnected.countDown(); 47 try { 48 startLatch.await(); 49 } catch (InterruptedException e) { 50 // TODO Auto-generated catch block 51 e.printStackTrace(); 52 } 53 for (int i = 0; i < kRequests; ++i) { 54 EchoResponse response; 55 try { 56 response = remoteService.echo(null, request); 57 assert response.getPayload().equals(payload); 58 } catch (ServiceException e) { 59 // TODO Auto-generated catch block 60 e.printStackTrace(); 61 } 62 // System.out.println(response); 63 } 64 allFinished.countDown(); 65 System.out.println(Thread.currentThread()); 66 // System.out.println(response); 67 channel.disconnect(); 68 // client.stop(); 69 } 70 } 71 72 @SuppressWarnings("unused") 73 private static void runSyncClients(InetSocketAddress server, int nClients, int nSelectors) 74 throws InterruptedException { 75 ChannelFactory channelFactory = new NioClientSocketChannelFactory( 76 Executors.newCachedThreadPool(), 77 Executors.newCachedThreadPool(), 78 nSelectors); 79 allConnected = new CountDownLatch(nClients); 80 startLatch = new CountDownLatch(1); 81 allFinished = new CountDownLatch(nClients); 82 Thread[] threads = new Thread[nClients]; 83 for (int i = 0; i < nClients; ++i) { 84 threads[i] = new Thread(new SyncClient(channelFactory, server)); 85 threads[i].start(); 86 } 87 allConnected.await(); 88 long start = System.currentTimeMillis(); 89 startLatch.countDown(); 90 allFinished.await(); 91 long end = System.currentTimeMillis(); 92 System.err.println(end - start); 93 System.err.println(nClients * kRequests * 1000L / (end - start)); 94 } 95 96 public static class AsyncClient { 97 98 private RpcClient client; 99 private SocketAddress serverAddr; 100 protected EchoService.Stub remoteService; 101 private EchoRequest request; 102 private String payload; 103 private int nPipelines = 1; 104 private int count = 0; 105 106 public AsyncClient(ChannelFactory channelFactory, SocketAddress server) { 107 this.client = new RpcClient(channelFactory); 108 this.serverAddr = server; 109 110 payload = new String(new byte[100]); 111 // payload = "Hello"; 112 request = EchoRequest.newBuilder().setPayload(payload).build(); 113 } 114 115 public void connect() { 116 client.startConnect(serverAddr, new NewChannelCallback() { 117 @Override 118 public void run(RpcChannel channel) { 119 remoteService = EchoService.newStub(channel); 120 allConnected.countDown(); 121 } 122 }); 123 } 124 125 public void start() { 126 for (int i = 0; i < nPipelines; ++i) { 127 sendAsyncRequest(); 128 } 129 } 130 131 private void sendAsyncRequest() { 132 RpcCallback<EchoProto.EchoResponse> done = new RpcCallback<EchoProto.EchoResponse>() { 133 @Override 134 public void run(EchoResponse response) { 135 assert response.getPayload().equals(payload); 136 ++count; 137 if (count < kRequests) { 138 sendAsyncRequest(); 139 } else { 140 allFinished.countDown(); 141 } 142 } 143 }; 144 remoteService.echo(null, request, done); 145 } 146 } 147 148 private static void runAsyncClients(SocketAddress server, int nClients, int nSelectors) 149 throws Exception { 150 ChannelFactory channelFactory = new NioClientSocketChannelFactory( 151 Executors.newCachedThreadPool(), 152 Executors.newCachedThreadPool(), 153 nSelectors); 154 allConnected = new CountDownLatch(nClients); 155 allFinished = new CountDownLatch(nClients); 156 157 AsyncClient[] clients = new AsyncClient[nClients]; 158 for (int i = 0; i < nClients; ++i) { 159 clients[i] = new AsyncClient(channelFactory, server); 160 clients[i].connect(); 161 } 162 allConnected.await(); 163 long start = System.currentTimeMillis(); 164 for (AsyncClient client : clients) { 165 client.start(); 166 } 167 allFinished.await(); 168 long end = System.currentTimeMillis(); 169 System.err.println(end - start); 170 System.err.println(nClients * kRequests * 1000L / (end - start)); 171 System.exit(0); 172 } 173 174 public static void main(String[] args) throws Exception { 175 InetSocketAddress server = new InetSocketAddress(args[0], 8888); 176 int nClients = 2; 177 int nSelectors = 2; 178 179 // runSyncClients(server, nClients, nSelectors); 180 runAsyncClients(server, nClients, nSelectors); 181 } 182} 183