[Cluster-devel] cluster/group/gfs_controld cpg.c lock_dlm.h ma ...

teigland at sourceware.org teigland at sourceware.org
Wed Nov 28 20:49:10 UTC 2007


CVSROOT:	/cvs/cluster
Module name:	cluster
Changes by:	teigland at sourceware.org	2007-11-28 20:49:08

Modified files:
	group/gfs_controld: cpg.c lock_dlm.h main.c plock.c recover.c 

Log message:
	A performance optimization for plocks.  This speeds up locks that are
	repeatedly accessed by processes on a single node.  Plocks used by
	processes on multiple nodes work the same way as before.  The
	optimization is disabled by default, and can be enabled by setting
	
	<gfs_controld plock_ownership="1"/>
	
	in cluster.conf, or by starting gfs_controld with "-o1".  It is disabled
	by default because enabling it breaks compatibility with previous versions
	of gfs_controld.  If all nodes in the cluster are running this version,
	then plock_ownership can be enabled.
	
	The plock_ownership mode needs extensive testing.  This also introduces
	some minor changes when plock_ownership is disabled, so new testing is
	also required in that mode.  Abhi and I worked on this together.

Patches:
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/gfs_controld/cpg.c.diff?cvsroot=cluster&r1=1.13&r2=1.14
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/gfs_controld/lock_dlm.h.diff?cvsroot=cluster&r1=1.30&r2=1.31
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/gfs_controld/main.c.diff?cvsroot=cluster&r1=1.31&r2=1.32
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/gfs_controld/plock.c.diff?cvsroot=cluster&r1=1.33&r2=1.34
http://sourceware.org/cgi-bin/cvsweb.cgi/cluster/group/gfs_controld/recover.c.diff?cvsroot=cluster&r1=1.33&r2=1.34

--- cluster/group/gfs_controld/cpg.c	2007/07/18 20:35:50	1.13
+++ cluster/group/gfs_controld/cpg.c	2007/11/28 20:49:08	1.14
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2006 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2006-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -13,6 +13,8 @@
 #include <openais/cpg.h>
 #include "lock_dlm.h"
 
+extern struct list_head mounts;
+extern unsigned int     protocol_active[3];
 static cpg_handle_t	daemon_handle;
 static struct cpg_name	daemon_name;
 int			message_flow_control_on;
@@ -21,6 +23,9 @@
 void receive_options(struct mountgroup *mg, char *buf, int len, int from);
 void receive_remount(struct mountgroup *mg, char *buf, int len, int from);
 void receive_plock(struct mountgroup *mg, char *buf, int len, int from);
+void receive_own(struct mountgroup *mg, char *buf, int len, int from);
+void receive_drop(struct mountgroup *mg, char *buf, int len, int from);
+void receive_sync(struct mountgroup *mg, char *buf, int len, int from);
 void receive_withdraw(struct mountgroup *mg, char *buf, int len, int from);
 void receive_mount_status(struct mountgroup *mg, char *buf, int len, int from);
 void receive_recovery_status(struct mountgroup *mg, char *buf, int len,
@@ -51,9 +56,14 @@
 	hd->nodeid	= le32_to_cpu(hd->nodeid);
 	hd->to_nodeid	= le32_to_cpu(hd->to_nodeid);
 
-	if (hd->version[0] != GDLM_VER_MAJOR) {
-		log_error("reject message version %u.%u.%u",
-			  hd->version[0], hd->version[1], hd->version[2]);
+	/* FIXME: we need to look at how to gracefully fail when we end up
+	   with mixed incompat versions */
+
+	if (hd->version[0] != protocol_active[0]) {
+		log_error("reject message from %d version %u.%u.%u vs %u.%u.%u",
+			  nodeid, hd->version[0], hd->version[1],
+			  hd->version[2], protocol_active[0],
+			  protocol_active[1], protocol_active[2]);
 		return;
 	}
 
@@ -100,6 +110,19 @@
 		receive_withdraw(mg, data, len, nodeid);
 		break;
 
+	case MSG_PLOCK_OWN:
+		receive_own(mg, data, len, nodeid);
+		break;
+
+	case MSG_PLOCK_DROP:
+		receive_drop(mg, data, len, nodeid);
+		break;
+
+	case MSG_PLOCK_SYNC_LOCK:
+	case MSG_PLOCK_SYNC_WAITER:
+		receive_sync(mg, data, len, nodeid);
+		break;
+
 	default:
 		log_error("unknown message type %d from %d",
 			  hd->type, hd->nodeid);
@@ -112,11 +135,26 @@
 	do_deliver(nodeid, data, data_len);
 }
 
+/* Not sure if purging plocks (driven by confchg) needs to be synchronized with
+   the other recovery steps (driven by libgroup) for a node, don't think so.
+   Is it possible for a node to have been cleared from the members_gone list
+   before this confchg is processed? */
+
 void confchg_cb(cpg_handle_t handle, struct cpg_name *group_name,
 		struct cpg_address *member_list, int member_list_entries,
 		struct cpg_address *left_list, int left_list_entries,
 		struct cpg_address *joined_list, int joined_list_entries)
 {
+	struct mountgroup *mg;
+	int i, nodeid;
+
+	for (i = 0; i < left_list_entries; i++) {
+		nodeid = left_list[i].nodeid;
+		list_for_each_entry(mg, &mounts, list) {
+			if (is_member(mg, nodeid) || is_removed(mg, nodeid))
+				purge_plocks(mg, left_list[i].nodeid, 0);
+		}
+	}
 }
 
 static cpg_callbacks_t callbacks = {
@@ -238,9 +276,9 @@
 	struct gdlm_header *hd = (struct gdlm_header *) buf;
 	int type = hd->type;
 
-	hd->version[0]	= cpu_to_le16(GDLM_VER_MAJOR);
-	hd->version[1]	= cpu_to_le16(GDLM_VER_MINOR);
-	hd->version[2]	= cpu_to_le16(GDLM_VER_PATCH);
+	hd->version[0]	= cpu_to_le16(protocol_active[0]);
+	hd->version[1]	= cpu_to_le16(protocol_active[1]);
+	hd->version[2]	= cpu_to_le16(protocol_active[2]);
 	hd->type	= cpu_to_le16(hd->type);
 	hd->nodeid	= cpu_to_le32(hd->nodeid);
 	hd->to_nodeid	= cpu_to_le32(hd->to_nodeid);
--- cluster/group/gfs_controld/lock_dlm.h	2007/11/21 17:49:16	1.30
+++ cluster/group/gfs_controld/lock_dlm.h	2007/11/28 20:49:08	1.31
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **  
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -168,6 +168,7 @@
 	uint64_t		cp_handle;
 	time_t			last_checkpoint_time;
 	time_t			last_plock_time;
+	struct timeval		drop_resources_last;
 
 	int			needs_recovery;
 	int			our_jid;
@@ -241,12 +242,12 @@
 	MSG_MOUNT_STATUS,
 	MSG_RECOVERY_STATUS,
 	MSG_RECOVERY_DONE,
+	MSG_PLOCK_OWN,
+	MSG_PLOCK_DROP,
+	MSG_PLOCK_SYNC_LOCK,
+	MSG_PLOCK_SYNC_WAITER,
 };
 
-#define GDLM_VER_MAJOR 1
-#define GDLM_VER_MINOR 0
-#define GDLM_VER_PATCH 0
-
 struct gdlm_header {
 	uint16_t		version[3];
 	uint16_t		type;			/* MSG_ */
@@ -267,6 +268,9 @@
 int do_write(int fd, void *buf, size_t count);
 struct mountgroup *find_mg(char *name);
 struct mountgroup *find_mg_id(uint32_t id);
+struct mg_member *find_memb_nodeid(struct mountgroup *mg, int nodeid);
+int is_member(struct mountgroup *mg, int nodeid);
+int is_removed(struct mountgroup *mg, int nodeid);
 
 int setup_cman(void);
 int process_cman(void);
--- cluster/group/gfs_controld/main.c	2007/10/26 19:33:21	1.31
+++ cluster/group/gfs_controld/main.c	2007/11/28 20:49:08	1.32
@@ -11,12 +11,29 @@
 ******************************************************************************/
 
 #include "lock_dlm.h"
+#include "ccs.h"
 
-#define OPTION_STRING			"DPhVwpl:"
+#define OPTION_STRING			"DPhVwpl:o:t:c:a:"
 #define LOCKFILE_NAME			"/var/run/gfs_controld.pid"
 
+#define DEFAULT_NO_WITHDRAW 0 /* enable withdraw by default */
+#define DEFAULT_NO_PLOCK 0 /* enable plocks by default */
+
+/* max number of plock ops we will cpg-multicast per second */
 #define DEFAULT_PLOCK_RATE_LIMIT 100
 
+/* disable ownership by default because it's a different protocol */
+#define DEFAULT_PLOCK_OWNERSHIP 0
+
+/* max frequency of drop attempts in ms */
+#define DEFAULT_DROP_RESOURCES_TIME 10000 /* 10 sec */
+
+/* max number of resources to drop per time period */
+#define DEFAULT_DROP_RESOURCES_COUNT 10
+
+/* resource not accessed for this many ms before subject to dropping */
+#define DEFAULT_DROP_RESOURCES_AGE 10000 /* 10 sec */
+
 struct client {
 	int fd;
 	char type[32];
@@ -24,11 +41,41 @@
 	int another_mount;
 };
 
+extern struct list_head mounts;
+extern struct list_head withdrawn_mounts;
+extern group_handle_t gh;
+
+int dmsetup_wait;
+
+/* cpg message protocol
+   1.0.0 is initial version
+   2.0.0 is incompatible with 1.0.0 and allows plock ownership */
+unsigned int protocol_v100[3] = {1, 0, 0};
+unsigned int protocol_v200[3] = {2, 0, 0};
+unsigned int protocol_active[3];
+
+/* user configurable */
+int config_no_withdraw;
+int config_no_plock;
+uint32_t config_plock_rate_limit;
+uint32_t config_plock_ownership;
+uint32_t config_drop_resources_time;
+uint32_t config_drop_resources_count;
+uint32_t config_drop_resources_age;
+
+/* command line settings override corresponding cluster.conf settings */
+static int opt_no_withdraw;
+static int opt_no_plock;
+static int opt_plock_rate_limit;
+static int opt_plock_ownership;
+static int opt_drop_resources_time;
+static int opt_drop_resources_count;
+static int opt_drop_resources_age;
+
 static int client_maxi;
 static int client_size = 0;
 static struct client *client = NULL;
 static struct pollfd *pollfd = NULL;
-
 static int cman_fd;
 static int cpg_fd;
 static int listen_fd;
@@ -37,13 +84,6 @@
 static int plocks_fd;
 static int plocks_ci;
 
-extern struct list_head mounts;
-extern struct list_head withdrawn_mounts;
-extern group_handle_t gh;
-int no_withdraw;
-int no_plock;
-uint32_t plock_rate_limit = DEFAULT_PLOCK_RATE_LIMIT;
-int dmsetup_wait;
 
 int do_read(int fd, void *buf, size_t count)
 {
@@ -620,6 +660,85 @@
 	return rv;
 }
 
+#define PLOCK_RATE_LIMIT_PATH "/cluster/gfs_controld/@plock_rate_limit"
+#define PLOCK_OWNERSHIP_PATH "/cluster/gfs_controld/@plock_ownership"
+#define DROP_RESOURCES_TIME_PATH "/cluster/gfs_controld/@drop_resources_time"
+#define DROP_RESOURCES_COUNT_PATH "/cluster/gfs_controld/@drop_resources_count"
+#define DROP_RESOURCES_AGE_PATH "/cluster/gfs_controld/@drop_resources_age"
+
+static void set_ccs_config(void)
+{
+	char path[PATH_MAX], *str;
+	int i = 0, cd, error;
+
+	while ((cd = ccs_connect()) < 0) {
+		sleep(1);
+		if (++i > 9 && !(i % 10))
+			log_error("connect to ccs error %d, "
+				  "check ccsd or cluster status", cd);
+	}
+
+	memset(path, 0, PATH_MAX);
+	snprintf(path, PATH_MAX, "%s", PLOCK_RATE_LIMIT_PATH);
+	str = NULL;
+
+	error = ccs_get(cd, path, &str);
+	if (!error) {
+		if (!opt_plock_rate_limit)
+			config_plock_rate_limit = atoi(str);
+	}
+	if (str)
+		free(str);
+
+	memset(path, 0, PATH_MAX);
+	snprintf(path, PATH_MAX, "%s", PLOCK_OWNERSHIP_PATH);
+	str = NULL;
+
+	error = ccs_get(cd, path, &str);
+	if (!error) {
+		if (!opt_plock_ownership)
+			config_plock_ownership = atoi(str);
+	}
+	if (str)
+		free(str);
+
+	memset(path, 0, PATH_MAX);
+	snprintf(path, PATH_MAX, "%s", DROP_RESOURCES_TIME_PATH);
+	str = NULL;
+
+	error = ccs_get(cd, path, &str);
+	if (!error) {
+		if (!opt_drop_resources_time)
+			config_drop_resources_time = atoi(str);
+	}
+	if (str)
+		free(str);
+
+	memset(path, 0, PATH_MAX);
+	snprintf(path, PATH_MAX, "%s", DROP_RESOURCES_COUNT_PATH);
+	str = NULL;
+
+	error = ccs_get(cd, path, &str);
+	if (!error) {
+		if (!opt_drop_resources_count)
+			config_drop_resources_count = atoi(str);
+	}
+	if (str)
+		free(str);
+
+	memset(path, 0, PATH_MAX);
+	snprintf(path, PATH_MAX, "%s", DROP_RESOURCES_AGE_PATH);
+	str = NULL;
+
+	error = ccs_get(cd, path, &str);
+	if (!error) {
+		if (!opt_drop_resources_age)
+			config_drop_resources_age = atoi(str);
+	}
+	if (str)
+		free(str);
+}
+
 static void lockfile(void)
 {
 	int fd, error;
@@ -692,10 +811,18 @@
 	printf("\n");
 	printf("  -D	       Enable debugging code and don't fork\n");
 	printf("  -P	       Enable plock debugging\n");
+	printf("  -w	       Disable withdraw\n");
 	printf("  -p	       Disable plocks\n");
 	printf("  -l <limit>   Limit the rate of plock operations\n");
 	printf("	       Default is %d, set to 0 for no limit\n", DEFAULT_PLOCK_RATE_LIMIT);
-	printf("  -w	       Disable withdraw\n");
+	printf("  -o <n>       plock ownership, 1 enable, 0 disable\n");
+	printf("               Default is %d\n", DEFAULT_PLOCK_OWNERSHIP);
+	printf("  -t <ms>      drop resources time (milliseconds)\n");
+	printf("               Default is %u\n", DEFAULT_DROP_RESOURCES_TIME);
+	printf("  -c <num>     drop resources count\n");
+	printf("               Default is %u\n", DEFAULT_DROP_RESOURCES_COUNT);
+	printf("  -a <ms>      drop resources age (milliseconds)\n");
+	printf("               Default is %u\n", DEFAULT_DROP_RESOURCES_AGE);
 	printf("  -h	       Print this help, then exit\n");
 	printf("  -V	       Print program version information, then exit\n");
 }
@@ -710,10 +837,6 @@
 
 		switch (optchar) {
 
-		case 'w':
-			no_withdraw = 1;
-			break;
-
 		case 'D':
 			daemon_debug_opt = 1;
 			break;
@@ -722,12 +845,39 @@
 			plock_debug_opt = 1;
 			break;
 
-		case 'l':
-			plock_rate_limit = atoi(optarg);
+		case 'w':
+			config_no_withdraw = 1;
+			opt_no_withdraw = 1;
 			break;
 
 		case 'p':
-			no_plock = 1;
+			config_no_plock = 1;
+			opt_no_plock = 1;
+			break;
+
+		case 'l':
+			config_plock_rate_limit = atoi(optarg);
+			opt_plock_rate_limit = 1;
+			break;
+
+		case 'o':
+			config_plock_ownership = atoi(optarg);
+			opt_plock_ownership = 1;
+			break;
+
+		case 't':
+			config_drop_resources_time = atoi(optarg);
+			opt_drop_resources_time = 1;
+			break;
+
+		case 'c':
+			config_drop_resources_count = atoi(optarg);
+			opt_drop_resources_count = 1;
+			break;
+
+		case 'a':
+			config_drop_resources_age = atoi(optarg);
+			opt_drop_resources_age = 1;
 			break;
 
 		case 'h':
@@ -792,14 +942,41 @@
 int main(int argc, char **argv)
 {
 	prog_name = argv[0];
+
 	INIT_LIST_HEAD(&mounts);
 	INIT_LIST_HEAD(&withdrawn_mounts);
 
+	config_no_withdraw = DEFAULT_NO_WITHDRAW;
+	config_no_plock = DEFAULT_NO_PLOCK;
+	config_plock_rate_limit = DEFAULT_PLOCK_RATE_LIMIT;
+	config_plock_ownership = DEFAULT_PLOCK_OWNERSHIP;
+	config_drop_resources_time = DEFAULT_DROP_RESOURCES_TIME;
+	config_drop_resources_count = DEFAULT_DROP_RESOURCES_COUNT;
+	config_drop_resources_age = DEFAULT_DROP_RESOURCES_AGE;
+
 	decode_arguments(argc, argv);
 
 	if (!daemon_debug_opt)
 		daemonize();
 
+	/* ccs settings override the defaults, but not the command line */
+	set_ccs_config();
+
+	if (config_plock_ownership)
+		memcpy(protocol_active, protocol_v200, sizeof(protocol_v200));
+	else
+		memcpy(protocol_active, protocol_v100, sizeof(protocol_v100));
+
+	log_debug("config_no_withdraw %d", config_no_withdraw);
+	log_debug("config_no_plock %d", config_no_plock);
+	log_debug("config_plock_rate_limit %u", config_plock_rate_limit);
+	log_debug("config_plock_ownership %u", config_plock_ownership);
+	log_debug("config_drop_resources_time %u", config_drop_resources_time);
+	log_debug("config_drop_resources_count %u", config_drop_resources_count);
+	log_debug("config_drop_resources_age %u", config_drop_resources_age);
+	log_debug("protocol %u.%u.%u", protocol_active[0], protocol_active[1],
+		  protocol_active[2]);
+
 	set_scheduler();
 	set_oom_adj(-16);
 
--- cluster/group/gfs_controld/plock.c	2007/07/18 20:35:50	1.33
+++ cluster/group/gfs_controld/plock.c	2007/11/28 20:49:08	1.34
@@ -1,7 +1,7 @@
 /******************************************************************************
 *******************************************************************************
 **
-**  Copyright (C) 2005 Red Hat, Inc.  All rights reserved.
+**  Copyright (C) 2005-2007 Red Hat, Inc.  All rights reserved.
 **
 **  This copyrighted material is made available to anyone wishing to use,
 **  modify, copy, or redistribute it subject to the terms and conditions
@@ -44,20 +44,26 @@
 #define CONTROL_DIR             "/dev/misc"
 #define CONTROL_NAME            "lock_dlm_plock"
 
-static int control_fd = -1;
 extern int our_nodeid;
-static int plocks_online = 0;
 extern int message_flow_control_on;
-extern int no_plock;
 
-extern uint32_t plock_rate_limit;
-uint32_t plock_read_count;
-uint32_t plock_recv_count;
-uint32_t plock_rate_delays;
-struct timeval plock_read_time;
-struct timeval plock_recv_time;
-struct timeval plock_rate_last;
+/* user configurable */
+extern int config_no_plock;
+extern uint32_t config_plock_rate_limit;
+extern uint32_t config_plock_ownership;
+extern uint32_t config_drop_resources_time;
+extern uint32_t config_drop_resources_count;
+extern uint32_t config_drop_resources_age;
+
+static int plocks_online = 0;
+static uint32_t plock_read_count;
+static uint32_t plock_recv_count;
+static uint32_t plock_rate_delays;
+static struct timeval plock_read_time;
+static struct timeval plock_recv_time;
+static struct timeval plock_rate_last;
 
+static int control_fd = -1;
 static SaCkptHandleT ckpt_handle;
 static SaCkptCallbacksT callbacks = { 0, 0 };
 static SaVersionT version = { 'B', 1, 1 };
@@ -76,13 +82,22 @@
 	uint32_t pad;
 };
 
+#define R_GOT_UNOWN 0x00000001 /* have received owner=0 message */
+
 struct resource {
 	struct list_head	list;	   /* list of resources */
 	uint64_t		number;
-	struct list_head	locks;	  /* one lock for each range */
+	int                     owner;     /* nodeid or 0 for unowned */
+	uint32_t		flags;
+	struct timeval          last_access;
+	struct list_head	locks;	   /* one lock for each range */
 	struct list_head	waiters;
+	struct list_head        pending;   /* discovering r owner */
 };
 
+#define P_SYNCING 0x00000001 /* plock has been sent as part of sync but not
+				yet received */
+
 struct posix_lock {
 	struct list_head	list;	   /* resource locks or waiters list */
 	uint32_t		pid;
@@ -91,13 +106,26 @@
 	uint64_t		end;
 	int			ex;
 	int			nodeid;
+	uint32_t		flags;
 };
 
 struct lock_waiter {
 	struct list_head	list;
+	uint32_t		flags;
 	struct gdlm_plock_info	info;
 };
 
+
+static void send_own(struct mountgroup *mg, struct resource *r, int owner);
+static void save_pending_plock(struct mountgroup *mg, struct resource *r,
+			       struct gdlm_plock_info *in);
+
+
+static int got_unown(struct resource *r)
+{
+	return !!(r->flags & R_GOT_UNOWN);
+}
+
 static void info_bswap_out(struct gdlm_plock_info *i)
 {
 	i->version[0]	= cpu_to_le32(i->version[0]);
@@ -292,7 +320,7 @@
 	gettimeofday(&plock_recv_time, NULL);
 	gettimeofday(&plock_rate_last, NULL);
 
-	if (no_plock)
+	if (config_no_plock)
 		goto control;
 
 	err = saCkptInitialize(&ckpt_handle, &callbacks, &version);
@@ -333,113 +361,6 @@
 	return dt;
 }
 
-int process_plocks(void)
-{
-	struct mountgroup *mg;
-	struct gdlm_plock_info info;
-	struct gdlm_header *hd;
-	struct timeval now;
-	char *buf;
-	uint64_t usec;
-	int len, rv;
-
-	/* Don't send more messages while the cpg message queue is backed up */
-
-	if (message_flow_control_on) {
-		update_flow_control_status();
-		if (message_flow_control_on)
-			return -EBUSY;
-	}
-
-	/* Every N ops we check how long it's taken to do those N ops.
-	   If it's less than 1000 ms, we don't take any more. */
-
-	if (plock_rate_limit && plock_read_count &&
-	    !(plock_read_count % plock_rate_limit)) {
-		gettimeofday(&now, NULL);
-		if (time_diff_ms(&plock_rate_last, &now) < 1000) {
-			plock_rate_delays++;
-			return -EBUSY;
-		}
-		plock_rate_last = now;
-	}
-
-	memset(&info, 0, sizeof(info));
-
-	rv = do_read(control_fd, &info, sizeof(info));
-	if (rv < 0) {
-		log_debug("process_plocks: read error %d fd %d\n",
-			  errno, control_fd);
-		return 0;
-	}
-
-	if (!plocks_online) {
-		rv = -ENOSYS;
-		goto fail;
-	}
-
-	mg = find_mg_id(info.fsid);
-	if (!mg) {
-		log_debug("process_plocks: no mg id %x", info.fsid);
-		rv = -EEXIST;
-		goto fail;
-	}
-
-	log_plock(mg, "read plock %llx %s %s %llx-%llx %d/%u/%llx w %d",
-		  (unsigned long long)info.number,
-		  op_str(info.optype),
-		  ex_str(info.optype, info.ex),
-		  (unsigned long long)info.start, (unsigned long long)info.end,
-		  info.nodeid, info.pid, (unsigned long long)info.owner,
-		  info.wait);
-
-	/* report plock rate and any delays since the last report */
-	plock_read_count++;
-	if (!(plock_read_count % 1000)) {
-		gettimeofday(&now, NULL);
-		usec = dt_usec(&plock_read_time, &now) ;
-		log_group(mg, "plock_read_count %u time %.3f s delays %u",
-			  plock_read_count, usec * 1.e-6, plock_rate_delays);
-		plock_read_time = now;
-		plock_rate_delays = 0;
-	}
-
-	len = sizeof(struct gdlm_header) + sizeof(struct gdlm_plock_info);
-	buf = malloc(len);
-	if (!buf) {
-		rv = -ENOMEM;
-		goto fail;
-	}
-	memset(buf, 0, len);
-
-	info.nodeid = our_nodeid;
-
-	hd = (struct gdlm_header *)buf;
-	hd->type = MSG_PLOCK;
-	hd->nodeid = our_nodeid;
-	hd->to_nodeid = 0;
-	memcpy(buf + sizeof(struct gdlm_header), &info, sizeof(info));
-
-	info_bswap_out((struct gdlm_plock_info *) buf +
-						  sizeof(struct gdlm_header));
-
-	rv = send_group_message(mg, len, buf);
-
-	free(buf);
-
-	if (rv) {
-		log_error("send plock error %d", rv);
-		goto fail;
-	}
-	return 0;
-
- fail:
-	info.rv = rv;
-	rv = write(control_fd, &info, sizeof(info));
-
-	return 0;
-}
-
 static struct resource *search_resource(struct mountgroup *mg, uint64_t number)
 {
 	struct resource *r;
@@ -468,6 +389,7 @@
 
 	r = malloc(sizeof(struct resource));
 	if (!r) {
+		log_error("find_resource no memory %d", errno);
 		rv = -ENOMEM;
 		goto out;
 	}
@@ -476,15 +398,27 @@
 	r->number = number;
 	INIT_LIST_HEAD(&r->locks);
 	INIT_LIST_HEAD(&r->waiters);
+	INIT_LIST_HEAD(&r->pending);
+
+	if (config_plock_ownership)
+		r->owner = -1;
+	else
+		r->owner = 0;
 
 	list_add_tail(&r->list, &mg->resources);
  out:
+	if (r)
+		gettimeofday(&r->last_access, NULL);
 	*r_out = r;
 	return rv;
 }
 
 static void put_resource(struct resource *r)
 {
+	/* with ownership, resources are only freed via drop messages */
+	if (config_plock_ownership)
+		return;
+
 	if (list_empty(&r->locks) && list_empty(&r->waiters)) {
 		list_del(&r->list);
 		free(r);
@@ -825,6 +759,7 @@
 
 {
 	struct lock_waiter *w;
+
 	w = malloc(sizeof(struct lock_waiter));
 	if (!w)
 		return -ENOMEM;
@@ -873,15 +808,11 @@
 	}
 }
 
-static void do_lock(struct mountgroup *mg, struct gdlm_plock_info *in)
+static void do_lock(struct mountgroup *mg, struct gdlm_plock_info *in,
+		    struct resource *r)
 {
-	struct resource *r = NULL;
 	int rv;
 
-	rv = find_resource(mg, in->number, 1, &r);
-	if (rv)
-		goto out;
-
 	if (is_conflict(r, in, 0)) {
 		if (!in->wait)
 			rv = -EAGAIN;
@@ -902,41 +833,57 @@
 	put_resource(r);
 }
 
-static void do_unlock(struct mountgroup *mg, struct gdlm_plock_info *in)
+static void do_unlock(struct mountgroup *mg, struct gdlm_plock_info *in,
+		      struct resource *r)
 {
-	struct resource *r = NULL;
 	int rv;
 
-	rv = find_resource(mg, in->number, 0, &r);
-	if (!rv)
-		rv = unlock_internal(mg, r, in);
+	rv = unlock_internal(mg, r, in);
 
 	if (in->nodeid == our_nodeid)
 		write_result(mg, in, rv);
 
-	if (r) {
-		do_waiters(mg, r);
-		put_resource(r);
-	}
+	do_waiters(mg, r);
+	put_resource(r);
 }
 
-static void do_get(struct mountgroup *mg, struct gdlm_plock_info *in)
+/* we don't even get to this function if the getlk isn't from us */
+
+static void do_get(struct mountgroup *mg, struct gdlm_plock_info *in,
+		   struct resource *r)
 {
-	struct resource *r = NULL;
 	int rv;
 
-	rv = find_resource(mg, in->number, 0, &r);
-	if (rv)
-		goto out;
-
 	if (is_conflict(r, in, 1))
 		rv = 1;
 	else
 		rv = 0;
- out:
+
 	write_result(mg, in, rv);
 }
 
+static void __receive_plock(struct mountgroup *mg, struct gdlm_plock_info *in,
+			    int from, struct resource *r)
+{
+	switch (in->optype) {
+	case GDLM_PLOCK_OP_LOCK:
+		mg->last_plock_time = time(NULL);
+		do_lock(mg, in, r);
+		break;
+	case GDLM_PLOCK_OP_UNLOCK:
+		mg->last_plock_time = time(NULL);
+		do_unlock(mg, in, r);
+		break;
+	case GDLM_PLOCK_OP_GET:
+		do_get(mg, in, r);
+		break;
+	default:
+		log_error("receive_plock from %d optype %d", from, in->optype);
+		if (from == our_nodeid)
+			write_result(mg, in, -EINVAL);
+	}
+}
+
 /* When mg members receive our options message (for our mount), one of them
    saves all plock state received to that point in a checkpoint and then sends
    us our journals message.  We know to retrieve the plock state from the
@@ -947,16 +894,16 @@
    set save_plocks (when we see our options message) can be ignored because it
    should be reflected in the checkpointed state. */
 
-void _receive_plock(struct mountgroup *mg, char *buf, int len, int from)
+static void _receive_plock(struct mountgroup *mg, char *buf, int len, int from)
 {
 	struct gdlm_plock_info info;
 	struct gdlm_header *hd = (struct gdlm_header *) buf;
+	struct resource *r = NULL;
 	struct timeval now;
 	uint64_t usec;
-	int rv = 0;
+	int rv, create;
 
 	memcpy(&info, buf + sizeof(struct gdlm_header), sizeof(info));
-
 	info_bswap_in(&info);
 
 	log_plock(mg, "receive plock %llx %s %s %llx-%llx %d/%u/%llx w %d",
@@ -982,30 +929,80 @@
 	if (from != hd->nodeid || from != info.nodeid) {
 		log_error("receive_plock from %d header %d info %d",
 			  from, hd->nodeid, info.nodeid);
-		rv = -EINVAL;
-		goto out;
+		return;
 	}
 
-	switch (info.optype) {
-	case GDLM_PLOCK_OP_LOCK:
-		mg->last_plock_time = time(NULL);
-		do_lock(mg, &info);
-		break;
-	case GDLM_PLOCK_OP_UNLOCK:
-		mg->last_plock_time = time(NULL);
-		do_unlock(mg, &info);
-		break;
-	case GDLM_PLOCK_OP_GET:
-		do_get(mg, &info);
-		break;
-	default:
-		log_error("receive_plock from %d optype %d", from, info.optype);
-		rv = -EINVAL;
+	create = !config_plock_ownership;
+
+	rv = find_resource(mg, info.number, create, &r);
+
+	if (rv && config_plock_ownership) {
+		/* There must have been a race with a drop, so we need to
+		   ignore this plock op which will be resent.  If we're the one
+		   who sent the plock, we need to send_own() and put it on the
+		   pending list to resend once the owner is established. */
+
+		log_debug("receive_plock from %d no r %llx", from,
+			  (unsigned long long)info.number);
+
+		if (from != our_nodeid)
+			return;
+
+		rv = find_resource(mg, info.number, 1, &r);
+		if (rv)
+			return;
+		send_own(mg, r, our_nodeid);
+		save_pending_plock(mg, r, &info);
+		return;
 	}
+	if (rv) {
+		/* r not found, rv is -ENOENT, this shouldn't happen because
+		   process_plocks() creates a resource for every op */
 
- out:
-	if (from == our_nodeid && rv)
-		write_result(mg, &info, rv);
+		log_error("receive_plock from %d no r %llx %d", from,
+			  (unsigned long long)info.number, rv);
+		return;
+	}
+
+	/* The owner should almost always be 0 here, but other owners may
+	   be possible given odd combinations of races with drop.  Odd races to
+	   worry about (some seem pretty improbable):
+
+	   - A sends drop, B sends plock, receive drop, receive plock.
+	   This is addressed above.
+
+	   - A sends drop, B sends two plocks, receive drop, receive plocks.
+	   Receiving the first plock is the previous case, receiving the
+	   second plock will find r with owner of -1.
+
+	   - A sends drop, B sends two plocks, receive drop, C sends own,
+	   receive plock, B sends own, receive own (C), receive plock,
+	   receive own (B).
+
+	   Haven't tried to cook up a scenario that would lead to the
+	   last case below; receiving a plock from ourself and finding
+	   we're the owner of r. */
+
+	/* may want to suppress this if some of them are common enough */
+	if (r->owner)
+		log_error("receive_plock from %d r %llx owner %d", from,
+			  (unsigned long long)info.number, r->owner);
+
+	if (!r->owner) {
+		__receive_plock(mg, &info, from, r);
+
+	} else if (r->owner == -1) {
+		if (from == our_nodeid)
+			save_pending_plock(mg, r, &info);
+
+	} else if (r->owner != our_nodeid) {
+		if (from == our_nodeid)
+			save_pending_plock(mg, r, &info);
+
+	} else if (r->owner == our_nodeid) {
+		if (from == our_nodeid)
+			__receive_plock(mg, &info, from, r);
+	}
 }
 
 void receive_plock(struct mountgroup *mg, char *buf, int len, int from)
@@ -1023,92 +1020,696 @@
 	_receive_plock(mg, buf, len, from);
 }
 
-void process_saved_plocks(struct mountgroup *mg)
+/* Build and send one plock-related group message.
+
+   The message is a gdlm_header (type = msg_type, nodeid = our_nodeid,
+   to_nodeid = 0, everything else zero) followed by a copy of *in that is
+   passed through info_bswap_out() before it is sent.
+
+   Returns 0 on success, -ENOMEM if the message buffer cannot be allocated,
+   otherwise the non-zero value returned by send_group_message().  Every
+   failure is also reported with log_error(). */
+
+static int send_struct_info(struct mountgroup *mg, struct gdlm_plock_info *in,
+			    int msg_type)
 {
-	struct save_msg *sm, *sm2;
+	char *buf;
+	int rv, len;
+	struct gdlm_header *hd;
 
-	if (list_empty(&mg->saved_messages))
-		return;
+	len = sizeof(struct gdlm_header) + sizeof(struct gdlm_plock_info);
+	buf = malloc(len);
+	if (!buf) {
+		rv = -ENOMEM;
+		goto out;
+	}
+	memset(buf, 0, len);
 
-	log_group(mg, "process_saved_plocks");
+	hd = (struct gdlm_header *)buf;
+	hd->type = msg_type;
+	hd->nodeid = our_nodeid;
+	hd->to_nodeid = 0;
 
-	list_for_each_entry_safe(sm, sm2, &mg->saved_messages, list) {
-		if (sm->type != MSG_PLOCK)
-			continue;
-		_receive_plock(mg, sm->buf, sm->len, sm->nodeid);
-		list_del(&sm->list);
-		free(sm);
-	}
+	memcpy(buf + sizeof(struct gdlm_header), in, sizeof(*in));
+
+	/* a cast binds tighter than '+': add the byte offset to the char
+	   pointer first, then convert, so that the info copied just above
+	   is what gets swapped (not memory far beyond the buffer) */
+	info_bswap_out((struct gdlm_plock_info *)(buf + sizeof(*hd)));
+
+	rv = send_group_message(mg, len, buf);
+
+	free(buf);
+ out:
+	if (rv)
+		log_error("send plock message error %d", rv);
+	return rv;
 }
 
-void plock_exit(void)
+/* Send the plock op *in to the mountgroup as a MSG_PLOCK message.  The
+   resource argument r is not used here, and the result of
+   send_struct_info() is not propagated to the caller. */
+
+static void send_plock(struct mountgroup *mg, struct resource *r,
+		       struct gdlm_plock_info *in)
 {
-	if (plocks_online)
-		saCkptFinalize(ckpt_handle);
+	send_struct_info(mg, in, MSG_PLOCK);
 }
 
-void pack_section_buf(struct mountgroup *mg, struct resource *r)
+/* Send a MSG_PLOCK_OWN message announcing 'owner' as the owner of r.  The
+   message carries only r->number and, in its nodeid field, the proposed
+   owner.  Callers in this file pass our_nodeid to claim r, or 0 to
+   relinquish it.  Nothing is sent while r->pending is non-empty. */
+
+static void send_own(struct mountgroup *mg, struct resource *r, int owner)
 {
-	struct pack_plock *pp;
-	struct posix_lock *po;
-	struct lock_waiter *w;
-	int count = 0;
-
-	memset(&section_buf, 0, sizeof(section_buf));
+	struct gdlm_plock_info info;
 
-	pp = (struct pack_plock *) &section_buf;
+	/* if we've already sent an own message for this resource,
+	   (pending list is not empty), then we shouldn't send another */
 
-	list_for_each_entry(po, &r->locks, list) {
-		pp->start	= cpu_to_le64(po->start);
-		pp->end		= cpu_to_le64(po->end);
-		pp->owner	= cpu_to_le64(po->owner);
-		pp->pid		= cpu_to_le32(po->pid);
-		pp->nodeid	= cpu_to_le32(po->nodeid);
-		pp->ex		= po->ex;
-		pp->waiter	= 0;
-		pp++;
-		count++;
+	if (!list_empty(&r->pending)) {
+		log_debug("send_own %llx already pending",
+			  (unsigned long long)r->number);
+		return;
 	}
 
-	list_for_each_entry(w, &r->waiters, list) {
-		pp->start	= cpu_to_le64(w->info.start);
-		pp->end		= cpu_to_le64(w->info.end);
-		pp->owner	= cpu_to_le64(w->info.owner);
-		pp->pid		= cpu_to_le32(w->info.pid);
-		pp->nodeid	= cpu_to_le32(w->info.nodeid);
-		pp->ex		= w->info.ex;
-		pp->waiter	= 1;
-		pp++;
-		count++;
-	}
+	memset(&info, 0, sizeof(info));
+	info.number = r->number;
+	info.nodeid = owner;
 
-	section_len = count * sizeof(struct pack_plock);
+	send_struct_info(mg, &info, MSG_PLOCK_OWN);
 }
 
-int unpack_section_buf(struct mountgroup *mg, char *numbuf, int buflen)
+/* Send the plock state held on r to the group: one MSG_PLOCK_SYNC_LOCK
+   message per entry on r->locks, then one MSG_PLOCK_SYNC_WAITER message per
+   entry on r->waiters.  Each entry whose message was sent successfully is
+   marked P_SYNCING.  The first send failure stops the whole walk; entries
+   not yet reached are neither sent nor flagged, and the failure is not
+   reported to the caller. */
+
+static void send_syncs(struct mountgroup *mg, struct resource *r)
 {
-	struct pack_plock *pp;
+	struct gdlm_plock_info info;
 	struct posix_lock *po;
 	struct lock_waiter *w;
-	struct resource *r;
-	int count = section_len / sizeof(struct pack_plock);
-	int i;
-	unsigned long long num;
+	int rv;
 
-	r = malloc(sizeof(struct resource));
-	if (!r)
-		return -ENOMEM;
-	memset(r, 0, sizeof(struct resource));
-	INIT_LIST_HEAD(&r->locks);
-	INIT_LIST_HEAD(&r->waiters);
-	sscanf(numbuf, "r%llu", &num);
-	r->number = num;
+	list_for_each_entry(po, &r->locks, list) {
+		memset(&info, 0, sizeof(info));
+		info.number    = r->number;
+		info.start     = po->start;
+		info.end       = po->end;
+		info.nodeid    = po->nodeid;
+		info.owner     = po->owner;
+		info.pid       = po->pid;
+		info.ex        = po->ex;
 
-	pp = (struct pack_plock *) &section_buf;
+		rv = send_struct_info(mg, &info, MSG_PLOCK_SYNC_LOCK);
+		if (rv)
+			goto out;
 
-	for (i = 0; i < count; i++) {
-		if (!pp->waiter) {
-			po = malloc(sizeof(struct posix_lock));
+		po->flags |= P_SYNCING;
+	}
+
+	list_for_each_entry(w, &r->waiters, list) {
+		/* a waiter already stores a complete plock_info */
+		memcpy(&info, &w->info, sizeof(info));
+
+		rv = send_struct_info(mg, &info, MSG_PLOCK_SYNC_WAITER);
+		if (rv)
+			goto out;
+
+		w->flags |= P_SYNCING;
+	}
+ out:
+	return;
+}
+
+/* Send a MSG_PLOCK_DROP message for r to the group.  The message carries
+   only r->number; every other field of the info is zero. */
+
+static void send_drop(struct mountgroup *mg, struct resource *r)
+{
+	struct gdlm_plock_info drop;
+
+	memset(&drop, 0, sizeof(struct gdlm_plock_info));
+	drop.number = r->number;
+
+	send_struct_info(mg, &drop, MSG_PLOCK_DROP);
+}
+
+/* The op in *in cannot be processed until the owner of r is established, so
+   a copy of it is queued at the tail of r->pending.  If the copy cannot be
+   allocated the op is not queued; the only trace is a log_error(). */
+
+static void save_pending_plock(struct mountgroup *mg, struct resource *r,
+			       struct gdlm_plock_info *in)
+{
+	struct lock_waiter *pend = malloc(sizeof(struct lock_waiter));
+
+	if (pend == NULL) {
+		log_error("save_pending_plock no mem");
+		return;
+	}
+
+	memcpy(&pend->info, in, sizeof(*in));
+	list_add_tail(&pend->list, &r->pending);
+}
+
+/* Ownership of r has been established and the owner is us: hand every op
+   queued on r->pending to __receive_plock() as an op from our_nodeid, then
+   unlink and free the queue entry. */
+
+static void add_pending_plocks(struct mountgroup *mg, struct resource *r)
+{
+	struct lock_waiter *pend, *next;
+
+	list_for_each_entry_safe(pend, next, &r->pending, list) {
+		__receive_plock(mg, &pend->info, our_nodeid, r);
+		list_del(&pend->list);
+		free(pend);
+	}
+}
+
+/* Ownership of r has been established and the owner is 0 (unowned): send
+   every op queued on r->pending to the group with send_plock(), then unlink
+   and free the queue entry. */
+
+static void send_pending_plocks(struct mountgroup *mg, struct resource *r)
+{
+	struct lock_waiter *pend, *next;
+
+	list_for_each_entry_safe(pend, next, &r->pending, list) {
+		send_plock(mg, r, &pend->info);
+		list_del(&pend->list);
+		free(pend);
+	}
+}
+
+/* Handle a MSG_PLOCK_OWN message: node 'from' announces info.nodeid as the
+   owner of resource info.number.  A node is only expected to announce 0
+   (unowned) or itself.  The branches below are keyed on who sent the
+   message, which owner it announces, and the owner we currently have
+   recorded in r->owner (-1 is used for a resource whose owner is not yet
+   established).  Combinations flagged should_not_happen are reported with
+   log_error() at the end and otherwise ignored.  The len argument is
+   unused. */
+
+static void _receive_own(struct mountgroup *mg, char *buf, int len, int from)
+{
+	struct gdlm_header *hd = (struct gdlm_header *) buf;
+	struct gdlm_plock_info info;
+	struct resource *r;
+	int should_not_happen = 0;
+	int rv;
+
+	memcpy(&info, buf + sizeof(struct gdlm_header), sizeof(info));
+	info_bswap_in(&info);
+
+	log_plock(mg, "receive own %llx from %u owner %u",
+		  (unsigned long long)info.number, hd->nodeid, info.nodeid);
+
+	/* third argument 1: presumably creates r when it does not exist yet
+	   -- TODO confirm against find_resource() */
+	rv = find_resource(mg, info.number, 1, &r);
+	if (rv)
+		return;
+
+	if (from == our_nodeid) {
+		/*
+		 * received our own own message
+		 */
+
+		if (info.nodeid == 0) {
+			/* we are setting owner to 0 */
+
+			if (r->owner == our_nodeid) {
+				/* we set owner to 0 when we relinquish
+				   ownership */
+				should_not_happen = 1;
+			} else if (r->owner == 0) {
+				/* this happens when we relinquish ownership */
+				r->flags |= R_GOT_UNOWN;
+			} else {
+				should_not_happen = 1;
+			}
+
+		} else if (info.nodeid == our_nodeid) {
+			/* we are setting owner to ourself */
+
+			if (r->owner == -1) {
+				/* we have gained ownership */
+				r->owner = our_nodeid;
+				add_pending_plocks(mg, r);
+			} else if (r->owner == our_nodeid) {
+				should_not_happen = 1;
+			} else if (r->owner == 0) {
+				/* r is unowned, so the ops we queued while
+				   waiting are sent to the group instead */
+				send_pending_plocks(mg, r);
+			} else {
+				/* resource is owned by other node;
+				   they should set owner to 0 shortly */
+			}
+
+		} else {
+			/* we should only ever set owner to 0 or ourself */
+			should_not_happen = 1;
+		}
+	} else {
+		/*
+		 * received own message from another node
+		 */
+
+		if (info.nodeid == 0) {
+			/* other node is setting owner to 0 */
+
+			if (r->owner == -1) {
+				/* we should have a record of the owner before
+				   it relinquishes */
+				should_not_happen = 1;
+			} else if (r->owner == our_nodeid) {
+				/* only the owner should relinquish */
+				should_not_happen = 1;
+			} else if (r->owner == 0) {
+				should_not_happen = 1;
+			} else {
+				r->owner = 0;
+				r->flags |= R_GOT_UNOWN;
+				send_pending_plocks(mg, r);
+			}
+
+		} else if (info.nodeid == from) {
+			/* other node is setting owner to itself */
+
+			if (r->owner == -1) {
+				/* normal path for a node becoming owner */
+				r->owner = from;
+			} else if (r->owner == our_nodeid) {
+				/* we relinquish our ownership: sync our local
+				   plocks to everyone, then set owner to 0 */
+				send_syncs(mg, r);
+				send_own(mg, r, 0);
+				/* we need to set owner to 0 here because
+				   local ops may arrive before we receive
+				   our send_own message and can't be added
+				   locally */
+				r->owner = 0;
+			} else if (r->owner == 0) {
+				/* can happen because we set owner to 0 before
+				   we receive our send_own sent just above */
+			} else {
+				/* do nothing, current owner should be
+				   relinquishing its ownership */
+			}
+
+		} else if (info.nodeid == our_nodeid) {
+			/* no one else should try to set the owner to us */
+			should_not_happen = 1;
+		} else {
+			/* a node should only ever set owner to 0 or itself */
+			should_not_happen = 1;
+		}
+	}
+
+	if (should_not_happen) {
+		log_error("receive_own from %u %llx info nodeid %d r owner %d",
+			  from, (unsigned long long)r->number, info.nodeid,
+			  r->owner);
+	}
+}
+
+/* Entry point for a MSG_PLOCK_OWN message.  While mg->save_plocks is set
+   the message is stored with save_message() (process_saved_plocks() feeds
+   stored MSG_PLOCK_OWN messages to _receive_own()); otherwise it is
+   processed right away. */
+
+void receive_own(struct mountgroup *mg, char *buf, int len, int from)
+{
+	if (!mg->save_plocks)
+		_receive_own(mg, buf, len, from);
+	else
+		save_message(mg, buf, len, from, MSG_PLOCK_OWN);
+}
+
+/* Clear P_SYNCING on the first entry of r that is still flagged and whose
+   start, end, nodeid, owner, pid and ex all equal those of *in.  r->locks is
+   searched before r->waiters.  If nothing matches, an error is logged. */
+
+static void clear_syncing_flag(struct resource *r, struct gdlm_plock_info *in)
+{
+	struct posix_lock *po;
+	struct lock_waiter *w;
+
+	list_for_each_entry(po, &r->locks, list) {
+		if (!(po->flags & P_SYNCING))
+			continue;
+		if (in->start  != po->start ||
+		    in->end    != po->end ||
+		    in->nodeid != po->nodeid ||
+		    in->owner  != po->owner ||
+		    in->pid    != po->pid ||
+		    in->ex     != po->ex)
+			continue;
+
+		po->flags &= ~P_SYNCING;
+		return;
+	}
+
+	list_for_each_entry(w, &r->waiters, list) {
+		if (!(w->flags & P_SYNCING))
+			continue;
+		if (in->start  != w->info.start ||
+		    in->end    != w->info.end ||
+		    in->nodeid != w->info.nodeid ||
+		    in->owner  != w->info.owner ||
+		    in->pid    != w->info.pid ||
+		    in->ex     != w->info.ex)
+			continue;
+
+		w->flags &= ~P_SYNCING;
+		return;
+	}
+
+	log_error("clear_syncing %llx no match %s %llx-%llx %d/%u/%llx",
+		  (unsigned long long)r->number, in->ex ? "WR" : "RD",
+		  (unsigned long long)in->start, (unsigned long long)in->end,
+		  in->nodeid, in->pid, (unsigned long long)in->owner);
+}
+
+/* Handle a MSG_PLOCK_SYNC_LOCK or MSG_PLOCK_SYNC_WAITER message from node
+   'from'.  The resource must already exist (find_resource() is called with
+   its third argument 0); if it does not, an error is logged and the
+   message is dropped.
+
+   A sync message we sent ourselves only clears P_SYNCING on the matching
+   local entry.  A sync message from another node adds the described lock
+   or waiter to our copy of r.  The len argument is unused. */
+
+static void _receive_sync(struct mountgroup *mg, char *buf, int len, int from)
+{
+	struct gdlm_plock_info info;
+	struct gdlm_header *hd = (struct gdlm_header *) buf;
+	struct resource *r;
+	int rv;
+
+	memcpy(&info, buf + sizeof(struct gdlm_header), sizeof(info));
+	info_bswap_in(&info);
+
+	log_plock(mg, "receive sync %llx from %u %s %llx-%llx %d/%u/%llx",
+		  (unsigned long long)info.number, from, info.ex ? "WR" : "RD",
+		  (unsigned long long)info.start, (unsigned long long)info.end,
+		  info.nodeid, info.pid, (unsigned long long)info.owner);
+
+	rv = find_resource(mg, info.number, 0, &r);
+	if (rv) {
+		/* %llx needs an unsigned long long argument; cast as every
+		   other log call in this file does */
+		log_error("receive_sync no r %llx from %d",
+			  (unsigned long long)info.number, from);
+		return;
+	}
+
+	if (from == our_nodeid) {
+		/* this plock now in sync on all nodes */
+		clear_syncing_flag(r, &info);
+		return;
+	}
+
+	if (hd->type == MSG_PLOCK_SYNC_LOCK)
+		add_lock(r, info.nodeid, info.owner, info.pid, !info.ex,
+			 info.start, info.end);
+	else if (hd->type == MSG_PLOCK_SYNC_WAITER)
+		add_waiter(mg, r, &info);
+}
+
+/* Entry point for MSG_PLOCK_SYNC_LOCK / MSG_PLOCK_SYNC_WAITER messages.
+   While mg->save_plocks is set the message is stored with save_message()
+   under the type found in its header; otherwise it is processed right
+   away. */
+
+void receive_sync(struct mountgroup *mg, char *buf, int len, int from)
+{
+	struct gdlm_header *hd;
+
+	if (!mg->save_plocks) {
+		_receive_sync(mg, buf, len, from);
+		return;
+	}
+
+	hd = (struct gdlm_header *) buf;
+	save_message(mg, buf, len, from, hd->type);
+}
+
+/* Handle a MSG_PLOCK_DROP message from node 'from'.  The resource is
+   unlinked and freed only if it exists, is unowned (owner 0), has nothing
+   on its pending list and holds no locks or waiters.  Every other case
+   leaves r untouched and is only logged.  The len argument is unused. */
+
+static void _receive_drop(struct mountgroup *mg, char *buf, int len, int from)
+{
+	struct gdlm_plock_info drop;
+	struct resource *r;
+
+	memcpy(&drop, buf + sizeof(struct gdlm_header), sizeof(drop));
+	info_bswap_in(&drop);
+
+	log_plock(mg, "receive drop %llx from %u",
+		  (unsigned long long)drop.number, from);
+
+	if (find_resource(mg, drop.number, 0, &r)) {
+		/* we'll find no r if two nodes sent drop at once */
+		log_debug("receive_drop from %d no r %llx", from,
+			  (unsigned long long)drop.number);
+		return;
+	}
+
+	if (r->owner) {
+		/* shouldn't happen */
+		log_error("receive_drop from %d r %llx owner %d", from,
+			  (unsigned long long)r->number, r->owner);
+		return;
+	}
+
+	if (!list_empty(&r->pending)) {
+		/* shouldn't happen */
+		log_error("receive_drop from %d r %llx pending op", from,
+			  (unsigned long long)r->number);
+		return;
+	}
+
+	/* the decision to drop or not must be based on things that are
+	   guaranteed to be the same on all nodes */
+
+	if (!list_empty(&r->locks) || !list_empty(&r->waiters)) {
+		/* A sent drop, B sent a plock, receive plock, receive drop */
+		log_debug("receive_drop from %d r %llx in use", from,
+			  (unsigned long long)r->number);
+		return;
+	}
+
+	list_del(&r->list);
+	free(r);
+}
+
+/* Entry point for a MSG_PLOCK_DROP message.  While mg->save_plocks is set
+   the message is stored with save_message(); otherwise it is processed
+   right away. */
+
+void receive_drop(struct mountgroup *mg, char *buf, int len, int from)
+{
+	if (!mg->save_plocks)
+		_receive_drop(mg, buf, len, from);
+	else
+		save_message(mg, buf, len, from, MSG_PLOCK_DROP);
+}
+
+/* Start getting rid of old, idle resources.  Resources are only dropped
+   from the unowned state: one that we own is first relinquished (an own
+   message with owner 0 is sent and r->owner is set to 0 locally); one that
+   is already unowned, and for which got_unown(r) holds, gets a drop
+   message.  At most config_drop_resources_count resources are handled per
+   call.  Always returns 0. */
+
+/* FIXME: in the transition from owner = us, to owner = 0, to drop;
+   we want the second period to be shorter than the first */
+
+static int drop_resources(struct mountgroup *mg)
+{
+	struct resource *r;
+	struct timeval now;
+	int handled = 0;
+
+	gettimeofday(&now, NULL);
+
+	/* try to drop the oldest, unused resources */
+
+	list_for_each_entry_reverse(r, &mg->resources, list) {
+		if (handled >= config_drop_resources_count)
+			break;
+
+		/* owned by another node */
+		if (r->owner && r->owner != our_nodeid)
+			continue;
+
+		/* accessed too recently */
+		if (time_diff_ms(&r->last_access, &now) <
+		    config_drop_resources_age)
+			continue;
+
+		/* still has locks or waiters */
+		if (!list_empty(&r->locks) || !list_empty(&r->waiters))
+			continue;
+
+		if (r->owner == our_nodeid) {
+			send_own(mg, r, 0);
+			r->owner = 0;
+		} else if (r->owner == 0 && got_unown(r)) {
+			send_drop(mg, r);
+		}
+
+		handled++;
+	}
+
+	return 0;
+}
+
+/* Read one plock request from control_fd and start processing it.
+
+   Returns -EBUSY, without reading anything, while message flow control is
+   on or the plock rate limit applies.  Returns 0 in every other case,
+   including failures.  If the read itself fails nothing is written back.
+   After a successful read, a failure (plocks not online, unknown
+   mountgroup, find_resource() error) is written back to control_fd in
+   info.rv. */
+
+int process_plocks(void)
+{
+	struct mountgroup *mg;
+	struct resource *r;
+	struct gdlm_plock_info info;
+	struct timeval now;
+	uint64_t usec;
+	int rv;
+
+	/* Don't send more messages while the cpg message queue is backed up */
+
+	if (message_flow_control_on) {
+		update_flow_control_status();
+		if (message_flow_control_on)
+			return -EBUSY;
+	}
+
+	gettimeofday(&now, NULL);
+
+	/* Every N ops we check how long it's taken to do those N ops.
+	   If it's less than 1000 ms, we don't take any more. */
+
+	if (config_plock_rate_limit && plock_read_count &&
+	    !(plock_read_count % config_plock_rate_limit)) {
+		if (time_diff_ms(&plock_rate_last, &now) < 1000) {
+			plock_rate_delays++;
+			return -EBUSY;
+		}
+		plock_rate_last = now;
+	}
+
+	memset(&info, 0, sizeof(info));
+
+	rv = do_read(control_fd, &info, sizeof(info));
+	if (rv < 0) {
+		log_debug("process_plocks: read error %d fd %d\n",
+			  errno, control_fd);
+		return 0;
+	}
+
+	/* kernel doesn't set the nodeid field */
+	info.nodeid = our_nodeid;
+
+	if (!plocks_online) {
+		rv = -ENOSYS;
+		goto fail;
+	}
+
+	mg = find_mg_id(info.fsid);
+	if (!mg) {
+		log_debug("process_plocks: no mg id %x", info.fsid);
+		rv = -EEXIST;
+		goto fail;
+	}
+
+	log_plock(mg, "read plock %llx %s %s %llx-%llx %d/%u/%llx w %d",
+		  (unsigned long long)info.number,
+		  op_str(info.optype),
+		  ex_str(info.optype, info.ex),
+		  (unsigned long long)info.start, (unsigned long long)info.end,
+		  info.nodeid, info.pid, (unsigned long long)info.owner,
+		  info.wait);
+
+	/* report plock rate and any delays since the last report */
+	plock_read_count++;
+	if (!(plock_read_count % 1000)) {
+		usec = dt_usec(&plock_read_time, &now) ;
+		log_group(mg, "plock_read_count %u time %.3f s delays %u",
+			  plock_read_count, usec * 1.e-6, plock_rate_delays);
+		plock_read_time = now;
+		plock_rate_delays = 0;
+	}
+
+	rv = find_resource(mg, info.number, 1, &r);
+	if (rv)
+		goto fail;
+
+	if (r->owner == 0) {
+		/* plock state replicated on all nodes */
+		send_plock(mg, r, &info);
+
+	} else if (r->owner == our_nodeid) {
+		/* we are the owner of r, so our plocks are local */
+		__receive_plock(mg, &info, our_nodeid, r);
+
+	} else {
+		/* r owner is -1: r is new, try to become the owner;
+		   r owner > 0: tell other owner to give up ownership;
+		   both done with a message trying to set owner to ourself */
+		send_own(mg, r, our_nodeid);
+		save_pending_plock(mg, r, &info);
+	}
+
+	/* with plock ownership enabled, periodically try to drop old
+	   resources of this mountgroup */
+	if (config_plock_ownership &&
+	    time_diff_ms(&mg->drop_resources_last, &now) >=
+	    		 config_drop_resources_time) {
+		mg->drop_resources_last = now;
+		drop_resources(mg);
+	}
+
+	return 0;
+
+ fail:
+	/* hand the error back through control_fd; the result of this
+	   write() is stored in rv but never examined */
+	info.rv = rv;
+	rv = write(control_fd, &info, sizeof(info));
+
+	return 0;
+}
+
+/* Replay the plock-related messages stored on mg->saved_messages, in list
+   order, through the _receive_*() handler for their type.  Each replayed
+   message is unlinked and freed.  Messages of any other type are left on
+   the list. */
+
+void process_saved_plocks(struct mountgroup *mg)
+{
+	struct save_msg *sm, *next;
+
+	if (list_empty(&mg->saved_messages))
+		return;
+
+	log_group(mg, "process_saved_plocks");
+
+	list_for_each_entry_safe(sm, next, &mg->saved_messages, list) {
+		if (sm->type == MSG_PLOCK)
+			_receive_plock(mg, sm->buf, sm->len, sm->nodeid);
+		else if (sm->type == MSG_PLOCK_OWN)
+			_receive_own(mg, sm->buf, sm->len, sm->nodeid);
+		else if (sm->type == MSG_PLOCK_DROP)
+			_receive_drop(mg, sm->buf, sm->len, sm->nodeid);
+		else if (sm->type == MSG_PLOCK_SYNC_LOCK ||
+			 sm->type == MSG_PLOCK_SYNC_WAITER)
+			_receive_sync(mg, sm->buf, sm->len, sm->nodeid);
+		else
+			continue;
+
+		list_del(&sm->list);
+		free(sm);
+	}
+}
+
+/* Finalize the checkpoint handle (ckpt_handle) if plocks are online;
+   otherwise do nothing. */
+
+void plock_exit(void)
+{
+	if (!plocks_online)
+		return;
+
+	saCkptFinalize(ckpt_handle);
+}
+
+/* Pack the locks and waiters of r into section_buf as an array of
+   little-endian pack_plock records and set section_len to the number of
+   bytes used.  Locks are packed first (waiter = 0), then waiters
+   (waiter = 1).
+
+   Entries still marked P_SYNCING are left out of the ckpt; the new node
+   will get those by receiving PLOCK_SYNC messages.  For a resource we own,
+   nothing is packed and neither section_buf nor section_len is touched. */
+
+static void pack_section_buf(struct mountgroup *mg, struct resource *r)
+{
+	struct pack_plock *slot = (struct pack_plock *) &section_buf;
+	struct posix_lock *po;
+	struct lock_waiter *w;
+	int packed = 0;
+
+	/* plocks on owned resources are not replicated on other nodes */
+	if (r->owner == our_nodeid)
+		return;
+
+	list_for_each_entry(po, &r->locks, list) {
+		if (!(po->flags & P_SYNCING)) {
+			slot->start	= cpu_to_le64(po->start);
+			slot->end	= cpu_to_le64(po->end);
+			slot->owner	= cpu_to_le64(po->owner);
+			slot->pid	= cpu_to_le32(po->pid);
+			slot->nodeid	= cpu_to_le32(po->nodeid);
+			slot->ex	= po->ex;
+			slot->waiter	= 0;
+			slot++;
+			packed++;
+		}
+	}
+
+	list_for_each_entry(w, &r->waiters, list) {
+		if (!(w->flags & P_SYNCING)) {
+			slot->start	= cpu_to_le64(w->info.start);
+			slot->end	= cpu_to_le64(w->info.end);
+			slot->owner	= cpu_to_le64(w->info.owner);
+			slot->pid	= cpu_to_le32(w->info.pid);
+			slot->nodeid	= cpu_to_le32(w->info.nodeid);
+			slot->ex	= w->info.ex;
+			slot->waiter	= 1;
+			slot++;
+			packed++;
+		}
+	}
+
+	section_len = packed * sizeof(struct pack_plock);
+}
+
+static int unpack_section_buf(struct mountgroup *mg, char *numbuf, int buflen)
+{
+	struct pack_plock *pp;
+	struct posix_lock *po;
+	struct lock_waiter *w;
+	struct resource *r;
+	int count = section_len / sizeof(struct pack_plock);
+	int i, owner = 0;
+	unsigned long long num;
+	struct timeval now;
+
+	gettimeofday(&now, NULL);
+
+	r = malloc(sizeof(struct resource));
+	if (!r)
+		return -ENOMEM;
+	memset(r, 0, sizeof(struct resource));
+	INIT_LIST_HEAD(&r->locks);
+	INIT_LIST_HEAD(&r->waiters);
+	INIT_LIST_HEAD(&r->pending);
+
+	if (config_plock_ownership)
+		sscanf(numbuf, "r%llu.%d", &num, &owner);
+	else
+		sscanf(numbuf, "r%llu", &num);
+
+	r->number = num;
+	r->owner = owner;
+	r->last_access = now;
+
+	pp = (struct pack_plock *) &section_buf;
+
+	for (i = 0; i < count; i++) {
+		if (!pp->waiter) {
+			po = malloc(sizeof(struct posix_lock));
 			po->start	= le64_to_cpu(pp->start);
 			po->end		= le64_to_cpu(pp->end);
 			po->owner	= le64_to_cpu(pp->owner);
@@ -1208,6 +1809,19 @@
 	return _unlink_checkpoint(mg, &name);
 }
 
+/*
+ * section id is r<inodenum>.<owner>, the maximum string length is:
+ * "r" prefix       =  1    strlen("r")
+ * max uint64       = 20    strlen("18446744073709551615")
+ * "." before owner =  1    strlen(".")
+ * max int          = 11    strlen("-2147483647")
+ * \0 at end        =  1
+ * ---------------------
+ *                    34    SECTION_NAME_LEN
+ */
+
+#define SECTION_NAME_LEN 34
+
 /* Copy all plock state into a checkpoint so new node can retrieve it.  The
    node creating the ckpt for the mounter needs to be the same node that's
    sending the mounter its journals message (i.e. the low nodeid).  The new
@@ -1228,12 +1842,12 @@
 	SaCkptCheckpointOpenFlagsT flags;
 	SaNameT name;
 	SaAisErrorT rv;
-	char buf[32];
+	char buf[SECTION_NAME_LEN];
 	struct resource *r;
 	struct posix_lock *po;
 	struct lock_waiter *w;
 	int r_count, lock_count, total_size, section_size, max_section_size;
-	int len;
+	int len, owner;
 
 	if (!plocks_online)
 		return;
@@ -1264,6 +1878,9 @@
 	max_section_size = 0;
 
 	list_for_each_entry(r, &mg->resources, list) {
+		if (r->owner == -1)
+			continue;
+
 		r_count++;
 		section_size = 0;
 		list_for_each_entry(po, &r->locks, list) {
@@ -1290,9 +1907,7 @@
 	attr.retentionDuration = SA_TIME_MAX;
 	attr.maxSections = r_count + 1;      /* don't know why we need +1 */
 	attr.maxSectionSize = max_section_size;
-	attr.maxSectionIdSize = 22;
-	
-	/* 22 = 20 digits in max uint64 + "r" prefix + \0 suffix */
+	attr.maxSectionIdSize = SECTION_NAME_LEN;
 
 	flags = SA_CKPT_CHECKPOINT_READ |
 		SA_CKPT_CHECKPOINT_WRITE |
@@ -1318,15 +1933,49 @@
 		  (unsigned long long)h);
 	mg->cp_handle = (uint64_t) h;
 
+	/* - If r owner is -1, ckpt nothing.
+	   - If r owner is us, ckpt owner of us and no plocks.
+	   - If r owner is other, ckpt that owner and any plocks we have on r
+	     (they've just been synced but owner=0 msg not recved yet).
+	   - If r owner is 0 and !got_unown, then we've just unowned r;
+	     ckpt owner of us and any plocks that don't have SYNCING set
+	     (plocks with SYNCING will be handled by our sync messages).
+	   - If r owner is 0 and got_unown, then ckpt owner 0 and all plocks;
+	     (there should be no SYNCING plocks) */
+
 	list_for_each_entry(r, &mg->resources, list) {
-		memset(&buf, 0, 32);
-		len = snprintf(buf, 32, "r%llu", (unsigned long long)r->number);
+		if (r->owner == -1)
+			continue;
+		else if (r->owner == our_nodeid)
+			owner = our_nodeid;
+		else if (r->owner)
+			owner = r->owner;
+		else if (!r->owner && !got_unown(r))
+			owner = our_nodeid;
+		else if (!r->owner)
+			owner = 0;
+		else {
+			log_error("store_plocks owner %d r %llx", r->owner,
+				  (unsigned long long)r->number);
+			continue;
+		}
+
+		memset(&buf, 0, sizeof(buf));
+		if (config_plock_ownership)
+			len = snprintf(buf, SECTION_NAME_LEN, "r%llu.%d",
+			       	       (unsigned long long)r->number, owner);
+		else
+			len = snprintf(buf, SECTION_NAME_LEN, "r%llu",
+			       	       (unsigned long long)r->number);
 
 		section_id.id = (void *)buf;
 		section_id.idLen = len + 1;
 		section_attr.sectionId = &section_id;
 		section_attr.expirationTime = SA_TIME_END;
 
+		memset(&section_buf, 0, sizeof(section_buf));
+		section_len = 0;
+
 		pack_section_buf(mg, r);
 
 		log_group(mg, "store_plocks: section size %u id %u \"%s\"",
@@ -1377,7 +2026,7 @@
 	SaCkptIOVectorElementT iov;
 	SaNameT name;
 	SaAisErrorT rv;
-	char buf[32];
+	char buf[SECTION_NAME_LEN];
 	int len;
 
 	if (!plocks_online)
@@ -1440,8 +2089,8 @@
 		iov.dataSize = desc.sectionSize;
 		iov.dataOffset = 0;
 
-		memset(&buf, 0, 32);
-		snprintf(buf, 32, "%s", desc.sectionId.id);
+		memset(&buf, 0, sizeof(buf));
+		snprintf(buf, SECTION_NAME_LEN, "%s", desc.sectionId.id);
 		log_group(mg, "retrieve_plocks: section size %llu id %u \"%s\"",
 			  (unsigned long long)iov.dataSize, iov.sectionId.idLen,
 			  buf);
@@ -1488,6 +2137,10 @@
 		saCkptCheckpointClose(h);
 }
 
+/* Called when a node has failed, or we're unmounting.  For a node failure, we
+   need to call this when the cpg confchg arrives so that we're guaranteed all
+   nodes do this in the same sequence wrt other messages. */
+
 void purge_plocks(struct mountgroup *mg, int nodeid, int unmount)
 {
 	struct posix_lock *po, *po2;
@@ -1512,11 +2165,23 @@
 			}
 		}
 
-		if (list_empty(&r->locks) && list_empty(&r->waiters)) {
+		/* TODO: haven't thought carefully about how this transition
+		   to owner 0 might interact with other owner messages in
+		   progress. */
+
+		if (r->owner == nodeid) {
+			r->owner = 0;
+			send_pending_plocks(mg, r);
+		}
+		
+		if (!list_empty(&r->waiters))
+			do_waiters(mg, r);
+
+		if (!config_plock_ownership &&
+		    list_empty(&r->locks) && list_empty(&r->waiters)) {
 			list_del(&r->list);
 			free(r);
-		} else
-			do_waiters(mg, r);
+		}
 	}
 	
 	if (purged)
@@ -1549,7 +2214,6 @@
 		return -1;
 
 	list_for_each_entry(r, &mg->resources, list) {
-
 		list_for_each_entry(po, &r->locks, list) {
 			snprintf(line, MAXLINE,
 			      "%llu %s %llu-%llu nodeid %d pid %u owner %llx\n",
--- cluster/group/gfs_controld/recover.c	2007/09/04 19:22:52	1.33
+++ cluster/group/gfs_controld/recover.c	2007/11/28 20:49:08	1.34
@@ -19,7 +19,7 @@
 extern char *clustername;
 extern int our_nodeid;
 extern group_handle_t gh;
-extern int no_withdraw;
+extern int config_no_withdraw;
 extern int dmsetup_wait;
 
 struct list_head mounts;
@@ -1328,8 +1328,6 @@
 				  memb->spectator,
 				  memb->wait_gfs_recover_done);
 
-			purge_plocks(mg, memb->nodeid, 0);
-
 			if (mg->master_nodeid == memb->nodeid &&
 			    memb->gone_type == GROUP_NODE_FAILED)
 				master_failed = 1;
@@ -2712,7 +2710,7 @@
 	char *name = strstr(table, ":") + 1;
 	int rv;
 
-	if (no_withdraw) {
+	if (config_no_withdraw) {
 		log_error("withdraw feature not enabled");
 		return 0;
 	}




More information about the Cluster-devel mailing list