File 1331-Guarantee-nodedown-before-nodeup-messages.patch of Package erlang

From 2f1456971af90c94d698f94318e4e26a1157c082 Mon Sep 17 00:00:00 2001
From: Sverker Eriksson <sverker@erlang.org>
Date: Fri, 4 Oct 2019 17:24:55 +0200
Subject: [PATCH 1/7] Guarantee nodedown before nodeup messages

---
 erts/emulator/beam/dist.c                  |  76 +++++++++++++--
 erts/emulator/beam/erl_node_tables.c       |   3 +
 erts/emulator/beam/erl_node_tables.h       |   2 +
 lib/kernel/test/erl_distribution_SUITE.erl | 102 ++++++++++++++++++++-
 4 files changed, 173 insertions(+), 10 deletions(-)

diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index f1cd97c3bf..d37211c742 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -279,6 +279,7 @@ typedef enum {
 
 typedef struct {
     ErtsConMonLnkSeqCleanupState state;
+    DistEntry* dep;
     ErtsMonLnkDist *dist;
     DistSeqNode *seq;
     void *yield_state;
@@ -343,11 +344,30 @@ con_monitor_link_seq_cleanup(void *vcmlcp)
         cmlcp->state = ERTS_CML_CLEANUP_STATE_NODE_MONITORS;
     case ERTS_CML_CLEANUP_STATE_NODE_MONITORS:
         if (cmlcp->trigger_node_monitors) {
+            Process* waiter;
             send_nodes_mon_msgs(NULL,
                                 am_nodedown,
                                 cmlcp->nodename,
                                 cmlcp->visability,
                                 cmlcp->reason);
+            erts_de_rwlock(cmlcp->dep);
+            ASSERT(cmlcp->dep->state == ERTS_DE_STATE_IDLE ||
+                   cmlcp->dep->state == ERTS_DE_STATE_PENDING);
+            ASSERT(cmlcp->dep->pending_nodedown);
+            waiter = cmlcp->dep->suspended_nodeup;
+            cmlcp->dep->suspended_nodeup = NULL;
+            cmlcp->dep->pending_nodedown = 0;
+            erts_de_rwunlock(cmlcp->dep);
+            erts_deref_dist_entry(cmlcp->dep);
+
+            if (waiter) {
+                erts_proc_lock(waiter, ERTS_PROC_LOCK_STATUS);
+                if (!ERTS_PROC_IS_EXITING(waiter)) {
+                    erts_resume(waiter, ERTS_PROC_LOCK_STATUS);
+                }
+                erts_proc_unlock(waiter, ERTS_PROC_LOCK_STATUS);
+                erts_proc_dec_refc(waiter);
+            }
         }
         erts_cleanup_offheap(&cmlcp->oh);
         erts_free(ERTS_ALC_T_CML_CLEANUP, vcmlcp);
@@ -364,7 +384,8 @@ con_monitor_link_seq_cleanup(void *vcmlcp)
 }
 
 static void
-schedule_con_monitor_link_seq_cleanup(ErtsMonLnkDist *dist,
+schedule_con_monitor_link_seq_cleanup(DistEntry* dep,
+                                      ErtsMonLnkDist *dist,
                                       DistSeqNode *seq,
                                       Eterm nodename,
                                       Eterm visability,
@@ -404,7 +425,16 @@ schedule_con_monitor_link_seq_cleanup(ErtsMonLnkDist *dist,
 
         cmlcp->seq = seq;
 
-        cmlcp->trigger_node_monitors = is_value(nodename);
+        if (is_value(nodename)) {
+            ASSERT(dep);
+            cmlcp->trigger_node_monitors = 1;
+            cmlcp->dep = dep;
+            erts_ref_dist_entry(dep);
+        }
+        else {
+            cmlcp->trigger_node_monitors = 0;
+            cmlcp->dep = NULL;
+        }
         cmlcp->nodename = nodename;
         cmlcp->visability = visability;
         if (rsz == 0)
@@ -693,10 +723,11 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
         dep->send = NULL;
 
 	erts_set_dist_entry_not_connected(dep);
-
+        dep->pending_nodedown = 1;
 	erts_de_rwunlock(dep);
 
-        schedule_con_monitor_link_seq_cleanup(mld,
+        schedule_con_monitor_link_seq_cleanup(dep,
+                                              mld,
                                               sequences,
                                               nodename,
                                               (flags & DFLAG_PUBLISHED
@@ -3850,6 +3881,7 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2)
 
 typedef struct {
     DistEntry *dep;
+    int de_locked;
     Uint flags;
     Uint version;
     Eterm setup_pid;
@@ -3944,10 +3976,16 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
 	goto system_limit; /* Should never happen!!! */
 
     if (is_internal_pid(BIF_ARG_2)) {
+        erts_de_rwlock(dep);
+        de_locked = 1;
+        if (dep->pending_nodedown)
+            goto suspend;
+
         if (BIF_P->common.id == BIF_ARG_2) {
             ErtsSetupConnDistCtrl scdc;
 
             scdc.dep = dep;
+            scdc.de_locked = 1;
             scdc.flags = flags;
             scdc.version = version;
             scdc.setup_pid = BIF_P->common.id;
@@ -3956,6 +3994,7 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
             res = setup_connection_distctrl(BIF_P, &scdc, NULL, NULL);
             /* Dec of refc on net_kernel by setup_connection_distctrl() */
             net_kernel = NULL;
+            de_locked = 0;
             BUMP_REDS(BIF_P, 5);
             dep = NULL;
 
@@ -3968,10 +4007,14 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
         else {
             ErtsSetupConnDistCtrl *scdcp;
 
+            erts_de_rwunlock(dep);
+            de_locked = 0;
+
             scdcp = erts_alloc(ERTS_ALC_T_SETUP_CONN_ARG,
                                sizeof(ErtsSetupConnDistCtrl));
 
             scdcp->dep = dep;
+            scdcp->de_locked = 0;
             scdcp->flags = flags;
             scdcp->version = version;
             scdcp->setup_pid = BIF_P->common.id;
@@ -4021,6 +4064,9 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
             || is_not_nil(dep->cid))
             goto badarg;
 
+        if(dep->pending_nodedown)
+            goto suspend;
+
         erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
 
         erts_prtsd_set(pp, ERTS_PRTSD_DIST_ENTRY, dep);
@@ -4084,6 +4130,17 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
 
     return ret;
 
+ suspend:
+     ASSERT(de_locked);
+     ASSERT(!dep->suspended_nodeup);
+     dep->suspended_nodeup = BIF_P;
+     erts_proc_inc_refc(BIF_P);
+     erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
+     ERTS_BIF_PREP_YIELD4(ret,
+                          bif_export[BIF_erts_internal_create_dist_channel_4],
+                          BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4);
+     goto done;
+
  badarg:
     ERTS_BIF_PREP_RET(ret, am_badarg);
     goto done;
@@ -4175,7 +4232,6 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment *
 {
     ErtsSetupConnDistCtrl *scdcp = (ErtsSetupConnDistCtrl *) arg;
     DistEntry *dep = scdcp->dep;
-    int dep_locked = 0;
     Eterm *hp;
     Uint32 conn_id;
     int dec_net_kernel_on_error = !0;
@@ -4186,8 +4242,10 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment *
     if (ERTS_PROC_IS_EXITING(c_p))
         goto badarg;
 
-    erts_de_rwlock(dep);
-    dep_locked = !0;
+    if (!scdcp->de_locked) {
+        erts_de_rwlock(dep);
+        scdcp->de_locked = !0;
+    }
 
     if (dep->state != ERTS_DE_STATE_PENDING)
         goto badarg;
@@ -4241,7 +4299,7 @@ badarg:
     if (bpp) /* not called directly */
         erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg);
 
-    if (dep_locked)
+    if (scdcp->de_locked)
         erts_de_rwunlock(dep);
 
     erts_deref_dist_entry(dep);
@@ -4331,7 +4389,7 @@ Sint erts_abort_pending_connection_rwunlock(DistEntry* dep,
 	erts_de_rwunlock(dep);
 
         schedule_con_monitor_link_seq_cleanup(
-            mld, NULL, THE_NON_VALUE,
+            NULL, mld, NULL, THE_NON_VALUE,
             THE_NON_VALUE, THE_NON_VALUE);
 
         if (resume_procs) {
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index b8c36f4ecd..eb2049f565 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -176,6 +176,8 @@ dist_table_alloc(void *dep_tmpl)
     erts_atomic_init_nob(&dep->input_handler, (erts_aint_t) NIL);
     dep->connection_id			= 0;
     dep->state				= ERTS_DE_STATE_IDLE;
+    dep->pending_nodedown               = 0;
+    dep->suspended_nodeup               = NULL;
     dep->flags				= 0;
     dep->opts                           = 0;
     dep->version			= 0;
@@ -731,6 +733,7 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags)
     ASSERT(dep != erts_this_dist_entry);
     ASSERT(is_nil(dep->cid));
     ASSERT(dep->state == ERTS_DE_STATE_PENDING);
+    ASSERT(!dep->pending_nodedown);
     ASSERT(is_internal_port(cid) || is_internal_pid(cid));
 
     if(dep->prev) {
diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h
index 55250b24cb..d8ad6fbeb8 100644
--- a/erts/emulator/beam/erl_node_tables.h
+++ b/erts/emulator/beam/erl_node_tables.h
@@ -147,6 +147,8 @@ struct dist_entry_ {
                                    NIL == free */
     Uint32 connection_id;	/* Connection id incremented on connect */
     enum dist_entry_state state;
+    int pending_nodedown;
+    Process* suspended_nodeup;
     Uint32 flags;		/* Distribution flags, like hidden, 
 				   atom cache etc. */
     Uint32 opts;
diff --git a/lib/kernel/test/erl_distribution_SUITE.erl b/lib/kernel/test/erl_distribution_SUITE.erl
index 2b84a68c52..200ef244f2 100644
--- a/lib/kernel/test/erl_distribution_SUITE.erl
+++ b/lib/kernel/test/erl_distribution_SUITE.erl
@@ -41,6 +41,7 @@
 	 monitor_nodes_combinations/1,
 	 monitor_nodes_cleanup/1,
 	 monitor_nodes_many/1,
+         monitor_nodes_down_up/1,
          dist_ctrl_proc_smoke/1,
          net_kernel_start/1]).
 
@@ -85,7 +86,8 @@ groups() ->
        monitor_nodes_node_type, monitor_nodes_misc,
        monitor_nodes_otp_6481, monitor_nodes_errors,
        monitor_nodes_combinations, monitor_nodes_cleanup,
-       monitor_nodes_many]}].
+       monitor_nodes_many,
+       monitor_nodes_down_up]}].
 
 init_per_suite(Config) ->
     Config.
@@ -1465,6 +1467,104 @@ monitor_nodes_many(DCfg, _Config) ->
     MonNodeState = monitor_node_state(),
     ok.
 
+%% Test order of messages nodedown and nodeup.
+monitor_nodes_down_up(Config) when is_list(Config) ->
+    [An] = get_nodenames(1, monitor_nodeup),
+    {ok, A} = ct_slave:start(An),
+
+    try
+        monitor_nodes_yoyo(A)
+    after
+        catch ct_slave:stop(A)
+    end.
+
+monitor_nodes_yoyo(A) ->
+    net_kernel:monitor_nodes(true),
+    Papa = self(),
+
+    %% Spawn lots of processes doing one erlang:monitor_node(A,true) each
+    %% just to get lots of other monitors to fire when connection goes down
+    %% and thereby give time for {nodeup,A} to race before {nodedown,A}.
+    NodeMonCnt = 10000,
+    NodeMons = [my_spawn_opt(fun F() ->
+                                     monitor_node = receive_any(),
+                                     monitor_node(A, true),
+                                     Papa ! ready,
+                                     {nodedown, A} =  receive_any(),
+                                     F()
+                             end,
+                             [link, monitor, {priority, low}])
+                ||
+                   _ <- lists:seq(1, NodeMonCnt)],
+
+    %% Spawn message spamming process to trigger new connection setups
+    %% as quick as possible.
+    Spammer = my_spawn_opt(fun F() ->
+                                   {dummy, A} ! trigger_auto_connect,
+                                   F()
+                           end,
+                           [link, monitor]),
+
+    %% Now bring connection down and verify we get {nodedown,A} before {nodeup,A}.
+    Yoyos = 20,
+    [begin
+         [P ! monitor_node || P <- NodeMons],
+         [receive ready -> ok end || _ <- NodeMons],
+
+         {Owner,_ConnId} = get_node_owner(A),
+         exit(Owner, kill),
+
+         {nodedown, A} = receive_any(),
+         {nodeup, A} = receive_any()
+     end
+     || _ <- lists:seq(1,Yoyos)],
+
+    unlink(Spammer),
+    exit(Spammer, die),
+    receive {'DOWN',_,process,Spammer,_} -> ok end,
+
+    [begin unlink(P), exit(P, die) end || P <- NodeMons],
+    [receive {'DOWN',_,process,P,_} -> ok end || P <- NodeMons],
+
+    net_kernel:monitor_nodes(false),
+    ok.
+
+receive_any() ->
+    receive_any(infinity).
+
+receive_any(Timeout) ->
+    receive
+        M -> M
+    after
+        Timeout -> timeout
+    end.
+
+my_spawn_opt(Fun, Opts) ->
+    case spawn_opt(Fun, Opts) of
+        {Pid, _Mref} -> Pid;
+        Pid -> Pid
+    end.
+
+-record(connection, {
+		     node,          %% remote node name
+                     conn_id,       %% Connection identity
+		     state,         %% pending | up | up_pending
+		     owner,         %% owner pid
+	             pending_owner, %% possible new owner
+		     address,       %% #net_address
+		     waiting = [],  %% queued processes
+		     type           %% normal | hidden
+		    }).
+
+get_node_owner(Node) ->
+    case ets:lookup(sys_dist, Node) of
+        [#connection{owner = Owner, conn_id = ConnId}] ->
+            {Owner, ConnId};
+        _ ->
+            error
+    end.
+
+
 dist_ctrl_proc_smoke(Config) when is_list(Config) ->
     ThisNode = node(),
     [Name1, Name2] = get_nodenames(2, dist_ctrl_proc_example_smoke),
-- 
2.35.3

openSUSE Build Service is sponsored by