[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]

[Cluster-devel] [PATCH] RFC: Use workqueues for dlm lowcomms



Steve: Please do /not/ commit this patch.


The attached patch converts the dlm_sendd & dlm_recvd daemons to work queues. This removes a lot of duplicated code and allows the
receive thread to run on multiple processors on SMP systems (as per bz#218756).

It's currently only applicable to the TCP lowcomms. I'll do the SCTP one when (if?) we are happy with this one
-- 

patrick
diff --git a/fs/dlm/lowcomms-tcp.c b/fs/dlm/lowcomms-tcp.c
index 3b22473..d356585 100644
--- a/fs/dlm/lowcomms-tcp.c
+++ b/fs/dlm/lowcomms-tcp.c
@@ -115,6 +115,8 @@ struct connection {
 	atomic_t waiting_requests;
 #define MAX_CONNECT_RETRIES 3
 	struct connection *othercon;
+	struct work_struct rwork; /* Receive workqueue */
+	struct work_struct swork; /* Send workqueue */
 };
 #define sock2con(x) ((struct connection *)(x)->sk_user_data)
 
@@ -131,14 +133,9 @@ struct writequeue_entry {
 
 static struct sockaddr_storage dlm_local_addr;
 
-/* Manage daemons */
-static struct task_struct *recv_task;
-static struct task_struct *send_task;
-
-static wait_queue_t lowcomms_send_waitq_head;
-static DECLARE_WAIT_QUEUE_HEAD(lowcomms_send_waitq);
-static wait_queue_t lowcomms_recv_waitq_head;
-static DECLARE_WAIT_QUEUE_HEAD(lowcomms_recv_waitq);
+/* Work queues */
+static struct workqueue_struct *recv_workqueue;
+static struct workqueue_struct *send_workqueue;
 
 /* An array of pointers to connections, indexed by NODEID */
 static struct connection **connections;
@@ -146,17 +143,8 @@ static DECLARE_MUTEX(connections_lock);
 static struct kmem_cache *con_cache;
 static int conn_array_size;
 
-/* List of sockets that have reads pending */
-static LIST_HEAD(read_sockets);
-static DEFINE_SPINLOCK(read_sockets_lock);
-
-/* List of sockets which have writes pending */
-static LIST_HEAD(write_sockets);
-static DEFINE_SPINLOCK(write_sockets_lock);
-
-/* List of sockets which have connects pending */
-static LIST_HEAD(state_sockets);
-static DEFINE_SPINLOCK(state_sockets_lock);
+static void process_recv_sockets(struct work_struct *work);
+static void process_send_sockets(struct work_struct *work);
 
 static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 {
@@ -189,6 +177,8 @@ static struct connection *nodeid2con(int nodeid, gfp_t allocation)
 		init_rwsem(&con->sock_sem);
 		INIT_LIST_HEAD(&con->writequeue);
 		spin_lock_init(&con->writequeue_lock);
+		INIT_WORK(&con->swork, process_send_sockets);
+		INIT_WORK(&con->rwork, process_recv_sockets);
 
 		connections[nodeid] = con;
 	}
@@ -203,41 +193,22 @@ static void lowcomms_data_ready(struct sock *sk, int count_unused)
 {
 	struct connection *con = sock2con(sk);
 
-	atomic_inc(&con->waiting_requests);
-	if (test_and_set_bit(CF_READ_PENDING, &con->flags))
-		return;
-
-	spin_lock_bh(&read_sockets_lock);
-	list_add_tail(&con->read_list, &read_sockets);
-	spin_unlock_bh(&read_sockets_lock);
-
-	wake_up_interruptible(&lowcomms_recv_waitq);
+	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
+		queue_work(recv_workqueue, &con->rwork);
 }
 
 static void lowcomms_write_space(struct sock *sk)
 {
 	struct connection *con = sock2con(sk);
 
-	if (test_and_set_bit(CF_WRITE_PENDING, &con->flags))
-		return;
-
-	spin_lock_bh(&write_sockets_lock);
-	list_add_tail(&con->write_list, &write_sockets);
-	spin_unlock_bh(&write_sockets_lock);
-
-	wake_up_interruptible(&lowcomms_send_waitq);
+	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags))
+		queue_work(send_workqueue, &con->swork);
 }
 
 static inline void lowcomms_connect_sock(struct connection *con)
 {
-	if (test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
-		return;
-
-	spin_lock_bh(&state_sockets_lock);
-	list_add_tail(&con->state_list, &state_sockets);
-	spin_unlock_bh(&state_sockets_lock);
-
-	wake_up_interruptible(&lowcomms_send_waitq);
+	if (!test_and_set_bit(CF_CONNECT_PENDING, &con->flags))
+		queue_work(send_workqueue, &con->swork);
 }
 
 static void lowcomms_state_change(struct sock *sk)
@@ -388,7 +359,8 @@ out:
 	return 0;
 
 out_resched:
-	lowcomms_data_ready(con->sock->sk, 0);
+	if (!test_and_set_bit(CF_READ_PENDING, &con->flags))
+		queue_work(recv_workqueue, &con->rwork);
 	up_read(&con->sock_sem);
 	cond_resched();
 	return 0;
@@ -477,6 +449,8 @@ static int accept_from_sock(struct connection *con)
 			othercon->nodeid = nodeid;
 			othercon->rx_action = receive_from_sock;
 			init_rwsem(&othercon->sock_sem);
+			INIT_WORK(&othercon->swork, process_send_sockets);
+			INIT_WORK(&othercon->rwork, process_recv_sockets);
 			set_bit(CF_IS_OTHERCON, &othercon->flags);
 			newcon->othercon = othercon;
 		}
@@ -498,7 +472,8 @@ static int accept_from_sock(struct connection *con)
 	 * beween processing the accept adding the socket
 	 * to the read_sockets list
 	 */
-	lowcomms_data_ready(newsock->sk, 0);
+	if (!test_and_set_bit(CF_READ_PENDING, &newcon->flags))
+		queue_work(recv_workqueue, &newcon->rwork);
 	up_read(&con->sock_sem);
 
 	return 0;
@@ -709,6 +684,7 @@ void *dlm_lowcomms_get_buffer(int nodeid, int len,
 	if (!con)
 		return NULL;
 
+	spin_lock(&con->writequeue_lock);
 	e = list_entry(con->writequeue.prev, struct writequeue_entry, list);
 	if ((&e->list == &con->writequeue) ||
 	    (PAGE_CACHE_SIZE - e->end < len)) {
@@ -747,6 +723,7 @@ void dlm_lowcomms_commit_buffer(void *mh)
 	struct connection *con = e->con;
 	int users;
 
+	spin_lock(&con->writequeue_lock);
 	users = --e->users;
 	if (users)
 		goto out;
@@ -754,12 +731,8 @@ void dlm_lowcomms_commit_buffer(void *mh)
 	kunmap(e->page);
 	spin_unlock(&con->writequeue_lock);
 
-	if (test_and_set_bit(CF_WRITE_PENDING, &con->flags) == 0) {
-		spin_lock_bh(&write_sockets_lock);
-		list_add_tail(&con->write_list, &write_sockets);
-		spin_unlock_bh(&write_sockets_lock);
-
-		wake_up_interruptible(&lowcomms_send_waitq);
+	if (!test_and_set_bit(CF_WRITE_PENDING, &con->flags)) {
+		queue_work(send_workqueue, &con->swork);
 	}
 	return;
 
@@ -800,6 +773,7 @@ static void send_to_sock(struct connection *con)
 		offset = e->offset;
 		BUG_ON(len == 0 && e->users == 0);
 		spin_unlock(&con->writequeue_lock);
+		kmap(e->page);
 
 		ret = 0;
 		if (len) {
@@ -881,85 +855,29 @@ out:
 }
 
 /* Look for activity on active sockets */
-static void process_sockets(void)
+static void process_recv_sockets(struct work_struct *work)
 {
-	struct list_head *list;
-	struct list_head *temp;
-	int count = 0;
-
-	spin_lock_bh(&read_sockets_lock);
-	list_for_each_safe(list, temp, &read_sockets) {
-
-		struct connection *con =
-			list_entry(list, struct connection, read_list);
-		list_del(&con->read_list);
-		clear_bit(CF_READ_PENDING, &con->flags);
+	struct connection *con = container_of(work, struct connection, rwork);
+	int err;
 
-		spin_unlock_bh(&read_sockets_lock);
-
-		/* This can reach zero if we are processing requests
-		 * as they come in.
-		 */
-		if (atomic_read(&con->waiting_requests) == 0) {
-			spin_lock_bh(&read_sockets_lock);
-			continue;
-		}
-
-		do {
-			con->rx_action(con);
-
-			/* Don't starve out everyone else */
-			if (++count >= MAX_RX_MSG_COUNT) {
-				cond_resched();
-				count = 0;
-			}
-
-		} while (!atomic_dec_and_test(&con->waiting_requests) &&
-			 !kthread_should_stop());
-
-		spin_lock_bh(&read_sockets_lock);
-	}
-	spin_unlock_bh(&read_sockets_lock);
+	clear_bit(CF_READ_PENDING, &con->flags);
+	do {
+		err = con->rx_action(con);
+	} while (!err);
 }
 
-/* Try to send any messages that are pending
- */
-static void process_output_queue(void)
-{
-	struct list_head *list;
-	struct list_head *temp;
-
-	spin_lock_bh(&write_sockets_lock);
-	list_for_each_safe(list, temp, &write_sockets) {
-		struct connection *con =
-			list_entry(list, struct connection, write_list);
-		clear_bit(CF_WRITE_PENDING, &con->flags);
-		list_del(&con->write_list);
-
-		spin_unlock_bh(&write_sockets_lock);
-		send_to_sock(con);
-		spin_lock_bh(&write_sockets_lock);
-	}
-	spin_unlock_bh(&write_sockets_lock);
-}
 
-static void process_state_queue(void)
+static void process_send_sockets(struct work_struct *work)
 {
-	struct list_head *list;
-	struct list_head *temp;
-
-	spin_lock_bh(&state_sockets_lock);
-	list_for_each_safe(list, temp, &state_sockets) {
-		struct connection *con =
-			list_entry(list, struct connection, state_list);
-		list_del(&con->state_list);
-		clear_bit(CF_CONNECT_PENDING, &con->flags);
-		spin_unlock_bh(&state_sockets_lock);
+	struct connection *con = container_of(work, struct connection, swork);
 
+	if (test_and_clear_bit(CF_CONNECT_PENDING, &con->flags)) {
 		connect_to_sock(con);
-		spin_lock_bh(&state_sockets_lock);
 	}
-	spin_unlock_bh(&state_sockets_lock);
+
+	if (test_and_clear_bit(CF_WRITE_PENDING, &con->flags)) {
+		send_to_sock(con);
+	}
 }
 
 
@@ -976,97 +894,29 @@ static void clean_writequeues(void)
 	}
 }
 
-static int read_list_empty(void)
+static void work_stop(void)
 {
-	int status;
-
-	spin_lock_bh(&read_sockets_lock);
-	status = list_empty(&read_sockets);
-	spin_unlock_bh(&read_sockets_lock);
-
-	return status;
+	destroy_workqueue(recv_workqueue);
+	destroy_workqueue(send_workqueue);
 }
 
-/* DLM Transport comms receive daemon */
-static int dlm_recvd(void *data)
+static int work_start(void)
 {
-	init_waitqueue_entry(&lowcomms_recv_waitq_head, current);
-	add_wait_queue(&lowcomms_recv_waitq, &lowcomms_recv_waitq_head);
-
-	while (!kthread_should_stop()) {
-		set_current_state(TASK_INTERRUPTIBLE);
-		if (read_list_empty())
-			cond_resched();
-		set_current_state(TASK_RUNNING);
-
-		process_sockets();
-	}
-
-	return 0;
-}
-
-static int write_and_state_lists_empty(void)
-{
-	int status;
-
-	spin_lock_bh(&write_sockets_lock);
-	status = list_empty(&write_sockets);
-	spin_unlock_bh(&write_sockets_lock);
-
-	spin_lock_bh(&state_sockets_lock);
-	if (list_empty(&state_sockets) == 0)
-		status = 0;
-	spin_unlock_bh(&state_sockets_lock);
-
-	return status;
-}
-
-/* DLM Transport send daemon */
-static int dlm_sendd(void *data)
-{
-	init_waitqueue_entry(&lowcomms_send_waitq_head, current);
-	add_wait_queue(&lowcomms_send_waitq, &lowcomms_send_waitq_head);
-
-	while (!kthread_should_stop()) {
-		set_current_state(TASK_INTERRUPTIBLE);
-		if (write_and_state_lists_empty())
-			cond_resched();
-		set_current_state(TASK_RUNNING);
-
-		process_state_queue();
-		process_output_queue();
-	}
-
-	return 0;
-}
-
-static void daemons_stop(void)
-{
-	kthread_stop(recv_task);
-	kthread_stop(send_task);
-}
-
-static int daemons_start(void)
-{
-	struct task_struct *p;
 	int error;
-
-	p = kthread_run(dlm_recvd, NULL, "dlm_recvd");
-	error = IS_ERR(p);
+	recv_workqueue = create_workqueue("dlm_recv");
+	error = IS_ERR(recv_workqueue);
 	if (error) {
-		log_print("can't start dlm_recvd %d", error);
+		log_print("can't start dlm_recv %d", error);
 		return error;
 	}
-	recv_task = p;
 
-	p = kthread_run(dlm_sendd, NULL, "dlm_sendd");
-	error = IS_ERR(p);
+	send_workqueue = create_singlethread_workqueue("dlm_send");
+	error = IS_ERR(send_workqueue);
 	if (error) {
-		log_print("can't start dlm_sendd %d", error);
-		kthread_stop(recv_task);
+		log_print("can't start dlm_send %d", error);
+		destroy_workqueue(recv_workqueue);
 		return error;
 	}
-	send_task = p;
 
 	return 0;
 }
@@ -1083,7 +933,7 @@ void dlm_lowcomms_stop(void)
 			connections[i]->flags |= 0xFF;
 	}
 
-	daemons_stop();
+	work_stop();
 	clean_writequeues();
 
 	for (i = 0; i < conn_array_size; i++) {
@@ -1135,7 +985,7 @@ int dlm_lowcomms_start(void)
 	if (error)
 		goto fail_unlisten;
 
-	error = daemons_start();
+	error = work_start();
 	if (error)
 		goto fail_unlisten;
 

[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]