[dm-devel] more workqueue play
Christophe Saout
christophe at saout.de
Tue Feb 17 09:56:02 UTC 2004
Hi,
I've been playing with kcopyd.c and workqueues.
I basically ripped out the whole joe queue thing and converted the three
work functions (run_pages_job, run_io_job and run_complete_job) into workqueue functions.
I then found out that this was not 100% intelligent: simply requeueing
failed pages_jobs turns kcopyd into a CPU hog, because it endlessly
keeps trying to allocate pages.
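I then added a per-client list (pagewait_jobs) for jobs waiting for pages; they are requeued when a completed job returns enough pages to the client.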
It is still shorter than the dm-daemon version.
I was just playing though, don't shoot me if I made a big mistake. ;)
(the line numbers might be a bit off)
--- linux.orig/drivers/md/kcopyd.c 2004-02-17 15:40:14.789401760 +0100
+++ linux/drivers/md/kcopyd.c 2004-02-17 15:43:10.703658728 +0100
@@ -16,14 +16,14 @@
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/vmalloc.h>
+#include <linux/workqueue.h>
#include "kcopyd.h"
-#include "dm-daemon.h"
/* FIXME: this is only needed for the DMERR macros */
#include "dm.h"
-static struct dm_daemon _kcopyd;
+static struct workqueue_struct *_kcopyd_wq;
/*-----------------------------------------------------------------
* Each kcopyd client has its own little pool of preallocated
@@ -36,6 +36,7 @@
struct list_head pages;
unsigned int nr_pages;
unsigned int nr_free_pages;
+ struct list_head pagewait_jobs;
};
static inline void __push_page(struct kcopyd_client *kc, struct page *p)
@@ -146,9 +147,11 @@
*---------------------------------------------------------------*/
struct kcopyd_job {
struct kcopyd_client *kc;
- struct list_head list;
+ struct work_struct work;
unsigned long flags;
+ struct list_head pagewait_list;
+
/*
* Error state of the job.
*/
@@ -193,27 +196,8 @@
static kmem_cache_t *_job_cache;
static mempool_t *_job_pool;
-/*
- * We maintain three lists of jobs:
- *
- * i) jobs waiting for pages
- * ii) jobs that have pages, and are waiting for the io to be issued.
- * iii) jobs that have completed.
- *
- * All three of these are protected by job_lock.
- */
-static spinlock_t _job_lock = SPIN_LOCK_UNLOCKED;
-
-static LIST_HEAD(_complete_jobs);
-static LIST_HEAD(_io_jobs);
-static LIST_HEAD(_pages_jobs);
-
static int jobs_init(void)
{
- INIT_LIST_HEAD(&_complete_jobs);
- INIT_LIST_HEAD(&_io_jobs);
- INIT_LIST_HEAD(&_pages_jobs);
-
_job_cache = kmem_cache_create("kcopyd-jobs",
sizeof(struct kcopyd_job),
__alignof__(struct kcopyd_job),
@@ -233,71 +217,83 @@
static void jobs_exit(void)
{
- BUG_ON(!list_empty(&_complete_jobs));
- BUG_ON(!list_empty(&_io_jobs));
- BUG_ON(!list_empty(&_pages_jobs));
-
+ flush_workqueue(_kcopyd_wq);
mempool_destroy(_job_pool);
kmem_cache_destroy(_job_cache);
}
-/*
- * Functions to push and pop a job onto the head of a given job
- * list.
- */
-static inline struct kcopyd_job *pop(struct list_head *jobs)
+static void run_pages_job(void *data);
+static void run_complete_job(void *data);
+static void run_io_job(void *data);
+
+static void queue_job(struct kcopyd_job *job, void (*fn)(void *data))
{
- struct kcopyd_job *job = NULL;
- unsigned long flags;
+ PREPARE_WORK(&job->work, fn, job);
+ queue_work(_kcopyd_wq, &job->work);
+}
- spin_lock_irqsave(&_job_lock, flags);
+static void unqueue_pagewait_jobs(struct kcopyd_client *kc)
+{
+ struct list_head *tmp, *tmp2;
+ unsigned int nr;
+
+ spin_lock(&kc->lock);
+ nr = kc->nr_free_pages;
+ list_for_each_safe (tmp, tmp2, &kc->pagewait_jobs) {
+ struct kcopyd_job *job =
+ list_entry(tmp, struct kcopyd_job, pagewait_list);
+
+ if (nr < job->nr_pages)
+ break;
+ nr -= job->nr_pages;
- if (!list_empty(jobs)) {
- job = list_entry(jobs->next, struct kcopyd_job, list);
- list_del(&job->list);
+ list_del(&job->pagewait_list);
+ queue_job(job, run_pages_job);
}
- spin_unlock_irqrestore(&_job_lock, flags);
+ spin_unlock(&kc->lock);
+}
- return job;
+static void queue_pagewait_job(struct kcopyd_job *job)
+{
+ struct kcopyd_client *kc = job->kc;
+
+ spin_lock(&kc->lock);
+ list_add(&job->pagewait_list, &kc->pagewait_jobs);
+ spin_unlock(&kc->lock);
}
-static inline void push(struct list_head *jobs, struct kcopyd_job *job)
+static void error_job(struct kcopyd_job *job)
{
- unsigned long flags;
+ /* error this rogue job */
+ if (job->rw == WRITE)
+ job->write_err = (unsigned int) -1;
+ else
+ job->read_err = 1;
- spin_lock_irqsave(&_job_lock, flags);
- list_add_tail(&job->list, jobs);
- spin_unlock_irqrestore(&_job_lock, flags);
+ queue_job(job, run_complete_job);
}
-/*
- * These three functions process 1 item from the corresponding
- * job list.
- *
- * They return:
- * < 0: error
- * 0: success
- * > 0: can't process yet.
- */
-static int run_complete_job(struct kcopyd_job *job)
+static void run_complete_job(void *data)
{
+ struct kcopyd_job *job = (struct kcopyd_job *) data;
void *context = job->context;
int read_err = job->read_err;
unsigned int write_err = job->write_err;
kcopyd_notify_fn fn = job->fn;
kcopyd_put_pages(job->kc, &job->pages);
+ unqueue_pagewait_jobs(job->kc);
+
mempool_free(job, _job_pool);
fn(read_err, write_err, context);
- return 0;
}
-static unsigned _pending;
+static atomic_t _pending; /* FIXME: ever used? */
static void complete_io(unsigned long error, void *context)
{
struct kcopyd_job *job = (struct kcopyd_job *) context;
- _pending--;
+ atomic_dec(&_pending);
if (error) {
if (job->rw == WRITE)
job->write_err &= error;
@@ -305,29 +301,27 @@
job->read_err = 1;
if (!test_bit(KCOPYD_IGNORE_ERROR, &job->flags)) {
- push(&_complete_jobs, job);
- dm_daemon_wake(&_kcopyd);
+ queue_job(job, run_complete_job);
return;
}
}
if (job->rw == WRITE)
- push(&_complete_jobs, job);
+ queue_job(job, run_complete_job);
else {
job->rw = WRITE;
- push(&_io_jobs, job);
+ queue_job(job, run_io_job);
}
-
- dm_daemon_wake(&_kcopyd);
}
/*
* Request io on as many buffer heads as we can currently get for
* a particular job.
*/
-static int run_io_job(struct kcopyd_job *job)
+static void run_io_job(void *data)
{
+ struct kcopyd_job *job = (struct kcopyd_job *) data;
int r;
if (job->rw == READ)
@@ -340,99 +334,37 @@
list_entry(job->pages.next, struct page, list),
job->offset, complete_io, job);
- if (!r)
- _pending++;
+ if (r) {
+ error_job(job);
+ return;
+ }
- return r;
+ atomic_inc(&_pending);
+ blk_run_queues(); /* FIXME: too often */
}
-static int run_pages_job(struct kcopyd_job *job)
+static void run_pages_job(void *data)
{
+ struct kcopyd_job *job = (struct kcopyd_job *) data;
int r;
job->nr_pages = dm_div_up(job->dests[0].count + job->offset,
PAGE_SIZE >> 9);
r = kcopyd_get_pages(job->kc, job->nr_pages, &job->pages);
- if (!r) {
- /* this job is ready for io */
- push(&_io_jobs, job);
- return 0;
- }
-
if (r == -ENOMEM)
/* can't complete now */
- return 1;
-
- return r;
-}
-
-/*
- * Run through a list for as long as possible. Returns the count
- * of successful jobs.
- */
-static int process_jobs(struct list_head *jobs, int (*fn) (struct kcopyd_job *))
-{
- struct kcopyd_job *job;
- int r, count = 0;
-
- while ((job = pop(jobs))) {
-
- r = fn(job);
-
- if (r < 0) {
- /* error this rogue job */
- if (job->rw == WRITE)
- job->write_err = (unsigned int) -1;
- else
- job->read_err = 1;
- push(&_complete_jobs, job);
- break;
- }
-
- if (r > 0) {
- /*
- * We couldn't service this job ATM, so
- * push this job back onto the list.
- */
- push(jobs, job);
- break;
- }
-
- count++;
- }
-
- return count;
-}
-
-/*
- * kcopyd does this every time it's woken up.
- */
-static jiffy_t do_work(void)
-{
- /*
- * The order that these are called is *very* important.
- * complete jobs can free some pages for pages jobs.
- * Pages jobs when successful will jump onto the io jobs
- * list. io jobs call wake when they complete and it all
- * starts again.
- */
- process_jobs(&_complete_jobs, run_complete_job);
- process_jobs(&_pages_jobs, run_pages_job);
- process_jobs(&_io_jobs, run_io_job);
-
- blk_run_queues();
- return (jiffy_t) 0;
+ queue_pagewait_job(job);
+ else if (r)
+ error_job(job);
+ else
+ /* this job is ready for io */
+ queue_job(job, run_io_job);
}
-/*
- * If we are copying a small region we just dispatch a single job
- * to do the copy, otherwise the io has to be split up into many
- * jobs.
- */
static void dispatch_job(struct kcopyd_job *job)
{
- push(&_pages_jobs, job);
- dm_daemon_wake(&_kcopyd);
+ INIT_WORK(&job->work, run_pages_job, job);
+ queue_work(_kcopyd_wq, &job->work);
}
#define SUB_JOB_SIZE 128
@@ -614,6 +546,8 @@
return r;
}
+ INIT_LIST_HEAD(&kc->pagewait_jobs);
+
r = client_add(kc);
if (r) {
dm_io_put(nr_pages);
@@ -643,17 +577,19 @@
if (r)
return r;
- r = dm_daemon_start(&_kcopyd, "kcopyd", do_work);
- if (r)
+ _kcopyd_wq = create_workqueue("kcopyd");
+ if (!_kcopyd_wq) {
jobs_exit();
+ return -ENOMEM;
+ }
- return r;
+ return 0;
}
void kcopyd_exit(void)
{
jobs_exit();
- dm_daemon_stop(&_kcopyd);
+ destroy_workqueue(_kcopyd_wq);
}
EXPORT_SYMBOL(kcopyd_client_create);
More information about the dm-devel mailing list