Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:20
turtle
0013-turtle_subscriber-add-no_ack-support.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 0013-turtle_subscriber-add-no_ack-support.patch of Package turtle
From aff0ce374922ed61655a898b657075723d372f37 Mon Sep 17 00:00:00 2001 From: Andrey Klymchuk <a.klymchuk@betinvest.com> Date: Wed, 5 Sep 2018 17:36:39 +0300 Subject: [PATCH 3/3] turtle_subscriber: add no_ack support Signed-off-by: Led <ledest@gmail.com> --- src/turtle_subscriber.erl | 66 ++++++++++++++++++++++++++--------------------- 1 file changed, 37 insertions(+), 29 deletions(-) diff --git a/src/turtle_subscriber.erl b/src/turtle_subscriber.erl index 30c71a1..064c402 100644 --- a/src/turtle_subscriber.erl +++ b/src/turtle_subscriber.erl @@ -31,6 +31,7 @@ channel :: undefined | pid(), monitor :: reference(), consumer_tag, + no_ack = #'basic.consume'{}#'basic.consume'.no_ack :: boolean(), mode = single }). @@ -55,7 +56,8 @@ init([#{ ok = turtle:qos(Ch, Conf), ok = amqp_channel:register_return_handler(Ch, self()), ok = turtle:declare(Ch, Decls, #{ passive => Passive }), - {ok, Tag} = turtle:consume(Ch, Queue), + NoAck = maps:get(no_ack, Conf, #state{}#state.no_ack), + {ok, Tag} = turtle:consume(Ch, Queue, NoAck), Mode = mode(Conf), {ok, #state { consumer_tag = Tag, @@ -66,6 +68,7 @@ init([#{ monitor = MRef, conn_name = ConnName, name = Name, + no_ack = NoAck, mode = Mode }}. @@ -151,17 +154,13 @@ handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}, reply_to = ReplyTo }} = Content}, #state { invoke = Fun, invoke_state = IState, - channel = Channel, conn_name = CN, name = N } = State) -> S = erlang:monotonic_time(), Tag = {DTag, ReplyTo, CorrID}, try handle_message(Tag, Fun, Key, Content, IState) of - {[], S2} -> - E = erlang:monotonic_time(), - exometer:update([CN, N, msgs], 1), - exometer:update([CN, N, latency], - erlang:convert_time_unit(E-S, native, milli_seconds)), + {Cmds, S2} when Cmds =:= []; Cmds =:= ok -> + exometer_update_latency(CN, N, S), {noreply, State#state { invoke_state = S2 }}; {Cmds, S2} when is_list(Cmds) -> handle_commands(S, Cmds, State#state { invoke_state = S2 }) @@ -170,8 +169,7 @@ handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}, lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p", [Class, Error, erlang:get_stacktrace(), format_amqp_msg(Content)]), lager:error("Mailbox size ~p", [erlang:process_info(self(), message_queue_len)]), - ok = amqp_channel:call(Channel, #'basic.reject' { delivery_tag = Tag, requeue = false }), - {stop, {Class, Error}, State} + handle_exception(Class, Error, DTag, State) end. handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}, @@ -179,7 +177,7 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key} props = #'P_basic' { correlation_id = CorrID, reply_to = ReplyTo }} = Content}, - #state { invoke = Fun, invoke_state = IState,channel = Channel } = State) -> + #state { invoke = Fun, invoke_state = IState } = State) -> S = erlang:monotonic_time(), Tag = {DTag, ReplyTo, CorrID}, try @@ -192,7 +190,10 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key} {reject, IState2} -> {[{reject, Tag}], IState2}; {remove, IState2} -> {[{remove, Tag}], IState2}; {stop, Reason, IState2} -> {[{{stop, Reason}, Tag}], IState2}; - {ok, IState2} -> {[], IState2} + {ok, IState2} -> + #state { conn_name = CN, name = N } = State, + exometer_update_latency(CN, N, S), + {[], IState2} end, handle_commands(S, Cmds, State#state { invoke_state = S2 }) catch @@ -200,34 +201,40 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key} lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p", [Class, Error, erlang:get_stacktrace(), format_amqp_msg(Content)]), lager:error("Mailbox size ~p", [erlang:process_info(self(), message_queue_len)]), - ok = amqp_channel:call(Channel, #'basic.reject' { delivery_tag = DTag, requeue = false }), - {stop, {Class, Error}, State} + handle_exception(Class, Error, DTag, State) end. +handle_exception(Class, Error, _DTag, #state{ no_ack = true } = State) -> + {stop, {Class, Error}, State}; +handle_exception(Class, Error, DTag, #state{ channel = Ch } = State) -> + amqp_channel:call(Ch, #'basic.reject' { delivery_tag = DTag, requeue = false }), + {stop, {Class, Error}, State}. + handle_commands(_S, [], State) -> {noreply, State}; +handle_commands(S, [{reply, Tag, CType, Msg} | Next], + #state { channel = Channel, conn_name = CN, name = N, no_ack = true } = State) -> + exometer_update_latency(CN, N, S), + reply(Channel, Tag, CType, Msg), + handle_commands(S, Next, State); +handle_commands(_S, [{stop, Reason, _Tag} | _Next], #state { no_ack = true } = State) -> + {stop, Reason, State}; +handle_commands(S, [C | Next], #state { no_ack = true } = State) -> + lager:warning("Unexpected cmd ~p in no_ack mode is ignored", [C]), + handle_commands(S, Next, State); handle_commands(S, [C | Next], #state { channel = Channel, conn_name = CN, name = N } = State) -> case C of {ack, Tag} -> - E = erlang:monotonic_time(), - exometer:update([CN, N, msgs], 1), - exometer:update([CN, N, latency], - erlang:convert_time_unit(E-S, native, milli_seconds)), + exometer_update_latency(CN, N, S), ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }), handle_commands(S, Next, State); {bulk_ack, Tag} -> - E = erlang:monotonic_time(), - exometer:update([CN, N, msgs], 1), - exometer:update([CN, N, latency], - erlang:convert_time_unit(E-S, native, milli_seconds)), + exometer_update_latency(CN, N, S), ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag), multiple = true }), handle_commands(S, Next, State); {bulk_nack, Tag} -> - E = erlang:monotonic_time(), - exometer:update([CN, N, msgs], 1), - exometer:update([CN, N, latency], - erlang:convert_time_unit(E-S, native, milli_seconds)), + exometer_update_latency(CN, N, S), ok = amqp_channel:cast(Channel, #'basic.nack' { delivery_tag = delivery_tag(Tag), multiple = true }), handle_commands(S, Next, State); {reject, Tag} -> @@ -241,10 +248,7 @@ handle_commands(S, [C | Next], #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = false}), handle_commands(S, Next, State); {reply, Tag, CType, Msg} -> - E = erlang:monotonic_time(), - exometer:update([CN, N, msgs], 1), - exometer:update([CN, N, latency], - erlang:convert_time_unit(E-S, native, milli_seconds)), + exometer_update_latency(CN, N, S), reply(Channel, Tag, CType, Msg), ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }), handle_commands(S, Next, State); @@ -362,3 +366,7 @@ shutdown_process_commands(_S, Cmds, State, Reason) -> [length(Cmds), Reason]), State. +exometer_update_latency(CN, N, S) -> + L = erlang:convert_time_unit(erlang:monotonic_time() - S, native, milli_seconds), + exometer:update([CN, N, msgs], 1), + exometer:update([CN, N, latency], L). -- 2.16.4
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor