[libvirt] [PATCH v3 02/13] Implement common keepalive handling

Daniel P. Berrange berrange at redhat.com
Tue Oct 18 09:29:57 UTC 2011


On Wed, Oct 12, 2011 at 07:16:20AM +0200, Jiri Denemark wrote:
> These APIs are used by both client and server RPC layer to handle
> processing of keepalive messages.
> ---
> Notes:
>     Version 3:
>     - remove ADVERTISE message handling
>     
>     Version 2:
>     - no change
> 
>  po/POTFILES.in         |    1 +
>  src/Makefile.am        |    3 +-
>  src/rpc/virkeepalive.c |  426 ++++++++++++++++++++++++++++++++++++++++++++++++
>  src/rpc/virkeepalive.h |   56 +++++++
>  4 files changed, 485 insertions(+), 1 deletions(-)
>  create mode 100644 src/rpc/virkeepalive.c
>  create mode 100644 src/rpc/virkeepalive.h
> 
> diff --git a/po/POTFILES.in b/po/POTFILES.in
> index 5ce35ae..71254dd 100644
> --- a/po/POTFILES.in
> +++ b/po/POTFILES.in
> @@ -72,6 +72,7 @@ src/qemu/qemu_monitor_text.c
>  src/qemu/qemu_process.c
>  src/remote/remote_client_bodies.h
>  src/remote/remote_driver.c
> +src/rpc/virkeepalive.c
>  src/rpc/virnetclient.c
>  src/rpc/virnetclientprogram.c
>  src/rpc/virnetclientstream.c
> diff --git a/src/Makefile.am b/src/Makefile.am
> index af07020..944629c 100644
> --- a/src/Makefile.am
> +++ b/src/Makefile.am
> @@ -1370,7 +1370,8 @@ libvirt_net_rpc_la_SOURCES = \
>  	rpc/virnetprotocol.h rpc/virnetprotocol.c \
>  	rpc/virnetsocket.h rpc/virnetsocket.c \
>  	rpc/virnettlscontext.h rpc/virnettlscontext.c \
> -	rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c
> +	rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c \
> +	rpc/virkeepalive.h rpc/virkeepalive.c
>  if HAVE_SASL
>  libvirt_net_rpc_la_SOURCES += \
>  	rpc/virnetsaslcontext.h rpc/virnetsaslcontext.c
> diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c
> new file mode 100644
> index 0000000..44cc322
> --- /dev/null
> +++ b/src/rpc/virkeepalive.c
> @@ -0,0 +1,426 @@
> +/*
> + * virkeepalive.c: keepalive handling
> + *
> + * Copyright (C) 2011 Red Hat, Inc.
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307  USA
> + *
> + * Author: Jiri Denemark <jdenemar at redhat.com>
> + */
> +
> +#include <config.h>
> +
> +#include "memory.h"
> +#include "threads.h"
> +#include "virfile.h"
> +#include "logging.h"
> +#include "util.h"
> +#include "virterror_internal.h"
> +#include "virnetsocket.h"
> +#include "virkeepaliveprotocol.h"
> +#include "virkeepalive.h"
> +
> +#define VIR_FROM_THIS VIR_FROM_RPC
> +#define virNetError(code, ...)                                    \
> +    virReportErrorHelper(VIR_FROM_THIS, code, __FILE__,           \
> +                         __FUNCTION__, __LINE__, __VA_ARGS__)
> +
> +struct _virKeepAlive {
> +    int refs;
> +    virMutex lock;
> +
> +    int interval;
> +    unsigned int count;
> +    unsigned int countToDeath;
> +    time_t lastPacketReceived;
> +    int timer;
> +
> +    virNetMessagePtr response;
> +    int responseTimer;
> +
> +    virKeepAliveSendFunc sendCB;
> +    virKeepAliveDeadFunc deadCB;
> +    virKeepAliveFreeFunc freeCB;
> +    void *client;
> +};
> +
> +
> +static void
> +virKeepAliveLock(virKeepAlivePtr ka)
> +{
> +    virMutexLock(&ka->lock);
> +}
> +
> +static void
> +virKeepAliveUnlock(virKeepAlivePtr ka)
> +{
> +    virMutexUnlock(&ka->lock);
> +}
> +
> +
> +static virNetMessagePtr
> +virKeepAliveMessage(int proc)
> +{
> +    virNetMessagePtr msg;
> +
> +    if (!(msg = virNetMessageNew(false)))
> +        return NULL;
> +
> +    msg->header.prog = KEEPALIVE_PROGRAM;
> +    msg->header.vers = KEEPALIVE_VERSION;
> +    msg->header.type = VIR_NET_MESSAGE;
> +    msg->header.proc = proc;
> +
> +    if (virNetMessageEncodeHeader(msg) < 0 ||
> +        virNetMessageEncodePayloadEmpty(msg) < 0) {
> +        virNetMessageFree(msg);
> +        return NULL;
> +    }
> +
> +    return msg;
> +}
> +
> +
> +static int
> +virKeepAliveSend(virKeepAlivePtr ka, virNetMessagePtr msg)
> +{
> +    int ret;
> +    const char *proc = NULL;
> +    void *client = ka->client;
> +    virKeepAliveSendFunc sendCB = ka->sendCB;
> +
> +    switch (msg->header.proc) {
> +    case KEEPALIVE_PROC_PING:
> +        proc = "request";
> +        break;
> +    case KEEPALIVE_PROC_PONG:
> +        proc = "response";
> +        break;
> +    }
> +
> +    if (!proc) {
> +        VIR_WARN("Refusing to send unknown keepalive message: %d",
> +                 msg->header.proc);
> +        return -1;

This exit path requires the caller to free 'msg'

> +    }
> +
> +    VIR_DEBUG("Sending keepalive %s to client %p", proc, ka->client);
> +
> +    ka->refs++;
> +    virKeepAliveUnlock(ka);
> +
> +    if ((ret = sendCB(client, msg)) < 0) {
> +        VIR_WARN("Failed to send keepalive %s to client %p", proc, client);
> +        virNetMessageFree(msg);

Where as this exit path free's the msg itself.


> +    }
> +
> +    virKeepAliveLock(ka);
> +    ka->refs--;
> +
> +    return ret;
> +}
> +
> +
> +static void
> +virKeepAliveScheduleResponse(virKeepAlivePtr ka)
> +{
> +    if (ka->responseTimer == -1)
> +        return;
> +
> +    VIR_DEBUG("Scheduling keepalive response to client %p", ka->client);
> +
> +    if (!ka->response &&
> +        !(ka->response = virKeepAliveMessage(KEEPALIVE_PROC_PONG))) {
> +        VIR_WARN("Failed to generate keepalive response");
> +        return;
> +    }
> +
> +    virEventUpdateTimeout(ka->responseTimer, 0);
> +}
> +
> +
> +static void
> +virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
> +{
> +    virKeepAlivePtr ka = opaque;
> +    time_t now = time(NULL);
> +
> +    virKeepAliveLock(ka);
> +
> +    VIR_DEBUG("ka=%p, client=%p, countToDeath=%d, lastPacketReceived=%lds ago",
> +              ka, ka->client, ka->countToDeath, now - ka->lastPacketReceived);
> +
> +    if (now - ka->lastPacketReceived < ka->interval - 1) {
> +        int timeout = ka->interval - (now - ka->lastPacketReceived);
> +        virEventUpdateTimeout(ka->timer, timeout * 1000);
> +        goto cleanup;
> +    }
> +
> +    if (ka->countToDeath == 0) {
> +        virKeepAliveDeadFunc deadCB = ka->deadCB;
> +        void *client = ka->client;
> +
> +        VIR_WARN("No response from client %p after %d keepalive messages in"
> +                 " %d seconds",
> +                 ka->client,
> +                 ka->count,
> +                 (int) (now - ka->lastPacketReceived));
> +        ka->refs++;
> +        virKeepAliveUnlock(ka);
> +        deadCB(client);
> +        virKeepAliveLock(ka);
> +        ka->refs--;
> +    } else {
> +        virNetMessagePtr msg;
> +
> +        ka->countToDeath--;
> +        if (!(msg = virKeepAliveMessage(KEEPALIVE_PROC_PING)))
> +            VIR_WARN("Failed to generate keepalive request");
> +        else
> +            ignore_value(virKeepAliveSend(ka, msg));

This might need to change depending on how you fix the return
handling of this method wrt free'ing msg.

> +        virEventUpdateTimeout(ka->timer, ka->interval * 1000);
> +    }
> +
> +cleanup:
> +    virKeepAliveUnlock(ka);
> +}
> +
> +
> +static void
> +virKeepAliveResponseTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
> +{
> +    virKeepAlivePtr ka = opaque;
> +    virNetMessagePtr msg;
> +
> +    virKeepAliveLock(ka);
> +
> +    VIR_DEBUG("ka=%p, client=%p, response=%p",
> +              ka, ka->client, ka->response);
> +
> +    if (ka->response) {
> +        msg = ka->response;
> +        ka->response = NULL;
> +        ignore_value(virKeepAliveSend(ka, msg));

Likewise possible change needed here.

> +    }
> +
> +    virEventUpdateTimeout(ka->responseTimer, ka->response ? 0 : -1);
> +
> +    virKeepAliveUnlock(ka);
> +}
> +
> +
> +static void
> +virKeepAliveTimerFree(void *opaque)
> +{
> +    virKeepAliveFree(opaque);
> +}
> +
> +
> +virKeepAlivePtr
> +virKeepAliveNew(int interval,
> +                unsigned int count,
> +                void *client,
> +                virKeepAliveSendFunc sendCB,
> +                virKeepAliveDeadFunc deadCB,
> +                virKeepAliveFreeFunc freeCB)
> +{
> +    virKeepAlivePtr ka;
> +
> +    VIR_DEBUG("client=%p, interval=%d, count=%u", client, interval, count);
> +
> +    if (VIR_ALLOC(ka) < 0) {
> +        virReportOOMError();
> +        return NULL;
> +    }
> +
> +    if (virMutexInit(&ka->lock) < 0) {
> +        VIR_FREE(ka);
> +        return NULL;
> +    }
> +
> +    ka->refs = 1;
> +    ka->interval = interval;
> +    ka->count = count;
> +    ka->countToDeath = count;
> +    ka->timer = -1;
> +    ka->client = client;
> +    ka->sendCB = sendCB;
> +    ka->deadCB = deadCB;
> +    ka->freeCB = freeCB;
> +
> +    ka->responseTimer = virEventAddTimeout(-1, virKeepAliveResponseTimer,
> +                                           ka, virKeepAliveTimerFree);
> +    if (ka->responseTimer < 0) {
> +        virKeepAliveFree(ka);
> +        return NULL;
> +    }
> +    /* the timer now has a reference to ka */
> +    ka->refs++;
> +
> +    return ka;
> +}
> +
> +
> +void
> +virKeepAliveRef(virKeepAlivePtr ka)
> +{
> +    virKeepAliveLock(ka);
> +    ka->refs++;
> +    VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs);
> +    virKeepAliveUnlock(ka);
> +}
> +
> +
> +void
> +virKeepAliveFree(virKeepAlivePtr ka)
> +{
> +    if (!ka)
> +        return;
> +
> +    virKeepAliveLock(ka);
> +    VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs);
> +    if (--ka->refs > 0) {
> +        virKeepAliveUnlock(ka);
> +        return;
> +    }
> +
> +    virMutexDestroy(&ka->lock);
> +    ka->freeCB(ka->client);
> +    VIR_FREE(ka);
> +}
> +
> +
> +int
> +virKeepAliveStart(virKeepAlivePtr ka,
> +                  int interval,
> +                  unsigned int count)
> +{
> +    int ret = -1;
> +    time_t delay;
> +    int timeout;
> +
> +    VIR_DEBUG("ka=%p, client=%p, interval=%d, count=%u",
> +              ka, ka->client, interval, count);
> +
> +    virKeepAliveLock(ka);
> +
> +    if (ka->timer >= 0) {
> +        VIR_DEBUG("Keepalive messages already enabled");
> +        ret = 0;
> +        goto cleanup;
> +    }
> +
> +    if (interval > 0) {
> +        if (ka->interval > 0) {
> +            virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
> +                        _("keepalive interval already set"));
> +            goto cleanup;
> +        }
> +        ka->interval = interval;
> +        ka->count = count;
> +        ka->countToDeath = count;
> +    }
> +
> +    if (ka->interval <= 0) {
> +        VIR_DEBUG("Keepalive messages disabled by configuration");
> +        ret = 0;
> +        goto cleanup;
> +    }
> +
> +    VIR_DEBUG("Enabling keepalive messages; interval=%d, count=%u",
> +              ka->interval, ka->count);
> +
> +    delay = time(NULL) - ka->lastPacketReceived;
> +    if (delay > ka->interval)
> +        timeout = 0;
> +    else
> +        timeout = ka->interval - delay;
> +    ka->timer = virEventAddTimeout(timeout * 1000, virKeepAliveTimer,
> +                                   ka, virKeepAliveTimerFree);
> +    if (ka->timer < 0)
> +        goto cleanup;
> +
> +    /* the timer now has another reference to this object */
> +    ka->refs++;
> +    ret = 0;
> +
> +cleanup:
> +    virKeepAliveUnlock(ka);
> +    return ret;
> +}
> +
> +
> +void
> +virKeepAliveStop(virKeepAlivePtr ka)
> +{
> +    VIR_DEBUG("ka=%p, client=%p", ka, ka->client);
> +
> +    virKeepAliveLock(ka);
> +    if (ka->timer > 0) {
> +        virEventRemoveTimeout(ka->timer);
> +        ka->timer = -1;
> +    }
> +    if (ka->responseTimer > 0) {
> +        virEventRemoveTimeout(ka->responseTimer);
> +        ka->responseTimer = -1;
> +    }
> +    virKeepAliveUnlock(ka);
> +}

Do we need to clear any dangling  'ka->response' object ?


ACK if those questions are cleared up.

Bonus points if you fancy inserting some DTrace/SystemTAP probes  to the
code as a later patch.

Daniel
-- 
|: http://berrange.com      -o-    http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org              -o-             http://virt-manager.org :|
|: http://autobuild.org       -o-         http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org       -o-       http://live.gnome.org/gtk-vnc :|




More information about the libvir-list mailing list