File 2052-Implement-getstat-for-gen_tcp_socket.patch of Package erlang
From f74c09429bd336c41d710e44ced92d5b2dca6cf2 Mon Sep 17 00:00:00 2001
From: Raimo Niskanen <raimo@erlang.org>
Date: Thu, 16 Jan 2020 16:30:04 +0100
Subject: [PATCH 2/6] Implement getstat for gen_tcp_socket
---
lib/kernel/src/gen_tcp_socket.erl | 309 ++++++++++++++++++++++++++------------
lib/kernel/src/inet.erl | 7 +-
2 files changed, 222 insertions(+), 94 deletions(-)
diff --git a/lib/kernel/src/gen_tcp_socket.erl b/lib/kernel/src/gen_tcp_socket.erl
index 1b5dd21d04..26c0a357c1 100644
--- a/lib/kernel/src/gen_tcp_socket.erl
+++ b/lib/kernel/src/gen_tcp_socket.erl
@@ -27,7 +27,8 @@
shutdown/2, close/1, controlling_process/2]).
%% inet
-export([setopts/2, getopts/2,
- sockname/1, peername/1]).
+ sockname/1, peername/1,
+ getstat/2]).
-ifdef(undefined).
-export([unrecv/2]).
@@ -61,6 +62,8 @@
{'$socket', (Socket), abort, {(SelectRef), (Reason)}}).
-define(socket_select(Socket, SelectRef),
{'$socket', (Socket), select, (SelectRef)}).
+-define(socket_counter_wrap(Socket, Counter),
+ {'$socket', (Socket), counter_wrap, (Counter)}).
-define(select_info(SelectRef),
{select_info, _, (SelectRef)}).
@@ -129,7 +132,9 @@ connect_open(Addrs, Domain, ConnectOpts, Opts, Fd, Timer, BindAddr) ->
of
{ok, Server} ->
{Setopts, _} =
- setopts_split(#{socket => [], server => []}, ConnectOpts),
+ setopts_split(
+ #{socket => [], server_read => [], server_write => []},
+ ConnectOpts),
ErrorRef = make_ref(),
Ok = ok_fun(ErrorRef),
try
@@ -208,7 +213,9 @@ listen_open(Domain, ListenOpts, Opts, Fd, Backlog, BindAddr) ->
of
{ok, Server} ->
{Setopts, _} =
- setopts_split(#{socket => [], server => []}, ListenOpts),
+ setopts_split(
+ #{socket => [], server_read => [], server_write => []},
+ ListenOpts),
ErrorRef = make_ref(),
Ok = ok_fun(ErrorRef),
try
@@ -239,7 +246,7 @@ accept(?module_socket(ListenServer, ListenSocket), Timeout) ->
Ok = ok_fun(ErrorRef),
try
{ok, #{start_opts := StartOpts} = ServerData} =
- Ok(call(ListenServer, get_server_data)),
+ Ok(call(ListenServer, get_server_opts)),
{ok, Server} =
Ok(start_server(
ServerData,
@@ -404,7 +411,10 @@ peername(?module_socket(_Server, Socket)) ->
{error, _} = Error -> Error
end.
+%% -------------------------------------------------------------------------
+getstat(?module_socket(Server, _Socket), What) when is_list(What) ->
+ call(Server, {getstat, What}).
%%% ========================================================================
%%% Socket glue code
@@ -513,7 +523,7 @@ setopts_split(_FilterTags, [], True, False) ->
{reverse(True), reverse(False)};
setopts_split(FilterTags, [Opt | Opts], True, False) ->
Opt_1 = conv_setopt(Opt),
- case member(setopt_categories(Opt_1), FilterTags) of
+ case member(FilterTags, setopt_categories(Opt_1)) of
true ->
setopts_split(FilterTags, Opts, [Opt_1 | True], False);
false ->
@@ -523,25 +533,18 @@ setopts_split(FilterTags, [Opt | Opts], True, False) ->
%% Set operation on atom sets that are atoms or maps with atom tags.
%% Returns true if sets have at least one common member, false otherwise.
-member(X, Y) when is_atom(X) ->
+%% X is atom() or map(), Y is map().
+member(X, Y) when is_atom(X), is_map(Y) ->
case Y of
- X -> true;
#{X := _} -> true;
- #{} -> false;
- _ when is_atom(Y) -> false
+ #{} -> false
end;
-member(X, Y) when is_atom(Y) -> member(Y, X);
-member(#{} = X, #{} = Y) ->
+member(X, Y) when is_map(X), is_map(Y) ->
maps:fold(
fun (_, _, true) -> true;
(Key, _, false) -> maps:is_key(Key, Y)
end, false, X).
-%% Ensure a map with atom tag
-member(X) when is_atom(X) ->
- #{X => []};
-member(#{} = X) -> X.
-
conv_setopt(binary) -> {mode, binary};
conv_setopt(list) -> {mode, list};
@@ -592,40 +595,53 @@ socket_copy_opt(Socket, Tag, TargetSocket) when is_atom(Tag) ->
start_opts([{sys_debug, D} | Opts]) ->
[{debug, D} | start_opts(Opts)];
+start_opts([Opt | Opts]) ->
+ [Opt | start_opts(Opts)];
start_opts([]) -> [].
-%% Categories: socket, ignore, start, server, einval
+%% Categories: socket, ignore, start, server_read, server_write, einval
+%% returns a maps set
setopt_categories(Opt) ->
case Opt of
- {raw, _, _, _} -> socket;
+ {raw, _, _, _} -> #{socket => []};
{Tag, _} -> opt_categories(Tag);
_ -> ignore
end.
getopt_categories(Opt) ->
case Opt of
- {raw, _, _, _} -> socket;
+ {raw, _, _, _} -> #{socket => []};
_ -> opt_categories(Opt)
end.
%% setopt and getopt category
opt_categories(Tag) when is_atom(Tag) ->
case Tag of
- sys_debug -> start;
- debug -> #{socket => [], start => []}; %% Cheeting!
+ sys_debug -> #{start => []};
+ debug -> #{socket => [], start => []};
_ ->
case maps:is_key(Tag, socket_opt()) of
- true -> socket;
+ true -> #{socket => []};
false ->
- case maps:is_key(Tag, server_data()) of
- true -> server;
+ case maps:is_key(Tag, ignore_opt()) of
+ true ->
+ #{ignore => []};
false ->
- case maps:is_key(Tag, ignore_opt()) of
- true -> ignore;
- false -> einval
- end
+ maps:merge(
+ case maps:is_key(Tag, server_read_opts()) of
+ true ->
+ #{server_read => []};
+ false ->
+ #{}
+ end,
+ case maps:is_key(Tag, server_write_opts()) of
+ true ->
+ #{server_write => []};
+ false ->
+ #{}
+ end)
end
end
end.
@@ -683,33 +699,43 @@ socket_opt() ->
socket_inherit_opts() ->
[priority].
--define(server_write_opts(),
- #{packet => raw,
- packet_size => 16#4000000, % 64 MByte
- show_econnreset => false,
- send_timeout => infinity,
- send_timeout_close => false}).
--compile({inline, [server_write_opts/0]}).
-server_write_opts() -> ?server_write_opts().
-%% Category 'server'
-%%
-%% Default values
--compile({inline, [server_data/0]}).
-server_data() ->
+-compile({inline, [server_read_write_opts/0]}).
+server_read_write_opts() ->
+ %% Common for read and write side
+ #{packet => raw,
+ packet_size => 16#4000000, % 64 MByte
+ show_econnreset => false}.
+-compile({inline, [server_read_opts/0]}).
+server_read_opts() ->
+ %% Read side only opts
maps:merge(
#{active => true,
mode => list,
header => 0,
deliver => term,
- start_opts => [],
- %% XXX the below have no implementation
+ start_opts => [], % Just to make it settable
+ %% XXX not implemented yet
exit_on_close => true,
- line_delimiter => $\n,
- delay_send => false
- }, ?server_write_opts()).
+ line_delimiter => $\n},
+ server_read_write_opts()).
+-compile({inline, [server_write_opts/0]}).
+server_write_opts() ->
+ %% Write side only opts
+ maps:merge(
+ #{send_timeout => infinity,
+ send_timeout_close => false,
+ %% XXX not implemented yet
+ delay_send => false},
+ server_read_write_opts()).
+%% Category 'server'
+%%
+%% Default values
+-compile({inline, [server_opts/0]}).
+server_opts() ->
+ maps:merge(server_read_opts(), server_write_opts()).
-compile({inline, [meta/1]}).
-meta(D) -> maps:with(maps:keys(?server_write_opts()), D).
+meta(D) -> maps:with(maps:keys(server_write_opts()), D).
%%% ========================================================================
%%% State Machine
@@ -759,7 +785,7 @@ callback_mode() -> handle_event_function.
{owner :: pid(),
state :: term()}).
%% A super state that encapsulates any other state
-%% and postpones all events but get_server_data/0
+%% and postpones all events but get_server_opts/0
%% and Owner 'DOWN'
%% 'accept'
@@ -797,7 +823,8 @@ init({open, Domain, ExtraOpts, Owner}) ->
Extra = maps:from_list(ExtraOpts),
case socket:open(Domain, stream, tcp, Extra) of
{ok, Socket} ->
- D = server_data(),
+ D = server_opts(),
+ ok = socket:setopt(Socket, otp, iow, true),
ok = socket:setopt(Socket, otp, meta, meta(D)),
P =
#params{
@@ -888,9 +915,9 @@ is_packet_option_value(Value) ->
%% Any state:
-%% Call: get_server_data/0
-handle_event({call, From}, get_server_data, _State, {_P, D}) ->
- ServerData = maps:with(maps:keys(server_data()), D),
+%% Call: get_server_opts/0
+handle_event({call, From}, get_server_opts, _State, {_P, D}) ->
+ ServerData = maps:with(maps:keys(server_opts()), D),
{keep_state_and_data,
[{reply, From, {ok, ServerData}}]};
@@ -901,6 +928,20 @@ handle_event(
%%
{stop, {shutdown, Reason}, P_D};
+%% Event: ?socket_counter_wrap/2
+handle_event(
+ info, ?socket_counter_wrap(Socket, Counter),
+ 'connected' = _State, {#params{socket = Socket} = P, D}) ->
+ {keep_state, {P, wrap_counter(Counter, D)}};
+handle_event(
+ info, ?socket_counter_wrap(Socket, Counter),
+ #recv{} = _State, {#params{socket = Socket} = P, D}) ->
+ {keep_state, {P, wrap_counter(Counter, D)}};
+handle_event(
+ info, ?socket_counter_wrap(_Socket, _Counter), _State, _P_D) ->
+ {keep_state_and_data,
+ [postpone]};
+
%% Call: controlling_process/1
handle_event(
{call, {Caller, _} = From}, {controlling_process, NewOwner},
@@ -977,12 +1018,23 @@ handle_event({call, From}, {setopts, Opts}, State, {P, D}) ->
P, D_1,
[Reply]);
_ ->
- {keep_state,
- {P, D_1},
+ {keep_state, {P, D_1},
[Reply]}
end;
-%% State: 'closed' - what is not handled in close/0 above
+%% Call: getstat/2
+handle_event({call, From}, {getstat, What}, State, {P, D}) ->
+ case State of
+ 'closed' ->
+ {keep_state_and_data,
+ [{reply, From, {error, closed}}]};
+ _ ->
+ {D_1, Result} = getstat(P#params.socket, D, What),
+ {keep_state, {P, D_1},
+ [{reply, From, {ok, Result}}]}
+ end;
+
+%% State: 'closed' - what is not handled above
handle_event(Type, Content, 'closed' = State, P_D) ->
handle_closed(Type, Content, State, P_D);
%% Handled state: 'closed'
@@ -1231,6 +1283,7 @@ handle_connect(
handle_accept(P, D, From, ListenSocket, Timeout) ->
case socket:accept(ListenSocket, nowait) of
{ok, Socket} ->
+ ok = socket:setopt(Socket, otp, iow, true),
ok = socket:setopt(Socket, otp, meta, meta(D)),
[ok = socket_copy_opt(ListenSocket, Opt, Socket)
|| Opt <- socket_inherit_opts()],
@@ -1720,7 +1773,7 @@ state_setopts(_P, D, _State, []) ->
{ok, D};
state_setopts(P, D, State, [Opt | Opts]) ->
Opt_1 = conv_setopt(Opt),
- case member(setopt_categories(Opt_1)) of
+ case setopt_categories(Opt_1) of
#{socket := _} ->
case P#params.socket of
undefined ->
@@ -1733,27 +1786,26 @@ state_setopts(P, D, State, [Opt | Opts]) ->
{Error, D}
end
end;
- #{server := _} ->
+ %%
+ #{server_write := _} when State =:= 'closed' ->
+ {{error, einval}, D};
+ #{server_write := _} ->
state_setopts_server(P, D, State, Opts, Opt_1);
+ %%
+ #{server_read := _} when State =:= 'closed' ->
+ {{error, einval}, D};
+ #{server_read := _} when State =:= 'closed_read' ->
+ {{error, einval}, D};
+ #{server_read := _} ->
+ state_setopts_server(P, D, State, Opts, Opt_1);
+ %%
#{ignore := _} ->
state_setopts(P, D, State, Opts);
- _ -> % extra | einval
+ #{} -> % extra | einval
{{error, einval}, D}
end.
-state_setopts_server(_P, D, 'closed', _Opts, {_Tag, _Value}) ->
- {{error, einval}, D};
-state_setopts_server(P, D, 'closed_read' = State, Opts, {Tag, Value}) ->
- case member(Tag, server_write_opts()) of
- true ->
- state_setopts_server(P, D, State, Opts, Tag, Value);
- false ->
- {{error, einval}, D}
- end;
state_setopts_server(P, D, State, Opts, {Tag, Value}) ->
- state_setopts_server(P, D, State, Opts, Tag, Value).
-%%
-state_setopts_server(P, D, State, Opts, Tag, Value) ->
case Tag of
active ->
state_setopts_active(P, D, State, Opts, Value);
@@ -1818,7 +1870,7 @@ state_getopts(P, D, State, Opts) ->
state_getopts(_P, _D, _State, [], Acc) ->
{ok, reverse(Acc)};
state_getopts(P, D, State, [Tag | Tags], Acc) ->
- case member(getopt_categories(Tag)) of
+ case getopt_categories(Tag) of
#{socket := _} ->
case P#params.socket of
undefined ->
@@ -1832,34 +1884,107 @@ state_getopts(P, D, State, [Tag | Tags], Acc) ->
state_getopts(P, D, State, Tags, Acc)
end
end;
- #{server := _} when State =:= 'closed' ->
+ #{server_write := _} when State =:= 'closed' ->
{error, einval};
- #{server := _} ->
- case D of
- #{Tag := Value} ->
- case State of
- 'closed_read' ->
- case member(Tag, server_write_opts()) of
- true ->
- state_getopts(
- P, D, State, Tags,
- [{Tag, Value} | Acc]);
- false ->
- {error, einval}
- end;
- _ ->
- state_getopts(
- P, D, State, Tags, [{Tag, Value} | Acc])
- end;
- #{} ->
- state_getopts(P, D, State, Tags, Acc)
- end;
+ #{server_write := _} ->
+ Value = maps:get(Tag, D),
+ state_getopts(P, D, State, Tags, [{Tag, Value} | Acc]);
+ #{server_read := _} when State =:= 'closed' ->
+ {error, einval};
+ #{server_read := _} when State =:= 'closed_read' ->
+ {error, einval};
+ #{server_read := _} ->
+ Value = maps:get(Tag, D),
+ state_getopts(P, D, State, Tags, [{Tag, Value} | Acc]);
#{einval := _} ->
state_getopts(P, D, State, Tags, Acc);
#{} -> % extra | einval
{error, einval}
end.
+
+getstat(Socket, D, What) ->
+ %% Read counters
+ Counters_1 = socket_info_counters(Socket),
+ %% Check for recent wraps
+ {D_1, Wrapped} = receive_counter_wrap(Socket, D, []),
+ %%
+ %% Assumption: a counter that we just now got a wrap message from
+ %% will not wrap again before we read the updated value
+ %%
+ %% Update wrapped counters
+ Counters_2 = socket_info_counters(Socket),
+ Counters_3 = maps:merge(Counters_1, maps:with(Wrapped, Counters_2)),
+ %% Go ahead with wrap updated counters
+ {D_1, getstat_what(What, D_1, Counters_3)}.
+
+getstat_what([], _D, _C) -> [];
+getstat_what([Tag | What], D, C) ->
+ Val =
+ case Tag of
+ recv_oct ->
+ counter_value(read_byte, D, C);
+ recv_cnt ->
+ counter_value(read_pkg, D, C);
+ recv_max ->
+ getstat_avg(read_byte, D, C, read_pkg);
+ recv_avg ->
+ getstat_avg(read_byte, D, C, read_pkg);
+ recv_dvi -> 0;
+ %%
+ send_oct ->
+ counter_value(write_byte, D, C);
+ send_cnt ->
+ counter_value(write_pkg, D, C);
+ send_max ->
+ getstat_avg(write_byte, D, C, write_pkg);
+ send_avg ->
+ getstat_avg(write_byte, D, C, write_pkg);
+ send_pend -> 0
+ end,
+ [{Tag, Val} | getstat_what(What, D, C)].
+
+getstat_avg(SumTag, D, C, CntTag) ->
+ Cnt = counter_value(CntTag, D, C),
+ if
+ Cnt =:= 0 ->
+ counter_value(SumTag, D, C);
+ true ->
+ round(counter_value(SumTag, D, C) / Cnt)
+ end.
+
+socket_info_counters(Socket) ->
+ #{counters := Counters} = socket:info(Socket),
+ maps:from_list(Counters).
+
+receive_counter_wrap(Socket, D, Wrapped) ->
+ receive
+ ?socket_counter_wrap(Socket, Counter) ->
+ receive_counter_wrap(
+ Socket, wrap_counter(Counter, D) , [Counter | Wrapped])
+ after 0 ->
+ {D, Wrapped}
+ end.
+
+wrap_counter(Counter, D) ->
+ case D of
+ #{Counter := N} ->
+ D#{Counter := N + 1};
+ #{} ->
+ D#{Counter => 1}
+ end.
+
+-define(COUNTER_BITS, 32).
+counter_value(Counter, D, Counters) ->
+ case D of
+ #{Counter := Wraps} ->
+ (Wraps bsl ?COUNTER_BITS) + maps:get(Counter, Counters);
+ #{} ->
+ maps:get(Counter, Counters)
+ end.
+
+
+
-compile({inline, [reverse/1]}).
reverse([]) -> [];
reverse([_] = L) -> L;
diff --git a/lib/kernel/src/inet.erl b/lib/kernel/src/inet.erl
index 1ab528d1c5..b357dfff77 100644
--- a/lib/kernel/src/inet.erl
+++ b/lib/kernel/src/inet.erl
@@ -519,7 +519,7 @@ gethostname(Socket) ->
OptionValues :: [{stat_option(), integer()}].
getstat(Socket) ->
- prim_inet:getstat(Socket, stats()).
+ getstat(Socket, stats()).
-spec getstat(Socket, Options) ->
{ok, OptionValues} | {error, posix()} when
@@ -527,7 +527,10 @@ getstat(Socket) ->
Options :: [stat_option()],
OptionValues :: [{stat_option(), integer()}].
-getstat(Socket,What) ->
+getstat({'$inet', GenSocketMod, _} = Socket, What)
+ when is_atom(GenSocketMod) ->
+ GenSocketMod:?FUNCTION_NAME(Socket, What);
+getstat(Socket, What) ->
prim_inet:getstat(Socket, What).
-spec gethostbyname(Hostname) -> {ok, Hostent} | {error, posix()} when
--
2.16.4