File 0005-WIP-Add-the-mmkubernetes-plugin.patch of Package rsyslog.24024

From 77886e21292d8220f93b3404236da0e8f7159255 Mon Sep 17 00:00:00 2001
From: Tomas Heinrich <theinric@redhat.com>
Date: Tue, 8 Mar 2016 17:07:55 +0100
Subject: [PATCH] WIP - Add the mmkubernetes plugin

DO NOT MERGE - This is a work-in-progress.
---
 Makefile.am                         |   5 +
 configure.ac                        |  21 ++
 contrib/mmkubernetes/Makefile.am    |   6 +
 contrib/mmkubernetes/mmkubernetes.c | 673 ++++++++++++++++++++++++++++++++++++
 contrib/mmkubernetes/sample.conf    |  12 +
 5 files changed, 717 insertions(+)
 create mode 100644 contrib/mmkubernetes/Makefile.am
 create mode 100644 contrib/mmkubernetes/mmkubernetes.c
 create mode 100644 contrib/mmkubernetes/sample.conf

diff --git a/Makefile.am b/Makefile.am
index 1f73ae5a7..c874c1cdc 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -283,6 +283,11 @@ if ENABLE_OMTCL
 SUBDIRS += contrib/omtcl
 endif
 
+# mmkubernetes
+if ENABLE_MMKUBERNETES
+SUBDIRS += contrib/mmkubernetes
+endif
+
 # tests are added as last element, because tests may need different
 # modules that need to be generated first
 SUBDIRS += tests
diff --git a/configure.ac b/configure.ac
index d36809c74..bb737bbc6 100644
--- a/configure.ac
+++ b/configure.ac
@@ -2116,6 +2116,25 @@ AM_CONDITIONAL(ENABLE_OMTCL, test x$enable_omtcl = xyes)
 
 # END TCL SUPPORT
 
+# mmkubernetes - Kubernetes metadata support
+
+AC_ARG_ENABLE(mmkubernetes,
+        [AS_HELP_STRING([--enable-mmkubernetes],
+            [Enable compilation of the mmkubernetes module @<:@default=no@:>@])],
+        [case "${enableval}" in
+         yes) enable_mmkubernetes="yes" ;;
+          no) enable_mmkubernetes="no" ;;
+           *) AC_MSG_ERROR(bad value ${enableval} for --enable-mmkubernetes) ;;
+         esac],
+        [enable_mmkubernetes=no]
+)
+if test "x$enable_mmkubernetes" = "xyes"; then
+        PKG_CHECK_MODULES([CURL], [libcurl])
+fi
+AM_CONDITIONAL(ENABLE_MMKUBERNETES, test x$enable_mmkubernetes = xyes)
+
+# END Kubernetes metadata support
+
 # man pages
 have_to_generate_man_pages="no"
 git_src_have_to_generate_man_pages="yes" # default to use when building from git source
@@ -2247,6 +2266,7 @@ AC_CONFIG_FILES([Makefile \
 		contrib/omhttpfs/Makefile \
 		contrib/omamqp1/Makefile \
 		contrib/omtcl/Makefile \
+		contrib/mmkubernetes/Makefile \
 		tests/Makefile])
 AC_OUTPUT
 
@@ -2324,6 +2344,7 @@ echo "    mmsequence enabled:                       $enable_mmsequence"
 echo "    mmdblookup enabled:                       $enable_mmdblookup"
 echo "    mmfields enabled:                         $enable_mmfields"
 echo "    mmrm1stspace module enabled:              $enable_mmrm1stspace"
+echo "    mmkubernetes enabled:                     $enable_mmkubernetes"
 echo
 echo "---{ database support }---"
 echo "    MySql support enabled:                    $enable_mysql"
diff --git a/contrib/mmkubernetes/Makefile.am b/contrib/mmkubernetes/Makefile.am
new file mode 100644
index 000000000..02af334d4
--- /dev/null
+++ b/contrib/mmkubernetes/Makefile.am
@@ -0,0 +1,6 @@
+pkglib_LTLIBRARIES = mmkubernetes.la
+
+mmkubernetes_la_SOURCES = mmkubernetes.c
+mmkubernetes_la_CPPFLAGS = $(RSRT_CFLAGS) $(PTHREADS_CFLAGS) $(CURL_CFLAGS)
+mmkubernetes_la_LDFLAGS = -module -avoid-version
+mmkubernetes_la_LIBADD = $(CURL_LIBS)
diff --git a/contrib/mmkubernetes/mmkubernetes.c b/contrib/mmkubernetes/mmkubernetes.c
new file mode 100644
index 000000000..0d5502db6
--- /dev/null
+++ b/contrib/mmkubernetes/mmkubernetes.c
@@ -0,0 +1,673 @@
+/* mmkubernetes.c
+ * This is a message modification module. It uses metadata obtained
+ * from the message to query Kubernetes and obtain additional metadata
+ * relating to the container instance.
+ *
+ * Inspired by:
+ * https://github.com/fabric8io/fluent-plugin-kubernetes_metadata_filter
+ *
+ * NOTE: read comments in module-template.h for details on the calling interface!
+ *
+ * Copyright 2016 Red Hat Inc.
+ *
+ * This file is part of rsyslog.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *       -or-
+ *       see COPYING.ASL20 in the source distribution
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* todo:
+ * - cache cleanup
+ * - SIGHUP handling
+ * - authentication
+ * - statistics generation
+ * - other missing configuration options
+ * - documentation
+ * - more robust error-handling + debugging information
+ * - batching, parallel queries, failover, ...
+ */
+
+/* needed for asprintf */
+#ifndef _GNU_SOURCE
+#  define _GNU_SOURCE
+#endif
+
+#include "config.h"
+#include "rsyslog.h"
+#include <stdio.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <errno.h>
+#include <unistd.h>
+#include <libestr.h>
+#include <json.h>
+#include <curl/curl.h>
+#include <curl/easy.h>
+#include <pthread.h>
+#include "conf.h"
+#include "syslogd-types.h"
+#include "module-template.h"
+#include "errmsg.h"
+#include "regexp.h"
+#include "hashtable.h"
+
+/* static data */
+MODULE_TYPE_OUTPUT /* this is technically an output plugin */
+MODULE_TYPE_KEEP /* releasing the module would cause a leak through libcurl */
+MODULE_CNFNAME("mmkubernetes")
+DEF_OMOD_STATIC_DATA
+DEFobjCurrIf(errmsg)
+DEFobjCurrIf(regexp)
+
+#define DFLT_FILENAME_REGEX "var\\.log\\.containers\\.([a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)_([^_]+)_(.+)-([a-z0-9]{64})\\.log$"
+#define DFLT_SRCMD_PATH "$!metadata!filename"
+#define DFLT_DSTMD_PATH "$!metadata"
+
+static struct cache_s {
+	const uchar *kbUrl;
+	struct hashtable *mdHt;
+	struct hashtable *nsHt;
+	pthread_mutex_t *cacheMtx;
+} **caches;
+
+/* module configuration data */
+struct modConfData_s {
+	rsconf_t *pConf;	/* our overall config object */
+	uchar *kubernetesUrl;	/**< where to place queries */
+	uchar *srcMetadataPath;	/**< where to get data for kubernetes queries */
+	uchar *dstMetadataPath;	/**< where to put metadata obtained from kubernetes */
+};
+
+/* action (instance) configuration data */
+typedef struct _instanceData {
+	uchar *kubernetesUrl;	/**< where to place queries */
+	uchar *srcMetadataPath;	/**< where to get data for kubernetes queries */
+	uchar *dstMetadataPath;	/**< where to put metadata obtained from kubernetes */
+	regex_t fnRegex;
+	struct cache_s *cache;
+} instanceData;
+
+typedef struct wrkrInstanceData {
+	instanceData *pData;
+	CURL *curlCtx;
+	struct curl_slist *curlHdr;
+	char *curlRply;
+	size_t curlRplyLen;
+} wrkrInstanceData_t;
+
+/* module parameters (v6 config format) */
+static struct cnfparamdescr modpdescr[] = {
+	{ "kubernetesurl", eCmdHdlrString, 0 },
+	{ "srcmetadatapath", eCmdHdlrString, 0 },
+	{ "dstmetadatapath", eCmdHdlrString, 0 }
+};
+static struct cnfparamblk modpblk = {
+	CNFPARAMBLK_VERSION,
+	sizeof(modpdescr)/sizeof(struct cnfparamdescr),
+	modpdescr
+};
+
+/* action (instance) parameters (v6 config format) */
+static struct cnfparamdescr actpdescr[] = {
+	{ "kubernetesurl", eCmdHdlrString, 0 },
+	{ "srcmetadatapath", eCmdHdlrString, 0 },
+	{ "dstmetadatapath", eCmdHdlrString, 0 }
+};
+static struct cnfparamblk actpblk =
+	{ CNFPARAMBLK_VERSION,
+	  sizeof(actpdescr)/sizeof(struct cnfparamdescr),
+	  actpdescr
+	};
+
+static modConfData_t *loadModConf = NULL;	/* modConf ptr to use for the current load process */
+static modConfData_t *runModConf = NULL;	/* modConf ptr to use for the current exec process */
+
+
+BEGINbeginCnfLoad
+CODESTARTbeginCnfLoad
+	loadModConf = pModConf;
+	pModConf->pConf = pConf;
+ENDbeginCnfLoad
+
+
+BEGINsetModCnf
+	struct cnfparamvals *pvals = NULL;
+	int i;
+CODESTARTsetModCnf
+	pvals = nvlstGetParams(lst, &modpblk, NULL);
+	if(pvals == NULL) {
+		errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "mmkubernetes: "
+			"error processing module config parameters [module(...)]");
+		ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+	}
+
+	if(Debug) {
+		dbgprintf("module (global) param blk for mmkubernetes:\n");
+		cnfparamsPrint(&modpblk, pvals);
+	}
+
+	for(i = 0 ; i < modpblk.nParams ; ++i) {
+		if(!pvals[i].bUsed) {
+			continue;
+		} else if(!strcmp(modpblk.descr[i].name, "kubernetesurl")) {
+			free(loadModConf->kubernetesUrl);
+			loadModConf->kubernetesUrl = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+		} else if(!strcmp(modpblk.descr[i].name, "srcmetadatapath")) {
+			free(loadModConf->srcMetadataPath);
+			loadModConf->srcMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+			/* todo: sanitize the path */
+		} else if(!strcmp(modpblk.descr[i].name, "dstmetadatapath")) {
+			free(loadModConf->dstMetadataPath);
+			loadModConf->dstMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+			/* todo: sanitize the path */
+		} else {
+			dbgprintf("mmkubernetes: program error, non-handled "
+				"param '%s' in module() block\n", modpblk.descr[i].name);
+			/* todo: error message? */
+		}
+	}
+
+	/* set defaults */
+	if(loadModConf->srcMetadataPath == NULL)
+		loadModConf->srcMetadataPath = (uchar *) strdup(DFLT_SRCMD_PATH);
+	if(loadModConf->dstMetadataPath == NULL)
+		loadModConf->dstMetadataPath = (uchar *) strdup(DFLT_DSTMD_PATH);
+
+	caches = calloc(1, sizeof(struct cache_s *));
+
+finalize_it:
+	if(pvals != NULL)
+		cnfparamvalsDestruct(pvals, &modpblk);
+ENDsetModCnf
+
+
+BEGINcreateInstance
+CODESTARTcreateInstance
+ENDcreateInstance
+
+
+BEGINfreeInstance
+CODESTARTfreeInstance
+	free(pData->kubernetesUrl);
+	free(pData->srcMetadataPath);
+	free(pData->dstMetadataPath);
+	regexp.regfree(&pData->fnRegex);
+ENDfreeInstance
+
+size_t curlCB(char *data, size_t size, size_t nmemb, void *usrptr)
+{
+	wrkrInstanceData_t *pWrkrData = (wrkrInstanceData_t *) usrptr;
+	char * buf;
+	size_t newlen;
+
+	newlen = pWrkrData->curlRplyLen + size * nmemb;
+	buf = realloc(pWrkrData->curlRply, newlen);
+	memcpy(buf + pWrkrData->curlRplyLen, data, size * nmemb);
+	pWrkrData->curlRply = buf;
+	pWrkrData->curlRplyLen = newlen;
+
+	return size * nmemb;
+}
+
+BEGINcreateWrkrInstance
+CODESTARTcreateWrkrInstance
+	CURL *ctx;
+	struct curl_slist *hdr;
+
+	hdr = curl_slist_append(NULL, "Content-Type: text/json; charset=utf-8");
+	pWrkrData->curlHdr = hdr;
+	ctx = curl_easy_init();
+	curl_easy_setopt(ctx, CURLOPT_HTTPHEADER, hdr);
+	curl_easy_setopt(ctx, CURLOPT_WRITEFUNCTION, curlCB);
+	curl_easy_setopt(ctx, CURLOPT_WRITEDATA, pWrkrData);
+	pWrkrData->curlCtx = ctx;
+ENDcreateWrkrInstance
+
+
+BEGINfreeWrkrInstance
+CODESTARTfreeWrkrInstance
+	curl_easy_cleanup(pWrkrData->curlCtx);
+	curl_slist_free_all(pWrkrData->curlHdr);
+ENDfreeWrkrInstance
+
+
+static struct cache_s *cacheNew(const uchar *url)
+{
+	struct cache_s *cache;
+
+	cache = calloc(1, sizeof(struct cache_s));
+	cache->kbUrl = url;
+	cache->mdHt = create_hashtable(100, hash_from_string,
+		key_equals_string, (void (*)(void *)) json_object_put);
+	cache->nsHt = create_hashtable(100, hash_from_string,
+		key_equals_string, (void (*)(void *)) json_object_put);
+	cache->cacheMtx = malloc(sizeof(pthread_mutex_t));
+	pthread_mutex_init(cache->cacheMtx, NULL);
+
+	return cache;
+}
+
+
+static void cacheFree(struct cache_s *cache)
+{
+	hashtable_destroy(cache->mdHt, 1);
+	hashtable_destroy(cache->nsHt, 1);
+	pthread_mutex_destroy(cache->cacheMtx);
+	free(cache->cacheMtx);
+	free(cache);
+}
+
+
+BEGINnewActInst
+	struct cnfparamvals *pvals = NULL;
+	int i, ret;
+CODESTARTnewActInst
+	DBGPRINTF("newActInst (mmkubernetes)\n");
+
+	pvals = nvlstGetParams(lst, &actpblk, NULL);
+	if(pvals == NULL) {
+		errmsg.LogError(0, RS_RET_MISSING_CNFPARAMS, "mmkubernetes: "
+			"error processing config parameters [action(...)]");
+		ABORT_FINALIZE(RS_RET_MISSING_CNFPARAMS);
+	}
+
+	if(Debug) {
+		dbgprintf("action param blk in mmkubernetes:\n");
+		cnfparamsPrint(&actpblk, pvals);
+	}
+
+	CODE_STD_STRING_REQUESTnewActInst(1)
+	CHKiRet(OMSRsetEntry(*ppOMSR, 0, NULL, OMSR_TPL_AS_MSG));
+	CHKiRet(createInstance(&pData));
+
+	for(i = 0 ; i < actpblk.nParams ; ++i) {
+		if(!pvals[i].bUsed) {
+			continue;
+		} else if(!strcmp(actpblk.descr[i].name, "kubernetesurl")) {
+			free(pData->kubernetesUrl);
+			pData->kubernetesUrl = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+		} else if(!strcmp(actpblk.descr[i].name, "srcmetadatapath")) {
+			free(pData->srcMetadataPath);
+			pData->srcMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+			/* todo: sanitize the path */
+		} else if(!strcmp(actpblk.descr[i].name, "dstmetadatapath")) {
+			free(pData->dstMetadataPath);
+			pData->dstMetadataPath = (uchar *) es_str2cstr(pvals[i].val.d.estr, NULL);
+			/* todo: sanitize the path */
+		} else {
+			dbgprintf("mmkubernetes: program error, non-handled "
+				"param '%s' in action() block\n", actpblk.descr[i].name);
+			/* todo: error message? */
+		}
+	}
+
+	if(pData->kubernetesUrl == NULL) {
+		if(loadModConf->kubernetesUrl == NULL)
+			ABORT_FINALIZE(RS_RET_CONFIG_ERROR);
+		pData->kubernetesUrl = (uchar *) strdup((char *) loadModConf->kubernetesUrl);
+	}
+	if(pData->srcMetadataPath == NULL)
+		pData->srcMetadataPath = (uchar *) strdup((char *) loadModConf->srcMetadataPath);
+	if(pData->dstMetadataPath == NULL)
+		pData->dstMetadataPath = (uchar *) strdup((char *) loadModConf->dstMetadataPath);
+
+	/* todo: make file regexp configurable */
+	ret = regexp.regcomp(&pData->fnRegex, DFLT_FILENAME_REGEX, REG_EXTENDED);
+	if(ret) {
+		dbgprintf("mmkubernetes: regexp compilation failed with code %d.\n", ret);
+		ABORT_FINALIZE(RS_RET_ERR);
+	}
+
+	/* get the cache for this url */
+	for(i = 0; caches[i] != NULL; i++) {
+		if(!strcmp((char *) pData->kubernetesUrl, (char *) caches[i]->kbUrl))
+			break;
+	}
+	if(caches[i] != NULL) {
+		pData->cache = caches[i];
+	} else {
+		pData->cache = cacheNew(pData->kubernetesUrl);
+
+		caches = realloc(caches, (i + 2) * sizeof(struct cache_s *));
+		caches[i] = pData->cache;
+		caches[i + 1] = NULL;
+	}
+CODE_STD_FINALIZERnewActInst
+	if(pvals != NULL)
+		cnfparamvalsDestruct(pvals, &actpblk);
+ENDnewActInst
+
+
+/* legacy config format is not supported */
+BEGINparseSelectorAct
+CODESTARTparseSelectorAct
+CODE_STD_STRING_REQUESTparseSelectorAct(1)
+	if(strncmp((char *) p, ":mmkubernetes:", sizeof(":mmkubernetes:") - 1)) {
+		errmsg.LogError(0, RS_RET_LEGA_ACT_NOT_SUPPORTED,
+			"mmkubernetes supports only v6+ config format, use: "
+			"action(type=\"mmkubernetes\" ...)");
+	}
+	ABORT_FINALIZE(RS_RET_CONFLINE_UNPROCESSED);
+CODE_STD_FINALIZERparseSelectorAct
+ENDparseSelectorAct
+
+
+BEGINendCnfLoad
+CODESTARTendCnfLoad
+ENDendCnfLoad
+
+
+BEGINcheckCnf
+CODESTARTcheckCnf
+ENDcheckCnf
+
+
+BEGINactivateCnf
+CODESTARTactivateCnf
+	runModConf = pModConf;
+ENDactivateCnf
+
+
+BEGINfreeCnf
+CODESTARTfreeCnf
+	int i;
+
+	free(pModConf->kubernetesUrl);
+	free(pModConf->srcMetadataPath);
+	free(pModConf->dstMetadataPath);
+	for(i = 0; caches[i] != NULL; i++)
+		cacheFree(caches[i]);
+	free(caches);
+ENDfreeCnf
+
+
+BEGINdbgPrintInstInfo
+CODESTARTdbgPrintInstInfo
+	dbgprintf("mmkubernetes\n");
+	dbgprintf("\tkubernetesUrl='%s'\n", pData->kubernetesUrl);
+	dbgprintf("\tsrcMetadataPath='%s'\n", pData->srcMetadataPath);
+	dbgprintf("\tdstMetadataPath='%s'\n", pData->dstMetadataPath);
+ENDdbgPrintInstInfo
+
+
+BEGINtryResume
+CODESTARTtryResume
+ENDtryResume
+
+
+static rsRetVal
+extractMsgMetadata(smsg_t *pMsg, uchar *propName, regex_t *fnRegex,
+	char **podName, char **ns, char **contName, char **dockerID)
+{
+	DEFiRet;
+	msgPropDescr_t prop;
+	char *filename, *p;
+	rs_size_t fnLen;
+	unsigned short freeFn = 0;
+	size_t nmatch = 8, len;
+	regmatch_t pmatch[nmatch];
+
+	/* extract metadata from the file name */
+	msgPropDescrFill(&prop, propName, strlen((char *) propName));
+	filename = (char *) MsgGetProp(pMsg, NULL, &prop, &fnLen, &freeFn, NULL);
+	dbgprintf("mmkubernetes: filename: '%s'.\n", filename);
+	msgPropDescrDestruct(&prop);
+	if(filename == NULL)
+		ABORT_FINALIZE(RS_RET_NOT_FOUND);
+
+	if(REG_NOMATCH == regexp.regexec(fnRegex, filename, nmatch, pmatch, 0))
+		ABORT_FINALIZE(RS_RET_NOT_FOUND);
+
+	if(pmatch[1].rm_so != -1) {
+		len = pmatch[1].rm_eo - pmatch[1].rm_so;
+		p = malloc(len + 1);
+		memcpy(p, filename + pmatch[1].rm_so, len);
+		p[len] = '\0';
+		*podName = p;
+	}
+	if(pmatch[5].rm_so != -1) {
+		len = pmatch[5].rm_eo - pmatch[5].rm_so;
+		p = malloc(len + 1);
+		memcpy(p, filename + pmatch[5].rm_so, len);
+		p[len] = '\0';
+		*ns = p;
+	}
+	if(pmatch[6].rm_so != -1) {
+		len = pmatch[6].rm_eo - pmatch[6].rm_so;
+		p = malloc(len + 1);
+		memcpy(p, filename + pmatch[6].rm_so, len);
+		p[len] = '\0';
+		*contName = p;
+	}
+	if(pmatch[7].rm_so != -1) {
+		len = pmatch[7].rm_eo - pmatch[7].rm_so;
+		p = malloc(len + 1);
+		memcpy(p, filename + pmatch[7].rm_so, len);
+		p[len] = '\0';
+		*dockerID = p;
+	}
+
+finalize_it:
+	if(freeFn)
+		free(filename);
+	RETiRet;
+}
+
+
+static rsRetVal
+queryKB(wrkrInstanceData_t *pWrkrData, char *url, struct json_object **rply)
+{
+	DEFiRet;
+	CURLcode ccode;
+	struct json_tokener *jt = NULL;
+	struct json_object *jo;
+
+	/* query kubernetes for pod info */
+	ccode = curl_easy_setopt(pWrkrData->curlCtx, CURLOPT_URL, url);
+	if(ccode != CURLE_OK)
+		ABORT_FINALIZE(RS_RET_ERR);
+	ccode = curl_easy_perform(pWrkrData->curlCtx);
+	switch(ccode) {
+		case CURLE_COULDNT_CONNECT:
+		case CURLE_COULDNT_RESOLVE_HOST:
+		case CURLE_COULDNT_RESOLVE_PROXY:
+		case CURLE_HTTP_RETURNED_ERROR:
+		case CURLE_WRITE_ERROR:
+			dbgprintf("mmkubernetes: curl connection failed "
+				"with code %d.\n", ccode);
+			ABORT_FINALIZE(RS_RET_ERR);
+		default:
+			break;
+	}
+
+	/* parse retrieved data */
+	jt = json_tokener_new();
+	json_tokener_reset(jt);
+	jo = json_tokener_parse_ex(jt, pWrkrData->curlRply, pWrkrData->curlRplyLen);
+	json_tokener_free(jt);
+	if(!json_object_is_type(jo, json_type_object)) {
+		json_object_put(jo);
+		ABORT_FINALIZE(RS_RET_JSON_PARSE_ERR);
+	}
+
+	dbgprintf("mmkubernetes: queryKB reply:\n%s\n",
+		json_object_to_json_string_ext(jo, JSON_C_TO_STRING_PRETTY));
+
+	*rply = jo;
+
+finalize_it:
+	if(pWrkrData->curlRply != NULL) {
+		free(pWrkrData->curlRply);
+		pWrkrData->curlRply = NULL;
+		pWrkrData->curlRplyLen = 0;
+	}
+	RETiRet;
+}
+
+
+/* versions < 8.16.0 don't support BEGINdoAction_NoStrings */
+#if defined(BEGINdoAction_NoStrings)
+BEGINdoAction_NoStrings
+	smsg_t **ppMsg = (smsg_t **) pMsgData;
+	smsg_t *pMsg = ppMsg[0];
+#else
+BEGINdoAction
+	smsg_t *pMsg = (smsg_t*) ppString[0];
+#endif
+	char *podName = NULL, *ns = NULL, *containerName = NULL,
+		*dockerID = NULL, *mdKey = NULL;
+	struct json_object *jMetadata = NULL;
+CODESTARTdoAction
+	CHKiRet_Hdlr(extractMsgMetadata(pMsg, pWrkrData->pData->srcMetadataPath,
+		&pWrkrData->pData->fnRegex, &podName, &ns, &containerName, &dockerID)) {
+		ABORT_FINALIZE((iRet == RS_RET_NOT_FOUND) ? RS_RET_OK : iRet);
+	}
+
+	assert(podName != NULL);
+	assert(ns != NULL);
+	assert(containerName != NULL);
+	assert(dockerID != NULL);
+
+	dbgprintf("mmkubernetes:\n  podName: '%s'\n  namespace: '%s'\n  containerName: '%s'\n"
+		"  dockerID: '%s'\n", podName, ns, containerName, dockerID);
+
+	/* check cache for metadata */
+	asprintf(&mdKey, "%s_%s_%s", ns, podName, containerName);
+	pthread_mutex_lock(pWrkrData->pData->cache->cacheMtx);
+	jMetadata = hashtable_search(pWrkrData->pData->cache->mdHt, mdKey);
+
+	if(jMetadata == NULL) {
+		char *url = NULL;
+		struct json_object *jReply = NULL, *jo = NULL, *jo2 = NULL, *jNewNS = NULL;
+
+		/* check cache for namespace id */
+		jo2 = hashtable_search(pWrkrData->pData->cache->nsHt, ns);
+		pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
+
+		if(jo2 == NULL) {
+			/* query kubernetes for namespace info */
+			/* todo: move url definitions elsewhere */
+			asprintf(&url, "%s/api/v1/namespaces/%s",
+				 (char *) pWrkrData->pData->kubernetesUrl, ns);
+			iRet = queryKB(pWrkrData, url, &jReply);
+			free(url);
+			/* todo: opt to ignore the missing uid? */
+			CHKiRet(iRet);
+
+			if(fjson_object_object_get_ex(jReply, "metadata", &jo2)
+				&& fjson_object_object_get_ex(jo2, "uid", &jo2))
+				jo2 = jNewNS = json_object_get(jo2);
+			else
+				jo2 = NULL;
+
+			json_object_put(jReply);
+		}
+
+		asprintf(&url, "%s/api/v1/namespaces/%s/pods/%s",
+			 (char *) pWrkrData->pData->kubernetesUrl, ns, podName);
+		iRet = queryKB(pWrkrData, url, &jReply);
+		free(url);
+		if(iRet != RS_RET_OK) {
+			if(jNewNS) {
+				pthread_mutex_lock(pWrkrData->pData->cache->cacheMtx);
+				hashtable_insert(pWrkrData->pData->cache->nsHt, ns, jNewNS);
+				ns = NULL;
+				pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
+			}
+			FINALIZE;
+		}
+
+		jo = json_object_new_object();
+		if(jo2)
+			json_object_object_add(jo, "namespace_id", json_object_get(jo2));
+		if(fjson_object_object_get_ex(jReply, "nodeName", &jo2))
+			json_object_object_add(jo, "host", json_object_get(jo2));
+		if(fjson_object_object_get_ex(jReply, "uid", &jo2))
+			json_object_object_add(jo, "pod_id", json_object_get(jo2));
+		/* todo: labels */
+		json_object_put(jReply);
+
+		json_object_object_add(jo, "namespace", json_object_new_string(ns));
+		json_object_object_add(jo, "pod_name", json_object_new_string(podName));
+		json_object_object_add(jo, "container_name", json_object_new_string(containerName));
+		jMetadata = json_object_new_object();
+		json_object_object_add(jMetadata, "kubernetes", jo);
+		jo = json_object_new_object();
+		json_object_object_add(jo, "container_id", json_object_new_string(dockerID));
+		json_object_object_add(jMetadata, "docker", jo);
+
+		pthread_mutex_lock(pWrkrData->pData->cache->cacheMtx);
+		hashtable_insert(pWrkrData->pData->cache->mdHt, mdKey, jMetadata);
+		mdKey = NULL;
+		if(jNewNS) {
+			hashtable_insert(pWrkrData->pData->cache->nsHt, ns, jNewNS);
+			ns = NULL;
+		}
+		pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
+	} else {
+		pthread_mutex_unlock(pWrkrData->pData->cache->cacheMtx);
+	}
+
+	/* the +1 is there to skip the leading '$' */
+	msgAddJSON(pMsg, (uchar *) pWrkrData->pData->dstMetadataPath + 1, json_object_get(jMetadata), 0, 0);
+
+finalize_it:
+	free(podName);
+	free(ns);
+	free(containerName);
+	free(dockerID);
+	free(mdKey);
+ENDdoAction
+
+
+BEGINisCompatibleWithFeature
+CODESTARTisCompatibleWithFeature
+ENDisCompatibleWithFeature
+
+
+/* all the macros bellow have to be in a specific order */
+BEGINmodExit
+CODESTARTmodExit
+	curl_global_cleanup();
+
+	objRelease(regexp, LM_REGEXP_FILENAME);
+	objRelease(errmsg, CORE_COMPONENT);
+ENDmodExit
+
+
+BEGINqueryEtryPt
+CODESTARTqueryEtryPt
+CODEqueryEtryPt_STD_OMOD_QUERIES
+CODEqueryEtryPt_STD_OMOD8_QUERIES
+CODEqueryEtryPt_STD_CONF2_QUERIES
+CODEqueryEtryPt_STD_CONF2_setModCnf_QUERIES
+CODEqueryEtryPt_STD_CONF2_OMOD_QUERIES
+ENDqueryEtryPt
+
+
+BEGINmodInit()
+CODESTARTmodInit
+	*ipIFVersProvided = CURR_MOD_IF_VERSION; /* we only support the current interface specification */
+CODEmodInit_QueryRegCFSLineHdlr
+	DBGPRINTF("mmkubernetes: module compiled with rsyslog version %s.\n", VERSION);
+	CHKiRet(objUse(errmsg, CORE_COMPONENT));
+	CHKiRet(objUse(regexp, LM_REGEXP_FILENAME));
+
+	/* CURL_GLOBAL_ALL initializes more than is needed but the
+	 * libcurl documentation discourages use of other values
+	 */
+	curl_global_init(CURL_GLOBAL_ALL);
+ENDmodInit
diff --git a/contrib/mmkubernetes/sample.conf b/contrib/mmkubernetes/sample.conf
new file mode 100644
index 000000000..face6066f
--- /dev/null
+++ b/contrib/mmkubernetes/sample.conf
@@ -0,0 +1,12 @@
+module(load="mmkubernetes" kubernetesurl="http://localhost:5000")
+module(load="imfile")
+
+input(type="imfile" file="/var/log/ctrs/*" tag="ctr" addmetadata="on")
+
+action(type="mmkubernetes")
+
+template(name="tpl" type="list") {
+    property(name="jsonmesg")
+    constant(value="\n")
+}
+action(type="omfile" file="/var/log/ctrs.log" template="tpl")
-- 
2.16.4

openSUSE Build Service is sponsored by