RpcChannel.java revision 420c9859
1420c9859SShuo Chenpackage muduo.rpc; 2420c9859SShuo Chen 3420c9859SShuo Chenimport java.util.Map; 4420c9859SShuo Chenimport java.util.concurrent.ConcurrentHashMap; 5420c9859SShuo Chenimport java.util.concurrent.atomic.AtomicLong; 6420c9859SShuo Chen 7420c9859SShuo Chenimport muduo.rpc.proto.RpcProto.ErrorCode; 8420c9859SShuo Chenimport muduo.rpc.proto.RpcProto.MessageType; 9420c9859SShuo Chenimport muduo.rpc.proto.RpcProto.RpcMessage; 10420c9859SShuo Chenimport muduo.rpc.proto.RpcProto.RpcMessage.Builder; 11420c9859SShuo Chen 12420c9859SShuo Chenimport org.jboss.netty.channel.Channel; 13420c9859SShuo Chenimport org.jboss.netty.channel.ChannelHandlerContext; 14420c9859SShuo Chenimport org.jboss.netty.channel.MessageEvent; 15420c9859SShuo Chen 16420c9859SShuo Chenimport com.google.protobuf.ByteString; 17420c9859SShuo Chenimport com.google.protobuf.Descriptors.MethodDescriptor; 18420c9859SShuo Chenimport com.google.protobuf.Message; 19420c9859SShuo Chenimport com.google.protobuf.RpcCallback; 20420c9859SShuo Chenimport com.google.protobuf.RpcController; 21420c9859SShuo Chenimport com.google.protobuf.Service; 22420c9859SShuo Chen 23420c9859SShuo Chenpublic class RpcChannel implements com.google.protobuf.RpcChannel { 24420c9859SShuo Chen 25420c9859SShuo Chen private class Outstanding { 26420c9859SShuo Chen 27420c9859SShuo Chen public Outstanding(Message responsePrototype, RpcCallback<Message> done) { 28420c9859SShuo Chen this.responsePrototype = responsePrototype; 29420c9859SShuo Chen this.done = done; 30420c9859SShuo Chen } 31420c9859SShuo Chen 32420c9859SShuo Chen public Message responsePrototype; 33420c9859SShuo Chen public RpcCallback<Message> done; 34420c9859SShuo Chen } 35420c9859SShuo Chen 36420c9859SShuo Chen private Channel channel; 37420c9859SShuo Chen private AtomicLong id = new AtomicLong(1); 38420c9859SShuo Chen private Map<Long, Outstanding> outstandings = new ConcurrentHashMap<Long, Outstanding>(); 39420c9859SShuo Chen private Map<String, Service> services; 40420c9859SShuo Chen 41420c9859SShuo Chen public RpcChannel(Channel channel) { 42420c9859SShuo Chen this.channel = channel; 43420c9859SShuo Chen } 44420c9859SShuo Chen 45420c9859SShuo Chen public void setServiceMap(Map<String, Service> services) { 46420c9859SShuo Chen this.services = services; 47420c9859SShuo Chen } 48420c9859SShuo Chen 49420c9859SShuo Chen public Channel getChannel() { 50420c9859SShuo Chen return channel; 51420c9859SShuo Chen } 52420c9859SShuo Chen 53420c9859SShuo Chen public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) { 54420c9859SShuo Chen RpcMessage message = (RpcMessage) e.getMessage(); 55420c9859SShuo Chen assert e.getChannel() == channel; 56420c9859SShuo Chen System.out.println(message); 57420c9859SShuo Chen if (message.getType() == MessageType.REQUEST) { 58420c9859SShuo Chen doRequest(message); 59420c9859SShuo Chen } else if (message.getType() == MessageType.RESPONSE) { 60420c9859SShuo Chen Outstanding o = outstandings.get(message.getId()); 61420c9859SShuo Chen if (o != null) { 62420c9859SShuo Chen Message resp = fromByteString(o.responsePrototype, message.getResponse()); 63420c9859SShuo Chen o.done.run(resp); 64420c9859SShuo Chen } 65420c9859SShuo Chen } 66420c9859SShuo Chen } 67420c9859SShuo Chen 68420c9859SShuo Chen private void doRequest(RpcMessage message) { 69420c9859SShuo Chen Service service = services.get(message.getService()); 70420c9859SShuo Chen Builder errorBuilder = RpcMessage.newBuilder().setType(MessageType.ERROR); 71420c9859SShuo Chen boolean succeed = false; 72420c9859SShuo Chen if (service != null) { 73420c9859SShuo Chen MethodDescriptor method = service.getDescriptorForType() 74420c9859SShuo Chen .findMethodByName(message.getMethod()); 75420c9859SShuo Chen if (method != null) { 76420c9859SShuo Chen Message request = fromByteString(service.getRequestPrototype(method), 77420c9859SShuo Chen message.getRequest()); 78420c9859SShuo Chen if (request != null) { 79420c9859SShuo Chen final long id = message.getId(); 80420c9859SShuo Chen RpcCallback<Message> done = new RpcCallback<Message>() { 81420c9859SShuo Chen @Override 82420c9859SShuo Chen public void run(Message response) { 83420c9859SShuo Chen done(response, id); 84420c9859SShuo Chen } 85420c9859SShuo Chen }; 86420c9859SShuo Chen succeed = doCall(request, service, method, done); 87420c9859SShuo Chen } else { 88420c9859SShuo Chen errorBuilder.setError(ErrorCode.INVALID_REQUEST); 89420c9859SShuo Chen } 90420c9859SShuo Chen } else { 91420c9859SShuo Chen errorBuilder.setError(ErrorCode.NO_METHOD); 92420c9859SShuo Chen } 93420c9859SShuo Chen } else { 94420c9859SShuo Chen errorBuilder.setError(ErrorCode.NO_SERVICE); 95420c9859SShuo Chen } 96420c9859SShuo Chen if (!succeed) { 97420c9859SShuo Chen RpcMessage resp = errorBuilder.build(); 98420c9859SShuo Chen channel.write(resp); 99420c9859SShuo Chen } 100420c9859SShuo Chen } 101420c9859SShuo Chen 102420c9859SShuo Chen private Message fromByteString(Message prototype, ByteString bytes) { 103420c9859SShuo Chen Message message = null; 104420c9859SShuo Chen try { 105420c9859SShuo Chen message = prototype.toBuilder().mergeFrom(bytes).build(); 106420c9859SShuo Chen } catch (Exception e) { 107420c9859SShuo Chen } 108420c9859SShuo Chen return message; 109420c9859SShuo Chen } 110420c9859SShuo Chen 111420c9859SShuo Chen private boolean doCall(Message request, Service service, MethodDescriptor method, 112420c9859SShuo Chen RpcCallback<Message> done) { 113420c9859SShuo Chen service.callMethod(method, null, request, done); 114420c9859SShuo Chen return true; 115420c9859SShuo Chen } 116420c9859SShuo Chen 117420c9859SShuo Chen protected void done(Message response, long id) { 118420c9859SShuo Chen if (response != null) { 119420c9859SShuo Chen RpcMessage resp = RpcMessage.newBuilder() 120420c9859SShuo Chen .setType(MessageType.RESPONSE) 121420c9859SShuo Chen .setId(id) 122420c9859SShuo Chen .setResponse(response.toByteString()) 123420c9859SShuo Chen .build(); 124420c9859SShuo Chen channel.write(resp); 125420c9859SShuo Chen } else { 126420c9859SShuo Chen RpcMessage resp = RpcMessage.newBuilder() 127420c9859SShuo Chen .setType(MessageType.ERROR) 128420c9859SShuo Chen .setId(id) 129420c9859SShuo Chen .setError(ErrorCode.INVALID_RESPONSE) 130420c9859SShuo Chen .build(); 131420c9859SShuo Chen channel.write(resp); 132420c9859SShuo Chen } 133420c9859SShuo Chen } 134420c9859SShuo Chen 135420c9859SShuo Chen @Override 136420c9859SShuo Chen public void callMethod(MethodDescriptor method, RpcController controller, Message request, 137420c9859SShuo Chen Message responsePrototype, RpcCallback<Message> done) { 138420c9859SShuo Chen long callId = id.getAndIncrement(); 139420c9859SShuo Chen RpcMessage message = RpcMessage.newBuilder() 140420c9859SShuo Chen .setType(MessageType.REQUEST) 141420c9859SShuo Chen .setId(callId) 142420c9859SShuo Chen .setService(method.getService().getFullName()) 143420c9859SShuo Chen .setMethod(method.getName()) 144420c9859SShuo Chen .setRequest(request.toByteString()) 145420c9859SShuo Chen .build(); 146420c9859SShuo Chen outstandings.put(callId, new Outstanding(responsePrototype, done)); 147420c9859SShuo Chen channel.write(message); 148420c9859SShuo Chen } 149420c9859SShuo Chen 150420c9859SShuo Chen} 151