Sign Up
Log In
Log In
or
Sign Up
Places
All Projects
Status Monitor
Collapse sidebar
home:Ledest:erlang:19
turtle
0021-no_ack-reimplemented-as-new-simple-mode.patch
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 0021-no_ack-reimplemented-as-new-simple-mode.patch of Package turtle
From 9ccd54e9a1fdaf4d7ae0ec7e7db918e7ed6f25de Mon Sep 17 00:00:00 2001 From: Andrey Klymchuk <a.klymchuk@betinvest.com> Date: Mon, 26 Nov 2018 15:09:50 +0200 Subject: [PATCH 1/6] no_ack reimplemented as new simple mode --- src/turtle.erl | 10 +++--- src/turtle_subscriber.erl | 84 +++++++++++++++++++++++++---------------------- 2 files changed, 51 insertions(+), 43 deletions(-) diff --git a/src/turtle.erl b/src/turtle.erl index 79b5de5..fa0a38c 100644 --- a/src/turtle.erl +++ b/src/turtle.erl @@ -290,14 +290,16 @@ await(publisher, Name, Timeout) -> consume(Channel, #'basic.consume' {} = Consume) -> #'basic.consume_ok' { consumer_tag = Tag } = amqp_channel:call(Channel, Consume), {ok, Tag}; -consume(Channel, Queue) -> - consume(Channel, #'basic.consume' { queue = Queue }). +consume(Channel, Queue) -> consume(Channel, Queue, single). %% @doc consume/3 starts consumption on a channel with default parameters %% @end %% @private -consume(Channel, Queue, NoAck) when is_boolean(NoAck) -> - consume(Channel, #'basic.consume' { queue = Queue, no_ack = NoAck }). +consume(Channel, Queue, simple) -> + consume(Channel, #'basic.consume' { queue = Queue, no_ack = true }); + +consume(Channel, Queue, Mode) when Mode =:= single; Mode =:= bulk -> + consume(Channel, #'basic.consume' { queue = Queue, no_ack = false }). %% @doc cancel/2 stop consumption on a channel again. diff --git a/src/turtle_subscriber.erl b/src/turtle_subscriber.erl index 5d2f759..c6f8aeb 100644 --- a/src/turtle_subscriber.erl +++ b/src/turtle_subscriber.erl @@ -32,7 +32,7 @@ monitor :: reference(), consumer_tag, no_ack = #'basic.consume'{}#'basic.consume'.no_ack :: boolean(), - mode = single + mode = single :: simple | single | bulk }). %% LIFETIME MAINTENANCE @@ -56,9 +56,8 @@ init([#{ ok = turtle:qos(Ch, Conf), ok = amqp_channel:register_return_handler(Ch, self()), ok = turtle:declare(Ch, Decls, #{ passive => Passive }), - NoAck = maps:get(no_ack, Conf, #state{}#state.no_ack), - {ok, Tag} = turtle:consume(Ch, Queue, NoAck), Mode = mode(Conf), + {ok, Tag} = turtle:consume(Ch, Queue, Mode), {ok, #state { consumer_tag = Tag, invoke = Fun, @@ -68,7 +67,6 @@ init([#{ monitor = MRef, conn_name = ConnName, name = Name, - no_ack = NoAck, mode = Mode }}. @@ -88,6 +86,8 @@ handle_info(#'basic.consume_ok'{}, State) -> handle_info(#'basic.cancel_ok'{}, State) -> lager:info("Consumption canceled"), {stop, normal, shutdown(rabbitmq_gone, State)}; +handle_info({#'basic.deliver'{}, _Content} = Msg, #state { mode = simple } = State) -> + handle_deliver_simple(Msg, State); handle_info({#'basic.deliver'{}, _Content} = Msg, #state { mode = single } = State) -> handle_deliver_single(Msg, State); handle_info({#'basic.deliver'{}, _Content} = Msg, #state { mode = bulk } = State) -> @@ -112,6 +112,7 @@ handle_info(Info, #state { handle_info = HandleInfo, invoke_state = IState } = S S = erlang:monotonic_time(), try HandleInfo(Info, IState) of {ok, IState2} -> {noreply, State#state { invoke_state = IState2 }}; + {stop, Reason, IState2} -> {stop, Reason, State#state { invoke_state = IState2 }}; {Cmds, IState2} when is_list(Cmds) -> handle_commands(S, Cmds, State#state { invoke_state = IState2 }) catch @@ -147,20 +148,41 @@ code_change(_, State, _) -> %% INTERNAL FUNCTIONS %% +handle_deliver_simple({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}, + #amqp_msg { + props = #'P_basic' { + correlation_id = CorrID, + reply_to = ReplyTo }} = Content}, + #state { + channel = Channel, invoke = Fun, invoke_state = IState, mode = Mode} = State) -> + Tag = {DTag, ReplyTo, CorrID}, + try handle_message(Tag, Fun, Mode, Key, Content, IState) of + {reply, CType, Msg, S2} -> + reply(Channel, Tag, CType, Msg), + {noreply, State#state { invoke_state = S2 }}; + {stop, Reason, S2} -> + {stop, Reason, State#state { invoke_state = S2 }}; + {_, S2} -> + {noreply, State#state { invoke_state = S2 }} + catch + Class:Error -> + lager:error("Handler function crashed: {~p, ~p}, stack: ~p, content: ~p", + [Class, Error, erlang:get_stacktrace(), format_amqp_msg(Content)]), + lager:error("Mailbox size ~p", [erlang:process_info(self(), message_queue_len)]), + handle_exception(Class, Error, DTag, State) + end. + handle_deliver_bulk({#'basic.deliver' {delivery_tag = DTag, routing_key = Key}, #amqp_msg { props = #'P_basic' { correlation_id = CorrID, reply_to = ReplyTo }} = Content}, #state { - invoke = Fun, invoke_state = IState, - conn_name = CN, - name = N } = State) -> + invoke = Fun, invoke_state = IState, mode = Mode} = State) -> S = erlang:monotonic_time(), Tag = {DTag, ReplyTo, CorrID}, - try handle_message(Tag, Fun, Key, Content, IState) of + try handle_message(Tag, Fun, Mode, Key, Content, IState) of {Cmds, S2} when Cmds =:= []; Cmds =:= ok -> - exometer_update_latency(CN, N, S), {noreply, State#state { invoke_state = S2 }}; {Cmds, S2} when is_list(Cmds) -> handle_commands(S, Cmds, State#state { invoke_state = S2 }) @@ -177,23 +199,18 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key} props = #'P_basic' { correlation_id = CorrID, reply_to = ReplyTo }} = Content}, - #state { invoke = Fun, invoke_state = IState } = State) -> + #state { invoke = Fun, invoke_state = IState, mode = Mode } = State) -> S = erlang:monotonic_time(), Tag = {DTag, ReplyTo, CorrID}, try %% Transform a single message into the style of bulk messages - %% by temporarily inserting an empty tag - SingleTag = {undefined, ReplyTo, CorrID}, - {Cmds, S2} = case handle_message(SingleTag, Fun, Key, Content, IState) of + {Cmds, S2} = case handle_message(Tag, Fun, Mode, Key, Content, IState) of {ack, IState2} -> {[{ack, Tag}], IState2}; - {reply, _Tag, CType, Msg, IState2} -> {[{reply, Tag, CType, Msg}], IState2}; + {reply, CType, Msg, IState2} -> {[{reply, Tag, CType, Msg}], IState2}; {reject, IState2} -> {[{reject, Tag}], IState2}; {remove, IState2} -> {[{remove, Tag}], IState2}; {stop, Reason, IState2} -> {[{{stop, Reason}, Tag}], IState2}; - {ok, IState2} -> - #state { conn_name = CN, name = N } = State, - exometer_update_latency(CN, N, S), - {[], IState2} + {ok, IState2} -> {[], IState2} end, handle_commands(S, Cmds, State#state { invoke_state = S2 }) catch @@ -204,25 +221,12 @@ handle_deliver_single({#'basic.deliver' {delivery_tag = DTag, routing_key = Key} handle_exception(Class, Error, DTag, State) end. -handle_exception(Class, Error, _DTag, #state{ no_ack = true } = State) -> +handle_exception(Class, Error, _DTag, #state{ mode = simple } = State) -> {stop, {Class, Error}, State}; handle_exception(Class, Error, DTag, #state{ channel = Ch } = State) -> amqp_channel:call(Ch, #'basic.reject' { delivery_tag = DTag, requeue = false }), {stop, {Class, Error}, State}. -handle_commands(_S, [], State) -> - {noreply, State}; -handle_commands(S, [{reply, Tag, CType, Msg} | Next], - #state { channel = Channel, conn_name = CN, name = N, no_ack = true } = State) -> - exometer_update_latency(CN, N, S), - reply(Channel, Tag, CType, Msg), - handle_commands(S, Next, State); -handle_commands(_S, [{stop, Reason, _Tag} | _Next], #state { no_ack = true } = State) -> - {stop, Reason, State}; -handle_commands(_S, [{{stop, Reason}, _Tag} | _Next], #state { no_ack = true } = State) -> - {stop, Reason, State}; -handle_commands(S, [_C | Next], #state { no_ack = true } = State) -> - handle_commands(S, Next, State); handle_commands(S, [C | Next], #state { channel = Channel, conn_name = CN, name = N } = State) -> case C of @@ -253,21 +257,22 @@ handle_commands(S, [C | Next], reply(Channel, Tag, CType, Msg), ok = amqp_channel:cast(Channel, #'basic.ack' { delivery_tag = delivery_tag(Tag) }), handle_commands(S, Next, State); - {{stop, Reason}, Tag} -> ok = amqp_channel:cast(Channel, #'basic.reject' { delivery_tag = delivery_tag(Tag), requeue = true }), + {stop, Reason, State}; + {stop, Reason} -> {stop, Reason, State} end. -handle_message(Tag, Fun, Key, +handle_message(Tag, Fun, Mode, Key, #amqp_msg { payload = Payload, props = #'P_basic' { content_type = Type }}, IState) -> - Res = case Tag of - {undefined, _, _} -> Fun(Key, Type, Payload, IState); - Tag -> Fun(Key, Type, Payload, Tag, IState) + Res = case Mode of + M when M =:= simple; M =:= single -> Fun(Key, Type, Payload, IState); + bulk -> Fun(Key, Type, Payload, Tag, IState) end, case Res of %% Bulk messages @@ -277,8 +282,8 @@ handle_message(Tag, Fun, Key, %% Single messages ack -> {ack, IState}; {ack, IState2} -> {ack, IState2}; - {reply, CType, Msg} -> {reply, Tag, CType, Msg, IState}; - {reply, CType, Msg, IState2} -> {reply, Tag, CType, Msg, IState2}; + {reply, CType, Msg} -> {reply, CType, Msg, IState}; + {reply, CType, Msg, IState2} -> {reply, CType, Msg, IState2}; reject -> {reject, IState}; {reject, IState2} -> {reject, IState2}; remove -> {remove, IState}; @@ -335,6 +340,7 @@ drain_reject_messages(Channel) -> ok end. +mode(#{ mode := simple }) -> simple; mode(#{ mode := bulk }) -> bulk; mode(#{ mode := single }) -> single; mode(#{}) -> single. -- 2.16.4
Locations
Projects
Search
Status Monitor
Help
OpenBuildService.org
Documentation
API Documentation
Code of Conduct
Contact
Support
@OBShq
Terms
openSUSE Build Service is sponsored by
The Open Build Service is an
openSUSE project
.
Sign Up
Log In
Places
Places
All Projects
Status Monitor