[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[libvirt] [PATCH 02/15] Allow RPC server to run single threaded



From: "Daniel P. Berrange" <berrange redhat com>

Refactor the RPC server dispatcher code so that if 'max_workers==0'
the entire server will run single threaded. This is useful for
use cases where there will only ever be 1 client connected
which serializes its requests

Signed-off-by: Daniel P. Berrange <berrange redhat com>
---
 src/rpc/virnetserver.c |  113 +++++++++++++++++++++++++++++-------------------
 1 file changed, 68 insertions(+), 45 deletions(-)

diff --git a/src/rpc/virnetserver.c b/src/rpc/virnetserver.c
index 358666d..4a02aab 100644
--- a/src/rpc/virnetserver.c
+++ b/src/rpc/virnetserver.c
@@ -127,49 +127,64 @@ static void virNetServerUnlock(virNetServerPtr srv)
 }
 
 
-static void virNetServerHandleJob(void *jobOpaque, void *opaque)
+static int virNetServerProcessMsg(virNetServerPtr srv,
+                                  virNetServerClientPtr client,
+                                  virNetServerProgramPtr prog,
+                                  virNetMessagePtr msg)
 {
-    virNetServerPtr srv = opaque;
-    virNetServerJobPtr job = jobOpaque;
-
-    VIR_DEBUG("server=%p client=%p message=%p prog=%p",
-              srv, job->client, job->msg, job->prog);
-
-    if (!job->prog) {
+    int ret = -1;
+    if (!prog) {
         /* Only send back an error for type == CALL. Other
          * message types are not expecting replies, so we
          * must just log it & drop them
          */
-        if (job->msg->header.type == VIR_NET_CALL ||
-            job->msg->header.type == VIR_NET_CALL_WITH_FDS) {
-            if (virNetServerProgramUnknownError(job->client,
-                                                job->msg,
-                                                &job->msg->header) < 0)
-                goto error;
+        if (msg->header.type == VIR_NET_CALL ||
+            msg->header.type == VIR_NET_CALL_WITH_FDS) {
+            if (virNetServerProgramUnknownError(client,
+                                                msg,
+                                                &msg->header) < 0)
+                goto cleanup;
         } else {
             VIR_INFO("Dropping client mesage, unknown program %d version %d type %d proc %d",
-                     job->msg->header.prog, job->msg->header.vers,
-                     job->msg->header.type, job->msg->header.proc);
+                     msg->header.prog, msg->header.vers,
+                     msg->header.type, msg->header.proc);
             /* Send a dummy reply to free up 'msg' & unblock client rx */
-            virNetMessageClear(job->msg);
-            job->msg->header.type = VIR_NET_REPLY;
-            if (virNetServerClientSendMessage(job->client, job->msg) < 0)
-                goto error;
+            virNetMessageClear(msg);
+            msg->header.type = VIR_NET_REPLY;
+            if (virNetServerClientSendMessage(client, msg) < 0)
+                goto cleanup;
         }
-        goto cleanup;
+        goto done;
     }
 
-    if (virNetServerProgramDispatch(job->prog,
+    if (virNetServerProgramDispatch(prog,
                                     srv,
-                                    job->client,
-                                    job->msg) < 0)
+                                    client,
+                                    msg) < 0)
+        goto cleanup;
+
+done:
+    ret = 0;
+
+cleanup:
+    return ret;
+}
+
+static void virNetServerHandleJob(void *jobOpaque, void *opaque)
+{
+    virNetServerPtr srv = opaque;
+    virNetServerJobPtr job = jobOpaque;
+
+    VIR_DEBUG("server=%p client=%p message=%p prog=%p",
+              srv, job->client, job->msg, job->prog);
+
+    if (virNetServerProcessMsg(srv, job->client, job->prog, job->msg) < 0)
         goto error;
 
     virNetServerLock(srv);
     virNetServerProgramFree(job->prog);
     virNetServerUnlock(srv);
 
-cleanup:
     virNetServerClientFree(job->client);
     VIR_FREE(job);
     return;
@@ -187,7 +202,6 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr client,
                                           void *opaque)
 {
     virNetServerPtr srv = opaque;
-    virNetServerJobPtr job;
     virNetServerProgramPtr prog = NULL;
     unsigned int priority = 0;
     size_t i;
@@ -196,34 +210,42 @@ static int virNetServerDispatchNewMessage(virNetServerClientPtr client,
     VIR_DEBUG("server=%p client=%p message=%p",
               srv, client, msg);
 
-    if (VIR_ALLOC(job) < 0) {
-        virReportOOMError();
-        return -1;
-    }
-
-    job->client = client;
-    job->msg = msg;
-
     virNetServerLock(srv);
     for (i = 0 ; i < srv->nprograms ; i++) {
-        if (virNetServerProgramMatches(srv->programs[i], job->msg)) {
+        if (virNetServerProgramMatches(srv->programs[i], msg)) {
             prog = srv->programs[i];
             break;
         }
     }
 
-    if (prog) {
-        virNetServerProgramRef(prog);
-        job->prog = prog;
-        priority = virNetServerProgramGetPriority(prog, msg->header.proc);
-    }
+    if (srv->workers) {
+        virNetServerJobPtr job;
+
+        if (VIR_ALLOC(job) < 0) {
+            virReportOOMError();
+            goto cleanup;
+        }
 
-    ret = virThreadPoolSendJob(srv->workers, priority, job);
+        job->client = client;
+        job->msg = msg;
+
+        if (prog) {
+            virNetServerProgramRef(prog);
+            job->prog = prog;
+            priority = virNetServerProgramGetPriority(prog, msg->header.proc);
+        }
 
-    if (ret < 0) {
-        VIR_FREE(job);
-        virNetServerProgramFree(prog);
+        ret = virThreadPoolSendJob(srv->workers, priority, job);
+
+        if (ret < 0) {
+            VIR_FREE(job);
+            virNetServerProgramFree(prog);
+        }
+    } else {
+        ret = virNetServerProcessMsg(srv, client, prog, msg);
     }
+
+cleanup:
     virNetServerUnlock(srv);
 
     return ret;
@@ -324,7 +346,8 @@ virNetServerPtr virNetServerNew(size_t min_workers,
 
     srv->refs = 1;
 
-    if (!(srv->workers = virThreadPoolNew(min_workers, max_workers,
+    if (max_workers &&
+        !(srv->workers = virThreadPoolNew(min_workers, max_workers,
                                           priority_workers,
                                           virNetServerHandleJob,
                                           srv)))
-- 
1.7.10.4


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]