File upgrade-to-influxdb-1.1.patch of Package openstack-monasca-persister-java

commit 9b36fbdd5dbc0dec6ede8e2ce0a1778555baf68f
Author: Kamil Choroba <kamil.choroba@est.fujitsu.com>
Date:   Thu Feb 23 13:51:43 2017 +0100

    Upgrade to influxdb 1.1
    
    Added functionality to support influxdb 1.1.
    
    Change-Id: I993a8b905451c9fef61c011d65e0512014b0dbba

diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java
index 2d25152b4458..cbf0bc636f54 100644
--- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java
+++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxPoint.java
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
+ * Copyright (c) 2017 FUJITSU LIMITED
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -19,18 +20,20 @@ package monasca.persister.repository.influxdb;
 
 import java.util.Map;
 
+import com.google.common.base.Joiner;
+
 public class InfluxPoint {
 
   private final String measurement;
   private final Map<String, String> tags;
-  private final String time;
+  private final Long time;
   private final Map<String, Object> fields;
   private final String Precision = "ms";
 
   public InfluxPoint(
       final String measurement,
       final Map<String, String> tags,
-      final String time,
+      final Long time,
       final Map<String, Object> fields) {
 
     this.measurement = measurement;
@@ -47,7 +50,7 @@ public class InfluxPoint {
     return this.tags;
   }
 
-  public String getTime() {
+  public Long getTime() {
     return this.time;
   }
 
@@ -59,4 +62,14 @@ public class InfluxPoint {
     return Precision;
   }
 
+  // Create influxdb line protocol string
+  public String toInflux() {
+    return new StringBuilder(this.measurement).append(",")
+      .append(Joiner.on(",").join(this.tags.entrySet().iterator()))
+      .append(" ")
+      .append(Joiner.on(",").join(this.fields.entrySet().iterator()))
+      .append(" ").append(this.time)
+      .toString();
+  }
+
 }
diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java
index 86f267e34ea9..bbd6de62b5dd 100644
--- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java
+++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9AlarmRepo.java
@@ -1,5 +1,6 @@
 /*
  * (C) Copyright 2014-2016 Hewlett Packard Enterprise Development Company LP
+ * Copyright (c) 2017 FUJITSU LIMITED
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -27,8 +28,6 @@ import com.fasterxml.jackson.databind.PropertyNamingStrategy;
 
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
-import org.joda.time.format.DateTimeFormatter;
-import org.joda.time.format.ISODateTimeFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,8 +47,6 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
 
   private final ObjectMapper objectMapper = new ObjectMapper();
 
-  private final DateTimeFormatter dateFormatter = ISODateTimeFormat.dateTime();
-
   @Inject
   public InfluxV9AlarmRepo(
       final Environment env,
@@ -200,9 +197,7 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
 
       valueMap.put("reason_data", "{}");
 
-      DateTime dateTime = new DateTime(event.timestamp, DateTimeZone.UTC);
-
-      String dateString = this.dateFormatter.print(dateTime);
+      Long dateTime = new DateTime(event.timestamp, DateTimeZone.UTC).getMillis();
 
       Map<String, String> tags = new HashMap<>();
 
@@ -212,7 +207,7 @@ public class InfluxV9AlarmRepo extends InfluxAlarmRepo {
 
       InfluxPoint
           influxPoint =
-          new InfluxPoint(ALARM_STATE_HISTORY_NAME, tags, dateString, valueMap);
+          new InfluxPoint(ALARM_STATE_HISTORY_NAME, tags, dateTime, valueMap);
 
       influxPointList.add(influxPoint);
 
diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java
index 9aeb16b45524..3410e262fb7d 100644
--- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java
+++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9MetricRepo.java
@@ -1,6 +1,7 @@
 
 /*
  * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
+ * Copyright (c) 2017 FUJITSU LIMITED
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -25,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
+import scala.collection.mutable.StringBuilder;
 import io.dropwizard.setup.Environment;
 import monasca.persister.repository.RepoException;
 
@@ -74,7 +76,7 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo {
               influxPoint =
               new InfluxPoint(definition.getName(),
                               tagMap,
-                              measurement.getISOFormattedTimeString(),
+                              measurement.getTime(),
                               buildValueMap(measurement));
 
           influxPointList.add(influxPoint);
@@ -89,24 +91,24 @@ public class InfluxV9MetricRepo extends InfluxMetricRepo {
 
   private Map<String, Object> buildValueMap(Measurement measurement) {
 
-    Map<String, Object> valueMap = new HashMap<>();
-
-    valueMap.put("value", measurement.getValue());
-
-    String valueMetaJSONString = measurement.getValueMetaJSONString();
-
-    if (valueMetaJSONString == null || valueMetaJSONString.isEmpty()) {
-
-      valueMap.put("value_meta", "{}");
-
-    } else {
-
-      valueMap.put("value_meta", valueMetaJSONString);
-
+    Map<String, Object> valueMap = new HashMap<String, Object>();
+    Map<String, String> valueMeta = measurement.getValueMeta();
+
+    if (valueMeta != null && valueMeta.size() != 0) {
+      /*
+       * Refactor value meta strings like:
+       * "error: error(111, 'Connection refused'). * Connection failed after 3 ms"
+       * to "error: error(111, \"Connection refused\"). Connection failed after 3 ms"
+       * Otherwise persisting to influx will fail.
+       */
+      for (Map.Entry<String, String> entry : valueMeta.entrySet()) {
+        valueMap.put(entry.getKey(),
+          new StringBuilder("\"").append(entry.getValue().replaceAll("'", "\\\\\"")).append("\""));
+      }
     }
 
+    valueMap.put("value", measurement.getValue());
     return valueMap;
-
   }
 
   private Map<String, String> buildTagMap(Definition definition, Dimensions dimensions) {
diff --git a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java
index 273176e4ea7d..6ff7e15bb537 100644
--- a/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java
+++ b/java/src/main/java/monasca/persister/repository/influxdb/InfluxV9RepoWriter.java
@@ -1,5 +1,6 @@
 /*
  * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
+ * Copyright (c) 2017 FUJITSU LIMITED
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -21,10 +22,6 @@ import monasca.persister.configuration.PersisterConfig;
 import monasca.persister.repository.RepoException;
 
 import com.google.inject.Inject;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-
 import org.apache.commons.codec.binary.Base64;
 import org.apache.http.Header;
 import org.apache.http.HeaderElement;
@@ -38,8 +35,8 @@ import org.apache.http.HttpStatus;
 import org.apache.http.client.entity.EntityBuilder;
 import org.apache.http.client.entity.GzipDecompressingEntity;
 import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ByteArrayEntity;
 import org.apache.http.entity.ContentType;
-import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
@@ -49,7 +46,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashMap;
 
 public class InfluxV9RepoWriter {
 
@@ -67,17 +63,17 @@ public class InfluxV9RepoWriter {
 
   private final String baseAuthHeader;
 
-  private final ObjectMapper objectMapper = new ObjectMapper();
-
   @Inject
   public InfluxV9RepoWriter(final PersisterConfig config) {
 
     this.influxName = config.getInfluxDBConfiguration().getName();
-    this.influxUrl = config.getInfluxDBConfiguration().getUrl() + "/write";
     this.influxUser = config.getInfluxDBConfiguration().getUser();
     this.influxPass = config.getInfluxDBConfiguration().getPassword();
     this.influxCreds = this.influxUser + ":" + this.influxPass;
     this.influxRetentionPolicy = config.getInfluxDBConfiguration().getRetentionPolicy();
+    this.influxUrl = new StringBuilder(config.getInfluxDBConfiguration().getUrl()).
+      append("/write?db=").append(this.influxName).append("&precision=ms").
+      append("&rp=").append(this.influxRetentionPolicy).toString();
     this.gzip = config.getInfluxDBConfiguration().getGzip();
 
     this.baseAuthHeader = "Basic " + new String(Base64.encodeBase64(this.influxCreds.getBytes()));
@@ -132,12 +128,7 @@ public class InfluxV9RepoWriter {
     request.addHeader("Content-Type", "application/json");
     request.addHeader("Authorization", this.baseAuthHeader);
 
-    InfluxWrite
-        influxWrite =
-        new InfluxWrite(this.influxName, this.influxRetentionPolicy, influxPointArry,
-                        new HashMap<String, String>());
-
-    String jsonBody = getJsonBody(influxWrite);
+    byte[] byte_array = getBinary(influxPointArry);
 
     if (this.gzip) {
 
@@ -147,8 +138,8 @@ public class InfluxV9RepoWriter {
           requestEntity =
           EntityBuilder
               .create()
-              .setText(jsonBody)
-              .setContentType(ContentType.APPLICATION_JSON)
+              .setBinary(byte_array)
+              .setContentType(ContentType.DEFAULT_BINARY)
               .setContentEncoding("UTF-8")
               .gzipCompress()
               .build();
@@ -161,9 +152,9 @@ public class InfluxV9RepoWriter {
 
       logger.debug("[{}]: gzip set to false. sending non-gzip msg", id);
 
-      StringEntity stringEntity = new StringEntity(jsonBody, "UTF-8");
+      ByteArrayEntity byteEntity = new ByteArrayEntity(byte_array);
 
-      request.setEntity(stringEntity);
+      request.setEntity(byteEntity);
 
     }
 
@@ -224,19 +215,11 @@ public class InfluxV9RepoWriter {
     }
   }
 
-  private String getJsonBody(InfluxWrite influxWrite) throws RepoException {
-
-    String json = null;
-
-    try {
-
-      json = this.objectMapper.writeValueAsString(influxWrite);
-
-    } catch (JsonProcessingException e) {
-
-      throw new RepoException("failed to serialize json", e);
+  private byte[] getBinary(InfluxPoint[] influxPointArry) {
+    StringBuilder binaryString = new StringBuilder();
+    for (InfluxPoint influxPoint : influxPointArry) {
+      binaryString.append(influxPoint.toInflux()).append(System.lineSeparator());
     }
-
-    return json;
+    return binaryString.toString().getBytes();
   }
 }
openSUSE Build Service is sponsored by