RpcChannel.java revision 420c9859
1420c9859SShuo Chenpackage muduo.rpc;
2420c9859SShuo Chen
3420c9859SShuo Chenimport java.util.Map;
4420c9859SShuo Chenimport java.util.concurrent.ConcurrentHashMap;
5420c9859SShuo Chenimport java.util.concurrent.atomic.AtomicLong;
6420c9859SShuo Chen
7420c9859SShuo Chenimport muduo.rpc.proto.RpcProto.ErrorCode;
8420c9859SShuo Chenimport muduo.rpc.proto.RpcProto.MessageType;
9420c9859SShuo Chenimport muduo.rpc.proto.RpcProto.RpcMessage;
10420c9859SShuo Chenimport muduo.rpc.proto.RpcProto.RpcMessage.Builder;
11420c9859SShuo Chen
12420c9859SShuo Chenimport org.jboss.netty.channel.Channel;
13420c9859SShuo Chenimport org.jboss.netty.channel.ChannelHandlerContext;
14420c9859SShuo Chenimport org.jboss.netty.channel.MessageEvent;
15420c9859SShuo Chen
16420c9859SShuo Chenimport com.google.protobuf.ByteString;
17420c9859SShuo Chenimport com.google.protobuf.Descriptors.MethodDescriptor;
18420c9859SShuo Chenimport com.google.protobuf.Message;
19420c9859SShuo Chenimport com.google.protobuf.RpcCallback;
20420c9859SShuo Chenimport com.google.protobuf.RpcController;
21420c9859SShuo Chenimport com.google.protobuf.Service;
22420c9859SShuo Chen
23420c9859SShuo Chenpublic class RpcChannel implements com.google.protobuf.RpcChannel {
24420c9859SShuo Chen
25420c9859SShuo Chen    private class Outstanding {
26420c9859SShuo Chen
27420c9859SShuo Chen        public Outstanding(Message responsePrototype, RpcCallback<Message> done) {
28420c9859SShuo Chen            this.responsePrototype = responsePrototype;
29420c9859SShuo Chen            this.done = done;
30420c9859SShuo Chen        }
31420c9859SShuo Chen
32420c9859SShuo Chen        public Message responsePrototype;
33420c9859SShuo Chen        public RpcCallback<Message> done;
34420c9859SShuo Chen    }
35420c9859SShuo Chen
36420c9859SShuo Chen    private Channel channel;
37420c9859SShuo Chen    private AtomicLong id = new AtomicLong(1);
38420c9859SShuo Chen    private Map<Long, Outstanding> outstandings = new ConcurrentHashMap<Long, Outstanding>();
39420c9859SShuo Chen    private Map<String, Service> services;
40420c9859SShuo Chen
41420c9859SShuo Chen    public RpcChannel(Channel channel) {
42420c9859SShuo Chen        this.channel = channel;
43420c9859SShuo Chen    }
44420c9859SShuo Chen
45420c9859SShuo Chen    public void setServiceMap(Map<String, Service> services) {
46420c9859SShuo Chen        this.services = services;
47420c9859SShuo Chen    }
48420c9859SShuo Chen
49420c9859SShuo Chen    public Channel getChannel() {
50420c9859SShuo Chen        return channel;
51420c9859SShuo Chen    }
52420c9859SShuo Chen
53420c9859SShuo Chen    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
54420c9859SShuo Chen        RpcMessage message = (RpcMessage) e.getMessage();
55420c9859SShuo Chen        assert e.getChannel() == channel;
56420c9859SShuo Chen        System.out.println(message);
57420c9859SShuo Chen        if (message.getType() == MessageType.REQUEST) {
58420c9859SShuo Chen            doRequest(message);
59420c9859SShuo Chen        } else if (message.getType() == MessageType.RESPONSE) {
60420c9859SShuo Chen            Outstanding o = outstandings.get(message.getId());
61420c9859SShuo Chen            if (o != null) {
62420c9859SShuo Chen                Message resp = fromByteString(o.responsePrototype, message.getResponse());
63420c9859SShuo Chen                o.done.run(resp);
64420c9859SShuo Chen            }
65420c9859SShuo Chen        }
66420c9859SShuo Chen    }
67420c9859SShuo Chen
68420c9859SShuo Chen    private void doRequest(RpcMessage message) {
69420c9859SShuo Chen        Service service = services.get(message.getService());
70420c9859SShuo Chen        Builder errorBuilder = RpcMessage.newBuilder().setType(MessageType.ERROR);
71420c9859SShuo Chen        boolean succeed = false;
72420c9859SShuo Chen        if (service != null) {
73420c9859SShuo Chen            MethodDescriptor method = service.getDescriptorForType()
74420c9859SShuo Chen                    .findMethodByName(message.getMethod());
75420c9859SShuo Chen            if (method != null) {
76420c9859SShuo Chen                Message request = fromByteString(service.getRequestPrototype(method),
77420c9859SShuo Chen                        message.getRequest());
78420c9859SShuo Chen                if (request != null) {
79420c9859SShuo Chen                    final long id = message.getId();
80420c9859SShuo Chen                    RpcCallback<Message> done = new RpcCallback<Message>() {
81420c9859SShuo Chen                        @Override
82420c9859SShuo Chen                        public void run(Message response) {
83420c9859SShuo Chen                            done(response, id);
84420c9859SShuo Chen                        }
85420c9859SShuo Chen                    };
86420c9859SShuo Chen                    succeed = doCall(request, service, method, done);
87420c9859SShuo Chen                } else {
88420c9859SShuo Chen                    errorBuilder.setError(ErrorCode.INVALID_REQUEST);
89420c9859SShuo Chen                }
90420c9859SShuo Chen            } else {
91420c9859SShuo Chen                errorBuilder.setError(ErrorCode.NO_METHOD);
92420c9859SShuo Chen            }
93420c9859SShuo Chen        } else {
94420c9859SShuo Chen            errorBuilder.setError(ErrorCode.NO_SERVICE);
95420c9859SShuo Chen        }
96420c9859SShuo Chen        if (!succeed) {
97420c9859SShuo Chen            RpcMessage resp = errorBuilder.build();
98420c9859SShuo Chen            channel.write(resp);
99420c9859SShuo Chen        }
100420c9859SShuo Chen    }
101420c9859SShuo Chen
102420c9859SShuo Chen    private Message fromByteString(Message prototype, ByteString bytes) {
103420c9859SShuo Chen        Message message = null;
104420c9859SShuo Chen        try {
105420c9859SShuo Chen            message = prototype.toBuilder().mergeFrom(bytes).build();
106420c9859SShuo Chen        } catch (Exception e) {
107420c9859SShuo Chen        }
108420c9859SShuo Chen        return message;
109420c9859SShuo Chen    }
110420c9859SShuo Chen
111420c9859SShuo Chen    private boolean doCall(Message request, Service service, MethodDescriptor method,
112420c9859SShuo Chen            RpcCallback<Message> done) {
113420c9859SShuo Chen        service.callMethod(method, null, request, done);
114420c9859SShuo Chen        return true;
115420c9859SShuo Chen    }
116420c9859SShuo Chen
117420c9859SShuo Chen    protected void done(Message response, long id) {
118420c9859SShuo Chen        if (response != null) {
119420c9859SShuo Chen            RpcMessage resp = RpcMessage.newBuilder()
120420c9859SShuo Chen                    .setType(MessageType.RESPONSE)
121420c9859SShuo Chen                    .setId(id)
122420c9859SShuo Chen                    .setResponse(response.toByteString())
123420c9859SShuo Chen                    .build();
124420c9859SShuo Chen            channel.write(resp);
125420c9859SShuo Chen        } else {
126420c9859SShuo Chen            RpcMessage resp = RpcMessage.newBuilder()
127420c9859SShuo Chen                    .setType(MessageType.ERROR)
128420c9859SShuo Chen                    .setId(id)
129420c9859SShuo Chen                    .setError(ErrorCode.INVALID_RESPONSE)
130420c9859SShuo Chen                    .build();
131420c9859SShuo Chen            channel.write(resp);
132420c9859SShuo Chen        }
133420c9859SShuo Chen    }
134420c9859SShuo Chen
135420c9859SShuo Chen    @Override
136420c9859SShuo Chen    public void callMethod(MethodDescriptor method, RpcController controller, Message request,
137420c9859SShuo Chen            Message responsePrototype, RpcCallback<Message> done) {
138420c9859SShuo Chen        long callId = id.getAndIncrement();
139420c9859SShuo Chen        RpcMessage message = RpcMessage.newBuilder()
140420c9859SShuo Chen                .setType(MessageType.REQUEST)
141420c9859SShuo Chen                .setId(callId)
142420c9859SShuo Chen                .setService(method.getService().getFullName())
143420c9859SShuo Chen                .setMethod(method.getName())
144420c9859SShuo Chen                .setRequest(request.toByteString())
145420c9859SShuo Chen                .build();
146420c9859SShuo Chen        outstandings.put(callId, new Outstanding(responsePrototype, done));
147420c9859SShuo Chen        channel.write(message);
148420c9859SShuo Chen    }
149420c9859SShuo Chen
150420c9859SShuo Chen}
151