File 0022-turtle_subscriber-stylistic-changes.patch of Package turtle
From 73170b0464131647269a1eb33f325d293f17a53e Mon Sep 17 00:00:00 2001
From: Andrey Klymchuk <a.klymchuk@betinvest.com>
Date: Wed, 28 Nov 2018 12:01:02 +0200
Subject: [PATCH 2/6] turtle_subscriber: stylistic changes
---
src/turtle_subscriber.erl | 117 ++++++++++++++++++++++------------------------
1 file changed, 55 insertions(+), 62 deletions(-)
diff --git a/src/turtle_subscriber.erl b/src/turtle_subscriber.erl
index c6f8aeb..3bda0a4 100644
--- a/src/turtle_subscriber.erl
+++ b/src/turtle_subscriber.erl
@@ -153,17 +153,16 @@ handle_deliver_simple({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}
props = #'P_basic' {
correlation_id = CorrID,
reply_to = ReplyTo }} = Content},
- #state {
- channel = Channel, invoke = Fun, invoke_state = IState, mode = Mode} = State) ->
+ #state {channel = Channel} = State) ->
Tag = {DTag, ReplyTo, CorrID},
- try handle_message(Tag, Fun, Mode, Key, Content, IState) of
- {reply, CType, Msg, S2} ->
+ try handle_message(Tag, Key, Content, State) of
+ {reply, CType, Msg, IState} ->
reply(Channel, Tag, CType, Msg),
- {noreply, State#state { invoke_state = S2 }};
- {stop, Reason, S2} ->
- {stop, Reason, State#state { invoke_state = S2 }};
- {_, S2} ->
- {noreply, State#state { invoke_state = S2 }}
+ {noreply, State#state { invoke_state = IState }};
+ {stop, Reason, IState} ->
+ {stop, Reason, State#state { invoke_state = IState }};
+ {_, IState} ->
+ {noreply, State#state { invoke_state = IState }}
catch
Class:Error ->
lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p",
@@ -173,19 +172,17 @@ handle_deliver_simple({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}
end.
handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
- #amqp_msg {
- props = #'P_basic' {
- correlation_id = CorrID,
- reply_to = ReplyTo }} = Content},
- #state {
- invoke = Fun, invoke_state = IState, mode = Mode} = State) ->
+ #amqp_msg {
+ props = #'P_basic' {
+ correlation_id = CorrID,
+ reply_to = ReplyTo }} = Content}, State) ->
S = erlang:monotonic_time(),
Tag = {DTag, ReplyTo, CorrID},
- try handle_message(Tag, Fun, Mode, Key, Content, IState) of
- {Cmds, S2} when Cmds =:= []; Cmds =:= ok ->
- {noreply, State#state { invoke_state = S2 }};
- {Cmds, S2} when is_list(Cmds) ->
- handle_commands(S, Cmds, State#state { invoke_state = S2 })
+ try handle_message(Tag, Key, Content, State) of
+ {Cmds, IState} when Cmds =:= []; Cmds =:= ok ->
+ {noreply, State#state { invoke_state = IState }};
+ {Cmds, IState} when is_list(Cmds) ->
+ handle_commands(S, Cmds, State#state { invoke_state = IState })
catch
Class:Error ->
lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p",
@@ -195,24 +192,23 @@ handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
end.
handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key},
- #amqp_msg {
- props = #'P_basic' {
- correlation_id = CorrID,
- reply_to = ReplyTo }} = Content},
- #state { invoke = Fun, invoke_state = IState, mode = Mode } = State) ->
+ #amqp_msg {
+ props = #'P_basic' {
+ correlation_id = CorrID,
+ reply_to = ReplyTo }} = Content}, State) ->
S = erlang:monotonic_time(),
Tag = {DTag, ReplyTo, CorrID},
try
%% Transform a single message into the style of bulk messages
- {Cmds, S2} = case handle_message(Tag, Fun, Mode, Key, Content, IState) of
- {ack, IState2} -> {[{ack, Tag}], IState2};
- {reply, CType, Msg, IState2} -> {[{reply, Tag, CType, Msg}], IState2};
- {reject, IState2} -> {[{reject, Tag}], IState2};
- {remove, IState2} -> {[{remove, Tag}], IState2};
- {stop, Reason, IState2} -> {[{{stop, Reason}, Tag}], IState2};
- {ok, IState2} -> {[], IState2}
+ {Cmds, IState2} = case handle_message(Tag, Key, Content, State) of
+ {ack, IState} -> {[{ack, Tag}], IState};
+ {reply, CType, Msg, IState} -> {[{reply, Tag, CType, Msg}], IState};
+ {reject, IState} -> {[{reject, Tag}], IState};
+ {remove, IState} -> {[{remove, Tag}], IState};
+ {stop, Reason, IState} -> {[{{stop, Reason}, Tag}], IState};
+ {ok, IState} -> {[], IState}
end,
- handle_commands(S, Cmds, State#state { invoke_state = S2 })
+ handle_commands(S, Cmds, State#state { invoke_state = IState2 })
catch
Class:Error ->
lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p",
@@ -227,49 +223,46 @@ handle_exception(Class, Error, DTag, #state{ channel = Ch } = State) ->
amqp_channel:call(Ch, #'basic.reject' { delivery_tag = DTag, requeue = false }),
{stop, {Class, Error}, State}.
-handle_commands(S, [C | Next],
- #state { channel = Channel, conn_name = CN, name = N } = State) ->
+handle_commands(S, [C | Next], #state { channel = Channel, conn_name = CN, name = N } = State) ->
case C of
{ack, Tag} ->
exometer_update_latency(CN, N, S),
- ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }),
- handle_commands(S, Next, State);
- {bulk_ack, Tag} ->
+ ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }),
+ handle_commands(S, Next, State);
+ {bulk_ack, Tag} ->
exometer_update_latency(CN, N, S),
- ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag), multiple = true }),
- handle_commands(S, Next, State);
- {bulk_nack, Tag} ->
+ ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag), multiple = true }),
+ handle_commands(S, Next, State);
+ {bulk_nack, Tag} ->
exometer_update_latency(CN, N, S),
- ok = amqp_channel:cast(Channel, #'basic.nack' { delivery_tag = delivery_tag(Tag), multiple = true }),
- handle_commands(S, Next, State);
- {reject, Tag} ->
- exometer:update([CN, N, rejects], 1),
- ok = amqp_channel:cast(Channel,
- #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue=true }),
- handle_commands(S, Next, State);
+ ok = amqp_channel:cast(Channel, #'basic.nack' { delivery_tag = delivery_tag(Tag), multiple = true }),
+ handle_commands(S, Next, State);
+ {reject, Tag} ->
+ exometer:update([CN, N, rejects], 1),
+ ok = amqp_channel:cast(Channel, #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue=true }),
+ handle_commands(S, Next, State);
{remove, Tag} ->
- exometer:update([CN, N, removals], 1),
- ok = amqp_channel:cast(Channel,
- #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = false}),
- handle_commands(S, Next, State);
+ exometer:update([CN, N, removals], 1),
+ ok = amqp_channel:cast(Channel, #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = false}),
+ handle_commands(S, Next, State);
{reply, Tag, CType, Msg} ->
exometer_update_latency(CN, N, S),
- reply(Channel, Tag, CType, Msg),
- ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }),
- handle_commands(S, Next, State);
+ reply(Channel, Tag, CType, Msg),
+ ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }),
+ handle_commands(S, Next, State);
{{stop, Reason}, Tag} ->
- ok = amqp_channel:cast(Channel,
- #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = true }),
+ ok = amqp_channel:cast(Channel, #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = true }),
{stop, Reason, State};
{stop, Reason} ->
{stop, Reason, State}
end.
-handle_message(Tag, Fun, Mode, Key,
- #amqp_msg {
- payload = Payload,
- props = #'P_basic' {
- content_type = Type }}, IState) ->
+handle_message(Tag, Key,
+ #amqp_msg {
+ payload = Payload,
+ props = #'P_basic' {
+ content_type = Type }},
+ #state { invoke = Fun, invoke_state = IState, mode = Mode }) ->
Res = case Mode of
M when M =:= simple; M =:= single -> Fun(Key, Type, Payload, IState);
bulk -> Fun(Key, Type, Payload, Tag, IState)
--
2.16.4