File 3682-pg-Implement-group-membership-monitoring-for-the-sco.patch of Package erlang

From fadfcff7ac17538db6100350585aac6407d70305 Mon Sep 17 00:00:00 2001
From: Maxim Fedorov <maximfca@gmail.com>
Date: Tue, 7 Jun 2022 13:47:52 -0700
Subject: [PATCH 2/4] [pg] Implement group membership monitoring for the scope

This commit adds monitor_scope/0,1 that implements group
membership monitoring. Caller gets the current map of groups
to processes, and atomically subscribes to all changes to
this map.
---
 lib/kernel/doc/src/pg.xml    |  29 +++++-
 lib/kernel/src/pg.erl        | 194 ++++++++++++++++++++++++++---------
 lib/kernel/test/pg_SUITE.erl |  96 +++++++++++++++--
 3 files changed, 262 insertions(+), 57 deletions(-)

diff --git a/lib/kernel/doc/src/pg.xml b/lib/kernel/doc/src/pg.xml
index 9b6d20f818..85b1529f44 100644
--- a/lib/kernel/doc/src/pg.xml
+++ b/lib/kernel/doc/src/pg.xml
@@ -6,7 +6,7 @@
 <erlref>
   <header>
     <copyright>
-      <year>2020</year><year>2020</year>
+      <year>2020</year><year>2022</year>
       <holder>Maxim Fedorov, WhatsApp Inc.</holder>
     </copyright>
     <legalnotice>
@@ -149,6 +149,33 @@
       </desc>
     </func>
 
+    <func>
+      <name name="monitor_scope" arity="0" since="OTP 25.1"/>
+      <name name="monitor_scope" arity="1" since="OTP 25.1"/>
+      <fsummary>Starts group membership monitoring for a scope.</fsummary>
+      <desc>
+        <p>Subscribes the caller to updates from the specified scope. Returns
+        content of the entire scope and a reference to match the upcoming
+        notifications.</p>
+
+        <p>Whenever any group membership changes, an update message is sent
+          to the subscriber:</p>
+          <code type="none">{Ref, join, Group, [JoinPid1, JoinPid2]}</code>
+          <code type="none">{Ref, leave, Group, [LeavePid1]}</code>
+      </desc>
+    </func>
+
+    <func>
+      <name name="demonitor" arity="1" since="OTP 25.1"/>
+      <name name="demonitor" arity="2" since="OTP 25.1"/>
+      <fsummary>Stops group membership monitoring.</fsummary>
+      <desc>
+        <p>Unsubscribes the caller from updates of the specified scope.
+        Flushes all outstanding updates that were already in the message
+        queue of the calling process.</p>
+      </desc>
+    </func>
+
     <func>
       <name name="get_local_members" arity="1" since="OTP 23.0"/>
       <name name="get_local_members" arity="2" since="OTP 23.0"/>
diff --git a/lib/kernel/src/pg.erl b/lib/kernel/src/pg.erl
index 8d0f6124f1..845021c7fa 100644
--- a/lib/kernel/src/pg.erl
+++ b/lib/kernel/src/pg.erl
@@ -60,6 +60,10 @@
 
     join/2,
     leave/2,
+    monitor_scope/0,
+    monitor_scope/1,
+    demonitor/1,
+    demonitor/2,
     get_members/1,
     get_local_members/1,
     which_groups/0,
@@ -142,6 +146,32 @@ leave(Scope, Group, PidOrPids) when is_pid(PidOrPids); is_list(PidOrPids) ->
     ok = ensure_local(PidOrPids),
     gen_server:call(Scope, {leave_local, Group, PidOrPids}, infinity).
 
+%%--------------------------------------------------------------------
+%% @doc
+%% Returns currently known groups, and begins monitoring
+%%  all group changes. Calling process will receive {Ref, join, Group, Pids}
+%%  message when new Pids join the Group, and {Ref, leave, Group, Pids} when
+%%  Pids leave the group.
+-spec monitor_scope() -> {reference(), #{group() => [pid()]}}.
+monitor_scope() ->
+    monitor_scope(?DEFAULT_SCOPE).
+
+-spec monitor_scope(Scope :: atom()) -> {reference(), #{group() => [pid()]}}.
+monitor_scope(Scope) ->
+    gen_server:call(Scope, monitor, infinity).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Stops monitoring Scope for group changes. Flushes all
+%%  {Ref, join|leave, Group, Pids} messages from the calling process queue.
+-spec demonitor(Ref :: reference()) -> ok | false.
+demonitor(Ref) ->
+    pg:demonitor(?DEFAULT_SCOPE, Ref).
+
+-spec demonitor(Scope :: atom(), Ref :: reference()) -> ok | false.
+demonitor(Scope, Ref) ->
+    gen_server:call(Scope, {demonitor, Ref}, infinity) =:= ok andalso flush(Ref).
+
 %%--------------------------------------------------------------------
 %% @doc
 %% Returns all processes in a group
@@ -206,7 +236,9 @@ which_local_groups(Scope) when is_atom(Scope) ->
     %% monitored local processes and groups they joined
     local = #{} :: #{pid() => {MRef :: reference(), Groups :: [group()]}},
     %% remote node: scope process monitor and map of groups to pids for fast sync routine
-    remote = #{} :: #{pid() => {reference(), #{group() => [pid()]}}}
+    remote = #{} :: #{pid() => {reference(), #{group() => [pid()]}}},
+    %% processes monitoring group membership
+    scope_monitors = #{} :: #{reference() => pid()}
 }).
 
 -type state() :: #state{}.
@@ -220,26 +252,46 @@ init([Scope]) ->
     {ok, #state{scope = Scope}}.
 
 -spec handle_call(Call :: {join_local, Group :: group(), Pid :: pid()}
-                        | {leave_local, Group :: group(), Pid :: pid()},
+                        | {leave_local, Group :: group(), Pid :: pid()}
+                        | monitor
+                        | {demonitor, Ref :: reference()},
                   From :: {pid(), Tag :: any()},
-                  State :: state()) -> {reply, ok | not_joined, state()}.
+                  State :: state()) ->
+    {reply, ok | not_joined | {reference(), #{group() => [pid()]}} | false, state()}.
 
-handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local, remote = Remote} = State) ->
+handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local,
+    remote = Remote, scope_monitors = ScopeMon} = State) ->
     NewLocal = join_local(PidOrPids, Group, Local),
-    join_local_update_ets(Scope, Group, PidOrPids),
+    join_local_update_ets(Scope, ScopeMon, Group, PidOrPids),
     broadcast(maps:keys(Remote), {join, self(), Group, PidOrPids}),
     {reply, ok, State#state{local = NewLocal}};
 
-handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local, remote = Remote} = State) ->
+handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local,
+    remote = Remote, scope_monitors = ScopeMon} = State) ->
     case leave_local(PidOrPids, Group, Local) of
         Local ->
             {reply, not_joined, State};
         NewLocal ->
-            leave_local_update_ets(Scope, Group, PidOrPids),
+            leave_local_update_ets(Scope, ScopeMon, Group, PidOrPids),
             broadcast(maps:keys(Remote), {leave, self(), PidOrPids, [Group]}),
             {reply, ok, State#state{local = NewLocal}}
     end;
 
+handle_call(monitor, {Pid, _Tag}, #state{scope = Scope, scope_monitors = ScopeMon} = State) ->
+    %% next line could also be done by iterating over process state, but it appears to be slower
+    Local = maps:from_list([{G,P} || [G,P] <- ets:match(Scope, {'$1', '_', '$2'})]),
+    MRef = erlang:monitor(process, Pid), %% monitor the monitor, to discard it upon termination, and generate MRef
+    {reply, {MRef, Local}, State#state{scope_monitors = ScopeMon#{MRef => Pid}}};
+
+handle_call({demonitor, Ref}, _From, #state{scope_monitors = ScopeMon} = State) ->
+    case maps:take(Ref, ScopeMon) of
+        {_, NewMons} ->
+            erlang:demonitor(Ref),
+            {reply, ok, State#state{scope_monitors = NewMons}};
+        error ->
+            {reply, false, State}
+    end;
+
 handle_call(_Request, _From, _S) ->
     erlang:error(badarg).
 
@@ -247,8 +299,8 @@ handle_call(_Request, _From, _S) ->
     {sync, Peer :: pid(), Groups :: [{group(), [pid()]}]},
     State :: state()) -> {noreply, state()}.
 
-handle_cast({sync, Peer, Groups}, #state{scope = Scope, remote = Remote} = State) ->
-    {noreply, State#state{remote = handle_sync(Scope, Peer, Remote, Groups)}};
+handle_cast({sync, Peer, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) ->
+    {noreply, State#state{remote = handle_sync(Scope, ScopeMon, Peer, Remote, Groups)}};
 
 handle_cast(_, _State) ->
     erlang:error(badarg).
@@ -261,10 +313,10 @@ handle_cast(_, _State) ->
     {nodedown, node()} | {nodeup, node()}, State :: state()) -> {noreply, state()}.
 
 %% remote pid or several pids joining the group
-handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remote} = State) ->
+handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) ->
     case maps:get(Peer, Remote, []) of
         {MRef, RemoteGroups} ->
-            join_remote_update_ets(Scope, Group, PidOrPids),
+            join_remote_update_ets(Scope, ScopeMon, Group, PidOrPids),
             %% store remote group => pids map for fast sync operation
             NewRemoteGroups = join_remote(Group, PidOrPids, RemoteGroups),
             {noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteGroups}}}};
@@ -277,10 +329,10 @@ handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remot
     end;
 
 %% remote pid leaving (multiple groups at once)
-handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Remote} = State) ->
+handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) ->
     case maps:get(Peer, Remote, []) of
         {MRef, RemoteMap} ->
-            _ = leave_remote_update_ets(Scope, PidOrPids, Groups),
+            _ = leave_remote_update_ets(Scope, ScopeMon, PidOrPids, Groups),
             NewRemoteMap = leave_remote(PidOrPids, RemoteMap, Groups),
             {noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteMap}}}};
         [] ->
@@ -307,23 +359,28 @@ handle_info({discover, Peer}, #state{remote = Remote, local = Local} = State) ->
     end;
 
 %% handle local process exit
-handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local, remote = Remote} = State) when node(Pid) =:= node() ->
+handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local, remote = Remote,
+    scope_monitors = ScopeMon} = State) when node(Pid) =:= node() ->
     case maps:take(Pid, Local) of
         error ->
-            %% this can only happen when leave request and 'DOWN' are in pg queue
-            {noreply, State};
+            maybe_drop_monitor(MRef, State);
         {{MRef, Groups}, NewLocal} ->
-            [leave_local_update_ets(Scope, Group, Pid) || Group <- Groups],
+            [leave_local_update_ets(Scope, ScopeMon, Group, Pid) || Group <- Groups],
             %% send update to all scope processes on remote nodes
             broadcast(maps:keys(Remote), {leave, self(), Pid, Groups}),
             {noreply, State#state{local = NewLocal}}
     end;
 
-%% handle remote node down or leaving overlay network
-handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote = Remote} = State)  ->
-    {{MRef, RemoteMap}, NewRemote} = maps:take(Pid, Remote),
-    maps:foreach(fun (Group, Pids) -> leave_remote_update_ets(Scope, Pids, [Group]) end, RemoteMap),
-    {noreply, State#state{remote = NewRemote}};
+%% handle remote node down or scope leaving overlay network, or a monitor from the remote node went down
+handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote = Remote,
+    scope_monitors = ScopeMon} = State)  ->
+    case maps:take(Pid, Remote) of
+        {{MRef, RemoteMap}, NewRemote} ->
+            maps:foreach(fun (Group, Pids) -> leave_remote_update_ets(Scope, ScopeMon, Pids, [Group]) end, RemoteMap),
+            {noreply, State#state{remote = NewRemote}};
+        error ->
+            maybe_drop_monitor(MRef, State)
+    end;
 
 %% nodedown: ignore, and wait for 'DOWN' signal for monitored process
 handle_info({nodedown, _Node}, State) ->
@@ -363,7 +420,7 @@ ensure_local(Bad) ->
 %% Override all knowledge of the remote node with information it sends
 %%  to local node. Current implementation must do the full table scan
 %%  to remove stale pids (just as for 'nodedown').
-handle_sync(Scope, Peer, Remote, Groups) ->
+handle_sync(Scope, ScopeMon, Peer, Remote, Groups) ->
     %% can't use maps:get() because it evaluates 'default' value first,
     %%   and in this case monitor() call has side effect.
     {MRef, RemoteGroups} =
@@ -374,25 +431,25 @@ handle_sync(Scope, Peer, Remote, Groups) ->
                 MRef0
         end,
     %% sync RemoteMap and transform ETS table
-    _ = sync_groups(Scope, RemoteGroups, Groups),
+    _ = sync_groups(Scope, ScopeMon, RemoteGroups, Groups),
     Remote#{Peer => {MRef, maps:from_list(Groups)}}.
 
-sync_groups(Scope, RemoteGroups, []) ->
+sync_groups(Scope, ScopeMon, RemoteGroups, []) ->
     %% leave all missing groups
-    [leave_remote_update_ets(Scope, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)];
-sync_groups(Scope, RemoteGroups, [{Group, Pids} | Tail]) ->
+    [leave_remote_update_ets(Scope, ScopeMon, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)];
+sync_groups(Scope, ScopeMon, RemoteGroups, [{Group, Pids} | Tail]) ->
     case maps:take(Group, RemoteGroups) of
         {Pids, NewRemoteGroups} ->
-            sync_groups(Scope, NewRemoteGroups, Tail);
+            sync_groups(Scope, ScopeMon, NewRemoteGroups, Tail);
         {OldPids, NewRemoteGroups} ->
             [{Group, AllOldPids, LocalPids}] = ets:lookup(Scope, Group),
             %% should be really rare...
             AllNewPids = Pids ++ AllOldPids -- OldPids,
             true = ets:insert(Scope, {Group, AllNewPids, LocalPids}),
-            sync_groups(Scope, NewRemoteGroups, Tail);
+            sync_groups(Scope, ScopeMon, NewRemoteGroups, Tail);
         error ->
-            join_remote_update_ets(Scope, Group, Pids),
-            sync_groups(Scope, RemoteGroups, Tail)
+            join_remote_update_ets(Scope, ScopeMon, Group, Pids),
+            sync_groups(Scope, ScopeMon, RemoteGroups, Tail)
     end.
 
 join_local(Pid, Group, Local) when is_pid(Pid) ->
@@ -408,35 +465,39 @@ join_local([], _Group, Local) ->
 join_local([Pid | Tail], Group, Local) ->
     join_local(Tail, Group, join_local(Pid, Group, Local)).
 
-join_local_update_ets(Scope, Group, Pid) when is_pid(Pid) ->
+join_local_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, [Pid | All], [Pid | Local]});
         [] ->
             ets:insert(Scope, {Group, [Pid], [Pid]})
-    end;
-join_local_update_ets(Scope, Group, Pids) ->
+    end,
+    notify_group(ScopeMon, join, Group, [Pid]);
+join_local_update_ets(Scope, ScopeMon, Group, Pids) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, Pids ++ All, Pids ++ Local});
         [] ->
             ets:insert(Scope, {Group, Pids, Pids})
-    end.
+    end,
+    notify_group(ScopeMon, join, Group, Pids).
 
-join_remote_update_ets(Scope, Group, Pid) when is_pid(Pid) ->
+join_remote_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, [Pid | All], Local});
         [] ->
             ets:insert(Scope, {Group, [Pid], []})
-    end;
-join_remote_update_ets(Scope, Group, Pids) ->
+    end,
+    notify_group(ScopeMon, join, Group, [Pid]);
+join_remote_update_ets(Scope, ScopeMon, Group, Pids) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, Pids ++ All, Local});
         [] ->
             ets:insert(Scope, {Group, Pids, []})
-    end.
+    end,
+    notify_group(ScopeMon, join, Group, Pids).
 
 join_remote(Group, Pid, RemoteGroups) when is_pid(Pid) ->
     maps:update_with(Group, fun (List) -> [Pid | List] end, [Pid], RemoteGroups);
@@ -463,17 +524,19 @@ leave_local([], _Group, Local) ->
 leave_local([Pid | Tail], Group, Local) ->
     leave_local(Tail, Group, leave_local(Pid, Group, Local)).
 
-leave_local_update_ets(Scope, Group, Pid) when is_pid(Pid) ->
+leave_local_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) ->
     case ets:lookup(Scope, Group) of
         [{Group, [Pid], [Pid]}] ->
-            ets:delete(Scope, Group);
+            ets:delete(Scope, Group),
+            notify_group(ScopeMon, leave, Group, [Pid]);
         [{Group, All, Local}] ->
-            ets:insert(Scope, {Group, lists:delete(Pid, All), lists:delete(Pid, Local)});
+            ets:insert(Scope, {Group, lists:delete(Pid, All), lists:delete(Pid, Local)}),
+            notify_group(ScopeMon, leave, Group, [Pid]);
         [] ->
             %% rare race condition when 'DOWN' from monitor stays in msg queue while process is leave-ing.
             true
     end;
-leave_local_update_ets(Scope, Group, Pids) ->
+leave_local_update_ets(Scope, ScopeMon, Group, Pids) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             case All -- Pids of
@@ -481,23 +544,26 @@ leave_local_update_ets(Scope, Group, Pids) ->
                     ets:delete(Scope, Group);
                 NewAll ->
                     ets:insert(Scope, {Group, NewAll, Local -- Pids})
-            end;
+            end,
+            notify_group(ScopeMon, leave, Group, Pids);
         [] ->
             true
     end.
 
-leave_remote_update_ets(Scope, Pid, Groups) when is_pid(Pid) ->
+leave_remote_update_ets(Scope, ScopeMon, Pid, Groups) when is_pid(Pid) ->
     _ = [
         case ets:lookup(Scope, Group) of
             [{Group, [Pid], []}] ->
-                ets:delete(Scope, Group);
+                ets:delete(Scope, Group),
+                notify_group(ScopeMon, leave, Group, [Pid]);
             [{Group, All, Local}] ->
-                ets:insert(Scope, {Group, lists:delete(Pid, All), Local});
+                ets:insert(Scope, {Group, lists:delete(Pid, All), Local}),
+                notify_group(ScopeMon, leave, Group, [Pid]);
             [] ->
                 true
         end ||
         Group <- Groups];
-leave_remote_update_ets(Scope, Pids, Groups) ->
+leave_remote_update_ets(Scope, ScopeMon, Pids, Groups) ->
     _ = [
         case ets:lookup(Scope, Group) of
             [{Group, All, Local}] ->
@@ -506,7 +572,8 @@ leave_remote_update_ets(Scope, Pids, Groups) ->
                         ets:delete(Scope, Group);
                     NewAll ->
                         ets:insert(Scope, {Group, NewAll, Local})
-                end;
+                end,
+                notify_group(ScopeMon, leave, Group, Pids);
             [] ->
                 true
         end ||
@@ -543,3 +610,32 @@ broadcast([Dest | Tail], Msg) ->
     %%   join/leave messages when dist buffer is full
     erlang:send(Dest, Msg, [noconnect]),
     broadcast(Tail, Msg).
+
+
+%% drops a monitor if DOWN was received
+maybe_drop_monitor(MRef, #state{scope_monitors = ScopeMon} = State) ->
+    %% could be a local monitor going DOWN. Since it's a rare event, check should
+    %%  not stay in front of any other, more frequent events
+    case maps:take(MRef, ScopeMon) of
+        error ->
+            %% this can only happen when leave request and 'DOWN' are in pg queue
+            {noreply, State};
+        {_Pid, NewScopeMon} ->
+            {noreply, State#state{scope_monitors = NewScopeMon}}
+    end.
+
+%% notify all scope monitors about an Action in Groups for Pids
+notify_group(ScopeMon, Action, Group, Pids) ->
+    maps:foreach(
+        fun (Ref, Pid) ->
+            erlang:send(Pid, {Ref, Action, Group, Pids}, [noconnect])
+        end, ScopeMon).
+
+%% remove all messages that were sent to monitor groups
+flush(Ref) ->
+    receive
+        {Ref, Verb, _Group, _Pids} when Verb =:= join; Verb =:= leave ->
+            flush(Ref)
+    after 0 ->
+        ok
+    end.
diff --git a/lib/kernel/test/pg_SUITE.erl b/lib/kernel/test/pg_SUITE.erl
index a6708dc419..5d56b751e0 100644
--- a/lib/kernel/test/pg_SUITE.erl
+++ b/lib/kernel/test/pg_SUITE.erl
@@ -54,7 +54,8 @@
     missing_scope_join/1,
     disconnected_start/1,
     forced_sync/0, forced_sync/1,
-    group_leave/1
+    group_leave/1,
+    monitor_scope/0, monitor_scope/1
 ]).
 
 -export([
@@ -72,15 +73,16 @@ end_per_testcase(TestCase, _Config) ->
     ok.
 
 all() ->
-    [dyn_distribution, {group, basic}, {group, cluster}, {group, performance}].
+    [dyn_distribution, {group, basic}, {group, cluster}, {group, performance}, {group, monitor}].
 
 groups() ->
     [
         {basic, [parallel], [errors, pg, leave_exit_race, single, overlay_missing]},
-        {performance, [sequential], [thundering_herd]},
+        {performance, [], [thundering_herd]},
         {cluster, [parallel], [process_owner_check, two, initial, netsplit, trisplit, foursplit,
             exchange, nolocal, double, scope_restart, missing_scope_join, empty_group_by_remote_leave,
-            disconnected_start, forced_sync, group_leave]}
+            disconnected_start, forced_sync, group_leave]},
+        {monitor, [parallel], [monitor_scope]}
     ].
 
 %%--------------------------------------------------------------------
@@ -262,13 +264,13 @@ empty_group_by_remote_leave(Config) when is_list(Config) ->
     sync({?FUNCTION_NAME, TwoPeer}),
     ?assertEqual([RemotePid], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
     % inspecting internal state is not best practice, but there's no other way to check if the state is correct.
-    {state, _, _, #{RemoteNode := {_, RemoteMap}}} = sys:get_state(?FUNCTION_NAME),
+    {_, RemoteMap} = maps:get(RemoteNode, element(4, sys:get_state(?FUNCTION_NAME))),
     ?assertEqual(#{?FUNCTION_NAME => [RemotePid]}, RemoteMap),
     % remote leave
     ?assertEqual(ok, rpc:call(TwoPeer, pg, leave, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])),
     sync({?FUNCTION_NAME, TwoPeer}),
     ?assertEqual([], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
-    {state, _, _, #{RemoteNode := {_, NewRemoteMap}}} = sys:get_state(?FUNCTION_NAME),
+    {_, NewRemoteMap} = maps:get(RemoteNode, element(4, sys:get_state(?FUNCTION_NAME))),
     % empty group should be deleted.
     ?assertEqual(#{}, NewRemoteMap),
 
@@ -281,7 +283,7 @@ empty_group_by_remote_leave(Config) when is_list(Config) ->
     ?assertEqual(ok, rpc:call(TwoPeer, pg, leave, [?FUNCTION_NAME, ?FUNCTION_NAME, [RemotePid2, RemotePid]])),
     sync({?FUNCTION_NAME, TwoPeer}),
     ?assertEqual([], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
-    {state, _, _, #{RemoteNode := {_, NewRemoteMap}}} = sys:get_state(?FUNCTION_NAME),
+    {_, NewRemoteMap} = maps:get(RemoteNode, element(4, sys:get_state(?FUNCTION_NAME))),
     stop_node(TwoPeer, Socket),
     ok.
 
@@ -558,6 +560,85 @@ group_leave(Config) when is_list(Config) ->
     ?assertEqual([], pg:get_members(?FUNCTION_NAME, two)),
     ok.
 
+monitor_scope() ->
+    [{doc, "Tests monitor_scope/1 and demonitor/2"}].
+
+monitor_scope(Config) when is_list(Config) ->
+    Self = self(),
+    Scope = ?FUNCTION_NAME,
+    Group = ?FUNCTION_ARITY,
+    %% ensure that demonitoring returns 'false' when monitor is not installed
+    ?assertEqual(false, pg:demonitor(Scope, erlang:make_ref())),
+    %% start the actual test case
+    {Ref, #{}} = pg:monitor_scope(Scope),
+    %% local join
+    ?assertEqual(ok, pg:join(Scope, Group, Self)),
+    wait_message(Ref, join, Group, [Self], "Local"),
+    %% start second monitor (which has 1 local pid at the start)
+    SecondMonitor = spawn_link(fun() -> second_monitor(Scope, Group, Self) end),
+    Ref2 = receive {second, SecondRef} -> SecondRef end,
+    %% start a remote node, and a remote monitor
+    {Peer, Node} = spawn_node(Scope),
+    ScopePid = whereis(Scope),
+    %% do not care about the remote monitor, it is started only to check DOWN handling
+    _ThirdMonitor = spawn(Node, fun() -> second_monitor(ScopePid, Group, Self) end),
+    %% remote join
+    RemotePid = erlang:spawn(Node, forever()),
+    ?assertEqual(ok, rpc:call(Node, pg, join, [Scope, Group, [RemotePid, RemotePid]])),
+    wait_message(Ref, join, Group, [RemotePid, RemotePid], "Remote"),
+    %% verify leave event
+    ?assertEqual([Self], pg:get_local_members(Scope, Group)),
+    ?assertEqual(ok, pg:leave(Scope, Group, self())),
+    wait_message(Ref, leave, Group, [Self], "Local"),
+    %% remote leave
+    ?assertEqual(ok, rpc:call(Node, pg, leave, [Scope, Group, RemotePid])),
+    wait_message(Ref, leave, Group, [RemotePid], "Remote"),
+    %% drop the SecondMonitor - this keeps original and remote monitors
+    SecondMonMsgs = gen_server:call(SecondMonitor, flush),
+    %% inspect the queue, it should contain double remote join, then single local and single remote leave
+    ?assertEqual([
+        {Ref2, join, Group, [RemotePid, RemotePid]},
+        {Ref2, leave, Group, [Self]},
+        {Ref2, leave, Group, [RemotePid]}],
+        SecondMonMsgs),
+    %% remote leave via stop (causes remote monitor to go DOWN)
+    wait_message(Ref, leave, Group, [RemotePid], "Remote stop"),
+    %% WHITE BOX: knowing pg state internals - only the original monitor should stay
+    {state, _, _, _, InternalMonitors} = sys:get_state(?FUNCTION_NAME),
+    ?assertEqual(#{Ref => Self}, InternalMonitors, "pg did not remove DOWNed monitor"),
+    %% demonitor
+    ?assertEqual(ok, pg:demonitor(Scope, Ref)),
+    ?assertEqual(false, pg:demonitor(Scope, Ref)),
+    %% ensure messages don't come
+    ?assertEqual(ok, pg:join(Scope, Group, Self)),
+    sync(Scope),
+    %% join should not be here
+    receive {Ref, Action, Group, [Self]} -> ?assert(false, lists:concat(["Unexpected ", Action, "event"]))
+        after 0 -> ok end.
+
+wait_message(Ref, Action, Group, Pids, Msg) ->
+    receive
+        {Ref, Action, Group, Pids} ->
+            ok
+    after 1000 ->
+        {messages, Msgs} = process_info(self(), messages),
+        ct:pal("Message queue: ~0p", [Msgs]),
+        ?assert(false, Msg ++ " " ++ atom_to_list(Action) ++ " event failed to occur")
+    end.
+
+second_monitor(Scope, Group, Control) ->
+    {Ref, #{Group := [Control]}} = pg:monitor_scope(Scope),
+    Control ! {second, Ref},
+    second_monitor([]).
+
+second_monitor(Msgs) ->
+    receive
+        {'$gen_call', Reply, flush} ->
+            gen:reply(Reply, lists:reverse(Msgs));
+        Msg ->
+            second_monitor([Msg | Msgs])
+    end.
+
 %%--------------------------------------------------------------------
 %% Test Helpers - start/stop additional Erlang nodes
 
-- 
2.35.3

openSUSE Build Service is sponsored by