capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

json-rpc.c++ (13030B)


      1 // Copyright (c) 2018 Kenton Varda and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #include "json-rpc.h"
     23 #include <kj/compat/http.h>
     24 #include <capnp/compat/json-rpc.capnp.h>
     25 
     26 namespace capnp {
     27 
     28 static constexpr uint64_t JSON_NAME_ANNOTATION_ID = 0xfa5b1fd61c2e7c3dull;
     29 static constexpr uint64_t JSON_NOTIFICATION_ANNOTATION_ID = 0xa0a054dea32fd98cull;
     30 
     31 class JsonRpc::CapabilityImpl final: public DynamicCapability::Server {
     32 public:
     33   CapabilityImpl(JsonRpc& parent, InterfaceSchema schema)
     34       : DynamicCapability::Server(schema), parent(parent) {}
     35 
     36   kj::Promise<void> call(InterfaceSchema::Method method,
     37                          CallContext<DynamicStruct, DynamicStruct> context) override {
     38     auto proto = method.getProto();
     39     bool isNotification = false;
     40     kj::StringPtr name = proto.getName();
     41     for (auto annotation: proto.getAnnotations()) {
     42       switch (annotation.getId()) {
     43         case JSON_NAME_ANNOTATION_ID:
     44           name = annotation.getValue().getText();
     45           break;
     46         case JSON_NOTIFICATION_ANNOTATION_ID:
     47           isNotification = true;
     48           break;
     49       }
     50     }
     51 
     52     capnp::MallocMessageBuilder message;
     53     auto value = message.getRoot<json::Value>();
     54     auto list = value.initObject(3 + !isNotification);
     55 
     56     uint index = 0;
     57 
     58     auto jsonrpc = list[index++];
     59     jsonrpc.setName("jsonrpc");
     60     jsonrpc.initValue().setString("2.0");
     61 
     62     uint callId = parent.callCount++;
     63 
     64     if (!isNotification) {
     65       auto id = list[index++];
     66       id.setName("id");
     67       id.initValue().setNumber(callId);
     68     }
     69 
     70     auto methodName = list[index++];
     71     methodName.setName("method");
     72     methodName.initValue().setString(name);
     73 
     74     auto params = list[index++];
     75     params.setName("params");
     76     parent.codec.encode(context.getParams(), params.initValue());
     77 
     78     auto writePromise = parent.queueWrite(parent.codec.encode(value));
     79 
     80     if (isNotification) {
     81       auto sproto = context.getResultsType().getProto().getStruct();
     82       MessageSize size { sproto.getDataWordCount(), sproto.getPointerCount() };
     83       context.initResults(size);
     84       return kj::mv(writePromise);
     85     } else {
     86       auto paf = kj::newPromiseAndFulfiller<void>();
     87       parent.awaitedResponses.insert(callId, AwaitedResponse { context, kj::mv(paf.fulfiller) });
     88       auto promise = writePromise.then([p = kj::mv(paf.promise)]() mutable { return kj::mv(p); });
     89       auto& parentRef = parent;
     90       return promise.attach(kj::defer([&parentRef,callId]() {
     91         parentRef.awaitedResponses.erase(callId);
     92       }));
     93     }
     94   }
     95 
     96 private:
     97   JsonRpc& parent;
     98 };
     99 
    100 JsonRpc::JsonRpc(Transport& transport, DynamicCapability::Client interface)
    101     : JsonRpc(transport, kj::mv(interface), kj::newPromiseAndFulfiller<void>()) {}
    102 JsonRpc::JsonRpc(Transport& transport, DynamicCapability::Client interfaceParam,
    103                  kj::PromiseFulfillerPair<void> paf)
    104     : transport(transport),
    105       interface(kj::mv(interfaceParam)),
    106       errorPromise(paf.promise.fork()),
    107       errorFulfiller(kj::mv(paf.fulfiller)),
    108       readTask(readLoop().eagerlyEvaluate([this](kj::Exception&& e) {
    109         errorFulfiller->reject(kj::mv(e));
    110       })),
    111       tasks(*this) {
    112   codec.handleByAnnotation(interface.getSchema());
    113   codec.handleByAnnotation<json::RpcMessage>();
    114 
    115   for (auto method: interface.getSchema().getMethods()) {
    116     auto proto = method.getProto();
    117     kj::StringPtr name = proto.getName();
    118     for (auto annotation: proto.getAnnotations()) {
    119       switch (annotation.getId()) {
    120         case JSON_NAME_ANNOTATION_ID:
    121           name = annotation.getValue().getText();
    122           break;
    123       }
    124     }
    125     methodMap.insert(name, method);
    126   }
    127 }
    128 
    129 DynamicCapability::Client JsonRpc::getPeer(InterfaceSchema schema) {
    130   codec.handleByAnnotation(interface.getSchema());
    131   return kj::heap<CapabilityImpl>(*this, schema);
    132 }
    133 
    134 static kj::HttpHeaderTable& staticHeaderTable() {
    135   static kj::HttpHeaderTable HEADER_TABLE;
    136   return HEADER_TABLE;
    137 }
    138 
    139 kj::Promise<void> JsonRpc::queueWrite(kj::String text) {
    140   auto fork = writeQueue.then([this, text = kj::mv(text)]() mutable {
    141     auto promise = transport.send(text);
    142     return promise.attach(kj::mv(text));
    143   }).eagerlyEvaluate([this](kj::Exception&& e) {
    144     errorFulfiller->reject(kj::mv(e));
    145   }).fork();
    146   writeQueue = fork.addBranch();
    147   return fork.addBranch();
    148 }
    149 
    150 void JsonRpc::queueError(kj::Maybe<json::Value::Reader> id, int code, kj::StringPtr message) {
    151   MallocMessageBuilder capnpMessage;
    152   auto jsonResponse = capnpMessage.getRoot<json::RpcMessage>();
    153   jsonResponse.setJsonrpc("2.0");
    154   KJ_IF_MAYBE(i, id) {
    155     jsonResponse.setId(*i);
    156   } else {
    157     jsonResponse.initId().setNull();
    158   }
    159   auto error = jsonResponse.initError();
    160   error.setCode(code);
    161   error.setMessage(message);
    162 
    163   // OK to discard result of queueWrite() since it's just one branch of a fork.
    164   queueWrite(codec.encode(jsonResponse));
    165 }
    166 
    167 kj::Promise<void> JsonRpc::readLoop() {
    168   return transport.receive().then([this](kj::String message) -> kj::Promise<void> {
    169     MallocMessageBuilder capnpMessage;
    170     auto rpcMessageBuilder = capnpMessage.getRoot<json::RpcMessage>();
    171 
    172     KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
    173       codec.decode(message, rpcMessageBuilder);
    174     })) {
    175       queueError(nullptr, -32700, kj::str("Parse error: ", exception->getDescription()));
    176       return readLoop();
    177     }
    178 
    179     KJ_CONTEXT("decoding JSON-RPC message", message);
    180 
    181     auto rpcMessage = rpcMessageBuilder.asReader();
    182 
    183     if (!rpcMessage.hasJsonrpc()) {
    184       queueError(nullptr, -32700, kj::str("Missing 'jsonrpc' field."));
    185       return readLoop();
    186     } else if (rpcMessage.getJsonrpc() != "2.0") {
    187       queueError(nullptr, -32700,
    188           kj::str("Unknown JSON-RPC version. This peer implements version '2.0'."));
    189       return readLoop();
    190     }
    191 
    192     switch (rpcMessage.which()) {
    193       case json::RpcMessage::NONE:
    194         queueError(nullptr, -32700, kj::str("message has none of params, result, or error"));
    195         break;
    196 
    197       case json::RpcMessage::PARAMS: {
    198         // a call
    199         KJ_IF_MAYBE(method, methodMap.find(rpcMessage.getMethod())) {
    200           auto req = interface.newRequest(*method);
    201           KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
    202             codec.decode(rpcMessage.getParams(), req);
    203           })) {
    204             kj::Maybe<JsonValue::Reader> id;
    205             if (rpcMessage.hasId()) id = rpcMessage.getId();
    206             queueError(id, -32602,
    207                 kj::str("Type error in method params: ", exception->getDescription()));
    208             break;
    209           }
    210 
    211           if (rpcMessage.hasId()) {
    212             auto id = rpcMessage.getId();
    213             auto idCopy = kj::heapArray<word>(id.totalSize().wordCount + 1);
    214             memset(idCopy.begin(), 0, idCopy.asBytes().size());
    215             copyToUnchecked(id, idCopy);
    216             auto idPtr = readMessageUnchecked<json::Value>(idCopy.begin());
    217 
    218             auto promise = req.send()
    219                 .then([this,idPtr](Response<DynamicStruct> response) mutable {
    220               MallocMessageBuilder capnpMessage;
    221               auto jsonResponse = capnpMessage.getRoot<json::RpcMessage>();
    222               jsonResponse.setJsonrpc("2.0");
    223               jsonResponse.setId(idPtr);
    224               codec.encode(DynamicStruct::Reader(response), jsonResponse.initResult());
    225               return queueWrite(codec.encode(jsonResponse));
    226             }, [this,idPtr](kj::Exception&& e) {
    227               MallocMessageBuilder capnpMessage;
    228               auto jsonResponse = capnpMessage.getRoot<json::RpcMessage>();
    229               jsonResponse.setJsonrpc("2.0");
    230               jsonResponse.setId(idPtr);
    231               auto error = jsonResponse.initError();
    232               switch (e.getType()) {
    233                 case kj::Exception::Type::FAILED:
    234                   error.setCode(-32000);
    235                   break;
    236                 case kj::Exception::Type::DISCONNECTED:
    237                   error.setCode(-32001);
    238                   break;
    239                 case kj::Exception::Type::OVERLOADED:
    240                   error.setCode(-32002);
    241                   break;
    242                 case kj::Exception::Type::UNIMPLEMENTED:
    243                   error.setCode(-32601);  // method not found
    244                   break;
    245               }
    246               error.setMessage(e.getDescription());
    247               return queueWrite(codec.encode(jsonResponse));
    248             });
    249             tasks.add(promise.attach(kj::mv(idCopy)));
    250           } else {
    251             // No 'id', so this is a notification.
    252             tasks.add(req.send().ignoreResult().catch_([](kj::Exception&& exception) {
    253               if (exception.getType() != kj::Exception::Type::UNIMPLEMENTED) {
    254                 KJ_LOG(ERROR, "JSON-RPC notification threw exception into the abyss", exception);
    255               }
    256             }));
    257           }
    258         } else {
    259           if (rpcMessage.hasId()) {
    260             queueError(rpcMessage.getId(), -32601, "Method not found");
    261           } else {
    262             // Ignore notification for unknown method.
    263           }
    264         }
    265         break;
    266       }
    267 
    268       case json::RpcMessage::RESULT: {
    269         auto id = rpcMessage.getId();
    270         if (!id.isNumber()) {
    271           // JSON-RPC doesn't define what to do if receiving a response with an invalid id.
    272           KJ_LOG(ERROR, "JSON-RPC response has invalid ID");
    273         } else KJ_IF_MAYBE(awaited, awaitedResponses.find((uint)id.getNumber())) {
    274           KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]() {
    275             codec.decode(rpcMessage.getResult(), awaited->context.getResults());
    276             awaited->fulfiller->fulfill();
    277           })) {
    278             // Errors always propagate from callee to caller, so we don't want to throw this error
    279             // back to the server.
    280             awaited->fulfiller->reject(kj::mv(*exception));
    281           }
    282         } else {
    283           // Probably, this is the response to a call that was canceled.
    284         }
    285         break;
    286       }
    287 
    288       case json::RpcMessage::ERROR: {
    289         auto id = rpcMessage.getId();
    290         if (id.isNull()) {
    291           // Error message will be logged by KJ_CONTEXT, above.
    292           KJ_LOG(ERROR, "peer reports JSON-RPC protocol error");
    293         } else if (!id.isNumber()) {
    294           // JSON-RPC doesn't define what to do if receiving a response with an invalid id.
    295           KJ_LOG(ERROR, "JSON-RPC response has invalid ID");
    296         } else KJ_IF_MAYBE(awaited, awaitedResponses.find((uint)id.getNumber())) {
    297           auto error = rpcMessage.getError();
    298           auto code = error.getCode();
    299           kj::Exception::Type type =
    300               code == -32601 ? kj::Exception::Type::UNIMPLEMENTED
    301                              : kj::Exception::Type::FAILED;
    302           awaited->fulfiller->reject(kj::Exception(
    303               type, __FILE__, __LINE__, kj::str(error.getMessage())));
    304         } else {
    305           // Probably, this is the response to a call that was canceled.
    306         }
    307         break;
    308       }
    309     }
    310 
    311     return readLoop();
    312   });
    313 }
    314 
    315 void JsonRpc::taskFailed(kj::Exception&& exception) {
    316   errorFulfiller->reject(kj::mv(exception));
    317 }
    318 
    319 // =======================================================================================
    320 
    321 JsonRpc::ContentLengthTransport::ContentLengthTransport(kj::AsyncIoStream& stream)
    322     : stream(stream), input(kj::newHttpInputStream(stream, staticHeaderTable())) {}
    323 JsonRpc::ContentLengthTransport::~ContentLengthTransport() noexcept(false) {}
    324 
    325 kj::Promise<void> JsonRpc::ContentLengthTransport::send(kj::StringPtr text) {
    326   auto headers = kj::str("Content-Length: ", text.size(), "\r\n\r\n");
    327   parts[0] = headers.asBytes();
    328   parts[1] = text.asBytes();
    329   return stream.write(parts).attach(kj::mv(headers));
    330 }
    331 
    332 kj::Promise<kj::String> JsonRpc::ContentLengthTransport::receive() {
    333   return input->readMessage()
    334       .then([](kj::HttpInputStream::Message&& message) {
    335     auto promise = message.body->readAllText();
    336     return promise.attach(kj::mv(message.body));
    337   });
    338 }
    339 
    340 }  // namespace capnp