LogoopenSUSE Build Service > Projects
Sign Up | Log In

View File dlm_controld_fix_plock_owner_syncing.patch of Package cluster (Project home:sschapiro:openstack:upstream)

commit 597111c711eeded2a9c6579064ce25de6ae0ab1b
Author: David Teigland <teigland@redhat.com>
Date:   Thu Aug 12 15:43:25 2010 -0500

    dlm_controld: fix plock owner syncing
    
    - The R_GOT_UNOWN flag was not always being set on resources when
    the owner was set to 0.  This would cause subsequent syncing of
    plock state to write the incorrect owner into the checkpoint.
    
    - Plocks were being written into the checkpoint unnecessarily for
    owned resources in cases where the owner written in the checkpoint
    was not the same as r->owner.
    
    - Set a few other flags in r->flags to help debug future problems.
    
    - Do more error checking and verifying of checkpointed state when
    checkpoints are being written and read.
    
    - Handle errors during plock syncing by disabling plock operations,
    instead of ignoring or running with incorrect state.
    
    bz 617306
    
    Signed-off-by: David Teigland <teigland@redhat.com>

diff --git a/group/dlm_controld/plock.c b/group/dlm_controld/plock.c
index d18d1f5..ca2d61a 100644
--- a/group/dlm_controld/plock.c
+++ b/group/dlm_controld/plock.c
@@ -32,7 +32,11 @@ struct pack_plock {
 	uint32_t pad;
 };
 
-#define R_GOT_UNOWN 0x00000001 /* have received owner=0 message */
+#define R_GOT_UNOWN   0x00000001 /* have received owner=0 message */
+#define R_SEND_UNOWN  0x00000002 /* have sent owner=0 message */
+#define R_SEND_OWN    0x00000004 /* have sent owner=our_nodeid message */
+#define R_PURGE_UNOWN 0x00000008 /* set owner=0 in purge */
+#define R_SEND_DROP   0x00000010
 
 struct resource {
 	struct list_head	list;	   /* list of resources */
@@ -966,6 +970,11 @@ static void send_own(struct lockspace *ls, struct resource *r, int owner)
 		return;
 	}
 
+	if (!owner)
+		r->flags |= R_SEND_UNOWN;
+	else
+		r->flags |= R_SEND_OWN;
+
 	memset(&info, 0, sizeof(info));
 	info.number = r->number;
 	info.nodeid = owner;
@@ -1016,6 +1025,7 @@ static void send_drop(struct lockspace *ls, struct resource *r)
 
 	memset(&info, 0, sizeof(info));
 	info.number = r->number;
+	r->flags |= R_SEND_DROP;
 
 	send_struct_info(ls, &info, DLM_MSG_PLOCK_DROP);
 }
@@ -1588,15 +1598,18 @@ void process_saved_plocks(struct lockspace *ls)
 /* locks still marked SYNCING should not go into the ckpt; the new node
    will get those locks by receiving PLOCK_SYNC messages */
 
-static void pack_section_buf(struct lockspace *ls, struct resource *r)
+static void pack_section_buf(struct lockspace *ls, struct resource *r,
+			     int owner)
 {
 	struct pack_plock *pp;
 	struct posix_lock *po;
 	struct lock_waiter *w;
 	int count = 0;
 
-	/* plocks on owned resources are not replicated on other nodes */
-	if (r->owner == our_nodeid)
+	/* plocks on owned resources are not replicated on other nodes;
+	   N.B. owner not always equal to r->owner */
+
+	if (cfgd_plock_ownership && (owner == our_nodeid))
 		return;
 
 	pp = (struct pack_plock *) &section_buf;
@@ -1646,6 +1659,17 @@ static int unpack_section_buf(struct lockspace *ls, char *numbuf, int buflen,
 
 	gettimeofday(&now, NULL);
 
+	sscanf(numbuf, "r%llu.%d", &num, &owner);
+
+#if 0
+	/* would be nice to always compile this, but it adds a lot of time */
+	r = search_resource(ls, num);
+	if (r) {
+		log_error("unpack %llu duplicate", num);
+		return -1;
+	}
+#endif
+
 	r = malloc(sizeof(struct resource));
 	if (!r)
 		return -ENOMEM;
@@ -1654,7 +1678,26 @@ static int unpack_section_buf(struct lockspace *ls, char *numbuf, int buflen,
 	INIT_LIST_HEAD(&r->waiters);
 	INIT_LIST_HEAD(&r->pending);
 
-	sscanf(numbuf, "r%llu.%d", &num, &owner);
+	if (!cfgd_plock_ownership) {
+		if (owner) {
+			log_error("unpack %llu bad owner %d count %d",
+				  (unsigned long long)num, owner, count);
+			free(r);
+			return -1;
+		}
+	} else {
+		if (!owner)
+			r->flags |= R_GOT_UNOWN;
+
+		/* no locks should be included for owned resources */
+
+		if (owner && count) {
+			log_error("unpack %llu owner %d bad count %d",
+				  (unsigned long long)num, owner, count);
+			free(r);
+			return -1;
+		}
+	}
 
 	r->number = num;
 	r->owner = owner;
@@ -1667,7 +1710,8 @@ static int unpack_section_buf(struct lockspace *ls, char *numbuf, int buflen,
 	for (i = 0; i < count; i++) {
 		if (!pp->waiter) {
 			po = malloc(sizeof(struct posix_lock));
-			// FIXME: handle failed malloc
+			if (!po)
+				return -ENOMEM;
 			po->start	= le64_to_cpu(pp->start);
 			po->end		= le64_to_cpu(pp->end);
 			po->owner	= le64_to_cpu(pp->owner);
@@ -1677,7 +1721,8 @@ static int unpack_section_buf(struct lockspace *ls, char *numbuf, int buflen,
 			list_add_tail(&po->list, &r->locks);
 		} else {
 			w = malloc(sizeof(struct lock_waiter));
-			// FIXME: handle failed malloc
+			if (!w)
+				return -ENOMEM;
 			w->info.start	= le64_to_cpu(pp->start);
 			w->info.end	= le64_to_cpu(pp->end);
 			w->info.owner	= le64_to_cpu(pp->owner);
@@ -1900,13 +1945,9 @@ void store_plocks(struct lockspace *ls, uint32_t *sig)
 		sleep(1);
 		goto open_retry;
 	}
-	if (rv == SA_AIS_ERR_EXIST) {
-		log_group(ls, "store_plocks ckpt already exists");
-		return;
-	}
 	if (rv != SA_AIS_OK) {
 		log_error("store_plocks ckpt open error %d %s", rv, ls->name);
-		return;
+		goto fail;
 	}
 
 	log_group(ls, "store_plocks open ckpt handle %llx",
@@ -1954,14 +1995,14 @@ void store_plocks(struct lockspace *ls, uint32_t *sig)
 		memset(&section_buf, 0, sizeof(section_buf));
 		section_len = 0;
 
-		pack_section_buf(ls, r);
+		pack_section_buf(ls, r, owner);
 
 		if (!r_num_first)
 			r_num_first = r->number;
 		r_num_last = r->number;
 
-		log_plock(ls, "store_plocks section size %u id %u \"%s\"",
-			  section_len, section_id.idLen, buf);
+		log_plock(ls, "wr sect ro %d rf %x len %u \"%s\"",
+			  r->owner, r->flags, section_len, buf);
 
 	 create_retry:
 		rv = saCkptSectionCreate(h, &section_attr, &section_buf,
@@ -1971,19 +2012,10 @@ void store_plocks(struct lockspace *ls, uint32_t *sig)
 			sleep(1);
 			goto create_retry;
 		}
-		if (rv == SA_AIS_ERR_EXIST) {
-			/* this shouldn't happen in general */
-			log_group(ls, "store_plocks clearing old ckpt");
-			/* do we need this close or will the close in
-			   the unlink function be ok? */
-			saCkptCheckpointClose(h);
-			_unlink_checkpoint(ls, &name);
-			goto open_retry;
-		}
 		if (rv != SA_AIS_OK) {
 			log_error("store_plocks ckpt section create err %d %s",
 				  rv, ls->name);
-			break;
+			goto fail;
 		}
 	}
  out:
@@ -2004,6 +2036,13 @@ void store_plocks(struct lockspace *ls, uint32_t *sig)
 	ls->checkpoint_r_num_last = r_num_last;
 	ls->checkpoint_r_count = r_count;
 	ls->checkpoint_p_count = p_count;
+	return;
+
+ fail:
+	ls->disable_plock = 1;
+	/* force the node receiving plocks to fail sig check and disable
+	   plocks as well */
+	*sig = 0xF0000000;
 }
 
 /* called by a node that's just been added to the group to get existing plock
@@ -2018,7 +2057,7 @@ void retrieve_plocks(struct lockspace *ls, uint32_t *sig)
 	SaNameT name;
 	SaAisErrorT rv;
 	char buf[SECTION_NAME_LEN];
-	int len, lock_count;
+	int len, lock_count, error;
 	uint32_t r_count = 0, p_count = 0;
 	uint64_t r_num, r_num_first = 0, r_num_last = 0;
 
@@ -2086,10 +2125,6 @@ void retrieve_plocks(struct lockspace *ls, uint32_t *sig)
 		memset(&buf, 0, sizeof(buf));
 		snprintf(buf, SECTION_NAME_LEN, "%s", desc.sectionId.id);
 
-		log_plock(ls, "retrieve_plocks section size %llu id %u \"%s\"",
-			  (unsigned long long)iov.dataSize, iov.sectionId.idLen,
-			  buf);
-
 	 read_retry:
 		rv = saCkptCheckpointRead(h, &iov, 1, NULL);
 		if (rv == SA_AIS_ERR_TRY_AGAIN) {
@@ -2107,8 +2142,9 @@ void retrieve_plocks(struct lockspace *ls, uint32_t *sig)
 		   no locks, which exist in ownership mode; the resource
 		   name and owner come from the section id */
 
-		log_plock(ls, "retrieve_plocks ckpt read %llu bytes",
-			  (unsigned long long)iov.readSize);
+		log_plock(ls, "rd sect len %llu \"%s\"",
+			  (unsigned long long)iov.readSize, buf);
+				
 		section_len = iov.readSize;
 
 		if (section_len % sizeof(struct pack_plock)) {
@@ -2120,8 +2156,12 @@ void retrieve_plocks(struct lockspace *ls, uint32_t *sig)
 		r_num = 0;
 		lock_count = 0;
 
-		unpack_section_buf(ls, (char *)desc.sectionId.id,
-				   desc.sectionId.idLen, &r_num, &lock_count);
+		error = unpack_section_buf(ls, (char *)desc.sectionId.id,
+					   desc.sectionId.idLen, &r_num,
+					   &lock_count);
+		if (error < 0)
+			continue;
+
 		r_count++;
 		p_count += lock_count;
 
@@ -2186,6 +2226,8 @@ void purge_plocks(struct lockspace *ls, int nodeid, int unmount)
 
 		if (r->owner == nodeid) {
 			r->owner = 0;
+			r->flags |= R_GOT_UNOWN;
+			r->flags |= R_PURGE_UNOWN;
 			send_pending_plocks(ls, r);
 		}