File 0912-mnesia-Avoid-schema-lock-when-loading-tables.patch of Package erlang

From 01a4882b1c5b7316edd1cf85e19a6afdc632ebb7 Mon Sep 17 00:00:00 2001
From: Dan Gudmundsson <dgud@erlang.org>
Date: Tue, 7 Dec 2021 17:30:49 +0100
Subject: [PATCH 1/3] mnesia: Avoid schema lock when loading tables

By handling errors that can happen if a table is remove or changed
during table copying we can remove the schema lock on that table
to greatly increase the startup when many nodes startup at the
same time and starts loading tables which will also grab a schema
lock and thus cause long startup times.
---
 lib/mnesia/src/mnesia_controller.erl |  54 ++++---
 lib/mnesia/src/mnesia_loader.erl     | 202 ++++++++++++++++-----------
 lib/mnesia/src/mnesia_monitor.erl    |   6 +-
 3 files changed, 164 insertions(+), 98 deletions(-)

diff --git a/lib/mnesia/src/mnesia_controller.erl b/lib/mnesia/src/mnesia_controller.erl
index f6ef0bd1ae..5ea3a896e1 100644
--- a/lib/mnesia/src/mnesia_controller.erl
+++ b/lib/mnesia/src/mnesia_controller.erl
@@ -81,7 +81,7 @@
 	 wait_for_tables/2,
 	 get_network_copy/3,
 	 merge_schema/0,
-	 start_remote_sender/4,
+	 start_remote_sender/5,
 	 schedule_late_disc_load/2
 	]).
 
@@ -154,7 +154,8 @@ max_loaders() ->
 
 -record(send_table, {table,
 		     receiver_pid,
-		     remote_storage
+		     remote_storage,
+                     reason
 		    }).
 
 -record(disc_load, {table,
@@ -800,7 +801,7 @@ handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) ->
 
 handle_call({unblock_table, Tab}, _Dummy, State) ->
     Var = {Tab, where_to_commit},
-    case val(Var) of
+    case ?catch_val(Var) of
 	{blocked, List} ->
 	    set(Var, List); % where_to_commit
 	_ ->
@@ -1031,6 +1032,12 @@ handle_cast({disc_load, Tab, Reason}, State) ->
     State2 = add_worker(Worker, State),
     noreply(State2);
 
+handle_cast({send_table, Tab, Pid, Storage}, State) ->
+    %% {protocol, Node} = {8,5} or less
+    %% Let reason be undefined
+    Worker = #send_table{table=Tab, receiver_pid=Pid, remote_storage=Storage},
+    State2 = add_worker(Worker, State),
+    noreply(State2);
 handle_cast(Worker = #send_table{}, State) ->
     State2 = add_worker(Worker, State),
     noreply(State2);
@@ -1236,12 +1243,16 @@ handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) ->
 handle_info(#sender_done{worker_pid=Pid, worker_res=Res}, State)  ->
     Senders = get_senders(State),
     {value, {Pid,_Worker}} = lists:keysearch(Pid, 1, Senders),
-    if
-	Res == ok ->
+    case Res of
+	ok ->
 	    State2 = State#state{sender_pid = lists:keydelete(Pid, 1, Senders)},
 	    State3 = opt_start_worker(State2),
 	    noreply(State3);
-	true ->
+        {error, {no_exists, _Tab}} ->
+            State2 = State#state{sender_pid = lists:keydelete(Pid, 1, Senders)},
+	    State3 = opt_start_worker(State2),
+	    noreply(State3);
+        _ ->
 	    %% No need to send any message to the table receiver
 	    %% since it will soon get a mnesia_down anyway
 	    fatal("Sender failed: ~p~n state: ~tp~n", [Res, State]),
@@ -1778,7 +1789,7 @@ update_where_to_wlock(Tab) ->
 %% This code is rpc:call'ed from the tab_copier process
 %% when it has *not* released it's table lock
 unannounce_add_table_copy(Tab, To) ->
-    ?SAFE(del_active_replica(Tab, To)),
+    ?CATCH(del_active_replica(Tab, To)),
     try To = val({Tab , where_to_read}),
 	 mnesia_lib:set_remote_where_to_read(Tab)
     catch _:_ -> ignore
@@ -2104,10 +2115,17 @@ already_loading2(Tab, [{_,#disc_load{table=Tab}}|_]) -> true;
 already_loading2(Tab, [_|Rest]) -> already_loading2(Tab,Rest);
 already_loading2(_,[]) -> false.
 
-start_remote_sender(Node, Tab, Receiver, Storage) ->
-    Msg = #send_table{table = Tab,
-		      receiver_pid = Receiver,
-		      remote_storage = Storage},
+start_remote_sender(Node, Tab, Receiver, Storage, Why) ->
+    Msg = case ?catch_val({protocol, Node}) of
+              {Ver, _} when Ver < {8,6} ->
+                  {send_table, Tab, Receiver, Storage};
+              _ ->
+                  #send_table{table = Tab,
+                              receiver_pid = Receiver,
+                              remote_storage = Storage,
+                              reason = Why
+                             }
+          end,
     gen_server:cast({?SERVER_NAME, Node}, Msg).
 
 dump_and_reply(ReplyTo, Worker) ->
@@ -2123,13 +2141,15 @@ dump_and_reply(ReplyTo, Worker) ->
     unlink(ReplyTo),
     exit(normal).
 
-send_and_reply(ReplyTo, Worker) ->
+send_and_reply(ReplyTo, #send_table{table=Tab, remote_storage=Storage, receiver_pid=Pid, reason=Reason}) ->
     %% No trap_exit, die intentionally instead
-    Res = mnesia_loader:send_table(Worker#send_table.receiver_pid,
-				   Worker#send_table.table,
-				   Worker#send_table.remote_storage),
-    ReplyTo ! #sender_done{worker_pid = self(),
-			   worker_res = Res},
+    Res = mnesia_loader:send_table(Pid, Tab, Storage, Reason),
+    case Res of
+        {error, {no_exists, _}} ->
+            Pid ! {copier_done, node()};
+        _ -> ok
+    end,
+    ReplyTo ! #sender_done{worker_pid = self(), worker_res = Res},
     unlink(ReplyTo),
     exit(normal).
 
diff --git a/lib/mnesia/src/mnesia_loader.erl b/lib/mnesia/src/mnesia_loader.erl
index b9285b3d71..da43a521a4 100644
--- a/lib/mnesia/src/mnesia_loader.erl
+++ b/lib/mnesia/src/mnesia_loader.erl
@@ -26,7 +26,7 @@
 %% Mnesia internal stuff
 -export([disc_load_table/2,
 	 net_load_table/4,
-	 send_table/3]).
+	 send_table/4]).
 
 -export([spawned_receiver/8]).    %% Spawned lock taking process
 
@@ -194,13 +194,16 @@ do_get_disc_copy2(Tab, Reason, Storage = {ext, Alias, Mod}, _Type) ->
 net_load_table(Tab, {dumper,{add_table_copy, _}}=Reason, Ns, Cs) ->
     try_net_load_table(Tab, Reason, Ns, Cs);
 net_load_table(Tab, Reason, Ns, _Cs) ->
-    try_net_load_table(Tab, Reason, Ns, val({Tab, cstruct})).
+    try_net_load_table(Tab, Reason, Ns, ?catch_val({Tab, cstruct})).
 
 try_net_load_table(Tab, _Reason, [], _Cs) ->
     verbose("Copy failed. No active replicas of ~tp are available.~n", [Tab]),
     {not_loaded, none_active};
 try_net_load_table(Tab, Reason, Ns, Cs) ->
-    Storage = mnesia_lib:cs_to_storage_type(node(), Cs),
+    Storage = case is_record(Cs, cstruct) of
+                  true -> mnesia_lib:cs_to_storage_type(node(), Cs);
+                  false -> unknown
+              end,
     do_get_network_copy(Tab, Reason, Ns, Storage, Cs).
 
 do_get_network_copy(Tab, _Reason, _Ns, unknown, _Cs) ->
@@ -244,20 +244,24 @@ do_snmpify(Tab, Us, Storage) ->
 
 %% Start the recieiver
 init_receiver(Node, Tab, Storage, Cs, Reas={dumper,{add_table_copy, Tid}}) ->
-    rpc:call(Node, mnesia_lib, set, [{?MODULE, active_trans}, Tid]),
-    case start_remote_sender(Node, Tab, Storage) of
+    rpc:call(Node, mnesia_lib, set, [{?MODULE, active_trans}, Tid]),  %% Needed for old nodes
+    case start_remote_sender(Node, Tab, Storage, {add_table_copy, Tid}) of
 	{SenderPid, TabSize, DetsData} ->
 	    start_receiver(Tab,Storage,Cs,SenderPid,TabSize,DetsData,Reas);
 	Else ->
 	    Else
     end;
 init_receiver(Node, Tab,Storage,Cs,Reason) ->
-    %% Grab a schema lock to avoid deadlock between table_loader and schema_commit dumping.
-    %% Both may grab tables-locks in different order.
     Load =
 	fun() ->
-		{_,Tid,Ts} = get(mnesia_activity_state),
-		mnesia_locker:rlock(Tid, Ts#tidstore.store, {schema, Tab}),
+                %% {protocol, Node} = {8,5} or less
+                %% We need to grab a schema_lock here because sender expects it.
+                case ?catch_val({protocol, Node}) of
+                    {Ver, _} when Ver < {8,6} ->
+                        {_,Tid,Ts} = get(mnesia_activity_state),
+                        mnesia_locker:rlock(Tid, Ts#tidstore.store, {schema, Tab});
+                    _ -> ok
+                end,
 		%% Check that table still exists
 		Active = val({Tab, active_replicas}),
 		%% Check that we haven't loaded it already
@@ -265,11 +269,9 @@ init_receiver(Node, Tab,Storage,Cs,Reaso
 		    true -> ok;
 		    _ ->
 			%% And that sender still got a copy
-			%% (something might have happened while
-			%% we where waiting for the lock)
 			true = lists:member(Node, Active),
 			{SenderPid, TabSize, DetsData} =
-			    start_remote_sender(Node,Tab,Storage),
+			    start_remote_sender(Node,Tab,Storage,load),
 			Init = table_init_fun(SenderPid, Storage),
 			Args = [self(),Tab,Storage,Cs,SenderPid,
 				TabSize,DetsData,Init],
@@ -296,8 +301,8 @@ init_receiver(Node, Tab,Storage,Cs,Reason) ->
     unlink(whereis(mnesia_tm)),  %% Avoid late unlink from tm
     Res.
 
-start_remote_sender(Node,Tab,Storage) ->
-    mnesia_controller:start_remote_sender(Node, Tab, self(), Storage),
+start_remote_sender(Node,Tab,Storage, Why) ->
+    mnesia_controller:start_remote_sender(Node, Tab, self(), Storage, Why),
     put(mnesia_table_sender_node, {Tab, Node}),
     receive
 	{SenderPid, {first, _} = Msg}
@@ -756,64 +761,42 @@ calc_nokeys(Storage, Tab) ->
     BinSize = size(term_to_binary(Recs)),
     (?MAX_TRANSFER_SIZE div BinSize) + 1.
 
-send_table(Pid, Tab, RemoteS) ->
+send_table(Pid, Tab, RemoteS, Reason) ->
     case ?catch_val({Tab, storage_type}) of
 	{'EXIT', _} ->
 	    {error, {no_exists, Tab}};
 	unknown ->
 	    {error, {no_exists, Tab}};
 	Storage ->
-	    do_send_table(Pid, Tab, Storage, RemoteS)
+	    do_send_table(Pid, Tab, Storage, RemoteS, Reason)
     end.
 
-do_send_table(Pid, Tab, Storage, RemoteS) ->
-    {Init, Chunk} =
-	case Storage of
-	    {ext, Alias, Mod} ->
-		case Mod:sender_init(Alias, Tab, RemoteS, Pid) of
-		    {standard, I, C} ->
-			Pid ! {self(), {first, Mod:info(Alias, Tab, size)}},
-			{I, C};
-		    {_, _} = Res ->
-			Res
-		end;
-	    Storage ->
-		%% Send first
-		TabSize = mnesia:table_info(Tab, size),
-		KeysPerTransfer = calc_nokeys(Storage, Tab),
-		ChunkData = dets:info(Tab, bchunk_format),
-
-		UseDetsChunk =
-		    Storage == RemoteS andalso
-		    Storage == disc_only_copies andalso
-		    ChunkData /= undefined,
-		if
-		    UseDetsChunk == true ->
-			DetsInfo = erlang:system_info(version),
-			Pid ! {self(), {first, TabSize, {DetsInfo, ChunkData}}};
-		    true  ->
-			Pid ! {self(), {first, TabSize}}
-		end,
-		{_I, _C} =
-		    reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer)
-	end,
+do_send_table(Pid, Tab, Storage, RemoteS, LoadReason) ->
     %% Debug info
     put(mnesia_table_sender, {Tab, node(Pid), Pid}),
-
-    SendIt = fun() ->
-		     NeedLock = need_lock(Tab),
-		     {atomic, ok} = prepare_copy(Pid, Tab, Storage, NeedLock),
-		     send_more(Pid, 1, Chunk, Init(), Tab, Storage),
-		     finish_copy(Pid, Tab, Storage, RemoteS, NeedLock)
-	     end,
-
-    try SendIt() of
+    try
+        begin
+            {Init, Chunk} = get_chunk_func(Pid, Tab, Storage, RemoteS),
+            NeedLock = need_lock(Tab, LoadReason),
+            case prepare_copy(Pid, Tab, Storage, NeedLock) of
+                {atomic, ok} ->
+                    send_more(Pid, 1, Chunk, Init(), Tab, Storage),
+                    finish_copy(Pid, Tab, Storage, RemoteS, NeedLock);
+                Error ->
+                    Error
+            end
+        end
+    of
         {_, receiver_died} -> ok;
-        {atomic, no_more} ->  ok
+        {atomic, no_more} ->  ok;
+        {aborted, {no_exists, _}=Err} -> {error, Err}
     catch
         throw:receiver_died ->
             cleanup_tab_copier(Pid, Storage, Tab),
             ok;
+        throw:{no_exists, _}=Err ->
+            cleanup_tab_copier(Pid, Storage, Tab),
+            {error, Err};
         error:Reason:Stacktrace -> %% Prepare failed
             cleanup_tab_copier(Pid, Storage, Tab),
             {error, {tab_copier, Tab, {Reason, Stacktrace}}}
@@ -824,16 +807,22 @@ do_send_table(Pid, Tab, Storage, RemoteS) ->
 prepare_copy(Pid, Tab, Storage, NeedLock) ->
     Trans =
 	fun() ->
-		NeedLock andalso mnesia:lock_table(Tab, load),
+		NeedLock andalso mnesia:read_lock_table(Tab),
 		mnesia_subscr:subscribe(Pid, {table, Tab}),
 		update_where_to_write(Tab, node(Pid)),
-		mnesia_lib:db_fixtable(Storage, Tab, true),
+                try mnesia_lib:db_fixtable(Storage, Tab, true)
+                catch _:badarg -> mnesia:abort({no_exists, Tab})
+                end,
 		ok
 	end,
     mnesia:transaction(Trans).
 
-
-need_lock(Tab) ->
+need_lock(Tab, {add_table_copy, Tid}) ->
+    case mnesia_locker:get_held_locks(Tab) of
+        [{write, Tid}|_] -> false;  %% Move table grabs write lock
+        _Locks -> true
+    end;
+need_lock(Tab, undefined) ->
     case ?catch_val({?MODULE, active_trans}) of
 	#tid{} = Tid ->
 	    %% move_table_copy grabs it's own table-lock
@@ -843,9 +832,11 @@ need_lock(Tab) ->
 		[{write, Tid}|_] -> false;
 		_Locks -> true
 	    end;
-	_ ->
+	_Tid ->
 	    true
-    end.
+    end;
+need_lock(_, _) ->
+    true.
 
 update_where_to_write(Tab, Node) ->
     case val({Tab, access_mode}) of
@@ -900,20 +891,72 @@ send_more(Pid, N, Chunk, DataState, Tab, Storage) ->
 	    throw(receiver_died)
     end.
 
+get_chunk_func(Pid, Tab, {ext, Alias, Mod}, RemoteS) ->
+    try
+        case Mod:sender_init(Alias, Tab, RemoteS, Pid) of
+            {standard, I, C} ->
+                Pid ! {self(), {first, Mod:info(Alias, Tab, size)}},
+                {I, C};
+            {_, _} = Res ->
+                Res
+        end
+    catch _:Reason ->
+            verbose("Init chunk failed: ~p ~p~n",[Tab, Reason]),
+            throw({no_exists, Tab})
+    end;
+get_chunk_func(Pid, Tab, Storage, RemoteS) ->
+    try
+        TabSize = mnesia:table_info(Tab, size),
+        KeysPerTransfer = calc_nokeys(Storage, Tab),
+        ChunkData = dets:info(Tab, bchunk_format),
+        UseDetsChunk =
+            Storage == RemoteS andalso
+            Storage == disc_only_copies andalso
+            ChunkData /= undefined,
+        if
+            UseDetsChunk == true ->
+                DetsInfo = erlang:system_info(version),
+                Pid ! {self(), {first, TabSize, {DetsInfo, ChunkData}}};
+            true  ->
+                Pid ! {self(), {first, TabSize}}
+        end,
+        reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer)
+    catch _:Reason ->
+            verbose("Init chunk failed: ~p ~p~n",[Tab, Reason]),
+            throw({no_exists, Tab})
+    end.
+
 reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer) ->
     case UseDetsChunk of
 	false ->
-	    {fun() -> mnesia_lib:db_init_chunk(Storage, Tab, KeysPerTransfer) end,
-	     fun(Cont) -> mnesia_lib:db_chunk(Storage, Cont) end};
+	    {fun() -> init_chunk(Storage, Tab, KeysPerTransfer) end,
+	     fun(Cont) -> chunk(Tab, Storage, Cont) end};
 	true ->
 	    {fun() -> dets_bchunk(Tab, start) end,
 	     fun(Cont) -> dets_bchunk(Tab, Cont) end}
     end.
 
-dets_bchunk(Tab, Chunk) -> %% Arrg
-    case dets:bchunk(Tab, Chunk) of
+init_chunk(Storage, Tab, KeysPerTransfer) ->
+    try mnesia_lib:db_init_chunk(Storage, Tab, KeysPerTransfer)
+    catch _:Reason ->
+            verbose("Read chunk failed: ~p ~p~n",[Tab, Reason]),
+            throw({no_exists, Tab})
+    end.
+
+chunk(Tab, Storage, Chunk) ->
+    try mnesia_lib:db_chunk(Storage, Chunk)
+    catch _:Reason ->
+            verbose("Read chunk failed: ~p ~p~n",[Tab, Reason]),
+            throw({no_exists, Tab})
+    end.
+
+dets_bchunk(Tab, Chunk) ->
+    try dets:bchunk(Tab, Chunk) of
 	{Cont, Data} -> {Data, Cont};
 	Else -> Else
+    catch _:Reason ->
+            verbose("Read chunk failed: ~p ~p~n",[Tab, Reason]),
+            throw({no_exists, Tab})
     end.
 
 zlib_compress(Data, Level) ->
@@ -955,21 +998,22 @@ send_packet(_N, _Pid, _Chunk, DataState) ->
 
 finish_copy(Pid, Tab, Storage, RemoteS, NeedLock) ->
     RecNode = node(Pid),
-    DatBin = dat2bin(Tab, Storage, RemoteS),
     Node = node(Pid),
     Trans =
 	fun() ->
 		NeedLock andalso mnesia:read_lock_table(Tab),
                 %% Check that receiver is still alive
-                receive {copier_done, Node} ->
-                        throw(receiver_died)
-                after 0 -> ok
+                receive
+                    {copier_done, Node} ->
+                        receiver_died
+                after 0 ->
+                        A = val({Tab, access_mode}),
+                        mnesia_controller:sync_and_block_table_whereabouts(Tab, RecNode, RemoteS, A),
+                        mnesia_checkpoint:tm_add_copy(Tab, RecNode),
+                        DatBin = dat2bin(Tab, ?catch_val({Tab, storage_type}), RemoteS),
+                        Pid ! {self(), {no_more, DatBin}},
+                        cleanup_tab_copier(Pid, Storage, Tab)
                 end,
-		A = val({Tab, access_mode}),
-		mnesia_controller:sync_and_block_table_whereabouts(Tab, RecNode, RemoteS, A),
-		cleanup_tab_copier(Pid, Storage, Tab),
-		mnesia_checkpoint:tm_add_copy(Tab, RecNode),
-		Pid ! {self(), {no_more, DatBin}},
 		receive
 		    {Pid, no_more} -> % Dont bother about the spurious 'more' message
 			no_more;
@@ -981,8 +1025,10 @@ finish_copy(Pid, Tab, Storage, RemoteS, NeedLock) ->
     mnesia:transaction(Trans).
 
 cleanup_tab_copier(Pid, Storage, Tab) ->
-    mnesia_lib:db_fixtable(Storage, Tab, false),
-    mnesia_subscr:unsubscribe(Pid, {table, Tab}).
+    mnesia_subscr:unsubscribe(Pid, {table, Tab}),
+    try mnesia_lib:db_fixtable(Storage, Tab, false)
+    catch _:badarg -> {no_exists, Tab}
+    end.
 
 dat2bin(Tab, ram_copies, ram_copies) ->
     mnesia_lib:lock_table(Tab),
diff --git a/lib/mnesia/src/mnesia_monitor.erl b/lib/mnesia/src/mnesia_monitor.erl
index 45609819a7..aee84887dc 100644
--- a/lib/mnesia/src/mnesia_monitor.erl
+++ b/lib/mnesia/src/mnesia_monitor.erl
@@ -83,9 +83,9 @@
 		going_down = [], tm_started = false, early_connects = [],
 		connecting, mq = [], remote_node_status = []}).
 
--define(current_protocol_version,  {8,5}).
+-define(current_protocol_version,  {8,6}).
 
--define(previous_protocol_version, {8,4}).
+-define(previous_protocol_version, {8,5}).
 
 start() ->
     gen_server:start_link({local, ?MODULE}, ?MODULE,
@@ -196,7 +196,7 @@ protocol_version() ->
 %% A sorted list of acceptable protocols the
 %% preferred protocols are first in the list
 acceptable_protocol_versions() ->
-    [protocol_version(), ?previous_protocol_version, {8,3}].
+    [protocol_version(), ?previous_protocol_version, {8,4}, {8,3}].
 
 needs_protocol_conversion(Node) ->
     case {?catch_val({protocol, Node}), protocol_version()} of
-- 
2.34.1

openSUSE Build Service is sponsored by