[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

Re: [libvirt] PATCH: 8/25: Concurrent dispatch of RPC methods



"Daniel P. Berrange" <berrange redhat com> wrote:
> This patch re-writes the code for dispatching RPC calls in the
> remote driver to allow use from multiple threads. Only one thread
> is allowed to send/recv on the socket at a time though. If another
> thread comes along it will put itself on a queue and go to sleep.
> The first thread may actually get around to transmitting the 2nd
> thread's request while it is waiting for its own reply. It may
> even get the 2nd thread's reply, if its own RPC call is being really
> slow. So when a thread wakes up from sleeping, it has to check
> whether its own RPC call has already been processed. Likewise when
> a thread owning the socket finishes with its own work, it may have
> to pass the buck to another thread. The upshot of this is that
> we have multiple RPC calls executing in parallel, and requests+reply
> are no longer guaranteed to be FIFO on the wire if talking to a new
> enough server.
>
> This refactoring required use of a self-pipe/poll trick for sync
> between threads, but fortunately gnulib now provides this on Windows
> too, so there's no compatibility problem there.

Quick summary: dense ;-)
though lots of moved code.

I haven't finished, but did find at least one problem, below.

> diff --git a/src/remote_internal.c b/src/remote_internal.c
...
> @@ -114,6 +164,11 @@ struct private_data {
>      virDomainEventQueuePtr domainEvents;
>      /* Timer for flushing domainEvents queue */
>      int eventFlushTimer;
> +
> +    /* List of threads currently doing dispatch */
> +    int wakeupSend;
> +    int wakeupRead;

How about appending "FD" to indicate these are file descriptors?
The names combined with the comment (which must apply to waitDispatch)
made me wonder what they represented.  Only when I saw them used
in safewrite/saferead calls did I get it.

> +    struct remote_thread_call *waitDispatch;
>  };
>
>  enum {
> @@ -160,7 +215,6 @@ static void make_nonnull_storage_pool (r
>  static void make_nonnull_storage_pool (remote_nonnull_storage_pool *pool_dst, virStoragePoolPtr vol_src);
>  static void make_nonnull_storage_vol (remote_nonnull_storage_vol *vol_dst, virStorageVolPtr vol_src);
>  void remoteDomainEventFired(int watch, int fd, int event, void *data);
> -static void remoteDomainProcessEvent(virConnectPtr conn, XDR *xdr);
>  static void remoteDomainQueueEvent(virConnectPtr conn, XDR *xdr);
>  void remoteDomainEventQueueFlush(int timer, void *opaque);
>  /*----------------------------------------------------------------------*/
> @@ -274,6 +328,7 @@ doRemoteOpen (virConnectPtr conn,
>                virConnectAuthPtr auth ATTRIBUTE_UNUSED,
>                int flags)
>  {
> +    int wakeup[2];

Add "fd" to this name, too?
Not as big a deal, since this is local and the
first use makes it obvious.

>      char *transport_str = NULL;
>
>      if (conn->uri) {
> @@ -696,6 +751,21 @@ doRemoteOpen (virConnectPtr conn,
>
>      } /* switch (transport) */
>
> +    if (virSetNonBlock(priv->sock) < 0) {
> +        errorf (conn, VIR_ERR_SYSTEM_ERROR,
> +                _("unable to make socket non-blocking %s"),
> +                strerror(errno));
> +        goto failed;
> +    }
> +
> +    if (pipe(wakeup) < 0) {
> +        errorf (conn, VIR_ERR_SYSTEM_ERROR,
> +                _("unable to make pipe %s"),
> +                strerror(errno));
> +        goto failed;
> +    }
> +    priv->wakeupRead = wakeup[0];
> +    priv->wakeupSend = wakeup[1];
>
>      /* Try and authenticate with server */
>      if (remoteAuthenticate(conn, priv, 1, auth, authtype) == -1)
> @@ -768,6 +838,7 @@ doRemoteOpen (virConnectPtr conn,
>              DEBUG0("virEventAddTimeout failed: No addTimeoutImpl defined. "
>                      "continuing without events.");
>              virEventRemoveHandle(priv->watch);
> +            priv->watch = -1;
>          }
>      }
>      /* Successful. */
> @@ -848,6 +919,7 @@ remoteOpen (virConnectPtr conn,
>      }
>      remoteDriverLock(priv);
>      priv->localUses = 1;
> +    priv->watch = -1;
>
>      if (flags & VIR_CONNECT_RO)
>          rflags |= VIR_DRV_OPEN_REMOTE_RO;
> @@ -1220,6 +1292,7 @@ doRemoteClose (virConnectPtr conn, struc
>          virEventRemoveTimeout(priv->eventFlushTimer);
>          /* Remove handle for remote events */
>          virEventRemoveHandle(priv->watch);
> +        priv->watch = -1;
>      }
>
>      /* Close socket. */
> @@ -5542,12 +5615,658 @@ done:
>
>  /*----------------------------------------------------------------------*/
>
> -static int really_write (virConnectPtr conn, struct private_data *priv,
> -                         int in_open, char *bytes, int len);
> -static int really_read (virConnectPtr conn, struct private_data *priv,
> -                        int in_open, char *bytes, int len);
> -
> -/* This function performs a remote procedure call to procedure PROC_NR.
> +
> +static struct remote_thread_call *
> +prepareCall(virConnectPtr conn,
> +            struct private_data *priv,
> +            int flags,
> +            int proc_nr,
> +            xdrproc_t args_filter, char *args,
> +            xdrproc_t ret_filter, char *ret)
> +{
> +    XDR xdr;
> +    struct remote_message_header hdr;
> +    struct remote_thread_call *rv;
> +
> +    if (VIR_ALLOC(rv) < 0)
> +        return NULL;
> +
> +    if (virCondInit(&rv->cond) < 0) {
> +        VIR_FREE(rv);
> +        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
> +               VIR_ERR_INTERNAL_ERROR,
> +               _("cannot initialize mutex"));
> +        return NULL;
> +    }
> +
> +    /* Get a unique serial number for this message. */
> +    rv->serial = priv->counter++;
> +    rv->proc_nr = proc_nr;
> +    rv->ret_filter = ret_filter;
> +    rv->ret = ret;
> +
> +    hdr.prog = REMOTE_PROGRAM;
> +    hdr.vers = REMOTE_PROTOCOL_VERSION;
> +    hdr.proc = proc_nr;
> +    hdr.direction = REMOTE_CALL;
> +    hdr.serial = rv->serial;
> +    hdr.status = REMOTE_OK;
> +
> +    /* Serialise header followed by args. */
> +    xdrmem_create (&xdr, rv->buffer+4, REMOTE_MESSAGE_MAX, XDR_ENCODE);
> +    if (!xdr_remote_message_header (&xdr, &hdr)) {
> +        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
> +               VIR_ERR_RPC, _("xdr_remote_message_header failed"));
> +        goto error;
> +    }
> +
> +    if (!(*args_filter) (&xdr, args)) {
> +        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
> +               _("marshalling args"));
> +        goto error;
> +    }
> +
> +    /* Get the length stored in buffer. */
> +    rv->bufferLength = xdr_getpos (&xdr);
> +    xdr_destroy (&xdr);
> +
> +    /* Length must include the length word itself (always encoded in
> +     * 4 bytes as per RFC 4506).
> +     */
> +    rv->bufferLength += 4;
> +
> +    /* Encode the length word. */
> +    xdrmem_create (&xdr, rv->buffer, 4, XDR_ENCODE);
> +    if (!xdr_int (&xdr, (int *)&rv->bufferLength)) {
> +        error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn, VIR_ERR_RPC,
> +               _("xdr_int (length word)"));

I haven't done enough xdr* work to know, and man pages
didn't provide an immediate answer:
Is there no need to call xdr_destroy on this error path?
I'd expect xdrmem_create to do any allocation, not xdr_int.
There are many like this.

> +        goto error;
> +    }
> +    xdr_destroy (&xdr);
> +
> +    return rv;
> +
> +error:
> +    VIR_FREE(ret);
> +    return NULL;

The above should free rv, not ret:

       VIR_FREE(rv);


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]