File 1303-sendmmsg-handle-partial-writes-and-trim-allocations.patch of Package erlang
From d10e3dd2307809f631ee13c2e5f3f4277729666e Mon Sep 17 00:00:00 2001
From: Nelson Vides <videsnelson@gmail.com>
Date: Thu, 29 Jan 2026 19:02:38 +0100
Subject: [PATCH 03/14] sendmmsg: handle partial writes and trim allocations
- Use msg_len from sendmmsg() to detect partial writes; same rule as
send_check_result (written < dataSize => partial).
- Success: return 'ok' when all messages fully sent; {ok, Rest} when
any partial, with Rest = list of rest iovecs for partial messages
only (same shape as sendmsg rest data).
---
erts/emulator/nifs/unix/unix_socket_syncio.c | 54 +++++++--
lib/kernel/src/socket.erl | 119 ++++++++-----------
lib/kernel/test/socket_SUITE.erl | 21 ++--
3 files changed, 107 insertions(+), 87 deletions(-)
diff --git a/erts/emulator/nifs/unix/unix_socket_syncio.c b/erts/emulator/nifs/unix/unix_socket_syncio.c
index 99234c621d..71b462725b 100644
--- a/erts/emulator/nifs/unix/unix_socket_syncio.c
+++ b/erts/emulator/nifs/unix/unix_socket_syncio.c
@@ -4320,6 +4320,10 @@ ERL_NIF_TERM essio_recvmmsg(ErlNifEnv* env,
#ifdef HAVE_SENDMMSG
/* ========================================================================
+ * Same criterion as send_check_result: written < dataSize => partial.
+ * We do not call send_check_result (it has side effects: writer state,
+ * stats). We only produce per-message result: 'full' or bytes written,
+ * so Erlang can build rest iovecs by slicing (same as sendmsg continuation).
*/
ERL_NIF_TERM essio_sendmmsg(ErlNifEnv* env,
ESockDescriptor* descP,
@@ -4374,7 +4378,7 @@ ERL_NIF_TERM essio_sendmmsg(ErlNifEnv* env,
}
if (msgCount == 0) {
- return esock_make_ok2(env, MKI(env, 0));
+ return esock_atom_ok;
}
if (msgCount > ESOCK_MMSG_MAX)
@@ -4421,9 +4425,6 @@ ERL_NIF_TERM essio_sendmmsg(ErlNifEnv* env,
while (!enif_is_empty_list(env, tail) && i < msgCount) {
ERL_NIF_TERM tail2;
sys_memzero((char*) &sendMmsghdrs[i], sizeof(struct mmsghdr));
- ctrlBufLens[i] = 0;
- ctrlBufUseds[i] = 0;
-
enif_get_list_cell(env, tail, &eMsg, &tail);
/* Extract address */
@@ -4439,7 +4440,6 @@ ERL_NIF_TERM essio_sendmmsg(ErlNifEnv* env,
}
} else {
sendMmsghdrs[i].msg_hdr.msg_name = NULL;
- sendMmsghdrs[i].msg_hdr.msg_namelen = 0;
}
/* Extract IOV */
@@ -4470,9 +4470,7 @@ ERL_NIF_TERM essio_sendmmsg(ErlNifEnv* env,
sendMmsghdrs[i].msg_hdr.msg_controllen = ctrlBufUseds[i];
} else {
sendMmsghdrs[i].msg_hdr.msg_control = NULL;
- sendMmsghdrs[i].msg_hdr.msg_controllen = 0;
}
- sendMmsghdrs[i].msg_hdr.msg_flags = 0;
i++;
}
@@ -4487,7 +4485,47 @@ ERL_NIF_TERM essio_sendmmsg(ErlNifEnv* env,
ret = send_check_result(env, descP, sendResult, 0, FALSE,
sockRef, sendRef);
} else {
- ret = esock_make_ok2(env, MKI(env, sendResult));
+ /*
+ * Same criterion as send_check_result: for each updated message,
+ * written < dataSize => partial. Only add partials to the result
+ * list; each element is {Index, Written} so Erlang can slice the
+ * right message. Two indexes: i over messages, resultIdx over
+ * result list (only incremented for partials).
+ */
+ unsigned int updatedCount = (unsigned int) sendResult;
+ BOOLEAN_T allFull = TRUE;
+ ERL_NIF_TERM* resultElems = NULL;
+ unsigned int resultIdx = 0;
+ unsigned int k;
+
+ if (updatedCount > 0) {
+ resultElems = (ERL_NIF_TERM*) enif_alloc(updatedCount * sizeof(ERL_NIF_TERM));
+ if (!resultElems) {
+ ret = esock_make_error_errno(env, ENOMEM);
+ goto cleanup;
+ }
+ for (i = 0; i < updatedCount; i++) {
+ size_t expectedLen = 0;
+ for (k = 0; k < iovecPtrs[i]->iovcnt; k++) {
+ expectedLen += iovecPtrs[i]->iov[k].iov_len;
+ }
+ if (sendMmsghdrs[i].msg_len != expectedLen) {
+ allFull = FALSE;
+ resultElems[resultIdx++] = MKT2(env, MKI(env, (int) i),
+ MKI(env, (int) sendMmsghdrs[i].msg_len));
+ }
+ }
+ if (allFull) {
+ enif_free(resultElems);
+ ret = esock_atom_ok;
+ } else {
+ ret = esock_make_ok2(env,
+ enif_make_list_from_array(env, resultElems, resultIdx));
+ enif_free(resultElems);
+ }
+ } else {
+ ret = esock_atom_ok;
+ }
}
cleanup:
diff --git a/lib/kernel/src/socket.erl b/lib/kernel/src/socket.erl
index ea7010408f..9454a7738a 100644
--- a/lib/kernel/src/socket.erl
+++ b/lib/kernel/src/socket.erl
@@ -4963,88 +4963,51 @@ multiple datagrams.
> This function is only available on Linux and BSD systems (not macOS/Darwin or Windows).
> On unsupported platforms, it will return `{error, notsup}`.
-Returns `{ok, SentCount}` where `SentCount` is the number of messages successfully sent.
+On success, returns either:
+- **`ok`** – when all messages were sent in full (or there were zero messages).
+- **`{ok, Rest}`** – when one or more messages had a partial write. `Rest` is a list with one
+ element per message that was not fully sent, in message order. Each element is the
+ remaining data for that message in the same form as [`sendmsg/4`](`sendmsg/4`)'s rest data
+ (`t:erlang:iovec/0`), so you can retry with `sendmsg` for each.
+
+On error returns `{error, Reason}`.
""".
-spec sendmmsg(Socket, Msgs, Flags, Timeout :: 'infinity') ->
'ok' |
- {'ok', RestData} |
- {'error', Reason} |
- {'error', {Reason, RestData}}
+ {'ok', Rest} |
+ {'error', Reason}
when
- Socket :: socket(),
- Msgs :: [msg_send()],
- Flags :: [msg_flag() | integer()],
- RestData :: erlang:iovec(),
- Reason :: posix() | 'closed' | invalid();
+ Socket :: socket(),
+ Msgs :: [msg_send()],
+ Flags :: [msg_flag() | integer()],
+ Rest :: [erlang:iovec()],
+ Reason :: posix() | 'closed' | invalid();
(Socket, Msgs, Flags, Timeout :: non_neg_integer()) ->
'ok' |
- {'ok', RestData} |
- {'error', Reason | 'timeout'} |
- {'error', {Reason | 'timeout', RestData}}
+ {'ok', Rest} |
+ {'error', Reason | 'timeout'}
when
- Socket :: socket(),
- Msgs :: [msg_send()],
- Flags :: [msg_flag() | integer()],
- RestData :: erlang:iovec(),
- Reason :: posix() | 'closed' | invalid();
+ Socket :: socket(),
+ Msgs :: [msg_send()],
+ Flags :: [msg_flag() | integer()],
+ Rest :: [erlang:iovec()],
+ Reason :: posix() | 'closed' | invalid();
(Socket, Msgs, Flags, 'nowait' | Handle) ->
'ok' |
- {'ok', RestData} |
+ {'ok', Rest} |
+ {'select_write', {SelectInfo, SentCount}} |
{'select', SelectInfo} |
- {'select', {SelectInfo, RestData}} |
{'completion', CompletionInfo} |
- {'error', Reason} |
- {'error', {Reason, RestData}}
+ {'error', Reason}
when
Socket :: socket(),
Msgs :: [msg_send()],
Flags :: [msg_flag() | integer()],
Handle :: select_handle() | completion_handle(),
- RestData :: erlang:iovec(),
- SelectInfo :: select_info(),
- CompletionInfo :: completion_info(),
- Reason :: posix() | 'closed' | invalid();
-
- (Socket, Data, Cont, Timeout :: 'infinity') ->
- 'ok' |
- {'ok', RestData} |
- {'error', Reason} |
- {'error', {Reason, RestData}}
- when
- Socket :: socket(),
- Data :: msg_send() | erlang:iovec(),
- Cont :: select_info(),
- RestData :: erlang:iovec(),
- Reason :: posix() | 'closed' | invalid();
-
- (Socket, Data, Cont, Timeout :: non_neg_integer()) ->
- 'ok' |
- {'ok', RestData} |
- {'error', Reason | 'timeout'} |
- {'error', {Reason | 'timeout', RestData}}
- when
- Socket :: socket(),
- Data :: msg_send() | erlang:iovec(),
- Cont :: select_info(),
- RestData :: erlang:iovec(),
- Reason :: posix() | 'closed' | invalid();
-
- (Socket, Data, Cont, 'nowait' | Handle) ->
- 'ok' |
- {'ok', RestData} |
- {'select', SelectInfo} |
- {'select', {SelectInfo, RestData}} |
- {'completion', CompletionInfo} |
- {'error', Reason} |
- {'error', {Reason, RestData}}
- when
- Socket :: socket(),
- Data :: msg_send() | erlang:iovec(),
- Cont :: select_info(),
- Handle :: select_handle(),
- RestData :: erlang:iovec(),
+ Rest :: [erlang:iovec()],
+ SentCount :: non_neg_integer(),
SelectInfo :: select_info(),
CompletionInfo :: completion_info(),
Reason :: posix() | 'closed' | invalid().
@@ -5076,6 +5039,22 @@ sendmmsg(?socket(SockRef), Msgs, Flags, Timeout)
sendmmsg(Socket, Msgs, Flags, Timeout) ->
error(badarg, [Socket, Msgs, Flags, Timeout]).
+%% Build rest iovecs from partials-only result list.
+%% C returns [{Index, Written}, ...] in message order; we slice the Index-th message's iov.
+sendmmsg_rest_from_result(Msgs, ResultList) ->
+ [iovec_rest(maps:get(iov, lists:nth(Index + 1, Msgs)), Written) ||
+ {Index, Written} <- ResultList].
+
+%% Skip first Written bytes from IOV; return rest as iovec (same as sendmsg rest).
+iovec_rest(IOV, Written) when Written =< 0 ->
+ IOV;
+iovec_rest([], _) ->
+ [];
+iovec_rest([Bin | Rest], Written) when byte_size(Bin) =< Written ->
+ iovec_rest(Rest, Written - byte_size(Bin));
+iovec_rest([Bin | Rest], Written) when byte_size(Bin) > Written ->
+ [binary:part(Bin, Written, byte_size(Bin) - Written) | Rest].
+
sendmmsg_nowait(SockRef, Msgs, Flags, Handle) ->
case prim_socket:sendmmsg(SockRef, Msgs, Flags, Handle) of
{select_write = Tag, SentCount} ->
@@ -5084,8 +5063,10 @@ sendmmsg_nowait(SockRef, Msgs, Flags, Handle) ->
{Tag, ?SELECT_INFO(sendmmsg, Handle)};
completion = Tag ->
{Tag, ?COMPLETION_INFO(sendmmsg, Handle)};
- {ok, SentCount} ->
- {ok, SentCount};
+ ok ->
+ ok;
+ {ok, ResultList} ->
+ {ok, sendmmsg_rest_from_result(Msgs, ResultList)};
{error, _} = Error ->
Error
end.
@@ -5111,8 +5092,10 @@ sendmmsg_deadline(SockRef, Msgs, Flags, Deadline) ->
_ = cancel(SockRef, sendmmsg, Handle),
{error, timeout}
end;
- {ok, SentCount} ->
- {ok, SentCount};
+ ok ->
+ ok;
+ {ok, ResultList} ->
+ {ok, sendmmsg_rest_from_result(Msgs, ResultList)};
{error, _} = Error ->
Error
end.
diff --git a/lib/kernel/test/socket_SUITE.erl b/lib/kernel/test/socket_SUITE.erl
index 642312afe7..1140bdb567 100644
--- a/lib/kernel/test/socket_SUITE.erl
+++ b/lib/kernel/test/socket_SUITE.erl
@@ -14949,7 +14949,7 @@ sendmmsg_basic_udp4(_Config) when is_list(_Config) ->
#{iov => [<<"msg4">>]},
#{iov => [<<"msg5">>]}
],
- {ok, 5} = socket:sendmmsg(S2, Msgs, [], infinity),
+ ok = socket:sendmmsg(S2, Msgs, [], infinity),
%% Receive them one by one
lists:foreach(
fun(Expected) ->
@@ -14990,7 +14990,7 @@ sendmmsg_basic_udp6(_Config) when is_list(_Config) ->
#{iov => [<<"msg2">>]},
#{iov => [<<"msg3">>]}
],
- {ok, 3} = socket:sendmmsg(S2, Msgs, [], infinity),
+ ok = socket:sendmmsg(S2, Msgs, [], infinity),
%% Receive them one by one
lists:foreach(
fun(Expected) ->
@@ -15031,7 +15031,7 @@ recvmmsg_sendmmsg_loopback_udp4(_Config) when is_list(_Config) ->
#{iov => [list_to_binary(["msg", integer_to_list(N)])]}
|| N <- lists:seq(1, 10)
],
- {ok, 10} = socket:sendmmsg(S2, Msgs, [], infinity),
+ ok = socket:sendmmsg(S2, Msgs, [], infinity),
%% Receive all 10 messages at once
{ok, Received} = socket:recvmmsg(S1, 10, 0, 0, [], infinity),
true = length(Received) =:= 10,
@@ -15066,7 +15066,7 @@ recvmmsg_sendmmsg_loopback_udp6(_Config) when is_list(_Config) ->
#{iov => [list_to_binary(["msg", integer_to_list(N)])]}
|| N <- lists:seq(1, 5)
],
- {ok, 5} = socket:sendmmsg(S2, Msgs, [], infinity),
+ ok = socket:sendmmsg(S2, Msgs, [], infinity),
%% Receive all 5 messages at once
{ok, Received} = socket:recvmmsg(S1, 10, 0, 0, [], infinity),
true = length(Received) =:= 5,
@@ -15174,7 +15174,7 @@ sendmmsg_writer(Sock, Id, Parent) ->
Msg = list_to_binary(io_lib:format("msg~p", [Id])),
Msgs = [#{iov => [Msg]}],
case socket:sendmmsg(Sock, Msgs, [], infinity) of
- {ok, 1} ->
+ ok ->
Parent ! {self(), ok};
Other ->
Parent ! {self(), Other}
@@ -15327,7 +15327,7 @@ sendmmsg_large_batch_udp4(_Config) when is_list(_Config) ->
NumMessages = 100,
Msgs = [#{iov => [list_to_binary(io_lib:format("msg~p", [N]))]}
|| N <- lists:seq(1, NumMessages)],
- {ok, 100} = socket:sendmmsg(S2, Msgs, [], infinity),
+ ok = socket:sendmmsg(S2, Msgs, [], infinity),
%% Receive them to verify
ReceivedMsgs = lists:map(
fun(_) ->
@@ -15433,7 +15433,7 @@ sendmmsg_select_nowait_udp4(_Config) when is_list(_Config) ->
%% Normal case: sendmmsg with nowait should succeed immediately for small data
Msgs = [#{iov => [<<"test">>]}],
case socket:sendmmsg(S2, Msgs, [], nowait) of
- {ok, 1} ->
+ ok ->
ok;
{select, {select_info, sendmmsg, _SelectHandle}} ->
%% Socket buffer might be full (unlikely but possible)
@@ -15478,7 +15478,7 @@ sendmmsg_with_addresses_udp4(_Config) when is_list(_Config) ->
#{addr => #{family => inet, addr => Addr, port => R2Port},
iov => [<<"to_r2">>]}
],
- {ok, 2} = socket:sendmmsg(Sender, Msgs, [], infinity),
+ ok = socket:sendmmsg(Sender, Msgs, [], infinity),
%% Verify R1 got its message
{ok, Msg1} = socket:recvmsg(R1),
[<<"to_r1">>] = maps:get(iov, Msg1),
@@ -15512,7 +15512,7 @@ sendmmsg_invalid_msg_format(_Config) when is_list(_Config) ->
{ok, #{port := LocalPort}} = socket:sockname(S1),
ok = socket:connect(S2, #{family => inet, addr => Addr, port => LocalPort}),
%% Empty message list should return {ok, 0}
- {ok, 0} = socket:sendmmsg(S2, [], [], infinity),
+ ok = socket:sendmmsg(S2, [], [], infinity),
%% Message without iov field should fail
InvalidMsgs1 = [#{addr => #{family => inet, addr => Addr, port => LocalPort}}],
case catch socket:sendmmsg(S2, InvalidMsgs1, [], infinity) of
@@ -15609,8 +15609,7 @@ sendmmsg_dirty_scheduler_udp4(_Config) when is_list(_Config) ->
|| N <- lists:seq(1, NumMessages)],
%% Send with msgCount > 64 to trigger dirty scheduler rescheduling
%% The NIF should reschedule to a dirty I/O scheduler
- {ok, SentCount} = socket:sendmmsg(S2, Msgs, [], infinity),
- true = SentCount =:= NumMessages,
+ ok = socket:sendmmsg(S2, Msgs, [], infinity),
%% Receive and verify all messages
ReceivedMsgs = lists:map(
fun(_) ->
--
2.51.0