websocket-rpc.c++ (5379B)
1 // Copyright (c) 2021 Ian Denhardt and contributors 2 // Licensed under the MIT License: 3 // 4 // Permission is hereby granted, free of charge, to any person obtaining a copy 5 // of this software and associated documentation files (the "Software"), to deal 6 // in the Software without restriction, including without limitation the rights 7 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 8 // copies of the Software, and to permit persons to whom the Software is 9 // furnished to do so, subject to the following conditions: 10 // 11 // The above copyright notice and this permission notice shall be included in 12 // all copies or substantial portions of the Software. 13 // 14 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 15 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 16 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 17 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 18 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 19 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 20 // THE SOFTWARE. 21 22 #include <capnp/compat/websocket-rpc.h> 23 #include <kj/io.h> 24 #include <capnp/serialize.h> 25 26 namespace capnp { 27 28 WebSocketMessageStream::WebSocketMessageStream(kj::WebSocket& socket) 29 : socket(socket) 30 {}; 31 32 kj::Promise<kj::Maybe<MessageReaderAndFds>> WebSocketMessageStream::tryReadMessage( 33 kj::ArrayPtr<kj::AutoCloseFd> fdSpace, 34 ReaderOptions options, kj::ArrayPtr<word> scratchSpace) { 35 return socket.receive(options.traversalLimitInWords * sizeof(word)) 36 .then([options](auto msg) -> kj::Promise<kj::Maybe<MessageReaderAndFds>> { 37 KJ_SWITCH_ONEOF(msg) { 38 KJ_CASE_ONEOF(closeMsg, kj::WebSocket::Close) { 39 return kj::Maybe<MessageReaderAndFds>(); 40 } 41 KJ_CASE_ONEOF(str, kj::String) { 42 KJ_FAIL_REQUIRE( 43 "Unexpected websocket text message; expected only binary messages."); 44 break; 45 } 46 KJ_CASE_ONEOF(bytes, kj::Array<byte>) { 47 kj::Own<capnp::MessageReader> reader; 48 size_t sizeInWords = bytes.size() / sizeof(word); 49 if (reinterpret_cast<uintptr_t>(bytes.begin()) % alignof(word) == 0) { 50 reader = kj::heap<FlatArrayMessageReader>( 51 kj::arrayPtr( 52 reinterpret_cast<word *>(bytes.begin()), 53 sizeInWords 54 ), 55 options).attach(kj::mv(bytes)); 56 } else { 57 // The array is misaligned, so we need to copy it. 58 auto words = kj::heapArray<word>(sizeInWords); 59 60 // Note: can't just use bytes.size(), since the the target buffer may 61 // be shorter due to integer division. 62 memcpy(words.begin(), bytes.begin(), sizeInWords * sizeof(word)); 63 reader = kj::heap<FlatArrayMessageReader>( 64 kj::arrayPtr(words.begin(), sizeInWords), 65 options).attach(kj::mv(words)); 66 } 67 return kj::Maybe<MessageReaderAndFds>(MessageReaderAndFds { 68 kj::mv(reader), 69 nullptr 70 }); 71 } 72 } 73 KJ_UNREACHABLE; 74 }); 75 } 76 77 kj::Promise<void> WebSocketMessageStream::writeMessage( 78 kj::ArrayPtr<const int> fds, 79 kj::ArrayPtr<const kj::ArrayPtr<const word>> segments) { 80 // TODO(perf): Right now the WebSocket interface only supports send() for 81 // contiguous arrays, so we need to copy the whole message into a new buffer 82 // in order to send it, whereas ideally we could just write each segment 83 // (and the segment table) in sequence. Perhaps we should extend the WebSocket 84 // interface to be able to send an ArrayPtr<ArrayPtr<byte>> as one binary 85 // message, and then use that to avoid an extra copy here. 86 87 auto stream = kj::heap<kj::VectorOutputStream>( 88 computeSerializedSizeInWords(segments) * sizeof(word)); 89 capnp::writeMessage(*stream, segments); 90 auto arrayPtr = stream->getArray(); 91 return socket.send(arrayPtr).attach(kj::mv(stream)); 92 } 93 94 kj::Promise<void> WebSocketMessageStream::writeMessages( 95 kj::ArrayPtr<kj::ArrayPtr<const kj::ArrayPtr<const word>>> messages) { 96 // TODO(perf): Extend WebSocket interface with a way to write multiple messages at once. 97 98 if(messages.size() == 0) { 99 return kj::READY_NOW; 100 } 101 return writeMessage(nullptr, messages[0]) 102 .then([this, messages = messages.slice(1, messages.size())]() mutable -> kj::Promise<void> { 103 return writeMessages(messages); 104 }); 105 } 106 107 kj::Maybe<int> WebSocketMessageStream::getSendBufferSize() { 108 return nullptr; 109 } 110 111 kj::Promise<void> WebSocketMessageStream::end() { 112 return socket.close( 113 1005, // most generic code, indicates "No Status Received." 114 // Since the MessageStream API doesn't tell us why 115 // we're closing the connection, this is the best 116 // we can do. This is consistent with what browser 117 // implementations do if no status is provided, see: 118 // 119 // * https://developer.mozilla.org/en-US/docs/Web/API/WebSocket/close 120 // * https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent 121 122 "Capnp connection closed" // Similarly not much information to go on here, 123 // but this at least lets us trace this back to 124 // capnp. 125 ); 126 }; 127 128 };