File 7043-Move-time-out-to-clients.patch of Package erlang

From e517d17b06fafe5ab781ab6f3043ef5382443442 Mon Sep 17 00:00:00 2001
From: Raimo Niskanen <raimo@erlang.org>
Date: Thu, 19 Oct 2023 20:38:17 +0200
Subject: [PATCH 3/3] Move time-out to clients

Reduces the time-out message passing during high load, since
the server no longer acts as a proxy for client time-out messages.

There is still one timer per request towards the port driver,
instead of one per requesting client, and the clients use
receive...after.
---
 lib/kernel/src/inet_gethost_native.erl | 129 ++++++++++++++++---------
 1 file changed, 86 insertions(+), 43 deletions(-)

diff --git a/lib/kernel/src/inet_gethost_native.erl b/lib/kernel/src/inet_gethost_native.erl
index 572785cea4..8299d83933 100644
--- a/lib/kernel/src/inet_gethost_native.erl
+++ b/lib/kernel/src/inet_gethost_native.erl
@@ -65,9 +65,6 @@
 -define(dbg(A,B), noop).
 -endif.
 
--define(SEND_AFTER(A,B,C),erlang:send_after(A,B,C)).
--define(CANCEL_TIMER(A),erlang:cancel_timer(A)).
-
 %% In erlang, IPV6 addresses are built as 8-tuples of 16bit values (not 16-tuples of octets).
 %% This macro, meant to be used in guards checks one such 16bit value in the 8-tuple.
 -define(VALID_V6(Part), is_integer(Part), Part < 65536).
@@ -87,6 +84,17 @@
 	  no_data = 0
 }).
 
+-record(request,
+        {rid,
+         req, % {Op,Proto,Data}
+         timer_ref,
+         %%
+         %% Timestamp in monotonic time native units,
+         %% for latest piggy backed client request, used to
+         %% restart server time-out matching latest client time-out
+         req_ts = undefined
+        }).
+
 % The main loopstate...
 -record(
    state,
@@ -152,6 +160,7 @@ terminate(_Reason, Pid) ->
 run_once() ->
     Port = do_open_port(get_poolsize(), get_extra_args()),
     Timeout = ?REQUEST_TIMEOUT,
+    persistent_term:put({?MODULE,timeout}, Timeout),
     RID = 1,
     {ClientHandle, Request} =
 	receive
@@ -194,9 +203,11 @@ server_init(Starter, Ref) ->
     Poolsize = get_poolsize(),
     Port = do_open_port(Poolsize, get_extra_args()),
     Timeout = ?REQUEST_TIMEOUT,
+    persistent_term:put({?MODULE,timeout}, Timeout),
     put(rid,0),
     put(num_requests,0),
-    RequestTab = ets:new(ign_requests,[set,protected]),
+    RequestTab =
+        ets:new(ign_requests,[set,protected,{keypos,#request.rid}]),
     RequestIndex = ets:new(ign_req_index,[set,protected]),
     RequestClients = ets:new(ign_req_clients, [bag,protected]),
     State = #state{port = Port, timeout = Timeout,
@@ -249,17 +260,18 @@ handle_message({Port, {data, Data}}, State = #state{port = Port}) ->
                       when Unit =:= ?UNIT_ERROR;
                            Unit =:= ?UNIT_IPV4;
                            Unit =:= ?UNIT_IPV6 ->
-                        case ets:lookup(State#state.requests, RID) of
+                        case ets:take(State#state.requests, RID) of
                             [] ->
                                 %% We must have cancelled this request
                                 State;
-                            [{_,Req}] ->
+                            [#request{timer_ref = TimerRef, req = Req}] ->
+                                _ = erlang:cancel_timer(
+                                      TimerRef,
+                                      [{async,true},{info,false}]),
                                 %% Clean up the request and reply to clients
-                                ets:delete(State#state.requests, RID),
                                 ets:delete(State#state.req_index, Req),
                                 lists:foreach(
-                                  fun ({_,ClientHandle,TimerRef}) ->
-                                          _ = ?CANCEL_TIMER(TimerRef),
+                                  fun ({_,ClientHandle}) ->
                                           ClientHandle !
                                               {ClientHandle, {ok,BinReply}}
                                   end,
@@ -289,33 +301,36 @@ handle_message({Port,eof}, State = #state{port = Port}) ->
     NewPort=restart_port(State),
     main_loop(State#state{port=NewPort});
 
-handle_message({timeout,RID,ClientHandle}, State) ->
-    ClientReqMS = {RID,ClientHandle,'_'},
-    case ets:match_object(State#state.req_clients, ClientReqMS) of
-        [ClientReq] ->
-            ets:delete_object(State#state.req_clients, ClientReq),
-            ClientHandle ! {ClientHandle, {error,timeout}},
-            case ets:member(State#state.req_clients, RID) of
-                true ->
-                    %% There are still waiting clients
-                    ok;
-                false ->
-                    %% The last client timed out - cancel the request
-                    case ets:lookup(State#state.requests, RID) of
-                        [{_,Req}] ->
-                            ets:delete(State#state.requests,RID),
-                            ets:delete(State#state.req_index,Req),
-                            put(num_requests,get(num_requests) - 1),
-                            %% Also cancel the request to the port program...
-                            _ = catch port_command(
-                                        State#state.port,
-                                        <<RID:32,?OP_CANCEL_REQUEST>>),
-                            ok;
-                        [] ->
-                            ok
-                    end
-            end;
+handle_message({timeout,TimerRef,RID}, State) ->
+    case ets:lookup(State#state.requests, RID) of
         [] ->
+            ok;
+        [#request{timer_ref = TimerRef, req = Req, req_ts = undefined}] ->
+            %% Cancel the request, let the clients do their own time-out
+            ets:delete(State#state.requests, RID),
+            ets:delete(State#state.req_index, Req),
+            ets:delete(State#state.req_clients, RID),
+            put(num_requests,get(num_requests) - 1),
+            %% Also cancel the request to the port program...
+            _ = catch port_command(
+                        State#state.port,
+                        <<RID:32,?OP_CANCEL_REQUEST>>),
+            ok;
+        [#request{timer_ref = TimerRef, req_ts = ReqTs}] ->
+            %% We may have more clients, restart the timer
+            TimeoutTime =
+                erlang:convert_time_unit(ReqTs, native, millisecond)
+                + State#state.timeout,
+            NewTimerRef =
+                erlang:start_timer(TimeoutTime, self(), RID, [{abs,true}]),
+            true =
+                ets:update_element(
+                  State#state.requests, RID,
+                  [{#request.timer_ref,NewTimerRef},
+                   {#request.req_ts,undefined}]),
+            ok;
+        [#request{}] ->
+            %% Stale timer_ref - ignore
             ok
     end,
     main_loop(State);
@@ -331,17 +346,22 @@ handle_message(_, State) -> % Stray messages from dying ports etc.
 do_handle_call(ClientHandle, Req, RData, State) ->
     case ets:lookup(State#state.req_index, Req) of
         [{_,RID}] ->
+            true =
+                ets:update_element(
+                  State#state.requests, RID,
+                  {#request.req_ts,erlang:monotonic_time()}),
             ok;
         [] ->
             RID = get_rid(),
             _ = catch port_command(State#state.port, [<<RID:32>>|RData]),
-            ets:insert(State#state.requests, {RID,Req}),
+            Timeout = State#state.timeout,
+            TimerRef = erlang:start_timer(Timeout, self(), RID),
+            ets:insert(
+              State#state.requests,
+              #request{rid = RID, req = Req, timer_ref = TimerRef}),
             ets:insert(State#state.req_index, {Req,RID})
     end,
-    TimerMsg = {timeout,RID,ClientHandle},
-    TimerRef = ?SEND_AFTER(State#state.timeout, self(), TimerMsg),
-    ClientReq = {RID,ClientHandle,TimerRef},
-    ets:insert(State#state.req_clients, ClientReq),
+    ets:insert(State#state.req_clients, {RID,ClientHandle}),
     ok.
 
 
@@ -367,7 +387,7 @@ restart_port(#state{port = Port, requests = Requests}) ->
     %%
     %% Redo all requests on the new port
     foreach(
-      fun ({RID,{Op,Proto,Rdata}}) ->
+      fun (#request{rid = RID, req = {Op,Proto,Rdata}}) ->
               case Op of
                   ?OP_GETHOSTBYNAME ->
                       port_command(
@@ -483,22 +503,45 @@ control(soft_restart) ->
     getit(restart_port, undefined);
 control(_) -> {error, formerr}.
 
+
 getit(Op, Proto, Data, DefaultName) ->
     getit({Op, Proto, Data}, DefaultName).
 
 getit(Req, DefaultName) ->
     Pid = ensure_started(),
+    DefaultTimeout = (#state{})#state.timeout,
+    Timeout = persistent_term:get({?MODULE,timeout}, DefaultTimeout),
+    case call(Pid, Req, Timeout) of
+        {ok, BinHostent} ->
+            parse_address(BinHostent, DefaultName);
+        ok ->
+            ok;
+        {error, _} = Result->
+            Result
+    end.
+
+call(Pid, Req, Timeout) ->
     ReqHandle = monitor(process, Pid, [{alias,reply_demonitor}]),
     Pid ! {ReqHandle, Req},
+    wait_reply(ReqHandle, Timeout).
+
+wait_reply(ReqHandle, Timeout) ->
     receive
-        {ReqHandle, {ok,BinHostent}} ->
-            parse_address(BinHostent, DefaultName);
         {ReqHandle, Result} ->
             Result;
         {'DOWN', ReqHandle, process, _, Reason} ->
             {error, Reason}
+    after Timeout ->
+           case unalias(ReqHandle) of
+                true ->
+                   erlang:demonitor(ReqHandle, [flush]),
+                    {error, timeout};
+                false ->
+                   wait_reply(ReqHandle, infinity)
+           end
     end.
 
+
 ensure_started() ->
     case whereis(?MODULE) of
 	undefined ->
-- 
2.35.3

openSUSE Build Service is sponsored by