File 0021-no_ack-reimplemented-as-new-simple-mode.patch of Package turtle

From 9ccd54e9a1fdaf4d7ae0ec7e7db918e7ed6f25de Mon Sep 17 00:00:00 2001
From: Andrey Klymchuk <a.klymchuk@betinvest.com>
Date: Mon, 26 Nov 2018 15:09:50 +0200
Subject: [PATCH 1/6] no_ack reimplemented as new simple mode

---
 src/turtle.erl            | 10 +++---
 src/turtle_subscriber.erl | 84 +++++++++++++++++++++++++----------------------
 2 files changed, 51 insertions(+), 43 deletions(-)

diff --git a/src/turtle.erl b/src/turtle.erl
index 79b5de5..fa0a38c 100644
--- a/src/turtle.erl
+++ b/src/turtle.erl
@@ -290,14 +290,16 @@ await(publisher, Name, Timeout) ->
 consume(Channel, #'basic.consume' {} = Consume) ->
     #'basic.consume_ok' { consumer_tag = Tag } = amqp_channel:call(Channel, Consume),
     {ok, Tag};
-consume(Channel, Queue) ->
-    consume(Channel, #'basic.consume' { queue = Queue }).
+consume(Channel, Queue) -> consume(Channel, Queue, single).
 
 %% @doc consume/3 starts consumption on a channel with default parameters
 %% @end
 %% @private
-consume(Channel, Queue, NoAck) when is_boolean(NoAck) ->
-    consume(Channel, #'basic.consume' { queue = Queue, no_ack = NoAck }).
+consume(Channel, Queue, simple) ->
+    consume(Channel, #'basic.consume' { queue = Queue, no_ack = true });
+
+consume(Channel, Queue, Mode) when Mode =:= single; Mode =:= bulk ->
+	consume(Channel, #'basic.consume' { queue = Queue, no_ack = false }).
 
 
 %% @doc cancel/2 stop consumption on a channel again.
diff --git a/src/turtle_subscriber.erl b/src/turtle_subscriber.erl
index 5d2f759..c6f8aeb 100644
--- a/src/turtle_subscriber.erl
+++ b/src/turtle_subscriber.erl
@@ -32,7 +32,7 @@
          monitor :: reference(),
          consumer_tag,
          no_ack = #'basic.consume'{}#'basic.consume'.no_ack :: boolean(),
-         mode = single
+         mode = single :: simple | single | bulk
         }).
 
 %% LIFETIME MAINTENANCE
@@ -56,9 +56,8 @@ init([#{
     ok = turtle:qos(Ch, Conf),
     ok = amqp_channel:register_return_handler(Ch, self()),
     ok = turtle:declare(Ch, Decls, #{ passive => Passive }),
-    NoAck = maps:get(no_ack, Conf, #state{}#state.no_ack),
-    {ok, Tag} = turtle:consume(Ch, Queue, NoAck),
     Mode = mode(Conf),
+    {ok, Tag} = turtle:consume(Ch, Queue, Mode),
     {ok, #state {
             consumer_tag = Tag, 
             invoke = Fun,
@@ -68,7 +67,6 @@ init([#{
             monitor = MRef,
             conn_name = ConnName,
             name = Name,
-            no_ack = NoAck,
             mode = Mode
            }}.
 
@@ -88,6 +86,8 @@ handle_info(#'basic.consume_ok'{}, State) ->
 handle_info(#'basic.cancel_ok'{}, State) ->
     lager:info("Consumption canceled"),
     {stop, normal, shutdown(rabbitmq_gone, State)};
+handle_info({#'basic.deliver'{}, _Content} = Msg, #state { mode = simple } = State) ->
+    handle_deliver_simple(Msg, State);
 handle_info({#'basic.deliver'{}, _Content} = Msg, #state { mode = single } = State) ->
     handle_deliver_single(Msg, State);
 handle_info({#'basic.deliver'{}, _Content} = Msg, #state { mode = bulk } = State) ->
@@ -112,6 +112,7 @@ handle_info(Info, #state { handle_info = HandleInfo, invoke_state = IState } = S
     S = erlang:monotonic_time(),
     try HandleInfo(Info, IState) of
         {ok, IState2} -> {noreply, State#state { invoke_state = IState2 }};
+        {stop, Reason, IState2} -> {stop, Reason, State#state { invoke_state = IState2 }};
         {Cmds, IState2} when is_list(Cmds) ->
             handle_commands(S, Cmds, State#state { invoke_state = IState2 })
     catch
@@ -147,20 +148,41 @@ code_change(_, State, _) ->
 %% INTERNAL FUNCTIONS
 %%
 
+handle_deliver_simple({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
+    #amqp_msg {
+        props = #'P_basic' {
+            correlation_id = CorrID,
+            reply_to = ReplyTo }} = Content},
+    #state {
+        channel = Channel, invoke = Fun, invoke_state = IState, mode = Mode} = State) ->
+    Tag = {DTag, ReplyTo, CorrID},
+    try handle_message(Tag, Fun, Mode, Key, Content, IState) of
+        {reply, CType, Msg, S2} ->
+            reply(Channel, Tag, CType, Msg),
+            {noreply, State#state { invoke_state = S2 }};
+        {stop, Reason, S2} ->
+            {stop, Reason, State#state { invoke_state = S2 }};
+        {_, S2} ->
+            {noreply, State#state { invoke_state = S2 }}
+    catch
+        Class:Error ->
+            lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p",
+                [Class, Error, erlang:get_stacktrace(), format_amqp_msg(Content)]),
+            lager:error("Mailbox size ~p", [erlang:process_info(self(), message_queue_len)]),
+            handle_exception(Class, Error, DTag, State)
+    end.
+
 handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
 	#amqp_msg {
 	    props = #'P_basic' {
 	        correlation_id = CorrID,
 	        reply_to = ReplyTo }} = Content},
 	#state {
-	  invoke = Fun, invoke_state = IState,
-	  conn_name = CN,
-	  name = N } = State) ->
+	  invoke = Fun, invoke_state = IState, mode = Mode} = State) ->
     S = erlang:monotonic_time(),
     Tag = {DTag, ReplyTo, CorrID},
-    try handle_message(Tag, Fun, Key, Content, IState) of
+    try handle_message(Tag, Fun, Mode, Key, Content, IState) of
         {Cmds, S2} when Cmds =:= []; Cmds =:= ok ->
-            exometer_update_latency(CN, N, S),
             {noreply, State#state { invoke_state = S2 }};
         {Cmds, S2} when is_list(Cmds) ->
             handle_commands(S, Cmds, State#state { invoke_state = S2 })
@@ -177,23 +199,18 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}
 	    props = #'P_basic' {
 	        correlation_id = CorrID,
 	        reply_to = ReplyTo }} = Content},
-	#state { invoke = Fun, invoke_state = IState } = State) ->
+	#state { invoke = Fun, invoke_state = IState, mode = Mode } = State) ->
     S = erlang:monotonic_time(),
     Tag = {DTag, ReplyTo, CorrID},
     try
         %% Transform a single message into the style of bulk messages
-        %% by temporarily inserting an empty tag
-        SingleTag = {undefined, ReplyTo, CorrID},
-        {Cmds, S2} = case handle_message(SingleTag, Fun, Key, Content, IState) of
+        {Cmds, S2} = case handle_message(Tag, Fun, Mode, Key, Content, IState) of
             {ack, IState2} -> {[{ack, Tag}], IState2};
-            {reply, _Tag, CType, Msg, IState2} -> {[{reply, Tag, CType, Msg}], IState2};
+            {reply, CType, Msg, IState2} -> {[{reply, Tag, CType, Msg}], IState2};
             {reject, IState2} -> {[{reject, Tag}], IState2};
             {remove, IState2} -> {[{remove, Tag}], IState2};
             {stop, Reason, IState2} -> {[{{stop, Reason}, Tag}], IState2};
-            {ok, IState2} ->
-                #state { conn_name = CN, name = N } = State,
-                exometer_update_latency(CN, N, S),
-                {[], IState2}
+            {ok, IState2} -> {[], IState2}
         end,
         handle_commands(S, Cmds, State#state { invoke_state = S2 })
     catch
@@ -204,25 +221,12 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}
            handle_exception(Class, Error, DTag, State)
     end.
 
-handle_exception(Class, Error, _DTag, #state{ no_ack = true } = State) ->
+handle_exception(Class, Error, _DTag, #state{ mode = simple } = State) ->
     {stop, {Class, Error}, State};
 handle_exception(Class, Error, DTag, #state{ channel = Ch } = State) ->
     amqp_channel:call(Ch, #'basic.reject' { delivery_tag = DTag, requeue = false }),
     {stop, {Class, Error}, State}.
 
-handle_commands(_S, [], State) ->
-    {noreply, State};
-handle_commands(S, [{reply, Tag, CType, Msg} | Next],
-  #state { channel = Channel, conn_name = CN, name = N, no_ack = true } = State) ->
-    exometer_update_latency(CN, N, S),
-    reply(Channel, Tag, CType, Msg),
-    handle_commands(S, Next, State);
-handle_commands(_S, [{stop, Reason, _Tag} | _Next], #state { no_ack = true } = State) ->
-    {stop, Reason, State};
-handle_commands(_S, [{{stop, Reason}, _Tag} | _Next], #state { no_ack = true } = State) ->
-    {stop, Reason, State};
-handle_commands(S, [_C | Next], #state { no_ack = true } = State) ->
-    handle_commands(S, Next, State);
 handle_commands(S, [C | Next],
 	#state { channel = Channel, conn_name = CN, name = N } = State) ->
     case C of
@@ -253,21 +257,22 @@ handle_commands(S, [C | Next],
            reply(Channel, Tag, CType, Msg),
            ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }),
            handle_commands(S, Next, State);
-
         {{stop, Reason}, Tag} ->
             ok = amqp_channel:cast(Channel,
             	#'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = true }),
+            {stop, Reason, State};
+        {stop, Reason} ->
             {stop, Reason, State}
     end.
 
-handle_message(Tag, Fun, Key,
+handle_message(Tag, Fun, Mode, Key,
 	#amqp_msg {
 	    payload = Payload,
 	    props = #'P_basic' {
 	        content_type = Type }}, IState) ->
-    Res = case Tag of
-        {undefined, _, _} -> Fun(Key, Type, Payload, IState);
-        Tag -> Fun(Key, Type, Payload, Tag, IState)
+    Res = case Mode of
+        M when M =:= simple; M =:= single -> Fun(Key, Type, Payload, IState);
+        bulk -> Fun(Key, Type, Payload, Tag, IState)
     end,
     case Res of
         %% Bulk messages
@@ -277,8 +282,8 @@ handle_message(Tag, Fun, Key,
         %% Single messages
         ack -> {ack, IState};
         {ack, IState2} -> {ack, IState2};
-        {reply, CType, Msg} -> {reply, Tag, CType, Msg, IState};
-        {reply, CType, Msg, IState2} -> {reply, Tag, CType, Msg, IState2};
+        {reply, CType, Msg} -> {reply, CType, Msg, IState};
+        {reply, CType, Msg, IState2} -> {reply, CType, Msg, IState2};
         reject -> {reject, IState};
         {reject, IState2} -> {reject, IState2};
         remove -> {remove, IState};
@@ -335,6 +340,7 @@ drain_reject_messages(Channel) ->
         ok
     end.
 
+mode(#{ mode := simple }) -> simple;
 mode(#{ mode := bulk }) -> bulk;
 mode(#{ mode := single }) -> single;
 mode(#{}) -> single.
-- 
2.16.4

openSUSE Build Service is sponsored by