Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:23
erlang
0912-mnesia-Avoid-schema-lock-when-loading-tabl...
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 0912-mnesia-Avoid-schema-lock-when-loading-tables.patch of Package erlang
From 01a4882b1c5b7316edd1cf85e19a6afdc632ebb7 Mon Sep 17 00:00:00 2001 From: Dan Gudmundsson <dgud@erlang.org> Date: Tue, 7 Dec 2021 17:30:49 +0100 Subject: [PATCH 1/3] mnesia: Avoid schema lock when loading tables By handling errors that can happen if a table is remove or changed during table copying we can remove the schema lock on that table to greatly increase the startup when many nodes startup at the same time and starts loading tables which will also grab a schema lock and thus cause long startup times. --- lib/mnesia/src/mnesia_controller.erl | 54 ++++--- lib/mnesia/src/mnesia_loader.erl | 202 ++++++++++++++++----------- lib/mnesia/src/mnesia_monitor.erl | 6 +- 3 files changed, 164 insertions(+), 98 deletions(-) diff --git a/lib/mnesia/src/mnesia_controller.erl b/lib/mnesia/src/mnesia_controller.erl index f6ef0bd1ae..5ea3a896e1 100644 --- a/lib/mnesia/src/mnesia_controller.erl +++ b/lib/mnesia/src/mnesia_controller.erl @@ -81,7 +81,7 @@ wait_for_tables/2, get_network_copy/3, merge_schema/0, - start_remote_sender/4, + start_remote_sender/5, schedule_late_disc_load/2 ]). @@ -154,7 +154,8 @@ max_loaders() -> -record(send_table, {table, receiver_pid, - remote_storage + remote_storage, + reason }). -record(disc_load, {table, @@ -800,7 +801,7 @@ handle_call({late_disc_load, Tabs, Reason, RemoteLoaders}, From, State) -> handle_call({unblock_table, Tab}, _Dummy, State) -> Var = {Tab, where_to_commit}, - case val(Var) of + case ?catch_val(Var) of {blocked, List} -> set(Var, List); % where_to_commit _ -> @@ -1031,6 +1032,12 @@ handle_cast({disc_load, Tab, Reason}, State) -> State2 = add_worker(Worker, State), noreply(State2); +handle_cast({send_table, Tab, Pid, Storage}, State) -> + %% {protocol, Node} = {8,5} or less + %% Let reason be undefined + Worker = #send_table{table=Tab, receiver_pid=Pid, remote_storage=Storage}, + State2 = add_worker(Worker, State), + noreply(State2); handle_cast(Worker = #send_table{}, State) -> State2 = add_worker(Worker, State), noreply(State2); @@ -1236,12 +1243,16 @@ handle_info(Done = #loader_done{worker_pid=WPid, table_name=Tab}, State0) -> handle_info(#sender_done{worker_pid=Pid, worker_res=Res}, State) -> Senders = get_senders(State), {value, {Pid,_Worker}} = lists:keysearch(Pid, 1, Senders), - if - Res == ok -> + case Res of + ok -> State2 = State#state{sender_pid = lists:keydelete(Pid, 1, Senders)}, State3 = opt_start_worker(State2), noreply(State3); - true -> + {error, {no_exists, _Tab}} -> + State2 = State#state{sender_pid = lists:keydelete(Pid, 1, Senders)}, + State3 = opt_start_worker(State2), + noreply(State3); + _ -> %% No need to send any message to the table receiver %% since it will soon get a mnesia_down anyway fatal("Sender failed: ~p~n state: ~tp~n", [Res, State]), @@ -1778,7 +1789,7 @@ update_where_to_wlock(Tab) -> %% This code is rpc:call'ed from the tab_copier process %% when it has *not* released it's table lock unannounce_add_table_copy(Tab, To) -> - ?SAFE(del_active_replica(Tab, To)), + ?CATCH(del_active_replica(Tab, To)), try To = val({Tab , where_to_read}), mnesia_lib:set_remote_where_to_read(Tab) catch _:_ -> ignore @@ -2104,10 +2115,17 @@ already_loading2(Tab, [{_,#disc_load{table=Tab}}|_]) -> true; already_loading2(Tab, [_|Rest]) -> already_loading2(Tab,Rest); already_loading2(_,[]) -> false. -start_remote_sender(Node, Tab, Receiver, Storage) -> - Msg = #send_table{table = Tab, - receiver_pid = Receiver, - remote_storage = Storage}, +start_remote_sender(Node, Tab, Receiver, Storage, Why) -> + Msg = case ?catch_val({protocol, Node}) of + {Ver, _} when Ver < {8,6} -> + {send_table, Tab, Receiver, Storage}; + _ -> + #send_table{table = Tab, + receiver_pid = Receiver, + remote_storage = Storage, + reason = Why + } + end, gen_server:cast({?SERVER_NAME, Node}, Msg). dump_and_reply(ReplyTo, Worker) -> @@ -2123,13 +2141,15 @@ dump_and_reply(ReplyTo, Worker) -> unlink(ReplyTo), exit(normal). -send_and_reply(ReplyTo, Worker) -> +send_and_reply(ReplyTo, #send_table{table=Tab, remote_storage=Storage, receiver_pid=Pid, reason=Reason}) -> %% No trap_exit, die intentionally instead - Res = mnesia_loader:send_table(Worker#send_table.receiver_pid, - Worker#send_table.table, - Worker#send_table.remote_storage), - ReplyTo ! #sender_done{worker_pid = self(), - worker_res = Res}, + Res = mnesia_loader:send_table(Pid, Tab, Storage, Reason), + case Res of + {error, {no_exists, _}} -> + Pid ! {copier_done, node()}; + _ -> ok + end, + ReplyTo ! #sender_done{worker_pid = self(), worker_res = Res}, unlink(ReplyTo), exit(normal). diff --git a/lib/mnesia/src/mnesia_loader.erl b/lib/mnesia/src/mnesia_loader.erl index b9285b3d71..da43a521a4 100644 --- a/lib/mnesia/src/mnesia_loader.erl +++ b/lib/mnesia/src/mnesia_loader.erl @@ -26,7 +26,7 @@ %% Mnesia internal stuff -export([disc_load_table/2, net_load_table/4, - send_table/3]). + send_table/4]). -export([spawned_receiver/8]). %% Spawned lock taking process @@ -194,13 +194,16 @@ do_get_disc_copy2(Tab, Reason, Storage = {ext, Alias, Mod}, _Type) -> net_load_table(Tab, {dumper,{add_table_copy, _}}=Reason, Ns, Cs) -> try_net_load_table(Tab, Reason, Ns, Cs); net_load_table(Tab, Reason, Ns, _Cs) -> - try_net_load_table(Tab, Reason, Ns, val({Tab, cstruct})). + try_net_load_table(Tab, Reason, Ns, ?catch_val({Tab, cstruct})). try_net_load_table(Tab, _Reason, [], _Cs) -> verbose("Copy failed. No active replicas of ~tp are available.~n", [Tab]), {not_loaded, none_active}; try_net_load_table(Tab, Reason, Ns, Cs) -> - Storage = mnesia_lib:cs_to_storage_type(node(), Cs), + Storage = case is_record(Cs, cstruct) of + true -> mnesia_lib:cs_to_storage_type(node(), Cs); + false -> unknown + end, do_get_network_copy(Tab, Reason, Ns, Storage, Cs). do_get_network_copy(Tab, _Reason, _Ns, unknown, _Cs) -> @@ -244,20 +244,24 @@ do_snmpify(Tab, Us, Storage) -> %% Start the recieiver init_receiver(Node, Tab, Storage, Cs, Reas={dumper,{add_table_copy, Tid}}) -> - rpc:call(Node, mnesia_lib, set, [{?MODULE, active_trans}, Tid]), - case start_remote_sender(Node, Tab, Storage) of + rpc:call(Node, mnesia_lib, set, [{?MODULE, active_trans}, Tid]), %% Needed for old nodes + case start_remote_sender(Node, Tab, Storage, {add_table_copy, Tid}) of {SenderPid, TabSize, DetsData} -> start_receiver(Tab,Storage,Cs,SenderPid,TabSize,DetsData,Reas); Else -> Else end; init_receiver(Node, Tab,Storage,Cs,Reason) -> - %% Grab a schema lock to avoid deadlock between table_loader and schema_commit dumping. - %% Both may grab tables-locks in different order. Load = fun() -> - {_,Tid,Ts} = get(mnesia_activity_state), - mnesia_locker:rlock(Tid, Ts#tidstore.store, {schema, Tab}), + %% {protocol, Node} = {8,5} or less + %% We need to grab a schema_lock here because sender expects it. + case ?catch_val({protocol, Node}) of + {Ver, _} when Ver < {8,6} -> + {_,Tid,Ts} = get(mnesia_activity_state), + mnesia_locker:rlock(Tid, Ts#tidstore.store, {schema, Tab}); + _ -> ok + end, %% Check that table still exists Active = val({Tab, active_replicas}), %% Check that we haven't loaded it already @@ -265,11 +269,9 @@ init_receiver(Node, Tab,Storage,Cs,Reaso true -> ok; _ -> %% And that sender still got a copy - %% (something might have happened while - %% we where waiting for the lock) true = lists:member(Node, Active), {SenderPid, TabSize, DetsData} = - start_remote_sender(Node,Tab,Storage), + start_remote_sender(Node,Tab,Storage,load), Init = table_init_fun(SenderPid, Storage), Args = [self(),Tab,Storage,Cs,SenderPid, TabSize,DetsData,Init], @@ -296,8 +301,8 @@ init_receiver(Node, Tab,Storage,Cs,Reason) -> unlink(whereis(mnesia_tm)), %% Avoid late unlink from tm Res. -start_remote_sender(Node,Tab,Storage) -> - mnesia_controller:start_remote_sender(Node, Tab, self(), Storage), +start_remote_sender(Node,Tab,Storage, Why) -> + mnesia_controller:start_remote_sender(Node, Tab, self(), Storage, Why), put(mnesia_table_sender_node, {Tab, Node}), receive {SenderPid, {first, _} = Msg} @@ -756,64 +761,42 @@ calc_nokeys(Storage, Tab) -> BinSize = size(term_to_binary(Recs)), (?MAX_TRANSFER_SIZE div BinSize) + 1. -send_table(Pid, Tab, RemoteS) -> +send_table(Pid, Tab, RemoteS, Reason) -> case ?catch_val({Tab, storage_type}) of {'EXIT', _} -> {error, {no_exists, Tab}}; unknown -> {error, {no_exists, Tab}}; Storage -> - do_send_table(Pid, Tab, Storage, RemoteS) + do_send_table(Pid, Tab, Storage, RemoteS, Reason) end. -do_send_table(Pid, Tab, Storage, RemoteS) -> - {Init, Chunk} = - case Storage of - {ext, Alias, Mod} -> - case Mod:sender_init(Alias, Tab, RemoteS, Pid) of - {standard, I, C} -> - Pid ! {self(), {first, Mod:info(Alias, Tab, size)}}, - {I, C}; - {_, _} = Res -> - Res - end; - Storage -> - %% Send first - TabSize = mnesia:table_info(Tab, size), - KeysPerTransfer = calc_nokeys(Storage, Tab), - ChunkData = dets:info(Tab, bchunk_format), - - UseDetsChunk = - Storage == RemoteS andalso - Storage == disc_only_copies andalso - ChunkData /= undefined, - if - UseDetsChunk == true -> - DetsInfo = erlang:system_info(version), - Pid ! {self(), {first, TabSize, {DetsInfo, ChunkData}}}; - true -> - Pid ! {self(), {first, TabSize}} - end, - {_I, _C} = - reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer) - end, +do_send_table(Pid, Tab, Storage, RemoteS, LoadReason) -> %% Debug info put(mnesia_table_sender, {Tab, node(Pid), Pid}), - - SendIt = fun() -> - NeedLock = need_lock(Tab), - {atomic, ok} = prepare_copy(Pid, Tab, Storage, NeedLock), - send_more(Pid, 1, Chunk, Init(), Tab, Storage), - finish_copy(Pid, Tab, Storage, RemoteS, NeedLock) - end, - - try SendIt() of + try + begin + {Init, Chunk} = get_chunk_func(Pid, Tab, Storage, RemoteS), + NeedLock = need_lock(Tab, LoadReason), + case prepare_copy(Pid, Tab, Storage, NeedLock) of + {atomic, ok} -> + send_more(Pid, 1, Chunk, Init(), Tab, Storage), + finish_copy(Pid, Tab, Storage, RemoteS, NeedLock); + Error -> + Error + end + end + of {_, receiver_died} -> ok; - {atomic, no_more} -> ok + {atomic, no_more} -> ok; + {aborted, {no_exists, _}=Err} -> {error, Err} catch throw:receiver_died -> cleanup_tab_copier(Pid, Storage, Tab), ok; + throw:{no_exists, _}=Err -> + cleanup_tab_copier(Pid, Storage, Tab), + {error, Err}; error:Reason:Stacktrace -> %% Prepare failed cleanup_tab_copier(Pid, Storage, Tab), {error, {tab_copier, Tab, {Reason, Stacktrace}}} @@ -824,16 +807,22 @@ do_send_table(Pid, Tab, Storage, RemoteS) -> prepare_copy(Pid, Tab, Storage, NeedLock) -> Trans = fun() -> - NeedLock andalso mnesia:lock_table(Tab, load), + NeedLock andalso mnesia:read_lock_table(Tab), mnesia_subscr:subscribe(Pid, {table, Tab}), update_where_to_write(Tab, node(Pid)), - mnesia_lib:db_fixtable(Storage, Tab, true), + try mnesia_lib:db_fixtable(Storage, Tab, true) + catch _:badarg -> mnesia:abort({no_exists, Tab}) + end, ok end, mnesia:transaction(Trans). - -need_lock(Tab) -> +need_lock(Tab, {add_table_copy, Tid}) -> + case mnesia_locker:get_held_locks(Tab) of + [{write, Tid}|_] -> false; %% Move table grabs write lock + _Locks -> true + end; +need_lock(Tab, undefined) -> case ?catch_val({?MODULE, active_trans}) of #tid{} = Tid -> %% move_table_copy grabs it's own table-lock @@ -843,9 +832,11 @@ need_lock(Tab) -> [{write, Tid}|_] -> false; _Locks -> true end; - _ -> + _Tid -> true - end. + end; +need_lock(_, _) -> + true. update_where_to_write(Tab, Node) -> case val({Tab, access_mode}) of @@ -900,20 +891,72 @@ send_more(Pid, N, Chunk, DataState, Tab, Storage) -> throw(receiver_died) end. +get_chunk_func(Pid, Tab, {ext, Alias, Mod}, RemoteS) -> + try + case Mod:sender_init(Alias, Tab, RemoteS, Pid) of + {standard, I, C} -> + Pid ! {self(), {first, Mod:info(Alias, Tab, size)}}, + {I, C}; + {_, _} = Res -> + Res + end + catch _:Reason -> + verbose("Init chunk failed: ~p ~p~n",[Tab, Reason]), + throw({no_exists, Tab}) + end; +get_chunk_func(Pid, Tab, Storage, RemoteS) -> + try + TabSize = mnesia:table_info(Tab, size), + KeysPerTransfer = calc_nokeys(Storage, Tab), + ChunkData = dets:info(Tab, bchunk_format), + UseDetsChunk = + Storage == RemoteS andalso + Storage == disc_only_copies andalso + ChunkData /= undefined, + if + UseDetsChunk == true -> + DetsInfo = erlang:system_info(version), + Pid ! {self(), {first, TabSize, {DetsInfo, ChunkData}}}; + true -> + Pid ! {self(), {first, TabSize}} + end, + reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer) + catch _:Reason -> + verbose("Init chunk failed: ~p ~p~n",[Tab, Reason]), + throw({no_exists, Tab}) + end. + reader_funcs(UseDetsChunk, Tab, Storage, KeysPerTransfer) -> case UseDetsChunk of false -> - {fun() -> mnesia_lib:db_init_chunk(Storage, Tab, KeysPerTransfer) end, - fun(Cont) -> mnesia_lib:db_chunk(Storage, Cont) end}; + {fun() -> init_chunk(Storage, Tab, KeysPerTransfer) end, + fun(Cont) -> chunk(Tab, Storage, Cont) end}; true -> {fun() -> dets_bchunk(Tab, start) end, fun(Cont) -> dets_bchunk(Tab, Cont) end} end. -dets_bchunk(Tab, Chunk) -> %% Arrg - case dets:bchunk(Tab, Chunk) of +init_chunk(Storage, Tab, KeysPerTransfer) -> + try mnesia_lib:db_init_chunk(Storage, Tab, KeysPerTransfer) + catch _:Reason -> + verbose("Read chunk failed: ~p ~p~n",[Tab, Reason]), + throw({no_exists, Tab}) + end. + +chunk(Tab, Storage, Chunk) -> + try mnesia_lib:db_chunk(Storage, Chunk) + catch _:Reason -> + verbose("Read chunk failed: ~p ~p~n",[Tab, Reason]), + throw({no_exists, Tab}) + end. + +dets_bchunk(Tab, Chunk) -> + try dets:bchunk(Tab, Chunk) of {Cont, Data} -> {Data, Cont}; Else -> Else + catch _:Reason -> + verbose("Read chunk failed: ~p ~p~n",[Tab, Reason]), + throw({no_exists, Tab}) end. zlib_compress(Data, Level) -> @@ -955,21 +998,22 @@ send_packet(_N, _Pid, _Chunk, DataState) -> finish_copy(Pid, Tab, Storage, RemoteS, NeedLock) -> RecNode = node(Pid), - DatBin = dat2bin(Tab, Storage, RemoteS), Node = node(Pid), Trans = fun() -> NeedLock andalso mnesia:read_lock_table(Tab), %% Check that receiver is still alive - receive {copier_done, Node} -> - throw(receiver_died) - after 0 -> ok + receive + {copier_done, Node} -> + receiver_died + after 0 -> + A = val({Tab, access_mode}), + mnesia_controller:sync_and_block_table_whereabouts(Tab, RecNode, RemoteS, A), + mnesia_checkpoint:tm_add_copy(Tab, RecNode), + DatBin = dat2bin(Tab, ?catch_val({Tab, storage_type}), RemoteS), + Pid ! {self(), {no_more, DatBin}}, + cleanup_tab_copier(Pid, Storage, Tab) end, - A = val({Tab, access_mode}), - mnesia_controller:sync_and_block_table_whereabouts(Tab, RecNode, RemoteS, A), - cleanup_tab_copier(Pid, Storage, Tab), - mnesia_checkpoint:tm_add_copy(Tab, RecNode), - Pid ! {self(), {no_more, DatBin}}, receive {Pid, no_more} -> % Dont bother about the spurious 'more' message no_more; @@ -981,8 +1025,10 @@ finish_copy(Pid, Tab, Storage, RemoteS, NeedLock) -> mnesia:transaction(Trans). cleanup_tab_copier(Pid, Storage, Tab) -> - mnesia_lib:db_fixtable(Storage, Tab, false), - mnesia_subscr:unsubscribe(Pid, {table, Tab}). + mnesia_subscr:unsubscribe(Pid, {table, Tab}), + try mnesia_lib:db_fixtable(Storage, Tab, false) + catch _:badarg -> {no_exists, Tab} + end. dat2bin(Tab, ram_copies, ram_copies) -> mnesia_lib:lock_table(Tab), diff --git a/lib/mnesia/src/mnesia_monitor.erl b/lib/mnesia/src/mnesia_monitor.erl index 45609819a7..aee84887dc 100644 --- a/lib/mnesia/src/mnesia_monitor.erl +++ b/lib/mnesia/src/mnesia_monitor.erl @@ -83,9 +83,9 @@ going_down = [], tm_started = false, early_connects = [], connecting, mq = [], remote_node_status = []}). --define(current_protocol_version, {8,5}). +-define(current_protocol_version, {8,6}). --define(previous_protocol_version, {8,4}). +-define(previous_protocol_version, {8,5}). start() -> gen_server:start_link({local, ?MODULE}, ?MODULE, @@ -196,7 +196,7 @@ protocol_version() -> %% A sorted list of acceptable protocols the %% preferred protocols are first in the list acceptable_protocol_versions() -> - [protocol_version(), ?previous_protocol_version, {8,3}]. + [protocol_version(), ?previous_protocol_version, {8,4}, {8,3}]. needs_protocol_conversion(Node) -> case {?catch_val({protocol, Node}), protocol_version()} of -- 2.34.1
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor