capnproto

FORK: Cap'n Proto serialization/RPC system - core tools and C++ library
git clone https://git.neptards.moe/neptards/capnproto.git
Log | Files | Refs | README | LICENSE

websocket-rpc.c++ (5379B)


      1 // Copyright (c) 2021 Ian Denhardt and contributors
      2 // Licensed under the MIT License:
      3 //
      4 // Permission is hereby granted, free of charge, to any person obtaining a copy
      5 // of this software and associated documentation files (the "Software"), to deal
      6 // in the Software without restriction, including without limitation the rights
      7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      8 // copies of the Software, and to permit persons to whom the Software is
      9 // furnished to do so, subject to the following conditions:
     10 //
     11 // The above copyright notice and this permission notice shall be included in
     12 // all copies or substantial portions of the Software.
     13 //
     14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     20 // THE SOFTWARE.
     21 
     22 #include <capnp/compat/websocket-rpc.h>
     23 #include <kj/io.h>
     24 #include <capnp/serialize.h>
     25 
     26 namespace capnp {
     27 
     28 WebSocketMessageStream::WebSocketMessageStream(kj::WebSocket& socket)
     29   : socket(socket)
     30   {};
     31 
     32 kj::Promise<kj::Maybe<MessageReaderAndFds>> WebSocketMessageStream::tryReadMessage(
     33     kj::ArrayPtr<kj::AutoCloseFd> fdSpace,
     34     ReaderOptions options, kj::ArrayPtr<word> scratchSpace) {
     35   return socket.receive(options.traversalLimitInWords * sizeof(word))
     36       .then([options](auto msg) -> kj::Promise<kj::Maybe<MessageReaderAndFds>> {
     37     KJ_SWITCH_ONEOF(msg) {
     38         KJ_CASE_ONEOF(closeMsg, kj::WebSocket::Close) {
     39           return kj::Maybe<MessageReaderAndFds>();
     40         }
     41         KJ_CASE_ONEOF(str, kj::String) {
     42           KJ_FAIL_REQUIRE(
     43               "Unexpected websocket text message; expected only binary messages.");
     44           break;
     45         }
     46         KJ_CASE_ONEOF(bytes, kj::Array<byte>) {
     47           kj::Own<capnp::MessageReader> reader;
     48           size_t sizeInWords = bytes.size() / sizeof(word);
     49           if (reinterpret_cast<uintptr_t>(bytes.begin()) % alignof(word) == 0) {
     50             reader = kj::heap<FlatArrayMessageReader>(
     51                 kj::arrayPtr(
     52                   reinterpret_cast<word *>(bytes.begin()),
     53                   sizeInWords
     54                 ),
     55                 options).attach(kj::mv(bytes));
     56           } else {
     57             // The array is misaligned, so we need to copy it.
     58             auto words = kj::heapArray<word>(sizeInWords);
     59 
     60             // Note: can't just use bytes.size(), since the the target buffer may
     61             // be shorter due to integer division.
     62             memcpy(words.begin(), bytes.begin(), sizeInWords * sizeof(word));
     63             reader = kj::heap<FlatArrayMessageReader>(
     64                 kj::arrayPtr(words.begin(), sizeInWords),
     65                 options).attach(kj::mv(words));
     66           }
     67           return kj::Maybe<MessageReaderAndFds>(MessageReaderAndFds {
     68             kj::mv(reader),
     69             nullptr
     70           });
     71         }
     72       }
     73       KJ_UNREACHABLE;
     74     });
     75 }
     76 
     77 kj::Promise<void> WebSocketMessageStream::writeMessage(
     78     kj::ArrayPtr<const int> fds,
     79     kj::ArrayPtr<const kj::ArrayPtr<const word>> segments) {
     80   // TODO(perf): Right now the WebSocket interface only supports send() for
     81   // contiguous arrays, so we need to copy the whole message into a new buffer
     82   // in order to send it, whereas ideally we could just write each segment
     83   // (and the segment table) in sequence. Perhaps we should extend the WebSocket
     84   // interface to be able to send an ArrayPtr<ArrayPtr<byte>> as one binary
     85   // message, and then use that to avoid an extra copy here.
     86 
     87   auto stream = kj::heap<kj::VectorOutputStream>(
     88       computeSerializedSizeInWords(segments) * sizeof(word));
     89   capnp::writeMessage(*stream, segments);
     90   auto arrayPtr = stream->getArray();
     91   return socket.send(arrayPtr).attach(kj::mv(stream));
     92 }
     93 
     94 kj::Promise<void> WebSocketMessageStream::writeMessages(
     95     kj::ArrayPtr<kj::ArrayPtr<const kj::ArrayPtr<const word>>> messages) {
     96   // TODO(perf): Extend WebSocket interface with a way to write multiple messages at once.
     97 
     98   if(messages.size() == 0) {
     99     return kj::READY_NOW;
    100   }
    101   return writeMessage(nullptr, messages[0])
    102       .then([this, messages = messages.slice(1, messages.size())]() mutable -> kj::Promise<void> {
    103     return writeMessages(messages);
    104   });
    105 }
    106 
    107 kj::Maybe<int> WebSocketMessageStream::getSendBufferSize() {
    108   return nullptr;
    109 }
    110 
    111 kj::Promise<void> WebSocketMessageStream::end() {
    112   return socket.close(
    113     1005, // most generic code, indicates "No Status Received."
    114           // Since the MessageStream API doesn't tell us why
    115           // we're closing the connection, this is the best
    116           // we can do. This is consistent with what browser
    117           // implementations do if no status is provided, see:
    118           //
    119           // * https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close
    120           // * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
    121 
    122     "Capnp connection closed" // Similarly not much information to go on here,
    123                               // but this at least lets us trace this back to
    124                               // capnp.
    125   );
    126 };
    127 
    128 };