File 2181-kernel-esock-test-Add-benchmark-sendv-test-case.patch of Package erlang
From 735cd453bc1fbeb19704023aa2b07fd8a2a8d359 Mon Sep 17 00:00:00 2001
From: Micael Karlberg <bmk@erlang.org>
Date: Tue, 8 Apr 2025 17:30:15 +0200
Subject: [PATCH 01/10] [kernel|esock|test] Add benchmark (sendv) test case
---
lib/kernel/test/socket_traffic_SUITE.erl | 347 ++++++++++++++++++++++-
1 file changed, 338 insertions(+), 9 deletions(-)
diff --git a/lib/kernel/test/socket_traffic_SUITE.erl b/lib/kernel/test/socket_traffic_SUITE.erl
index 9bdeffd0eb..7ae3c19c57 100644
--- a/lib/kernel/test/socket_traffic_SUITE.erl
+++ b/lib/kernel/test/socket_traffic_SUITE.erl
@@ -26,6 +26,7 @@
%% ESOCK_TEST_TRAFFIC_COUNTERS: include
%% ESOCK_TEST_TRAFFIC_CHUNKS: include
%% ESOCK_TEST_TRAFFIC_PING_PONG: include
+%% ESOCK_TEST_TRAFFIC_BENCH: exclude
%%
%% Variable that controls "verbosity" of the test case(s):
%%
@@ -33,22 +34,24 @@
%%
%% Run the entire test suite:
-%% ts:run(kernel, socket_SUITE, [batch]).
+%% ts:run(kernel, socket_traffic_SUITE, [batch]).
%%
%% Run a specific group:
-%% ts:run(kernel, socket_SUITE, {group, foo}, [batch]).
+%% ts:run(kernel, socket_traffic_SUITE, {group, foo}, [batch]).
%%
%% Run a specific test case:
-%% ts:run(kernel, socket_SUITE, foo, [batch]).
+%% ts:run(kernel, socket_traffic_SUITE, foo, [batch]).
%%
%% (cd /mnt/c/$LOCAL_TESTS/26/kernel_test/ && $ERL_TOP/bin/win32/erl.exe -sname kernel-26-tester -pa c:$LOCAL_TESTS/26/test_server)
%% application:set_env(kernel, test_inet_backends, true).
+%%
%% S = fun() -> ts:run(kernel, socket_SUITE, [batch]) end.
%% S = fun(SUITE) -> ts:run(kernel, SUITE, [batch]) end.
-%% S = fun() -> ct:run_test([{suite, socket_SUITE}]) end.
-%% S = fun(SUITE) -> ct:run_test([{suite, SUITE}]) end.
%% G = fun(GROUP) -> ts:run(kernel, socket_SUITE, {group, GROUP}, [batch]) end.
%% G = fun(SUITE, GROUP) -> ts:run(kernel, SUITE, {group, GROUP}, [batch]) end.
+%%
+%% S = fun() -> ct:run_test([{suite, socket_SUITE}]) end.
+%% S = fun(SUITE) -> ct:run_test([{suite, SUITE}]) end.
%% G = fun(GROUP) -> ct:run_test([{suite, socket_SUITE}, {group, GROUP}]) end.
%% G = fun(SUITE, GROUP) -> ct:run_test([{suite, SUITE}, {group, GROUP}]) end.
%% T = fun(TC) -> ts:run(kernel, socket_SUITE, TC, [batch]) end.
@@ -76,7 +79,7 @@
%% Test cases
-export([
- %% *** Traffic ***
+ %% *** Counters ***
traffic_send_and_recv_counters_tcp4/1,
traffic_send_and_recv_counters_tcp6/1,
traffic_send_and_recv_counters_tcpL/1,
@@ -94,12 +97,14 @@
traffic_sendmsg_and_recvmsg_counters_udp6/1,
traffic_sendmsg_and_recvmsg_counters_udpL/1,
+ %% *** Chunks ***
traffic_send_and_recv_chunks_tcp4/1,
traffic_send_and_recv_chunks_tcp6/1,
traffic_send_and_recv_chunks_tcpL/1,
traffic_send_and_recv_chunks_sctp4/1,
traffic_send_and_recv_chunks_sctp6/1,
+ %% *** Ping Pong ***
traffic_ping_pong_small_send_and_recv_tcp4/1,
traffic_ping_pong_small_send_and_recv_tcp6/1,
traffic_ping_pong_small_send_and_recv_tcpL/1,
@@ -144,7 +149,10 @@
traffic_ping_pong_small_sendmsg_and_recvmsg_udpL/1,
traffic_ping_pong_medium_sendmsg_and_recvmsg_udp4/1,
traffic_ping_pong_medium_sendmsg_and_recvmsg_udp6/1,
- traffic_ping_pong_medium_sendmsg_and_recvmsg_udpL/1
+ traffic_ping_pong_medium_sendmsg_and_recvmsg_udpL/1,
+
+ %% *** Bench ***
+ traffic_bench_sendv_and_recv_tcp4/1
]).
@@ -175,9 +183,12 @@ suite() ->
{timetrap, {minutes,1}}].
all() ->
- Groups = [{counters, "ESOCK_TEST_TRAFFIC_COUNTERS", include},
+ Groups = [
+ {counters, "ESOCK_TEST_TRAFFIC_COUNTERS", include},
{chunks, "ESOCK_TEST_TRAFFIC_CHUNKS", include},
- {ping_pong, "ESOCK_TEST_TRAFFIC_PING_PONG", include}],
+ {ping_pong, "ESOCK_TEST_TRAFFIC_PING_PONG", include},
+ {bench, "ESOCK_TEST_TRAFFIC_BENCH", exclude}
+ ],
[use_group(Group, Env, Default) || {Group, Env, Default} <- Groups].
use_group(_Group, undefined, exclude) ->
@@ -206,6 +217,7 @@ groups() ->
[{counters, [], traffic_counters_cases()},
{chunks, [], traffic_chunks_cases()},
{ping_pong, [], traffic_ping_pong_cases()},
+ {bench, [], traffic_bench_cases()},
{pp_send_recv, [], traffic_pp_send_recv_cases()},
{pp_sendto_recvfrom, [], traffic_pp_sendto_recvfrom_cases()},
{pp_sendmsg_recvmsg, [], traffic_pp_sendmsg_recvmsg_cases()}
@@ -248,6 +260,16 @@ traffic_ping_pong_cases() ->
{group, pp_sendmsg_recvmsg}
].
+traffic_bench_cases() ->
+ [
+ traffic_bench_sendv_and_recv_tcp4%% ,
+ %% traffic_bench_send_recv_tcp4,
+ %% traffic_bench_sendv_recv_tcp6,
+ %% traffic_bench_send_recv_tcp6,
+ %% traffic_bench_sendv_recv_tcpL,
+ %% traffic_bench_send_recv_tcpL
+ ].
+
traffic_pp_send_recv_cases() ->
[
traffic_ping_pong_small_send_and_recv_tcp4,
@@ -7002,6 +7024,311 @@ tpp_udp_sock_close(Sock, Path) ->
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+-define(TB_IOV_CHUNK(Sz,V), list_to_binary(lists:duplicate((Sz), (V)))).
+tb_iov() ->
+ IOV0 =
+ [
+ ?TB_IOV_CHUNK(8, 16#01),
+ ?TB_IOV_CHUNK(16*1024, 16#02),
+ ?TB_IOV_CHUNK(256, 16#03),
+ ?TB_IOV_CHUNK(8*1024, 16#04),
+ ?TB_IOV_CHUNK(512, 16#05),
+ ?TB_IOV_CHUNK(1*1024, 16#06),
+ ?TB_IOV_CHUNK(1*1024, 16#07),
+ ?TB_IOV_CHUNK(1*1024, 16#08),
+ ?TB_IOV_CHUNK(1*1024, 16#09),
+ ?TB_IOV_CHUNK(1*1024, 16#0A),
+ ?TB_IOV_CHUNK(1*1024, 16#0B),
+ ?TB_IOV_CHUNK(1*1024, 16#0C),
+ ?TB_IOV_CHUNK(16, 16#0D),
+ ?TB_IOV_CHUNK(256, 16#0E),
+ ?TB_IOV_CHUNK(16*1024, 16#0F),
+ ?TB_IOV_CHUNK(32, 16#10),
+ ?TB_IOV_CHUNK(8, 16#11),
+ ?TB_IOV_CHUNK(128, 16#12),
+ ?TB_IOV_CHUNK(2*1024, 16#13),
+ ?TB_IOV_CHUNK(16, 16#14),
+ ?TB_IOV_CHUNK(32, 16#15),
+ ?TB_IOV_CHUNK(64, 16#16),
+ ?TB_IOV_CHUNK(4*1024, 16#17)
+ ],
+ IOV1 = lists:flatten([begin
+ Sz = byte_size(B),
+ [<<Sz:32/integer>>, B]
+ end || B <- IOV0]),
+ TSz = iolist_size(IOV1),
+ [<<TSz:32/integer>> | IOV1].
+
+traffic_bench_sendv_and_recv_tcp4(Config) when is_list(Config) ->
+ ?TT(?MINS(2)), %% Test *should* run for 60 secs
+ IOV = tb_iov(),
+ Send = fun(S, Data) when is_list(Data) ->
+ socket:sendv(S, Data)
+ end,
+ tc_try(?FUNCTION_NAME,
+ fun() -> has_support_ipv4() end,
+ fun() ->
+ InitState = #{domain => inet,
+ send => Send,
+ iov => IOV,
+ run_time => ?MINS(1)},
+ do_traffic_bench_send_and_recv(InitState)
+ end).
+
+do_traffic_bench_send_and_recv(#{run_time := RTime} = InitState) ->
+ ?SEV_IPRINT("[ctrl] start server"),
+ {PortNumber, Server} = tb_server_start(InitState),
+ ?SEV_IPRINT("[ctrl] start client"),
+ Client = tb_client_start(InitState, PortNumber),
+ TRef = erlang:start_timer(RTime, self(), tb_timeout),
+ ?SEV_IPRINT("[ctrl] await completion"),
+ tb_await_completion(Server, Client, TRef).
+
+tb_await_completion({ServerPid, ServerMRef} = Server,
+ {ClientPid, ClientMRef} = Client,
+ TRef) ->
+ receive
+ {timeout, TRef, tb_timeout} ->
+ ?SEV_IPRINT("[ctrl] done - begin termination"),
+ ClientPid ! {self(), stop},
+ tb_await_termination(Server, Client);
+ {'DOWN', ClientMRef, process, ClientPid, ClientReason} ->
+ ?SEV_EPRINT("[ctrl] received unexpected client down: "
+ "~n ~p", [ClientReason]),
+ erlang:cancel_timer(TRef),
+ exit(ClientPid, kill),
+ exit(ClientReason);
+ {'DOWN', ServerMRef, process, ServerPid, ServerReason} ->
+ ?SEV_EPRINT("[ctrl] received unexpected server down: "
+ "~n ~p", [ServerReason]),
+ erlang:cancel_timer(TRef),
+ exit(ClientPid, kill),
+ exit(ServerReason)
+ end.
+
+tb_await_termination(Server, Client) ->
+ tb_await_termination(Server, Client, undefined).
+
+tb_await_termination({ServerPid, ServerMRef} = Server,
+ {ClientPid, ClientMRef} = Client,
+ undefined = Comment) ->
+ receive
+ {'DOWN', ClientMRef, process, ClientPid, {done, {Exchange, UnitStr}}} ->
+ ?SEV_IPRINT("[ctrl] "
+ "Received (expected) down from client with result"),
+ tb_await_termination(Server, undefined,
+ {comment, ?F("~p ~s", [Exchange, UnitStr])});
+ {'DOWN', ClientMRef, process, ClientPid, ClientReason} ->
+ ?SEV_EPRINT("[ctrl] unexpected termination from client: "
+ "~n ~p", [ClientReason]),
+ exit(ServerPid, kill),
+ exit(ClientReason);
+ {'DOWN', ServerMRef, process, ServerPid, ServerReason} ->
+ ?SEV_IPRINT("[ctrl] Received down from server: "
+ "~n ~p", [ServerReason]),
+ tb_await_termination(undefined, Client, Comment)
+ end;
+tb_await_termination({ServerPid, ServerMRef} = _Server,
+ undefined,
+ Result) ->
+ receive
+ {'DOWN', ServerMRef, process, ServerPid, _} ->
+ ?SEV_IPRINT("[ctrl] Received (expected) down from server"),
+ Result
+ end;
+tb_await_termination(undefined,
+ {ClientPid, ClientMRef} = _Client,
+ undefined = _Result) ->
+ receive
+ {'DOWN', ClientMRef, process, ClientPid, {done, {Exchange, UnitStr}}} ->
+ ?SEV_IPRINT("[ctrl] Received down from client"),
+ {comment, ?F("~p ~s", [Exchange, UnitStr])};
+ {'DOWN', ClientMRef, process, ClientPid, ClientReason} ->
+ ?SEV_EPRINT("[ctrl] unexpected termination from client: "
+ "~n ~p", [ClientReason]),
+ exit(ClientReason)
+ end.
+
+
+tb_server_start(#{domain := Fam,
+ send := Send}) ->
+ Self = self(),
+ Server = {Pid, MRef} =
+ spawn_monitor(fun() ->
+ tb_server_init(#{parent => Self,
+ domain => Fam,
+ send => Send})
+ end),
+ receive
+ {Pid, PortNumber} ->
+ {PortNumber, Server};
+ {'DOWN', MRef, process, Pid, Info} ->
+ ?SEV_EPRINT("[ctrl] server start failure: "
+ "~n ~p", [Info]),
+ exit({tb_server_start, Info})
+ end.
+
+tb_decode(Bin) when is_binary(Bin) ->
+ tb_decode(Bin, []).
+
+tb_decode(<<>>, Acc) ->
+ lists:reverse(Acc);
+tb_decode(<<Sz:32/integer, Data:Sz/binary, Rest/binary>>, Acc) ->
+ tb_decode(Rest, [Data, <<Sz:32/integer>> | Acc]).
+
+tb_server_init(#{parent := Pid, domain := Fam} = State) ->
+ SA = which_local_socket_addr(Fam),
+ {ok, LS} = socket:open(Fam, stream),
+ ok = socket:bind(LS, SA#{port => 0}),
+ ok = socket:listen(LS),
+ {ok, #{port := Port}} = socket:sockname(LS),
+ Pid ! {self(), Port},
+ {ok, AS} = socket:accept(LS),
+ tb_server_loop(State#{listen => LS,
+ accept => AS}).
+
+%% Make it simple: The data begins with a 4 byte size, so read that,
+%% and then that amount of data.
+tb_server_loop(#{listen := LS, accept := AS, send := Send} = State) ->
+ case socket:recv(AS, 4) of
+ {ok, <<Sz:32/integer>> = SzBin} ->
+ case socket:recv(AS, Sz) of
+ {ok, Data} ->
+ IOV = tb_decode(Data),
+ case Send(AS, [SzBin | IOV]) of
+ ok ->
+ tb_server_loop(State);
+ {error, SReason} ->
+ ?SEV_EPRINT("[server] unexpected send error:"
+ "~n ~p", [SReason]),
+ (catch socket:close(LS)),
+ (catch socket:close(AS)),
+ exit({tb_server_send, SReason})
+ end;
+ {error, R2Reason} ->
+ ?SEV_EPRINT("[server] unexpected read (data) error:"
+ "~n ~p", [R2Reason]),
+ (catch socket:close(LS)),
+ (catch socket:close(AS)),
+ exit({tb_server_recv2, R2Reason})
+ end;
+ {error, closed} ->
+ ?SEV_IPRINT("[server] socket closed => terminating"),
+ (catch socket:close(LS)),
+ (catch socket:close(AS)),
+ exit(normal);
+ {error, R1Reason} ->
+ ?SEV_EPRINT("[server] unexpected read (sz) error:"
+ "~n ~p", [R1Reason]),
+ (catch socket:close(LS)),
+ (catch socket:close(AS)),
+ exit({tb_server_recv1, R1Reason})
+ end.
+
+
+tb_client_start(#{domain := Fam,
+ send := Send,
+ iov := IOV}, PortNumber) ->
+ Self = self(),
+ Client = {Pid, MRef} =
+ spawn_monitor(fun() ->
+ tb_client_init(#{parent => Self,
+ domain => Fam,
+ send => Send,
+ iov => IOV,
+ port => PortNumber})
+ end),
+ receive
+ {Pid, ok} ->
+ Client;
+ {'DOWN', MRef, process, Pid, Info} ->
+ ?SEV_EPRINT("client start failure: "
+ "~n ~p", [Info]),
+ exit({tb_client_start, Info})
+ end.
+
+tb_client_init(#{parent := Pid,
+ domain := Fam,
+ port := Port,
+ send := Send,
+ iov := IOV}) ->
+ SA = which_local_socket_addr(Fam),
+ {ok, CS} = socket:open(Fam, stream),
+ ok = socket:bind(CS, SA#{port => 0}),
+ ok = socket:connect(CS, SA#{port => Port}),
+ Pid ! {self(), ok},
+ tb_client_loop(Pid, CS, Send, IOV, ts(), 0, 0).
+
+tb_client_loop(Pid, Sock, Send, Data0, TStart, ARcv0, N0) ->
+ case Send(Sock, Data0) of
+ ok ->
+ case socket:recv(Sock, 4) of
+ {ok, <<Sz:32/integer>> = SzBin} ->
+ case socket:recv(Sock, Sz) of
+ {ok, Data} ->
+ IOV0 = tb_decode(Data),
+ case tb_client_is_done(Pid) of
+ true ->
+ TStop = ts(),
+ TDiff = TStop - TStart,
+ ARcv = ARcv0 + 4 + byte_size(Data),
+ {Exchange, UnitStr} = Res =
+ case ARcv div TDiff of
+ E when (E > 1024) ->
+ {E div 1024,
+ "kb/msec"};
+ E ->
+ {E, "b/msec"}
+ end,
+ N = N0 + 1,
+ ?SEV_IPRINT("[client] test done:"
+ "~n TDiff: ~w msec"
+ "~n Data: ~w bytes"
+ "~n Exchange: ~w ~s"
+ "~n Iterations: ~w",
+ [TDiff, ARcv,
+ Exchange, UnitStr,
+ N]),
+ (catch socket:close(Sock)),
+ exit({done, Res});
+ false ->
+ IOV = [SzBin | IOV0],
+ tb_client_loop(Pid,
+ Sock, Send, IOV,
+ TStart,
+ ARcv0 + 4 + byte_size(Data),
+ N0+1)
+ end;
+ {error, R2Reason} ->
+ ?SEV_EPRINT("[client] unexpected read (data) error:"
+ "~n ~p", [R2Reason]),
+ (catch socket:close(Sock)),
+ exit({tb_client_recv2, R2Reason})
+ end;
+ {error, R1Reason} ->
+ ?SEV_EPRINT("[client] unexpected read (sz) error:"
+ "~n ~p", [R1Reason]),
+ (catch socket:close(Sock)),
+ exit({tb_client_recv1, R1Reason})
+ end;
+ {error, SReason} ->
+ ?SEV_EPRINT("[client] unexpected send error:"
+ "~n ~p", [SReason]),
+ (catch socket:close(Sock)),
+ exit({tb_client_send, SReason})
+ end.
+
+tb_client_is_done(Pid) ->
+ receive
+ {Pid, stop} ->
+ true
+ after 0 ->
+ false
+end.
+
+
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
sock_bind(Sock, LSA) ->
@@ -7293,3 +7620,5 @@ i(F, A) ->
io:format(user, FStr ++ "~n", []),
io:format(FStr, []).
+ts() ->
+ erlang:system_time(millisecond).
--
2.43.0