File 0022-turtle_subscriber-stylistic-changes.patch of Package turtle

From 73170b0464131647269a1eb33f325d293f17a53e Mon Sep 17 00:00:00 2001
From: Andrey Klymchuk <a.klymchuk@betinvest.com>
Date: Wed, 28 Nov 2018 12:01:02 +0200
Subject: [PATCH 2/6] turtle_subscriber: stylistic changes

---
 src/turtle_subscriber.erl | 117 ++++++++++++++++++++++------------------------
 1 file changed, 55 insertions(+), 62 deletions(-)

diff --git a/src/turtle_subscriber.erl b/src/turtle_subscriber.erl
index c6f8aeb..3bda0a4 100644
--- a/src/turtle_subscriber.erl
+++ b/src/turtle_subscriber.erl
@@ -153,17 +153,16 @@ handle_deliver_simple({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}
         props = #'P_basic' {
             correlation_id = CorrID,
             reply_to = ReplyTo }} = Content},
-    #state {
-        channel = Channel, invoke = Fun, invoke_state = IState, mode = Mode} = State) ->
+    #state {channel = Channel} = State) ->
     Tag = {DTag, ReplyTo, CorrID},
-    try handle_message(Tag, Fun, Mode, Key, Content, IState) of
-        {reply, CType, Msg, S2} ->
+    try handle_message(Tag, Key, Content, State) of
+        {reply, CType, Msg, IState} ->
             reply(Channel, Tag, CType, Msg),
-            {noreply, State#state { invoke_state = S2 }};
-        {stop, Reason, S2} ->
-            {stop, Reason, State#state { invoke_state = S2 }};
-        {_, S2} ->
-            {noreply, State#state { invoke_state = S2 }}
+            {noreply, State#state { invoke_state = IState }};
+        {stop, Reason, IState} ->
+            {stop, Reason, State#state { invoke_state = IState }};
+        {_, IState} ->
+            {noreply, State#state { invoke_state = IState }}
     catch
         Class:Error ->
             lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p",
@@ -173,19 +172,17 @@ handle_deliver_simple({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}
     end.
 
 handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
-	#amqp_msg {
-	    props = #'P_basic' {
-	        correlation_id = CorrID,
-	        reply_to = ReplyTo }} = Content},
-	#state {
-	  invoke = Fun, invoke_state = IState, mode = Mode} = State) ->
+    #amqp_msg {
+        props = #'P_basic' {
+            correlation_id = CorrID,
+            reply_to = ReplyTo }} = Content}, State) ->
     S = erlang:monotonic_time(),
     Tag = {DTag, ReplyTo, CorrID},
-    try handle_message(Tag, Fun, Mode, Key, Content, IState) of
-        {Cmds, S2} when Cmds =:= []; Cmds =:= ok ->
-            {noreply, State#state { invoke_state = S2 }};
-        {Cmds, S2} when is_list(Cmds) ->
-            handle_commands(S, Cmds, State#state { invoke_state = S2 })
+    try handle_message(Tag, Key, Content, State) of
+        {Cmds, IState} when Cmds =:= []; Cmds =:= ok ->
+            {noreply, State#state { invoke_state = IState }};
+        {Cmds, IState} when is_list(Cmds) ->
+            handle_commands(S, Cmds, State#state { invoke_state = IState })
     catch
         Class:Error ->
            lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p",
@@ -195,24 +192,23 @@ handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
     end.
         
 handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
-		#amqp_msg {
-	    props = #'P_basic' {
-	        correlation_id = CorrID,
-	        reply_to = ReplyTo }} = Content},
-	#state { invoke = Fun, invoke_state = IState, mode = Mode } = State) ->
+    #amqp_msg {
+        props = #'P_basic' {
+            correlation_id = CorrID,
+            reply_to = ReplyTo }} = Content}, State) ->
     S = erlang:monotonic_time(),
     Tag = {DTag, ReplyTo, CorrID},
     try
         %% Transform a single message into the style of bulk messages
-        {Cmds, S2} = case handle_message(Tag, Fun, Mode, Key, Content, IState) of
-            {ack, IState2} -> {[{ack, Tag}], IState2};
-            {reply, CType, Msg, IState2} -> {[{reply, Tag, CType, Msg}], IState2};
-            {reject, IState2} -> {[{reject, Tag}], IState2};
-            {remove, IState2} -> {[{remove, Tag}], IState2};
-            {stop, Reason, IState2} -> {[{{stop, Reason}, Tag}], IState2};
-            {ok, IState2} -> {[], IState2}
+        {Cmds, IState2} = case handle_message(Tag, Key, Content, State) of
+            {ack, IState} -> {[{ack, Tag}], IState};
+            {reply, CType, Msg, IState} -> {[{reply, Tag, CType, Msg}], IState};
+            {reject, IState} -> {[{reject, Tag}], IState};
+            {remove, IState} -> {[{remove, Tag}], IState};
+            {stop, Reason, IState} -> {[{{stop, Reason}, Tag}], IState};
+            {ok, IState} -> {[], IState}
         end,
-        handle_commands(S, Cmds, State#state { invoke_state = S2 })
+        handle_commands(S, Cmds, State#state { invoke_state = IState2 })
     catch
         Class:Error ->
            lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p",
@@ -227,49 +223,46 @@ handle_exception(Class, Error, DTag, #state{ channel = Ch } = State) ->
     amqp_channel:call(Ch, #'basic.reject' { delivery_tag = DTag, requeue = false }),
     {stop, {Class, Error}, State}.
 
-handle_commands(S, [C | Next],
-	#state { channel = Channel, conn_name = CN, name = N } = State) ->
+handle_commands(S, [C | Next], #state { channel = Channel, conn_name = CN, name = N } = State) ->
     case C of
         {ack, Tag} ->
             exometer_update_latency(CN, N, S),
-           ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }),
-           handle_commands(S, Next, State);
-       {bulk_ack, Tag} ->
+            ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }),
+            handle_commands(S, Next, State);
+        {bulk_ack, Tag} ->
             exometer_update_latency(CN, N, S),
-           ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag), multiple = true }),
-           handle_commands(S, Next, State);
-       {bulk_nack, Tag} ->
+            ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag), multiple = true }),
+            handle_commands(S, Next, State);
+        {bulk_nack, Tag} ->
             exometer_update_latency(CN, N, S),
-           ok = amqp_channel:cast(Channel, #'basic.nack' { delivery_tag = delivery_tag(Tag), multiple = true }),
-           handle_commands(S, Next, State);
-       {reject, Tag} ->
-           exometer:update([CN, N, rejects], 1),
-           ok = amqp_channel:cast(Channel,
-           	#'basic.reject' { delivery_tag = delivery_tag(Tag), requeue=true }),
-           handle_commands(S, Next, State);
+            ok = amqp_channel:cast(Channel, #'basic.nack' { delivery_tag = delivery_tag(Tag), multiple = true }),
+            handle_commands(S, Next, State);
+        {reject, Tag} ->
+            exometer:update([CN, N, rejects], 1),
+            ok = amqp_channel:cast(Channel, #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue=true }),
+            handle_commands(S, Next, State);
         {remove, Tag} ->
-           exometer:update([CN, N, removals], 1),
-           ok = amqp_channel:cast(Channel,
-           	#'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = false}),
-           handle_commands(S, Next, State);
+            exometer:update([CN, N, removals], 1),
+            ok = amqp_channel:cast(Channel, #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = false}),
+            handle_commands(S, Next, State);
         {reply, Tag, CType, Msg} ->
             exometer_update_latency(CN, N, S),
-           reply(Channel, Tag, CType, Msg),
-           ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }),
-           handle_commands(S, Next, State);
+            reply(Channel, Tag, CType, Msg),
+            ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }),
+            handle_commands(S, Next, State);
         {{stop, Reason}, Tag} ->
-            ok = amqp_channel:cast(Channel,
-            	#'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = true }),
+            ok = amqp_channel:cast(Channel, #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = true }),
             {stop, Reason, State};
         {stop, Reason} ->
             {stop, Reason, State}
     end.
 
-handle_message(Tag, Fun, Mode, Key,
-	#amqp_msg {
-	    payload = Payload,
-	    props = #'P_basic' {
-	        content_type = Type }}, IState) ->
+handle_message(Tag, Key,
+    #amqp_msg {
+        payload = Payload,
+        props = #'P_basic' {
+            content_type = Type }},
+    #state { invoke = Fun, invoke_state = IState, mode = Mode }) ->
     Res = case Mode of
         M when M =:= simple; M =:= single -> Fun(Key, Type, Payload, IState);
         bulk -> Fun(Key, Type, Payload, Tag, IState)
-- 
2.16.4

openSUSE Build Service is sponsored by