[libvirt PATCH v2 34/56] rpc: convert RPC client to use GMainLoop instead of poll
Pavel Hrdina
phrdina at redhat.com
Thu Jan 30 14:51:05 UTC 2020
On Tue, Jan 28, 2020 at 01:11:15PM +0000, Daniel P. Berrangé wrote:
> To eliminate the dependency on GNULIB's poll impl, we need
> to change the RPC client code to use GMainLoop. We don't
> really want to use GIOChannel, but it provides the most
> convenient way to do socket event watches with Windows
> portability. The other alternative would be to use GSocket
> but that is a much more complex change affecting libvirt
> more broadly.
>
> Signed-off-by: Daniel P. Berrangé <berrange at redhat.com>
> ---
> src/rpc/virnetclient.c | 215 ++++++++++++++++++++++-------------------
> 1 file changed, 113 insertions(+), 102 deletions(-)
>
> diff --git a/src/rpc/virnetclient.c b/src/rpc/virnetclient.c
> index 031a99711f..9069c57113 100644
> --- a/src/rpc/virnetclient.c
> +++ b/src/rpc/virnetclient.c
> @@ -800,11 +791,7 @@ static void virNetClientCloseInternal(virNetClientPtr client,
> * queue and close the client because we set client->wantClose.
> */
This comment should probably be updated to not reference threads.
> if (client->haveTheBuck) {
> - char ignore = 1;
> - size_t len = sizeof(ignore);
> -
> - if (safewrite(client->wakeupSendFD, &ignore, len) != len)
> - VIR_ERROR(_("failed to wake up polling thread"));
> + g_main_loop_quit(client->eventLoop);
> } else {
> virNetClientIOEventLoopPassTheBuck(client, NULL);
> }
> @@ -831,13 +818,70 @@ void virNetClientSetSASLSession(virNetClientPtr client,
> #endif
>
>
> +static gboolean
> +virNetClientIOEventTLS(int fd,
> + GIOCondition ev,
> + gpointer opaque);
> +
> +static gboolean
> +virNetClientTLSHandshake(virNetClientPtr client)
> +{
> + GIOCondition ev;
> + int ret;
> +
> + ret = virNetTLSSessionHandshake(client->tls);
> +
> + if (ret <= 0)
> + return FALSE;
> +
> + if (virNetTLSSessionGetHandshakeStatus(client->tls) ==
> + VIR_NET_TLS_HANDSHAKE_RECVING)
> + ev = G_IO_IN;
> + else
> + ev = G_IO_OUT;
> +
> + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
> + ev,
> + client->eventCtx,
> + virNetClientIOEventTLS, client, NULL);
> +
> + return TRUE;
> +}
> +
> +
> +static gboolean
> +virNetClientIOEventTLS(int fd G_GNUC_UNUSED,
> + GIOCondition ev G_GNUC_UNUSED,
> + gpointer opaque)
> +{
> + virNetClientPtr client = opaque;
> +
> + if (!virNetClientTLSHandshake(client))
> + g_main_loop_quit(client->eventLoop);
> +
> + return G_SOURCE_REMOVE;
> +}
> +
> +
> +static gboolean
> +virNetClientIOEventTLSConfirm(int fd G_GNUC_UNUSED,
> + GIOCondition ev G_GNUC_UNUSED,
> + gpointer opaque)
> +{
> + virNetClientPtr client = opaque;
> +
> + g_main_loop_quit(client->eventLoop);
> +
> + return G_SOURCE_REMOVE;
> +}
> +
> +
> int virNetClientSetTLSSession(virNetClientPtr client,
> virNetTLSContextPtr tls)
> {
> int ret;
> char buf[1];
> int len;
> - struct pollfd fds[1];
>
> #ifndef WIN32
> sigset_t oldmask, blockedsigs;
> @@ -860,22 +904,8 @@ int virNetClientSetTLSSession(virNetClientPtr client,
>
> virNetSocketSetTLSSession(client->sock, client->tls);
>
> - for (;;) {
> - ret = virNetTLSSessionHandshake(client->tls);
> -
> - if (ret < 0)
> - goto error;
> - if (ret == 0)
> - break;
> -
> - fds[0].fd = virNetSocketGetFD(client->sock);
> - fds[0].revents = 0;
> - if (virNetTLSSessionGetHandshakeStatus(client->tls) ==
> - VIR_NET_TLS_HANDSHAKE_RECVING)
> - fds[0].events = POLLIN;
> - else
> - fds[0].events = POLLOUT;
> -
> + virResetLastError();
> + if (virNetClientTLSHandshake(client)) {
> #ifndef WIN32
> /* Block SIGWINCH from interrupting poll in curses programs,
> * then restore the original signal mask again immediately
> @@ -885,16 +915,16 @@ int virNetClientSetTLSSession(virNetClientPtr client,
> ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
> #endif /* !WIN32 */
>
> - repoll:
> - ret = poll(fds, G_N_ELEMENTS(fds), -1);
> - if (ret < 0 && (errno == EAGAIN || errno == EINTR))
> - goto repoll;
> + g_main_loop_run(client->eventLoop);
>
> #ifndef WIN32
> ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
> #endif /* !WIN32 */
> }
>
> + if (virGetLastErrorCode() != VIR_ERR_OK)
> + goto error;
> +
> ret = virNetTLSContextCheckCertificate(tls, client->tls);
>
> if (ret < 0)
> @@ -904,19 +934,17 @@ int virNetClientSetTLSSession(virNetClientPtr client,
> * etc. If we make the grade, it will send us a '\1' byte.
> */
>
> - fds[0].fd = virNetSocketGetFD(client->sock);
> - fds[0].revents = 0;
> - fds[0].events = POLLIN;
> + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
> + G_IO_IN,
> + client->eventCtx,
> + virNetClientIOEventTLSConfirm, client, NULL);
>
> #ifndef WIN32
> /* Block SIGWINCH from interrupting poll in curses programs */
> ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
> #endif /* !WIN32 */
>
> - repoll2:
> - ret = poll(fds, G_N_ELEMENTS(fds), -1);
> - if (ret < 0 && (errno == EAGAIN || errno == EINTR))
> - goto repoll2;
> + g_main_loop_run(client->eventLoop);
>
> #ifndef WIN32
> ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
> @@ -1451,12 +1479,12 @@ virNetClientIOHandleInput(virNetClientPtr client)
> static bool virNetClientIOEventLoopPollEvents(virNetClientCallPtr call,
> void *opaque)
> {
> - struct pollfd *fd = opaque;
> + GIOCondition *ev = opaque;
>
> if (call->mode == VIR_NET_CLIENT_MODE_WAIT_RX)
> - fd->events |= POLLIN;
> + *ev |= G_IO_IN;
> if (call->mode == VIR_NET_CLIENT_MODE_WAIT_TX)
> - fd->events |= POLLOUT;
> + *ev |= G_IO_OUT;
>
> return false;
> }
> @@ -1552,6 +1580,18 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
> }
>
>
> +static gboolean
> +virNetClientIOEventFD(int fd G_GNUC_UNUSED,
> + GIOCondition ev,
> + gpointer opaque)
> +{
> + GIOCondition *rev = opaque;
> + *rev = ev;
> +
> + return G_SOURCE_REMOVE;
> +}
> +
> +
> /*
> * Process all calls pending dispatch/receive until we
> * get a reply to our own call. Then quit and pass the buck
> @@ -1563,21 +1603,17 @@ virNetClientIOEventLoopPassTheBuck(virNetClientPtr client,
> static int virNetClientIOEventLoop(virNetClientPtr client,
> virNetClientCallPtr thiscall)
> {
> - struct pollfd fds[2];
> bool error = false;
> int closeReason;
> - int ret;
> -
> - fds[0].fd = virNetSocketGetFD(client->sock);
> - fds[1].fd = client->wakeupReadFD;
>
> for (;;) {
> - char ignore;
> #ifndef WIN32
> sigset_t oldmask, blockedsigs;
> #endif /* !WIN32 */
> int timeout = -1;
> virNetMessagePtr msg = NULL;
> + GIOCondition ev = 0;
> + GIOCondition rev = 0;
>
> /* If we have existing SASL decoded data we don't want to sleep in
> * the poll(), just check if any other FDs are also ready.
> @@ -1595,22 +1631,22 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
> if (timeout == -1)
> timeout = virKeepAliveTimeout(client->keepalive);
>
> - fds[0].events = fds[0].revents = 0;
> - fds[1].events = fds[1].revents = 0;
> -
> - fds[1].events = POLLIN;
> -
> /* Calculate poll events for calls */
> virNetClientCallMatchPredicate(client->waitDispatch,
> virNetClientIOEventLoopPollEvents,
> - &fds[0]);
> + &ev);
>
> /* We have to be prepared to receive stream data
> * regardless of whether any of the calls waiting
> * for dispatch are for streams.
> */
> if (client->nstreams)
> - fds[0].events |= POLLIN;
> + ev |= G_IO_IN;
> +
> + virEventGLibAddSocketWatch(virNetSocketGetFD(client->sock),
> + ev,
> + client->eventCtx,
> + virNetClientIOEventFD, &rev, NULL);
>
> /* Release lock while poll'ing so other threads
> * can stuff themselves on the queue */
> @@ -1630,13 +1666,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
> sigaddset(&blockedsigs, SIGCHLD);
> # endif
> sigaddset(&blockedsigs, SIGPIPE);
> +
> ignore_value(pthread_sigmask(SIG_BLOCK, &blockedsigs, &oldmask));
> #endif /* !WIN32 */
>
> - repoll:
> - ret = poll(fds, G_N_ELEMENTS(fds), timeout);
> - if (ret < 0 && (errno == EAGAIN || errno == EINTR))
> - goto repoll;
> + while (!rev)
> + g_main_context_iteration(client->eventCtx, TRUE);
Is there a reason why we don't use g_main_loop_run() here and call
g_main_loop_quit() in virNetClientIOEventFD(), the same way we do
in virNetClientIOEventTLSConfirm()?
If I'm reading the code correctly, the call to g_main_loop_quit() from
virNetClientIO(), where we want to force the other thread to wake up from
poll, would be ignored by g_main_context_iteration(). This would be a
change in behavior from the old code, where the write to
"client->wakeupSendFD" would wake up poll(), since it was listening on
"client->wakeupReadFD" as well.
Otherwise looks good.
Pavel
>
> #ifndef WIN32
> ignore_value(pthread_sigmask(SIG_SETMASK, &oldmask, NULL));
> @@ -1644,12 +1679,6 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
>
> virObjectLock(client);
>
> - if (ret < 0) {
> - virReportSystemError(errno,
> - "%s", _("poll on socket failed"));
> - goto error;
> - }
> -
> if (virKeepAliveTrigger(client->keepalive, &msg)) {
> virNetClientMarkClose(client, VIR_CONNECT_CLOSE_REASON_KEEPALIVE);
> } else if (msg && virNetClientQueueNonBlocking(client, msg) < 0) {
> @@ -1661,7 +1690,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
> * the socket became readable so we consume it
> */
> if (virNetSocketHasCachedData(client->sock))
> - fds[0].revents |= POLLIN;
> + rev |= G_IO_IN;
>
> /* If wantClose flag is set, pretend there was an error on the socket,
> * but still read and process any data we received so far.
> @@ -1669,23 +1698,12 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
> if (client->wantClose)
> error = true;
>
> - if (fds[1].revents) {
> - VIR_DEBUG("Woken up from poll by other thread");
> - if (saferead(client->wakeupReadFD, &ignore, sizeof(ignore)) != sizeof(ignore)) {
> - virReportSystemError(errno, "%s",
> - _("read on wakeup fd failed"));
> - virNetClientMarkClose(client, VIR_CONNECT_CLOSE_REASON_ERROR);
> - error = true;
> - /* Fall through to process any pending data. */
> - }
> - }
> -
> - if (fds[0].revents & POLLHUP)
> + if (rev & G_IO_HUP)
> closeReason = VIR_CONNECT_CLOSE_REASON_EOF;
> else
> closeReason = VIR_CONNECT_CLOSE_REASON_ERROR;
>
> - if (fds[0].revents & POLLOUT) {
> + if (rev & G_IO_OUT) {
> if (virNetClientIOHandleOutput(client) < 0) {
> virNetClientMarkClose(client, closeReason);
> error = true;
> @@ -1693,7 +1711,7 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
> }
> }
>
> - if (fds[0].revents & POLLIN) {
> + if (rev & G_IO_IN) {
> if (virNetClientIOHandleInput(client) < 0) {
> virNetClientMarkClose(client, closeReason);
> error = true;
> @@ -1725,13 +1743,13 @@ static int virNetClientIOEventLoop(virNetClientPtr client,
> if (error)
> goto error;
>
> - if (fds[0].revents & POLLHUP) {
> + if (rev & G_IO_HUP) {
> virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
> _("received hangup event on socket"));
> virNetClientMarkClose(client, closeReason);
> goto error;
> }
> - if (fds[0].revents & POLLERR) {
> + if (rev & G_IO_ERR) {
> virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
> _("received error event on socket"));
> virNetClientMarkClose(client, closeReason);
> @@ -1858,15 +1876,8 @@ static int virNetClientIO(virNetClientPtr client,
>
> /* Check to see if another thread is dispatching */
> if (client->haveTheBuck) {
> - char ignore = 1;
> -
> /* Force other thread to wakeup from poll */
> - if (safewrite(client->wakeupSendFD, &ignore, sizeof(ignore)) != sizeof(ignore)) {
> - virNetClientCallRemove(&client->waitDispatch, thiscall);
> - virReportSystemError(errno, "%s",
> - _("failed to wake up polling thread"));
> - return -1;
> - }
> + g_main_loop_quit(client->eventLoop);
>
> /* If we are non-blocking, detach the thread and keep the call in the
> * queue. */
> --
> 2.24.1
>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 833 bytes
Desc: not available
URL: <http://listman.redhat.com/archives/libvir-list/attachments/20200130/9d5f2473/attachment-0001.sig>
More information about the libvir-list
mailing list