json-rpc.c++ (13030B)
1 // Copyright (c) 2018 Kenton Varda and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #include "json-rpc.h" 23 #include <kj/compat/http.h> 24 #include <capnp/compat/json-rpc.capnp.h> 25 26 namespace capnp { 27 28 static constexpr uint64_t JSON_NAME_ANNOTATION_ID = 0xfa5b1fd61c2e7c3dull; 29 static constexpr uint64_t JSON_NOTIFICATION_ANNOTATION_ID = 0xa0a054dea32fd98cull; 30 31 class JsonRpc::CapabilityImpl final: public DynamicCapability::Server { 32 public: 33 CapabilityImpl(JsonRpc& parent, InterfaceSchema schema) 34 : DynamicCapability::Server(schema), parent(parent) {} 35 36 kj::Promise<void> call(InterfaceSchema::Method method, 37 CallContext<DynamicStruct, DynamicStruct> context) override { 38 auto proto = method.getProto(); 39 bool isNotification = false; 40 kj::StringPtr name = proto.getName(); 41 for (auto annotation: proto.getAnnotations()) { 42 switch (annotation.getId()) { 43 case JSON_NAME_ANNOTATION_ID: 44 name = annotation.getValue().getText(); 45 break; 46 case JSON_NOTIFICATION_ANNOTATION_ID: 47 isNotification = true; 48 break; 49 } 50 } 51 52 capnp::MallocMessageBuilder message; 53 auto value = message.getRoot<json::Value>(); 54 auto list = value.initObject(3 + !isNotification); 55 56 uint index = 0; 57 58 auto jsonrpc = list[index++]; 59 jsonrpc.setName("jsonrpc"); 60 jsonrpc.initValue().setString("2.0"); 61 62 uint callId = parent.callCount++; 63 64 if (!isNotification) { 65 auto id = list[index++]; 66 id.setName("id"); 67 id.initValue().setNumber(callId); 68 } 69 70 auto methodName = list[index++]; 71 methodName.setName("method"); 72 methodName.initValue().setString(name); 73 74 auto params = list[index++]; 75 params.setName("params"); 76 parent.codec.encode(context.getParams(), params.initValue()); 77 78 auto writePromise = parent.queueWrite(parent.codec.encode(value)); 79 80 if (isNotification) { 81 auto sproto = context.getResultsType().getProto().getStruct(); 82 MessageSize size { sproto.getDataWordCount(), sproto.getPointerCount() }; 83 context.initResults(size); 84 return kj::mv(writePromise); 85 } else { 86 auto paf = kj::newPromiseAndFulfiller<void>(); 87 parent.awaitedResponses.insert(callId, AwaitedResponse { context, kj::mv(paf.fulfiller) }); 88 auto promise = writePromise.then([p = kj::mv(paf.promise)]() mutable { return kj::mv(p); }); 89 auto& parentRef = parent; 90 return promise.attach(kj::defer([&parentRef,callId]() { 91 parentRef.awaitedResponses.erase(callId); 92 })); 93 } 94 } 95 96 private: 97 JsonRpc& parent; 98 }; 99 100 JsonRpc::JsonRpc(Transport& transport, DynamicCapability::Client interface) 101 : JsonRpc(transport, kj::mv(interface), kj::newPromiseAndFulfiller<void>()) {} 102 JsonRpc::JsonRpc(Transport& transport, DynamicCapability::Client interfaceParam, 103 kj::PromiseFulfillerPair<void> paf) 104 : transport(transport), 105 interface(kj::mv(interfaceParam)), 106 errorPromise(paf.promise.fork()), 107 errorFulfiller(kj::mv(paf.fulfiller)), 108 readTask(readLoop().eagerlyEvaluate([this](kj::Exception&& e) { 109 errorFulfiller->reject(kj::mv(e)); 110 })), 111 tasks(*this) { 112 codec.handleByAnnotation(interface.getSchema()); 113 codec.handleByAnnotation<json::RpcMessage>(); 114 115 for (auto method: interface.getSchema().getMethods()) { 116 auto proto = method.getProto(); 117 kj::StringPtr name = proto.getName(); 118 for (auto annotation: proto.getAnnotations()) { 119 switch (annotation.getId()) { 120 case JSON_NAME_ANNOTATION_ID: 121 name = annotation.getValue().getText(); 122 break; 123 } 124 } 125 methodMap.insert(name, method); 126 } 127 } 128 129 DynamicCapability::Client JsonRpc::getPeer(InterfaceSchema schema) { 130 codec.handleByAnnotation(interface.getSchema()); 131 return kj::heap<CapabilityImpl>(*this, schema); 132 } 133 134 static kj::HttpHeaderTable& staticHeaderTable() { 135 static kj::HttpHeaderTable HEADER_TABLE; 136 return HEADER_TABLE; 137 } 138 139 kj::Promise<void> JsonRpc::queueWrite(kj::String text) { 140 auto fork = writeQueue.then([this, text = kj::mv(text)]() mutable { 141 auto promise = transport.send(text); 142 return promise.attach(kj::mv(text)); 143 }).eagerlyEvaluate([this](kj::Exception&& e) { 144 errorFulfiller->reject(kj::mv(e)); 145 }).fork(); 146 writeQueue = fork.addBranch(); 147 return fork.addBranch(); 148 } 149 150 void JsonRpc::queueError(kj::Maybe<json::Value::Reader> id, int code, kj::StringPtr message) { 151 MallocMessageBuilder capnpMessage; 152 auto jsonResponse = capnpMessage.getRoot<json::RpcMessage>(); 153 jsonResponse.setJsonrpc("2.0"); 154 KJ_IF_MAYBE(i, id) { 155 jsonResponse.setId(*i); 156 } else { 157 jsonResponse.initId().setNull(); 158 } 159 auto error = jsonResponse.initError(); 160 error.setCode(code); 161 error.setMessage(message); 162 163 // OK to discard result of queueWrite() since it's just one branch of a fork. 164 queueWrite(codec.encode(jsonResponse)); 165 } 166 167 kj::Promise<void> JsonRpc::readLoop() { 168 return transport.receive().then([this](kj::String message) -> kj::Promise<void> { 169 MallocMessageBuilder capnpMessage; 170 auto rpcMessageBuilder = capnpMessage.getRoot<json::RpcMessage>(); 171 172 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { 173 codec.decode(message, rpcMessageBuilder); 174 })) { 175 queueError(nullptr, -32700, kj::str("Parse error: ", exception->getDescription())); 176 return readLoop(); 177 } 178 179 KJ_CONTEXT("decoding JSON-RPC message", message); 180 181 auto rpcMessage = rpcMessageBuilder.asReader(); 182 183 if (!rpcMessage.hasJsonrpc()) { 184 queueError(nullptr, -32700, kj::str("Missing 'jsonrpc' field.")); 185 return readLoop(); 186 } else if (rpcMessage.getJsonrpc() != "2.0") { 187 queueError(nullptr, -32700, 188 kj::str("Unknown JSON-RPC version. This peer implements version '2.0'.")); 189 return readLoop(); 190 } 191 192 switch (rpcMessage.which()) { 193 case json::RpcMessage::NONE: 194 queueError(nullptr, -32700, kj::str("message has none of params, result, or error")); 195 break; 196 197 case json::RpcMessage::PARAMS: { 198 // a call 199 KJ_IF_MAYBE(method, methodMap.find(rpcMessage.getMethod())) { 200 auto req = interface.newRequest(*method); 201 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { 202 codec.decode(rpcMessage.getParams(), req); 203 })) { 204 kj::Maybe<JsonValue::Reader> id; 205 if (rpcMessage.hasId()) id = rpcMessage.getId(); 206 queueError(id, -32602, 207 kj::str("Type error in method params: ", exception->getDescription())); 208 break; 209 } 210 211 if (rpcMessage.hasId()) { 212 auto id = rpcMessage.getId(); 213 auto idCopy = kj::heapArray<word>(id.totalSize().wordCount + 1); 214 memset(idCopy.begin(), 0, idCopy.asBytes().size()); 215 copyToUnchecked(id, idCopy); 216 auto idPtr = readMessageUnchecked<json::Value>(idCopy.begin()); 217 218 auto promise = req.send() 219 .then([this,idPtr](Response<DynamicStruct> response) mutable { 220 MallocMessageBuilder capnpMessage; 221 auto jsonResponse = capnpMessage.getRoot<json::RpcMessage>(); 222 jsonResponse.setJsonrpc("2.0"); 223 jsonResponse.setId(idPtr); 224 codec.encode(DynamicStruct::Reader(response), jsonResponse.initResult()); 225 return queueWrite(codec.encode(jsonResponse)); 226 }, [this,idPtr](kj::Exception&& e) { 227 MallocMessageBuilder capnpMessage; 228 auto jsonResponse = capnpMessage.getRoot<json::RpcMessage>(); 229 jsonResponse.setJsonrpc("2.0"); 230 jsonResponse.setId(idPtr); 231 auto error = jsonResponse.initError(); 232 switch (e.getType()) { 233 case kj::Exception::Type::FAILED: 234 error.setCode(-32000); 235 break; 236 case kj::Exception::Type::DISCONNECTED: 237 error.setCode(-32001); 238 break; 239 case kj::Exception::Type::OVERLOADED: 240 error.setCode(-32002); 241 break; 242 case kj::Exception::Type::UNIMPLEMENTED: 243 error.setCode(-32601); // method not found 244 break; 245 } 246 error.setMessage(e.getDescription()); 247 return queueWrite(codec.encode(jsonResponse)); 248 }); 249 tasks.add(promise.attach(kj::mv(idCopy))); 250 } else { 251 // No 'id', so this is a notification. 252 tasks.add(req.send().ignoreResult().catch_([](kj::Exception&& exception) { 253 if (exception.getType() != kj::Exception::Type::UNIMPLEMENTED) { 254 KJ_LOG(ERROR, "JSON-RPC notification threw exception into the abyss", exception); 255 } 256 })); 257 } 258 } else { 259 if (rpcMessage.hasId()) { 260 queueError(rpcMessage.getId(), -32601, "Method not found"); 261 } else { 262 // Ignore notification for unknown method. 263 } 264 } 265 break; 266 } 267 268 case json::RpcMessage::RESULT: { 269 auto id = rpcMessage.getId(); 270 if (!id.isNumber()) { 271 // JSON-RPC doesn't define what to do if receiving a response with an invalid id. 272 KJ_LOG(ERROR, "JSON-RPC response has invalid ID"); 273 } else KJ_IF_MAYBE(awaited, awaitedResponses.find((uint)id.getNumber())) { 274 KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() { 275 codec.decode(rpcMessage.getResult(), awaited->context.getResults()); 276 awaited->fulfiller->fulfill(); 277 })) { 278 // Errors always propagate from callee to caller, so we don't want to throw this error 279 // back to the server. 280 awaited->fulfiller->reject(kj::mv(*exception)); 281 } 282 } else { 283 // Probably, this is the response to a call that was canceled. 284 } 285 break; 286 } 287 288 case json::RpcMessage::ERROR: { 289 auto id = rpcMessage.getId(); 290 if (id.isNull()) { 291 // Error message will be logged by KJ_CONTEXT, above. 292 KJ_LOG(ERROR, "peer reports JSON-RPC protocol error"); 293 } else if (!id.isNumber()) { 294 // JSON-RPC doesn't define what to do if receiving a response with an invalid id. 295 KJ_LOG(ERROR, "JSON-RPC response has invalid ID"); 296 } else KJ_IF_MAYBE(awaited, awaitedResponses.find((uint)id.getNumber())) { 297 auto error = rpcMessage.getError(); 298 auto code = error.getCode(); 299 kj::Exception::Type type = 300 code == -32601 ? kj::Exception::Type::UNIMPLEMENTED 301 : kj::Exception::Type::FAILED; 302 awaited->fulfiller->reject(kj::Exception( 303 type, __FILE__, __LINE__, kj::str(error.getMessage()))); 304 } else { 305 // Probably, this is the response to a call that was canceled. 306 } 307 break; 308 } 309 } 310 311 return readLoop(); 312 }); 313 } 314 315 void JsonRpc::taskFailed(kj::Exception&& exception) { 316 errorFulfiller->reject(kj::mv(exception)); 317 } 318 319 // ======================================================================================= 320 321 JsonRpc::ContentLengthTransport::ContentLengthTransport(kj::AsyncIoStream& stream) 322 : stream(stream), input(kj::newHttpInputStream(stream, staticHeaderTable())) {} 323 JsonRpc::ContentLengthTransport::~ContentLengthTransport() noexcept(false) {} 324 325 kj::Promise<void> JsonRpc::ContentLengthTransport::send(kj::StringPtr text) { 326 auto headers = kj::str("Content-Length: ", text.size(), "\r\n\r\n"); 327 parts[0] = headers.asBytes(); 328 parts[1] = text.asBytes(); 329 return stream.write(parts).attach(kj::mv(headers)); 330 } 331 332 kj::Promise<kj::String> JsonRpc::ContentLengthTransport::receive() { 333 return input->readMessage() 334 .then([](kj::HttpInputStream::Message&& message) { 335 auto promise = message.body->readAllText(); 336 return promise.attach(kj::mv(message.body)); 337 }); 338 } 339 340 } // namespace capnp