File 2460-Mimic-legacy-behaviour-of-recbuf-and-buffer.patch of Package erlang
From f5762394b66201cba00d35e9367ec6eaec20279b Mon Sep 17 00:00:00 2001
From: Raimo Niskanen <raimo@erlang.org>
Date: Wed, 17 Jan 2024 18:04:50 +0100
Subject: [PATCH 10/14] Mimic legacy behaviour of 'recbuf' and 'buffer'
Optimize by choosing receive size from set buffer size
so for small receive sizes use recv 0 (default size)
and pick packets from the buffer, but for large
receive sizes receive what's missing from the packet.
---
erts/emulator/nifs/common/prim_socket_nif.c | 8 +-
lib/kernel/src/gen_tcp_socket.erl | 103 ++++++++++++++------
2 files changed, 81 insertions(+), 30 deletions(-)
diff --git a/erts/emulator/nifs/common/prim_socket_nif.c b/erts/emulator/nifs/common/prim_socket_nif.c
index b80f7bea5f..a61131c177 100644
--- a/erts/emulator/nifs/common/prim_socket_nif.c
+++ b/erts/emulator/nifs/common/prim_socket_nif.c
@@ -460,6 +460,7 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL;
#else
#define ESOCK_RECV_BUFFER_SIZE_DEFAULT (8*1024)
#endif
+#define ESOCK_RECV_BUFFER_SIZE_MIN 1
#define ESOCK_RECV_CTRL_BUFFER_SIZE_DEFAULT 1024
#define ESOCK_SEND_CTRL_BUFFER_SIZE_DEFAULT 1024
@@ -6945,7 +6946,10 @@ ERL_NIF_TERM esock_setopt_otp_rcvbuf(ErlNifEnv* env,
#ifndef __WIN32__
descP->rNum = n;
#endif
- descP->rBufSz = bufSz;
+ if (bufSz < ESOCK_RECV_BUFFER_SIZE_MIN)
+ descP->rBufSz = ESOCK_RECV_BUFFER_SIZE_MIN;
+ else
+ descP->rBufSz = bufSz;
SSDBG( descP,
("SOCKET", "esock_setopt_otp_rcvbuf {%d} -> ok"
diff --git a/lib/kernel/src/gen_tcp_socket.erl b/lib/kernel/src/gen_tcp_socket.erl
index cd2cbb6fc4..d1b8f571de 100644
--- a/lib/kernel/src/gen_tcp_socket.erl
+++ b/lib/kernel/src/gen_tcp_socket.erl
@@ -54,6 +54,8 @@
-include("inet_int.hrl").
-include("socket_int.hrl").
+-define(RECV_BUFFER_SIZE_DEFAULT, 8192).
+
%% -define(DBG(T),
%% erlang:display({{self(), ?MODULE, ?LINE, ?FUNCTION_NAME}, T})).
@@ -914,7 +916,7 @@ fdopen(Fd, Opts) when is_integer(Fd), 0 =< Fd, is_list(Opts) ->
-compile({inline, [socket_send/3]}).
socket_send(Socket, Data, Timeout) ->
- Result = socket:send(Socket, Data, Timeout),
+ Result = socket:send(Socket, Data, [], Timeout),
case Result of
{error, {timeout = _Reason, RestData}} = E when is_binary(RestData) ->
%% This is better then closing the socket for every timeout
@@ -953,7 +955,7 @@ socket_send(Socket, Data, Timeout) ->
-compile({inline, [socket_recv/2]}).
socket_recv(Socket, Length) ->
- Result = socket:recv(Socket, Length, nowait),
+ Result = socket:recv(Socket, Length, [], nowait),
%% ?DBG({Socket, Length, Result}),
Result.
@@ -2419,22 +2421,36 @@ handle_recv(
P, D#{buffer := NewBuffer}, [], Data);
0 < Length ->
%% Need to receive more data
- handle_recv_more(P, D, ActionsR);
+ handle_recv_more(P, D, Size - Length, ActionsR);
0 < Size -> % Length == 0
%% Deliver all buffered data
Data = condense_buffer(Buffer),
handle_recv_deliver(P, D#{buffer := <<>>}, ActionsR, Data);
true -> % Length == 0, Size == 0
%% Need to receive more data
- handle_recv_more(P, D, ActionsR)
+ handle_recv_more(P, D, Length, ActionsR)
end;
handle_recv(P, D, ActionsR) ->
handle_recv_packet(P, D, ActionsR).
-handle_recv_more(P, #{buffer := Buffer} = D, ActionsR) ->
- case socket_recv(P#params.socket, 0) of
+handle_recv_more(P, #{buffer := Buffer} = D, Length, ActionsR) ->
+ Size = maps:get({otp,rcvbuf}, D, ?RECV_BUFFER_SIZE_DEFAULT),
+ case
+ socket_recv(
+ P#params.socket,
+ if
+ Size < Length -> Length;
+ true -> 0
+ end)
+ of
{ok, <<Data/binary>>} ->
handle_recv(P, D#{buffer := [Data | Buffer]}, ActionsR);
+ {select, {?select_info(_) = SelectInfo, <<Data/binary>>}} ->
+ %% ?DBG([{select_info, SelectInfo}]),
+ {next_state,
+ #recv{info = SelectInfo},
+ {P, D#{buffer := [Data | Buffer]}},
+ reverse(ActionsR)};
{select, ?select_info(_) = SelectInfo} ->
%% ?DBG([{select_info, SelectInfo}]),
{next_state,
@@ -2459,8 +2475,10 @@ handle_recv_packet(P, D, ActionsR) ->
case decode_packet(D) of
{ok, Decoded, Rest} ->
handle_recv_deliver(P, D#{buffer := Rest}, ActionsR, Decoded);
- {more, _} ->
- handle_recv_more(P, D, ActionsR);
+ {more, undefined} ->
+ handle_recv_more(P, D, 0, ActionsR);
+ {more, Length} ->
+ handle_recv_more(P, D, Length, ActionsR);
{error, Reason} ->
handle_recv_error(
P, D, ActionsR,
@@ -2822,26 +2840,10 @@ state_setopts(_P, D, _State, []) ->
{ok, D};
state_setopts(P, D, State, [{Tag,Val} | Opts]) ->
%% ?DBG([{state, State}, {opt, {Tag,Val}}]),
- SocketOpts = socket_opts(),
- case maps:is_key(Tag, SocketOpts) of
- true ->
- %% options for the 'socket' module
- %%
- case P#params.socket of
- undefined ->
- {{error, closed}, D};
- Socket ->
- case
- socket_setopt(
- Socket, maps:get(Tag, SocketOpts), Val)
- of
- ok ->
- state_setopts(P, D, State, Opts);
- {error, _} = Error ->
- {Error, D}
- end
- end;
- false ->
+ case socket_opts() of
+ #{Tag := SocketOpt} ->
+ state_setopts_socket(P, D, State, Opts, SocketOpt, Val);
+ #{} ->
case maps:is_key(Tag, server_write_opts()) of
%% server options for socket send hence
%% duplicated in {opt,meta}
@@ -2882,6 +2884,51 @@ state_setopts(P, D, State, [{Tag,Val} | Opts]) ->
end
end.
+%% options for the 'socket' module
+%%
+state_setopts_socket(P, D, State, Opts, SocketOpt, Val) ->
+ case P#params.socket of
+ undefined ->
+ {{error, closed}, D};
+ Socket ->
+ case socket_setopt(Socket, SocketOpt, Val) of
+ ok when SocketOpt =:= {otp,rcvbuf} ->
+ Size =
+ case Val of
+ {Count, Sz} -> Count * Sz;
+ Sz when is_integer(Sz) -> Sz
+ end,
+ state_setopts(P, D#{SocketOpt => Size}, State, Opts);
+ ok when SocketOpt =:= {socket,rcvbuf} ->
+ state_setopts_socket_rcvbuf(
+ P, D, State, Opts, Socket, Val);
+ ok ->
+ state_setopts(P, D, State, Opts);
+ {error, _} = Error ->
+ {Error, D}
+ end
+ end.
+
+%% Mimic inet_drv.c for SOCK_STREAM:
+%% when setting 'recbuf', if 'buffer' hasn't been set;
+%% set 'buffer' to the same size
+%%
+state_setopts_socket_rcvbuf(P, D, State, Opts, Socket, Val) ->
+ SocketOpt = {otp,rcvbuf},
+ case D of
+ #{SocketOpt := _} ->
+ case socket_setopt(Socket, SocketOpt, Val) of
+ ok ->
+ state_setopts(P, D, State, Opts);
+ {error, _} = Error ->
+ {Error, D}
+ end;
+ #{} ->
+ state_setopts(P, D, State, Opts)
+ end.
+
+%% Options in the server process D variable
+%%
state_setopts_server(P, D, State, Opts, Tag, Value) ->
case Tag of
active ->
--
2.35.3