File 5271-mnesia-Improve-dirty-writes-consistency.patch of Package erlang

From 1731534c5e97731af691eb1228081b0ca3af3c28 Mon Sep 17 00:00:00 2001
From: Dan Gudmundsson <dgud@erlang.org>
Date: Tue, 24 Jan 2023 15:08:22 +0100
Subject: [PATCH 1/4] mnesia: Improve dirty writes consistency

Improve table copying timing, where dirty writes can cause consistency issues
during mnesia:add_table_copy/3 call.

I.e. don't set where_to_read too early.
Keep subscribers longer (sync with mnesia_tm for writes in the msg queue),
and let receiver wait until copier is done.
---
 lib/mnesia/src/mnesia_loader.erl            | 36 +++++----
 lib/mnesia/src/mnesia_schema.erl            |  1 +
 lib/mnesia/src/mnesia_tm.erl                | 88 ++++++++++++++-------
 lib/mnesia/test/mnesia_consistency_test.erl | 16 ++--
 lib/mnesia/test/mt.erl                      | 22 +++---
 5 files changed, 104 insertions(+), 59 deletions(-)

diff --git a/lib/mnesia/src/mnesia_loader.erl b/lib/mnesia/src/mnesia_loader.erl
index 4dea366964..ed5dba3a16 100644
--- a/lib/mnesia/src/mnesia_loader.erl
+++ b/lib/mnesia/src/mnesia_loader.erl
@@ -222,7 +222,6 @@ do_get_network_copy(Tab, Reason, Ns, Storage, Cs) ->
 		ok ->
 		    set({Tab, load_node}, Node),
 		    set({Tab, load_reason}, Reason),
-		    mnesia_controller:i_have_tab(Tab, Cs),
 		    dbg_out("Table ~tp copied from ~p to ~p~n", [Tab, Node, node()]),
 		    {loaded, ok};
 		Err = {error, _} when element(1, Reason) == dumper ->
@@ -291,7 +290,11 @@ init_receiver(Node, Tab,Storage,Cs,Reason) ->
 	    {atomic, {error,Result}} ->
 		fatal("Cannot create table ~tp: ~tp~n",
 		      [[Tab, Storage], Result]);
-	    {atomic,  Result} -> Result;
+            {atomic, ok} ->
+                mnesia_controller:i_have_tab(Tab, Cs),
+                ok;
+	    {atomic, Result} ->
+                Result;
 	    {aborted, nomore} -> restart;
 	    {aborted, _Reas} ->
 		verbose("Receiver failed on ~tp from ~p:~nReason: ~tp~n",
@@ -555,14 +558,17 @@ init_table(Tab, _, Fun, _DetsInfo,_) ->
 
 finish_copy(Storage,Tab,Cs,SenderPid,DatBin,OrigTabRec) ->
     TabRef = {Storage, Tab},
-    subscr_postprocess(TabRef, Cs#cstruct.record_name),
     case handle_last(TabRef, Cs#cstruct.type, DatBin) of
 	ok ->
-	    mnesia_index:init_index(Tab, Storage),
-	    snmpify(Tab, Storage),
+            subscr_postprocess(TabRef, Cs#cstruct.record_name),
 	    %% OrigTabRec must not be the spawned tab-receiver
 	    %% due to old protocol.
 	    SenderPid ! {OrigTabRec, no_more},
+            Ref = monitor(process, SenderPid),
+            %% and all remaining events
+            subscr_receiver(TabRef, Cs#cstruct.record_name, Ref),
+	    mnesia_index:init_index(Tab, Storage),
+	    snmpify(Tab, Storage),
 	    mnesia_tm:unblock_tab(Tab),
 	    ok;
 	{error, Reason} ->
@@ -582,22 +588,21 @@ subscr_postprocess(TabRef, RecName) ->
 		    handle_subscr_event(Event, TabRef, RecName)
 		end, ok, SubscrCache),
 	    ets:delete(SubscrCache)
-    end,
-    % and all remaining events
-    subscr_receiver(TabRef, RecName).
+    end.
 
-subscr_receiver(TabRef = {_, Tab}, RecName) ->
+subscr_receiver(TabRef = {_, Tab}, RecName, Ref) ->
     receive
 	{mnesia_table_event, {_Op, Val, _Tid}} = Event
 	  when element(1, Val) =:= Tab; element(1, Val) =:= schema ->
 	    handle_subscr_event(Event, TabRef, RecName),
-	    subscr_receiver(TabRef, RecName);
+	    subscr_receiver(TabRef, RecName, Ref);
 
 	{'EXIT', Pid, Reason} ->
 	    handle_exit(Pid, Reason),
-	    subscr_receiver(TabRef, RecName)
-    after 0 ->
-	    ok
+	    subscr_receiver(TabRef, RecName, Ref);
+
+        {'DOWN', Ref, process, _, _} ->
+            ok
     end.
 
 handle_subscr_event(Event, TabRef = {_, Tab}, RecName) ->
@@ -1014,12 +1019,15 @@ finish_copy(Pid, Tab, Storage, RemoteS, NeedLock) ->
                         mnesia_checkpoint:tm_add_copy(Tab, RecNode),
                         DatBin = dat2bin(Tab, ?catch_val({Tab, storage_type}), RemoteS),
                         Pid ! {self(), {no_more, DatBin}},
-                        cleanup_tab_copier(Pid, Storage, Tab),
                         receive
                             {Pid, no_more} -> % Dont bother about the spurious 'more' message
+                                %% Sync mnesia_tm (before unsubscribing)
+                                mnesia_tm:sync(),
+                                cleanup_tab_copier(Pid, Storage, Tab),
                                 no_more;
                             {copier_done, Node} ->
                                 verbose("Tab receiver ~tp crashed (more): ~p~n", [Tab, Node]),
+                                cleanup_tab_copier(Pid, Storage, Tab),
                                 receiver_died
                         end
                 end
diff --git a/lib/mnesia/src/mnesia_schema.erl b/lib/mnesia/src/mnesia_schema.erl
index 42c717425e..ea77e5ec6a 100644
--- a/lib/mnesia/src/mnesia_schema.erl
+++ b/lib/mnesia/src/mnesia_schema.erl
@@ -2472,6 +2472,7 @@ prepare_op(Tid, {op, add_table_copy, Storage, Node, TabDef}, _WaitFor) ->
 		{loaded, ok} ->
                     %% Tables are created by mnesia_loader get_network code
                     insert_cstruct(Tid, Cs, true),
+                    mnesia_controller:i_have_tab(Tab, Cs),
 		    {true, optional};
                 {not_loaded, {not_active, schema, Node}} ->
                     mnesia:abort({node_not_running, Node});
diff --git a/lib/mnesia/src/mnesia_tm.erl b/lib/mnesia/src/mnesia_tm.erl
index 847cd0074b..3a563db0be 100644
--- a/lib/mnesia/src/mnesia_tm.erl
+++ b/lib/mnesia/src/mnesia_tm.erl
@@ -42,6 +42,7 @@
 	 put_activity_id/2,
 	 block_tab/1,
 	 unblock_tab/1,
+         sync/0,
 	 fixtable/3,
 	 new_cr_format/1
 	]).
@@ -205,6 +206,17 @@ block_tab(Tab) ->
 unblock_tab(Tab) ->
     req({unblock_tab, Tab}).
 
+fixtable(Tab, Lock, Me) ->
+    case req({fixtable, [Tab,Lock,Me]}) of
+	error ->
+	    exit({no_exists, Tab});
+	Else ->
+	    Else
+    end.
+
+sync() ->
+    req(sync).
+
 doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) ->
     receive
 	{_From, {async_dirty, Tid, Commit, Tab}} ->
@@ -250,25 +262,30 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=
 			    [{tid, Tid}, {prot, Protocol}]),
 	    mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
 	    Commit = new_cr_format(Commit0),
-	    Pid =
-                if
-                    node(Tid#tid.pid) =:= node() ->
-                        error({internal_error, local_node});
-                    Protocol =:= asym_trans orelse Protocol =:= sync_asym_trans ->
-			Args = [Protocol, tmpid(From), Tid, Commit, DiscNs, RamNs],
-			spawn_link(?MODULE, commit_participant, Args);
-                    true -> %% *_sym_trans
-			reply(From, {vote_yes, Tid}),
-			nopid
-		end,
-	    P = #participant{tid = Tid,
-			     pid = Pid,
-			     commit = Commit,
-			     disc_nodes = DiscNs,
-			     ram_nodes = RamNs,
-			     protocol = Protocol},
-	    State2 = State#state{participants = gb_trees:insert(Tid,P,Participants)},
-	    doit_loop(State2);
+            case is_blocked(State#state.blocked_tabs, Commit) of
+                false ->
+                    Pid =
+                        if
+                            node(Tid#tid.pid) =:= node() ->
+                                error({internal_error, local_node});
+                            Protocol =:= asym_trans orelse Protocol =:= sync_asym_trans ->
+                                Args = [Protocol, tmpid(From), Tid, Commit, DiscNs, RamNs],
+                                spawn_link(?MODULE, commit_participant, Args);
+                            true -> %% *_sym_trans
+                                reply(From, {vote_yes, Tid}),
+                                nopid
+                        end,
+                    P = #participant{tid = Tid,
+                                     pid = Pid,
+                                     commit = Commit,
+                                     disc_nodes = DiscNs,
+                                     ram_nodes = RamNs,
+                                     protocol = Protocol},
+                    State2 = State#state{participants = gb_trees:insert(Tid,P,Participants)},
+                    doit_loop(State2);
+                true ->
+                    reply(From, {vote_no, Tid, {bad_commit, node()}}, State)
+            end;
 
 	{Tid, do_commit} ->
 	    case gb_trees:lookup(Tid, Participants) of
@@ -449,6 +466,9 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=
 		    reply(From, ok, State2)
 	    end;
 
+        {From, sync} ->
+            reply(From, ok, State);
+
 	{From, {prepare_checkpoint, Cp}} ->
 	    Res = mnesia_checkpoint:tm_prepare(Cp),
 	    case Res of
@@ -478,6 +498,28 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=
 	    doit_loop(State)
     end.
 
+is_blocked([], _Commit) ->
+    false;
+is_blocked([Tab|Tabs], #commit{ram_copies=RCs, disc_copies=DCs,
+                               disc_only_copies=DOs, ext=Exts} = Commit) ->
+    is_blocked_tab(RCs, Tab) orelse
+        is_blocked_tab(DCs, Tab) orelse
+        is_blocked_tab(DOs, Tab) orelse
+        is_blocked_ext_tab(Exts, Tab) orelse
+        is_blocked(Tabs, Commit).
+
+is_blocked_tab([{{Tab,_},_,_}|_Ops], Tab) -> true;
+is_blocked_tab([_|Ops], Tab) -> is_blocked_tab(Ops, Tab);
+is_blocked_tab([],_) -> false.
+
+is_blocked_ext_tab([], _Tab) ->
+    false;
+is_blocked_ext_tab(Exts, Tab) ->
+    case lists:keyfind(ext_copies, 1, Exts) of
+        false -> false;
+        {_, ExtOps} -> is_blocked_tab([Op || {_, Op} <- ExtOps], Tab)
+    end.
+
 do_sync_dirty(From, Tid, Commit, _Tab) ->
     ?eval_debug_fun({?MODULE, sync_dirty, pre}, [{tid, Tid}]),
     Res = do_dirty(Tid, Commit),
@@ -2326,14 +2368,6 @@ do_stop(#state{coordinators = Coordinators}) ->
     mnesia_log:stop(),
     exit(shutdown).
 
-fixtable(Tab, Lock, Me) ->
-    case req({fixtable, [Tab,Lock,Me]}) of
-	error ->
-	    exit({no_exists, Tab});
-	Else ->
-	    Else
-    end.
-
 %%%%%%%%%%%%%%%%%%%%%%%%%%%
 %% System upgrade
 
diff --git a/lib/mnesia/test/mnesia_consistency_test.erl b/lib/mnesia/test/mnesia_consistency_test.erl
index 2be17e69c2..fa729aa714 100644
--- a/lib/mnesia/test/mnesia_consistency_test.erl
+++ b/lib/mnesia/test/mnesia_consistency_test.erl
@@ -381,7 +381,7 @@ consistency_after_restart(ReplicaType, NodeConfig, Config) ->
     TpcbConfig = tpcb_config(ReplicaType, NodeConfig, Nodes, [Node1]),
     mnesia_tpcb:init(TpcbConfig),
     A ! fun () -> mnesia_tpcb:run(TpcbConfig) end,
-    timer:sleep(timer:seconds(10)),
+    timer:sleep(timer:seconds(3)),
     mnesia_test_lib:kill_mnesia([Node1]),
     %% Start and wait for tables to be loaded on all nodes
     timer:sleep(timer:seconds(3)),
@@ -408,7 +408,7 @@ consistency_after_dump_tables(ReplicaType, NodeConfig, Config) ->
     TpcbConfig = tpcb_config(ReplicaType, NodeConfig, Nodes, []),
     mnesia_tpcb:init(TpcbConfig),
     A ! fun() -> mnesia_tpcb:run(TpcbConfig) end,
-    timer:sleep(timer:seconds(10)),
+    timer:sleep(timer:seconds(3)),
     ?match({atomic, ok}, rpc:call(Node1, mnesia, dump_tables,
                         [[branch, teller, account, history]])),
     mnesia_tpcb:stop(),
@@ -459,7 +459,7 @@ consistency_after_add_replica(ReplicaType, NodeConfig, Config) ->
     TpcbConfig = tpcb_config(ReplicaType, NodeConfig, Nodes, []),
     mnesia_tpcb:init(TpcbConfig),
     A ! fun () -> mnesia_tpcb:run(TpcbConfig) end,
-    timer:sleep(timer:seconds(10)),
+    timer:sleep(timer:seconds(2)),
     ?match({atomic, ok}, mnesia:add_table_copy(account, AddNode, ReplicaType)),
     mnesia_tpcb:stop(),
     ?match(ok, mnesia_tpcb:verify_tabs()),
@@ -501,7 +501,7 @@ consistency_after_del_replica(ReplicaType, NodeConfig, Config) ->
     TpcbConfig = tpcb_config(ReplicaType, NodeConfig, Nodes, []),
     mnesia_tpcb:init(TpcbConfig),
     A ! fun () -> mnesia_tpcb:run(TpcbConfig) end,
-    timer:sleep(timer:seconds(10)),
+    timer:sleep(timer:seconds(3)),
     ?match({atomic, ok}, mnesia:del_table_copy(account, Node2)),
     mnesia_tpcb:stop(),
     ?match(ok, mnesia_tpcb:verify_tabs()),
@@ -543,7 +543,7 @@ consistency_after_move_replica(ReplicaType, NodeConfig, Config) ->
     TpcbConfig = tpcb_config(ReplicaType, NodeConfig, Nodes -- [Node2], []),
     mnesia_tpcb:init(TpcbConfig),
     A ! fun () -> mnesia_tpcb:run(TpcbConfig) end,
-    timer:sleep(timer:seconds(10)),
+    timer:sleep(timer:seconds(3)),
     ?match({atomic, ok}, mnesia:move_table_copy(account, Node1, Node2)),    
     ?log("First move completed from node ~p to ~p ~n", [Node1, Node2]),
     ?match({atomic, ok}, mnesia:move_table_copy(account, Node2, Node1)),
@@ -638,7 +638,7 @@ consistency_after_fallback_3_disc_only(Config) when is_list(Config) ->
 consistency_after_fallback(ReplicaType, NodeConfig, Config) ->
     put(mnesia_test_verbose, true),
     %%?verbose("Starting consistency_after_fallback2 at ~p~n", [self()]),
-    Delay = 5,
+    Delay = 3,
     Nodes = ?acquire_nodes(NodeConfig, [{tc_timeout, timer:minutes(10)} | Config]),
     Node1 = hd(Nodes),
     %%?verbose("Mnesia info: ~p~n", [mnesia:info()]),
@@ -867,7 +867,7 @@ updates_during_checkpoint_activation_3_disc_only(Config) when is_list(Config) ->
 
 updates_during_checkpoint_activation(ReplicaType,NodeConfig,Config) ->
     %%?verbose("updates_during_checkpoint_activation2 at ~p~n", [self()]),
-    Delay = 5,
+    Delay = 2,
     Nodes = ?acquire_nodes(NodeConfig, Config),
     Node1 = hd(Nodes),
     %%?verbose("Mnesia info: ~p~n", [mnesia:info()]),
@@ -922,7 +922,7 @@ updates_during_checkpoint_iteration_2_disc_only(Config) when is_list(Config) ->
 
 updates_during_checkpoint_iteration(ReplicaType,NodeConfig,Config) ->
    %?verbose("updates_during_checkpoint_iteration2 at ~p~n", [self()]),
-    Delay = 5,
+    Delay = 2,
     Nodes = ?acquire_nodes(NodeConfig, Config),
     Node1 = hd(Nodes),
    %?verbose("Mnesia info: ~p~n", [mnesia:info()]),
diff --git a/lib/mnesia/test/mt.erl b/lib/mnesia/test/mt.erl
index c1859bef3f..0bb6aadc4e 100644
--- a/lib/mnesia/test/mt.erl
+++ b/lib/mnesia/test/mt.erl
@@ -245,30 +245,32 @@ start_nodes() ->
 %% loop one testcase /suite until it fails
 
 loop(Case) ->
-    loop_1(Case,-1,read_config()).
+    loop_1(Case,1,infinity,read_config()).
 
 loop(M,F) when is_atom(F) ->
-    loop_1({M,F},-1,read_config());
+    loop_1({M,F},1, infinity, read_config());
 loop(Case,N) when is_integer(N) ->
-    loop_1(Case, N,read_config()).
+    loop_1(Case, 1, N,read_config()).
 
 loop(M,F,N) when is_integer(N) ->
-    loop_1({M,F},N,read_config()).
+    loop_1({M,F},1, N,read_config()).
+
+loop_1(Case,N,Max,Config) when N < Max ->
+    io:format("~nLoop test ~p ~n", [abs(N)]),
 
-loop_1(Case,N,Config) when N /= 0 ->
-    io:format("Loop test ~p ~n", [abs(N)]),
     case ok_result(Res = t(Case,Config)) of
 	true ->
-	    loop_1(Case,N-1,Config);
+	    loop_1(Case,N+1,Max,Config);
 	error ->
+            io:format("Failed after ~p~n", [N]),
 	    Res
     end;
-loop_1(_,_,_) ->
+loop_1(_,_,_,_) ->
     ok.
-	    
+
 ok_result([{_T,{ok,_,_}}|R]) -> 
     ok_result(R);
-ok_result([{_T,{TC,List}}|R]) when is_tuple(TC), is_list(List) -> 
+ok_result([{_T,{TC,List}}|R]) when is_tuple(TC), is_list(List) ->
     ok_result(List) andalso ok_result(R);
 ok_result([]) -> true;
 ok_result(_) -> error.
-- 
2.35.3

openSUSE Build Service is sponsored by