File 2961-Rewrite-erpc-multicall-to-utilize-new-receive-optimi.patch of Package erlang

From 3cff897b83decdd82e2eebc60d98dfa57d89a467 Mon Sep 17 00:00:00 2001
From: Rickard Green <rickard@erlang.org>
Date: Mon, 15 Feb 2021 19:53:11 +0100
Subject: [PATCH] Rewrite erpc:multicall() to utilize new receive optimization

---
 lib/kernel/src/erpc.erl        | 194 +++++++++++++++++++++------------
 lib/kernel/test/erpc_SUITE.erl | 112 +++++++++++++++++++
 2 files changed, 235 insertions(+), 71 deletions(-)

diff --git a/lib/kernel/src/erpc.erl b/lib/kernel/src/erpc.erl
index 89b286f479..a6b691754b 100644
--- a/lib/kernel/src/erpc.erl
+++ b/lib/kernel/src/erpc.erl
@@ -319,33 +319,9 @@ multicall(Ns, M, F, A, T) ->
         true = is_atom(M),
         true = is_atom(F),
         true = is_list(A),
-        Deadline = deadline(T),
-        {ReqIds, LC} = mcall_send_requests(Ns, M, F, A, [], T, false),
-        LRes = case LC of
-                   false ->
-                       undefined;
-                   true ->
-                       %% Timeout infinity and call on local node wanted;
-                       %% execute local call in this process...
-                       try
-                           {return, Return} = execute_call(M, F, A),
-                           {ok, Return}
-                       catch
-                           throw:Thrown ->
-                               {throw, Thrown};
-                           exit:Reason ->
-                               {exit, {exception, Reason}};
-                           error:Reason:Stack ->
-                               case is_arg_error(Reason, M, F, A) of
-                                   true ->
-                                       {error, {?MODULE, Reason}};
-                                   false ->
-                                       ErpcStack = trim_stack(Stack, M, F, A),
-                                       {error, {exception, Reason, ErpcStack}}
-                               end
-                       end
-               end,
-        mcall_receive_replies(ReqIds, [], LRes, Deadline)
+        Tag = make_ref(),
+        SendState = mcall_send_requests(Tag, Ns, M, F, A, T),
+        mcall_receive_replies(Tag, SendState)
     catch
         error:NotIErr when NotIErr /= internal_error ->
             error({?MODULE, badarg})
@@ -569,6 +545,8 @@ deadline(T) when ?IS_VALID_TMO_INT(T) ->
 
 time_left(infinity) ->
     infinity;
+time_left(expired) ->
+    0;
 time_left(Deadline) ->
     case Deadline - erlang:monotonic_time() of
         TimeLeft when TimeLeft =< 0 ->
@@ -577,50 +555,124 @@ time_left(Deadline) ->
             erlang:convert_time_unit(TimeLeft-1, native, millisecond) + 1
     end.
 
-mcall_send_requests([], _M, _F, _A, RIDs, _T, LC) ->
-    {RIDs, LC};
-mcall_send_requests([N|Ns], M, F, A, RIDs,
-                    infinity, false) when N == node() ->
-    mcall_send_requests(Ns, M, F, A, [local_call|RIDs], infinity, true);
-mcall_send_requests([N|Ns], M, F, A, RIDs, T, LC) ->
-    RID = try
-              send_request(N, M, F, A)
-          catch
-              _:_ ->
-                  %% Bad arguments... Abandon
-                  %% requests we've already sent
-                  %% and then fail...
-                  mcall_failure_abandon(RIDs)
-          end,
-    mcall_send_requests(Ns, M, F, A, [RID|RIDs], T, LC);
-mcall_send_requests(_, _, _, _, RIDs, _T, _LC) ->
-    %% Bad nodes list... Abandon requests we've
-    %% already sent and then fail...
-    mcall_failure_abandon(RIDs).
-
-mcall_failure_abandon([]) ->
-    error(badarg);
-mcall_failure_abandon([local_call|RIDs]) ->
-    mcall_failure_abandon(RIDs);
-mcall_failure_abandon([RID|RIDs]) ->
+mcall_local_call(M, F, A) ->
     try
-        _ = receive_response(RID, 0),
-        ok
+        {return, Return} = execute_call(M, F, A),
+        {ok, Return}
+    catch
+        throw:Thrown ->
+            {throw, Thrown};
+        exit:Reason ->
+            {exit, {exception, Reason}};
+        error:Reason:Stack ->
+            case is_arg_error(Reason, M, F, A) of
+                true ->
+                    {error, {?MODULE, Reason}};
+                false ->
+                    ErpcStack = trim_stack(Stack, M, F, A),
+                    {error, {exception, Reason, ErpcStack}}
+            end
+    end.
+
+mcall_send_request(T, N, M, F, A) when is_reference(T),
+                                       is_atom(N),
+                                       is_atom(M),
+                                       is_atom(F),
+                                       is_list(A) ->
+    spawn_request(N, ?MODULE, execute_call, [T, M, F, A],
+                  [{reply, error_only},
+                   {reply_tag, T},
+                   {monitor, [{tag, T}]}]).
+
+mcall_send_requests(Tag, Ns, M, F, A, Tmo) ->
+    DL = deadline(Tmo),
+    mcall_send_requests(Tag, Ns, M, F, A, [], DL, undefined, 0).
+
+mcall_send_requests(_Tag, [], M, F, A, RIDs, DL, local_call, NRs) ->
+    %% Timeout infinity and call on local node wanted;
+    %% excecute local call in this process...
+    LRes = mcall_local_call(M, F, A),
+    {ok, RIDs, #{local_call => LRes}, NRs, DL};
+mcall_send_requests(_Tag, [], _M, _F, _A, RIDs, DL, _LC, NRs) ->
+    {ok, RIDs, #{}, NRs, DL};
+mcall_send_requests(Tag, [N|Ns], M, F, A, RIDs,
+                    infinity, undefined, NRs) when N == node() ->
+    mcall_send_requests(Tag, Ns, M, F, A, [local_call|RIDs],
+                        infinity, local_call, NRs);
+mcall_send_requests(Tag, [N|Ns], M, F, A, RIDs, DL, LC, NRs) ->
+    try mcall_send_request(Tag, N, M, F, A) of
+        RID ->
+            mcall_send_requests(Tag, Ns, M, F, A, [RID|RIDs],
+                                DL, LC, NRs+1)
     catch
         _:_ ->
-            ok
-    end,
-    mcall_failure_abandon(RIDs).
-
-mcall_receive_replies([], Replies, undefined, _Deadline) ->
-    Replies;
-mcall_receive_replies([local_call|RIDs], Replies, LRes, Deadline) ->
-    mcall_receive_replies(RIDs, [LRes|Replies], undefined, Deadline);    
-mcall_receive_replies([RID|RIDs], Replies, LRes, Deadline) ->
-    Reply = try
-                {ok, receive_response(RID, time_left(Deadline))}
-            catch
-                Class:Reason ->
-                    {Class, Reason}
-            end,
-    mcall_receive_replies(RIDs, [Reply|Replies], LRes, Deadline).
+            %% Bad argument... Abandon requests and cleanup
+            %% any responses by receiving replies with a zero
+            %% timeout and then fail...
+            {badarg, RIDs, #{}, NRs, expired}
+    end;
+mcall_send_requests(_Tag, _Ns, _M, _F, _A, RIDs, _DL, _LC, NRs) ->
+    %% Bad nodes list... Abandon requests and cleanup any responses
+    %% by receiving replies with a zero timeout and then fail...
+    {badarg, RIDs, #{}, NRs, expired}.
+
+mcall_receive_replies(Tag, {SendRes, RIDs, Rpls, NRs, DL}) ->
+    ResRpls = mcall_receive_replies(Tag, RIDs, Rpls, NRs, DL),
+    if SendRes /= ok ->
+            error(SendRes); %% Cleanup done; fail...
+       true ->
+            mcall_map_replies(RIDs, ResRpls, [])
+    end.
+
+mcall_receive_replies(_Tag, _ReqIds, Rpls, 0, _DL) ->
+    Rpls;
+mcall_receive_replies(Tag, ReqIDs, Rpls, NRs, DL) ->
+    Tmo = time_left(DL),
+    receive
+        {Tag, ReqId, error, Reason} ->
+            Res = mcall_result(spawn_reply, ReqId, Tag, Reason),
+            mcall_receive_replies(Tag, ReqIDs, Rpls#{ReqId => Res},
+                                  NRs-1, DL);
+        {Tag, ReqId, process, _Pid, Reason} ->
+            Res = mcall_result(down, ReqId, Tag, Reason),
+            mcall_receive_replies(Tag, ReqIDs, Rpls#{ReqId => Res},
+                                  NRs-1, DL)
+    after Tmo ->
+            if ReqIDs == [] ->
+                    Rpls;
+               true ->
+                    NewNRs = mcall_abandon(Tag, ReqIDs, Rpls, NRs),
+                    mcall_receive_replies(Tag, [], Rpls, NewNRs, expired)
+            end
+    end.
+
+mcall_result(ResType, ReqId, Tag, ResultReason) ->
+    try
+        {ok, result(ResType, ReqId, Tag, ResultReason)}
+    catch
+        Class:Reason ->
+            {Class, Reason}
+    end.
+
+mcall_abandon(_Tag, [], _Rpls, NRs) ->
+    NRs;
+mcall_abandon(Tag, [local_call | RIDs], Rpls, NRs) ->
+    mcall_abandon(Tag, RIDs, Rpls, NRs);
+mcall_abandon(Tag, [RID | RIDs], Rpls, NRs) ->
+    NewNRs = case maps:is_key(RID, Rpls) of
+                 true ->
+                     NRs;
+                 false ->
+                     case call_abandon(RID) of
+                         true -> NRs-1;
+                         false -> NRs
+                     end
+             end,
+    mcall_abandon(Tag, RIDs, Rpls, NewNRs).
+
+mcall_map_replies([], _Rpls, Res) ->
+    Res;
+mcall_map_replies([RID|RIDs], Rpls, Res) ->
+    Timeout = {error, {?MODULE, timeout}},
+    mcall_map_replies(RIDs, Rpls, [maps:get(RID, Rpls, Timeout) | Res]).
+    
diff --git a/lib/kernel/test/erpc_SUITE.erl b/lib/kernel/test/erpc_SUITE.erl
index 1c969ad183..b1e8ab7c75 100644
--- a/lib/kernel/test/erpc_SUITE.erl
+++ b/lib/kernel/test/erpc_SUITE.erl
@@ -28,6 +28,9 @@
          send_request_check_reqtmo/1,
          send_request_against_old_node/1,
          multicall/1, multicall_reqtmo/1,
+         multicall_recv_opt/1,
+         multicall_recv_opt2/1,
+         multicall_recv_opt3/1,
          multicast/1,
          timeout_limit/1]).
 -export([init_per_testcase/2, end_per_testcase/2]).
@@ -55,6 +58,9 @@ all() ->
      send_request_against_old_node,
      multicall,
      multicall_reqtmo,
+     multicall_recv_opt,
+     multicall_recv_opt2,
+     multicall_recv_opt3,
      multicast,
      timeout_limit].
 
@@ -1088,6 +1094,102 @@ multicall_reqtmo(Config) when is_list(Config) ->
     stop_node(QuickNode2),
     Res.
 
+multicall_recv_opt(Config) when is_list(Config) ->
+    Loops = 1000,
+    HugeMsgQ = 500000,
+    process_flag(message_queue_data, off_heap),
+    {ok, Node1} = start_node(Config),
+    {ok, Node2} = start_node(Config),
+    ExpRes = [{ok, node()}, {ok, Node1}, {ok, Node2}],
+    Nodes = [node(), Node1, Node2],
+    Fun = fun () -> erlang:node() end,
+    _Warmup = time_multicall(ExpRes, Nodes, Fun, infinity, Loops div 10),
+    Empty = time_multicall(ExpRes, Nodes, Fun, infinity, Loops),
+    io:format("Time with empty message queue: ~p microsecond~n",
+	      [erlang:convert_time_unit(Empty, native, microsecond)]),
+    _ = [self() ! {msg,N} || N <- lists:seq(1, HugeMsgQ)],
+    Huge = time_multicall(ExpRes, Nodes, Fun, infinity, Loops),
+    io:format("Time with huge message queue: ~p microsecond~n",
+	      [erlang:convert_time_unit(Huge, native, microsecond)]),
+    stop_node(Node1),
+    stop_node(Node2),
+    Q = Huge / Empty,
+    HugeMsgQ = flush_msgq(),
+    case Q > 10 of
+	true ->
+	    ct:fail({ratio, Q});
+	false ->
+	    {comment, "Ratio: "++erlang:float_to_list(Q)}
+    end.
+
+multicall_recv_opt2(Config) when is_list(Config) ->
+    Loops = 1000,
+    HugeMsgQ = 500000,
+    process_flag(message_queue_data, off_heap),
+    {ok, Node1} = start_node(Config),
+    stop_node(Node1),
+    {ok, Node2} = start_node(Config),
+    ExpRes = [{ok, node()}, {error, {erpc, noconnection}}, {ok, Node2}],
+    Nodes = [node(), Node1, Node2],
+    Fun = fun () -> erlang:node() end,
+    _Warmup = time_multicall(ExpRes, Nodes, Fun, infinity, Loops div 10),
+    Empty = time_multicall(ExpRes, Nodes, Fun, infinity, Loops),
+    io:format("Time with empty message queue: ~p microsecond~n",
+	      [erlang:convert_time_unit(Empty, native, microsecond)]),
+    _ = [self() ! {msg,N} || N <- lists:seq(1, HugeMsgQ)],
+    Huge = time_multicall(ExpRes, Nodes, Fun, infinity, Loops),
+    io:format("Time with huge message queue: ~p microsecond~n",
+	      [erlang:convert_time_unit(Huge, native, microsecond)]),
+    stop_node(Node2),
+    Q = Huge / Empty,
+    HugeMsgQ = flush_msgq(),
+    case Q > 10 of
+	true ->
+	    ct:fail({ratio, Q});
+	false ->
+	    {comment, "Ratio: "++erlang:float_to_list(Q)}
+    end.
+
+multicall_recv_opt3(Config) when is_list(Config) ->
+    Loops = 1000,
+    HugeMsgQ = 500000,
+    process_flag(message_queue_data, off_heap),
+    {ok, Node1} = start_node(Config),
+    stop_node(Node1),
+    {ok, Node2} = start_node(Config),
+    Nodes = [node(), Node1, Node2],
+    Fun = fun () -> erlang:node() end,
+    _Warmup = time_multicall(undefined, Nodes, Fun, infinity, Loops div 10),
+    Empty = time_multicall(undefined, Nodes, Fun, infinity, Loops),
+    io:format("Time with empty message queue: ~p microsecond~n",
+	      [erlang:convert_time_unit(Empty, native, microsecond)]),
+    _ = [self() ! {msg,N} || N <- lists:seq(1, HugeMsgQ)],
+    Huge = time_multicall(undefined, Nodes, Fun, 0, Loops),
+    io:format("Time with huge message queue: ~p microsecond~n",
+	      [erlang:convert_time_unit(Huge, native, microsecond)]),
+    stop_node(Node2),
+    Q = Huge / Empty,
+    HugeMsgQ = flush_msgq(),
+    case Q > 10 of
+	true ->
+	    ct:fail({ratio, Q});
+	false ->
+	    {comment, "Ratio: "++erlang:float_to_list(Q)}
+    end.
+
+time_multicall(Expect, Nodes, Fun, Tmo, Times) ->
+    Start = erlang:monotonic_time(),
+    ok = do_time_multicall(Expect, Nodes, Fun, Tmo, Times),
+    erlang:monotonic_time() - Start.
+
+do_time_multicall(_Expect, _Nodes, _Fun, _Tmo, 0) ->
+    ok;
+do_time_multicall(undefined, Nodes, Fun, Tmo, N) ->
+    _ = erpc:multicall(Nodes, Fun, Tmo),
+    do_time_multicall(undefined, Nodes, Fun, Tmo, N-1);
+do_time_multicall(Expect, Nodes, Fun, Tmo, N) ->
+    Expect = erpc:multicall(Nodes, Fun, Tmo),
+    do_time_multicall(Expect, Nodes, Fun, Tmo, N-1).
 
 multicast(Config) when is_list(Config) ->
     {ok, Node} = start_node(Config),
@@ -1280,3 +1382,13 @@ f() ->
 f2() ->
     timer:sleep(500),
     halt().
+
+flush_msgq() ->
+    flush_msgq(0).
+flush_msgq(N) ->
+    receive
+	_ ->
+	    flush_msgq(N+1)
+    after 0 ->
+	    N
+    end.
-- 
2.26.2

openSUSE Build Service is sponsored by