File 0013-turtle_subscriber-add-no_ack-support.patch of Package turtle

From aff0ce374922ed61655a898b657075723d372f37 Mon Sep 17 00:00:00 2001
From: Andrey Klymchuk <a.klymchuk@betinvest.com>
Date: Wed, 5 Sep 2018 17:36:39 +0300
Subject: [PATCH 3/3] turtle_subscriber: add no_ack support

Signed-off-by: Led <ledest@gmail.com>
---
 src/turtle_subscriber.erl | 66 ++++++++++++++++++++++++++---------------------
 1 file changed, 37 insertions(+), 29 deletions(-)

diff --git a/src/turtle_subscriber.erl b/src/turtle_subscriber.erl
index 30c71a1..064c402 100644
--- a/src/turtle_subscriber.erl
+++ b/src/turtle_subscriber.erl
@@ -31,6 +31,7 @@
          channel :: undefined | pid(),
          monitor :: reference(),
          consumer_tag,
+         no_ack = #'basic.consume'{}#'basic.consume'.no_ack :: boolean(),
          mode = single
         }).
 
@@ -55,7 +56,8 @@ init([#{
     ok = turtle:qos(Ch, Conf),
     ok = amqp_channel:register_return_handler(Ch, self()),
     ok = turtle:declare(Ch, Decls, #{ passive => Passive }),
-    {ok, Tag} = turtle:consume(Ch, Queue),
+    NoAck = maps:get(no_ack, Conf, #state{}#state.no_ack),
+    {ok, Tag} = turtle:consume(Ch, Queue, NoAck),
     Mode = mode(Conf),
     {ok, #state {
             consumer_tag = Tag, 
@@ -66,6 +68,7 @@ init([#{
             monitor = MRef,
             conn_name = ConnName,
             name = Name,
+            no_ack = NoAck,
             mode = Mode
            }}.
 
@@ -151,17 +154,13 @@ handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
 	        reply_to = ReplyTo }} = Content},
 	#state {
 	  invoke = Fun, invoke_state = IState,
-	  channel = Channel,
 	  conn_name = CN,
 	  name = N } = State) ->
     S = erlang:monotonic_time(),
     Tag = {DTag, ReplyTo, CorrID},
     try handle_message(Tag, Fun, Key, Content, IState) of
-        {[], S2} ->
-            E = erlang:monotonic_time(),
-            exometer:update([CN, N, msgs], 1),
-            exometer:update([CN, N, latency],
-                erlang:convert_time_unit(E-S, native, milli_seconds)),
+        {Cmds, S2} when Cmds =:= []; Cmds =:= ok ->
+            exometer_update_latency(CN, N, S),
             {noreply, State#state { invoke_state = S2 }};
         {Cmds, S2} when is_list(Cmds) ->
             handle_commands(S, Cmds, State#state { invoke_state = S2 })
@@ -170,8 +169,7 @@ handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
            lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p",
                [Class, Error, erlang:get_stacktrace(), format_amqp_msg(Content)]),
            lager:error("Mailbox size ~p", [erlang:process_info(self(), message_queue_len)]),
-           ok = amqp_channel:call(Channel, #'basic.reject' { delivery_tag = Tag, requeue = false }),
-           {stop, {Class, Error}, State}
+           handle_exception(Class, Error, DTag, State)
     end.
         
 handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
@@ -179,7 +177,7 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}
 	    props = #'P_basic' {
 	        correlation_id = CorrID,
 	        reply_to = ReplyTo }} = Content},
-	#state { invoke = Fun, invoke_state = IState,channel = Channel } = State) ->
+	#state { invoke = Fun, invoke_state = IState } = State) ->
     S = erlang:monotonic_time(),
     Tag = {DTag, ReplyTo, CorrID},
     try
@@ -192,7 +190,10 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}
             {reject, IState2} -> {[{reject, Tag}], IState2};
             {remove, IState2} -> {[{remove, Tag}], IState2};
             {stop, Reason, IState2} -> {[{{stop, Reason}, Tag}], IState2};
-            {ok, IState2} -> {[], IState2}
+            {ok, IState2} ->
+                #state { conn_name = CN, name = N } = State,
+                exometer_update_latency(CN, N, S),
+                {[], IState2}
         end,
         handle_commands(S, Cmds, State#state { invoke_state = S2 })
     catch
@@ -200,34 +201,40 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}
            lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p",
                [Class, Error, erlang:get_stacktrace(), format_amqp_msg(Content)]),
            lager:error("Mailbox size ~p", [erlang:process_info(self(), message_queue_len)]),
-           ok = amqp_channel:call(Channel, #'basic.reject' { delivery_tag = DTag, requeue = false }),
-           {stop, {Class, Error}, State}
+           handle_exception(Class, Error, DTag, State)
     end.
 
+handle_exception(Class, Error, _DTag, #state{ no_ack = true } = State) ->
+    {stop, {Class, Error}, State};
+handle_exception(Class, Error, DTag, #state{ channel = Ch } = State) ->
+    amqp_channel:call(Ch, #'basic.reject' { delivery_tag = DTag, requeue = false }),
+    {stop, {Class, Error}, State}.
+
 handle_commands(_S, [], State) ->
     {noreply, State};
+handle_commands(S, [{reply, Tag, CType, Msg} | Next],
+  #state { channel = Channel, conn_name = CN, name = N, no_ack = true } = State) ->
+    exometer_update_latency(CN, N, S),
+    reply(Channel, Tag, CType, Msg),
+    handle_commands(S, Next, State);
+handle_commands(_S, [{stop, Reason, _Tag} | _Next], #state { no_ack = true } = State) ->
+    {stop, Reason, State};
+handle_commands(S, [C | Next], #state { no_ack = true } = State) ->
+    lager:warning("Unexpected cmd ~p in no_ack mode is ignored", [C]),
+    handle_commands(S, Next, State);
 handle_commands(S, [C | Next],
 	#state { channel = Channel, conn_name = CN, name = N } = State) ->
     case C of
         {ack, Tag} ->
-            E = erlang:monotonic_time(),
-            exometer:update([CN, N, msgs], 1),
-            exometer:update([CN, N, latency],
-                erlang:convert_time_unit(E-S, native, milli_seconds)),
+            exometer_update_latency(CN, N, S),
            ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }),
            handle_commands(S, Next, State);
        {bulk_ack, Tag} ->
-            E = erlang:monotonic_time(),
-            exometer:update([CN, N, msgs], 1),
-            exometer:update([CN, N, latency],
-                erlang:convert_time_unit(E-S, native, milli_seconds)),
+            exometer_update_latency(CN, N, S),
            ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag), multiple = true }),
            handle_commands(S, Next, State);
        {bulk_nack, Tag} ->
-           E = erlang:monotonic_time(),
-            exometer:update([CN, N, msgs], 1),
-            exometer:update([CN, N, latency],
-                erlang:convert_time_unit(E-S, native, milli_seconds)),
+            exometer_update_latency(CN, N, S),
            ok = amqp_channel:cast(Channel, #'basic.nack' { delivery_tag = delivery_tag(Tag), multiple = true }),
            handle_commands(S, Next, State);
        {reject, Tag} ->
@@ -241,10 +248,7 @@ handle_commands(S, [C | Next],
            	#'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = false}),
            handle_commands(S, Next, State);
         {reply, Tag, CType, Msg} ->
-            E = erlang:monotonic_time(),
-            exometer:update([CN, N, msgs], 1),
-            exometer:update([CN, N, latency],
-                erlang:convert_time_unit(E-S, native, milli_seconds)),
+            exometer_update_latency(CN, N, S),
            reply(Channel, Tag, CType, Msg),
            ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }),
            handle_commands(S, Next, State);
@@ -362,3 +366,7 @@ shutdown_process_commands(_S, Cmds, State, Reason) ->
         [length(Cmds), Reason]),
     State.
 
+exometer_update_latency(CN, N, S) ->
+    L = erlang:convert_time_unit(erlang:monotonic_time() - S, native, milli_seconds),
+    exometer:update([CN, N, msgs], 1),
+    exometer:update([CN, N, latency], L).
-- 
2.16.4

openSUSE Build Service is sponsored by