Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:20
jc
jc-1.2.1-git.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File jc-1.2.1-git.patch of Package jc
diff --git a/.gitignore b/.gitignore index d9f4a4a..dc2c0ba 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,11 @@ rel/jc target *.gz _build +test/ct_run.ct@c4b301cb0a29.2017-10-09_19.10.46 +test/*.html +test/*.css +test/*.js +variables-ct@* +test/rebar.lock +test/ct_run.ct* +ct_log_cache diff --git a/.travis.yml b/.travis.yml index 433cba9..ce56cab 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,6 @@ language: erlang otp_release: - - 17.0 - - 18.0 - - 19.0 + - 22.0 script: "sh -ex .travis_build.sh" \ No newline at end of file diff --git a/README.md b/README.md index 02fc1e7..4dc198f 100644 --- a/README.md +++ b/README.md @@ -1,37 +1,60 @@ JC ==== -##Erlang, Distributable, In-Memory Cache +## Erlang, Distributable, In-Memory Cache -### Featruing: Pub/Sub, JSON-query, consistency support, and a simple, TCP interop. protocol. +### Featuring: Pub/Sub, JSON-query, and mechanisms to support consistency without transactoins. [![Build Status](https://travis-ci.org/jr0senblum/jc.svg?branch=master)](https://travis-ci.org/jr0senblum/jc) [![hex.pm version](https://img.shields.io/hexpm/v/jc.svg)](https://hex.pm/packages/jc) -###Features +### Features * Cache entries are Map, Key, Value, [TTL], [Sequence] * Maps represent a name-space for Keys - similar to the notion of 'bucket' in other caching systems * Maps, Keys and Values can be any Erlang term * TTL is time-to-live in seconds -* Consistency assist through sequence numbers: An alternative API - allows for a sequence-number parameter on the put/x, evict/x, - match/x and remove/x operations. Operations whose sequence - number is lower than the current, per-map max are disallowed - thereby ensuring, for example, that stale puts do not - overwrite newer ones due to the newer one beating the stale - ones to the cache. -* JSON query support - * Query by JSON: When Values are JSON, evict_match/2, +* Consistency assist + * Client-Side Sequence Number: An alternative API allows for a sequence-number + parameter on the put/x, evict/x, match/x and remove/x operations. Operations + whose sequence number is lower than the current, per-map max are disallowed + thereby ensuring, for example, that an old delete that shows up after a newer update + does not inappropriately evict the newer update. + * Node of Responsibility: A key-specific node can be identified for destructive + operations (put, evict, etc.) thereby preserving eventual consistency without transactions. + * jc_store:locus/2 takes a Key and a list of nodes and returns the node of + responsibility or the given key. + * jc_bridge accepts a message {From, {locus, Key}} and returns the node of + responsibility. Because jc\_bridge knows which nodes are available, the client + is relieved from keeping track of up-nodes, which is necessary to + caclulate the correct node of responsiblility. For example: + + ~~~~ Erlang + {jc_bridge, Any_Cache_Node} ! {self(), {locus, Key}}, + NOR = receive + {error, _} -> error + Node -> Node + after + 1000 -> error + end, + {jc_bridge, NOR} ! {self, {put, benchtest, 10203, "{\"Key\":\"Value\"} + ~~~~ + Because data is everywhere, a lookup will always find a key irrespective of + the node of responsibility. The way to best use this is to configure jc_sequence + to not be a singleton and then use the Node of Responsibility feature to chose + the node to do the inserts/deletes using the jc_s API. + +* JSON query support + * Query by JSON: When Values are JSON, evict_match/2, evict_all_match/1 and values_match/2 will search or evict keys whose JSON value, at a location specificed by a java-style, dot-path string, equals the given value. That is, jc:values_match(bed, "id.type=3") would return all values, in the given map (bed), where that value was a JSON object, id, with a "type":3 at its top-level. - * Ad-hoc, index support: In order to support faster + * Ad-hoc, index support: In order to support faster operations, (2-3 orders of magnitude), each map can have up to four, dot-path, strings configured (or added at run-time) for which jc will provide index support. @@ -51,17 +74,11 @@ JC * Clients can create and subscribe to arbitrary 'topics' and broadcast arbitrary messages under those topic names * Clients can subscribe to node-up and node-down events -* Interopability - * Binary string over TCP returning JSON - * Bridge process that accepts messages from a client indicating - cache operations, executes the cache operations and returns the - results to the client. This has been used with JInterface to - interoperate with CLOJURE and Java clients * Fine-grained logging via Lager -###Cache Functions (jc) +### Cache Functions (jc) * Create * put(Map, Key, Value, [TTLSecs]) -> {ok, Key} | {error, badarg} * put_all(Map, [{K,V},{K,V},...], [TTLSecs]) -> {ok, CountOfSuccessfulPuts} | @@ -83,8 +100,11 @@ JC * clear(Map) -> ok * flush() -> ok * flush(silent) -> ok, Does not send alerts to subscribers + * Predicates * contains_key(Map, Key) -> true | false. + * map_exists(Map) -> true | false. + * Meta * cache_nodes() -> {{active, [Node1,... ]}, {configured, [Node1,... ]}} * cache_size() -> {size, [{TableName, RecordCnt, Words}],...} @@ -114,12 +134,12 @@ Identical to the Create and Evict family of functions of the jc module -###Eviction Manager Functions (jc_eviction_manager) +### Eviction Manager Functions (jc_eviction_manager) * set_max_ttl(Map, Secs) -> ok | {error, badarg} * get_max_ttls() -> [{Map, Secs}, ...] -###Pub/Sub Functions (jc_psub) +### Pub/Sub Functions (jc_psub) * map_subscribe(Pid, Map, Key|any, write|delete|any) -> ok | {error, badarg} * map_unsubscribe(Pid, Map, Key|any, write|delete|any) -> ok | {error, badarg} * client receives @@ -134,15 +154,14 @@ Identical to the Create and Evict family of functions of the jc module * Broadcasts Value to all subscribers of Topic * topic_subscribe(Pid, jc_node_events, any) -> ok - * subscribtes the user - to node up and node down events: + * subscribes the user to node up and node down events: `{jc_node_events, {nodedown, DownedNode, [ActiveNodes],[ConfiguredNodes]}}` `{jc_node_events, {nodeup, UppedNode, [ActiveNodes],[ConfiguredNodes]}}` -###Indexing Functions (jc_store) +### Indexing Functions (jc_store) * start_indexing(Map, Path={bed,"menu.id"}) -> ok | {error, no_indexes_available} | {error, Term} @@ -152,7 +171,7 @@ Identical to the Create and Evict family of functions of the jc module * indexes() -> {indexes, [{{Map, Path}, Position},...]} for all indexes -###Interoperability: Bridge (jc_bridge) +### Interoperability: Bridge (jc_bridge) * All functions from the jc, jc_s, jc_eviction_manager, jc_psub and jc_store are supported and are of the form: @@ -163,6 +182,9 @@ Identical to the Create and Evict family of functions of the jc module `jc_bridge ! {Pid, {put, Map, Key, Value}}` * Additionally, + `{From, locus, Key}} -> node()` Calls jc_store:locus/2 with the list of active + cache nodes and returns the node of record. + {From, {node_topic_sub}} -> ok | {error, badarg}, client will recieve: @@ -173,87 +195,25 @@ Identical to the Create and Evict family of functions of the jc module `{jc_node_events, {nodeup, UppedNode, [ActiveNodes],[ConfiguredNodes]}}` - {From, {node_topic_unsub}} -> ok. - - -### Interoperability: Socket Protocol - EXPIREMENTAL -Binary-encoded, string protocol used to provide socket-based -interoperability with JC. - -All messages to JC are string representations of a tuple. All -messages form the caching system to the client are JSON - -The protocol defines three message types: CONNECT, CLOSE and COMMAND all -of which are binary strings consisting of an 8-byte size followed by the -actual command messages. - -Responses are also binary strings with an 8-byte size prefix. - -The CONNECT command initiates a session, - - M = <<"{connect,{version,\"1.0\"}}">> - - Size is 25, so the CONNECT message is: - - <<25:8, M/binary>> = - <<25,40,58,99,111,110,110,101,99,116,32,123,58,118,101, - 114,115,105,111,110,32,49,46,48,125,41>> - -The server will respond to a CONNECT command with either an error or -the encoded version of {"version":" "1.0"} - - <<15:8, <<"{\"version\":\"1.0\"}">> = - <<15,123,34,118,101,114,115,105,111,110,34,58,49,46,48,125>> - -The CLOSE command closes the socket, ending the session - - M = <<"{close}">> - - Size is 7 so the CLOSE message is: - <<7,123,99,108,111,115,101,125>> - - -COMMAND messages are string versions of the tuple-messages, which -jc_bridge uses, without the self() parameter. For example - - {self(), {put, Map, Key, Value}} becomes - {put, Map, Key, Value} - -The return will be an encoded version of a JSON string. A client session -might look as follows: - - client:send("{put, evs, \"1\", \"{\\\"value:\\\":true}\"}") - <<"{\"ok\":\"1\"}">> - - client:send("{get, evs, \"1\"}"), - <<"{\"ok\":"{\\\"value\\\":true}\"}">> - - client:send("{put, evs, 1, \"{\\\"value:\\\":true}\"}") - <<"{\"ok\":1}">> - - client:send("{get, evs, 1}"), - <<"{\"ok\":"{\\\"value\\\":true}\"}">> - + `{From, {node_topic_unsub}} -> ok`. -###Configuration +### Configuration * Application configuration is in sys.config which is heavily commented * Cookie, node-name and auto-restart of VM controlled by vm.args -###Application Modules +### Application Modules * jc_cluster * Simple, mnesia-based, cluster creation and management * jc, jc_s, jc_store, jc_eviction_manager * Caching operations, Key-specific and Map-level TTLs * jc_sequence - * Singleton service enforcing strictly monotonic sequence + * [optionally] Singleton service enforcing strictly monotonic sequence numbers on jc_s operations * jc_analyzer * Analysis and indexing inititation of JSON query strings -* jc_protocol - * Socket processing of messages and Erlang -> JSON * jc_psub: * Pub / Sub of cache write and delete events * On-demand, ad-hoc topic events @@ -266,7 +226,7 @@ might look as follows: * Looks for evidence of node dis/apperation and implements a recovery strategy -###Net Split/Join Strategy +### Net Split/Join Strategy Mnesia does not merge on its own when a node joins (returns) to a mesh of nodes. There are two situations where this is relevant: @@ -287,9 +247,9 @@ Given this ClusterId, we have the following strategy: 3. _Nodeup_ Whenever a Node appears, an arbitary Node ensures that any Nodes that report a different ClusterId (different than the arbitrary Node's ClusterId) are killed to be restarted by the hearbeat application. If any Nodes required restarting, the entire - cache is flushed. + cache is flushed or not per policy in config.sys. -###Build Instructions +### Build Instructions * Ensure that Erlang 17 or higher is installed * Get the Source Code from Stash @@ -306,7 +266,7 @@ Given this ClusterId, we have the following strategy: `[root@db01] ./rebar3 as prod release` -###Documentation +### Documentation `[root@dbo1] ./rebar3 edoc` diff --git a/config/sys.config b/config/sys.config index 34dbd78..6b365c5 100644 --- a/config/sys.config +++ b/config/sys.config @@ -1,69 +1,55 @@ -%% Typically the only line that is customer specific is the cache_nodes lines in +%% Typically the only line that are customer specific are the cache_nodes lines in %% the jc stanza. [ - %% SASL config - type of logging not typically used, but might come in handy - %% log files will be created, but are small and rotated. - {sasl, [ - {sasl_error_logger, {file, "log/sasl-error.log"}}, - {errlog_type, error}, - {error_logger_mf_dir, "log/sasl"}, % Log directory - {error_logger_mf_maxbytes, 10485760}, % 10 MB max file size - {error_logger_mf_maxfiles, 5} % 5 files max - ]}, - {mnesia, [ %% Specifies the maximum number of writes allowed to the transaction log - %% before a new dump of the log is performed. The higer the more ram is - %% used but low numbers may result in mnesia not being able to keep up. + %% before a new dump of the log is performed. The higer the more RAM is + %% used, but low numbers may result in mnesia not being able to keep up. %% Default is 100 {dump_log_write_threshold, 50000}, - %% Must be ram so that schema is diskless. This allows for nodes to come in - %% and out of the cluster in any order without worying about conflicting + %% MUST be RAM so that schema is diskless. This allows for nodes to come + %% in and out of the cluster in any order without worying about conflicting %% masters. {schema_location, ram} - ] - }, + ]}, {lager, [ + %% Lager back-end handlers. Logs are small and rotated by default. {handlers, [ - {lager_console_backend, info}, - {lager_file_backend, [{file, "log/error.log"}, - {level, error}, - {size, 10485760}, - {date, "$D0"}, - {count, 5}]}, -%% Uncomment for file-based debug log -%% {lager_file_backend, [{file, "log/debug.log"}, -%% {level, debug}, -%% {size, 10485760}, -%% {date, "$D0"}, -%% {count, 5}]}, + {lager_console_backend, [{level, info}]}, + %% Uncomment for file-based debug log + %% {lager_file_backend, [{file, "log/debug.log"}, + %% {level, debug}, + %% {size, 10485760}, + %% {date, "$D0"}, + %% {count, 5}]}, {lager_file_backend, [{file, "log/info.log"}, {level, info}, {size, 10485760}, {date, "$D0"}, {count, 5}]} ]}, - %% We are not racist and do not want colors on the command line. {colored, true} ]}, {jc, [ %% Cache nodes in any order. - {cache_nodes, ['jc1@127.0.0.1']}, + {cache_nodes, ['jc1@127.0.0.1', 'jc2@127.0.0.1']}, %% How long to wait for mnesia tables to come up when adding a node to the %% cluster. - {table_wait_ms, 2000}, + {table_wait_ms, 20000}, %% At what interval to run the process that looks for clients with no - %% subscriptions and remove them from the subscriber (ps_client) tables. + %% subscriptions and removes them from the subscriber (ps_client) tables. + %% This is a safety-net activity and is used to remove subscriptions that + %% failed to be removed due to a failure of somesort. {evict_deadbeats_ms, 3600000}, %% How often to run the process that evicts records that are older than the @@ -71,17 +57,26 @@ {max_ttl_job_secs, 180}, %% Possibly empty list indicating max_ttl for records in the map. Format is - %% {max_ttl_maps, [{Map1, Secs1}]}, - {max_ttl_maps, [{testmap, 100}]}, + %% {max_ttl_maps, [{Map1, Secs1}, ..., {MapN, SecsN}]}, + %% {max_ttl_maps, [{testmap, 100}]}, - %% Initial json values upon which to index + %% Initial JSON values upon which to index. In a path, the number 2 indicates + %% the second whatever, while the '2' indicates the string 2 in the path. %% {indexes, [{bed, "identifier"}, {bed, "menu.2.id.'2'"}]} - %% Frequency needed to warrant indexing {freq, Time_secs} + %% Frequency needed to see a particular JSON query before indexing + %% {freq, Time_secs} {analyze_freq, {5, 5}}, - %% Port for the Socket protocol listener - {protocol_port, 5555} + %% When a node appears after a net-split, some nodes are restarted. If the + %% survivors should flush their contents then this should be true, else false. + {should_flush, false}, + + %% Should jc_sequence be a singleton or local at each cache node. If + %% destructive operations happen at key-specific nodes, than no need; + %% otherwise yes. + {jc_sequence_singleton, true} + ]} diff --git a/config/vm.args b/config/vm.args index 6cf0656..6249f28 100644 --- a/config/vm.args +++ b/config/vm.args @@ -1,6 +1,6 @@ ## Name of the node ## comment out for windows, node name comes from jc.cmd --name jc@127.0.0.1 +-name jc2@127.0.0.1 ## Cookie for distributed erlang -setcookie somecookie diff --git a/doc/overview.edoc b/doc/overview.edoc index c6bdd0a..4767573 100644 --- a/doc/overview.edoc +++ b/doc/overview.edoc @@ -240,9 +240,34 @@ client:send("{get, evs, \"1\"}"), <li>Server that acts as a proxy between an external process and jc functionality</li> </ul> + +=== Net Split/Join Strategy === + +Mnesia does not merge on its own when a node joins (returns) to a mesh of nodes. +There are two situations where this is relevant: + +* j_cache nodes start in a disconnected state so more than one initiates a new +cluster and then, subsequently, those nodes join into one cluster; +* A node drops out of the cluster due to some network glitch and then rejoins. + +To handle these situations, whenever a cluster is created by a Node (node@123.45.67, +for example), it creates a ClusterId - its Node name (node@123.45.67), for that cluster. + +Given this ClusterId, we have the following strategy: + +1. _Cluster Creation_: creates an initial ClusterId; +2. _Nodedown_: If the Node that created the cluster dissapears, a surviving Node changes the + ClusterId such that ClusterId is now this new Node's name. In the case of a + disconnected newtwork, one of the islands will have the original ClusterId Node + dissapear, and it will create a new one as described. +3. _Nodeup_ Whenever a Node appears, an arbitary Node ensures that any Nodes that report + a different ClusterId (different than the arbitrary Node's ClusterId) are killed to be + restarted by the hearbeat application. If any Nodes required restarting, the entire + cache is flushed or not per policy in config.sys. + === Build Instructions === <ul> -<li>Ensure that Erlang 17 or higher is installed</li> +<li>Ensure that Erlang 20 or higher is installed</li> <li>Get the Source Code from Stash</li> </ul> <p><code>[root@db01] git clone https://github.com/jr0senblum/jc.git</code></p> diff --git a/include/records.hrl b/include/records.hrl index 0e5398e..f3e094b 100644 --- a/include/records.hrl +++ b/include/records.hrl @@ -1,22 +1,21 @@ % Types -type seconds() :: non_neg_integer(). +-type ttl() :: seconds(). +-type time_stamp() :: seconds(). -type map_name() :: any(). -type key() :: any(). -type value() :: any(). --type ttl() :: seconds(). + -type rec_ref() :: reference(). --type time_stamp() :: seconds(). -% Key_to_value - an ordered_set table whose key is {key, map}. Ref is used by -% jc_eviction manager as the key of the cache item to evict. i1 - i4 are -% fields that can be used to hold values pulled from a json value to support a -% querry-select feature (see README and jc_store:start_indexing/2 for more. Seq -% is an integer supplied by the client that, if provided, is expected to be -% strictly monotinic. If it is not, the put with the non monotonic value will -% be evicted and the old one re-inserted. -% + +% Key_to_value - an ordered_set table whose key is {key, map_name}. Ref is used +% by jc_eviction manager as the key of the cache item to evict. i1 - i4 are +% fields that can be used to hold values pulled from a json value to support an +% indexed querry-select feature (see README and jc_store:start_indexing/2 for +% more). -record (key_to_value, {jc_key :: {key(), map_name()}, map :: map_name(), @@ -35,14 +34,19 @@ -type key_to_value() :: #key_to_value{}. +% Seq_no is an integer supplied by the client that, if provided, MUST be +% strictly monotinic and is used as a sequence number to ensure that a stale +% operation doesn't make it to jcache after the fact and clobber a more recent +% put or evict operation. -record(seq, {map :: map_name(), seq_no :: jc_sequence:seq() }). -% Holds information about json-path's that will be the target of a query-sellect -% for a given map. Position indicates which column (i1-i4) in key_to_value to -% use. + +% Defines the index for a given map and JSON path. Used for query-selects and +% evicts. Position indicates which column (i1-i4) in key_to_value to store the +% index. -record (to_index, {map_path :: {map_name(), tuple()}, map_name :: map_name(), @@ -50,6 +54,8 @@ }). +% Record that keeps track of the accounting around a JSON query as part of +% determining whether an index should be initiatied. -record (auto_index, {map_path :: {map_name(), tuple()} | '_', count :: non_neg_integer() | '_', @@ -86,7 +92,6 @@ }). - % Jc_psub records. Subscription patterns and the set of PIDS subscribed to those % patterns. -record (ps_sub, @@ -94,6 +99,7 @@ clients = sets:new() :: sets:set() }). + % Ps_client records. Unique processes that are subscribers, includings what type % of mechanism is used to monitor the client - link to the Pid or monitor the % node. diff --git a/rebar.config b/rebar.config index 4706976..3fb796b 100644 --- a/rebar.config +++ b/rebar.config @@ -11,9 +11,8 @@ {deps, [ {jwalk, "1.1.0"}, - {jsone, "1.2.0"}, - {lager, "3.2.1"}, - {ranch, "1.1.0"} + {jsone, {git, "git://github.com/sile/jsone", {branch, "otp21-rc1"}}}, + {lager, "3.7.0"} ]}. @@ -41,7 +40,6 @@ ['jc', kernel, stdlib, - sasl, inets, {observer,load}, {wx, load}, diff --git a/rebar.lock b/rebar.lock deleted file mode 100644 index 69812fa..0000000 --- a/rebar.lock +++ /dev/null @@ -1,5 +0,0 @@ -[{<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.7">>},1}, - {<<"jsone">>,{pkg,<<"jsone">>,<<"1.2.0">>},0}, - {<<"jwalk">>,{pkg,<<"jwalk">>,<<"1.1.0">>},0}, - {<<"lager">>,{pkg,<<"lager">>,<<"3.0.1">>},0}, - {<<"ranch">>,{pkg,<<"ranch">>,<<"1.1.0">>},0}]. diff --git a/rebar3 b/rebar3 index f49bd25..dc95a3a 100755 Binary files a/rebar3 and b/rebar3 differ diff --git a/src/jc.app.src b/src/jc.app.src index 7541d38..0f421a5 100644 --- a/src/jc.app.src +++ b/src/jc.app.src @@ -14,11 +14,9 @@ {applications, [ kernel, stdlib, - sasl, stdlib, syntax_tools, compiler, - ranch, lager, jsone, jwalk @@ -33,7 +31,6 @@ jc_cluster, jc_eviction_manager, jc_netsplit, - jc_protocol, jc_psub, jc_s, jc_sequence, diff --git a/src/jc.erl b/src/jc.erl index abb791d..f43373e 100644 --- a/src/jc.erl +++ b/src/jc.erl @@ -1,14 +1,15 @@ %%% ---------------------------------------------------------------------------- %%% @author Jim Rosenblum -%%% @copyright (C) 2011 - 2015, Jim Rosenblum +%%% @copyright (C) 2011 - 2017, Jim Rosenblum %%% @doc This module wraps the mnesia-interacting, lower-level functions %%% implemented in {@link jc_store. jc_store} to provide a public, DIRTY, %%% set of opperations. {@link jc_s. jc_s} provides functions that take a %%% sequence parameter to better support serilization (consistency) without. %%% resorting to transactions. %%% -%%% JC can be called directly by Erlang clients; or, +%%% The jc module can be called directly by Erlang clients; or, %%% Java node -> JInterface -> {@link jc_bridge. jc_bridge} -> jc; or, +%%% experimentally, %%% Application -> TPC/IP -> {@link jc_protocol. jc_protocol} -> jc %%% %%% @version {@version} @@ -29,17 +30,22 @@ flush/0, flush/1, remove_items/2]). - % Get Functions --export([contains_key/2, - get/2, +-export([get/2, get_all/2, key_set/1, values/1, values_match/2]). -% CACHE META-INFO SUPPORT --export([cache_nodes/0, cache_size/0, map_size/1, maps/0, up/0, stop/0]). +% Predicates +-export([contains_key/2, + map_exists/1]). + +% Control +-export([stop/0]). + +% Cache Meta-data support +-export([cache_nodes/0, cache_size/0, map_size/1, maps/0, up/0]). % Used by jc_s for evict_match -export([fun_match/3]). @@ -47,18 +53,18 @@ % Used by eviction manager to evict an entry based on a reference -export ([delete_record_by_ref/1]). - % definitions of global records and types. -include("../include/records.hrl"). +% ttl is either infinity (0) or an integer > 0. -define(INFINITY, 0). -define(VALID(X), is_integer(X) andalso (X >= 0)). %% ============================================================================= -%% Meta data API +%% Cache control %% ============================================================================= @@ -75,6 +81,12 @@ stop()-> ok. + +%% ============================================================================= +%% Meta data API +%% ============================================================================= + + %% ----------------------------------------------------------------------------- %% @doc Return a sorted list of all maps currently in the cache. %% @@ -136,7 +148,7 @@ cache_nodes() -> Configured = application:get_env(jc, cache_nodes,[]), MnesiaUp = jc_store:up_nodes(), Running = [N || N <- MnesiaUp, - undefined /= rpc:call(N, erlang, whereis, [jc_bridge], 1000)], + is_pid(rpc:call(N, erlang, whereis, [jc_bridge], 1000))], {{active, lists:sort(Running)}, {configured, lists:sort(Configured)}}. @@ -213,7 +225,8 @@ put_all(_m, _K, _T) -> clear(Map) -> lager:debug("~p: clear map ~p.", [?MODULE, Map]), - jc_store:clear(Map), + F = fun() -> jc_store:clear(Map) end, + trans_execute(F), ok. @@ -357,21 +370,6 @@ remove_items(Map, Keys)-> %% ============================================================================= -%% ----------------------------------------------------------------------------- -%% @doc Return true if the {@link key(). Key} is in the {@link map_name()}, -%% else false. -%% --spec contains_key(Map::map_name(), Key::key()) -> true | false. - -contains_key(Map, Key) -> - case get(Map, Key) of - {ok, _} -> - true; - _ -> - false - end. - - %% ----------------------------------------------------------------------------- %% @doc Retrieve the data associated with Key. %% % @@ -467,6 +465,37 @@ fun_match(Map, Criteria, Fun) -> +%% ============================================================================= +%% Predicate API +%% ============================================================================= + + +%% ----------------------------------------------------------------------------- +%% @doc Return true if the {@link key(). Key} is in the {@link map_name()}, +%% else false. +%% +-spec contains_key(Map::map_name(), Key::key()) -> true | false. + +contains_key(Map, Key) -> + case get(Map, Key) of + {ok, _} -> + true; + _ -> + false + end. + + +%% ----------------------------------------------------------------------------- +%% @doc Return true if the given {@map mpa(). Map} exists, else false. +%% +-spec map_exists(Map::map_name()) -> true | false. + +map_exists(Map) -> + F = fun() -> jc_store:map_exists(Map) end, + trans_execute(F). + + + %% ============================================================================= %% Utility functions %% ============================================================================= @@ -481,7 +510,7 @@ fun_match(Map, Criteria, Fun) -> trans_execute(F) -> case mnesia:is_transaction() of true -> F(); - false -> mnesia:sync_dirty(F) + false -> mnesia:async_dirty(F) end. diff --git a/src/jc_app.erl b/src/jc_app.erl index 15a6aa3..16418fc 100644 --- a/src/jc_app.erl +++ b/src/jc_app.erl @@ -21,21 +21,15 @@ %%% ============================================================================ %% ----------------------------------------------------------------------------- -%% @private Ask jc_store to initialize or join the the mnesia cluster, start -%% the tcp listeners (for protocol users), and fire-up the top level supervisor. +%% @private Ask jc_store to initialize or join the the mnesia cluster, and +%% fire-up the top level supervisor. %% -spec start (normal | {takeover | failover, atom()}, [{node, atom()}]) -> {ok, pid()} | {error, atom()}. start(_StartType, _StartArgs) -> ok = jc_cluster:init(), - Port = application:get_env(jc, protocol_port, 5555), - {ok, _} = ranch:start_listener(jc_proto, - 100, - ranch_tcp, - [{port, Port}], - jc_protocol, [Port]), - lager:info("tcp, protocol listener is up and listening on port: ~p", [Port]), + case jc_sup:start_link() of {ok, Pid} -> {ok, Pid}; diff --git a/src/jc_bridge.erl b/src/jc_bridge.erl index 377b4fa..536e122 100644 --- a/src/jc_bridge.erl +++ b/src/jc_bridge.erl @@ -18,7 +18,9 @@ %% Module API --export([start_link/0, do/1]). +-export([start_link/0]). + +-export([do/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -27,8 +29,11 @@ -define(SERVER, ?MODULE). +%% State is just a list of active nodes, used to enable fast response to the +%% 'which' info_message - see jc_store:locus/2 +%% +-record(jc_bridge_state, {nodes :: list(node())}). --record(jc_bridge_state, {}). @@ -56,6 +61,7 @@ do(Message) -> ?SERVER ! {self(), Message}. + %%% ---------------------------------------------------------------------------- %%% gen_server callbacks %%% ---------------------------------------------------------------------------- @@ -68,13 +74,35 @@ do(Message) -> init([]) -> Nodes = application:get_env(jc, cache_nodes,[node()]), + UpNodes = [node() | up_nodes(Nodes)], lager:info("~p: letting ~p know that this node is ready.", [?MODULE, Nodes]), gen_server:abcast(Nodes, jc_psub, {jc_bridge_up, node()}), + subscribe_and_monitor(), + + % Store the cach node names, cnt and subscribe to changes + lager:info("~p: up.", [?MODULE]), - {ok, #jc_bridge_state{}}. + {ok, #jc_bridge_state{nodes = UpNodes}}. + + + +% Return the list of up cache nodes. +% +up_nodes(Nodes) -> + [N || N <- Nodes, + is_pid(rpc:call(N, erlang, whereis, [jc_bridge], 1000))]. + + +% Subscribe to jc_node_events and monitor jc_psub so that if it goes down, +% jc_bridge can re-subscribe. +% +subscribe_and_monitor() -> + jc_psub:topic_subscribe(self(), jc_node_events, any), + _ = erlang:monitor(process, jc_psub). + %% ----------------------------------------------------------------------------- %% @private Handling call messages @@ -100,9 +128,27 @@ handle_cast(Msg, State) -> %% ----------------------------------------------------------------------------- %% @private Handling info messages: %% Execute the requested j_cache operation and return the answer to requester. +%% Also recieve the jc_psub, subscription message notifiying of node up down. %% -spec handle_info(any(), #jc_bridge_state{}) -> {noreply, #jc_bridge_state{}}. +handle_info({_From, {jc_node_events, {_Type, _Node, Active, _C}}}, _State) -> + NewState = #jc_bridge_state{nodes = Active}, + lager:info("~p: nodes changed. New actives: ~p.", [?MODULE, Active]), + {noreply, NewState}; + + +handle_info({'DOWN', _Ref, process, {jc_psub, _Node}, _Info}, State) -> + % Need to resubscribe to node up/down messages. + lager:info("~p: jc_psub went down. Resubscribing.", [?MODULE]), + subscribe_and_monitor(), + {noreply, State}; + +handle_info({From, {locus, Key}}, #jc_bridge_state{nodes=Nodes} = State) -> + _Pid = spawn(fun() -> From ! jc_store:locus(Key, Nodes) end), + {noreply, State}; + + handle_info({From, {map_subscribe, Map, Key, Ops}}, State) -> _Pid = spawn(fun()->From ! jc_psub:map_subscribe(From, Map, Key, Ops) end), {noreply, State}; @@ -149,9 +195,8 @@ handle_info({From, {get_max_ttls}}, State) -> spawn(fun()->From ! jc_eviction_manager:get_max_ttls() end), {noreply, State}; - handle_info({From, {put, Map, Key, Value}}, State) -> - _Pid = spawn(fun() ->From ! jc:put(Map, Key, Value) end), + _Pid = spawn(fun() -> From ! jc:put(Map, Key, Value) end), {noreply, State}; handle_info({From, {put_s, Map, Key, Value, Seq}}, State) -> @@ -202,6 +247,7 @@ handle_info({From, {evict_map_since, Map, Secs}}, State) -> handle_info({From, {evict, Map, Key}}, State) -> _Pid = spawn(fun() -> From ! jc:evict(Map, Key) end), {noreply, State}; + handle_info({From, {evict_s, Map, Key, Seq}}, State) -> _Pid = spawn(fun() -> From ! jc_s:evict(Map, Key, Seq) end), {noreply, State}; @@ -329,3 +375,4 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + diff --git a/src/jc_cluster.erl b/src/jc_cluster.erl index 146a69f..aad0e18 100644 --- a/src/jc_cluster.erl +++ b/src/jc_cluster.erl @@ -113,7 +113,7 @@ get_cluster_id() -> end. -% kill this node... +% kill the application, it will be restarted... % kamakazee() -> lager:notice("~p: ~p seppuku.", [?MODULE, node()]), @@ -159,8 +159,8 @@ init()-> %% ----------------------------------------------------------------------------- -%% @private Jcache is considered loaded if jc_psub is up - it is the last gen_server -%% started by the superviosr. +%% @private Jcache is considered loaded if jc_psub is up - it is the last +%% gen_server started by the superviosr. %% -spec jc_loaded(Node::node()) -> true | false. @@ -179,18 +179,21 @@ dynamic_db_init([]) -> mnesia:create_table(key_to_value, [{attributes, record_info(fields, key_to_value)}, - {type, ordered_set}, + {type, set}, {index, [map, key, ref, create_tm]} ]), + mnesia:create_table(seq, [{attributes, record_info(fields, seq)}, {type, set} ]), + mnesia:create_table(to_index, [{attributes, record_info(fields, to_index)}, {type, set}, {index, [map_name, position]} ]), + mnesia:create_table(auto_index, [{attributes, record_info(fields, auto_index)}, {type, set} @@ -198,17 +201,21 @@ dynamic_db_init([]) -> mnesia:create_table(ttl, [{attributes, record_info(fields, ttl)} ]), + mnesia:create_table(max_ttl, [{attributes, record_info(fields, max_ttl)}, {type, set} ]), + mnesia:create_table(stats, [{attributes, record_info(fields, stats)} ]), + mnesia:create_table(ps_sub, [{attributes, record_info(fields, ps_sub)}, {local_content, true} ]), + mnesia:create_table(ps_client, [{attributes, record_info(fields, ps_client)}, {local_content, true} diff --git a/src/jc_netsplit.erl b/src/jc_netsplit.erl index 57d883a..be73d58 100644 --- a/src/jc_netsplit.erl +++ b/src/jc_netsplit.erl @@ -28,8 +28,8 @@ %%% node will report different ClusterId - bad. %%% %%% For any bad outcome, all nodes having the 'different' ClusterId are killed -%%% to be restarted by the heart process, and a survivor does a flush. -%%% +%%% to be restarted by the heartbeat process, and a survivor may do a flush per +%%% policy in configy.sys. %%% %%% @end %%% Created : 18 May 2016 by Jim Rosenblum <jrosenblum@Jims-MacBook-Pro.local> @@ -56,9 +56,10 @@ -define(LOCK, {?MODULE, self()}). -% State: list of configured nodes. +% State: list of configured nodes, and flush policy on join after net split. -record(jc_ns_state, - {nodes = [] :: [Configured::node()]}). + {nodes = [] :: [Configured::node()], + should_flush = true :: boolean()}). @@ -90,10 +91,12 @@ start_link() -> init([]) -> ok = net_kernel:monitor_nodes(true), {ok, Configured} = application:get_env(jc, cache_nodes), + ShouldFlush = application:get_env(jc, should_flush, false), lager:info("~p: up and watching events for ~p.", [?SERVER, Configured]), - {ok, #jc_ns_state{nodes = lists:sort(Configured)}}. + {ok, #jc_ns_state{nodes = lists:sort(Configured), + should_flush = ShouldFlush}}. %% ----------------------------------------------------------------------------- @@ -121,8 +124,8 @@ handle_cast(Msg, State) -> %% -spec handle_info(any(), #{}) -> {noreply, #{}}. -handle_info({nodeup, Upped}, #jc_ns_state{nodes = Ns} = State) -> - check_cluster_health(Upped, Ns), +handle_info({nodeup, Upped}, #jc_ns_state{nodes = Ns, should_flush = Sf} = State) -> + check_cluster_health(Upped, Ns, Sf), {noreply, State}; handle_info({nodedown, Downed}, State) -> @@ -191,23 +194,24 @@ do_change(Downed) -> % all nodes will report the same ClusterId. If not, then 'outliers' should % kill themselves and let the hearbeat process restart them. % -check_cluster_health(Upped, Nodes) -> +check_cluster_health(Upped, Nodes, ShouldFlush) -> case is_relevant(Upped, Nodes) of false -> ok; true -> - check(Upped, Nodes, jc_cluster:get_cluster_id()), + check(Upped, Nodes, jc_cluster:get_cluster_id(), ShouldFlush), ok end. -% Ask each node to check that it has the same ClusterId, if not flush. -% Any node that has a different ClusterId will kill itself and be restarted -% by the heartbeat process. +% Ask each node to check that it has the same ClusterId. Any node that has +% a different ClusterId will kill itself and be restarted by the heartbeat +% process. If any nodes were killed, flush the entire cache per policy in +% sys.config % -check(Upped, Nodes, ClusterId) -> +check(Upped, Nodes, ClusterId, ShouldFlush) -> {Res, _Bad} = global:trans(?LOCK, fun() -> @@ -223,9 +227,13 @@ check(Upped, Nodes, ClusterId) -> infinity), case lists:member(bad, Res) of - true -> - lager:notice("~p: cluster repaired, flushing.", [?SERVER]), + true when ShouldFlush -> + lager:notice("~p: cluster repaired, flushing cache per policy.", + [?SERVER]), jc:flush(); + true when not ShouldFlush -> + lager:notice("~p: cluster repaired, not flushing cache per policy.", + [?SERVER]); false -> lager:notice("~p: cluster showed no signs of inconsistency.", [?SERVER]) diff --git a/src/jc_protocol.erl b/src/jc_protocol.erl deleted file mode 100644 index cb30c0f..0000000 --- a/src/jc_protocol.erl +++ /dev/null @@ -1,411 +0,0 @@ -%%% ---------------------------------------------------------------------------- -%%% @author Jim Rosenblum <jrosenblum> -%%% @copyright (C) 2015, Jim Rosenblum -%%% @doc -%%% -%%% JC Protocol 1.0 -%%% A binary-encoded, string protocol used to provide socket-based -%%% interoperability with JC. Incoming messages are string representations -%%% of a tuple. Responses are JSON. -%%% -%%% The protocol defines three message types: CONNECT, CLOSE and COMMAND all of -%%% which are binary strings consisting of a header, indicating the size of the -%%% message in bytes, follwed by the actual message itself. The header is an -%%% 8-byte, big endian, unsigned integer indicating the size of the message in -%%% bytes. -%%% -%%% A RESPONSE is structured the same as messages - 8-byte size header followed -%%% by the content of the response. -%%% -%%% The CONNECT command initiates a session, -%%% -%%% ```M = <<"{connect,{version,\"1.0\"}}">>''' -%%% -%%% The byte size is 26, so the CONNECT message is: -%%% -%%% ```<<26:8/integer-unit:8, M/binary>> = -%%% <<0,0,0,0,0,0,0,26,123,99,111,110,110,101,99,116,44,123,118,101,114, -%%% 115,105,111,110,44,32,34,49,46,48,34,125,125>> ''' -%%% -%%% The server will respond to a CONNECT command with either an error or -%%% the appropriately encoded version of {\"version\":\"1.0\"} -%%% -%%% ``` <<17:8/integer-unit:8, <<"{\"version\":1.0}">> = -%%% <0,0,0,0,0,0,0,17,123,34,118,101,114,115,105,111,110,34, -%%% 58,34,49,46,48,34,125>> ''' -%%% -%%% -%%% The CLOSE command closes the socket, ending the session -%%% -%%% ```M = <<"{close}">>''' -%%% -%%% Size is 7 so the CLOSE message is: -%%% ```<0,0,0,0,0,0,0,7,123,99,108,111,115,101,125>> ''' -%%% -%%% -%%% COMMAND messages are string versions of the tuple-messages which -%%% {@link jc_bridge. jc_bridge} uses, only without the self() parameter. For -%%% example the jc_brdige message, {self(), {put, Map, Key, Value}} becomes -%%% {put, Map, Key, Value} -%%% -%%% The RESPONSE will be an appropriately encoded, binary version of a JSON -%%% response representing the Erlang return value. -%%% -%%% A client session might look as follows: -%%% -%%% ```client:send("{put, evs, 1, \"a string value\"}") -%%% ==> <<"{\"ok\":1}">> -%%% -%%% client:send("{get, evs, 1}"), -%%% ==> <<"{\"ok\": \"a string value\"}">>''' -%%% -%%% -%%% @end -%%% Created : 25 Aug 2015 by Jim Rosenblum <jrosenblum@carelogistics.coml> -%%% ---------------------------------------------------------------------------- - --module(jc_protocol). - --behaviour(gen_server). --behaviour(ranch_protocol). - - -%% module API --export([start_link/4]). - - -%% gen_server callbacks --export([init/1, init/4, - handle_call/3, - handle_cast/2, - handle_info/2, - terminate/2, - code_change/3]). - --define(SERVER, ?MODULE). --define(TIMEOUT, infinity). - --record(jc_p, {socket, - trans :: 'jc_protocol', - connected = false :: boolean(), - acc = undefined :: undefined | binary(), - size = undefined :: undefined | non_neg_integer(), - version = <<"1.0">> :: binary()}). --type jc_p() :: #jc_p{}. - - - -%%% ============================================================================ -%%% Modue API -%%% ============================================================================ - - -%% ----------------------------------------------------------------------------- -%% Start the gen_server which will handle the socket. -%% --spec start_link(ranch:ref(), any(), jc_protocol, any()) -> {ok,pid()}. - -start_link(Ref, Socket, Trans, Opts) -> - proc_lib:start_link(?MODULE, init, [Ref, Socket, Trans, Opts]). - - - -%%% ============================================================================ -%%% gen_server callbacks -%%% ============================================================================ - - -%% ----------------------------------------------------------------------------- -%% This function is never called. Its defined so that the gen_server contract -%% is fufilled. -%% --spec init([]) -> {ok, undefined}. - -init([]) -> {ok, undefined}. - - -%% ----------------------------------------------------------------------------- -%% Set up the socket and convert the process into a gen_server. -%% --spec init(ranch:ref(),any(),jc_protocol|undefined,[Port::integer()]) -> any(). - -init(Ref, S, T, _Opts = [Port]) -> - ok = proc_lib:init_ack({ok, self()}), - ok = ranch:accept_ack(Ref), - ok = T:setopts(S, [{active, once}, {packet, 0}]), - lager:info("~p(~p): up and listening on port ~p.", - [?MODULE, self(), Port]), - - gen_server:enter_loop(?MODULE, [], - #jc_p{socket = S, - trans = T, - connected = false, - size = undefined, - acc = <<>>}, - ?TIMEOUT). - - -%% ----------------------------------------------------------------------------- -%% @private Handle info messages: Socket messages and jc_psub subscription -%% messages. -%% --spec handle_info(any(), jc_p()) -> {noreply, jc_p()} | {stop, normal, jc_p()}. - -handle_info({tcp, S, Data}, State = #jc_p{socket=S, trans=T})-> - T:setopts(S, [{active, once}]), - NewState = handle_data(Data, State), - {noreply, NewState, ?TIMEOUT}; - -handle_info({tcp_closed, _Socket}, State) -> - {stop, normal, State}; - -handle_info({tcp_error, _, Reason}, State) -> - {stop, Reason, State}; - -handle_info(timeout, State) -> - {stop, normal, State}; - -handle_info(Msg, State = #jc_p{socket=S, trans = T}) -> - T:send(S, marshal(Msg)), - {noreply, State, ?TIMEOUT}. - - -%% ----------------------------------------------------------------------------- -%% @private Hande call messages. -%% --spec handle_call(term(), {pid(), _}, jc_p()) -> {reply, ok, jc_p()}. - -handle_call(Request, _From, State) -> - lager:warning("~p: unrecognized handle_call request: ~p.", - [?MODULE, Request]), - {reply, ok, State}. - - -%% ----------------------------------------------------------------------------- -%% @private Handle cast messages. -%% --spec handle_cast(any(), jc_p()) -> {noreply, jc_p()}. - -handle_cast(Msg, State) -> - lager:warning("~p: unexpected cast message: ~p.", [?MODULE, Msg]), - {noreply, State}. - - -%% ----------------------------------------------------------------------------- -%% @private Terminate server. -%% --spec terminate(any(), jc_p()) -> any(). - -terminate(Reason, _State) -> - lager:warning("~p: terminated with reason: ~p.", [?MODULE, Reason]), - ok. - - -%% ----------------------------------------------------------------------------- -%% @private Convert process state when code is changed. -%% --spec code_change(term(), jc_p(), any()) -> {ok, jc_p()}. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - - - -%%% ============================================================================ -%%% Internal functions -%%% ============================================================================ - - -%% ----------------------------------------------------------------------------- -%% @private Top-level handler for socket data, returns a new state. -%% --spec handle_data(Data::binary(), OldState::jc_p()) -> NewState::jc_p(). - -handle_data(Data, #jc_p{trans = T, socket = S, connected = C} = State) -> - try - handle(Data, State) - catch - throw:{fatal, F} -> - T:send(S, marshal({error, {protocol_error, F}})), - self() ! {tcp_closed, S}, - State; - _:Error -> - T:send(S, marshal({error, Error})), - State#jc_p{acc = <<>>, - size = undefined, - connected=C} - end. - - -%% ----------------------------------------------------------------------------- -%% Get Size Header, Get size number of bytes, Execute Command, Repeat. When no -%% more bytes, save what has been retrieved thus far in the State and return. -%% --spec handle(Data::binary(), OldState::jc_p()) -> NewState::jc_p(). - -handle(<<>>, State) -> - State; - -handle(<<Size:8/integer-unit:8,C/binary>>,#jc_p{size=undefined, acc = <<>>}=S)-> - case byte_size(C) of - CSize when CSize >= Size -> - <<Command:Size/binary, Remainder/binary>> = C, - S2 = execute(Command, S), - handle(Remainder, S2); - Less -> - S#jc_p{size = Size - Less, acc = C} - end; - -handle(Data, #jc_p{size = undefined, acc = Acc} = S) -> - NewData = <<Acc/binary, Data/binary>>, - case byte_size(NewData) of - NSize when NSize >= 8 -> - <<Size:8/integer-unit:8, Remainder/binary>> = NewData, - handle(Remainder, S#jc_p{size=Size, acc = <<>>}); - _ -> - S#jc_p{acc = NewData, size = undefined} - end; - -handle(Data, #jc_p{acc = Acc, size = Size} = S) when size /= undefined-> - case byte_size(Data) of - BSize when BSize >= Size -> - <<Seg:Size/binary, Remainder/binary>> = Data, - Command = <<Acc/binary, Seg/binary>>, - S2 = execute(Command, S), - handle(Remainder, S2#jc_p{acc = <<>>, size = undefined}); - Less -> - S#jc_p{size = Size - Less, acc = <<Acc/binary, Data/binary>>} - end. - - -%% ----------------------------------------------------------------------------- -%% Execute the command. -% -execute(Command, #jc_p{trans=T, socket=S, connected=false} = State)-> - case evaluate(Command) of - open_session -> - lager:debug("~p (~p): connected with version: ~p", - [?MODULE, self(), <<"1.0">>]), - T:send(S, marshal({version, <<"1.0">>})), - State#jc_p{connected = true}; - _ -> - throw({fatal, missing_connect}) - end; - -execute(Command, #jc_p{trans=T, socket = S} = State) -> - case evaluate(Command) of - close_session -> - T:send(S, marshal({close, 1.0})), - self() ! {tcp_closed, S}; - {command, R} -> - jc_bridge:do(R), - State; - _ -> - throw(bad_command) - end. - - -% ============================================================================== -% Utility functions -% ============================================================================== - - -% ------------------------------------------------------------------------------ -% Given a string representation of a tuple, convert it to an Erlang tuple, make -% strings binary and determine the cache action implied by the Command. -% -evaluate(Command) -> - try - {ok,Scanned,_} = erl_scan:string(binary_to_list(<<Command/binary,".">>)), - {ok,Parsed} = erl_parse:parse_exprs(strings_to_binary(Scanned, [])), - determine_action(Parsed) - catch - _:_ -> - throw(command_syntax) - end. - - -determine_action([{tuple,1,[{atom,1,close}]}]) -> - close_session; - -determine_action([{tuple,1,[{atom,1,connect},{tuple,1, - [{atom,1,version}, - {bin,1, - [{bin_element,1, - {string,1,"1.0"}, - default,default}]}]}]}]) -> - open_session; - -determine_action([{tuple,1,_}] = AST) -> - {value, R, _} = erl_eval:exprs(AST, []), - {command, R}; - -determine_action(_) -> - {error, badarg}. - - -% Marshal the message: make it binary JSON and package it according to the -% protocol. -marshal(Message) -> - Result = try - to_json(Message) - catch - _:_ -> to_json({error, jc_result_to_json}) - end, - package(Result). - -to_json({ok,{H, M}}) -> - - jsone:encode([{hits, encode_keys(H)}, {misses, M}], - [{float_format, [{decimals, 10}, compact]}]); - -to_json({ok, [{_,_}|_Vs]=Lst}) -> - jsone:encode(encode_keys(Lst), - [{float_format, [{decimals, 10}, compact]}]); - -to_json({{active, _}=A,{configured, _}=C}) -> - jsone:encode([A,C], - [{float_format, [{decimals, 10}, compact]}]); - -to_json({size, Sizes}) -> - Os = lists:map(fun({A,B,C}) -> {A, [B,C]} end, Sizes), - jsone:encode(Os, - [{float_format, [{decimals, 10}, compact]}]); - -to_json({uptime, [{up_at, U}, {now, N},{up_time, _UT}]}) -> - jsone:encode([{uptime, [{up_at, list_to_binary(U)}, - {now, list_to_binary(N)}]}], - [{float_format, [{decimals, 10}, compact]}]); - -to_json({X, Y}) -> - jsone:encode([{X,Y}], - [{float_format, [{decimals, 10}, compact]}]); - -to_json(M) -> - jsone:encode(M, - [{float_format, [{decimals, 10}, compact]}]). - - -% JC Keys can be anytime, but json needs object keys to be strings, so -% {k,v} needs to be embedded in a json \"key\" : key, \"value\":value -% object. -encode_keys(KVList) -> - lists:map(fun({K,V}) -> {[{key, K},{value, V}]} end, - KVList). - - -% Make binary pay-load with size header. -package(Message) -> - Size = byte_size(Message), - <<Size:8/integer-unit:8, Message/binary>>. - - -% Walk the scanned list of tokens making strings binary strings -strings_to_binary([], Acc) -> - lists:reverse(Acc); - -strings_to_binary([{string, _, _}=S|Tl], Acc) -> - strings_to_binary(Tl, [{'>>',1}, S, {'<<',1}|Acc]); - -strings_to_binary([Hd|Tl],Acc) -> - strings_to_binary(Tl, [Hd|Acc]). diff --git a/src/jc_sequence.erl b/src/jc_sequence.erl index 17b9dec..6f9b4f5 100644 --- a/src/jc_sequence.erl +++ b/src/jc_sequence.erl @@ -42,7 +42,8 @@ -define(SERVER, ?MODULE). --record(jc_seq_state, {}). +-record(jc_seq_state, {singleton :: boolean()}). + %%% ============================================================================ @@ -58,7 +59,12 @@ -spec test_set(Map::map(), NewSeq::seq()) -> true | false. test_set(Map, Seq) -> - gen_server:call({global, ?MODULE}, {test_set, Map, Seq}, 1000). + case global:whereis_name(?SERVER) of + undefined -> + gen_server:call(?SERVER, {test_set, Map, Seq}, 1000); + _Pid -> + gen_server:call({global, ?SERVER}, {test_set, Map, Seq}, 1000) + end. %% ----------------------------------------------------------------------------- @@ -78,15 +84,17 @@ start_link() -> %% ----------------------------------------------------------------------------- %% @private Initialize the server by trying to register under the global name, -%% jc_sequence. +%% jc_sequence. If singleton, try to register jc_sequence globally to this +%% process. %% -spec init([]) -> {ok, #jc_seq_state{}}. init([]) -> - lager:info("~p: up.", [?MODULE]), + IsSingleton = application:get_env(jc, jc_sequence_singleton, true), + grab_name(IsSingleton), + lager:info("~p: up with Singletong = ~p.", [?MODULE, IsSingleton]), - grab_name(), - {ok, #jc_seq_state{}}. + {ok, #jc_seq_state{singleton = IsSingleton}}. %% ----------------------------------------------------------------------------- @@ -122,10 +130,10 @@ handle_cast(Msg, State) -> %% -spec handle_info(any(), #jc_seq_state{}) -> {noreply, #jc_seq_state{}}. -handle_info({'DOWN', _MonitorRef, _Type, Object, Info}, State) -> - lager:debug("~p: jc_sequence master at ~p went down with ~p.", - [?SERVER, Object, Info]), - grab_name(), +handle_info({'DOWN', _MRef, _Type, Obj, Info}, State) -> + lager:debug("~p: jc_sequence master at ~p went down with ~p.", + [?SERVER, Obj, Info]), + grab_name(State#jc_seq_state.singleton), {noreply, State}; @@ -159,12 +167,14 @@ code_change(_OldVsn, State, _Extra) -> %%% Internal functions %%% ============================================================================ +%% Assuming that jc_sequence is to be a singleton, if the global registry +%% doesn't have a PID registered for jc_sequence, then claim it; otherwise, +%% monitor the master. +%% -%% ----------------------------------------------------------------------------- -%% If the global registry doesn't have a PID registered for jc_sequence, then -%% claim it; otherwise, monitor the master. - -grab_name() -> +grab_name(false) -> + ok; +grab_name(true) -> case global:register_name(?MODULE, self()) of yes -> lager:info("~p: master is ~p.", [?MODULE, self()]); diff --git a/src/jc_store.erl b/src/jc_store.erl index 4fcbf9b..fe1668f 100644 --- a/src/jc_store.erl +++ b/src/jc_store.erl @@ -40,11 +40,13 @@ flush/1, get/2, get_map/1, get_map_since/2, key_set/1, + map_exists/1, maps/0, put/5]). %% Meta-data API. --export([up_nodes/0, stats/1]). +-export([locus/2, up_nodes/0, stats/1]). + %% Custom-field indexing API. -export([indexes/0, indexes/1, create_index/2, start_indexing/2, stop_indexing/2]). @@ -68,6 +70,20 @@ %% ============================================================================= +%% ----------------------------------------------------------------------------- +%% Given a Key, hash it producing an index into the list of supplied Nodes and +%% return the indexed node. Used to help avoid transactions by allowing a client +%% to locate a key-specific node for destructive (write, delete, etc.) operations. +%% It is faster to ask for a node and then utilize that node's jc_bridge or +%% Jcache API rather than using transactions. +%% +-spec locus(key(), list(node())) -> node(). + +locus(K, Nodes) -> + Bucket = erlang:phash2(K, length(Nodes)) + 1, + lists:nth(Bucket, Nodes). + + %% ----------------------------------------------------------------------------- %% Return list of running mnesia nodes. %% @@ -105,6 +121,28 @@ stats(_) -> {error, badarg}. + +%%------------------------------------------------------------------------------ +%% @doc Return true if map exists, else false. +%% +-spec map_exists(Map::map()) -> [true | false]. + +map_exists(Map) -> + LocInRec = #key_to_value.map, + F = fun() -> + mnesia:select(key_to_value, + [{'$1', [{'==', Map, {element, LocInRec, '$1'}}], + [{element, LocInRec,'$1'}]}], + 1, + read) end, + case mnesia:async_dirty(F) of + '$end_of_table' -> + false; + _ -> + true + end. + + %%------------------------------------------------------------------------------ %% @doc Return a sorted list of all maps currently in the cache. %% @@ -130,12 +168,10 @@ maps() -> -spec clear(map_name()) -> ok. clear(Map) -> - F = fun() -> - Items = mnesia:index_read(key_to_value, Map, #key_to_value.map), - [mnesia:delete_object(Rec) || Rec <- Items], - mnesia:delete({seq, Map}) - end, - mnesia:sync_dirty(F), + + Items = mnesia:index_read(key_to_value, Map, #key_to_value.map), + [mnesia:delete_object(Rec) || Rec <- Items], + mnesia:delete({seq, Map}), ok. diff --git a/test/app.config b/test/app.config index bc24935..e0fbefb 100644 --- a/test/app.config +++ b/test/app.config @@ -2,16 +2,6 @@ %% the jc stanza. [ - %% SASL config - type of logging not typically used, but might come in handy - %% log files will be created, but are small and rotated. - {sasl, [ - {sasl_error_logger, {file, "log/sasl-error.log"}}, - {errlog_type, error}, - {error_logger_mf_dir, "log/sasl"}, % Log directory - {error_logger_mf_maxbytes, 10485760}, % 10 MB max file size - {error_logger_mf_maxfiles, 5} % 5 files max - ]}, - {mnesia, [ %% Specifies the maximum number of writes allowed to the transaction log @@ -31,7 +21,7 @@ {lager, [ {handlers, [ - {lager_console_backend, info}, + {lager_console_backend, [{level,info}]}, {lager_file_backend, [{file, "log/error.log"}, {level, error}, {size, 10485760}, diff --git a/test/app2.config b/test/app2.config index b4b371b..4f94df6 100644 --- a/test/app2.config +++ b/test/app2.config @@ -2,16 +2,6 @@ %% the jc stanza. [ - %% SASL config - type of logging not typically used, but might come in handy - %% log files will be created, but are small and rotated. - {sasl, [ - {sasl_error_logger, {file, "log/sasl-error.log"}}, - {errlog_type, error}, - {error_logger_mf_dir, "log/sasl"}, % Log directory - {error_logger_mf_maxbytes, 10485760}, % 10 MB max file size - {error_logger_mf_maxfiles, 5} % 5 files max - ]}, - {mnesia, [ %% Specifies the maximum number of writes allowed to the transaction log @@ -31,7 +21,7 @@ {lager, [ {handlers, [ - {lager_console_backend, info}, + {lager_console_backend, [{level, info}]}, {lager_file_backend, [{file, "log/error.log"}, {level, error}, {size, 10485760}, diff --git a/test/jc.coverspec b/test/jc.coverspec index 7f1f459..ab22f60 100644 --- a/test/jc.coverspec +++ b/test/jc.coverspec @@ -30,6 +30,7 @@ jc_bridge, jc_cluster, jc_eviction_manager, + jc_netsplit, jc_protocol, jc_psub, jc_sequence, diff --git a/test/jc_SUITE.erl b/test/jc_SUITE.erl index 898cf35..000a8e3 100644 --- a/test/jc_SUITE.erl +++ b/test/jc_SUITE.erl @@ -202,8 +202,8 @@ meta_data_test(_Config) -> [{auto_index,{records,0},{bytes,_}}, {key_to_value,{records,0},{bytes,_}}, {max_ttl,{records,0},{bytes,_}}, - {ps_client,{records,0},{bytes,_}}, - {ps_sub,{records,0},{bytes,_}}, + {ps_client,{records,1},{bytes,_}}, + {ps_sub,{records,1},{bytes,_}}, {schema,{records,10},{bytes,_}}, {seq,{records,0},{bytes,_}}, {stats,{records,2},{bytes,_}}, @@ -224,12 +224,15 @@ meta_data_test(_Config) -> % put_s should create map if not there % deleting last item from map should remove it from use maps_test(_Config) -> + + false = jc:map_exists(bed), {maps, []} = bridge({maps}), {ok, 1} = bridge({put, bed, 1, 1}), {ok, 1} = bridge({put, evs, 1, 1}), {ok, 2} = bridge({put, evs, 2, 2}), {ok, 2} = bridge({put_s, trx, 2, 2, 22}), {maps, [bed, evs, trx]} = bridge({maps}), + true = jc:map_exists(bed), bridge({evict, bed, 1}), {maps, [evs, trx]} = bridge({maps}), bridge({evict, evs, 1}), @@ -388,9 +391,9 @@ put_all_test(_Config)-> {ok, 100} = bridge({put_all, bed, KVs, 2}), {ok, TestVs} = bridge({values, bed}), - lists:sort(Vs) == lists:sort(TestVs), + true = (lists:sort(Vs) == lists:sort(TestVs)), {ok, TestKs} = bridge({key_set, bed}), - lists:sort(Ks) == lists:sort(TestKs), + true = (lists:sort(Ks) == lists:sort(TestKs)), timer:sleep(2100), {ok, {[], M}} = bridge({get_all, bed, Ks}), @@ -754,7 +757,8 @@ remove_items_test(_config) -> {ok,3} = jc:put_all(bed, [{1, one},{2, two},{3, three}]), {ok,[{1,one}]} = bridge({remove_items, bed, [1, 22]}), {ok, []} = bridge({remove_items, bed, [1, 22]}), - {ok, [2,3]} = jc:key_set(bed), + {ok, Result} = jc:key_set(bed), + [2,3] = lists:sort(Result), {ok,[{3, three}, {2, two}]} = bridge({remove_items, bed, [2, 3, 3, 4]}), {records, 0} = jc:map_size(bed), jc:flush(), @@ -762,7 +766,8 @@ remove_items_test(_config) -> {ok,3} = jc_s:put_all(bed, [{1, one},{2, two},{3, three}], 10), {ok,[{1,one}]} = bridge({remove_items_s, bed, [1, 22], 11}), {ok, []} = bridge({remove_items_s, bed, [1, 22], 12}), - {ok, [2,3]} = jc:key_set(bed), + {ok, Result} = jc:key_set(bed), + true = ([2,3] == lists:sort(Result)), {error, out_of_seq} = bridge({remove_items_s, bed, [2, 3, 3, 4], 1}), {ok,[{3, three}, {2, two}]} = bridge({remove_items_s, bed, [2, 3, 3, 4], 111}), {records, 0} = jc:map_size(bed). @@ -838,13 +843,19 @@ map_subscribe_test(_Config) -> jc_bridge ! {self(), {map_subscribe, bed, key, write}}, timer:sleep(100), This = self(), - This = mnesia:dirty_first(ps_client), - [{ps_sub, {map_sub, bed, key, write}, Set}] = - mnesia:dirty_read(ps_sub, mnesia:dirty_first(ps_sub)), - sets:is_element(self(), Set), + true = lists:member(This, mnesia:dirty_all_keys(ps_client)), + A = mnesia:dirty_read(ps_sub, mnesia:dirty_next(ps_sub, mnesia:dirty_first(ps_sub))), + case A of + [{ps_sub, {map_sub, bed, key, write}, Set}] -> + sets:is_element(self(), Set); + _ -> + [{ps_sub, {map_sub, bed, key, write}, Set2}] = + mnesia:dirty_read(ps_sub, mnesia:dirty_first(ps_sub)), + sets:is_element(self(), Set2) + end, - 1 = jc_psub:client_count(), - 2 = jc_psub:load(), + 2 = jc_psub:client_count(), + 4 = jc_psub:load(), jc_bridge ! {self(), {put, bed, key, 1}}, jc_bridge ! {self(), {put, bed, 1, 1}}, @@ -870,11 +881,13 @@ map_subscribe_test(_Config) -> jc_psub ! {evict_deadbeats, 120000}, timer:sleep(200), - '$end_of_table' = mnesia:dirty_first(ps_sub), - '$end_of_table' = mnesia:dirty_first(ps_client), - - 0 = jc_psub:load(), - 0 = jc_psub:client_count(), + 1 = length(mnesia:dirty_all_keys(ps_client)), + 1 = length(mnesia:dirty_all_keys(ps_sub)), + + 2 = jc_psub:load(), + 1 = jc_psub:client_count(), + + flush(), jc:put(bed, otherkey, 1), @@ -887,7 +900,9 @@ map_subscribe_test(_Config) -> jc_bridge ! {self(), {map_subscribe, evs, any, any}}, timer:sleep(600), - First = mnesia:dirty_first(ps_sub), + Zeroth = mnesia:dirty_first(ps_sub), + First = mnesia:dirty_next(ps_sub, mnesia:dirty_first(ps_sub)), + Second = mnesia:dirty_next(ps_sub, First), @@ -908,7 +923,8 @@ map_subscribe_test(_Config) -> catch _:_ -> [{ps_sub, {map_sub, bed, key, delete}, Y2}] = - mnesia:dirty_read(ps_sub, First), + mnesia:dirty_read(ps_sub, Zeroth), + sets:is_element(self(), Y2) end, diff --git a/test/protocol_SUITE.erl b/test/protocol_SUITE.erl deleted file mode 100644 index 0b4212a..0000000 --- a/test/protocol_SUITE.erl +++ /dev/null @@ -1,219 +0,0 @@ - - - --module(protocol_SUITE). - --include("../include/records.hrl"). --inclde_lib("common_test/include/ct.hrl"). - - --export([all/0, - init_per_suite/1, - init_per_testcase/2, - end_per_testcase/2, - end_per_suite/1]). - --export([put_get_test/1, error_test/1, quant_test/1]). - - -all() -> - [ put_get_test, error_test, quant_test -]. - -init_per_suite(Config) -> - net_kernel:start(['jc1@127.0.0.1', longnames]), - application:set_env(jc, cache_nodes, - ['jc1@127.0.0.1','jc2@127.0.0.1', 'jc3@127.0.0.1']), - application:set_env(jc, max_ttl_maps, [{testmap, 100}]), - application:set_env(jc, indexes, [{bed, "identifier"}, - {bed, "menu.2.id.'2'"}, - {cow, "cow.2.id.'2'"}]), - application:set_env(jc, analyze_freq, {5, 5}), - application:set_env(jc, protocol_port, 5555), - - application:ensure_all_started(jc), - lager:set_loglevel(lager_console_backend, info), - Config. - - -init_per_testcase(_, Config) -> - code:load_file(t), - t:start_link(), - <<"{\"version\":\"1.0\"}">> = get_result(), - - {maps, Maps} = jc:maps(), - [jc:clear(Map) || Map <- Maps], - Config. - -end_per_testcase(_, Config) -> - jc:flush(silent), - t:send("{close}"), - get_all_results(), - Config. - -end_per_suite(Config) -> - jc:stop(), - Config. - - -error_test(_Config) -> - t:send("{put, bed, \"1\", 1"), - <<"{\"error\":\"command_syntax\"}">> = get_result(), - - t:send("{what, bed, \"1\", 1}"), - <<"{\"error\":\"unrecognized_command\"}">> = get_result(). - - - -put_get_test(_Config) -> - - t:send("{put, bed, \"1\", 1}"), - <<"{\"ok\":\"1\"}">> = get_result(), - - t:send("{put, bed, \"3\", 3.3}"), - <<"{\"ok\":\"3\"}">>= get_result(), - - t:send("{put, bed, \"4\", [1,2,3]}"), - <<"{\"ok\":\"4\"}">> = get_result(), - - J1 = "\"{\\\"first\\\":{\\\"second\\\":1, \\\"third\\\":true},\\\"second\\\":true}\"", - J2 = "\"{\\\"first\\\":{\\\"second\\\":2, \\\"third\\\":false},\\\"second\\\":true}\"", - - t:send("{put, json, \"1\", " ++ J1 ++ "}"), - <<"{\"ok\":\"1\"}">> = get_result(), - - t:send("{put, json, \"2\", " ++ J2 ++ "}"), - <<"{\"ok\":\"2\"}">> = get_result(), - - t:send("{get, bed, \"1\"}"), - <<"{\"ok\":1}">> = get_result(), - - t:send("{get, bed, \"3\"}"), - <<"{\"ok\":3.3}">> = get_result(), - - t:send("{get, bed, \"4\"}"), - <<"{\"ok\":[1,2,3]}">> = get_result(), - - - t:send("{put_all, evs, [{1,1}, {2,\"2\"}, {\"3\",3.3}]}"), - <<"{\"ok\":3}">> = get_result(), - - t:send("{get_all, evs, [1, 2, 4]}"), - <<"{\"hits\":[{\"key\":2,\"value\":\"2\"},{\"key\":1,\"value\":1}],\"misses\":[4]}">> = get_result(), - - t:send("{key_set, bed}"), - <<"{\"ok\":[\"1\",\"3\",\"4\"]}">> = get_result(), - - t:send("{values, bed}"), - true = - case lists:sort(maps:get(<<"ok">>,jsone:decode(get_result()))) of - [1,3.3,[1,2,3]] -> true - end, - - - t:send("{values_match, json, \"first.second=1\"}"), - R = list_to_binary("[{\"key\":\"1\",\"value\":" ++ J1 ++ "}]"), - R = get_result(), - - t:send("{values_match, json, \"second=true\"}"), - - R2 = list_to_binary("[{\"key\":\"1\",\"value\":" ++ J1 ++ "},{\"key\":\"2\",\"value\":" ++ J2 ++ "}]"), - R2Alt = list_to_binary("[{\"key\":\"2\",\"value\":" ++ J2 ++ "},{\"key\":\"1\",\"value\":" ++ J1 ++ "}]"), - - true = case get_result() of - R2 -> true; - R2Alt -> true; - _ -> false - end, - - - t:send("{put, bed, \"1\", 1}"), - <<"{\"ok\":\"1\"}">> = get_result(), - - t:send("{evict, evs, 1}"), - <<"\"ok\"">> = get_result(), - - t:send("{map_size, evs}"), - <<"{\"records\":2}">> = get_result(), - - t:send("{map_size, json}"), - <<"{\"records\":2}">> = get_result(), - - t:send("{evict_all_match, \"first.second=1\"}"), - <<"\"ok\"">> = get_result(), - - t:send("{map_size, json}"), - <<"{\"records\":1}">> = get_result(), - - t:send("{contains_key, evs, 2}"), - <<"true">> = get_result(), - - t:send("{contains_key, evs, 123}"), - <<"false">> = get_result(), - - t:send("{cache_nodes}"), - <<"{\"active\":[\"jc1@127.0.0.1\"],\"configured\":[\"jc1@127.0.0.1\",\"jc2@127.0.0.1\",\"jc3@127.0.0.1\"]}">> - = get_result(), - - t:send("{flush}"), - <<"\"ok\"">> = get_result(), - - t:send("{cache_size}"), - <<"{\"key_to_value\":", _/binary >> = get_result(), - - t:send("{map_size, bed}"), - <<"{\"records\":0}">> = get_result(), - - t:send("{put_s, bed, \"1\", 1, 20}"), - <<"{\"ok\":\"1\"}">> = get_result(), - - t:send("{put_s, evs, \"1\", 1, 30}"), - <<"{\"ok\":\"1\"}">> = get_result(), - - t:send("{maps}"), - <<"{\"maps\":[\"bed\",\"evs\"]}">> = get_result(), - - t:send("{up}"), - <<"{\"uptime\":", _/binary>> = get_result(), - - t:send("{sequence}"), - <<"{\"bed\":20,\"evs\":30}">> = get_result(), - - t:send("{sequence, map}"), - <<"0">> = get_result(), - - t:send("{set_max_ttl, bed, 1000}"), - <<"\"ok\"">> = get_result(), - - t:send("{get_max_ttls}"), - <<"{\"bed\":1000,\"testmap\":100}">> = get_result(). - - - -quant_test(_config) -> - Limit = 5000, - [ t:send("{put, bed, \"" ++ integer_to_list(X) ++ "\", " ++ integer_to_list(X) ++ "}") || - X <- lists:seq(1, Limit)], - R = get_all_results(), - Limit = length(R) - 1, - t:send("{map_size, bed}"), - <<"{\"records\":5000}">> = get_result(). - - -get_all_results() -> - receive X -> - [X|get_all_results()] - after - 1000 -> - [ok] - end. - -get_result() -> - receive - X -> X - after - 400 -> - error - end. - -
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor