File 5741-pg-refactor-internals-for-readability.patch of Package erlang

From 1dde3119e070472709f6d28296546b2d15b3c4a3 Mon Sep 17 00:00:00 2001
From: Maxim Fedorov <>
Date: Tue, 7 Jun 2022 09:47:09 -0700
Subject: [PATCH 1/4] [pg] refactor internals for readability

Original implementation has a number of inconsistencies that
makes it harder to read. This patch:
 - renames 'monitors' to 'local' (processes running on the local node)
 - renames 'nodes' to 'remote' for consnstency with 'local'
 - makes naming for join/leave updating server state and ETS table consistent
 lib/kernel/src/pg.erl | 184 ++++++++++++++++++++----------------------
 1 file changed, 89 insertions(+), 95 deletions(-)

diff --git a/lib/kernel/src/pg.erl b/lib/kernel/src/pg.erl
index 573d2e6953..8d0f6124f1 100644
--- a/lib/kernel/src/pg.erl
+++ b/lib/kernel/src/pg.erl
@@ -204,9 +204,9 @@ which_local_groups(Scope) when is_atom(Scope) ->
     %% ETS table name, and also the registered process name (self())
     scope :: atom(),
     %% monitored local processes and groups they joined
-    monitors = #{} :: #{pid() => {MRef :: reference(), Groups :: [group()]}},
+    local = #{} :: #{pid() => {MRef :: reference(), Groups :: [group()]}},
     %% remote node: scope process monitor and map of groups to pids for fast sync routine
-    nodes = #{} :: #{pid() => {reference(), #{group() => [pid()]}}}
+    remote = #{} :: #{pid() => {reference(), #{group() => [pid()]}}}
 -type state() :: #state{}.
@@ -214,44 +214,44 @@ which_local_groups(Scope) when is_atom(Scope) ->
 -spec init([Scope :: atom()]) -> {ok, state()}.
 init([Scope]) ->
     ok = net_kernel:monitor_nodes(true),
-    %% discover all nodes in the cluster
+    %% discover all nodes running this scope in the cluster
     broadcast([{Scope, Node} || Node <- nodes()], {discover, self()}),
     Scope = ets:new(Scope, [set, protected, named_table, {read_concurrency, true}]),
     {ok, #state{scope = Scope}}.
 -spec handle_call(Call :: {join_local, Group :: group(), Pid :: pid()}
                         | {leave_local, Group :: group(), Pid :: pid()},
-                  From :: {pid(),Tag :: any()},
+                  From :: {pid(), Tag :: any()},
                   State :: state()) -> {reply, ok | not_joined, state()}.
-handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, monitors = Monitors, nodes = Nodes} = State) ->
-    NewMons = join_monitors(PidOrPids, Group, Monitors),
-    join_local_group(Scope, Group, PidOrPids),
-    broadcast(maps:keys(Nodes), {join, self(), Group, PidOrPids}),
-    {reply, ok, State#state{monitors = NewMons}};
+handle_call({join_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local, remote = Remote} = State) ->
+    NewLocal = join_local(PidOrPids, Group, Local),
+    join_local_update_ets(Scope, Group, PidOrPids),
+    broadcast(maps:keys(Remote), {join, self(), Group, PidOrPids}),
+    {reply, ok, State#state{local = NewLocal}};
-handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, monitors = Monitors, nodes = Nodes} = State) ->
-    case leave_monitors(PidOrPids, Group, Monitors) of
-        Monitors ->
+handle_call({leave_local, Group, PidOrPids}, _From, #state{scope = Scope, local = Local, remote = Remote} = State) ->
+    case leave_local(PidOrPids, Group, Local) of
+        Local ->
             {reply, not_joined, State};
-        NewMons ->
-            leave_local_group(Scope, Group, PidOrPids),
-            broadcast(maps:keys(Nodes), {leave, self(), PidOrPids, [Group]}),
-            {reply, ok, State#state{monitors = NewMons}}
+        NewLocal ->
+            leave_local_update_ets(Scope, Group, PidOrPids),
+            broadcast(maps:keys(Remote), {leave, self(), PidOrPids, [Group]}),
+            {reply, ok, State#state{local = NewLocal}}
 handle_call(_Request, _From, _S) ->
-    error(badarg).
+    erlang:error(badarg).
 -spec handle_cast(
     {sync, Peer :: pid(), Groups :: [{group(), [pid()]}]},
     State :: state()) -> {noreply, state()}.
-handle_cast({sync, Peer, Groups}, #state{scope = Scope, nodes = Nodes} = State) ->
-    {noreply, State#state{nodes = handle_sync(Scope, Peer, Nodes, Groups)}};
+handle_cast({sync, Peer, Groups}, #state{scope = Scope, remote = Remote} = State) ->
+    {noreply, State#state{remote = handle_sync(Scope, Peer, Remote, Groups)}};
 handle_cast(_, _State) ->
-    error(badarg).
+    erlang:error(badarg).
 -spec handle_info(
     {discover, Peer :: pid()} |
@@ -261,13 +261,13 @@ handle_cast(_, _State) ->
     {nodedown, node()} | {nodeup, node()}, State :: state()) -> {noreply, state()}.
 %% remote pid or several pids joining the group
-handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, nodes = Nodes} = State) ->
-    case maps:get(Peer, Nodes, []) of
+handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, remote = Remote} = State) ->
+    case maps:get(Peer, Remote, []) of
         {MRef, RemoteGroups} ->
-            join_remote(Scope, Group, PidOrPids),
+            join_remote_update_ets(Scope, Group, PidOrPids),
             %% store remote group => pids map for fast sync operation
-            NewRemoteGroups = join_remote_map(Group, PidOrPids, RemoteGroups),
-            {noreply, State#state{nodes = Nodes#{Peer => {MRef, NewRemoteGroups}}}};
+            NewRemoteGroups = join_remote(Group, PidOrPids, RemoteGroups),
+            {noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteGroups}}}};
         [] ->
             %% handle possible race condition, when remote node is flickering up/down,
             %%  and remote join can happen after the node left overlay network
@@ -277,12 +277,12 @@ handle_info({join, Peer, Group, PidOrPids}, #state{scope = Scope, nodes = Nodes}
 %% remote pid leaving (multiple groups at once)
-handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, nodes = Nodes} = State) ->
-    case maps:get(Peer, Nodes, []) of
+handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Remote} = State) ->
+    case maps:get(Peer, Remote, []) of
         {MRef, RemoteMap} ->
-            _ = leave_remote(Scope, PidOrPids, Groups),
-            NewRemoteMap = leave_update_remote_map(PidOrPids, RemoteMap, Groups),
-            {noreply, State#state{nodes = Nodes#{Peer => {MRef, NewRemoteMap}}}};
+            _ = leave_remote_update_ets(Scope, PidOrPids, Groups),
+            NewRemoteMap = leave_remote(PidOrPids, RemoteMap, Groups),
+            {noreply, State#state{remote = Remote#{Peer => {MRef, NewRemoteMap}}}};
         [] ->
             %% Handle race condition: remote node disconnected, but scope process
             %%  of the remote node was just about to send 'leave' message. In this
@@ -294,36 +294,36 @@ handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, nodes = Node
 %% we're being discovered, let's exchange!
-handle_info({discover, Peer}, #state{nodes = Nodes, monitors = Monitors} = State) ->
-    gen_server:cast(Peer, {sync, self(), all_local_pids(Monitors)}),
+handle_info({discover, Peer}, #state{remote = Remote, local = Local} = State) ->
+    gen_server:cast(Peer, {sync, self(), all_local_pids(Local)}),
     %% do we know who is looking for us?
-    case maps:is_key(Peer, Nodes) of
+    case maps:is_key(Peer, Remote) of
         true ->
             {noreply, State};
         false ->
-            MRef = monitor(process, Peer),
+            MRef = erlang:monitor(process, Peer),
             erlang:send(Peer, {discover, self()}, [noconnect]),
-            {noreply, State#state{nodes = Nodes#{Peer => {MRef, #{}}}}}
+            {noreply, State#state{remote = Remote#{Peer => {MRef, #{}}}}}
 %% handle local process exit
-handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, monitors = Monitors, nodes = Nodes} = State) when node(Pid) =:= node() ->
-    case maps:take(Pid, Monitors) of
+handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local, remote = Remote} = State) when node(Pid) =:= node() ->
+    case maps:take(Pid, Local) of
         error ->
             %% this can only happen when leave request and 'DOWN' are in pg queue
             {noreply, State};
-        {{MRef, Groups}, NewMons} ->
-            [leave_local_group(Scope, Group, Pid) || Group <- Groups],
-            %% send update to all nodes
-            broadcast(maps:keys(Nodes), {leave, self(), Pid, Groups}),
-            {noreply, State#state{monitors = NewMons}}
+        {{MRef, Groups}, NewLocal} ->
+            [leave_local_update_ets(Scope, Group, Pid) || Group <- Groups],
+            %% send update to all scope processes on remote nodes
+            broadcast(maps:keys(Remote), {leave, self(), Pid, Groups}),
+            {noreply, State#state{local = NewLocal}}
 %% handle remote node down or leaving overlay network
-handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, nodes = Nodes} = State)  ->
-    {{MRef, RemoteMap}, NewNodes} = maps:take(Pid, Nodes),
-    maps:foreach(fun (Group, Pids) -> leave_remote(Scope, Pids, [Group]) end, RemoteMap),
-    {noreply, State#state{nodes = NewNodes}};
+handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, remote = Remote} = State)  ->
+    {{MRef, RemoteMap}, NewRemote} = maps:take(Pid, Remote),
+    maps:foreach(fun (Group, Pids) -> leave_remote_update_ets(Scope, Pids, [Group]) end, RemoteMap),
+    {noreply, State#state{remote = NewRemote}};
 %% nodedown: ignore, and wait for 'DOWN' signal for monitored process
 handle_info({nodedown, _Node}, State) ->
@@ -337,7 +337,7 @@ handle_info({nodeup, Node}, #state{scope = Scope} = State) ->
     {noreply, State};
 handle_info(_Info, _State) ->
-    error(badarg).
+    erlang:error(badarg).
 -spec terminate(Reason :: any(), State :: state()) -> true.
 terminate(_Reason, #state{scope = Scope}) ->
@@ -355,31 +355,31 @@ ensure_local(Pids) when is_list(Pids) ->
             (Pid) when is_pid(Pid), node(Pid) =:= node() ->
             (Bad) ->
-                error({nolocal, Bad})
+                erlang:error({nolocal, Bad})
         end, Pids);
 ensure_local(Bad) ->
-    error({nolocal, Bad}).
+    erlang:error({nolocal, Bad}).
 %% Override all knowledge of the remote node with information it sends
 %%  to local node. Current implementation must do the full table scan
 %%  to remove stale pids (just as for 'nodedown').
-handle_sync(Scope, Peer, Nodes, Groups) ->
+handle_sync(Scope, Peer, Remote, Groups) ->
     %% can't use maps:get() because it evaluates 'default' value first,
     %%   and in this case monitor() call has side effect.
     {MRef, RemoteGroups} =
-        case maps:find(Peer, Nodes) of
+        case maps:find(Peer, Remote) of
             error ->
-                {monitor(process, Peer), #{}};
+                {erlang:monitor(process, Peer), #{}};
             {ok, MRef0} ->
     %% sync RemoteMap and transform ETS table
     _ = sync_groups(Scope, RemoteGroups, Groups),
-    Nodes#{Peer => {MRef, maps:from_list(Groups)}}.
+    Remote#{Peer => {MRef, maps:from_list(Groups)}}.
 sync_groups(Scope, RemoteGroups, []) ->
     %% leave all missing groups
-    [leave_remote(Scope, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)];
+    [leave_remote_update_ets(Scope, Pids, [Group]) || {Group, Pids} <- maps:to_list(RemoteGroups)];
 sync_groups(Scope, RemoteGroups, [{Group, Pids} | Tail]) ->
     case maps:take(Group, RemoteGroups) of
         {Pids, NewRemoteGroups} ->
@@ -391,31 +391,31 @@ sync_groups(Scope, RemoteGroups, [{Group, Pids} | Tail]) ->
             true = ets:insert(Scope, {Group, AllNewPids, LocalPids}),
             sync_groups(Scope, NewRemoteGroups, Tail);
         error ->
-            join_remote(Scope, Group, Pids),
+            join_remote_update_ets(Scope, Group, Pids),
             sync_groups(Scope, RemoteGroups, Tail)
-join_monitors(Pid, Group, Monitors) when is_pid(Pid) ->
-    case maps:find(Pid, Monitors) of
+join_local(Pid, Group, Local) when is_pid(Pid) ->
+    case maps:find(Pid, Local) of
         {ok, {MRef, Groups}} ->
-            maps:put(Pid, {MRef, [Group | Groups]}, Monitors);
+            maps:put(Pid, {MRef, [Group | Groups]}, Local);
         error ->
             MRef = erlang:monitor(process, Pid),
-            Monitors#{Pid => {MRef, [Group]}}
+            Local#{Pid => {MRef, [Group]}}
-join_monitors([], _Group, Monitors) ->
-    Monitors;
-join_monitors([Pid | Tail], Group, Monitors) ->
-    join_monitors(Tail, Group, join_monitors(Pid, Group, Monitors)).
+join_local([], _Group, Local) ->
+    Local;
+join_local([Pid | Tail], Group, Local) ->
+    join_local(Tail, Group, join_local(Pid, Group, Local)).
-join_local_group(Scope, Group, Pid) when is_pid(Pid) ->
+join_local_update_ets(Scope, Group, Pid) when is_pid(Pid) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, [Pid | All], [Pid | Local]});
         [] ->
             ets:insert(Scope, {Group, [Pid], [Pid]})
-join_local_group(Scope, Group, Pids) ->
+join_local_update_ets(Scope, Group, Pids) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, Pids ++ All, Pids ++ Local});
@@ -423,14 +423,14 @@ join_local_group(Scope, Group, Pids) ->
             ets:insert(Scope, {Group, Pids, Pids})
-join_remote(Scope, Group, Pid) when is_pid(Pid) ->
+join_remote_update_ets(Scope, Group, Pid) when is_pid(Pid) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, [Pid | All], Local});
         [] ->
             ets:insert(Scope, {Group, [Pid], []})
-join_remote(Scope, Group, Pids) ->
+join_remote_update_ets(Scope, Group, Pids) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             ets:insert(Scope, {Group, Pids ++ All, Local});
@@ -438,32 +438,32 @@ join_remote(Scope, Group, Pids) ->
             ets:insert(Scope, {Group, Pids, []})
-join_remote_map(Group, Pid, RemoteGroups) when is_pid(Pid) ->
+join_remote(Group, Pid, RemoteGroups) when is_pid(Pid) ->
     maps:update_with(Group, fun (List) -> [Pid | List] end, [Pid], RemoteGroups);
-join_remote_map(Group, Pids, RemoteGroups) ->
+join_remote(Group, Pids, RemoteGroups) ->
     maps:update_with(Group, fun (List) -> Pids ++ List end, Pids, RemoteGroups).
-leave_monitors(Pid, Group, Monitors) when is_pid(Pid) ->
-    case maps:find(Pid, Monitors) of
+leave_local(Pid, Group, Local) when is_pid(Pid) ->
+    case maps:find(Pid, Local) of
         {ok, {MRef, [Group]}} ->
-            maps:remove(Pid, Monitors);
+            maps:remove(Pid, Local);
         {ok, {MRef, Groups}} ->
             case lists:member(Group, Groups) of
                 true ->
-                    maps:put(Pid, {MRef, lists:delete(Group, Groups)}, Monitors);
+                    maps:put(Pid, {MRef, lists:delete(Group, Groups)}, Local);
                 false ->
-                    Monitors
+                    Local
         _ ->
-            Monitors
+            Local
-leave_monitors([], _Group, Monitors) ->
-    Monitors;
-leave_monitors([Pid | Tail], Group, Monitors) ->
-    leave_monitors(Tail, Group, leave_monitors(Pid, Group, Monitors)).
+leave_local([], _Group, Local) ->
+    Local;
+leave_local([Pid | Tail], Group, Local) ->
+    leave_local(Tail, Group, leave_local(Pid, Group, Local)).
-leave_local_group(Scope, Group, Pid) when is_pid(Pid) ->
+leave_local_update_ets(Scope, Group, Pid) when is_pid(Pid) ->
     case ets:lookup(Scope, Group) of
         [{Group, [Pid], [Pid]}] ->
             ets:delete(Scope, Group);
@@ -473,7 +473,7 @@ leave_local_group(Scope, Group, Pid) when is_pid(Pid) ->
             %% rare race condition when 'DOWN' from monitor stays in msg queue while process is leave-ing.
-leave_local_group(Scope, Group, Pids) ->
+leave_local_update_ets(Scope, Group, Pids) ->
     case ets:lookup(Scope, Group) of
         [{Group, All, Local}] ->
             case All -- Pids of
@@ -486,7 +486,7 @@ leave_local_group(Scope, Group, Pids) ->
-leave_remote(Scope, Pid, Groups) when is_pid(Pid) ->
+leave_remote_update_ets(Scope, Pid, Groups) when is_pid(Pid) ->
     _ = [
         case ets:lookup(Scope, Group) of
             [{Group, [Pid], []}] ->
@@ -497,7 +497,7 @@ leave_remote(Scope, Pid, Groups) when is_pid(Pid) ->
         end ||
         Group <- Groups];
-leave_remote(Scope, Pids, Groups) ->
+leave_remote_update_ets(Scope, Pids, Groups) ->
     _ = [
         case ets:lookup(Scope, Group) of
             [{Group, All, Local}] ->
@@ -512,9 +512,9 @@ leave_remote(Scope, Pids, Groups) ->
         end ||
         Group <- Groups].
-leave_update_remote_map(Pid, RemoteMap, Groups) when is_pid(Pid) ->
-    leave_update_remote_map([Pid], RemoteMap, Groups);
-leave_update_remote_map(Pids, RemoteMap, Groups) ->
+leave_remote(Pid, RemoteMap, Groups) when is_pid(Pid) ->
+    leave_remote([Pid], RemoteMap, Groups);
+leave_remote(Pids, RemoteMap, Groups) ->
         fun (Group, Acc) ->
             case maps:get(Group, Acc) -- Pids of
@@ -525,20 +525,14 @@ leave_update_remote_map(Pids, RemoteMap, Groups) ->
         end, RemoteMap, Groups).
-all_local_pids(Monitors) ->
+all_local_pids(Local) ->
         fun(Pid, {_Ref, Groups}, Acc) ->
                 fun(Group, Acc1) ->
                     Acc1#{Group => [Pid | maps:get(Group, Acc1, [])]}
-                end,
-                Acc,
-                Groups
-            )
-        end,
-        #{},
-        Monitors
-    )).
+                end, Acc, Groups)
+        end, #{}, Local)).
 %% Works as gen_server:abcast(), but accepts a list of processes
 %%   instead of nodes list.

openSUSE Build Service is sponsored by