File 4142-Rewrite-input_handler-loop.patch of Package erlang
From 5ab182d7f15942f0e0e65f92e2cd88d4fa9ab1f8 Mon Sep 17 00:00:00 2001
From: Raimo Niskanen <raimo@erlang.org>
Date: Thu, 20 Oct 2022 15:47:40 +0200
Subject: [PATCH 22/27] Rewrite input_handler loop
---
lib/ssl/test/inet_crypto_dist.erl | 175 ++++++++++++++++++------------
1 file changed, 103 insertions(+), 72 deletions(-)
diff --git a/lib/ssl/test/inet_crypto_dist.erl b/lib/ssl/test/inet_crypto_dist.erl
index 217eff661e..9e461ac185 100644
--- a/lib/ssl/test/inet_crypto_dist.erl
+++ b/lib/ssl/test/inet_crypto_dist.erl
@@ -971,7 +971,12 @@ init_recv(
RecvParams_1#params{iv = {IV2BSalt, IV2BNo}}}
end
catch
- error : Reason : Stacktrace->
+ Class : Reason : Stacktrace when Class =:= error ->
+ error_logger:info_report(
+ [init_recv_exception,
+ {class, Class},
+ {reason, Reason},
+ {stacktrace, Stacktrace}]),
_ = trace({Reason, Stacktrace}),
exit(connection_closed)
end.
@@ -1113,7 +1118,7 @@ handshake(
input_handler(
RecvParams#params{
dist_handle = DistHandle},
- RecvSeq, empty_q())
+ RecvSeq)
catch
Class : Reason : Stacktrace ->
error_logger:info_report(
@@ -1198,11 +1203,17 @@ recv_and_decrypt_chunk(#params{socket = Socket} = RecvParams, RecvSeq) ->
case decrypt_chunk(RecvParams, RecvSeq, Chunk) of
<<?HANDSHAKE_CHUNK, Cleartext/binary>> ->
{RecvParams, RecvSeq + 1, {ok, Cleartext}};
- OtherChunk when is_binary(OtherChunk) ->
- {RecvParams, RecvSeq + 1, {error, decrypt_error}};
+ UnknownChunk when is_binary(UnknownChunk) ->
+ error_logger:error_report(
+ [?FUNCTION_NAME,
+ {reason,unknown_chunk}]),
+ {RecvParams, RecvSeq + 1, {error, unknown_chunk}};
#params{} = RecvParams_1 ->
recv_and_decrypt_chunk(RecvParams_1, 0);
error ->
+ error_logger:error_report(
+ [?FUNCTION_NAME,
+ {reason,decrypt_error}]),
{RecvParams, RecvSeq, {error, decrypt_error}}
end;
Error ->
@@ -1352,98 +1363,121 @@ output_handler_xfer(Params, Seq, Front, Size, Rear) ->
%% Input handler process
%%
-input_handler(#params{socket = Socket} = Params, Seq, Q) ->
+input_handler(Params, Seq) ->
+ input_handler(Params, Seq, [], 0, []).
+%%
+input_handler(#params{socket = Socket} = Params, Seq, Front, Size, Rear) ->
receive
Msg ->
case Msg of
{tcp_passive, Socket} ->
ok = inet:setopts(Socket, [{active, ?TCP_ACTIVE}]),
- input_handler(Params, Seq, Q);
+ input_handler(Params, Seq, Front, Size, Rear);
{tcp, Socket, Chunk} ->
- input_chunk(Params, Seq, Q, Chunk);
+ input_chunk(Params, Seq, Front, Size, Rear, Chunk);
{tcp_closed, Socket} ->
+ error_logger:info_report(
+ [?FUNCTION_NAME,
+ {reason, tcp_closed}]),
exit(connection_closed);
Other ->
%% Ignore...
_ = trace(Other),
- input_handler(Params, Seq, Q)
+ input_handler(Params, Seq, Front, Size, Rear)
end
end.
-input_chunk(Params, Seq, Q, Chunk) ->
+input_chunk(Params, Seq, Front, Size, Rear, Chunk) ->
case decrypt_chunk(Params, Seq, Chunk) of
<<?DATA_CHUNK, Cleartext/binary>> ->
- Q_1 = enq_binary(Cleartext, Q),
- Q_2 = deliver_data(Params#params.dist_handle, Q_1),
- input_handler(Params, Seq + 1, Q_2);
+ input_deliver(
+ Params, Seq + 1, Front,
+ Size + byte_size(Cleartext), [Cleartext|Rear]);
<<?TICK_CHUNK, _/binary>> ->
- input_handler(Params, Seq + 1, Q);
- OtherChunk when is_binary(OtherChunk) ->
+ input_handler(Params, Seq + 1, Front, Size, Rear);
+ UnknownChunk when is_binary(UnknownChunk) ->
+ error_logger:error_report(
+ [?FUNCTION_NAME,
+ {reason, unknown_chunk}]),
_ = trace(invalid_chunk),
exit(connection_closed);
#params{} = Params_1 ->
- input_handler(Params_1, 0, Q);
+ input_handler(Params_1, 0, Front, Size, Rear);
error ->
_ = trace(decrypt_error),
exit(connection_closed)
end.
-%% -------------------------------------------------------------------------
-%% erlang:dist_ctrl_* helpers
-
-%% De-packet and deliver received data to the VM from a queue
-%%
-deliver_data(DistHandle, Q) ->
- case Q of
- {[], Size, []} ->
- Size = 0, % Assert
- Q;
- {[], Size, Rear} ->
- [Bin|Front] = lists:reverse(Rear),
- deliver_data(DistHandle, Front, Size, [], Bin);
- {[Bin|Front], Size, Rear} ->
- deliver_data(DistHandle, Front, Size, Rear, Bin)
- end.
+input_deliver(Params, Seq, [], Size, []) ->
+ Size = 0, % Assert
+ input_handler(Params, Seq, [], Size, []);
+input_deliver(Params, Seq, [], Size, Rear) ->
+ [Bin|Front] = lists:reverse(Rear),
+ input_deliver(Params, Seq, Front, Size, [], Bin);
+input_deliver(Params, Seq, [Bin|Front], Size, Rear) ->
+ input_deliver(Params, Seq, Front, Size, Rear, Bin).
%%
-deliver_data(DistHandle, Front, Size, Rear, Bin) ->
+input_deliver(Params, Seq, Front, Size, Rear, Bin) ->
case Bin of
<<DataSizeA:32, DataA:DataSizeA/binary,
DataSizeB:32, DataB:DataSizeB/binary, Rest/binary>> ->
+ DistHandle = Params#params.dist_handle,
erlang:dist_ctrl_put_data(DistHandle, DataA),
erlang:dist_ctrl_put_data(DistHandle, DataB),
- deliver_data(
- DistHandle,
+ input_deliver(
+ Params, Seq,
Front, Size - (4 + DataSizeA + 4 + DataSizeB), Rear,
Rest);
<<DataSize:32, Data:DataSize/binary, Rest/binary>> ->
+ DistHandle = Params#params.dist_handle,
erlang:dist_ctrl_put_data(DistHandle, Data),
- deliver_data(DistHandle, Front, Size - (4 + DataSize), Rear, Rest);
+ input_deliver(
+ Params, Seq,
+ Front, Size - (4 + DataSize), Rear,
+ Rest);
<<DataSize:32, FirstData/binary>> ->
+ %% We do not have a complete packet in the first binary
TotalSize = 4 + DataSize,
if
TotalSize =< Size ->
+ %% We have a complete packet queued
BinSize = byte_size(Bin),
{MoreData, Q} =
deq_iovec(
TotalSize - BinSize,
Front, Size - BinSize, Rear),
- erlang:dist_ctrl_put_data(DistHandle, [FirstData|MoreData]),
- deliver_data(DistHandle, Q);
- true -> % Incomplete data
- {[Bin|Front], Size, Rear}
+ DistHandle = Params#params.dist_handle,
+ erlang:dist_ctrl_put_data(
+ DistHandle, [FirstData|MoreData]),
+ input_deliver(Params, Seq, Q);
+ true ->
+ %% We an incomplete packet
+ input_handler(Params, Seq, [Bin|Front], Size, Rear)
end;
<<_/binary>> ->
+ %% We do not have a size header in the first binary
BinSize = byte_size(Bin),
if
- 4 =< Size -> % Fragmented header - extract a header bin
- {RestHeader, {Front_1, _Size_1, Rear_1}} =
- deq_iovec(4 - BinSize, Front, Size - BinSize, Rear),
+ 4 =< Size ->
+ %% We have a size header queued
+ %%
+ %% Extract a binary with just the size header
+ RestSize = 4 - BinSize,
+ {RestHeader, Q} =
+ deq_iovec(RestSize, Front, Size + RestSize, Rear),
Header = iolist_to_binary([Bin|RestHeader]),
- deliver_data(DistHandle, Front_1, Size, Rear_1, Header);
- true -> % Incomplete header
- {[Bin|Front], Size, Rear}
+ input_deliver(Params, Seq, Q, Header);
+ true ->
+ %% We an incomplete size header
+ input_handler(Params, Seq, [Bin|Front], Size, Rear)
end
end.
+%%
+input_deliver(Params, Seq, {Front, Size, Rear}) ->
+ input_deliver(Params, Seq, Front, Size, Rear).
+%%
+input_deliver(Params, Seq, {Front, Size, Rear}, Bin) ->
+ input_deliver(Params, Seq, Front, Size, Rear, Bin).
%% -------------------------------------------------------------------------
%% Encryption and decryption helpers
@@ -1485,7 +1519,7 @@ encrypt_and_send_rekey_chunk(
gen_tcp:send(
Socket,
encrypt_chunk(
- Params, Seq, [?REKEY_CHUNK, PubKeyA], byte_size(PubKeyA)))
+ Params, Seq, [?REKEY_CHUNK, PubKeyA], 1 + byte_size(PubKeyA)))
of
ok ->
SharedSecret = compute_shared_secret(KeyPair, PubKeyB),
@@ -1524,20 +1558,19 @@ decrypt_chunk(
ChunkLen = byte_size(Chunk),
if
ChunkLen < TagLen ->
+ error_logger:error_report(
+ [?FUNCTION_NAME,
+ {reason,short_chunk}]),
error;
true ->
AAD = <<Seq:32, ChunkLen:32>>,
IVBin = <<IVSalt/binary, (IVNo + Seq):48>>,
CiphertextLen = ChunkLen - TagLen,
- case Chunk of
- <<Ciphertext:CiphertextLen/binary,
- CipherTag:TagLen/binary>> ->
- block_decrypt(
- Params, Seq, AeadCipher, Key, IVBin,
- Ciphertext, AAD, CipherTag);
- _ ->
- error
- end
+ <<Ciphertext:CiphertextLen/binary,
+ CipherTag:TagLen/binary>> = Chunk,
+ block_decrypt(
+ Params, Seq, AeadCipher, Key, IVBin,
+ Ciphertext, AAD, CipherTag)
end.
block_decrypt(
@@ -1549,9 +1582,9 @@ block_decrypt(
crypto:crypto_one_time_aead(
AeadCipher, Key, IV, Ciphertext, AAD, CipherTag, false)
of
- <<?REKEY_CHUNK, Rest/binary>> ->
+ <<?REKEY_CHUNK, Chunk/binary>> ->
PubKeyLen = byte_size(PubKeyA),
- case Rest of
+ case Chunk of
<<PubKeyB:PubKeyLen/binary>> ->
SharedSecret = compute_shared_secret(KeyPair, PubKeyB),
KeyLen = byte_size(Key),
@@ -1563,34 +1596,32 @@ block_decrypt(
SharedSecret, [Key, IV], KeyLen, IVLen),
Params#params{iv = {IVSalt, IVNo}, key = Key_1};
_ ->
+ error_logger:error_report(
+ [?FUNCTION_NAME,
+ {reason,bad_rekey_chunk}]),
error
end;
Chunk when is_binary(Chunk) ->
case Seq of
RekeyCount ->
%% This was one chunk too many without rekeying
+ error_logger:error_report(
+ [?FUNCTION_NAME,
+ {reason,rekey_overdue}]),
error;
_ ->
Chunk
end;
error ->
+ error_logger:error_report(
+ [?FUNCTION_NAME,
+ {reason,decrypt_error}]),
error
end.
%% -------------------------------------------------------------------------
%% Queue of binaries i.e an iovec queue
-empty_q() ->
- {[], 0, []}.
-
-enq_binary(Bin, {Front, Size, Rear}) ->
- {Front, Size + byte_size(Bin), [Bin|Rear]}.
-
--ifdef(undefined).
-deq_iovec(GetSize, {Front, Size, Rear}) when GetSize =< Size ->
- deq_iovec(GetSize, Front, Size, Rear, []).
--endif.
-%%
deq_iovec(GetSize, Front, Size, Rear) ->
deq_iovec(GetSize, Front, Size, Rear, []).
%%
@@ -1613,7 +1644,7 @@ deq_iovec(GetSize, [Bin|Front], Size, Rear, Acc) ->
death_row(Reason) ->
error_logger:info_report(
- [death_row,
+ [?FUNCTION_NAME,
{reason, Reason},
{pid, self()}]),
receive after 5000 -> exit(Reason) end.
@@ -1633,18 +1664,18 @@ monitor_dist_proc(Tag, Pid) ->
fun () ->
MRef = erlang:monitor(process, Pid),
error_logger:info_report(
- [monitor_dist_proc,
+ [?FUNCTION_NAME,
{type, Tag},
{pid, Pid}]),
receive
{'DOWN', MRef, _, _, normal} ->
error_logger:error_report(
- [dist_proc_died,
+ [?FUNCTION_NAME,
{reason, normal},
{pid, Pid}]);
{'DOWN', MRef, _, _, Reason} ->
error_logger:info_report(
- [dist_proc_died,
+ [?FUNCTION_NAME,
{reason, Reason},
{pid, Pid}])
end
--
2.35.3