File jc-1.2.1-git.patch of Package jc
diff --git a/.gitignore b/.gitignore
index d9f4a4a..dc2c0ba 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,11 @@ rel/jc
target
*.gz
_build
+test/ct_run.ct@c4b301cb0a29.2017-10-09_19.10.46
+test/*.html
+test/*.css
+test/*.js
+variables-ct@*
+test/rebar.lock
+test/ct_run.ct*
+ct_log_cache
diff --git a/.travis.yml b/.travis.yml
index 433cba9..ce56cab 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,8 +1,6 @@
language: erlang
otp_release:
- - 17.0
- - 18.0
- - 19.0
+ - 22.0
script: "sh -ex .travis_build.sh"
\ No newline at end of file
diff --git a/README.md b/README.md
index 02fc1e7..4dc198f 100644
--- a/README.md
+++ b/README.md
@@ -1,37 +1,60 @@
JC
====
-##Erlang, Distributable, In-Memory Cache
+## Erlang, Distributable, In-Memory Cache
-### Featruing: Pub/Sub, JSON-query, consistency support, and a simple, TCP interop. protocol.
+### Featuring: Pub/Sub, JSON-query, and mechanisms to support consistency without transactions.
[](https://travis-ci.org/jr0senblum/jc)
[](https://hex.pm/packages/jc)
-###Features
+### Features
* Cache entries are Map, Key, Value, [TTL], [Sequence]
* Maps represent a name-space for Keys - similar to the notion
of 'bucket'
in other caching systems
* Maps, Keys and Values can be any Erlang term
* TTL is time-to-live in seconds
-* Consistency assist through sequence numbers: An alternative API
- allows for a sequence-number parameter on the put/x, evict/x,
- match/x and remove/x operations. Operations whose sequence
- number is lower than the current, per-map max are disallowed
- thereby ensuring, for example, that stale puts do not
- overwrite newer ones due to the newer one beating the stale
- ones to the cache.
-* JSON query support
- * Query by JSON: When Values are JSON, evict_match/2,
+* Consistency assist
+ * Client-Side Sequence Number: An alternative API allows for a sequence-number
+ parameter on the put/x, evict/x, match/x and remove/x operations. Operations
+ whose sequence number is lower than the current, per-map max are disallowed
+ thereby ensuring, for example, that an old delete that shows up after a newer update
+ does not inappropriately evict the newer update.
+ * Node of Responsibility: A key-specific node can be identified for destructive
+ operations (put, evict, etc.) thereby preserving eventual consistency without transactions.
+ * jc_store:locus/2 takes a Key and a list of nodes and returns the node of
+ responsibility for the given key.
+ * jc_bridge accepts a message {From, {locus, Key}} and returns the node of
+ responsibility. Because jc\_bridge knows which nodes are available, the client
+ is relieved from keeping track of up-nodes, which is necessary to
+ calculate the correct node of responsibility. For example:
+
+ ~~~~ Erlang
+ {jc_bridge, Any_Cache_Node} ! {self(), {locus, Key}},
+ NOR = receive
+ {error, _} -> error;
+ Node -> Node
+ after
+ 1000 -> error
+ end,
+ {jc_bridge, NOR} ! {self(), {put, benchtest, 10203, "{\"Key\":\"Value\"}"}}
+ ~~~~
+ Because data is everywhere, a lookup will always find a key irrespective of
+ the node of responsibility. The way to best use this is to configure jc_sequence
+ to not be a singleton and then use the Node of Responsibility feature to choose
+ the node to do the inserts/deletes using the jc_s API.
+
+* JSON query support
+ * Query by JSON: When Values are JSON, evict_match/2,
evict_all_match/1 and values_match/2 will search or evict
keys whose JSON value, at a location specificed by a java-style,
dot-path string, equals the given value. That is,
jc:values_match(bed, "id.type=3") would return all values, in the given
map (bed), where that value was a JSON object, id, with a "type":3
at its top-level.
- * Ad-hoc, index support: In order to support faster
+ * Ad-hoc, index support: In order to support faster
operations, (2-3 orders of magnitude), each map can have up to four,
dot-path, strings configured (or added at run-time) for which jc will
provide index support.
@@ -51,17 +74,11 @@ JC
* Clients can create and subscribe to arbitrary 'topics' and
broadcast arbitrary messages under those topic names
* Clients can subscribe to node-up and node-down events
-* Interopability
- * Binary string over TCP returning JSON
- * Bridge process that accepts messages from a client indicating
- cache operations, executes the cache operations and returns the
- results to the client. This has been used with JInterface to
- interoperate with CLOJURE and Java clients
* Fine-grained logging via Lager
-###Cache Functions (jc)
+### Cache Functions (jc)
* Create
* put(Map, Key, Value, [TTLSecs]) -> {ok, Key} | {error, badarg}
* put_all(Map, [{K,V},{K,V},...], [TTLSecs]) -> {ok, CountOfSuccessfulPuts} |
@@ -83,8 +100,11 @@ JC
* clear(Map) -> ok
* flush() -> ok
* flush(silent) -> ok, Does not send alerts to subscribers
+
* Predicates
* contains_key(Map, Key) -> true | false.
+ * map_exists(Map) -> true | false.
+
* Meta
* cache_nodes() -> {{active, [Node1,... ]}, {configured, [Node1,... ]}}
* cache_size() -> {size, [{TableName, RecordCnt, Words}],...}
@@ -114,12 +134,12 @@ Identical to the Create and Evict family of functions of the jc module
-###Eviction Manager Functions (jc_eviction_manager)
+### Eviction Manager Functions (jc_eviction_manager)
* set_max_ttl(Map, Secs) -> ok | {error, badarg}
* get_max_ttls() -> [{Map, Secs}, ...]
-###Pub/Sub Functions (jc_psub)
+### Pub/Sub Functions (jc_psub)
* map_subscribe(Pid, Map, Key|any, write|delete|any) -> ok | {error, badarg}
* map_unsubscribe(Pid, Map, Key|any, write|delete|any) -> ok | {error, badarg}
* client receives
@@ -134,15 +154,14 @@ Identical to the Create and Evict family of functions of the jc module
* Broadcasts Value to all
subscribers of Topic
* topic_subscribe(Pid, jc_node_events, any) -> ok
- * subscribtes the user
- to node up and node down events:
+ * subscribes the user to node up and node down events:
`{jc_node_events, {nodedown, DownedNode, [ActiveNodes],[ConfiguredNodes]}}`
`{jc_node_events, {nodeup, UppedNode, [ActiveNodes],[ConfiguredNodes]}}`
-###Indexing Functions (jc_store)
+### Indexing Functions (jc_store)
* start_indexing(Map, Path={bed,"menu.id"}) -> ok |
{error, no_indexes_available} |
{error, Term}
@@ -152,7 +171,7 @@ Identical to the Create and Evict family of functions of the jc module
* indexes() -> {indexes, [{{Map, Path}, Position},...]} for all indexes
-###Interoperability: Bridge (jc_bridge)
+### Interoperability: Bridge (jc_bridge)
* All functions from the jc, jc_s, jc_eviction_manager, jc_psub
and jc_store are supported and are of the form:
@@ -163,6 +182,9 @@ Identical to the Create and Evict family of functions of the jc module
`jc_bridge ! {Pid, {put, Map, Key, Value}}`
* Additionally,
+ `{From, {locus, Key}} -> node()` Calls jc_store:locus/2 with the list of active
+ cache nodes and returns the node of record.
+
{From, {node_topic_sub}} -> ok | {error, badarg},
client will recieve:
@@ -173,87 +195,25 @@ Identical to the Create and Evict family of functions of the jc module
`{jc_node_events, {nodeup, UppedNode, [ActiveNodes],[ConfiguredNodes]}}`
- {From, {node_topic_unsub}} -> ok.
-
-
-### Interoperability: Socket Protocol - EXPIREMENTAL
-Binary-encoded, string protocol used to provide socket-based
-interoperability with JC.
-
-All messages to JC are string representations of a tuple. All
-messages form the caching system to the client are JSON
-
-The protocol defines three message types: CONNECT, CLOSE and COMMAND all
-of which are binary strings consisting of an 8-byte size followed by the
-actual command messages.
-
-Responses are also binary strings with an 8-byte size prefix.
-
-The CONNECT command initiates a session,
-
- M = <<"{connect,{version,\"1.0\"}}">>
-
- Size is 25, so the CONNECT message is:
-
- <<25:8, M/binary>> =
- <<25,40,58,99,111,110,110,101,99,116,32,123,58,118,101,
- 114,115,105,111,110,32,49,46,48,125,41>>
-
-The server will respond to a CONNECT command with either an error or
-the encoded version of {"version":" "1.0"}
-
- <<15:8, <<"{\"version\":\"1.0\"}">> =
- <<15,123,34,118,101,114,115,105,111,110,34,58,49,46,48,125>>
-
-The CLOSE command closes the socket, ending the session
-
- M = <<"{close}">>
-
- Size is 7 so the CLOSE message is:
- <<7,123,99,108,111,115,101,125>>
-
-
-COMMAND messages are string versions of the tuple-messages, which
-jc_bridge uses, without the self() parameter. For example
-
- {self(), {put, Map, Key, Value}} becomes
- {put, Map, Key, Value}
-
-The return will be an encoded version of a JSON string. A client session
-might look as follows:
-
- client:send("{put, evs, \"1\", \"{\\\"value:\\\":true}\"}")
- <<"{\"ok\":\"1\"}">>
-
- client:send("{get, evs, \"1\"}"),
- <<"{\"ok\":"{\\\"value\\\":true}\"}">>
-
- client:send("{put, evs, 1, \"{\\\"value:\\\":true}\"}")
- <<"{\"ok\":1}">>
-
- client:send("{get, evs, 1}"),
- <<"{\"ok\":"{\\\"value\\\":true}\"}">>
-
+ `{From, {node_topic_unsub}} -> ok`.
-###Configuration
+### Configuration
* Application configuration is in sys.config which is heavily
commented
* Cookie, node-name and auto-restart of VM controlled by vm.args
-###Application Modules
+### Application Modules
* jc_cluster
* Simple, mnesia-based, cluster creation and management
* jc, jc_s, jc_store, jc_eviction_manager
* Caching operations, Key-specific and Map-level TTLs
* jc_sequence
- * Singleton service enforcing strictly monotonic sequence
+ * [optionally] Singleton service enforcing strictly monotonic sequence
numbers on jc_s operations
* jc_analyzer
* Analysis and indexing inititation of JSON query strings
-* jc_protocol
- * Socket processing of messages and Erlang -> JSON
* jc_psub:
* Pub / Sub of cache write and delete events
* On-demand, ad-hoc topic events
@@ -266,7 +226,7 @@ might look as follows:
* Looks for evidence of node dis/apperation and implements a recovery
strategy
-###Net Split/Join Strategy
+### Net Split/Join Strategy
Mnesia does not merge on its own when a node joins (returns) to a mesh of nodes.
There are two situations where this is relevant:
@@ -287,9 +247,9 @@ Given this ClusterId, we have the following strategy:
3. _Nodeup_ Whenever a Node appears, an arbitary Node ensures that any Nodes that report
a different ClusterId (different than the arbitrary Node's ClusterId) are killed to be
restarted by the hearbeat application. If any Nodes required restarting, the entire
- cache is flushed.
+ cache is flushed or not per policy in sys.config.
-###Build Instructions
+### Build Instructions
* Ensure that Erlang 17 or higher is installed
* Get the Source Code from Stash
@@ -306,7 +266,7 @@ Given this ClusterId, we have the following strategy:
`[root@db01] ./rebar3 as prod release`
-###Documentation
+### Documentation
`[root@dbo1] ./rebar3 edoc`
diff --git a/config/sys.config b/config/sys.config
index 34dbd78..6b365c5 100644
--- a/config/sys.config
+++ b/config/sys.config
@@ -1,69 +1,55 @@
-%% Typically the only line that is customer specific is the cache_nodes lines in
+%% Typically the only lines that are customer specific are the cache_nodes lines in
%% the jc stanza.
[
- %% SASL config - type of logging not typically used, but might come in handy
- %% log files will be created, but are small and rotated.
- {sasl, [
- {sasl_error_logger, {file, "log/sasl-error.log"}},
- {errlog_type, error},
- {error_logger_mf_dir, "log/sasl"}, % Log directory
- {error_logger_mf_maxbytes, 10485760}, % 10 MB max file size
- {error_logger_mf_maxfiles, 5} % 5 files max
- ]},
-
{mnesia,
[
%% Specifies the maximum number of writes allowed to the transaction log
- %% before a new dump of the log is performed. The higer the more ram is
- %% used but low numbers may result in mnesia not being able to keep up.
+ %% before a new dump of the log is performed. The higher the more RAM is
+ %% used, but low numbers may result in mnesia not being able to keep up.
%% Default is 100
{dump_log_write_threshold, 50000},
- %% Must be ram so that schema is diskless. This allows for nodes to come in
- %% and out of the cluster in any order without worying about conflicting
+ %% MUST be RAM so that schema is diskless. This allows for nodes to come
+ %% in and out of the cluster in any order without worrying about conflicting
%% masters.
{schema_location, ram}
- ]
- },
+ ]},
{lager,
[
+ %% Lager back-end handlers. Logs are small and rotated by default.
{handlers, [
- {lager_console_backend, info},
- {lager_file_backend, [{file, "log/error.log"},
- {level, error},
- {size, 10485760},
- {date, "$D0"},
- {count, 5}]},
-%% Uncomment for file-based debug log
-%% {lager_file_backend, [{file, "log/debug.log"},
-%% {level, debug},
-%% {size, 10485760},
-%% {date, "$D0"},
-%% {count, 5}]},
+ {lager_console_backend, [{level, info}]},
+ %% Uncomment for file-based debug log
+ %% {lager_file_backend, [{file, "log/debug.log"},
+ %% {level, debug},
+ %% {size, 10485760},
+ %% {date, "$D0"},
+ %% {count, 5}]},
{lager_file_backend, [{file, "log/info.log"},
{level, info},
{size, 10485760},
{date, "$D0"},
{count, 5}]}
]},
- %% We are not racist and do not want colors on the command line.
{colored, true}
]},
{jc,
[
%% Cache nodes in any order.
- {cache_nodes, ['jc1@127.0.0.1']},
+ {cache_nodes, ['jc1@127.0.0.1', 'jc2@127.0.0.1']},
%% How long to wait for mnesia tables to come up when adding a node to the
%% cluster.
- {table_wait_ms, 2000},
+ {table_wait_ms, 20000},
%% At what interval to run the process that looks for clients with no
- %% subscriptions and remove them from the subscriber (ps_client) tables.
+ %% subscriptions and removes them from the subscriber (ps_client) tables.
+ %% This is a safety-net activity and is used to remove subscriptions that
+ %% failed to be removed due to a failure of some sort.
{evict_deadbeats_ms, 3600000},
%% How often to run the process that evicts records that are older than the
@@ -71,17 +57,26 @@
{max_ttl_job_secs, 180},
%% Possibly empty list indicating max_ttl for records in the map. Format is
- %% {max_ttl_maps, [{Map1, Secs1}]},
- {max_ttl_maps, [{testmap, 100}]},
+ %% {max_ttl_maps, [{Map1, Secs1}, ..., {MapN, SecsN}]},
+ %% {max_ttl_maps, [{testmap, 100}]},
- %% Initial json values upon which to index
+ %% Initial JSON values upon which to index. In a path, the number 2 indicates
+ %% the second whatever, while the '2' indicates the string 2 in the path.
%% {indexes, [{bed, "identifier"}, {bed, "menu.2.id.'2'"}]}
- %% Frequency needed to warrant indexing {freq, Time_secs}
+ %% Frequency needed to see a particular JSON query before indexing
+ %% {freq, Time_secs}
{analyze_freq, {5, 5}},
- %% Port for the Socket protocol listener
- {protocol_port, 5555}
+ %% When a node appears after a net-split, some nodes are restarted. If the
+ %% survivors should flush their contents then this should be true, else false.
+ {should_flush, false},
+
+ %% Should jc_sequence be a singleton or local at each cache node. If
+ %% destructive operations happen at key-specific nodes, than no need;
+ %% otherwise yes.
+ {jc_sequence_singleton, true}
+
]}
diff --git a/config/vm.args b/config/vm.args
index 6cf0656..6249f28 100644
--- a/config/vm.args
+++ b/config/vm.args
@@ -1,6 +1,6 @@
## Name of the node
## comment out for windows, node name comes from jc.cmd
--name jc@127.0.0.1
+-name jc2@127.0.0.1
## Cookie for distributed erlang
-setcookie somecookie
diff --git a/doc/overview.edoc b/doc/overview.edoc
index c6bdd0a..4767573 100644
--- a/doc/overview.edoc
+++ b/doc/overview.edoc
@@ -240,9 +240,34 @@ client:send("{get, evs, \"1\"}"),
<li>Server that acts as a proxy between an external process and
jc functionality</li>
</ul>
+
+=== Net Split/Join Strategy ===
+
+Mnesia does not merge on its own when a node joins (returns) to a mesh of nodes.
+There are two situations where this is relevant:
+
+* j_cache nodes start in a disconnected state so more than one initiates a new
+cluster and then, subsequently, those nodes join into one cluster;
+* A node drops out of the cluster due to some network glitch and then rejoins.
+
+To handle these situations, whenever a cluster is created by a Node (node@123.45.67,
+for example), it creates a ClusterId - its Node name (node@123.45.67), for that cluster.
+
+Given this ClusterId, we have the following strategy:
+
+1. _Cluster Creation_: creates an initial ClusterId;
+2. _Nodedown_: If the Node that created the cluster disappears, a surviving Node changes the
+ ClusterId such that ClusterId is now this new Node's name. In the case of a
+ disconnected network, one of the islands will have the original ClusterId Node
+ disappear, and it will create a new one as described.
+3. _Nodeup_ Whenever a Node appears, an arbitrary Node ensures that any Nodes that report
+ a different ClusterId (different than the arbitrary Node's ClusterId) are killed to be
+ restarted by the heartbeat application. If any Nodes required restarting, the entire
+ cache is flushed or not per policy in sys.config.
+
=== Build Instructions ===
<ul>
-<li>Ensure that Erlang 17 or higher is installed</li>
+<li>Ensure that Erlang 20 or higher is installed</li>
<li>Get the Source Code from Stash</li>
</ul>
<p><code>[root@db01] git clone https://github.com/jr0senblum/jc.git</code></p>
diff --git a/include/records.hrl b/include/records.hrl
index 0e5398e..f3e094b 100644
--- a/include/records.hrl
+++ b/include/records.hrl
@@ -1,22 +1,21 @@
% Types
-type seconds() :: non_neg_integer().
+-type ttl() :: seconds().
+-type time_stamp() :: seconds().
-type map_name() :: any().
-type key() :: any().
-type value() :: any().
--type ttl() :: seconds().
+
-type rec_ref() :: reference().
--type time_stamp() :: seconds().
-% Key_to_value - an ordered_set table whose key is {key, map}. Ref is used by
-% jc_eviction manager as the key of the cache item to evict. i1 - i4 are
-% fields that can be used to hold values pulled from a json value to support a
-% querry-select feature (see README and jc_store:start_indexing/2 for more. Seq
-% is an integer supplied by the client that, if provided, is expected to be
-% strictly monotinic. If it is not, the put with the non monotonic value will
-% be evicted and the old one re-inserted.
-%
+
+% Key_to_value - an ordered_set table whose key is {key, map_name}. Ref is used
+% by jc_eviction manager as the key of the cache item to evict. i1 - i4 are
+% fields that can be used to hold values pulled from a json value to support an
+% indexed query-select feature (see README and jc_store:start_indexing/2 for
+% more).
-record (key_to_value,
{jc_key :: {key(), map_name()},
map :: map_name(),
@@ -35,14 +34,19 @@
-type key_to_value() :: #key_to_value{}.
+% Seq_no is an integer supplied by the client that, if provided, MUST be
+% strictly monotonic and is used as a sequence number to ensure that a stale
+% operation doesn't make it to jcache after the fact and clobber a more recent
+% put or evict operation.
-record(seq,
{map :: map_name(),
seq_no :: jc_sequence:seq()
}).
-% Holds information about json-path's that will be the target of a query-sellect
-% for a given map. Position indicates which column (i1-i4) in key_to_value to
-% use.
+
+% Defines the index for a given map and JSON path. Used for query-selects and
+% evicts. Position indicates which column (i1-i4) in key_to_value to store the
+% index.
-record (to_index,
{map_path :: {map_name(), tuple()},
map_name :: map_name(),
@@ -50,6 +54,8 @@
}).
+% Record that keeps track of the accounting around a JSON query as part of
+% determining whether an index should be initiated.
-record (auto_index,
{map_path :: {map_name(), tuple()} | '_',
count :: non_neg_integer() | '_',
@@ -86,7 +92,6 @@
}).
-
% Jc_psub records. Subscription patterns and the set of PIDS subscribed to those
% patterns.
-record (ps_sub,
@@ -94,6 +99,7 @@
clients = sets:new() :: sets:set()
}).
+
% Ps_client records. Unique processes that are subscribers, includings what type
% of mechanism is used to monitor the client - link to the Pid or monitor the
% node.
diff --git a/rebar.config b/rebar.config
index 4706976..3fb796b 100644
--- a/rebar.config
+++ b/rebar.config
@@ -11,9 +11,8 @@
{deps, [
{jwalk, "1.1.0"},
- {jsone, "1.2.0"},
- {lager, "3.2.1"},
- {ranch, "1.1.0"}
+ {jsone, {git, "git://github.com/sile/jsone", {branch, "otp21-rc1"}}},
+ {lager, "3.7.0"}
]}.
@@ -41,7 +40,6 @@
['jc',
kernel,
stdlib,
- sasl,
inets,
{observer,load},
{wx, load},
diff --git a/rebar.lock b/rebar.lock
deleted file mode 100644
index 69812fa..0000000
--- a/rebar.lock
+++ /dev/null
@@ -1,5 +0,0 @@
-[{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.7">>},1},
- {<<"jsone">>,{pkg,<<"jsone">>,<<"1.2.0">>},0},
- {<<"jwalk">>,{pkg,<<"jwalk">>,<<"1.1.0">>},0},
- {<<"lager">>,{pkg,<<"lager">>,<<"3.0.1">>},0},
- {<<"ranch">>,{pkg,<<"ranch">>,<<"1.1.0">>},0}].
diff --git a/rebar3 b/rebar3
index f49bd25..dc95a3a 100755
Binary files a/rebar3 and b/rebar3 differ
diff --git a/src/jc.app.src b/src/jc.app.src
index 7541d38..0f421a5 100644
--- a/src/jc.app.src
+++ b/src/jc.app.src
@@ -14,11 +14,9 @@
{applications, [
kernel,
stdlib,
- sasl,
stdlib,
syntax_tools,
compiler,
- ranch,
lager,
jsone,
jwalk
@@ -33,7 +31,6 @@
jc_cluster,
jc_eviction_manager,
jc_netsplit,
- jc_protocol,
jc_psub,
jc_s,
jc_sequence,
diff --git a/src/jc.erl b/src/jc.erl
index abb791d..f43373e 100644
--- a/src/jc.erl
+++ b/src/jc.erl
@@ -1,14 +1,15 @@
%%% ----------------------------------------------------------------------------
%%% @author Jim Rosenblum
-%%% @copyright (C) 2011 - 2015, Jim Rosenblum
+%%% @copyright (C) 2011 - 2017, Jim Rosenblum
%%% @doc This module wraps the mnesia-interacting, lower-level functions
%%% implemented in {@link jc_store. jc_store} to provide a public, DIRTY,
%%% set of opperations. {@link jc_s. jc_s} provides functions that take a
%%% sequence parameter to better support serilization (consistency) without.
%%% resorting to transactions.
%%%
-%%% JC can be called directly by Erlang clients; or,
+%%% The jc module can be called directly by Erlang clients; or,
%%% Java node -> JInterface -> {@link jc_bridge. jc_bridge} -> jc; or,
+%%% experimentally,
%%% Application -> TPC/IP -> {@link jc_protocol. jc_protocol} -> jc
%%%
%%% @version {@version}
@@ -29,17 +30,22 @@
flush/0, flush/1,
remove_items/2]).
-
% Get Functions
--export([contains_key/2,
- get/2,
+-export([get/2,
get_all/2,
key_set/1,
values/1,
values_match/2]).
-% CACHE META-INFO SUPPORT
--export([cache_nodes/0, cache_size/0, map_size/1, maps/0, up/0, stop/0]).
+% Predicates
+-export([contains_key/2,
+ map_exists/1]).
+
+% Control
+-export([stop/0]).
+
+% Cache Meta-data support
+-export([cache_nodes/0, cache_size/0, map_size/1, maps/0, up/0]).
% Used by jc_s for evict_match
-export([fun_match/3]).
@@ -47,18 +53,18 @@
% Used by eviction manager to evict an entry based on a reference
-export ([delete_record_by_ref/1]).
-
% definitions of global records and types.
-include("../include/records.hrl").
+% ttl is either infinity (0) or an integer > 0.
-define(INFINITY, 0).
-define(VALID(X), is_integer(X) andalso (X >= 0)).
%% =============================================================================
-%% Meta data API
+%% Cache control
%% =============================================================================
@@ -75,6 +81,12 @@ stop()->
ok.
+
+%% =============================================================================
+%% Meta data API
+%% =============================================================================
+
+
%% -----------------------------------------------------------------------------
%% @doc Return a sorted list of all maps currently in the cache.
%%
@@ -136,7 +148,7 @@ cache_nodes() ->
Configured = application:get_env(jc, cache_nodes,[]),
MnesiaUp = jc_store:up_nodes(),
Running = [N || N <- MnesiaUp,
- undefined /= rpc:call(N, erlang, whereis, [jc_bridge], 1000)],
+ is_pid(rpc:call(N, erlang, whereis, [jc_bridge], 1000))],
{{active, lists:sort(Running)}, {configured, lists:sort(Configured)}}.
@@ -213,7 +225,8 @@ put_all(_m, _K, _T) ->
clear(Map) ->
lager:debug("~p: clear map ~p.", [?MODULE, Map]),
- jc_store:clear(Map),
+ F = fun() -> jc_store:clear(Map) end,
+ trans_execute(F),
ok.
@@ -357,21 +370,6 @@ remove_items(Map, Keys)->
%% =============================================================================
-%% -----------------------------------------------------------------------------
-%% @doc Return true if the {@link key(). Key} is in the {@link map_name()},
-%% else false.
-%%
--spec contains_key(Map::map_name(), Key::key()) -> true | false.
-
-contains_key(Map, Key) ->
- case get(Map, Key) of
- {ok, _} ->
- true;
- _ ->
- false
- end.
-
-
%% -----------------------------------------------------------------------------
%% @doc Retrieve the data associated with Key.
%% %
@@ -467,6 +465,37 @@ fun_match(Map, Criteria, Fun) ->
+%% =============================================================================
+%% Predicate API
+%% =============================================================================
+
+
+%% -----------------------------------------------------------------------------
+%% @doc Return true if the {@link key(). Key} is in the {@link map_name()},
+%% else false.
+%%
+-spec contains_key(Map::map_name(), Key::key()) -> true | false.
+
+contains_key(Map, Key) ->
+ case get(Map, Key) of
+ {ok, _} ->
+ true;
+ _ ->
+ false
+ end.
+
+
+%% -----------------------------------------------------------------------------
+%% @doc Return true if the given {@link map_name(). Map} exists, else false.
+%%
+-spec map_exists(Map::map_name()) -> true | false.
+
+map_exists(Map) ->
+ F = fun() -> jc_store:map_exists(Map) end,
+ trans_execute(F).
+
+
+
%% =============================================================================
%% Utility functions
%% =============================================================================
@@ -481,7 +510,7 @@ fun_match(Map, Criteria, Fun) ->
trans_execute(F) ->
case mnesia:is_transaction() of
true -> F();
- false -> mnesia:sync_dirty(F)
+ false -> mnesia:async_dirty(F)
end.
diff --git a/src/jc_app.erl b/src/jc_app.erl
index 15a6aa3..16418fc 100644
--- a/src/jc_app.erl
+++ b/src/jc_app.erl
@@ -21,21 +21,15 @@
%%% ============================================================================
%% -----------------------------------------------------------------------------
-%% @private Ask jc_store to initialize or join the the mnesia cluster, start
-%% the tcp listeners (for protocol users), and fire-up the top level supervisor.
+%% @private Ask jc_store to initialize or join the mnesia cluster, and
+%% fire-up the top level supervisor.
%%
-spec start (normal | {takeover | failover, atom()}, [{node, atom()}]) ->
{ok, pid()} | {error, atom()}.
start(_StartType, _StartArgs) ->
ok = jc_cluster:init(),
- Port = application:get_env(jc, protocol_port, 5555),
- {ok, _} = ranch:start_listener(jc_proto,
- 100,
- ranch_tcp,
- [{port, Port}],
- jc_protocol, [Port]),
- lager:info("tcp, protocol listener is up and listening on port: ~p", [Port]),
+
case jc_sup:start_link() of
{ok, Pid} ->
{ok, Pid};
diff --git a/src/jc_bridge.erl b/src/jc_bridge.erl
index 377b4fa..536e122 100644
--- a/src/jc_bridge.erl
+++ b/src/jc_bridge.erl
@@ -18,7 +18,9 @@
%% Module API
--export([start_link/0, do/1]).
+-export([start_link/0]).
+
+-export([do/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -27,8 +29,11 @@
-define(SERVER, ?MODULE).
+%% State is just a list of active nodes, used to enable fast response to the
+%% 'locus' info message - see jc_store:locus/2
+%%
+-record(jc_bridge_state, {nodes :: list(node())}).
--record(jc_bridge_state, {}).
@@ -56,6 +61,7 @@ do(Message) ->
?SERVER ! {self(), Message}.
+
%%% ----------------------------------------------------------------------------
%%% gen_server callbacks
%%% ----------------------------------------------------------------------------
@@ -68,13 +74,35 @@ do(Message) ->
init([]) ->
Nodes = application:get_env(jc, cache_nodes,[node()]),
+ UpNodes = [node() | up_nodes(Nodes)],
lager:info("~p: letting ~p know that this node is ready.",
[?MODULE, Nodes]),
gen_server:abcast(Nodes, jc_psub, {jc_bridge_up, node()}),
+ subscribe_and_monitor(),
+
+ % Store the cache node names, count and subscribe to changes
+
lager:info("~p: up.", [?MODULE]),
- {ok, #jc_bridge_state{}}.
+ {ok, #jc_bridge_state{nodes = UpNodes}}.
+
+
+
+% Return the list of up cache nodes.
+%
+up_nodes(Nodes) ->
+ [N || N <- Nodes,
+ is_pid(rpc:call(N, erlang, whereis, [jc_bridge], 1000))].
+
+
+% Subscribe to jc_node_events and monitor jc_psub so that if it goes down,
+% jc_bridge can re-subscribe.
+%
+subscribe_and_monitor() ->
+ jc_psub:topic_subscribe(self(), jc_node_events, any),
+ _ = erlang:monitor(process, jc_psub).
+
%% -----------------------------------------------------------------------------
%% @private Handling call messages
@@ -100,9 +128,27 @@ handle_cast(Msg, State) ->
%% -----------------------------------------------------------------------------
%% @private Handling info messages:
%% Execute the requested j_cache operation and return the answer to requester.
+%% Also receive the jc_psub subscription message notifying of node up/down.
%%
-spec handle_info(any(), #jc_bridge_state{}) -> {noreply, #jc_bridge_state{}}.
+handle_info({_From, {jc_node_events, {_Type, _Node, Active, _C}}}, _State) ->
+ NewState = #jc_bridge_state{nodes = Active},
+ lager:info("~p: nodes changed. New actives: ~p.", [?MODULE, Active]),
+ {noreply, NewState};
+
+
+handle_info({'DOWN', _Ref, process, {jc_psub, _Node}, _Info}, State) ->
+ % Need to resubscribe to node up/down messages.
+ lager:info("~p: jc_psub went down. Resubscribing.", [?MODULE]),
+ subscribe_and_monitor(),
+ {noreply, State};
+
+handle_info({From, {locus, Key}}, #jc_bridge_state{nodes=Nodes} = State) ->
+ _Pid = spawn(fun() -> From ! jc_store:locus(Key, Nodes) end),
+ {noreply, State};
+
+
handle_info({From, {map_subscribe, Map, Key, Ops}}, State) ->
_Pid = spawn(fun()->From ! jc_psub:map_subscribe(From, Map, Key, Ops) end),
{noreply, State};
@@ -149,9 +195,8 @@ handle_info({From, {get_max_ttls}}, State) ->
spawn(fun()->From ! jc_eviction_manager:get_max_ttls() end),
{noreply, State};
-
handle_info({From, {put, Map, Key, Value}}, State) ->
- _Pid = spawn(fun() ->From ! jc:put(Map, Key, Value) end),
+ _Pid = spawn(fun() -> From ! jc:put(Map, Key, Value) end),
{noreply, State};
handle_info({From, {put_s, Map, Key, Value, Seq}}, State) ->
@@ -202,6 +247,7 @@ handle_info({From, {evict_map_since, Map, Secs}}, State) ->
handle_info({From, {evict, Map, Key}}, State) ->
_Pid = spawn(fun() -> From ! jc:evict(Map, Key) end),
{noreply, State};
+
handle_info({From, {evict_s, Map, Key, Seq}}, State) ->
_Pid = spawn(fun() -> From ! jc_s:evict(Map, Key, Seq) end),
{noreply, State};
@@ -329,3 +375,4 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
diff --git a/src/jc_cluster.erl b/src/jc_cluster.erl
index 146a69f..aad0e18 100644
--- a/src/jc_cluster.erl
+++ b/src/jc_cluster.erl
@@ -113,7 +113,7 @@ get_cluster_id() ->
end.
-% kill this node...
+% kill the application, it will be restarted...
%
kamakazee() ->
lager:notice("~p: ~p seppuku.", [?MODULE, node()]),
@@ -159,8 +159,8 @@ init()->
%% -----------------------------------------------------------------------------
-%% @private Jcache is considered loaded if jc_psub is up - it is the last gen_server
-%% started by the superviosr.
+%% @private Jcache is considered loaded if jc_psub is up - it is the last
+%% gen_server started by the supervisor.
%%
-spec jc_loaded(Node::node()) -> true | false.
@@ -179,18 +179,21 @@ dynamic_db_init([]) ->
mnesia:create_table(key_to_value,
[{attributes, record_info(fields, key_to_value)},
- {type, ordered_set},
+ {type, set},
{index, [map, key, ref, create_tm]}
]),
+
mnesia:create_table(seq,
[{attributes, record_info(fields, seq)},
{type, set}
]),
+
mnesia:create_table(to_index,
[{attributes, record_info(fields, to_index)},
{type, set},
{index, [map_name, position]}
]),
+
mnesia:create_table(auto_index,
[{attributes, record_info(fields, auto_index)},
{type, set}
@@ -198,17 +201,21 @@ dynamic_db_init([]) ->
mnesia:create_table(ttl,
[{attributes, record_info(fields, ttl)}
]),
+
mnesia:create_table(max_ttl,
[{attributes, record_info(fields, max_ttl)},
{type, set}
]),
+
mnesia:create_table(stats,
[{attributes, record_info(fields, stats)}
]),
+
mnesia:create_table(ps_sub,
[{attributes, record_info(fields, ps_sub)},
{local_content, true}
]),
+
mnesia:create_table(ps_client,
[{attributes, record_info(fields, ps_client)},
{local_content, true}
diff --git a/src/jc_netsplit.erl b/src/jc_netsplit.erl
index 57d883a..be73d58 100644
--- a/src/jc_netsplit.erl
+++ b/src/jc_netsplit.erl
@@ -28,8 +28,8 @@
%%% node will report different ClusterId - bad.
%%%
%%% For any bad outcome, all nodes having the 'different' ClusterId are killed
-%%% to be restarted by the heart process, and a survivor does a flush.
-%%%
+%%% to be restarted by the heartbeat process, and a survivor may do a flush per
+%%% policy in sys.config.
%%%
%%% @end
%%% Created : 18 May 2016 by Jim Rosenblum <jrosenblum@Jims-MacBook-Pro.local>
@@ -56,9 +56,10 @@
-define(LOCK, {?MODULE, self()}).
-% State: list of configured nodes.
+% State: list of configured nodes, and flush policy on join after net split.
-record(jc_ns_state,
- {nodes = [] :: [Configured::node()]}).
+ {nodes = [] :: [Configured::node()],
+ should_flush = true :: boolean()}).
@@ -90,10 +91,12 @@ start_link() ->
init([]) ->
ok = net_kernel:monitor_nodes(true),
{ok, Configured} = application:get_env(jc, cache_nodes),
+ ShouldFlush = application:get_env(jc, should_flush, false),
lager:info("~p: up and watching events for ~p.", [?SERVER, Configured]),
- {ok, #jc_ns_state{nodes = lists:sort(Configured)}}.
+ {ok, #jc_ns_state{nodes = lists:sort(Configured),
+ should_flush = ShouldFlush}}.
%% -----------------------------------------------------------------------------
@@ -121,8 +124,8 @@ handle_cast(Msg, State) ->
%%
-spec handle_info(any(), #{}) -> {noreply, #{}}.
-handle_info({nodeup, Upped}, #jc_ns_state{nodes = Ns} = State) ->
- check_cluster_health(Upped, Ns),
+handle_info({nodeup, Upped}, #jc_ns_state{nodes = Ns, should_flush = Sf} = State) ->
+ check_cluster_health(Upped, Ns, Sf),
{noreply, State};
handle_info({nodedown, Downed}, State) ->
@@ -191,23 +194,24 @@ do_change(Downed) ->
% all nodes will report the same ClusterId. If not, then 'outliers' should
% kill themselves and let the hearbeat process restart them.
%
-check_cluster_health(Upped, Nodes) ->
+check_cluster_health(Upped, Nodes, ShouldFlush) ->
case is_relevant(Upped, Nodes) of
false ->
ok;
true ->
- check(Upped, Nodes, jc_cluster:get_cluster_id()),
+ check(Upped, Nodes, jc_cluster:get_cluster_id(), ShouldFlush),
ok
end.
-% Ask each node to check that it has the same ClusterId, if not flush.
-% Any node that has a different ClusterId will kill itself and be restarted
-% by the heartbeat process.
+% Ask each node to check that it has the same ClusterId. Any node that has
+% a different ClusterId will kill itself and be restarted by the heartbeat
+% process. If any nodes were killed, flush the entire cache per policy in
+% sys.config
%
-check(Upped, Nodes, ClusterId) ->
+check(Upped, Nodes, ClusterId, ShouldFlush) ->
{Res, _Bad} =
global:trans(?LOCK,
fun() ->
@@ -223,9 +227,13 @@ check(Upped, Nodes, ClusterId) ->
infinity),
case lists:member(bad, Res) of
- true ->
- lager:notice("~p: cluster repaired, flushing.", [?SERVER]),
+ true when ShouldFlush ->
+ lager:notice("~p: cluster repaired, flushing cache per policy.",
+ [?SERVER]),
jc:flush();
+ true when not ShouldFlush ->
+ lager:notice("~p: cluster repaired, not flushing cache per policy.",
+ [?SERVER]);
false ->
lager:notice("~p: cluster showed no signs of inconsistency.",
[?SERVER])
diff --git a/src/jc_protocol.erl b/src/jc_protocol.erl
deleted file mode 100644
index cb30c0f..0000000
--- a/src/jc_protocol.erl
+++ /dev/null
@@ -1,411 +0,0 @@
-%%% ----------------------------------------------------------------------------
-%%% @author Jim Rosenblum <jrosenblum>
-%%% @copyright (C) 2015, Jim Rosenblum
-%%% @doc
-%%%
-%%% JC Protocol 1.0
-%%% A binary-encoded, string protocol used to provide socket-based
-%%% interoperability with JC. Incoming messages are string representations
-%%% of a tuple. Responses are JSON.
-%%%
-%%% The protocol defines three message types: CONNECT, CLOSE and COMMAND all of
-%%% which are binary strings consisting of a header, indicating the size of the
-%%% message in bytes, follwed by the actual message itself. The header is an
-%%% 8-byte, big endian, unsigned integer indicating the size of the message in
-%%% bytes.
-%%%
-%%% A RESPONSE is structured the same as messages - 8-byte size header followed
-%%% by the content of the response.
-%%%
-%%% The CONNECT command initiates a session,
-%%%
-%%% ```M = <<"{connect,{version,\"1.0\"}}">>'''
-%%%
-%%% The byte size is 26, so the CONNECT message is:
-%%%
-%%% ```<<26:8/integer-unit:8, M/binary>> =
-%%% <<0,0,0,0,0,0,0,26,123,99,111,110,110,101,99,116,44,123,118,101,114,
-%%% 115,105,111,110,44,32,34,49,46,48,34,125,125>> '''
-%%%
-%%% The server will respond to a CONNECT command with either an error or
-%%% the appropriately encoded version of {\"version\":\"1.0\"}
-%%%
-%%% ``` <<17:8/integer-unit:8, <<"{\"version\":1.0}">> =
-%%% <0,0,0,0,0,0,0,17,123,34,118,101,114,115,105,111,110,34,
-%%% 58,34,49,46,48,34,125>> '''
-%%%
-%%%
-%%% The CLOSE command closes the socket, ending the session
-%%%
-%%% ```M = <<"{close}">>'''
-%%%
-%%% Size is 7 so the CLOSE message is:
-%%% ```<0,0,0,0,0,0,0,7,123,99,108,111,115,101,125>> '''
-%%%
-%%%
-%%% COMMAND messages are string versions of the tuple-messages which
-%%% {@link jc_bridge. jc_bridge} uses, only without the self() parameter. For
-%%% example the jc_brdige message, {self(), {put, Map, Key, Value}} becomes
-%%% {put, Map, Key, Value}
-%%%
-%%% The RESPONSE will be an appropriately encoded, binary version of a JSON
-%%% response representing the Erlang return value.
-%%%
-%%% A client session might look as follows:
-%%%
-%%% ```client:send("{put, evs, 1, \"a string value\"}")
-%%% ==> <<"{\"ok\":1}">>
-%%%
-%%% client:send("{get, evs, 1}"),
-%%% ==> <<"{\"ok\": \"a string value\"}">>'''
-%%%
-%%%
-%%% @end
-%%% Created : 25 Aug 2015 by Jim Rosenblum <jrosenblum@carelogistics.coml>
-%%% ----------------------------------------------------------------------------
-
--module(jc_protocol).
-
--behaviour(gen_server).
--behaviour(ranch_protocol).
-
-
-%% module API
--export([start_link/4]).
-
-
-%% gen_server callbacks
--export([init/1, init/4,
- handle_call/3,
- handle_cast/2,
- handle_info/2,
- terminate/2,
- code_change/3]).
-
--define(SERVER, ?MODULE).
--define(TIMEOUT, infinity).
-
--record(jc_p, {socket,
- trans :: 'jc_protocol',
- connected = false :: boolean(),
- acc = undefined :: undefined | binary(),
- size = undefined :: undefined | non_neg_integer(),
- version = <<"1.0">> :: binary()}).
--type jc_p() :: #jc_p{}.
-
-
-
-%%% ============================================================================
-%%% Modue API
-%%% ============================================================================
-
-
-%% -----------------------------------------------------------------------------
-%% Start the gen_server which will handle the socket.
-%%
--spec start_link(ranch:ref(), any(), jc_protocol, any()) -> {ok,pid()}.
-
-start_link(Ref, Socket, Trans, Opts) ->
- proc_lib:start_link(?MODULE, init, [Ref, Socket, Trans, Opts]).
-
-
-
-%%% ============================================================================
-%%% gen_server callbacks
-%%% ============================================================================
-
-
-%% -----------------------------------------------------------------------------
-%% This function is never called. Its defined so that the gen_server contract
-%% is fufilled.
-%%
--spec init([]) -> {ok, undefined}.
-
-init([]) -> {ok, undefined}.
-
-
-%% -----------------------------------------------------------------------------
-%% Set up the socket and convert the process into a gen_server.
-%%
--spec init(ranch:ref(),any(),jc_protocol|undefined,[Port::integer()]) -> any().
-
-init(Ref, S, T, _Opts = [Port]) ->
- ok = proc_lib:init_ack({ok, self()}),
- ok = ranch:accept_ack(Ref),
- ok = T:setopts(S, [{active, once}, {packet, 0}]),
- lager:info("~p(~p): up and listening on port ~p.",
- [?MODULE, self(), Port]),
-
- gen_server:enter_loop(?MODULE, [],
- #jc_p{socket = S,
- trans = T,
- connected = false,
- size = undefined,
- acc = <<>>},
- ?TIMEOUT).
-
-
-%% -----------------------------------------------------------------------------
-%% @private Handle info messages: Socket messages and jc_psub subscription
-%% messages.
-%%
--spec handle_info(any(), jc_p()) -> {noreply, jc_p()} | {stop, normal, jc_p()}.
-
-handle_info({tcp, S, Data}, State = #jc_p{socket=S, trans=T})->
- T:setopts(S, [{active, once}]),
- NewState = handle_data(Data, State),
- {noreply, NewState, ?TIMEOUT};
-
-handle_info({tcp_closed, _Socket}, State) ->
- {stop, normal, State};
-
-handle_info({tcp_error, _, Reason}, State) ->
- {stop, Reason, State};
-
-handle_info(timeout, State) ->
- {stop, normal, State};
-
-handle_info(Msg, State = #jc_p{socket=S, trans = T}) ->
- T:send(S, marshal(Msg)),
- {noreply, State, ?TIMEOUT}.
-
-
-%% -----------------------------------------------------------------------------
-%% @private Hande call messages.
-%%
--spec handle_call(term(), {pid(), _}, jc_p()) -> {reply, ok, jc_p()}.
-
-handle_call(Request, _From, State) ->
- lager:warning("~p: unrecognized handle_call request: ~p.",
- [?MODULE, Request]),
- {reply, ok, State}.
-
-
-%% -----------------------------------------------------------------------------
-%% @private Handle cast messages.
-%%
--spec handle_cast(any(), jc_p()) -> {noreply, jc_p()}.
-
-handle_cast(Msg, State) ->
- lager:warning("~p: unexpected cast message: ~p.", [?MODULE, Msg]),
- {noreply, State}.
-
-
-%% -----------------------------------------------------------------------------
-%% @private Terminate server.
-%%
--spec terminate(any(), jc_p()) -> any().
-
-terminate(Reason, _State) ->
- lager:warning("~p: terminated with reason: ~p.", [?MODULE, Reason]),
- ok.
-
-
-%% -----------------------------------------------------------------------------
-%% @private Convert process state when code is changed.
-%%
--spec code_change(term(), jc_p(), any()) -> {ok, jc_p()}.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-
-
-%%% ============================================================================
-%%% Internal functions
-%%% ============================================================================
-
-
-%% -----------------------------------------------------------------------------
-%% @private Top-level handler for socket data, returns a new state.
-%%
--spec handle_data(Data::binary(), OldState::jc_p()) -> NewState::jc_p().
-
-handle_data(Data, #jc_p{trans = T, socket = S, connected = C} = State) ->
- try
- handle(Data, State)
- catch
- throw:{fatal, F} ->
- T:send(S, marshal({error, {protocol_error, F}})),
- self() ! {tcp_closed, S},
- State;
- _:Error ->
- T:send(S, marshal({error, Error})),
- State#jc_p{acc = <<>>,
- size = undefined,
- connected=C}
- end.
-
-
-%% -----------------------------------------------------------------------------
-%% Get Size Header, Get size number of bytes, Execute Command, Repeat. When no
-%% more bytes, save what has been retrieved thus far in the State and return.
-%%
--spec handle(Data::binary(), OldState::jc_p()) -> NewState::jc_p().
-
-handle(<<>>, State) ->
- State;
-
-handle(<<Size:8/integer-unit:8,C/binary>>,#jc_p{size=undefined, acc = <<>>}=S)->
- case byte_size(C) of
- CSize when CSize >= Size ->
- <<Command:Size/binary, Remainder/binary>> = C,
- S2 = execute(Command, S),
- handle(Remainder, S2);
- Less ->
- S#jc_p{size = Size - Less, acc = C}
- end;
-
-handle(Data, #jc_p{size = undefined, acc = Acc} = S) ->
- NewData = <<Acc/binary, Data/binary>>,
- case byte_size(NewData) of
- NSize when NSize >= 8 ->
- <<Size:8/integer-unit:8, Remainder/binary>> = NewData,
- handle(Remainder, S#jc_p{size=Size, acc = <<>>});
- _ ->
- S#jc_p{acc = NewData, size = undefined}
- end;
-
-handle(Data, #jc_p{acc = Acc, size = Size} = S) when size /= undefined->
- case byte_size(Data) of
- BSize when BSize >= Size ->
- <<Seg:Size/binary, Remainder/binary>> = Data,
- Command = <<Acc/binary, Seg/binary>>,
- S2 = execute(Command, S),
- handle(Remainder, S2#jc_p{acc = <<>>, size = undefined});
- Less ->
- S#jc_p{size = Size - Less, acc = <<Acc/binary, Data/binary>>}
- end.
-
-
-%% -----------------------------------------------------------------------------
-%% Execute the command.
-%
-execute(Command, #jc_p{trans=T, socket=S, connected=false} = State)->
- case evaluate(Command) of
- open_session ->
- lager:debug("~p (~p): connected with version: ~p",
- [?MODULE, self(), <<"1.0">>]),
- T:send(S, marshal({version, <<"1.0">>})),
- State#jc_p{connected = true};
- _ ->
- throw({fatal, missing_connect})
- end;
-
-execute(Command, #jc_p{trans=T, socket = S} = State) ->
- case evaluate(Command) of
- close_session ->
- T:send(S, marshal({close, 1.0})),
- self() ! {tcp_closed, S};
- {command, R} ->
- jc_bridge:do(R),
- State;
- _ ->
- throw(bad_command)
- end.
-
-
-% ==============================================================================
-% Utility functions
-% ==============================================================================
-
-
-% ------------------------------------------------------------------------------
-% Given a string representation of a tuple, convert it to an Erlang tuple, make
-% strings binary and determine the cache action implied by the Command.
-%
-evaluate(Command) ->
- try
- {ok,Scanned,_} = erl_scan:string(binary_to_list(<<Command/binary,".">>)),
- {ok,Parsed} = erl_parse:parse_exprs(strings_to_binary(Scanned, [])),
- determine_action(Parsed)
- catch
- _:_ ->
- throw(command_syntax)
- end.
-
-
-determine_action([{tuple,1,[{atom,1,close}]}]) ->
- close_session;
-
-determine_action([{tuple,1,[{atom,1,connect},{tuple,1,
- [{atom,1,version},
- {bin,1,
- [{bin_element,1,
- {string,1,"1.0"},
- default,default}]}]}]}]) ->
- open_session;
-
-determine_action([{tuple,1,_}] = AST) ->
- {value, R, _} = erl_eval:exprs(AST, []),
- {command, R};
-
-determine_action(_) ->
- {error, badarg}.
-
-
-% Marshal the message: make it binary JSON and package it according to the
-% protocol.
-marshal(Message) ->
- Result = try
- to_json(Message)
- catch
- _:_ -> to_json({error, jc_result_to_json})
- end,
- package(Result).
-
-to_json({ok,{H, M}}) ->
-
- jsone:encode([{hits, encode_keys(H)}, {misses, M}],
- [{float_format, [{decimals, 10}, compact]}]);
-
-to_json({ok, [{_,_}|_Vs]=Lst}) ->
- jsone:encode(encode_keys(Lst),
- [{float_format, [{decimals, 10}, compact]}]);
-
-to_json({{active, _}=A,{configured, _}=C}) ->
- jsone:encode([A,C],
- [{float_format, [{decimals, 10}, compact]}]);
-
-to_json({size, Sizes}) ->
- Os = lists:map(fun({A,B,C}) -> {A, [B,C]} end, Sizes),
- jsone:encode(Os,
- [{float_format, [{decimals, 10}, compact]}]);
-
-to_json({uptime, [{up_at, U}, {now, N},{up_time, _UT}]}) ->
- jsone:encode([{uptime, [{up_at, list_to_binary(U)},
- {now, list_to_binary(N)}]}],
- [{float_format, [{decimals, 10}, compact]}]);
-
-to_json({X, Y}) ->
- jsone:encode([{X,Y}],
- [{float_format, [{decimals, 10}, compact]}]);
-
-to_json(M) ->
- jsone:encode(M,
- [{float_format, [{decimals, 10}, compact]}]).
-
-
-% JC Keys can be anytime, but json needs object keys to be strings, so
-% {k,v} needs to be embedded in a json \"key\" : key, \"value\":value
-% object.
-encode_keys(KVList) ->
- lists:map(fun({K,V}) -> {[{key, K},{value, V}]} end,
- KVList).
-
-
-% Make binary pay-load with size header.
-package(Message) ->
- Size = byte_size(Message),
- <<Size:8/integer-unit:8, Message/binary>>.
-
-
-% Walk the scanned list of tokens making strings binary strings
-strings_to_binary([], Acc) ->
- lists:reverse(Acc);
-
-strings_to_binary([{string, _, _}=S|Tl], Acc) ->
- strings_to_binary(Tl, [{'>>',1}, S, {'<<',1}|Acc]);
-
-strings_to_binary([Hd|Tl],Acc) ->
- strings_to_binary(Tl, [Hd|Acc]).
diff --git a/src/jc_sequence.erl b/src/jc_sequence.erl
index 17b9dec..6f9b4f5 100644
--- a/src/jc_sequence.erl
+++ b/src/jc_sequence.erl
@@ -42,7 +42,8 @@
-define(SERVER, ?MODULE).
--record(jc_seq_state, {}).
+-record(jc_seq_state, {singleton :: boolean()}).
+
%%% ============================================================================
@@ -58,7 +59,12 @@
-spec test_set(Map::map(), NewSeq::seq()) -> true | false.
test_set(Map, Seq) ->
- gen_server:call({global, ?MODULE}, {test_set, Map, Seq}, 1000).
+ case global:whereis_name(?SERVER) of
+ undefined ->
+ gen_server:call(?SERVER, {test_set, Map, Seq}, 1000);
+ _Pid ->
+ gen_server:call({global, ?SERVER}, {test_set, Map, Seq}, 1000)
+ end.
%% -----------------------------------------------------------------------------
@@ -78,15 +84,17 @@ start_link() ->
%% -----------------------------------------------------------------------------
%% @private Initialize the server by trying to register under the global name,
-%% jc_sequence.
+%% jc_sequence. If singleton, try to register jc_sequence globally to this
+%% process.
%%
-spec init([]) -> {ok, #jc_seq_state{}}.
init([]) ->
- lager:info("~p: up.", [?MODULE]),
+ IsSingleton = application:get_env(jc, jc_sequence_singleton, true),
+ grab_name(IsSingleton),
+ lager:info("~p: up with Singletong = ~p.", [?MODULE, IsSingleton]),
- grab_name(),
- {ok, #jc_seq_state{}}.
+ {ok, #jc_seq_state{singleton = IsSingleton}}.
%% -----------------------------------------------------------------------------
@@ -122,10 +130,10 @@ handle_cast(Msg, State) ->
%%
-spec handle_info(any(), #jc_seq_state{}) -> {noreply, #jc_seq_state{}}.
-handle_info({'DOWN', _MonitorRef, _Type, Object, Info}, State) ->
- lager:debug("~p: jc_sequence master at ~p went down with ~p.",
- [?SERVER, Object, Info]),
- grab_name(),
+handle_info({'DOWN', _MRef, _Type, Obj, Info}, State) ->
+ lager:debug("~p: jc_sequence master at ~p went down with ~p.",
+ [?SERVER, Obj, Info]),
+ grab_name(State#jc_seq_state.singleton),
{noreply, State};
@@ -159,12 +167,14 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%% ============================================================================
+%% Assuming that jc_sequence is to be a singleton, if the global registry
+%% doesn't have a PID registered for jc_sequence, then claim it; otherwise,
+%% monitor the master.
+%%
-%% -----------------------------------------------------------------------------
-%% If the global registry doesn't have a PID registered for jc_sequence, then
-%% claim it; otherwise, monitor the master.
-
-grab_name() ->
+grab_name(false) ->
+ ok;
+grab_name(true) ->
case global:register_name(?MODULE, self()) of
yes ->
lager:info("~p: master is ~p.", [?MODULE, self()]);
diff --git a/src/jc_store.erl b/src/jc_store.erl
index 4fcbf9b..fe1668f 100644
--- a/src/jc_store.erl
+++ b/src/jc_store.erl
@@ -40,11 +40,13 @@
flush/1,
get/2,
get_map/1, get_map_since/2, key_set/1,
+ map_exists/1,
maps/0,
put/5]).
%% Meta-data API.
--export([up_nodes/0, stats/1]).
+-export([locus/2, up_nodes/0, stats/1]).
+
%% Custom-field indexing API.
-export([indexes/0, indexes/1, create_index/2, start_indexing/2, stop_indexing/2]).
@@ -68,6 +70,20 @@
%% =============================================================================
+%% -----------------------------------------------------------------------------
+%% Given a Key, hash it producing an index into the list of supplied Nodes and
+%% return the indexed node. Used to help avoid transactions by allowing a client
+%% to locate a key-specific node for destructive (write, delete, etc.) operations.
+%% It is faster to ask for a node and then utilize that node's jc_bridge or
+%% Jcache API rather than using transactions.
+%%
+-spec locus(key(), list(node())) -> node().
+
+locus(K, Nodes) ->
+ Bucket = erlang:phash2(K, length(Nodes)) + 1,
+ lists:nth(Bucket, Nodes).
+
+
%% -----------------------------------------------------------------------------
%% Return list of running mnesia nodes.
%%
@@ -105,6 +121,28 @@ stats(_) ->
{error, badarg}.
+
+%%------------------------------------------------------------------------------
+%% @doc Return true if map exists, else false.
+%%
+-spec map_exists(Map::map()) -> [true | false].
+
+map_exists(Map) ->
+ LocInRec = #key_to_value.map,
+ F = fun() ->
+ mnesia:select(key_to_value,
+ [{'$1', [{'==', Map, {element, LocInRec, '$1'}}],
+ [{element, LocInRec,'$1'}]}],
+ 1,
+ read) end,
+ case mnesia:async_dirty(F) of
+ '$end_of_table' ->
+ false;
+ _ ->
+ true
+ end.
+
+
%%------------------------------------------------------------------------------
%% @doc Return a sorted list of all maps currently in the cache.
%%
@@ -130,12 +168,10 @@ maps() ->
-spec clear(map_name()) -> ok.
clear(Map) ->
- F = fun() ->
- Items = mnesia:index_read(key_to_value, Map, #key_to_value.map),
- [mnesia:delete_object(Rec) || Rec <- Items],
- mnesia:delete({seq, Map})
- end,
- mnesia:sync_dirty(F),
+
+ Items = mnesia:index_read(key_to_value, Map, #key_to_value.map),
+ [mnesia:delete_object(Rec) || Rec <- Items],
+ mnesia:delete({seq, Map}),
ok.
diff --git a/test/app.config b/test/app.config
index bc24935..e0fbefb 100644
--- a/test/app.config
+++ b/test/app.config
@@ -2,16 +2,6 @@
%% the jc stanza.
[
- %% SASL config - type of logging not typically used, but might come in handy
- %% log files will be created, but are small and rotated.
- {sasl, [
- {sasl_error_logger, {file, "log/sasl-error.log"}},
- {errlog_type, error},
- {error_logger_mf_dir, "log/sasl"}, % Log directory
- {error_logger_mf_maxbytes, 10485760}, % 10 MB max file size
- {error_logger_mf_maxfiles, 5} % 5 files max
- ]},
-
{mnesia,
[
%% Specifies the maximum number of writes allowed to the transaction log
@@ -31,7 +21,7 @@
{lager,
[
{handlers, [
- {lager_console_backend, info},
+ {lager_console_backend, [{level,info}]},
{lager_file_backend, [{file, "log/error.log"},
{level, error},
{size, 10485760},
diff --git a/test/app2.config b/test/app2.config
index b4b371b..4f94df6 100644
--- a/test/app2.config
+++ b/test/app2.config
@@ -2,16 +2,6 @@
%% the jc stanza.
[
- %% SASL config - type of logging not typically used, but might come in handy
- %% log files will be created, but are small and rotated.
- {sasl, [
- {sasl_error_logger, {file, "log/sasl-error.log"}},
- {errlog_type, error},
- {error_logger_mf_dir, "log/sasl"}, % Log directory
- {error_logger_mf_maxbytes, 10485760}, % 10 MB max file size
- {error_logger_mf_maxfiles, 5} % 5 files max
- ]},
-
{mnesia,
[
%% Specifies the maximum number of writes allowed to the transaction log
@@ -31,7 +21,7 @@
{lager,
[
{handlers, [
- {lager_console_backend, info},
+ {lager_console_backend, [{level, info}]},
{lager_file_backend, [{file, "log/error.log"},
{level, error},
{size, 10485760},
diff --git a/test/jc.coverspec b/test/jc.coverspec
index 7f1f459..ab22f60 100644
--- a/test/jc.coverspec
+++ b/test/jc.coverspec
@@ -30,6 +30,7 @@
jc_bridge,
jc_cluster,
jc_eviction_manager,
+ jc_netsplit,
jc_protocol,
jc_psub,
jc_sequence,
diff --git a/test/jc_SUITE.erl b/test/jc_SUITE.erl
index 898cf35..000a8e3 100644
--- a/test/jc_SUITE.erl
+++ b/test/jc_SUITE.erl
@@ -202,8 +202,8 @@ meta_data_test(_Config) ->
[{auto_index,{records,0},{bytes,_}},
{key_to_value,{records,0},{bytes,_}},
{max_ttl,{records,0},{bytes,_}},
- {ps_client,{records,0},{bytes,_}},
- {ps_sub,{records,0},{bytes,_}},
+ {ps_client,{records,1},{bytes,_}},
+ {ps_sub,{records,1},{bytes,_}},
{schema,{records,10},{bytes,_}},
{seq,{records,0},{bytes,_}},
{stats,{records,2},{bytes,_}},
@@ -224,12 +224,15 @@ meta_data_test(_Config) ->
% put_s should create map if not there
% deleting last item from map should remove it from use
maps_test(_Config) ->
+
+ false = jc:map_exists(bed),
{maps, []} = bridge({maps}),
{ok, 1} = bridge({put, bed, 1, 1}),
{ok, 1} = bridge({put, evs, 1, 1}),
{ok, 2} = bridge({put, evs, 2, 2}),
{ok, 2} = bridge({put_s, trx, 2, 2, 22}),
{maps, [bed, evs, trx]} = bridge({maps}),
+ true = jc:map_exists(bed),
bridge({evict, bed, 1}),
{maps, [evs, trx]} = bridge({maps}),
bridge({evict, evs, 1}),
@@ -388,9 +391,9 @@ put_all_test(_Config)->
{ok, 100} = bridge({put_all, bed, KVs, 2}),
{ok, TestVs} = bridge({values, bed}),
- lists:sort(Vs) == lists:sort(TestVs),
+ true = (lists:sort(Vs) == lists:sort(TestVs)),
{ok, TestKs} = bridge({key_set, bed}),
- lists:sort(Ks) == lists:sort(TestKs),
+ true = (lists:sort(Ks) == lists:sort(TestKs)),
timer:sleep(2100),
{ok, {[], M}} = bridge({get_all, bed, Ks}),
@@ -754,7 +757,8 @@ remove_items_test(_config) ->
{ok,3} = jc:put_all(bed, [{1, one},{2, two},{3, three}]),
{ok,[{1,one}]} = bridge({remove_items, bed, [1, 22]}),
{ok, []} = bridge({remove_items, bed, [1, 22]}),
- {ok, [2,3]} = jc:key_set(bed),
+ {ok, Result} = jc:key_set(bed),
+ [2,3] = lists:sort(Result),
{ok,[{3, three}, {2, two}]} = bridge({remove_items, bed, [2, 3, 3, 4]}),
{records, 0} = jc:map_size(bed),
jc:flush(),
@@ -762,7 +766,8 @@ remove_items_test(_config) ->
{ok,3} = jc_s:put_all(bed, [{1, one},{2, two},{3, three}], 10),
{ok,[{1,one}]} = bridge({remove_items_s, bed, [1, 22], 11}),
{ok, []} = bridge({remove_items_s, bed, [1, 22], 12}),
- {ok, [2,3]} = jc:key_set(bed),
+ {ok, Result} = jc:key_set(bed),
+ true = ([2,3] == lists:sort(Result)),
{error, out_of_seq} = bridge({remove_items_s, bed, [2, 3, 3, 4], 1}),
{ok,[{3, three}, {2, two}]} = bridge({remove_items_s, bed, [2, 3, 3, 4], 111}),
{records, 0} = jc:map_size(bed).
@@ -838,13 +843,19 @@ map_subscribe_test(_Config) ->
jc_bridge ! {self(), {map_subscribe, bed, key, write}},
timer:sleep(100),
This = self(),
- This = mnesia:dirty_first(ps_client),
- [{ps_sub, {map_sub, bed, key, write}, Set}] =
- mnesia:dirty_read(ps_sub, mnesia:dirty_first(ps_sub)),
- sets:is_element(self(), Set),
+ true = lists:member(This, mnesia:dirty_all_keys(ps_client)),
+ A = mnesia:dirty_read(ps_sub, mnesia:dirty_next(ps_sub, mnesia:dirty_first(ps_sub))),
+ case A of
+ [{ps_sub, {map_sub, bed, key, write}, Set}] ->
+ sets:is_element(self(), Set);
+ _ ->
+ [{ps_sub, {map_sub, bed, key, write}, Set2}] =
+ mnesia:dirty_read(ps_sub, mnesia:dirty_first(ps_sub)),
+ sets:is_element(self(), Set2)
+ end,
- 1 = jc_psub:client_count(),
- 2 = jc_psub:load(),
+ 2 = jc_psub:client_count(),
+ 4 = jc_psub:load(),
jc_bridge ! {self(), {put, bed, key, 1}},
jc_bridge ! {self(), {put, bed, 1, 1}},
@@ -870,11 +881,13 @@ map_subscribe_test(_Config) ->
jc_psub ! {evict_deadbeats, 120000},
timer:sleep(200),
- '$end_of_table' = mnesia:dirty_first(ps_sub),
- '$end_of_table' = mnesia:dirty_first(ps_client),
-
- 0 = jc_psub:load(),
- 0 = jc_psub:client_count(),
+ 1 = length(mnesia:dirty_all_keys(ps_client)),
+ 1 = length(mnesia:dirty_all_keys(ps_sub)),
+
+ 2 = jc_psub:load(),
+ 1 = jc_psub:client_count(),
+
+
flush(),
jc:put(bed, otherkey, 1),
@@ -887,7 +900,9 @@ map_subscribe_test(_Config) ->
jc_bridge ! {self(), {map_subscribe, evs, any, any}},
timer:sleep(600),
- First = mnesia:dirty_first(ps_sub),
+ Zeroth = mnesia:dirty_first(ps_sub),
+ First = mnesia:dirty_next(ps_sub, mnesia:dirty_first(ps_sub)),
+
Second = mnesia:dirty_next(ps_sub, First),
@@ -908,7 +923,8 @@ map_subscribe_test(_Config) ->
catch
_:_ ->
[{ps_sub, {map_sub, bed, key, delete}, Y2}] =
- mnesia:dirty_read(ps_sub, First),
+ mnesia:dirty_read(ps_sub, Zeroth),
+
sets:is_element(self(), Y2)
end,
diff --git a/test/protocol_SUITE.erl b/test/protocol_SUITE.erl
deleted file mode 100644
index 0b4212a..0000000
--- a/test/protocol_SUITE.erl
+++ /dev/null
@@ -1,219 +0,0 @@
-
-
-
--module(protocol_SUITE).
-
--include("../include/records.hrl").
--inclde_lib("common_test/include/ct.hrl").
-
-
--export([all/0,
- init_per_suite/1,
- init_per_testcase/2,
- end_per_testcase/2,
- end_per_suite/1]).
-
--export([put_get_test/1, error_test/1, quant_test/1]).
-
-
-all() ->
- [ put_get_test, error_test, quant_test
-].
-
-init_per_suite(Config) ->
- net_kernel:start(['jc1@127.0.0.1', longnames]),
- application:set_env(jc, cache_nodes,
- ['jc1@127.0.0.1','jc2@127.0.0.1', 'jc3@127.0.0.1']),
- application:set_env(jc, max_ttl_maps, [{testmap, 100}]),
- application:set_env(jc, indexes, [{bed, "identifier"},
- {bed, "menu.2.id.'2'"},
- {cow, "cow.2.id.'2'"}]),
- application:set_env(jc, analyze_freq, {5, 5}),
- application:set_env(jc, protocol_port, 5555),
-
- application:ensure_all_started(jc),
- lager:set_loglevel(lager_console_backend, info),
- Config.
-
-
-init_per_testcase(_, Config) ->
- code:load_file(t),
- t:start_link(),
- <<"{\"version\":\"1.0\"}">> = get_result(),
-
- {maps, Maps} = jc:maps(),
- [jc:clear(Map) || Map <- Maps],
- Config.
-
-end_per_testcase(_, Config) ->
- jc:flush(silent),
- t:send("{close}"),
- get_all_results(),
- Config.
-
-end_per_suite(Config) ->
- jc:stop(),
- Config.
-
-
-error_test(_Config) ->
- t:send("{put, bed, \"1\", 1"),
- <<"{\"error\":\"command_syntax\"}">> = get_result(),
-
- t:send("{what, bed, \"1\", 1}"),
- <<"{\"error\":\"unrecognized_command\"}">> = get_result().
-
-
-
-put_get_test(_Config) ->
-
- t:send("{put, bed, \"1\", 1}"),
- <<"{\"ok\":\"1\"}">> = get_result(),
-
- t:send("{put, bed, \"3\", 3.3}"),
- <<"{\"ok\":\"3\"}">>= get_result(),
-
- t:send("{put, bed, \"4\", [1,2,3]}"),
- <<"{\"ok\":\"4\"}">> = get_result(),
-
- J1 = "\"{\\\"first\\\":{\\\"second\\\":1, \\\"third\\\":true},\\\"second\\\":true}\"",
- J2 = "\"{\\\"first\\\":{\\\"second\\\":2, \\\"third\\\":false},\\\"second\\\":true}\"",
-
- t:send("{put, json, \"1\", " ++ J1 ++ "}"),
- <<"{\"ok\":\"1\"}">> = get_result(),
-
- t:send("{put, json, \"2\", " ++ J2 ++ "}"),
- <<"{\"ok\":\"2\"}">> = get_result(),
-
- t:send("{get, bed, \"1\"}"),
- <<"{\"ok\":1}">> = get_result(),
-
- t:send("{get, bed, \"3\"}"),
- <<"{\"ok\":3.3}">> = get_result(),
-
- t:send("{get, bed, \"4\"}"),
- <<"{\"ok\":[1,2,3]}">> = get_result(),
-
-
- t:send("{put_all, evs, [{1,1}, {2,\"2\"}, {\"3\",3.3}]}"),
- <<"{\"ok\":3}">> = get_result(),
-
- t:send("{get_all, evs, [1, 2, 4]}"),
- <<"{\"hits\":[{\"key\":2,\"value\":\"2\"},{\"key\":1,\"value\":1}],\"misses\":[4]}">> = get_result(),
-
- t:send("{key_set, bed}"),
- <<"{\"ok\":[\"1\",\"3\",\"4\"]}">> = get_result(),
-
- t:send("{values, bed}"),
- true =
- case lists:sort(maps:get(<<"ok">>,jsone:decode(get_result()))) of
- [1,3.3,[1,2,3]] -> true
- end,
-
-
- t:send("{values_match, json, \"first.second=1\"}"),
- R = list_to_binary("[{\"key\":\"1\",\"value\":" ++ J1 ++ "}]"),
- R = get_result(),
-
- t:send("{values_match, json, \"second=true\"}"),
-
- R2 = list_to_binary("[{\"key\":\"1\",\"value\":" ++ J1 ++ "},{\"key\":\"2\",\"value\":" ++ J2 ++ "}]"),
- R2Alt = list_to_binary("[{\"key\":\"2\",\"value\":" ++ J2 ++ "},{\"key\":\"1\",\"value\":" ++ J1 ++ "}]"),
-
- true = case get_result() of
- R2 -> true;
- R2Alt -> true;
- _ -> false
- end,
-
-
- t:send("{put, bed, \"1\", 1}"),
- <<"{\"ok\":\"1\"}">> = get_result(),
-
- t:send("{evict, evs, 1}"),
- <<"\"ok\"">> = get_result(),
-
- t:send("{map_size, evs}"),
- <<"{\"records\":2}">> = get_result(),
-
- t:send("{map_size, json}"),
- <<"{\"records\":2}">> = get_result(),
-
- t:send("{evict_all_match, \"first.second=1\"}"),
- <<"\"ok\"">> = get_result(),
-
- t:send("{map_size, json}"),
- <<"{\"records\":1}">> = get_result(),
-
- t:send("{contains_key, evs, 2}"),
- <<"true">> = get_result(),
-
- t:send("{contains_key, evs, 123}"),
- <<"false">> = get_result(),
-
- t:send("{cache_nodes}"),
- <<"{\"active\":[\"jc1@127.0.0.1\"],\"configured\":[\"jc1@127.0.0.1\",\"jc2@127.0.0.1\",\"jc3@127.0.0.1\"]}">>
- = get_result(),
-
- t:send("{flush}"),
- <<"\"ok\"">> = get_result(),
-
- t:send("{cache_size}"),
- <<"{\"key_to_value\":", _/binary >> = get_result(),
-
- t:send("{map_size, bed}"),
- <<"{\"records\":0}">> = get_result(),
-
- t:send("{put_s, bed, \"1\", 1, 20}"),
- <<"{\"ok\":\"1\"}">> = get_result(),
-
- t:send("{put_s, evs, \"1\", 1, 30}"),
- <<"{\"ok\":\"1\"}">> = get_result(),
-
- t:send("{maps}"),
- <<"{\"maps\":[\"bed\",\"evs\"]}">> = get_result(),
-
- t:send("{up}"),
- <<"{\"uptime\":", _/binary>> = get_result(),
-
- t:send("{sequence}"),
- <<"{\"bed\":20,\"evs\":30}">> = get_result(),
-
- t:send("{sequence, map}"),
- <<"0">> = get_result(),
-
- t:send("{set_max_ttl, bed, 1000}"),
- <<"\"ok\"">> = get_result(),
-
- t:send("{get_max_ttls}"),
- <<"{\"bed\":1000,\"testmap\":100}">> = get_result().
-
-
-
-quant_test(_config) ->
- Limit = 5000,
- [ t:send("{put, bed, \"" ++ integer_to_list(X) ++ "\", " ++ integer_to_list(X) ++ "}") ||
- X <- lists:seq(1, Limit)],
- R = get_all_results(),
- Limit = length(R) - 1,
- t:send("{map_size, bed}"),
- <<"{\"records\":5000}">> = get_result().
-
-
-get_all_results() ->
- receive X ->
- [X|get_all_results()]
- after
- 1000 ->
- [ok]
- end.
-
-get_result() ->
- receive
- X -> X
- after
- 400 ->
- error
- end.
-
-