Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:20
turtle
0035-turtle_subscriber-can-use-exchange-from-st...
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 0035-turtle_subscriber-can-use-exchange-from-state-for-RP.patch of Package turtle
From 97a68d44e5197d9915c1febe14d86b9e5c8aee94 Mon Sep 17 00:00:00 2001 From: Led <ledest@gmail.com> Date: Fri, 8 Nov 2019 14:58:01 +0200 Subject: [PATCH] turtle_subscriber: can use exchange from state for RPC reply --- src/turtle_subscriber.erl | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/turtle_subscriber.erl b/src/turtle_subscriber.erl index 92b458e..194a7de 100644 --- a/src/turtle_subscriber.erl +++ b/src/turtle_subscriber.erl @@ -157,7 +157,7 @@ handle_deliver_simple({#'basic.deliver' {delivery_tag = DTag, routing_key = Key} Tag = {DTag, ReplyTo, CorrID}, try handle_message(Tag, Key, Content, State) of {reply, CType, Msg, IState} -> - reply(Channel, Tag, CType, Msg), + reply(Channel, reply_exchange(IState), Tag, CType, Msg), {noreply, State#state { invoke_state = IState }}; {stop, Reason, IState} -> {stop, Reason, State#state { invoke_state = IState }}; @@ -248,7 +248,7 @@ handle_commands(S, [C | Next], #state { channel = Channel, conn_name = CN, name handle_commands(S, Next, State); {reply, Tag, CType, Msg} -> exometer_update_latency(CN, N, S), - reply(Channel, Tag, CType, Msg), + reply(Channel, reply_exchange(State#state.invoke_state), Tag, CType, Msg), ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }), handle_commands(S, Next, State); {{stop, Reason}, Tag} -> @@ -293,18 +293,21 @@ invoke_state(_) -> init. handle_info(#{ handle_info := Handler }) -> Handler; handle_info(_) -> undefined. -reply(_Ch, {_Tag, undefined, _CorrID}, _CType, _Msg) -> +reply(_Ch, _Exchange, {_Tag, undefined, _CorrID}, _CType, _Msg) -> lager:warning("Replying to target with no reply-to queue defined"), ok; -reply(Ch, {_Tag, ReplyTo, CorrID}, CType, Msg) -> +reply(Ch, Exchange, {_Tag, ReplyTo, CorrID}, CType, Msg) -> Publish = #'basic.publish' { - exchange = <<>>, + exchange = Exchange, routing_key = ReplyTo }, Props = #'P_basic' { content_type = CType, correlation_id = CorrID }, AMQPMsg = #amqp_msg { props = Props, payload = Msg}, amqp_channel:cast(Ch, Publish, AMQPMsg). +reply_exchange(#{reply_exchange := E}) when is_binary(E) -> E; +reply_exchange(_IState) -> <<>>. + await_cancel_ok() -> receive #'basic.cancel_ok'{} -> -- 2.16.4
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor