File 2961-Rewrite-erpc-multicall-to-utilize-new-receive-optimi.patch of Package erlang
From 3cff897b83decdd82e2eebc60d98dfa57d89a467 Mon Sep 17 00:00:00 2001
From: Rickard Green <rickard@erlang.org>
Date: Mon, 15 Feb 2021 19:53:11 +0100
Subject: [PATCH] Rewrite erpc:multicall() to utilize new receive optimization
---
lib/kernel/src/erpc.erl | 194 +++++++++++++++++++++------------
lib/kernel/test/erpc_SUITE.erl | 112 +++++++++++++++++++
2 files changed, 235 insertions(+), 71 deletions(-)
diff --git a/lib/kernel/src/erpc.erl b/lib/kernel/src/erpc.erl
index 89b286f479..a6b691754b 100644
--- a/lib/kernel/src/erpc.erl
+++ b/lib/kernel/src/erpc.erl
@@ -319,33 +319,9 @@ multicall(Ns, M, F, A, T) ->
true = is_atom(M),
true = is_atom(F),
true = is_list(A),
- Deadline = deadline(T),
- {ReqIds, LC} = mcall_send_requests(Ns, M, F, A, [], T, false),
- LRes = case LC of
- false ->
- undefined;
- true ->
- %% Timeout infinity and call on local node wanted;
- %% execute local call in this process...
- try
- {return, Return} = execute_call(M, F, A),
- {ok, Return}
- catch
- throw:Thrown ->
- {throw, Thrown};
- exit:Reason ->
- {exit, {exception, Reason}};
- error:Reason:Stack ->
- case is_arg_error(Reason, M, F, A) of
- true ->
- {error, {?MODULE, Reason}};
- false ->
- ErpcStack = trim_stack(Stack, M, F, A),
- {error, {exception, Reason, ErpcStack}}
- end
- end
- end,
- mcall_receive_replies(ReqIds, [], LRes, Deadline)
+ Tag = make_ref(),
+ SendState = mcall_send_requests(Tag, Ns, M, F, A, T),
+ mcall_receive_replies(Tag, SendState)
catch
error:NotIErr when NotIErr /= internal_error ->
error({?MODULE, badarg})
@@ -569,6 +545,8 @@ deadline(T) when ?IS_VALID_TMO_INT(T) ->
time_left(infinity) ->
infinity;
+time_left(expired) ->
+ 0;
time_left(Deadline) ->
case Deadline - erlang:monotonic_time() of
TimeLeft when TimeLeft =< 0 ->
@@ -577,50 +555,124 @@ time_left(Deadline) ->
erlang:convert_time_unit(TimeLeft-1, native, millisecond) + 1
end.
-mcall_send_requests([], _M, _F, _A, RIDs, _T, LC) ->
- {RIDs, LC};
-mcall_send_requests([N|Ns], M, F, A, RIDs,
- infinity, false) when N == node() ->
- mcall_send_requests(Ns, M, F, A, [local_call|RIDs], infinity, true);
-mcall_send_requests([N|Ns], M, F, A, RIDs, T, LC) ->
- RID = try
- send_request(N, M, F, A)
- catch
- _:_ ->
- %% Bad arguments... Abandon
- %% requests we've already sent
- %% and then fail...
- mcall_failure_abandon(RIDs)
- end,
- mcall_send_requests(Ns, M, F, A, [RID|RIDs], T, LC);
-mcall_send_requests(_, _, _, _, RIDs, _T, _LC) ->
- %% Bad nodes list... Abandon requests we've
- %% already sent and then fail...
- mcall_failure_abandon(RIDs).
-
-mcall_failure_abandon([]) ->
- error(badarg);
-mcall_failure_abandon([local_call|RIDs]) ->
- mcall_failure_abandon(RIDs);
-mcall_failure_abandon([RID|RIDs]) ->
+mcall_local_call(M, F, A) ->
try
- _ = receive_response(RID, 0),
- ok
+ {return, Return} = execute_call(M, F, A),
+ {ok, Return}
+ catch
+ throw:Thrown ->
+ {throw, Thrown};
+ exit:Reason ->
+ {exit, {exception, Reason}};
+ error:Reason:Stack ->
+ case is_arg_error(Reason, M, F, A) of
+ true ->
+ {error, {?MODULE, Reason}};
+ false ->
+ ErpcStack = trim_stack(Stack, M, F, A),
+ {error, {exception, Reason, ErpcStack}}
+ end
+ end.
+
+mcall_send_request(T, N, M, F, A) when is_reference(T),
+ is_atom(N),
+ is_atom(M),
+ is_atom(F),
+ is_list(A) ->
+ spawn_request(N, ?MODULE, execute_call, [T, M, F, A],
+ [{reply, error_only},
+ {reply_tag, T},
+ {monitor, [{tag, T}]}]).
+
+mcall_send_requests(Tag, Ns, M, F, A, Tmo) ->
+ DL = deadline(Tmo),
+ mcall_send_requests(Tag, Ns, M, F, A, [], DL, undefined, 0).
+
+mcall_send_requests(_Tag, [], M, F, A, RIDs, DL, local_call, NRs) ->
+ %% Timeout infinity and call on local node wanted;
+ %% excecute local call in this process...
+ LRes = mcall_local_call(M, F, A),
+ {ok, RIDs, #{local_call => LRes}, NRs, DL};
+mcall_send_requests(_Tag, [], _M, _F, _A, RIDs, DL, _LC, NRs) ->
+ {ok, RIDs, #{}, NRs, DL};
+mcall_send_requests(Tag, [N|Ns], M, F, A, RIDs,
+ infinity, undefined, NRs) when N == node() ->
+ mcall_send_requests(Tag, Ns, M, F, A, [local_call|RIDs],
+ infinity, local_call, NRs);
+mcall_send_requests(Tag, [N|Ns], M, F, A, RIDs, DL, LC, NRs) ->
+ try mcall_send_request(Tag, N, M, F, A) of
+ RID ->
+ mcall_send_requests(Tag, Ns, M, F, A, [RID|RIDs],
+ DL, LC, NRs+1)
catch
_:_ ->
- ok
- end,
- mcall_failure_abandon(RIDs).
-
-mcall_receive_replies([], Replies, undefined, _Deadline) ->
- Replies;
-mcall_receive_replies([local_call|RIDs], Replies, LRes, Deadline) ->
- mcall_receive_replies(RIDs, [LRes|Replies], undefined, Deadline);
-mcall_receive_replies([RID|RIDs], Replies, LRes, Deadline) ->
- Reply = try
- {ok, receive_response(RID, time_left(Deadline))}
- catch
- Class:Reason ->
- {Class, Reason}
- end,
- mcall_receive_replies(RIDs, [Reply|Replies], LRes, Deadline).
+ %% Bad argument... Abandon requests and cleanup
+ %% any responses by receiving replies with a zero
+ %% timeout and then fail...
+ {badarg, RIDs, #{}, NRs, expired}
+ end;
+mcall_send_requests(_Tag, _Ns, _M, _F, _A, RIDs, _DL, _LC, NRs) ->
+ %% Bad nodes list... Abandon requests and cleanup any responses
+ %% by receiving replies with a zero timeout and then fail...
+ {badarg, RIDs, #{}, NRs, expired}.
+
+mcall_receive_replies(Tag, {SendRes, RIDs, Rpls, NRs, DL}) ->
+ ResRpls = mcall_receive_replies(Tag, RIDs, Rpls, NRs, DL),
+ if SendRes /= ok ->
+ error(SendRes); %% Cleanup done; fail...
+ true ->
+ mcall_map_replies(RIDs, ResRpls, [])
+ end.
+
+mcall_receive_replies(_Tag, _ReqIds, Rpls, 0, _DL) ->
+ Rpls;
+mcall_receive_replies(Tag, ReqIDs, Rpls, NRs, DL) ->
+ Tmo = time_left(DL),
+ receive
+ {Tag, ReqId, error, Reason} ->
+ Res = mcall_result(spawn_reply, ReqId, Tag, Reason),
+ mcall_receive_replies(Tag, ReqIDs, Rpls#{ReqId => Res},
+ NRs-1, DL);
+ {Tag, ReqId, process, _Pid, Reason} ->
+ Res = mcall_result(down, ReqId, Tag, Reason),
+ mcall_receive_replies(Tag, ReqIDs, Rpls#{ReqId => Res},
+ NRs-1, DL)
+ after Tmo ->
+ if ReqIDs == [] ->
+ Rpls;
+ true ->
+ NewNRs = mcall_abandon(Tag, ReqIDs, Rpls, NRs),
+ mcall_receive_replies(Tag, [], Rpls, NewNRs, expired)
+ end
+ end.
+
+mcall_result(ResType, ReqId, Tag, ResultReason) ->
+ try
+ {ok, result(ResType, ReqId, Tag, ResultReason)}
+ catch
+ Class:Reason ->
+ {Class, Reason}
+ end.
+
+mcall_abandon(_Tag, [], _Rpls, NRs) ->
+ NRs;
+mcall_abandon(Tag, [local_call | RIDs], Rpls, NRs) ->
+ mcall_abandon(Tag, RIDs, Rpls, NRs);
+mcall_abandon(Tag, [RID | RIDs], Rpls, NRs) ->
+ NewNRs = case maps:is_key(RID, Rpls) of
+ true ->
+ NRs;
+ false ->
+ case call_abandon(RID) of
+ true -> NRs-1;
+ false -> NRs
+ end
+ end,
+ mcall_abandon(Tag, RIDs, Rpls, NewNRs).
+
+mcall_map_replies([], _Rpls, Res) ->
+ Res;
+mcall_map_replies([RID|RIDs], Rpls, Res) ->
+ Timeout = {error, {?MODULE, timeout}},
+ mcall_map_replies(RIDs, Rpls, [maps:get(RID, Rpls, Timeout) | Res]).
+
diff --git a/lib/kernel/test/erpc_SUITE.erl b/lib/kernel/test/erpc_SUITE.erl
index 1c969ad183..b1e8ab7c75 100644
--- a/lib/kernel/test/erpc_SUITE.erl
+++ b/lib/kernel/test/erpc_SUITE.erl
@@ -28,6 +28,9 @@
send_request_check_reqtmo/1,
send_request_against_old_node/1,
multicall/1, multicall_reqtmo/1,
+ multicall_recv_opt/1,
+ multicall_recv_opt2/1,
+ multicall_recv_opt3/1,
multicast/1,
timeout_limit/1]).
-export([init_per_testcase/2, end_per_testcase/2]).
@@ -55,6 +58,9 @@ all() ->
send_request_against_old_node,
multicall,
multicall_reqtmo,
+ multicall_recv_opt,
+ multicall_recv_opt2,
+ multicall_recv_opt3,
multicast,
timeout_limit].
@@ -1088,6 +1094,102 @@ multicall_reqtmo(Config) when is_list(Config) ->
stop_node(QuickNode2),
Res.
+multicall_recv_opt(Config) when is_list(Config) ->
+ Loops = 1000,
+ HugeMsgQ = 500000,
+ process_flag(message_queue_data, off_heap),
+ {ok, Node1} = start_node(Config),
+ {ok, Node2} = start_node(Config),
+ ExpRes = [{ok, node()}, {ok, Node1}, {ok, Node2}],
+ Nodes = [node(), Node1, Node2],
+ Fun = fun () -> erlang:node() end,
+ _Warmup = time_multicall(ExpRes, Nodes, Fun, infinity, Loops div 10),
+ Empty = time_multicall(ExpRes, Nodes, Fun, infinity, Loops),
+ io:format("Time with empty message queue: ~p microsecond~n",
+ [erlang:convert_time_unit(Empty, native, microsecond)]),
+ _ = [self() ! {msg,N} || N <- lists:seq(1, HugeMsgQ)],
+ Huge = time_multicall(ExpRes, Nodes, Fun, infinity, Loops),
+ io:format("Time with huge message queue: ~p microsecond~n",
+ [erlang:convert_time_unit(Huge, native, microsecond)]),
+ stop_node(Node1),
+ stop_node(Node2),
+ Q = Huge / Empty,
+ HugeMsgQ = flush_msgq(),
+ case Q > 10 of
+ true ->
+ ct:fail({ratio, Q});
+ false ->
+ {comment, "Ratio: "++erlang:float_to_list(Q)}
+ end.
+
+multicall_recv_opt2(Config) when is_list(Config) ->
+ Loops = 1000,
+ HugeMsgQ = 500000,
+ process_flag(message_queue_data, off_heap),
+ {ok, Node1} = start_node(Config),
+ stop_node(Node1),
+ {ok, Node2} = start_node(Config),
+ ExpRes = [{ok, node()}, {error, {erpc, noconnection}}, {ok, Node2}],
+ Nodes = [node(), Node1, Node2],
+ Fun = fun () -> erlang:node() end,
+ _Warmup = time_multicall(ExpRes, Nodes, Fun, infinity, Loops div 10),
+ Empty = time_multicall(ExpRes, Nodes, Fun, infinity, Loops),
+ io:format("Time with empty message queue: ~p microsecond~n",
+ [erlang:convert_time_unit(Empty, native, microsecond)]),
+ _ = [self() ! {msg,N} || N <- lists:seq(1, HugeMsgQ)],
+ Huge = time_multicall(ExpRes, Nodes, Fun, infinity, Loops),
+ io:format("Time with huge message queue: ~p microsecond~n",
+ [erlang:convert_time_unit(Huge, native, microsecond)]),
+ stop_node(Node2),
+ Q = Huge / Empty,
+ HugeMsgQ = flush_msgq(),
+ case Q > 10 of
+ true ->
+ ct:fail({ratio, Q});
+ false ->
+ {comment, "Ratio: "++erlang:float_to_list(Q)}
+ end.
+
+multicall_recv_opt3(Config) when is_list(Config) ->
+ Loops = 1000,
+ HugeMsgQ = 500000,
+ process_flag(message_queue_data, off_heap),
+ {ok, Node1} = start_node(Config),
+ stop_node(Node1),
+ {ok, Node2} = start_node(Config),
+ Nodes = [node(), Node1, Node2],
+ Fun = fun () -> erlang:node() end,
+ _Warmup = time_multicall(undefined, Nodes, Fun, infinity, Loops div 10),
+ Empty = time_multicall(undefined, Nodes, Fun, infinity, Loops),
+ io:format("Time with empty message queue: ~p microsecond~n",
+ [erlang:convert_time_unit(Empty, native, microsecond)]),
+ _ = [self() ! {msg,N} || N <- lists:seq(1, HugeMsgQ)],
+ Huge = time_multicall(undefined, Nodes, Fun, 0, Loops),
+ io:format("Time with huge message queue: ~p microsecond~n",
+ [erlang:convert_time_unit(Huge, native, microsecond)]),
+ stop_node(Node2),
+ Q = Huge / Empty,
+ HugeMsgQ = flush_msgq(),
+ case Q > 10 of
+ true ->
+ ct:fail({ratio, Q});
+ false ->
+ {comment, "Ratio: "++erlang:float_to_list(Q)}
+ end.
+
+time_multicall(Expect, Nodes, Fun, Tmo, Times) ->
+ Start = erlang:monotonic_time(),
+ ok = do_time_multicall(Expect, Nodes, Fun, Tmo, Times),
+ erlang:monotonic_time() - Start.
+
+do_time_multicall(_Expect, _Nodes, _Fun, _Tmo, 0) ->
+ ok;
+do_time_multicall(undefined, Nodes, Fun, Tmo, N) ->
+ _ = erpc:multicall(Nodes, Fun, Tmo),
+ do_time_multicall(undefined, Nodes, Fun, Tmo, N-1);
+do_time_multicall(Expect, Nodes, Fun, Tmo, N) ->
+ Expect = erpc:multicall(Nodes, Fun, Tmo),
+ do_time_multicall(Expect, Nodes, Fun, Tmo, N-1).
multicast(Config) when is_list(Config) ->
{ok, Node} = start_node(Config),
@@ -1280,3 +1382,13 @@ f() ->
f2() ->
timer:sleep(500),
halt().
+
+flush_msgq() ->
+ flush_msgq(0).
+flush_msgq(N) ->
+ receive
+ _ ->
+ flush_msgq(N+1)
+ after 0 ->
+ N
+ end.
--
2.26.2