File 5961-Introduce-receive-optimisation-to-dets-calls.patch of Package erlang

From c1bf961a6e42b71317271d46cdd34985ff25b81f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Sebastian=20Smyczy=C5=84ski?= <s.smyczynski@simplito.com>
Date: Thu, 2 Jun 2022 19:04:55 +0200
Subject: [PATCH] Introduce receive optimisation to dets calls

---
 lib/stdlib/src/dets.erl        | 98 +++++++++++++++++++---------------
 lib/stdlib/test/dets_SUITE.erl | 32 ++++++++++-
 2 files changed, 84 insertions(+), 46 deletions(-)

diff --git a/lib/stdlib/src/dets.erl b/lib/stdlib/src/dets.erl
index 640ad7a81c..133d209ae1 100644
--- a/lib/stdlib/src/dets.erl
+++ b/lib/stdlib/src/dets.erl
@@ -93,7 +93,8 @@
               tab_name/0]).
 
 -compile({inline, [{einval,2},{badarg,2},{undefined,1},
-                   {badarg_exit,2},{lookup_reply,2}]}).
+                   {badarg_exit,2},{lookup_reply,2},
+                   {pidof,1},{resp,2}]}).
 
 -include_lib("kernel/include/file.hrl").
 
@@ -1237,15 +1238,24 @@ treq(Tab, R) ->
 
 req(Proc, R) ->
     Ref = erlang:monitor(process, Proc),
-    Proc ! ?DETS_CALL(self(), R),
+    Proc ! ?DETS_CALL({self(), Ref}, R),
     receive 
 	{'DOWN', Ref, process, Proc, _Info} ->
             badarg;
-	{Proc, Reply} ->
+	{Ref, Reply} ->
 	    erlang:demonitor(Ref, [flush]),
 	    Reply
     end.
 
+%% Inlined.
+pidof({Pid, _Tag}) ->
+    Pid.
+
+%% Inlined.
+resp({Pid, Tag} = _From, Message) ->
+    Pid ! {Tag, Message},
+    ok.
+
 %% Inlined.
 einval({error, {file_error, _, einval}}, A) ->
     erlang:error(badarg, A);
@@ -1398,7 +1408,7 @@ apply_op(Op, From, Head, N) ->
 		      true ->
 			  err({error, incompatible_arguments})
 		  end,
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    ok;
 	auto_save ->
 	    case Head#head.update_mode of
@@ -1419,7 +1429,7 @@ apply_op(Op, From, Head, N) ->
 		    {0, Head}
 	    end;
 	close  ->
-	    From ! {self(), fclose(Head)},
+	    resp(From, fclose(Head)),
 	    _NewHead = unlink_fixing_procs(Head),
 	    ?PROFILE(ep:done()),
 	    exit(normal);
@@ -1427,25 +1437,25 @@ apply_op(Op, From, Head, N) ->
 	    %% Used from dets_server when Pid has closed the table,
 	    %% but the table is still opened by some process.
 	    NewHead = remove_fix(Head, Pid, close),
-	    From ! {self(), status(NewHead)},
+	    resp(From, status(NewHead)),
 	    NewHead;
 	{corrupt, Reason} ->
 	    {H2, Error} = dets_utils:corrupt_reason(Head, Reason),
-	    From ! {self(), Error},
+	    resp(From, Error),
 	    H2;
 	{delayed_write, WrTime} ->
 	    delayed_write(Head, WrTime);
 	info ->
 	    {H2, Res} = finfo(Head),
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    H2;
 	{info, Tag} ->
 	    {H2, Res} = finfo(Head, Tag),
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    H2;
         {is_compatible_bchunk_format, Term} ->
             Res = test_bchunk_format(Head, Term),
-            From ! {self(), Res},
+            resp(From, Res),
             ok;
 	{internal_open, Ref, Args} ->
             do_internal_open(Head#head.parent, Head#head.server, From,
@@ -1462,27 +1472,27 @@ apply_op(Op, From, Head, N) ->
 	    end;
 	{set_verbose, What} ->
 	    set_verbose(What), 
-	    From ! {self(), ok},
+	    resp(From, ok),
 	    ok;
 	{where, Object} ->
 	    {H2, Res} = where_is_object(Head, Object),
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    H2;
 	_Message when element(1, Head#head.update_mode) =:= error ->
-	    From ! {self(), status(Head)},
+	    resp(From, status(Head)),
 	    ok;
 	%% The following messages assume that the status of the table is OK.
 	{bchunk_init, Tab} ->
 	    {H2, Res} = do_bchunk_init(Head, Tab),
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    H2;
 	{bchunk, State} ->
 	    {H2, Res} = do_bchunk(Head, State),
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    H2;
 	delete_all_objects ->
 	    {H2, Res} = fdelete_all_objects(Head),
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    erlang:garbage_collect(),
 	    {0, H2};
 	{delete_key, _Keys} when Head#head.update_mode =:= dirty ->
@@ -1492,16 +1502,16 @@ apply_op(Op, From, Head, N) ->
 		true ->
 		    stream_op(Op, From, [], Head, N);
 		false ->
-		    From ! {self(), badarg},
+		    resp(From, badarg),
 		    ok
 	    end;
 	first ->
 	    {H2, Res} = ffirst(Head),
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    H2;
         {initialize, InitFun, Format, MinNoSlots} ->
             {H2, Res} = finit(Head, InitFun, Format, MinNoSlots),
-            From ! {self(), Res},
+            resp(From, Res),
 	    erlang:garbage_collect(),
             H2;
 	{insert, Objs} when Head#head.update_mode =:= dirty ->
@@ -1509,12 +1519,12 @@ apply_op(Op, From, Head, N) ->
 		true ->
 		    stream_op(Op, From, [], Head, N);
 		false ->
-		    From ! {self(), badarg},
+		    resp(From, badarg),
 		    ok
 	    end;
 	{insert_new, Objs} when Head#head.update_mode =:= dirty ->
             {H2, Res} = finsert_new(Head, Objs),
-            From ! {self(), Res},
+            resp(From, Res),
             {N + 1, H2};
 	{lookup_keys, _Keys} ->
 	    stream_op(Op, From, [], Head, N);
@@ -1523,48 +1533,48 @@ apply_op(Op, From, Head, N) ->
             H2 = case Res of
                      {cont,_} -> H1;
                      _ when Safe =:= no_safe-> H1;
-                     _ when Safe =:= safe -> do_safe_fixtable(H1, From, false)
+                     _ when Safe =:= safe -> do_safe_fixtable(H1, pidof(From), false)
                  end,
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    H2;
 	{match, MP, Spec, NObjs, Safe} ->
 	    {H2, Res} = fmatch(Head, MP, Spec, NObjs, Safe, From),
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    H2;
 	{member, _Key} = Op ->
 	    stream_op(Op, From, [], Head, N);
 	{next, Key} ->
 	    {H2, Res} = fnext(Head, Key),
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    H2;
 	{match_delete, State} when Head#head.update_mode =:= dirty ->
 	    {H1, Res} = fmatch_delete(Head, State),
             H2 = case Res of
                      {cont,_S,_N} -> H1;
-                     _ -> do_safe_fixtable(H1, From, false)
+                     _ -> do_safe_fixtable(H1, pidof(From), false)
                  end,
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    {N + 1, H2};
 	{match_delete_init, MP, Spec} when Head#head.update_mode =:= dirty ->
 	    {H2, Res} = fmatch_delete_init(Head, MP, Spec, From),
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    {N + 1, H2};
 	{safe_fixtable, Bool} ->
-	    NewHead = do_safe_fixtable(Head, From, Bool),
-	    From ! {self(), ok},
+	    NewHead = do_safe_fixtable(Head, pidof(From), Bool),
+	    resp(From, ok),
 	    NewHead;
 	{slot, Slot} ->
 	    {H2, Res} = fslot(Head, Slot),
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    H2;
 	sync ->
 	    {NewHead, Res} = perform_save(Head, true),
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    erlang:garbage_collect(),
 	    {0, NewHead};
 	{update_counter, Key, Incr} when Head#head.update_mode =:= dirty ->
 	    {NewHead, Res} = do_update_counter(Head, Key, Incr),
-	    From ! {self(), Res},
+	    resp(From, Res),
 	    {N + 1, NewHead};
 	WriteOp when Head#head.update_mode =:= new_dirty ->
 	    H2 = Head#head{update_mode = dirty},
@@ -1577,12 +1587,12 @@ apply_op(Op, From, Head, N) ->
 		    H2 = Head#head{update_mode = dirty},
 		    apply_op(WriteOp, From, H2, 0);
 		{NewHead, Error} when is_record(NewHead, head) ->
-		    From ! {self(), Error},
+		    resp(From, Error),
 		    NewHead
 	    end;
 	WriteOp when is_tuple(WriteOp), Head#head.access =:= read ->
 	    Reason = {access_mode, Head#head.filename},
-	    From ! {self(), err({error, Reason})},
+	    resp(From, err({error, Reason})),
 	    ok
     end.
 
@@ -1603,7 +1613,7 @@ bug_found(Name, Op, Bad, Stacktrace, From) ->
     end,
     if
         From =/= self() ->
-            From ! {self(), {error, {dets_bug, Name, Op, Bad}}},
+            resp(From, {error, {dets_bug, Name, Op, Bad}}),
             ok;
         true -> % auto_save | may_grow | {delayed_write, _}
             ok
@@ -1613,10 +1623,10 @@ do_internal_open(Parent, Server, From, Ref, Args) ->
     ?PROFILE(ep:do()),
     case do_open_file(Args, Parent, Server, Ref) of
         {ok, Head} ->
-            From ! {self(), ok},
+            resp(From, ok),
             Head;
         Error ->
-            From ! {self(), Error},
+            resp(From, Error),
             exit(normal)
     end.
 
@@ -1698,7 +1708,7 @@ stream_end1(Pids, Next, N, C, Head, PwriteList) ->
     stream_end2(Pids, Pids, Next, N, C, Head1, PR).
 
 stream_end2([Pid | Pids], Ps, Next, N, C, Head, Reply) ->
-    Pid ! {self(), Reply},
+    resp(Pid, Reply),
     stream_end2(Pids, Ps, Next, N+1, C, Head, Reply);
 stream_end2([], Ps, no_more, N, C, Head, _Reply) ->
     penalty(Head, Ps, C),
@@ -1710,7 +1720,7 @@ penalty(H, _Ps, _C) when H#head.fixed =:= false ->
     ok;
 penalty(_H, _Ps, [{{lookup,_Pids},_Keys}]) ->
     ok;
-penalty(#head{fixed = {_,[{Pid,_}]}}, [Pid], _C) ->
+penalty(#head{fixed = {_,[{Pid, _}]}}, [{Pid, _Tag} = _From], _C) ->
     ok;
 penalty(_H, _Ps, _C) ->
     timer:sleep(1).
@@ -1729,9 +1739,9 @@ lookup_replies(P, O, [{P2,O2} | L]) ->
 
 %% If a list of Pid then op was {member, Key}. Inlined.
 lookup_reply([P], O) ->
-    P ! {self(), O =/= []};
+    resp(P, O =/= []);
 lookup_reply(P, O) ->
-    P ! {self(), O}.
+    resp(P, O).
 
 %%-----------------------------------------------------------------
 %% Callback functions for system messages handling.
@@ -2253,7 +2263,7 @@ fmatch(Head, MP, Spec, N, Safe, From) ->
 		{Head1, []} ->
                     NewHead =
                         case Safe of
-                            safe -> do_safe_fixtable(Head1, From, true);
+                            safe -> do_safe_fixtable(Head1, pidof(From), true);
                             no_safe -> Head1
                         end,
 		    C0 = init_scan(NewHead, N),
@@ -2370,7 +2380,7 @@ do_fmatch_delete_var_keys(Head, _MP, ?PATTERN_TO_TRUE_MATCH_SPEC('_'), _From)
 	    Reply
     end;
 do_fmatch_delete_var_keys(Head, MP, _Spec, From) ->
-    Head1 = do_safe_fixtable(Head, From, true),
+    Head1 = do_safe_fixtable(Head, pidof(From), true),
     {NewHead, []} = write_cache(Head1),
     C0 = init_scan(NewHead, default),
     {NewHead, {cont, C0#dets_cont{match_program = MP}, 0}}.
diff --git a/lib/stdlib/test/dets_SUITE.erl b/lib/stdlib/test/dets_SUITE.erl
index c50a2d5baf..8fa877f5a5 100644
--- a/lib/stdlib/test/dets_SUITE.erl
+++ b/lib/stdlib/test/dets_SUITE.erl
@@ -46,7 +46,7 @@
          otp_5487/1, otp_6206/1, otp_6359/1, otp_4738/1, otp_7146/1,
          otp_8070/1, otp_8856/1, otp_8898/1, otp_8899/1, otp_8903/1,
          otp_8923/1, otp_9282/1, otp_11245/1, otp_11709/1, otp_13229/1,
-         otp_13260/1, otp_13830/1]).
+         otp_13260/1, otp_13830/1, receive_optimisation/1]).
 
 -export([dets_dirty_loop/0]).
 
@@ -94,7 +94,7 @@ all() ->
 	insert_new, repair_continuation, otp_5487, otp_6206,
 	otp_6359, otp_4738, otp_7146, otp_8070, otp_8856, otp_8898,
 	otp_8899, otp_8903, otp_8923, otp_9282, otp_11245, otp_11709,
-        otp_13229, otp_13260, otp_13830
+        otp_13229, otp_13260, otp_13830, receive_optimisation
     ].
 
 groups() -> 
@@ -3492,6 +3492,34 @@ otp_13830(Config) ->
     {ok, Tab} = dets:open_file(Tab, [{file, File}, {version, default}]),
     ok = dets:close(Tab).
 
+receive_optimisation(Config) ->
+    Tab = dets_receive_optimisation_test,
+    FName = filename(Tab, Config),
+
+    % Spam message box
+    lists:foreach(fun(_) -> self() ! {spam, it} end, lists:seq(1, 1_000_000)),
+
+    {ok, _} = dets:open_file(Tab,[{file, FName}]),
+    ok = dets:insert(Tab,{one, record}),
+
+    StartTime = os:system_time(millisecond),
+
+    % We expect one thousand of simple lookups to finish in one second
+    Lookups = 1000,
+    Timeout = 1000,
+    Loop =  fun Loop(N) when N =< 0 -> ok;
+		Loop(N) ->
+		    Now = os:system_time(millisecond),
+		    (Now - StartTime > Timeout) andalso throw({timeout_after, Lookups - N}),
+		    [{one, record}] = dets:lookup(Tab, one),
+		    Loop(N-1)
+	    end,
+
+    ok = Loop(Lookups),
+
+    ok = dets:close(Tab),
+    ok = file:delete(FName).
+
 %%
 %% Parts common to several test cases
 %% 
-- 
2.35.3

openSUSE Build Service is sponsored by