File 2901-Adjustable-busy-port-parameters-for-default-drivers.patch of Package erlang
From b7494031c58da6c68668a741ee81e09fe232ad81 Mon Sep 17 00:00:00 2001
From: Rickard Green <rickard@erlang.org>
Date: Tue, 19 Nov 2019 18:12:46 +0100
Subject: [PATCH] Adjustable busy port parameters for default drivers
---
 erts/doc/src/erlang.xml                            |  65 ++++++++
 erts/emulator/beam/atom.names                      |   2 +
 erts/emulator/beam/erl_bif_port.c                  |  63 +++++++-
 erts/emulator/beam/erl_sys_driver.h                |  31 +++-
 erts/emulator/beam/io.c                            |  18 +++
 erts/emulator/beam/sys.h                           |  23 ---
 erts/emulator/sys/unix/sys_drivers.c               |  93 ++++++++---
 erts/emulator/sys/win32/sys.c                      | 128 +++++++++------
 erts/emulator/test/port_bif_SUITE.erl              | 172 ++++++++++++++++++++-
 .../emulator/test/port_bif_SUITE_data/Makefile.src |   8 +-
 erts/emulator/test/port_bif_SUITE_data/sleeper.c   |  66 ++++++++
 erts/preloaded/ebin/erlang.beam                    | Bin 100344 -> 100436 bytes
 erts/preloaded/src/erlang.erl                      |   4 +-
 13 files changed, 572 insertions(+), 101 deletions(-)
 create mode 100644 erts/emulator/test/port_bif_SUITE_data/sleeper.c
diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml
index 2183f75487..2f3d2f9624 100644
--- a/erts/doc/src/erlang.xml
+++ b/erts/doc/src/erlang.xml
@@ -4121,6 +4121,71 @@ RealSystem = system + MissedSystem</code>
               <seealso marker="erl#+spp"><c>+spp</c></seealso> to
               <c>erl(1)</c>.</p>
           </item>
+	  <tag><c>{busy_limits_port, {Low, High} | disabled}</c></tag>
+	  <item>
+	    <p>Sets limits that will be used for controlling the
+            busy state of the port.</p>
+	    <p>When the ports internal output queue size becomes
+	    larger than or equal to <c>High</c> bytes, it enters
+	    the busy state. When it becomes less than <c>Low</c>
+	    bytes it leaves the busy state. When the port is in
+	    the busy state, processes sending commands to it will
+	    be suspended until the port leaves the busy state.
+	    Commands are in this context either
+            <c>Port ! {Owner, {command, Data}}</c> or
+            <c>port_command/[2,3]</c>.</p>
+	    <p>
+	    The <c>Low</c> limit is automatically adjusted to the
+	    same as <c>High</c> if it is set larger then <c>High</c>.
+	    Valid range of values for <c>Low</c> and <c>High</c> is
+	    <c>[1, (1 bsl (8*erlang:system_info(wordsize)))-2]</c>.
+	    If the atom <c>disabled</c> is passed, the port will
+	    never enter the busy state.</p>
+	    <p>The defaults are <c>Low = 4096</c> and
+	    <c>High = 8192</c>.</p>
+	    <p><em>Note</em> that this option is only valid when
+	    spawning an executable (port program) by opening the
+	    spawn driver and when opening the <c>fd</c> driver.
+	    This option will cause a failure with a <c>badarg</c>
+	    exception when opening other drivers.</p>
+	  </item>
+	  <tag><c>{busy_limits_msgq, {Low, High} | disabled}</c></tag>
+	  <item>
+	    <p>Sets limits that will be used for controlling the
+            busy state of the port message queue.</p>
+	    <p>When the ports message queue size becomes larger
+	    than or equal to <c>High</c> bytes it enters the busy
+	    state. When it becomes less than <c>Low</c> bytes it
+	    leaves the busy state. When the port message queue is
+	    in the busy state, processes sending commands to it
+	    will be suspended until the port message queue leaves
+	    the busy state. Commands are in this context either
+            <c>Port ! {Owner, {command, Data}}</c> or
+            <c>port_command/[2,3]</c>.</p>
+	    <p>The <c>Low</c> limit is automatically adjusted to the
+	    same as <c>High</c> if it is set larger then <c>High</c>.
+	    Valid range of values for <c>Low</c> and <c>High</c> is
+	    <c>[1, (1 bsl (8*erlang:system_info(wordsize)))-2]</c>.
+	    If the atom <c>disabled</c> is passed, the port
+	    message queue will never enter the busy state.</p>
+	    <p><em>Note</em> that if the driver statically has
+	    disabled the use of this feature, a failure with a
+	    <c>badarg</c> exception will be raised unless this
+	    option also is set to <c>disable</c> or not passed
+	    at all.</p>
+	    <p>The defaults are <c>Low = 4096</c> and
+	    <c>High = 8192</c> unless the driver itself does
+	    modifications of these values.</p>
+	    <p><em>Note</em> that the driver might fail if
+	    it also adjust these limits by itself and you
+	    have disabled this feature.</p>
+	    <p>The spawn driver (used when spawning an executable)
+	    and the <c>fd</c> driver do not disable this feature
+	    and do not adjust these limits by themselves.</p>
+	    <p>For more information see the documentation
+	    <seealso marker="erl_driver#erl_drv_busy_msgq_limits"><c>erl_drv_busy_msgq_limits()</c></seealso>.
+	    </p>
+	  </item>
         </taglist>
         <p>Default is <c>stream</c> for all port types and
           <c>use_stdio</c> for spawned ports.</p>
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names
index 57f3a53481..af7a6d7ed4 100644
--- a/erts/emulator/beam/atom.names
+++ b/erts/emulator/beam/atom.names
@@ -144,6 +144,8 @@ atom bsr_unicode
 atom build_type
 atom busy
 atom busy_dist_port
+atom busy_limits_port
+atom busy_limits_msgq
 atom busy_port
 atom call
 atom call_count
diff --git a/erts/emulator/beam/erl_bif_port.c b/erts/emulator/beam/erl_bif_port.c
index ed825d3dda..63bfaf8572 100644
--- a/erts/emulator/beam/erl_bif_port.c
+++ b/erts/emulator/beam/erl_bif_port.c
@@ -691,6 +691,10 @@ open_port(Process* p, Eterm name, Eterm settings, int *err_typep, int *err_nump)
     opts.spawn_type = ERTS_SPAWN_ANY; 
     opts.argv = NULL;
     opts.parallelism = erts_port_parallelism;
+    opts.high_watermark = 8192;
+    opts.low_watermark = opts.high_watermark / 2;
+    opts.port_watermarks_set = 0;
+    opts.msgq_watermarks_set = 0;
     erts_osenv_init(&opts.envir);
 
     linebuf = 0;
@@ -782,6 +786,62 @@ open_port(Process* p, Eterm name, Eterm settings, int *err_typep, int *err_nump)
 			opts.parallelism = 0;
 		    else
 			goto badarg;
+                } else if (option == am_busy_limits_port) {
+                    Uint high, low;
+                    if (*tp == am_disabled)
+                        low = high = ERL_DRV_BUSY_MSGQ_DISABLED;
+                    else if (!is_tuple_arity(*tp, 2))
+                        goto badarg;
+                    else {
+                        Eterm *wtp = tuple_val(*tp);
+                        if (!term_to_Uint(wtp[1], &low))
+                            goto badarg;
+                        if (!term_to_Uint(wtp[2], &high))
+                            goto badarg;
+                        if (high < ERL_DRV_BUSY_MSGQ_LIM_MIN)
+                            goto badarg;
+                        if (high > ERL_DRV_BUSY_MSGQ_LIM_MAX)
+                            goto badarg;
+                        if (low < ERL_DRV_BUSY_MSGQ_LIM_MIN)
+                            goto badarg;
+                        if (low > ERL_DRV_BUSY_MSGQ_LIM_MAX)
+                            goto badarg;
+                        if (high == ~((Uint) 0) || low == ~((Uint) 0))
+                            goto badarg;
+                        if (low > high)
+                            low = high;
+                    }
+                    opts.low_watermark = low;
+                    opts.high_watermark = high;
+                    opts.port_watermarks_set = !0;
+                } else if (option == am_busy_limits_msgq) {
+                    Uint high, low;
+                    if (*tp == am_disabled)
+                        low = high = ERL_DRV_BUSY_MSGQ_DISABLED;
+                    else if (!is_tuple_arity(*tp, 2))
+                        goto badarg;
+                    else {
+                        Eterm *wtp = tuple_val(*tp);
+                        if (!term_to_Uint(wtp[1], &low))
+                            goto badarg;
+                        if (!term_to_Uint(wtp[2], &high))
+                            goto badarg;
+                        if (high < ERL_DRV_BUSY_MSGQ_LIM_MIN)
+                            goto badarg;
+                        if (high > ERL_DRV_BUSY_MSGQ_LIM_MAX)
+                            goto badarg;
+                        if (low < ERL_DRV_BUSY_MSGQ_LIM_MIN)
+                            goto badarg;
+                        if (low > ERL_DRV_BUSY_MSGQ_LIM_MAX)
+                            goto badarg;
+                        if (high == ~((Uint) 0) || low == ~((Uint) 0))
+                            goto badarg;
+                        if (low > high)
+                            low = high;
+                    }
+                    opts.low_msgq_watermark = low;
+                    opts.high_msgq_watermark = high;
+                    opts.msgq_watermarks_set = !0;
 		} else {
 		    goto badarg;
 		}
@@ -820,6 +880,7 @@ open_port(Process* p, Eterm name, Eterm settings, int *err_typep, int *err_nump)
 	    nargs = list_val(*nargs);
 	}
     }
+
     if (opts.read_write == 0)	/* implement default */
 	opts.read_write = DO_READ|DO_WRITE;
 
@@ -827,7 +888,7 @@ open_port(Process* p, Eterm name, Eterm settings, int *err_typep, int *err_nump)
     if((linebuf && opts.packet_bytes) || 
        (opts.redir_stderr && !opts.use_stdio)) {
 	goto badarg;
-}
+    }
 
     /* If we lacked an env option, fill in the global environment without
      * changes. */
diff --git a/erts/emulator/beam/erl_sys_driver.h b/erts/emulator/beam/erl_sys_driver.h
index d46e88cb05..8f1963fbd4 100644
--- a/erts/emulator/beam/erl_sys_driver.h
+++ b/erts/emulator/beam/erl_sys_driver.h
@@ -33,10 +33,39 @@
 
 typedef long ErlDrvEvent; /* An event to be selected on. */
 
-/* typedef struct _SysDriverOpts SysDriverOpts; defined in sys.h */
+typedef struct _SysDriverOpts SysDriverOpts;
 
 #include "erl_driver.h"
 
+/*
+ * This structure contains options to all built in drivers.
+ * None of the drivers use all of the fields.
+ */
+
+struct _SysDriverOpts {
+    Uint ifd;			/* Input file descriptor (fd driver). */
+    Uint ofd;			/* Outputfile descriptor (fd driver). */
+    int packet_bytes;		/* Number of bytes in packet header. */
+    int read_write;		/* Read and write bits. */
+    int use_stdio;		/* Use standard I/O: TRUE or FALSE. */
+    int redir_stderr;           /* Redirect stderr to stdout: TRUE/FALSE. */
+    int hide_window;		/* Hide this windows (Windows). */
+    int exit_status;		/* Report exit status of subprocess. */
+    int overlapped_io;          /* Only has effect on windows NT et al */
+    erts_osenv_t envir;		/* Environment of the port process */
+    char **argv;                /* Argument vector in Unix'ish format. */
+    char *wd;			/* Working directory. */
+    unsigned spawn_type;        /* Bitfield of ERTS_SPAWN_DRIVER | 
+				   ERTS_SPAWN_EXTERNAL | both*/ 
+    int parallelism;            /* Optimize for parallelism */
+    ErlDrvSizeT high_watermark;
+    ErlDrvSizeT low_watermark;
+    ErlDrvSizeT high_msgq_watermark;
+    ErlDrvSizeT low_msgq_watermark;
+    char port_watermarks_set;
+    char msgq_watermarks_set;
+};
+
 #endif
 
 
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 20a155e1d8..78f7a146f2 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -605,6 +605,19 @@ erts_open_driver(erts_driver_t* driver,	/* Pointer to driver. */
 	ERTS_OPEN_DRIVER_RET(NULL, -3, BADARG);
     }
 
+    if (opts->port_watermarks_set && driver != &spawn_driver
+        && driver != &fd_driver && driver != &vanilla_driver) {
+	erts_rwmtx_runlock(&erts_driver_list_lock);
+	ERTS_OPEN_DRIVER_RET(NULL, -3, BADARG);
+    }
+
+    if (opts->msgq_watermarks_set
+        && (driver->flags & ERL_DRV_FLAG_NO_BUSY_MSGQ)
+        && opts->high_msgq_watermark != ERL_DRV_BUSY_MSGQ_DISABLED) {
+	erts_rwmtx_runlock(&erts_driver_list_lock);
+	ERTS_OPEN_DRIVER_RET(NULL, -3, BADARG);
+    }
+    
     driver_lock = driver->lock;
 
     if (driver->handle != NULL) {
@@ -644,6 +657,11 @@ erts_open_driver(erts_driver_t* driver,	/* Pointer to driver. */
 				      1));
     }
 
+    if (opts->msgq_watermarks_set)
+        erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(port),
+                                 &opts->low_msgq_watermark,
+                                 &opts->high_msgq_watermark);
+
     error_number = error_type = 0;
     if (driver->start) {
         ERTS_MSACC_PUSH_STATE_M();
diff --git a/erts/emulator/beam/sys.h b/erts/emulator/beam/sys.h
index c534891f30..0d85211be8 100644
--- a/erts/emulator/beam/sys.h
+++ b/erts/emulator/beam/sys.h
@@ -683,29 +683,6 @@ typedef Eterm ErtsTracer;
     int content_size;
 } erts_osenv_t;
 
-/*
- * This structure contains options to all built in drivers.
- * None of the drivers use all of the fields.
- */
-
-typedef struct _SysDriverOpts {
-    Uint ifd;			/* Input file descriptor (fd driver). */
-    Uint ofd;			/* Outputfile descriptor (fd driver). */
-    int packet_bytes;		/* Number of bytes in packet header. */
-    int read_write;		/* Read and write bits. */
-    int use_stdio;		/* Use standard I/O: TRUE or FALSE. */
-    int redir_stderr;           /* Redirect stderr to stdout: TRUE/FALSE. */
-    int hide_window;		/* Hide this windows (Windows). */
-    int exit_status;		/* Report exit status of subprocess. */
-    int overlapped_io;          /* Only has effect on windows NT et al */
-    erts_osenv_t envir;		/* Environment of the port process */
-    char **argv;                /* Argument vector in Unix'ish format. */
-    char *wd;			/* Working directory. */
-    unsigned spawn_type;        /* Bitfield of ERTS_SPAWN_DRIVER | 
-				   ERTS_SPAWN_EXTERNAL | both*/ 
-    int parallelism;            /* Optimize for parallelism */
-} SysDriverOpts;
-
 extern char *erts_default_arg0;
 
 extern char os_type[];
diff --git a/erts/emulator/sys/unix/sys_drivers.c b/erts/emulator/sys/unix/sys_drivers.c
index 92020c6f35..6883519dc3 100644
--- a/erts/emulator/sys/unix/sys_drivers.c
+++ b/erts/emulator/sys/unix/sys_drivers.c
@@ -87,7 +87,7 @@ static Eterm forker_port;
 /* Used by the fd driver iff the fd could not be set to non-blocking */
 typedef struct ErtsSysBlocking_ {
     ErlDrvPDL pdl;
-    int res;
+    ErlDrvSSizeT res;
     int err;
     unsigned int pkey;
 } ErtsSysBlocking;
@@ -112,6 +112,9 @@ typedef struct driver_data {
     int status;
     int terminating;
     ErtsSysBlocking *blocking;
+    int busy;
+    ErlDrvSizeT high_watermark;
+    ErlDrvSizeT low_watermark;
 } ErtsSysDriverData;
 
 #define DIR_SEPARATOR_CHAR    '/'
@@ -169,7 +172,7 @@ typedef struct driver_data {
 void
 erl_sys_late_init(void)
 {
-    SysDriverOpts opts;
+    SysDriverOpts opts = {0};
     Port *port;
 
     sys_signal(SIGPIPE, SIG_IGN); /* Ignore - we'll handle the write failure */
@@ -371,7 +374,8 @@ create_driver_data(ErlDrvPort port_num,
                    int read_write,
                    int exit_status,
                    int pid,
-                   int is_blocking)
+                   int is_blocking,
+                   SysDriverOpts* opts)
 {
     Port *prt;
     ErtsSysDriverData *driver_data;
@@ -430,6 +434,10 @@ create_driver_data(ErlDrvPort port_num,
         driver_data->ofd = NULL;
     }
 
+    driver_data->busy = 0;
+    driver_data->high_watermark = opts->high_watermark;
+    driver_data->low_watermark = opts->low_watermark;
+    
     return driver_data;
 }
 
@@ -719,7 +727,7 @@ static ErlDrvData spawn_start(ErlDrvPort port_num, char* name,
 
     dd = create_driver_data(port_num, ifd[0], ofd[1], opts->packet_bytes,
                              DO_WRITE | DO_READ, opts->exit_status,
-                             0, 0);
+                            0, 0, opts);
 
     {
         /* send ofd[0] + ifd[1] + stderrfd to forker port */
@@ -999,7 +1007,7 @@ static ErlDrvData fd_start(ErlDrvPort port_num, char* name,
     return (ErlDrvData)create_driver_data(port_num, opts->ifd, opts->ofd,
                                           opts->packet_bytes,
                                           opts->read_write, 0, -1,
-                                          !non_blocking);
+                                          !non_blocking, opts);
 }
 
 static void clear_fd_data(ErtsSysFdData *fdd)
@@ -1075,7 +1083,8 @@ static ErlDrvData vanilla_start(ErlDrvPort port_num, char* name,
 
     res = (ErlDrvData)(long)create_driver_data(port_num, fd, fd,
                                                opts->packet_bytes,
-                                               opts->read_write, 0, -1, 0);
+                                               opts->read_write, 0, -1, 0,
+                                               opts);
     return res;
 }
 
@@ -1108,10 +1117,10 @@ static void outputv(ErlDrvData e, ErlIOVec* ev)
     int pb = dd->packet_bytes;
     int ofd = dd->ofd ? dd->ofd->fd : -1;
     ssize_t n;
-    ErlDrvSizeT sz;
     char lb[4];
     char* lbp;
     ErlDrvSizeT len = ev->size;
+    ErlDrvSizeT qsz;
 
     /* (len > ((unsigned long)-1 >> (4-pb)*8)) */
     /*    if (pb >= 0 && (len & (((ErlDrvSizeT)1 << (pb*8))) - 1) != len) {*/
@@ -1130,14 +1139,20 @@ static void outputv(ErlDrvData e, ErlIOVec* ev)
     if (dd->blocking)
         driver_pdl_lock(dd->blocking->pdl);
 
-    if ((sz = driver_sizeq(ix)) > 0) {
-	driver_enqv(ix, ev, 0);
-
+    qsz = driver_sizeq(ix);
+    if (qsz) {
+        if (qsz == (ErlDrvSizeT) -1) {
+            if (dd->blocking)
+                driver_pdl_unlock(dd->blocking->pdl);
+            driver_failure_posix(ix, EINVAL);
+            return;
+        }
+        driver_enqv(ix, ev, 0);
+        qsz += ev->size;
+        if (!dd->busy && qsz >= dd->high_watermark)
+            set_busy_port(ix, (dd->busy = !0));
         if (dd->blocking)
             driver_pdl_unlock(dd->blocking->pdl);
-
-	if (sz + ev->size >= (1 << 13))
-	    set_busy_port(ix, 1);
     }
     else if (!dd->blocking) {
         /* We try to write directly if the fd in non-blocking */
@@ -1154,11 +1169,17 @@ static void outputv(ErlDrvData e, ErlIOVec* ev)
 	    n = 0;
 	}
 	driver_enqv(ix, ev, n);  /* n is the skip value */
+        qsz = ev->size - n;
+        if (!dd->busy && qsz >= dd->high_watermark)
+            set_busy_port(ix, (dd->busy = !0));
 	driver_select(ix, ofd, ERL_DRV_WRITE|ERL_DRV_USE, 1);
     }
     else {
         if (ev->size != 0) {
             driver_enqv(ix, ev, 0);
+            qsz = ev->size;
+            if (!dd->busy && qsz >= dd->high_watermark)
+                set_busy_port(ix, (dd->busy = !0));
             driver_pdl_unlock(dd->blocking->pdl);
             driver_async(ix, &dd->blocking->pkey,
                          fd_async, dd, NULL);
@@ -1166,6 +1187,7 @@ static void outputv(ErlDrvData e, ErlIOVec* ev)
             driver_pdl_unlock(dd->blocking->pdl);
         }
     }
+
     /* return 0;*/
 }
 
@@ -1177,7 +1199,7 @@ static void output(ErlDrvData e, char* buf, ErlDrvSizeT len)
     int pb = dd->packet_bytes;
     int ofd = dd->ofd ? dd->ofd->fd : -1;
     ssize_t n;
-    ErlDrvSizeT sz;
+    ErlDrvSizeT qsz;
     char lb[4];
     char* lbp;
     struct iovec iv[2];
@@ -1192,11 +1214,15 @@ static void output(ErlDrvData e, char* buf, ErlDrvSizeT len)
     put_int32(len, lb);
     lbp = lb + (4-pb);
 
-    if ((sz = driver_sizeq(ix)) > 0) {
+    qsz = driver_sizeq(ix);
+    if (qsz) {
+        if (qsz == (ErlDrvSizeT) -1) {
+            driver_failure_posix(ix, EINVAL);
+            return;
+        }
 	driver_enq(ix, lbp, pb);
 	driver_enq(ix, buf, len);
-	if (sz + len + pb >= (1 << 13))
-	    set_busy_port(ix, 1);
+        qsz += len + pb;
     }
     else {
 	iv[0].iov_base = lbp;
@@ -1213,6 +1239,7 @@ static void output(ErlDrvData e, char* buf, ErlDrvSizeT len)
 	    }
 	    n = 0;
 	}
+        qsz = pb + len - n;
 	if (n < pb) {
 	    driver_enq(ix, lbp+n, pb-n);
 	    driver_enq(ix, buf, len);
@@ -1223,6 +1250,10 @@ static void output(ErlDrvData e, char* buf, ErlDrvSizeT len)
 	}
 	driver_select(ix, ofd, ERL_DRV_WRITE|ERL_DRV_USE, 1);
     }
+
+    if (!dd->busy && qsz >= dd->high_watermark)
+        set_busy_port(ix, (dd->busy = !0));
+
     return; /* 0; */
 }
 
@@ -1479,6 +1510,8 @@ static void ready_output(ErlDrvData e, ErlDrvEvent ready_fd)
     int vsize;
 
     if ((iv = (struct iovec*) driver_peekq(ix, &vsize)) == NULL) {
+        if (dd->busy)
+            set_busy_port(ix, (dd->busy = 0));
 	driver_select(ix, ready_fd, ERL_DRV_WRITE, 0);
         if (dd->pid > 0 && dd->ofd->fd < 0) {
             /* The port was opened with 'in' option, which means we
@@ -1494,8 +1527,13 @@ static void ready_output(ErlDrvData e, ErlDrvEvent ready_fd)
     }
     vsize = vsize > MAX_VSIZE ? MAX_VSIZE : vsize;
     if ((n = writev(ready_fd, iv, vsize)) > 0) {
-	if (driver_deq(ix, n) == 0)
-	    set_busy_port(ix, 0);
+        ErlDrvSizeT qsz = driver_deq(ix, n);
+        if (qsz == (ErlDrvSizeT) -1) {
+            driver_failure_posix(ix, EINVAL);
+            return;
+        }
+        if (dd->busy && qsz < dd->low_watermark)
+            set_busy_port(ix, (dd->busy = 0));
     }
     else if (n < 0) {
 	if (errno == ERRNO_BLOCK || errno == EINTR)
@@ -1519,7 +1557,7 @@ static void stop_select(ErlDrvEvent fd, void* _)
 static void
 fd_async(void *async_data)
 {
-    int res;
+    ErlDrvSSizeT res;
     ErtsSysDriverData *dd = (ErtsSysDriverData *)async_data;
     SysIOVec      *iov0;
     SysIOVec      *iov;
@@ -1544,7 +1582,6 @@ fd_async(void *async_data)
         } while (res < 0 && errno == EINTR);
         if (res < 0)
             err = errno;
-        err = errno;
 
         erts_free(ERTS_ALC_T_SYS_WRITE_BUF, iov);
     }
@@ -1560,10 +1597,18 @@ void fd_ready_async(ErlDrvData drv_data,
     ASSERT(dd->blocking);
 
     if (dd->blocking->res > 0) {
+        ErlDrvSizeT qsz;
         driver_pdl_lock(dd->blocking->pdl);
-        if (driver_deq(port_num, dd->blocking->res) == 0) {
+        qsz = driver_deq(port_num, dd->blocking->res);
+        if (qsz == (ErlDrvSizeT) -1) {
             driver_pdl_unlock(dd->blocking->pdl);
-            set_busy_port(port_num, 0);
+            driver_failure_posix(port_num, EINVAL);
+            return;
+        }
+        if (dd->busy && qsz < dd->low_watermark)
+            set_busy_port(port_num, (dd->busy = 0));
+        driver_pdl_unlock(dd->blocking->pdl);
+        if (qsz == 0) {
             if (dd->terminating) {
                 /* The port is has been ordered to terminate
                    from either fd_flush or port_inp_failure */
@@ -1576,14 +1621,12 @@ void fd_ready_async(ErlDrvData drv_data,
                 return; /* -1; */
             }
         } else {
-            driver_pdl_unlock(dd->blocking->pdl);
             /* still data left to write in queue */
             driver_async(port_num, &dd->blocking->pkey, fd_async, dd, NULL);
             return /* 0; */;
         }
     } else if (dd->blocking->res < 0) {
         if (dd->blocking->err == ERRNO_BLOCK) {
-            set_busy_port(port_num, 1);
             /* still data left to write in queue */
             driver_async(port_num, &dd->blocking->pkey, fd_async, dd, NULL);
         } else
diff --git a/erts/emulator/sys/win32/sys.c b/erts/emulator/sys/win32/sys.c
index b95aadc9b2..53411a4cc2 100644
--- a/erts/emulator/sys/win32/sys.c
+++ b/erts/emulator/sys/win32/sys.c
@@ -507,6 +507,9 @@ struct driver_data {
     AsyncIo out;		/* Control block for overlapped writing. */
     int report_exit;            /* Do report exit status for the port */
     erts_atomic32_t refc;       /* References to this struct */
+    ErlDrvSizeT high_watermark;        /* Q size when to go to busy port state */
+    ErlDrvSizeT low_watermark;         /* Q size when to leave busy port state */
+    int busy;
 };
 
 /* Driver interfaces */
@@ -740,13 +743,8 @@ release_driver_data(DriverData* dp)
     }
     ASSERT(dp->inBufSize == 0);
 
-    if (dp->outbuf != NULL) {
-	ASSERT(erts_atomic_read_nob(&sys_misc_mem_sz) >= dp->outBufSize);
-	erts_atomic_add_nob(&sys_misc_mem_sz, -1*dp->outBufSize);
-	DRV_BUF_FREE(dp->outbuf);
-	dp->outBufSize = 0;
-	dp->outbuf = NULL;
-    }
+    /* outbuf is released when queue is released */
+    ASSERT(!dp->outbuf);
     ASSERT(dp->outBufSize == 0);
 
     if (dp->port_pid != INVALID_HANDLE_VALUE) {
@@ -866,13 +864,18 @@ threaded_handle_closer(LPVOID param)
  */
 
 static ErlDrvData
-set_driver_data(DriverData* dp, HANDLE ifd, HANDLE ofd, int read_write, int report_exit)
+set_driver_data(DriverData* dp, HANDLE ifd, HANDLE ofd, int read_write, int report_exit,
+                SysDriverOpts* opts)
 {
     int result;
 
     dp->in.fd = ifd;
     dp->out.fd = ofd;
     dp->report_exit = report_exit;
+    dp->high_watermark = opts->high_watermark;
+    dp->low_watermark = opts->low_watermark;
+    
+    dp->busy = 0;
 
     if (read_write & DO_READ) {
 	result = driver_select(dp->port_num, (ErlDrvEvent)dp->in.ov.hEvent,
@@ -1322,7 +1325,7 @@ spawn_start(ErlDrvPort port_num, char* utf8_name, SysDriverOpts* opts)
 	}
 #endif
 	retval = set_driver_data(dp, hFromChild, hToChild, opts->read_write,
-				 opts->exit_status);
+				 opts->exit_status, opts);
 	if (retval != ERL_DRV_ERROR_GENERAL && retval != ERL_DRV_ERROR_ERRNO) {
             /* We assume that this cannot generate a negative number */
             erl_drv_set_os_pid(port_num, pid);
@@ -2200,7 +2203,8 @@ fd_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts)
 	} else if (in == 2 && out == 2) {
 	    save_22_port = dp;
 	}
-	return set_driver_data(dp, (HANDLE) opts->ifd, (HANDLE) opts->ofd, opts->read_write, 0);
+	return set_driver_data(dp, (HANDLE) opts->ifd, (HANDLE) opts->ofd, opts->read_write,
+                               0, opts);
     }
 }
 
@@ -2268,7 +2272,8 @@ vanilla_start(ErlDrvPort port_num, char* name, SysDriverOpts* opts)
     }
     if (ofd == INVALID_HANDLE_VALUE)
 	return ERL_DRV_ERROR_GENERAL;
-    return set_driver_data(dp, ifd, ofd, opts->read_write,0);
+    return set_driver_data(dp, ifd, ofd, opts->read_write,
+                           0, opts);
 }
 
 static void
@@ -2433,6 +2438,8 @@ output(ErlDrvData drv_data, char* buf, ErlDrvSizeT len)
     DriverData* dp = (DriverData *) drv_data;
     int pb;			/* The header size for this port. */
     char* current;
+    ErlDrvSizeT qsz, sz;
+    ErlDrvBinary *bin;
 
     pb = dp->packet_bytes;
 
@@ -2452,24 +2459,19 @@ output(ErlDrvData drv_data, char* buf, ErlDrvSizeT len)
      * Allocate memory for both the message and the header.
      */
 
-    ASSERT(dp->outbuf == NULL);
-    ASSERT(dp->outBufSize == 0);
-
-    ASSERT(!dp->outbuf);
-    dp->outbuf = DRV_BUF_ALLOC(pb+len);
-    if (!dp->outbuf) {
-	driver_failure_posix(dp->port_num, ENOMEM);
-	return ; /* -1; */
+    sz = pb+len;
+    bin = driver_alloc_binary(sz);
+    if (!bin) {
+        driver_failure_posix(dp->port_num, ENOMEM);
+        return ; /* -1; */
     }
 
-    dp->outBufSize = pb+len;
-    erts_atomic_add_nob(&sys_misc_mem_sz, dp->outBufSize);
-
     /*
      * Store header bytes (if any).
      */
 
-    current = dp->outbuf;
+    current = bin->orig_bytes;
+
     switch (pb) {
     case 4:
 	*current++ = (len >> 24) & 255;
@@ -2486,18 +2488,34 @@ output(ErlDrvData drv_data, char* buf, ErlDrvSizeT len)
 
     if (len)
 	memcpy(current, buf, len);
-    
-    if (!async_write_file(&dp->out, dp->outbuf, pb+len)) {
-	set_busy_port(dp->port_num, 1);
-    } else {
-	dp->out.ov.Offset += pb+len; /* For vanilla driver. */
-	/* XXX OffsetHigh should be changed too. */
-	ASSERT(erts_atomic_read_nob(&sys_misc_mem_sz) >= dp->outBufSize);
-	erts_atomic_add_nob(&sys_misc_mem_sz, -1*dp->outBufSize);
-	DRV_BUF_FREE(dp->outbuf);
-	dp->outBufSize = 0;
-	dp->outbuf = NULL;
+
+    qsz = driver_sizeq(dp->port_num);
+
+    if (qsz > 0) {
+        driver_enq_bin(dp->port_num, bin, 0, sz);
+        qsz += pb+len;
     }
+    else {
+        ASSERT(!dp->outbuf);
+        dp->outbuf = bin->orig_bytes;
+        dp->outBufSize = sz;
+        if (!async_write_file(&dp->out, dp->outbuf, sz)) {
+            driver_enq_bin(dp->port_num, bin, 0, sz);
+            qsz = sz;
+        } else {
+            dp->out.ov.Offset += pb+len; /* For vanilla driver. */
+            /* XXX OffsetHigh should be changed too. */
+            dp->outBufSize = 0;
+            dp->outbuf = NULL;
+        }
+    }
+
+    if (!dp->busy && qsz >= dp->high_watermark)
+        set_busy_port(dp->port_num, (dp->busy = !0));
+
+    /* Binary either handled or buffered */
+    driver_free_binary(bin);
+    
     /*return 0;*/
 }
 
@@ -2693,20 +2711,19 @@ ready_output(ErlDrvData drv_data, ErlDrvEvent ready_event)
     DWORD bytesWritten;
     DriverData *dp = (DriverData *) drv_data;
     int error;
+    ErlDrvSizeT qsz;
 
     if(dp->out.thread == (HANDLE) -1) {
 	dp->out.async_io_active = 0;
     }
     DEBUGF(("ready_output(%p, 0x%x)\n", drv_data, ready_event));
-    set_busy_port(dp->port_num, 0);
-    if (!(dp->outbuf)) {
+    if (!dp->outbuf) {
 	/* Happens because event sometimes get signalled during a successful
 	   write... */
 	return;
     }
-    ASSERT(erts_atomic_read_nob(&sys_misc_mem_sz) >= dp->outBufSize);
-    erts_atomic_add_nob(&sys_misc_mem_sz, -1*dp->outBufSize);
-    DRV_BUF_FREE(dp->outbuf);
+    
+    qsz = driver_deq(dp->port_num, dp->outBufSize);
     dp->outBufSize = 0;
     dp->outbuf = NULL;
 #ifdef HARD_POLL_DEBUG
@@ -2717,15 +2734,32 @@ ready_output(ErlDrvData drv_data, ErlDrvEvent ready_event)
     poll_debug_write_done(dp->out.ov.hEvent,bytesWritten);
 #endif
 
-    if (error == NO_ERROR) {
-	dp->out.ov.Offset += bytesWritten; /* For vanilla driver. */
-	return ; /* 0; */
+    if (error != NO_ERROR) {
+        (void) driver_select(dp->port_num, ready_event, ERL_DRV_WRITE, 0);
+        _dosmaperr(error);
+        driver_failure_posix(dp->port_num, errno);
+        return;
     }
-
-    (void) driver_select(dp->port_num, ready_event, ERL_DRV_WRITE, 0);
-    _dosmaperr(error);
-    driver_failure_posix(dp->port_num, errno);
-    /* return 0; */
+    
+    dp->out.ov.Offset += bytesWritten; /* For vanilla driver. */
+
+    while (qsz > 0) {
+        int vsize;
+        SysIOVec *iov = driver_peekq(dp->port_num, &vsize);
+        ASSERT(iov->iov_base && iov->iov_len);
+        dp->outbuf = iov->iov_base;
+        dp->outBufSize = iov->iov_len;
+        if (!async_write_file(&dp->out, dp->outbuf, dp->outBufSize))
+            break;
+        dp->out.ov.Offset += dp->outBufSize; /* For vanilla driver. */
+        /* XXX OffsetHigh should be changed too. */
+        qsz = driver_deq(dp->port_num, dp->outBufSize);
+        dp->outbuf = NULL;
+        dp->outBufSize = 0;
+    }
+    
+    if (dp->busy && qsz < dp->low_watermark)
+        set_busy_port(dp->port_num, (dp->busy = 0));
 }
 
 static void stop_select(ErlDrvEvent e, void* _)
diff --git a/erts/emulator/test/port_bif_SUITE.erl b/erts/emulator/test/port_bif_SUITE.erl
index e1e1ec9fb9..333c5bce04 100644
--- a/erts/emulator/test/port_bif_SUITE.erl
+++ b/erts/emulator/test/port_bif_SUITE.erl
@@ -26,7 +26,7 @@
 	 command_e_1/1, command_e_2/1, command_e_3/1, command_e_4/1,
 	 port_info1/1, port_info2/1,
 	 port_info_os_pid/1, port_info_race/1,
-	 connect/1, control/1, echo_to_busy/1]).
+	 connect/1, control/1, echo_to_busy/1, busy_options/1]).
 
 -export([do_command_e_1/1, do_command_e_2/1, do_command_e_4/1]).
 
@@ -38,7 +38,7 @@ suite() ->
 
 all() -> 
     [command, {group, port_info}, connect, control,
-     echo_to_busy].
+     echo_to_busy, busy_options].
 
 groups() -> 
     [{command_e, [],
@@ -475,3 +475,171 @@ sub_bin(Bin) when is_binary(Bin) ->
     B.
 
 id(I) -> I.
+
+busy_options(Config) when is_list(Config) ->
+    SleepTime = 2000,
+    SleepTimeX = SleepTime + 100,
+    MinVal = 1,
+    MaxVal = (1 bsl (8*erlang:system_info(wordsize))) - 2,
+    DataDir = proplists:get_value(data_dir, Config),
+    Sleep = filename:join(DataDir, "sleeper") ++ " " ++ integer_to_list(SleepTime),
+    Data = "hej hopp! hej hopp! hej hopp! hej hopp! hej hopp! hej hopp! hej hopp! hej hopp! hej hopp! hej hopp!",
+
+    process_flag(trap_exit, true),
+    Tester = self(),
+    HejLoop = fun (Prt, _F, 1000) ->
+                      Prt;
+                  (Prt, F, N) ->
+                      Prt ! {Tester, {command, Data}},
+                      F(Prt, F, N+1)
+              end,
+
+    io:format("Test1...~n", []),
+    Start1 = erlang:monotonic_time(millisecond),
+    Prt1 = open_port({spawn, Sleep},
+                     [{busy_limits_port, {MinVal, MinVal}},
+                      {busy_limits_msgq, {MinVal, MinVal}}]),
+    T1 = spawn_link(fun () ->
+                             HejLoop(Prt1, HejLoop, 0)
+                    end),
+    true = wait_until(fun () ->
+                              {status, suspended} == process_info(T1, status)
+                      end,
+                      SleepTimeX),
+    unlink(T1),
+    exit(T1, kill),
+    io:format("Test1 done: ~p ms~n", [erlang:monotonic_time(millisecond)-Start1]),
+
+    io:format("Test2...~n", []),
+    Start2 = erlang:monotonic_time(millisecond),
+    Prt2 = open_port({spawn, Sleep},
+                     [{busy_limits_port, {50, 100}},
+                      {busy_limits_msgq, {50, 100}}]),
+    T2 = spawn_link(fun () ->
+                            HejLoop(Prt2, HejLoop, 0)
+                    end),
+    true = wait_until(fun () ->
+                              {status, suspended} == process_info(T2, status)
+                      end,
+                      SleepTimeX),
+    unlink(T2),
+    exit(T2, kill),
+    io:format("Test2 done: ~p ms~n", [erlang:monotonic_time(millisecond)-Start2]),
+
+    io:format("Test3...~n", []),
+    Start3 = erlang:monotonic_time(millisecond),
+
+    Prt3 = open_port({spawn, Sleep},
+                     [{busy_limits_port, {MaxVal,MaxVal}},
+                      {busy_limits_msgq, {MaxVal,MaxVal}}]),
+    T3 = spawn_link(fun () ->
+                            HejLoop(Prt3, HejLoop, 0)
+                    end),
+    false = wait_until(fun () ->
+                              {status, suspended} == process_info(T3, status)
+                       end,
+                       SleepTimeX),
+    unlink(T3),
+    exit(T3, kill),
+    io:format("Test3 done: ~p ms~n", [erlang:monotonic_time(millisecond)-Start3]),
+
+    io:format("Test4...~n", []),
+    Start4 = erlang:monotonic_time(millisecond),
+
+    Prt4 = open_port({spawn, Sleep},
+                     [{busy_limits_port, disabled},
+                      {busy_limits_msgq, disabled}]),
+    T4 = spawn_link(fun () ->
+                            HejLoop(Prt4, HejLoop, 0)
+                    end),
+    false = wait_until(fun () ->
+                               {status, suspended} == process_info(T4, status)
+                       end,
+                       SleepTimeX),
+    unlink(T4),
+    exit(T4, kill),
+    io:format("Test4 done: ~p ms~n", [erlang:monotonic_time(millisecond)-Start4]),
+
+    try
+        open_port({spawn, Sleep},
+                  [{busy_limits_port, {MinVal-1,MinVal-1}}])
+    catch
+        error:badarg -> ok
+    end,
+
+    try
+        open_port({spawn, Sleep},
+                  [{busy_limits_msgq, {MinVal-1,MinVal-1}}])
+    catch
+        error:badarg -> ok
+    end,
+
+    try
+        open_port({spawn, Sleep},
+                  [{busy_limits_port, {MaxVal+1,MaxVal+1}}])
+    catch
+        error:badarg -> ok
+    end,
+
+    try
+        open_port({spawn, Sleep},
+                  [{busy_limits_msgq, {MaxVal+1,MaxVal+1}}])
+    catch
+        error:badarg -> ok
+    end,
+
+    load_control_drv(Config),
+
+    CtrlPort = open_port({spawn, "control_drv"},
+                         [{busy_limits_msgq, {50,100}}]),
+    unlink(CtrlPort),
+    exit(CtrlPort, kill),
+    
+    try
+        open_port({spawn, "control_drv"},
+                   [{busy_limits_port, {50,100}}])
+    catch
+        error:badarg -> ok
+    end,
+
+    receive {'EXIT', Prt1, _} -> ok end,
+    receive {'EXIT', Prt2, _} -> ok end,
+    receive {'EXIT', Prt3, _} -> ok end,
+    receive {'EXIT', Prt4, _} -> ok end,
+
+    ok.
+
+wait_until(Fun, infinity) ->
+    wait_until_aux(Fun, infinity);
+wait_until(Fun, MaxTime) ->
+    End = erlang:monotonic_time(millisecond) + MaxTime,
+    wait_until_aux(Fun, End).
+
+wait_until_aux(Fun, End) ->
+    case catch Fun() of
+        true ->
+            true;
+        _ ->
+            if End == infinity ->
+                    receive after 100 -> ok end,
+                    wait_until_aux(Fun, infinity);
+               true ->
+                    Now = erlang:monotonic_time(millisecond),
+                    case End =< Now of
+                        true ->
+                            false;
+                        _ ->
+                            Wait = case End - Now of
+                                       Short when End - Now < 100 ->
+                                           Short;
+                                       _ ->
+                                           100
+                                   end,
+                            receive after Wait -> ok end,
+                            wait_until_aux(Fun, End)
+                    end
+            end
+    end.
+                                   
+                                          
+                        
diff --git a/erts/emulator/test/port_bif_SUITE_data/Makefile.src b/erts/emulator/test/port_bif_SUITE_data/Makefile.src
index 1a2d348ecb..85ecf14eac 100644
--- a/erts/emulator/test/port_bif_SUITE_data/Makefile.src
+++ b/erts/emulator/test/port_bif_SUITE_data/Makefile.src
@@ -3,7 +3,7 @@ LD = @LD@
 CFLAGS = @CFLAGS@ -I@erl_include@ @DEFS@
 CROSSLDFLAGS = @CROSSLDFLAGS@
 
-all: control_drv@dll@ port_test@exe@
+all: control_drv@dll@ port_test@exe@ sleeper@exe@
 
 port_test@exe@: port_test@obj@
 	$(LD) $(CROSSLDFLAGS) -o port_test port_test@obj@ @LIBS@
@@ -11,4 +11,10 @@ port_test@exe@: port_test@obj@
 port_test@obj@: port_test.c
 	$(CC) -c -o port_test@obj@ $(CFLAGS) port_test.c
 
+sleeper@exe@: sleeper@obj@
+	$(LD) $(CROSSLDFLAGS) -o sleeper sleeper@obj@ @LIBS@
+
+sleeper@obj@: sleeper.c
+	$(CC) -c -o sleeper@obj@ $(CFLAGS) sleeper.c
+
 @SHLIB_RULES@
diff --git a/erts/emulator/test/port_bif_SUITE_data/sleeper.c b/erts/emulator/test/port_bif_SUITE_data/sleeper.c
new file mode 100644
index 0000000000..bd662f51a4
--- /dev/null
+++ b/erts/emulator/test/port_bif_SUITE_data/sleeper.c
@@ -0,0 +1,66 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2020. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <errno.h>
+#ifndef __WIN32__
+#  include <unistd.h>
+#  include <sys/time.h>
+#else
+#  include "windows.h"
+#  include "winbase.h"
+#endif
+#include <sys/types.h>
+
+int
+main(int argc, char *argv[])
+{
+    long int ms;
+    char *endp;
+    
+    if (argc != 2) {
+        fprintf(stderr, "Invalid argument count: %d\n", argc);
+        exit(1);
+    }
+
+    errno = 0;
+    ms = strtol(argv[1], &endp, 10);
+    if (errno || argv[1] == endp || *endp != '\0' || ms < 0) {
+        if (errno == 0)
+            errno = EINVAL;
+        perror("Invalid timeout value");
+        exit(1);
+    }
+    
+#ifdef __WIN32__
+    Sleep(ms);
+#else
+    {
+        struct timeval t;
+        t.tv_sec = ms/1000;
+        t.tv_usec = (ms % 1000) * 1000;
+
+        select(0, NULL, NULL, NULL, &t);
+    }
+#endif
+  
+  return 0;
+}
diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl
index 5df74b9668..82d0aa91f8 100644
--- a/erts/preloaded/src/erlang.erl
+++ b/erts/preloaded/src/erlang.erl
@@ -2205,7 +2205,9 @@ nodes(_Arg) ->
            | binary
            | eof
 	   | {parallelism, Boolean :: boolean()}
-	   | hide.
+	   | hide
+           | {busy_limits_port, {non_neg_integer(), non_neg_integer()} | disabled}
+           | {busy_limits_msgq, {non_neg_integer(), non_neg_integer()} | disabled}.
 open_port(PortName, PortSettings) ->
     case case erts_internal:open_port(PortName, PortSettings) of
 	     Ref when erlang:is_reference(Ref) -> receive {Ref, Res} -> Res end;
-- 
2.16.4