File 1961-Add-zstd-flush-2.patch of Package erlang

From 488a10a528147e53f89a101652bab7986b291531 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= <essen@ninenines.eu>
Date: Mon, 5 Jan 2026 14:50:33 +0100
Subject: [PATCH] Add zstd:flush/2

This function uses the ZSTD_e_flush end directive to flush
compressed data while still keeping the compression context
open. The flushed data can be decompressed without waiting
for the rest of the frame.
---
 erts/emulator/nifs/common/zstd_nif.c | 19 +++++++-
 lib/stdlib/src/zstd.erl              | 67 +++++++++++++++++++++++++---
 lib/stdlib/test/zstd_SUITE.erl       | 12 +++++
 3 files changed, 91 insertions(+), 7 deletions(-)

diff --git a/erts/emulator/nifs/common/zstd_nif.c b/erts/emulator/nifs/common/zstd_nif.c
index 05c2f33c21..44e1ea3efe 100644
--- a/erts/emulator/nifs/common/zstd_nif.c
+++ b/erts/emulator/nifs/common/zstd_nif.c
@@ -27,6 +27,11 @@
 
 #define STATIC_ERLANG_NIF 1
 
+#ifdef HAVE_CONFIG_H
+#  include "config.h"
+#endif
+
+#include "sys.h"
 #include "erl_nif.h"
 #define ZSTD_STATIC_LINKING_ONLY
 #include "erl_zstd.h"
@@ -582,7 +587,11 @@ static ERL_NIF_TERM codec_nif(ZstdCtx* ctx,
     ZSTD_outBuffer out_buffer;
     ZSTD_inBuffer in_buffer;
     size_t remaining;
-    int flush = enif_is_identical(argv[1], am_true);
+    int flush;
+
+    if (!enif_get_int(env, argv[1], &flush)) {
+        return enif_make_badarg(env);
+    }
 
     if (!enif_inspect_binary(env, argv[0], &input)) {
         return enif_make_badarg(env);
@@ -663,7 +672,7 @@ static size_t compress_stream_callback(
     return ZSTD_compressStream2(ctx->handle.c,
                                 out_buffer,
                                 in_buffer,
-                                flush ? ZSTD_e_end : ZSTD_e_continue);
+                                (ZSTD_EndDirective)flush);
 }
 
 static ERL_NIF_TERM compress_stream_nif(ErlNifEnv *env,
@@ -1044,6 +1053,12 @@ static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM arg) {
 
     (void)arg;
 
+    /* Compile-time checks to ensure ZSTD_EndDirective values
+     * match Erlang defines. */
+    ERTS_CT_ASSERT(ZSTD_e_continue == 0);
+    ERTS_CT_ASSERT(ZSTD_e_flush == 1);
+    ERTS_CT_ASSERT(ZSTD_e_end == 2);
+
     return 0;
 }
 
diff --git a/lib/stdlib/src/zstd.erl b/lib/stdlib/src/zstd.erl
index b8503214a6..b143de628f 100644
--- a/lib/stdlib/src/zstd.erl
+++ b/lib/stdlib/src/zstd.erl
@@ -210,7 +210,7 @@ To reset the context to not use any dictionary use the empty dictionary, that is
 -export([compress/1, compress/2, decompress/1, decompress/2]).
 
 %% Streaming API
--export([context/1, context/2, stream/2, finish/2, set_parameter/3,
+-export([context/1, context/2, stream/2, flush/2, finish/2, set_parameter/3,
          get_parameter/2, reset/1, close/1]).
 
 %% Dict API
@@ -239,6 +239,11 @@ To reset the context to not use any dictionary use the empty dictionary, that is
        get_frame_header_nif/1
       ]).
 
+%% ZSTD_EndDirective.
+-define(ZSTD_e_continue, 0).
+-define(ZSTD_e_flush,    1).
+-define(ZSTD_e_end,      2).
+
 -spec on_load() -> ok.
 on_load() ->
     ok = erlang:load_nif("zstd", zstd).
@@ -493,9 +498,57 @@ stream(Ctx, Data) when is_list(Data) ->
             error(badarg)
     end;
 stream({compress, Ref}, Data) ->
-    compress_stream_nif(Ref, Data, false);
+    compress_stream_nif(Ref, Data, ?ZSTD_e_continue);
 stream({decompress, Ref}, Data) ->
-    decompress_stream_nif(Ref, Data, false).
+    decompress_stream_nif(Ref, Data, ?ZSTD_e_continue).
+
+-doc """
+Flush the compression stream.
+
+This flushes all pending compressed data without ending the frame,
+allowing the compressed data to be read immediately while keeping
+the context open for further compression.
+
+Example:
+
+```
+1> {ok, CCtx} = zstd:context(compress).
+2> {continue, C1} = zstd:stream(CCtx, ~"hello").
+3> {continue, C2} = zstd:flush(CCtx, ~"").
+4> zstd:decompress([C1, C2]).
+[<<"hello">>]
+```
+""".
+-doc #{ since => "OTP 29.0" }.
+-spec flush(Ctx :: context(), Data :: iodata()) -> Result when
+      Result :: {continue, Remainder :: erlang:iovec(), Output :: binary()} |
+                {continue, Output :: binary()}.
+flush({compress, Ref}, Data) when is_binary(Data) ->
+    case compress_stream_nif(Ref, Data, ?ZSTD_e_flush) of
+        {done, Output} ->
+            {continue, Output};
+        Res ->
+            Res
+    end;
+flush(Ctx, Data) when is_list(Data) ->
+    %% Handle iodata similar to stream/2
+    try erlang:iolist_to_iovec(Data) of
+        [] ->
+            flush(Ctx, <<>>);
+        [H] ->
+            flush(Ctx, H);
+        [H|T] ->
+            case flush(Ctx, H) of
+                {continue, Rem, Out} ->
+                    {continue, [Rem | T], Out};
+                {continue, Out} ->
+                    {continue, T, Out}
+            end
+    catch _:_ ->
+            error(badarg)
+    end;
+flush(Ctx={decompress, _}, _) ->
+    error({badarg, {invalid_context, Ctx}}).
 
 -doc """
 Finish compressing/decompressing data.
@@ -543,7 +596,7 @@ finish(Ctx, Data) ->
     end.
 
 finish_1(Codec, Ref, Data) when is_binary(Data) ->
-    case Codec(Ref, Data, true) of
+    case Codec(Ref, Data, ?ZSTD_e_end) of
         {continue, Remainder, Output} ->
             case finish_1(Codec, Ref, Remainder) of
                 {done, Tail} ->
@@ -670,7 +723,11 @@ decompress(Data, {decompress, Ref}) ->
                erlang:iolist_to_iovec(Data)).
 
 codec_loop(Codec, Ref, [Data | Next]) ->
-    case Codec(Ref, Data, Next =:= []) of
+    EndDirective = case Next of
+        [] -> ?ZSTD_e_end;
+        _ -> ?ZSTD_e_continue
+    end,
+    case Codec(Ref, Data, EndDirective) of
         {continue, Remainder, Output} ->
             [Output | codec_loop(Codec, Ref, [Remainder | Next])];
         {continue, <<>>} ->
diff --git a/lib/stdlib/test/zstd_SUITE.erl b/lib/stdlib/test/zstd_SUITE.erl
index 68ba82c47e..c40d643bc2 100644
--- a/lib/stdlib/test/zstd_SUITE.erl
+++ b/lib/stdlib/test/zstd_SUITE.erl
@@ -207,6 +207,18 @@ cstream(_Config) ->
     {done, C5_1} = zstd:finish(CCtx, [LargeData, LargeData]),
     ?assertEqual(iob([LargeData, LargeData]), iob(zstd:decompress([C5_1]))),
 
+    %% Test flushing of compressed data. Both partial and full decompression
+    %% are confirmed to work.
+    {ok, FCtx} = zstd:context(decompress),
+    {continue, F1} = zstd:stream(CCtx, ~"hello"),
+    {continue, F2} = zstd:flush(CCtx, <<>>),
+    ?assertEqual(~"hello", iob(zstd:decompress([F1, F2], FCtx))),
+    {continue, F3} = zstd:stream(CCtx, ~"world"),
+    {done, F4} = zstd:finish(CCtx, <<>>),
+    ?assertEqual(~"world", iob(zstd:decompress([F3, F4], FCtx))),
+    Final = iolist_to_binary([F1, F2, F3, F4]),
+    ?assertEqual(~"helloworld", iob(zstd:decompress(Final))),
+
     ok = zstd:close(CCtx),
     {'EXIT',{badarg,_}} = catch zstd:finish(CCtx, Data),
 
-- 
2.51.0

openSUSE Build Service is sponsored by