File 4140-Improve-output-xfer-loop.patch of Package erlang
From 575a02e6867bcb808e6326df48ae27ccd108d066 Mon Sep 17 00:00:00 2001
From: Raimo Niskanen <raimo@erlang.org>
Date: Thu, 20 Oct 2022 11:25:53 +0200
Subject: [PATCH 20/27] Improve output xfer loop
---
lib/ssl/test/inet_crypto_dist.erl | 153 ++++++++++++++++--------------
1 file changed, 83 insertions(+), 70 deletions(-)
diff --git a/lib/ssl/test/inet_crypto_dist.erl b/lib/ssl/test/inet_crypto_dist.erl
index 7640464664..8c17bbe4e5 100644
--- a/lib/ssl/test/inet_crypto_dist.erl
+++ b/lib/ssl/test/inet_crypto_dist.erl
@@ -1157,29 +1157,31 @@ handshake(
end;
%%
{?MODULE, From, {send, Data}} ->
- case
+ {SendParams_1, SendSeq_1, Result} =
encrypt_and_send_chunk(
- SendParams, SendSeq, [?HANDSHAKE_CHUNK, Data])
- of
- {SendParams_1, SendSeq_1, ok} ->
+ SendParams, SendSeq, [?HANDSHAKE_CHUNK, Data]),
+ if
+ Result =:= ok ->
reply(From, ok),
handshake(
SendParams_1, SendSeq_1, RecvParams, RecvSeq,
Controller);
- {_, _, Error} ->
+ true ->
reply(From, {error, closed}),
- death_row({send, trace(Error)})
+ death_row({send, trace(Result)})
end;
{?MODULE, From, recv} ->
- case recv_and_decrypt_chunk(RecvParams, RecvSeq) of
- {RecvParams_1, RecvSeq_1, {ok, _} = Reply} ->
- reply(From, Reply),
+ {RecvParams_1, RecvSeq_1, Result} =
+ recv_and_decrypt_chunk(RecvParams, RecvSeq),
+ case Result of
+ {ok, _} ->
+ reply(From, Result),
handshake(
SendParams, SendSeq, RecvParams_1, RecvSeq_1,
Controller);
- {_, _, Error} ->
- reply(From, Error),
- death_row({recv, trace(Error)})
+ {error, _} ->
+ reply(From, Result),
+ death_row({recv, trace(Result)})
end;
{?MODULE, From, peername} ->
reply(From, inet:peername(Socket)),
@@ -1211,6 +1213,7 @@ recv_and_decrypt_chunk(#params{socket = Socket} = RecvParams, RecvSeq) ->
%%
%% The game here is to flush all dist_data and dist_tick messages,
%% prioritize dist_data over dist_tick, and to not use selective receive
+%% because that would hurt performance during overload
output_handler(Params, Seq) ->
receive
@@ -1221,8 +1224,7 @@ output_handler(Params, Seq) ->
dist_tick ->
output_handler_tick(Params, Seq);
_ when Msg =:= Params#params.rekey_msg ->
- Params_1 = output_handler_rekey(Params, Seq),
- output_handler(Params_1, 0);
+ output_handler_rekey(Params, Seq);
_ ->
%% Ignore
_ = trace(Msg),
@@ -1230,6 +1232,7 @@ output_handler(Params, Seq) ->
end
end.
+%% State: we have received at least one dist_data message
output_handler_data(Params, Seq) ->
receive
Msg ->
@@ -1239,21 +1242,19 @@ output_handler_data(Params, Seq) ->
dist_tick ->
output_handler_data(Params, Seq);
_ when Msg =:= Params#params.rekey_msg ->
- Params_1 = output_handler_rekey(Params, Seq),
- output_handler_data(Params_1, 0);
+ output_handler_rekey(Params, Seq);
_ ->
%% Ignore
_ = trace(Msg),
output_handler_data(Params, Seq)
end
after 0 ->
- DistHandle = Params#params.dist_handle,
- Q = get_data(DistHandle, empty_q()),
- {Params_1, Seq_1} = output_handler_send(Params, Seq, Q),
- erlang:dist_ctrl_get_data_notification(DistHandle),
+ {Params_1, Seq_1} = output_handler_xfer(Params, Seq),
+ erlang:dist_ctrl_get_data_notification(Params#params.dist_handle),
output_handler(Params_1, Seq_1)
end.
+%% State: we have received at least one dist_tick but no dist_data message
output_handler_tick(Params, Seq) ->
receive
Msg ->
@@ -1263,8 +1264,7 @@ output_handler_tick(Params, Seq) ->
dist_tick ->
output_handler_tick(Params, Seq);
_ when Msg =:= Params#params.rekey_msg ->
- Params_1 = output_handler_rekey(Params, Seq),
- output_handler(Params_1, 0);
+ output_handler_rekey(Params, Seq);
_ ->
%% Ignore
_ = trace(Msg),
@@ -1273,35 +1273,76 @@ output_handler_tick(Params, Seq) ->
after 0 ->
TickSize = 7 + rand:uniform(56),
TickData = binary:copy(<<0>>, TickSize),
- case
- encrypt_and_send_chunk(Params, Seq, [?TICK_CHUNK, TickData])
- of
- {Params_1, Seq_1, ok} ->
+ {Params_1, Seq_1, Result} =
+ encrypt_and_send_chunk(Params, Seq, [?TICK_CHUNK, TickData]),
+ if
+ Result =:= ok ->
output_handler(Params_1, Seq_1);
- {_, _, Error} ->
- death_row({send_tick, trace(Error)})
+ true ->
+ death_row({send_tick, trace(Result)})
end
end.
output_handler_rekey(Params, Seq) ->
case encrypt_and_send_rekey_chunk(Params, Seq) of
#params{} = Params_1 ->
- Params_1;
+ output_handler(Params_1, 0);
SendError ->
death_row({send_rekey, trace(SendError)})
end.
-output_handler_send(Params, Seq, {_Front, 0, _Rear}) ->
- {Params, Seq};
-output_handler_send(Params, Seq, {Front, _Size, Rear}) ->
- Cleartext = Front ++ lists:reverse(Rear),
- case
- encrypt_and_send_chunk(Params, Seq, [?DATA_CHUNK, Cleartext])
- of
- {Params_1, Seq_1, ok} ->
- {Params_1, Seq_1};
- {_, _, Error} ->
- death_row({send_chunk, trace(Error)})
+
+
+output_handler_xfer(Params, Seq) ->
+ output_handler_xfer(Params, Seq, [], 0, []).
+%%
+output_handler_xfer(Params, Seq, {Front, Size, Rear}) ->
+ output_handler_xfer(Params, Seq, Front, Size, Rear).
+%%
+output_handler_xfer(Params, Seq, Front, Size, Rear)
+ when ?CHUNK_SIZE =< Size ->
+ {Data, Q} = deq_iovec(?CHUNK_SIZE, Front, Size, Rear),
+ {Params_1, Seq_1, Result} =
+ encrypt_and_send_chunk(Params, Seq, [?DATA_CHUNK, Data]),
+ if
+ Result =:= ok ->
+ output_handler_xfer(Params_1, Seq_1, Q);
+ true ->
+ death_row({send_chunk, trace(Result)})
+ end;
+output_handler_xfer(Params, Seq, Front, Size, Rear) ->
+ case erlang:dist_ctrl_get_data(Params#params.dist_handle) of
+ none ->
+ if
+ Size =:= 0 ->
+ {Params, Seq};
+ true ->
+ Data = Front ++ lists:reverse(Rear),
+ {Params_1, Seq_1, Result} =
+ encrypt_and_send_chunk(
+ Params, Seq, [?DATA_CHUNK, Data]),
+ if
+ Result =:= ok ->
+ {Params_1, Seq_1};
+ true ->
+ death_row({send_chunk, trace(Result)})
+ end
+ end;
+ Bin when is_binary(Bin) ->
+ Len = byte_size(Bin),
+ output_handler_xfer(
+ Params, Seq, Front,
+ Size + 4 + Len, [Bin, <<Len:32>>|Rear]);
+ [Bin1, Bin2] ->
+ Len = byte_size(Bin1) + byte_size(Bin2),
+ output_handler_xfer(
+ Params, Seq, Front,
+ Size + 4 + Len, [Bin2, Bin1, <<Len:32>>|Rear]);
+ Iovec ->
+ Len = iolist_size(Iovec),
+ output_handler_xfer(
+ Params, Seq, Front,
+ Size + 4 + Len, lists:reverse(Iovec, [<<Len:32>>|Rear]))
end.
%% -------------------------------------------------------------------------
@@ -1347,34 +1388,6 @@ input_chunk(Params, Seq, Q, Chunk) ->
%% -------------------------------------------------------------------------
%% erlang:dist_ctrl_* helpers
-%% Get data for sending from the VM and place it in a queue
-%%
-get_data(DistHandle, {Front, Size, Rear}) ->
- get_data(DistHandle, Front, Size, Rear).
-%%
-get_data(_DistHandle, Front, Size, Rear) when ?CHUNK_SIZE =< Size ->
- {Front, Size, Rear};
-get_data(DistHandle, Front, Size, Rear) ->
- case erlang:dist_ctrl_get_data(DistHandle) of
- none ->
- {Front, Size, Rear};
- Bin when is_binary(Bin) ->
- Len = byte_size(Bin),
- get_data(
- DistHandle, Front, Size + 4 + Len,
- [Bin, <<Len:32>>|Rear]);
- [Bin1, Bin2] ->
- Len = byte_size(Bin1) + byte_size(Bin2),
- get_data(
- DistHandle, Front, Size + 4 + Len,
- [Bin2, Bin1, <<Len:32>>|Rear]);
- Iovec ->
- Len = iolist_size(Iovec),
- get_data(
- DistHandle, Front, Size + 4 + Len,
- lists:reverse(Iovec, [<<Len:32>>|Rear]))
- end.
-
%% De-packet and deliver received data to the VM from a queue
%%
deliver_data(DistHandle, Q) ->
@@ -1434,8 +1447,8 @@ deliver_data(DistHandle, Front, Size, Rear, Bin) ->
encrypt_and_send_chunk(
#params{
- socket = Socket, rekey_count = Seq, rekey_msg = RekeyMsg} = Params,
- Seq, Cleartext) ->
+ socket = Socket, rekey_count = RekeyCount, rekey_msg = RekeyMsg} = Params,
+ Seq, Cleartext) when Seq =:= RekeyCount ->
%%
cancel_rekey_timer(RekeyMsg),
case encrypt_and_send_rekey_chunk(Params, Seq) of
--
2.35.3