File 6401-erts-Support-for-truly-asynchronous-distributed-sign.patch of Package erlang

From 349b244f2e63eda1579891de027e96c6dfedfd55 Mon Sep 17 00:00:00 2001
From: Rickard Green <rickard@erlang.org>
Date: Fri, 16 Dec 2022 18:36:39 +0100
Subject: [PATCH] [erts] Support for truly asynchronous distributed signaling

---
 erts/doc/src/erl_cmd.xml                  |  22 +
 erts/doc/src/erlang.xml                   | 124 +++++-
 erts/emulator/beam/atom.names             |   1 +
 erts/emulator/beam/bif.c                  |  16 +-
 erts/emulator/beam/dist.c                 | 471 +++++++++++++++-------
 erts/emulator/beam/dist.h                 |   1 +
 erts/emulator/beam/erl_bif_info.c         |  14 +-
 erts/emulator/beam/erl_init.c             |  26 +-
 erts/emulator/beam/erl_node_tables.c      |   7 +-
 erts/emulator/beam/erl_node_tables.h      |  13 +-
 erts/emulator/beam/erl_process.c          |  19 +
 erts/emulator/beam/erl_process.h          |   7 +-
 erts/emulator/test/distribution_SUITE.erl | 280 ++++++++++++-
 erts/etc/common/erlexec.c                 |   4 +-
 erts/preloaded/ebin/erlang.beam           | Bin 132536 -> 132640 bytes
 erts/preloaded/src/erlang.erl             |  11 +-
 16 files changed, 829 insertions(+), 187 deletions(-)

diff --git a/erts/doc/src/erl_cmd.xml b/erts/doc/src/erl_cmd.xml
index 9f764d2b4e..6c4919413d 100644
--- a/erts/doc/src/erl_cmd.xml
+++ b/erts/doc/src/erl_cmd.xml
@@ -1036,6 +1036,23 @@ $ <input>erl \
         <p>Memory allocator-specific flags. For more information, see
           <seecref marker="erts_alloc"><c>erts_alloc(3)</c></seecref>.</p>
       </item>
+      <tag><marker id="+pad"/><c>+pad true|false</c></tag>
+      <item>
+        <p>Since: OTP 25.3</p>
+        <p>
+          The boolean value used with the <c>+pad</c> parameter determines
+          the default value of the
+          <seeerl marker="erlang#process_flag_async_dist">
+            <c>async_dist</c></seeerl> process flag of newly spawned processes.
+          By default, if no <c>+pad</c> command line option is passed,
+          the <c>async_dist</c> flag will be set to <c>false</c>.
+        </p>
+        <p>
+          The value used at runtime can be inspected by calling
+          <seeerl marker="erlang#system_info_async_dist">
+            <c>erlang:system_info(async_dist)</c></seeerl>.
+        </p>
+      </item>
       <tag><marker id="+pc"/><marker id="printable_character_range"/>
         <c><![CDATA[+pc Range]]></c></tag>
       <item>
@@ -1781,6 +1798,11 @@ $ <input>erl \
               limit is per distribution channel. A higher limit
               gives lower latency and higher throughput at the expense
               of higher memory use.</p>
+            <p>
+              This limit only affects processes that have disabled
+              <seeerl marker="erlang#process_flag_async_dist"><i>fully
+              asynchronous distributed signaling</i></seeerl>.
+            </p>
           </item>
           <tag><marker id="+zdntgc"/><c>+zdntgc time</c></tag>
           <item>
diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml
index 34fb78e323..ceb85415c1 100644
--- a/erts/doc/src/erlang.xml
+++ b/erts/doc/src/erlang.xml
@@ -6154,6 +6154,68 @@ receive_replies(ReqId, N, Acc) ->
 
     <func>
       <name name="process_flag" arity="2" clause_i="1"
+	    anchor="process_flag_async_dist" since="OTP 25.3"/>
+      <fsummary>
+        Enable or disable fully asynchronous distributed signaling
+        for the calling process.
+      </fsummary>
+      <desc>
+        <p>
+          Enable or disable <i>fully asynchronous distributed signaling</i>
+          for the calling process. When disabled, which is the default, the
+          process sending a distributed signal will block in the send
+          operation if the buffer for the distribution channel reaches the
+          <seecom marker="erts:erl#+zdbbl">distribution buffer busy
+          limit</seecom>. The process will remain blocked until the buffer
+          shrinks enough. This might in some cases take a substantial amount
+          of time. When <c>async_dist</c> is enabled, send operations of
+          distributed signals will always buffer the signal on the outgoing
+          distribution channel and then immediately return. That is, these
+          send operations will <em>never</em> block the sending process.
+        </p>
+        <note><p>
+          Since no flow control is enforced by the runtime system when
+          the <c>async_dist</c> process flag is enabled, you need to make sure
+          that flow control for such data is implemented, or that the amount
+          of such data is known to always be limited. Unlimited signaling with
+          <c>async_dist</c> enabled in the absence of flow control will
+          typically cause the sending runtime system to crash on an out of
+          memory condition.
+        </p></note>
+        <p>
+          Blocking due to disabled <c>async_dist</c> can be monitored by
+          <seemfa marker="#system_monitor/2"><c>erlang:system_monitor()</c></seemfa>
+          using the
+          <seeerl marker="#busy_dist_port"><c>busy_dist_port</c></seeerl>
+          option. Only data buffered by processes which (at the time of sending
+          a signal) have disabled <c>async_dist</c> will be counted when
+          determining whether or not an operation should block the caller.
+        </p>
+        <p>
+          The <c>async_dist</c> flag can also be set on a new process when
+          spawning it using the
+          <seemfa marker="#spawn_opt/4"><c>spawn_opt()</c></seemfa> BIF with the
+          option <seeerl marker="#spawn_opt_async_dist"><c>{async_dist,
+          Enable}</c></seeerl>. The default <c>async_dist</c> flag to use on
+          newly spawned processes can be set by passing the command line
+          argument <seecom marker="erl#+pad"><c>+pad
+          &lt;boolean&gt;</c></seecom> when starting the runtime system. If the
+          <c>+pad &lt;boolean&gt;</c> command line argument is not passed, the
+          default value of the <c>async_dist</c> flag will be <c>false</c>.
+        </p>
+        <p>
+          You can inspect the state of the <c>async_dist</c> process flag of a
+          process by calling <seeerl marker="#process_info_async_dist">
+          <c>process_info(Pid, async_dist)</c></seeerl>.
+        </p>
+        <p>
+          Returns the old value of the <c>async_dist</c> flag.
+        </p>
+      </desc>
+    </func>
+
+    <func>
+      <name name="process_flag" arity="2" clause_i="2"
 	    anchor="process_flag_trap_exit" since=""/>
       <fsummary>Set process flag trap_exit for the calling process.</fsummary>
       <desc>
@@ -6171,7 +6233,7 @@ receive_replies(ReqId, N, Acc) ->
     </func>
 
     <func>
-      <name name="process_flag" arity="2" clause_i="2" since=""/>
+      <name name="process_flag" arity="2" clause_i="3" since=""/>
       <fsummary>Set process flag error_handler for the calling process.
       </fsummary>
       <desc>
@@ -6185,7 +6247,7 @@ receive_replies(ReqId, N, Acc) ->
     </func>
 
     <func>
-      <name name="process_flag" arity="2" clause_i="3" since="OTP 24.0"/>
+      <name name="process_flag" arity="2" clause_i="4" since="OTP 24.0"/>
       <fsummary>Set process flag fullsweep_after for the calling process.
       </fsummary>
       <desc>
@@ -6196,7 +6258,7 @@ receive_replies(ReqId, N, Acc) ->
     </func>
 
     <func>
-      <name name="process_flag" arity="2" clause_i="4"
+      <name name="process_flag" arity="2" clause_i="5"
 	    anchor="process_flag_min_heap_size" since=""/>
       <fsummary>Set process flag min_heap_size for the calling process.
       </fsummary>
@@ -6207,7 +6269,7 @@ receive_replies(ReqId, N, Acc) ->
     </func>
 
     <func>
-      <name name="process_flag" arity="2" clause_i="5" since="OTP R13B04"/>
+      <name name="process_flag" arity="2" clause_i="6" since="OTP R13B04"/>
       <fsummary>Set process flag min_bin_vheap_size for the calling process.
       </fsummary>
       <desc>
@@ -6218,7 +6280,7 @@ receive_replies(ReqId, N, Acc) ->
     </func>
 
     <func>
-      <name name="process_flag" arity="2" clause_i="6"
+      <name name="process_flag" arity="2" clause_i="7"
 	    anchor="process_flag_max_heap_size" since="OTP 19.0"/>
       <fsummary>Set process flag max_heap_size for the calling process.
       </fsummary>
@@ -6297,7 +6359,7 @@ receive_replies(ReqId, N, Acc) ->
     </func>
 
     <func>
-      <name name="process_flag" arity="2" clause_i="7"
+      <name name="process_flag" arity="2" clause_i="8"
 	    anchor="process_flag_message_queue_data" since="OTP 19.0"/>
       <fsummary>Set process flag message_queue_data for the calling process.
       </fsummary>
@@ -6339,7 +6401,7 @@ receive_replies(ReqId, N, Acc) ->
     </func>
 
     <func>
-      <name name="process_flag" arity="2" clause_i="8"
+      <name name="process_flag" arity="2" clause_i="9"
 	    anchor="process_flag_priority" since=""/>
       <fsummary>Set process flag priority for the calling process.</fsummary>
       <type name="priority_level"/>
@@ -6412,7 +6474,7 @@ receive_replies(ReqId, N, Acc) ->
     </func>
 
     <func>
-      <name name="process_flag" arity="2" clause_i="9" since=""/>
+      <name name="process_flag" arity="2" clause_i="10" since=""/>
       <fsummary>Set process flag save_calls for the calling process.</fsummary>
       <desc>
         <p><c><anno>N</anno></c> must be an integer in the interval 0..10000.
@@ -6443,7 +6505,7 @@ receive_replies(ReqId, N, Acc) ->
     </func>
 
     <func>
-      <name name="process_flag" arity="2" clause_i="10" since=""/>
+      <name name="process_flag" arity="2" clause_i="11" since=""/>
       <fsummary>Set process flag sensitive for the calling process.</fsummary>
       <desc>
         <p>Sets or clears flag <c>sensitive</c> for the current process.
@@ -6588,6 +6650,18 @@ receive_replies(ReqId, N, Acc) ->
         <p>Valid <c><anno>InfoTuple</anno></c>s with corresponding
           <c><anno>Item</anno></c>s:</p>
         <taglist>
+          <tag>
+            <marker id="process_info_async_dist"/>
+            <c>{async_dist, Enabled}</c>
+          </tag>
+          <item>
+            <p>Since: OTP 25.3</p>
+            <p>
+              Current value of the
+              <seeerl marker="erlang#process_flag_async_dist">
+                <c>async_dist</c></seeerl> process flag.
+            </p>
+          </item>
           <tag><c>{backtrace, <anno>Bin</anno>}</c></tag>
           <item>
             <p>Binary <c><anno>Bin</anno></c> contains the same information
@@ -7831,6 +7905,21 @@ true</pre>
               <c>process_flag(message_queue_data,
               <anno>MQD</anno>)</c></seeerl>.</p>
           </item>
+          <tag>
+            <marker id="spawn_opt_async_dist"/>
+            <c>{async_dist, Enabled}</c>
+          </tag>
+          <item>
+            <p>Since: OTP 25.3</p>
+            <p>
+              Set the
+              <seeerl marker="erlang#process_flag_async_dist">
+                <c>async_dist</c></seeerl> process flag of the spawned process.
+              This option will override the default value set by the command
+              line argument
+              <seecom marker="erl#+pad"><c>+pad &lt;boolean&gt;</c></seecom>.
+            </p>
+          </item>
         </taglist>
       </desc>
     </func>
@@ -10861,6 +10950,8 @@ Metadata = #{ pid => pid(),
     </func>
 
     <func>
+      <name name="system_info" arity="1" clause_i="79"
+	    anchor="system_info_async_dist" since="OTP 25.3"/>  <!-- async_dist -->
       <name name="system_info" arity="1" clause_i="14"
 	    anchor="system_info_dist" since=""/>  <!-- creation -->
       <name name="system_info" arity="1" clause_i="16" since="OTP 18.0"/>  <!-- delayed_node_table_gc -->
@@ -10873,6 +10964,17 @@ Metadata = #{ pid => pid(),
         <p>Returns information about Erlang Distribution in the
         current system as specified by <c><anno>Item</anno></c>:</p>
         <taglist>
+          <tag><marker id="system_info_async_dist"/><c>async_dist</c></tag>
+          <item>
+            <p>Since: OTP 25.3</p>
+            <p>
+              Returns the value of the command line argument
+              <seecom marker="erl#+pad">+pad &lt;boolean&gt;</seecom>
+              which the runtime system uses. This value determines the default
+              <seeerl marker="erlang#process_flag_async_dist">
+                <c>async_dist</c></seeerl> value for newly spawned processes.
+            </p>
+          </item>
           <tag><marker id="system_info_creation"/>
             <c>creation</c></tag>
           <item>
@@ -11011,7 +11113,7 @@ Metadata = #{ pid => pid(),
       <!-- <name name="system_info" arity="1" clause_i="76"/>  update_cpu_info -->
       <name name="system_info" arity="1" clause_i="77" since=""/>  <!-- version -->
       <name name="system_info" arity="1" clause_i="78" since=""/>  <!-- wordsize -->
-      <!-- <name name="system_info" arity="1" clause_i="79"/>  overview -->
+      <!-- <name name="system_info" arity="1" clause_i="80"/>  overview -->
       <!--    When adding any entry, make sure to update the overview clause_i -->
       <fsummary>Information about the system.</fsummary>
       <desc>
@@ -11379,7 +11481,7 @@ Metadata = #{ pid => pid(),
               <c><anno>MonitorPid</anno></c>. <c>SusPid</c> is the pid
               that got suspended when sending to <c>Port</c>.</p>
           </item>
-          <tag><c>busy_dist_port</c></tag>
+          <tag><c>busy_dist_port</c><marker id="busy_dist_port"/></tag>
           <item>
             <p>If a process in the system gets suspended because it
               sends to a process on a remote node whose inter-node
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names
index 313b646cdd..5a5bcbceea 100644
--- a/erts/emulator/beam/atom.names
+++ b/erts/emulator/beam/atom.names
@@ -105,6 +105,7 @@ atom arg0
 atom arity
 atom asn1
 atom async
+atom async_dist
 atom asynchronous
 atom atom
 atom atom_used
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index 4e6279704b..fdef0682e0 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -1777,7 +1777,21 @@ static Eterm process_flag_aux(Process *c_p, int *redsp, Eterm flag, Eterm val)
 BIF_RETTYPE process_flag_2(BIF_ALIST_2)
 {
    Eterm old_value;
-   if (BIF_ARG_1 == am_error_handler) {
+
+   if (BIF_ARG_1 == am_async_dist) {
+       old_value = (BIF_P->flags & F_ASYNC_DIST) ? am_true : am_false;
+       if (BIF_ARG_2 == am_false) {
+           BIF_P->flags &= ~F_ASYNC_DIST;
+       }
+       else if (BIF_ARG_2 == am_true) {
+           BIF_P->flags |= F_ASYNC_DIST;
+       }
+       else {
+           goto error;
+       }
+       BIF_RET(old_value);
+   }
+   else if (BIF_ARG_1 == am_error_handler) {
       if (is_not_atom(BIF_ARG_2)) {
 	 goto error;
       }
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 0789813a9c..5455b47f59 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -211,6 +211,37 @@ struct {
     ErlHeapFragment *bp;
 } nodedown;
 
+/*
+ * Dist entry queue flags are only modified while
+ * the dist entry queue lock is held...
+ */
+static ERTS_INLINE erts_aint32_t
+de_qflags_read(DistEntry *dep)
+{
+    return erts_atomic32_read_nob(&dep->qflgs);
+}
+
+static ERTS_INLINE erts_aint32_t
+de_qflags_read_set(DistEntry *dep, erts_aint32_t set)
+{
+    erts_aint32_t qflgs, new_qflgs;
+    ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock));
+    new_qflgs = qflgs = erts_atomic32_read_nob(&dep->qflgs);
+    new_qflgs |= set;
+    erts_atomic32_set_nob(&dep->qflgs, new_qflgs);
+    return qflgs;
+}
+
+static ERTS_INLINE erts_aint32_t
+de_qflags_read_unset(DistEntry *dep, erts_aint32_t unset)
+{
+    erts_aint32_t qflgs, new_qflgs;
+    ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock));
+    new_qflgs = qflgs = erts_atomic32_read_nob(&dep->qflgs);
+    new_qflgs &= ~unset;
+    erts_atomic32_set_nob(&dep->qflgs, new_qflgs);
+    return qflgs;
+}
 
 static void
 delete_cache(ErtsAtomCache *cache)
@@ -250,7 +281,7 @@ get_suspended_on_de(DistEntry *dep, erts_aint32_t unset_qflgs)
 {
     erts_aint32_t qflgs;
     ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock));
-    qflgs = erts_atomic32_read_band_acqb(&dep->qflgs, ~unset_qflgs);
+    qflgs = de_qflags_read_unset(dep, unset_qflgs);
     qflgs &= ~unset_qflgs;
     if (qflgs & ERTS_DE_QFLG_EXIT) {
 	/* No resume when exit has been scheduled */
@@ -1010,14 +1041,14 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
         }
 
 	if (dep->state == ERTS_DE_STATE_EXITING) {
-	    ASSERT(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT);
+	    ASSERT(de_qflags_read(dep) & ERTS_DE_QFLG_EXIT);
 	}
 	else {
             ASSERT(dep->state == ERTS_DE_STATE_CONNECTED);
 	    dep->state = ERTS_DE_STATE_EXITING;
 	    erts_mtx_lock(&dep->qlock);
-	    ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
-	    erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_EXIT);
+	    ASSERT(!(de_qflags_read(dep) & ERTS_DE_QFLG_EXIT));
+	    de_qflags_read_set(dep, ERTS_DE_QFLG_EXIT);
 	    erts_mtx_unlock(&dep->qlock);
 	}
 
@@ -1043,6 +1074,8 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
         suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
 
         erts_mtx_unlock(&dep->qlock);
+
+        erts_atomic32_set_relb(&dep->notify, 0);
         erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
         dep->send = NULL;
 
@@ -1144,7 +1177,8 @@ void init_dist(void)
 
 static ERTS_INLINE ErtsDistOutputBuf *
 alloc_dist_obufs(byte **extp, TTBEncodeContext *ctx,
-                 Uint data_size, Uint fragments, Uint vlen)
+                 Uint data_size, Uint fragments, Uint vlen,
+                 int ignore_busy)
 {
     int ix;
     ErtsDistOutputBuf *obuf;
@@ -1175,6 +1209,7 @@ alloc_dist_obufs(byte **extp, TTBEncodeContext *ctx,
     erts_refc_add(&bin->intern.refc, fragments - 1, 1);
 
     for (ix = 0; ix < fragments; ix++) {
+        obuf[ix].ignore_busy = ignore_busy;
         obuf[ix].bin = bin;
         obuf[ix].eiov = &ctx->fragment_eiovs[ix];
 #ifdef DEBUG
@@ -1220,6 +1255,81 @@ size_obuf(ErtsDistOutputBuf *obuf)
     return sz;
 }
 
+static ERTS_INLINE void
+get_obuf_sizes(ErtsDistOutputBuf *obuf, Sint *size, Sint *ignore_size)
+{
+    Sint sz = size_obuf(obuf);
+    ASSERT(sz >= 0);
+    *size = sz;
+    *ignore_size = obuf->ignore_busy ? sz : 0;
+}
+
+static ERTS_INLINE void
+add_obuf_sizes(ErtsDistOutputBuf *obuf, Sint *size, Sint *ignore_size)
+{
+    Sint sz, isz;
+    get_obuf_sizes(obuf, &sz, &isz);
+    *size += sz;
+    *ignore_size += isz;
+}
+
+static ERTS_INLINE void
+subtract_obuf_sizes(ErtsDistOutputBuf *obuf, Sint *size, Sint *ignore_size)
+{
+    Sint sz, isz;
+    get_obuf_sizes(obuf, &sz, &isz);
+    *size -= sz;
+    *ignore_size -= isz;
+}
+
+static ERTS_INLINE void
+update_qsizes(DistEntry *dep, int *empty_fillp, Sint *qsizep,
+              Sint add_total_qsize, Sint ignore_qsize)
+{
+    /*
+     * All modifications of the 'total_qsize' and 'qsize' fields are
+     * made while holding the 'qlock', so read/modify/write of each
+     * field does not need to be atomic. Readers without the lock will
+     * still see consistent updates of each 'field'.
+     */
+    erts_aint_t qsize, add_qsize;
+
+    ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock));
+
+    if (empty_fillp)
+        *empty_fillp = 0;
+
+    if (add_total_qsize) {
+        qsize = erts_atomic_read_nob(&dep->total_qsize);
+        qsize += (erts_aint_t) add_total_qsize;
+        if (empty_fillp && qsize == add_total_qsize)
+            *empty_fillp = !0;
+        erts_atomic_set_nob(&dep->total_qsize, (erts_aint_t) qsize);
+    }
+
+    add_qsize = (erts_aint_t) (add_total_qsize - ignore_qsize);
+    if (add_qsize) {
+        qsize = erts_atomic_read_nob(&dep->qsize);
+        qsize += add_qsize;
+        if (qsizep)
+            *qsizep = qsize;
+        erts_atomic_set_nob(&dep->qsize, (erts_aint_t) qsize);
+    }
+    else if (qsizep) {
+        *qsizep = erts_atomic_read_nob(&dep->qsize);
+    }
+
+#ifdef DEBUG
+    {
+        erts_aint_t tqsize = erts_atomic_read_nob(&dep->total_qsize);
+        qsize = erts_atomic_read_nob(&dep->qsize);
+        ASSERT(tqsize >= 0);
+        ASSERT(qsize >= 0);
+        ASSERT(tqsize >= qsize);
+    }
+#endif
+}
+
 static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep)
 {
     ErtsDistOutputBuf *obuf;
@@ -1250,21 +1360,19 @@ static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep)
 
 static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf)
 {
-    Sint obufsize = 0;
+    Sint obufsize = 0, ignore_obufsize = 0;
 
     while (obuf) {
 	ErtsDistOutputBuf *fobuf;
 	fobuf = obuf;
 	obuf = obuf->next;
-	obufsize += size_obuf(fobuf);
+        add_obuf_sizes(fobuf, &obufsize, &ignore_obufsize);
 	free_dist_obuf(fobuf, !0);
     }
 
     if (obufsize) {
 	erts_mtx_lock(&dep->qlock);
-        ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize);
-        erts_atomic_add_nob(&dep->qsize,
-                            (erts_aint_t) -obufsize);
+        update_qsizes(dep, NULL, NULL, -obufsize, -ignore_obufsize);
 	erts_mtx_unlock(&dep->qlock);
     }
 }
@@ -3033,11 +3141,17 @@ retry:
 	goto fail;
     }
 
-    if (no_suspend && proc) {
-	if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) {
-	    res = ERTS_DSIG_PREP_WOULD_SUSPEND;
-	    goto fail;
-	}
+    if (!proc || (proc->flags & F_ASYNC_DIST)) {
+        ctx->ignore_busy = !0;
+    }
+    else {
+        ctx->ignore_busy = 0;
+        if (no_suspend) {
+            if (de_qflags_read(dep) & ERTS_DE_QFLG_BUSY) {
+                res = ERTS_DSIG_PREP_WOULD_SUSPEND;
+                goto fail;
+            }
+        }
     }
 
     ctx->c_p = proc;
@@ -3211,7 +3325,8 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
                                          + ((ctx->fragments - 1)
                                             * ERTS_DIST_FRAGMENT_HEADER_SIZE),
                                          ctx->fragments,
-                                         ctx->vlen);
+                                         ctx->vlen,
+                                         ctx->ignore_busy);
             ctx->alloced_fragments = ctx->fragments;
 	    /* Encode internal version of dist header */
             ctx->dhdrp = ctx->extp;
@@ -3362,20 +3477,23 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
                 ctx->fragments = 0;
 	    }
 	    else {
-                Sint qsize = erts_atomic_read_nob(&dep->qsize);
+                Sint qsize = (Sint) erts_atomic_read_nob(&dep->qsize);
                 erts_aint32_t qflgs;
 		ErtsProcList *plp = NULL;
                 Eterm notify_proc = NIL;
                 Sint obsz;
-                int fragments;
+                int fragments, empty_fill;
 
                 /* Calculate how many fragments to send. This depends on
                    the available space in the distr queue and the amount
                    of remaining reductions. */
                 for (fragments = 0, obsz = 0;
-                     fragments < ctx->fragments &&
-                         ((ctx->reds > 0 && (qsize + obsz) < erts_dist_buf_busy_limit) ||
-                          ctx->no_trap || ctx->no_suspend);
+                     (fragments < ctx->fragments
+                      && ((ctx->reds > 0
+                           && (ctx->ignore_busy
+                               || (qsize + obsz < erts_dist_buf_busy_limit)))
+                          || ctx->no_trap
+                          || ctx->no_suspend));
                      fragments++) {
 #ifdef DEBUG
                     int reds = 100;
@@ -3391,33 +3509,27 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
                        (!ctx->no_trap && !ctx->no_suspend));
 
 		erts_mtx_lock(&dep->qlock);
-		qsize = erts_atomic_add_read_mb(&dep->qsize, (erts_aint_t) obsz);
-                ASSERT(qsize >= obsz);
-                qflgs = erts_atomic32_read_nob(&dep->qflgs);
-		if (!(qflgs & ERTS_DE_QFLG_BUSY) && qsize >= erts_dist_buf_busy_limit) {
-		    erts_atomic32_read_bor_relb(&dep->qflgs, ERTS_DE_QFLG_BUSY);
+                update_qsizes(dep, &empty_fill, &qsize, obsz,
+                              ctx->ignore_busy ? obsz : 0);
+                qflgs = de_qflags_read(dep);
+		if (!(qflgs & ERTS_DE_QFLG_BUSY)
+                    && qsize >= erts_dist_buf_busy_limit) {
+		    qflgs = de_qflags_read_set(dep, ERTS_DE_QFLG_BUSY);
                     qflgs |= ERTS_DE_QFLG_BUSY;
                 }
-                if (qsize == obsz && (qflgs & ERTS_DE_QFLG_REQ_INFO)) {
-                    /* Previously empty queue and info requested... */
-                    qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
-                                                       ~ERTS_DE_QFLG_REQ_INFO);
-                    if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
+                if (empty_fill && is_internal_pid(dep->cid)) {
+                    erts_aint32_t notify;
+                    notify = erts_atomic32_xchg_mb(&dep->notify,
+                                                   (erts_aint32_t) 0);
+                    if (notify) {
+                        /*
+                         * Previously empty queue and notification
+                         * requested...
+                         */
                         notify_proc = dep->cid;
                         ASSERT(is_internal_pid(notify_proc));
                     }
-                    /* else: requester will send itself the message... */
-                    qflgs &= ~ERTS_DE_QFLG_REQ_INFO;
                 }
-		if (!ctx->no_suspend && (qflgs & ERTS_DE_QFLG_BUSY)) {
-		    erts_mtx_unlock(&dep->qlock);
-
-		    plp = erts_proclist_create(ctx->c_p);
-
-		    erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL);
-		    suspended = 1;
-		    erts_mtx_lock(&dep->qlock);
-		}
 
                 ASSERT(fragments < 2
                        || (get_int64(&((char*)ctx->obuf->eiov->iov[1].iov_base)[10])
@@ -3435,30 +3547,41 @@ erts_dsig_send(ErtsDSigSendContext *ctx)
                     ctx->obuf = &ctx->obuf[fragments];
                 }
 
-		if (!ctx->no_suspend) {
-                    qflgs = erts_atomic32_read_nob(&dep->qflgs);
-		    if (!(qflgs & ERTS_DE_QFLG_BUSY)) {
-			if (suspended)
-			    resume = 1; /* was busy when we started, but isn't now */
+		if ((qflgs & ERTS_DE_QFLG_BUSY)
+                    && !ctx->ignore_busy
+                    && !ctx->no_suspend) {
+
+                    erts_mtx_unlock(&dep->qlock);
+
+                    plp = erts_proclist_create(ctx->c_p);
+
+                    erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL);
+                    suspended = 1;
+
+                    erts_mtx_lock(&dep->qlock);
+
+                    qflgs = de_qflags_read(dep);
+                    if (qflgs & ERTS_DE_QFLG_BUSY) {
+                        /* Enqueue suspended process on dist entry */
+                        ASSERT(plp);
+                        erts_proclist_store_last(&dep->suspended, plp);
+                    }
+                    else {
+                        resume = 1; /* was busy, but isn't now */
     #ifdef USE_VM_PROBES
-			if (resume && DTRACE_ENABLED(dist_port_not_busy)) {
-			    DTRACE_CHARBUF(port_str, 64);
-			    DTRACE_CHARBUF(remote_str, 64);
-
-			    erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
-					  "%T", cid);
-			    erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
-					  "%T", dep->sysname);
-			    DTRACE3(dist_port_not_busy, erts_this_node_sysname,
-				    port_str, remote_str);
-			}
+                        if (resume && DTRACE_ENABLED(dist_port_not_busy)) {
+                            DTRACE_CHARBUF(port_str, 64);
+                            DTRACE_CHARBUF(remote_str, 64);
+
+                            erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
+                                          "%T", cid);
+                            erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
+                                          "%T", dep->sysname);
+                            DTRACE3(dist_port_not_busy, erts_this_node_sysname,
+                                    port_str, remote_str);
+                        }
     #endif
-		    }
-		    else {
-			/* Enqueue suspended process on dist entry */
-			ASSERT(plp);
-			erts_proclist_store_last(&dep->suspended, plp);
-		    }
+                    }
 		}
 
 		erts_mtx_unlock(&dep->qlock);
@@ -3676,13 +3799,64 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
    ? ((Sint) 1) \
    : ((((Sint) (SZ)) >> 10) & ((Sint) ERTS_PORT_REDS_MASK__)))
 
+#ifndef DEBUG
+#define ERTS_DBG_CHK_DIST_QSIZE(DEP, PRT)
+#else
+#define ERTS_DBG_CHK_DIST_QSIZE(DEP, PRT)           \
+    dbg_check_dist_qsize((DEP), (PRT))
+
+static void
+dbg_check_dist_qsize(DistEntry *dep, Port *prt)
+{
+    int ix;
+    Sint sz = 0, isz = 0, tqsz, qsz;
+    ErtsDistOutputBuf *qs[2];
+
+    ERTS_LC_ASSERT(dep && erts_lc_mtx_is_locked(&dep->qlock));
+    ASSERT(prt && erts_lc_is_port_locked(prt));
+    ERTS_LC_ASSERT((erts_atomic32_read_nob(&prt->sched.flags)
+                    & ERTS_PTS_FLG_EXIT)
+                   || prt->common.id == dep->cid);
+
+    tqsz = erts_atomic_read_nob(&dep->total_qsize);
+    qsz = erts_atomic_read_nob(&dep->qsize);
+
+    ASSERT(tqsz >= 0);
+    ASSERT(qsz >= 0);
+    ASSERT(tqsz >= qsz);
+
+    qs[0] = dep->out_queue.first;
+    qs[1] = dep->finalized_out_queue.first;
+
+    for (ix = 0; ix < sizeof(qs)/sizeof(qs[0]); ix++) {
+        ErtsDistOutputBuf *obuf = qs[ix];
+        while (obuf) {
+            add_obuf_sizes(obuf, &sz, &isz);
+            obuf = obuf->next;
+        }
+    }
+
+    ASSERT(tqsz == sz);
+    ASSERT(qsz == sz - isz);
+}
+
+#endif
+
 int
 erts_dist_command(Port *prt, int initial_reds)
 {
     Sint reds = initial_reds - ERTS_PORT_REDS_DIST_CMD_START;
     enum dist_entry_state state;
     Uint64 flags;
-    Sint qsize, obufsize = 0;
+    /*
+     * 'obufsize' and 'ignore_obufsize' contain the number of bytes removed
+     * from the queue which will be updated (in dep->total_qsize and
+     * dep->qsize) before we return from this function. Note that
+     * 'obufsize' and 'ignore_obufsize' may be negative if we added to the
+     * queue size. This may occur since finalization of a buffer may increase
+     * buffer size.
+     */
+    Sint qsize, obufsize = 0, ignore_obufsize = 0;
     ErtsDistOutputQueue oq, foq;
     DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
     Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
@@ -3718,6 +3892,7 @@ erts_dist_command(Port *prt, int initial_reds)
      */
 
     erts_mtx_lock(&dep->qlock);
+    ERTS_DBG_CHK_DIST_QSIZE(dep, prt);
     oq.first = dep->out_queue.first;
     oq.last = dep->out_queue.last;
     dep->out_queue.first = NULL;
@@ -3729,23 +3904,6 @@ erts_dist_command(Port *prt, int initial_reds)
     dep->finalized_out_queue.first = NULL;
     dep->finalized_out_queue.last = NULL;
 
-#ifdef DEBUG
-    {
-        Uint sz = 0;
-        ErtsDistOutputBuf *curr = oq.first;
-        while (curr) {
-            sz += size_obuf(curr);
-            curr = curr->next;
-        }
-        curr = foq.first;
-        while (curr) {
-            sz += size_obuf(curr);
-            curr = curr->next;
-        }
-        ASSERT(sz <= erts_atomic_read_nob(&dep->qsize));
-    }
-#endif
-
     sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
 
     if (reds < 0)
@@ -3756,7 +3914,7 @@ erts_dist_command(Port *prt, int initial_reds)
 	do {
             Uint size;
             ErtsDistOutputBuf *fob;
-	    obufsize += size_obuf(foq.first);
+            add_obuf_sizes(foq.first, &obufsize, &ignore_obufsize);
             size = (*send)(prt, foq.first);
             erts_atomic64_inc_nob(&dep->out);
             esdp->io.out += (Uint64) size;
@@ -3784,9 +3942,9 @@ erts_dist_command(Port *prt, int initial_reds)
 	    ob = oq.first;
 	    ASSERT(ob);
 	    do {
-                obufsize += size_obuf(ob);
+                add_obuf_sizes(ob, &obufsize, &ignore_obufsize);
 		reds = erts_encode_ext_dist_header_finalize(ob, dep, flags, reds);
-                obufsize -= size_obuf(ob);
+                subtract_obuf_sizes(ob, &obufsize, &ignore_obufsize);
                 if (reds < 0)
                     break; /* finalize needs to be restarted... */
                 last_finalized  = ob;
@@ -3824,12 +3982,11 @@ erts_dist_command(Port *prt, int initial_reds)
 	int preempt = 0;
 	while (oq.first && !preempt) {
 	    ErtsDistOutputBuf *fob;
-	    Uint size, obsz;
-            obufsize += size_obuf(oq.first);
+	    Uint size;
+            add_obuf_sizes(oq.first, &obufsize, &ignore_obufsize);
             reds = erts_encode_ext_dist_header_finalize(oq.first, dep, flags, reds);
-            obsz = size_obuf(oq.first);
-            obufsize -= obsz;
             if (reds < 0) { /* finalize needs to be restarted... */
+                subtract_obuf_sizes(oq.first, &obufsize, &ignore_obufsize);
                 preempt = 1;
                 break;
             }
@@ -3838,7 +3995,6 @@ erts_dist_command(Port *prt, int initial_reds)
 	    esdp->io.out += (Uint64) size;
 	    reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size);
 	    fob = oq.first;
-	    obufsize += obsz;
 	    oq.first = oq.first->next;
 	    free_dist_obuf(fob, !0);
 	    sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
@@ -3869,13 +4025,12 @@ erts_dist_command(Port *prt, int initial_reds)
 	 * processes.
 	 */
 	erts_mtx_lock(&dep->qlock);
-        de_busy = !!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_BUSY);
-        qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize,
-                                                (erts_aint_t) -obufsize);
-	ASSERT(qsize >= 0);
-	obufsize = 0;
+        de_busy = !!(de_qflags_read(dep) & ERTS_DE_QFLG_BUSY);
+        update_qsizes(dep, NULL, &qsize, -obufsize, -ignore_obufsize);
+	obufsize = ignore_obufsize = 0;
 	if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT)
-	    && de_busy && qsize < erts_dist_buf_busy_limit) {
+	    && de_busy
+            && qsize < erts_dist_buf_busy_limit) {
 	    int resumed;
 	    ErtsProcList *suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
 	    erts_mtx_unlock(&dep->qlock);
@@ -3891,17 +4046,7 @@ erts_dist_command(Port *prt, int initial_reds)
 
  done:
 
-    if (obufsize != 0) {
-	erts_mtx_lock(&dep->qlock);
-#ifdef DEBUG
-        qsize = (Sint) erts_atomic_add_read_nob(&dep->qsize,
-                                                (erts_aint_t) -obufsize);
-	ASSERT(qsize >= 0);
-#else
-        erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize);
-#endif
-	erts_mtx_unlock(&dep->qlock);
-    }
+    ASSERT(!ignore_obufsize || obufsize);
 
     ASSERT(!!foq.first == !!foq.last);
     ASSERT(!dep->finalized_out_queue.first);
@@ -3912,7 +4057,21 @@ erts_dist_command(Port *prt, int initial_reds)
 	dep->finalized_out_queue.last = foq.last;
     }
 
-     /* Avoid wrapping reduction counter... */
+    if (obufsize != 0) {
+	erts_mtx_lock(&dep->qlock);
+        update_qsizes(dep, NULL, NULL, -obufsize, -ignore_obufsize);
+        ERTS_DBG_CHK_DIST_QSIZE(dep, prt);
+	erts_mtx_unlock(&dep->qlock);
+    }
+#ifdef DEBUG
+    else {
+        erts_mtx_lock(&dep->qlock);
+        ERTS_DBG_CHK_DIST_QSIZE(dep, prt);
+	erts_mtx_unlock(&dep->qlock);
+    }
+#endif
+
+    /* Avoid wrapping reduction counter... */
     if (reds < INT_MIN/2)
 	reds = INT_MIN/2;
 
@@ -3942,7 +4101,7 @@ erts_dist_command(Port *prt, int initial_reds)
 	while (oq.first) {
 	    ErtsDistOutputBuf *fob = oq.first;
 	    oq.first = oq.first->next;
-	    obufsize += size_obuf(fob);
+            add_obuf_sizes(fob, &obufsize, &ignore_obufsize);
 	    free_dist_obuf(fob, !0);
 	}
 
@@ -3951,14 +4110,15 @@ erts_dist_command(Port *prt, int initial_reds)
     }
     else {
 	if (oq.first) {
+	    erts_mtx_lock(&dep->qlock);
+            update_qsizes(dep, NULL, NULL, -obufsize, -ignore_obufsize);
+	    obufsize = ignore_obufsize = 0;
+
 	    /*
-	     * Unhandle buffers need to be put back first
+	     * Unhandled buffers need to be put back first
 	     * in out_queue.
 	     */
-	    erts_mtx_lock(&dep->qlock);
-	    erts_atomic_add_nob(&dep->qsize, -obufsize);
-	    obufsize = 0;
-	    oq.last->next = dep->out_queue.first;
+            oq.last->next = dep->out_queue.first;
 	    dep->out_queue.first = oq.first;
 	    if (!dep->out_queue.last)
 		dep->out_queue.last = oq.last;
@@ -3974,7 +4134,7 @@ BIF_RETTYPE
 dist_ctrl_get_data_notification_1(BIF_ALIST_1)
 {
     DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
-    erts_aint32_t qflgs;
+    erts_aint32_t notify;
     erts_aint_t qsize;
     Eterm receiver = NIL;
     Uint32 conn_id;
@@ -3987,7 +4147,7 @@ dist_ctrl_get_data_notification_1(BIF_ALIST_1)
 
     /*
      * Caller is the only one that can consume from this queue
-     * and the only one that can set the req-info flag...
+     * and the only one that can set the notify field...
      */
 
     erts_de_rlock(dep);
@@ -3999,23 +4159,21 @@ dist_ctrl_get_data_notification_1(BIF_ALIST_1)
 
     ASSERT(dep->cid == BIF_P->common.id);
 
-    qflgs = erts_atomic32_read_nob(&dep->qflgs);
+    notify = erts_atomic32_read_nob(&dep->notify);
 
-    if (!(qflgs & ERTS_DE_QFLG_REQ_INFO)) {
+    if (!notify) {
         ERTS_THR_READ_MEMORY_BARRIER;
-        qsize = erts_atomic_read_nob(&dep->qsize);
+        qsize = erts_atomic_read_nob(&dep->total_qsize);
         ASSERT(qsize >= 0);
         if (qsize > 0)
             receiver = BIF_P->common.id; /* Notify ourselves... */
-        else { /* Empty queue; set req-info flag... */
-            qflgs = erts_atomic32_read_bor_mb(&dep->qflgs,
-                                                  ERTS_DE_QFLG_REQ_INFO);
-            qsize = erts_atomic_read_nob(&dep->qsize);
+        else { /* Empty queue; set the notify field... */
+            notify = erts_atomic32_xchg_mb(&dep->notify, (erts_aint32_t) !0);
+            qsize = erts_atomic_read_nob(&dep->total_qsize);
             ASSERT(qsize >= 0);
             if (qsize > 0) {
-                qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
-                                                       ~ERTS_DE_QFLG_REQ_INFO);
-                if (qflgs & ERTS_DE_QFLG_REQ_INFO)
+                notify = erts_atomic32_xchg_mb(&dep->notify, (erts_aint32_t) 0);
+                if (notify)
                     receiver = BIF_P->common.id; /* Notify ourselves... */
                 /* else: someone else will notify us... */
             }
@@ -4203,7 +4361,7 @@ dist_get_stat_1(BIF_ALIST_1)
     }
     read = (Sint64) erts_atomic64_read_nob(&dep->in);
     write = (Sint64) erts_atomic64_read_nob(&dep->out);
-    pend = (Sint64) erts_atomic_read_nob(&dep->qsize);
+    pend = (Sint64) erts_atomic_read_nob(&dep->total_qsize);
 
     erts_de_runlock(dep);
 
@@ -4258,10 +4416,19 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
 {
     DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
     const Sint initial_reds = ERTS_BIF_REDS_LEFT(BIF_P);
-    Sint reds = initial_reds, obufsize = 0, ix, vlen;
+    Sint reds = initial_reds, ix, vlen;
+    /*
+     * 'obufsize' and 'ignore_obufsize' contain the number of bytes removed
+     * from the queue which will be updated (in dep->total_qsize and
+     * dep->qsize) before we return from this function. Note that
+     * 'obufsize' and 'ignore_obufsize' may be negative if we added to the
+     * queue size. This may occur since finalization of a buffer may increase
+     * buffer size.
+     */
+    Sint obufsize = 0, ignore_obufsize = 0;
     ErtsDistOutputBuf *obuf;
     Eterm *hp, res;
-    erts_aint_t qsize;
+    Sint qsize;
     Uint32 conn_id, get_size;
     Uint hsz = 0, data_sz;
     SysIOVec *iov;
@@ -4300,7 +4467,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
     {
         if (!dep->tmp_out_queue.first) {
             ASSERT(!dep->tmp_out_queue.last);
-            qsize = erts_atomic_read_acqb(&dep->qsize);
+            qsize = (Sint) erts_atomic_read_acqb(&dep->total_qsize);
             if (qsize > 0) {
                 erts_mtx_lock(&dep->qlock);
                 dep->tmp_out_queue.first = dep->out_queue.first;
@@ -4319,13 +4486,16 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
         }
 
         obuf = dep->tmp_out_queue.first;
-        obufsize += size_obuf(obuf);
+        add_obuf_sizes(obuf, &obufsize, &ignore_obufsize);
         reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->dflags, reds);
-        obufsize -= size_obuf(obuf);
+        subtract_obuf_sizes(obuf, &obufsize, &ignore_obufsize);
         if (reds < 0) { /* finalize needs to be restarted... */
             erts_de_runlock(dep);
-            if (obufsize)
-                erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize);
+            if (obufsize) {
+                erts_mtx_lock(&dep->qlock);
+                update_qsizes(dep, NULL, NULL, -obufsize, -ignore_obufsize);
+                erts_mtx_unlock(&dep->qlock);
+            }
             ERTS_BIF_YIELD1(BIF_TRAP_EXPORT(BIF_dist_ctrl_get_data_1),
                             BIF_P, BIF_ARG_1);
         }
@@ -4406,16 +4576,18 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
         hp += 2;
     }
 
-    obufsize += size_obuf(obuf);
+    add_obuf_sizes(obuf, &obufsize, &ignore_obufsize);
 
-    qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) -obufsize);
+    erts_mtx_lock(&dep->qlock);
 
-    ASSERT(qsize >= 0);
+    update_qsizes(dep, NULL, &qsize, -obufsize, -ignore_obufsize);
 
-    if (qsize < erts_dist_buf_busy_limit/2
-        && (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY)) {
+    if (qsize >= erts_dist_buf_busy_limit/2
+        || !(de_qflags_read(dep) & ERTS_DE_QFLG_BUSY)) {
+        erts_mtx_unlock(&dep->qlock);
+    }
+    else {
         ErtsProcList *resume_procs = NULL;
-        erts_mtx_lock(&dep->qlock);
         resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
         erts_mtx_unlock(&dep->qlock);
         if (resume_procs) {
@@ -4470,8 +4642,8 @@ static void kill_connection(DistEntry *dep)
 
     dep->state = ERTS_DE_STATE_EXITING;
     erts_mtx_lock(&dep->qlock);
-    ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT));
-    erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT);
+    ASSERT(!(de_qflags_read(dep) & ERTS_DE_QFLG_EXIT));
+    de_qflags_read_set(dep, ERTS_DE_QFLG_EXIT);
     erts_mtx_unlock(&dep->qlock);
 
     if (is_internal_port(dep->cid))
@@ -5071,7 +5243,6 @@ setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
                                   Process *net_kernel)
 {
     Eterm notify_proc = NIL;
-    erts_aint32_t qflgs;
     ErtsProcLocks nk_locks;
     int success = 0;
 
@@ -5107,17 +5278,18 @@ setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
     erts_set_dist_entry_connected(dep, ctrlr, flags);
 
     notify_proc = NIL;
-    if (erts_atomic_read_nob(&dep->qsize)) {
+    if (erts_atomic_read_nob(&dep->total_qsize)) {
         if (is_internal_port(dep->cid)) {
             erts_schedule_dist_command(NULL, dep);
         }
         else {
+            erts_aint32_t notify;
             ERTS_THR_READ_MEMORY_BARRIER;
-            qflgs = erts_atomic32_read_nob(&dep->qflgs);
-            if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
-                qflgs = erts_atomic32_read_band_mb(&dep->qflgs,
-                                                   ~ERTS_DE_QFLG_REQ_INFO);
-                if (qflgs & ERTS_DE_QFLG_REQ_INFO) {
+            notify = erts_atomic32_read_nob(&dep->notify);
+            if (notify) {
+                notify = erts_atomic32_xchg_mb(&dep->notify,
+                                               (erts_aint32_t) 0);
+                if (notify) {
                     notify_proc = dep->cid;
                     ASSERT(is_internal_pid(notify_proc));
                 }
@@ -5337,6 +5509,7 @@ Sint erts_abort_pending_connection_rwunlock(DistEntry* dep,
         ASSERT(!dep->finalized_out_queue.first);
         resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
 	erts_mtx_unlock(&dep->qlock);
+        erts_atomic32_set_relb(&dep->notify, 0);
 	erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
 	dep->send = NULL;
 
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h
index 9d10bbcd36..7dabdd4961 100644
--- a/erts/emulator/beam/dist.h
+++ b/erts/emulator/beam/dist.h
@@ -342,6 +342,7 @@ typedef struct erts_dsig_send_context {
     int connect;
     int no_suspend;
     int no_trap;
+    int ignore_busy;
 
     Eterm ctl;
     Eterm msg;
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 6ae5f3ce57..8f8ff989d5 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -777,6 +777,7 @@ collect_one_suspend_monitor(ErtsMonitor *mon, void *vsmicp, Sint reds)
 #define ERTS_PI_IX_MAGIC_REF                            34
 #define ERTS_PI_IX_FULLSWEEP_AFTER                      35
 #define ERTS_PI_IX_PARENT                               36
+#define ERTS_PI_IX_ASYNC_DIST                           37
 
 #define ERTS_PI_FLAG_SINGELTON                          (1 << 0)
 #define ERTS_PI_FLAG_ALWAYS_WRAP                        (1 << 1)
@@ -833,7 +834,8 @@ static ErtsProcessInfoArgs pi_args[] = {
     {am_garbage_collection_info, ERTS_PROCESS_GC_INFO_MAX_SIZE, 0, ERTS_PROC_LOCK_MAIN},
     {am_magic_ref, 0, ERTS_PI_FLAG_FORCE_SIG_SEND, ERTS_PROC_LOCK_MAIN},
     {am_fullsweep_after, 0, 0, ERTS_PROC_LOCK_MAIN},
-    {am_parent, 0, 0, ERTS_PROC_LOCK_MAIN}
+    {am_parent, 0, 0, ERTS_PROC_LOCK_MAIN},
+    {am_async_dist, 0, 0, ERTS_PROC_LOCK_MAIN}
 };
 
 #define ERTS_PI_ARGS ((int) (sizeof(pi_args)/sizeof(pi_args[0])))
@@ -954,6 +956,8 @@ pi_arg2ix(Eterm arg)
         return ERTS_PI_IX_FULLSWEEP_AFTER;
     case am_parent:
         return ERTS_PI_IX_PARENT;
+    case am_async_dist:
+        return ERTS_PI_IX_ASYNC_DIST;
     default:
         return -1;
     }
@@ -2125,6 +2129,10 @@ process_info_aux(Process *c_p,
         }
         break;
 
+    case ERTS_PI_IX_ASYNC_DIST:
+        res = (rp->flags & F_ASYNC_DIST) ? am_true : am_false;
+        break;
+
     case ERTS_PI_IX_MAGIC_REF: {
 	Uint sz = 0;
 	(void) bld_magic_ref_bin_list(NULL, &sz, &MSO(rp));
@@ -2781,6 +2789,10 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1)
 	res = new_binary(BIF_P, (byte *) dsbufp->str, dsbufp->str_len);
 	erts_destroy_info_dsbuf(dsbufp);
 	BIF_RET(res);
+    } else if (am_async_dist == BIF_ARG_1) {
+        BIF_RET((erts_default_spo_flags & SPO_ASYNC_DIST)
+                ? am_true
+                : am_false);
     } else if (ERTS_IS_ATOM_STR("dist_ctrl", BIF_ARG_1)) {
 	DistEntry *dep;
 	i = 0;
diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c
index 95e41226fc..ab3823d51a 100644
--- a/erts/emulator/beam/erl_init.c
+++ b/erts/emulator/beam/erl_init.c
@@ -663,6 +663,7 @@ void erts_usage(void)
     erts_fprintf(stderr, "\n");
 
     erts_fprintf(stderr, "-pc <set>      control what characters are considered printable (default latin1)\n");
+    erts_fprintf(stderr, "-pad bool      set default process async data (default false)\n");
     erts_fprintf(stderr, "-P number      set maximum number of processes on this node;\n");
     erts_fprintf(stderr, "               valid range is [%d-%d]\n",
 		 ERTS_MIN_PROCESSES, ERTS_MAX_PROCESSES);
@@ -1417,11 +1418,28 @@ erl_start(int argc, char **argv)
 		    erts_usage();
 		}
 		erts_set_printable_characters(printable_chars);
-		break;
-	    } else {
-		erts_fprintf(stderr, "%s unknown flag %s\n", argv[0], argv[i]);
-		erts_usage();
 	    }
+            else {
+                char *sub_param = argv[i]+2;
+                if (has_prefix("ad", sub_param)) {
+                    arg = get_arg(sub_param+2, argv[i+1], &i);
+                    if (sys_strcmp("true", arg) == 0) {
+                        erts_default_spo_flags |= SPO_ASYNC_DIST;
+                    }
+                    else if (sys_strcmp("false", arg) == 0) {
+                        erts_default_spo_flags &= ~SPO_ASYNC_DIST;
+                    }
+                    else {
+                        erts_fprintf(stderr, "bad async dist value %s\n", arg);
+                        erts_usage();
+                    }
+                }
+                else {
+                    erts_fprintf(stderr, "%s unknown flag %s\n", argv[0], argv[i]);
+                    erts_usage();
+                }
+            }
+            break;
 	case 'f':
 	    if (!sys_strncmp(argv[i],"-fn",3)) {
 		int warning_type =  ERL_FILENAME_WARNING_WARNING;
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index 23a19064b1..2bec8ff20e 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -191,6 +191,8 @@ dist_table_alloc(void *dep_tmpl)
     erts_mtx_init(&dep->qlock, "dist_entry_out_queue", sysname,
         ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION);
     erts_atomic32_init_nob(&dep->qflgs, 0);
+    erts_atomic32_init_nob(&dep->notify, 0);
+    erts_atomic_init_nob(&dep->total_qsize, 0);
     erts_atomic_init_nob(&dep->qsize, 0);
     erts_atomic64_init_nob(&dep->in, 0);
     erts_atomic64_init_nob(&dep->out, 0);
@@ -729,8 +731,6 @@ erts_set_dist_entry_pending(DistEntry *dep)
 void
 erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint64 flags)
 {
-    erts_aint32_t set_qflgs;
-
     ASSERT(dep->mld);
 
     ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep));
@@ -767,9 +767,6 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint64 flags)
 
     erts_atomic64_set_nob(&dep->in, 0);
     erts_atomic64_set_nob(&dep->out, 0);
-    set_qflgs = (is_internal_port(cid) ?
-                 ERTS_DE_QFLG_PORT_CTRL : ERTS_DE_QFLG_PROC_CTRL);
-    erts_atomic32_read_bor_nob(&dep->qflgs, set_qflgs);
 
     if(flags & DFLAG_PUBLISHED) {
 	dep->next = erts_visible_dist_entries;
diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h
index f8c448de3c..9109c50e1d 100644
--- a/erts/emulator/beam/erl_node_tables.h
+++ b/erts/emulator/beam/erl_node_tables.h
@@ -77,15 +77,9 @@ enum dist_entry_state {
 
 #define ERTS_DE_QFLG_BUSY			(((erts_aint32_t) 1) <<  0)
 #define ERTS_DE_QFLG_EXIT			(((erts_aint32_t) 1) <<  1)
-#define ERTS_DE_QFLG_REQ_INFO			(((erts_aint32_t) 1) <<  2)
-#define ERTS_DE_QFLG_PORT_CTRL                  (((erts_aint32_t) 1) <<  3)
-#define ERTS_DE_QFLG_PROC_CTRL                  (((erts_aint32_t) 1) <<  4)
 
 #define ERTS_DE_QFLGS_ALL			(ERTS_DE_QFLG_BUSY \
-						 | ERTS_DE_QFLG_EXIT \
-                                                 | ERTS_DE_QFLG_REQ_INFO \
-                                                 | ERTS_DE_QFLG_PORT_CTRL \
-                                                 | ERTS_DE_QFLG_PROC_CTRL)
+						 | ERTS_DE_QFLG_EXIT)
 
 #if defined(ARCH_64)
 #define ERTS_DIST_OUTPUT_BUF_DBG_PATTERN ((Uint) 0xf713f713f713f713UL)
@@ -105,6 +99,7 @@ struct ErtsDistOutputBuf_ {
      * iov[2 ... vsize-1] data
      */
     ErlIOVec *eiov;
+    int ignore_busy;
 };
 
 struct ErtsDistOutputBufsContainer_ {
@@ -153,7 +148,9 @@ struct dist_entry_ {
 
     erts_mtx_t qlock;           /* Protects qflgs and out_queue */
     erts_atomic32_t qflgs;
-    erts_atomic_t qsize;
+    erts_atomic32_t notify;     /* User wants queue notification? */
+    erts_atomic_t qsize;        /* Size of data in queue respecting busy dist */
+    erts_atomic_t total_qsize;  /* Total size of data in queue */
     erts_atomic64_t in;
     erts_atomic64_t out;
     ErtsDistOutputQueue out_queue;
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 3e56d6c62b..15d4be3755 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -12030,6 +12030,22 @@ erts_parse_spawn_opts(ErlSpawnOpts *sop, Eterm opts_list, Eterm *tag,
 		    sop->priority = PRIORITY_LOW;
 		else
                     result = -1;
+            } else if (arg == am_async_dist) {
+                if (val == am_true) {
+                    if (sop->flags & SPO_ASYNC_DIST)
+                        sop->multi_set = !0;
+                    else
+                        sop->flags |= SPO_ASYNC_DIST;
+                }
+                else if (val == am_false) {
+                    if (!(sop->flags & SPO_ASYNC_DIST))
+                        sop->multi_set = !0;
+                    else
+                        sop->flags &= ~SPO_ASYNC_DIST;
+                }
+                else {
+                    result = -1;
+                }
 	    } else if (arg == am_message_queue_data) {
                 if (sop->flags & (SPO_OFF_HEAP_MSGQ|SPO_ON_HEAP_MSGQ))
                     sop->multi_set = !0;
@@ -12267,6 +12283,9 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
     /* Reserve place for continuation pointer, redzone, etc */
     heap_need = arg_size + S_RESERVED;
 
+    if (so->flags & SPO_ASYNC_DIST)
+        flags |= F_ASYNC_DIST;
+
     p->flags = flags;
     p->sig_qs.flags = qs_flags;
 
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index 949727d950..ac70260d0d 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -1410,8 +1410,9 @@ void erts_check_for_holes(Process* p);
 #define SPO_IX_ASYNC            11
 #define SPO_IX_NO_SMSG          12
 #define SPO_IX_NO_EMSG          13
+#define SPO_IX_ASYNC_DIST       14
 
-#define SPO_NO_INDICES          (SPO_IX_ASYNC+1)
+#define SPO_NO_INDICES          (SPO_IX_ASYNC_DIST+1)
 
 #define SPO_LINK                (1 << SPO_IX_LINK)
 #define SPO_MONITOR             (1 << SPO_IX_MONITOR)
@@ -1427,8 +1428,9 @@ void erts_check_for_holes(Process* p);
 #define SPO_ASYNC               (1 << SPO_IX_ASYNC)
 #define SPO_NO_SMSG             (1 << SPO_IX_NO_SMSG)
 #define SPO_NO_EMSG             (1 << SPO_IX_NO_EMSG)
+#define SPO_ASYNC_DIST          (1 << SPO_IX_ASYNC_DIST)
 
-#define SPO_MAX_FLAG            SPO_NO_EMSG
+#define SPO_MAX_FLAG            SPO_ASYNC_DIST
 
 #define SPO_USE_ARGS                 \
     (SPO_MIN_HEAP_SIZE               \
@@ -1570,6 +1572,7 @@ extern int erts_system_profile_ts_type;
 #define F_FRAGMENTED_SEND    (1 << 23) /* Process is doing a distributed fragmented send */
 #define F_DBG_FORCED_TRAP    (1 << 24) /* DEBUG: Last BIF call was a forced trap */
 #define F_DIRTY_CHECK_CLA    (1 << 25) /* Check if copy literal area GC scheduled */
+#define F_ASYNC_DIST         (1 << 26) /* Truly asynchronous distribution */
 
 /* Signal queue flags */
 #define FS_OFF_HEAP_MSGQ       (1 << 0) /* Off heap msg queue */
diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl
index edab0be20b..c1a31c6abf 100644
--- a/erts/emulator/test/distribution_SUITE.erl
+++ b/erts/emulator/test/distribution_SUITE.erl
@@ -81,6 +81,9 @@
          hopefull_data_encoding/1,
          hopefull_export_fun_bug/1,
          huge_iovec/1,
+         async_dist_flag/1,
+         async_dist_port_dctrlr/1,
+         async_dist_proc_dctrlr/1,
          creation_selection/1,
          creation_selection_test/1]).
 
@@ -114,7 +117,8 @@ all() ->
      dist_entry_refc_race,
      start_epmd_false, no_epmd, epmd_module, system_limit,
      hopefull_data_encoding, hopefull_export_fun_bug,
-     huge_iovec, creation_selection].
+     huge_iovec,
+     {group, async_dist}, creation_selection].
 
 groups() ->
     [{bulk_send, [], [bulk_send_small, bulk_send_big, bulk_send_bigbig]},
@@ -133,7 +137,11 @@ groups() ->
       [message_latency_large_message,
        message_latency_large_link_exit,
        message_latency_large_monitor_exit,
-       message_latency_large_exit2]}
+       message_latency_large_exit2]},
+     {async_dist, [],
+      [async_dist_flag,
+       async_dist_port_dctrlr,
+       async_dist_proc_dctrlr]}
     ].
 
 init_per_suite(Config) ->
@@ -3088,6 +3096,271 @@ derr_sender(Main, Nodes) ->
     Main ! count,
     derr_sender(Main, Nodes).
 
+async_dist_flag(Config) when is_list(Config) ->
+    {ok, Peer1, Node1} = ?CT_PEER(),
+    async_dist_flag_test(Node1, false),
+    peer:stop(Peer1),
+    {ok, Peer2, Node2} = ?CT_PEER(["+pad", "false"]),
+    async_dist_flag_test(Node2, false),
+    peer:stop(Peer2),
+    {ok, Peer3, Node3} = ?CT_PEER(["+pad", "true", "+pad", "false"]),
+    async_dist_flag_test(Node3, false),
+    peer:stop(Peer3),
+
+    {ok, Peer4, Node4} = ?CT_PEER(["+pad", "true"]),
+    async_dist_flag_test(Node4, true),
+    peer:stop(Peer4),
+    {ok, Peer5, Node5} = ?CT_PEER(["+pad", "false", "+pad", "true"]),
+    async_dist_flag_test(Node5, true),
+    peer:stop(Peer5),
+
+    ok.
+
+async_dist_flag_test(Node, Default) when is_atom(Node), is_boolean(Default) ->
+    Tester = self(),
+    NotDefault = not Default,
+
+    Default = erpc:call(Node, erlang, system_info, [async_dist]),
+
+    {P1, M1} = spawn_opt(Node, fun () ->
+                                       receive after infinity -> ok end
+                               end, [link, monitor]),
+    {P2, M2} = spawn_opt(Node, fun () ->
+                                       receive after infinity -> ok end
+                               end, [link, monitor, {async_dist, false}]),
+    {P3, M3} = spawn_opt(Node, fun () ->
+                                       receive after infinity -> ok end
+                               end, [link, monitor, {async_dist, true}]),
+    {async_dist, Default} = erpc:call(Node, erlang, process_info, [P1, async_dist]),
+    {async_dist, false} = erpc:call(Node, erlang, process_info, [P2, async_dist]),
+    {async_dist, true} = erpc:call(Node, erlang, process_info, [P3, async_dist]),
+
+    R4 = make_ref(),
+    {P4, M4} = spawn_opt(Node, fun () ->
+                                       Default = process_flag(async_dist, NotDefault),
+                                       Tester ! R4,
+                                       receive after infinity -> ok end
+                               end, [link, monitor]),
+
+    R5 = make_ref(),
+    {P5, M5} = spawn_opt(Node, fun () ->
+                                       false = process_flag(async_dist, true),
+                                       Tester ! R5,
+                                       receive after infinity -> ok end
+                               end, [link, monitor, {async_dist, false}]),
+    R6 = make_ref(),
+    {P6, M6} = spawn_opt(Node, fun () ->
+                                       true = process_flag(async_dist, false),
+                                       Tester ! R6,
+                                       receive after infinity -> ok end
+                               end, [link, monitor, {async_dist, true}]),
+    receive R4 -> ok end,
+    {async_dist, NotDefault} = erpc:call(Node, erlang, process_info, [P4, async_dist]),
+    receive R5 -> ok end,
+    {async_dist, true} = erpc:call(Node, erlang, process_info, [P5, async_dist]),
+    receive R6 -> ok end,
+    {async_dist, false} = erpc:call(Node, erlang, process_info, [P6, async_dist]),
+
+
+    R7 = make_ref(),
+    {P7, M7} = spawn_opt(Node, fun () ->
+                                       Default = process_flag(async_dist, NotDefault),
+                                       NotDefault = process_flag(async_dist, NotDefault),
+                                       NotDefault = process_flag(async_dist, NotDefault),
+                                       NotDefault = process_flag(async_dist, Default),
+                                       Default = process_flag(async_dist, Default),
+                                       Default = process_flag(async_dist, Default),
+                                       Tester ! R7,
+                                       receive after infinity -> ok end
+                               end, [link, monitor]),
+    receive R7 -> ok end,
+
+    unlink(P1),
+    exit(P1, bang),
+    unlink(P2),
+    exit(P2, bang),
+    unlink(P3),
+    exit(P3, bang),
+    unlink(P4),
+    exit(P4, bang),
+    unlink(P5),
+    exit(P5, bang),
+    unlink(P6),
+    exit(P6, bang),
+    unlink(P7),
+    exit(P7, bang),
+
+    receive {'DOWN', M1, process, P1, bang} -> ok end,
+    receive {'DOWN', M2, process, P2, bang} -> ok end,
+    receive {'DOWN', M3, process, P3, bang} -> ok end,
+    receive {'DOWN', M4, process, P4, bang} -> ok end,
+    receive {'DOWN', M5, process, P5, bang} -> ok end,
+    receive {'DOWN', M6, process, P6, bang} -> ok end,
+    receive {'DOWN', M7, process, P7, bang} -> ok end,
+
+    ok.
+
+async_dist_port_dctrlr(Config) when is_list(Config) ->
+    {ok, RecvPeer, RecvNode} = ?CT_PEER(),
+    ok = async_dist_test(RecvNode),
+    peer:stop(RecvPeer),
+    ok.
+
+async_dist_proc_dctrlr(Config) when is_list(Config) ->
+    {ok, SendPeer, SendNode} = ?CT_PEER(["-proto_dist", "gen_tcp"]),
+    {ok, RecvPeer, RecvNode} = ?CT_PEER(["-proto_dist", "gen_tcp"]),
+    {Pid, Mon} = spawn_monitor(SendNode,
+                               fun () ->
+                                       ok = async_dist_test(RecvNode),
+                                       exit(test_success)
+                               end),
+    receive
+        {'DOWN', Mon, process, Pid, Reason} ->
+            test_success = Reason
+    end,
+    peer:stop(SendPeer),
+    peer:stop(RecvPeer),
+    ok.
+
+async_dist_test(Node) ->
+    Scale = case round(test_server:timetrap_scale_factor()/3) of
+                S when S < 1 -> 1;
+                S -> S
+            end,
+    _ = process_flag(async_dist, false),
+    Tester = self(),
+    AliveReceiver1 = spawn_link(Node, fun () ->
+                                              register(alive_receiver_1, self()),
+                                              Tester ! {registered, self()},
+                                              receive after infinity -> ok end
+                                      end),
+    receive {registered, AliveReceiver1} -> ok end,
+    AliveReceiver2 = spawn(Node, fun () -> receive after infinity -> ok end end),
+    {AliveReceiver3, AR3Mon} = spawn_monitor(Node,
+                                             fun () ->
+                                                     receive after infinity -> ok end
+                                             end),
+    {DeadReceiver, DRMon} = spawn_monitor(Node, fun () -> ok end),
+    receive
+        {'DOWN', DRMon, process, DeadReceiver, DRReason} ->
+            normal = DRReason
+    end,
+    Data = lists:duplicate($x, 256),
+    GoNuts = fun GN () ->
+                     DeadReceiver ! hello,
+                     GN()
+             end,
+    erpc:call(Node, erts_debug, set_internal_state, [available_internal_state, true]),
+    erpc:cast(Node, erts_debug, set_internal_state, [block, 4000*Scale]),
+    DistBufFiller = spawn_link(fun () ->
+                                       process_flag(async_dist, false),
+                                       receive go_nuts -> ok end,
+                                       GoNuts()
+                               end),
+    BDMon = spawn_link(fun SysMon () ->
+                               receive
+                                   {monitor, Pid, busy_dist_port, _} ->
+                                       Tester ! {busy_dist_port, Pid}
+                               end,
+                               SysMon()
+                       end),
+    _ = erlang:system_monitor(BDMon, [busy_dist_port]),
+    DistBufFiller ! go_nuts,
+
+    %% Busy dist entry may release after it has triggered even
+    %% though no one is consuming anything at the receiving end.
+    %% Continue banging until we stop getting new busy_dist_port...
+    WaitFilled = fun WF (Tmo) ->
+                         receive
+                             {busy_dist_port, DistBufFiller} ->
+                                 WF(1000*Scale)
+                         after
+                             Tmo ->
+                                 ok
+                         end
+                 end,
+    WaitFilled(infinity),
+
+    BusyDistChecker = spawn_link(fun () ->
+                                         process_flag(async_dist, false),
+                                         DeadReceiver ! hello,
+                                         exit(unexpected_return_from_bang)
+                                 end),
+    receive {busy_dist_port, BusyDistChecker} -> ok end,
+    {async_dist, false} = process_info(self(), async_dist),
+    {async_dist, false} = process_info(BusyDistChecker, async_dist),
+    false = process_flag(async_dist, true),
+    {async_dist, true} = process_info(self(), async_dist),
+
+    Start = erlang:monotonic_time(millisecond),
+    M1 = erlang:monitor(process, AliveReceiver1),
+    true = is_reference(M1),
+    M2 = erlang:monitor(process, AliveReceiver2),
+    true = is_reference(M2),
+    {pid, Data} = AliveReceiver1 ! {pid, Data},
+    {reg_name, Data} = {alive_receiver_1, Node} ! {reg_name, Data},
+    true = erlang:demonitor(M1),
+    true = link(AliveReceiver2),
+    true = unlink(AliveReceiver1),
+    RId = spawn_request(Node, fun () -> receive bye -> ok end end, [link, monitor]),
+    true = is_reference(RId),
+    erlang:group_leader(self(), AliveReceiver2),
+    AR3XReason = make_ref(),
+    true = exit(AliveReceiver3, AR3XReason),
+    End = erlang:monotonic_time(millisecond),
+
+    %% These signals should have been buffered immediately. Make sure
+    %% it did not take a long time...
+    true = 500*Scale >= End - Start,
+
+    receive after 500*Scale -> ok end,
+
+    unlink(BusyDistChecker),
+    exit(BusyDistChecker, bang),
+    false = is_process_alive(BusyDistChecker),
+
+    unlink(DistBufFiller),
+    exit(DistBufFiller, bang),
+    false = is_process_alive(DistBufFiller),
+
+    %% Verify that the signals eventually get through when the other
+    %% node continues to work...
+    {links, []}
+        = erpc:call(Node, erlang, process_info, [AliveReceiver1, links]),
+    {links, [Tester]}
+        = erpc:call(Node, erlang, process_info, [AliveReceiver2, links]),
+    {monitored_by, []}
+        = erpc:call(Node, erlang, process_info, [AliveReceiver1, monitored_by]),
+    {monitored_by, [Tester]}
+        = erpc:call(Node, erlang, process_info, [AliveReceiver2, monitored_by]),
+    {messages, [{pid, Data}, {reg_name, Data}]}
+        = erpc:call(Node, erlang, process_info, [AliveReceiver1, messages]),
+    {group_leader, Tester}
+        = erpc:call(Node, erlang, process_info, [AliveReceiver2, group_leader]),
+
+    Spawned = receive
+                  {spawn_reply, RId, SpawnRes, Pid} ->
+                      ok = SpawnRes,
+                      true = is_pid(Pid),
+                      Pid
+              end,
+    {links, [Tester]}
+        = erpc:call(Node, erlang, process_info, [Spawned, links]),
+    {monitored_by, [Tester]}
+        = erpc:call(Node, erlang, process_info, [Spawned, monitored_by]),
+
+    receive
+        {'DOWN', AR3Mon, process, AliveReceiver3, ActualAR3XReason} ->
+            AR3XReason = ActualAR3XReason
+    end,
+
+    unlink(AliveReceiver2),
+    unlink(Spawned),
+
+    true = process_flag(async_dist, false),
+    {async_dist, false} = process_info(self(), async_dist),
+
+    ok.
 
 %%% Utilities
 
diff --git a/erts/etc/common/erlexec.c b/erts/etc/common/erlexec.c
index 3e44ad2b77..fa951ae770 100644
--- a/erts/etc/common/erlexec.c
+++ b/erts/etc/common/erlexec.c
@@ -1000,8 +1000,10 @@ int main(int argc, char **argv)
 		      }
 		      break;
 		  case 'p':
-		      if (argv[i][2] != 'c' || argv[i][3] != '\0')
+		      if (!(argv[i][2] == 'c' && argv[i][3] == '\0')
+                          && !(argv[i][2] == 'a' && argv[i][3] == 'd' && argv[i][4] == '\0')) {
 			  goto the_default;
+                      }
                       NEXT_ARG_CHECK();
 		      argv[i][0] = '-';
 		      add_Eargs(argv[i]);
diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl
index 470f861e08..41471e51e2 100644
--- a/erts/preloaded/src/erlang.erl
+++ b/erts/preloaded/src/erlang.erl
@@ -2425,7 +2425,10 @@ open_port(PortName, PortSettings) ->
 -type message_queue_data() ::
 	off_heap | on_heap.
 
--spec process_flag(trap_exit, Boolean) -> OldBoolean when
+-spec process_flag(async_dist, Boolean) -> OldBoolean when
+      Boolean :: boolean(),
+      OldBoolean :: boolean();
+                  (trap_exit, Boolean) -> OldBoolean when
       Boolean :: boolean(),
       OldBoolean :: boolean();
                   (error_handler, Module) -> OldModule when
@@ -2463,6 +2466,7 @@ process_flag(_Flag, _Value) ->
     erlang:nif_error(undefined).
 
 -type process_info_item() ::
+      async_dist |
       backtrace |
       binary |
       catchlevel |
@@ -2499,6 +2503,7 @@ process_flag(_Flag, _Value) ->
       trap_exit.
 
 -type process_info_result_item() ::
+      {async_dist, Enabled :: boolean()} |
       {backtrace, Bin :: binary()} |
       {binary, BinInfo :: [{non_neg_integer(),
                             non_neg_integer(),
@@ -2949,6 +2954,7 @@ tuple_to_list(_Tuple) ->
          (update_cpu_info) -> changed | unchanged;
          (version) -> string();
          (wordsize | {wordsize, internal} | {wordsize, external}) -> 4 | 8;
+         (async_dist) -> boolean();
          (overview) -> boolean();
          %% Deliberately left undocumented
          (sequential_tracer) -> {sequential_tracer, pid() | port() | {module(),term()} | false}.
@@ -3078,7 +3084,8 @@ spawn_monitor(M, F, A) ->
       | {min_heap_size, Size :: non_neg_integer()}
       | {min_bin_vheap_size, VSize :: non_neg_integer()}
       | {max_heap_size, Size :: max_heap_size()}
-      | {message_queue_data, MQD :: message_queue_data()}.
+      | {message_queue_data, MQD :: message_queue_data()}
+      | {async_dist, Enabled :: boolean()}.
 
 -spec spawn_opt(Fun, Options) -> pid() | {pid(), reference()} when
       Fun :: function(),
-- 
2.35.3

openSUSE Build Service is sponsored by