File pacemaker.c of Package libdlm
#include <syslog.h>
#include "config.h"
#include "dlm_daemon.h"
#include <glib.h>
#include <bzlib.h>
#include <heartbeat/ha_msg.h>
#include <pacemaker/crm_config.h>
#include <pacemaker/crm/crm.h>
#include <pacemaker/crm/ais.h>
/* heartbeat support is irrelevant here */
#undef SUPPORT_HEARTBEAT
#define SUPPORT_HEARTBEAT 0
#include <pacemaker/crm/common/cluster.h>
#define COMMS_DIR "/sys/kernel/config/dlm/cluster/comms"
int setup_ccs(void)
{
/* To avoid creating an additional place for the dlm to be configured,
* only allow configuration from the command-line until CoroSync is stable
* enough to be used with Pacemaker
*/
cfgd_groupd_compat = 0; /* always use libcpg and disable backward compatability */
return 0;
}
void close_ccs(void) { return; }
int get_weight(int nodeid, char *lockspace) { return 1; }
/* TODO: Make this configurable
* Can't use logging.c as-is as whitetank exposes a different logging API
*/
void init_logging(void) {
openlog("cluster-dlm", LOG_PERROR|LOG_PID|LOG_CONS|LOG_NDELAY, LOG_DAEMON);
/* cl_log_enable_stderr(TRUE); */
}
void setup_logging(void) { return; }
void close_logging(void) {
closelog();
}
extern int ais_fd_async;
int local_node_id = 0;
char *local_node_uname = NULL;
void dlm_process_node(gpointer key, gpointer value, gpointer user_data);
int setup_cluster(void)
{
int retries = 0;
int rc = SA_AIS_OK;
struct utsname name;
crm_peer_init();
if(local_node_uname == NULL) {
if(uname(&name) < 0) {
cl_perror("uname(2) call failed");
exit(100);
}
local_node_uname = crm_strdup(name.nodename);
log_debug("Local node name: %s", local_node_uname);
}
/* 16 := CRM_SERVICE */
retry:
log_debug("Creating connection to our AIS plugin");
rc = saServiceConnect (&ais_fd_sync, &ais_fd_async, CRM_SERVICE);
if (rc != SA_AIS_OK) {
log_error("Connection to our AIS plugin (%d) failed: %s (%d)", CRM_SERVICE, ais_error2text(rc), rc);
}
switch(rc) {
case SA_AIS_OK:
break;
case SA_AIS_ERR_TRY_AGAIN:
if(retries < 30) {
sleep(1);
retries++;
goto retry;
}
log_error("Retry count exceeded");
return 0;
default:
return 0;
}
log_debug("AIS connection established");
{
int pid = getpid();
char *pid_s = crm_itoa(pid);
send_ais_text(0, pid_s, TRUE, NULL, crm_msg_ais);
crm_free(pid_s);
}
/* Sign up for membership updates */
send_ais_text(crm_class_notify, "true", TRUE, NULL, crm_msg_ais);
/* Requesting the current list of known nodes */
send_ais_text(crm_class_members, __FUNCTION__, TRUE, NULL, crm_msg_ais);
our_nodeid = get_ais_nodeid();
log_debug("Local node id: %d", our_nodeid);
return ais_fd_async;
}
static void statechange(void)
{
static uint64_t last_membership = 0;
cluster_quorate = crm_have_quorum;
if(last_membership < crm_peer_seq) {
log_debug("Processing membership %llu", crm_peer_seq);
g_hash_table_foreach(crm_peer_cache, dlm_process_node, &last_membership);
last_membership = crm_peer_seq;
}
}
void update_cluster(void)
{
statechange();
}
void process_cluster(int ci)
{
/* ci ::= client number */
char *data = NULL;
char *uncompressed = NULL;
AIS_Message *msg = NULL;
SaAisErrorT rc = SA_AIS_OK;
mar_res_header_t *header = NULL;
static int header_len = sizeof(mar_res_header_t);
header = malloc(header_len);
memset(header, 0, header_len);
errno = 0;
rc = saRecvRetry(ais_fd_async, header, header_len);
if (rc != SA_AIS_OK) {
cl_perror("Receiving message header failed: (%d) %s", rc, ais_error2text(rc));
goto bail;
} else if(header->size == header_len) {
log_error("Empty message: id=%d, size=%d, error=%d, header_len=%d",
header->id, header->size, header->error, header_len);
goto done;
} else if(header->size == 0 || header->size < header_len) {
log_error("Mangled header: size=%d, header=%d, error=%d",
header->size, header_len, header->error);
goto done;
} else if(header->error != 0) {
log_error("Header contined error: %d", header->error);
}
header = realloc(header, header->size);
/* Use a char* so we can store the remainder into an offset */
data = (char*)header;
errno = 0;
rc = saRecvRetry(ais_fd_async, data+header_len, header->size - header_len);
msg = (AIS_Message*)data;
if (rc != SA_AIS_OK) {
cl_perror("Receiving message body failed: (%d) %s", rc, ais_error2text(rc));
goto bail;
}
data = msg->data;
if(msg->is_compressed && msg->size > 0) {
int rc = BZ_OK;
unsigned int new_size = msg->size;
if(check_message_sanity(msg, NULL) == FALSE) {
goto badmsg;
}
log_debug("Decompressing message data");
uncompressed = malloc(new_size);
memset(uncompressed, 0, new_size);
rc = BZ2_bzBuffToBuffDecompress(
uncompressed, &new_size, data, msg->compressed_size, 1, 0);
if(rc != BZ_OK) {
log_error("Decompression failed: %d", rc);
goto badmsg;
}
CRM_ASSERT(rc == BZ_OK);
CRM_ASSERT(new_size == msg->size);
data = uncompressed;
} else if(check_message_sanity(msg, data) == FALSE) {
goto badmsg;
} else if(safe_str_eq("identify", data)) {
int pid = getpid();
char *pid_s = crm_itoa(pid);
send_ais_text(0, pid_s, TRUE, NULL, crm_msg_ais);
crm_free(pid_s);
goto done;
}
if(msg->header.id == crm_class_members) {
xmlNode *xml = string2xml(data);
if(xml != NULL) {
const char *value = crm_element_value(xml, "id");
if(value) {
crm_peer_seq = crm_int_helper(value, NULL);
}
log_debug("Updating membership %llu", crm_peer_seq);
/* crm_log_xml_info(xml, __PRETTY_FUNCTION__); */
xml_child_iter(xml, node, crm_update_ais_node(node, crm_peer_seq));
crm_calculate_quorum();
statechange();
free_xml(xml);
} else {
log_error("Invalid peer update: %s", data);
}
} else {
log_error("Unexpected AIS message type: %d", msg->header.id);
}
done:
free(uncompressed);
free(msg);
return;
badmsg:
log_error("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
" min=%d, total=%d, size=%d, bz2_size=%d",
msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
msg->sender.pid, (int)sizeof(AIS_Message),
msg->header.size, msg->size, msg->compressed_size);
goto done;
bail:
log_error("AIS connection failed");
return;
}
void close_cluster(void) {
/* TODO: Implement something for this */
return;
}
#include <arpa/inet.h>
#include <openais/totem/totemip.h>
void dlm_process_node(gpointer key, gpointer value, gpointer user_data)
{
int rc = 0;
struct stat tmp;
char path[PATH_MAX];
crm_node_t *node = value;
uint64_t *last = user_data;
const char *action = "Skipped";
gboolean do_add = FALSE;
gboolean do_remove = FALSE;
gboolean is_active = FALSE;
memset(path, 0, PATH_MAX);
snprintf(path, PATH_MAX, "%s/%d", COMMS_DIR, node->id);
rc = stat(path, &tmp);
is_active = crm_is_member_active(node);
if(rc == 0 && is_active) {
/* nothing to do?
* maybe the node left and came back...
*/
} else if(rc == 0) {
do_remove = TRUE;
} else if(is_active) {
do_add = TRUE;
}
if(do_remove) {
action = "Removed";
del_configfs_node(node->id);
}
if(do_add) {
char *addr_copy = strdup(node->addr);
char *addr_top = addr_copy;
char *addr = NULL;
if(do_remove) {
action = "Re-added";
} else {
action = "Added";
}
if(local_node_id == 0) {
crm_node_t *local_node = g_hash_table_lookup(
crm_peer_cache, local_node_uname);
local_node_id = local_node->id;
}
do {
char ipaddr[1024];
int addr_family = AF_INET;
int cna_len = 0, rc = 0;
struct sockaddr_storage cna_addr;
struct totem_ip_address totem_addr;
addr = strsep(&addr_copy, " ");
if(addr == NULL) {
break;
}
/* do_cmd_get_node_addrs */
if(strstr(addr, "ip(") == NULL) {
continue;
} else if(strchr(addr, ':')) {
rc = sscanf(addr, "ip(%[0-9A-Fa-f:])", ipaddr);
if(rc != 1) {
log_error("Could not extract IPv6 address from '%s'", addr);
continue;
}
addr_family = AF_INET6;
} else {
rc = sscanf(addr, "ip(%[0-9.]) ", ipaddr);
if(rc != 1) {
log_error("Could not extract IPv4 address from '%s'", addr);
continue;
}
}
rc = inet_pton(addr_family, ipaddr, &totem_addr);
if(rc != 1) {
log_error("Could not parse '%s' as in IPv%c address", ipaddr, (addr_family==AF_INET)?'4':'6');
continue;
}
rc = totemip_parse(&totem_addr, ipaddr, addr_family);
if(rc != 0) {
log_error("Could not convert '%s' into a totem address", ipaddr);
continue;
}
rc = totemip_totemip_to_sockaddr_convert(&totem_addr, 0, &cna_addr, &cna_len);
if(rc != 0) {
log_error("Could not convert totem address for '%s' into sockaddr", ipaddr);
continue;
}
log_debug("Adding address %s to configfs for node %u/%s ", addr, node->id, node->uname);
add_configfs_node(node->id, ((char*)&cna_addr), cna_len, (node->id == local_node_id));
} while(addr != NULL);
free(addr_top);
}
log_debug("%s %sctive node %u '%s': born-on=%llu, last-seen=%llu, this-event=%llu, last-event=%llu",
action, crm_is_member_active(value)?"a":"ina",
node->id, node->uname, node->born, node->last_seen,
crm_peer_seq, (unsigned long long)*last);
}
int is_cluster_member(int nodeid)
{
crm_node_t *node = crm_get_peer(nodeid, NULL);
return crm_is_member_active(node);
}
char *nodeid2name(int nodeid) {
crm_node_t *node = crm_get_peer(nodeid, NULL);
if(node->uname == NULL) {
return NULL;
}
return strdup(node->uname);
}
void kick_node_from_cluster(int nodeid)
{
log_error("%s not yet implemented", __FUNCTION__);
return;
}