A new user interface for you! Read more...

File 1506-Add-consistent-hashing-to-diameter_dist-route_sessio.patch of Package erlang

From 376e8fac401bd11b3cb3d2f9661eb7ff0b9bbcd7 Mon Sep 17 00:00:00 2001
From: Anders Svensson <anders@erlang.org>
Date: Wed, 6 Mar 2019 17:13:07 +0100
Subject: [PATCH 6/7] Add consistent hashing to diameter_dist:route_session/2

If the Session-Id optional value to node() mapping fails then hash
Session-Id to a node by default, instead of selecting the local node as
in the parent commit. The previous behaviour is configurable by setting
default = local in an options map.

Nodes make themselves part of the pool from which nodes are selected by
calling diameter_dist:attach/1 with the list of service names they are
willing to handle requests for, the local node being selected in the
absence of any attached nodes. The original idea was to base the node
pool on share_peers and/or use_shared_peers configuration, but that
configuration determines where outgoing requests can be sent, while
route_session/2 deals with incoming requests, so it's not obvious that
conflating the two is a good thing. (Also because
share_peers/use_shared_peers can be used in different ways; the former
could have been skipped entirely.)

The hashing effectively places nodes on a circle, a hashed Session-Id
being mapped to the nearest predecessor node (clockwise). Nodes are
rehashed with each Session-Id (with the id as salt) for a more even
distribution, at the cost of performance, although neither the cost nor
the evenness of the distribution has been tested yet. Obviously, the larger
the number of attached nodes, the higher the cost. Adding/removing an
attached node only affects session ids that hash in the interval between
the added/removed node and its successor (hence consistent hashing).

Options are tweaked slightly compared to the parent commit, and it is
now possible to restrict the optional value mapping to specific Diameter
identities, to avoid mapping an id that was generated at the peer when
the peer is also implemented with the diameter application.

Note that diameter_dist is not yet an officially documented interface,
so could change. Documentation is in the module itself.
---
 lib/diameter/src/base/diameter_dist.erl    | 293 +++++++++++++++++++++++++----
 lib/diameter/src/base/diameter_traffic.erl |  13 +-
 2 files changed, 264 insertions(+), 42 deletions(-)

diff --git a/lib/diameter/src/base/diameter_dist.erl b/lib/diameter/src/base/diameter_dist.erl
index ed2859e914..5c29ea95a4 100644
--- a/lib/diameter/src/base/diameter_dist.erl
+++ b/lib/diameter/src/base/diameter_dist.erl
@@ -28,12 +28,20 @@
 %% requests to handler processes (local or remote) in various ways.
 %%
 
-%% spawn_opt callbacks; initial argument constructed in diameter_traffic
+%% spawn_opt callbacks
 -export([spawn_local/2,
          spawn_local/1,
          route_session/2,
          route_session/1]).
 
+%% signal availability for handling incoming requests to route_session/2
+-export([attach/1,
+         detach/1]).
+
+%% consistent hashing
+-export([hash/3,   %% for use as default MFA in route_session/2 options map
+         hash/2]). %% arbitrary key/values
+
 -include_lib("diameter/include/diameter.hrl").
 
 %% server start
@@ -50,9 +58,21 @@
 -type request() :: tuple().  %% callback argument from diameter_traffic
 
 -define(SERVER, ?MODULE).  %% server monitoring node connections
--define(TABLE, ?MODULE).   %% node() binary -> node() atom
+
+%% Maps a node name binary to the corresponding atom. Used by
+%% route_session/2 to map the optional value of a Session-Id to
+%% node().
+-define(NODE_TABLE, diameter_dist_node).
+
+%% Maps a diameter:service_name() to a node() that has called attach/1
+%% to declare its willingness to handle incoming requests for the
+%% service. Used by route_session/2 in case the optional value mapping
+%% has failed.
+-define(SERVICE_TABLE, diameter_dist_service).
 
 -define(B(A), atom_to_binary(A, utf8)).
+-define(ORCOND(List), list_to_tuple(['orelse', false | List])).
+-define(HASH(T), erlang:phash2(T, 16#100000000)).
 
 %% spawn_local/2
 %%
@@ -76,32 +96,82 @@ spawn_local(ReqT) ->
 
 %% route_session/2
 %%
-%% Callback that routes requests containing Session-Id AVPs as
-%% returned by diameter:session_id/0 back to the node on which the
-%% function was called. This is only appropriate when sessions are
-%% only initiated by the own (typically client) node, and ids have
-%% been returned from diameter:session_id/0.
+%% Callback that maps the Session-Id of an incoming request to a
+%% handler node.
 %%
-%% This can be used with #{search => 0} to route on something other
-%% than Session-Id since default can be an MFA returning a node()
-%% (applied to the incoming diameter_packet record) and dispatch can
-%% be an MFA returning a pid() (applied to Node and the request MFA),
-%% but this is no simpler than just implementing an own spawn_opt
-%% callback. (Except with the default dispatch possibly.)
-
+%% With an options list, maps an id whose optional value is the name
+%% of a connected node to the same node, to handle the case that the
+%% session id has been returned from diameter:session_id/1; otherwise
+%% to a node that has called diameter_dist:attach/1 using the
+%% consistent hashing provided by hash/3, or to the local node() if a
+%% session id could not be extracted or there are no attached nodes. A
+%% handler process is spawned on the selected node using
+%% erlang:spawn_opt/4.
+%%
+%% Different behaviour can be configured by supplying an options map
+%% of the following form:
+%%
+%%   #{search => non_neg_integer(),
+%%     id => [binary()],
+%%     default => discard | local | mfa(),
+%%     dispatch => list() | mfa()}
+%%
+%% The search member limits the number of AVPs that are examined in
+%% the message (from the front), to avoid searching the entire message in
+%% case it's known that peers follow RFC 6733's recommendation that
+%% Session-Id be placed at the head of a message. The default is to
+%% search the entire message.
+%%
+%% The id member restricts the optional value mapping to session ids
+%% whose DiameterIdentity is one of those specified. Set this to the
+%% list of Diameter identities advertised by the service in question
+%% (typically one) to ensure that only locally generated session ids
+%% are mapped; or to the empty list to disable the mapping.
+%%
+%% The default member determines where to handle a message whose
+%% Session-Id isn't found or whose optional value isn't mapped to the
+%% name of a connected node. The atom local says the local node, an
+%% MFA is invoked on Session-Id | false, the name of the diameter
+%% service, and the message binary, and should return either a node()
+%% or false to discard the message. Defaults to {diameter_dist, hash, []}.
+%%
+%% The dispatch member determines how the pid() of the request handler
+%% process is retrieved. An MFA is applied to a previously selected
+%% node(), and the module, function, and arguments list to apply in
+%% the handler process to handle the request, the MFA being supplied
+%% by diameter, and returns pid() | discard. A list is equivalent to
+%% {erlang, spawn_opt, []}. Defaults to [].
+%%
+%% This can be used with search = 0 to route on something other than
+%% Session-Id, but this is probably no simpler than just implementing
+%% your own spawn_opt callback. (Except with the default dispatch possibly.)
+%%
+%% Note that if the peer is also implemented with OTP diameter and
+%% generating session ids with diameter:session_id/1 then
+%% route_session/2 can map an optional value to a local node that
+%% happens to have the same name as one of the peer's nodes. This
+%% could lead to an uneven distribution; for example, if the peer
+%% nodes are a subset of the local nodes. In practice, it's typically
+%% known if it's peers or the local node originating sessions; if the
+%% former then setting id = [] disables the optional value mapping, if
+%% the latter then setting default = local disables the hashing.
 -spec route_session(ReqT :: request(), Opts)
    -> discard
     | pid()
  when Opts :: pos_integer()   %% aka #{search => N}
             | list()          %% aka #{dispatch => Opts}
             | #{search => non_neg_integer(), %% limit number of examined AVPs
-                default => discard | mfa(),  %% return node() | false
-                dispatch => list() | mfa()}. %% spawn options or return pid()
+                id => [binary()],   %% restrict optional value map on DiamIdent
+                default => local    %% handle locally
+                         | discard
+                         | mfa(),   %% return node() | false
+                dispatch => list()  %% spawn options
+                          | mfa()}. %% (Node, M, F, A) -> pid() | discard
 
 route_session(ReqT, Opts) ->
-    #diameter_packet{bin = Bin} = Pkt = element(1, ReqT),
+    {_, Bin} = Info = diameter_traffic:request_info(ReqT),
     Sid = session_id(avps(Bin), search(Opts)),
-    Node = default(node_of_session_id(Sid), Sid, Opts, Pkt),
+    Node = default(node_of_session_id(Sid, Opts), Sid, Opts, Info),
     dispatch(Node, ReqT, dispatch(Opts)).
 
 %% avps/1
@@ -128,24 +198,31 @@ dispatch(Node, ReqT, Opts) ->
 route_session(ReqT) ->
     route_session(ReqT, []).
 
-%% node_of_session_id/1
+%% node_of_session_id/2
 %%
 %% Return the node name encoded as optional value in a Session-Id,
 %% assuming the id has been created with diameter:session_id/0. Lookup
 %% the node name to ensure we don't convert arbitrary binaries to
 %% atom.
 
-node_of_session_id([_, _, _, Bin]) ->
-    case ets:lookup(?TABLE, Bin) of
-        [{_, Node}] ->
-            Node;
-        [] ->
-            false
-    end;
+node_of_session_id([Id, _, _, Bin], #{id := Ids}) ->
+    lists:member(Id, Ids) andalso nodemap(Bin);
 
-node_of_session_id(_) ->
+node_of_session_id([_, _, _, Bin], _) ->
+    nodemap(Bin);
+
+node_of_session_id(_, _) ->
     false.
 
+%% nodemap/1
+
+nodemap(Bin) ->
+    try
+        ets:lookup_element(?NODE_TABLE, Bin, 2)
+    catch
+        error: badarg -> false
+    end.
+
 %% session_id/2
 
 session_id(_, 0) ->  %% give up
@@ -154,7 +231,7 @@ session_id(_, 0) ->  %% give up
 %% Session-Id = Command Code 263, V-bit = 0.
 session_id(<<263:32, 0:1, _:7, Len:24, _/binary>> = Bin, _) ->
     case Bin of
-        <<Avp:Len/binary>> ->
+        <<Avp:Len/binary, _/binary>> ->
             <<_:8/binary, Sid/binary>> = Avp,
             split(Sid);
         _ ->
@@ -248,20 +325,123 @@ search(_) ->
 %%
 %% Choose a node when Session-Id lookup has failed.
 
-default(false = No, _, #{default := discard}, _) ->
-    No;
-
-default(false, Sid, #{default := {M,F,A}}, Pkt) ->
-    apply(M, F, [Sid, Pkt | A]);  %% false | node()
+default(false, _, #{default := discard}, _) ->
+    false;
 
-default(false, _, _, _) ->
+default(false, _, #{default := local}, _) ->
     node();
 
+default(false, Sid, #{default := {M,F,A}}, Info) ->
+    {ServiceName, Bin} = Info,
+    apply(M, F, [Sid, ServiceName, Bin | A]);  %% node() | false
+
+default(false, Sid, _, Info) -> %% aka {?MODULE, hash, []}
+    {ServiceName, Bin} = Info,
+    hash(Sid, ServiceName, Bin);
+
 default(Node, _, _, _) ->
     Node.
 
 %% ===========================================================================
 
+%% hash/3
+%%
+%% Consistent hashing of Session-Id to an attached node, or the local
+%% node if Session-Id = false or no attached nodes.
+
+hash(Sid, ServiceName, _) ->
+    case false /= Sid andalso attached(ServiceName) of
+        [_|_] = Nodes ->
+            hash(Sid, Nodes);
+        _ ->
+            node()
+    end.
+
+%% hash/2
+%%
+%% Consistent hashing on arbitrary key/values. Returns false if the
+%% list is empty.
+
+%% No key or no values.
+hash(_, []) ->
+    false;
+
+%% Not much choice.
+hash(_, [Value]) ->
+    Value;
+
+%% Hash on a circle and choose the closest predecessor.
+hash(Key, Values) ->
+    Hash = ?HASH(Key),
+    tl(lists:foldl(fun(V,A) ->
+                           choose(Hash, [?HASH({Key, V}) | V], A)
+                   end,
+                   false,  %% < list()
+                   Values)).
+
+%% choose/3
+
+choose(Hash, [Hash1 | _] = T, [Hash2 | _])
+  when Hash1 =< Hash, Hash < Hash2 ->
+    T;
+
+choose(Hash, [Hash1 | _], [Hash2 | _] = T)
+  when Hash2 =< Hash, Hash < Hash1 ->
+    T;
+
+choose(_, T1, T2) ->
+    max(T1, T2).
+
+%% ===========================================================================
+
+%% attach/1
+%%
+%% Register the local node as a handler of incoming requests for the
+%% specified services when using the route_session/2 spawn_opt
+%% callback.
+
+attach(ServiceNames) ->
+    abcast({attach, node(), ServiceNames}).
+
+%% detach/1
+%%
+%% Deregister the local node as a handler of incoming requests.
+
+detach(ServiceNames) ->
+    abcast({detach, node(), ServiceNames}).
+
+%% abcast/1
+
+abcast(T) ->
+    gen_server:abcast([node() | nodes()], ?SERVER, T),
+    ok.
+
+%% attached/1
+
+attached(ServiceName) ->
+    try
+        ets:lookup_element(?SERVICE_TABLE, ServiceName, 2)
+    catch
+        error: badarg -> []
+    end.
+
+%% cast/2
+
+cast(Node, T) ->
+    gen_server:cast({?SERVER, Node}, T).
+
+%% attach/2
+
+attach(Node, S) ->
+    case sets:to_list(S) of
+        [] ->
+            ok;
+        Services ->
+            cast(Node, {attach, node(), Services})
+    end.
+
+%% ===========================================================================
+
 start_link() ->
     gen_server:start_link({local, ?SERVER}, ?MODULE, _Args = [], _Opts  = []).
 
@@ -272,11 +452,12 @@ start_link() ->
 %% binaries that aren't necessarily node names.
 
 init([]) ->
-    ets:new(?TABLE, [set, named_table]),
+    ets:new(?NODE_TABLE,    [set, named_table]),
+    ets:new(?SERVICE_TABLE, [bag, named_table]),
     ok = net_kernel:monitor_nodes(true, [{node_type, all}, nodedown_reason]),
-    ets:insert(?TABLE, [{B,N} || N <- [node() | nodes()],
-                                 B <- [?B(N)]]),
-    {ok, erlang:monotonic_time()}.
+    ets:insert(?NODE_TABLE, [{?B(N), N} || N <- [node() | nodes()]]),
+    abcast({attach, node()}),
+    {ok, sets:new()}.
 
 %% handle_call/3
 
@@ -285,17 +466,49 @@ handle_call(_, _From, S) ->
 
 %% handle_cast/2
 
+%% Remote node is asking which services the local node wants to handle.
+handle_cast({attach, Node}, S)
+  when Node /= node() ->
+    attach(Node, S),
+    {noreply, S};
+
+%% Node wants to handle incoming requests ...
+handle_cast({attach, Node, ServiceNames}, S) ->
+    ets:insert(?SERVICE_TABLE, [{N, Node} || N <- ServiceNames]),
+    {noreply, case node() of
+                  Node ->
+                      sets:union(S, sets:from_list(ServiceNames));
+                  _ ->
+                      S
+              end};
+
+%% ... or not.
+handle_cast({detach, Node, ServiceNames}, S) ->
+    ets:select_delete(?SERVICE_TABLE, [{{'$1', Node},
+                                        [?ORCOND([{'==', '$1', {const, N}}
+                                                  || N <- ServiceNames])],
+                                        [true]}]),
+    {noreply, case node() of
+                  Node ->
+                      sets:subtract(S, sets:from_list(ServiceNames));
+                  _ ->
+                      S
+              end};
+
 handle_cast(_, S) ->
     {noreply, S}.
 
 %% handle_info/2
 
 handle_info({nodeup, Node, _}, S) ->
-    ets:insert(?TABLE, {?B(Node), Node}),
+    ets:insert(?NODE_TABLE, {?B(Node), Node}),
+    cast(Node, {attach, node()}),  %% ask which services remote node handles
+    attach(Node, S),               %% say which service local node handles
     {noreply, S};
 
 handle_info({nodedown, Node, _}, S) ->
-    ets:delete(?TABLE, ?B(Node)),
+    ets:delete(?NODE_TABLE, ?B(Node)),
+    ets:select_delete(?SERVICE_TABLE, [{{'_', Node}, [], [true]}]),
     {noreply, S};
 
 handle_info(_, S) ->
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index b1b797aad8..c0643402a6 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -42,6 +42,9 @@
          peer_up/1,
          peer_down/1]).
 
+%% towards diameter_dist
+-export([request_info/1]).
+
 %% internal
 -export([send/1,    %% send from remote node
          request/1, %% process request in handler process
@@ -289,8 +292,7 @@ spawn_request(false, _, _, _, _, _, _) ->  %% no transport
 %% handler process dies (in a handle_request callback for example).
 spawn_request(AppT, {M,F,A}, Ack, TPid, Pkt, Dict0, RecvData) ->
     %% Term to pass to request/1 in an appropriate process. Module
-    %% diameter_dist implements callbacks, and uses the form of the
-    %% argument tuple constructed below.
+    %% diameter_dist implements callbacks.
     ReqT = {Pkt, AppT, Ack, TPid, Dict0, RecvData},
     apply(M, F, [ReqT | A]);
 
@@ -302,6 +304,13 @@ spawn_request(AppT, Opts, Ack, TPid, Pkt, Dict0, RecvData) ->
               end,
               Opts).
 
+%% request_info/1
+%%
+%% Limited request information for diameter_dist.
+
+request_info({Pkt, _AppT, _Ack, _TPid, _Dict0, RecvData} = _ReqT) ->
+    {RecvData#recvdata.service_name, Pkt#diameter_packet.bin}.
+
 %% request/1
 %%
 %% Called from a handler process chosen by a transport spawn_opt MFA
-- 
2.16.4