File 0001-Compression-parameter-created.patch of Package rabbitmq-server
From 33aba3776b2c32920b06ddf9cd47260472f08d39 Mon Sep 17 00:00:00 2001
From: Jose Antonio Deniz Fabelo <jose.deniz@ns90.ie>
Date: Thu, 27 Apr 2023 20:33:45 +0100
Subject: [PATCH 1/2] Compression parameter created
---
.../rabbitmq_shovel/include/rabbit_shovel.hrl | 1 +
.../src/rabbit_amqp091_shovel.erl | 19 ++++++++++++++++---
.../src/rabbit_shovel_config.erl | 15 +++++++++++++++
.../src/rabbit_shovel_parameters.erl | 7 +++++++
deps/rabbitmq_shovel_management/README.md | 1 +
.../priv/www/js/shovel.js | 10 ++++++++++
.../priv/www/js/tmpl/dynamic-shovel.ejs | 4 ++++
.../priv/www/js/tmpl/dynamic-shovels.ejs | 15 +++++++++++++++
8 files changed, 69 insertions(+), 3 deletions(-)
diff --git a/deps/rabbitmq_shovel/include/rabbit_shovel.hrl b/deps/rabbitmq_shovel/include/rabbit_shovel.hrl
index 7518aa6871..e2d3e39e34 100644
--- a/deps/rabbitmq_shovel/include/rabbit_shovel.hrl
+++ b/deps/rabbitmq_shovel/include/rabbit_shovel.hrl
@@ -26,6 +26,7 @@
-define(DEFAULT_PREFETCH, 1000).
-define(DEFAULT_ACK_MODE, on_confirm).
+-define(DEFAULT_COMPRESS_MODE, none).
-define(DEFAULT_RECONNECT_DELAY, 5).
-define(SHOVEL_GUIDE_URL, <<"https://rabbitmq.com/shovel.html">>).
diff --git a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
index 8e4f3f227e..6f17fa58db 100644
--- a/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
+++ b/deps/rabbitmq_shovel/src/rabbit_amqp091_shovel.erl
@@ -191,13 +191,26 @@ do_forward(IncomingTag, Props, Payload,
fields_fun := FieldsFun}}) ->
SrcUri = rabbit_shovel_behaviour:source_uri(State0),
% do publish
+ % Conf = #{compress_mode := CompressMode},
+ CompressMode = maps:get(compress_mode, State0 , none),
Exchange = maps:get(exchange, Props, undefined),
RoutingKey = maps:get(routing_key, Props, undefined),
Method = #'basic.publish'{exchange = Exchange, routing_key = RoutingKey},
Method1 = FieldsFun(SrcUri, DstUri, Method),
- Msg1 = #amqp_msg{props = PropsFun(SrcUri, DstUri, props_from_map(Props)),
- payload = Payload},
- publish(IncomingTag, Method1, Msg1, State0).
+ case CompressMode of
+ none ->
+ Msg1 = #amqp_msg{props = PropsFun(SrcUri, DstUri, props_from_map(Props)),
+ payload = Payload},
+ publish(IncomingTag, Method1, Msg1, State0);
+ gzip ->
+ Msg1 = #amqp_msg{props = PropsFun(SrcUri, DstUri, props_from_map(Props)),
+ payload = zlib:gzip(Payload)},
+ publish(IncomingTag, Method1, Msg1, State0);
+ gunzip ->
+ Msg1 = #amqp_msg{props = PropsFun(SrcUri, DstUri, props_from_map(Props)),
+ payload = zlib:gunzip(Payload)},
+ publish(IncomingTag, Method1, Msg1, State0)
+ end.
props_from_map(Map) ->
#'P_basic'{content_type = maps:get(content_type, Map, undefined),
diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl
index bc294569df..e170c34a8e 100644
--- a/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl
+++ b/deps/rabbitmq_shovel/src/rabbit_shovel_config.erl
@@ -42,6 +42,8 @@ convert_from_legacy(Config) ->
RD = proplists:get_value(reconnect_delay, Config, ?DEFAULT_RECONNECT_DELAY),
AckMode = proplists:get_value(ack_mode, Config, ?DEFAULT_ACK_MODE),
validate_ack_mode(AckMode),
+ CompressMode = proplists:get_value(compress_mode, Config, ?DEFAULT_COMPRESS_MODE),
+ validate_compress_mode(CompressMode),
PubFields = proplists:get_value(publish_fields, Config, []),
PubProps = proplists:get_value(publish_properties, Config, []),
AFH = proplists:get_value(add_forward_headers, Config, false),
@@ -64,6 +66,7 @@ convert_from_legacy(Config) ->
{add_forward_headers, AFH},
{add_timestamp_header, ATH}]},
{ack_mode, AckMode},
+ {compress_mode, CompressMode},
{reconnect_delay, RD}].
parse(ShovelName, Config0) ->
@@ -133,9 +136,12 @@ parse_current(ShovelName, Config) ->
DstMod = resolve_module(proplists:get_value(protocol, Destination, amqp091)),
AckMode = proplists:get_value(ack_mode, Config, no_ack),
validate_ack_mode(AckMode),
+ CompressMode = proplists:get_value(compress_mode, Config, none),
+ validate_compress_mode(CompressMode),
{ok, #{name => ShovelName,
shovel_type => static,
ack_mode => AckMode,
+ compress_mode => CompressMode,
reconnect_delay => proplists:get_value(reconnect_delay, Config,
?DEFAULT_RECONNECT_DELAY),
source => rabbit_shovel_behaviour:parse(SrcMod, ShovelName,
@@ -163,6 +169,15 @@ validate_ack_mode(WrongVal) ->
{ack_mode_value_requires_one_of, {no_ack, on_publish, on_confirm},
WrongVal}}).
+validate_compress_mode(Val) when Val =:= none orelse
+ Val =:= gzip orelse
+ Val =:= gunzip ->
+ ok;
+validate_compress_mode(WrongVal) ->
+ fail({invalid_parameter_value, compress_mode,
+ {compress_mode_value_requires_one_of, {none, gzip, gunzip},
+ WrongVal}}).
+
duplicate_keys(PropList) when is_list(PropList) ->
proplists:get_keys(
lists:foldl(fun (K, L) -> lists:keydelete(K, 1, L) end, PropList,
diff --git a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl
index c39c90eb1f..d1f657ff8d 100644
--- a/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl
+++ b/deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl
@@ -104,6 +104,8 @@ shovel_validation() ->
[{<<"reconnect-delay">>, fun rabbit_parameter_validation:number/2,optional},
{<<"ack-mode">>, rabbit_parameter_validation:enum(
['no-ack', 'on-publish', 'on-confirm']), optional},
+ {<<"compress-mode">>, rabbit_parameter_validation:enum(
+ ['none', 'gzip', 'gunzip']), optional},
{<<"src-protocol">>,
rabbit_parameter_validation:enum(['amqp10', 'amqp091']), optional},
{<<"dest-protocol">>,
@@ -272,6 +274,7 @@ parse({VHost, Name}, ClusterName, Def) ->
dest => parse_dest({VHost, Name}, ClusterName, Def,
SourceHeaders),
ack_mode => translate_ack_mode(pget(<<"ack-mode">>, Def, <<"on-confirm">>)),
+ compress_mode => translate_compress_mode(pget(<<"compress-mode">>, Def, <<"none">>)),
reconnect_delay => pget(<<"reconnect-delay">>, Def,
?DEFAULT_RECONNECT_DELAY)}}.
@@ -450,6 +453,10 @@ translate_ack_mode(<<"on-confirm">>) -> on_confirm;
translate_ack_mode(<<"on-publish">>) -> on_publish;
translate_ack_mode(<<"no-ack">>) -> no_ack.
+translate_compress_mode(<<"none">>) -> none;
+translate_compress_mode(<<"gzip">>) -> gzip;
+translate_compress_mode(<<"gunzip">>) -> gunzip.
+
ensure_queue(Conn, Queue, XArgs) ->
{ok, Ch} = amqp_connection:open_channel(Conn),
try
diff --git a/deps/rabbitmq_shovel_management/README.md b/deps/rabbitmq_shovel_management/README.md
index 8e2bcfc8a4..a1605db6ab 100644
--- a/deps/rabbitmq_shovel_management/README.md
+++ b/deps/rabbitmq_shovel_management/README.md
@@ -48,6 +48,7 @@ Create a file called ``shovel.json`` similar to the following, replacing the par
"name": "my-shovel",
"value": {
"ack-mode": "on-publish",
+ "compress-mode": "none",
"add-forward-headers": false,
"delete-after": "never",
"dest-exchange": null,
diff --git a/deps/rabbitmq_shovel_management/priv/www/js/shovel.js b/deps/rabbitmq_shovel_management/priv/www/js/shovel.js
index 1a89aaa477..300a90c241 100644
--- a/deps/rabbitmq_shovel_management/priv/www/js/shovel.js
+++ b/deps/rabbitmq_shovel_management/priv/www/js/shovel.js
@@ -133,6 +133,16 @@ HELP['shovel-ack-mode'] =
<dd>Message acknowledgements are not used. The fastest option, but may lose messages in the event of network or broker failures.</dd>\
</dl>';
+HELP['shovel-compress-mode'] =
+ '<dl>\
+ <dt><code>none</code></dt>\
+ <dd>Messages are forwarded without changes.</dd>\
+ <dt><code>gzip</code></dt>\
+ <dd>Messages are compressed using zlib:gzip.</dd>\
+ <dt><code>gunzip</code></dt>\
+ <dd>Messages are decompressed using zlib:gunzip.</dd>\
+</dl>';
+
HELP['shovel-amqp091-auto-delete'] =
'<dl>\
<dt><code>Never</code></dt>\
diff --git a/deps/rabbitmq_shovel_management/priv/www/js/tmpl/dynamic-shovel.ejs b/deps/rabbitmq_shovel_management/priv/www/js/tmpl/dynamic-shovel.ejs
index 58607c4b0f..2b0a1dfd79 100644
--- a/deps/rabbitmq_shovel_management/priv/www/js/tmpl/dynamic-shovel.ejs
+++ b/deps/rabbitmq_shovel_management/priv/www/js/tmpl/dynamic-shovel.ejs
@@ -36,6 +36,10 @@
<th>Ack mode</th>
<td><%= fmt_string(shovel.value['ack-mode']) %></td>
</tr>
+ <tr>
+ <th>Compression mode</th>
+ <td><%= fmt_string(shovel.value['compress-mode']) %></td>
+ </tr>
<tr>
<th>Auto-delete</th>
<td><%= fmt_string(fallback_value(shovel, 'src-delete-after', 'delete-after')) %></td>
diff --git a/deps/rabbitmq_shovel_management/priv/www/js/tmpl/dynamic-shovels.ejs b/deps/rabbitmq_shovel_management/priv/www/js/tmpl/dynamic-shovels.ejs
index 837674f062..9b3e83a685 100644
--- a/deps/rabbitmq_shovel_management/priv/www/js/tmpl/dynamic-shovels.ejs
+++ b/deps/rabbitmq_shovel_management/priv/www/js/tmpl/dynamic-shovels.ejs
@@ -325,6 +325,21 @@
</select>
</td>
</tr>
+ <tr>
+ <th>
+ <label>
+ Compression mode:
+ <span class="help" id="shovel-compress-mode"></span>
+ </label>
+ </th>
+ <td>
+ <select name="compress-mode">
+ <option value="none">none</option>
+ <option value="gzip">gzip</option>
+ <option value="gunzip">gunzip</option>
+ </select>
+ </td>
+ </tr>
</table>
<input type="submit" value="Add shovel"/>
</form>
--
2.34.1