File 2621-Use-dedicated-ETS-table-for-clients.patch of Package erlang
From 1f53e96cad14d578e24df4a873b74800ca94b4ce Mon Sep 17 00:00:00 2001
From: Raimo Niskanen <raimo@erlang.org>
Date: Tue, 17 Oct 2023 14:43:57 +0200
Subject: [PATCH 1/3] Use dedicated ETS table for clients
---
lib/kernel/src/inet_gethost_native.erl | 304 ++++++++++++-------------
1 file changed, 147 insertions(+), 157 deletions(-)
diff --git a/lib/kernel/src/inet_gethost_native.erl b/lib/kernel/src/inet_gethost_native.erl
index 328cb01e05..3993450956 100644
--- a/lib/kernel/src/inet_gethost_native.erl
+++ b/lib/kernel/src/inet_gethost_native.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1998-2022. All Rights Reserved.
+%% Copyright Ericsson AB 1998-2023. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -75,16 +75,6 @@
%% meant to be used in guards, check one such octet.
-define(VALID_V4(Part), is_integer(Part), Part < 256).
-% Requests, one per unbique request to the PORT program, may be more than one client!!!
--record(request, {
- rid, % Request id as sent to port
- op,
- proto,
- rdata,
- clients = [] % Can be more than one client per request (Pid's).
-}).
-
-
% Statistics, not used yet.
-record(statistics, {
netdb_timeout = 0,
@@ -98,15 +88,32 @@
}).
% The main loopstate...
--record(state, {
- port = noport, % Port() connected to the port program
- timeout = 8000, % Timeout value from inet_db:res_option
- requests, % Table of request
- req_index, % Table of {{op,proto,rdata},rid}
- parent, % The supervisor bridge
- pool_size = 4, % Number of C processes in pool.
- statistics % Statistics record (records error causes).
-}).
+-record(
+ state,
+ {
+ port = noport, % Port() connected to the port program
+ timeout = 8000, % Timeout value from inet_db:res_option
+ %%
+ %% One per unique request to the PORT program.
+ %% Clients are registered in req_clients, multiple per RID.
+ %% ETS set of {RID,{Op,Proto,Data}=OPD}
+ requests,
+ %%
+ %% One per request as the above,
+ %% but for reverse lookup to find duplicate requests.
+ %% ETS set of {{Op,Proto,Data}=OPD,RID}
+ req_index,
+ %%
+ %% One per requesting client for RID.
+ %% When the request succeeds we can take all clients with key RID.
+ %% When a request times out we can remove just that object from the bag.
+ %% ETS bag of {RID,ClientPid,ClientRef,TimerRef}
+ req_clients,
+ %%
+ parent, % The supervisor bridge
+ pool_size = 4, % Number of C processes in pool.
+ statistics % Statistics record (records error causes).
+ }).
-type state() :: #state{}.
%% The supervisor bridge code
@@ -168,7 +175,7 @@ run_once() ->
%% Server API
%%-----------------------------------------------------------------------
server_init(Starter, Ref) ->
- process_flag(trap_exit,true),
+ _ = process_flag(trap_exit,true),
case whereis(?MODULE) of
undefined ->
case (catch register(?MODULE,self())) of
@@ -180,15 +187,19 @@ server_init(Starter, Ref) ->
Winner ->
exit({already_started,Winner})
end,
+ _ = process_flag(message_queue_data, off_heap),
Poolsize = get_poolsize(),
Port = do_open_port(Poolsize, get_extra_args()),
Timeout = ?REQUEST_TIMEOUT,
put(rid,0),
put(num_requests,0),
- RequestTab = ets:new(ign_requests,[{keypos,#request.rid},set,protected]),
+ RequestTab = ets:new(ign_requests,[set,protected]),
RequestIndex = ets:new(ign_req_index,[set,protected]),
- State = #state{port = Port, timeout = Timeout, requests = RequestTab,
- req_index = RequestIndex,
+ RequestClients = ets:new(ign_req_clients, [bag,protected]),
+ State = #state{port = Port, timeout = Timeout,
+ requests = RequestTab,
+ req_index = RequestIndex,
+ req_clients = RequestClients,
pool_size = Poolsize,
statistics = #statistics{},
parent = Starter},
@@ -196,27 +207,27 @@ server_init(Starter, Ref) ->
main_loop(State) ->
receive
- Any ->
+ Any ->
handle_message(Any,State)
end.
-handle_message({{Pid,_} = Client, {?OP_GETHOSTBYNAME, Proto, Name} = R},
- State) when is_pid(Pid) ->
- NewState = do_handle_call(R,Client,State,
- [<<?OP_GETHOSTBYNAME:8, Proto:8>>, Name,0]),
- main_loop(NewState);
+handle_message({{Pid,Ref}, {?OP_GETHOSTBYNAME, Proto, Name} = R}, State)
+ when is_pid(Pid) ->
+ do_handle_call(
+ R, Pid, Ref, [<<?OP_GETHOSTBYNAME:8, Proto:8>>, Name,0], State),
+ main_loop(State);
-handle_message({{Pid,_} = Client, {?OP_GETHOSTBYADDR, Proto, Data} = R},
- State) when is_pid(Pid) ->
- NewState = do_handle_call(R,Client,State,
- <<?OP_GETHOSTBYADDR:8, Proto:8, Data/binary>>),
- main_loop(NewState);
+handle_message({{Pid,Ref}, {?OP_GETHOSTBYADDR, Proto, Data} = R}, State)
+ when is_pid(Pid) ->
+ do_handle_call(
+ R, Pid, Ref, <<?OP_GETHOSTBYADDR:8, Proto:8, Data/binary>>, State),
+ main_loop(State);
handle_message({{Pid,Ref}, {?OP_CONTROL, Ctl, Data}}, State)
when is_pid(Pid) ->
- catch port_command(State#state.port,
- <<?INVALID_SERIAL:32, ?OP_CONTROL:8,
- Ctl:8, Data/binary>>),
+ _ = catch port_command(
+ State#state.port,
+ <<?INVALID_SERIAL:32, ?OP_CONTROL:8, Ctl:8, Data/binary>>),
Pid ! {Ref, ok},
main_loop(State);
@@ -227,37 +238,44 @@ handle_message({{Pid,Ref}, restart_port}, State)
main_loop(State#state{port=NewPort});
handle_message({Port, {data, Data}}, State = #state{port = Port}) ->
- NewState = case Data of
- <<RID:32, BinReply/binary>> ->
- case BinReply of
- <<Unit, _/binary>> when Unit =:= ?UNIT_ERROR;
- Unit =:= ?UNIT_IPV4;
- Unit =:= ?UNIT_IPV6 ->
- case pick_request(State,RID) of
- false ->
- State;
- Req ->
- lists:foreach(fun({P,R,TR}) ->
- _= ?CANCEL_TIMER(TR),
- P ! {R,
- {ok,
- BinReply}}
- end,
- Req#request.clients),
- State
- end;
- _UnitError ->
- %% Unexpected data, let's restart it,
- %% it must be broken.
- NewPort=restart_port(State),
- State#state{port=NewPort}
- end;
- _BasicFormatError ->
- NewPort=restart_port(State),
- State#state{port=NewPort}
- end,
+ NewState =
+ case Data of
+ <<RID:32, BinReply/binary>> ->
+ case BinReply of
+ <<Unit, _/binary>>
+ when Unit =:= ?UNIT_ERROR;
+ Unit =:= ?UNIT_IPV4;
+ Unit =:= ?UNIT_IPV6 ->
+ case ets:lookup(State#state.requests, RID) of
+ [] ->
+ %% We must have cancelled this request
+ State;
+ [{_,OPD}] ->
+ %% Clean up the request and reply to clients
+ ets:delete(State#state.requests, RID),
+ ets:delete(State#state.req_index, OPD),
+ lists:foreach(
+ fun ({_,ClientPid,ClientRef,TimerRef}) ->
+ _ = ?CANCEL_TIMER(TimerRef),
+ ClientPid !
+ {ClientRef,{ok,BinReply}}
+ end,
+ ets:take(State#state.req_clients, RID)),
+ put(num_requests,get(num_requests) - 1),
+ State
+ end;
+ _UnitError ->
+ %% Unexpected data, let's restart it,
+ %% it must be broken.
+ NewPort = restart_port(State),
+ State#state{port=NewPort}
+ end;
+ _BasicFormatError ->
+ NewPort = restart_port(State),
+ State#state{port=NewPort}
+ end,
main_loop(NewState);
-
+
handle_message({'EXIT',Port,_Reason}, State = #state{port = Port}) ->
?dbg("Port died.~n",[]),
NewPort=restart_port(State),
@@ -268,92 +286,61 @@ handle_message({Port,eof}, State = #state{port = Port}) ->
NewPort=restart_port(State),
main_loop(State#state{port=NewPort});
-handle_message({timeout, Pid, RID}, State) ->
- case pick_client(State,RID,Pid) of
- false ->
- false;
- {more, {P,R,_}} ->
- P ! {R,{error,timeout}};
- {last, {LP,LR,_}} ->
- LP ! {LR, {error,timeout}},
- %% Remove the whole request structure...
- _ = pick_request(State, RID),
- %% Also cancel the request to the port program...
- (catch port_command(State#state.port,
- <<RID:32,?OP_CANCEL_REQUEST>>))
+handle_message({timeout,RID,ClientPid,ClientRef}, State) ->
+ ClientReqMS = {RID,ClientPid,ClientRef,'_'},
+ case ets:match_object(State#state.req_clients, ClientReqMS) of
+ [ClientReq] ->
+ ets:delete_object(State#state.req_clients, ClientReq),
+ ClientPid ! {ClientRef,{error,timeout}},
+ case ets:member(State#state.req_clients, RID) of
+ true ->
+ %% There are still waiting clients
+ ok;
+ false ->
+ %% The last client timed out - cancel the request
+ case ets:lookup(State#state.requests, RID) of
+ [{_,OPD}] ->
+ ets:delete(State#state.requests,RID),
+ ets:delete(State#state.req_index,OPD),
+ put(num_requests,get(num_requests) - 1),
+ %% Also cancel the request to the port program...
+ _ = catch port_command(
+ State#state.port,
+ <<RID:32,?OP_CANCEL_REQUEST>>),
+ ok;
+ [] ->
+ ok
+ end
+ end;
+ [] ->
+ ok
end,
main_loop(State);
handle_message({system, From, Req}, State) ->
- sys:handle_system_msg(Req, From, State#state.parent, ?MODULE, [],
- State);
+ sys:handle_system_msg(
+ Req, From, State#state.parent, ?MODULE, [], State);
handle_message(_, State) -> % Stray messages from dying ports etc.
main_loop(State).
-do_handle_call(R,Client0,State,RData) ->
- Req = find_request(State,R),
- Timeout = State#state.timeout,
- {P,Ref} = Client0,
- TR = ?SEND_AFTER(Timeout,self(),{timeout, P, Req#request.rid}),
- Client = {P,Ref,TR},
- case Req#request.clients of
- [] ->
- RealRData = [<<(Req#request.rid):32>>|RData],
- (catch port_command(State#state.port, RealRData)),
- ets:insert(State#state.requests,Req#request{clients = [Client]});
- Tail ->
- ets:insert(State#state.requests,Req#request{clients = [Client | Tail]})
+do_handle_call(OPD, ClientPid, ClientRef, RData, State) ->
+ case ets:lookup(State#state.req_index, OPD) of
+ [{_,RID}] ->
+ ok;
+ [] ->
+ RID = get_rid(),
+ _ = catch port_command(State#state.port, [<<RID:32>>|RData]),
+ ets:insert(State#state.requests, {RID,OPD}),
+ ets:insert(State#state.req_index, {OPD,RID})
end,
- State.
-
-find_request(State, R = {Op, Proto, Data}) ->
- case ets:lookup(State#state.req_index,R) of
- [{R, Rid}] ->
- [Ret] = ets:lookup(State#state.requests,Rid),
- Ret;
- [] ->
- NRid = get_rid(),
- Req = #request{rid = NRid, op = Op, proto = Proto, rdata = Data},
- ets:insert(State#state.requests, Req),
- ets:insert(State#state.req_index,{R,NRid}),
- put(num_requests,get(num_requests) + 1),
- Req
- end.
-
-pick_request(State, RID) ->
- case ets:lookup(State#state.requests, RID) of
- [] ->
- false;
- [#request{rid = RID, op = Op, proto = Proto, rdata = Data}=R] ->
- ets:delete(State#state.requests,RID),
- ets:delete(State#state.req_index,{Op,Proto,Data}),
- put(num_requests,get(num_requests) - 1),
- R
- end.
+ TimerMsg = {timeout,RID,ClientPid,ClientRef},
+ TimerRef = ?SEND_AFTER(State#state.timeout, self(), TimerMsg),
+ ClientReq = {RID,ClientPid,ClientRef,TimerRef},
+ ets:insert(State#state.req_clients, ClientReq),
+ ok.
-pick_client(State,RID,Clid) ->
- case ets:lookup(State#state.requests, RID) of
- [] ->
- false;
- [R] ->
- case R#request.clients of
- [SoleClient] ->
- {last, SoleClient}; % Note, not removed, the caller
- % should cleanup request data
- CList ->
- case lists:keyfind(Clid,1,CList) of
- false ->
- false;
- Client ->
- NCList = lists:keydelete(Clid,1,CList),
- ets:insert(State#state.requests,
- R#request{clients = NCList}),
- {more, Client}
- end
- end
- end.
get_rid () ->
New = (get(rid) + 1) rem 16#7FFFFFF,
@@ -372,21 +359,24 @@ foreach(Fun,Table,Key) ->
foreach(Fun,Table,ets:next(Table,Key)).
restart_port(#state{port = Port, requests = Requests}) ->
- (catch port_close(Port)),
+ _ = catch port_close(Port),
NewPort = do_open_port(get_poolsize(), get_extra_args()),
- foreach(fun(#request{rid = Rid, op = Op, proto = Proto, rdata = Rdata}) ->
- case Op of
- ?OP_GETHOSTBYNAME ->
- port_command(NewPort,[<<Rid:32,?OP_GETHOSTBYNAME:8,
- Proto:8>>,
- Rdata,0]);
- ?OP_GETHOSTBYADDR ->
- port_command(NewPort,
- <<Rid:32,?OP_GETHOSTBYADDR:8, Proto:8,
- Rdata/binary>>)
- end
- end,
- Requests),
+ %%
+ %% Redo all requests on the new port
+ foreach(
+ fun ({RID,{Op,Proto,Rdata}}) ->
+ case Op of
+ ?OP_GETHOSTBYNAME ->
+ port_command(
+ NewPort,
+ [<<RID:32,?OP_GETHOSTBYNAME:8,Proto:8>>, Rdata, 0]);
+ ?OP_GETHOSTBYADDR ->
+ port_command(
+ NewPort,
+ <<RID:32,?OP_GETHOSTBYADDR:8,Proto:8,Rdata/binary>>)
+ end
+ end,
+ Requests),
NewPort.
-dialyzer({no_improper_lists, do_open_port/2}).
--
2.35.3