File 0035-turtle_subscriber-can-use-exchange-from-state-for-RP.patch of Package turtle

From 97a68d44e5197d9915c1febe14d86b9e5c8aee94 Mon Sep 17 00:00:00 2001
From: Led <ledest@gmail.com>
Date: Fri, 8 Nov 2019 14:58:01 +0200
Subject: [PATCH] turtle_subscriber: can use exchange from state for RPC reply

---
 src/turtle_subscriber.erl | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/src/turtle_subscriber.erl b/src/turtle_subscriber.erl
index 92b458e..194a7de 100644
--- a/src/turtle_subscriber.erl
+++ b/src/turtle_subscriber.erl
@@ -157,7 +157,7 @@ handle_deliver_simple({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}
     Tag = {DTag, ReplyTo, CorrID},
     try handle_message(Tag, Key, Content, State) of
         {reply, CType, Msg, IState} ->
-            reply(Channel, Tag, CType, Msg),
+            reply(Channel, reply_exchange(IState), Tag, CType, Msg),
             {noreply, State#state { invoke_state = IState }};
         {stop, Reason, IState} ->
             {stop, Reason, State#state { invoke_state = IState }};
@@ -248,7 +248,7 @@ handle_commands(S, [C | Next], #state { channel = Channel, conn_name = CN, name
             handle_commands(S, Next, State);
         {reply, Tag, CType, Msg} ->
             exometer_update_latency(CN, N, S),
-            reply(Channel, Tag, CType, Msg),
+            reply(Channel, reply_exchange(State#state.invoke_state), Tag, CType, Msg),
             ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }),
             handle_commands(S, Next, State);
         {{stop, Reason}, Tag} ->
@@ -293,18 +293,21 @@ invoke_state(_) -> init.
 handle_info(#{ handle_info := Handler }) -> Handler;
 handle_info(_) -> undefined.
 
-reply(_Ch, {_Tag, undefined, _CorrID}, _CType, _Msg) ->
+reply(_Ch, _Exchange, {_Tag, undefined, _CorrID}, _CType, _Msg) ->
     lager:warning("Replying to target with no reply-to queue defined"),
     ok;
-reply(Ch, {_Tag, ReplyTo, CorrID}, CType, Msg) ->
+reply(Ch, Exchange, {_Tag, ReplyTo, CorrID}, CType, Msg) ->
     Publish = #'basic.publish' {
-        exchange = <<>>,
+        exchange = Exchange,
         routing_key = ReplyTo
     },
     Props = #'P_basic' { content_type = CType, correlation_id = CorrID },
     AMQPMsg = #amqp_msg { props = Props, payload = Msg},
     amqp_channel:cast(Ch, Publish, AMQPMsg).
 
+reply_exchange(#{reply_exchange := E}) when is_binary(E) -> E;
+reply_exchange(_IState) -> <<>>.
+
 await_cancel_ok() ->
     receive
        #'basic.cancel_ok'{} ->
-- 
2.16.4

openSUSE Build Service is sponsored by