File 3682-pg-Implement-group-membership-monitoring-for-the-sco.patch of Package erlang
From fadfcff7ac17538db6100350585aac6407d70305 Mon Sep 17 00:00:00 2001
From: Maxim Fedorov <maximfca@gmail.com>
Date: Tue, 7 Jun 2022 13:47:52 -0700
Subject: [PATCH 2/4] [pg] Implement group membership monitoring for the scope
This commit adds monitor_scope/0,1 that implements group
membership monitoring. Caller gets the current map of groups
to processes, and atomically subscribes to all changes to
this map.
---
lib/kernel/doc/src/pg.xml | 29 +++++-
lib/kernel/src/pg.erl | 194 ++++++++++++++++++++++++++---------
lib/kernel/test/pg_SUITE.erl | 96 +++++++++++++++--
3 files changed, 262 insertions(+), 57 deletions(-)
diff --git a/lib/kernel/doc/src/pg.xml b/lib/kernel/doc/src/pg.xml
index 9b6d20f818..85b1529f44 100644
--- a/lib/kernel/doc/src/pg.xml
+++ b/lib/kernel/doc/src/pg.xml
@@ -6,7 +6,7 @@
<erlref>
<header>
<copyright>
- <year>2020</year><year>2020</year>
+ <year>2020</year><year>2022</year>
<holder>Maxim Fedorov, WhatsApp Inc.</holder>
</copyright>
<legalnotice>
@@ -149,6 +149,33 @@
</desc>
</func>
+ <func>
+ <name name="monitor_scope" arity="0" since="OTP 25.1"/>
+ <name name="monitor_scope" arity="1" since="OTP 25.1"/>
+ <fsummary>Starts group membership monitoring for a scope.</fsummary>
+ <desc>
+ <p>Subscribes the caller to updates from the specified scope. Returns
+ content of the entire scope and a reference to match the upcoming
+ notifications.</p>
+
+ <p>Whenever any group membership changes, an update message is sent
+ to the subscriber:</p>
+ <code type="none">{Ref, join, Group, [JoinPid1, JoinPid2]}</code>
+ <code type="none">{Ref, leave, Group, [LeavePid1]}</code>
+ </desc>
+ </func>
+
+ <func>
+ <name name="demonitor" arity="1" since="OTP 25.1"/>
+ <name name="demonitor" arity="2" since="OTP 25.1"/>
+ <fsummary>Stops group membership monitoring.</fsummary>
+ <desc>
+ <p>Unsubscribes the caller from updates of the specified scope.
+ Flushes all outstanding updates that were already in the message
+ queue of the calling process.</p>
+ </desc>
+ </func>
+
<func>
<name name="get_local_members" arity="1" since="OTP 23.0"/>
<name name="get_local_members" arity="2" since="OTP 23.0"/>
diff --git a/lib/kernel/src/pg.erl b/lib/kernel/src/pg.erl
index 8d0f6124f1..845021c7fa 100644
--- a/lib/kernel/src/pg.erl
+++ b/lib/kernel/src/pg.erl
@@ -60,6 +60,10 @@
join/2,
leave/2,
+ monitor_scope/0,
+ monitor_scope/1,
+ demonitor/1,
+ demonitor/2,
get_members/1,
get_local_members/1,
which_groups/0,
@@ -142,6 +146,32 @@ leave(Scope, Group, PidOrPids) when is_pid(PidOrPids); is_list(PidOrPids) ->
ok = ensure_local(PidOrPids),
gen_server:call(Scope, {leave_local, Group, PidOrPids}, infinity).
+%%--------------------------------------------------------------------
+%% @doc
+%% Returns currently known groups, and begins monitoring
+%% all group changes. Calling process will receive {Ref, join, Group, Pids}
+%% message when new Pids join the Group, and {Ref, leave, Group, Pids} when
+%% Pids leave the group.
+-spec monitor_scope() -> {reference(), #{group() => [pid()]}}.
+monitor_scope() ->
+ monitor_scope(?DEFAULT_SCOPE).
+
+-spec monitor_scope(Scope :: atom()) -> {reference(), #{group() => [pid()]}}.
+monitor_scope(Scope) ->
+ gen_server:call(Scope, monitor, infinity).
+
+%%--------------------------------------------------------------------
+%% @doc
+%% Stops monitoring Scope for group changes. Flushes all
+%% {Ref, join|leave, Group, Pids} messages from the calling process queue.
+-spec demonitor(Ref :: reference()) -> ok | false.
+demonitor(Ref) ->
+ pg:demonitor(?DEFAULT_SCOPE, Ref).
+
+-spec demonitor(Scope :: atom(), Ref :: reference()) -> ok | false.
+demonitor(Scope, Ref) ->
+ gen_server:call(Scope, {demonitor, Ref}, infinity) =:= ok andalso flush(Ref).
+
%%--------------------------------------------------------------------
%% @doc
%% Returns all processes in a group
@@ -206,7 +236,9 @@ which_local_groups(Scope) when is_atom(Scope) ->
%% monitored local processes and groups they joined
local = #{} :: #{pid() => {MRef :: reference(), Groups :: [group()]}},
%% remote node: scope process monitor and map of groups to pids for fast sync routine
- remote = #{} :: #{pid() => {reference(), #{group() => [pid()]}}}
+ remote = #{} :: #{pid() => {reference(), #{group() => [pid()]}}},
+ %% processes monitoring group membership
+ scope_monitors = #{} :: #{reference() => pid()}
}).
-type state() :: #state{}.
@@ -220,26 +252,46 @@ init([Scope]) ->
{ok, #state{scope = Scope}}.
-spec handle_call(Call :: {join_local, Group :: group(), Pid :: pid()}
- | {leave_local, Group :: group(), Pid :: pid()},
+ | {leave_local, Group :: group(), Pid :: pid()}
+ | monitor
+ | {demonitor, Ref :: reference()},
From :: {pid(), Tag :: any()},
- State :: state()) -> {reply, ok | not_joined, state()}.
+ State :: state()) ->
+ {reply, ok | not_joined | {reference(), #{group() => [pid()]}} | false, state()}.
-handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local, remote = Remote} = State) ->
+handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local,
+ remote = Remote, scope_monitors = ScopeMon} = State) ->
NewLocal = join_local(PidOrPids, Group, Local),
- join_local_update_ets(Scope, Group, PidOrPids),
+ join_local_update_ets(Scope, ScopeMon, Group, PidOrPids),
broadcast(maps:keys(Remote), {join, self(), Group, PidOrPids}),
{reply, ok, State#state{local = NewLocal}};
-handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local, remote = Remote} = State) ->
+handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local,
+ remote = Remote, scope_monitors = ScopeMon} = State) ->
case leave_local(PidOrPids, Group, Local) of
Local ->
{reply, not_joined, State};
NewLocal ->
- leave_local_update_ets(Scope, Group, PidOrPids),
+ leave_local_update_ets(Scope, ScopeMon, Group, PidOrPids),
broadcast(maps:keys(Remote), {leave, self(), PidOrPids, [Group]}),
{reply, ok, State#state{local = NewLocal}}
end;
+handle_call(monitor, {Pid, _Tag}, #state{scope = Scope, scope_monitors = ScopeMon} = State) ->
+ %% next line could also be done with iterating over process state, but it appears to be slower
+ Local = maps:from_list([{G,P} || [G,P] <- ets:match(Scope, {'$1', '_', '$2'})]),
+ MRef = erlang:monitor(process, Pid), %% monitor the monitor, to discard it upon termination, and generate MRef
+ {reply, {MRef, Local}, State#state{scope_monitors = ScopeMon#{MRef => Pid}}};
+
+handle_call({demonitor, Ref}, _From, #state{scope_monitors = ScopeMon} = State) ->
+ case maps:take(Ref, ScopeMon) of
+ {_, NewMons} ->
+ erlang:demonitor(Ref),
+ {reply, ok, State#state{scope_monitors = NewMons}};
+ error ->
+ {reply, false, State}
+ end;
+
handle_call(_Request, _From, _S) ->
erlang:error(badarg).
@@ -247,8 +299,8 @@ handle_call(_Request, _From, _S) ->
{sync, Peer :: pid(), Groups :: [{group(), [pid()]}]},
State :: state()) -> {noreply, state()}.
-handle_cast({sync, Peer, Groups}, #state{scope = Scope, remote = Remote} = State) ->
- {noreply, State#state{remote = handle_sync(Scope, Peer, Remote, Groups)}};
+handle_cast({sync, Peer, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) ->
+ {noreply, State#state{remote = handle_sync(Scope, ScopeMon, Peer, Remote, Groups)}};
handle_cast(_, _State) ->
erlang:error(badarg).
@@ -261,10 +313,10 @@ handle_cast(_, _State) ->
{nodedown, node()} | {nodeup, node()}, State :: state()) -> {noreply, state()}.
%% remote pid or several pids joining the group
-handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remote} = State) ->
+handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) ->
case maps:get(Peer, Remote, []) of
{MRef, RemoteGroups} ->
- join_remote_update_ets(Scope, Group, PidOrPids),
+ join_remote_update_ets(Scope, ScopeMon, Group, PidOrPids),
%% store remote group => pids map for fast sync operation
NewRemoteGroups = join_remote(Group, PidOrPids, RemoteGroups),
{noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteGroups}}}};
@@ -277,10 +329,10 @@ handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remot
end;
%% remote pid leaving (multiple groups at once)
-handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Remote} = State) ->
+handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Remote, scope_monitors = ScopeMon} = State) ->
case maps:get(Peer, Remote, []) of
{MRef, RemoteMap} ->
- _ = leave_remote_update_ets(Scope, PidOrPids, Groups),
+ _ = leave_remote_update_ets(Scope, ScopeMon, PidOrPids, Groups),
NewRemoteMap = leave_remote(PidOrPids, RemoteMap, Groups),
{noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteMap}}}};
[] ->
@@ -307,23 +359,28 @@ handle_info({discover, Peer}, #state{remote = Remote, local = Local} = State) ->
end;
%% handle local process exit
-handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local, remote = Remote} = State) when node(Pid) =:= node() ->
+handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local, remote = Remote,
+ scope_monitors = ScopeMon} = State) when node(Pid) =:= node() ->
case maps:take(Pid, Local) of
error ->
- %% this can only happen when leave request and 'DOWN' are in pg queue
- {noreply, State};
+ maybe_drop_monitor(MRef, State);
{{MRef, Groups}, NewLocal} ->
- [leave_local_update_ets(Scope, Group, Pid) || Group <- Groups],
+ [leave_local_update_ets(Scope, ScopeMon, Group, Pid) || Group <- Groups],
%% send update to all scope processes on remote nodes
broadcast(maps:keys(Remote), {leave, self(), Pid, Groups}),
{noreply, State#state{local = NewLocal}}
end;
-%% handle remote node down or leaving overlay network
-handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote = Remote} = State) ->
- {{MRef, RemoteMap}, NewRemote} = maps:take(Pid, Remote),
- maps:foreach(fun (Group, Pids) -> leave_remote_update_ets(Scope, Pids, [Group]) end, RemoteMap),
- {noreply, State#state{remote = NewRemote}};
+%% handle remote node down or scope leaving overlay network, or a monitor from the remote node went down
+handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote = Remote,
+ scope_monitors = ScopeMon} = State) ->
+ case maps:take(Pid, Remote) of
+ {{MRef, RemoteMap}, NewRemote} ->
+ maps:foreach(fun (Group, Pids) -> leave_remote_update_ets(Scope, ScopeMon, Pids, [Group]) end, RemoteMap),
+ {noreply, State#state{remote = NewRemote}};
+ error ->
+ maybe_drop_monitor(MRef, State)
+ end;
%% nodedown: ignore, and wait for 'DOWN' signal for monitored process
handle_info({nodedown, _Node}, State) ->
@@ -363,7 +420,7 @@ ensure_local(Bad) ->
%% Override all knowledge of the remote node with information it sends
%% to local node. Current implementation must do the full table scan
%% to remove stale pids (just as for 'nodedown').
-handle_sync(Scope, Peer, Remote, Groups) ->
+handle_sync(Scope, ScopeMon, Peer, Remote, Groups) ->
%% can't use maps:get() because it evaluates 'default' value first,
%% and in this case monitor() call has side effect.
{MRef, RemoteGroups} =
@@ -374,25 +431,25 @@ handle_sync(Scope, Peer, Remote, Groups) ->
MRef0
end,
%% sync RemoteMap and transform ETS table
- _ = sync_groups(Scope, RemoteGroups, Groups),
+ _ = sync_groups(Scope, ScopeMon, RemoteGroups, Groups),
Remote#{Peer => {MRef, maps:from_list(Groups)}}.
-sync_groups(Scope, RemoteGroups, []) ->
+sync_groups(Scope, ScopeMon, RemoteGroups, []) ->
%% leave all missing groups
- [leave_remote_update_ets(Scope, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)];
-sync_groups(Scope, RemoteGroups, [{Group, Pids} | Tail]) ->
+ [leave_remote_update_ets(Scope, ScopeMon, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)];
+sync_groups(Scope, ScopeMon, RemoteGroups, [{Group, Pids} | Tail]) ->
case maps:take(Group, RemoteGroups) of
{Pids, NewRemoteGroups} ->
- sync_groups(Scope, NewRemoteGroups, Tail);
+ sync_groups(Scope, ScopeMon, NewRemoteGroups, Tail);
{OldPids, NewRemoteGroups} ->
[{Group, AllOldPids, LocalPids}] = ets:lookup(Scope, Group),
%% should be really rare...
AllNewPids = Pids ++ AllOldPids -- OldPids,
true = ets:insert(Scope, {Group, AllNewPids, LocalPids}),
- sync_groups(Scope, NewRemoteGroups, Tail);
+ sync_groups(Scope, ScopeMon, NewRemoteGroups, Tail);
error ->
- join_remote_update_ets(Scope, Group, Pids),
- sync_groups(Scope, RemoteGroups, Tail)
+ join_remote_update_ets(Scope, ScopeMon, Group, Pids),
+ sync_groups(Scope, ScopeMon, RemoteGroups, Tail)
end.
join_local(Pid, Group, Local) when is_pid(Pid) ->
@@ -408,35 +465,39 @@ join_local([], _Group, Local) ->
join_local([Pid | Tail], Group, Local) ->
join_local(Tail, Group, join_local(Pid, Group, Local)).
-join_local_update_ets(Scope, Group, Pid) when is_pid(Pid) ->
+join_local_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) ->
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
ets:insert(Scope, {Group, [Pid | All], [Pid | Local]});
[] ->
ets:insert(Scope, {Group, [Pid], [Pid]})
- end;
-join_local_update_ets(Scope, Group, Pids) ->
+ end,
+ notify_group(ScopeMon, join, Group, [Pid]);
+join_local_update_ets(Scope, ScopeMon, Group, Pids) ->
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
ets:insert(Scope, {Group, Pids ++ All, Pids ++ Local});
[] ->
ets:insert(Scope, {Group, Pids, Pids})
- end.
+ end,
+ notify_group(ScopeMon, join, Group, Pids).
-join_remote_update_ets(Scope, Group, Pid) when is_pid(Pid) ->
+join_remote_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) ->
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
ets:insert(Scope, {Group, [Pid | All], Local});
[] ->
ets:insert(Scope, {Group, [Pid], []})
- end;
-join_remote_update_ets(Scope, Group, Pids) ->
+ end,
+ notify_group(ScopeMon, join, Group, [Pid]);
+join_remote_update_ets(Scope, ScopeMon, Group, Pids) ->
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
ets:insert(Scope, {Group, Pids ++ All, Local});
[] ->
ets:insert(Scope, {Group, Pids, []})
- end.
+ end,
+ notify_group(ScopeMon, join, Group, Pids).
join_remote(Group, Pid, RemoteGroups) when is_pid(Pid) ->
maps:update_with(Group, fun (List) -> [Pid | List] end, [Pid], RemoteGroups);
@@ -463,17 +524,19 @@ leave_local([], _Group, Local) ->
leave_local([Pid | Tail], Group, Local) ->
leave_local(Tail, Group, leave_local(Pid, Group, Local)).
-leave_local_update_ets(Scope, Group, Pid) when is_pid(Pid) ->
+leave_local_update_ets(Scope, ScopeMon, Group, Pid) when is_pid(Pid) ->
case ets:lookup(Scope, Group) of
[{Group, [Pid], [Pid]}] ->
- ets:delete(Scope, Group);
+ ets:delete(Scope, Group),
+ notify_group(ScopeMon, leave, Group, [Pid]);
[{Group, All, Local}] ->
- ets:insert(Scope, {Group, lists:delete(Pid, All), lists:delete(Pid, Local)});
+ ets:insert(Scope, {Group, lists:delete(Pid, All), lists:delete(Pid, Local)}),
+ notify_group(ScopeMon, leave, Group, [Pid]);
[] ->
%% rare race condition when 'DOWN' from monitor stays in msg queue while process is leave-ing.
true
end;
-leave_local_update_ets(Scope, Group, Pids) ->
+leave_local_update_ets(Scope, ScopeMon, Group, Pids) ->
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
case All -- Pids of
@@ -481,23 +544,26 @@ leave_local_update_ets(Scope, Group, Pids) ->
ets:delete(Scope, Group);
NewAll ->
ets:insert(Scope, {Group, NewAll, Local -- Pids})
- end;
+ end,
+ notify_group(ScopeMon, leave, Group, Pids);
[] ->
true
end.
-leave_remote_update_ets(Scope, Pid, Groups) when is_pid(Pid) ->
+leave_remote_update_ets(Scope, ScopeMon, Pid, Groups) when is_pid(Pid) ->
_ = [
case ets:lookup(Scope, Group) of
[{Group, [Pid], []}] ->
- ets:delete(Scope, Group);
+ ets:delete(Scope, Group),
+ notify_group(ScopeMon, leave, Group, [Pid]);
[{Group, All, Local}] ->
- ets:insert(Scope, {Group, lists:delete(Pid, All), Local});
+ ets:insert(Scope, {Group, lists:delete(Pid, All), Local}),
+ notify_group(ScopeMon, leave, Group, [Pid]);
[] ->
true
end ||
Group <- Groups];
-leave_remote_update_ets(Scope, Pids, Groups) ->
+leave_remote_update_ets(Scope, ScopeMon, Pids, Groups) ->
_ = [
case ets:lookup(Scope, Group) of
[{Group, All, Local}] ->
@@ -506,7 +572,8 @@ leave_remote_update_ets(Scope, Pids, Groups) ->
ets:delete(Scope, Group);
NewAll ->
ets:insert(Scope, {Group, NewAll, Local})
- end;
+ end,
+ notify_group(ScopeMon, leave, Group, Pids);
[] ->
true
end ||
@@ -543,3 +610,32 @@ broadcast([Dest | Tail], Msg) ->
%% join/leave messages when dist buffer is full
erlang:send(Dest, Msg, [noconnect]),
broadcast(Tail, Msg).
+
+
+%% drops a monitor if DOWN was received
+maybe_drop_monitor(MRef, #state{scope_monitors = ScopeMon} = State) ->
+ %% could be a local monitor going DOWN. Since it's a rare event, check should
+ %% not stay in front of any other, more frequent events
+ case maps:take(MRef, ScopeMon) of
+ error ->
+ %% this can only happen when leave request and 'DOWN' are in pg queue
+ {noreply, State};
+ {_Pid, NewScopeMon} ->
+ {noreply, State#state{scope_monitors = NewScopeMon}}
+ end.
+
+%% notify all scope monitors about an Action in Groups for Pids
+notify_group(ScopeMon, Action, Group, Pids) ->
+ maps:foreach(
+ fun (Ref, Pid) ->
+ erlang:send(Pid, {Ref, Action, Group, Pids}, [noconnect])
+ end, ScopeMon).
+
+%% remove all messages that were sent to monitor groups
+flush(Ref) ->
+ receive
+ {Ref, Verb, _Group, _Pids} when Verb =:= join; Verb =:= leave ->
+ flush(Ref)
+ after 0 ->
+ ok
+ end.
diff --git a/lib/kernel/test/pg_SUITE.erl b/lib/kernel/test/pg_SUITE.erl
index a6708dc419..5d56b751e0 100644
--- a/lib/kernel/test/pg_SUITE.erl
+++ b/lib/kernel/test/pg_SUITE.erl
@@ -54,7 +54,8 @@
missing_scope_join/1,
disconnected_start/1,
forced_sync/0, forced_sync/1,
- group_leave/1
+ group_leave/1,
+ monitor_scope/0, monitor_scope/1
]).
-export([
@@ -72,15 +73,16 @@ end_per_testcase(TestCase, _Config) ->
ok.
all() ->
- [dyn_distribution, {group, basic}, {group, cluster}, {group, performance}].
+ [dyn_distribution, {group, basic}, {group, cluster}, {group, performance}, {group, monitor}].
groups() ->
[
{basic, [parallel], [errors, pg, leave_exit_race, single, overlay_missing]},
- {performance, [sequential], [thundering_herd]},
+ {performance, [], [thundering_herd]},
{cluster, [parallel], [process_owner_check, two, initial, netsplit, trisplit, foursplit,
exchange, nolocal, double, scope_restart, missing_scope_join, empty_group_by_remote_leave,
- disconnected_start, forced_sync, group_leave]}
+ disconnected_start, forced_sync, group_leave]},
+ {monitor, [parallel], [monitor_scope]}
].
%%--------------------------------------------------------------------
@@ -262,13 +264,13 @@ empty_group_by_remote_leave(Config) when is_list(Config) ->
sync({?FUNCTION_NAME, TwoPeer}),
?assertEqual([RemotePid], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
% inspecting internal state is not best practice, but there's no other way to check if the state is correct.
- {state, _, _, #{RemoteNode := {_, RemoteMap}}} = sys:get_state(?FUNCTION_NAME),
+ {_, RemoteMap} = maps:get(RemoteNode, element(4, sys:get_state(?FUNCTION_NAME))),
?assertEqual(#{?FUNCTION_NAME => [RemotePid]}, RemoteMap),
% remote leave
?assertEqual(ok, rpc:call(TwoPeer, pg, leave, [?FUNCTION_NAME, ?FUNCTION_NAME, RemotePid])),
sync({?FUNCTION_NAME, TwoPeer}),
?assertEqual([], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
- {state, _, _, #{RemoteNode := {_, NewRemoteMap}}} = sys:get_state(?FUNCTION_NAME),
+ {_, NewRemoteMap} = maps:get(RemoteNode, element(4, sys:get_state(?FUNCTION_NAME))),
% empty group should be deleted.
?assertEqual(#{}, NewRemoteMap),
@@ -281,7 +283,7 @@ empty_group_by_remote_leave(Config) when is_list(Config) ->
?assertEqual(ok, rpc:call(TwoPeer, pg, leave, [?FUNCTION_NAME, ?FUNCTION_NAME, [RemotePid2, RemotePid]])),
sync({?FUNCTION_NAME, TwoPeer}),
?assertEqual([], pg:get_members(?FUNCTION_NAME, ?FUNCTION_NAME)),
- {state, _, _, #{RemoteNode := {_, NewRemoteMap}}} = sys:get_state(?FUNCTION_NAME),
+ {_, NewRemoteMap} = maps:get(RemoteNode, element(4, sys:get_state(?FUNCTION_NAME))),
stop_node(TwoPeer, Socket),
ok.
@@ -558,6 +560,85 @@ group_leave(Config) when is_list(Config) ->
?assertEqual([], pg:get_members(?FUNCTION_NAME, two)),
ok.
+monitor_scope() ->
+ [{doc, "Tests monitor_scope/1 and demonitor/2"}].
+
+monitor_scope(Config) when is_list(Config) ->
+ Self = self(),
+ Scope = ?FUNCTION_NAME,
+ Group = ?FUNCTION_ARITY,
+ %% ensure that demonitoring returns 'false' when monitor is not installed
+ ?assertEqual(false, pg:demonitor(Scope, erlang:make_ref())),
+ %% start the actual test case
+ {Ref, #{}} = pg:monitor_scope(Scope),
+ %% local join
+ ?assertEqual(ok, pg:join(Scope, Group, Self)),
+ wait_message(Ref, join, Group, [Self], "Local"),
+ %% start second monitor (which has 1 local pid at the start)
+ SecondMonitor = spawn_link(fun() -> second_monitor(Scope, Group, Self) end),
+ Ref2 = receive {second, SecondRef} -> SecondRef end,
+ %% start a remote node, and a remote monitor
+ {Peer, Node} = spawn_node(Scope),
+ ScopePid = whereis(Scope),
+ %% do not care about the remote monitor, it is started only to check DOWN handling
+ _ThirdMonitor = spawn(Node, fun() -> second_monitor(ScopePid, Group, Self) end),
+ %% remote join
+ RemotePid = erlang:spawn(Node, forever()),
+ ?assertEqual(ok, rpc:call(Node, pg, join, [Scope, Group, [RemotePid, RemotePid]])),
+ wait_message(Ref, join, Group, [RemotePid, RemotePid], "Remote"),
+ %% verify leave event
+ ?assertEqual([Self], pg:get_local_members(Scope, Group)),
+ ?assertEqual(ok, pg:leave(Scope, Group, self())),
+ wait_message(Ref, leave, Group, [Self], "Local"),
+ %% remote leave
+ ?assertEqual(ok, rpc:call(Node, pg, leave, [Scope, Group, RemotePid])),
+ wait_message(Ref, leave, Group, [RemotePid], "Remote"),
+ %% drop the SecondMonitor - this keeps original and remote monitors
+ SecondMonMsgs = gen_server:call(SecondMonitor, flush),
+ %% inspect the queue, it should contain double remote join, then single local and single remote leave
+ ?assertEqual([
+ {Ref2, join, Group, [RemotePid, RemotePid]},
+ {Ref2, leave, Group, [Self]},
+ {Ref2, leave, Group, [RemotePid]}],
+ SecondMonMsgs),
+ %% remote leave via stop (causes remote monitor to go DOWN)
+ wait_message(Ref, leave, Group, [RemotePid], "Remote stop"),
+ %% WHITE BOX: knowing pg state internals - only the original monitor should stay
+ {state, _, _, _, InternalMonitors} = sys:get_state(?FUNCTION_NAME),
+ ?assertEqual(#{Ref => Self}, InternalMonitors, "pg did not remove DOWNed monitor"),
+ %% demonitor
+ ?assertEqual(ok, pg:demonitor(Scope, Ref)),
+ ?assertEqual(false, pg:demonitor(Scope, Ref)),
+ %% ensure messages don't come
+ ?assertEqual(ok, pg:join(Scope, Group, Self)),
+ sync(Scope),
+ %% join should not be here
+ receive {Ref, Action, Group, [Self]} -> ?assert(false, lists:concat(["Unexpected ", Action, "event"]))
+ after 0 -> ok end.
+
+wait_message(Ref, Action, Group, Pids, Msg) ->
+ receive
+ {Ref, Action, Group, Pids} ->
+ ok
+ after 1000 ->
+ {messages, Msgs} = process_info(self(), messages),
+ ct:pal("Message queue: ~0p", [Msgs]),
+ ?assert(false, Msg ++ " " ++ atom_to_list(Action) ++ " event failed to occur")
+ end.
+
+second_monitor(Scope, Group, Control) ->
+ {Ref, #{Group := [Control]}} = pg:monitor_scope(Scope),
+ Control ! {second, Ref},
+ second_monitor([]).
+
+second_monitor(Msgs) ->
+ receive
+ {'$gen_call', Reply, flush} ->
+ gen:reply(Reply, lists:reverse(Msgs));
+ Msg ->
+ second_monitor([Msg | Msgs])
+ end.
+
%%--------------------------------------------------------------------
%% Test Helpers - start/stop additional Erlang nodes
--
2.35.3