qemu

FORK: QEMU emulator
git clone https://git.neptards.moe/neptards/qemu.git
Log | Files | Refs | Submodules | LICENSE

stream.c (11864B)


      1 /*
      2  * QEMU System Emulator
      3  *
      4  * Copyright (c) 2003-2008 Fabrice Bellard
      5  * Copyright (c) 2022 Red Hat, Inc.
      6  *
      7  * Permission is hereby granted, free of charge, to any person obtaining a copy
      8  * of this software and associated documentation files (the "Software"), to deal
      9  * in the Software without restriction, including without limitation the rights
     10  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
     11  * copies of the Software, and to permit persons to whom the Software is
     12  * furnished to do so, subject to the following conditions:
     13  *
     14  * The above copyright notice and this permission notice shall be included in
     15  * all copies or substantial portions of the Software.
     16  *
     17  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     18  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     19  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
     20  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     21  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     22  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
     23  * THE SOFTWARE.
     24  */
     25 
     26 #include "qemu/osdep.h"
     27 
     28 #include "net/net.h"
     29 #include "clients.h"
     30 #include "monitor/monitor.h"
     31 #include "qapi/error.h"
     32 #include "qemu/error-report.h"
     33 #include "qemu/option.h"
     34 #include "qemu/sockets.h"
     35 #include "qemu/iov.h"
     36 #include "qemu/main-loop.h"
     37 #include "qemu/cutils.h"
     38 #include "io/channel.h"
     39 #include "io/channel-socket.h"
     40 #include "io/net-listener.h"
     41 #include "qapi/qapi-events-net.h"
     42 
/*
 * State of one "stream" network backend instance.
 *
 * The embedded NetClientState is converted back to the enclosing
 * NetStreamState with DO_UPCAST() in the callbacks of this file.
 */
typedef struct NetStreamState {
    NetClientState nc;
    QIOChannel *listen_ioc;       /* listening channel (set in server init) */
    QIONetListener *listener;     /* hands new connections to net_stream_listen */
    QIOChannel *ioc;              /* data channel; reset to NULL on disconnect */
    guint ioc_read_tag;           /* id of the G_IO_IN watch on ioc, 0 if none */
    guint ioc_write_tag;          /* id of the G_IO_OUT watch on ioc, 0 if none */
    SocketReadState rs;           /* receive state fed by net_fill_rstate() */
    unsigned int send_index;      /* bytes of the current frame already sent */
} NetStreamState;
     53 
/*
 * Forward declaration: net_stream_send() re-installs this function as the
 * listener's client callback before the definition appears in the file.
 */
static void net_stream_listen(QIONetListener *listener,
                              QIOChannelSocket *cioc,
                              void *opaque);
     57 
     58 static gboolean net_stream_writable(QIOChannel *ioc,
     59                                     GIOCondition condition,
     60                                     gpointer data)
     61 {
     62     NetStreamState *s = data;
     63 
     64     s->ioc_write_tag = 0;
     65 
     66     qemu_flush_queued_packets(&s->nc);
     67 
     68     return G_SOURCE_REMOVE;
     69 }
     70 
     71 static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
     72                                   size_t size)
     73 {
     74     NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
     75     uint32_t len = htonl(size);
     76     struct iovec iov[] = {
     77         {
     78             .iov_base = &len,
     79             .iov_len  = sizeof(len),
     80         }, {
     81             .iov_base = (void *)buf,
     82             .iov_len  = size,
     83         },
     84     };
     85     struct iovec local_iov[2];
     86     unsigned int nlocal_iov;
     87     size_t remaining;
     88     ssize_t ret;
     89 
     90     remaining = iov_size(iov, 2) - s->send_index;
     91     nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining);
     92     ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL);
     93     if (ret == QIO_CHANNEL_ERR_BLOCK) {
     94         ret = 0; /* handled further down */
     95     }
     96     if (ret == -1) {
     97         s->send_index = 0;
     98         return -errno;
     99     }
    100     if (ret < (ssize_t)remaining) {
    101         s->send_index += ret;
    102         s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT,
    103                                                  net_stream_writable, s, NULL);
    104         return 0;
    105     }
    106     s->send_index = 0;
    107     return size;
    108 }
    109 
/*
 * Forward declaration: net_stream_send_completed() installs this function
 * as a G_IO_IN watch callback before the definition appears in the file.
 */
static gboolean net_stream_send(QIOChannel *ioc,
                                GIOCondition condition,
                                gpointer data);
    113 
    114 static void net_stream_send_completed(NetClientState *nc, ssize_t len)
    115 {
    116     NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
    117 
    118     if (!s->ioc_read_tag) {
    119         s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN,
    120                                                 net_stream_send, s, NULL);
    121     }
    122 }
    123 
    124 static void net_stream_rs_finalize(SocketReadState *rs)
    125 {
    126     NetStreamState *s = container_of(rs, NetStreamState, rs);
    127 
    128     if (qemu_send_packet_async(&s->nc, rs->buf,
    129                                rs->packet_len,
    130                                net_stream_send_completed) == 0) {
    131         if (s->ioc_read_tag) {
    132             g_source_remove(s->ioc_read_tag);
    133             s->ioc_read_tag = 0;
    134         }
    135     }
    136 }
    137 
    138 static gboolean net_stream_send(QIOChannel *ioc,
    139                                 GIOCondition condition,
    140                                 gpointer data)
    141 {
    142     NetStreamState *s = data;
    143     int size;
    144     int ret;
    145     char buf1[NET_BUFSIZE];
    146     const char *buf;
    147 
    148     size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL);
    149     if (size < 0) {
    150         if (errno != EWOULDBLOCK) {
    151             goto eoc;
    152         }
    153     } else if (size == 0) {
    154         /* end of connection */
    155     eoc:
    156         s->ioc_read_tag = 0;
    157         if (s->ioc_write_tag) {
    158             g_source_remove(s->ioc_write_tag);
    159             s->ioc_write_tag = 0;
    160         }
    161         if (s->listener) {
    162             qio_net_listener_set_client_func(s->listener, net_stream_listen,
    163                                              s, NULL);
    164         }
    165         object_unref(OBJECT(s->ioc));
    166         s->ioc = NULL;
    167 
    168         net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
    169         s->nc.link_down = true;
    170         qemu_set_info_str(&s->nc, "%s", "");
    171 
    172         qapi_event_send_netdev_stream_disconnected(s->nc.name);
    173 
    174         return G_SOURCE_REMOVE;
    175     }
    176     buf = buf1;
    177 
    178     ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size);
    179 
    180     if (ret == -1) {
    181         goto eoc;
    182     }
    183 
    184     return G_SOURCE_CONTINUE;
    185 }
    186 
    187 static void net_stream_cleanup(NetClientState *nc)
    188 {
    189     NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
    190     if (s->ioc) {
    191         if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) {
    192             if (s->ioc_read_tag) {
    193                 g_source_remove(s->ioc_read_tag);
    194                 s->ioc_read_tag = 0;
    195             }
    196             if (s->ioc_write_tag) {
    197                 g_source_remove(s->ioc_write_tag);
    198                 s->ioc_write_tag = 0;
    199             }
    200         }
    201         object_unref(OBJECT(s->ioc));
    202         s->ioc = NULL;
    203     }
    204     if (s->listen_ioc) {
    205         if (s->listener) {
    206             qio_net_listener_disconnect(s->listener);
    207             object_unref(OBJECT(s->listener));
    208             s->listener = NULL;
    209         }
    210         object_unref(OBJECT(s->listen_ioc));
    211         s->listen_ioc = NULL;
    212     }
    213 }
    214 
/*
 * Client description passed to qemu_new_net_client() by both the server
 * and the client init functions of this file.
 */
static NetClientInfo net_stream_info = {
    .type = NET_CLIENT_DRIVER_STREAM,
    .size = sizeof(NetStreamState),
    .receive = net_stream_receive,    /* writes a packet to the channel */
    .cleanup = net_stream_cleanup,    /* releases channels and watches */
};
    221 
    222 static void net_stream_listen(QIONetListener *listener,
    223                               QIOChannelSocket *cioc,
    224                               void *opaque)
    225 {
    226     NetStreamState *s = opaque;
    227     SocketAddress *addr;
    228     char *uri;
    229 
    230     object_ref(OBJECT(cioc));
    231 
    232     qio_net_listener_set_client_func(s->listener, NULL, s, NULL);
    233 
    234     s->ioc = QIO_CHANNEL(cioc);
    235     qio_channel_set_name(s->ioc, "stream-server");
    236     s->nc.link_down = false;
    237 
    238     s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
    239                                             s, NULL);
    240 
    241     if (cioc->localAddr.ss_family == AF_UNIX) {
    242         addr = qio_channel_socket_get_local_address(cioc, NULL);
    243     } else {
    244         addr = qio_channel_socket_get_remote_address(cioc, NULL);
    245     }
    246     g_assert(addr != NULL);
    247     uri = socket_uri(addr);
    248     qemu_set_info_str(&s->nc, "%s", uri);
    249     g_free(uri);
    250     qapi_event_send_netdev_stream_connected(s->nc.name, addr);
    251     qapi_free_SocketAddress(addr);
    252 }
    253 
    254 static void net_stream_server_listening(QIOTask *task, gpointer opaque)
    255 {
    256     NetStreamState *s = opaque;
    257     QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
    258     SocketAddress *addr;
    259     int ret;
    260 
    261     if (listen_sioc->fd < 0) {
    262         qemu_set_info_str(&s->nc, "connection error");
    263         return;
    264     }
    265 
    266     addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
    267     g_assert(addr != NULL);
    268     ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
    269     if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
    270         qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
    271                           addr->u.fd.str, -ret);
    272         return;
    273     }
    274     g_assert(ret == 0);
    275     qapi_free_SocketAddress(addr);
    276 
    277     s->nc.link_down = true;
    278     s->listener = qio_net_listener_new();
    279 
    280     net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
    281     qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL);
    282     qio_net_listener_add(s->listener, listen_sioc);
    283 }
    284 
    285 static int net_stream_server_init(NetClientState *peer,
    286                                   const char *model,
    287                                   const char *name,
    288                                   SocketAddress *addr,
    289                                   Error **errp)
    290 {
    291     NetClientState *nc;
    292     NetStreamState *s;
    293     QIOChannelSocket *listen_sioc = qio_channel_socket_new();
    294 
    295     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
    296     s = DO_UPCAST(NetStreamState, nc, nc);
    297 
    298     s->listen_ioc = QIO_CHANNEL(listen_sioc);
    299     qio_channel_socket_listen_async(listen_sioc, addr, 0,
    300                                     net_stream_server_listening, s,
    301                                     NULL, NULL);
    302 
    303     return 0;
    304 }
    305 
    306 static void net_stream_client_connected(QIOTask *task, gpointer opaque)
    307 {
    308     NetStreamState *s = opaque;
    309     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc);
    310     SocketAddress *addr;
    311     gchar *uri;
    312     int ret;
    313 
    314     if (sioc->fd < 0) {
    315         qemu_set_info_str(&s->nc, "connection error");
    316         goto error;
    317     }
    318 
    319     addr = qio_channel_socket_get_remote_address(sioc, NULL);
    320     g_assert(addr != NULL);
    321     uri = socket_uri(addr);
    322     qemu_set_info_str(&s->nc, "%s", uri);
    323     g_free(uri);
    324 
    325     ret = qemu_socket_try_set_nonblock(sioc->fd);
    326     if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
    327         qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
    328                           addr->u.fd.str, -ret);
    329         qapi_free_SocketAddress(addr);
    330         goto error;
    331     }
    332     g_assert(ret == 0);
    333 
    334     net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
    335 
    336     /* Disable Nagle algorithm on TCP sockets to reduce latency */
    337     qio_channel_set_delay(s->ioc, false);
    338 
    339     s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
    340                                             s, NULL);
    341     s->nc.link_down = false;
    342     qapi_event_send_netdev_stream_connected(s->nc.name, addr);
    343     qapi_free_SocketAddress(addr);
    344 
    345     return;
    346 error:
    347     object_unref(OBJECT(s->ioc));
    348     s->ioc = NULL;
    349 }
    350 
    351 static int net_stream_client_init(NetClientState *peer,
    352                                   const char *model,
    353                                   const char *name,
    354                                   SocketAddress *addr,
    355                                   Error **errp)
    356 {
    357     NetStreamState *s;
    358     NetClientState *nc;
    359     QIOChannelSocket *sioc = qio_channel_socket_new();
    360 
    361     nc = qemu_new_net_client(&net_stream_info, peer, model, name);
    362     s = DO_UPCAST(NetStreamState, nc, nc);
    363 
    364     s->ioc = QIO_CHANNEL(sioc);
    365     s->nc.link_down = true;
    366 
    367     qio_channel_socket_connect_async(sioc, addr,
    368                                      net_stream_client_connected, s,
    369                                      NULL, NULL);
    370 
    371     return 0;
    372 }
    373 
    374 int net_init_stream(const Netdev *netdev, const char *name,
    375                     NetClientState *peer, Error **errp)
    376 {
    377     const NetdevStreamOptions *sock;
    378 
    379     assert(netdev->type == NET_CLIENT_DRIVER_STREAM);
    380     sock = &netdev->u.stream;
    381 
    382     if (!sock->has_server || !sock->server) {
    383         return net_stream_client_init(peer, "stream", name, sock->addr, errp);
    384     }
    385     return net_stream_server_init(peer, "stream", name, sock->addr, errp);
    386 }