File 0229-kernel-Prepare-pg-for-protocol-upgrade.patch of Package erlang
From 8e8d4adcd4f2356697db958f4707d198591134a3 Mon Sep 17 00:00:00 2001
From: Sverker Eriksson <sverker@erlang.org>
Date: Fri, 4 Nov 2022 17:33:37 +0100
Subject: [PATCH 3/3] kernel: Prepare pg for protocol upgrade
---
lib/kernel/src/pg.erl | 34 +++++++++++++++++++++++-----------
lib/kernel/test/pg_SUITE.erl | 29 +++++++++++++++++++++++++++--
2 files changed, 50 insertions(+), 13 deletions(-)
diff --git a/lib/kernel/src/pg.erl b/lib/kernel/src/pg.erl
index d598511c16..1f608d9362 100644
--- a/lib/kernel/src/pg.erl
+++ b/lib/kernel/src/pg.erl
@@ -351,6 +351,7 @@ handle_cast(_, _State) ->
-spec handle_info(
{discover, Peer :: pid()} |
+ {discover, Peer :: pid(), any()} |
{join, Peer :: pid(), group(), pid() | [pid()]} |
{leave, Peer :: pid(), pid() | [pid()], [group()]} |
{'DOWN', reference(), process, pid(), term()} |
@@ -392,17 +393,13 @@ handle_info({leave, Peer, PidOrPids, Groups}, #state{scope = Scope, remote = Rem
end;
%% we're being discovered, let's exchange!
-handle_info({discover, Peer}, #state{remote = Remote, local = Local} = State) ->
- gen_server:cast(Peer, {sync, self(), all_local_pids(Local)}),
- %% do we know who is looking for us?
- case maps:is_key(Peer, Remote) of
- true ->
- {noreply, State};
- false ->
- MRef = erlang:monitor(process, Peer),
- erlang:send(Peer, {discover, self()}, [noconnect]),
- {noreply, State#state{remote = Remote#{Peer => {MRef, #{}}}}}
- end;
+handle_info({discover, Peer}, State) ->
+ handle_discover(Peer, State);
+
+%% New discover message sent by a future pg version.
+%% Accepted first in OTP 26, to be used by OTP 28 or later.
+handle_info({discover, Peer, _ProtocolVersion}, State) ->
+ handle_discover(Peer, State);
%% handle local process exit, or a local monitor exit
handle_info({'DOWN', MRef, process, Pid, _Info}, #state{scope = Scope, local = Local,
@@ -450,6 +447,21 @@ terminate(_Reason, #state{scope = Scope}) ->
%%--------------------------------------------------------------------
%% Internal implementation
+handle_discover(Peer, #state{remote = Remote, local = Local} = State) ->
+ gen_server:cast(Peer, {sync, self(), all_local_pids(Local)}),
+ %% do we know who is looking for us?
+ case maps:is_key(Peer, Remote) of
+ true ->
+ {noreply, State};
+ false ->
+ MRef = erlang:monitor(process, Peer),
+ erlang:send(Peer, {discover, self()}, [noconnect]),
+ {noreply, State#state{remote = Remote#{Peer => {MRef, #{}}}}}
+ end;
+handle_discover(_, _) ->
+ erlang:error(badarg).
+
+
%% Ensures argument is either a node-local pid or a list of such, or it throws an error
ensure_local(Pid) when is_pid(Pid), node(Pid) =:= node() ->
ok;
diff --git a/lib/kernel/test/pg_SUITE.erl b/lib/kernel/test/pg_SUITE.erl
index 423fc42a5a..a7e0e8a18a 100644
--- a/lib/kernel/test/pg_SUITE.erl
+++ b/lib/kernel/test/pg_SUITE.erl
@@ -57,7 +57,8 @@
group_leave/1,
monitor_nonempty_scope/0, monitor_nonempty_scope/1,
monitor_scope/0, monitor_scope/1,
- monitor/1
+ monitor/1,
+ protocol_upgrade/1
]).
-include_lib("common_test/include/ct.hrl").
@@ -79,7 +80,8 @@ all() ->
groups() ->
[
- {basic, [parallel], [errors, pg, leave_exit_race, single, overlay_missing]},
+ {basic, [parallel], [errors, pg, leave_exit_race, single, overlay_missing,
+ protocol_upgrade]},
{performance, [], [thundering_herd]},
{cluster, [parallel], [process_owner_check, two, initial, netsplit, trisplit, foursplit,
exchange, nolocal, double, scope_restart, missing_scope_join, empty_group_by_remote_leave,
@@ -704,9 +706,32 @@ second_monitor(Msgs) ->
second_monitor([Msg | Msgs])
end.
+protocol_upgrade(Config) when is_list(Config) ->
+ Scope = ?FUNCTION_NAME,
+ Group = ?FUNCTION_NAME,
+ {Peer, Node} = spawn_node(Scope),
+ PgPid = rpc:call(Node, erlang, whereis, [Scope]),
+
+ RemotePid = erlang:spawn(Node, forever()),
+ ok = rpc:call(Node, pg, join, [Scope, Group, RemotePid]),
+
+ %% OTP 26:
+ %% Just do a white-box test and verify that pg accepts
+ %% a "future" discover message and replies with a sync.
+ PgPid ! {discover, self(), "Protocol version (ignore me)"},
+ {'$gen_cast', {sync, PgPid, [{Group, [RemotePid]}]}} = receive_any(),
+
+ %% stop the peer
+ peer:stop(Peer),
+ ok.
+
+
%%--------------------------------------------------------------------
%% Test Helpers - start/stop additional Erlang nodes
+receive_any() ->
+ receive M -> M end.
+
%% flushes GS (GenServer) queue, ensuring that all prior
%% messages have been processed
sync(GS) ->
--
2.35.3