File 0145-Keep-underutilized-buffer-for-all-recv-operations.patch of Package erlang
From f3df06192adcc22b63bfe8d9efd5a3fbea3d59ce Mon Sep 17 00:00:00 2001
From: Raimo Niskanen <raimo@erlang.org>
Date: Mon, 17 Mar 2025 17:17:30 +0100
Subject: [PATCH] Keep underutilized buffer for all recv operations
---
erts/emulator/nifs/unix/unix_socket_syncio.c | 148 +++++++++++--------
lib/kernel/src/socket.erl | 43 ++++--
2 files changed, 120 insertions(+), 71 deletions(-)
diff --git a/erts/emulator/nifs/unix/unix_socket_syncio.c b/erts/emulator/nifs/unix/unix_socket_syncio.c
index 922b127ff9..1b6bfc1eb8 100644
--- a/erts/emulator/nifs/unix/unix_socket_syncio.c
+++ b/erts/emulator/nifs/unix/unix_socket_syncio.c
@@ -325,6 +325,12 @@ static ERL_NIF_TERM essio_sendfile_ok(ErlNifEnv* env,
size_t count);
#endif
+static BOOLEAN_T recv_alloc_buf(size_t size,
+ ErlNifBinary *bufP);
+static BOOLEAN_T recv_create_bin(ErlNifBinary *bufP,
+ size_t size,
+ ErlNifBinary *binP);
+
static BOOLEAN_T recv_check_entry(ErlNifEnv *env,
ESockDescriptor *descP,
ERL_NIF_TERM recvRef,
@@ -2725,7 +2731,7 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
int flags)
{
int saveErrno;
- ErlNifBinary buf;
+ ErlNifBinary bin, *bufP;
ssize_t readResult;
size_t bufSz = (len != 0 ? len : descP->rBufSz); // 0 means default
ERL_NIF_TERM ret;
@@ -2746,16 +2752,8 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
return ret;
}
- /* Allocate the receive buffer */
- if (descP->buf.data == NULL) {
- ESOCK_ASSERT( ALLOC_BIN(bufSz, &buf) );
- }
- else {
- buf = descP->buf;
- if (buf.size != bufSz) {
- REALLOC_BIN(&buf, bufSz);
- }
- }
+ bufP = &descP->buf;
+ ESOCK_ASSERT( recv_alloc_buf(bufSz, bufP) );
SSDBG( descP, ("UNIX-ESSIO", "essio_recv {%d} -> try read (%lu)\r\n",
descP->sock, (unsigned long) len) );
@@ -2764,7 +2762,7 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
esock_atom_read_tries, &descP->readTries, 1);
/* recv() */
- readResult = sock_recv(descP->sock, buf.data, buf.size, flags);
+ readResult = sock_recv(descP->sock, bufP->data, bufP->size, flags);
saveErrno = ESOCK_IS_ERROR(readResult) ? sock_errno() : 0;
SSDBG( descP, ("UNIX-ESSIO",
@@ -2775,13 +2773,13 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
if (! recv_check_result(env, descP, sockRef, recvRef,
readResult, saveErrno, &ret) ) {
/* Keep the buffer */
- descP->buf = buf;
return ret;
}
/* readResult >= 0 */
- ESOCK_ASSERT( readResult <= buf.size );
- if (readResult < buf.size) {
+ ESOCK_ASSERT( recv_create_bin(bufP, readResult, &bin) );
+
+ if (bin.size < bufP->size) {
/* +++ We did not fill the buffer +++ */
@@ -2789,26 +2787,11 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
("UNIX-ESSIO",
"essio_recv {%d} -> [%lu] "
"did not fill the buffer (%ld)\r\n",
- descP->sock, (unsigned long) buf.size,
- (long) readResult) );
-
- if (// Less than 4K (1 page) wasted
- readResult >= (buf.size & ~4095) ||
- // Less than 25% wasted
- readResult >= (buf.size >> 1) + (buf.size >> 2)) {
- //
- /* Reallocate and drop buffer */
- descP->buf.data = NULL;
- ESOCK_ASSERT( REALLOC_BIN(&buf, readResult) );
- }
- else {
- /* Keep buffer, copy content to new binary*/
- descP->buf = buf;
- ESOCK_ASSERT( ALLOC_BIN(readResult, &buf) );
- sys_memcpy(buf.data, descP->buf.data, buf.size);
- }
+ descP->sock, (unsigned long) bufP->size,
+ (unsigned long) bin.size) );
+
/* Return {ok|timeout|select|select_read, Bin} */
- return recv_check_partial(env, descP, sockRef, recvRef, len, &buf);
+ return recv_check_partial(env, descP, sockRef, recvRef, len, &bin);
} else {
@@ -2817,11 +2800,10 @@ ERL_NIF_TERM essio_recv(ErlNifEnv* env,
SSDBG( descP,
("UNIX-ESSIO",
"essio_recv {%d} -> [%lu] filled the buffer\r\n",
- descP->sock, (unsigned long) buf.size) );
+ descP->sock, (unsigned long) bin.size) );
- descP->buf.data = NULL; // Drop buffer
/* Return {more|ok|select_read, Bin} */
- return recv_check_full(env, descP, sockRef, recvRef, len, &buf);
+ return recv_check_full(env, descP, sockRef, recvRef, len, &bin);
}
}
@@ -2844,7 +2826,7 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env,
SOCKLEN_T fromAddrLen;
ssize_t readResult;
int saveErrno;
- ErlNifBinary buf;
+ ErlNifBinary bin, *bufP;
size_t bufSz = (len != 0 ? len : descP->rBufSz); // 0 means default
ERL_NIF_TERM ret;
@@ -2861,8 +2843,8 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env,
return ret;
}
- /* Allocate the receive buffer */
- ESOCK_ASSERT( ALLOC_BIN(bufSz, &buf) );
+ bufP = &descP->buf;
+ ESOCK_ASSERT( recv_alloc_buf(bufSz, bufP) );
ESOCK_CNT_INC(env, descP, sockRef,
esock_atom_read_tries, &descP->readTries, 1);
@@ -2871,18 +2853,19 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env,
sys_memzero((char*) &fromAddr, fromAddrLen);
/* recvfrom() */
- readResult = sock_recvfrom(descP->sock, buf.data, buf.size, flags,
+ readResult = sock_recvfrom(descP->sock, bufP->data, bufP->size, flags,
&fromAddr.sa, &fromAddrLen);
saveErrno = ESOCK_IS_ERROR(readResult) ? sock_errno() : 0;
/* Check for errors and end of stream */
if (! recv_check_result(env, descP, sockRef, recvRef,
readResult, saveErrno, &ret) ) {
- FREE_BIN(&buf);
+ /* Keep the buffer */
return ret;
}
/* readResult >= 0 */
- ESOCK_ASSERT( readResult <= buf.size );
+
+ ESOCK_ASSERT( recv_create_bin(bufP, readResult, &bin) );
/* The recvfrom function delivers one (1) message. If our buffer
* is too small, the message will be truncated. So, regardless
@@ -2892,18 +2875,14 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env,
* Encode the message and source address
*/
- if (readResult < buf.size) {
- ESOCK_ASSERT( REALLOC_BIN(&buf, readResult) );
- }
-
descP->rNumCnt = 0;
ESOCK_CNT_INC(env, descP, sockRef, esock_atom_read_pkg,
&descP->readPkgCnt, 1);
ESOCK_CNT_INC(env, descP, sockRef, esock_atom_read_byte,
- &descP->readByteCnt, buf.size);
- if (buf.size > descP->readPkgMax)
- descP->readPkgMax = buf.size;
+ &descP->readByteCnt, bin.size);
+ if (bin.size > descP->readPkgMax)
+ descP->readPkgMax = bin.size;
esock_encode_sockaddr(env,
&fromAddr, fromAddrLen,
@@ -2913,7 +2892,7 @@ ERL_NIF_TERM essio_recvfrom(ErlNifEnv* env,
* erlang term in env (no need to free; it will be GC:ed).
*/
/* {FromAddr, Bin} */
- ret = MKT2(env, ret, MKBIN(env, &buf));
+ ret = MKT2(env, ret, MKBIN(env, &bin));
if (descP->selectRead && (COMPARE(recvRef, esock_atom_zero) != 0)) {
/* Return {select_read, {FromAddr, Bin}} */
@@ -2950,8 +2929,7 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env,
size_t ctrlSz = (ctrlLen != 0 ? ctrlLen : descP->rCtrlSz);
struct msghdr msgHdr;
SysIOVec iov[1]; // Shall we always use 1?
- ErlNifBinary data[1]; // Shall we always use 1?
- ErlNifBinary ctrl;
+ ErlNifBinary ctrl, bin, *bufP;
ERL_NIF_TERM ret;
ESockAddress addr;
@@ -2970,9 +2948,10 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env,
return ret;
}
- /* Allocate the (msg) data buffer:
+ /* Allocate the data buffer
*/
- ESOCK_ASSERT( ALLOC_BIN(bufSz, &data[0]) );
+ bufP = &descP->buf;
+ ESOCK_ASSERT( recv_alloc_buf(bufSz, bufP) );
/* Allocate the ctrl (buffer):
*/
@@ -2985,8 +2964,8 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env,
sys_memzero((char*) &addr, addrLen);
sys_memzero((char*) &msgHdr, sizeof(msgHdr));
- iov[0].iov_base = data[0].data;
- iov[0].iov_len = data[0].size;
+ iov[0].iov_base = bufP->data;
+ iov[0].iov_len = bufP->size;
msgHdr.msg_name = &addr;
msgHdr.msg_namelen = addrLen;
@@ -3002,12 +2981,14 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env,
/* Check for errors and end of stream */
if (! recv_check_result(env, descP, sockRef, recvRef,
readResult, saveErrno, &ret) ) {
- FREE_BIN(&data[0]);
+ /* Keep the data buffer */
FREE_BIN(&ctrl);
return ret;
}
/* readResult >= 0 */
+ ESOCK_ASSERT( recv_create_bin(bufP, readResult, &bin) );
+
/* The recvmsg function delivers one (1) message. If our buffer
* is to small, the message will be truncated. So, regardless
* if we filled the buffer or not, we have got what we are going
@@ -3038,7 +3019,7 @@ ERL_NIF_TERM essio_recvmsg(ErlNifEnv* env,
descP->readPkgMax = readResult;
encode_msg(env, descP,
- readResult, &msgHdr, &data[0], &ctrl,
+ readResult, &msgHdr, &bin, &ctrl,
&ret);
if (descP->selectRead && (COMPARE(recvRef, esock_atom_zero) != 0)) {
@@ -6861,6 +6842,55 @@ void essio_down(ErlNifEnv* env,
/* *** Recv/recvfrom/recvmsg utility functions *** */
+static /* Make *bufP usable as a receive buffer of 'size' bytes */
+BOOLEAN_T recv_alloc_buf(size_t size,
+ ErlNifBinary *bufP)
+{ /* Returns the ALLOC_BIN/REALLOC_BIN result, or TRUE when nothing is done */
+ if (bufP->data == NULL) { /* No buffer present - allocate one */
+ return ALLOC_BIN(size, bufP);
+ }
+ else { /* A buffer is already present - reuse it */
+ if (size != bufP->size) /* Size differs from the request - resize */
+ return REALLOC_BIN(bufP, size);
+ else
+ return TRUE;
+ }
+}
+
+static /* Produce in *binP a binary holding the first 'size' bytes of *bufP */
+BOOLEAN_T recv_create_bin(ErlNifBinary *bufP, size_t size, ErlNifBinary *binP)
+{
+ /* Don't touch bufP->size - it stays unchanged even when the buffer is
+ * dropped (data = NULL); essio_recv compares bin.size against it */
+ if (size >= bufP->size) {
+ /* Buffer full
+ * - use it as return binary and drop buffer
+ */
+ ESOCK_ASSERT( bufP->size >= size ); /* Together: size == bufP->size */
+ *binP = *bufP;
+ bufP->data = NULL;
+ return TRUE;
+ }
+ else if (size >= (bufP->size & ~4095) ||
+ size >= (bufP->size >> 1) + (bufP->size >> 2)) {
+ /* Unused tail is less than one 4 KiB page, or at most ~25% of the
+ * buffer - hand the buffer over resized to 'size' and drop it;
+ * the REALLOC_BIN result is returned */
+ *binP = *bufP;
+ bufP->data = NULL;
+ return REALLOC_BIN(binP, size);
+ }
+ else {
+ BOOLEAN_T ret;
+ /* Keep buffer, copy content to a newly allocated binary;
+ * the copy is skipped and the ALLOC_BIN result returned on failure */
+ ret = ALLOC_BIN(size, binP);
+ if (ret)
+ sys_memcpy(binP->data, bufP->data, size);
+ return ret;
+ }
+}
+
static
BOOLEAN_T recv_check_entry(ErlNifEnv *env,
ESockDescriptor *descP,
diff --git a/lib/kernel/src/socket.erl b/lib/kernel/src/socket.erl
index ca1271d6dc..4ad7128033 100644
--- a/lib/kernel/src/socket.erl
+++ b/lib/kernel/src/socket.erl
@@ -5427,20 +5427,38 @@ recv_deadline(SockRef, Length, Flags, Deadline, Buf) ->
%%
{select_read, Bin} -> %% All data, new recv operation in progress
+ %% The combination of select_read and recv with a time-out
+ %% is contradictory: the return value has no place
+ %% for a continuation because neither a ref nor 'nowait'
+ %% was given, so we handle this as if there was no select_read
+ %% by cancelling the new recv operation
+ %%
_ = cancel(SockRef, recv, Handle),
{ok, condense_buffer(Bin, Buf)};
%%
- Select %% select | {select, Bin} %% No data or incomplete
- when Select =:= select;
- tuple_size(Select) =:= 2, element(1, Select) =:= select ->
- {Length_1, Buf_1} =
- if
- Select =:= select ->
- {Length, Buf};
- true ->
- Bin = element(2, Select),
- {Length - byte_size(Bin), [Bin | Buf]}
- end,
+ select ->
+ %%
+ %% There is nothing just now, but we will be notified
+ %% with a select message when there is something to recv
+ Timeout = timeout(Deadline),
+ receive
+ ?socket_msg(?socket(SockRef), select, Handle) ->
+ if
+ 0 < Timeout ->
+ %% Retry
+ recv_deadline(
+ SockRef, Length, Flags, Deadline, Buf);
+ true ->
+ recv_error(timeout, Buf)
+ end;
+ ?socket_msg(_Socket, abort, {Handle, Reason}) ->
+ recv_error(Reason, Buf)
+ after Timeout ->
+ _ = cancel(SockRef, recv, Handle),
+ recv_error(timeout, Buf)
+ end;
+ {select, Bin} ->
+ Buf_1 = [Bin | Buf],
%%
%% There is nothing just now, but we will be notified
%% with a select message when there is something to recv
@@ -5451,7 +5469,8 @@ recv_deadline(SockRef, Length, Flags, Deadline, Buf) ->
0 < Timeout ->
%% Retry
recv_deadline(
- SockRef, Length_1, Flags, Deadline, Buf_1);
+ SockRef, Length - byte_size(Bin),
+ Flags, Deadline, Buf_1);
true ->
recv_error(timeout, Buf_1)
end;
--
2.51.0