File 5743-pg-Implement-single-group-monitoring.patch of Package erlang

From af9b3d19d834e6bdc1da21eaaf7e43d74220004b Mon Sep 17 00:00:00 2001
From: Maxim Fedorov <maximfca@gmail.com>
Date: Tue, 7 Jun 2022 15:15:11 -0700
Subject: [PATCH 3/4] [pg] Implement single group monitoring

Implement pg:monitor/1,2, which subscribes the caller to changes in a
single process group (similar to monitor_scope/0,1).
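
A minimal usage sketch of the new API (the scope, group, and output are
illustrative; assumes a scope started with pg:start_link(my_scope)):

    {Ref, Members} = pg:monitor(my_scope, my_group),
    %% Members lists the pids currently in my_group
    receive
        {Ref, join, my_group, Pids} ->
            io:format("joined: ~p~n", [Pids]);
        {Ref, leave, my_group, Pids} ->
            io:format("left: ~p~n", [Pids])
    end,
    ok = pg:demonitor(my_scope, Ref).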
---
 lib/kernel/doc/src/pg.xml    |  15 ++-
 lib/kernel/src/pg.erl        | 185 ++++++++++++++++++++++++-----------
 lib/kernel/test/pg_SUITE.erl |  97 ++++++++++++------
 3 files changed, 210 insertions(+), 87 deletions(-)

diff --git a/lib/kernel/doc/src/pg.xml b/lib/kernel/doc/src/pg.xml
index 85b1529f44..4a308dca73 100644
--- a/lib/kernel/doc/src/pg.xml
+++ b/lib/kernel/doc/src/pg.xml
@@ -165,12 +165,25 @@
       </desc>
     </func>
 
+    <func>
+      <name name="monitor" arity="1" since="OTP 25.1"/>
+      <name name="monitor" arity="2" since="OTP 25.1"/>
+      <fsummary>Starts membership monitoring for a specified group.</fsummary>
+      <desc>
+        <p>Subscribes the caller to updates for the specified group. Returns
+          a list of processes currently in the group, and a reference to
+          match upcoming notifications.</p>
+        <p>See <seemfa marker="#monitor_scope/0"><c>monitor_scope/0</c></seemfa>
+          for the update message structure.</p>
+      </desc>
+    </func>
+
     <func>
       <name name="demonitor" arity="1" since="OTP 25.1"/>
       <name name="demonitor" arity="2" since="OTP 25.1"/>
       <fsummary>Stops group membership monitoring.</fsummary>
       <desc>
-        <p>Unsubscribes the caller from updates off the specified scope.
+        <p>Unsubscribes the caller from updates (scope or group).
         Flushes all outstanding updates that were already in the message
         queue of the calling process.</p>
       </desc>
diff --git a/lib/kernel/src/pg.erl b/lib/kernel/src/pg.erl
index 845021c7fa..a44794e352 100644
--- a/lib/kernel/src/pg.erl
+++ b/lib/kernel/src/pg.erl
@@ -62,6 +62,8 @@
     leave/2,
     monitor_scope/0,
     monitor_scope/1,
+    monitor/1,
+    monitor/2,
     demonitor/1,
     demonitor/2,
     get_members/1,
@@ -160,6 +162,26 @@ monitor_scope() ->
 monitor_scope(Scope) ->
     gen_server:call(Scope, monitor, infinity).
 
+%%--------------------------------------------------------------------
+%% @doc
+%% Returns a list of processes in the requested group, and begins monitoring
+%%  group changes. The calling process will receive a {Ref, join, Group, Pids}
+%%  message when Pids join the Group, and a {Ref, leave, Group, Pids} message
+%%  when Pids leave the group.
+-spec monitor(Group :: group()) -> {reference(), [pid()]}.
+monitor(Group) ->
+    ?MODULE:monitor(?DEFAULT_SCOPE, Group).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Returns a list of processes in the requested group, and begins monitoring
+%%  group changes. The calling process will receive a {Ref, join, Group, Pids}
+%%  message when Pids join the Group, and a {Ref, leave, Group, Pids} message
+%%  when Pids leave the group.
+-spec monitor(Scope :: atom(), Group :: group()) -> {reference(), [pid()]}.
+monitor(Scope, Group) ->
+    gen_server:call(Scope, {monitor, Group}, infinity).
+
 %%--------------------------------------------------------------------
 %% @doc
 %% Stops monitoring Scope for groups changes. Flushes all
@@ -238,7 +260,10 @@ which_local_groups(Scope) when is_atom(Scope) ->
     %% remote node: scope process monitor and map of groups to pids for fast sync routine
     remote = #{} :: #{pid() => {reference(), #{group() => [pid()]}}},
     %% processes monitoring group membership
-    scope_monitors = #{} :: #{reference() => pid()}
+    scope_monitors = #{} :: #{reference() => pid()},
+    %% processes monitoring specific groups (forward and reverse map)
+    group_monitors = #{} :: #{reference() => group()},
+    monitored_groups = #{} :: #{group() => [{pid(), reference()}]}
 }).
 
 -type state() :: #state{}.
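
The two maps added to the record are kept in lockstep: group_monitors is
the forward map (monitor reference to subscriber and group) used to tear a
monitor down by its reference, and monitored_groups is the reverse map
(group to subscribers) used to fan notifications out per group. A condensed
sketch of the insertion step, mirroring the {monitor, Group} clause below
(the helper name is illustrative, not part of the patch):

    add_group_monitor(Pid, MRef, Group, GM, MG) ->
        {GM#{MRef => {Pid, Group}},
         maps:update_with(Group,
                          fun (Ex) -> [{Pid, MRef} | Ex] end,
                          [{Pid, MRef}], MG)}.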
@@ -254,25 +279,26 @@ init([Scope]) ->
 -spec handle_call(Call :: {join_local, Group :: group(), Pid :: pid()}
                         | {leave_local, Group :: group(), Pid :: pid()}
                         | monitor
+                        | {monitor, Group :: group()}
                         | {demonitor, Ref :: reference()},
                   From :: {pid(), Tag :: any()},
                   State :: state()) ->
     {reply, ok | not_joined | {reference(), #{group() => [pid()]}} | false, state()}.
 
 handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local,
-    remote = Remote, scope_monitors = ScopeMon} = State) ->
+    remote = Remote, scope_monitors = ScopeMon, monitored_groups = MG} = State) ->
     NewLocal = join_local(PidOrPids, Group, Local),
-    join_local_update_ets(Scope, ScopeMon, Group, PidOrPids),
+    join_local_update_ets(Scope, ScopeMon, MG, Group, PidOrPids),
     broadcast(maps:keys(Remote), {join, self(), Group, PidOrPids}),
     {reply, ok, State#state{local = NewLocal}};
 
 handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local,
-    remote = Remote, scope_monitors = ScopeMon} = State) ->
+    remote = Remote, scope_monitors = ScopeMon, monitored_groups = MG} = State) ->
     case leave_local(PidOrPids, Group, Local) of
         Local ->
             {reply, not_joined, State};
         NewLocal ->
-            leave_local_update_ets(Scope, ScopeMon, Group, PidOrPids),
+            leave_local_update_ets(Scope, ScopeMon, MG, Group, PidOrPids),
             broadcast(maps:keys(Remote), {leave, self(), PidOrPids, [Group]}),
             {reply, ok, State#state{local = NewLocal}}
     end;
@@ -283,24 +309,45 @@ handle_call(monitor, {Pid, _Tag}, #state{scope = Scope, scope_monitors = ScopeMo
     MRef = erlang:monitor(process, Pid), %% monitor the monitor, to discard it upon termination, and generate MRef
     {reply, {MRef, Local}, State#state{scope_monitors = ScopeMon#{MRef => Pid}}};
 
-handle_call({demonitor, Ref}, _From, #state{scope_monitors = ScopeMon} = State) ->
+handle_call({monitor, Group}, {Pid, _Tag}, #state{scope = Scope, group_monitors = GM, monitored_groups = MG} = State) ->
+    %% ETS cache is writable only from this process - so get_members is safe to use
+    Members = get_members(Scope, Group),
+    MRef = erlang:monitor(process, Pid),
+    NewMG = maps:update_with(Group, fun (Ex) -> [{Pid, MRef} | Ex] end, [{Pid, MRef}], MG),
+    {reply, {MRef, Members}, State#state{group_monitors = GM#{MRef => {Pid, Group}}, monitored_groups = NewMG}};
+
+handle_call({demonitor, Ref}, _From, #state{scope_monitors = ScopeMon, group_monitors = GM,
+    monitored_groups = MG} = State) ->
+    %% not using maybe_drop_monitor here as it does not demonitor, and cannot return 'false'
     case maps:take(Ref, ScopeMon) of
         {_, NewMons} ->
             erlang:demonitor(Ref),
             {reply, ok, State#state{scope_monitors = NewMons}};
         error ->
-            {reply, false, State}
+            %% group monitor
+            case maps:take(Ref, GM) of
+                {{Pid, Group}, NewGM} ->
+                    erlang:demonitor(Ref),
+                    {reply, ok, State#state{group_monitors = NewGM,
+                        monitored_groups = demonitor_group({Pid, Ref}, Group, MG)}};
+                error ->
+                    {reply, false, State}
+            end
     end;
 
 handle_call(_Request, _From, _S) ->
     erlang:error(badarg).
 
 -spec handle_cast(
-    {sync, Peer :: pid(), Groups :: [{group(), [pid()]}]},
+    {sync, Peer :: pid(), Groups :: [{group(), [pid()]}]} |
+    {discover, Peer :: pid()} |
+    {join, Peer :: pid(), group(), pid() | [pid()]} |
+    {leave, Peer :: pid(), pid() | [pid()], [group()]},
     State :: state()) -> {noreply, state()}.
 
-handle_cast({sync, Peer, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) ->
-    {noreply, State#state{remote = handle_sync(Scope, ScopeMon, Peer, Remote, Groups)}};
+handle_cast({sync, Peer, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon,
+    monitored_groups = MG} = State) ->
+    {noreply, State#state{remote = handle_sync(Scope, ScopeMon, MG, Peer, Remote, Groups)}};
 
 handle_cast(_, _State) ->
     erlang:error(badarg).
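
From the caller's side, the {demonitor, Ref} clause above gives
pg:demonitor/2 a single contract for both monitor kinds (a sketch; the
test suite changes below assert the same, scope and group names are
illustrative):

    {Ref, _Members} = pg:monitor(my_scope, my_group),
    ok = pg:demonitor(my_scope, Ref),     %% monitor was installed
    false = pg:demonitor(my_scope, Ref).  %% reference no longer known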
@@ -313,10 +360,11 @@ handle_cast(_, _State) ->
     {nodedown, node()} | {nodeup, node()}, State :: state()) -> {noreply, state()}.
 
 %% remote pid or several pids joining the group
-handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) ->
+handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon,
+    monitored_groups = MG} = State) ->
     case maps:get(Peer, Remote, []) of
         {MRef, RemoteGroups} ->
-            join_remote_update_ets(Scope, ScopeMon, Group, PidOrPids),
+            join_remote_update_ets(Scope, ScopeMon, MG, Group, PidOrPids),
             %% store remote group => pids map for fast sync operation
             NewRemoteGroups = join_remote(Group, PidOrPids, RemoteGroups),
             {noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteGroups}}}};
@@ -329,10 +377,11 @@ handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remot
     end;
 
 %% remote pid leaving (multiple groups at once)
-handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) ->
+handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon,
+    monitored_groups = MG} = State) ->
     case maps:get(Peer, Remote, []) of
         {MRef, RemoteMap} ->
-            _ = leave_remote_update_ets(Scope, ScopeMon, PidOrPids, Groups),
+            _ = leave_remote_update_ets(Scope, ScopeMon, MG, PidOrPids, Groups),
             NewRemoteMap = leave_remote(PidOrPids, RemoteMap, Groups),
             {noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteMap}}}};
         [] ->
@@ -358,28 +407,29 @@ handle_info({discover, Peer}, #state{remote = Remote, local = Local} = State) ->
             {noreply, State#state{remote = Remote#{Peer => {MRef, #{}}}}}
     end;
 
-%% handle local process exit
-handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local, remote = Remote,
-    scope_monitors = ScopeMon} = State) when node(Pid) =:= node() ->
+%% handle local process exit, or a local monitor exit
+handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local,
+    remote = Remote, scope_monitors = ScopeMon, monitored_groups = MG} = State) when node(Pid) =:= node() ->
     case maps:take(Pid, Local) of
         error ->
-            maybe_drop_monitor(MRef, State);
+            {noreply, maybe_drop_monitor(MRef, State)};
         {{MRef, Groups}, NewLocal} ->
-            [leave_local_update_ets(Scope, ScopeMon, Group, Pid) || Group <- Groups],
-            %% send update to all scope processes on remote nodes
+            [leave_local_update_ets(Scope, ScopeMon, MG, Group, Pid) || Group <- Groups],
+            %% send update to all remote peers
             broadcast(maps:keys(Remote), {leave, self(), Pid, Groups}),
             {noreply, State#state{local = NewLocal}}
     end;
 
 %% handle remote node down or scope leaving overlay network, or a monitor from the remote node went down
 handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote = Remote,
-    scope_monitors = ScopeMon} = State)  ->
+    scope_monitors = ScopeMon, monitored_groups = MG} = State)  ->
     case maps:take(Pid, Remote) of
         {{MRef, RemoteMap}, NewRemote} ->
-            maps:foreach(fun (Group, Pids) -> leave_remote_update_ets(Scope, ScopeMon, Pids, [Group]) end, RemoteMap),
+            maps:foreach(fun (Group, Pids) ->
+                leave_remote_update_ets(Scope, ScopeMon, MG, Pids, [Group]) end, RemoteMap),
             {noreply, State#state{remote = NewRemote}};
         error ->
-            maybe_drop_monitor(MRef, State)
+            {noreply, maybe_drop_monitor(MRef, State)}
     end;
 
 %% nodedown: ignore, and wait for 'DOWN' signal for monitored process
@@ -420,7 +470,7 @@ ensure_local(Bad) ->
 %% Override all knowledge of the remote node with information it sends
 %%  to local node. Current implementation must do the full table scan
 %%  to remove stale pids (just as for 'nodedown').
-handle_sync(Scope, ScopeMon, Peer, Remote, Groups) ->
+handle_sync(Scope, ScopeMon, MG, Peer, Remote, Groups) ->
     %% can't use maps:get() because it evaluates 'default' value first,
     %%   and in this case monitor() call has side effect.
     {MRef, RemoteGroups} =
@@ -431,25 +481,25 @@ handle_sync(Scope, ScopeMon, Peer, Remote, Groups) ->
                 MRef0
         end,
     %% sync RemoteMap and transform ETS table
-    _ = sync_groups(Scope, ScopeMon, RemoteGroups, Groups),
+    _ = sync_groups(Scope, ScopeMon, MG, RemoteGroups, Groups),
     Remote#{Peer => {MRef, maps:from_list(Groups)}}.
 
-sync_groups(Scope, ScopeMon, RemoteGroups, []) ->
+sync_groups(Scope, ScopeMon, MG, RemoteGroups, []) ->
     %% leave all missing groups
-    [leave_remote_update_ets(Scope, ScopeMon, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)];
-sync_groups(Scope, ScopeMon, RemoteGroups, [{Group, Pids} | Tail]) ->
+    [leave_remote_update_ets(Scope, ScopeMon, MG, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)];
+sync_groups(Scope, ScopeMon, MG, RemoteGroups, [{Group, Pids} | Tail]) ->
     case maps:take(Group, RemoteGroups) of
         {Pids, NewRemoteGroups} ->
-            sync_groups(Scope, ScopeMon, NewRemoteGroups, Tail);
+            sync_groups(Scope, ScopeMon, MG, NewRemoteGroups, Tail);
         {OldPids, NewRemoteGroups} ->
             [{Group, AllOldPids, LocalPids}] = ets:lookup(Scope, Group),
             %% should be really rare...
             AllNewPids = Pids ++ AllOldPids -- OldPids,
             true = ets:insert(Scope, {Group, AllNewPids, LocalPids}),
-            sync_groups(Scope, ScopeMon, NewRemoteGroups, Tail);
+            sync_groups(Scope, ScopeMon, MG, NewRemoteGroups, Tail);
         error ->
-            join_remote_update_ets(Scope, ScopeMon, Group, Pids),
-            sync_groups(Scope, ScopeMon, RemoteGroups, Tail)
+            join_remote_update_ets(Scope, ScopeMon, MG, Group, Pids),
+            sync_groups(Scope, ScopeMon, MG, RemoteGroups, Tail)
     end.
 
 join_local(Pid, Group, Local) when is_pid(Pid) ->
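
A note on the AllNewPids expression in the sync_groups clause above: ++
and -- are right-associative and of equal priority in Erlang, so it parses
as Pids ++ (AllOldPids -- OldPids), i.e. the stale pids reported by the
peer are removed before the fresh ones are prepended. For example:

    1> [a] ++ [b, x, c] -- [x].
    [a,b,c]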
@@ -465,39 +515,39 @@ join_local([], _Group, Local) ->
 join_local([Pid | Tail], Group, Local) ->
     join_local(Tail, Group, join_local(Pid, Group, Local)).
 
-join_local_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) ->
+join_local_update_ets(Scope, ScopeMon, MG, Group, Pid) when is_pid(Pid) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, [Pid | All], [Pid | Local]});
         [] ->
             ets:insert(Scope, {Group, [Pid], [Pid]})
     end,
-    notify_group(ScopeMon, join, Group, [Pid]);
-join_local_update_ets(Scope, ScopeMon, Group, Pids) ->
+    notify_group(ScopeMon, MG, join, Group, [Pid]);
+join_local_update_ets(Scope, ScopeMon, MG, Group, Pids) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, Pids ++ All, Pids ++ Local});
         [] ->
             ets:insert(Scope, {Group, Pids, Pids})
     end,
-    notify_group(ScopeMon, join, Group, Pids).
+    notify_group(ScopeMon, MG, join, Group, Pids).
 
-join_remote_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) ->
+join_remote_update_ets(Scope, ScopeMon, MG, Group, Pid) when is_pid(Pid) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, [Pid | All], Local});
         [] ->
             ets:insert(Scope, {Group, [Pid], []})
     end,
-    notify_group(ScopeMon, join, Group, [Pid]);
-join_remote_update_ets(Scope, ScopeMon, Group, Pids) ->
+    notify_group(ScopeMon, MG, join, Group, [Pid]);
+join_remote_update_ets(Scope, ScopeMon, MG, Group, Pids) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, Pids ++ All, Local});
         [] ->
             ets:insert(Scope, {Group, Pids, []})
     end,
-    notify_group(ScopeMon, join, Group, Pids).
+    notify_group(ScopeMon, MG, join, Group, Pids).
 
 join_remote(Group, Pid, RemoteGroups) when is_pid(Pid) ->
     maps:update_with(Group, fun (List) -> [Pid | List] end, [Pid], RemoteGroups);
@@ -524,19 +574,19 @@ leave_local([], _Group, Local) ->
 leave_local([Pid | Tail], Group, Local) ->
     leave_local(Tail, Group, leave_local(Pid, Group, Local)).
 
-leave_local_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) ->
+leave_local_update_ets(Scope, ScopeMon, MG, Group, Pid) when is_pid(Pid) ->
     case ets:lookup(Scope, Group) of
         [{Group, [Pid], [Pid]}] ->
             ets:delete(Scope, Group),
-            notify_group(ScopeMon, leave, Group, [Pid]);
+            notify_group(ScopeMon, MG, leave, Group, [Pid]);
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, lists:delete(Pid, All), lists:delete(Pid, Local)}),
-            notify_group(ScopeMon, leave, Group, [Pid]);
+            notify_group(ScopeMon, MG, leave, Group, [Pid]);
         [] ->
             %% rare race condition when 'DOWN' from monitor stays in msg queue while process is leave-ing.
             true
     end;
-leave_local_update_ets(Scope, ScopeMon, Group, Pids) ->
+leave_local_update_ets(Scope, ScopeMon, MG, Group, Pids) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             case All -- Pids of
@@ -545,25 +595,25 @@ leave_local_update_ets(Scope, ScopeMon, Group, Pids) ->
                 NewAll ->
                     ets:insert(Scope, {Group, NewAll, Local -- Pids})
             end,
-            notify_group(ScopeMon, leave, Group, Pids);
+            notify_group(ScopeMon, MG, leave, Group, Pids);
         [] ->
             true
     end.
 
-leave_remote_update_ets(Scope, ScopeMon, Pid, Groups) when is_pid(Pid) ->
+leave_remote_update_ets(Scope, ScopeMon, MG, Pid, Groups) when is_pid(Pid) ->
     _ = [
         case ets:lookup(Scope, Group) of
             [{Group, [Pid], []}] ->
                 ets:delete(Scope, Group),
-                notify_group(ScopeMon, leave, Group, [Pid]);
+                notify_group(ScopeMon, MG, leave, Group, [Pid]);
             [{Group, All, Local}] ->
                 ets:insert(Scope, {Group, lists:delete(Pid, All), Local}),
-                notify_group(ScopeMon, leave, Group, [Pid]);
+                notify_group(ScopeMon, MG, leave, Group, [Pid]);
             [] ->
                 true
         end ||
         Group <- Groups];
-leave_remote_update_ets(Scope, ScopeMon, Pids, Groups) ->
+leave_remote_update_ets(Scope, ScopeMon, MG, Pids, Groups) ->
     _ = [
         case ets:lookup(Scope, Group) of
             [{Group, All, Local}] ->
@@ -573,7 +623,7 @@ leave_remote_update_ets(Scope, ScopeMon, Pids, Groups) ->
                     NewAll ->
                         ets:insert(Scope, {Group, NewAll, Local})
                 end,
-                notify_group(ScopeMon, leave, Group, Pids);
+                notify_group(ScopeMon, MG, leave, Group, Pids);
             [] ->
                 true
         end ||
@@ -611,25 +661,44 @@ broadcast([Dest | Tail], Msg) ->
     erlang:send(Dest, Msg, [noconnect]),
     broadcast(Tail, Msg).
 
-
 %% drops a monitor if DOWN was received
-maybe_drop_monitor(MRef, #state{scope_monitors = ScopeMon} = State) ->
+maybe_drop_monitor(MRef, #state{scope_monitors = ScopeMon, group_monitors = GMs, monitored_groups = MG} = State) ->
     %% could be a local monitor going DOWN. Since it's a rare event, check should
     %%  not stay in front of any other, more frequent events
     case maps:take(MRef, ScopeMon) of
         error ->
-            %% this can only happen when leave request and 'DOWN' are in pg queue
-            {noreply, State};
+            case maps:take(MRef, GMs) of
+                error ->
+                    State;
+                {{Pid, Group}, NewGM} ->
+                    %% clean up the inverse map
+                    State#state{group_monitors = NewGM, monitored_groups = demonitor_group({Pid, MRef}, Group, MG)}
+            end;
         {_Pid, NewScopeMon} ->
-            {noreply, State#state{scope_monitors = NewScopeMon}}
+            State#state{scope_monitors = NewScopeMon}
+    end.
+
+demonitor_group(Tag, Group, MG) ->
+    case maps:find(Group, MG) of
+        {ok, [Tag]} ->
+            maps:remove(Group, MG);
+        {ok, Tags} ->
+            maps:put(Group, Tags -- [Tag], MG)
     end.
 
-%% notify all scope monitors about an Action in Groups for Pids
-notify_group(ScopeMon, Action, Group, Pids) ->
+%% notify all monitors about an Action in Group for Pids
+notify_group(ScopeMonitors, MG, Action, Group, Pids) ->
     maps:foreach(
         fun (Ref, Pid) ->
             erlang:send(Pid, {Ref, Action, Group, Pids}, [noconnect])
-        end, ScopeMon).
+        end, ScopeMonitors),
+    case maps:find(Group, MG) of
+        error ->
+            ok;
+        {ok, Monitors} ->
+            [erlang:send(Pid, {Ref, Action, Group, Pids}, [noconnect]) || {Pid, Ref} <- Monitors],
+            ok
+    end.
 
-%% remove all messages that were send to monitor groups
+%% remove all messages that were sent to monitor groups
 flush(Ref) ->
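
With the reworked notify_group/5 above, a single membership change fans
out to both subscriber kinds. A sketch of the observable effect (names
illustrative): after

    {ScopeRef, _} = pg:monitor_scope(my_scope),
    {GroupRef, _} = pg:monitor(my_scope, my_group),
    ok = pg:join(my_scope, my_group, self()),

the subscribing process receives both {ScopeRef, join, my_group, [Self]}
and {GroupRef, join, my_group, [Self]}, while a monitor installed on any
other group in the scope receives nothing.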
diff --git a/lib/kernel/test/pg_SUITE.erl b/lib/kernel/test/pg_SUITE.erl
index 5d56b751e0..130b54a5f8 100644
--- a/lib/kernel/test/pg_SUITE.erl
+++ b/lib/kernel/test/pg_SUITE.erl
@@ -55,7 +55,8 @@
     disconnected_start/1,
     forced_sync/0, forced_sync/1,
     group_leave/1,
-    monitor_scope/0, monitor_scope/1
+    monitor_scope/0, monitor_scope/1,
+    monitor/1
 ]).
 
 -export([
@@ -82,7 +83,7 @@ groups() ->
         {cluster, [parallel], [process_owner_check, two, initial, netsplit, trisplit, foursplit,
             exchange, nolocal, double, scope_restart, missing_scope_join, empty_group_by_remote_leave,
             disconnected_start, forced_sync, group_leave]},
-        {monitor, [parallel], [monitor_scope]}
+        {monitor, [parallel], [monitor_scope, monitor]}
     ].
 
 %%--------------------------------------------------------------------
@@ -111,7 +112,7 @@ errors(_Config) ->
     ?assertException(error, badarg, pg:handle_cast(garbage, garbage)),
     %% kill with call
     {ok, _Pid} = pg:start(second),
-    ?assertException(exit, {{badarg, _}, _}, gen_server:call(second, garbage, 100)).
+    ?assertException(exit, {{badarg, _}, _}, gen_server:call(second, garbage)).
 
 leave_exit_race() ->
     [{doc, "Tests that pg correctly handles situation when leave and 'DOWN' messages are both in pg queue"}].
@@ -228,7 +229,9 @@ two(Config) when is_list(Config) ->
     Pid = erlang:spawn(forever()),
     ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)),
     ?assertEqual([Pid], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
-    %% first RPC must be serialised
+    %% the first RPC must be serialised; three syncs cover nodeup, discover and exchange
+    sync({?FUNCTION_NAME, TwoPeer}),
+    sync(?FUNCTION_NAME),
     sync({?FUNCTION_NAME, TwoPeer}),
     ?assertEqual([Pid], rpc:call(TwoPeer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
     ?assertEqual([], rpc:call(TwoPeer, pg, get_local_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
@@ -243,7 +246,7 @@ two(Config) when is_list(Config) ->
     ?assertEqual(ok, rpc:call(TwoPeer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pid2])),
     ?assertEqual(ok, rpc:call(TwoPeer, pg, join, [?FUNCTION_NAME, ?FUNCTION_NAME, Pid3])),
     %% serialise through the *other* node
-    sync({?FUNCTION_NAME, TwoPeer}),
+    sync_via({?FUNCTION_NAME, TwoPeer}, ?FUNCTION_NAME),
     ?assertEqual(lists:sort([Pid2, Pid3]),
         lists:sort(pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME))),
     %% stop the peer
@@ -311,7 +314,11 @@ initial(Config) when is_list(Config) ->
     ?assertEqual(ok, pg:join(?FUNCTION_NAME, ?FUNCTION_NAME, Pid)),
     ?assertEqual([Pid], pg:get_local_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
     {Peer, Socket} = spawn_node(?FUNCTION_NAME, ?FUNCTION_NAME),
-    %% first RPC must be serialised
+    %% first sync makes the peer node process 'nodeup' (and send 'discover')
     sync({?FUNCTION_NAME, Peer}),
+    %% second sync makes the origin node's pg reply to 'discover'
     sync(?FUNCTION_NAME),
+    %% third sync makes the peer node finish processing 'exchange'
     sync({?FUNCTION_NAME, Peer}),
     ?assertEqual([Pid], rpc:call(Peer, pg, get_members, [?FUNCTION_NAME, ?FUNCTION_NAME])),
 
@@ -564,24 +571,44 @@ monitor_scope() ->
     [{doc, "Tests monitor_scope/1 and demonitor/2"}].
 
 monitor_scope(Config) when is_list(Config) ->
-    Self = self(),
-    Scope = ?FUNCTION_NAME,
-    Group = ?FUNCTION_ARITY,
     %% ensure that demonitoring returns 'false' when monitor is not installed
-    ?assertEqual(false, pg:demonitor(Scope, erlang:make_ref())),
-    %% start the actual test case
-    {Ref, #{}} = pg:monitor_scope(Scope),
+    ?assertEqual(false, pg:demonitor(?FUNCTION_NAME, erlang:make_ref())),
+    InitialMonitor = fun (Scope) -> {Ref, #{}} = pg:monitor_scope(Scope), Ref end,
+    SecondMonitor = fun (Scope, Group, Control) -> {Ref, #{Group := [Control]}} = pg:monitor_scope(Scope), Ref end,
+    %% WHITE BOX: knowing pg state internals - only the original monitor should stay
+    DownMonitor = fun (Scope, Ref, Self) ->
+        {state, _, _, _, ScopeMonitors, _, _} = sys:get_state(Scope),
+        ?assertEqual(#{Ref => Self}, ScopeMonitors, "pg did not remove DOWNed scope monitor")
+                  end,
+    monitor_test_impl(?FUNCTION_NAME, ?FUNCTION_ARITY, InitialMonitor, SecondMonitor, DownMonitor).
+
+monitor(Config) when is_list(Config) ->
+    ExpectedGroup = {?FUNCTION_NAME, ?FUNCTION_ARITY},
+    InitialMonitor = fun (Scope) -> {Ref, []} = pg:monitor(Scope, ExpectedGroup), Ref end,
+    SecondMonitor = fun (Scope, Group, Control) ->
+        {Ref, [Control]} = pg:monitor(Scope, Group), Ref end,
+    DownMonitor = fun (Scope, Ref, Self) ->
+        {state, _, _, _, _, GM, MG} = sys:get_state(Scope),
+        ?assertEqual(#{Ref => {Self, ExpectedGroup}}, GM, "pg did not remove DOWNed group monitor"),
+        ?assertEqual(#{ExpectedGroup => [{Self, Ref}]}, MG, "pg did not remove DOWNed group")
+                  end,
+    monitor_test_impl(?FUNCTION_NAME, ExpectedGroup, InitialMonitor, SecondMonitor, DownMonitor).
+
+monitor_test_impl(Scope, Group, InitialMonitor, SecondMonitor, DownMonitor) ->
+    Self = self(),
+    Ref = InitialMonitor(Scope),
     %% local join
     ?assertEqual(ok, pg:join(Scope, Group, Self)),
     wait_message(Ref, join, Group, [Self], "Local"),
     %% start second monitor (which has 1 local pid at the start)
-    SecondMonitor = spawn_link(fun() -> second_monitor(Scope, Group, Self) end),
-    Ref2 = receive {second, SecondRef} -> SecondRef end,
+    ExtraMonitor = spawn_link(fun() -> second_monitor(Scope, Group, Self, SecondMonitor) end),
+    Ref2 = receive {ExtraMonitor, SecondRef} -> SecondRef end,
     %% start a remote node, and a remote monitor
     {Peer, Node} = spawn_node(Scope),
     ScopePid = whereis(Scope),
-    %% do not care about the remote monitor, it is started only to check DOWN handling
+    %% the remote monitor's messages and DOWN handling are checked later
-    _ThirdMonitor = spawn(Node, fun() -> second_monitor(ScopePid, Group, Self) end),
+    ThirdMonitor = spawn_link(Node, fun() -> second_monitor(ScopePid, Group, Self, SecondMonitor) end),
+    Ref3 = receive {ThirdMonitor, ThirdRef} -> ThirdRef end,
     %% remote join
     RemotePid = erlang:spawn(Node, forever()),
     ?assertEqual(ok, rpc:call(Node, pg, join, [Scope, Group, [RemotePid, RemotePid]])),
@@ -592,20 +619,27 @@ monitor_scope(Config) when is_list(Config) ->
     wait_message(Ref, leave, Group, [Self], "Local"),
     %% remote leave
     ?assertEqual(ok, rpc:call(Node, pg, leave, [Scope, Group, RemotePid])),
+    %% flush the local pg scope via remote pg (to ensure local pg finished sending notifications)
+    sync_via({Scope, Node}, Scope),
     wait_message(Ref, leave, Group, [RemotePid], "Remote"),
-    %% drop the SecondMonitor - this keeps original and remote monitors
-    SecondMonMsgs = gen_server:call(SecondMonitor, flush),
+    %% drop the ExtraMonitor - this keeps original and remote monitors
+    SecondMonMsgs = gen_server:call(ExtraMonitor, flush),
-    %% inspect the queue, it should contain double remote join, then single local and single remove leave
+    %% inspect the queue, it should contain double remote join, then single local and single remote leave
-    ?assertEqual([
+    ExpectedLocalMessages = [
         {Ref2, join, Group, [RemotePid, RemotePid]},
         {Ref2, leave, Group, [Self]},
         {Ref2, leave, Group, [RemotePid]}],
-        SecondMonMsgs),
+    ?assertEqual(ExpectedLocalMessages, SecondMonMsgs, "Local monitor failed"),
+    %% inspect remote monitor queue
+    ThirdMonMsgs = gen_server:call(ThirdMonitor, flush),
+    ExpectedRemoteMessages = [
+        {Ref3, join, Group, [RemotePid, RemotePid]},
+        {Ref3, leave, Group, [Self]},
+        {Ref3, leave, Group, [RemotePid]}],
+    ?assertEqual(ExpectedRemoteMessages, ThirdMonMsgs, "Remote monitor failed"),
     %% remote leave via stop (causes remote monitor to go DOWN)
     wait_message(Ref, leave, Group, [RemotePid], "Remote stop"),
-    %% WHITE BOX: knowing pg state internals - only the original monitor should stay
-    {state, _, _, _, InternalMonitors} = sys:get_state(?FUNCTION_NAME),
-    ?assertEqual(#{Ref => Self}, InternalMonitors, "pg did not remove DOWNed monitor"),
+    DownMonitor(Scope, Ref, Self),
     %% demonitor
     ?assertEqual(ok, pg:demonitor(Scope, Ref)),
     ?assertEqual(false, pg:demonitor(Scope, Ref)),
@@ -615,7 +649,7 @@ monitor_scope(Config) when is_list(Config) ->
     sync(Scope),
     %% join should not be here
     receive {Ref, Action, Group, [Self]} -> ?assert(false, lists:concat(["Unexpected ", Action, "event"]))
-        after 0 -> ok end.
+    after 0 -> ok end.
 
 wait_message(Ref, Action, Group, Pids, Msg) ->
     receive
@@ -624,12 +658,12 @@ wait_message(Ref, Action, Group, Pids, Msg) ->
     after 1000 ->
         {messages, Msgs} = process_info(self(), messages),
         ct:pal("Message queue: ~0p", [Msgs]),
-        ?assert(false, Msg ++ " " ++ atom_to_list(Action) ++ " event failed to occur")
+        ?assert(false, lists:flatten(io_lib:format("Expected ~s ~s for ~p", [Msg, Action, Group])))
     end.
 
-second_monitor(Scope, Group, Control) ->
-    {Ref, #{Group := [Control]}} = pg:monitor_scope(Scope),
-    Control ! {second, Ref},
+second_monitor(Scope, Group, Control, SecondMonitor) ->
+    Ref = SecondMonitor(Scope, Group, Control),
+    Control ! {self(), Ref},
     second_monitor([]).
 
 second_monitor(Msgs) ->
@@ -643,9 +677,16 @@ second_monitor(Msgs) ->
 %%--------------------------------------------------------------------
 %% Test Helpers - start/stop additional Erlang nodes
 
+%% flushes GS (GenServer) queue, ensuring that all prior
+%%  messages have been processed
 sync(GS) ->
     _ = sys:log(GS, get).
 
+%% flushes GS queue from the point of view of a registered process RegName
+%%  running on the Node.
+sync_via({RegName, Node}, GS) ->
+    rpc:call(Node, sys, replace_state, [RegName, fun (S) -> (catch sys:get_state(GS)), S end]).
+
 ensure_peers_info(Scope, Peers) ->
     %% Ensures that pg server on local node has gotten info from
     %% pg servers on all Peer nodes passed as argument (assuming
@@ -659,7 +700,7 @@ ensure_peers_info(Scope, Nodes) ->
     %%
 
     sync(Scope),
-    %% Known: nodup handled and discover sent to Peer
+    %% Known: nodeup handled and discover sent to Peer
 
     lists:foreach(fun (Peer) -> sync({Scope, Peer}) end, Peers),
     %% Known: nodeup handled by Peers and discover sent to local
-- 
2.35.3
