File 0025-Initial-version-of-the-AzEvents-RA.patch of Package resource-agents.11488

From 2512b39606b1264c913afc37ddcfb6ecfeb7b6af Mon Sep 17 00:00:00 2001
From: Tobias Niekamp <tniek@microsoft.com>
Date: Fri, 15 Jun 2018 21:37:01 -0700
Subject: [PATCH] Initial version of the AzEvents RA

This resource agent monitors the Metadata API inside a Microsoft Azure
VM. If scheduled (maintenance) events are found, it tries to stop the
resources gracefully to migrate them to other nodes in the cluster.
---
 heartbeat/AzEvents | 893 +++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 893 insertions(+)
 create mode 100644 heartbeat/AzEvents

diff --git a/heartbeat/AzEvents b/heartbeat/AzEvents
new file mode 100644
index 00000000..c12802e4
--- /dev/null
+++ b/heartbeat/AzEvents
@@ -0,0 +1,893 @@
+#!/usr/bin/python
+#
+#	Resource agent for monitoring Azure Scheduled Events
+#
+# 	License:	GNU General Public License (GPL)
+#	(c) 2018 	Tobias Niekamp, Microsoft Corp.
+#				and Linux-HA contributors
+
+import os, sys, time, subprocess
+import json
+import urllib, urllib2, socket
+import logging, syslog
+from enum import Enum
+from collections import defaultdict
+
+##############################################################################
+
+VERSION = "0.10"
+
+OCF_SUCCESS	= 0
+OCF_ERR_GENERIC = 1
+OCF_ERR_UNIMPLEMENTED = 3
+OCF_ERR_CONFIGURED = 6
+OCF_NOT_RUNNING = 7
+
+attr_globalPullState = "AzEvents_globalPullState"
+attr_lastDocVersion  = "AzEvents_lastDocVersion"
+attr_curNodeState = "AzEvents_curNodeState"
+attr_pendingEventIDs = "AzEvents_pendingEventIDs"
+
+default_loglevel = logging.INFO
+default_relevantEventTypes = set(["Reboot", "Redeploy"])
+
+global_pullMaxAttempts = 3
+global_pullDelaySecs = 1
+
+##############################################################################
+
+class SyslogLibHandler(logging.StreamHandler):
+	"""
+	A handler class that correctly push messages into syslog
+	"""
+	def emit(self, record):
+		syslog_level = {
+			logging.CRITICAL: syslog.LOG_CRIT,
+			logging.ERROR:    syslog.LOG_ERR,
+			logging.WARNING:  syslog.LOG_WARNING,
+			logging.INFO:     syslog.LOG_INFO,
+			logging.DEBUG:    syslog.LOG_DEBUG,
+			logging.NOTSET:   syslog.LOG_DEBUG,
+		}[record.levelno]
+		msg = self.format(record)
+		# take care of \x00 character
+		syslog.syslog(syslog_level, msg.replace("\x00", "\n"))
+		return
+
+##############################################################################
+
+class attrDict(defaultdict):
+	"""
+	A wrapper for accessing dict keys like an attribute
+	"""
+	def __init__(self, data):
+		super(attrDict, self).__init__(attrDict)
+		for d in data.keys():
+			self.__setattr__(d, data[d])
+
+	def __getattr__(self, key):
+		try:
+			return self[key]
+		except KeyError:
+			raise AttributeError(key)
+
+	def __setattr__(self, key, value):
+		self[key] = value
+
+##############################################################################
+
+class azHelper:
+	"""
+	Helper class for Azure's metadata API (including Scheduled Events)
+	"""
+	metadata_host = "http://169.254.169.254/metadata"
+	instance_api  = "instance"
+	events_api    = "scheduledevents"
+	api_version   = "2017-08-01"
+
+	@staticmethod
+	def _sendMetadataRequest(endpoint, postData=None):
+		"""
+		Send a request to Azure's Azure Metadata Service API
+		"""
+		url = "%s/%s?api-version=%s" % (azHelper.metadata_host, endpoint, azHelper.api_version)
+		logging.debug("_sendMetadataRequest: begin; endpoint = %s, postData = %s" % (endpoint, postData))
+		logging.debug("_sendMetadataRequest: url = %s" % url)
+
+		req = urllib2.Request(url, postData)
+		req.add_header("Metadata", "true")
+		resp = urllib2.urlopen(req)
+		data = resp.read()
+		logging.debug("_sendMetadataRequest: response = %s" % data)
+		if len(data) > 0:
+			data = json.loads(data)
+
+		logging.debug("_sendMetadataRequest: finished")
+		return data
+
+	@staticmethod
+	def getInstanceInfo():
+		"""
+		Fetch details about the current VM from Azure's Azure Metadata Service API
+		"""
+		logging.debug("getInstanceInfo: begin")
+
+		json = azHelper._sendMetadataRequest(azHelper.instance_api)
+		logging.info("getInstanceInfo: json = %s" % json)
+		logging.debug("getInstanceInfo: finished")
+
+		return attrDict(json["compute"])
+
+	@staticmethod
+	def pullScheduledEvents():
+		"""
+		Retrieve all currently scheduled events via Azure Metadata Service API
+		"""
+		logging.debug("pullScheduledEvents: begin")
+
+		json = azHelper._sendMetadataRequest(azHelper.events_api)
+		logging.info("pullScheduledEvents: json = %s" % json)
+
+		logging.debug("pullScheduledEvents: finished")
+		return attrDict(json)
+
+	@staticmethod
+	def forceEvents(eventIDs):
+		"""
+		Force a set of events to start immediately
+		"""
+		logging.debug("forceEvents: begin")
+
+		events = []
+		for e in eventIDs:
+			events.append({
+				"EventId": e,
+			})
+		postData = {
+			"StartRequests" : events
+		}
+		logging.info("forceEvents: postData = %s" % postData)
+		resp = azHelper._sendMetadataRequest(azHelper.events_api, postData=json.dumps(postData))
+
+		logging.debug("forceEvents: finished")
+		return
+
+##############################################################################
+
+class pcsHelper:
+	"""
+	Helper functions for Pacemaker control via crm
+	"""
+	@staticmethod
+	def _getLocation(node):
+		"""
+		Helper function to retrieve local/global attributes
+		"""
+		if node:
+			return ["--node", node]
+		else:
+			return ["--type", "crm_config"]
+
+	@staticmethod
+	def _exec(command, args):
+		"""
+		Helper function to execute a UNIX command
+		"""
+		logging.debug("_exec: begin; command = %s, args = %s" % (command, str(args)))
+
+		flatten = lambda *n: (str(e) for a in n
+    			for e in (flatten(*a) if isinstance(a, (tuple, list)) else (str(a),)))
+		command = list(flatten([command] + args))
+		logging.debug("_exec: cmd = %s" % " ".join(command))
+		try:
+			ret = subprocess.check_output(command)
+			logging.debug("_exec: return = %s" % ret)
+			return ret.rstrip()
+		except Exception:
+			logging.warning("_exec: %s" % sys.exc_info()[0])
+			return None
+
+	@staticmethod
+	def setAttr(key, value, node=None):
+		"""
+		Set the value of a specific global/local attribute in the Pacemaker cluster
+		"""
+		logging.debug("setAttr: begin; key = %s, value = %s, node = %s" % (key, value, node))
+
+		if value:
+			ret = pcsHelper._exec(
+				"crm_attribute",
+				["--name", key,
+				"--update", value,
+				pcsHelper._getLocation(node)])
+		else:
+			ret = pcsHelper._exec(
+				"crm_attribute",
+				["--name", key,
+				"--delete",
+				pcsHelper._getLocation(node)])
+
+		logging.debug("setAttr: finished")
+		return len(ret) == 0
+
+	@staticmethod
+	def getAttr(key, node=None):
+		"""
+		Retrieve a global/local attribute from the Pacemaker cluster
+		"""
+		logging.debug("getAttr: begin; key = %s, node = %s" % (key, node))
+
+		val = pcsHelper._exec(
+			"crm_attribute",
+			["--name", key,
+			"--query", "--quiet",
+			pcsHelper._getLocation(node)])
+		if not val:
+			ret = None
+		else:
+			ret = val if not val.isdigit() else int(val)
+
+		logging.debug("getAttr: finished")
+		return ret
+
+	@staticmethod
+	def getAllNodes():
+		"""
+		Get a list of hostnames for all nodes in the Pacemaker cluster
+		"""
+		logging.debug("getAllNodes: begin")
+
+		nodes = []
+		nodeList = pcsHelper._exec(
+			"crm_node",
+			["--list"])
+		for n in nodeList.split("\n"):
+			nodes.append(n.split()[1])
+		logging.debug("getAllNodes: finished; return %s" % str(nodes))
+
+		return nodes
+
+	@staticmethod
+	def getHostNameFromAzName(azName):
+		"""
+		Helper function to get the actual host name from an Azure node name
+		"""
+		return pcsHelper.getAttr("hostName_%s" % azName)
+
+	@staticmethod
+	def removeHoldFromNodes():
+		"""
+		Remove the ON_HOLD state from all nodes in the Pacemaker cluster
+		"""
+		logging.debug("removeHoldFromNodes: begin")
+
+		for n in pcsHelper.getAllNodes():
+			if pcsHelper.getAttr(attr_curNodeState, node=n) == pcsNodeState.ON_HOLD.name:
+				pcsHelper.setAttr(attr_curNodeState, pcsNodeState.AVAILABLE.name, node=n)
+				logging.info("removeHoldFromNodes: removed ON_HOLD from node %s" % n)
+
+		logging.debug("removeHoldFromNodes: finished")
+		return False
+
+	@staticmethod
+	def otherNodesAvailable(exceptNode):
+		"""
+		Check if there are any nodes (except a given node) in the Pacemaker cluster that have state AVAILABLE
+		"""
+		logging.debug("otherNodesAvailable: begin; exceptNode = %s" % exceptNode)
+
+		for n in pcsHelper.getAllNodes():
+			state = pcsHelper.getAttr(attr_curNodeState, node=n)
+			if state:
+				state = pcsNodeState[state]
+			else:
+				state = pcsNodeState.AVAILABLE
+			if state == pcsNodeState.AVAILABLE and n != exceptNode.hostName:
+				logging.info("otherNodesAvailable: at least %s is available" % n)
+				logging.debug("otherNodesAvailable: finished")
+				return True
+		logging.info("otherNodesAvailable: no other nodes are available")
+		logging.debug("otherNodesAvailable: finished")
+
+		return False
+
+	@staticmethod
+	def transitionSummary():
+		"""
+		Get the current Pacemaker transition summary (used to check if all resources are stopped when putting a node standby)
+		"""
+		# <tniek> Is a global crm_simulate "too much"? Or would it be sufficient it there are no planned transitions for a particular node?
+		# # crm_simulate -Ls
+		# 	Transition Summary:
+		# 	 * Promote rsc_SAPHana_HN1_HDB03:0      (Slave -> Master hsr3-db1)
+		# 	 * Stop    rsc_SAPHana_HN1_HDB03:1      (hsr3-db0)
+		# 	 * Move    rsc_ip_HN1_HDB03     (Started hsr3-db0 -> hsr3-db1)
+		# 	 * Start   rsc_nc_HN1_HDB03     (hsr3-db1)
+		# # Excepted result when there are no pending actions:
+		# 	Transition Summary:
+		logging.debug("transitionSummary: begin")
+
+		summary = pcsHelper._exec(
+			"crm_simulate",
+			["-Ls"]
+			)
+		if not summary:
+			logging.warning("transitionSummary: could not load transition summary")
+			return False
+		if summary.find("Transition Summary:") < 0:
+			logging.warning("transitionSummary: received unexpected transition summary: %s" % summary)
+			return False
+		summary = summary.split("Transition Summary:")[1]
+		ret = summary.split("\n").pop(0)
+
+		logging.debug("transitionSummary: finished; return = %s" % str(ret))
+		return ret
+
+	@staticmethod
+	def listOperationsOnNode(node):
+		"""
+		Get a list of all current operations for a given node (used to check if any resources are pending)
+		"""
+		# hsr3-db1:/home/tniek # crm_resource --list-operations -N hsr3-db0
+		# rsc_AzEvents    (ocf::heartbeat:AzEvents):      Started: rsc_AzEvents_start_0 (node=hsr3-db0, call=91, rc=0, last-rc-change=Fri Jun  8 22:37:46 2018, exec=115ms): complete
+		# rsc_AzEvents    (ocf::heartbeat:AzEvents):      Started: rsc_AzEvents_monitor_10000 (node=hsr3-db0, call=93, rc=0, last-rc-change=Fri Jun  8 22:37:47 2018, exec=197ms): complete
+		# rsc_SAPHana_HN1_HDB03   (ocf::suse:SAPHana):    Master: rsc_SAPHana_HN1_HDB03_start_0 (node=hsr3-db0, call=-1, rc=193, last-rc-change=Fri Jun  8 22:37:46 2018, exec=0ms): pending
+		# rsc_SAPHanaTopology_HN1_HDB03   (ocf::suse:SAPHanaTopology):    Started: rsc_SAPHanaTopology_HN1_HDB03_start_0 (node=hsr3-db0, call=90, rc=0, last-rc-change=Fri Jun  8 22:37:46 2018, exec=3214ms): complete
+		logging.debug("listOperationsOnNode: begin; node = %s" % node)
+
+		resources = pcsHelper._exec(
+			"crm_resource",
+			["--list-operations",
+			"-N", node]
+			)
+		if len(resources) == 0:
+			ret = []
+		else:
+			ret = resources.split("\n")
+
+		logging.debug("listOperationsOnNode: finished; return = %s" % str(ret))
+		return ret
+
+	@staticmethod
+	def noPendingResourcesOnNode(node):
+		"""
+		Check that there are no pending resources on a given node
+		"""
+		logging.debug("noPendingResourcesOnNode: begin; node = %s" % node)
+
+		for r in pcsHelper.listOperationsOnNode(node):
+			logging.debug("noPendingResourcesOnNode: * %s" % r)
+			resource = r.split()[-1]
+			if resource == "pending":
+				logging.info("noPendingResourcesOnNode: found resource %s that is still pending" % resource)
+				logging.debug("noPendingResourcesOnNode: finished; return = False")
+				return False
+		logging.info("noPendingResourcesOnNode: no pending resources on node %s" % node)
+		logging.debug("noPendingResourcesOnNode: finished; return = True")
+
+		return True
+
+	@staticmethod
+	def allResourcesStoppedOnNode(node):
+		"""
+		Check that all resources on a given node are stopped
+		"""
+		logging.debug("allResourcesStoppedOnNode: begin; node = %s" % node)
+
+		if pcsHelper.noPendingResourcesOnNode(node):
+			if len(pcsHelper.transitionSummary()) == 0:
+				logging.info("allResourcesStoppedOnNode: no pending resources on node %s and empty transition summary" % node)
+				logging.debug("allResourcesStoppedOnNode: finished; return = True")
+				return True
+			else:
+				logging.info("allResourcesStoppedOnNode: transition summary is not empty")
+				logging.debug("allResourcesStoppedOnNode: finished; return = False")
+				return False
+
+		logging.info("allResourcesStoppedOnNode: still pending resources on node %s" % node)
+		logging.debug("allResourcesStoppedOnNode: finished; return = False")
+		return False		
+
+##############################################################################
+
+class pcsNodeState(Enum):
+	AVAILABLE = 0	# Node is online and ready to handle events
+	STOPPING = 1	# Standby has been triggered, but some resources are still running
+	IN_EVENT = 2	# All resources are stopped, and event has been initiated via Azure Metadata Service
+	ON_HOLD = 3		# Node has a pending event that cannot be started there are no other nodes available
+
+##############################################################################
+
+class pcsNode:
+	"""
+	Core class implementing logic for a cluster node
+	"""
+	def __init__(self, ra):
+		self.raOwner  = ra 
+		self.azInfo   = azHelper.getInstanceInfo()
+		self.azName   = self.azInfo.name
+		self.hostName = socket.gethostname()
+		self.setAttr("azName", self.azName)
+		pcsHelper.setAttr("hostName_%s" % self.azName, self.hostName)
+
+	def getAttr(self, key):
+		"""
+		Get a local attribute
+		"""
+		return pcsHelper.getAttr(key, node=self.hostName)
+
+	def setAttr(self, key, value):
+		"""
+		Set a local attribute
+		"""
+		return pcsHelper.setAttr(key, value, node=self.hostName)
+
+	def selfOrOtherNode(self, node):
+		"""
+		Helper function to distinguish self/other node
+		"""
+		if not node:
+		 	return self.hostName
+		else:
+			return node
+
+	def setState(self, state, node=None):
+		"""
+		Set the state for a given node (or self)
+		"""
+		node = self.selfOrOtherNode(node)
+		logging.debug("setState: begin; node = %s, state = %s" % (node, state.name))
+
+		pcsHelper.setAttr(attr_curNodeState, state.name, node=node)
+
+		logging.debug("setState: finished")
+		return
+
+	def getState(self, node=None):
+		"""
+		Get the state for a given node (or self)
+		"""
+		node = self.selfOrOtherNode(node)
+		logging.debug("getState: begin; node = %s" % node)
+
+		state = pcsHelper.getAttr(attr_curNodeState, node=node)
+		logging.debug("getState: state = %s" % state)
+		logging.debug("getState: finished")
+		if not state:
+			return pcsNodeState(pcsNodeState.AVAILABLE)
+		else:
+			return pcsNodeState[state]
+
+	def setEventIDs(self, eventIDs, node=None):
+		"""
+		Set pending EventIDs for a given node (or self)
+		"""
+		node = self.selfOrOtherNode(node)
+		logging.debug("setEventIDs: begin; node = %s, eventIDs = %s" % (node, str(eventIDs)))
+
+		if eventIDs:
+			eventIDStr = ",".join(eventIDs)
+		else:
+			eventIDStr = None
+		pcsHelper.setAttr(attr_pendingEventIDs, eventIDStr, node=node)
+
+		logging.debug("setEventIDs: finished")
+		return
+
+	def getEventIDs(self, node=None):
+		"""
+		Get pending EventIDs for a given node (or self)
+		"""
+		node = self.selfOrOtherNode(node)
+		logging.debug("getEventIDs: begin; node = %s" % node)
+
+		eventIDStr = pcsHelper.getAttr(attr_pendingEventIDs, node=node)
+		if eventIDStr:
+			eventIDs = eventIDStr.split(",")
+		else:
+			eventIDs = None
+
+		logging.debug("getEventIDs: finished; eventIDs = %s" % str(eventIDs))
+		return eventIDs
+
+	def updateNodeStateAndEvents(self, state, eventIDs, node=None):
+		"""
+		Set the state and pending EventIDs for a given node (or self)
+		"""
+		logging.debug("updateNodeStateAndEvents: begin; node = %s, state = %s, eventIDs = %s" % (node, state.name, str(eventIDs)))
+
+		self.setState(state, node=node)
+		self.setEventIDs(eventIDs, node=node)
+
+		logging.debug("updateNodeStateAndEvents: finished")
+		return state
+
+	def putNodeStandby(self, node=None):
+		"""
+		Put self to standby
+		"""
+		node = self.selfOrOtherNode(node)
+		logging.debug("putNodeStandby: begin; node = %s" % node)
+
+		pcsHelper._exec(
+				"crm",
+				["node",
+				"standby",
+				node]
+				)
+
+		logging.debug("putNodeStandby: finished")
+		return
+
+	def putNodeOnline(self, node=None):
+		"""
+		Put self back online
+		"""
+		node = self.selfOrOtherNode(node)
+		logging.debug("putNodeOnline: begin; node = %s" % node)
+
+		pcsHelper._exec(
+				"crm",
+				["node",
+				"online",
+				node]
+				)
+
+		logging.debug("putNodeOnline: finished")
+		return
+
+	def separateEvents(self, events):
+		"""
+		Split own/other nodes' events
+		"""
+		logging.debug("separateEvents: begin; events = %s" % str(events))
+
+		localEvents = []
+		remoteEvents = []
+		for e in events:
+			e = attrDict(e)
+			if e.EventType not in self.raOwner.config.relevantEventTypes:
+				continue
+			if self.azName in e.Resources:
+				localEvents.append(e)
+			else:
+				remoteEvents.append(e)
+		logging.debug("separateEvents: finished; localEvents = %s, remoteEvents = %s" % (str(localEvents), str(remoteEvents)))
+		return (localEvents, remoteEvents)
+
+	def removeOrphanedEvents(self, azEvents):
+		"""
+		Remove remote events that are already finished
+		"""
+		logging.debug("removeOrphanedEvents: begin; azEvents = %s" % str(azEvents))
+
+		azEventIDs = set()
+		for e in azEvents:
+			azEventIDs.add(e.EventId)
+		# for all nodes except self ...
+		for n in pcsHelper.getAllNodes():
+			if n == self.hostName:
+				continue
+			curState = self.getState(node=n)
+			# ... that still show in an event or shutting down resources ...
+			if curState in (pcsNodeState.STOPPING, pcsNodeState.IN_EVENT):
+				logging.info("removeOrphanedEvents: node %s has state %s" % (n, curState))
+				pcsEventIDs = self.getEventIDs(node=n)
+				stillActive = False
+				# ... but don't have any more events running according to Azure, ...
+				for p in pcsEventIDs:
+					if p in azEventIDs:
+						logging.info("removeOrphanedEvents: (at least) event %s on node %s has not yet finished" % (str(p), n))
+						stillActive = True
+						break
+				if not stillActive:
+					# ... put them back online.
+					logging.info("removeOrphanedEvents: pcsEvents %s on node %s are not in azEvents %s -> bring node back online" % (str(pcsEventIDs), n, str(azEventIDs)))
+					self.putNodeOnline(node=n)
+
+		logging.debug("removeOrphanedEvents: finished")
+		return
+
+	def handleRemoteEvents(self, azEvents):
+		"""
+		Handle a list of events (as provided by Azure Metadata Service) for other nodes
+		"""
+		logging.debug("handleRemoteEvents: begin; hostName = %s, events = %s" % (self.hostName, str(azEvents)))
+
+		if len(azEvents) == 0:
+			logging.info("handleRemoteEvents: no remote events to handle")
+			logging.debug("handleRemoteEvents: finished")
+			return
+		eventIDsForNode = {}
+
+		# iterate through all current events as per Azure
+		for e in azEvents:
+			logging.info("handleRemoteEvents: handling remote event %s (%s; nodes = %s)" % (e.EventId, e.EventType, str(e.Resources)))
+			# before we can force an event to start, we need to ensure all nodes involved have stopped their resources
+			if e.EventStatus == "Scheduled":
+				allNodesStopped = True
+				for azName in e.Resources:
+					hostName = pcsHelper.getHostNameFromAzName(azName)
+					state = self.getState(node=hostName)
+					if state == pcsNodeState.STOPPING:
+						# the only way we can continue is when node state is STOPPING, but all resources have been stopped
+						if not pcsHelper.allResourcesStoppedOnNode(hostName):
+							logging.info("handleRemoteEvents: (at least) node %s has still resources running -> wait" % hostName)
+							allNodesStopped = False
+							break
+					elif state in (pcsNodeState.AVAILABLE, pcsNodeState.IN_EVENT, pcsNodeState.ON_HOLD):
+						logging.info("handleRemoteEvents: node %s is still %s -> remote event needs to be picked up locally" % (hostName, state.name))
+						allNodesStopped = False
+						break
+				if allNodesStopped:
+					logging.info("handleRemoteEvents: nodes %s are stopped -> add remote event %s to force list" % (str(e.Resources), e.EventId))
+					for n in e.Resources:
+						hostName = pcsHelper.getHostNameFromAzName(n)
+						if eventIDsForNode.has_key(hostName):
+							eventIDsForNode[hostName].append(e.EventId)
+						else:
+							eventIDsForNode[hostName] = [e.EventId]
+			elif e.EventStatus == "Started":
+				logging.info("handleRemoteEvents: remote event already started")
+
+		# force the start of all events whose nodes are ready (i.e. have no more resources running)	
+		if len(eventIDsForNode.keys()) > 0:
+			eventIDsToForce = set([item for sublist in eventIDsForNode.values() for item in sublist])
+			logging.info("handleRemoteEvents: set nodes %s to IN_EVENT; force remote events %s" % (str(eventIDsForNode.keys()), str(eventIDsToForce)))
+			for n in eventIDsForNode.keys():
+				self.updateNodeStateAndEvents(pcsNodeState.IN_EVENT, eventIDsForNode[n], node=n)
+			azHelper.forceEvents(eventIDsToForce)
+
+		logging.debug("handleRemoteEvents: finished")
+		return
+
+	def handleLocalEvents(self, azEvents):
+		"""
+		Handle a list of own events (as provided by Azure Metadata Service)
+		"""
+		logging.debug("handleLocalEvents: begin; hostName = %s, azEvents = %s" % (self.hostName, str(azEvents)))
+
+		azEventIDs = set()
+		for e in azEvents:
+			azEventIDs.add(e.EventId)
+
+		curState = self.getState()
+		pcsEventIDs = self.getEventIDs()
+		mayUpdateDocVersion = False
+		logging.info("handleLocalEvents: current state = %s; pending local pcsEvents = %s" % (curState.name, str(pcsEventIDs)))
+		
+		# check if there are currently/still events set for the node
+		if pcsEventIDs:
+			# there are pending events set, so our state must be STOPPING or IN_EVENT
+			i = 0; touchedEventIDs = False
+			while i < len(pcsEventIDs):
+				# clean up pending events that are already finished according to AZ 
+				if pcsEventIDs[i] not in azEventIDs:
+					logging.info("handleLocalEvents: remove finished local pcsEvent %s" % (pcsEventIDs[i]))
+					pcsEventIDs.pop(i)
+					touchedEventIDs = True
+				else:
+					i += 1
+			if len(pcsEventIDs) > 0:
+				# there are still pending events (either because we're still stopping, or because the event is still in place)
+				# either way, we need to wait
+				if touchedEventIDs:
+					logging.info("handleLocalEvents: added new local pcsEvent %s" % str(pcsEventIDs))
+					self.setEventIDs(pcsEventIDs)
+				else:
+					logging.info("handleLocalEvents: no local pcsEvents were updated")
+			else:
+				# there are no more pending events left after cleanup
+				if pcsHelper.noPendingResourcesOnNode(self.hostName): 
+					# and no pending resources on the node -> set it back online
+					logging.info("handleLocalEvents: all local events finished -> clean up, put node online and AVAILABLE")
+					curState = self.updateNodeStateAndEvents(pcsNodeState.AVAILABLE, None)
+					self.putNodeOnline()
+					pcsHelper.removeHoldFromNodes()
+					# repeat handleLocalEvents() since we changed status to AVAILABLE		
+				else:
+					logging.info("handleLocalEvents: all local events finished, but some resources have not completed startup yet -> wait")
+		else:
+			# there are no pending events set for us (yet)
+			if curState == pcsNodeState.AVAILABLE:
+				if len(azEventIDs) > 0:
+					if pcsHelper.otherNodesAvailable(self):
+						logging.info("handleLocalEvents: can handle local events %s -> set state STOPPING" % (str(azEventIDs)))
+						# this will also set mayUpdateDocVersion = True
+						curState = self.updateNodeStateAndEvents(pcsNodeState.STOPPING, azEventIDs)
+					else:
+						logging.info("handleLocalEvents: cannot handle azEvents %s (only node available) -> set state ON_HOLD" % str(azEventIDs))			
+						self.setState(pcsNodeState.ON_HOLD)
+				else:
+					logging.info("handleLocalEvents: no local azEvents to handle")
+			if curState == pcsNodeState.STOPPING:
+				if pcsHelper.noPendingResourcesOnNode(self.hostName):
+					logging.info("handleLocalEvents: all local resources are started properly -> put node standby")
+					self.putNodeStandby()
+					mayUpdateDocVersion = True
+				else:
+					logging.info("handleLocalEvents: some local resources are not clean yet -> wait")
+
+		logging.debug("handleLocalEvents: finished; mayUpdateDocVersion = %s" % str(mayUpdateDocVersion))
+		return mayUpdateDocVersion
+
+##############################################################################
+
+class raGlobalPullState(Enum):
+	"""
+	Pull state to avoid two AzEvents resource agents pulling from Azure Metadata Service API concurrently
+	"""
+	IDLE = 0
+	PULLING = 1
+
+##############################################################################
+
+class raConfig:
+	verbose = None
+	relevantEventTypes = default_relevantEventTypes
+
+##############################################################################
+
+class raAzEvents:
+	"""
+	Main class for resource agent
+	"""	
+	def __init__(self, config):
+		self.node = pcsNode(self)
+		self.config = config
+
+	def monitor(self):
+		logging.debug("monitor: begin")
+
+		pullFailedAttemps = 0
+		while True:
+			# check if another node is pulling at the same time;
+			# this should only be a concern for the first pull, as setting up Scheduled Events may take up to 2 minutes.
+			if pcsHelper.getAttr(attr_globalPullState) == raGlobalPullState.PULLING.name:
+				pullFailedAttemps += 1
+				if pullFailedAttemps == global_pullMaxAttempts:
+					logging.warning("monitor: exceeded maximum number of attempts (%d) to pull events" % global_pullMaxAttempts)
+					logging.debug("monitor: finished")
+					return OCF_SUCCESS
+				else:
+					logging.info("monitor: another node is pulling; retry in %d seconds" % global_pullDelaySecs)
+					time.sleep(global_pullDelaySecs)
+					continue
+
+			# we can pull safely from Azure Metadata Service
+			pcsHelper.setAttr(attr_globalPullState, raGlobalPullState.PULLING.name)
+			events = azHelper.pullScheduledEvents()
+			pcsHelper.setAttr(attr_globalPullState, raGlobalPullState.IDLE.name)
+
+			# get current document version
+			curDocVersion  = events.DocumentIncarnation
+			lastDocVersion = self.node.getAttr(attr_lastDocVersion)
+			logging.info("monitor: lastDocVersion = %s; curDocVersion = %s" % (lastDocVersion, curDocVersion))
+
+			# split events local/remote
+			(localEvents, remoteEvents) = self.node.separateEvents(events.Events)
+
+			# ensure local events are only executing once
+			if curDocVersion != lastDocVersion:
+				logging.info("monitor: curDocVersion has not been handled yet")
+				# handleLocalEvents() returns True if mayUpdateDocVersion is True;
+				# this is only the case if we can ensure there are no pending events
+				if self.node.handleLocalEvents(localEvents):
+					logging.info("monitor: handleLocalEvents completed successfully -> update curDocVersion")
+					self.node.setAttr(attr_lastDocVersion, curDocVersion)
+				else:
+					logging.info("monitor: handleLocalEvents still waiting -> keep curDocVersion")
+			else:
+				logging.info("monitor: already handled curDocVersion, skip")
+
+			# remove orphaned remote events and then handle the remaining remote events
+			self.node.removeOrphanedEvents(remoteEvents)
+			self.node.handleRemoteEvents(remoteEvents)
+			break
+
+		logging.debug("monitor: finished")
+		return OCF_SUCCESS
+
+##############################################################################
+##############################################################################
+
+def help():
+	print("""This resource agent implements a monitor for scheduled
+(maintenance) events for a Microsoft Azure VM.
+
+If any relevant events are found, it moves all Pacemaker resources
+away from the affected node to allow for a graceful shutdown.
+
+	Usage:
+		AzEvents <action> [eventTypes=<val>] [verbose=<val>]
+
+		action (required): Supported values: monitor, help, meta-data
+		eventTypes (optional): List of event types to be considered
+				relevant by the resource agent (comma-separated).
+				Supported values: Freeze,Reboot,Redeploy
+				Default = Reboot,Redeploy
+		verbose (optional): If set to true, displays debug info.
+				Default = false
+
+	Deployment:
+		crm configure primitive rsc_AzEvents ocf:heartbeat:AzEvents \
+			op monitor interval=10s
+		crm configure clone cln_AzEvents rsc_AzEvents
+
+For further information on Microsoft Azure Scheduled Events, please
+refer to the following documentation:
+https://docs.microsoft.com/en-us/azure/virtual-machines/linux/scheduled-events""")
+
+def metadata():
+	print("""<?xml version="1.0"?>
+<!DOCTYPE resource-agent SYSTEM "ra-api-1.dtd">
+<resource-agent name="AzEvents">
+<version>%s</version>
+<shortdesc lang="en">Resource agent to handle Microsoft Azure Scheduled Events</shortdesc>
+<longdesc lang="en">
+The AzEvents resource agent is to be used nodes inside a Pacemaker cluster that run Microsoft Azure. It periodically checks if maintenance events (for example, reboots or redploys) are scheduled and takes preemptive action by moving all resources away from the affected node.
+</longdesc>
+<parameters>
+    <parameter name="eventTypes" unique="0" required="0">
+        <longdesc lang="en">A comma-separated list of event types that will be handled by this resource agent. (Possible values: Freeze,Reboot,Redeploy; Default = Reboot,Redeploy)</longdesc>
+        <shortdesc lang="en">List of resources to be considered</shortdesc>
+        <content type="string" default="" />
+    </parameter>
+</parameters>
+<actions>
+	<action name="start" timeout="5" />
+	<action name="stop" timeout="5" />
+	<action name="monitor" timeout="240" interval="10" depth="0" />
+	<action name="meta-data" timeout="5" />
+</actions>
+</resource-agent>""" % VERSION)
+
+def getConfig():
+	# get resource agent config via env variables
+	config = raConfig()
+	verbose = os.environ.get("OCF_RESKEY_verbose")
+	if verbose and verbose.lower() == "true":
+		config.verbose = True
+	relevantEventTypes = os.environ.get("OCF_RESKEY_eventTypes")
+	if relevantEventTypes:
+		config.relevantEventTypes = set(relevantEventTypes.split(","))
+	return config
+
+def setLoglevel(verbose):
+	# set up writing into syslog
+	if verbose:
+		opener = urllib2.build_opener(urllib2.HTTPHandler(debuglevel = 1))
+		urllib2.install_opener(opener)
+		loglevel = logging.DEBUG		
+	else:
+		loglevel = default_loglevel
+	logging.getLogger().setLevel(loglevel)
+	logging.getLogger().addHandler(SyslogLibHandler())	
+	logging.getLogger().addHandler(logging.StreamHandler(sys.stderr))
+	return
+
+def main():
+	config = getConfig()
+	setLoglevel(config.verbose)
+
+	result = OCF_ERR_UNIMPLEMENTED
+	action = sys.argv[1].lower() if len(sys.argv) > 1 else None
+	logging.debug("main: begin; action = %s" % action)
+	if action == "meta-data":
+		result = metadata()
+	elif action == "help":
+		help()
+	elif action:
+		ra = raAzEvents(config)
+		if action == "monitor":
+			result = ra.monitor()
+		elif action in ("start", "stop"):
+			result = OCF_SUCCESS
+		else:
+			logging.error("main: Unsupported action %s" % action)
+
+	logging.debug("main: finished; result = %s" % result)
+	sys.exit(result)
+
+if __name__ == '__main__':
+	main()
\ No newline at end of file
-- 
2.16.4

openSUSE Build Service is sponsored by