File 1331-Guarantee-nodedown-before-nodeup-messages.patch of Package erlang
From 2f1456971af90c94d698f94318e4e26a1157c082 Mon Sep 17 00:00:00 2001
From: Sverker Eriksson <sverker@erlang.org>
Date: Fri, 4 Oct 2019 17:24:55 +0200
Subject: [PATCH 1/7] Guarantee nodedown before nodeup messages
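
A lost connection schedules its nodedown notifications asynchronously.
If a new connection to the same node was set up before that cleanup had
run, subscribers of net_kernel:monitor_nodes/1 could receive
{nodeup, Node} before the {nodedown, Node} of the old connection.

Fix: mark the dist entry with pending_nodedown when the connection is
taken down, and suspend any process attempting to create a new
distribution channel for that node until the scheduled cleanup has sent
the nodedown messages. The cleanup then clears the flag and resumes the
suspended process.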
---
erts/emulator/beam/dist.c | 76 +++++++++++++--
erts/emulator/beam/erl_node_tables.c | 3 +
erts/emulator/beam/erl_node_tables.h | 2 +
lib/kernel/test/erl_distribution_SUITE.erl | 102 ++++++++++++++++++++-
4 files changed, 173 insertions(+), 10 deletions(-)
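
Illustration only, not part of the patch (module and function names are
hypothetical): a minimal receiver that relies on the ordering this
commit guarantees when net_kernel:monitor_nodes(true) is active.

    -module(nodedown_order).
    -export([await_bounce/1]).

    %% Hypothetical sketch: after a connection to Node bounces, the
    %% nodedown of the old connection must now arrive strictly before
    %% the nodeup of the new one, so matching in this order cannot hang.
    await_bounce(Node) ->
        receive {nodedown, Node} -> ok end,  %% old connection reported down
        receive {nodeup, Node} -> ok end.    %% new connection reported up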
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index f1cd97c3bf..d37211c742 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -279,6 +279,7 @@ typedef enum {
typedef struct {
ErtsConMonLnkSeqCleanupState state;
+ DistEntry* dep;
ErtsMonLnkDist *dist;
DistSeqNode *seq;
void *yield_state;
@@ -343,11 +344,30 @@ con_monitor_link_seq_cleanup(void *vcmlcp)
cmlcp->state = ERTS_CML_CLEANUP_STATE_NODE_MONITORS;
case ERTS_CML_CLEANUP_STATE_NODE_MONITORS:
if (cmlcp->trigger_node_monitors) {
+ Process* waiter;
send_nodes_mon_msgs(NULL,
am_nodedown,
cmlcp->nodename,
cmlcp->visability,
cmlcp->reason);
+ erts_de_rwlock(cmlcp->dep);
+ ASSERT(cmlcp->dep->state == ERTS_DE_STATE_IDLE ||
+ cmlcp->dep->state == ERTS_DE_STATE_PENDING);
+ ASSERT(cmlcp->dep->pending_nodedown);
+ waiter = cmlcp->dep->suspended_nodeup;
+ cmlcp->dep->suspended_nodeup = NULL;
+ cmlcp->dep->pending_nodedown = 0;
+ erts_de_rwunlock(cmlcp->dep);
+ erts_deref_dist_entry(cmlcp->dep);
+
+ if (waiter) {
+ erts_proc_lock(waiter, ERTS_PROC_LOCK_STATUS);
+ if (!ERTS_PROC_IS_EXITING(waiter)) {
+ erts_resume(waiter, ERTS_PROC_LOCK_STATUS);
+ }
+ erts_proc_unlock(waiter, ERTS_PROC_LOCK_STATUS);
+ erts_proc_dec_refc(waiter);
+ }
}
erts_cleanup_offheap(&cmlcp->oh);
erts_free(ERTS_ALC_T_CML_CLEANUP, vcmlcp);
@@ -364,7 +384,8 @@ con_monitor_link_seq_cleanup(void *vcmlcp)
}
static void
-schedule_con_monitor_link_seq_cleanup(ErtsMonLnkDist *dist,
+schedule_con_monitor_link_seq_cleanup(DistEntry* dep,
+ ErtsMonLnkDist *dist,
DistSeqNode *seq,
Eterm nodename,
Eterm visability,
@@ -404,7 +425,16 @@ schedule_con_monitor_link_seq_cleanup(ErtsMonLnkDist *dist,
cmlcp->seq = seq;
- cmlcp->trigger_node_monitors = is_value(nodename);
+ if (is_value(nodename)) {
+ ASSERT(dep);
+ cmlcp->trigger_node_monitors = 1;
+ cmlcp->dep = dep;
+ erts_ref_dist_entry(dep);
+ }
+ else {
+ cmlcp->trigger_node_monitors = 0;
+ cmlcp->dep = NULL;
+ }
cmlcp->nodename = nodename;
cmlcp->visability = visability;
if (rsz == 0)
@@ -693,10 +723,11 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
dep->send = NULL;
erts_set_dist_entry_not_connected(dep);
-
+ dep->pending_nodedown = 1;
erts_de_rwunlock(dep);
- schedule_con_monitor_link_seq_cleanup(mld,
+ schedule_con_monitor_link_seq_cleanup(dep,
+ mld,
sequences,
nodename,
(flags & DFLAG_PUBLISHED
@@ -3850,6 +3881,7 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2)
typedef struct {
DistEntry *dep;
+ int de_locked;
Uint flags;
Uint version;
Eterm setup_pid;
@@ -3944,10 +3976,16 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
goto system_limit; /* Should never happen!!! */
if (is_internal_pid(BIF_ARG_2)) {
+ erts_de_rwlock(dep);
+ de_locked = 1;
+ if (dep->pending_nodedown)
+ goto suspend;
+
if (BIF_P->common.id == BIF_ARG_2) {
ErtsSetupConnDistCtrl scdc;
scdc.dep = dep;
+ scdc.de_locked = 1;
scdc.flags = flags;
scdc.version = version;
scdc.setup_pid = BIF_P->common.id;
@@ -3956,6 +3994,7 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
res = setup_connection_distctrl(BIF_P, &scdc, NULL, NULL);
/* Dec of refc on net_kernel by setup_connection_distctrl() */
net_kernel = NULL;
+ de_locked = 0;
BUMP_REDS(BIF_P, 5);
dep = NULL;
@@ -3968,10 +4007,14 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
else {
ErtsSetupConnDistCtrl *scdcp;
+ erts_de_rwunlock(dep);
+ de_locked = 0;
+
scdcp = erts_alloc(ERTS_ALC_T_SETUP_CONN_ARG,
sizeof(ErtsSetupConnDistCtrl));
scdcp->dep = dep;
+ scdcp->de_locked = 0;
scdcp->flags = flags;
scdcp->version = version;
scdcp->setup_pid = BIF_P->common.id;
@@ -4021,6 +4064,9 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
|| is_not_nil(dep->cid))
goto badarg;
+ if (dep->pending_nodedown)
+ goto suspend;
+
erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
erts_prtsd_set(pp, ERTS_PRTSD_DIST_ENTRY, dep);
@@ -4084,6 +4130,17 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
return ret;
+ suspend:
+ ASSERT(de_locked);
+ ASSERT(!dep->suspended_nodeup);
+ dep->suspended_nodeup = BIF_P;
+ erts_proc_inc_refc(BIF_P);
+ erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
+ ERTS_BIF_PREP_YIELD4(ret,
+ bif_export[BIF_erts_internal_create_dist_channel_4],
+ BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4);
+ goto done;
+
badarg:
ERTS_BIF_PREP_RET(ret, am_badarg);
goto done;
@@ -4175,7 +4232,6 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment *
{
ErtsSetupConnDistCtrl *scdcp = (ErtsSetupConnDistCtrl *) arg;
DistEntry *dep = scdcp->dep;
- int dep_locked = 0;
Eterm *hp;
Uint32 conn_id;
int dec_net_kernel_on_error = !0;
@@ -4186,8 +4242,10 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment *
if (ERTS_PROC_IS_EXITING(c_p))
goto badarg;
- erts_de_rwlock(dep);
- dep_locked = !0;
+ if (!scdcp->de_locked) {
+ erts_de_rwlock(dep);
+ scdcp->de_locked = !0;
+ }
if (dep->state != ERTS_DE_STATE_PENDING)
goto badarg;
@@ -4241,7 +4299,7 @@ badarg:
if (bpp) /* not called directly */
erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg);
- if (dep_locked)
+ if (scdcp->de_locked)
erts_de_rwunlock(dep);
erts_deref_dist_entry(dep);
@@ -4331,7 +4389,7 @@ Sint erts_abort_pending_connection_rwunlock(DistEntry* dep,
erts_de_rwunlock(dep);
schedule_con_monitor_link_seq_cleanup(
- mld, NULL, THE_NON_VALUE,
+ NULL, mld, NULL, THE_NON_VALUE,
THE_NON_VALUE, THE_NON_VALUE);
if (resume_procs) {
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index b8c36f4ecd..eb2049f565 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -176,6 +176,8 @@ dist_table_alloc(void *dep_tmpl)
erts_atomic_init_nob(&dep->input_handler, (erts_aint_t) NIL);
dep->connection_id = 0;
dep->state = ERTS_DE_STATE_IDLE;
+ dep->pending_nodedown = 0;
+ dep->suspended_nodeup = NULL;
dep->flags = 0;
dep->opts = 0;
dep->version = 0;
@@ -731,6 +733,7 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags)
ASSERT(dep != erts_this_dist_entry);
ASSERT(is_nil(dep->cid));
ASSERT(dep->state == ERTS_DE_STATE_PENDING);
+ ASSERT(!dep->pending_nodedown);
ASSERT(is_internal_port(cid) || is_internal_pid(cid));
if(dep->prev) {
diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h
index 55250b24cb..d8ad6fbeb8 100644
--- a/erts/emulator/beam/erl_node_tables.h
+++ b/erts/emulator/beam/erl_node_tables.h
@@ -147,6 +147,8 @@ struct dist_entry_ {
NIL == free */
Uint32 connection_id; /* Connection id incremented on connect */
enum dist_entry_state state;
+ int pending_nodedown;
+ Process* suspended_nodeup;
Uint32 flags; /* Distribution flags, like hidden,
atom cache etc. */
Uint32 opts;
diff --git a/lib/kernel/test/erl_distribution_SUITE.erl b/lib/kernel/test/erl_distribution_SUITE.erl
index 2b84a68c52..200ef244f2 100644
--- a/lib/kernel/test/erl_distribution_SUITE.erl
+++ b/lib/kernel/test/erl_distribution_SUITE.erl
@@ -41,6 +41,7 @@
monitor_nodes_combinations/1,
monitor_nodes_cleanup/1,
monitor_nodes_many/1,
+ monitor_nodes_down_up/1,
dist_ctrl_proc_smoke/1,
net_kernel_start/1]).
@@ -85,7 +86,8 @@ groups() ->
monitor_nodes_node_type, monitor_nodes_misc,
monitor_nodes_otp_6481, monitor_nodes_errors,
monitor_nodes_combinations, monitor_nodes_cleanup,
- monitor_nodes_many]}].
+ monitor_nodes_many,
+ monitor_nodes_down_up]}].
init_per_suite(Config) ->
Config.
@@ -1465,6 +1467,104 @@ monitor_nodes_many(DCfg, _Config) ->
MonNodeState = monitor_node_state(),
ok.
+%% Test that nodedown is always delivered before nodeup for the same node.
+monitor_nodes_down_up(Config) when is_list(Config) ->
+ [An] = get_nodenames(1, monitor_nodeup),
+ {ok, A} = ct_slave:start(An),
+
+ try
+ monitor_nodes_yoyo(A)
+ after
+ catch ct_slave:stop(A)
+ end.
+
+monitor_nodes_yoyo(A) ->
+ net_kernel:monitor_nodes(true),
+ Papa = self(),
+
+ %% Spawn lots of processes doing one erlang:monitor_node(A,true) each
+ %% just to get lots of other monitors to fire when the connection goes down
+ %% and thereby give {nodeup,A} time to race ahead of {nodedown,A}.
+ NodeMonCnt = 10000,
+ NodeMons = [my_spawn_opt(fun F() ->
+ monitor_node = receive_any(),
+ monitor_node(A, true),
+ Papa ! ready,
+ {nodedown, A} = receive_any(),
+ F()
+ end,
+ [link, monitor, {priority, low}])
+ ||
+ _ <- lists:seq(1, NodeMonCnt)],
+
+ %% Spawn message spamming process to trigger new connection setups
+ %% as quickly as possible.
+ Spammer = my_spawn_opt(fun F() ->
+ {dummy, A} ! trigger_auto_connect,
+ F()
+ end,
+ [link, monitor]),
+
+ %% Now bring connection down and verify we get {nodedown,A} before {nodeup,A}.
+ Yoyos = 20,
+ [begin
+ [P ! monitor_node || P <- NodeMons],
+ [receive ready -> ok end || _ <- NodeMons],
+
+ {Owner,_ConnId} = get_node_owner(A),
+ exit(Owner, kill),
+
+ {nodedown, A} = receive_any(),
+ {nodeup, A} = receive_any()
+ end
+ || _ <- lists:seq(1,Yoyos)],
+
+ unlink(Spammer),
+ exit(Spammer, die),
+ receive {'DOWN',_,process,Spammer,_} -> ok end,
+
+ [begin unlink(P), exit(P, die) end || P <- NodeMons],
+ [receive {'DOWN',_,process,P,_} -> ok end || P <- NodeMons],
+
+ net_kernel:monitor_nodes(false),
+ ok.
+
+receive_any() ->
+ receive_any(infinity).
+
+receive_any(Timeout) ->
+ receive
+ M -> M
+ after
+ Timeout -> timeout
+ end.
+
+my_spawn_opt(Fun, Opts) ->
+ case spawn_opt(Fun, Opts) of
+ {Pid, _Mref} -> Pid;
+ Pid -> Pid
+ end.
+
+-record(connection, {
+ node, %% remote node name
+ conn_id, %% Connection identity
+ state, %% pending | up | up_pending
+ owner, %% owner pid
+ pending_owner, %% possible new owner
+ address, %% #net_address
+ waiting = [], %% queued processes
+ type %% normal | hidden
+ }).
+
+get_node_owner(Node) ->
+ case ets:lookup(sys_dist, Node) of
+ [#connection{owner = Owner, conn_id = ConnId}] ->
+ {Owner, ConnId};
+ _ ->
+ error
+ end.
+
+
dist_ctrl_proc_smoke(Config) when is_list(Config) ->
ThisNode = node(),
[Name1, Name2] = get_nodenames(2, dist_ctrl_proc_example_smoke),
--
2.35.3