File 4140-Improve-output-xfer-loop.patch of Package erlang

From 575a02e6867bcb808e6326df48ae27ccd108d066 Mon Sep 17 00:00:00 2001
From: Raimo Niskanen <raimo@erlang.org>
Date: Thu, 20 Oct 2022 11:25:53 +0200
Subject: [PATCH 20/27] Improve output xfer loop

---
 lib/ssl/test/inet_crypto_dist.erl | 153 ++++++++++++++++--------------
 1 file changed, 83 insertions(+), 70 deletions(-)

diff --git a/lib/ssl/test/inet_crypto_dist.erl b/lib/ssl/test/inet_crypto_dist.erl
index 7640464664..8c17bbe4e5 100644
--- a/lib/ssl/test/inet_crypto_dist.erl
+++ b/lib/ssl/test/inet_crypto_dist.erl
@@ -1157,29 +1157,31 @@ handshake(
             end;
         %%
         {?MODULE, From, {send, Data}} ->
-            case
+            {SendParams_1, SendSeq_1, Result} =
                 encrypt_and_send_chunk(
-                  SendParams, SendSeq, [?HANDSHAKE_CHUNK, Data])
-            of
-                {SendParams_1, SendSeq_1, ok} ->
+                  SendParams, SendSeq, [?HANDSHAKE_CHUNK, Data]),
+            if
+                Result =:= ok ->
                     reply(From, ok),
                     handshake(
                       SendParams_1, SendSeq_1, RecvParams, RecvSeq,
                       Controller);
-                {_, _, Error} ->
+                true ->
                     reply(From, {error, closed}),
-                    death_row({send, trace(Error)})
+                    death_row({send, trace(Result)})
             end;
         {?MODULE, From, recv} ->
-            case recv_and_decrypt_chunk(RecvParams, RecvSeq) of
-                {RecvParams_1, RecvSeq_1, {ok, _} = Reply} ->
-                    reply(From, Reply),
+            {RecvParams_1, RecvSeq_1, Result} =
+                recv_and_decrypt_chunk(RecvParams, RecvSeq),
+            case Result of
+                {ok, _} ->
+                    reply(From, Result),
                     handshake(
                       SendParams, SendSeq, RecvParams_1, RecvSeq_1,
                       Controller);
-                {_, _, Error} ->
-                    reply(From, Error),
-                    death_row({recv, trace(Error)})
+                {error, _} ->
+                    reply(From, Result),
+                    death_row({recv, trace(Result)})
             end;
         {?MODULE, From, peername} ->
             reply(From, inet:peername(Socket)),
@@ -1211,6 +1213,7 @@ recv_and_decrypt_chunk(#params{socket = Socket} = RecvParams, RecvSeq) ->
 %%
 %% The game here is to flush all dist_data and dist_tick messages,
 %% prioritize dist_data over dist_tick, and to not use selective receive
+%% because that would hurt performance during overload
 
 output_handler(Params, Seq) ->
     receive
@@ -1221,8 +1224,7 @@ output_handler(Params, Seq) ->
                 dist_tick ->
                     output_handler_tick(Params, Seq);
                 _ when Msg =:= Params#params.rekey_msg ->
-                    Params_1 = output_handler_rekey(Params, Seq),
-                    output_handler(Params_1, 0);
+                    output_handler_rekey(Params, Seq);
                 _ ->
                     %% Ignore
                     _ = trace(Msg),
@@ -1230,6 +1232,7 @@ output_handler(Params, Seq) ->
             end
     end.
 
+%% State: we have received at least one dist_data message
 output_handler_data(Params, Seq) ->
     receive
         Msg ->
@@ -1239,21 +1242,19 @@ output_handler_data(Params, Seq) ->
                 dist_tick ->
                     output_handler_data(Params, Seq);
                 _ when Msg =:= Params#params.rekey_msg ->
-                    Params_1 = output_handler_rekey(Params, Seq),
-                    output_handler_data(Params_1, 0);
+                    output_handler_rekey(Params, Seq);
                 _ ->
                     %% Ignore
                     _ = trace(Msg),
                     output_handler_data(Params, Seq)
             end
     after 0 ->
-            DistHandle = Params#params.dist_handle,
-            Q = get_data(DistHandle, empty_q()),
-            {Params_1, Seq_1} = output_handler_send(Params, Seq, Q),
-            erlang:dist_ctrl_get_data_notification(DistHandle),
+            {Params_1, Seq_1} = output_handler_xfer(Params, Seq),
+            erlang:dist_ctrl_get_data_notification(Params#params.dist_handle),
             output_handler(Params_1, Seq_1)
     end.
 
+%% State: we have received at least one dist_tick but no dist_data message
 output_handler_tick(Params, Seq) ->
     receive
         Msg ->
@@ -1263,8 +1264,7 @@ output_handler_tick(Params, Seq) ->
                 dist_tick ->
                     output_handler_tick(Params, Seq);
                 _ when Msg =:= Params#params.rekey_msg ->
-                    Params_1 = output_handler_rekey(Params, Seq),
-                    output_handler(Params_1, 0);
+                    output_handler_rekey(Params, Seq);
                 _ ->
                     %% Ignore
                     _ = trace(Msg),
@@ -1273,35 +1273,76 @@ output_handler_tick(Params, Seq) ->
     after 0 ->
             TickSize = 7 + rand:uniform(56),
             TickData = binary:copy(<<0>>, TickSize),
-            case
-                encrypt_and_send_chunk(Params, Seq, [?TICK_CHUNK, TickData])
-            of
-                {Params_1, Seq_1, ok} ->
+            {Params_1, Seq_1, Result} =
+                encrypt_and_send_chunk(Params, Seq, [?TICK_CHUNK, TickData]),
+            if
+                Result =:= ok ->
                     output_handler(Params_1, Seq_1);
-                {_, _, Error} ->
-                    death_row({send_tick, trace(Error)})
+                true ->
+                    death_row({send_tick, trace(Result)})
             end
     end.
 
 output_handler_rekey(Params, Seq) ->
     case encrypt_and_send_rekey_chunk(Params, Seq) of
         #params{} = Params_1 ->
-            Params_1;
+            output_handler(Params_1, 0);
         SendError ->
             death_row({send_rekey, trace(SendError)})
     end.
 
-output_handler_send(Params, Seq, {_Front, 0, _Rear}) ->
-    {Params, Seq};
-output_handler_send(Params, Seq, {Front, _Size, Rear}) ->
-    Cleartext = Front ++ lists:reverse(Rear),
-    case
-        encrypt_and_send_chunk(Params, Seq, [?DATA_CHUNK, Cleartext])
-    of
-        {Params_1, Seq_1, ok} ->
-            {Params_1, Seq_1};
-        {_, _, Error} ->
-            death_row({send_chunk, trace(Error)})
+
+
+output_handler_xfer(Params, Seq) ->
+    output_handler_xfer(Params, Seq, [], 0, []).
+%%
+output_handler_xfer(Params, Seq, {Front, Size, Rear}) ->
+    output_handler_xfer(Params, Seq, Front, Size, Rear).
+%%
+output_handler_xfer(Params, Seq, Front, Size, Rear)
+  when ?CHUNK_SIZE =< Size ->
+    {Data, Q} = deq_iovec(?CHUNK_SIZE, Front, Size, Rear),
+    {Params_1, Seq_1, Result} =
+        encrypt_and_send_chunk(Params, Seq, [?DATA_CHUNK, Data]),
+    if
+        Result =:= ok ->
+            output_handler_xfer(Params_1, Seq_1, Q);
+        true ->
+            death_row({send_chunk, trace(Result)})
+    end;
+output_handler_xfer(Params, Seq, Front, Size, Rear) ->
+    case erlang:dist_ctrl_get_data(Params#params.dist_handle) of
+        none ->
+            if
+                Size =:= 0 ->
+                    {Params, Seq};
+                true ->
+                    Data = Front ++ lists:reverse(Rear),
+                    {Params_1, Seq_1, Result} =
+                        encrypt_and_send_chunk(
+                          Params, Seq, [?DATA_CHUNK, Data]),
+                    if
+                        Result =:= ok ->
+                            {Params_1, Seq_1};
+                        true ->
+                            death_row({send_chunk, trace(Result)})
+                    end
+            end;
+        Bin when is_binary(Bin) ->
+            Len = byte_size(Bin),
+            output_handler_xfer(
+              Params, Seq, Front,
+              Size + 4 + Len, [Bin, <<Len:32>>|Rear]);
+        [Bin1, Bin2] ->
+            Len = byte_size(Bin1) + byte_size(Bin2),
+            output_handler_xfer(
+              Params, Seq, Front,
+              Size + 4 + Len, [Bin2, Bin1, <<Len:32>>|Rear]);
+        Iovec ->
+            Len = iolist_size(Iovec),
+            output_handler_xfer(
+              Params, Seq, Front,
+              Size + 4 + Len, lists:reverse(Iovec, [<<Len:32>>|Rear]))
     end.
 
 %% -------------------------------------------------------------------------
@@ -1347,34 +1388,6 @@ input_chunk(Params, Seq, Q, Chunk) ->
 %% -------------------------------------------------------------------------
 %% erlang:dist_ctrl_* helpers
 
-%% Get data for sending from the VM and place it in a queue
-%%
-get_data(DistHandle, {Front, Size, Rear}) ->
-    get_data(DistHandle, Front, Size, Rear).
-%%
-get_data(_DistHandle, Front, Size, Rear) when ?CHUNK_SIZE =< Size ->
-    {Front, Size, Rear};
-get_data(DistHandle, Front, Size, Rear) ->
-    case erlang:dist_ctrl_get_data(DistHandle) of
-        none ->
-            {Front, Size, Rear};
-        Bin when is_binary(Bin)  ->
-            Len = byte_size(Bin),
-            get_data(
-              DistHandle, Front, Size + 4 + Len,
-              [Bin, <<Len:32>>|Rear]);
-        [Bin1, Bin2] ->
-            Len = byte_size(Bin1) + byte_size(Bin2),
-            get_data(
-              DistHandle, Front, Size + 4 + Len,
-              [Bin2, Bin1, <<Len:32>>|Rear]);
-        Iovec ->
-            Len = iolist_size(Iovec),
-            get_data(
-              DistHandle, Front, Size + 4 + Len,
-              lists:reverse(Iovec, [<<Len:32>>|Rear]))
-    end.
-
 %% De-packet and deliver received data to the VM from a queue
 %%
 deliver_data(DistHandle, Q) ->
@@ -1434,8 +1447,8 @@ deliver_data(DistHandle, Front, Size, Rear, Bin) ->
 
 encrypt_and_send_chunk(
   #params{
-     socket = Socket, rekey_count = Seq, rekey_msg = RekeyMsg} = Params,
-  Seq, Cleartext) ->
+     socket = Socket, rekey_count = RekeyCount, rekey_msg = RekeyMsg} = Params,
+  Seq, Cleartext) when Seq =:= RekeyCount ->
     %%
     cancel_rekey_timer(RekeyMsg),
     case encrypt_and_send_rekey_chunk(Params, Seq) of
-- 
2.35.3

openSUSE Build Service is sponsored by