Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:20
turtle
0022-turtle_subscriber-stylistic-changes.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 0022-turtle_subscriber-stylistic-changes.patch of Package turtle
From 73170b0464131647269a1eb33f325d293f17a53e Mon Sep 17 00:00:00 2001 From: Andrey Klymchuk <a.klymchuk@betinvest.com> Date: Wed, 28 Nov 2018 12:01:02 +0200 Subject: [PATCH 2/6] turtle_subscriber: stylistic changes --- src/turtle_subscriber.erl | 117 ++++++++++++++++++++++------------------------ 1 file changed, 55 insertions(+), 62 deletions(-) diff --git a/src/turtle_subscriber.erl b/src/turtle_subscriber.erl index c6f8aeb..3bda0a4 100644 --- a/src/turtle_subscriber.erl +++ b/src/turtle_subscriber.erl @@ -153,17 +153,16 @@ handle_deliver_simple({#'basic.deliver' {delivery_tag = DTag, routing_key = Key} props = #'P_basic' { correlation_id = CorrID, reply_to = ReplyTo }} = Content}, - #state { - channel = Channel, invoke = Fun, invoke_state = IState, mode = Mode} = State) -> + #state {channel = Channel} = State) -> Tag = {DTag, ReplyTo, CorrID}, - try handle_message(Tag, Fun, Mode, Key, Content, IState) of - {reply, CType, Msg, S2} -> + try handle_message(Tag, Key, Content, State) of + {reply, CType, Msg, IState} -> reply(Channel, Tag, CType, Msg), - {noreply, State#state { invoke_state = S2 }}; - {stop, Reason, S2} -> - {stop, Reason, State#state { invoke_state = S2 }}; - {_, S2} -> - {noreply, State#state { invoke_state = S2 }} + {noreply, State#state { invoke_state = IState }}; + {stop, Reason, IState} -> + {stop, Reason, State#state { invoke_state = IState }}; + {_, IState} -> + {noreply, State#state { invoke_state = IState }} catch Class:Error -> lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p", @@ -173,19 +172,17 @@ handle_deliver_simple({#'basic.deliver' {delivery_tag = DTag, routing_key = Key} end. handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}, - #amqp_msg { - props = #'P_basic' { - correlation_id = CorrID, - reply_to = ReplyTo }} = Content}, - #state { - invoke = Fun, invoke_state = IState, mode = Mode} = State) -> + #amqp_msg { + props = #'P_basic' { + correlation_id = CorrID, + reply_to = ReplyTo }} = Content}, State) -> S = erlang:monotonic_time(), Tag = {DTag, ReplyTo, CorrID}, - try handle_message(Tag, Fun, Mode, Key, Content, IState) of - {Cmds, S2} when Cmds =:= []; Cmds =:= ok -> - {noreply, State#state { invoke_state = S2 }}; - {Cmds, S2} when is_list(Cmds) -> - handle_commands(S, Cmds, State#state { invoke_state = S2 }) + try handle_message(Tag, Key, Content, State) of + {Cmds, IState} when Cmds =:= []; Cmds =:= ok -> + {noreply, State#state { invoke_state = IState }}; + {Cmds, IState} when is_list(Cmds) -> + handle_commands(S, Cmds, State#state { invoke_state = IState }) catch Class:Error -> lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p", @@ -195,24 +192,23 @@ handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}, end. handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}, - #amqp_msg { - props = #'P_basic' { - correlation_id = CorrID, - reply_to = ReplyTo }} = Content}, - #state { invoke = Fun, invoke_state = IState, mode = Mode } = State) -> + #amqp_msg { + props = #'P_basic' { + correlation_id = CorrID, + reply_to = ReplyTo }} = Content}, State) -> S = erlang:monotonic_time(), Tag = {DTag, ReplyTo, CorrID}, try %% Transform a single message into the style of bulk messages - {Cmds, S2} = case handle_message(Tag, Fun, Mode, Key, Content, IState) of - {ack, IState2} -> {[{ack, Tag}], IState2}; - {reply, CType, Msg, IState2} -> {[{reply, Tag, CType, Msg}], IState2}; - {reject, IState2} -> {[{reject, Tag}], IState2}; - {remove, IState2} -> {[{remove, Tag}], IState2}; - {stop, Reason, IState2} -> {[{{stop, Reason}, Tag}], IState2}; - {ok, IState2} -> {[], IState2} + {Cmds, IState2} = case handle_message(Tag, Key, Content, State) of + {ack, IState} -> {[{ack, Tag}], IState}; + {reply, CType, Msg, IState} -> {[{reply, Tag, CType, Msg}], IState}; + {reject, IState} -> {[{reject, Tag}], IState}; + {remove, IState} -> {[{remove, Tag}], IState}; + {stop, Reason, IState} -> {[{{stop, Reason}, Tag}], IState}; + {ok, IState} -> {[], IState} end, - handle_commands(S, Cmds, State#state { invoke_state = S2 }) + handle_commands(S, Cmds, State#state { invoke_state = IState2 }) catch Class:Error -> lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p", @@ -227,49 +223,46 @@ handle_exception(Class, Error, DTag, #state{ channel = Ch } = State) -> amqp_channel:call(Ch, #'basic.reject' { delivery_tag = DTag, requeue = false }), {stop, {Class, Error}, State}. -handle_commands(S, [C | Next], - #state { channel = Channel, conn_name = CN, name = N } = State) -> +handle_commands(S, [C | Next], #state { channel = Channel, conn_name = CN, name = N } = State) -> case C of {ack, Tag} -> exometer_update_latency(CN, N, S), - ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }), - handle_commands(S, Next, State); - {bulk_ack, Tag} -> + ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }), + handle_commands(S, Next, State); + {bulk_ack, Tag} -> exometer_update_latency(CN, N, S), - ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag), multiple = true }), - handle_commands(S, Next, State); - {bulk_nack, Tag} -> + ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag), multiple = true }), + handle_commands(S, Next, State); + {bulk_nack, Tag} -> exometer_update_latency(CN, N, S), - ok = amqp_channel:cast(Channel, #'basic.nack' { delivery_tag = delivery_tag(Tag), multiple = true }), - handle_commands(S, Next, State); - {reject, Tag} -> - exometer:update([CN, N, rejects], 1), - ok = amqp_channel:cast(Channel, - #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue=true }), - handle_commands(S, Next, State); + ok = amqp_channel:cast(Channel, #'basic.nack' { delivery_tag = delivery_tag(Tag), multiple = true }), + handle_commands(S, Next, State); + {reject, Tag} -> + exometer:update([CN, N, rejects], 1), + ok = amqp_channel:cast(Channel, #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue=true }), + handle_commands(S, Next, State); {remove, Tag} -> - exometer:update([CN, N, removals], 1), - ok = amqp_channel:cast(Channel, - #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = false}), - handle_commands(S, Next, State); + exometer:update([CN, N, removals], 1), + ok = amqp_channel:cast(Channel, #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = false}), + handle_commands(S, Next, State); {reply, Tag, CType, Msg} -> exometer_update_latency(CN, N, S), - reply(Channel, Tag, CType, Msg), - ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }), - handle_commands(S, Next, State); + reply(Channel, Tag, CType, Msg), + ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }), + handle_commands(S, Next, State); {{stop, Reason}, Tag} -> - ok = amqp_channel:cast(Channel, - #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = true }), + ok = amqp_channel:cast(Channel, #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = true }), {stop, Reason, State}; {stop, Reason} -> {stop, Reason, State} end. -handle_message(Tag, Fun, Mode, Key, - #amqp_msg { - payload = Payload, - props = #'P_basic' { - content_type = Type }}, IState) -> +handle_message(Tag, Key, + #amqp_msg { + payload = Payload, + props = #'P_basic' { + content_type = Type }}, + #state { invoke = Fun, invoke_state = IState, mode = Mode }) -> Res = case Mode of M when M =:= simple; M =:= single -> Fun(Key, Type, Payload, IState); bulk -> Fun(Key, Type, Payload, Tag, IState) -- 2.16.4
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor