File 0516-pg-Fix-pg-crash-for-processes-both-joined-and-monito.patch of Package erlang
From c13277110a837dcd507a30e7f1aee182c98098fa Mon Sep 17 00:00:00 2001
From: Maxim Fedorov <maximfca@gmail.com>
Date: Sun, 17 Sep 2023 07:34:13 -0700
Subject: [PATCH] [pg] Fix pg crash for processes both joined and monitoring
Closes #7625
The same process could be monitored multiple times, for joining
the group, and for being a scope/group monitor. Before this fix,
such process termination resulted in cleaning up the wrong
structure, so the second 'DOWN' message resulted in a crash.
While at it, add a white-box test ensuring multi-monitor cleanup.
---
lib/kernel/src/pg.erl | 45 +++++++++++++++++-----------------
lib/kernel/test/pg_SUITE.erl | 47 +++++++++++++++++++++++++++++++++++-
2 files changed, 68 insertions(+), 24 deletions(-)
diff --git a/lib/kernel/src/pg.erl b/lib/kernel/src/pg.erl
index 8204576ba7..2e86dbf4dd 100644
--- a/lib/kernel/src/pg.erl
+++ b/lib/kernel/src/pg.erl
@@ -306,13 +306,13 @@ handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, local
handle_call(monitor, {Pid, _Tag}, #state{scope = Scope, scope_monitors = ScopeMon} = State) ->
%% next line could also be done with iterating over process state, but it appears to be slower
Local = maps:from_list([{G,P} || [G,P] <- ets:match(Scope, {'$1', '$2', '_'})]),
- MRef = erlang:monitor(process, Pid), %% monitor the monitor, to discard it upon termination, and generate MRef
+ MRef = erlang:monitor(process, Pid, [{tag, {'DOWN', scope_monitors}}]), %% to discard it upon termination
{reply, {MRef, Local}, State#state{scope_monitors = ScopeMon#{MRef => Pid}}};
handle_call({monitor, Group}, {Pid, _Tag}, #state{scope = Scope, group_monitors = GM, monitored_groups = MG} = State) ->
%% ETS cache is writable only from this process - so get_members is safe to use
Members = get_members(Scope, Group),
- MRef = erlang:monitor(process, Pid),
+ MRef = erlang:monitor(process, Pid, [{tag, {'DOWN', group_monitors}}]),
NewMG = maps:update_with(Group, fun (Ex) -> [{Pid, MRef} | Ex] end, [{Pid, MRef}], MG),
{reply, {MRef, Members}, State#state{group_monitors = GM#{MRef => {Pid, Group}}, monitored_groups = NewMG}};
@@ -401,12 +401,13 @@ handle_info({discover, Peer}, State) ->
handle_info({discover, Peer, _ProtocolVersion}, State) ->
handle_discover(Peer, State);
-%% handle local process exit, or a local monitor exit
+%% handle local process exit
handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local,
remote = Remote, scope_monitors = ScopeMon, monitored_groups = MG} = State) when node(Pid) =:= node() ->
case maps:take(Pid, Local) of
error ->
- {noreply, maybe_drop_monitor(MRef, State)};
+ %% ignore late monitor: this can only happen when leave request and 'DOWN' are in pg queue
+ {noreply, State};
{{MRef, Groups}, NewLocal} ->
[leave_local_update_ets(Scope, ScopeMon, MG, Group, Pid) || Group <- Groups],
%% send update to all remote peers
@@ -414,7 +415,7 @@ handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = L
{noreply, State#state{local = NewLocal}}
end;
-%% handle remote node down or scope leaving overlay network, or a monitor from the remote node went down
+%% handle remote node down or scope leaving overlay network
handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote = Remote,
scope_monitors = ScopeMon, monitored_groups = MG} = State) ->
case maps:take(Pid, Remote) of
@@ -423,7 +424,22 @@ handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote =
leave_remote_update_ets(Scope, ScopeMon, MG, Pids, [Group]) end, RemoteMap),
{noreply, State#state{remote = NewRemote}};
error ->
- {noreply, maybe_drop_monitor(MRef, State)}
+ {noreply, MRef, State}
+ end;
+
+%% handle scope monitor exiting
+handle_info({{'DOWN', scope_monitors}, MRef, process, _Pid, _Info}, #state{scope_monitors = ScopeMon} = State) ->
+ {noreply, State#state{scope_monitors = maps:remove(MRef, ScopeMon)}};
+
+%% handle group monitor exiting
+handle_info({{'DOWN', group_monitors}, MRef, process, Pid, _Info}, #state{
+ group_monitors = GMs, monitored_groups = MG} = State) ->
+ case maps:take(MRef, GMs) of
+ error ->
+ {noreply, State};
+ {{Pid, Group}, NewGM} ->
+ %% clean up the inverse map
+ {noreply, State#state{group_monitors = NewGM, monitored_groups = demonitor_group({Pid, MRef}, Group, MG)}}
end;
%% nodedown: ignore, and wait for 'DOWN' signal for monitored process
@@ -670,23 +686,6 @@ broadcast([Dest | Tail], Msg) ->
erlang:send(Dest, Msg, [noconnect]),
broadcast(Tail, Msg).
-%% drops a monitor if DOWN was received
-maybe_drop_monitor(MRef, #state{scope_monitors = ScopeMon, group_monitors = GMs, monitored_groups = MG} = State) ->
- %% could be a local monitor going DOWN. Since it's a rare event, check should
- %% not stay in front of any other, more frequent events
- case maps:take(MRef, ScopeMon) of
- error ->
- case maps:take(MRef, GMs) of
- error ->
- State;
- {{Pid, Group}, NewGM} ->
- %% clean up the inverse map
- State#state{group_monitors = NewGM, monitored_groups = demonitor_group({Pid, MRef}, Group, MG)}
- end;
- {_Pid, NewScopeMon} ->
- State#state{scope_monitors = NewScopeMon}
- end.
-
demonitor_group(Tag, Group, MG) ->
case maps:find(Group, MG) of
{ok, [Tag]} ->
diff --git a/lib/kernel/test/pg_SUITE.erl b/lib/kernel/test/pg_SUITE.erl
index f10e89d774..de38d391e4 100644
--- a/lib/kernel/test/pg_SUITE.erl
+++ b/lib/kernel/test/pg_SUITE.erl
@@ -60,6 +60,8 @@
monitor_nonempty_scope/0, monitor_nonempty_scope/1,
monitor_scope/0, monitor_scope/1,
monitor/1,
+ monitor_self/1,
+ multi_monitor/1,
protocol_upgrade/1
]).
@@ -89,7 +91,7 @@ all() ->
groups() ->
[
{basic, [parallel], [errors, pg, leave_exit_race, single, overlay_missing,
- protocol_upgrade]},
+ protocol_upgrade, monitor_self, multi_monitor]},
{performance, [], [thundering_herd]},
{cluster, [parallel], [process_owner_check, two, initial, netsplit, trisplit, foursplit,
exchange, nolocal, double, scope_restart, missing_scope_join, empty_group_by_remote_leave,
@@ -757,6 +759,49 @@ second_monitor(Msgs) ->
second_monitor([Msg | Msgs])
end.
+%% Test for GH-7625: monitor process that joined a group
+monitor_self(Config) when is_list(Config) ->
+ F = fun() ->
+ %% spawned process both monitor and group-joined
+ pg:monitor(?FUNCTION_NAME, ?FUNCTION_NAME),
+ pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, self())
+ end,
+ {Pid, Mon} = spawn_monitor(F),
+ receive
+ {'DOWN', Mon, process, Pid, Reason} ->
+ ?assertEqual(normal, Reason)
+ end,
+ %% if pg crashes, next expression fails the test
+ sync(?FUNCTION_NAME).
+
+%% check same process monitoring several things at once,
+%% and also joining a few groups
+multi_monitor(Config) when is_list(Config) ->
+ F = fun() ->
+ Self = self(),
+ %% spawned process both monitor and group-joined
+ {RefOne, []} = pg:monitor(?FUNCTION_NAME, one),
+ {RefTwo, []} = pg:monitor(?FUNCTION_NAME, two),
+ {RefScope, _} = pg:monitor_scope(?FUNCTION_NAME),
+ ok = pg:join(?FUNCTION_NAME, one, Self),
+ ok = pg:join(?FUNCTION_NAME, two, Self),
+ sync(?FUNCTION_NAME),
+ %% ensure receiving 4 messages: two per group this process
+ [wait_message(Ref, join, Group, [Self], "Local") || {Ref, Group} <-
+ [{RefOne, one}, {RefScope, one}, {RefTwo, two}, {RefScope, two}]]
+ end,
+ {Pid, Mon} = spawn_monitor(F),
+ receive
+ {'DOWN', Mon, process, Pid, Reason} ->
+ ?assertEqual(normal, Reason)
+ end,
+ %% if pg crashes, next expression fails the test
+ sync(?FUNCTION_NAME),
+ %% white box: pg should not have any group or scope monitors
+ {state, _, _, _, SM, GM, _} = sys:get_state(?FUNCTION_NAME),
+ ?assertEqual(#{}, SM),
+ ?assertEqual(#{}, GM).
+
protocol_upgrade(Config) when is_list(Config) ->
Scope = ?FUNCTION_NAME,
Group = ?FUNCTION_NAME,
--
2.35.3