[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[libvirt] [PATCH 2/2] Java bindings for domain events



This patch allows the remote driver to work with an asynchronous
EventImpl (the remote driver is the only driver that uses an
externally-supplied EventImpl), assuming libvirt is compiled with
pthread support.  (Without pthreads, this code is harmless in a
single-threaded environment.)

Basically, it uses a mutex to protect reads from the RPC socket so that
each message is read atomically, in its entirety.  (Otherwise,
remoteDomainEventFired() can race with the call() code that reads
replies and events.)

In addition, I update the EventImpl handle around each call()
(disabling it before the call and re-enabling it afterwards) to prevent
remoteDomainEventFired() from being called every time a reply is sent.
(This helps us dispatch events in a timely manner, though it's not
strictly necessary.  Without it, any events coming in during a call()
won't be dispatched until the call drops the socket lock, because
remoteDomainEventFired() will be stuck awaiting the lock.)

Dave

diff --git a/src/remote_internal.c b/src/remote_internal.c
index 2ca7930..59128f6 100644
--- a/src/remote_internal.c
+++ b/src/remote_internal.c
@@ -116,6 +116,7 @@ struct private_data {
     virDomainEventQueuePtr domainEvents;
     /* Timer for flushing domainEvents queue */
     int eventFlushTimer;
+    PTHREAD_MUTEX_T(lock);      /* Serializes socket reads w/async EventImpl */
 };
 
 #define GET_PRIVATE(conn,retcode)                                       \
@@ -700,6 +701,9 @@ doRemoteOpen (virConnectPtr conn,
     } /* switch (transport) */
 
 
+    /* This must precede the first call() */
+    priv->eventFlushTimer = -1;
+
     /* Try and authenticate with server */
     if (remoteAuthenticate(conn, priv, 1, auth, authtype) == -1)
         goto failed;
@@ -744,6 +748,8 @@ doRemoteOpen (virConnectPtr conn,
         }
     }
 
+    pthread_mutex_init(&priv->lock, NULL);
+
     if(VIR_ALLOC(priv->callbackList)<0) {
         error(conn, VIR_ERR_INVALID_ARG, _("Error allocating callbacks list"));
         goto failed;
@@ -1250,6 +1256,8 @@ doRemoteClose (virConnectPtr conn, struct private_data *priv)
     /* Free queued events */
     virDomainEventQueueFree(priv->domainEvents);
 
+    pthread_mutex_destroy(&priv->lock);
+
     return 0;
 }
 
@@ -4536,11 +4544,11 @@ static int really_read (virConnectPtr conn, struct private_data *priv,
  * else Bad Things will happen in the XDR code.
  */
 static int
-call (virConnectPtr conn, struct private_data *priv,
-      int flags /* if we are in virConnectOpen */,
-      int proc_nr,
-      xdrproc_t args_filter, char *args,
-      xdrproc_t ret_filter, char *ret)
+really_call (virConnectPtr conn, struct private_data *priv,
+             int flags /* if we are in virConnectOpen */,
+             int proc_nr,
+             xdrproc_t args_filter, char *args,
+             xdrproc_t ret_filter, char *ret)
 {
     char buffer[REMOTE_MESSAGE_MAX];
     char buffer2[4];
@@ -4596,16 +4604,18 @@ call (virConnectPtr conn, struct private_data *priv,
         really_write (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len-4) == -1)
         return -1;
 
+    pthread_mutex_lock(&priv->lock);
+
 retry_read:
     /* Read and deserialise length word. */
     if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer2, sizeof buffer2) == -1)
-        return -1;
+        goto unlock_return_err;
 
     xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
     if (!xdr_int (&xdr, &len)) {
         error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
                VIR_ERR_RPC, _("xdr_int (length word, reply)"));
-        return -1;
+        goto unlock_return_err;
     }
     xdr_destroy (&xdr);
 
@@ -4615,12 +4625,14 @@ retry_read:
     if (len < 0 || len > REMOTE_MESSAGE_MAX) {
         error (flags & REMOTE_CALL_IN_OPEN ? NULL : conn,
                VIR_ERR_RPC, _("packet received from server too large"));
-        return -1;
+        goto unlock_return_err;
     }
 
     /* Read reply header and what follows (either a ret or an error). */
     if (really_read (conn, priv, flags & REMOTE_CALL_IN_OPEN, buffer, len) == -1)
-        return -1;
+        goto unlock_return_err;
+
+    pthread_mutex_unlock(&priv->lock);
 
     /* Deserialise reply header. */
     xdrmem_create (&xdr, buffer, len, XDR_DECODE);
@@ -4729,8 +4741,33 @@ retry_read:
         xdr_destroy (&xdr);
         return -1;
     }
+
+ unlock_return_err:
+    pthread_mutex_unlock(&priv->lock);
+    return -1;
+}
+
+static int call (virConnectPtr conn, struct private_data *priv,
+                 int flags /* if we are in virConnectOpen */,
+                 int proc_nr,
+                 xdrproc_t args_filter, char *args,
+                 xdrproc_t ret_filter, char *ret)
+{
+    int rv;
+    if (priv->eventFlushTimer >= 0)
+        virEventUpdateHandle(priv->sock, 0);
+    rv = really_call(conn, priv, flags, proc_nr,
+                     args_filter, args,
+                     ret_filter, ret);
+    if (priv->eventFlushTimer >= 0)
+        virEventUpdateHandle(priv->sock,
+                             VIR_EVENT_HANDLE_READABLE |
+                             VIR_EVENT_HANDLE_ERROR |
+                             VIR_EVENT_HANDLE_HANGUP);
+    return rv;
 }
 
+
 static int
 really_write_buf (virConnectPtr conn, struct private_data *priv,
                   int in_open /* if we are in virConnectOpen */,
@@ -5287,14 +5324,16 @@ remoteDomainEventFired(int fd ATTRIBUTE_UNUSED,
          return;
     }
 
+    pthread_mutex_lock(&priv->lock);
+
     /* Read and deserialise length word. */
     if (really_read (conn, priv, 0, buffer2, sizeof buffer2) == -1)
-        return;
+        goto unlock_and_return;
 
     xdrmem_create (&xdr, buffer2, sizeof buffer2, XDR_DECODE);
     if (!xdr_int (&xdr, &len)) {
         error (conn, VIR_ERR_RPC, _("xdr_int (length word, reply)"));
-        return;
+        goto unlock_and_return;
     }
     xdr_destroy (&xdr);
 
@@ -5303,15 +5342,17 @@ remoteDomainEventFired(int fd ATTRIBUTE_UNUSED,
 
     if (len < 0 || len > REMOTE_MESSAGE_MAX) {
         error (conn, VIR_ERR_RPC, _("packet received from server too large"));
-        return;
+        goto unlock_and_return;
     }
 
     /* Read reply header and what follows (either a ret or an error). */
     if (really_read (conn, priv, 0, buffer, len) == -1) {
         error (conn, VIR_ERR_RPC, _("error reading buffer from memory"));
-        return;
+        goto unlock_and_return;
     }
 
+    pthread_mutex_unlock(&priv->lock);
+
     /* Deserialise reply header. */
     xdrmem_create (&xdr, buffer, len, XDR_DECODE);
     if (!xdr_remote_message_header (&xdr, &hdr)) {
@@ -5327,6 +5368,11 @@ remoteDomainEventFired(int fd ATTRIBUTE_UNUSED,
         DEBUG0("invalid proc in event firing");
         error (conn, VIR_ERR_RPC, _("invalid proc in event firing"));
     }
+
+    return;
+
+ unlock_and_return:
+    pthread_mutex_unlock(&priv->lock);
 }
 
 void

[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]