[Libguestfs] [nbdkit PATCH 4/6] connections: Add thread-safe status indicator

Eric Blake eblake at redhat.com
Fri Nov 17 03:26:55 UTC 2017


Once we have multiple threads during parallel processing, we need
to be sure that any I/O error flagged by one thread prevents the
next thread from attempting I/O.  Although we already have
separate locks for reads and writes, it's easier if a single status
is shared by both actions, which needs yet another mutex; however,
we can optimize (via accessor functions) and only take the
mutex if there are actually multiple threads at work.

The next thing to notice is that, because we now update status at
all important points, the return value of _handle_single_connection()
can just be the latest status, and recv_request_send_reply() can now
return void.  This will come in handy later, as it avoids having to
coordinate a return value across multiple threads.

Signed-off-by: Eric Blake <eblake at redhat.com>
---
 src/connections.c | 99 ++++++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 69 insertions(+), 30 deletions(-)

diff --git a/src/connections.c b/src/connections.c
index dd43a9a..75d8884 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -64,8 +64,11 @@ struct connection {
   pthread_mutex_t request_lock;
   pthread_mutex_t read_lock;
   pthread_mutex_t write_lock;
+  pthread_mutex_t status_lock;
+  int status; /* 1 for more I/O with client, 0 for shutdown, -1 on error */
   void *handle;
   void *crypto_session;
+  int nworkers; /* TODO set up a thread pool for parallel workers */

   uint64_t exportsize;
   int readonly;
@@ -83,7 +86,7 @@ struct connection {
 static struct connection *new_connection (int sockin, int sockout);
 static void free_connection (struct connection *conn);
 static int negotiate_handshake (struct connection *conn);
-static int recv_request_send_reply (struct connection *conn);
+static void recv_request_send_reply (struct connection *conn);

 /* Don't call these raw socket functions directly.  Use conn->recv etc. */
 static int raw_recv (struct connection *, void *buf, size_t len);
@@ -146,40 +149,58 @@ connection_set_close (struct connection *conn, connection_close_function close)
 }

 static int
+get_status (struct connection *conn)
+{
+  int r;
+
+  if (conn->nworkers)
+    pthread_mutex_lock (&conn->status_lock);
+  r = conn->status;
+  if (conn->nworkers)
+    pthread_mutex_unlock (&conn->status_lock);
+  return r;
+}
+
+/* Update the status if the new value is lower than the existing value. */
+static void
+set_status (struct connection *conn, int value)
+{
+  if (conn->nworkers)
+    pthread_mutex_lock (&conn->status_lock);
+  if (value < conn->status)
+    conn->status = value;
+  if (conn->nworkers)
+    pthread_mutex_unlock (&conn->status_lock);
+}
+
+static int
 _handle_single_connection (int sockin, int sockout)
 {
-  int r;
+  int r = -1;
   struct connection *conn = new_connection (sockin, sockout);

   if (!conn)
-    goto err;
+    goto done;

   if (plugin_open (conn, readonly) == -1)
-    goto err;
+    goto done;

   threadlocal_set_name (plugin_name ());

   /* Handshake. */
   if (negotiate_handshake (conn) == -1)
-    goto err;
+    goto done;

   /* Process requests.  XXX Allow these to be dispatched in parallel using
    * a thread pool.
    */
-  while (!quit) {
-    r = recv_request_send_reply (conn);
-    if (r == -1)
-      goto err;
-    if (r == 0)
-      break;
-  }
+  while (!quit && get_status (conn) > 0)
+    recv_request_send_reply (conn);

+  r = get_status (conn);
+ done:
   free_connection (conn);
-  return 0;
-
- err:
-  free_connection (conn);
-  return -1;
+  return r;
 }

 int
@@ -205,11 +226,13 @@ new_connection (int sockin, int sockout)
     return NULL;
   }

+  conn->status = 1;
   conn->sockin = sockin;
   conn->sockout = sockout;
   pthread_mutex_init (&conn->request_lock, NULL);
   pthread_mutex_init (&conn->read_lock, NULL);
   pthread_mutex_init (&conn->write_lock, NULL);
+  pthread_mutex_init (&conn->status_lock, NULL);

   conn->recv = raw_recv;
   conn->send = raw_send;
@@ -229,6 +252,7 @@ free_connection (struct connection *conn)
   pthread_mutex_destroy (&conn->request_lock);
   pthread_mutex_destroy (&conn->read_lock);
   pthread_mutex_destroy (&conn->write_lock);
+  pthread_mutex_destroy (&conn->status_lock);

   /* Don't call the plugin again if quit has been set because the main
    * thread will be in the process of unloading it.  The plugin.unload
@@ -883,7 +907,7 @@ nbd_errno (int error)
   }
 }

-static int
+static void
 recv_request_send_reply (struct connection *conn)
 {
   int r;
@@ -895,23 +919,30 @@ recv_request_send_reply (struct connection *conn)

   /* Read the request packet. */
   pthread_mutex_lock (&conn->read_lock);
+  if (get_status (conn) < 0) {
+    pthread_mutex_unlock (&conn->read_lock);
+    return;
+  }
   r = conn->recv (conn, &request, sizeof request);
   if (r == -1) {
     nbdkit_error ("read request: %m");
+    set_status (conn, -1);
     pthread_mutex_unlock (&conn->read_lock);
-    return -1;
+    return;
   }
   if (r == 0) {
     debug ("client closed input socket, closing connection");
+    set_status (conn, 0);
     pthread_mutex_unlock (&conn->read_lock);
-    return 0;                   /* disconnect */
+    return;                   /* disconnect */
   }

   magic = be32toh (request.magic);
   if (magic != NBD_REQUEST_MAGIC) {
     nbdkit_error ("invalid request: 'magic' field is incorrect (0x%x)", magic);
+    set_status (conn, -1);
     pthread_mutex_unlock (&conn->read_lock);
-    return -1;
+    return;
   }

   cmd = be32toh (request.type);
@@ -923,16 +954,18 @@ recv_request_send_reply (struct connection *conn)

   if (cmd == NBD_CMD_DISC) {
     debug ("client sent disconnect command, closing connection");
+    set_status (conn, 0);
     pthread_mutex_unlock (&conn->read_lock);
-    return 0;                   /* disconnect */
+    return;                   /* disconnect */
   }

   /* Validate the request. */
   if (!validate_request (conn, cmd, flags, offset, count, &error)) {
     if (cmd == NBD_CMD_WRITE &&
         skip_over_write_buffer (conn->sockin, count) < 0) {
+      set_status (conn, -1);
       pthread_mutex_unlock (&conn->read_lock);
-      return -1;
+      return;
     }
     pthread_mutex_unlock (&conn->read_lock);
     goto send_reply;
@@ -946,8 +979,9 @@ recv_request_send_reply (struct connection *conn)
       error = ENOMEM;
       if (cmd == NBD_CMD_WRITE &&
           skip_over_write_buffer (conn->sockin, count) < 0) {
+        set_status (conn, -1);
         pthread_mutex_unlock (&conn->read_lock);
-        return -1;
+        return;
       }
       pthread_mutex_unlock (&conn->read_lock);
       goto send_reply;
@@ -963,14 +997,15 @@ recv_request_send_reply (struct connection *conn)
     }
     if (r == -1) {
       nbdkit_error ("read data: %m");
+      set_status (conn, -1);
       pthread_mutex_unlock (&conn->read_lock);
-      return -1;
+      return;
     }
   }
   pthread_mutex_unlock (&conn->read_lock);

   /* Perform the request.  Only this part happens inside the request lock. */
-  if (quit) {
+  if (quit || !get_status (conn)) {
     error = ESHUTDOWN;
   }
   else {
@@ -982,6 +1017,10 @@ recv_request_send_reply (struct connection *conn)
   /* Send the reply packet. */
  send_reply:
   pthread_mutex_lock (&conn->write_lock);
+  if (get_status (conn) < 0) {
+    pthread_mutex_unlock (&conn->write_lock);
+    return;
+  }
   reply.magic = htobe32 (NBD_REPLY_MAGIC);
   reply.handle = request.handle;
   reply.error = htobe32 (nbd_errno (error));
@@ -999,7 +1038,8 @@ recv_request_send_reply (struct connection *conn)
   if (r == -1) {
     nbdkit_error ("write reply: %m");
     pthread_mutex_unlock (&conn->write_lock);
-    return -1;
+    set_status (conn, -1);
+    return;
   }

   /* Send the read data buffer. */
@@ -1007,13 +1047,12 @@ recv_request_send_reply (struct connection *conn)
     r = conn->send (conn, buf, count);
     if (r == -1) {
       nbdkit_error ("write data: %m");
+      set_status (conn, -1);
       pthread_mutex_unlock (&conn->write_lock);
-      return -1;
+      return;
     }
   }
   pthread_mutex_unlock (&conn->write_lock);
-
-  return 1;                     /* command processed ok */
 }

 /* Write buffer to conn->sockout and either succeed completely
-- 
2.13.6




More information about the Libguestfs mailing list