File 5271-mnesia-Improve-dirty-writes-consistency.patch of Package erlang
From 1731534c5e97731af691eb1228081b0ca3af3c28 Mon Sep 17 00:00:00 2001
From: Dan Gudmundsson <dgud@erlang.org>
Date: Tue, 24 Jan 2023 15:08:22 +0100
Subject: [PATCH 1/4] mnesia: Improve dirty writes consistency
Improve table copying timing, where dirty writes can cause consistency issues
during mnesia:add_table_copy/3 call.
I.e. don't set where_to_read too early.
Keep subscribers longer (sync with mnesia_tm for writes in the msg queue),
and let the receiver wait until the copier is done.
---
lib/mnesia/src/mnesia_loader.erl | 36 +++++----
lib/mnesia/src/mnesia_schema.erl | 1 +
lib/mnesia/src/mnesia_tm.erl | 88 ++++++++++++++-------
lib/mnesia/test/mnesia_consistency_test.erl | 16 ++--
lib/mnesia/test/mt.erl | 22 +++---
5 files changed, 104 insertions(+), 59 deletions(-)
diff --git a/lib/mnesia/src/mnesia_loader.erl b/lib/mnesia/src/mnesia_loader.erl
index 4dea366964..ed5dba3a16 100644
--- a/lib/mnesia/src/mnesia_loader.erl
+++ b/lib/mnesia/src/mnesia_loader.erl
@@ -222,7 +222,6 @@ do_get_network_copy(Tab, Reason, Ns, Storage, Cs) ->
ok ->
set({Tab, load_node}, Node),
set({Tab, load_reason}, Reason),
- mnesia_controller:i_have_tab(Tab, Cs),
dbg_out("Table ~tp copied from ~p to ~p~n", [Tab, Node, node()]),
{loaded, ok};
Err = {error, _} when element(1, Reason) == dumper ->
@@ -291,7 +290,11 @@ init_receiver(Node, Tab,Storage,Cs,Reason) ->
{atomic, {error,Result}} ->
fatal("Cannot create table ~tp: ~tp~n",
[[Tab, Storage], Result]);
- {atomic, Result} -> Result;
+ {atomic, ok} ->
+ mnesia_controller:i_have_tab(Tab, Cs),
+ ok;
+ {atomic, Result} ->
+ Result;
{aborted, nomore} -> restart;
{aborted, _Reas} ->
verbose("Receiver failed on ~tp from ~p:~nReason: ~tp~n",
@@ -555,14 +558,17 @@ init_table(Tab, _, Fun, _DetsInfo,_) ->
finish_copy(Storage,Tab,Cs,SenderPid,DatBin,OrigTabRec) ->
TabRef = {Storage, Tab},
- subscr_postprocess(TabRef, Cs#cstruct.record_name),
case handle_last(TabRef, Cs#cstruct.type, DatBin) of
ok ->
- mnesia_index:init_index(Tab, Storage),
- snmpify(Tab, Storage),
+ subscr_postprocess(TabRef, Cs#cstruct.record_name),
%% OrigTabRec must not be the spawned tab-receiver
%% due to old protocol.
SenderPid ! {OrigTabRec, no_more},
+ Ref = monitor(process, SenderPid),
+ %% and all remaining events
+ subscr_receiver(TabRef, Cs#cstruct.record_name, Ref),
+ mnesia_index:init_index(Tab, Storage),
+ snmpify(Tab, Storage),
mnesia_tm:unblock_tab(Tab),
ok;
{error, Reason} ->
@@ -582,22 +588,21 @@ subscr_postprocess(TabRef, RecName) ->
handle_subscr_event(Event, TabRef, RecName)
end, ok, SubscrCache),
ets:delete(SubscrCache)
- end,
- % and all remaining events
- subscr_receiver(TabRef, RecName).
+ end.
-subscr_receiver(TabRef = {_, Tab}, RecName) ->
+subscr_receiver(TabRef = {_, Tab}, RecName, Ref) ->
receive
{mnesia_table_event, {_Op, Val, _Tid}} = Event
when element(1, Val) =:= Tab; element(1, Val) =:= schema ->
handle_subscr_event(Event, TabRef, RecName),
- subscr_receiver(TabRef, RecName);
+ subscr_receiver(TabRef, RecName, Ref);
{'EXIT', Pid, Reason} ->
handle_exit(Pid, Reason),
- subscr_receiver(TabRef, RecName)
- after 0 ->
- ok
+ subscr_receiver(TabRef, RecName, Ref);
+
+ {'DOWN', Ref, process, _, _} ->
+ ok
end.
handle_subscr_event(Event, TabRef = {_, Tab}, RecName) ->
@@ -1014,12 +1019,15 @@ finish_copy(Pid, Tab, Storage, RemoteS, NeedLock) ->
mnesia_checkpoint:tm_add_copy(Tab, RecNode),
DatBin = dat2bin(Tab, ?catch_val({Tab, storage_type}), RemoteS),
Pid ! {self(), {no_more, DatBin}},
- cleanup_tab_copier(Pid, Storage, Tab),
receive
{Pid, no_more} -> % Dont bother about the spurious 'more' message
+ %% Sync mnesia_tm (before unsubscribing)
+ mnesia_tm:sync(),
+ cleanup_tab_copier(Pid, Storage, Tab),
no_more;
{copier_done, Node} ->
verbose("Tab receiver ~tp crashed (more): ~p~n", [Tab, Node]),
+ cleanup_tab_copier(Pid, Storage, Tab),
receiver_died
end
end
diff --git a/lib/mnesia/src/mnesia_schema.erl b/lib/mnesia/src/mnesia_schema.erl
index 42c717425e..ea77e5ec6a 100644
--- a/lib/mnesia/src/mnesia_schema.erl
+++ b/lib/mnesia/src/mnesia_schema.erl
@@ -2472,6 +2472,7 @@ prepare_op(Tid, {op, add_table_copy, Storage, Node, TabDef}, _WaitFor) ->
{loaded, ok} ->
%% Tables are created by mnesia_loader get_network code
insert_cstruct(Tid, Cs, true),
+ mnesia_controller:i_have_tab(Tab, Cs),
{true, optional};
{not_loaded, {not_active, schema, Node}} ->
mnesia:abort({node_not_running, Node});
diff --git a/lib/mnesia/src/mnesia_tm.erl b/lib/mnesia/src/mnesia_tm.erl
index 847cd0074b..3a563db0be 100644
--- a/lib/mnesia/src/mnesia_tm.erl
+++ b/lib/mnesia/src/mnesia_tm.erl
@@ -42,6 +42,7 @@
put_activity_id/2,
block_tab/1,
unblock_tab/1,
+ sync/0,
fixtable/3,
new_cr_format/1
]).
@@ -205,6 +206,17 @@ block_tab(Tab) ->
unblock_tab(Tab) ->
req({unblock_tab, Tab}).
+fixtable(Tab, Lock, Me) ->
+ case req({fixtable, [Tab,Lock,Me]}) of
+ error ->
+ exit({no_exists, Tab});
+ Else ->
+ Else
+ end.
+
+sync() ->
+ req(sync).
+
doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=Sup}=State) ->
receive
{_From, {async_dirty, Tid, Commit, Tab}} ->
@@ -250,25 +262,30 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=
[{tid, Tid}, {prot, Protocol}]),
mnesia_checkpoint:tm_enter_pending(Tid, DiscNs, RamNs),
Commit = new_cr_format(Commit0),
- Pid =
- if
- node(Tid#tid.pid) =:= node() ->
- error({internal_error, local_node});
- Protocol =:= asym_trans orelse Protocol =:= sync_asym_trans ->
- Args = [Protocol, tmpid(From), Tid, Commit, DiscNs, RamNs],
- spawn_link(?MODULE, commit_participant, Args);
- true -> %% *_sym_trans
- reply(From, {vote_yes, Tid}),
- nopid
- end,
- P = #participant{tid = Tid,
- pid = Pid,
- commit = Commit,
- disc_nodes = DiscNs,
- ram_nodes = RamNs,
- protocol = Protocol},
- State2 = State#state{participants = gb_trees:insert(Tid,P,Participants)},
- doit_loop(State2);
+ case is_blocked(State#state.blocked_tabs, Commit) of
+ false ->
+ Pid =
+ if
+ node(Tid#tid.pid) =:= node() ->
+ error({internal_error, local_node});
+ Protocol =:= asym_trans orelse Protocol =:= sync_asym_trans ->
+ Args = [Protocol, tmpid(From), Tid, Commit, DiscNs, RamNs],
+ spawn_link(?MODULE, commit_participant, Args);
+ true -> %% *_sym_trans
+ reply(From, {vote_yes, Tid}),
+ nopid
+ end,
+ P = #participant{tid = Tid,
+ pid = Pid,
+ commit = Commit,
+ disc_nodes = DiscNs,
+ ram_nodes = RamNs,
+ protocol = Protocol},
+ State2 = State#state{participants = gb_trees:insert(Tid,P,Participants)},
+ doit_loop(State2);
+ true ->
+ reply(From, {vote_no, Tid, {bad_commit, node()}}, State)
+ end;
{Tid, do_commit} ->
case gb_trees:lookup(Tid, Participants) of
@@ -449,6 +466,9 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=
reply(From, ok, State2)
end;
+ {From, sync} ->
+ reply(From, ok, State);
+
{From, {prepare_checkpoint, Cp}} ->
Res = mnesia_checkpoint:tm_prepare(Cp),
case Res of
@@ -478,6 +498,28 @@ doit_loop(#state{coordinators=Coordinators,participants=Participants,supervisor=
doit_loop(State)
end.
+is_blocked([], _Commit) ->
+ false;
+is_blocked([Tab|Tabs], #commit{ram_copies=RCs, disc_copies=DCs,
+ disc_only_copies=DOs, ext=Exts} = Commit) ->
+ is_blocked_tab(RCs, Tab) orelse
+ is_blocked_tab(DCs, Tab) orelse
+ is_blocked_tab(DOs, Tab) orelse
+ is_blocked_ext_tab(Exts, Tab) orelse
+ is_blocked(Tabs, Commit).
+
+is_blocked_tab([{{Tab,_},_,_}|_Ops], Tab) -> true;
+is_blocked_tab([_|Ops], Tab) -> is_blocked_tab(Ops, Tab);
+is_blocked_tab([],_) -> false.
+
+is_blocked_ext_tab([], _Tab) ->
+ false;
+is_blocked_ext_tab(Exts, Tab) ->
+ case lists:keyfind(ext_copies, 1, Exts) of
+ false -> false;
+ {_, ExtOps} -> is_blocked_tab([Op || {_, Op} <- ExtOps], Tab)
+ end.
+
do_sync_dirty(From, Tid, Commit, _Tab) ->
?eval_debug_fun({?MODULE, sync_dirty, pre}, [{tid, Tid}]),
Res = do_dirty(Tid, Commit),
@@ -2326,14 +2368,6 @@ do_stop(#state{coordinators = Coordinators}) ->
mnesia_log:stop(),
exit(shutdown).
-fixtable(Tab, Lock, Me) ->
- case req({fixtable, [Tab,Lock,Me]}) of
- error ->
- exit({no_exists, Tab});
- Else ->
- Else
- end.
-
%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% System upgrade
diff --git a/lib/mnesia/test/mnesia_consistency_test.erl b/lib/mnesia/test/mnesia_consistency_test.erl
index 2be17e69c2..fa729aa714 100644
--- a/lib/mnesia/test/mnesia_consistency_test.erl
+++ b/lib/mnesia/test/mnesia_consistency_test.erl
@@ -381,7 +381,7 @@ consistency_after_restart(ReplicaType, NodeConfig, Config) ->
TpcbConfig = tpcb_config(ReplicaType, NodeConfig, Nodes, [Node1]),
mnesia_tpcb:init(TpcbConfig),
A ! fun () -> mnesia_tpcb:run(TpcbConfig) end,
- timer:sleep(timer:seconds(10)),
+ timer:sleep(timer:seconds(3)),
mnesia_test_lib:kill_mnesia([Node1]),
%% Start and wait for tables to be loaded on all nodes
timer:sleep(timer:seconds(3)),
@@ -408,7 +408,7 @@ consistency_after_dump_tables(ReplicaType, NodeConfig, Config) ->
TpcbConfig = tpcb_config(ReplicaType, NodeConfig, Nodes, []),
mnesia_tpcb:init(TpcbConfig),
A ! fun() -> mnesia_tpcb:run(TpcbConfig) end,
- timer:sleep(timer:seconds(10)),
+ timer:sleep(timer:seconds(3)),
?match({atomic, ok}, rpc:call(Node1, mnesia, dump_tables,
[[branch, teller, account, history]])),
mnesia_tpcb:stop(),
@@ -459,7 +459,7 @@ consistency_after_add_replica(ReplicaType, NodeConfig, Config) ->
TpcbConfig = tpcb_config(ReplicaType, NodeConfig, Nodes, []),
mnesia_tpcb:init(TpcbConfig),
A ! fun () -> mnesia_tpcb:run(TpcbConfig) end,
- timer:sleep(timer:seconds(10)),
+ timer:sleep(timer:seconds(2)),
?match({atomic, ok}, mnesia:add_table_copy(account, AddNode, ReplicaType)),
mnesia_tpcb:stop(),
?match(ok, mnesia_tpcb:verify_tabs()),
@@ -501,7 +501,7 @@ consistency_after_del_replica(ReplicaType, NodeConfig, Config) ->
TpcbConfig = tpcb_config(ReplicaType, NodeConfig, Nodes, []),
mnesia_tpcb:init(TpcbConfig),
A ! fun () -> mnesia_tpcb:run(TpcbConfig) end,
- timer:sleep(timer:seconds(10)),
+ timer:sleep(timer:seconds(3)),
?match({atomic, ok}, mnesia:del_table_copy(account, Node2)),
mnesia_tpcb:stop(),
?match(ok, mnesia_tpcb:verify_tabs()),
@@ -543,7 +543,7 @@ consistency_after_move_replica(ReplicaType, NodeConfig, Config) ->
TpcbConfig = tpcb_config(ReplicaType, NodeConfig, Nodes -- [Node2], []),
mnesia_tpcb:init(TpcbConfig),
A ! fun () -> mnesia_tpcb:run(TpcbConfig) end,
- timer:sleep(timer:seconds(10)),
+ timer:sleep(timer:seconds(3)),
?match({atomic, ok}, mnesia:move_table_copy(account, Node1, Node2)),
?log("First move completed from node ~p to ~p ~n", [Node1, Node2]),
?match({atomic, ok}, mnesia:move_table_copy(account, Node2, Node1)),
@@ -638,7 +638,7 @@ consistency_after_fallback_3_disc_only(Config) when is_list(Config) ->
consistency_after_fallback(ReplicaType, NodeConfig, Config) ->
put(mnesia_test_verbose, true),
%%?verbose("Starting consistency_after_fallback2 at ~p~n", [self()]),
- Delay = 5,
+ Delay = 3,
Nodes = ?acquire_nodes(NodeConfig, [{tc_timeout, timer:minutes(10)} | Config]),
Node1 = hd(Nodes),
%%?verbose("Mnesia info: ~p~n", [mnesia:info()]),
@@ -867,7 +867,7 @@ updates_during_checkpoint_activation_3_disc_only(Config) when is_list(Config) ->
updates_during_checkpoint_activation(ReplicaType,NodeConfig,Config) ->
%%?verbose("updates_during_checkpoint_activation2 at ~p~n", [self()]),
- Delay = 5,
+ Delay = 2,
Nodes = ?acquire_nodes(NodeConfig, Config),
Node1 = hd(Nodes),
%%?verbose("Mnesia info: ~p~n", [mnesia:info()]),
@@ -922,7 +922,7 @@ updates_during_checkpoint_iteration_2_disc_only(Config) when is_list(Config) ->
updates_during_checkpoint_iteration(ReplicaType,NodeConfig,Config) ->
%?verbose("updates_during_checkpoint_iteration2 at ~p~n", [self()]),
- Delay = 5,
+ Delay = 2,
Nodes = ?acquire_nodes(NodeConfig, Config),
Node1 = hd(Nodes),
%?verbose("Mnesia info: ~p~n", [mnesia:info()]),
diff --git a/lib/mnesia/test/mt.erl b/lib/mnesia/test/mt.erl
index c1859bef3f..0bb6aadc4e 100644
--- a/lib/mnesia/test/mt.erl
+++ b/lib/mnesia/test/mt.erl
@@ -245,30 +245,32 @@ start_nodes() ->
%% loop one testcase /suite until it fails
loop(Case) ->
- loop_1(Case,-1,read_config()).
+ loop_1(Case,1,infinity,read_config()).
loop(M,F) when is_atom(F) ->
- loop_1({M,F},-1,read_config());
+ loop_1({M,F},1, infinity, read_config());
loop(Case,N) when is_integer(N) ->
- loop_1(Case, N,read_config()).
+ loop_1(Case, 1, N,read_config()).
loop(M,F,N) when is_integer(N) ->
- loop_1({M,F},N,read_config()).
+ loop_1({M,F},1, N,read_config()).
+
+loop_1(Case,N,Max,Config) when N < Max ->
+ io:format("~nLoop test ~p ~n", [abs(N)]),
-loop_1(Case,N,Config) when N /= 0 ->
- io:format("Loop test ~p ~n", [abs(N)]),
case ok_result(Res = t(Case,Config)) of
true ->
- loop_1(Case,N-1,Config);
+ loop_1(Case,N+1,Max,Config);
error ->
+ io:format("Failed after ~p~n", [N]),
Res
end;
-loop_1(_,_,_) ->
+loop_1(_,_,_,_) ->
ok.
-
+
ok_result([{_T,{ok,_,_}}|R]) ->
ok_result(R);
-ok_result([{_T,{TC,List}}|R]) when is_tuple(TC), is_list(List) ->
+ok_result([{_T,{TC,List}}|R]) when is_tuple(TC), is_list(List) ->
ok_result(List) andalso ok_result(R);
ok_result([]) -> true;
ok_result(_) -> error.
--
2.35.3