File 0013-turtle_subscriber-add-no_ack-support.patch of Package turtle
From aff0ce374922ed61655a898b657075723d372f37 Mon Sep 17 00:00:00 2001
From: Andrey Klymchuk <a.klymchuk@betinvest.com>
Date: Wed, 5 Sep 2018 17:36:39 +0300
Subject: [PATCH 3/3] turtle_subscriber: add no_ack support
Signed-off-by: Led <ledest@gmail.com>
---
src/turtle_subscriber.erl | 66 ++++++++++++++++++++++++++---------------------
1 file changed, 37 insertions(+), 29 deletions(-)
diff --git a/src/turtle_subscriber.erl b/src/turtle_subscriber.erl
index 30c71a1..064c402 100644
--- a/src/turtle_subscriber.erl
+++ b/src/turtle_subscriber.erl
@@ -31,6 +31,7 @@
channel :: undefined | pid(),
monitor :: reference(),
consumer_tag,
+ no_ack = #'basic.consume'{}#'basic.consume'.no_ack :: boolean(),
mode = single
}).
@@ -55,7 +56,8 @@ init([#{
ok = turtle:qos(Ch, Conf),
ok = amqp_channel:register_return_handler(Ch, self()),
ok = turtle:declare(Ch, Decls, #{ passive => Passive }),
- {ok, Tag} = turtle:consume(Ch, Queue),
+ NoAck = maps:get(no_ack, Conf, #state{}#state.no_ack),
+ {ok, Tag} = turtle:consume(Ch, Queue, NoAck),
Mode = mode(Conf),
{ok, #state {
consumer_tag = Tag,
@@ -66,6 +68,7 @@ init([#{
monitor = MRef,
conn_name = ConnName,
name = Name,
+ no_ack = NoAck,
mode = Mode
}}.
@@ -151,17 +154,13 @@ handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
reply_to = ReplyTo }} = Content},
#state {
invoke = Fun, invoke_state = IState,
- channel = Channel,
conn_name = CN,
name = N } = State) ->
S = erlang:monotonic_time(),
Tag = {DTag, ReplyTo, CorrID},
try handle_message(Tag, Fun, Key, Content, IState) of
- {[], S2} ->
- E = erlang:monotonic_time(),
- exometer:update([CN, N, msgs], 1),
- exometer:update([CN, N, latency],
- erlang:convert_time_unit(E-S, native, milli_seconds)),
+ {Cmds, S2} when Cmds =:= []; Cmds =:= ok ->
+ exometer_update_latency(CN, N, S),
{noreply, State#state { invoke_state = S2 }};
{Cmds, S2} when is_list(Cmds) ->
handle_commands(S, Cmds, State#state { invoke_state = S2 })
@@ -170,8 +169,7 @@ handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p",
[Class, Error, erlang:get_stacktrace(), format_amqp_msg(Content)]),
lager:error("Mailbox size ~p", [erlang:process_info(self(), message_queue_len)]),
- ok = amqp_channel:call(Channel, #'basic.reject' { delivery_tag = Tag, requeue = false }),
- {stop, {Class, Error}, State}
+ handle_exception(Class, Error, DTag, State)
end.
handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
@@ -179,7 +177,7 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}
props = #'P_basic' {
correlation_id = CorrID,
reply_to = ReplyTo }} = Content},
- #state { invoke = Fun, invoke_state = IState,channel = Channel } = State) ->
+ #state { invoke = Fun, invoke_state = IState } = State) ->
S = erlang:monotonic_time(),
Tag = {DTag, ReplyTo, CorrID},
try
@@ -192,7 +190,10 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}
{reject, IState2} -> {[{reject, Tag}], IState2};
{remove, IState2} -> {[{remove, Tag}], IState2};
{stop, Reason, IState2} -> {[{{stop, Reason}, Tag}], IState2};
- {ok, IState2} -> {[], IState2}
+ {ok, IState2} ->
+ #state { conn_name = CN, name = N } = State,
+ exometer_update_latency(CN, N, S),
+ {[], IState2}
end,
handle_commands(S, Cmds, State#state { invoke_state = S2 })
catch
@@ -200,34 +201,40 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}
lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p",
[Class, Error, erlang:get_stacktrace(), format_amqp_msg(Content)]),
lager:error("Mailbox size ~p", [erlang:process_info(self(), message_queue_len)]),
- ok = amqp_channel:call(Channel, #'basic.reject' { delivery_tag = DTag, requeue = false }),
- {stop, {Class, Error}, State}
+ handle_exception(Class, Error, DTag, State)
end.
+handle_exception(Class, Error, _DTag, #state{ no_ack = true } = State) ->
+ {stop, {Class, Error}, State};
+handle_exception(Class, Error, DTag, #state{ channel = Ch } = State) ->
+ amqp_channel:call(Ch, #'basic.reject' { delivery_tag = DTag, requeue = false }),
+ {stop, {Class, Error}, State}.
+
handle_commands(_S, [], State) ->
{noreply, State};
+handle_commands(S, [{reply, Tag, CType, Msg} | Next],
+ #state { channel = Channel, conn_name = CN, name = N, no_ack = true } = State) ->
+ exometer_update_latency(CN, N, S),
+ reply(Channel, Tag, CType, Msg),
+ handle_commands(S, Next, State);
+handle_commands(_S, [{stop, Reason, _Tag} | _Next], #state { no_ack = true } = State) ->
+ {stop, Reason, State};
+handle_commands(S, [C | Next], #state { no_ack = true } = State) ->
+ lager:warning("Unexpected cmd ~p in no_ack mode is ignored", [C]),
+ handle_commands(S, Next, State);
handle_commands(S, [C | Next],
#state { channel = Channel, conn_name = CN, name = N } = State) ->
case C of
{ack, Tag} ->
- E = erlang:monotonic_time(),
- exometer:update([CN, N, msgs], 1),
- exometer:update([CN, N, latency],
- erlang:convert_time_unit(E-S, native, milli_seconds)),
+ exometer_update_latency(CN, N, S),
ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }),
handle_commands(S, Next, State);
{bulk_ack, Tag} ->
- E = erlang:monotonic_time(),
- exometer:update([CN, N, msgs], 1),
- exometer:update([CN, N, latency],
- erlang:convert_time_unit(E-S, native, milli_seconds)),
+ exometer_update_latency(CN, N, S),
ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag), multiple = true }),
handle_commands(S, Next, State);
{bulk_nack, Tag} ->
- E = erlang:monotonic_time(),
- exometer:update([CN, N, msgs], 1),
- exometer:update([CN, N, latency],
- erlang:convert_time_unit(E-S, native, milli_seconds)),
+ exometer_update_latency(CN, N, S),
ok = amqp_channel:cast(Channel, #'basic.nack' { delivery_tag = delivery_tag(Tag), multiple = true }),
handle_commands(S, Next, State);
{reject, Tag} ->
@@ -241,10 +248,7 @@ handle_commands(S, [C | Next],
#'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = false}),
handle_commands(S, Next, State);
{reply, Tag, CType, Msg} ->
- E = erlang:monotonic_time(),
- exometer:update([CN, N, msgs], 1),
- exometer:update([CN, N, latency],
- erlang:convert_time_unit(E-S, native, milli_seconds)),
+ exometer_update_latency(CN, N, S),
reply(Channel, Tag, CType, Msg),
ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }),
handle_commands(S, Next, State);
@@ -362,3 +366,7 @@ shutdown_process_commands(_S, Cmds, State, Reason) ->
[length(Cmds), Reason]),
State.
+exometer_update_latency(CN, N, S) ->
+ L = erlang:convert_time_unit(erlang:monotonic_time() - S, native, milli_seconds),
+ exometer:update([CN, N, msgs], 1),
+ exometer:update([CN, N, latency], L).
--
2.16.4