[dm-devel] more workqueue play

Christophe Saout christophe at saout.de
Tue Feb 17 09:56:02 UTC 2004


Hi,

I've been playing with kcopyd.c and workqueues.

I basically ripped out Joe's whole hand-rolled job queue and converted the
three work functions into workqueue work functions.
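
For anyone who hasn't looked at the 2.6 workqueue interface yet, the
conversion pattern looks roughly like this. This is only an illustrative
sketch - the example_* names are made up, the real thing is in the patch
below:

#include <linux/workqueue.h>

static struct workqueue_struct *example_wq;

struct example_job {
	struct work_struct work;	/* embedded work item */
	/* ... per-job state ... */
};

static void example_work_fn(void *data)
{
	struct example_job *job = (struct example_job *) data;

	/*
	 * Do one stage of the job.  To hand the job on to another
	 * stage, re-arm it with PREPARE_WORK(&job->work, other_fn, job)
	 * and queue_work() it again.
	 */
}

static void example_dispatch(struct example_job *job)
{
	INIT_WORK(&job->work, example_work_fn, job);
	queue_work(example_wq, &job->work);
}

create_workqueue("kcopyd") gives us dedicated worker thread(s), so the
dm-daemon thread, the three global job lists and _job_lock can all go away.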

I then found out that this was not entirely clever: simply requeueing a
pages job whose allocation failed turns kcopyd into a CPU hog, because it
endlessly retries the page allocation.

I then added a per-client list for jobs waiting for pages, which gets
drained whenever a completed job returns its pages to the pool.
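
The interesting part, pulled out of the patch below (slightly abridged):
a pages job that gets -ENOMEM is parked on the client's pagewait_jobs list
by queue_pagewait_job(), and run_complete_job() calls this after handing
its pages back to the pool, requeueing every parked job that can now be
satisfied:

static void unqueue_pagewait_jobs(struct kcopyd_client *kc)
{
	struct list_head *tmp, *tmp2;
	unsigned int nr;

	spin_lock(&kc->lock);
	/* pages currently free in this client's pool */
	nr = kc->nr_free_pages;
	list_for_each_safe (tmp, tmp2, &kc->pagewait_jobs) {
		struct kcopyd_job *job =
			list_entry(tmp, struct kcopyd_job, pagewait_list);

		/* stop at the first job that still doesn't fit */
		if (nr < job->nr_pages)
			break;
		nr -= job->nr_pages;

		/* this one can get its pages now, make it a pages job again */
		list_del(&job->pagewait_list);
		queue_job(job, run_pages_job);
	}
	spin_unlock(&kc->lock);
}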

It is still shorter than the dm-daemon version.

I was just playing around though, so don't shoot me if I made a big mistake. ;)

(the line numbers might be a bit off)


--- linux.orig/drivers/md/kcopyd.c	2004-02-17 15:40:14.789401760 +0100
+++ linux/drivers/md/kcopyd.c	2004-02-17 15:43:10.703658728 +0100
@@ -16,14 +16,14 @@
 #include <linux/pagemap.h>
 #include <linux/slab.h>
 #include <linux/vmalloc.h>
+#include <linux/workqueue.h>
 
 #include "kcopyd.h"
-#include "dm-daemon.h"
 
 /* FIXME: this is only needed for the DMERR macros */
 #include "dm.h"
 
-static struct dm_daemon _kcopyd;
+static struct workqueue_struct *_kcopyd_wq;
 
 /*-----------------------------------------------------------------
  * Each kcopyd client has its own little pool of preallocated
@@ -36,6 +36,7 @@
 	struct list_head pages;
 	unsigned int nr_pages;
 	unsigned int nr_free_pages;
+	struct list_head pagewait_jobs;
 };
 
 static inline void __push_page(struct kcopyd_client *kc, struct page *p)
@@ -146,9 +147,11 @@
  *---------------------------------------------------------------*/
 struct kcopyd_job {
 	struct kcopyd_client *kc;
-	struct list_head list;
+	struct work_struct work;
 	unsigned long flags;
 
+	struct list_head pagewait_list;
+
 	/*
 	 * Error state of the job.
 	 */
@@ -193,27 +196,8 @@
 static kmem_cache_t *_job_cache;
 static mempool_t *_job_pool;
 
-/*
- * We maintain three lists of jobs:
- *
- * i)   jobs waiting for pages
- * ii)  jobs that have pages, and are waiting for the io to be issued.
- * iii) jobs that have completed.
- *
- * All three of these are protected by job_lock.
- */
-static spinlock_t _job_lock = SPIN_LOCK_UNLOCKED;
-
-static LIST_HEAD(_complete_jobs);
-static LIST_HEAD(_io_jobs);
-static LIST_HEAD(_pages_jobs);
-
 static int jobs_init(void)
 {
-	INIT_LIST_HEAD(&_complete_jobs);
-	INIT_LIST_HEAD(&_io_jobs);
-	INIT_LIST_HEAD(&_pages_jobs);
-
 	_job_cache = kmem_cache_create("kcopyd-jobs",
 				       sizeof(struct kcopyd_job),
 				       __alignof__(struct kcopyd_job),
@@ -233,71 +217,83 @@
 
 static void jobs_exit(void)
 {
-	BUG_ON(!list_empty(&_complete_jobs));
-	BUG_ON(!list_empty(&_io_jobs));
-	BUG_ON(!list_empty(&_pages_jobs));
-
+	flush_workqueue(_kcopyd_wq);
 	mempool_destroy(_job_pool);
 	kmem_cache_destroy(_job_cache);
 }
 
-/*
- * Functions to push and pop a job onto the head of a given job
- * list.
- */
-static inline struct kcopyd_job *pop(struct list_head *jobs)
+static void run_pages_job(void *data);
+static void run_complete_job(void *data);
+static void run_io_job(void *data);
+
+static void queue_job(struct kcopyd_job *job, void (*fn)(void *data))
 {
-	struct kcopyd_job *job = NULL;
-	unsigned long flags;
+	PREPARE_WORK(&job->work, fn, job);
+	queue_work(_kcopyd_wq, &job->work);
+}
 
-	spin_lock_irqsave(&_job_lock, flags);
+static void unqueue_pagewait_jobs(struct kcopyd_client *kc)
+{
+	struct list_head *tmp, *tmp2;
+	unsigned int nr;
+
+	spin_lock(&kc->lock);
+	nr = kc->nr_free_pages;
+	list_for_each_safe (tmp, tmp2, &kc->pagewait_jobs) {
+		struct kcopyd_job *job =
+			list_entry(tmp, struct kcopyd_job, pagewait_list);
+
+		if (nr < job->nr_pages)
+			break;
+		nr -= job->nr_pages;
 
-	if (!list_empty(jobs)) {
-		job = list_entry(jobs->next, struct kcopyd_job, list);
-		list_del(&job->list);
+		list_del(&job->pagewait_list);
+		queue_job(job, run_pages_job);
 	}
-	spin_unlock_irqrestore(&_job_lock, flags);
+	spin_unlock(&kc->lock);
+}
 
-	return job;
+static void queue_pagewait_job(struct kcopyd_job *job)
+{
+	struct kcopyd_client *kc = job->kc;
+
+	spin_lock(&kc->lock);
+	list_add(&job->pagewait_list, &kc->pagewait_jobs);
+	spin_unlock(&kc->lock);
 }
 
-static inline void push(struct list_head *jobs, struct kcopyd_job *job)
+static void error_job(struct kcopyd_job *job)
 {
-	unsigned long flags;
+	/* error this rogue job */
+	if (job->rw == WRITE)
+		job->write_err = (unsigned int) -1;
+	else
+		job->read_err = 1;
 
-	spin_lock_irqsave(&_job_lock, flags);
-	list_add_tail(&job->list, jobs);
-	spin_unlock_irqrestore(&_job_lock, flags);
+	queue_job(job, run_complete_job);
 }
 
-/*
- * These three functions process 1 item from the corresponding
- * job list.
- *
- * They return:
- * < 0: error
- *   0: success
- * > 0: can't process yet.
- */
-static int run_complete_job(struct kcopyd_job *job)
+static void run_complete_job(void *data)
 {
+	struct kcopyd_job *job = (struct kcopyd_job *) data;
 	void *context = job->context;
 	int read_err = job->read_err;
 	unsigned int write_err = job->write_err;
 	kcopyd_notify_fn fn = job->fn;
 
 	kcopyd_put_pages(job->kc, &job->pages);
+	unqueue_pagewait_jobs(job->kc);
+
 	mempool_free(job, _job_pool);
 	fn(read_err, write_err, context);
-	return 0;
 }
 
-static unsigned _pending;
+static atomic_t _pending; /* FIXME: ever used? */
 static void complete_io(unsigned long error, void *context)
 {
 	struct kcopyd_job *job = (struct kcopyd_job *) context;
 
-	_pending--;
+	atomic_dec(&_pending);
 	if (error) {
 		if (job->rw == WRITE)
 			job->write_err &= error;
@@ -305,29 +301,27 @@
 			job->read_err = 1;
 
 		if (!test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
-			push(&_complete_jobs, job);
-			dm_daemon_wake(&_kcopyd);
+			queue_job(job, run_complete_job);
 			return;
 		}
 	}
 
 	if (job->rw == WRITE)
-		push(&_complete_jobs, job);
+		queue_job(job, run_complete_job);
 
 	else {
 		job->rw = WRITE;
-		push(&_io_jobs, job);
+		queue_job(job, run_io_job);
 	}
-
-	dm_daemon_wake(&_kcopyd);
 }
 
 /*
  * Request io on as many buffer heads as we can currently get for
  * a particular job.
  */
-static int run_io_job(struct kcopyd_job *job)
+static void run_io_job(void *data)
 {
+	struct kcopyd_job *job = (struct kcopyd_job *) data;
 	int r;
 
 	if (job->rw == READ)
@@ -340,99 +334,37 @@
 				list_entry(job->pages.next, struct page, list),
 				job->offset, complete_io, job);
 
-	if (!r)
-		_pending++;
+	if (r) {
+		error_job(job);
+		return;
+	}
 
-	return r;
+	atomic_inc(&_pending);
+	blk_run_queues(); /* FIXME: too often */
 }
 
-static int run_pages_job(struct kcopyd_job *job)
+static void run_pages_job(void *data)
 {
+	struct kcopyd_job *job = (struct kcopyd_job *) data;
 	int r;
 
 	job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
 				  PAGE_SIZE >> 9);
 	r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
-	if (!r) {
-		/* this job is ready for io */
-		push(&_io_jobs, job);
-		return 0;
-	}
-
 	if (r == -ENOMEM)
 		/* can't complete now */
-		return 1;
-
-	return r;
-}
-
-/*
- * Run through a list for as long as possible.  Returns the count
- * of successful jobs.
- */
-static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *))
-{
-	struct kcopyd_job *job;
-	int r, count = 0;
-
-	while ((job = pop(jobs))) {
-
-		r = fn(job);
-
-		if (r < 0) {
-			/* error this rogue job */
-			if (job->rw == WRITE)
-				job->write_err = (unsigned int) -1;
-			else
-				job->read_err = 1;
-			push(&_complete_jobs, job);
-			break;
-		}
-
-		if (r > 0) {
-			/*
-			 * We couldn't service this job ATM, so
-			 * push this job back onto the list.
-			 */
-			push(jobs, job);
-			break;
-		}
-
-		count++;
-	}
-
-	return count;
-}
-
-/*
- * kcopyd does this every time it's woken up.
- */
-static jiffy_t do_work(void)
-{
-	/*
-	 * The order that these are called is *very* important.
-	 * complete jobs can free some pages for pages jobs.
-	 * Pages jobs when successful will jump onto the io jobs
-	 * list.  io jobs call wake when they complete and it all
-	 * starts again.
-	 */
-	process_jobs(&_complete_jobs, run_complete_job);
-	process_jobs(&_pages_jobs, run_pages_job);
-	process_jobs(&_io_jobs, run_io_job);
-
-	blk_run_queues();
-	return (jiffy_t) 0;
+		queue_pagewait_job(job);
+	else if (r)
+		error_job(job);
+	else
+		/* this job is ready for io */
+		queue_job(job, run_io_job);
 }
 
-/*
- * If we are copying a small region we just dispatch a single job
- * to do the copy, otherwise the io has to be split up into many
- * jobs.
- */
 static void dispatch_job(struct kcopyd_job *job)
 {
-	push(&_pages_jobs, job);
-	dm_daemon_wake(&_kcopyd);
+	INIT_WORK(&job->work, run_pages_job, job);
+	queue_work(_kcopyd_wq, &job->work);
 }
 
 #define SUB_JOB_SIZE 128
@@ -614,6 +546,8 @@
 		return r;
 	}
 
+	INIT_LIST_HEAD(&kc->pagewait_jobs);
+
 	r = client_add(kc);
 	if (r) {
 		dm_io_put(nr_pages);
@@ -643,17 +577,19 @@
 	if (r)
 		return r;
 
-	r = dm_daemon_start(&_kcopyd, "kcopyd", do_work);
-	if (r)
+	_kcopyd_wq = create_workqueue("kcopyd");
+	if (!_kcopyd_wq) {
 		jobs_exit();
+		return -ENOMEM;
+	}
 
-	return r;
+	return 0;
 }
 
 void kcopyd_exit(void)
 {
 	jobs_exit();
-	dm_daemon_stop(&_kcopyd);
+	destroy_workqueue(_kcopyd_wq);
 }
 
 EXPORT_SYMBOL(kcopyd_client_create);



