[libvirt PATCH v2 34/56] rpc: convert RPC client to use GMainLoop instead of poll

Pavel Hrdina phrdina at redhat.com
Thu Jan 30 14:51:05 UTC 2020


On Tue, Jan 28, 2020 at 01:11:15PM +0000, Daniel P. Berrangé wrote:
> To eliminate the dependancy on GNULIB's poll impl, we need
> to change the RPC client code to use GMainLoop. We don't
> really want to use GIOChannel, but it provides the most
> convenient way to do socket event watches with Windows
> portability. The other alternative would be to use GSocket
> but that is a much more complex change affecting libvirt
> more broadly.
> 
> Signed-off-by: Daniel P. Berrangé <berrange at redhat.com>
> ---
>  src/rpc/virnetclient.c | 215 ++++++++++++++++++++++-------------------
>  1 file changed, 113 insertions(+), 102 deletions(-)
> 
> diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
> index 031a99711f..9069c57113 100644
> --- a/src/rpc/virnetclient.c
> +++ b/src/rpc/virnetclient.c
> @@ -800,11 +791,7 @@ static void virNetClientCloseInternal(virNetClientPtr client,
>       * queue and close the client because we set client->wantClose.
>       */

This comment should be probably updated to not reference threads.

>      if (client->haveTheBuck) {
> -        char ignore = 1;
> -        size_t len = sizeof(ignore);
> -
> -        if (safewrite(client->wakeupSendFD, &ignore, len) != len)
> -            VIR_ERROR(_("failed to wake up polling thread"));
> +        g_main_loop_quit(client->eventLoop);
>      } else {
>          virNetClientIOEventLoopPassTheBuck(client, NULL);
>      }
> @@ -831,13 +818,70 @@ void virNetClientSetSASLSession(virNetClientPtr client,
>  #endif
>  
>  
> +static gboolean
> +virNetClientIOEventTLS(int fd,
> +                       GIOCondition ev,
> +                       gpointer opaque);
> +
> +static gboolean
> +virNetClientTLSHandshake(virNetClientPtr client)
> +{
> +    GIOCondition ev;
> +    int ret;
> +
> +    ret = virNetTLSSessionHandshake(client->tls);
> +
> +    if (ret <= 0)
> +        return FALSE;
> +
> +    if (virNetTLSSessionGetHandshakeStatus(client->tls) ==
> +        VIR_NET_TLS_HANDSHAKE_RECVING)
> +        ev = G_IO_IN;
> +    else
> +        ev = G_IO_OUT;
> +
> +    virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
> +                               ev,
> +                               client->eventCtx,
> +                               virNetClientIOEventTLS, client, NULL);
> +
> +    return TRUE;
> +}
> +
> +
> +static gboolean
> +virNetClientIOEventTLS(int fd G_GNUC_UNUSED,
> +                       GIOCondition ev G_GNUC_UNUSED,
> +                       gpointer opaque)
> +{
> +    virNetClientPtr client = opaque;
> +
> +    if (!virNetClientTLSHandshake(client))
> +        g_main_loop_quit(client->eventLoop);
> +
> +    return G_SOURCE_REMOVE;
> +}
> +
> +
> +static gboolean
> +virNetClientIOEventTLSConfirm(int fd G_GNUC_UNUSED,
> +                              GIOCondition ev G_GNUC_UNUSED,
> +                              gpointer opaque)
> +{
> +    virNetClientPtr client = opaque;
> +
> +    g_main_loop_quit(client->eventLoop);
> +
> +    return G_SOURCE_REMOVE;
> +}
> +
> +
>  int virNetClientSetTLSSession(virNetClientPtr client,
>                                virNetTLSContextPtr tls)
>  {
>      int ret;
>      char buf[1];
>      int len;
> -    struct pollfd fds[1];
>  
>  #ifndef WIN32
>      sigset_t oldmask, blockedsigs;
> @@ -860,22 +904,8 @@ int virNetClientSetTLSSession(virNetClientPtr client,
>  
>      virNetSocketSetTLSSession(client->sock, client->tls);
>  
> -    for (;;) {
> -        ret = virNetTLSSessionHandshake(client->tls);
> -
> -        if (ret < 0)
> -            goto error;
> -        if (ret == 0)
> -            break;
> -
> -        fds[0].fd = virNetSocketGetFD(client->sock);
> -        fds[0].revents = 0;
> -        if (virNetTLSSessionGetHandshakeStatus(client->tls) ==
> -            VIR_NET_TLS_HANDSHAKE_RECVING)
> -            fds[0].events = POLLIN;
> -        else
> -            fds[0].events = POLLOUT;
> -
> +    virResetLastError();
> +    if (virNetClientTLSHandshake(client)) {
>  #ifndef WIN32
>          /* Block SIGWINCH from interrupting poll in curses programs,
>           * then restore the original signal mask again immediately
> @@ -885,16 +915,16 @@ int virNetClientSetTLSSession(virNetClientPtr client,
>          ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
>  #endif /* !WIN32 */
>  
> -    repoll:
> -        ret = poll(fds, G_N_ELEMENTS(fds), -1);
> -        if (ret < 0 && (errno == EAGAIN || errno == EINTR))
> -            goto repoll;
> +        g_main_loop_run(client->eventLoop);
>  
>  #ifndef WIN32
>          ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
>  #endif /* !WIN32 */
>      }
>  
> +    if (virGetLastErrorCode() != VIR_ERR_OK)
> +        goto error;
> +
>      ret = virNetTLSContextCheckCertificate(tls, client->tls);
>  
>      if (ret < 0)
> @@ -904,19 +934,17 @@ int virNetClientSetTLSSession(virNetClientPtr client,
>       * etc.  If we make the grade, it will send us a '\1' byte.
>       */
>  
> -    fds[0].fd = virNetSocketGetFD(client->sock);
> -    fds[0].revents = 0;
> -    fds[0].events = POLLIN;
> +    virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
> +                               G_IO_IN,
> +                               client->eventCtx,
> +                               virNetClientIOEventTLSConfirm, client, NULL);
>  
>  #ifndef WIN32
>      /* Block SIGWINCH from interrupting poll in curses programs */
>      ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
>  #endif /* !WIN32 */
>  
> -    repoll2:
> -    ret = poll(fds, G_N_ELEMENTS(fds), -1);
> -    if (ret < 0 && (errno == EAGAIN || errno == EINTR))
> -        goto repoll2;
> +    g_main_loop_run(client->eventLoop);
>  
>  #ifndef WIN32
>      ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
> @@ -1451,12 +1479,12 @@ virNetClientIOHandleInput(virNetClientPtr client)
>  static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call,
>                                                void *opaque)
>  {
> -    struct pollfd *fd = opaque;
> +    GIOCondition *ev = opaque;
>  
>      if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX)
> -        fd->events |= POLLIN;
> +        *ev |= G_IO_IN;
>      if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
> -        fd->events |= POLLOUT;
> +        *ev |= G_IO_OUT;
>  
>      return false;
>  }
> @@ -1552,6 +1580,18 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
>  }
>  
>  
> +static gboolean
> +virNetClientIOEventFD(int fd G_GNUC_UNUSED,
> +                      GIOCondition ev,
> +                      gpointer opaque)
> +{
> +    GIOCondition *rev = opaque;
> +    *rev = ev;
> +
> +    return G_SOURCE_REMOVE;
> +}
> +
> +
>  /*
>   * Process all calls pending dispatch/receive until we
>   * get a reply to our own call. Then quit and pass the buck
> @@ -1563,21 +1603,17 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
>  static int virNetClientIOEventLoop(virNetClientPtr client,
>                                     virNetClientCallPtr thiscall)
>  {
> -    struct pollfd fds[2];
>      bool error = false;
>      int closeReason;
> -    int ret;
> -
> -    fds[0].fd = virNetSocketGetFD(client->sock);
> -    fds[1].fd = client->wakeupReadFD;
>  
>      for (;;) {
> -        char ignore;
>  #ifndef WIN32
>          sigset_t oldmask, blockedsigs;
>  #endif /* !WIN32 */
>          int timeout = -1;
>          virNetMessagePtr msg = NULL;
> +        GIOCondition ev = 0;
> +        GIOCondition rev = 0;
>  
>          /* If we have existing SASL decoded data we don't want to sleep in
>           * the poll(), just check if any other FDs are also ready.
> @@ -1595,22 +1631,22 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
>          if (timeout == -1)
>              timeout = virKeepAliveTimeout(client->keepalive);
>  
> -        fds[0].events = fds[0].revents = 0;
> -        fds[1].events = fds[1].revents = 0;
> -
> -        fds[1].events = POLLIN;
> -
>          /* Calculate poll events for calls */
>          virNetClientCallMatchPredicate(client->waitDispatch,
>                                         virNetClientIOEventLoopPollEvents,
> -                                       &fds[0]);
> +                                       &ev);
>  
>          /* We have to be prepared to receive stream data
>           * regardless of whether any of the calls waiting
>           * for dispatch are for streams.
>           */
>          if (client->nstreams)
> -            fds[0].events |= POLLIN;
> +            ev |= G_IO_IN;
> +
> +        virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
> +                                   ev,
> +                                   client->eventCtx,
> +                                   virNetClientIOEventFD, &rev, NULL);
>  
>          /* Release lock while poll'ing so other threads
>           * can stuff themselves on the queue */
> @@ -1630,13 +1666,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
>          sigaddset(&blockedsigs, SIGCHLD);
>  # endif
>          sigaddset(&blockedsigs, SIGPIPE);
> +
>          ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
>  #endif /* !WIN32 */
>  
> -    repoll:
> -        ret = poll(fds, G_N_ELEMENTS(fds), timeout);
> -        if (ret < 0 && (errno == EAGAIN || errno == EINTR))
> -            goto repoll;
> +        while (!rev)
> +            g_main_context_iteration(client->eventCtx, TRUE);

Is there a reason why we don't use g_main_loop_run() here and use
g_main_loop_quit() in virNetClientIOEventFD() the same way we use it
in virNetClientIOEventTLSConfirm() ?

If I'm looking at the code correctly the call to g_main_loop_quit() from
virNetClientIO() where we want to force other threads from poll would be
ignored by the g_main_context_iteration().  This would be a change in
behavior from the old core where the write to "client->wakeupSendFD"
would make the poll() function wake since it is listening on
"client->wakeupReadFD" as well.

Otherwise looks good.

Pavel

>  
>  #ifndef WIN32
>          ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
> @@ -1644,12 +1679,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
>  
>          virObjectLock(client);
>  
> -        if (ret < 0) {
> -            virReportSystemError(errno,
> -                                 "%s", _("poll on socket failed"));
> -            goto error;
> -        }
> -
>          if (virKeepAliveTrigger(client->keepalive, &msg)) {
>              virNetClientMarkClose(client, VIR_CONNECT_CLOSE_REASON_KEEPALIVE);
>          } else if (msg && virNetClientQueueNonBlocking(client, msg) < 0) {
> @@ -1661,7 +1690,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
>           * the socket became readable so we consume it
>           */
>          if (virNetSocketHasCachedData(client->sock))
> -            fds[0].revents |= POLLIN;
> +            rev |= G_IO_IN;
>  
>          /* If wantClose flag is set, pretend there was an error on the socket,
>           * but still read and process any data we received so far.
> @@ -1669,23 +1698,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
>          if (client->wantClose)
>              error = true;
>  
> -        if (fds[1].revents) {
> -            VIR_DEBUG("Woken up from poll by other thread");
> -            if (saferead(client->wakeupReadFD, &ignore, sizeof(ignore)) != sizeof(ignore)) {
> -                virReportSystemError(errno, "%s",
> -                                     _("read on wakeup fd failed"));
> -                virNetClientMarkClose(client, VIR_CONNECT_CLOSE_REASON_ERROR);
> -                error = true;
> -                /* Fall through to process any pending data. */
> -            }
> -        }
> -
> -        if (fds[0].revents & POLLHUP)
> +        if (rev & G_IO_HUP)
>              closeReason = VIR_CONNECT_CLOSE_REASON_EOF;
>          else
>              closeReason = VIR_CONNECT_CLOSE_REASON_ERROR;
>  
> -        if (fds[0].revents & POLLOUT) {
> +        if (rev & G_IO_OUT) {
>              if (virNetClientIOHandleOutput(client) < 0) {
>                  virNetClientMarkClose(client, closeReason);
>                  error = true;
> @@ -1693,7 +1711,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
>              }
>          }
>  
> -        if (fds[0].revents & POLLIN) {
> +        if (rev & G_IO_IN) {
>              if (virNetClientIOHandleInput(client) < 0) {
>                  virNetClientMarkClose(client, closeReason);
>                  error = true;
> @@ -1725,13 +1743,13 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
>          if (error)
>              goto error;
>  
> -        if (fds[0].revents & POLLHUP) {
> +        if (rev & G_IO_HUP) {
>              virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
>                             _("received hangup event on socket"));
>              virNetClientMarkClose(client, closeReason);
>              goto error;
>          }
> -        if (fds[0].revents & POLLERR) {
> +        if (rev & G_IO_ERR) {
>              virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
>                             _("received error event on socket"));
>              virNetClientMarkClose(client, closeReason);
> @@ -1858,15 +1876,8 @@ static int virNetClientIO(virNetClientPtr client,
>  
>      /* Check to see if another thread is dispatching */
>      if (client->haveTheBuck) {
> -        char ignore = 1;
> -
>          /* Force other thread to wakeup from poll */
> -        if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) {
> -            virNetClientCallRemove(&client->waitDispatch, thiscall);
> -            virReportSystemError(errno, "%s",
> -                                 _("failed to wake up polling thread"));
> -            return -1;
> -        }
> +        g_main_loop_quit(client->eventLoop);
>  
>          /* If we are non-blocking, detach the thread and keep the call in the
>           * queue. */
> -- 
> 2.24.1
> 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 833 bytes
Desc: not available
URL: <http://listman.redhat.com/archives/libvir-list/attachments/20200130/9d5f2473/attachment-0001.sig>


More information about the libvir-list mailing list