File 0021-no_ack-reimplemented-as-new-simple-mode.patch of Package turtle
From 9ccd54e9a1fdaf4d7ae0ec7e7db918e7ed6f25de Mon Sep 17 00:00:00 2001
From: Andrey Klymchuk <a.klymchuk@betinvest.com>
Date: Mon, 26 Nov 2018 15:09:50 +0200
Subject: [PATCH 1/6] no_ack reimplemented as new simple mode
---
src/turtle.erl | 10 +++---
src/turtle_subscriber.erl | 84 +++++++++++++++++++++++++----------------------
2 files changed, 51 insertions(+), 43 deletions(-)
diff --git a/src/turtle.erl b/src/turtle.erl
index 79b5de5..fa0a38c 100644
--- a/src/turtle.erl
+++ b/src/turtle.erl
@@ -290,14 +290,16 @@ await(publisher, Name, Timeout) ->
consume(Channel, #'basic.consume' {} = Consume) ->
#'basic.consume_ok' { consumer_tag = Tag } = amqp_channel:call(Channel, Consume),
{ok, Tag};
-consume(Channel, Queue) ->
- consume(Channel, #'basic.consume' { queue = Queue }).
+consume(Channel, Queue) -> consume(Channel, Queue, single).
%% @doc consume/3 starts consumption on a channel with default parameters
%% @end
%% @private
-consume(Channel, Queue, NoAck) when is_boolean(NoAck) ->
- consume(Channel, #'basic.consume' { queue = Queue, no_ack = NoAck }).
+consume(Channel, Queue, simple) ->
+ consume(Channel, #'basic.consume' { queue = Queue, no_ack = true });
+
+consume(Channel, Queue, Mode) when Mode =:= single; Mode =:= bulk ->
+ consume(Channel, #'basic.consume' { queue = Queue, no_ack = false }).
%% @doc cancel/2 stop consumption on a channel again.
diff --git a/src/turtle_subscriber.erl b/src/turtle_subscriber.erl
index 5d2f759..c6f8aeb 100644
--- a/src/turtle_subscriber.erl
+++ b/src/turtle_subscriber.erl
@@ -32,7 +32,7 @@
monitor :: reference(),
consumer_tag,
no_ack = #'basic.consume'{}#'basic.consume'.no_ack :: boolean(),
- mode = single
+ mode = single :: simple | single | bulk
}).
%% LIFETIME MAINTENANCE
@@ -56,9 +56,8 @@ init([#{
ok = turtle:qos(Ch, Conf),
ok = amqp_channel:register_return_handler(Ch, self()),
ok = turtle:declare(Ch, Decls, #{ passive => Passive }),
- NoAck = maps:get(no_ack, Conf, #state{}#state.no_ack),
- {ok, Tag} = turtle:consume(Ch, Queue, NoAck),
Mode = mode(Conf),
+ {ok, Tag} = turtle:consume(Ch, Queue, Mode),
{ok, #state {
consumer_tag = Tag,
invoke = Fun,
@@ -68,7 +67,6 @@ init([#{
monitor = MRef,
conn_name = ConnName,
name = Name,
- no_ack = NoAck,
mode = Mode
}}.
@@ -88,6 +86,8 @@ handle_info(#'basic.consume_ok'{}, State) ->
handle_info(#'basic.cancel_ok'{}, State) ->
lager:info("Consumption canceled"),
{stop, normal, shutdown(rabbitmq_gone, State)};
+handle_info({#'basic.deliver'{}, _Content} = Msg, #state { mode = simple } = State) ->
+ handle_deliver_simple(Msg, State);
handle_info({#'basic.deliver'{}, _Content} = Msg, #state { mode = single } = State) ->
handle_deliver_single(Msg, State);
handle_info({#'basic.deliver'{}, _Content} = Msg, #state { mode = bulk } = State) ->
@@ -112,6 +112,7 @@ handle_info(Info, #state { handle_info = HandleInfo, invoke_state = IState } = S
S = erlang:monotonic_time(),
try HandleInfo(Info, IState) of
{ok, IState2} -> {noreply, State#state { invoke_state = IState2 }};
+ {stop, Reason, IState2} -> {stop, Reason, State#state { invoke_state = IState2 }};
{Cmds, IState2} when is_list(Cmds) ->
handle_commands(S, Cmds, State#state { invoke_state = IState2 })
catch
@@ -147,20 +148,41 @@ code_change(_, State, _) ->
%% INTERNAL FUNCTIONS
%%
+handle_deliver_simple({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
+ #amqp_msg {
+ props = #'P_basic' {
+ correlation_id = CorrID,
+ reply_to = ReplyTo }} = Content},
+ #state {
+ channel = Channel, invoke = Fun, invoke_state = IState, mode = Mode} = State) ->
+ Tag = {DTag, ReplyTo, CorrID},
+ try handle_message(Tag, Fun, Mode, Key, Content, IState) of
+ {reply, CType, Msg, S2} ->
+ reply(Channel, Tag, CType, Msg),
+ {noreply, State#state { invoke_state = S2 }};
+ {stop, Reason, S2} ->
+ {stop, Reason, State#state { invoke_state = S2 }};
+ {_, S2} ->
+ {noreply, State#state { invoke_state = S2 }}
+ catch
+ Class:Error ->
+ lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p",
+ [Class, Error, erlang:get_stacktrace(), format_amqp_msg(Content)]),
+ lager:error("Mailbox size ~p", [erlang:process_info(self(), message_queue_len)]),
+ handle_exception(Class, Error, DTag, State)
+ end.
+
handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
#amqp_msg {
props = #'P_basic' {
correlation_id = CorrID,
reply_to = ReplyTo }} = Content},
#state {
- invoke = Fun, invoke_state = IState,
- conn_name = CN,
- name = N } = State) ->
+ invoke = Fun, invoke_state = IState, mode = Mode} = State) ->
S = erlang:monotonic_time(),
Tag = {DTag, ReplyTo, CorrID},
- try handle_message(Tag, Fun, Key, Content, IState) of
+ try handle_message(Tag, Fun, Mode, Key, Content, IState) of
{Cmds, S2} when Cmds =:= []; Cmds =:= ok ->
- exometer_update_latency(CN, N, S),
{noreply, State#state { invoke_state = S2 }};
{Cmds, S2} when is_list(Cmds) ->
handle_commands(S, Cmds, State#state { invoke_state = S2 })
@@ -177,23 +199,18 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}
props = #'P_basic' {
correlation_id = CorrID,
reply_to = ReplyTo }} = Content},
- #state { invoke = Fun, invoke_state = IState } = State) ->
+ #state { invoke = Fun, invoke_state = IState, mode = Mode } = State) ->
S = erlang:monotonic_time(),
Tag = {DTag, ReplyTo, CorrID},
try
%% Transform a single message into the style of bulk messages
- %% by temporarily inserting an empty tag
- SingleTag = {undefined, ReplyTo, CorrID},
- {Cmds, S2} = case handle_message(SingleTag, Fun, Key, Content, IState) of
+ {Cmds, S2} = case handle_message(Tag, Fun, Mode, Key, Content, IState) of
{ack, IState2} -> {[{ack, Tag}], IState2};
- {reply, _Tag, CType, Msg, IState2} -> {[{reply, Tag, CType, Msg}], IState2};
+ {reply, CType, Msg, IState2} -> {[{reply, Tag, CType, Msg}], IState2};
{reject, IState2} -> {[{reject, Tag}], IState2};
{remove, IState2} -> {[{remove, Tag}], IState2};
{stop, Reason, IState2} -> {[{{stop, Reason}, Tag}], IState2};
- {ok, IState2} ->
- #state { conn_name = CN, name = N } = State,
- exometer_update_latency(CN, N, S),
- {[], IState2}
+ {ok, IState2} -> {[], IState2}
end,
handle_commands(S, Cmds, State#state { invoke_state = S2 })
catch
@@ -204,25 +221,12 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}
handle_exception(Class, Error, DTag, State)
end.
-handle_exception(Class, Error, _DTag, #state{ no_ack = true } = State) ->
+handle_exception(Class, Error, _DTag, #state{ mode = simple } = State) ->
{stop, {Class, Error}, State};
handle_exception(Class, Error, DTag, #state{ channel = Ch } = State) ->
amqp_channel:call(Ch, #'basic.reject' { delivery_tag = DTag, requeue = false }),
{stop, {Class, Error}, State}.
-handle_commands(_S, [], State) ->
- {noreply, State};
-handle_commands(S, [{reply, Tag, CType, Msg} | Next],
- #state { channel = Channel, conn_name = CN, name = N, no_ack = true } = State) ->
- exometer_update_latency(CN, N, S),
- reply(Channel, Tag, CType, Msg),
- handle_commands(S, Next, State);
-handle_commands(_S, [{stop, Reason, _Tag} | _Next], #state { no_ack = true } = State) ->
- {stop, Reason, State};
-handle_commands(_S, [{{stop, Reason}, _Tag} | _Next], #state { no_ack = true } = State) ->
- {stop, Reason, State};
-handle_commands(S, [_C | Next], #state { no_ack = true } = State) ->
- handle_commands(S, Next, State);
handle_commands(S, [C | Next],
#state { channel = Channel, conn_name = CN, name = N } = State) ->
case C of
@@ -253,21 +257,22 @@ handle_commands(S, [C | Next],
reply(Channel, Tag, CType, Msg),
ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }),
handle_commands(S, Next, State);
-
{{stop, Reason}, Tag} ->
ok = amqp_channel:cast(Channel,
#'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = true }),
+ {stop, Reason, State};
+ {stop, Reason} ->
{stop, Reason, State}
end.
-handle_message(Tag, Fun, Key,
+handle_message(Tag, Fun, Mode, Key,
#amqp_msg {
payload = Payload,
props = #'P_basic' {
content_type = Type }}, IState) ->
- Res = case Tag of
- {undefined, _, _} -> Fun(Key, Type, Payload, IState);
- Tag -> Fun(Key, Type, Payload, Tag, IState)
+ Res = case Mode of
+ M when M =:= simple; M =:= single -> Fun(Key, Type, Payload, IState);
+ bulk -> Fun(Key, Type, Payload, Tag, IState)
end,
case Res of
%% Bulk messages
@@ -277,8 +282,8 @@ handle_message(Tag, Fun, Key,
%% Single messages
ack -> {ack, IState};
{ack, IState2} -> {ack, IState2};
- {reply, CType, Msg} -> {reply, Tag, CType, Msg, IState};
- {reply, CType, Msg, IState2} -> {reply, Tag, CType, Msg, IState2};
+ {reply, CType, Msg} -> {reply, CType, Msg, IState};
+ {reply, CType, Msg, IState2} -> {reply, CType, Msg, IState2};
reject -> {reject, IState};
{reject, IState2} -> {reject, IState2};
remove -> {remove, IState};
@@ -335,6 +340,7 @@ drain_reject_messages(Channel) ->
ok
end.
+mode(#{ mode := simple }) -> simple;
mode(#{ mode := bulk }) -> bulk;
mode(#{ mode := single }) -> single;
mode(#{}) -> single.
--
2.16.4