File upgrade-to-influxdb-1.1.patch of Package openstack-monasca-persister-java
commit 9b36fbdd5dbc0dec6ede8e2ce0a1778555baf68f
Author: Kamil Choroba <kamil.choroba@est.fujitsu.com>
Date: Thu Feb 23 13:51:43 2017 +0100
Upgrade to influxdb 1.1
Added functionality to support influxdb 1.1.
Change-Id: I993a8b905451c9fef61c011d65e0512014b0dbba
diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java
index 2d25152b4458..cbf0bc636f54 100644
--- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java
+++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java
@@ -1,5 +1,6 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
+ * Copyright (c) 2017 FUJITSU LIMITED
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,18 +20,20 @@ package monasca.persister.repository.influxdb;
import java.util.Map;
+import com.google.common.base.Joiner;
+
public class InfluxPoint {
private final String measurement;
private final Map<String, String> tags;
- private final String time;
+ private final Long time;
private final Map<String, Object> fields;
private final String Precision = "ms";
public InfluxPoint(
final String measurement,
final Map<String, String> tags,
- final String time,
+ final Long time,
final Map<String, Object> fields) {
this.measurement = measurement;
@@ -47,7 +50,7 @@ public class InfluxPoint {
return this.tags;
}
- public String getTime() {
+ public Long getTime() {
return this.time;
}
@@ -59,4 +62,14 @@ public class InfluxPoint {
return Precision;
}
+ // Create influxdb line protocol string
+ public String toInflux() {
+ return new StringBuilder(this.measurement).append(",")
+ .append(Joiner.on(",").join(this.tags.entrySet().iterator()))
+ .append(" ")
+ .append(Joiner.on(",").join(this.fields.entrySet().iterator()))
+ .append(" ").append(this.time)
+ .toString();
+ }
+
}
diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java
index 86f267e34ea9..bbd6de62b5dd 100644
--- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java
+++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java
@@ -1,5 +1,6 @@
/*
* (C) Copyright 2014-2016 Hewlett Packard Enterprise Development Company LP
+ * Copyright (c) 2017 FUJITSU LIMITED
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,8 +28,6 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,8 +47,6 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
private final ObjectMapper objectMapper = new ObjectMapper();
- private final DateTimeFormatter dateFormatter = ISODateTimeFormat.dateTime();
-
@Inject
public InfluxV9AlarmRepo(
final Environment env,
@@ -200,9 +197,7 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
valueMap.put("reason_data", "{}");
- DateTime dateTime = new DateTime(event.timestamp, DateTimeZone.UTC);
-
- String dateString = this.dateFormatter.print(dateTime);
+ Long dateTime = new DateTime(event.timestamp, DateTimeZone.UTC).getMillis();
Map<String, String> tags = new HashMap<>();
@@ -212,7 +207,7 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
InfluxPoint
influxPoint =
- new InfluxPoint(ALARM_STATE_HISTORY_NAME, tags, dateString, valueMap);
+ new InfluxPoint(ALARM_STATE_HISTORY_NAME, tags, dateTime, valueMap);
influxPointList.add(influxPoint);
diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java
index 9aeb16b45524..3410e262fb7d 100644
--- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java
+++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java
@@ -1,6 +1,7 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
+ * Copyright (c) 2017 FUJITSU LIMITED
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import scala.collection.mutable.StringBuilder;
import io.dropwizard.setup.Environment;
import monasca.persister.repository.RepoException;
@@ -74,7 +76,7 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo {
influxPoint =
new InfluxPoint(definition.getName(),
tagMap,
- measurement.getISOFormattedTimeString(),
+ measurement.getTime(),
buildValueMap(measurement));
influxPointList.add(influxPoint);
@@ -89,24 +91,24 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo {
private Map<String, Object> buildValueMap(Measurement measurement) {
- Map<String, Object> valueMap = new HashMap<>();
-
- valueMap.put("value", measurement.getValue());
-
- String valueMetaJSONString = measurement.getValueMetaJSONString();
-
- if (valueMetaJSONString == null || valueMetaJSONString.isEmpty()) {
-
- valueMap.put("value_meta", "{}");
-
- } else {
-
- valueMap.put("value_meta", valueMetaJSONString);
-
+ Map<String, Object> valueMap = new HashMap<String, Object>();
+ Map<String, String> valueMeta = measurement.getValueMeta();
+
+ if (valueMeta != null && valueMeta.size() != 0) {
+ /*
+ * Refactor value meta strings like:
+ * "error: error(111, 'Connection refused'). * Connection failed after 3 ms"
+ * to "error: error(111, \"Connection refused\"). Connection failed after 3 ms"
+ * Otherwise persisting to influx will fail.
+ */
+ for (Map.Entry<String, String> entry : valueMeta.entrySet()) {
+ valueMap.put(entry.getKey(),
+ new StringBuilder("\"").append(entry.getValue().replaceAll("'", "\\\\\"")).append("\""));
+ }
}
+ valueMap.put("value", measurement.getValue());
return valueMap;
-
}
private Map<String, String> buildTagMap(Definition definition, Dimensions dimensions) {
diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java
index 273176e4ea7d..6ff7e15bb537 100644
--- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java
+++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java
@@ -1,5 +1,6 @@
/*
* Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
+ * Copyright (c) 2017 FUJITSU LIMITED
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,10 +22,6 @@ import monasca.persister.configuration.PersisterConfig;
import monasca.persister.repository.RepoException;
import com.google.inject.Inject;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
import org.apache.commons.codec.binary.Base64;
import org.apache.http.Header;
import org.apache.http.HeaderElement;
@@ -38,8 +35,8 @@ import org.apache.http.HttpStatus;
import org.apache.http.client.entity.EntityBuilder;
import org.apache.http.client.entity.GzipDecompressingEntity;
import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
@@ -49,7 +46,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.HashMap;
public class InfluxV9RepoWriter {
@@ -67,17 +63,17 @@ public class InfluxV9RepoWriter {
private final String baseAuthHeader;
- private final ObjectMapper objectMapper = new ObjectMapper();
-
@Inject
public InfluxV9RepoWriter(final PersisterConfig config) {
this.influxName = config.getInfluxDBConfiguration().getName();
- this.influxUrl = config.getInfluxDBConfiguration().getUrl() + "/write";
this.influxUser = config.getInfluxDBConfiguration().getUser();
this.influxPass = config.getInfluxDBConfiguration().getPassword();
this.influxCreds = this.influxUser + ":" + this.influxPass;
this.influxRetentionPolicy = config.getInfluxDBConfiguration().getRetentionPolicy();
+ this.influxUrl = new StringBuilder(config.getInfluxDBConfiguration().getUrl()).
+ append("/write?db=").append(this.influxName).append("&precision=ms").
+ append("&rp=").append(this.influxRetentionPolicy).toString();
this.gzip = config.getInfluxDBConfiguration().getGzip();
this.baseAuthHeader = "Basic " + new String(Base64.encodeBase64(this.influxCreds.getBytes()));
@@ -132,12 +128,7 @@ public class InfluxV9RepoWriter {
request.addHeader("Content-Type", "application/json");
request.addHeader("Authorization", this.baseAuthHeader);
- InfluxWrite
- influxWrite =
- new InfluxWrite(this.influxName, this.influxRetentionPolicy, influxPointArry,
- new HashMap<String, String>());
-
- String jsonBody = getJsonBody(influxWrite);
+ byte[] byte_array = getBinary(influxPointArry);
if (this.gzip) {
@@ -147,8 +138,8 @@ public class InfluxV9RepoWriter {
requestEntity =
EntityBuilder
.create()
- .setText(jsonBody)
- .setContentType(ContentType.APPLICATION_JSON)
+ .setBinary(byte_array)
+ .setContentType(ContentType.DEFAULT_BINARY)
.setContentEncoding("UTF-8")
.gzipCompress()
.build();
@@ -161,9 +152,9 @@ public class InfluxV9RepoWriter {
logger.debug("[{}]: gzip set to false. sending non-gzip msg", id);
- StringEntity stringEntity = new StringEntity(jsonBody, "UTF-8");
+ ByteArrayEntity byteEntity = new ByteArrayEntity(byte_array);
- request.setEntity(stringEntity);
+ request.setEntity(byteEntity);
}
@@ -224,19 +215,11 @@ public class InfluxV9RepoWriter {
}
}
- private String getJsonBody(InfluxWrite influxWrite) throws RepoException {
-
- String json = null;
-
- try {
-
- json = this.objectMapper.writeValueAsString(influxWrite);
-
- } catch (JsonProcessingException e) {
-
- throw new RepoException("failed to serialize json", e);
+ private byte[] getBinary(InfluxPoint[] influxPointArry) {
+ StringBuilder binaryString = new StringBuilder();
+ for (InfluxPoint influxPoint : influxPointArry) {
+ binaryString.append(influxPoint.toInflux()).append(System.lineSeparator());
}
-
- return json;
+ return binaryString.toString().getBytes();
}
}