File cassandra.patch of Package openstack-monasca-persister-java

commit 89efc8c491b6cc04c1d31057a0c38ac1c93798f0
Author: James Gu <jgu@suse.com>
Date:   Thu Aug 31 15:17:11 2017 -0700
Status: not upstream, local backport to Pike
URL: taken from https://review.openstack.org/#/c/527855/

    Add Cassandra db support
    
    Added cassandra db plugins in persister, inlcuding Java and
    python version.
    
    Depends-On: I9aa250bab206438f9b98bd211d472c0d8fe12bde
    Change-Id: I4bbe48f8fe550385de6ed3e14120850a8542c7c9
    story: 2001231
    task: 5758

diff --git a/java/pom.xml b/java/pom.xml
index e7812aa3edc5..b9ea7277a461 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -49,6 +49,26 @@
             <artifactId>monasca-common-influxdb</artifactId>
             <version>${mon.common.version}</version>
         </dependency>
+        <dependency>
+            <groupId>monasca-common</groupId>
+            <artifactId>monasca-common-cassandra</artifactId>
+            <version>${mon.common.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.datastax.cassandra</groupId>
+            <artifactId>cassandra-driver-core</artifactId>
+            <version>3.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.datastax.cassandra</groupId>
+            <artifactId>cassandra-driver-mapping</artifactId>
+            <version>3.1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.datastax.cassandra</groupId>
+            <artifactId>cassandra-driver-extras</artifactId>
+            <version>3.1.0</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.11</artifactId>
@@ -72,6 +92,12 @@
             <groupId>io.dropwizard</groupId>
             <artifactId>dropwizard-core</artifactId>
             <version>0.7.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.codahale.metrics</groupId>
+                    <artifactId>metrics-core</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>io.dropwizard</groupId>
@@ -88,6 +114,11 @@
             <artifactId>guice-assistedinject</artifactId>
             <version>3.0</version>
         </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>17.0</version>
+        </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-all</artifactId>
diff --git a/java/src/main/java/monasca/persister/PersisterModule.java b/java/src/main/java/monasca/persister/PersisterModule.java
index 67e49539a5cc..54e362cf3fca 100644
--- a/java/src/main/java/monasca/persister/PersisterModule.java
+++ b/java/src/main/java/monasca/persister/PersisterModule.java
@@ -1,6 +1,8 @@
 /*
  * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
  *
+ * Copyright (c) 2017 SUSE LLC.
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -46,6 +48,9 @@ import monasca.persister.pipeline.event.AlarmStateTransitionHandlerFactory;
 import monasca.persister.pipeline.event.MetricHandler;
 import monasca.persister.pipeline.event.MetricHandlerFactory;
 import monasca.persister.repository.Repo;
+import monasca.persister.repository.cassandra.CassandraAlarmRepo;
+import monasca.persister.repository.cassandra.CassandraCluster;
+import monasca.persister.repository.cassandra.CassandraMetricRepo;
 import monasca.persister.repository.influxdb.InfluxV9AlarmRepo;
 import monasca.persister.repository.influxdb.InfluxV9MetricRepo;
 import monasca.persister.repository.influxdb.InfluxV9RepoWriter;
@@ -56,7 +61,7 @@ public class PersisterModule extends AbstractModule {
 
   private static final String VERTICA = "vertica";
   private static final String INFLUXDB = "influxdb";
-
+  private static final String CASSANDRA = "cassandra";
   private static final String INFLUXDB_V9 = "v9";
 
   private final PersisterConfig config;
@@ -168,6 +173,13 @@ public class PersisterModule extends AbstractModule {
       bind(new TypeLiteral<Repo<AlarmStateTransitionedEvent>> () {})
           .to(InfluxV9AlarmRepo.class);
 
+    } else if (config.getDatabaseConfiguration().getDatabaseType().equalsIgnoreCase(CASSANDRA)) {
+      bind(CassandraCluster.class).in(Singleton.class);
+
+      bind(new TypeLiteral<Repo<MetricEnvelope>>() {}).to(CassandraMetricRepo.class);
+
+      bind(new TypeLiteral<Repo<AlarmStateTransitionedEvent>>() {}).to(CassandraAlarmRepo.class);
+
     } else {
 
       System.err.println(
diff --git a/java/src/main/java/monasca/persister/configuration/PersisterConfig.java b/java/src/main/java/monasca/persister/configuration/PersisterConfig.java
index 3abf8ab86a1b..b8d09a6828ec 100644
--- a/java/src/main/java/monasca/persister/configuration/PersisterConfig.java
+++ b/java/src/main/java/monasca/persister/configuration/PersisterConfig.java
@@ -1,6 +1,8 @@
 /*
  * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
  *
+ * Copyright (c) 2017 SUSE LLC.
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -19,6 +21,7 @@ package monasca.persister.configuration;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 
+import monasca.common.configuration.CassandraDbConfiguration;
 import monasca.common.configuration.DatabaseConfiguration;
 import monasca.common.configuration.InfluxDbConfiguration;
 import io.dropwizard.Configuration;
@@ -97,4 +100,12 @@ public class PersisterConfig extends Configuration {
   public InfluxDbConfiguration getInfluxDBConfiguration() {
     return influxDbConfiguration;
   }
+
+  @Valid
+  @JsonProperty
+  private final CassandraDbConfiguration cassandraDbConfiguration = new CassandraDbConfiguration();
+
+  public CassandraDbConfiguration getCassandraDbConfiguration() {
+    return cassandraDbConfiguration;
+  }
 }
diff --git a/java/src/main/java/monasca/persister/configuration/PipelineConfig.java b/java/src/main/java/monasca/persister/configuration/PipelineConfig.java
index c6adef159280..b5f95016b737 100644
--- a/java/src/main/java/monasca/persister/configuration/PipelineConfig.java
+++ b/java/src/main/java/monasca/persister/configuration/PipelineConfig.java
@@ -42,6 +42,17 @@ public class PipelineConfig {
   @JsonProperty
   Integer maxBatchTime;
 
+  @JsonProperty
+  Integer commitBatchTime;
+
+  public Integer getCommitBatchTime() {
+    return commitBatchTime;
+  }
+
+  public void setCommitBatchTime(Integer commitBatchTime) {
+    this.commitBatchTime = commitBatchTime;
+  }
+
   public String getTopic() {
     return topic;
   }
diff --git a/java/src/main/java/monasca/persister/consumer/KafkaChannel.java b/java/src/main/java/monasca/persister/consumer/KafkaChannel.java
index 1c2081107f39..9c863f3d3e17 100644
--- a/java/src/main/java/monasca/persister/consumer/KafkaChannel.java
+++ b/java/src/main/java/monasca/persister/consumer/KafkaChannel.java
@@ -1,6 +1,8 @@
 /*
  * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
  *
+ * Copyright (c) 2017 SUSE LLC
+ * 
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -44,33 +46,50 @@ public class KafkaChannel {
   private final String topic;
   private final ConsumerConnector consumerConnector;
   private final String threadId;
+  private final int commitBatchtimeInMills;
+  private long nextCommitTime;
+  private boolean commitDirty = false;
 
   @Inject
-  public KafkaChannel(
-      PersisterConfig configuration,
-      @Assisted PipelineConfig pipelineConfig,
+  public KafkaChannel(PersisterConfig configuration, @Assisted PipelineConfig pipelineConfig,
       @Assisted String threadId) {
 
     this.topic = pipelineConfig.getTopic();
     this.threadId = threadId;
-    Properties kafkaProperties =
-        createKafkaProperties(configuration.getKafkaConfig(), pipelineConfig);
+    this.commitBatchtimeInMills = pipelineConfig.getCommitBatchTime();
+    nextCommitTime = System.currentTimeMillis() + commitBatchtimeInMills;
+    Properties kafkaProperties = createKafkaProperties(configuration.getKafkaConfig(), pipelineConfig);
     consumerConnector = Consumer.createJavaConsumerConnector(createConsumerConfig(kafkaProperties));
   }
 
   public final void markRead() {
-    this.consumerConnector.commitOffsets();
+    if (commitBatchtimeInMills <= 0) {
+      consumerConnector.commitOffsets();
+    } else if (nextCommitTime <= System.currentTimeMillis()) {
+      consumerConnector.commitOffsets();
+      nextCommitTime = System.currentTimeMillis() + commitBatchtimeInMills;
+      commitDirty = false;
+    } else {
+      commitDirty = true;
+    }
+  }
+
+  public final void markReadIfDirty() {
+    if (commitDirty) {
+      this.consumerConnector.commitOffsets();
+      commitDirty = false;
+    }
   }
 
   public KafkaStream<byte[], byte[]> getKafkaStream() {
     final Map<String, Integer> topicCountMap = new HashMap<>();
     topicCountMap.put(this.topic, 1);
-    Map<String, List<KafkaStream<byte[], byte[]>>> streamMap =
-        this.consumerConnector.createMessageStreams(topicCountMap);
+    Map<String, List<KafkaStream<byte[], byte[]>>> streamMap = this.consumerConnector
+        .createMessageStreams(topicCountMap);
     List<KafkaStream<byte[], byte[]>> streams = streamMap.values().iterator().next();
     if (streams.size() != 1) {
-      throw new IllegalStateException(String.format(
-          "Expected only one stream but instead there are %d", streams.size()));
+      throw new IllegalStateException(
+          String.format("Expected only one stream but instead there are %d", streams.size()));
     }
     return streams.get(0);
   }
@@ -92,32 +111,28 @@ public class KafkaChannel {
     properties.put("consumer.id",
         String.format("%s_%s", pipelineConfig.getConsumerId(), this.threadId));
     properties.put("socket.timeout.ms", kafkaConfig.getSocketTimeoutMs().toString());
-    properties.put("socket.receive.buffer.bytes", kafkaConfig.getSocketReceiveBufferBytes()
-        .toString());
-    properties.put("fetch.message.max.bytes", kafkaConfig.getFetchMessageMaxBytes()
-        .toString());
+    properties.put("socket.receive.buffer.bytes", kafkaConfig.getSocketReceiveBufferBytes().toString());
+    properties.put("fetch.message.max.bytes", kafkaConfig.getFetchMessageMaxBytes().toString());
     // Set auto commit to false because the persister is going to explicitly commit
     properties.put("auto.commit.enable", "false");
-    properties.put("queued.max.message.chunks", kafkaConfig.getQueuedMaxMessageChunks()
-        .toString());
+    properties.put("queued.max.message.chunks", kafkaConfig.getQueuedMaxMessageChunks().toString());
     properties.put("rebalance.max.retries", kafkaConfig.getRebalanceMaxRetries().toString());
     properties.put("fetch.min.bytes", kafkaConfig.getFetchMinBytes().toString());
     properties.put("fetch.wait.max.ms", kafkaConfig.getFetchWaitMaxMs().toString());
     properties.put("rebalance.backoff.ms", kafkaConfig.getRebalanceBackoffMs().toString());
-    properties.put("refresh.leader.backoff.ms", kafkaConfig.getRefreshLeaderBackoffMs()
-        .toString());
+    properties.put("refresh.leader.backoff.ms", kafkaConfig.getRefreshLeaderBackoffMs().toString());
     properties.put("auto.offset.reset", kafkaConfig.getAutoOffsetReset());
     properties.put("consumer.timeout.ms", kafkaConfig.getConsumerTimeoutMs().toString());
     properties.put("client.id", String.format("%s_%s", pipelineConfig.getClientId(), threadId));
-    properties.put("zookeeper.session.timeout.ms", kafkaConfig
-        .getZookeeperSessionTimeoutMs().toString());
-    properties.put("zookeeper.connection.timeout.ms", kafkaConfig
-        .getZookeeperConnectionTimeoutMs().toString());
-    properties
-        .put("zookeeper.sync.time.ms", kafkaConfig.getZookeeperSyncTimeMs().toString());
+    properties.put("zookeeper.session.timeout.ms",
+        kafkaConfig.getZookeeperSessionTimeoutMs().toString());
+    properties.put("zookeeper.connection.timeout.ms",
+        kafkaConfig.getZookeeperConnectionTimeoutMs().toString());
+    properties.put("zookeeper.sync.time.ms", kafkaConfig.getZookeeperSyncTimeMs().toString());
 
     for (String key : properties.stringPropertyNames()) {
-      logger.info("[{}]: " + KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key), threadId);
+      logger.info("[{}]: " + KAFKA_CONFIGURATION + " " + key + " = " + properties.getProperty(key),
+          threadId);
     }
 
     return properties;
diff --git a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java
index 834a87ebb5f5..c5bb5c2a0c85 100644
--- a/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java
+++ b/java/src/main/java/monasca/persister/consumer/KafkaConsumerRunnableBasic.java
@@ -1,6 +1,8 @@
 /*
  * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
  *
+ * Copyright (c) 2017 SUSE LLC.
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -38,14 +40,13 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
   private final String threadId;
   private final ManagedPipeline<T> pipeline;
   private volatile boolean stop = false;
+  private boolean active = false;
 
   private ExecutorService executorService;
 
   @Inject
-  public KafkaConsumerRunnableBasic(
-      @Assisted KafkaChannel kafkaChannel,
-      @Assisted ManagedPipeline<T> pipeline,
-      @Assisted String threadId) {
+  public KafkaConsumerRunnableBasic(@Assisted KafkaChannel kafkaChannel,
+      @Assisted ManagedPipeline<T> pipeline, @Assisted String threadId) {
 
     this.kafkaChannel = kafkaChannel;
     this.pipeline = pipeline;
@@ -67,8 +68,9 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
   }
 
   private void markRead() {
-
-    logger.debug("[{}]: marking read", this.threadId);
+    if (logger.isDebugEnabled()) {
+      logger.debug("[{}]: marking read", this.threadId);
+    }
 
     this.kafkaChannel.markRead();
 
@@ -80,12 +82,30 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
 
     this.stop = true;
 
+    int count = 0;
+    while (active) {
+      if (count++ >= 20) {
+        break;
+      }
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        logger.error("interrupted while waiting for the run loop to stop", e);
+        break;
+      }
+    }
+
+    if (!active) {
+      this.kafkaChannel.markReadIfDirty();
+    }
   }
 
   public void run() {
 
     logger.info("[{}]: run", this.threadId);
 
+    active = true;
+
     final ConsumerIterator<byte[], byte[]> it = kafkaChannel.getKafkaStream().iterator();
 
     logger.debug("[{}]: KafkaChannel has stream iterator", this.threadId);
@@ -121,7 +141,9 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
 
             final String msg = new String(it.next().message());
 
-            logger.debug("[{}]: {}", this.threadId, msg);
+            if (logger.isDebugEnabled()) {
+              logger.debug("[{}]: {}", this.threadId, msg);
+            }
 
             publishEvent(msg);
 
@@ -149,22 +171,24 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
 
       } catch (Throwable e) {
 
-        logger.error(
-            "[{}]: caught fatal exception while publishing msg. Shutting entire persister down "
-            + "now!", this.threadId, e);
+        logger
+            .error("[{}]: caught fatal exception while publishing msg. Shutting entire persister down "
+                + "now!", this.threadId, e);
 
-          logger.error("[{}]: calling shutdown on executor service", this.threadId);
-          this.executorService.shutdownNow();
+        logger.error("[{}]: calling shutdown on executor service", this.threadId);
+        this.executorService.shutdownNow();
 
-          logger.error("[{}]: shutting down system. calling system.exit(1)", this.threadId);
-          System.exit(1);
+        logger.error("[{}]: shutting down system. calling system.exit(1)", this.threadId);
+        System.exit(1);
 
-        }
+      }
 
     }
 
     logger.info("[{}]: calling stop on kafka channel", this.threadId);
 
+    active = false;
+
     this.kafkaChannel.stop();
 
     logger.debug("[{}]: exiting main run loop", this.threadId);
@@ -183,9 +207,10 @@ public class KafkaConsumerRunnableBasic<T> implements Runnable {
 
   private boolean isInterrupted() {
 
-    if (Thread.currentThread().interrupted()) {
-
-      logger.debug("[{}]: is interrupted. breaking out of run loop", this.threadId);
+    if (Thread.interrupted()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("[{}]: is interrupted. breaking out of run loop", this.threadId);
+      }
 
       return true;
 
diff --git a/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java b/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java
index 91c4b97dc5c0..f7697607130c 100644
--- a/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java
+++ b/java/src/main/java/monasca/persister/pipeline/event/FlushableHandler.java
@@ -1,6 +1,8 @@
 /*
  * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
  *
+ * Copyright (c) 2017 SUSE LLC
+ * 
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -51,28 +53,18 @@ public abstract class FlushableHandler<T> {
 
   protected final String handlerName;
 
-  protected FlushableHandler(
-      PipelineConfig configuration,
-      Environment environment,
-      String threadId,
+  protected FlushableHandler(PipelineConfig configuration, Environment environment, String threadId,
       int batchSize) {
 
     this.threadId = threadId;
 
-    this.handlerName =
-        String.format(
-            "%s[%s]",
-            this.getClass().getName(),
-            threadId);
+    this.handlerName = String.format("%s[%s]", this.getClass().getName(), threadId);
 
-    this.processedMeter =
-        environment.metrics().meter(handlerName + "." + "events-processed-meter");
+    this.processedMeter = environment.metrics().meter(handlerName + "." + "events-processed-meter");
 
-    this.flushMeter =
-        environment.metrics().meter(handlerName + "." + "flush-meter");
+    this.flushMeter = environment.metrics().meter(handlerName + "." + "flush-meter");
 
-    this.flushTimer =
-        environment.metrics().timer(handlerName + "." + "flush-timer");
+    this.flushTimer = environment.metrics().timer(handlerName + "." + "flush-timer");
 
     this.secondsBetweenFlushes = configuration.getMaxBatchTime();
 
@@ -102,7 +94,7 @@ public abstract class FlushableHandler<T> {
 
       } else {
 
-         return false;
+        return false;
 
       }
     }
@@ -126,20 +118,26 @@ public abstract class FlushableHandler<T> {
 
   private boolean isBatchSize() {
 
-    logger.debug("[{}]: checking batch size", this.threadId);
+    if (logger.isDebugEnabled()) {
+
+      logger.debug("[{}]: checking batch size", this.threadId);
+
+    }
 
     if (this.msgCount >= this.batchSize) {
 
-      logger.debug("[{}]: batch sized {} attained", this.threadId, this.batchSize);
+      if (logger.isDebugEnabled()) {
+        logger.debug("[{}]: batch sized {} attained", this.threadId, this.batchSize);
+      }
 
       return true;
 
     } else {
 
-      logger.debug("[{}]: batch size now at {}, batch size {} not attained",
-                   this.threadId,
-                   this.msgCount,
-                   this.batchSize);
+      if (logger.isDebugEnabled()) {
+        logger.debug("[{}]: batch size now at {}, batch size {} not attained", this.threadId,
+            this.msgCount, this.batchSize);
+      }
 
       return false;
 
@@ -147,28 +145,26 @@ public abstract class FlushableHandler<T> {
   }
 
   private boolean isFlushTime() {
-
-    logger.debug("[{}]: got heartbeat message, checking flush time. flush every {} seconds.",
-                 this.threadId,
-                 this.secondsBetweenFlushes);
+    if (logger.isDebugEnabled()) {
+      logger.debug("[{}]: got heartbeat message, checking flush time. flush every {} seconds.",
+          this.threadId, this.secondsBetweenFlushes);
+    }
 
     long now = System.currentTimeMillis();
 
-    if (this.flushTimeMillis <= now ) {
-
-      logger.debug(
-          "[{}]: {} ms past flush time. flushing to repository now.",
-          this.threadId,
-          now - this.flushTimeMillis);
+    if (this.flushTimeMillis <= now) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("[{}]: {} ms past flush time. flushing to repository now.", this.threadId,
+            now - this.flushTimeMillis);
+      }
 
       return true;
 
     } else {
-
-      logger.debug(
-          "[{}]: {} ms to next flush time. no need to flush at this time.",
-          this.threadId,
-          this.flushTimeMillis - now);
+      if (logger.isDebugEnabled()) {
+        logger.debug("[{}]: {} ms to next flush time. no need to flush at this time.", this.threadId,
+            this.flushTimeMillis - now);
+      }
 
       return false;
 
@@ -176,8 +172,9 @@ public abstract class FlushableHandler<T> {
   }
 
   public int flush() throws RepoException {
-
-    logger.debug("[{}]: flushing", this.threadId);
+    if (logger.isDebugEnabled()) {
+      logger.debug("[{}]: flushing", this.threadId);
+    }
 
     Timer.Context context = this.flushTimer.time();
 
@@ -185,13 +182,15 @@ public abstract class FlushableHandler<T> {
 
     context.stop();
 
-    this.flushMeter.mark();
+    this.flushMeter.mark(msgFlushCnt);
 
     this.flushTimeMillis = System.currentTimeMillis() + this.millisBetweenFlushes;
 
-    logger.debug("[{}]: flushed {} msg", this.threadId, msgFlushCnt);
+    if (logger.isDebugEnabled()) {
+      logger.debug("[{}]: flushed {} msg", this.threadId, msgFlushCnt);
+    }
 
-    this.msgCount = 0;
+    this.msgCount -= msgFlushCnt;
 
     this.batchCount++;
 
diff --git a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java
index 76ae2697fd23..75fe2f52e679 100644
--- a/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java
+++ b/java/src/main/java/monasca/persister/pipeline/event/MetricHandler.java
@@ -1,6 +1,8 @@
 /*
  * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
- *
+ * 
+ * Copyright (c) 2017 SUSE LLC
+ * 
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -38,28 +40,23 @@ import monasca.persister.repository.RepoException;
 
 public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
 
-  private static final Logger logger =
-      LoggerFactory.getLogger(MetricHandler.class);
+  private static final Logger logger = LoggerFactory.getLogger(MetricHandler.class);
 
   private final Repo<MetricEnvelope> metricRepo;
 
   private final Counter metricCounter;
 
   @Inject
-  public MetricHandler(
-      Repo<MetricEnvelope> metricRepo,
-      Environment environment,
-      @Assisted PipelineConfig configuration,
-      @Assisted("threadId") String threadId,
+  public MetricHandler(Repo<MetricEnvelope> metricRepo, Environment environment,
+      @Assisted PipelineConfig configuration, @Assisted("threadId") String threadId,
       @Assisted("batchSize") int batchSize) {
 
     super(configuration, environment, threadId, batchSize);
 
     this.metricRepo = metricRepo;
 
-    this.metricCounter =
-        environment.metrics()
-            .counter(this.handlerName + "." + "metrics-added-to-batch-counter");
+    this.metricCounter = environment.metrics()
+        .counter(this.handlerName + "." + "metrics-added-to-batch-counter");
 
   }
 
@@ -89,12 +86,10 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
   }
 
   private void processEnvelope(MetricEnvelope metricEnvelope) {
-
-    logger.debug("[{}]: [{}:{}] {}",
-                 this.threadId,
-                 this.getBatchCount(),
-                 this.getMsgCount(),
-                 metricEnvelope);
+    if (logger.isDebugEnabled()) {
+      logger.debug("[{}]: [{}:{}] {}", this.threadId, this.getBatchCount(), this.getMsgCount(),
+          metricEnvelope);
+    }
 
     this.metricRepo.addToBatch(metricEnvelope, this.threadId);
 
@@ -109,8 +104,8 @@ public class MetricHandler extends FlushableHandler<MetricEnvelope[]> {
 
     this.objectMapper.enable(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY);
 
-    this.objectMapper.setPropertyNamingStrategy(
-        PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
+    this.objectMapper
+        .setPropertyNamingStrategy(PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
 
   }
 
diff --git a/java/src/main/java/monasca/persister/repository/vertica/Sha1HashId.java b/java/src/main/java/monasca/persister/repository/Sha1HashId.java
similarity index 76%
rename from java/src/main/java/monasca/persister/repository/vertica/Sha1HashId.java
rename to java/src/main/java/monasca/persister/repository/Sha1HashId.java
index 899b073b6868..74c8ecbae156 100644
--- a/java/src/main/java/monasca/persister/repository/vertica/Sha1HashId.java
+++ b/java/src/main/java/monasca/persister/repository/Sha1HashId.java
@@ -1,63 +1,73 @@
-/*
- * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package monasca.persister.repository.vertica;
-
-import org.apache.commons.codec.binary.Hex;
-
-import java.util.Arrays;
-
-public class Sha1HashId {
-  private final byte[] sha1Hash;
-
-  public Sha1HashId(byte[] sha1Hash) {
-    this.sha1Hash = sha1Hash;
-  }
-
-  @Override
-  public String toString() {
-    return "Sha1HashId{" + "sha1Hash=" + Hex.encodeHexString(sha1Hash) + "}";
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o)
-      return true;
-    if (!(o instanceof Sha1HashId))
-      return false;
-
-    Sha1HashId that = (Sha1HashId) o;
-
-    if (!Arrays.equals(sha1Hash, that.sha1Hash))
-      return false;
-
-    return true;
-  }
-
-  @Override
-  public int hashCode() {
-    return Arrays.hashCode(sha1Hash);
-  }
-
-  public byte[] getSha1Hash() {
-    return sha1Hash;
-  }
-
-  public String toHexString() {
-    return Hex.encodeHexString(sha1Hash);
-  }
-}
+/*
+ * Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
+ *
+ * Copyright (c) 2017 SUSE LLC.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package monasca.persister.repository;
+
+import org.apache.commons.codec.binary.Hex;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+public class Sha1HashId {
+  private final byte[] sha1Hash;
+
+  private final String hex;
+
+  public Sha1HashId(byte[] sha1Hash) {
+    this.sha1Hash = sha1Hash;
+    hex = Hex.encodeHexString(sha1Hash);
+  }
+
+  @Override
+  public String toString() {
+    return "Sha1HashId{" + "sha1Hash=" + hex + "}";
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (!(o instanceof Sha1HashId))
+      return false;
+
+    Sha1HashId that = (Sha1HashId) o;
+
+    if (!Arrays.equals(sha1Hash, that.sha1Hash))
+      return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(sha1Hash);
+  }
+
+  public byte[] getSha1Hash() {
+    return sha1Hash;
+  }
+
+  public ByteBuffer getSha1HashByteBuffer() {
+    return ByteBuffer.wrap(sha1Hash);
+  }
+
+  public String toHexString() {
+    return hex;
+  }
+}
diff --git a/java/src/main/java/monasca/persister/repository/cassandra/CassandraAlarmRepo.java b/java/src/main/java/monasca/persister/repository/cassandra/CassandraAlarmRepo.java
new file mode 100644
index 000000000000..96323099e13e
--- /dev/null
+++ b/java/src/main/java/monasca/persister/repository/cassandra/CassandraAlarmRepo.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright (c) 2017 SUSE LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package monasca.persister.repository.cassandra;
+
+import java.security.NoSuchAlgorithmException;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+
+import javax.inject.Inject;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.PropertyNamingStrategy;
+
+import io.dropwizard.setup.Environment;
+import monasca.common.model.event.AlarmStateTransitionedEvent;
+import monasca.persister.configuration.PersisterConfig;
+import monasca.persister.repository.Repo;
+import monasca.persister.repository.RepoException;
+
+/**
+ * This class is not thread safe.
+ *
+ */
+public class CassandraAlarmRepo extends CassandraRepo implements Repo<AlarmStateTransitionedEvent> {
+
+  private static final Logger logger = LoggerFactory.getLogger(CassandraAlarmRepo.class);
+
+  private static String EMPTY_REASON_DATA = "{}";
+
+  private static final int MAX_BYTES_PER_CHAR = 4;
+  private static final int MAX_LENGTH_VARCHAR = 65000;
+
+  private int retention;
+
+  private ObjectMapper objectMapper = new ObjectMapper();
+
+  @Inject
+  public CassandraAlarmRepo(CassandraCluster cluster, PersisterConfig config, Environment environment)
+      throws NoSuchAlgorithmException, SQLException {
+    super(cluster, environment, config.getCassandraDbConfiguration().getMaxWriteRetries(),
+        config.getAlarmHistoryConfiguration().getBatchSize());
+
+    this.retention = config.getCassandraDbConfiguration().getRetentionPolicy() * 24 * 3600;
+
+    logger.debug("Instantiating " + this.getClass().getName());
+
+    this.objectMapper
+        .setPropertyNamingStrategy(PropertyNamingStrategy.CAMEL_CASE_TO_LOWER_CASE_WITH_UNDERSCORES);
+
+    session = cluster.getAlarmsSession();
+
+    logger.debug(this.getClass().getName() + " is fully instantiated");
+
+  }
+
+  public void addToBatch(AlarmStateTransitionedEvent message, String id) {
+
+    String metricsString = getSerializedString(message.metrics, id);
+
+    // Validate metricsString does not exceed a sufficient maximum upper bound
+    if (metricsString.length() * MAX_BYTES_PER_CHAR >= MAX_LENGTH_VARCHAR) {
+      metricsString = "[]";
+      logger.warn("length of metricsString for alarm ID {} exceeds max length of {}", message.alarmId,
+          MAX_LENGTH_VARCHAR);
+    }
+
+    String subAlarmsString = getSerializedString(message.subAlarms, id);
+
+    if (subAlarmsString.length() * MAX_BYTES_PER_CHAR >= MAX_LENGTH_VARCHAR) {
+      subAlarmsString = "[]";
+      logger.warn("length of subAlarmsString for alarm ID {} exceeds max length of {}", message.alarmId,
+          MAX_LENGTH_VARCHAR);
+    }
+
+    queue.offerLast(cluster.getAlarmHistoryInsertStmt().bind(retention, metricsString, message.oldState.name(),
+        message.newState.name(), subAlarmsString, message.stateChangeReason, EMPTY_REASON_DATA,
+        message.tenantId, message.alarmId, new Timestamp(message.timestamp)));
+  }
+
+  private String getSerializedString(Object o, String id) {
+
+    try {
+      return this.objectMapper.writeValueAsString(o);
+    } catch (JsonProcessingException e) {
+      logger.error("[[}]: failed to serialize object {}", id, o, e);
+      return "";
+    }
+  }
+
+  @Override
+  public int flush(String id) throws RepoException {
+    return handleFlush(id);
+  }
+}
diff --git a/java/src/main/java/monasca/persister/repository/cassandra/CassandraCluster.java b/java/src/main/java/monasca/persister/repository/cassandra/CassandraCluster.java
new file mode 100644
index 000000000000..ac20b098db63
--- /dev/null
+++ b/java/src/main/java/monasca/persister/repository/cassandra/CassandraCluster.java
@@ -0,0 +1,427 @@
+/*
+ * Copyright (c) 2017 SUSE LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package monasca.persister.repository.cassandra;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Cluster.Builder;
+import com.datastax.driver.core.CodecRegistry;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PlainTextAuthProvider;
+import com.datastax.driver.core.PoolingOptions;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ProtocolOptions;
+import com.datastax.driver.core.QueryOptions;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SocketOptions;
+import com.datastax.driver.core.TokenRange;
+import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+import com.datastax.driver.core.utils.Bytes;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.inject.Inject;
+
+import monasca.common.configuration.CassandraDbConfiguration;
+import monasca.persister.configuration.PersisterConfig;
+
+public class CassandraCluster {
+
+  private static final Logger logger = LoggerFactory.getLogger(CassandraCluster.class);
+
+  private static final String MEASUREMENT_INSERT_CQL = "update monasca.measurements USING TTL ? "
+      + "set value = ?, value_meta = ?, region = ?, tenant_id = ?, metric_name = ?, dimensions = ? "
+      + "where metric_id = ? and time_stamp = ?";
+
+  private static final String MEASUREMENT_UPDATE_CQL = "update monasca.measurements USING TTL ? "
+      + "set value = ?, value_meta = ? " + "where metric_id = ? and time_stamp = ?";
+
+  private static final String METRICS_INSERT_CQL = "update monasca.metrics USING TTL ? "
+      + "set metric_id = ?, created_at = ?, updated_at = ? "
+      + "where region = ? and tenant_id = ? and metric_name = ? and dimensions = ? and dimension_names = ?";
+
+  private static final String METRICS_UPDATE_CQL = "update monasca.metrics USING TTL ? "
+      + "set updated_at = ? "
+      + "where region = ? and tenant_id = ? and metric_name = ? and dimensions = ? and dimension_names = ?";
+
+  private static final String DIMENSION_INSERT_CQL = "insert into  monasca.dimensions "
+      + "(region, tenant_id, name, value) values (?, ?, ?, ?)";
+
+  private static final String DIMENSION_METRIC_INSERT_CQL = "insert into monasca.dimensions_metrics "
+      + " (region, tenant_id, dimension_name, dimension_value, metric_name) values (?, ?, ?, ?, ?)";
+
+  private static final String METRIC_DIMENSION_INSERT_CQL = "insert into monasca.metrics_dimensions "
+      + " (region, tenant_id, metric_name, dimension_name, dimension_value) values (?, ?, ?, ?, ?)";
+
+  private static final String INSERT_ALARM_STATE_HISTORY_SQL = "update monasca.alarm_state_history USING TTL ? "
+      + " set metric = ?, old_state = ?, new_state = ?, sub_alarms = ?, reason = ?, reason_data = ?"
+      + " where tenant_id = ? and alarm_id = ? and time_stamp = ?";
+
+  private static final String RETRIEVE_METRIC_DIMENSION_CQL = "select region, tenant_id, metric_name, "
+      + "dimension_name, dimension_value from metrics_dimensions "
+      + "WHERE token(region, tenant_id, metric_name) > ? and token(region, tenant_id, metric_name) <= ? ";
+
+  private static final String RETRIEVE_METRIC_ID_CQL = "select distinct metric_id from measurements WHERE token(metric_id) > ? and token(metric_id) <= ?";
+
+  private static final String RETRIEVE_DIMENSION_CQL = "select region, tenant_id, name, value from dimensions";
+
+  private static final String NAME = "name";
+  private static final String VALUE = "value";
+  private static final String METRIC_ID = "metric_id";
+  private static final String TENANT_ID_COLUMN = "tenant_id";
+  private static final String METRIC_NAME = "metric_name";
+  private static final String DIMENSION_NAME = "dimension_name";
+  private static final String DIMENSION_VALUE = "dimension_value";
+  private static final String REGION = "region";
+
+  private CassandraDbConfiguration dbConfig;
+  private Cluster cluster;
+  private Session metricsSession;
+  private Session alarmsSession;
+
+  private TokenAwarePolicy lbPolicy;
+
+  private PreparedStatement measurementInsertStmt;
+  private PreparedStatement measurementUpdateStmt;
+  private PreparedStatement metricInsertStmt;
+  private PreparedStatement metricUpdateStmt;
+  private PreparedStatement dimensionStmt;
+  private PreparedStatement dimensionMetricStmt;
+  private PreparedStatement metricDimensionStmt;
+
+  private PreparedStatement retrieveMetricDimensionStmt;
+  private PreparedStatement retrieveMetricIdStmt;
+
+  private PreparedStatement alarmHistoryInsertStmt;
+
+  public Cache<String, Boolean> getMetricIdCache() {
+    return metricIdCache;
+  }
+
+  public Cache<String, Boolean> getDimensionCache() {
+    return dimensionCache;
+  }
+
+  public Cache<String, Boolean> getMetricDimensionCache() {
+    return metricDimensionCache;
+  }
+
+  private final Cache<String, Boolean> metricIdCache;
+
+  private final Cache<String, Boolean> dimensionCache;
+
+  private final Cache<String, Boolean> metricDimensionCache;
+
+  @Inject
+  public CassandraCluster(final PersisterConfig config) {
+
+    this.dbConfig = config.getCassandraDbConfiguration();
+
+    QueryOptions qo = new QueryOptions();
+    qo.setConsistencyLevel(ConsistencyLevel.valueOf(dbConfig.getConsistencyLevel()));
+    qo.setDefaultIdempotence(true);
+
+    String[] contactPoints = dbConfig.getContactPoints();
+    int retries = dbConfig.getMaxWriteRetries();
+    Builder builder = Cluster.builder().addContactPoints(contactPoints).withPort(dbConfig.getPort());
+    builder
+        .withSocketOptions(new SocketOptions().setConnectTimeoutMillis(dbConfig.getConnectionTimeout())
+            .setReadTimeoutMillis(dbConfig.getReadTimeout()));
+    builder.withQueryOptions(qo).withRetryPolicy(new MonascaRetryPolicy(retries, retries, retries));
+
+    lbPolicy = new TokenAwarePolicy(
+        DCAwareRoundRobinPolicy.builder().withLocalDc(dbConfig.getLocalDataCenter()).build());
+    builder.withLoadBalancingPolicy(lbPolicy);
+
+    String user = dbConfig.getUser();
+    if (user != null && !user.isEmpty()) {
+      builder.withAuthProvider(new PlainTextAuthProvider(dbConfig.getUser(), dbConfig.getPassword()));
+    }
+    cluster = builder.build();
+
+    PoolingOptions poolingOptions = cluster.getConfiguration().getPoolingOptions();
+
+    poolingOptions.setConnectionsPerHost(HostDistance.LOCAL, dbConfig.getMaxConnections(),
+        dbConfig.getMaxConnections()).setConnectionsPerHost(HostDistance.REMOTE,
+            dbConfig.getMaxConnections(), dbConfig.getMaxConnections());
+
+    poolingOptions.setMaxRequestsPerConnection(HostDistance.LOCAL, dbConfig.getMaxRequests())
+        .setMaxRequestsPerConnection(HostDistance.REMOTE, dbConfig.getMaxRequests());
+
+    metricsSession = cluster.connect(dbConfig.getKeySpace());
+
+    measurementInsertStmt = metricsSession.prepare(MEASUREMENT_INSERT_CQL).setIdempotent(true);
+    measurementUpdateStmt = metricsSession.prepare(MEASUREMENT_UPDATE_CQL).setIdempotent(true);
+    metricInsertStmt = metricsSession.prepare(METRICS_INSERT_CQL).setIdempotent(true);
+    metricUpdateStmt = metricsSession.prepare(METRICS_UPDATE_CQL).setIdempotent(true);
+    dimensionStmt = metricsSession.prepare(DIMENSION_INSERT_CQL).setIdempotent(true);
+    dimensionMetricStmt = metricsSession.prepare(DIMENSION_METRIC_INSERT_CQL).setIdempotent(true);
+    metricDimensionStmt = metricsSession.prepare(METRIC_DIMENSION_INSERT_CQL).setIdempotent(true);
+
+    retrieveMetricIdStmt = metricsSession.prepare(RETRIEVE_METRIC_ID_CQL).setIdempotent(true);
+    retrieveMetricDimensionStmt = metricsSession.prepare(RETRIEVE_METRIC_DIMENSION_CQL)
+        .setIdempotent(true);
+
+    alarmsSession = cluster.connect(dbConfig.getKeySpace());
+
+    alarmHistoryInsertStmt = alarmsSession.prepare(INSERT_ALARM_STATE_HISTORY_SQL).setIdempotent(true);
+
+    metricIdCache = CacheBuilder.newBuilder()
+        .maximumSize(config.getCassandraDbConfiguration().getDefinitionMaxCacheSize()).build();
+
+    dimensionCache = CacheBuilder.newBuilder()
+        .maximumSize(config.getCassandraDbConfiguration().getDefinitionMaxCacheSize()).build();
+
+    metricDimensionCache = CacheBuilder.newBuilder()
+        .maximumSize(config.getCassandraDbConfiguration().getDefinitionMaxCacheSize()).build();
+
+    logger.info("loading cached definitions from db");
+
+    ExecutorService executor = Executors.newFixedThreadPool(250);
+
+    //a majority of the ids are for metrics not actively receiving msgs anymore
+    //loadMetricIdCache(executor);
+
+    loadDimensionCache();
+
+    loadMetricDimensionCache(executor);
+
+    executor.shutdown();
+  }
+
+  public Session getMetricsSession() {
+    return metricsSession;
+  }
+
+  public Session getAlarmsSession() {
+    return alarmsSession;
+  }
+
+  public PreparedStatement getMeasurementInsertStmt() {
+    return measurementInsertStmt;
+  }
+
+  public PreparedStatement getMeasurementUpdateStmt() {
+    return measurementUpdateStmt;
+  }
+
+  public PreparedStatement getMetricInsertStmt() {
+    return metricInsertStmt;
+  }
+
+  public PreparedStatement getMetricUpdateStmt() {
+    return metricUpdateStmt;
+  }
+
+  public PreparedStatement getDimensionStmt() {
+    return dimensionStmt;
+  }
+
+  public PreparedStatement getDimensionMetricStmt() {
+    return dimensionMetricStmt;
+  }
+
+  public PreparedStatement getMetricDimensionStmt() {
+    return metricDimensionStmt;
+  }
+
+  public PreparedStatement getAlarmHistoryInsertStmt() {
+    return alarmHistoryInsertStmt;
+  }
+
+  public ProtocolOptions getProtocolOptions() {
+    return cluster.getConfiguration().getProtocolOptions();
+  }
+
+  public CodecRegistry getCodecRegistry() {
+    return cluster.getConfiguration().getCodecRegistry();
+  }
+
+  public Metadata getMetaData() {
+    return cluster.getMetadata();
+  }
+
+  public TokenAwarePolicy getLoadBalancePolicy() {
+    return lbPolicy;
+  }
+
+  private void loadMetricIdCache(ExecutorService executor) {
+    final AtomicInteger tasks = new AtomicInteger(0);
+    logger.info("Found token ranges: " + cluster.getMetadata().getTokenRanges().size());
+    for (TokenRange range : cluster.getMetadata().getTokenRanges()) {
+      List<BoundStatement> queries = rangeQuery(retrieveMetricIdStmt, range);
+      for (BoundStatement query : queries) {
+        tasks.incrementAndGet();
+        logger.info("adding a metric id reading task, total: " + tasks.get());
+
+        ResultSetFuture future = metricsSession.executeAsync(query);
+
+        Futures.addCallback(future, new FutureCallback<ResultSet>() {
+          @Override
+          public void onSuccess(ResultSet result) {
+            for (Row row : result) {
+              String id = Bytes.toHexString(row.getBytes(METRIC_ID));
+              if (id != null) {
+                //remove '0x'
+                metricIdCache.put(id.substring(2), Boolean.TRUE);
+              }
+            }
+
+            tasks.decrementAndGet();
+
+            logger.info("completed a metric id read task. Remaining tasks: " + tasks.get());
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            logger.error("Failed to execute query to load metric id cache.", t);
+
+            tasks.decrementAndGet();
+
+            logger.info("Failed a metric id read task. Remaining tasks: " + tasks.get());
+          }
+        }, executor);
+
+      }
+    }
+
+    while (tasks.get() > 0) {
+      logger.debug("waiting for more metric id load tasks: " + tasks.get());
+
+      try {
+        Thread.sleep(3000);
+      } catch (InterruptedException e) {
+        logger.warn("load metric cache was interrupted", e);
+      }
+    }
+
+    logger.info("loaded metric id cache from database: " + metricIdCache.size());
+  }
+
+  private List<BoundStatement> rangeQuery(PreparedStatement rangeStmt, TokenRange range) {
+    List<BoundStatement> res = Lists.newArrayList();
+    for (TokenRange subRange : range.unwrap()) {
+      res.add(rangeStmt.bind(subRange.getStart(), subRange.getEnd()));
+    }
+    return res;
+  }
+
+  private void loadDimensionCache() {
+
+    ResultSet results = metricsSession.execute(RETRIEVE_DIMENSION_CQL);
+
+    for (Row row : results) {
+      String key = getDimnesionEntryKey(row.getString(REGION), row.getString(TENANT_ID_COLUMN),
+          row.getString(NAME), row.getString(VALUE));
+      dimensionCache.put(key, Boolean.TRUE);
+    }
+
+    logger.info("loaded dimension cache from database: " + dimensionCache.size());
+  }
+
+  public String getDimnesionEntryKey(String region, String tenantId, String name, String value) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(region).append('\0');
+    sb.append(tenantId).append('\0');
+    sb.append(name).append('\0');
+    sb.append(value);
+    return sb.toString();
+  }
+
+  private void loadMetricDimensionCache(ExecutorService executor) {
+
+    final AtomicInteger tasks = new AtomicInteger(0);
+
+    for (TokenRange range : cluster.getMetadata().getTokenRanges()) {
+      List<BoundStatement> queries = rangeQuery(retrieveMetricDimensionStmt, range);
+      for (BoundStatement query : queries) {
+        tasks.incrementAndGet();
+
+        logger.info("Adding a metric dimnesion read task, total: " + tasks.get());
+
+        ResultSetFuture future = metricsSession.executeAsync(query);
+
+        Futures.addCallback(future, new FutureCallback<ResultSet>() {
+          @Override
+          public void onSuccess(ResultSet result) {
+            for (Row row : result) {
+              String key = getMetricDimnesionEntryKey(row.getString(REGION),
+                  row.getString(TENANT_ID_COLUMN), row.getString(METRIC_NAME),
+                  row.getString(DIMENSION_NAME), row.getString(DIMENSION_VALUE));
+              metricDimensionCache.put(key, Boolean.TRUE);
+            }
+
+            tasks.decrementAndGet();
+
+            logger.info("Completed a metric dimension read task. Remaining tasks: " + tasks.get());
+          }
+
+          @Override
+          public void onFailure(Throwable t) {
+            logger.error("Failed to execute query to load metric id cache.", t);
+
+            tasks.decrementAndGet();
+
+            logger.info("Failed a metric dimension read task. Remaining tasks: " + tasks.get());
+          }
+        }, executor);
+
+      }
+    }
+
+    while (tasks.get() > 0) {
+
+      logger.debug("waiting for metric dimension cache to load ...");
+
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        logger.warn("load metric dimension cache was interrupted", e);
+      }
+    }
+
+    logger.info("loaded metric dimension cache from database: " + metricDimensionCache.size());
+  }
+
+  public String getMetricDimnesionEntryKey(String region, String tenantId, String metricName,
+      String dimensionName, String dimensionValue) {
+    StringBuilder sb = new StringBuilder();
+    sb.append(region).append('\0');
+    sb.append(tenantId).append('\0');
+    sb.append(metricName).append('\0');
+    sb.append(dimensionName).append('\0');
+    sb.append(dimensionValue);
+    return sb.toString();
+  }
+}
diff --git a/java/src/main/java/monasca/persister/repository/cassandra/CassandraMetricBatch.java b/java/src/main/java/monasca/persister/repository/cassandra/CassandraMetricBatch.java
new file mode 100644
index 000000000000..4a275676cacf
--- /dev/null
+++ b/java/src/main/java/monasca/persister/repository/cassandra/CassandraMetricBatch.java
@@ -0,0 +1,207 @@
+/*
+ * Copyright (c) 2017 SUSE LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package monasca.persister.repository.cassandra;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BatchStatement.Type;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.CodecRegistry;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.ProtocolOptions;
+import com.datastax.driver.core.Token;
+import com.datastax.driver.core.policies.TokenAwarePolicy;
+
+public class CassandraMetricBatch {
+  private static Logger logger = LoggerFactory.getLogger(CassandraMetricBatch.class);
+
+  ProtocolOptions protocol;
+  CodecRegistry codec;
+  Metadata metadata;
+  TokenAwarePolicy policy;
+  int batchLimit;
+
+  Map<Token, Deque<BatchStatement>> metricQueries;
+  Map<Token, Deque<BatchStatement>> dimensionQueries;
+  Map<Token, Deque<BatchStatement>> dimensionMetricQueries;
+  Map<Token, Deque<BatchStatement>> metricDimensionQueries;
+  Map<Set<Host>, Deque<BatchStatement>> measurementQueries;
+
+  public CassandraMetricBatch(Metadata metadata, ProtocolOptions protocol, CodecRegistry codec,
+      TokenAwarePolicy lbPolicy, int batchLimit) {
+    this.protocol = protocol;
+    this.codec = codec;
+    this.metadata = metadata;
+    this.policy = lbPolicy;
+    metricQueries = new HashMap<>();
+    this.batchLimit = batchLimit;
+
+    metricQueries = new HashMap<>();
+    dimensionQueries = new HashMap<>();
+    dimensionMetricQueries = new HashMap<>();
+    metricDimensionQueries = new HashMap<>();
+    measurementQueries = new HashMap<>();
+  }
+
+  public void addMetricQuery(BoundStatement s) {
+    batchQueryByToken(s, metricQueries);
+  }
+
+  public void addDimensionQuery(BoundStatement s) {
+    batchQueryByToken(s, dimensionQueries);
+  }
+
+  public void addDimensionMetricQuery(BoundStatement s) {
+    batchQueryByToken(s, dimensionMetricQueries);
+  }
+
+  public void addMetricDimensionQuery(BoundStatement s) {
+    batchQueryByToken(s, metricDimensionQueries);
+  }
+
+  public void addMeasurementQuery(BoundStatement s) {
+    batchQueryByReplica(s, measurementQueries);
+  }
+
+  private void batchQueryByToken(BoundStatement s, Map<Token, Deque<BatchStatement>> batchedQueries) {
+    ByteBuffer b = s.getRoutingKey(protocol.getProtocolVersion(), codec);
+    Token token = metadata.newToken(b);
+    Deque<BatchStatement> queue = batchedQueries.get(token);
+    if (queue == null) {
+      queue = new ArrayDeque<BatchStatement>();
+      BatchStatement bs = new BatchStatement(Type.UNLOGGED);
+      bs.add(s);
+      queue.offer(bs);
+      batchedQueries.put(token, queue);
+    } else {
+      BatchStatement bs = queue.getLast();
+      if (bs.size() < batchLimit) {
+        bs.add(s);
+      } else {
+        bs = new BatchStatement(Type.UNLOGGED);
+        bs.add(s);
+        queue.offerLast(bs);
+      }
+    }
+  }
+
+  private void batchQueryByReplica(BoundStatement s,
+      Map<Set<Host>, Deque<BatchStatement>> batchedQueries) {
+    Iterator<Host> it = policy.newQueryPlan(s.getKeyspace(), s);
+    Set<Host> hosts = new HashSet<>();
+
+    while (it.hasNext()) {
+      hosts.add(it.next());
+    }
+
+    Deque<BatchStatement> queue = batchedQueries.get(hosts);
+    if (queue == null) {
+      queue = new ArrayDeque<BatchStatement>();
+      BatchStatement bs = new BatchStatement(Type.UNLOGGED);
+      bs.add(s);
+      queue.offer(bs);
+      batchedQueries.put(hosts, queue);
+    } else {
+      BatchStatement bs = queue.getLast();
+      if (bs.size() < 30) {
+        bs.add(s);
+      } else {
+        bs = new BatchStatement(Type.UNLOGGED);
+        bs.add(s);
+        queue.offerLast(bs);
+      }
+    }
+  }
+
+  public void clear() {
+    metricQueries.clear();
+    dimensionQueries.clear();
+    dimensionMetricQueries.clear();
+    metricDimensionQueries.clear();
+    measurementQueries.clear();
+  }
+
+  public List<Deque<BatchStatement>> getAllBatches() {
+    logTokenBatchMap("metric batches", metricQueries);
+    logTokenBatchMap("dimension batches", dimensionQueries);
+    logTokenBatchMap("dimension metric batches", dimensionMetricQueries);
+    logTokenBatchMap("metric dimension batches", metricDimensionQueries);
+    logReplicaBatchMap("measurement batches", measurementQueries);
+
+    ArrayList<Deque<BatchStatement>> list = new ArrayList<>();
+    list.addAll(metricQueries.values());
+    list.addAll(dimensionQueries.values());
+    list.addAll(dimensionMetricQueries.values());
+    list.addAll(metricDimensionQueries.values());
+    list.addAll(measurementQueries.values());
+    return list;
+  }
+
+  private void logTokenBatchMap(String name, Map<Token, Deque<BatchStatement>> map) {
+    if (logger.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder(name);
+      sb.append(": Size: ").append(map.size());
+      sb.append(";  Tokens: |");
+      for (Entry<Token, Deque<BatchStatement>> entry : map.entrySet()) {
+        sb.append(entry.getKey().toString()).append(":");
+        for (BatchStatement bs : entry.getValue()) {
+          sb.append(bs.size()).append(",");
+        }
+        sb.append("|.");
+      }
+
+      logger.debug(sb.toString());
+    }
+  }
+
+  private void logReplicaBatchMap(String name, Map<Set<Host>, Deque<BatchStatement>> map) {
+    if (logger.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder(name);
+      sb.append(": Size: ").append(map.size());
+      sb.append(". Replicas: |");
+      for (Entry<Set<Host>, Deque<BatchStatement>> entry : map.entrySet()) {
+        for (Host host : entry.getKey()) {
+          sb.append(host.getAddress().toString()).append(",");
+        }
+        sb.append(":");
+        for (BatchStatement bs : entry.getValue()) {
+          sb.append(bs.size()).append(",");
+        }
+
+        sb.append("|");
+
+      }
+      logger.debug(sb.toString());
+    }
+  }
+}
diff --git a/java/src/main/java/monasca/persister/repository/cassandra/CassandraMetricRepo.java b/java/src/main/java/monasca/persister/repository/cassandra/CassandraMetricRepo.java
new file mode 100644
index 000000000000..d096e78448b0
--- /dev/null
+++ b/java/src/main/java/monasca/persister/repository/cassandra/CassandraMetricRepo.java
@@ -0,0 +1,336 @@
+/*
+ * Copyright (c) 2017 SUSE LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package monasca.persister.repository.cassandra;
+
+import java.security.NoSuchAlgorithmException;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import javax.inject.Inject;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Meter;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import io.dropwizard.setup.Environment;
+import monasca.common.model.metric.Metric;
+import monasca.common.model.metric.MetricEnvelope;
+import monasca.persister.configuration.PersisterConfig;
+import monasca.persister.repository.Repo;
+import monasca.persister.repository.RepoException;
+import monasca.persister.repository.Sha1HashId;
+
+public class CassandraMetricRepo extends CassandraRepo implements Repo<MetricEnvelope> {
+
+  private static final Logger logger = LoggerFactory.getLogger(CassandraMetricRepo.class);
+
+  public static final int MAX_COLUMN_LENGTH = 255;
+  public static final int MAX_VALUE_META_LENGTH = 2048;
+
+  private static final String TENANT_ID = "tenantId";
+  private static final String REGION = "region";
+  private static final String EMPTY_STR = "";
+
+  private int retention;
+
+  private CassandraMetricBatch batches;
+
+  private int metricCount;
+
+  private final ObjectMapper objectMapper = new ObjectMapper();
+
+  public final Meter measurementMeter;
+  public final Meter metricCacheMissMeter;
+  public final Meter metricCacheHitMeter;
+  public final Meter dimensionCacheMissMeter;
+  public final Meter dimensionCacheHitMeter;
+  public final Meter metricDimensionCacheMissMeter;
+  public final Meter metricDimensionCacheHitMeter;
+
+  @Inject
+  public CassandraMetricRepo(CassandraCluster cluster, PersisterConfig config, Environment environment)
+      throws NoSuchAlgorithmException, SQLException {
+
+    super(cluster, environment, config.getCassandraDbConfiguration().getMaxWriteRetries(),
+        config.getMetricConfiguration().getBatchSize());
+
+    logger.debug("Instantiating " + this.getClass().getName());
+
+    this.retention = config.getCassandraDbConfiguration().getRetentionPolicy() * 24 * 3600;
+
+    this.measurementMeter = this.environment.metrics()
+        .meter(this.getClass().getName() + "." + "measurement-meter");
+
+    this.metricCacheMissMeter = this.environment.metrics()
+        .meter(this.getClass().getName() + "." + "definition-cache-miss-meter");
+
+    this.metricCacheHitMeter = this.environment.metrics()
+        .meter(this.getClass().getName() + "." + "definition-cache-hit-meter");
+
+    this.dimensionCacheMissMeter = this.environment.metrics()
+        .meter(this.getClass().getName() + "." + "dimension-cache-miss-meter");
+
+    this.dimensionCacheHitMeter = this.environment.metrics()
+        .meter(this.getClass().getName() + "." + "dimension-cache-hit-meter");
+
+    this.metricDimensionCacheMissMeter = this.environment.metrics()
+        .meter(this.getClass().getName() + "." + "metric-dimension-cache-miss-meter");
+
+    this.metricDimensionCacheHitMeter = this.environment.metrics()
+        .meter(this.getClass().getName() + "." + "metric-dimension-cache-hit-meter");
+
+    session = cluster.getMetricsSession();
+
+    metricCount = 0;
+
+    batches = new CassandraMetricBatch(cluster.getMetaData(), cluster.getProtocolOptions(),
+        cluster.getCodecRegistry(), cluster.getLoadBalancePolicy(),
+        config.getCassandraDbConfiguration().getMaxBatches());
+
+
+
+    logger.debug(this.getClass().getName() + " is fully instantiated");
+  }
+
+  @Override
+  public void addToBatch(MetricEnvelope metricEnvelope, String id) {
+    Metric metric = metricEnvelope.metric;
+    Map<String, Object> metaMap = metricEnvelope.meta;
+
+    String tenantId = getMeta(TENANT_ID, metric, metaMap, id);
+    String region = getMeta(REGION, metric, metaMap, id);
+    String metricName = metric.getName();
+    TreeMap<String, String> dimensions = metric.getDimensions() == null ? new TreeMap<String, String>()
+        : new TreeMap<>(metric.getDimensions());
+
+    StringBuilder sb = new StringBuilder(region).append(tenantId).append(metricName);
+
+    Iterator<String> it = dimensions.keySet().iterator();
+    while (it.hasNext()) {
+      String k = it.next();
+      sb.append(k).append(dimensions.get(k));
+    }
+
+    byte[] defIdSha = DigestUtils.sha(sb.toString());
+    Sha1HashId defIdShaHash = new Sha1HashId(defIdSha);
+
+    if (cluster.getMetricIdCache().getIfPresent(defIdShaHash.toHexString()) == null) {
+      addDefinitionToBatch(defIdShaHash, metricName, dimensions, tenantId, region, id,
+          metric.getTimestamp());
+      batches.addMeasurementQuery(buildMeasurementInsertQuery(defIdShaHash, metric.getTimestamp(),
+          metric.getValue(), metric.getValueMeta(), region, tenantId, metricName, dimensions, id));
+    } else {
+      metricCacheHitMeter.mark();
+      batches.addMetricQuery(cluster.getMetricUpdateStmt().bind(retention,
+          new Timestamp(metric.getTimestamp()), region, tenantId, metricName,
+          getDimensionList(dimensions), new ArrayList<>(dimensions.keySet())));
+      batches.addMeasurementQuery(buildMeasurementUpdateQuery(defIdShaHash, metric.getTimestamp(),
+          metric.getValue(), metric.getValueMeta(), id));
+    }
+
+    metricCount++;
+  }
+
+  private String getMeta(String name, Metric metric, Map<String, Object> meta, String id) {
+    if (meta.containsKey(name)) {
+      return (String) meta.get(name);
+    } else {
+      logger.warn(
+          "[{}]: failed to find {} in message envelope meta data. metric message may be malformed. "
+              + "setting {} to empty string.",
+          id, name);
+      logger.warn("[{}]: metric: {}", id, metric.toString());
+      logger.warn("[{}]: meta: {}", id, meta.toString());
+      return EMPTY_STR;
+    }
+  }
+
+  private BoundStatement buildMeasurementUpdateQuery(Sha1HashId defId, long timeStamp, double value,
+      Map<String, String> valueMeta, String id) {
+
+    String valueMetaString = getValueMetaString(valueMeta, id);
+    if (logger.isDebugEnabled()) {
+      logger.debug("[{}]: adding metric to batch: metric id: {}, time: {}, value: {}, value meta {}",
+          id, defId.toHexString(), timeStamp, value, valueMetaString);
+    }
+
+    return cluster.getMeasurementUpdateStmt().bind(retention, value, valueMetaString,
+        defId.getSha1HashByteBuffer(), new Timestamp(timeStamp));
+  }
+
+  private BoundStatement buildMeasurementInsertQuery(Sha1HashId defId, long timeStamp, double value,
+      Map<String, String> valueMeta, String region, String tenantId, String metricName,
+      Map<String, String> dimensions, String id) {
+
+    String valueMetaString = getValueMetaString(valueMeta, id);
+    if (logger.isDebugEnabled()) {
+      logger.debug("[{}]: adding metric to batch: metric id: {}, time: {}, value: {}, value meta {}",
+          id, defId.toHexString(), timeStamp, value, valueMetaString);
+    }
+
+    measurementMeter.mark();
+    return cluster.getMeasurementInsertStmt().bind(retention, value, valueMetaString, region, tenantId,
+        metricName, getDimensionList(dimensions), defId.getSha1HashByteBuffer(),
+        new Timestamp(timeStamp));
+  }
+
+  private String getValueMetaString(Map<String, String> valueMeta, String id) {
+
+    String valueMetaString = "";
+
+    if (valueMeta != null && !valueMeta.isEmpty()) {
+
+      try {
+
+        valueMetaString = this.objectMapper.writeValueAsString(valueMeta);
+        if (valueMetaString.length() > MAX_VALUE_META_LENGTH) {
+          logger.error("[{}]: Value meta length {} longer than maximum {}, dropping value meta", id,
+              valueMetaString.length(), MAX_VALUE_META_LENGTH);
+          return "";
+        }
+
+      } catch (JsonProcessingException e) {
+
+        logger.error("[{}]: Failed to serialize value meta {}, dropping value meta from measurement",
+            id, valueMeta);
+      }
+    }
+
+    return valueMetaString;
+  }
+
+  private void addDefinitionToBatch(Sha1HashId defId, String metricName, Map<String, String> dimensions,
+      String tenantId, String region, String id, long timestamp) {
+
+    metricCacheMissMeter.mark();
+    if (logger.isDebugEnabled()) {
+      logger.debug("[{}]: adding definition to batch: defId: {}, name: {}, tenantId: {}, region: {}",
+          id, defId.toHexString(), metricName, tenantId, region);
+    }
+
+    Timestamp ts = new Timestamp(timestamp);
+    batches.addMetricQuery(
+        cluster.getMetricInsertStmt().bind(retention, defId.getSha1HashByteBuffer(), ts, ts, region,
+            tenantId, metricName, getDimensionList(dimensions), new ArrayList<>(dimensions.keySet())));
+
+    for (Map.Entry<String, String> entry : dimensions.entrySet()) {
+      String name = entry.getKey();
+      String value = entry.getValue();
+
+      String dimensionKey = cluster.getDimnesionEntryKey(region, tenantId, name, value);
+      if (cluster.getDimensionCache().getIfPresent(dimensionKey) != null) {
+        dimensionCacheHitMeter.mark();
+
+      } else {
+        dimensionCacheMissMeter.mark();
+        if (logger.isDebugEnabled()) {
+          logger.debug("[{}]: adding dimension to batch: defId: {}, name: {}, value: {}", id,
+              defId.toHexString(), name, value);
+        }
+        batches.addDimensionQuery(cluster.getDimensionStmt().bind(region, tenantId, name, value));
+        cluster.getDimensionCache().put(dimensionKey, Boolean.TRUE);
+      }
+
+      String metricDimensionKey = cluster.getMetricDimnesionEntryKey(region, tenantId, metricName, name, value);
+      if (cluster.getMetricDimensionCache().getIfPresent(metricDimensionKey) != null) {
+        metricDimensionCacheHitMeter.mark();
+      } else {
+        metricDimensionCacheMissMeter.mark();
+        batches.addDimensionMetricQuery(
+            cluster.getDimensionMetricStmt().bind(region, tenantId, name, value, metricName));
+
+        batches.addMetricDimensionQuery(
+            cluster.getMetricDimensionStmt().bind(region, tenantId, metricName, name, value));
+        cluster.getMetricDimensionCache().put(metricDimensionKey, Boolean.TRUE);
+      }
+    }
+
+    String metricId = defId.toHexString();
+    cluster.getMetricIdCache().put(metricId, Boolean.TRUE);
+  }
+
+  public List<String> getDimensionList(Map<String, String> dimensions) {
+    List<String> list = new ArrayList<>(dimensions.size());
+    for (Entry<String, String> dim : dimensions.entrySet()) {
+      list.add(new StringBuffer(dim.getKey()).append('\t').append(dim.getValue()).toString());
+    }
+    return list;
+  }
+
+  @Override
+  public int flush(String id) throws RepoException {
+    long startTime = System.nanoTime();
+    List<ResultSetFuture> results = new ArrayList<>();
+    List<Deque<BatchStatement>> list = batches.getAllBatches();
+    for (Deque<BatchStatement> q : list) {
+      BatchStatement b;
+      while ((b = q.poll()) != null) {
+        results.add(session.executeAsync(b));
+      }
+    }
+
+    List<ListenableFuture<ResultSet>> futures = Futures.inCompletionOrder(results);
+
+    boolean cancel = false;
+    Exception ex = null;
+    for (ListenableFuture<ResultSet> future : futures) {
+      if (cancel) {
+        future.cancel(false);
+        continue;
+      }
+      try {
+        future.get();
+      } catch (InterruptedException | ExecutionException e) {
+        cancel = true;
+        ex = e;
+      }
+    }
+
+    this.commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+
+    if (ex != null) {
+      metricFailed.inc(metricCount);
+      throw new RepoException(ex);
+    }
+
+    batches.clear();
+    int flushCnt = metricCount;
+    metricCount = 0;
+    metricCompleted.inc(flushCnt);
+    return flushCnt;
+  }
+}
diff --git a/java/src/main/java/monasca/persister/repository/cassandra/CassandraRepo.java b/java/src/main/java/monasca/persister/repository/cassandra/CassandraRepo.java
new file mode 100644
index 000000000000..e536d3e5f73a
--- /dev/null
+++ b/java/src/main/java/monasca/persister/repository/cassandra/CassandraRepo.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright (c) 2017 SUSE LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package monasca.persister.repository.cassandra;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.datastax.driver.core.BatchStatement;
+import com.datastax.driver.core.BatchStatement.Type;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.ResultSetFuture;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.BootstrappingException;
+import com.datastax.driver.core.exceptions.DriverException;
+import com.datastax.driver.core.exceptions.NoHostAvailableException;
+import com.datastax.driver.core.exceptions.OperationTimedOutException;
+import com.datastax.driver.core.exceptions.OverloadedException;
+import com.datastax.driver.core.exceptions.QueryConsistencyException;
+import com.datastax.driver.core.exceptions.UnavailableException;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
+import io.dropwizard.setup.Environment;
+import monasca.persister.repository.RepoException;
+
+public abstract class CassandraRepo {
+  private static Logger logger = LoggerFactory.getLogger(CassandraRepo.class);
+
+  final Environment environment;
+
+  final Timer commitTimer;
+
+  CassandraCluster cluster;
+  Session session;
+
+  int maxWriteRetries;
+
+  int batchSize;
+
+  long lastFlushTimeStamp;
+
+  Deque<Statement> queue;
+
+  Counter metricCompleted;
+
+  Counter metricFailed;
+
+  public CassandraRepo(CassandraCluster cluster, Environment env, int maxWriteRetries, int batchSize) {
+    this.cluster = cluster;
+    this.maxWriteRetries = maxWriteRetries;
+    this.batchSize = batchSize;
+
+    this.environment = env;
+
+    this.commitTimer = this.environment.metrics().timer(getClass().getName() + "." + "commit-timer");
+
+    lastFlushTimeStamp = System.currentTimeMillis();
+
+    queue = new ArrayDeque<>(batchSize);
+
+    this.metricCompleted = environment.metrics()
+        .counter(getClass().getName() + "." + "metrics-persisted-counter");
+
+    this.metricFailed = environment.metrics()
+        .counter(getClass().getName() + "." + "metrics-failed-counter");
+  }
+
+  protected void executeQuery(String id, Statement query, long startTime) throws DriverException {
+    _executeQuery(id, query, startTime, 0);
+  }
+
+  private void _executeQuery(final String id, final Statement query, final long startTime,
+      final int retryCount) {
+    try {
+      session.execute(query);
+
+      commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+
+      // ResultSetFuture future = session.executeAsync(query);
+
+      // Futures.addCallback(future, new FutureCallback<ResultSet>() {
+      // @Override
+      // public void onSuccess(ResultSet result) {
+      // metricCompleted.inc();
+      // commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+      // }
+      //
+      // @Override
+      // public void onFailure(Throwable t) {
+      // if (t instanceof NoHostAvailableException | t instanceof
+      // BootstrappingException
+      // | t instanceof OverloadedException | t instanceof QueryConsistencyException
+      // | t instanceof UnavailableException) {
+      // retryQuery(id, query, startTime, retryCount, (DriverException) t);
+      // } else {
+      // metricFailed.inc();
+      // commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+      // logger.error("Failed to execute query.", t);
+      // }
+      // }
+      // }, MoreExecutors.sameThreadExecutor());
+
+    } catch (NoHostAvailableException | BootstrappingException | OverloadedException
+        | QueryConsistencyException | UnavailableException | OperationTimedOutException e) {
+      retryQuery(id, query, startTime, retryCount, e);
+    } catch (DriverException e) {
+      metricFailed.inc(((BatchStatement) query).size());
+      commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+      throw e;
+    }
+  }
+
+  private void retryQuery(String id, Statement query, final long startTime, int retryCount,
+      DriverException e) throws DriverException {
+    if (retryCount >= maxWriteRetries) {
+      logger.error("[{}]: Query aborted after {} retry: ", id, retryCount, e.getMessage());
+      metricFailed.inc(((BatchStatement) query).size());
+      commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+      throw e;
+    } else {
+      logger.warn("[{}]: Query failed, retrying {} of {}: {} ", id, retryCount, maxWriteRetries,
+          e.getMessage());
+
+      try {
+        Thread.sleep(1000 * (1 << retryCount));
+      } catch (InterruptedException ie) {
+        logger.debug("[{}]: Interrupted: {}", id, ie);
+      }
+      _executeQuery(id, query, startTime, retryCount++);
+    }
+  }
+
+  public int handleFlush_batch(String id) {
+    Statement query;
+    int flushedCount = 0;
+
+    BatchStatement batch = new BatchStatement(Type.UNLOGGED);
+    while ((query = queue.poll()) != null) {
+      flushedCount++;
+      batch.add(query);
+    }
+
+    executeQuery(id, batch, System.nanoTime());
+
+    metricCompleted.inc(flushedCount);
+
+    return flushedCount;
+  }
+
+  public int handleFlush(String id) throws RepoException {
+    long startTime = System.nanoTime();
+
+    int flushedCount = 0;
+    List<ResultSetFuture> results = new ArrayList<>(queue.size());
+    Statement query;
+    while ((query = queue.poll()) != null) {
+      flushedCount++;
+      results.add(session.executeAsync(query));
+    }
+
+    List<ListenableFuture<ResultSet>> futures = Futures.inCompletionOrder(results);
+
+    boolean cancel = false;
+    Exception ex = null;
+    for (ListenableFuture<ResultSet> future : futures) {
+      if (cancel) {
+        future.cancel(false);
+        continue;
+      }
+      try {
+        future.get();
+      } catch (InterruptedException | ExecutionException e) {
+        cancel = true;
+        ex = e;
+      }
+    }
+
+    commitTimer.update(System.nanoTime() - startTime, TimeUnit.NANOSECONDS);
+
+    if (ex != null) {
+      throw new RepoException(ex);
+    }
+    return flushedCount;
+  }
+}
diff --git a/java/src/main/java/monasca/persister/repository/cassandra/MonascaRetryPolicy.java b/java/src/main/java/monasca/persister/repository/cassandra/MonascaRetryPolicy.java
new file mode 100644
index 000000000000..f2d884aa25bb
--- /dev/null
+++ b/java/src/main/java/monasca/persister/repository/cassandra/MonascaRetryPolicy.java
@@ -0,0 +1,77 @@
+package monasca.persister.repository.cassandra;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ConsistencyLevel;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.WriteType;
+import com.datastax.driver.core.exceptions.DriverException;
+import com.datastax.driver.core.policies.RetryPolicy;
+
+public class MonascaRetryPolicy implements RetryPolicy {
+
+  private final int readAttempts;
+  private final int writeAttempts;
+  private final int unavailableAttempts;
+
+  public MonascaRetryPolicy(int readAttempts, int writeAttempts, int unavailableAttempts) {
+    super();
+    this.readAttempts = readAttempts;
+    this.writeAttempts = writeAttempts;
+    this.unavailableAttempts = unavailableAttempts;
+  }
+
+  @Override
+  public RetryDecision onReadTimeout(Statement stmnt, ConsistencyLevel cl, int requiredResponses,
+      int receivedResponses, boolean dataReceived, int rTime) {
+    if (dataReceived) {
+      return RetryDecision.ignore();
+    } else if (rTime < readAttempts) {
+      return receivedResponses >= requiredResponses ? RetryDecision.retry(cl)
+          : RetryDecision.rethrow();
+    } else {
+      return RetryDecision.rethrow();
+    }
+
+  }
+
+  @Override
+  public RetryDecision onWriteTimeout(Statement stmnt, ConsistencyLevel cl, WriteType wt,
+      int requiredResponses, int receivedResponses, int wTime) {
+    if (wTime >= writeAttempts)
+      return RetryDecision.rethrow();
+
+    return RetryDecision.retry(cl);
+  }
+
+  @Override
+  public RetryDecision onUnavailable(Statement stmnt, ConsistencyLevel cl, int requiredResponses,
+      int receivedResponses, int uTime) {
+    if (uTime == 0) {
+      return RetryDecision.tryNextHost(cl);
+    } else if (uTime <= unavailableAttempts) {
+      return RetryDecision.retry(cl);
+    } else {
+      return RetryDecision.rethrow();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public RetryDecision onRequestError(Statement statement, ConsistencyLevel cl, DriverException e,
+      int nbRetry) {
+    return RetryDecision.tryNextHost(cl);
+  }
+
+  @Override
+  public void init(Cluster cluster) {
+    // nothing to do
+  }
+
+  @Override
+  public void close() {
+    // nothing to do
+  }
+
+}
diff --git a/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java
index 52baf7eecbdc..a0c22e4bf7aa 100644
--- a/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java
+++ b/java/src/main/java/monasca/persister/repository/vertica/VerticaMetricRepo.java
@@ -1,6 +1,8 @@
 /*
  * (C) Copyright 2014-2016 Hewlett Packard Enterprise Development LP
  *
+ * (C) Copyright 2017 SUSE LLC.
+ *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
@@ -50,6 +52,7 @@ import monasca.common.model.metric.MetricEnvelope;
 import monasca.persister.configuration.PersisterConfig;
 import monasca.persister.repository.Repo;
 import monasca.persister.repository.RepoException;
+import monasca.persister.repository.Sha1HashId;
 
 public class VerticaMetricRepo extends VerticaRepo implements Repo<MetricEnvelope> {
 
diff --git a/monasca_persister/conf/cassandra.py b/monasca_persister/conf/cassandra.py
index 93cdc325bc1e..09bbe18ae1dc 100644
--- a/monasca_persister/conf/cassandra.py
+++ b/monasca_persister/conf/cassandra.py
@@ -1,6 +1,7 @@
 # (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
 # Copyright 2017 FUJITSU LIMITED
-#
+# (C) Copyright 2017 SUSE LLC
+
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
@@ -17,13 +18,47 @@
 from oslo_config import cfg
 
 cassandra_opts = [
-    cfg.ListOpt('cluster_ip_addresses',
+    cfg.ListOpt('contact_points',
                 help='Comma separated list of Cassandra node IP addresses',
                 default=['127.0.0.1'],
                 item_type=cfg.IPOpt),
+    cfg.IntOpt('port',
+               help='Cassandra port number',
+               default=8086),
     cfg.StrOpt('keyspace',
-               help='keyspace where metric are stored',
-               default='monasca')]
+               help='Keyspace name where metrics are stored',
+               default='monasca'),
+    cfg.StrOpt('user',
+               help='Cassandra user name',
+               default=''),
+    cfg.StrOpt('password',
+               help='Cassandra password',
+               secret=True,
+               default=''),
+    cfg.IntOpt('connection_timeout',
+               help='Cassandra timeout in seconds when creating a new connection',
+               default=5),
+    cfg.IntOpt('read_timeout',
+               help='Cassandra read timeout in seconds',
+               default=60),
+    cfg.IntOpt('max_write_retries',
+               help='Maximum number of retries in write ops',
+               default=1),
+    cfg.IntOpt('max_definition_cache_size',
+               help='Maximum number of cached metric definition entries in memory',
+               default=20000000),
+    cfg.IntOpt('retention_policy',
+               help='Data retention period in days',
+               default=45),
+    cfg.StrOpt('consistency_level',
+               help='Cassandra default consistency level',
+               default='ONE'),
+    cfg.StrOpt('local_data_center',
+               help='Cassandra local data center name'),
+    cfg.IntOpt('max_batches',
+               help='Maximum batch size in Cassandra',
+               default=250),
+]
 
 cassandra_group = cfg.OptGroup(name='cassandra')
 
diff --git a/monasca_persister/conf/kafka_alarm_history.py b/monasca_persister/conf/kafka_alarm_history.py
index 51ce9845f5a4..e5509e6a114d 100644
--- a/monasca_persister/conf/kafka_alarm_history.py
+++ b/monasca_persister/conf/kafka_alarm_history.py
@@ -1,5 +1,6 @@
 # (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
 # Copyright 2017 FUJITSU LIMITED
+# (C) Copyright 2017 SUSE LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -37,7 +38,10 @@ kafka_alarm_history_opts = [
                default='alarm-state-transitions'),
     cfg.StrOpt('zookeeper_path',
                help='Path in zookeeper for kafka consumer group partitioning algorithm',
-               default='/persister_partitions/$kafka_alarm_history.topic')
+               default='/persister_partitions/$kafka_alarm_history.topic'),
+    cfg.IntOpt('batch_size',
+               help='Maximum number of alarm state history messages to buffer before writing to database',
+               default=1),
 ]
 
 
diff --git a/monasca_persister/conf/kafka_common.py b/monasca_persister/conf/kafka_common.py
index 0c2cfc7620bc..534d59268a1b 100644
--- a/monasca_persister/conf/kafka_common.py
+++ b/monasca_persister/conf/kafka_common.py
@@ -1,5 +1,6 @@
 # (C) Copyright 2016-2017 Hewlett Packard Enterprise Development LP
 # Copyright 2017 FUJITSU LIMITED
+# (C) Copyright 2017 SUSE LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -26,9 +27,6 @@ kafka_common_opts = [
                help='id of persister kafka client',
                advanced=True,
                default='monasca-persister'),
-    cfg.IntOpt('database_batch_size',
-               help='Maximum number of metric to buffer before writing to database',
-               default=1000),
     cfg.IntOpt('max_wait_time_seconds',
                help='Maximum wait time for write batch to database',
                default=30),
diff --git a/monasca_persister/conf/kafka_metrics.py b/monasca_persister/conf/kafka_metrics.py
index df900445356a..b30ead4a1485 100644
--- a/monasca_persister/conf/kafka_metrics.py
+++ b/monasca_persister/conf/kafka_metrics.py
@@ -38,6 +38,9 @@ kafka_metrics_opts = [
     cfg.StrOpt('zookeeper_path',
                help='Path in zookeeper for kafka consumer group partitioning algorithm',
                default='/persister_partitions/$kafka_metrics.topic'),
+    cfg.IntOpt('batch_size',
+               help='Maximum number of metrics to buffer before writing to database',
+               default=20000),
 ]
 
 # Replace Default OPt with reference to kafka group option
diff --git a/monasca_persister/repositories/cassandra/abstract_repository.py b/monasca_persister/repositories/cassandra/abstract_repository.py
index a7257844ca4d..188f1e8386af 100644
--- a/monasca_persister/repositories/cassandra/abstract_repository.py
+++ b/monasca_persister/repositories/cassandra/abstract_repository.py
@@ -1,4 +1,5 @@
 # (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
+# (C) Copyright 2017 SUSE LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,26 +13,23 @@
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
 import abc
-from cassandra import cluster
-from cassandra import query
 from oslo_config import cfg
 import six
 
 from monasca_persister.repositories import abstract_repository
+from monasca_persister.repositories.cassandra import connection_util
 
+conf = cfg.CONF
 
 @six.add_metaclass(abc.ABCMeta)
 class AbstractCassandraRepository(abstract_repository.AbstractRepository):
-
     def __init__(self):
         super(AbstractCassandraRepository, self).__init__()
-        self.conf = cfg.CONF
-
-        self._cassandra_cluster = cluster.Cluster(
-                self.conf.cassandra.cluster_ip_addresses.split(','))
-
-        self.cassandra_session = self._cassandra_cluster.connect(
-                self.conf.cassandra.keyspace)
 
-        self._batch_stmt = query.BatchStatement()
+        self._cluster = connection_util.create_cluster()
+        self._session = connection_util.create_session(self._cluster)
+        self._retention = conf.cassandra.retention_policy * 24 * 3600
+        self._cache_size = conf.cassandra.max_definition_cache_size
+        self._max_batches = conf.cassandra.max_batches
diff --git a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py
index 335f72bb1ec3..f9e72983081d 100644
--- a/monasca_persister/repositories/cassandra/alarm_state_history_repository.py
+++ b/monasca_persister/repositories/cassandra/alarm_state_history_repository.py
@@ -1,4 +1,5 @@
 # (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
+# (C) Copyright 2017 SUSE LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,9 +13,10 @@
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import json
+import ujson as json
 
-from cassandra import query
+from cassandra.concurrent import execute_concurrent_with_args
+from oslo_config import cfg
 from oslo_log import log
 
 from monasca_persister.repositories.cassandra import abstract_repository
@@ -22,51 +24,38 @@ from monasca_persister.repositories.utils import parse_alarm_state_hist_message
 
 LOG = log.getLogger(__name__)
 
+UPSERT_CQL = ('update monasca.alarm_state_history USING TTL ? '
+              'set metric = ?, old_state = ?, new_state = ?, sub_alarms = ?, reason = ?, reason_data = ? '
+              'where tenant_id = ? and alarm_id = ? and time_stamp = ?')
 
-class AlarmStateHistCassandraRepository(
-    abstract_repository.AbstractCassandraRepository):
 
+class AlarmStateHistCassandraRepository(abstract_repository.AbstractCassandraRepository):
     def __init__(self):
-
         super(AlarmStateHistCassandraRepository, self).__init__()
 
-        self._insert_alarm_state_hist_stmt = self.cassandra_session.prepare(
-                'insert into alarm_state_history (tenant_id, alarm_id, '
-                'metrics, new_state, '
-                'old_state, reason, reason_data, '
-                'sub_alarms, time_stamp) values (?,?,?,?,?,?,?,?,?)')
+        self._upsert_stmt = self._session.prepare(UPSERT_CQL)
 
     def process_message(self, message):
-
-        (alarm_id, metrics, new_state, old_state, link,
-         lifecycle_state, state_change_reason,
-         sub_alarms_json_snake_case, tenant_id,
-         time_stamp) = parse_alarm_state_hist_message(
-                message)
-
-        alarm_state_hist = (
-            tenant_id.encode('utf8'),
-            alarm_id.encode('utf8'),
-            json.dumps(metrics, ensure_ascii=False).encode(
-                    'utf8'),
-            new_state.encode('utf8'),
-            old_state.encode('utf8'),
-            state_change_reason.encode('utf8'),
-            "{}".encode('utf8'),
-            sub_alarms_json_snake_case.encode('utf8'),
-            time_stamp
-        )
-
-        LOG.debug(alarm_state_hist)
+        (alarm_id, metrics, new_state, old_state, link, lifecycle_state, state_change_reason,
+         sub_alarms_json_snake_case,
+         tenant_id, time_stamp) = parse_alarm_state_hist_message(message)
+
+        alarm_state_hist = (self._retention,
+                            json.dumps(metrics, ensure_ascii=False).encode('utf8'),
+                            old_state.encode('utf8'),
+                            new_state.encode('utf8'),
+                            sub_alarms_json_snake_case.encode('utf8'),
+                            state_change_reason.encode('utf8'),
+                            "{}".encode('utf8'),
+                            tenant_id.encode('utf8'),
+                            alarm_id.encode('utf8'),
+                            time_stamp)
 
         return alarm_state_hist
 
     def write_batch(self, alarm_state_hists):
-
-        for alarm_state_hist in alarm_state_hists:
-            self._batch_stmt.add(self._insert_alarm_state_hist_stmt,
-                                 alarm_state_hist)
-
-        self.cassandra_session.execute(self._batch_stmt)
-
-        self._batch_stmt = query.BatchStatement()
+        while alarm_state_hists:
+            num_rows = min(len(alarm_state_hists), cfg.CONF.kafka_alarm_history.batch_size)
+            batch = alarm_state_hists[:num_rows]
+            execute_concurrent_with_args(self._session, self._upsert_stmt, batch)
+            alarm_state_hists = alarm_state_hists[num_rows:]
diff --git a/monasca_persister/repositories/cassandra/connection_util.py b/monasca_persister/repositories/cassandra/connection_util.py
new file mode 100644
index 000000000000..24a0c95e96c0
--- /dev/null
+++ b/monasca_persister/repositories/cassandra/connection_util.py
@@ -0,0 +1,51 @@
+# (C) Copyright 2017 SUSE LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from oslo_config import cfg
+from cassandra.auth import PlainTextAuthProvider
+from cassandra.cluster import Cluster
+from cassandra.cluster import ConsistencyLevel
+from cassandra.cluster import DCAwareRoundRobinPolicy
+from cassandra.cluster import TokenAwarePolicy
+
+from monasca_persister.repositories.cassandra.retry_policy import MonascaRetryPolicy
+
+conf = cfg.CONF
+
+def create_cluster():
+    user = conf.cassandra.user
+    if user:
+        auth_provider = PlainTextAuthProvider(username=user, password=conf.cassandra.password)
+    else:
+        auth_provider = None
+
+    contact_points = [ip.dest for ip in conf.cassandra.contact_points]
+    cluster = Cluster(contact_points,
+                      port=conf.cassandra.port,
+                      auth_provider=auth_provider,
+                      connect_timeout=conf.cassandra.connection_timeout,
+                      load_balancing_policy=TokenAwarePolicy(
+                          DCAwareRoundRobinPolicy(local_dc=conf.cassandra.local_data_center)),
+                      )
+    cluster.default_retry_policy = MonascaRetryPolicy(1, conf.cassandra.max_write_retries,
+                                                      conf.cassandra.max_write_retries)
+    return cluster
+
+
+def create_session(cluster):
+    session = cluster.connect(conf.cassandra.keyspace)
+    session.default_timeout = conf.cassandra.read_timeout
+    session.default_consistency_level = ConsistencyLevel.name_to_value[conf.cassandra.consistency_level]
+    return session
diff --git a/monasca_persister/repositories/cassandra/metric_batch.py b/monasca_persister/repositories/cassandra/metric_batch.py
new file mode 100644
index 000000000000..b6740db6902f
--- /dev/null
+++ b/monasca_persister/repositories/cassandra/metric_batch.py
@@ -0,0 +1,152 @@
+# (C) Copyright 2017 SUSE LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from oslo_log import log
+
+from cassandra.query import BatchStatement
+from cassandra.query import BatchType
+
+LOG = log.getLogger(__name__)
+
+
+class MetricBatch(object):
+    def __init__(self, metadata, load_balance_policy, batch_limit):
+
+        self.metadata = metadata
+        self.batch_limit = batch_limit
+        self.lb_policy = load_balance_policy
+        self.metric_queries = dict()
+        self.dimension_queries = dict()
+        self.dimension_metric_queries = dict()
+        self.metric_dimension_queries = dict()
+        self.measurement_queries = dict()
+
+    def batch_query_by_token(self, bound_stmt, query_map):
+        token = self.metadata.token_map.token_class.from_key(bound_stmt.routing_key)
+
+        queue = query_map.get(token, None)
+        if not queue:
+            queue = []
+            batch = BatchStatement(BatchType.UNLOGGED)
+            batch.add(bound_stmt)
+            queue.append((batch, Counter(1)))
+            query_map[token] = queue
+        else:
+            (batch, counter) = queue[-1]
+            if counter.value() < self.batch_limit:
+                batch.add(bound_stmt)
+                counter.increment()
+            else:
+                batch = BatchStatement(BatchType.UNLOGGED)
+                batch.add(bound_stmt)
+                queue.append((batch, Counter(1)))
+
+    def add_metric_query(self, bound_stmt):
+        self.batch_query_by_token(bound_stmt, self.metric_queries)
+
+    def add_dimension_query(self, bound_stmt):
+        self.batch_query_by_token(bound_stmt, self.dimension_queries)
+
+    def add_dimension_metric_query(self, bound_stmt):
+        self.batch_query_by_token(bound_stmt, self.dimension_metric_queries)
+
+    def add_metric_dimension_query(self, bound_stmt):
+        self.batch_query_by_token(bound_stmt, self.metric_dimension_queries)
+
+    def add_measurement_query(self, bound_stmt):
+        self.batch_query_by_replicas(bound_stmt, self.measurement_queries)
+
+    def batch_query_by_replicas(self, bound_stmt, query_map):
+        hosts = tuple(self.lb_policy.make_query_plan(working_keyspace=bound_stmt.keyspace, query=bound_stmt))
+
+        queue = query_map.get(hosts, None)
+        if not queue:
+            queue = []
+            batch = BatchStatement(BatchType.UNLOGGED)
+            batch.add(bound_stmt)
+            queue.append((batch, Counter(1)))
+            query_map[hosts] = queue
+        else:
+            (batch, counter) = queue[-1]
+            if counter.value() < 30:
+                batch.add(bound_stmt)
+                counter.increment()
+            else:
+                batch = BatchStatement(BatchType.UNLOGGED)
+                batch.add(bound_stmt)
+                queue.append((batch, Counter(1)))
+
+    def clear(self):
+        self.metric_queries.clear()
+        self.dimension_queries.clear()
+        self.dimension_metric_queries.clear()
+        self.metric_dimension_queries.clear()
+        self.measurement_queries.clear()
+
+    @staticmethod
+    def log_token_batch_map(name, query_map):
+        LOG.info('%s : Size: %s;  Tokens: |%s|' % (name, len(query_map),
+                                                   '|'.join(['%s: %s' % (
+                                                       token,
+                                                       ','.join([str(counter.value()) for (batch, counter) in queue]))
+                                                             for token, queue in query_map.items()])))
+
+    @staticmethod
+    def log_replica_batch_map(name, query_map):
+        LOG.info('%s : Size: %s;  Replicas: |%s|' % (name, len(query_map), '|'.join([
+            '%s: %s' % (
+                ','.join([h.address for h in hosts]), ','.join([str(counter.value()) for (batch, counter) in queue]))
+            for hosts, queue in query_map.items()])))
+
+    def get_all_batches(self):
+        self.log_token_batch_map("metric batches", self.metric_queries)
+        self.log_token_batch_map("dimension batches", self.dimension_queries)
+        self.log_token_batch_map("dimension metric batches", self.dimension_metric_queries)
+        self.log_token_batch_map("metric dimension batches", self.metric_dimension_queries)
+        self.log_replica_batch_map("measurement batches", self.measurement_queries)
+
+        result_list = []
+
+        for q in self.measurement_queries.values():
+            result_list.extend(q)
+
+        for q in self.metric_queries.values():
+            result_list.extend(q)
+
+        for q in self.dimension_queries.values():
+            result_list.extend(q)
+
+        for q in self.dimension_metric_queries.values():
+            result_list.extend(q)
+
+        for q in self.metric_dimension_queries.values():
+            result_list.extend(q)
+
+        return result_list
+
+
+class Counter(object):
+    def __init__(self, init_value=0):
+        self._count = init_value
+
+    def increment(self):
+        self._count += 1
+
+    def increment_by(self, increment):
+        self._count += increment
+
+    def value(self):
+        return self._count
diff --git a/monasca_persister/repositories/cassandra/metrics_repository.py b/monasca_persister/repositories/cassandra/metrics_repository.py
index ce10c094470c..2f5cf8fa392c 100644
--- a/monasca_persister/repositories/cassandra/metrics_repository.py
+++ b/monasca_persister/repositories/cassandra/metrics_repository.py
@@ -1,4 +1,5 @@
 # (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
+# (C) Copyright 2017 SUSE LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,105 +13,262 @@
 # implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from __future__ import with_statement
+from cachetools import LRUCache
+from collections import namedtuple
 import hashlib
-import json
+import threading
+import ujson as json
+
+from cassandra.concurrent import execute_concurrent
 
-from cassandra import query
 from oslo_log import log
-import urllib
 
 from monasca_persister.repositories.cassandra import abstract_repository
+from monasca_persister.repositories.cassandra import token_range_query_manager
+from monasca_persister.repositories.cassandra.metric_batch import MetricBatch
 from monasca_persister.repositories.utils import parse_measurement_message
 
 LOG = log.getLogger(__name__)
 
+MEASUREMENT_INSERT_CQL = ('update monasca.measurements USING TTL ? '
+                          'set value = ?, value_meta = ?, region = ?, tenant_id = ?, metric_name = ?, dimensions = ? '
+                          'where metric_id = ? and time_stamp = ?')
 
-class MetricCassandraRepository(
-    abstract_repository.AbstractCassandraRepository):
+MEASUREMENT_UPDATE_CQL = ('update monasca.measurements USING TTL ? '
+                          'set value = ?, value_meta = ? where metric_id = ? and time_stamp = ?')
 
-    def __init__(self):
+METRICS_INSERT_CQL = ('update monasca.metrics USING TTL ? '
+                      'set metric_id = ?, created_at = ?, updated_at = ? '
+                      'where region = ? and tenant_id = ? and metric_name = ? and dimensions = ? '
+                      'and dimension_names = ?')
 
-        super(MetricCassandraRepository, self).__init__()
+METRICS_UPDATE_CQL = ('update monasca.metrics USING TTL ? '
+                      'set updated_at = ? '
+                      'where region = ? and tenant_id = ? and metric_name = ? and dimensions = ? '
+                      'and dimension_names = ?')
 
-        self._insert_measurement_stmt = self.cassandra_session.prepare(
-                'insert into measurements (tenant_id,'
-                'region, metric_hash, time_stamp, value,'
-                'value_meta) values (?, ?, ?, ?, ?, ?)')
+DIMENSION_INSERT_CQL = ('insert into  monasca.dimensions '
+                        '(region, tenant_id, name, value) values (?, ?, ?, ?)')
 
-        self._insert_metric_map_stmt = self.cassandra_session.prepare(
-                'insert into metric_map (tenant_id,'
-                'region, metric_hash, '
-                'metric_map) values'
-                '(?,?,?,?)')
+DIMENSION_METRIC_INSERT_CQL = ('insert into monasca.dimensions_metrics '
+                               '(region, tenant_id, dimension_name, dimension_value, metric_name) '
+                               'values (?, ?, ?, ?, ?)')
 
-    def process_message(self, message):
+METRIC_DIMENSION_INSERT_CQL = ('insert into monasca.metrics_dimensions '
+                               '(region, tenant_id, metric_name, dimension_name, dimension_value) '
+                               'values (?, ?, ?, ?, ?)')
 
-        (dimensions, metric_name, region, tenant_id, time_stamp, value,
-         value_meta) = parse_measurement_message(message)
+RETRIEVE_DIMENSION_CQL = 'select region, tenant_id, name, value from dimensions'
 
-        metric_hash, metric_map = create_metric_hash(metric_name,
-                                                     dimensions)
+RETRIEVE_METRIC_DIMENSION_CQL = ('select region, tenant_id, metric_name, '
+                                 'dimension_name, dimension_value from metrics_dimensions '
+                                 'WHERE token(region, tenant_id, metric_name) > ? '
+                                 'and token(region, tenant_id, metric_name) <= ? ')
 
-        measurement = (tenant_id.encode('utf8'),
-                       region.encode('utf8'),
-                       metric_hash,
-                       time_stamp,
-                       value,
-                       json.dumps(value_meta, ensure_ascii=False).encode(
-                           'utf8'))
+Metric = namedtuple('Metric', ['id', 'region', 'tenant_id', 'name', 'dimension_list', 'dimension_names',
+                               'time_stamp', 'value', 'value_meta'])
 
-        LOG.debug(measurement)
 
-        return MetricMeasurementInfo(
-                tenant_id.encode('utf8'),
-                region.encode('utf8'),
-                metric_hash,
-                metric_map,
-                measurement)
+class MetricCassandraRepository(abstract_repository.AbstractCassandraRepository):
+    def __init__(self):
+        super(MetricCassandraRepository, self).__init__()
+
+        self._lock = threading.RLock()
+
+        LOG.debug("prepare cql statements...")
+
+        self._measurement_insert_stmt = self._session.prepare(MEASUREMENT_INSERT_CQL)
+        self._measurement_insert_stmt.is_idempotent = True
 
-    def write_batch(self, metric_measurement_infos):
+        self._measurement_update_stmt = self._session.prepare(MEASUREMENT_UPDATE_CQL)
+        self._measurement_update_stmt.is_idempotent = True
 
-        for metric_measurement_info in metric_measurement_infos:
+        self._metric_insert_stmt = self._session.prepare(METRICS_INSERT_CQL)
+        self._metric_insert_stmt.is_idempotent = True
 
-            self._batch_stmt.add(self._insert_measurement_stmt,
-                                 metric_measurement_info.measurement)
+        self._metric_update_stmt = self._session.prepare(METRICS_UPDATE_CQL)
+        self._metric_update_stmt.is_idempotent = True
 
-            metric_map = (metric_measurement_info.tenant_id,
-                          metric_measurement_info.region,
-                          metric_measurement_info.metric_hash,
-                          metric_measurement_info.metric_map)
+        self._dimension_stmt = self._session.prepare(DIMENSION_INSERT_CQL)
+        self._dimension_stmt.is_idempotent = True
 
-            self._batch_stmt.add(self._insert_metric_map_stmt,
-                                 metric_map)
+        self._dimension_metric_stmt = self._session.prepare(DIMENSION_METRIC_INSERT_CQL)
+        self._dimension_metric_stmt.is_idempotent = True
 
-        self.cassandra_session.execute(self._batch_stmt)
+        self._metric_dimension_stmt = self._session.prepare(METRIC_DIMENSION_INSERT_CQL)
+        self._metric_dimension_stmt.is_idempotent = True
 
-        self._batch_stmt = query.BatchStatement()
+        self._retrieve_metric_dimension_stmt = self._session.prepare(RETRIEVE_METRIC_DIMENSION_CQL)
 
+        self._metric_batch = MetricBatch(self._cluster.metadata, self._cluster.load_balancing_policy, self._max_batches)
 
-class MetricMeasurementInfo(object):
+        self._metric_id_cache = LRUCache(self._cache_size)
+        self._dimension_cache = LRUCache(self._cache_size)
+        self._metric_dimension_cache = LRUCache(self._cache_size)
 
-    def __init__(self, tenant_id, region, metric_hash, metric_map,
-                 measurement):
+        self._load_dimension_cache()
+        self._load_metric_dimension_cache()
 
-        self.tenant_id = tenant_id
-        self.region = region
-        self.metric_hash = metric_hash
-        self.metric_map = metric_map
-        self.measurement = measurement
+    def process_message(self, message):
+        (dimensions, metric_name, region, tenant_id, time_stamp, value,
+         value_meta) = parse_measurement_message(message)
 
+        with self._lock:
+            dim_names = []
+            dim_list = []
+            for name in sorted(dimensions.iterkeys()):
+                dim_list.append('%s\t%s' % (name, dimensions[name]))
+                dim_names.append(name)
+
+            hash_string = '%s\0%s\0%s\0%s' % (region, tenant_id, metric_name, '\0'.join(dim_list))
+            metric_id = hashlib.sha1(hash_string.encode('utf8')).hexdigest()
+
+            metric = Metric(id=metric_id,
+                            region=region,
+                            tenant_id=tenant_id,
+                            name=metric_name,
+                            dimension_list=dim_list,
+                            dimension_names=dim_names,
+                            time_stamp=time_stamp,
+                            value=value,
+                            value_meta=json.dumps(value_meta, ensure_ascii=False))
+
+            id_bytes = bytearray.fromhex(metric.id)
+            if self._metric_id_cache.get(metric.id, None):
+                measurement_bound_stmt = self._measurement_update_stmt.bind((self._retention,
+                                                                             metric.value,
+                                                                             metric.value_meta,
+                                                                             id_bytes,
+                                                                             metric.time_stamp))
+                self._metric_batch.add_measurement_query(measurement_bound_stmt)
+
+                metric_update_bound_stmt = self._metric_update_stmt.bind((self._retention,
+                                                                          metric.time_stamp,
+                                                                          metric.region,
+                                                                          metric.tenant_id,
+                                                                          metric.name,
+                                                                          metric.dimension_list,
+                                                                          metric.dimension_names))
+                self._metric_batch.add_metric_query(metric_update_bound_stmt)
+
+                return metric
+
+            self._metric_id_cache[metric.id] = metric.id
+
+            metric_insert_bound_stmt = self._metric_insert_stmt.bind((self._retention,
+                                                                      id_bytes,
+                                                                      metric.time_stamp,
+                                                                      metric.time_stamp,
+                                                                      metric.region,
+                                                                      metric.tenant_id,
+                                                                      metric.name,
+                                                                      metric.dimension_list,
+                                                                      metric.dimension_names))
+            self._metric_batch.add_metric_query(metric_insert_bound_stmt)
+
+            for dim in metric.dimension_list:
+                (name, value) = dim.split('\t')
+                dim_key = self._get_dimnesion_key(metric.region, metric.tenant_id, name, value)
+                if not self._dimension_cache.get(dim_key, None):
+                    dimension_bound_stmt = self._dimension_stmt.bind((metric.region,
+                                                                      metric.tenant_id,
+                                                                      name,
+                                                                      value))
+                    self._metric_batch.add_dimension_query(dimension_bound_stmt)
+                    self._dimension_cache[dim_key] = dim_key
+
+                metric_dim_key = self._get_metric_dimnesion_key(metric.region, metric.tenant_id, metric.name, name,
+                                                                value)
+                if not self._metric_dimension_cache.get(metric_dim_key, None):
+                    dimension_metric_bound_stmt = self._dimension_metric_stmt.bind((metric.region,
+                                                                                    metric.tenant_id,
+                                                                                    name,
+                                                                                    value,
+                                                                                    metric.name))
+                    self._metric_batch.add_dimension_metric_query(dimension_metric_bound_stmt)
+
+                    metric_dimension_bound_stmt = self._metric_dimension_stmt.bind((metric.region,
+                                                                                    metric.tenant_id,
+                                                                                    metric.name,
+                                                                                    name,
+                                                                                    value))
+                    self._metric_batch.add_metric_dimension_query(metric_dimension_bound_stmt)
+
+                    self._metric_dimension_cache[metric_dim_key] = metric_dim_key
+
+            measurement_insert_bound_stmt = self._measurement_insert_stmt.bind((self._retention,
+                                                                                metric.value,
+                                                                                metric.value_meta,
+                                                                                metric.region,
+                                                                                metric.tenant_id,
+                                                                                metric.name,
+                                                                                metric.dimension_list,
+                                                                                id_bytes,
+                                                                                metric.time_stamp))
+            self._metric_batch.add_measurement_query(measurement_insert_bound_stmt)
+
+            return metric
+
+    def write_batch(self, metrics):
+
+        with self._lock:
+            batch_list = self._metric_batch.get_all_batches()
+
+            results = execute_concurrent(self._session, batch_list, raise_on_first_error=True)
+
+            self._handle_results(results)
+
+            self._metric_batch.clear()
+
+            LOG.info("flushed %s metrics", len(metrics))
+
+    @staticmethod
+    def _handle_results(results):
+        for (success, result) in results:
+            if not success:
+                raise result
+
+    def _load_dimension_cache(self):
+
+        rows = self._session.execute(RETRIEVE_DIMENSION_CQL)
+
+        if not rows:
+            return
+
+        for row in rows:
+            key = self._get_dimnesion_key(row.region, row.tenant_id, row.name, row.value)
+            self._dimension_cache[key] = key
+
+        LOG.info("loaded %s dimension entries cache from database into cache." % self._dimension_cache.currsize)
+
+    @staticmethod
+    def _get_dimnesion_key(region, tenant_id, name, value):
+        return '%s\0%s\0%s\0%s' % (region, tenant_id, name, value)
+
+    def _load_metric_dimension_cache(self):
+        qm = token_range_query_manager.TokenRangeQueryManager(RETRIEVE_METRIC_DIMENSION_CQL,
+                                                              self._process_metric_dimension_query)
 
-def create_metric_hash(metric_name, dimensions):
+        token_ring = self._cluster.metadata.token_map.ring
 
-    dimensions['__name__'] = urllib.quote_plus(metric_name)
+        qm.query(token_ring)
 
-    hash_string = ''
+    def _process_metric_dimension_query(self, rows):
+
+        cnt = 0
+        for row in rows:
+            key = self._get_metric_dimnesion_key(row.region, row.tenant_id, row.metric_name, row.dimension_name,
+                                                 row.dimension_value)
+            self._metric_dimension_cache[key] = key
+            cnt += 1
 
-    for dim_name in sorted(dimensions.iterkeys()):
-        dimension = (urllib.quote_plus(dim_name) + '=' + urllib.quote_plus(
-                dimensions[dim_name]))
-        hash_string += dimension
+        LOG.info("loaded %s metric dimension entries from database into cache." % cnt)
+        LOG.info(
+            "total loaded %s metric dimension entries in cache." % self._metric_dimension_cache.currsize)
 
-    sha1_hash = hashlib.sha1(hash_string).hexdigest()
+    @staticmethod
+    def _get_metric_dimnesion_key(region, tenant_id, metric_name, dimension_name, dimension_value):
 
-    return bytearray.fromhex(sha1_hash), dimensions
+        return '%s\0%s\0%s\0%s\0%s' % (region, tenant_id, metric_name, dimension_name, dimension_value)
diff --git a/monasca_persister/repositories/cassandra/retry_policy.py b/monasca_persister/repositories/cassandra/retry_policy.py
new file mode 100644
index 000000000000..163337d6b9e9
--- /dev/null
+++ b/monasca_persister/repositories/cassandra/retry_policy.py
@@ -0,0 +1,49 @@
+# (C) Copyright 2017 SUSE LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from cassandra.policies import RetryPolicy
+
+
+class MonascaRetryPolicy(RetryPolicy):
+
+    def __init__(self, read_attempts, write_attempts, unavailable_attempts):
+
+        super(MonascaRetryPolicy, self).__init__()
+
+        self.read_attempts = read_attempts
+        self.write_attempts = write_attempts
+        self.unavailable_attempts = unavailable_attempts
+
+    def on_read_timeout(self, query, consistency, required_responses,
+                        received_responses, data_retrieved, retry_num):
+
+        if retry_num >= self.read_attempts:
+            return self.RETHROW, None
+        elif received_responses >= required_responses and not data_retrieved:
+            return self.RETRY, consistency
+        else:
+            return self.RETHROW, None
+
+    def on_write_timeout(self, query, consistency, write_type,
+                         required_responses, received_responses, retry_num):
+
+        if retry_num >= self.write_attempts:
+            return self.RETHROW, None
+        else:
+            return self.RETRY, consistency
+
+    def on_unavailable(self, query, consistency, required_replicas, alive_replicas, retry_num):
+
+        return (self.RETRY_NEXT_HOST, consistency) if retry_num < self.unavailable_attempts else (self.RETHROW, None)
diff --git a/monasca_persister/repositories/cassandra/token_range_query_manager.py b/monasca_persister/repositories/cassandra/token_range_query_manager.py
new file mode 100644
index 000000000000..e90a47fab91a
--- /dev/null
+++ b/monasca_persister/repositories/cassandra/token_range_query_manager.py
@@ -0,0 +1,67 @@
+# (C) Copyright 2017 SUSE LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import multiprocessing
+
+from oslo_config import cfg
+from oslo_log import log
+
+from monasca_persister.repositories.cassandra import connection_util
+
+
+LOG = log.getLogger(__name__)
+
+conf = cfg.CONF
+
+
+class TokenRangeQueryManager(object):
+    def __init__(self, cql, result_handler, process_count=None):
+        if process_count:
+            self._process_count = process_count
+        else:
+            self._process_count = multiprocessing.cpu_count()
+
+        self._pool = multiprocessing.Pool(processes=self._process_count, initializer=self._setup,
+                                          initargs=(cql, result_handler,))
+
+    @classmethod
+    def _setup(cls, cql, result_handler):
+        cls.cluster = connection_util.create_cluster()
+        cls.session = connection_util.create_session(cls.cluster)
+        cls.prepared = cls.session.prepare(cql)
+        cls.result_handler = result_handler
+
+    def close_pool(self):
+        self._pool.close()
+        self._pool.join()
+
+    def query(self, token_ring):
+
+        range_size = len(token_ring) / self._process_count + 1
+        start_index = 0
+        params = []
+        while start_index < len(token_ring):
+            end_index = start_index + range_size - 1
+            if end_index >= len(token_ring):
+                end_index = len(token_ring) - 1
+            params.append((token_ring[start_index].value, token_ring[end_index].value))
+            start_index = end_index + 1
+
+        self._pool.map(execute_query_token_range, params, 1)
+
+
+def execute_query_token_range(token_range):
+    results = TokenRangeQueryManager.session.execute(TokenRangeQueryManager.prepared.bind(token_range))
+    TokenRangeQueryManager.result_handler(results)
diff --git a/monasca_persister/repositories/persister.py b/monasca_persister/repositories/persister.py
index d1a497514bea..51c21523d258 100644
--- a/monasca_persister/repositories/persister.py
+++ b/monasca_persister/repositories/persister.py
@@ -1,4 +1,5 @@
 # (C) Copyright 2016 Hewlett Packard Enterprise Development Company LP
+# (C) Copyright 2017 SUSE LLC
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -29,7 +30,7 @@ class Persister(object):
 
         self._kafka_topic = kafka_conf.topic
 
-        self._database_batch_size = kafka_conf.database_batch_size
+        self._batch_size = kafka_conf.batch_size
 
         self._consumer = consumer.KafkaConsumer(
                 kafka_conf.uri,
@@ -71,7 +72,7 @@ class Persister(object):
                     LOG.exception('Error processing message. Message is '
                                   'being dropped. {}'.format(message))
 
-                if len(self._data_points) >= self._database_batch_size:
+                if len(self._data_points) >= self._batch_size:
                     self._flush()
         except Exception:
             LOG.exception(
openSUSE Build Service is sponsored by