File 1272-Fix-multiple-ct_gen_conn-issues.patch of Package erlang

From 05b245d83bc544c8e5db209bb724d5cdc19562b3 Mon Sep 17 00:00:00 2001
From: Anders Svensson <anders@erlang.org>
Date: Thu, 8 Apr 2021 00:06:32 +0200
Subject: [PATCH 2/4] Fix multiple ct_gen_conn issues

Redo as a gen_server to simplify the mechanics (no need to reinvent that
wheel) and address the following problems:

- A call/3 reply from the process could be left in the caller's message
  queue after a timeout.
- Failure of a user's terminate callback caused a call result to not be
  sent.
- Unexpected messages accumulated in the message queue with option
  forward = false.
- Documentation and specs were out of sync with code. Redo the edoc specs
  as type specs.

Exit reasons have changed slightly, wrapping them in a shutdown tuple to
avoid gen_server logging expected exits as failures, but these are
mapped back by start/4 and call/3 since test code matches on the values.
(Code that monitors on the exit reason doesn't seem to care.)

do_within_time/2 (called from user callbacks in ct_telnet) is one source
of unexpected messages, which will be addressed in a subsequent commit.

Untabified for consistent indentation.
---
 lib/common_test/src/ct_gen_conn.erl | 762 ++++++++++++++++------------
 1 file changed, 429 insertions(+), 333 deletions(-)

diff --git a/lib/common_test/src/ct_gen_conn.erl b/lib/common_test/src/ct_gen_conn.erl
index f4f002614e..3059cb2b46 100644
--- a/lib/common_test/src/ct_gen_conn.erl
+++ b/lib/common_test/src/ct_gen_conn.erl
@@ -1,7 +1,7 @@
 %%
 %% %CopyrightBegin%
 %%
-%% Copyright Ericsson AB 2003-2018. All Rights Reserved.
+%% Copyright Ericsson AB 2003-2021. All Rights Reserved.
 %%
 %% Licensed under the Apache License, Version 2.0 (the "License");
 %% you may not use this file except in compliance with the License.
@@ -18,16 +18,39 @@
 %% %CopyrightEnd%
 %%
 
-%%% Generic connection owner process.
-%%%
-%%% -type handle() = pid(). A handle for using a connection implemented
-%%% with ct_gen_conn.erl.
+%%
+%% Generic connection owner process.
+%%
 
 -module(ct_gen_conn).
 
--export([start/4, stop/1, get_conn_pid/1, check_opts/1]).
--export([call/2, call/3, return/2, do_within_time/2]).
--export([log/3, start_log/1, cont_log/2, cont_log_no_timestamp/2, end_log/0]).
+-behaviour(gen_server).
+
+-export([start/4,
+         stop/1,
+         get_conn_pid/1]).
+
+-export([call/2,
+         call/3,
+         return/2,
+         do_within_time/2]).
+
+-export([log/3,
+         start_log/1,
+         cont_log/2,
+         cont_log_no_timestamp/2,
+         end_log/0]).
+
+%% gen_server callbacks
+-export([init/1,
+         handle_info/2,
+         handle_cast/2,
+         handle_call/3,
+         code_change/3,
+         terminate/2]).
+
+%% test
+-export([make_opts/1]).
 
 -ifdef(debug).
 -define(dbg,true).
@@ -35,149 +58,170 @@
 -define(dbg,false).
 -endif.
 
--record(gen_opts,{callback,
-		  name,
-		  address,
-		  init_data,
-		  reconnect    = true,
-		  forward      = false,
-		  use_existing = true,
-		  old          = false,
-		  conn_pid,
-		  cb_state,
-		  ct_util_server}).
-
-%%%-----------------------------------------------------------------
-%%% -spec start(Address,InitData,CallbackMod,Opts) ->
-%%%                                     {ok,Handle} | {error,Reason}
-%%%      Name = term()
-%%%      CallbackMod = atom()
-%%%      InitData = term()
-%%%      Address = term()
-%%%      Opts = [Opt]
-%%%      Opt = {name,Name} | {use_existing_connection,boolean()} |
-%%%            {reconnect,boolean()} | {forward_messages,boolean()}
-%%%
-%%% Open a connection and start the generic connection owner process.
-%%%
-%%% The CallbackMod is a specific callback module for
-%%% each type of connection (e.g. telnet, ftp,...). It must export the
-%%% function init/3 which takes the arguments
-%%% Name, Addresse) and
-%%% InitData and returna
-%%% {ok,ConnectionPid,State} or
-%%% {error,Reason}.
-%%%
-%%% If no name is given, the Name argument in init/3 will
-%%% have the value undefined.
-%%%
-%%% The callback modules must also export
-%%%
-%%% handle_msg(Msg,From,State) -> {reply,Reply,State} |
-%%%                               {noreply,State} |
-%%%                               {stop,Reply,State}
-%%% terminate(ConnectionPid,State) -> term()
-%%% close(Handle) -> term()
-%%%
-%%% The close/1 callback function is actually a callback
-%%% for ct_util, for closing registered connections when
-%%% ct_util_server is terminated. Handle is the Pid of
-%%% the ct_gen_conn process.
-%%%
-%%% If option reconnect is true, then the
-%%% callback must also export
-%%%
-%%% reconnect(Address,State) -> {ok,ConnectionPid,State}
-%%%
-%%% If option forward_messages is <ocde>true, then
-%%% the callback must also export
-%%%
-%%% handle_msg(Msg,State) -> {noreply,State} | {stop,State}
-%%%
-%%% An old interface still exists. This is used by ct_telnet, ct_ftp
-%%% and ct_ssh. The start function then has an explicit
-%%% Name argument, and no Opts argument. The
-%%% callback must export:
-%%%
-%%% init(Name,Address,InitData) -> {ok,ConnectionPid,State}
-%%% handle_msg(Msg,State) -> {Reply,State}
-%%% reconnect(Address,State) -> {ok,ConnectionPid,State}
-%%% terminate(ConnectionPid,State) -> term()
-%%% close(Handle) -> term()
-%%%
-start(Address,InitData,CallbackMod,Opts) when is_list(Opts) ->
-    do_start(Address,InitData,CallbackMod,Opts);
-start(Name,Address,InitData,CallbackMod) ->
-    do_start(Address,InitData,CallbackMod,[{name,Name},{old,true}]).
-
-%%%-----------------------------------------------------------------
-%%% -spec stop(Handle) -> ok
-%%%      Handle = handle()
-%%%
-%%% Close the connection and stop the process managing it.
+-type handle() :: pid(). %% connection-owning process spawned here
+
+-record(gen_opts, {callback  :: module(),
+                   name      :: ct:target_name(),
+                   address   :: term(),
+                   init_data :: term(),
+                   reconnect    = true  :: boolean(),
+                   forward      = false :: boolean(),
+                   use_existing = true  :: boolean(),
+                   old          = false :: boolean(),
+                   conn_pid       :: pid() | undefined,
+                   cb_state       :: term(),
+                   ct_util_server :: pid() | undefined}).
+
+%% ---------------------------------------------------------------------
+
+-spec start(Address, InitData, CallbackMod, [Opt])
+   -> {ok, handle()} | {error, Reason :: term()}
+ when Address :: term(),
+      InitData :: term(),
+      CallbackMod :: module(),
+      Opt :: {name, Name :: ct:target_name()}
+           | {use_existing_connection, boolean()}
+           | {reconnect, boolean()}
+           | {forward_messages, boolean()}
+      ;    (Name, Address, InitData, CallbackMod)
+   -> {ok, handle()} | {error, Reason :: term()}
+ when Name :: ct:target_name(),
+      Address :: term(),
+      InitData :: term(),
+      CallbackMod :: module().
+
+%% Open a connection and start the generic connection owner process.
+%%
+%% CallbackMod is a specific callback module for
+%% each type of connection (e.g. telnet, ftp, netconf). It must export:
+%%
+%%   init(Name, Address, InitData) -> {ok, ConnectionPid, State}
+%%                                  | {error,Reason}.
+%%
+%% Name defaults to undefined if unspecified.
+%%
+%% The callback modules must also export:
+%%
+%%   handle_msg(Msg, From, State) -> {reply, Reply, State}
+%%                                 | {noreply, State}
+%%                                 | {stop, Reply, State}
+%%   terminate(ConnectionPid, State) -> term()
+%%   close(Handle) -> term()
+%%
+%% A Reply of the form {retry, _} results in a new call to the server with
+%% the retry tuple as message.
+%%
+%% The close/1 callback function is actually a callback
+%% for ct_util, for closing registered connections when
+%% ct_util_server is terminated. Handle is the Pid of
+%% the ct_gen_conn process.
+%%
+%% If option reconnect is true, then the
+%% callback must also export:
+%%
+%%   reconnect(Address, State) -> {ok, ConnectionPid, State}
+%%
+%% If option forward_messages is true then
+%% the callback must also export:
+%%
+%%   handle_msg(Msg,State) -> {noreply,State}
+%%                          | {stop,State}
+%%
+%% An old interface still exists. This is used by ct_telnet, ct_ftp
+%% and ct_ssh. The start function then has an explicit
+%% Name argument, and no Opts argument.
+
+start(Address, InitData, CallbackMod, Opts) when is_list(Opts) ->
+    do_start(Address, InitData, CallbackMod, Opts);
+start(Name, Address, InitData, CallbackMod) ->
+    do_start(Address, InitData, CallbackMod, [{name, Name}, {old, true}]).
+
+%% ---------------------------------------------------------------------
+
+-spec stop(handle()) -> ok | {error, Reason :: term()}.
+
+%% Close the connection and stop the process managing it.
+
 stop(Handle) ->
-    call(Handle,stop,5000).
+    call(Handle, stop, 5000).
+
+%% ---------------------------------------------------------------------
+
+-spec get_conn_pid(handle()) -> pid().
+
+%% Return the connection pid associated with Handle
 
-%%%-----------------------------------------------------------------
-%%% -spec get_conn_pid(Handle) -> ok
-%%%      Handle = handle()
-%%%
-%%% Return the connection pid associated with Handle
 get_conn_pid(Handle) ->
-    call(Handle,get_conn_pid).
-
-%%%-----------------------------------------------------------------
-%%% -spec log(Heading,Format,Args) -> ok
-%%%
-%%% Log activities on the current connection (tool-internal use only).
-%%% See ct_logs:log/3
-log(Heading,Format,Args) ->
-    log(log,[Heading,Format,Args]).
-
-%%%-----------------------------------------------------------------
-%%% -spec start_log(Heading) -> ok
-%%%
-%%% Log activities on the current connection (tool-internal use only).
-%%% See ct_logs:start_log/1
+    call(Handle, get_conn_pid).
+
+%% ---------------------------------------------------------------------
+
+-spec log(Heading, Format, Args) -> ok
+ when Heading :: iodata(),
+      Format  :: io:format(),
+      Args    :: [term()].
+
+%% Log activities on the current connection.
+%% See ct_logs:log/3
+
+log(Heading, Format, Args) ->
+    log(log, [Heading, Format, Args]).
+
+%% ---------------------------------------------------------------------
+
+-spec start_log(Heading :: iodata()) -> ok.
+
+%% Log activities on the current connection.
+%% See ct_logs:start_log/1
+
 start_log(Heading) ->
-    log(start_log,[Heading]).
+    log(start_log, [Heading]).
+
+%% ---------------------------------------------------------------------
+
+-spec cont_log(Format, Args) -> ok
+ when Format :: io:format(),
+      Args :: [term()].
+
+%% Log activities on the current connection.
+%% See ct_logs:cont_log/2
 
-%%%-----------------------------------------------------------------
-%%% -spec cont_log(Format,Args) -> ok
-%%%
-%%% Log activities on the current connection (tool-internal use only).
-%%% See ct_logs:cont_log/2
 cont_log(Format,Args) ->
-    log(cont_log,[Format,Args]).
-
-%%%-----------------------------------------------------------------
-%%% -spec cont_log_no_timestamp(Format,Args) -> ok
-%%%
-%%% Log activities on the current connection (tool-internal use only).
-%%% See ct_logs:cont_log/2
-cont_log_no_timestamp(Format,Args) ->
-    log(cont_log_no_timestamp,[Format,Args]).
-
-%%%-----------------------------------------------------------------
-%%% -spec end_log() -> ok
-%%%
-%%% Log activities on the current connection (tool-internal use only).
-%%% See ct_logs:end_log/0
+    log(cont_log, [Format, Args]).
+
+%% ---------------------------------------------------------------------
+
+-spec cont_log_no_timestamp(Format, Args) -> ok
+ when Format :: io:format(),
+      Args :: [term()].
+
+%% Log activities on the current connection.
+%% See ct_logs:cont_log/2
+
+cont_log_no_timestamp(Format, Args) ->
+    log(cont_log_no_timestamp, [Format, Args]).
+
+%% ---------------------------------------------------------------------
+
+-spec end_log() -> ok.
+
+%% Log activities on the current connection.
+%% See ct_logs:end_log/0
+
 end_log() ->
-    log(end_log,[]).
-
-%%%-----------------------------------------------------------------
-%%% -spec do_within_time(Fun,Timeout) -> FunResult | {error,Reason}
-%%%      Fun = function()
-%%%      Timeout = integer()
-%%%
-%%% Execute a function within a limited time (tool-internal use only).
-%%%
-%%% Execute the given Fun, but interrupt if it takes
-%%% more than Timeout milliseconds.
-%%%
-%%% The execution is also interrupted if the connection is
-%%% closed.
+    log(end_log, []).
+
+%% ---------------------------------------------------------------------
+
+-spec do_within_time(Fun, Tmo)
+   -> Result
+ when Fun :: fun(),
+      Tmo :: non_neg_integer(),
+      Result :: term().
+
+%% Return the result of evaluating Fun, or interrupt after Tmo
+%% milliseconds or if the connection is closed.
+
 do_within_time(Fun,Timeout) ->
     Self = self(),
     Silent = get(silent),
@@ -211,214 +255,266 @@ do_within_time(Fun,Timeout) ->
 	    end
     end.
 
-%%%=================================================================
-%%% Internal functions
-do_start(Address,InitData,CallbackMod,Opts0) ->
-    Opts = check_opts(Opts0,#gen_opts{callback=CallbackMod,
-				      address=Address,
-				      init_data=InitData}),
-    case ct_util:does_connection_exist(Opts#gen_opts.name,
-				       Address,CallbackMod) of
-	{ok,Pid} when Opts#gen_opts.use_existing ->
-	    log("ct_gen_conn:start","Using existing connection!\n",[]),
-	    {ok,Pid};
-	{ok,Pid} when not Opts#gen_opts.use_existing ->
-	    {error,{connection_exists,Pid}};
-	false ->
-	    do_start(Opts)
+%% ===========================================================================
+
+do_start(Address, InitData, CallbackMod, OptsList) ->
+    #gen_opts{name = Name}
+        = Opts
+        = make_opts(OptsList, #gen_opts{callback  = CallbackMod,
+                                        address   = Address,
+                                        init_data = InitData}),
+    %% Testing for an existing connection is slightly broken as long
+    %% as calls to start aren't serialized: concurrent starts can both
+    %% end up in the final clause.
+    case ct_util:does_connection_exist(Name, Address, CallbackMod) of
+        {ok, _Pid} = Ok when Opts#gen_opts.use_existing ->
+            log("ct_gen_conn:start","Using existing connection!\n", []),
+            Ok;
+        {ok, Pid} when not Opts#gen_opts.use_existing ->
+            {error, {connection_exists, Pid}};
+        false ->
+            do_start(Opts)
     end.
 
 do_start(Opts) ->
-    Self = self(),
-    Pid = spawn(fun() -> init_gen(Self, Opts) end),
-    MRef = erlang:monitor(process,Pid),
-    receive
-	{connected,Pid} ->
-	    erlang:demonitor(MRef, [flush]),
-	    {ok,Pid};
-	{Error,Pid} ->
-	    receive {'DOWN',MRef,process,_,_} -> ok end,
-	    Error;
-	{'DOWN',MRef,process,_,Reason} ->
-	    log("ct_gen_conn:start",
-		"Connection process died: ~tp\n",
-		[Reason]),
-	    {error,{connection_process_died,Reason}}
+    try gen_server:start(?MODULE, Opts, []) of
+        {ok, _} = Ok    -> Ok;
+        {error, Reason} -> {error, rc(Reason)}
+    catch
+        exit: Reason ->
+            log("ct_gen_conn:start",
+                "Connection process died: ~tp\n",
+                [Reason]),
+            {error, {connection_process_died, Reason}}
     end.
 
-check_opts(Opts0) ->
-    check_opts(Opts0,#gen_opts{}).
-
-check_opts([{name,Name}|T],Opts) ->
-    check_opts(T,Opts#gen_opts{name=Name});
-check_opts([{reconnect,Bool}|T],Opts) ->
-    check_opts(T,Opts#gen_opts{reconnect=Bool});
-check_opts([{forward_messages,Bool}|T],Opts) ->
-    check_opts(T,Opts#gen_opts{forward=Bool});
-check_opts([{use_existing_connection,Bool}|T],Opts) ->
-    check_opts(T,Opts#gen_opts{use_existing=Bool});
-check_opts([{old,Bool}|T],Opts) ->
-    check_opts(T,Opts#gen_opts{old=Bool});
-check_opts([],Opts) ->
-    Opts.
+%% Unwrap a {shutdown, _} exit reason for backwards compatibility.
+rc({shutdown, Reason}) -> Reason;
+rc(T)                  -> T.
+
+make_opts(Opts) ->
+    make_opts(Opts, #gen_opts{}).
+
+make_opts(Opts, #gen_opts{} = Rec) ->
+    lists:foldl(fun opt/2, Rec, Opts).
+
+opt({name, Name}, Rec)                    -> Rec#gen_opts{name = Name};
+opt({reconnect, Bool}, Rec)               -> Rec#gen_opts{reconnect = Bool};
+opt({forward_messages, Bool}, Rec)        -> Rec#gen_opts{forward = Bool};
+opt({use_existing_connection, Bool}, Rec) -> Rec#gen_opts{use_existing = Bool};
+opt({old, Bool}, Rec)                     -> Rec#gen_opts{old = Bool}.
+
+%% ===========================================================================
 
 call(Pid, Msg) ->
     call(Pid, Msg, infinity).
 
-call(Pid, Msg, Timeout) ->
-    MRef = erlang:monitor(process,Pid),
-    Ref = make_ref(),
-    Pid ! {Msg,{self(),Ref}},
-    receive
-	{Ref, Result} ->
-	    erlang:demonitor(MRef, [flush]),
-	    case Result of
-		{retry,_Data} ->
-		    call(Pid,Result);
-		Other ->
-		    Other
-	    end;
-	{'DOWN',MRef,process,_,Reason}  ->
-	    {error,{process_down,Pid,Reason}}
-    after Timeout ->
-	    erlang:demonitor(MRef, [flush]),
-	    log("ct_gen_conn",
-		"Connection process ~w not responding. Killing now!",
-		[Pid]),
-	    exit(Pid, kill),
-	    {error,{process_down,Pid,forced_termination}}
+-spec call(Handle, Msg, Tmo)
+   -> term()
+ when Handle :: handle(),
+      Msg :: term(),
+      Tmo :: non_neg_integer()
+           | infinity.
+
+call(Pid, Msg, infinity = Tmo) ->
+    gen_call(Pid, Msg, Tmo);
+
+%% Spawn a middleman process if the call can timeout to avoid the
+%% possibilty of a reply being left in the caller's mailbox after a
+%% timeout.
+call(Pid, Msg, Tmo) ->
+    {_, MRef} = spawn_monitor(fun() -> exit(gen_call(Pid, Msg, Tmo)) end),
+    receive {'DOWN', MRef, process, _, RC} -> RC end.
+
+gen_call(Pid, Msg, Tmo) ->
+    try gen_server:call(Pid, Msg, Tmo) of
+        T -> retry(Pid, T, Tmo)
+    catch
+        exit: Reason -> {error, {process_down, Pid, rc(Pid, Reason)}}
     end.
 
-return({To,Ref},Result) ->
-    To ! {Ref, Result},
-    ok.
+retry(Pid, {retry, _} = T, Tmo) -> gen_call(Pid, T, Tmo);
+retry(_, T, _)                  -> T.
+
+%% Unwrap the MFA gen_server puts into exit reasons.
+rc(Pid, {Reason, {gen_server, call, _}}) ->
+    rc(Pid, Reason);
+
+rc(Pid, timeout) ->
+    log("ct_gen_conn",
+        "Connection process ~w not responding. Killing now!",
+        [Pid]),
+    exit(Pid, kill),
+    forced_termination;
+
+rc(_, Reason) ->
+    rc(Reason).
+
+return(From, Result) ->
+    gen_server:reply(From, Result).
+
+%% ===========================================================================
+%% gen_server callbacks
 
-init_gen(Parent,Opts) ->
-    process_flag(trap_exit,true),
+%% init/1
+
+init(#gen_opts{callback = Mod,
+               name = Name,
+               address = Addr,
+               init_data = InitData}
+     = Opts) ->
+    process_flag(trap_exit, true),
     ct_util:mark_process(),
-    put(silent,false),
-    try (Opts#gen_opts.callback):init(Opts#gen_opts.name,
-				      Opts#gen_opts.address,
-				      Opts#gen_opts.init_data) of
-	{ok,ConnPid,State} when is_pid(ConnPid) ->
-	    link(ConnPid),
-	    put(conn_pid,ConnPid),
-	    CtUtilServer = whereis(ct_util_server),
-	    link(CtUtilServer),
-	    ct_util:register_connection(Opts#gen_opts.name,
-					Opts#gen_opts.address,
-					Opts#gen_opts.callback,
-                                        self()),
-	    Parent ! {connected,self()},
-	    loop(Opts#gen_opts{conn_pid=ConnPid,
-			       cb_state=State,
-			       ct_util_server=CtUtilServer});
-	{error,Reason} ->
-	    Parent ! {{error,Reason},self()}
+    put(silent, false),
+    try Mod:init(Name, Addr, InitData) of
+        {ok, ConnPid, State} when is_pid(ConnPid) ->
+            link(ConnPid),
+            put(conn_pid, ConnPid),
+            SrvPid = whereis(ct_util_server),
+            link(SrvPid),
+            ct_util:register_connection(Name, Addr, Mod, self()),
+            {ok, Opts#gen_opts{conn_pid = ConnPid,
+                               cb_state = State,
+                               ct_util_server = SrvPid}};
+        {error, Reason} ->
+            {stop, {shutdown, Reason}}
     catch
-	throw:{error,Reason} ->
-	    Parent ! {{error,Reason},self()}
+        C: Reason when C /= error ->
+            {stop, {shutdown, Reason}}
     end.
 
-loop(Opts) ->
-    receive
-	{'EXIT',Pid,Reason} when Pid==Opts#gen_opts.conn_pid ->
-	    case Opts#gen_opts.reconnect of
-		true ->
-		    log("Connection down!\nOpening new!",
-			"Reason: ~tp\nAddress: ~tp\n",
-			[Reason,Opts#gen_opts.address]),
-		    case reconnect(Opts) of
-			{ok, NewPid, NewState} ->
-			    link(NewPid),
-			    put(conn_pid,NewPid),
-			    loop(Opts#gen_opts{conn_pid=NewPid,
-					       cb_state=NewState});			
-			Error ->
-			    ct_util:unregister_connection(self()),
-			    log("Reconnect failed. Giving up!",
-				"Reason: ~tp\n",
-				[Error])
-		    end;
-		false ->
-		    ct_util:unregister_connection(self()),
-		    log("Connection closed!","Reason: ~tp\n",[Reason])
-	    end;
-	{'EXIT',Pid,Reason} ->
-	    case Opts#gen_opts.ct_util_server of
-		Pid ->
-		    exit(Reason);
-		_ ->
-		    loop(Opts)
-	    end;
-	{stop, From} ->
-	    ct_util:unregister_connection(self()),
-            ConnPid = Opts#gen_opts.conn_pid,
-            unlink(ConnPid),
-	    (Opts#gen_opts.callback):terminate(ConnPid,Opts#gen_opts.cb_state),
-	    return(From,ok),
-	    ok;
-	{{retry,{Error,_Name,CPid,_Msg}}, From} when 
-	      CPid == Opts#gen_opts.conn_pid ->
-	    %% only retry if failure is because of a reconnection
-	    Return = case Error of
-			 {error,_} -> Error;
-			 Reason -> {error,Reason}
-		     end,
-	    return(From, Return),
-	    loop(Opts);
-	{{retry,{_Error,_Name,_CPid,Msg}}, From} ->
-	    log("Rerunning command","Connection reestablished. "
-		"Rerunning command...",[]),
-	    {Return,NewState} =
-		(Opts#gen_opts.callback):handle_msg(Msg,Opts#gen_opts.cb_state),
-	    return(From, Return),
-	    loop(Opts#gen_opts{cb_state=NewState});
-	{get_conn_pid, From} ->
-	    return(From, Opts#gen_opts.conn_pid),
-	    loop(Opts);
-	{Msg, From={Pid,_Ref}} when is_pid(Pid), Opts#gen_opts.old==true ->
-	    {Return,NewState} =
-		(Opts#gen_opts.callback):handle_msg(Msg,Opts#gen_opts.cb_state),
-	    return(From, Return),
-	    loop(Opts#gen_opts{cb_state=NewState});
-	{Msg,From={Pid,_Ref}} when is_pid(Pid) ->
-	    case (Opts#gen_opts.callback):handle_msg(Msg,From,
-						     Opts#gen_opts.cb_state) of
-		{reply,Reply,NewState} ->
-		    return(From,Reply),
-		    loop(Opts#gen_opts{cb_state=NewState});
-		{noreply,NewState} ->
-		    loop(Opts#gen_opts{cb_state=NewState});
-		{stop,Reply,NewState} ->
-		    ct_util:unregister_connection(self()),
-                    ConnPid = Opts#gen_opts.conn_pid,
-                    unlink(ConnPid),
-		    (Opts#gen_opts.callback):terminate(ConnPid,NewState),
-		    return(From,Reply)
-	    end;
-	Msg when Opts#gen_opts.forward==true ->
-	    case (Opts#gen_opts.callback):handle_msg(Msg,
-						     Opts#gen_opts.cb_state) of
-		{noreply,NewState} ->
-		    loop(Opts#gen_opts{cb_state=NewState});
-		{stop,NewState} ->
-		    ct_util:unregister_connection(self()),
-                    ConnPid = Opts#gen_opts.conn_pid,
-                    unlink(ConnPid),
-		    (Opts#gen_opts.callback):terminate(ConnPid,NewState)
-	    end
+%% handle_call/2
+
+handle_call(get_conn_pid, _From, #gen_opts{conn_pid = Pid} = Opts) ->
+    {reply, Pid, Opts};
+
+handle_call(stop, _From, Opts) ->
+    {stop, normal, ok, Opts};
+
+%% Only retry if failure is because of a reconnection.
+handle_call({retry, {Error, _Name, ConnPid, _Msg}},
+            _From,
+            #gen_opts{conn_pid = ConnPid}
+            = Opts) ->
+    {reply, error_rc(Error), Opts};
+
+handle_call({retry, {_Error, _Name, _CPid, Msg}},
+            _From,
+            #gen_opts{callback = Mod,
+                      cb_state = State}
+            = Opts) ->
+    log("Rerunning command","Connection reestablished. "
+        "Rerunning command...",
+        []),
+    {Reply, NewState} = Mod:handle_msg(Msg, State),
+    {reply, Reply, Opts#gen_opts{cb_state = NewState}};
+
+handle_call(Msg, _From, #gen_opts{old = true,
+                                  callback = Mod,
+                                  cb_state = State}
+                        = Opts) ->
+    {Reply, NewState} = Mod:handle_msg(Msg, State),
+    {reply, Reply, Opts#gen_opts{cb_state = NewState}};
+
+handle_call(Msg, From, #gen_opts{callback = Mod,
+                                  cb_state = State}
+                        = Opts) ->
+    case Mod:handle_msg(Msg, From, State) of
+        {reply, Reply, NewState} ->
+            {reply, Reply, Opts#gen_opts{cb_state = NewState}};
+        {noreply, NewState} ->
+            {noreply, Opts#gen_opts{cb_state = NewState}};
+        {stop, Reply, NewState} ->
+            {stop, normal, Reply, Opts#gen_opts{cb_state = NewState}}
     end.
 
-reconnect(Opts) ->
-    (Opts#gen_opts.callback):reconnect(Opts#gen_opts.address,
-				       Opts#gen_opts.cb_state).
+%% handle_cast/2
+
+handle_cast(_, Opts) ->
+    {noreply, Opts}.
+
+%% handle_info/2
+
+handle_info({'EXIT', Pid, Reason},
+            #gen_opts{reconnect = true,
+                      conn_pid = Pid,
+                      address = Addr}
+            = Opts) ->
+    log("Connection down!\nOpening new!",
+        "Reason: ~tp\nAddress: ~tp\n",
+        [Reason, Addr]),
+    case reconnect(Opts) of
+        {ok, NewPid, NewState} ->
+            link(NewPid),
+            put(conn_pid, NewPid),
+            {noreply, Opts#gen_opts{conn_pid = NewPid,
+                                    cb_state = NewState}};
+        Error ->
+            log("Reconnect failed. Giving up!",
+                "Reason: ~tp\n",
+                [Error]),
+            {stop, normal, Opts}
+    end;
+
+handle_info({'EXIT', Pid, Reason},
+            #gen_opts{reconnect = false,
+                      conn_pid = Pid}
+            = Opts) ->
+    log("Connection closed!", "Reason: ~tp\n", [Reason]),
+    {stop, normal, Opts};
+
+handle_info({'EXIT', Pid, Reason},
+            #gen_opts{ct_util_server = Pid}
+            = Opts) ->
+    {stop, {shutdown, Reason}, Opts};
+
+handle_info(Msg, #gen_opts{forward = true,
+                           callback = Mod,
+                           cb_state = State}
+                 = Opts) ->
+    case Mod:handle_msg(Msg, State) of
+        {noreply, NewState} ->
+            {noreply, Opts#gen_opts{cb_state = NewState}};
+        {stop, NewState} ->
+            {stop, normal, Opts#gen_opts{cb_state = NewState}}
+    end;
+
+handle_info(_, #gen_opts{} = Opts) ->
+    {noreply, Opts}.
+
+%% code_change/2
+
+code_change(_Vsn, State, _Extra) ->
+    {ok, State}.
+
+%% terminate/2
+
+%% Cleanup is only in this case since ct_util also cleans up and
+%% expects us not to, which is at least an odd expectation.
+terminate(normal, #gen_opts{callback = Mod,
+                            conn_pid = Pid,
+                            cb_state = State}) ->
+    ct_util:unregister_connection(self()),
+    unlink(Pid),
+    Mod:terminate(Pid, State);
+
+terminate(_, #gen_opts{}) ->
+    ok.
+
+%% ===========================================================================
+
+error_rc({error, _} = T) -> T;
+error_rc(Reason)         -> {error, Reason}.
 
+reconnect(#gen_opts{callback = Mod,
+                    address = Addr,
+                    cb_state = State}) ->
+    Mod:reconnect(Addr, State).
 
-log(Func,Args) ->
+log(Func, Args) ->
     case get(silent) of
-	true when not ?dbg->
-	    ok;
-	_ ->
-	    apply(ct_logs,Func,Args)
+        true when not ?dbg ->
+            ok;
+        _ ->
+            apply(ct_logs, Func, Args)
     end.
-- 
2.26.2

openSUSE Build Service is sponsored by