Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:26
erlang
3031-Chunk-I-O-vectors-for-send.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 3031-Chunk-I-O-vectors-for-send.patch of Package erlang
From 9b7a7c1933cc173be0f8b826218c2de7533d78e4 Mon Sep 17 00:00:00 2001 From: Raimo Niskanen <raimo@erlang.org> Date: Thu, 8 Feb 2024 20:16:53 +0100 Subject: [PATCH 01/22] Chunk I/O vectors for send --- lib/kernel/test/gen_tcp_socket_SUITE.erl | 205 ++++++++++++++--------- 1 file changed, 122 insertions(+), 83 deletions(-) diff --git a/lib/kernel/test/gen_tcp_socket_SUITE.erl b/lib/kernel/test/gen_tcp_socket_SUITE.erl index ce29ad7e03..321b30da66 100644 --- a/lib/kernel/test/gen_tcp_socket_SUITE.erl +++ b/lib/kernel/test/gen_tcp_socket_SUITE.erl @@ -83,11 +83,12 @@ init_per_group(Nm, Config) -> smoketest -> [{burden,0} | Config]; benchmark -> [{burden,2} | Config]; %% - dev -> init_per_group(small, [{burden,3} | Config]); - dev_inet -> [{backend,inet} | Config]; - dev_socket -> [{backend,socket} | Config]; - dev_direct -> [{backend,direct} | Config]; + dev -> init_per_group(dev_size, [{burden,0} | Config]); + dev_inet -> init_per_group(inet, Config); + dev_socket -> init_per_group(socket, Config); + dev_direct -> init_per_group(direct, Config); %% + dev_size -> init_per_group_size(6, 8, Config); small -> init_per_group_size(7, ?NORM_EXP - 7 - 3, Config); medium -> init_per_group_size(10, ?NORM_EXP - 10 - 1, Config); large -> init_per_group_size(15, ?NORM_EXP - 15, Config); @@ -101,63 +102,62 @@ init_per_group(Nm, Config) -> end. init_per_group_size(K, L, Config) -> - %% 2^(K+1) = Max packet size -> 2^K = Mean packet size - %% 2^L = Number of packets {_, Burden} = proplists:lookup(burden, Config), M = L + Burden, - StopTag = spawn_testdata_server(K+1, (1 bsl M)), - %% + N = 1 bsl M, + %% 2^K = Mean packet size + %% 2^(K+1) = Max packet size + %% N = Number of packets; + StopTag = spawn_testdata_server(K+1, N), + %% {MeanSize, SizeSuffix} = size_and_suffix(1 bsl K), - {PacketCount, CountSuffix} = size_and_suffix(1 bsl M), + {PacketCount, CountSuffix} = size_and_suffix(N), ct:pal("Packet mean size: ~w ~sByte, packet count: ~w ~s", [MeanSize, SizeSuffix, PacketCount, CountSuffix]), %% - [{testdata_server,StopTag}, - {testdata_size, {K, M}} | Config]. - -size_and_suffix(P) -> - size_and_suffix(P, 1, ["", "K", "M", "T", "Z"]). -%% -size_and_suffix(P, _, []) -> - {P, ""}; -size_and_suffix(P, Q, Suffixes) -> - Q_1 = Q bsl 10, - if - is_integer(P), Q_1 =< P, P band (Q_1 - 1) =:= 0 -> - size_and_suffix(P, Q_1, tl(Suffixes)); - Q bsl 13 =< P -> - size_and_suffix(P, Q_1, tl(Suffixes)); - true -> - {round(P / Q), hd(Suffixes)} - end. + [{testdata_server, StopTag}, + {testdata_size, {K, M}} | Config]. end_per_group(Nm, Config) -> case Nm of - dev -> end_per_group(benchmark, end_per_group(huge, Config)); + dev -> proplists:delete(burden, end_per_group(dev_size, Config)); dev_inet -> end_per_group(inet, Config); dev_socket -> end_per_group(socket, Config); dev_direct -> end_per_group(direct, Config); _ when Nm =:= smoketest; Nm =:= dev; Nm =:= benchmark -> proplists:delete(burden, Config); - _ when Nm =:= small; + _ when Nm =:= dev_size; + Nm =:= small; Nm =:= medium; Nm =:= large; Nm =:= huge -> - case proplists:lookup(testdata_server, Config) of - {_, StopTag} -> - stop_testdata_server(StopTag), - proplists:delete( - testdata_server, - proplists:delete(testdata_size, Config)); - none -> - Config - end; + {_, StopTag} = proplists:lookup(testdata_server, Config), + stop_testdata_server(StopTag), + proplists:delete( + testdata_server, proplists:delete(testdata_size, Config)); _ when Nm =:= inet; Nm =:= socket; Nm =:= direct -> proplists:delete(backend, Config) end. + +size_and_suffix(P) -> + size_and_suffix(P, 1, ["", "K", "M", "T", "Z"]). +%% +size_and_suffix(P, _, []) -> + {P, ""}; +size_and_suffix(P, Q, Suffixes) -> + Q_1 = Q bsl 10, + if + is_integer(P), Q_1 =< P, P band (Q_1 - 1) =:= 0 -> + size_and_suffix(P, Q_1, tl(Suffixes)); + Q bsl 13 =< P -> + size_and_suffix(P, Q_1, tl(Suffixes)); + true -> + {round(P / Q), hd(Suffixes)} + end. + %% ------- %% Testcases @@ -191,20 +191,24 @@ tc2active(TC) -> end. xfer(Config, TC) when is_list(Config) -> - {_, Backend} = proplists:lookup(backend, Config), - {_, TestdataSize} = proplists:lookup(testdata_size, Config), - {_, BindAddr} = proplists:lookup(bind_addr, Config), - run_xfer(TC, Backend, TestdataSize, BindAddr, testdata()). + {_, Backend} = proplists:lookup(backend, Config), + {_, BindAddr} = proplists:lookup(bind_addr, Config), + {_, TestdataSize} = proplists:lookup(testdata_size, Config), + run_xfer(TC, Backend, BindAddr, TestdataSize, testdata()). run_xfer( - TC, Backend, {K,_} = TestdataSize, BindAddr, {Iovec, Sizes, TotalSize}) -> + TC, Backend, BindAddr, {K, _} = TestdataSize, + #{iovecs := Iovecs, + packet_sizes := PacketSizes, + total_size := TotalSize}) -> + %% Parent = self(), Tag = make_ref(), {Sender, Mref} = spawn_opt( fun () -> try - %% Send an iovec efficiently + %% Send iovecs efficiently {ok, L} = gen_tcp:listen( 0, [{ifaddr,BindAddr}, {sndbuf, 2 bsl K}]), @@ -214,7 +218,7 @@ run_xfer( Parent ! {Tag, Sockaddr}, {ok, A} = gen_tcp:accept(L), ok = gen_tcp:close(L), - ok = gen_tcp:send(A, Iovec), + send_loop(A, Iovecs), ok = gen_tcp:close(A) catch Class : Reason : Stacktrace -> ct:pal( @@ -228,7 +232,7 @@ run_xfer( {ok, C} = connect(Backend, Sockaddr, TC), try T1 = erlang:monotonic_time(), - {ok, TotalSize} = recv_loop(C, Sizes, TC), + assert({ok, TotalSize}, recv_loop(C, PacketSizes, TC)), T2 = erlang:monotonic_time(), T = erlang:convert_time_unit(T2 - T1, native, millisecond), report_MByte_s(Backend, TestdataSize, TC, TotalSize, T) @@ -251,6 +255,13 @@ run_xfer( error({sender_died,Reason}) end. +send_loop(_S, []) -> + ok; +send_loop(S, [Iovec | Iovecs]) -> + ok = gen_tcp:send(S, Iovec), + send_loop(S, Iovecs). + + connect(direct, Sockaddr, _) -> {ok, S} = socket:open(?DOMAIN, stream), ok = socket:setopt(S, {socket,rcvbuf}, ?BUFSIZE), @@ -282,9 +293,9 @@ close(_, S) -> gen_tcp:close(S). -report_MByte_s(Backend, {K, L}, TC, Size, Time) -> +report_MByte_s(Backend, {K, M}, TC, Size, Time) -> ct:log("Size: ~w. Time: ~w.", [Size, Time]), - Name = io_lib:format("~w 2^(~w+~w)-~w", [Backend, K, L, TC]), + Name = io_lib:format("~w 2^(~w+~w)-~w", [Backend, K, M, TC]), {Value, Suffix} = size_and_suffix(Size * 1000 / Time), report(Name, Value, Suffix++"Byte/s"). @@ -339,37 +350,59 @@ testdata() -> end. generate_testdata(K, N) -> - Iovec = generate_iovec(K, N), - PacketSizes = [byte_size(Bin) - 4 || Bin <- Iovec], - TotalSize = - lists:foldl( - fun (Bin, Size) -> - Size + byte_size(Bin) - end, 0, Iovec), - {Iovec, PacketSizes, TotalSize}. - -generate_iovec(K, N) -> - DataBlock = create_data_block(1 bsl K), - Offsets = generate_offsets(1 bsl K, N), - generate_iovec_1(Offsets, DataBlock). + %% 2^K = Max packet size + %% N = Number of packets + %% + Offsets = generate_offsets(K, N), + DataBlock = create_data_block(1 bsl K), + Iovecs = generate_iovecs(Offsets, DataBlock, 1000), + DataBlockSize = byte_size(DataBlock), + PacketSizes = packet_sizes(Offsets, DataBlockSize), + TotalSize = total_size(Offsets, DataBlockSize), + #{iovecs => Iovecs, + packet_sizes => PacketSizes, + total_size => TotalSize}. + + +packet_sizes(Offsets, DataBlockSize) -> + [DataBlockSize - Offset - 4 || Offset <- Offsets]. + +total_size([], _DataBlockSize) -> 0; +total_size([Offset | Offsets], DataBlockSize) -> + DataBlockSize - Offset + total_size(Offsets, DataBlockSize). + +generate_iovecs([], _DataBlock, _ChunkSize) -> []; +generate_iovecs(Offsets, DataBlock, ChunkSize) -> + {Offsets_1, Iovecs} = generate_iovec(Offsets, DataBlock, ChunkSize), + [Iovecs | generate_iovecs(Offsets_1, DataBlock, ChunkSize)]. + +generate_iovec(Offsets, DataBlock, N) -> + generate_iovec(Offsets, DataBlock, N, []). %% -generate_iovec_1([], _) -> []; -generate_iovec_1([Offset | Offsets], DataBlock) -> +generate_iovec(Offsets, _DataBlock, N, Acc) + when N =:= 0; + Offsets =:= [], is_integer(N), 0 < N -> + {Offsets, lists:reverse(Acc)}; +generate_iovec([Offset | Offsets], DataBlock, N, Acc) + when is_integer(N), 0 < N -> <<_:Offset/binary, Bin/binary>> = DataBlock, - [Bin | generate_iovec_1(Offsets, DataBlock)]. - -%% Generate N offsets in the range 0 .. 2^K -1, == 0 mod 4 -%% -generate_offsets(M, N) -> generate_offsets(M, N, []). + generate_iovec(Offsets, DataBlock, N - 1, [Bin | Acc]). + +%% 0 =< Offset < 2^K - 1 +%% Offset mod 4 == 0 +generate_offsets(K, N) when is_integer(K), 6 =< K -> + %% 3 here corresponds to the 8 iterations below, + %% and 2 corresponds to the bsl 2 that makes mod 4 == 0 + Range = 1 bsl (K - (3 + 2)), + generate_offsets(Range, N, []). %% -generate_offsets(_, 0, Offsets) -> Offsets; -generate_offsets(M, N, Offsets) -> - %% 0 =< Offset < 2^K - 1 - %% Offset mod 4 == 0 - Offset = (rand_sum(M bsr 3, 8) - 8) band -4, - generate_offsets(M, N - 1, [Offset | Offsets]). - -%% Sum I number of rand values range N +generate_offsets(_Range, 0, Offsets) -> + Offsets; +generate_offsets(Range, N, Offsets) when is_integer(N), 0 < N -> + Offset = (rand_sum(Range, 8) - 8) bsl 2, + generate_offsets(Range, N - 1, [Offset | Offsets]). + +%% Sum I number of rand:uniform values range N %% rand_sum(_, 0) -> 0; rand_sum(N, I) -> rand:uniform(N) + rand_sum(N, I - 1). @@ -480,7 +513,7 @@ recv_loop_packet_buf(S, Sizes, M, Recv, Buf) -> %% case Buf of <<PacketSize:32, _:PacketSize/binary, Rest/binary>> -> - PacketSize = hd(Sizes), + assert(PacketSize, hd(Sizes)), NewM = M + 4 + PacketSize, recv_loop_packet_buf(S, tl(Sizes), NewM, Recv, Rest); <<PacketSize:32, Start/binary>> -> @@ -494,9 +527,9 @@ recv_loop_packet_buf(S, Sizes, M, Recv, Buf) -> case Recv(S, RestSize) of {ok, Rest} when byte_size(Rest) =:= RestSize-> Packet = <<Start/binary, Rest/binary>>, - PacketSize = byte_size(Packet), + assert(PacketSize, byte_size(Packet)), NewM = M + 4 + PacketSize, - PacketSize = hd(Sizes), + assert(PacketSize, hd(Sizes)), recv_loop_packet_buf(S, tl(Sizes), NewM, Recv); {error, _} = Error -> Error @@ -528,6 +561,8 @@ recv_loop_packet_cheat(S, [Size|Sizes], M, Recv) -> case Recv(S, 4 + Size) of {ok, <<Size:32, _:Size/binary>>} -> recv_loop_packet_cheat(S, Sizes, M + 4 + Size, Recv); + {ok, Data} -> + {error,{bad_packet,Data,Size}}; Err1 -> Err1 end. @@ -539,7 +574,7 @@ recv_loop_active_false(S, Sizes, M, Recv) -> case Recv(S) of {ok, Data} -> DataSize = byte_size(Data), - DataSize = hd(Sizes), + assert(DataSize, hd(Sizes)), recv_loop_active_false(S, tl(Sizes), M + DataSize + 4, Recv); {error, closed} -> [] = Sizes, @@ -552,7 +587,7 @@ recv_loop_active_true(S, Sizes, M) -> receive {tcp, S, Data} -> DataSize = byte_size(Data), - DataSize = hd(Sizes), + assert(DataSize, hd(Sizes)), recv_loop_active_true(S, tl(Sizes), M + DataSize + 4); {tcp_closed, S} -> Sizes = [], @@ -565,7 +600,7 @@ recv_loop_active_once(S, Sizes, M) -> receive {tcp, S, Data} -> DataSize = byte_size(Data), - DataSize = hd(Sizes), + assert(DataSize, hd(Sizes)), ok = inet:setopts(S, [{active,once}]), recv_loop_active_once(S, tl(Sizes), M + DataSize + 4); {tcp_closed, S} -> @@ -579,7 +614,7 @@ recv_loop_active_n(S, Sizes, M, N) -> receive {tcp, S, Data} -> DataSize = byte_size(Data), - DataSize = hd(Sizes), + assert(DataSize, hd(Sizes)), recv_loop_active_n(S, tl(Sizes), M + DataSize + 4, N); {tcp_passive, S} -> ok = inet:setopts(S, [{active,N}]), @@ -603,3 +638,7 @@ report(Name, Value, Suffix) -> term_to_string(Term) -> unicode:characters_to_list(io_lib:write(Term, [{encoding, unicode}])). + +-compile({inline, [assert/2]}). +assert(X, X) -> ok; +assert(X, Y) -> error({assert, X, Y}). -- 2.35.3
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor