File 2621-Use-dedicated-ETS-table-for-clients.patch of Package erlang

From 1f53e96cad14d578e24df4a873b74800ca94b4ce Mon Sep 17 00:00:00 2001
From: Raimo Niskanen <raimo@erlang.org>
Date: Tue, 17 Oct 2023 14:43:57 +0200
Subject: [PATCH 1/3] Use dedicated ETS table for clients

---
 lib/kernel/src/inet_gethost_native.erl | 304 ++++++++++++-------------
 1 file changed, 147 insertions(+), 157 deletions(-)

diff --git a/lib/kernel/src/inet_gethost_native.erl b/lib/kernel/src/inet_gethost_native.erl
index 328cb01e05..3993450956 100644
--- a/lib/kernel/src/inet_gethost_native.erl
+++ b/lib/kernel/src/inet_gethost_native.erl
@@ -1,7 +1,7 @@
 %%
 %% %CopyrightBegin%
 %%
-%% Copyright Ericsson AB 1998-2022. All Rights Reserved.
+%% Copyright Ericsson AB 1998-2023. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -75,16 +75,6 @@
 %% meant to be used in guards, check one such octet.
 -define(VALID_V4(Part), is_integer(Part), Part < 256).
 
-% Requests, one per unbique request to the PORT program, may be more than one client!!!
--record(request, {
-	  rid, % Request id as sent to port
-	  op,
-	  proto,
-	  rdata,
-	  clients = [] % Can be more than one client per request (Pid's). 
-}).
-
-
 % Statistics, not used yet.
 -record(statistics, {
 	  netdb_timeout = 0,
@@ -98,15 +88,32 @@
 }).
 
 % The main loopstate...
--record(state, {
-	  port = noport, % Port() connected to the port program
-	  timeout = 8000, % Timeout value from inet_db:res_option
-	  requests, % Table of request
-	  req_index, % Table of {{op,proto,rdata},rid}
-	  parent,    % The supervisor bridge
-	  pool_size = 4, % Number of C processes in pool.
-	  statistics % Statistics record (records error causes).
-}).
+-record(
+   state,
+   {
+    port = noport, % Port() connected to the port program
+    timeout = 8000, % Timeout value from inet_db:res_option
+    %%
+    %% One per unique request to the PORT program.
+    %% Clients are registered in req_clients, multiple per RID.
+    %% ETS set of {RID,{Op,Proto,Data}=OPD}
+    requests,
+    %%
+    %% One per request as the above,
+    %% but for reverse lookup to find duplicate requests.
+    %% ETS set of {{Op,Proto,Data}=OPD,RID}
+    req_index,
+    %%
+    %% One per requesting client for RID.
+    %% When the request succeeds we can take all clients with key RID.
+    %% When a request times out we can remove just that object from the bag.
+    %% ETS bag of {RID,ClientPid,ClientRef,TimerRef}
+    req_clients,
+    %%
+    parent,    % The supervisor bridge
+    pool_size = 4, % Number of C processes in pool.
+    statistics % Statistics record (records error causes).
+   }).
 -type state() :: #state{}.
 
 %% The supervisor bridge code
@@ -168,7 +175,7 @@ run_once() ->
 %% Server API
 %%-----------------------------------------------------------------------
 server_init(Starter, Ref) ->
-    process_flag(trap_exit,true),
+    _ = process_flag(trap_exit,true),
     case whereis(?MODULE) of
 	undefined ->
 	    case (catch register(?MODULE,self())) of
@@ -180,15 +187,19 @@ server_init(Starter, Ref) ->
 	Winner ->
 	   exit({already_started,Winner})
     end,
+    _ = process_flag(message_queue_data, off_heap),
     Poolsize = get_poolsize(),
     Port = do_open_port(Poolsize, get_extra_args()),
     Timeout = ?REQUEST_TIMEOUT,
     put(rid,0),
     put(num_requests,0),
-    RequestTab = ets:new(ign_requests,[{keypos,#request.rid},set,protected]),
+    RequestTab = ets:new(ign_requests,[set,protected]),
     RequestIndex = ets:new(ign_req_index,[set,protected]),
-    State = #state{port = Port, timeout = Timeout, requests = RequestTab,
-		   req_index = RequestIndex, 
+    RequestClients = ets:new(ign_req_clients, [bag,protected]),
+    State = #state{port = Port, timeout = Timeout,
+                   requests = RequestTab,
+		   req_index = RequestIndex,
+                   req_clients = RequestClients,
 		   pool_size = Poolsize,
 		   statistics = #statistics{},
 		   parent = Starter},
@@ -196,27 +207,27 @@ server_init(Starter, Ref) ->
 
 main_loop(State) ->
     receive
-	Any -> 
+	Any ->
 	    handle_message(Any,State)
     end.
 
-handle_message({{Pid,_} = Client, {?OP_GETHOSTBYNAME, Proto, Name} = R}, 
-	       State) when is_pid(Pid) ->
-    NewState = do_handle_call(R,Client,State,
-			      [<<?OP_GETHOSTBYNAME:8, Proto:8>>, Name,0]),
-    main_loop(NewState);
+handle_message({{Pid,Ref}, {?OP_GETHOSTBYNAME, Proto, Name} = R}, State)
+  when is_pid(Pid) ->
+    do_handle_call(
+      R, Pid, Ref, [<<?OP_GETHOSTBYNAME:8, Proto:8>>, Name,0], State),
+    main_loop(State);
 
-handle_message({{Pid,_} = Client, {?OP_GETHOSTBYADDR, Proto, Data} = R}, 
-	       State) when is_pid(Pid) ->
-    NewState = do_handle_call(R,Client,State,
-			      <<?OP_GETHOSTBYADDR:8, Proto:8, Data/binary>>),
-    main_loop(NewState);
+handle_message({{Pid,Ref}, {?OP_GETHOSTBYADDR, Proto, Data} = R}, State)
+  when is_pid(Pid) ->
+    do_handle_call(
+      R, Pid, Ref, <<?OP_GETHOSTBYADDR:8, Proto:8, Data/binary>>, State),
+    main_loop(State);
 
 handle_message({{Pid,Ref}, {?OP_CONTROL, Ctl, Data}}, State)
   when is_pid(Pid) ->
-    catch port_command(State#state.port, 
-		       <<?INVALID_SERIAL:32, ?OP_CONTROL:8, 
-			Ctl:8, Data/binary>>),
+    _ = catch port_command(
+                State#state.port,
+                <<?INVALID_SERIAL:32, ?OP_CONTROL:8, Ctl:8, Data/binary>>),
     Pid ! {Ref, ok},
     main_loop(State);
 
@@ -227,37 +238,44 @@ handle_message({{Pid,Ref}, restart_port}, State)
     main_loop(State#state{port=NewPort});
 
 handle_message({Port, {data, Data}}, State = #state{port = Port}) ->
-    NewState = case Data of
-		   <<RID:32, BinReply/binary>> ->
-		       case BinReply of
-			   <<Unit, _/binary>> when Unit =:= ?UNIT_ERROR;
-						   Unit =:= ?UNIT_IPV4;
-						   Unit =:= ?UNIT_IPV6 ->
-			       case pick_request(State,RID) of
-				   false ->
-				       State;
-				   Req ->
-				       lists:foreach(fun({P,R,TR}) ->
-							     _= ?CANCEL_TIMER(TR),
-							     P ! {R,
-								  {ok,
-								   BinReply}}
-						     end,
-						     Req#request.clients),
-				       State
-			       end;
-			   _UnitError ->
-			       %% Unexpected data, let's restart it, 
-			       %% it must be broken.
-			       NewPort=restart_port(State),
-			       State#state{port=NewPort}
-		       end;
-		   _BasicFormatError ->
-		       NewPort=restart_port(State),
-		       State#state{port=NewPort}
-	       end,    
+    NewState =
+        case Data of
+            <<RID:32, BinReply/binary>> ->
+                case BinReply of
+                    <<Unit, _/binary>>
+                      when Unit =:= ?UNIT_ERROR;
+                           Unit =:= ?UNIT_IPV4;
+                           Unit =:= ?UNIT_IPV6 ->
+                        case ets:lookup(State#state.requests, RID) of
+                            [] ->
+                                %% We must have cancelled this request
+                                State;
+                            [{_,OPD}] ->
+                                %% Clean up the request and reply to clients
+                                ets:delete(State#state.requests, RID),
+                                ets:delete(State#state.req_index, OPD),
+                                lists:foreach(
+                                  fun ({_,ClientPid,ClientRef,TimerRef}) ->
+                                          _ = ?CANCEL_TIMER(TimerRef),
+                                          ClientPid !
+                                              {ClientRef,{ok,BinReply}}
+                                  end,
+                                  ets:take(State#state.req_clients, RID)),
+                                put(num_requests,get(num_requests) - 1),
+                                State
+                        end;
+                    _UnitError ->
+                        %% Unexpected data, let's restart it,
+                        %% it must be broken.
+                        NewPort = restart_port(State),
+                        State#state{port=NewPort}
+                end;
+            _BasicFormatError ->
+                NewPort = restart_port(State),
+                State#state{port=NewPort}
+        end,
     main_loop(NewState);
-	
+
 handle_message({'EXIT',Port,_Reason}, State = #state{port = Port}) -> 
     ?dbg("Port died.~n",[]),
     NewPort=restart_port(State),
@@ -268,92 +286,61 @@ handle_message({Port,eof}, State = #state{port = Port}) ->
     NewPort=restart_port(State),
     main_loop(State#state{port=NewPort});
 
-handle_message({timeout, Pid, RID}, State) ->
-    case pick_client(State,RID,Pid) of
-	false ->
-	    false;
-	{more, {P,R,_}} ->
-	    P ! {R,{error,timeout}};
-	{last, {LP,LR,_}} ->
-	    LP ! {LR, {error,timeout}},
-	    %% Remove the whole request structure...
-	    _ = pick_request(State, RID),
-	    %% Also cancel the request to the port program...
-	    (catch port_command(State#state.port, 
-				<<RID:32,?OP_CANCEL_REQUEST>>))
+handle_message({timeout,RID,ClientPid,ClientRef}, State) ->
+    ClientReqMS = {RID,ClientPid,ClientRef,'_'},
+    case ets:match_object(State#state.req_clients, ClientReqMS) of
+        [ClientReq] ->
+            ets:delete_object(State#state.req_clients, ClientReq),
+            ClientPid ! {ClientRef,{error,timeout}},
+            case ets:member(State#state.req_clients, RID) of
+                true ->
+                    %% There are still waiting clients
+                    ok;
+                false ->
+                    %% The last client timed out - cancel the request
+                    case ets:lookup(State#state.requests, RID) of
+                        [{_,OPD}] ->
+                            ets:delete(State#state.requests,RID),
+                            ets:delete(State#state.req_index,OPD),
+                            put(num_requests,get(num_requests) - 1),
+                            %% Also cancel the request to the port program...
+                            _ = catch port_command(
+                                        State#state.port,
+                                        <<RID:32,?OP_CANCEL_REQUEST>>),
+                            ok;
+                        [] ->
+                            ok
+                    end
+            end;
+        [] ->
+            ok
     end,
     main_loop(State);
 
 handle_message({system, From, Req}, State) ->
-	    sys:handle_system_msg(Req, From, State#state.parent, ?MODULE, [], 
-				  State);
+    sys:handle_system_msg(
+      Req, From, State#state.parent, ?MODULE, [], State);
 
 handle_message(_, State) -> % Stray messages from dying ports etc.
     main_loop(State).
 
 
-do_handle_call(R,Client0,State,RData) ->
-    Req = find_request(State,R),
-    Timeout = State#state.timeout,
-    {P,Ref} = Client0,
-    TR = ?SEND_AFTER(Timeout,self(),{timeout, P, Req#request.rid}),
-    Client = {P,Ref,TR},
-    case Req#request.clients of
-	[] ->
-	    RealRData = [<<(Req#request.rid):32>>|RData],
-	    (catch port_command(State#state.port, RealRData)),
-	    ets:insert(State#state.requests,Req#request{clients = [Client]});
-	Tail ->
-	    ets:insert(State#state.requests,Req#request{clients = [Client | Tail]})
+do_handle_call(OPD, ClientPid, ClientRef, RData, State) ->
+    case ets:lookup(State#state.req_index, OPD) of
+        [{_,RID}] ->
+            ok;
+        [] ->
+            RID = get_rid(),
+            _ = catch port_command(State#state.port, [<<RID:32>>|RData]),
+            ets:insert(State#state.requests, {RID,OPD}),
+            ets:insert(State#state.req_index, {OPD,RID})
     end,
-    State.
-
-find_request(State, R = {Op, Proto, Data}) ->
-    case ets:lookup(State#state.req_index,R) of
-	[{R, Rid}] ->
-	    [Ret] = ets:lookup(State#state.requests,Rid),
-	    Ret;
-	[] ->
-	    NRid = get_rid(),
-	    Req = #request{rid = NRid, op = Op, proto = Proto, rdata = Data},
-	    ets:insert(State#state.requests, Req),
-	    ets:insert(State#state.req_index,{R,NRid}),
-	    put(num_requests,get(num_requests) + 1),
-	    Req
-    end.
-    
-pick_request(State, RID) ->
-    case ets:lookup(State#state.requests, RID) of
-	[] ->
-	    false;
-	[#request{rid = RID, op = Op, proto = Proto, rdata = Data}=R] ->
-	    ets:delete(State#state.requests,RID),
-	    ets:delete(State#state.req_index,{Op,Proto,Data}),
-	    put(num_requests,get(num_requests) - 1),
-	    R
-    end.
+    TimerMsg = {timeout,RID,ClientPid,ClientRef},
+    TimerRef = ?SEND_AFTER(State#state.timeout, self(), TimerMsg),
+    ClientReq = {RID,ClientPid,ClientRef,TimerRef},
+    ets:insert(State#state.req_clients, ClientReq),
+    ok.
 
-pick_client(State,RID,Clid) ->
-    case ets:lookup(State#state.requests, RID) of
-	[] ->
-	    false;
-	[R] ->
-	    case R#request.clients of
-		[SoleClient] ->
-		    {last, SoleClient}; % Note, not removed, the caller 
-					% should cleanup request data
-		CList ->
-		    case lists:keyfind(Clid,1,CList) of
-			false ->
-			    false;
-			Client ->
-			    NCList = lists:keydelete(Clid,1,CList),
-			    ets:insert(State#state.requests, 
-				       R#request{clients = NCList}),
-			    {more, Client}
-		    end
-	    end
-    end.
 
 get_rid () ->
     New = (get(rid) + 1) rem 16#7FFFFFF,
@@ -372,21 +359,24 @@ foreach(Fun,Table,Key) ->
     foreach(Fun,Table,ets:next(Table,Key)).
 
 restart_port(#state{port = Port, requests = Requests}) ->
-    (catch port_close(Port)),
+    _ = catch port_close(Port),
     NewPort = do_open_port(get_poolsize(), get_extra_args()),
-    foreach(fun(#request{rid = Rid, op = Op, proto = Proto, rdata = Rdata}) ->
-		    case Op of 
-			?OP_GETHOSTBYNAME ->
-			    port_command(NewPort,[<<Rid:32,?OP_GETHOSTBYNAME:8,
-						  Proto:8>>,
-						  Rdata,0]);
-			?OP_GETHOSTBYADDR ->
-			    port_command(NewPort,
-					 <<Rid:32,?OP_GETHOSTBYADDR:8, Proto:8,
-					 Rdata/binary>>)
-		    end
-	    end,
-	    Requests),
+    %%
+    %% Redo all requests on the new port
+    foreach(
+      fun ({RID,{Op,Proto,Rdata}}) ->
+              case Op of
+                  ?OP_GETHOSTBYNAME ->
+                      port_command(
+                        NewPort,
+                        [<<RID:32,?OP_GETHOSTBYNAME:8,Proto:8>>, Rdata, 0]);
+                  ?OP_GETHOSTBYADDR ->
+                      port_command(
+                        NewPort,
+                        <<RID:32,?OP_GETHOSTBYADDR:8,Proto:8,Rdata/binary>>)
+              end
+      end,
+      Requests),
     NewPort.
 
 -dialyzer({no_improper_lists, do_open_port/2}).
-- 
2.35.3

openSUSE Build Service is sponsored by