File 4146-Clean-up-output_handler.patch of Package erlang
From a626e320710c1a1572126e7aa5e2285a42cb747e Mon Sep 17 00:00:00 2001
From: Raimo Niskanen <raimo@erlang.org>
Date: Mon, 24 Oct 2022 17:11:18 +0200
Subject: [PATCH 26/27] Clean up output_handler
---
lib/ssl/test/inet_crypto_dist.erl | 240 +++++++++++++++++-------------
1 file changed, 139 insertions(+), 101 deletions(-)
diff --git a/lib/ssl/test/inet_crypto_dist.erl b/lib/ssl/test/inet_crypto_dist.erl
index b9a7e5ba1f..f28da210f7 100644
--- a/lib/ssl/test/inet_crypto_dist.erl
+++ b/lib/ssl/test/inet_crypto_dist.erl
@@ -1093,26 +1093,8 @@ handshake(
link(Controller),
receive
DistHandle ->
- ok =
- inet:setopts(
- Socket,
- [{active, ?TCP_ACTIVE},
- inet_tcp_dist:nodelay()]),
- try
- input_handler(
- RecvParams#params{
- dist_handle = DistHandle},
- RecvSeq)
- catch
- Class : Reason : Stacktrace ->
- error_logger:info_report(
- [input_handler_exception,
- {class, Class},
- {reason, Reason},
- {stacktrace, Stacktrace}]),
- erlang:raise(
- Class, Reason, Stacktrace)
- end
+ input_handler(
+ RecvParams, RecvSeq, DistHandle)
end
end,
[link,
@@ -1137,29 +1119,12 @@ handshake(
{fullsweep_after, 0}])),
_ = monitor(process, InputHandler), % For the benchmark test
ok = gen_tcp:controlling_process(Socket, InputHandler),
+ false = erlang:dist_ctrl_set_opt(DistHandle, get_size, true),
ok = erlang:dist_ctrl_input_handler(DistHandle, InputHandler),
InputHandler ! DistHandle,
- crypto:rand_seed_alg(crypto_cache),
reply(From, ok),
process_flag(priority, normal),
- erlang:dist_ctrl_get_data_notification(DistHandle),
- try
- output_handler(
- SendParams#params{
- dist_handle = DistHandle,
- rekey_msg =
- start_rekey_timer(SendParams#params.rekey_time)},
- SendSeq)
- catch
- Class : Reason : Stacktrace ->
- error_logger:info_report(
- [output_handler_exception,
- {class, Class},
- {reason, Reason},
- {stacktrace, Stacktrace}]),
- erlang:raise(
- Class, Reason, Stacktrace)
- end;
+ output_handler(SendParams, SendSeq, DistHandle);
%%
{?MODULE, From, {send, Data}} ->
{SendParams_1, SendSeq_1, Result} =
@@ -1223,10 +1188,38 @@ recv_and_decrypt_chunk(#params{socket = Socket} = RecvParams, RecvSeq) ->
%% -------------------------------------------------------------------------
%% Output handler process
%%
-%% The game here is to flush all dist_data and dist_tick messages,
-%% prioritize dist_data over dist_tick, and to not use selective receive
-%% because that would hurt performance during overload
+%% Await an event about what to do; fetch dist data from the VM,
+%% send a dist tick, or rekey outbound encryption parameters.
+%%
+%% In case we are overloaded and could get many accumulated
+%% dist_data or dist_tick messages; make sure to flush all of them
+%% before proceeding with what to do. But, do not use selective
+%% receive since that does not perform well when there are
+%% many messages in the process mailbox.
+
+%% Entry function
+output_handler(Params, Seq, DistHandle) ->
+ try
+ _ = crypto:rand_seed_alg(crypto_cache),
+ erlang:dist_ctrl_get_data_notification(DistHandle),
+ output_handler(
+ Params#params{
+ dist_handle = DistHandle,
+ rekey_msg = start_rekey_timer(Params#params.rekey_time)},
+ Seq)
+ catch
+ Class : Reason : Stacktrace ->
+ error_logger:info_report(
+ [output_handler_exception,
+ {class, Class},
+ {reason, Reason},
+ {stacktrace, Stacktrace}]),
+ erlang:raise(Class, Reason, Stacktrace)
+ end.
+%% Loop top
+%%
+%% State: lurking until any interesting message
output_handler(Params, Seq) ->
receive
Msg ->
@@ -1305,64 +1298,130 @@ output_handler_rekey(Params, Seq) ->
end.
-
+%% Get outbound data from VM; encrypt and send,
+%% until the VM has no more
+%%
output_handler_xfer(Params, Seq) ->
output_handler_xfer(Params, Seq, [], 0, []).
%%
-output_handler_xfer(Params, Seq, {Front, Size, Rear}) ->
- output_handler_xfer(Params, Seq, Front, Size, Rear).
+%% Front,Size,Rear is an Okasaki queue of binaries with total byte Size
%%
output_handler_xfer(Params, Seq, Front, Size, Rear)
when ?CHUNK_SIZE =< Size ->
- {Data, Q} = deq_iovec(?CHUNK_SIZE, Front, Size, Rear),
- {Params_1, Seq_1, Result} =
- encrypt_and_send_chunk(
- Params, Seq, [?DATA_CHUNK, Data], 1 + ?CHUNK_SIZE),
- if
- Result =:= ok ->
- output_handler_xfer(Params_1, Seq_1, Q);
- true ->
- death_row({send_chunk, trace(Result)})
- end;
+ %%
+ %% We have a full chunk or more
+ %% -> collect one chunk or less and send
+ output_handler_collect(Params, Seq, Front, Size, Rear);
output_handler_xfer(Params, Seq, Front, Size, Rear) ->
+ %% when Size < ?CHUNK_SIZE ->
+ %%
+ %% We do not have a full chunk -> try to fetch more from VM
case erlang:dist_ctrl_get_data(Params#params.dist_handle) of
none ->
if
Size =:= 0 ->
+ %% No more data from VM, nothing buffered
+ %% -> go back to lurking
{Params, Seq};
true ->
- Data = Front ++ lists:reverse(Rear),
- {Params_1, Seq_1, Result} =
- encrypt_and_send_chunk(
- Params, Seq, [?DATA_CHUNK, Data], 1 + Size),
- if
- Result =:= ok ->
- {Params_1, Seq_1};
- true ->
- death_row({send_chunk, trace(Result)})
- end
+ %% The VM had no more -> send what we have
+ output_handler_collect(Params, Seq, Front, Size, Rear)
end;
- Bin when is_binary(Bin) ->
- Len = byte_size(Bin),
- output_handler_xfer(
- Params, Seq, Front,
- Size + 4 + Len, [Bin, <<Len:32>>|Rear]);
- [Bin1, Bin2] ->
- Len = byte_size(Bin1) + byte_size(Bin2),
- output_handler_xfer(
- Params, Seq, Front,
- Size + 4 + Len, [Bin2, Bin1, <<Len:32>>|Rear]);
- Iovec ->
- Len = iolist_size(Iovec),
- output_handler_xfer(
- Params, Seq, Front,
- Size + 4 + Len, lists:reverse(Iovec, [<<Len:32>>|Rear]))
+ {Len,Iov} ->
+ output_handler_enq(
+ Params, Seq, Front, Size + 4 + Len, [<<Len:32>>|Rear], Iov)
+ end.
+
+%% Enqueue VM data while splitting large binaries into ?CHUNK_SIZE
+%%
+output_handler_enq(Params, Seq, Front, Size, Rear, []) ->
+ output_handler_xfer(Params, Seq, Front, Size, Rear);
+output_handler_enq(Params, Seq, Front, Size, Rear, [Bin|Iov]) ->
+ output_handler_enq(Params, Seq, Front, Size, Rear, Iov, Bin).
+%%
+output_handler_enq(Params, Seq, Front, Size, Rear, Iov, Bin) ->
+ BinSize = byte_size(Bin),
+ if
+ BinSize =< ?CHUNK_SIZE ->
+ output_handler_enq(
+ Params, Seq, Front, Size, [Bin|Rear], Iov);
+ true ->
+ <<Bin1:?CHUNK_SIZE/binary, Bin2/binary>> = Bin,
+ output_handler_enq(
+ Params, Seq, Front, Size, [Bin1|Rear], Iov, Bin2)
+ end.
+
+%% Collect small binaries into chunks of at most ?CHUNK_SIZE
+%%
+output_handler_collect(Params, Seq, [], Zero, []) ->
+ 0 = Zero, % Assert
+ %% No more enqueued -> try to get more form VM
+ output_handler_xfer(Params, Seq);
+output_handler_collect(Params, Seq, Front, Size, Rear) ->
+ output_handler_collect(Params, Seq, Front, Size, Rear, [], 0).
+%%
+output_handler_collect(Params, Seq, [], Zero, [], Acc, DataSize) ->
+ 0 = Zero, % Assert
+ output_handler_chunk(Params, Seq, [], Zero, [], Acc, DataSize);
+output_handler_collect(Params, Seq, [], Size, Rear, Acc, DataSize) ->
+ %% Okasaki queue transfer Rear -> Front
+ output_handler_collect(
+ Params, Seq, lists:reverse(Rear), Size, [], Acc, DataSize);
+output_handler_collect(
+ Params, Seq, [Bin|Iov] = Front, Size, Rear, Acc, DataSize) ->
+ BinSize = byte_size(Bin),
+ DataSize_1 = DataSize + BinSize,
+ if
+ ?CHUNK_SIZE < DataSize_1 ->
+ %% Bin does not fit in chunk -> send Acc
+ output_handler_chunk(
+ Params, Seq, Front, Size, Rear, Acc, DataSize);
+ DataSize_1 < ?CHUNK_SIZE ->
+ %% Chunk not full yet -> try to accumulate more
+ output_handler_collect(
+ Params, Seq, Iov, Size - BinSize, Rear, [Bin|Acc], DataSize_1);
+ true -> % DataSize_1 == ?CHUNK_SIZE ->
+ %% Optimize one iteration; Bin fits exactly -> accumulate and send
+ output_handler_chunk(
+ Params, Seq, Iov, Size - BinSize, Rear, [Bin|Acc], DataSize_1)
+ end.
+
+%% Encrypt and send a chunk
+%%
+output_handler_chunk(Params, Seq, Front, Size, Rear, Acc, DataSize) ->
+ Data = lists:reverse(Acc),
+ {Params_1, Seq_1, Result} =
+ encrypt_and_send_chunk(Params, Seq, [?DATA_CHUNK|Data], 1 + DataSize),
+ if
+ Result =:= ok ->
+ %% Try to collect another chunk
+ output_handler_collect(Params_1, Seq_1, Front, Size, Rear);
+ true ->
+ death_row({send_chunk, trace(Result)})
end.
%% -------------------------------------------------------------------------
%% Input handler process
%%
+%% Entry function
+input_handler(#params{socket = Socket} = Params, Seq, DistHandle) ->
+ try
+ ok = inet:setopts(Socket, [{active, ?TCP_ACTIVE}, nodelay()]),
+ input_handler(
+ Params#params{dist_handle = DistHandle},
+ Seq)
+ catch
+ Class : Reason : Stacktrace ->
+ error_logger:info_report(
+ [input_handler_exception,
+ {class, Class},
+ {reason, Reason},
+ {stacktrace, Stacktrace}]),
+ erlang:raise(Class, Reason, Stacktrace)
+ end.
+
+%% Loop top
input_handler(Params, Seq) ->
%% Shortcut into the loop
{Params_1, Seq_1, Data} = input_data(Params, Seq),
@@ -1626,27 +1685,6 @@ block_decrypt(
error
end.
-%% -------------------------------------------------------------------------
-%% Queue of binaries i.e an iovec queue
-
-deq_iovec(GetSize, Front, Size, Rear) ->
- deq_iovec(GetSize, Front, Size, Rear, []).
-%%
-deq_iovec(GetSize, [], Size, Rear, Acc) ->
- deq_iovec(GetSize, lists:reverse(Rear), Size, [], Acc);
-deq_iovec(GetSize, [Bin|Front], Size, Rear, Acc) ->
- BinSize = byte_size(Bin),
- if
- BinSize < GetSize ->
- deq_iovec(
- GetSize - BinSize, Front, Size - BinSize, Rear, [Bin|Acc]);
- GetSize < BinSize ->
- {Bin1,Bin2} = erlang:split_binary(Bin, GetSize),
- {lists:reverse(Acc, [Bin1]), {[Bin2|Front], Size - GetSize, Rear}};
- true ->
- {lists:reverse(Acc, [Bin]), {Front, Size - BinSize, Rear}}
- end.
-
%% -------------------------------------------------------------------------
death_row(Reason) ->
--
2.35.3