File 1272-Fix-multiple-ct_gen_conn-issues.patch of Package erlang
From 05b245d83bc544c8e5db209bb724d5cdc19562b3 Mon Sep 17 00:00:00 2001
From: Anders Svensson <anders@erlang.org>
Date: Thu, 8 Apr 2021 00:06:32 +0200
Subject: [PATCH 2/4] Fix multiple ct_gen_conn issues
Redo as a gen_server to simplify the mechanics (no need to reinvent that
wheel) and address the following problems:
- A call/3 reply from the process could be left in the caller's message
queue after a timeout.
- Failure of a user's terminate callback caused a call result to not be
sent.
- Unexpected messages accumulated in the message queue with option
forward = false.
- Documentation and specs were out of sync with code. Redo the edoc specs
as type specs.
Exit reasons have changed slightly, wrapping them in a shutdown tuple to
avoid gen_server logging expected exits as failures, but these are
mapped back by start/4 and call/3 since test code matches on the values.
(Code that monitors on the exit reason doesn't seem to care.)
do_within_time/2 (called from user callbacks in ct_telnet) is one source
of unexpected messages, which will be addressed in a subsequent commit.
Untabified for consistent indentation.
---
lib/common_test/src/ct_gen_conn.erl | 762 ++++++++++++++++------------
1 file changed, 429 insertions(+), 333 deletions(-)
diff --git a/lib/common_test/src/ct_gen_conn.erl b/lib/common_test/src/ct_gen_conn.erl
index f4f002614e..3059cb2b46 100644
--- a/lib/common_test/src/ct_gen_conn.erl
+++ b/lib/common_test/src/ct_gen_conn.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2003-2018. All Rights Reserved.
+%% Copyright Ericsson AB 2003-2021. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -18,16 +18,39 @@
%% %CopyrightEnd%
%%
-%%% Generic connection owner process.
-%%%
-%%% -type handle() = pid(). A handle for using a connection implemented
-%%% with ct_gen_conn.erl.
+%%
+%% Generic connection owner process.
+%%
-module(ct_gen_conn).
--export([start/4, stop/1, get_conn_pid/1, check_opts/1]).
--export([call/2, call/3, return/2, do_within_time/2]).
--export([log/3, start_log/1, cont_log/2, cont_log_no_timestamp/2, end_log/0]).
+-behaviour(gen_server).
+
+-export([start/4,
+ stop/1,
+ get_conn_pid/1]).
+
+-export([call/2,
+ call/3,
+ return/2,
+ do_within_time/2]).
+
+-export([log/3,
+ start_log/1,
+ cont_log/2,
+ cont_log_no_timestamp/2,
+ end_log/0]).
+
+%% gen_server callbacks
+-export([init/1,
+ handle_info/2,
+ handle_cast/2,
+ handle_call/3,
+ code_change/3,
+ terminate/2]).
+
+%% test
+-export([make_opts/1]).
-ifdef(debug).
-define(dbg,true).
@@ -35,149 +58,170 @@
-define(dbg,false).
-endif.
--record(gen_opts,{callback,
- name,
- address,
- init_data,
- reconnect = true,
- forward = false,
- use_existing = true,
- old = false,
- conn_pid,
- cb_state,
- ct_util_server}).
-
-%%%-----------------------------------------------------------------
-%%% -spec start(Address,InitData,CallbackMod,Opts) ->
-%%% {ok,Handle} | {error,Reason}
-%%% Name = term()
-%%% CallbackMod = atom()
-%%% InitData = term()
-%%% Address = term()
-%%% Opts = [Opt]
-%%% Opt = {name,Name} | {use_existing_connection,boolean()} |
-%%% {reconnect,boolean()} | {forward_messages,boolean()}
-%%%
-%%% Open a connection and start the generic connection owner process.
-%%%
-%%% The CallbackMod is a specific callback module for
-%%% each type of connection (e.g. telnet, ftp,...). It must export the
-%%% function init/3 which takes the arguments
-%%% Name, Addresse) and
-%%% InitData and returna
-%%% {ok,ConnectionPid,State} or
-%%% {error,Reason}.
-%%%
-%%% If no name is given, the Name argument in init/3 will
-%%% have the value undefined.
-%%%
-%%% The callback modules must also export
-%%%
-%%% handle_msg(Msg,From,State) -> {reply,Reply,State} |
-%%% {noreply,State} |
-%%% {stop,Reply,State}
-%%% terminate(ConnectionPid,State) -> term()
-%%% close(Handle) -> term()
-%%%
-%%% The close/1 callback function is actually a callback
-%%% for ct_util, for closing registered connections when
-%%% ct_util_server is terminated. Handle is the Pid of
-%%% the ct_gen_conn process.
-%%%
-%%% If option reconnect is true, then the
-%%% callback must also export
-%%%
-%%% reconnect(Address,State) -> {ok,ConnectionPid,State}
-%%%
-%%% If option forward_messages is <ocde>true, then
-%%% the callback must also export
-%%%
-%%% handle_msg(Msg,State) -> {noreply,State} | {stop,State}
-%%%
-%%% An old interface still exists. This is used by ct_telnet, ct_ftp
-%%% and ct_ssh. The start function then has an explicit
-%%% Name argument, and no Opts argument. The
-%%% callback must export:
-%%%
-%%% init(Name,Address,InitData) -> {ok,ConnectionPid,State}
-%%% handle_msg(Msg,State) -> {Reply,State}
-%%% reconnect(Address,State) -> {ok,ConnectionPid,State}
-%%% terminate(ConnectionPid,State) -> term()
-%%% close(Handle) -> term()
-%%%
-start(Address,InitData,CallbackMod,Opts) when is_list(Opts) ->
- do_start(Address,InitData,CallbackMod,Opts);
-start(Name,Address,InitData,CallbackMod) ->
- do_start(Address,InitData,CallbackMod,[{name,Name},{old,true}]).
-
-%%%-----------------------------------------------------------------
-%%% -spec stop(Handle) -> ok
-%%% Handle = handle()
-%%%
-%%% Close the connection and stop the process managing it.
+-type handle() :: pid(). %% connection-owning process spawned here
+
+-record(gen_opts, {callback :: module(),
+ name :: ct:target_name(),
+ address :: term(),
+ init_data :: term(),
+ reconnect = true :: boolean(),
+ forward = false :: boolean(),
+ use_existing = true :: boolean(),
+ old = false :: boolean(),
+ conn_pid :: pid() | undefined,
+ cb_state :: term(),
+ ct_util_server :: pid() | undefined}).
+
+%% ---------------------------------------------------------------------
+
+-spec start(Address, InitData, CallbackMod, [Opt])
+ -> {ok, handle()} | {error, Reason :: term()}
+ when Address :: term(),
+ InitData :: term(),
+ CallbackMod :: module(),
+ Opt :: {name, Name :: ct:target_name()}
+ | {use_existing_connection, boolean()}
+ | {reconnect, boolean()}
+ | {forward_messages, boolean()}
+ ; (Name, Address, InitData, CallbackMod)
+ -> {ok, handle()} | {error, Reason :: term()}
+ when Name :: ct:target_name(),
+ Address :: term(),
+ InitData :: term(),
+ CallbackMod :: module().
+
+%% Open a connection and start the generic connection owner process.
+%%
+%% CallbackMod is a specific callback module for
+%% each type of connection (e.g. telnet, ftp, netconf). It must export:
+%%
+%% init(Name, Address, InitData) -> {ok, ConnectionPid, State}
+%% | {error,Reason}.
+%%
+%% Name defaults to undefined if unspecified.
+%%
+%% The callback modules must also export:
+%%
+%% handle_msg(Msg, From, State) -> {reply, Reply, State}
+%% | {noreply, State}
+%% | {stop, Reply, State}
+%% terminate(ConnectionPid, State) -> term()
+%% close(Handle) -> term()
+%%
+%% A Reply of the form {retry, _} results in a new call to the server with
+%% the retry tuple as message.
+%%
+%% The close/1 callback function is actually a callback
+%% for ct_util, for closing registered connections when
+%% ct_util_server is terminated. Handle is the Pid of
+%% the ct_gen_conn process.
+%%
+%% If option reconnect is true, then the
+%% callback must also export:
+%%
+%% reconnect(Address, State) -> {ok, ConnectionPid, State}
+%%
+%% If option forward_messages is true then
+%% the callback must also export:
+%%
+%% handle_msg(Msg,State) -> {noreply,State}
+%% | {stop,State}
+%%
+%% An old interface still exists. This is used by ct_telnet, ct_ftp
+%% and ct_ssh. The start function then has an explicit
+%% Name argument, and no Opts argument.
+
+start(Address, InitData, CallbackMod, Opts) when is_list(Opts) ->
+ do_start(Address, InitData, CallbackMod, Opts);
+start(Name, Address, InitData, CallbackMod) ->
+ do_start(Address, InitData, CallbackMod, [{name, Name}, {old, true}]).
+
+%% ---------------------------------------------------------------------
+
+-spec stop(handle()) -> ok | {error, Reason :: term()}.
+
+%% Close the connection and stop the process managing it.
+
stop(Handle) ->
- call(Handle,stop,5000).
+ call(Handle, stop, 5000).
+
+%% ---------------------------------------------------------------------
+
+-spec get_conn_pid(handle()) -> pid().
+
+%% Return the connection pid associated with Handle
-%%%-----------------------------------------------------------------
-%%% -spec get_conn_pid(Handle) -> ok
-%%% Handle = handle()
-%%%
-%%% Return the connection pid associated with Handle
get_conn_pid(Handle) ->
- call(Handle,get_conn_pid).
-
-%%%-----------------------------------------------------------------
-%%% -spec log(Heading,Format,Args) -> ok
-%%%
-%%% Log activities on the current connection (tool-internal use only).
-%%% See ct_logs:log/3
-log(Heading,Format,Args) ->
- log(log,[Heading,Format,Args]).
-
-%%%-----------------------------------------------------------------
-%%% -spec start_log(Heading) -> ok
-%%%
-%%% Log activities on the current connection (tool-internal use only).
-%%% See ct_logs:start_log/1
+ call(Handle, get_conn_pid).
+
+%% ---------------------------------------------------------------------
+
+-spec log(Heading, Format, Args) -> ok
+ when Heading :: iodata(),
+ Format :: io:format(),
+ Args :: [term()].
+
+%% Log activities on the current connection.
+%% See ct_logs:log/3
+
+log(Heading, Format, Args) ->
+ log(log, [Heading, Format, Args]).
+
+%% ---------------------------------------------------------------------
+
+-spec start_log(Heading :: iodata()) -> ok.
+
+%% Log activities on the current connection.
+%% See ct_logs:start_log/1
+
start_log(Heading) ->
- log(start_log,[Heading]).
+ log(start_log, [Heading]).
+
+%% ---------------------------------------------------------------------
+
+-spec cont_log(Format, Args) -> ok
+ when Format :: io:format(),
+ Args :: [term()].
+
+%% Log activities on the current connection.
+%% See ct_logs:cont_log/2
-%%%-----------------------------------------------------------------
-%%% -spec cont_log(Format,Args) -> ok
-%%%
-%%% Log activities on the current connection (tool-internal use only).
-%%% See ct_logs:cont_log/2
cont_log(Format,Args) ->
- log(cont_log,[Format,Args]).
-
-%%%-----------------------------------------------------------------
-%%% -spec cont_log_no_timestamp(Format,Args) -> ok
-%%%
-%%% Log activities on the current connection (tool-internal use only).
-%%% See ct_logs:cont_log/2
-cont_log_no_timestamp(Format,Args) ->
- log(cont_log_no_timestamp,[Format,Args]).
-
-%%%-----------------------------------------------------------------
-%%% -spec end_log() -> ok
-%%%
-%%% Log activities on the current connection (tool-internal use only).
-%%% See ct_logs:end_log/0
+ log(cont_log, [Format, Args]).
+
+%% ---------------------------------------------------------------------
+
+-spec cont_log_no_timestamp(Format, Args) -> ok
+ when Format :: io:format(),
+ Args :: [term()].
+
+%% Log activities on the current connection.
+%% See ct_logs:cont_log/2
+
+cont_log_no_timestamp(Format, Args) ->
+ log(cont_log_no_timestamp, [Format, Args]).
+
+%% ---------------------------------------------------------------------
+
+-spec end_log() -> ok.
+
+%% Log activities on the current connection.
+%% See ct_logs:end_log/0
+
end_log() ->
- log(end_log,[]).
-
-%%%-----------------------------------------------------------------
-%%% -spec do_within_time(Fun,Timeout) -> FunResult | {error,Reason}
-%%% Fun = function()
-%%% Timeout = integer()
-%%%
-%%% Execute a function within a limited time (tool-internal use only).
-%%%
-%%% Execute the given Fun, but interrupt if it takes
-%%% more than Timeout milliseconds.
-%%%
-%%% The execution is also interrupted if the connection is
-%%% closed.
+ log(end_log, []).
+
+%% ---------------------------------------------------------------------
+
+-spec do_within_time(Fun, Tmo)
+ -> Result
+ when Fun :: fun(),
+ Tmo :: non_neg_integer(),
+ Result :: term().
+
+%% Return the result of evaluating Fun, or interrupt after Tmo
+%% milliseconds or if the connection is closed.
+
do_within_time(Fun,Timeout) ->
Self = self(),
Silent = get(silent),
@@ -211,214 +255,266 @@ do_within_time(Fun,Timeout) ->
end
end.
-%%%=================================================================
-%%% Internal functions
-do_start(Address,InitData,CallbackMod,Opts0) ->
- Opts = check_opts(Opts0,#gen_opts{callback=CallbackMod,
- address=Address,
- init_data=InitData}),
- case ct_util:does_connection_exist(Opts#gen_opts.name,
- Address,CallbackMod) of
- {ok,Pid} when Opts#gen_opts.use_existing ->
- log("ct_gen_conn:start","Using existing connection!\n",[]),
- {ok,Pid};
- {ok,Pid} when not Opts#gen_opts.use_existing ->
- {error,{connection_exists,Pid}};
- false ->
- do_start(Opts)
+%% ===========================================================================
+
+do_start(Address, InitData, CallbackMod, OptsList) ->
+ #gen_opts{name = Name}
+ = Opts
+ = make_opts(OptsList, #gen_opts{callback = CallbackMod,
+ address = Address,
+ init_data = InitData}),
+ %% Testing for an existing connection is slightly broken as long
+ %% as calls to start aren't serialized: concurrent starts can both
+ %% end up in the final clause.
+ case ct_util:does_connection_exist(Name, Address, CallbackMod) of
+ {ok, _Pid} = Ok when Opts#gen_opts.use_existing ->
+ log("ct_gen_conn:start","Using existing connection!\n", []),
+ Ok;
+ {ok, Pid} when not Opts#gen_opts.use_existing ->
+ {error, {connection_exists, Pid}};
+ false ->
+ do_start(Opts)
end.
do_start(Opts) ->
- Self = self(),
- Pid = spawn(fun() -> init_gen(Self, Opts) end),
- MRef = erlang:monitor(process,Pid),
- receive
- {connected,Pid} ->
- erlang:demonitor(MRef, [flush]),
- {ok,Pid};
- {Error,Pid} ->
- receive {'DOWN',MRef,process,_,_} -> ok end,
- Error;
- {'DOWN',MRef,process,_,Reason} ->
- log("ct_gen_conn:start",
- "Connection process died: ~tp\n",
- [Reason]),
- {error,{connection_process_died,Reason}}
+ try gen_server:start(?MODULE, Opts, []) of
+ {ok, _} = Ok -> Ok;
+ {error, Reason} -> {error, rc(Reason)}
+ catch
+ exit: Reason ->
+ log("ct_gen_conn:start",
+ "Connection process died: ~tp\n",
+ [Reason]),
+ {error, {connection_process_died, Reason}}
end.
-check_opts(Opts0) ->
- check_opts(Opts0,#gen_opts{}).
-
-check_opts([{name,Name}|T],Opts) ->
- check_opts(T,Opts#gen_opts{name=Name});
-check_opts([{reconnect,Bool}|T],Opts) ->
- check_opts(T,Opts#gen_opts{reconnect=Bool});
-check_opts([{forward_messages,Bool}|T],Opts) ->
- check_opts(T,Opts#gen_opts{forward=Bool});
-check_opts([{use_existing_connection,Bool}|T],Opts) ->
- check_opts(T,Opts#gen_opts{use_existing=Bool});
-check_opts([{old,Bool}|T],Opts) ->
- check_opts(T,Opts#gen_opts{old=Bool});
-check_opts([],Opts) ->
- Opts.
+%% Unwrap a {shutdown, _} exit reason for backwards compatibility.
+rc({shutdown, Reason}) -> Reason;
+rc(T) -> T.
+
+make_opts(Opts) ->
+ make_opts(Opts, #gen_opts{}).
+
+make_opts(Opts, #gen_opts{} = Rec) ->
+ lists:foldl(fun opt/2, Rec, Opts).
+
+opt({name, Name}, Rec) -> Rec#gen_opts{name = Name};
+opt({reconnect, Bool}, Rec) -> Rec#gen_opts{reconnect = Bool};
+opt({forward_messages, Bool}, Rec) -> Rec#gen_opts{forward = Bool};
+opt({use_existing_connection, Bool}, Rec) -> Rec#gen_opts{use_existing = Bool};
+opt({old, Bool}, Rec) -> Rec#gen_opts{old = Bool}.
+
+%% ===========================================================================
call(Pid, Msg) ->
call(Pid, Msg, infinity).
-call(Pid, Msg, Timeout) ->
- MRef = erlang:monitor(process,Pid),
- Ref = make_ref(),
- Pid ! {Msg,{self(),Ref}},
- receive
- {Ref, Result} ->
- erlang:demonitor(MRef, [flush]),
- case Result of
- {retry,_Data} ->
- call(Pid,Result);
- Other ->
- Other
- end;
- {'DOWN',MRef,process,_,Reason} ->
- {error,{process_down,Pid,Reason}}
- after Timeout ->
- erlang:demonitor(MRef, [flush]),
- log("ct_gen_conn",
- "Connection process ~w not responding. Killing now!",
- [Pid]),
- exit(Pid, kill),
- {error,{process_down,Pid,forced_termination}}
+-spec call(Handle, Msg, Tmo)
+ -> term()
+ when Handle :: handle(),
+ Msg :: term(),
+ Tmo :: non_neg_integer()
+ | infinity.
+
+call(Pid, Msg, infinity = Tmo) ->
+ gen_call(Pid, Msg, Tmo);
+
+%% Spawn a middleman process if the call can timeout to avoid the
+%% possibilty of a reply being left in the caller's mailbox after a
+%% timeout.
+call(Pid, Msg, Tmo) ->
+ {_, MRef} = spawn_monitor(fun() -> exit(gen_call(Pid, Msg, Tmo)) end),
+ receive {'DOWN', MRef, process, _, RC} -> RC end.
+
+gen_call(Pid, Msg, Tmo) ->
+ try gen_server:call(Pid, Msg, Tmo) of
+ T -> retry(Pid, T, Tmo)
+ catch
+ exit: Reason -> {error, {process_down, Pid, rc(Pid, Reason)}}
end.
-return({To,Ref},Result) ->
- To ! {Ref, Result},
- ok.
+retry(Pid, {retry, _} = T, Tmo) -> gen_call(Pid, T, Tmo);
+retry(_, T, _) -> T.
+
+%% Unwrap the MFA gen_server puts into exit reasons.
+rc(Pid, {Reason, {gen_server, call, _}}) ->
+ rc(Pid, Reason);
+
+rc(Pid, timeout) ->
+ log("ct_gen_conn",
+ "Connection process ~w not responding. Killing now!",
+ [Pid]),
+ exit(Pid, kill),
+ forced_termination;
+
+rc(_, Reason) ->
+ rc(Reason).
+
+return(From, Result) ->
+ gen_server:reply(From, Result).
+
+%% ===========================================================================
+%% gen_server callbacks
-init_gen(Parent,Opts) ->
- process_flag(trap_exit,true),
+%% init/1
+
+init(#gen_opts{callback = Mod,
+ name = Name,
+ address = Addr,
+ init_data = InitData}
+ = Opts) ->
+ process_flag(trap_exit, true),
ct_util:mark_process(),
- put(silent,false),
- try (Opts#gen_opts.callback):init(Opts#gen_opts.name,
- Opts#gen_opts.address,
- Opts#gen_opts.init_data) of
- {ok,ConnPid,State} when is_pid(ConnPid) ->
- link(ConnPid),
- put(conn_pid,ConnPid),
- CtUtilServer = whereis(ct_util_server),
- link(CtUtilServer),
- ct_util:register_connection(Opts#gen_opts.name,
- Opts#gen_opts.address,
- Opts#gen_opts.callback,
- self()),
- Parent ! {connected,self()},
- loop(Opts#gen_opts{conn_pid=ConnPid,
- cb_state=State,
- ct_util_server=CtUtilServer});
- {error,Reason} ->
- Parent ! {{error,Reason},self()}
+ put(silent, false),
+ try Mod:init(Name, Addr, InitData) of
+ {ok, ConnPid, State} when is_pid(ConnPid) ->
+ link(ConnPid),
+ put(conn_pid, ConnPid),
+ SrvPid = whereis(ct_util_server),
+ link(SrvPid),
+ ct_util:register_connection(Name, Addr, Mod, self()),
+ {ok, Opts#gen_opts{conn_pid = ConnPid,
+ cb_state = State,
+ ct_util_server = SrvPid}};
+ {error, Reason} ->
+ {stop, {shutdown, Reason}}
catch
- throw:{error,Reason} ->
- Parent ! {{error,Reason},self()}
+ C: Reason when C /= error ->
+ {stop, {shutdown, Reason}}
end.
-loop(Opts) ->
- receive
- {'EXIT',Pid,Reason} when Pid==Opts#gen_opts.conn_pid ->
- case Opts#gen_opts.reconnect of
- true ->
- log("Connection down!\nOpening new!",
- "Reason: ~tp\nAddress: ~tp\n",
- [Reason,Opts#gen_opts.address]),
- case reconnect(Opts) of
- {ok, NewPid, NewState} ->
- link(NewPid),
- put(conn_pid,NewPid),
- loop(Opts#gen_opts{conn_pid=NewPid,
- cb_state=NewState});
- Error ->
- ct_util:unregister_connection(self()),
- log("Reconnect failed. Giving up!",
- "Reason: ~tp\n",
- [Error])
- end;
- false ->
- ct_util:unregister_connection(self()),
- log("Connection closed!","Reason: ~tp\n",[Reason])
- end;
- {'EXIT',Pid,Reason} ->
- case Opts#gen_opts.ct_util_server of
- Pid ->
- exit(Reason);
- _ ->
- loop(Opts)
- end;
- {stop, From} ->
- ct_util:unregister_connection(self()),
- ConnPid = Opts#gen_opts.conn_pid,
- unlink(ConnPid),
- (Opts#gen_opts.callback):terminate(ConnPid,Opts#gen_opts.cb_state),
- return(From,ok),
- ok;
- {{retry,{Error,_Name,CPid,_Msg}}, From} when
- CPid == Opts#gen_opts.conn_pid ->
- %% only retry if failure is because of a reconnection
- Return = case Error of
- {error,_} -> Error;
- Reason -> {error,Reason}
- end,
- return(From, Return),
- loop(Opts);
- {{retry,{_Error,_Name,_CPid,Msg}}, From} ->
- log("Rerunning command","Connection reestablished. "
- "Rerunning command...",[]),
- {Return,NewState} =
- (Opts#gen_opts.callback):handle_msg(Msg,Opts#gen_opts.cb_state),
- return(From, Return),
- loop(Opts#gen_opts{cb_state=NewState});
- {get_conn_pid, From} ->
- return(From, Opts#gen_opts.conn_pid),
- loop(Opts);
- {Msg, From={Pid,_Ref}} when is_pid(Pid), Opts#gen_opts.old==true ->
- {Return,NewState} =
- (Opts#gen_opts.callback):handle_msg(Msg,Opts#gen_opts.cb_state),
- return(From, Return),
- loop(Opts#gen_opts{cb_state=NewState});
- {Msg,From={Pid,_Ref}} when is_pid(Pid) ->
- case (Opts#gen_opts.callback):handle_msg(Msg,From,
- Opts#gen_opts.cb_state) of
- {reply,Reply,NewState} ->
- return(From,Reply),
- loop(Opts#gen_opts{cb_state=NewState});
- {noreply,NewState} ->
- loop(Opts#gen_opts{cb_state=NewState});
- {stop,Reply,NewState} ->
- ct_util:unregister_connection(self()),
- ConnPid = Opts#gen_opts.conn_pid,
- unlink(ConnPid),
- (Opts#gen_opts.callback):terminate(ConnPid,NewState),
- return(From,Reply)
- end;
- Msg when Opts#gen_opts.forward==true ->
- case (Opts#gen_opts.callback):handle_msg(Msg,
- Opts#gen_opts.cb_state) of
- {noreply,NewState} ->
- loop(Opts#gen_opts{cb_state=NewState});
- {stop,NewState} ->
- ct_util:unregister_connection(self()),
- ConnPid = Opts#gen_opts.conn_pid,
- unlink(ConnPid),
- (Opts#gen_opts.callback):terminate(ConnPid,NewState)
- end
+%% handle_call/2
+
+handle_call(get_conn_pid, _From, #gen_opts{conn_pid = Pid} = Opts) ->
+ {reply, Pid, Opts};
+
+handle_call(stop, _From, Opts) ->
+ {stop, normal, ok, Opts};
+
+%% Only retry if failure is because of a reconnection.
+handle_call({retry, {Error, _Name, ConnPid, _Msg}},
+ _From,
+ #gen_opts{conn_pid = ConnPid}
+ = Opts) ->
+ {reply, error_rc(Error), Opts};
+
+handle_call({retry, {_Error, _Name, _CPid, Msg}},
+ _From,
+ #gen_opts{callback = Mod,
+ cb_state = State}
+ = Opts) ->
+ log("Rerunning command","Connection reestablished. "
+ "Rerunning command...",
+ []),
+ {Reply, NewState} = Mod:handle_msg(Msg, State),
+ {reply, Reply, Opts#gen_opts{cb_state = NewState}};
+
+handle_call(Msg, _From, #gen_opts{old = true,
+ callback = Mod,
+ cb_state = State}
+ = Opts) ->
+ {Reply, NewState} = Mod:handle_msg(Msg, State),
+ {reply, Reply, Opts#gen_opts{cb_state = NewState}};
+
+handle_call(Msg, From, #gen_opts{callback = Mod,
+ cb_state = State}
+ = Opts) ->
+ case Mod:handle_msg(Msg, From, State) of
+ {reply, Reply, NewState} ->
+ {reply, Reply, Opts#gen_opts{cb_state = NewState}};
+ {noreply, NewState} ->
+ {noreply, Opts#gen_opts{cb_state = NewState}};
+ {stop, Reply, NewState} ->
+ {stop, normal, Reply, Opts#gen_opts{cb_state = NewState}}
end.
-reconnect(Opts) ->
- (Opts#gen_opts.callback):reconnect(Opts#gen_opts.address,
- Opts#gen_opts.cb_state).
+%% handle_cast/2
+
+handle_cast(_, Opts) ->
+ {noreply, Opts}.
+
+%% handle_info/2
+
+handle_info({'EXIT', Pid, Reason},
+ #gen_opts{reconnect = true,
+ conn_pid = Pid,
+ address = Addr}
+ = Opts) ->
+ log("Connection down!\nOpening new!",
+ "Reason: ~tp\nAddress: ~tp\n",
+ [Reason, Addr]),
+ case reconnect(Opts) of
+ {ok, NewPid, NewState} ->
+ link(NewPid),
+ put(conn_pid, NewPid),
+ {noreply, Opts#gen_opts{conn_pid = NewPid,
+ cb_state = NewState}};
+ Error ->
+ log("Reconnect failed. Giving up!",
+ "Reason: ~tp\n",
+ [Error]),
+ {stop, normal, Opts}
+ end;
+
+handle_info({'EXIT', Pid, Reason},
+ #gen_opts{reconnect = false,
+ conn_pid = Pid}
+ = Opts) ->
+ log("Connection closed!", "Reason: ~tp\n", [Reason]),
+ {stop, normal, Opts};
+
+handle_info({'EXIT', Pid, Reason},
+ #gen_opts{ct_util_server = Pid}
+ = Opts) ->
+ {stop, {shutdown, Reason}, Opts};
+
+handle_info(Msg, #gen_opts{forward = true,
+ callback = Mod,
+ cb_state = State}
+ = Opts) ->
+ case Mod:handle_msg(Msg, State) of
+ {noreply, NewState} ->
+ {noreply, Opts#gen_opts{cb_state = NewState}};
+ {stop, NewState} ->
+ {stop, normal, Opts#gen_opts{cb_state = NewState}}
+ end;
+
+handle_info(_, #gen_opts{} = Opts) ->
+ {noreply, Opts}.
+
+%% code_change/2
+
+code_change(_Vsn, State, _Extra) ->
+ {ok, State}.
+
+%% terminate/2
+
+%% Cleanup is only in this case since ct_util also cleans up and
+%% expects us not to, which is at least an odd expectation.
+terminate(normal, #gen_opts{callback = Mod,
+ conn_pid = Pid,
+ cb_state = State}) ->
+ ct_util:unregister_connection(self()),
+ unlink(Pid),
+ Mod:terminate(Pid, State);
+
+terminate(_, #gen_opts{}) ->
+ ok.
+
+%% ===========================================================================
+
+error_rc({error, _} = T) -> T;
+error_rc(Reason) -> {error, Reason}.
+reconnect(#gen_opts{callback = Mod,
+ address = Addr,
+ cb_state = State}) ->
+ Mod:reconnect(Addr, State).
-log(Func,Args) ->
+log(Func, Args) ->
case get(silent) of
- true when not ?dbg->
- ok;
- _ ->
- apply(ct_logs,Func,Args)
+ true when not ?dbg ->
+ ok;
+ _ ->
+ apply(ct_logs, Func, Args)
end.
--
2.26.2