File 7043-Move-time-out-to-clients.patch of Package erlang
From e517d17b06fafe5ab781ab6f3043ef5382443442 Mon Sep 17 00:00:00 2001
From: Raimo Niskanen <raimo@erlang.org>
Date: Thu, 19 Oct 2023 20:38:17 +0200
Subject: [PATCH 3/3] Move time-out to clients
Reduce the time-out message passing during high load: the server
no longer acts as a proxy for client time-out messages.
The server now keeps one timer per request towards the port driver,
instead of one per requesting client, and the clients handle their
own time-out using receive...after.
---
lib/kernel/src/inet_gethost_native.erl | 129 ++++++++++++++++---------
1 file changed, 86 insertions(+), 43 deletions(-)
diff --git a/lib/kernel/src/inet_gethost_native.erl b/lib/kernel/src/inet_gethost_native.erl
index 572785cea4..8299d83933 100644
--- a/lib/kernel/src/inet_gethost_native.erl
+++ b/lib/kernel/src/inet_gethost_native.erl
@@ -65,9 +65,6 @@
-define(dbg(A,B), noop).
-endif.
--define(SEND_AFTER(A,B,C),erlang:send_after(A,B,C)).
--define(CANCEL_TIMER(A),erlang:cancel_timer(A)).
-
%% In erlang, IPV6 addresses are built as 8-tuples of 16bit values (not 16-tuples of octets).
%% This macro, meant to be used in guards checks one such 16bit value in the 8-tuple.
-define(VALID_V6(Part), is_integer(Part), Part < 65536).
@@ -87,6 +84,17 @@
no_data = 0
}).
+-record(request,
+ {rid,
+ req, % {Op,Proto,Data}
+ timer_ref,
+ %%
+ %% Erlang monotonic time (native time unit) of the latest
+ %% client request piggybacked on this request; used to restart
+ %% the server timer so it matches that client's time-out
+ req_ts = undefined
+ }).
+
% The main loopstate...
-record(
state,
@@ -152,6 +160,7 @@ terminate(_Reason, Pid) ->
run_once() ->
Port = do_open_port(get_poolsize(), get_extra_args()),
Timeout = ?REQUEST_TIMEOUT,
+ persistent_term:put({?MODULE,timeout}, Timeout),
RID = 1,
{ClientHandle, Request} =
receive
@@ -194,9 +203,11 @@ server_init(Starter, Ref) ->
Poolsize = get_poolsize(),
Port = do_open_port(Poolsize, get_extra_args()),
Timeout = ?REQUEST_TIMEOUT,
+ persistent_term:put({?MODULE,timeout}, Timeout),
put(rid,0),
put(num_requests,0),
- RequestTab = ets:new(ign_requests,[set,protected]),
+ RequestTab =
+ ets:new(ign_requests,[set,protected,{keypos,#request.rid}]),
RequestIndex = ets:new(ign_req_index,[set,protected]),
RequestClients = ets:new(ign_req_clients, [bag,protected]),
State = #state{port = Port, timeout = Timeout,
@@ -249,17 +260,18 @@ handle_message({Port, {data, Data}}, State = #state{port = Port}) ->
when Unit =:= ?UNIT_ERROR;
Unit =:= ?UNIT_IPV4;
Unit =:= ?UNIT_IPV6 ->
- case ets:lookup(State#state.requests, RID) of
+ case ets:take(State#state.requests, RID) of
[] ->
%% We must have cancelled this request
State;
- [{_,Req}] ->
+ [#request{timer_ref = TimerRef, req = Req}] ->
+ _ = erlang:cancel_timer(
+ TimerRef,
+ [{async,true},{info,false}]),
%% Clean up the request and reply to clients
- ets:delete(State#state.requests, RID),
ets:delete(State#state.req_index, Req),
lists:foreach(
- fun ({_,ClientHandle,TimerRef}) ->
- _ = ?CANCEL_TIMER(TimerRef),
+ fun ({_,ClientHandle}) ->
ClientHandle !
{ClientHandle, {ok,BinReply}}
end,
@@ -289,33 +301,36 @@ handle_message({Port,eof}, State = #state{port = Port}) ->
NewPort=restart_port(State),
main_loop(State#state{port=NewPort});
-handle_message({timeout,RID,ClientHandle}, State) ->
- ClientReqMS = {RID,ClientHandle,'_'},
- case ets:match_object(State#state.req_clients, ClientReqMS) of
- [ClientReq] ->
- ets:delete_object(State#state.req_clients, ClientReq),
- ClientHandle ! {ClientHandle, {error,timeout}},
- case ets:member(State#state.req_clients, RID) of
- true ->
- %% There are still waiting clients
- ok;
- false ->
- %% The last client timed out - cancel the request
- case ets:lookup(State#state.requests, RID) of
- [{_,Req}] ->
- ets:delete(State#state.requests,RID),
- ets:delete(State#state.req_index,Req),
- put(num_requests,get(num_requests) - 1),
- %% Also cancel the request to the port program...
- _ = catch port_command(
- State#state.port,
- <<RID:32,?OP_CANCEL_REQUEST>>),
- ok;
- [] ->
- ok
- end
- end;
+handle_message({timeout,TimerRef,RID}, State) ->
+ case ets:lookup(State#state.requests, RID) of
[] ->
+ ok;
+ [#request{timer_ref = TimerRef, req = Req, req_ts = undefined}] ->
+ %% Cancel the request, let the clients do their own time-out
+ ets:delete(State#state.requests, RID),
+ ets:delete(State#state.req_index, Req),
+ ets:delete(State#state.req_clients, RID),
+ put(num_requests,get(num_requests) - 1),
+ %% Also cancel the request to the port program...
+ _ = catch port_command(
+ State#state.port,
+ <<RID:32,?OP_CANCEL_REQUEST>>),
+ ok;
+ [#request{timer_ref = TimerRef, req_ts = ReqTs}] ->
+ %% A later client piggybacked on this request; restart the timer from its timestamp
+ TimeoutTime =
+ erlang:convert_time_unit(ReqTs, native, millisecond)
+ + State#state.timeout,
+ NewTimerRef =
+ erlang:start_timer(TimeoutTime, self(), RID, [{abs,true}]),
+ true =
+ ets:update_element(
+ State#state.requests, RID,
+ [{#request.timer_ref,NewTimerRef},
+ {#request.req_ts,undefined}]),
+ ok;
+ [#request{}] ->
+ %% Stale timer_ref - ignore
ok
end,
main_loop(State);
@@ -331,17 +346,22 @@ handle_message(_, State) -> % Stray messages from dying ports etc.
do_handle_call(ClientHandle, Req, RData, State) ->
case ets:lookup(State#state.req_index, Req) of
[{_,RID}] ->
+ true =
+ ets:update_element(
+ State#state.requests, RID,
+ {#request.req_ts,erlang:monotonic_time()}),
ok;
[] ->
RID = get_rid(),
_ = catch port_command(State#state.port, [<<RID:32>>|RData]),
- ets:insert(State#state.requests, {RID,Req}),
+ Timeout = State#state.timeout,
+ TimerRef = erlang:start_timer(Timeout, self(), RID),
+ ets:insert(
+ State#state.requests,
+ #request{rid = RID, req = Req, timer_ref = TimerRef}),
ets:insert(State#state.req_index, {Req,RID})
end,
- TimerMsg = {timeout,RID,ClientHandle},
- TimerRef = ?SEND_AFTER(State#state.timeout, self(), TimerMsg),
- ClientReq = {RID,ClientHandle,TimerRef},
- ets:insert(State#state.req_clients, ClientReq),
+ ets:insert(State#state.req_clients, {RID,ClientHandle}),
ok.
@@ -367,7 +387,7 @@ restart_port(#state{port = Port, requests = Requests}) ->
%%
%% Redo all requests on the new port
foreach(
- fun ({RID,{Op,Proto,Rdata}}) ->
+ fun (#request{rid = RID, req = {Op,Proto,Rdata}}) ->
case Op of
?OP_GETHOSTBYNAME ->
port_command(
@@ -483,22 +503,45 @@ control(soft_restart) ->
getit(restart_port, undefined);
control(_) -> {error, formerr}.
+
getit(Op, Proto, Data, DefaultName) ->
getit({Op, Proto, Data}, DefaultName).
getit(Req, DefaultName) ->
Pid = ensure_started(),
+ DefaultTimeout = (#state{})#state.timeout,
+ Timeout = persistent_term:get({?MODULE,timeout}, DefaultTimeout),
+ case call(Pid, Req, Timeout) of
+ {ok, BinHostent} ->
+ parse_address(BinHostent, DefaultName);
+ ok ->
+ ok;
+ {error, _} = Result->
+ Result
+ end.
+
+call(Pid, Req, Timeout) ->
ReqHandle = monitor(process, Pid, [{alias,reply_demonitor}]),
Pid ! {ReqHandle, Req},
+ wait_reply(ReqHandle, Timeout).
+
+wait_reply(ReqHandle, Timeout) ->
receive
- {ReqHandle, {ok,BinHostent}} ->
- parse_address(BinHostent, DefaultName);
{ReqHandle, Result} ->
Result;
{'DOWN', ReqHandle, process, _, Reason} ->
{error, Reason}
+ after Timeout ->
+ case unalias(ReqHandle) of
+ true ->
+ erlang:demonitor(ReqHandle, [flush]),
+ {error, timeout};
+ false ->
+ wait_reply(ReqHandle, infinity)
+ end
end.
+
ensure_started() ->
case whereis(?MODULE) of
undefined ->
--
2.35.3