File 4146-Clean-up-output_handler.patch of Package erlang

From a626e320710c1a1572126e7aa5e2285a42cb747e Mon Sep 17 00:00:00 2001
From: Raimo Niskanen <raimo@erlang.org>
Date: Mon, 24 Oct 2022 17:11:18 +0200
Subject: [PATCH 26/27] Clean up output_handler

---
 lib/ssl/test/inet_crypto_dist.erl | 240 +++++++++++++++++-------------
 1 file changed, 139 insertions(+), 101 deletions(-)

diff --git a/lib/ssl/test/inet_crypto_dist.erl b/lib/ssl/test/inet_crypto_dist.erl
index b9a7e5ba1f..f28da210f7 100644
--- a/lib/ssl/test/inet_crypto_dist.erl
+++ b/lib/ssl/test/inet_crypto_dist.erl
@@ -1093,26 +1093,8 @@ handshake(
                             link(Controller),
                             receive
                                 DistHandle ->
-                                    ok =
-                                        inet:setopts(
-                                          Socket,
-                                          [{active, ?TCP_ACTIVE},
-                                           inet_tcp_dist:nodelay()]),
-                                    try
-                                        input_handler(
-                                          RecvParams#params{
-                                            dist_handle = DistHandle},
-                                          RecvSeq)
-                                    catch
-                                        Class : Reason : Stacktrace ->
-                                            error_logger:info_report(
-                                              [input_handler_exception,
-                                               {class, Class},
-                                               {reason, Reason},
-                                               {stacktrace, Stacktrace}]),
-                                            erlang:raise(
-                                              Class, Reason, Stacktrace)
-                                    end
+                                    input_handler(
+                                      RecvParams, RecvSeq, DistHandle)
                             end
                     end,
                     [link,
@@ -1137,29 +1119,12 @@ handshake(
                      {fullsweep_after, 0}])),
             _ = monitor(process, InputHandler), % For the benchmark test
             ok = gen_tcp:controlling_process(Socket, InputHandler),
+            false = erlang:dist_ctrl_set_opt(DistHandle, get_size, true),
             ok = erlang:dist_ctrl_input_handler(DistHandle, InputHandler),
             InputHandler ! DistHandle,
-            crypto:rand_seed_alg(crypto_cache),
             reply(From, ok),
             process_flag(priority, normal),
-            erlang:dist_ctrl_get_data_notification(DistHandle),
-            try
-                output_handler(
-                  SendParams#params{
-                    dist_handle = DistHandle,
-                    rekey_msg =
-                        start_rekey_timer(SendParams#params.rekey_time)},
-                  SendSeq)
-            catch
-                Class : Reason : Stacktrace ->
-                    error_logger:info_report(
-                      [output_handler_exception,
-                       {class, Class},
-                       {reason, Reason},
-                       {stacktrace, Stacktrace}]),
-                    erlang:raise(
-                      Class, Reason, Stacktrace)
-            end;
+            output_handler(SendParams, SendSeq, DistHandle);
         %%
         {?MODULE, From, {send, Data}} ->
             {SendParams_1, SendSeq_1, Result} =
@@ -1223,10 +1188,38 @@ recv_and_decrypt_chunk(#params{socket = Socket} = RecvParams, RecvSeq) ->
 %% -------------------------------------------------------------------------
 %% Output handler process
 %%
-%% The game here is to flush all dist_data and dist_tick messages,
-%% prioritize dist_data over dist_tick, and to not use selective receive
-%% because that would hurt performance during overload
+%% Await an event about what to do; fetch dist data from the VM,
+%% send a dist tick, or rekey outbound encryption parameters.
+%%
+%% In case we are overloaded and could get many accumulated
+%% dist_data or dist_tick messages; make sure to flush all of them
+%% before proceeding with what to do.  But, do not use selective
+%% receive since that does not perform well when there are
+%% many messages in the process mailbox.
+
+%% Entry function
+output_handler(Params, Seq, DistHandle) ->
+    try
+        _ = crypto:rand_seed_alg(crypto_cache),
+        erlang:dist_ctrl_get_data_notification(DistHandle),
+        output_handler(
+          Params#params{
+            dist_handle = DistHandle,
+            rekey_msg = start_rekey_timer(Params#params.rekey_time)},
+          Seq)
+    catch
+        Class : Reason : Stacktrace ->
+            error_logger:info_report(
+              [output_handler_exception,
+               {class, Class},
+               {reason, Reason},
+               {stacktrace, Stacktrace}]),
+            erlang:raise(Class, Reason, Stacktrace)
+    end.
 
+%% Loop top
+%%
+%% State: lurking until any interesting message
 output_handler(Params, Seq) ->
     receive
         Msg ->
@@ -1305,64 +1298,130 @@ output_handler_rekey(Params, Seq) ->
     end.
 
 
-
+%% Get outbound data from VM; encrypt and send,
+%% until the VM has no more
+%%
 output_handler_xfer(Params, Seq) ->
     output_handler_xfer(Params, Seq, [], 0, []).
 %%
-output_handler_xfer(Params, Seq, {Front, Size, Rear}) ->
-    output_handler_xfer(Params, Seq, Front, Size, Rear).
+%% Front,Size,Rear is an Okasaki queue of binaries with total byte Size
 %%
 output_handler_xfer(Params, Seq, Front, Size, Rear)
   when ?CHUNK_SIZE =< Size ->
-    {Data, Q} = deq_iovec(?CHUNK_SIZE, Front, Size, Rear),
-    {Params_1, Seq_1, Result} =
-        encrypt_and_send_chunk(
-          Params, Seq, [?DATA_CHUNK, Data], 1 + ?CHUNK_SIZE),
-    if
-        Result =:= ok ->
-            output_handler_xfer(Params_1, Seq_1, Q);
-        true ->
-            death_row({send_chunk, trace(Result)})
-    end;
+    %%
+    %% We have a full chunk or more
+    %% -> collect one chunk or less and send
+    output_handler_collect(Params, Seq, Front, Size, Rear);
 output_handler_xfer(Params, Seq, Front, Size, Rear) ->
+    %% when Size < ?CHUNK_SIZE ->
+    %%
+    %% We do not have a full chunk -> try to fetch more from VM
     case erlang:dist_ctrl_get_data(Params#params.dist_handle) of
         none ->
             if
                 Size =:= 0 ->
+                    %% No more data from VM, nothing buffered
+                    %% -> go back to lurking
                     {Params, Seq};
                 true ->
-                    Data = Front ++ lists:reverse(Rear),
-                    {Params_1, Seq_1, Result} =
-                        encrypt_and_send_chunk(
-                          Params, Seq, [?DATA_CHUNK, Data], 1 + Size),
-                    if
-                        Result =:= ok ->
-                            {Params_1, Seq_1};
-                        true ->
-                            death_row({send_chunk, trace(Result)})
-                    end
+                    %% The VM had no more -> send what we have
+                    output_handler_collect(Params, Seq, Front, Size, Rear)
             end;
-        Bin when is_binary(Bin) ->
-            Len = byte_size(Bin),
-            output_handler_xfer(
-              Params, Seq, Front,
-              Size + 4 + Len, [Bin, <<Len:32>>|Rear]);
-        [Bin1, Bin2] ->
-            Len = byte_size(Bin1) + byte_size(Bin2),
-            output_handler_xfer(
-              Params, Seq, Front,
-              Size + 4 + Len, [Bin2, Bin1, <<Len:32>>|Rear]);
-        Iovec ->
-            Len = iolist_size(Iovec),
-            output_handler_xfer(
-              Params, Seq, Front,
-              Size + 4 + Len, lists:reverse(Iovec, [<<Len:32>>|Rear]))
+        {Len,Iov} ->
+            output_handler_enq(
+              Params, Seq, Front, Size + 4 + Len, [<<Len:32>>|Rear], Iov)
+    end.
+
+%% Enqueue VM data while splitting large binaries into ?CHUNK_SIZE
+%%
+output_handler_enq(Params, Seq, Front, Size, Rear, []) ->
+    output_handler_xfer(Params, Seq, Front, Size, Rear);
+output_handler_enq(Params, Seq, Front, Size, Rear, [Bin|Iov]) ->
+    output_handler_enq(Params, Seq, Front, Size, Rear, Iov, Bin).
+%%
+output_handler_enq(Params, Seq, Front, Size, Rear, Iov, Bin) ->
+    BinSize = byte_size(Bin),
+    if
+        BinSize =< ?CHUNK_SIZE ->
+            output_handler_enq(
+              Params, Seq, Front, Size, [Bin|Rear], Iov);
+        true ->
+            <<Bin1:?CHUNK_SIZE/binary, Bin2/binary>> = Bin,
+            output_handler_enq(
+              Params, Seq, Front, Size, [Bin1|Rear], Iov, Bin2)
+    end.
+
+%% Collect small binaries into chunks of at most ?CHUNK_SIZE
+%%
+output_handler_collect(Params, Seq, [], Zero, []) ->
+    0 = Zero, % Assert
+    %% No more enqueued -> try to get more form VM
+    output_handler_xfer(Params, Seq);
+output_handler_collect(Params, Seq, Front, Size, Rear) ->
+    output_handler_collect(Params, Seq, Front, Size, Rear, [], 0).
+%%
+output_handler_collect(Params, Seq, [], Zero, [], Acc, DataSize) ->
+    0 = Zero, % Assert
+    output_handler_chunk(Params, Seq, [], Zero, [], Acc, DataSize);
+output_handler_collect(Params, Seq, [], Size, Rear, Acc, DataSize) ->
+    %% Okasaki queue transfer Rear -> Front
+    output_handler_collect(
+      Params, Seq, lists:reverse(Rear), Size, [], Acc, DataSize);
+output_handler_collect(
+  Params, Seq, [Bin|Iov] = Front, Size, Rear, Acc, DataSize) ->
+    BinSize = byte_size(Bin),
+    DataSize_1 = DataSize + BinSize,
+    if
+        ?CHUNK_SIZE < DataSize_1 ->
+            %% Bin does not fit in chunk -> send Acc
+            output_handler_chunk(
+              Params, Seq, Front, Size, Rear, Acc, DataSize);
+        DataSize_1 < ?CHUNK_SIZE ->
+            %% Chunk not full yet -> try to accumulate more
+            output_handler_collect(
+              Params, Seq, Iov, Size - BinSize, Rear, [Bin|Acc], DataSize_1);
+        true -> % DataSize_1 == ?CHUNK_SIZE ->
+            %% Optimize one iteration; Bin fits exactly -> accumulate and send
+            output_handler_chunk(
+              Params, Seq, Iov, Size - BinSize, Rear, [Bin|Acc], DataSize_1)
+    end.
+
+%% Encrypt and send a chunk
+%%
+output_handler_chunk(Params, Seq, Front, Size, Rear, Acc, DataSize) ->
+    Data = lists:reverse(Acc),
+    {Params_1, Seq_1, Result} =
+        encrypt_and_send_chunk(Params, Seq, [?DATA_CHUNK|Data], 1 + DataSize),
+    if
+        Result =:= ok ->
+            %% Try to collect another chunk
+            output_handler_collect(Params_1, Seq_1, Front, Size, Rear);
+        true ->
+            death_row({send_chunk, trace(Result)})
     end.
 
 %% -------------------------------------------------------------------------
 %% Input handler process
 %%
 
+%% Entry function
+input_handler(#params{socket = Socket} = Params, Seq, DistHandle) ->
+    try
+        ok = inet:setopts(Socket, [{active, ?TCP_ACTIVE}, nodelay()]),
+        input_handler(
+          Params#params{dist_handle = DistHandle},
+          Seq)
+    catch
+        Class : Reason : Stacktrace ->
+            error_logger:info_report(
+              [input_handler_exception,
+               {class, Class},
+               {reason, Reason},
+               {stacktrace, Stacktrace}]),
+            erlang:raise(Class, Reason, Stacktrace)
+    end.
+
+%% Loop top
 input_handler(Params, Seq) ->
     %% Shortcut into the loop
     {Params_1, Seq_1, Data} = input_data(Params, Seq),
@@ -1626,27 +1685,6 @@ block_decrypt(
             error
     end.
 
-%% -------------------------------------------------------------------------
-%% Queue of binaries i.e an iovec queue
-
-deq_iovec(GetSize, Front, Size, Rear) ->
-    deq_iovec(GetSize, Front, Size, Rear, []).
-%%
-deq_iovec(GetSize, [], Size, Rear, Acc) ->
-    deq_iovec(GetSize, lists:reverse(Rear), Size, [], Acc);
-deq_iovec(GetSize, [Bin|Front], Size, Rear, Acc) ->
-    BinSize = byte_size(Bin),
-    if
-        BinSize < GetSize ->
-            deq_iovec(
-              GetSize - BinSize, Front, Size - BinSize, Rear, [Bin|Acc]);
-        GetSize < BinSize ->
-            {Bin1,Bin2} = erlang:split_binary(Bin, GetSize),
-            {lists:reverse(Acc, [Bin1]), {[Bin2|Front], Size - GetSize, Rear}};
-        true ->
-            {lists:reverse(Acc, [Bin]), {Front, Size - BinSize, Rear}}
-    end.
-
 %% -------------------------------------------------------------------------
 
 death_row(Reason) ->
-- 
2.35.3

openSUSE Build Service is sponsored by