File 5961-Introduce-receive-optimisation-to-dets-calls.patch of Package erlang
From c1bf961a6e42b71317271d46cdd34985ff25b81f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sebastian=20Smyczy=C5=84ski?= <s.smyczynski@simplito.com>
Date: Thu, 2 Jun 2022 19:04:55 +0200
Subject: [PATCH] Introduce receive optimisation to dets calls
---
lib/stdlib/src/dets.erl | 98 +++++++++++++++++++---------------
lib/stdlib/test/dets_SUITE.erl | 32 ++++++++++-
2 files changed, 84 insertions(+), 46 deletions(-)
diff --git a/lib/stdlib/src/dets.erl b/lib/stdlib/src/dets.erl
index 640ad7a81c..133d209ae1 100644
--- a/lib/stdlib/src/dets.erl
+++ b/lib/stdlib/src/dets.erl
@@ -93,7 +93,8 @@
tab_name/0]).
-compile({inline, [{einval,2},{badarg,2},{undefined,1},
- {badarg_exit,2},{lookup_reply,2}]}).
+ {badarg_exit,2},{lookup_reply,2},
+ {pidof,1},{resp,2}]}).
-include_lib("kernel/include/file.hrl").
@@ -1237,15 +1238,24 @@ treq(Tab, R) ->
req(Proc, R) ->
Ref = erlang:monitor(process, Proc),
- Proc ! ?DETS_CALL(self(), R),
+ Proc ! ?DETS_CALL({self(), Ref}, R),
receive
{'DOWN', Ref, process, Proc, _Info} ->
badarg;
- {Proc, Reply} ->
+ {Ref, Reply} ->
erlang:demonitor(Ref, [flush]),
Reply
end.
+%% Inlined.
+pidof({Pid, _Tag}) ->
+ Pid.
+
+%% Inlined.
+resp({Pid, Tag} = _From, Message) ->
+ Pid ! {Tag, Message},
+ ok.
+
%% Inlined.
einval({error, {file_error, _, einval}}, A) ->
erlang:error(badarg, A);
@@ -1398,7 +1408,7 @@ apply_op(Op, From, Head, N) ->
true ->
err({error, incompatible_arguments})
end,
- From ! {self(), Res},
+ resp(From, Res),
ok;
auto_save ->
case Head#head.update_mode of
@@ -1419,7 +1429,7 @@ apply_op(Op, From, Head, N) ->
{0, Head}
end;
close ->
- From ! {self(), fclose(Head)},
+ resp(From, fclose(Head)),
_NewHead = unlink_fixing_procs(Head),
?PROFILE(ep:done()),
exit(normal);
@@ -1427,25 +1437,25 @@ apply_op(Op, From, Head, N) ->
%% Used from dets_server when Pid has closed the table,
%% but the table is still opened by some process.
NewHead = remove_fix(Head, Pid, close),
- From ! {self(), status(NewHead)},
+ resp(From, status(NewHead)),
NewHead;
{corrupt, Reason} ->
{H2, Error} = dets_utils:corrupt_reason(Head, Reason),
- From ! {self(), Error},
+ resp(From, Error),
H2;
{delayed_write, WrTime} ->
delayed_write(Head, WrTime);
info ->
{H2, Res} = finfo(Head),
- From ! {self(), Res},
+ resp(From, Res),
H2;
{info, Tag} ->
{H2, Res} = finfo(Head, Tag),
- From ! {self(), Res},
+ resp(From, Res),
H2;
{is_compatible_bchunk_format, Term} ->
Res = test_bchunk_format(Head, Term),
- From ! {self(), Res},
+ resp(From, Res),
ok;
{internal_open, Ref, Args} ->
do_internal_open(Head#head.parent, Head#head.server, From,
@@ -1462,27 +1472,27 @@ apply_op(Op, From, Head, N) ->
end;
{set_verbose, What} ->
set_verbose(What),
- From ! {self(), ok},
+ resp(From, ok),
ok;
{where, Object} ->
{H2, Res} = where_is_object(Head, Object),
- From ! {self(), Res},
+ resp(From, Res),
H2;
_Message when element(1, Head#head.update_mode) =:= error ->
- From ! {self(), status(Head)},
+ resp(From, status(Head)),
ok;
%% The following messages assume that the status of the table is OK.
{bchunk_init, Tab} ->
{H2, Res} = do_bchunk_init(Head, Tab),
- From ! {self(), Res},
+ resp(From, Res),
H2;
{bchunk, State} ->
{H2, Res} = do_bchunk(Head, State),
- From ! {self(), Res},
+ resp(From, Res),
H2;
delete_all_objects ->
{H2, Res} = fdelete_all_objects(Head),
- From ! {self(), Res},
+ resp(From, Res),
erlang:garbage_collect(),
{0, H2};
{delete_key, _Keys} when Head#head.update_mode =:= dirty ->
@@ -1492,16 +1502,16 @@ apply_op(Op, From, Head, N) ->
true ->
stream_op(Op, From, [], Head, N);
false ->
- From ! {self(), badarg},
+ resp(From, badarg),
ok
end;
first ->
{H2, Res} = ffirst(Head),
- From ! {self(), Res},
+ resp(From, Res),
H2;
{initialize, InitFun, Format, MinNoSlots} ->
{H2, Res} = finit(Head, InitFun, Format, MinNoSlots),
- From ! {self(), Res},
+ resp(From, Res),
erlang:garbage_collect(),
H2;
{insert, Objs} when Head#head.update_mode =:= dirty ->
@@ -1509,12 +1519,12 @@ apply_op(Op, From, Head, N) ->
true ->
stream_op(Op, From, [], Head, N);
false ->
- From ! {self(), badarg},
+ resp(From, badarg),
ok
end;
{insert_new, Objs} when Head#head.update_mode =:= dirty ->
{H2, Res} = finsert_new(Head, Objs),
- From ! {self(), Res},
+ resp(From, Res),
{N + 1, H2};
{lookup_keys, _Keys} ->
stream_op(Op, From, [], Head, N);
@@ -1523,48 +1533,48 @@ apply_op(Op, From, Head, N) ->
H2 = case Res of
{cont,_} -> H1;
_ when Safe =:= no_safe-> H1;
- _ when Safe =:= safe -> do_safe_fixtable(H1, From, false)
+ _ when Safe =:= safe -> do_safe_fixtable(H1, pidof(From), false)
end,
- From ! {self(), Res},
+ resp(From, Res),
H2;
{match, MP, Spec, NObjs, Safe} ->
{H2, Res} = fmatch(Head, MP, Spec, NObjs, Safe, From),
- From ! {self(), Res},
+ resp(From, Res),
H2;
{member, _Key} = Op ->
stream_op(Op, From, [], Head, N);
{next, Key} ->
{H2, Res} = fnext(Head, Key),
- From ! {self(), Res},
+ resp(From, Res),
H2;
{match_delete, State} when Head#head.update_mode =:= dirty ->
{H1, Res} = fmatch_delete(Head, State),
H2 = case Res of
{cont,_S,_N} -> H1;
- _ -> do_safe_fixtable(H1, From, false)
+ _ -> do_safe_fixtable(H1, pidof(From), false)
end,
- From ! {self(), Res},
+ resp(From, Res),
{N + 1, H2};
{match_delete_init, MP, Spec} when Head#head.update_mode =:= dirty ->
{H2, Res} = fmatch_delete_init(Head, MP, Spec, From),
- From ! {self(), Res},
+ resp(From, Res),
{N + 1, H2};
{safe_fixtable, Bool} ->
- NewHead = do_safe_fixtable(Head, From, Bool),
- From ! {self(), ok},
+ NewHead = do_safe_fixtable(Head, pidof(From), Bool),
+ resp(From, ok),
NewHead;
{slot, Slot} ->
{H2, Res} = fslot(Head, Slot),
- From ! {self(), Res},
+ resp(From, Res),
H2;
sync ->
{NewHead, Res} = perform_save(Head, true),
- From ! {self(), Res},
+ resp(From, Res),
erlang:garbage_collect(),
{0, NewHead};
{update_counter, Key, Incr} when Head#head.update_mode =:= dirty ->
{NewHead, Res} = do_update_counter(Head, Key, Incr),
- From ! {self(), Res},
+ resp(From, Res),
{N + 1, NewHead};
WriteOp when Head#head.update_mode =:= new_dirty ->
H2 = Head#head{update_mode = dirty},
@@ -1577,12 +1587,12 @@ apply_op(Op, From, Head, N) ->
H2 = Head#head{update_mode = dirty},
apply_op(WriteOp, From, H2, 0);
{NewHead, Error} when is_record(NewHead, head) ->
- From ! {self(), Error},
+ resp(From, Error),
NewHead
end;
WriteOp when is_tuple(WriteOp), Head#head.access =:= read ->
Reason = {access_mode, Head#head.filename},
- From ! {self(), err({error, Reason})},
+ resp(From, err({error, Reason})),
ok
end.
@@ -1603,7 +1613,7 @@ bug_found(Name, Op, Bad, Stacktrace, From) ->
end,
if
From =/= self() ->
- From ! {self(), {error, {dets_bug, Name, Op, Bad}}},
+ resp(From, {error, {dets_bug, Name, Op, Bad}}),
ok;
true -> % auto_save | may_grow | {delayed_write, _}
ok
@@ -1613,10 +1623,10 @@ do_internal_open(Parent, Server, From, Ref, Args) ->
?PROFILE(ep:do()),
case do_open_file(Args, Parent, Server, Ref) of
{ok, Head} ->
- From ! {self(), ok},
+ resp(From, ok),
Head;
Error ->
- From ! {self(), Error},
+ resp(From, Error),
exit(normal)
end.
@@ -1698,7 +1708,7 @@ stream_end1(Pids, Next, N, C, Head, PwriteList) ->
stream_end2(Pids, Pids, Next, N, C, Head1, PR).
stream_end2([Pid | Pids], Ps, Next, N, C, Head, Reply) ->
- Pid ! {self(), Reply},
+ resp(Pid, Reply),
stream_end2(Pids, Ps, Next, N+1, C, Head, Reply);
stream_end2([], Ps, no_more, N, C, Head, _Reply) ->
penalty(Head, Ps, C),
@@ -1710,7 +1720,7 @@ penalty(H, _Ps, _C) when H#head.fixed =:= false ->
ok;
penalty(_H, _Ps, [{{lookup,_Pids},_Keys}]) ->
ok;
-penalty(#head{fixed = {_,[{Pid,_}]}}, [Pid], _C) ->
+penalty(#head{fixed = {_,[{Pid, _}]}}, [{Pid, _Tag} = _From], _C) ->
ok;
penalty(_H, _Ps, _C) ->
timer:sleep(1).
@@ -1729,9 +1739,9 @@ lookup_replies(P, O, [{P2,O2} | L]) ->
%% If a list of Pid then op was {member, Key}. Inlined.
lookup_reply([P], O) ->
- P ! {self(), O =/= []};
+ resp(P, O =/= []);
lookup_reply(P, O) ->
- P ! {self(), O}.
+ resp(P, O).
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
@@ -2253,7 +2263,7 @@ fmatch(Head, MP, Spec, N, Safe, From) ->
{Head1, []} ->
NewHead =
case Safe of
- safe -> do_safe_fixtable(Head1, From, true);
+ safe -> do_safe_fixtable(Head1, pidof(From), true);
no_safe -> Head1
end,
C0 = init_scan(NewHead, N),
@@ -2370,7 +2380,7 @@ do_fmatch_delete_var_keys(Head, _MP, ?PATTERN_TO_TRUE_MATCH_SPEC('_'), _From)
Reply
end;
do_fmatch_delete_var_keys(Head, MP, _Spec, From) ->
- Head1 = do_safe_fixtable(Head, From, true),
+ Head1 = do_safe_fixtable(Head, pidof(From), true),
{NewHead, []} = write_cache(Head1),
C0 = init_scan(NewHead, default),
{NewHead, {cont, C0#dets_cont{match_program = MP}, 0}}.
diff --git a/lib/stdlib/test/dets_SUITE.erl b/lib/stdlib/test/dets_SUITE.erl
index c50a2d5baf..8fa877f5a5 100644
--- a/lib/stdlib/test/dets_SUITE.erl
+++ b/lib/stdlib/test/dets_SUITE.erl
@@ -46,7 +46,7 @@
otp_5487/1, otp_6206/1, otp_6359/1, otp_4738/1, otp_7146/1,
otp_8070/1, otp_8856/1, otp_8898/1, otp_8899/1, otp_8903/1,
otp_8923/1, otp_9282/1, otp_11245/1, otp_11709/1, otp_13229/1,
- otp_13260/1, otp_13830/1]).
+ otp_13260/1, otp_13830/1, receive_optimisation/1]).
-export([dets_dirty_loop/0]).
@@ -94,7 +94,7 @@ all() ->
insert_new, repair_continuation, otp_5487, otp_6206,
otp_6359, otp_4738, otp_7146, otp_8070, otp_8856, otp_8898,
otp_8899, otp_8903, otp_8923, otp_9282, otp_11245, otp_11709,
- otp_13229, otp_13260, otp_13830
+ otp_13229, otp_13260, otp_13830, receive_optimisation
].
groups() ->
@@ -3492,6 +3492,34 @@ otp_13830(Config) ->
{ok, Tab} = dets:open_file(Tab, [{file, File}, {version, default}]),
ok = dets:close(Tab).
+receive_optimisation(Config) ->
+ Tab = dets_receive_optimisation_test,
+ FName = filename(Tab, Config),
+
+ % Spam message box
+ lists:foreach(fun(_) -> self() ! {spam, it} end, lists:seq(1, 1_000_000)),
+
+ {ok, _} = dets:open_file(Tab,[{file, FName}]),
+ ok = dets:insert(Tab,{one, record}),
+
+ StartTime = os:system_time(millisecond),
+
+ % We expect one thousand of simple lookups to finish in one second
+ Lookups = 1000,
+ Timeout = 1000,
+ Loop = fun Loop(N) when N =< 0 -> ok;
+ Loop(N) ->
+ Now = os:system_time(millisecond),
+ (Now - StartTime > Timeout) andalso throw({timeout_after, Lookups - N}),
+ [{one, record}] = dets:lookup(Tab, one),
+ Loop(N-1)
+ end,
+
+ ok = Loop(Lookups),
+
+ ok = dets:close(Tab),
+ ok = file:delete(FName).
+
%%
%% Parts common to several test cases
%%
--
2.35.3