File 0516-pg-Fix-pg-crash-for-processes-both-joined-and-monito.patch of Package erlang

From c13277110a837dcd507a30e7f1aee182c98098fa Mon Sep 17 00:00:00 2001
From: Maxim Fedorov <maximfca@gmail.com>
Date: Sun, 17 Sep 2023 07:34:13 -0700
Subject: [PATCH] [pg] Fix pg crash for processes both joined and monitoring

Closes #7625

The same process could be monitored multiple times, for joining
the group, and for being a scope/group monitor. Before this fix,
such process termination resulted in cleaning up the wrong
structure, so the second 'DOWN' message resulted in a crash.

While at it, add a white-box test ensuring multi-monitor cleanup.
---
 lib/kernel/src/pg.erl        | 45 +++++++++++++++++-----------------
 lib/kernel/test/pg_SUITE.erl | 47 +++++++++++++++++++++++++++++++++++-
 2 files changed, 68 insertions(+), 24 deletions(-)

diff --git a/lib/kernel/src/pg.erl b/lib/kernel/src/pg.erl
index 8204576ba7..2e86dbf4dd 100644
--- a/lib/kernel/src/pg.erl
+++ b/lib/kernel/src/pg.erl
@@ -306,13 +306,13 @@ handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, local
 handle_call(monitor, {Pid, _Tag}, #state{scope = Scope, scope_monitors = ScopeMon} = State) ->
     %% next line could also be done with iterating over process state, but it appears to be slower
     Local = maps:from_list([{G,P} || [G,P] <- ets:match(Scope, {'$1', '$2', '_'})]),
-    MRef = erlang:monitor(process, Pid), %% monitor the monitor, to discard it upon termination, and generate MRef
+    MRef = erlang:monitor(process, Pid, [{tag, {'DOWN', scope_monitors}}]), %% to discard it upon termination
     {reply, {MRef, Local}, State#state{scope_monitors = ScopeMon#{MRef => Pid}}};
 
 handle_call({monitor, Group}, {Pid, _Tag}, #state{scope = Scope, group_monitors = GM, monitored_groups = MG} = State) ->
     %% ETS cache is writable only from this process - so get_members is safe to use
     Members = get_members(Scope, Group),
-    MRef = erlang:monitor(process, Pid),
+    MRef = erlang:monitor(process, Pid, [{tag, {'DOWN', group_monitors}}]),
     NewMG = maps:update_with(Group, fun (Ex) -> [{Pid, MRef} | Ex] end, [{Pid, MRef}], MG),
     {reply, {MRef, Members}, State#state{group_monitors = GM#{MRef => {Pid, Group}}, monitored_groups = NewMG}};
 
@@ -401,12 +401,13 @@ handle_info({discover, Peer}, State) ->
 handle_info({discover, Peer, _ProtocolVersion}, State) ->
     handle_discover(Peer, State);
 
-%% handle local process exit, or a local monitor exit
+%% handle local process exit
 handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local,
     remote = Remote, scope_monitors = ScopeMon, monitored_groups = MG} = State) when node(Pid) =:= node() ->
     case maps:take(Pid, Local) of
         error ->
-            {noreply, maybe_drop_monitor(MRef, State)};
+            %% ignore late monitor: this can only happen when leave request and 'DOWN' are in pg queue
+            {noreply, State};
         {{MRef, Groups}, NewLocal} ->
             [leave_local_update_ets(Scope, ScopeMon, MG, Group, Pid) || Group <- Groups],
             %% send update to all remote peers
@@ -414,7 +415,7 @@ handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = L
             {noreply, State#state{local = NewLocal}}
     end;
 
-%% handle remote node down or scope leaving overlay network, or a monitor from the remote node went down
+%% handle remote node down or scope leaving overlay network
 handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote = Remote,
     scope_monitors = ScopeMon, monitored_groups = MG} = State)  ->
     case maps:take(Pid, Remote) of
@@ -423,7 +424,22 @@ handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote =
                 leave_remote_update_ets(Scope, ScopeMon, MG, Pids, [Group]) end, RemoteMap),
             {noreply, State#state{remote = NewRemote}};
         error ->
-            {noreply, maybe_drop_monitor(MRef, State)}
+            {noreply, MRef, State}
+    end;
+
+%% handle scope monitor exiting
+handle_info({{'DOWN', scope_monitors}, MRef, process, _Pid, _Info}, #state{scope_monitors = ScopeMon} = State) ->
+    {noreply, State#state{scope_monitors = maps:remove(MRef, ScopeMon)}};
+
+%% handle group monitor exiting
+handle_info({{'DOWN', group_monitors}, MRef, process, Pid, _Info}, #state{
+    group_monitors = GMs, monitored_groups = MG} = State) ->
+    case maps:take(MRef, GMs) of
+        error ->
+            {noreply, State};
+        {{Pid, Group}, NewGM} ->
+            %% clean up the inverse map
+            {noreply, State#state{group_monitors = NewGM, monitored_groups = demonitor_group({Pid, MRef}, Group, MG)}}
     end;
 
 %% nodedown: ignore, and wait for 'DOWN' signal for monitored process
@@ -670,23 +686,6 @@ broadcast([Dest | Tail], Msg) ->
     erlang:send(Dest, Msg, [noconnect]),
     broadcast(Tail, Msg).
 
-%% drops a monitor if DOWN was received
-maybe_drop_monitor(MRef, #state{scope_monitors = ScopeMon, group_monitors = GMs, monitored_groups = MG} = State) ->
-    %% could be a local monitor going DOWN. Since it's a rare event, check should
-    %%  not stay in front of any other, more frequent events
-    case maps:take(MRef, ScopeMon) of
-        error ->
-            case maps:take(MRef, GMs) of
-                error ->
-                    State;
-                {{Pid, Group}, NewGM} ->
-                    %% clean up the inverse map
-                    State#state{group_monitors = NewGM, monitored_groups = demonitor_group({Pid, MRef}, Group, MG)}
-            end;
-        {_Pid, NewScopeMon} ->
-            State#state{scope_monitors = NewScopeMon}
-    end.
-
 demonitor_group(Tag, Group, MG) ->
     case maps:find(Group, MG) of
         {ok, [Tag]} ->
diff --git a/lib/kernel/test/pg_SUITE.erl b/lib/kernel/test/pg_SUITE.erl
index f10e89d774..de38d391e4 100644
--- a/lib/kernel/test/pg_SUITE.erl
+++ b/lib/kernel/test/pg_SUITE.erl
@@ -60,6 +60,8 @@
     monitor_nonempty_scope/0, monitor_nonempty_scope/1,
     monitor_scope/0, monitor_scope/1,
     monitor/1,
+    monitor_self/1,
+    multi_monitor/1,
     protocol_upgrade/1
 ]).
 
@@ -89,7 +91,7 @@ all() ->
 groups() ->
     [
         {basic, [parallel], [errors, pg, leave_exit_race, single, overlay_missing,
-                             protocol_upgrade]},
+                             protocol_upgrade, monitor_self, multi_monitor]},
         {performance, [], [thundering_herd]},
         {cluster, [parallel], [process_owner_check, two, initial, netsplit, trisplit, foursplit,
             exchange, nolocal, double, scope_restart, missing_scope_join, empty_group_by_remote_leave,
@@ -757,6 +759,49 @@ second_monitor(Msgs) ->
             second_monitor([Msg | Msgs])
     end.
 
+%% Test for GH-7625: monitor process that joined a group
+monitor_self(Config) when is_list(Config) ->
+    F = fun() ->
+        %% spawned process both monitor and group-joined
+        pg:monitor(?FUNCTION_NAME, ?FUNCTION_NAME),
+        pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, self())
+        end,
+    {Pid, Mon} = spawn_monitor(F),
+    receive
+        {'DOWN', Mon, process, Pid, Reason} ->
+            ?assertEqual(normal, Reason)
+    end,
+    %% if pg crashes, next expression fails the test
+    sync(?FUNCTION_NAME).
+
+%% check same process monitoring several things at once,
+%% and also joining a few groups
+multi_monitor(Config) when is_list(Config) ->
+    F = fun() ->
+        Self = self(),
+        %% spawned process both monitor and group-joined
+        {RefOne, []} = pg:monitor(?FUNCTION_NAME, one),
+        {RefTwo, []} = pg:monitor(?FUNCTION_NAME, two),
+        {RefScope, _} = pg:monitor_scope(?FUNCTION_NAME),
+        ok = pg:join(?FUNCTION_NAME, one, Self),
+        ok = pg:join(?FUNCTION_NAME, two, Self),
+        sync(?FUNCTION_NAME),
+        %% ensure receiving 4 messages: two per group this process
+        [wait_message(Ref, join, Group, [Self], "Local") || {Ref, Group} <-
+            [{RefOne, one}, {RefScope, one}, {RefTwo, two}, {RefScope, two}]]
+        end,
+    {Pid, Mon} = spawn_monitor(F),
+    receive
+        {'DOWN', Mon, process, Pid, Reason} ->
+            ?assertEqual(normal, Reason)
+    end,
+    %% if pg crashes, next expression fails the test
+    sync(?FUNCTION_NAME),
+    %% white box: pg should not have any group or scope monitors
+    {state, _, _, _, SM, GM, _} = sys:get_state(?FUNCTION_NAME),
+    ?assertEqual(#{}, SM),
+    ?assertEqual(#{}, GM).
+
 protocol_upgrade(Config) when is_list(Config) ->
     Scope = ?FUNCTION_NAME,
     Group = ?FUNCTION_NAME,
-- 
2.35.3

openSUSE Build Service is sponsored by