[libvirt] [PATCH v3 02/13] Implement common keepalive handling
Daniel P. Berrange
berrange at redhat.com
Tue Oct 18 09:29:57 UTC 2011
On Wed, Oct 12, 2011 at 07:16:20AM +0200, Jiri Denemark wrote:
> These APIs are used by both client and server RPC layer to handle
> processing of keepalive messages.
> ---
> Notes:
> Version 3:
> - remove ADVERTISE message handling
>
> Version 2:
> - no change
>
> po/POTFILES.in | 1 +
> src/Makefile.am | 3 +-
> src/rpc/virkeepalive.c | 426 ++++++++++++++++++++++++++++++++++++++++++++++++
> src/rpc/virkeepalive.h | 56 +++++++
> 4 files changed, 485 insertions(+), 1 deletions(-)
> create mode 100644 src/rpc/virkeepalive.c
> create mode 100644 src/rpc/virkeepalive.h
>
> diff --git a/po/POTFILES.in b/po/POTFILES.in
> index 5ce35ae..71254dd 100644
> --- a/po/POTFILES.in
> +++ b/po/POTFILES.in
> @@ -72,6 +72,7 @@ src/qemu/qemu_monitor_text.c
> src/qemu/qemu_process.c
> src/remote/remote_client_bodies.h
> src/remote/remote_driver.c
> +src/rpc/virkeepalive.c
> src/rpc/virnetclient.c
> src/rpc/virnetclientprogram.c
> src/rpc/virnetclientstream.c
> diff --git a/src/Makefile.am b/src/Makefile.am
> index af07020..944629c 100644
> --- a/src/Makefile.am
> +++ b/src/Makefile.am
> @@ -1370,7 +1370,8 @@ libvirt_net_rpc_la_SOURCES = \
> rpc/virnetprotocol.h rpc/virnetprotocol.c \
> rpc/virnetsocket.h rpc/virnetsocket.c \
> rpc/virnettlscontext.h rpc/virnettlscontext.c \
> - rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c
> + rpc/virkeepaliveprotocol.h rpc/virkeepaliveprotocol.c \
> + rpc/virkeepalive.h rpc/virkeepalive.c
> if HAVE_SASL
> libvirt_net_rpc_la_SOURCES += \
> rpc/virnetsaslcontext.h rpc/virnetsaslcontext.c
> diff --git a/src/rpc/virkeepalive.c b/src/rpc/virkeepalive.c
> new file mode 100644
> index 0000000..44cc322
> --- /dev/null
> +++ b/src/rpc/virkeepalive.c
> @@ -0,0 +1,426 @@
> +/*
> + * virkeepalive.c: keepalive handling
> + *
> + * Copyright (C) 2011 Red Hat, Inc.
> + *
> + * This library is free software; you can redistribute it and/or
> + * modify it under the terms of the GNU Lesser General Public
> + * License as published by the Free Software Foundation; either
> + * version 2.1 of the License, or (at your option) any later version.
> + *
> + * This library is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
> + * Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public
> + * License along with this library; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
> + *
> + * Author: Jiri Denemark <jdenemar at redhat.com>
> + */
> +
> +#include <config.h>
> +
> +#include "memory.h"
> +#include "threads.h"
> +#include "virfile.h"
> +#include "logging.h"
> +#include "util.h"
> +#include "virterror_internal.h"
> +#include "virnetsocket.h"
> +#include "virkeepaliveprotocol.h"
> +#include "virkeepalive.h"
> +
> +#define VIR_FROM_THIS VIR_FROM_RPC
> +#define virNetError(code, ...) \
> + virReportErrorHelper(VIR_FROM_THIS, code, __FILE__, \
> + __FUNCTION__, __LINE__, __VA_ARGS__)
> +
> +struct _virKeepAlive {
> + int refs;
> + virMutex lock;
> +
> + int interval;
> + unsigned int count;
> + unsigned int countToDeath;
> + time_t lastPacketReceived;
> + int timer;
> +
> + virNetMessagePtr response;
> + int responseTimer;
> +
> + virKeepAliveSendFunc sendCB;
> + virKeepAliveDeadFunc deadCB;
> + virKeepAliveFreeFunc freeCB;
> + void *client;
> +};
> +
> +
> +static void
> +virKeepAliveLock(virKeepAlivePtr ka)
> +{
> + virMutexLock(&ka->lock);
> +}
> +
> +static void
> +virKeepAliveUnlock(virKeepAlivePtr ka)
> +{
> + virMutexUnlock(&ka->lock);
> +}
> +
> +
> +static virNetMessagePtr
> +virKeepAliveMessage(int proc)
> +{
> + virNetMessagePtr msg;
> +
> + if (!(msg = virNetMessageNew(false)))
> + return NULL;
> +
> + msg->header.prog = KEEPALIVE_PROGRAM;
> + msg->header.vers = KEEPALIVE_VERSION;
> + msg->header.type = VIR_NET_MESSAGE;
> + msg->header.proc = proc;
> +
> + if (virNetMessageEncodeHeader(msg) < 0 ||
> + virNetMessageEncodePayloadEmpty(msg) < 0) {
> + virNetMessageFree(msg);
> + return NULL;
> + }
> +
> + return msg;
> +}
> +
> +
> +static int
> +virKeepAliveSend(virKeepAlivePtr ka, virNetMessagePtr msg)
> +{
> + int ret;
> + const char *proc = NULL;
> + void *client = ka->client;
> + virKeepAliveSendFunc sendCB = ka->sendCB;
> +
> + switch (msg->header.proc) {
> + case KEEPALIVE_PROC_PING:
> + proc = "request";
> + break;
> + case KEEPALIVE_PROC_PONG:
> + proc = "response";
> + break;
> + }
> +
> + if (!proc) {
> + VIR_WARN("Refusing to send unknown keepalive message: %d",
> + msg->header.proc);
> + return -1;
This exit path returns -1 without freeing 'msg', so the caller is required to free it.
> + }
> +
> + VIR_DEBUG("Sending keepalive %s to client %p", proc, ka->client);
> +
> + ka->refs++;
> + virKeepAliveUnlock(ka);
> +
> + if ((ret = sendCB(client, msg)) < 0) {
> + VIR_WARN("Failed to send keepalive %s to client %p", proc, client);
> + virNetMessageFree(msg);
This exit path, by contrast, frees 'msg' itself, so ownership of 'msg' on failure is inconsistent between the two error paths.
> + }
> +
> + virKeepAliveLock(ka);
> + ka->refs--;
> +
> + return ret;
> +}
> +
> +
> +static void
> +virKeepAliveScheduleResponse(virKeepAlivePtr ka)
> +{
> + if (ka->responseTimer == -1)
> + return;
> +
> + VIR_DEBUG("Scheduling keepalive response to client %p", ka->client);
> +
> + if (!ka->response &&
> + !(ka->response = virKeepAliveMessage(KEEPALIVE_PROC_PONG))) {
> + VIR_WARN("Failed to generate keepalive response");
> + return;
> + }
> +
> + virEventUpdateTimeout(ka->responseTimer, 0);
> +}
> +
> +
> +static void
> +virKeepAliveTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
> +{
> + virKeepAlivePtr ka = opaque;
> + time_t now = time(NULL);
> +
> + virKeepAliveLock(ka);
> +
> + VIR_DEBUG("ka=%p, client=%p, countToDeath=%d, lastPacketReceived=%lds ago",
> + ka, ka->client, ka->countToDeath, now - ka->lastPacketReceived);
> +
> + if (now - ka->lastPacketReceived < ka->interval - 1) {
> + int timeout = ka->interval - (now - ka->lastPacketReceived);
> + virEventUpdateTimeout(ka->timer, timeout * 1000);
> + goto cleanup;
> + }
> +
> + if (ka->countToDeath == 0) {
> + virKeepAliveDeadFunc deadCB = ka->deadCB;
> + void *client = ka->client;
> +
> + VIR_WARN("No response from client %p after %d keepalive messages in"
> + " %d seconds",
> + ka->client,
> + ka->count,
> + (int) (now - ka->lastPacketReceived));
> + ka->refs++;
> + virKeepAliveUnlock(ka);
> + deadCB(client);
> + virKeepAliveLock(ka);
> + ka->refs--;
> + } else {
> + virNetMessagePtr msg;
> +
> + ka->countToDeath--;
> + if (!(msg = virKeepAliveMessage(KEEPALIVE_PROC_PING)))
> + VIR_WARN("Failed to generate keepalive request");
> + else
> + ignore_value(virKeepAliveSend(ka, msg));
This might need to change depending on how you fix the return
handling of virKeepAliveSend() with respect to freeing msg.
> + virEventUpdateTimeout(ka->timer, ka->interval * 1000);
> + }
> +
> +cleanup:
> + virKeepAliveUnlock(ka);
> +}
> +
> +
> +static void
> +virKeepAliveResponseTimer(int timer ATTRIBUTE_UNUSED, void *opaque)
> +{
> + virKeepAlivePtr ka = opaque;
> + virNetMessagePtr msg;
> +
> + virKeepAliveLock(ka);
> +
> + VIR_DEBUG("ka=%p, client=%p, response=%p",
> + ka, ka->client, ka->response);
> +
> + if (ka->response) {
> + msg = ka->response;
> + ka->response = NULL;
> + ignore_value(virKeepAliveSend(ka, msg));
Likewise, a change may be needed here.
> + }
> +
> + virEventUpdateTimeout(ka->responseTimer, ka->response ? 0 : -1);
> +
> + virKeepAliveUnlock(ka);
> +}
> +
> +
> +static void
> +virKeepAliveTimerFree(void *opaque)
> +{
> + virKeepAliveFree(opaque);
> +}
> +
> +
> +virKeepAlivePtr
> +virKeepAliveNew(int interval,
> + unsigned int count,
> + void *client,
> + virKeepAliveSendFunc sendCB,
> + virKeepAliveDeadFunc deadCB,
> + virKeepAliveFreeFunc freeCB)
> +{
> + virKeepAlivePtr ka;
> +
> + VIR_DEBUG("client=%p, interval=%d, count=%u", client, interval, count);
> +
> + if (VIR_ALLOC(ka) < 0) {
> + virReportOOMError();
> + return NULL;
> + }
> +
> + if (virMutexInit(&ka->lock) < 0) {
> + VIR_FREE(ka);
> + return NULL;
> + }
> +
> + ka->refs = 1;
> + ka->interval = interval;
> + ka->count = count;
> + ka->countToDeath = count;
> + ka->timer = -1;
> + ka->client = client;
> + ka->sendCB = sendCB;
> + ka->deadCB = deadCB;
> + ka->freeCB = freeCB;
> +
> + ka->responseTimer = virEventAddTimeout(-1, virKeepAliveResponseTimer,
> + ka, virKeepAliveTimerFree);
> + if (ka->responseTimer < 0) {
> + virKeepAliveFree(ka);
> + return NULL;
> + }
> + /* the timer now has a reference to ka */
> + ka->refs++;
> +
> + return ka;
> +}
> +
> +
> +void
> +virKeepAliveRef(virKeepAlivePtr ka)
> +{
> + virKeepAliveLock(ka);
> + ka->refs++;
> + VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs);
> + virKeepAliveUnlock(ka);
> +}
> +
> +
> +void
> +virKeepAliveFree(virKeepAlivePtr ka)
> +{
> + if (!ka)
> + return;
> +
> + virKeepAliveLock(ka);
> + VIR_DEBUG("ka=%p, client=%p, refs=%d", ka, ka->client, ka->refs);
> + if (--ka->refs > 0) {
> + virKeepAliveUnlock(ka);
> + return;
> + }
> +
> + virMutexDestroy(&ka->lock);
> + ka->freeCB(ka->client);
> + VIR_FREE(ka);
> +}
> +
> +
> +int
> +virKeepAliveStart(virKeepAlivePtr ka,
> + int interval,
> + unsigned int count)
> +{
> + int ret = -1;
> + time_t delay;
> + int timeout;
> +
> + VIR_DEBUG("ka=%p, client=%p, interval=%d, count=%u",
> + ka, ka->client, interval, count);
> +
> + virKeepAliveLock(ka);
> +
> + if (ka->timer >= 0) {
> + VIR_DEBUG("Keepalive messages already enabled");
> + ret = 0;
> + goto cleanup;
> + }
> +
> + if (interval > 0) {
> + if (ka->interval > 0) {
> + virNetError(VIR_ERR_INTERNAL_ERROR, "%s",
> + _("keepalive interval already set"));
> + goto cleanup;
> + }
> + ka->interval = interval;
> + ka->count = count;
> + ka->countToDeath = count;
> + }
> +
> + if (ka->interval <= 0) {
> + VIR_DEBUG("Keepalive messages disabled by configuration");
> + ret = 0;
> + goto cleanup;
> + }
> +
> + VIR_DEBUG("Enabling keepalive messages; interval=%d, count=%u",
> + ka->interval, ka->count);
> +
> + delay = time(NULL) - ka->lastPacketReceived;
> + if (delay > ka->interval)
> + timeout = 0;
> + else
> + timeout = ka->interval - delay;
> + ka->timer = virEventAddTimeout(timeout * 1000, virKeepAliveTimer,
> + ka, virKeepAliveTimerFree);
> + if (ka->timer < 0)
> + goto cleanup;
> +
> + /* the timer now has another reference to this object */
> + ka->refs++;
> + ret = 0;
> +
> +cleanup:
> + virKeepAliveUnlock(ka);
> + return ret;
> +}
> +
> +
> +void
> +virKeepAliveStop(virKeepAlivePtr ka)
> +{
> + VIR_DEBUG("ka=%p, client=%p", ka, ka->client);
> +
> + virKeepAliveLock(ka);
> + if (ka->timer > 0) {
> + virEventRemoveTimeout(ka->timer);
> + ka->timer = -1;
> + }
> + if (ka->responseTimer > 0) {
> + virEventRemoveTimeout(ka->responseTimer);
> + ka->responseTimer = -1;
> + }
> + virKeepAliveUnlock(ka);
> +}
Do we need to clear any dangling 'ka->response' object?
ACK if those questions are cleared up.
Bonus points if you fancy adding some DTrace/SystemTap probes to the
code in a later patch.
Daniel
--
|: http://berrange.com -o- http://www.flickr.com/photos/dberrange/ :|
|: http://libvirt.org -o- http://virt-manager.org :|
|: http://autobuild.org -o- http://search.cpan.org/~danberr/ :|
|: http://entangle-photo.org -o- http://live.gnome.org/gtk-vnc :|
More information about the libvir-list
mailing list