File 1961-Add-zstd-flush-2.patch of Package erlang
From 488a10a528147e53f89a101652bab7986b291531 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= <essen@ninenines.eu>
Date: Mon, 5 Jan 2026 14:50:33 +0100
Subject: [PATCH] Add zstd:flush/2
This function uses the ZSTD_e_flush end directive to flush
compressed data while still keeping the compression context
open. The flushed data can be decompressed without waiting
for the rest of the frame.
---
erts/emulator/nifs/common/zstd_nif.c | 19 +++++++-
lib/stdlib/src/zstd.erl | 67 +++++++++++++++++++++++++---
lib/stdlib/test/zstd_SUITE.erl | 12 +++++
3 files changed, 91 insertions(+), 7 deletions(-)
diff --git a/erts/emulator/nifs/common/zstd_nif.c b/erts/emulator/nifs/common/zstd_nif.c
index 05c2f33c21..44e1ea3efe 100644
--- a/erts/emulator/nifs/common/zstd_nif.c
+++ b/erts/emulator/nifs/common/zstd_nif.c
@@ -27,6 +27,11 @@
#define STATIC_ERLANG_NIF 1
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "sys.h"
#include "erl_nif.h"
#define ZSTD_STATIC_LINKING_ONLY
#include "erl_zstd.h"
@@ -582,7 +587,11 @@ static ERL_NIF_TERM codec_nif(ZstdCtx* ctx,
ZSTD_outBuffer out_buffer;
ZSTD_inBuffer in_buffer;
size_t remaining;
- int flush = enif_is_identical(argv[1], am_true);
+ int flush;
+
+ if (!enif_get_int(env, argv[1], &flush)) {
+ return enif_make_badarg(env);
+ }
if (!enif_inspect_binary(env, argv[0], &input)) {
return enif_make_badarg(env);
@@ -663,7 +672,7 @@ static size_t compress_stream_callback(
return ZSTD_compressStream2(ctx->handle.c,
out_buffer,
in_buffer,
- flush ? ZSTD_e_end : ZSTD_e_continue);
+ (ZSTD_EndDirective)flush);
}
static ERL_NIF_TERM compress_stream_nif(ErlNifEnv *env,
@@ -1044,6 +1053,12 @@ static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM arg) {
(void)arg;
+ /* Compile-time checks to ensure ZSTD_EndDirective values
+ * match Erlang defines. */
+ ERTS_CT_ASSERT(ZSTD_e_continue == 0);
+ ERTS_CT_ASSERT(ZSTD_e_flush == 1);
+ ERTS_CT_ASSERT(ZSTD_e_end == 2);
+
return 0;
}
diff --git a/lib/stdlib/src/zstd.erl b/lib/stdlib/src/zstd.erl
index b8503214a6..b143de628f 100644
--- a/lib/stdlib/src/zstd.erl
+++ b/lib/stdlib/src/zstd.erl
@@ -210,7 +210,7 @@ To reset the context to not use any dictionary use the empty dictionary, that is
-export([compress/1, compress/2, decompress/1, decompress/2]).
%% Streaming API
--export([context/1, context/2, stream/2, finish/2, set_parameter/3,
+-export([context/1, context/2, stream/2, flush/2, finish/2, set_parameter/3,
get_parameter/2, reset/1, close/1]).
%% Dict API
@@ -239,6 +239,11 @@ To reset the context to not use any dictionary use the empty dictionary, that is
get_frame_header_nif/1
]).
+%% ZSTD_EndDirective.
+-define(ZSTD_e_continue, 0).
+-define(ZSTD_e_flush, 1).
+-define(ZSTD_e_end, 2).
+
-spec on_load() -> ok.
on_load() ->
ok = erlang:load_nif("zstd", zstd).
@@ -493,9 +498,57 @@ stream(Ctx, Data) when is_list(Data) ->
error(badarg)
end;
stream({compress, Ref}, Data) ->
- compress_stream_nif(Ref, Data, false);
+ compress_stream_nif(Ref, Data, ?ZSTD_e_continue);
stream({decompress, Ref}, Data) ->
- decompress_stream_nif(Ref, Data, false).
+ decompress_stream_nif(Ref, Data, ?ZSTD_e_continue).
+
+-doc """
+Flush the compression stream.
+
+This flushes all pending compressed data without ending the frame,
+allowing the compressed data to be read immediately while keeping
+the context open for further compression.
+
+Example:
+
+```
+1> {ok, CCtx} = zstd:context(compress).
+2> {continue, C1} = zstd:stream(CCtx, ~"hello").
+3> {continue, C2} = zstd:flush(CCtx, ~"").
+4> zstd:decompress([C1, C2]).
+[<<"hello">>]
+```
+""".
+-doc #{ since => "OTP 29.0" }.
+-spec flush(Ctx :: context(), Data :: iodata()) -> Result when
+ Result :: {continue, Remainder :: erlang:iovec(), Output :: binary()} |
+ {continue, Output :: binary()}.
+flush({compress, Ref}, Data) when is_binary(Data) ->
+ case compress_stream_nif(Ref, Data, ?ZSTD_e_flush) of
+ {done, Output} ->
+ {continue, Output};
+ Res ->
+ Res
+ end;
+flush(Ctx, Data) when is_list(Data) ->
+ %% Handle iodata similar to stream/2
+ try erlang:iolist_to_iovec(Data) of
+ [] ->
+ flush(Ctx, <<>>);
+ [H] ->
+ flush(Ctx, H);
+ [H|T] ->
+ case flush(Ctx, H) of
+ {continue, Rem, Out} ->
+ {continue, [Rem | T], Out};
+ {continue, Out} ->
+ {continue, T, Out}
+ end
+ catch _:_ ->
+ error(badarg)
+ end;
+flush(Ctx={decompress, _}, _) ->
+ error({badarg, {invalid_context, Ctx}}).
-doc """
Finish compressing/decompressing data.
@@ -543,7 +596,7 @@ finish(Ctx, Data) ->
end.
finish_1(Codec, Ref, Data) when is_binary(Data) ->
- case Codec(Ref, Data, true) of
+ case Codec(Ref, Data, ?ZSTD_e_end) of
{continue, Remainder, Output} ->
case finish_1(Codec, Ref, Remainder) of
{done, Tail} ->
@@ -670,7 +723,11 @@ decompress(Data, {decompress, Ref}) ->
erlang:iolist_to_iovec(Data)).
codec_loop(Codec, Ref, [Data | Next]) ->
- case Codec(Ref, Data, Next =:= []) of
+ EndDirective = case Next of
+ [] -> ?ZSTD_e_end;
+ _ -> ?ZSTD_e_continue
+ end,
+ case Codec(Ref, Data, EndDirective) of
{continue, Remainder, Output} ->
[Output | codec_loop(Codec, Ref, [Remainder | Next])];
{continue, <<>>} ->
diff --git a/lib/stdlib/test/zstd_SUITE.erl b/lib/stdlib/test/zstd_SUITE.erl
index 68ba82c47e..c40d643bc2 100644
--- a/lib/stdlib/test/zstd_SUITE.erl
+++ b/lib/stdlib/test/zstd_SUITE.erl
@@ -207,6 +207,18 @@ cstream(_Config) ->
{done, C5_1} = zstd:finish(CCtx, [LargeData, LargeData]),
?assertEqual(iob([LargeData, LargeData]), iob(zstd:decompress([C5_1]))),
+ %% Test flushing of compressed data. Both partial and full decompression
+ %% are confirmed to work.
+ {ok, FCtx} = zstd:context(decompress),
+ {continue, F1} = zstd:stream(CCtx, ~"hello"),
+ {continue, F2} = zstd:flush(CCtx, <<>>),
+ ?assertEqual(~"hello", iob(zstd:decompress([F1, F2], FCtx))),
+ {continue, F3} = zstd:stream(CCtx, ~"world"),
+ {done, F4} = zstd:finish(CCtx, <<>>),
+ ?assertEqual(~"world", iob(zstd:decompress([F3, F4], FCtx))),
+ Final = iolist_to_binary([F1, F2, F3, F4]),
+ ?assertEqual(~"helloworld", iob(zstd:decompress(Final))),
+
ok = zstd:close(CCtx),
{'EXIT',{badarg,_}} = catch zstd:finish(CCtx, Data),
--
2.51.0