implement proper deflate flush semantics

To end a flate stream, `finish` must now be called. `flush` now follows
regular semantics and byte-aligns the stream.

Byte-aligning the stream is done with empty fixed or store blocks. To
implement flush, a variable history length was added and it is tracked
if the final bytes of history have been hashed yet.
This commit is contained in:
Kendall Condon 2026-03-06 19:10:47 -05:00
parent 46658257f4
commit d62c73b20c
2 changed files with 209 additions and 110 deletions

View file

@ -2,7 +2,8 @@
//!
//! The source of an `error.WriteFailed` is always the backing writer. After an
//! `error.WriteFailed`, the `.writer` becomes `.failing` and is unrecoverable.
//! After a `flush`, the writer also becomes `.failing` since the stream has
//!
//! After `finish`, the writer also becomes `.failing` since the stream has
//! been finished. This behavior also applies to `Raw` and `Huffman`.
// Implementation details:
@ -43,9 +44,10 @@ const PackedOptionalU15 = packed struct(u16) {
pub const null_bit: PackedOptionalU15 = .{ .value = 0, .is_null = true };
};
/// After `flush` is called, all vtable calls with result in `error.WriteFailed.`
/// After `finish` is called, all vtable calls will result in `error.WriteFailed`.
writer: Writer,
has_history: bool,
history_len: u16,
history_end_unhashed: bool,
bit_writer: BitWriter,
buffered_tokens: struct {
/// List of `TokenBufferEntryHeader`s and their trailing data.
@ -108,7 +110,7 @@ const BitWriter = struct {
b.buffered = @intCast(combined >> (combined_bits - b.buffered_n));
}
/// Assserts one byte can be written to `b.otuput` without rebasing.
/// Asserts one byte can be written to `b.output` without rebasing.
pub fn byteAlign(b: *BitWriter) void {
b.output.unusedCapacitySlice()[0] = b.buffered;
b.output.advance(@intFromBool(b.buffered_n != 0));
@ -116,6 +118,35 @@ const BitWriter = struct {
b.buffered_n = 0;
}
/// Byte-aligns the bit stream by emitting only empty flate blocks, so the
/// partially buffered byte is flushed while the output remains a valid
/// deflate stream (used to implement `flush`).
pub fn byteAlignBlocks(b: *BitWriter) Writer.Error!void {
if (b.buffered_n == 0) return; // already byte-aligned; nothing to emit
// There are two methods to do this:
// 1. A store block (5 or 6 bytes)
// 2. Outputting empty 10-bit fixed blocks until aligned
//
// Each empty fixed block is 10 bits and so advances the bit alignment by two,
// meaning fixed blocks can only be used for an even number of buffered bits,
// requiring a maximum of four bytes (three blocks = 30 bits), which is always
// more efficient than a store block.
if (b.buffered_n & 1 == 0) {
// Number of empty fixed blocks needed to reach the next byte boundary.
const splat = (8 - @as(u5, b.buffered_n)) >> 1;
const bits = splat * 10;
// fixed eos code is 0, so the only bits are for the block header
const pattern: u32 = BlockHeader.int(.{ .kind = .fixed, .final = false });
// Replicate the 3-bit header at 10-bit intervals, then drop the unused blocks.
const splatted = ((pattern << 20) | (pattern << 10) | pattern) >> (30 - bits);
try b.write(splatted, bits);
} else {
// Odd bit count: write a non-final store block header (3 bits), byte-align,
// then the store block's LEN = 0 and NLEN = ~LEN words for an empty block.
try b.write(BlockHeader.int(.{ .kind = .stored, .final = false }), 3);
try b.output.rebase(0, 5);
b.byteAlign();
b.output.writeInt(u16, 0x0000, .little) catch unreachable;
b.output.writeInt(u16, 0xffff, .little) catch unreachable;
}
assert(b.buffered_n == 0);
}
pub fn writeClen(
b: *BitWriter,
hclen: u4,
@ -159,9 +190,9 @@ const BitWriter = struct {
/// The maximum value is `math.maxInt(u16) - 1` since one token is reserved for end-of-block.
const block_tokens: u16 = 1 << 15;
const lookup_hash_bits = 15;
const Hash = u16; // `u[lookup_hash_bits]` is not used due to worse optimization (with LLVM 21)
const Hash = u16; // `@Int(.unsigned, lookup_hash_bits)` is not used due to worse optimization (with LLVM 21)
const seq_bytes = 3; // not intended to be changed
const Seq = std.meta.Int(.unsigned, seq_bytes * 8);
const Seq = @Int(.unsigned, seq_bytes * 8);
const TokenBufferEntryHeader = packed struct(u16) {
kind: enum(u1) {
@ -295,7 +326,8 @@ pub fn init(
.rebase = rebase,
},
},
.has_history = false,
.history_len = 0,
.history_end_unhashed = false,
.bit_writer = .init(output),
.buffered_tokens = .empty,
.lookup = .{
@ -314,68 +346,110 @@ fn drain(w: *Writer, data: []const []const u8, splat: usize) Writer.Error!usize
errdefer w.* = .failing;
// There may have not been enough space in the buffer and the write was sent directly here.
// However, it is required that all data goes through the buffer to keep a history.
//
// Additionally, ensuring the buffer is always full ensures there is always a full history
// after.
const data_n = w.buffer.len - w.end;
_ = w.fixedDrain(data, splat) catch {};
assert(w.end == w.buffer.len);
try rebaseInner(w, 0, 1, false);
try rebaseInner(w, 0, 1, false, false);
return data_n;
}
fn flush(w: *Writer) Writer.Error!void {
defer w.* = .failing;
errdefer w.* = .failing;
try rebaseInner(w, 0, w.buffer.len - flate.history_len, true, false);
const c: *Compress = @fieldParentPtr("writer", w);
try rebaseInner(w, 0, w.buffer.len - flate.history_len, true);
try c.bit_writer.byteAlignBlocks();
}
/// Ends the flate stream: compresses all remaining buffered data into a final
/// block, byte-aligns the output, and writes the container footer via
/// `hasher.writeFooter`. Afterwards the writer becomes `.failing`, so no
/// further writes (or a second `finish`) are allowed.
pub fn finish(c: *Compress) Writer.Error!void {
defer c.writer = .failing;
try rebaseInner(&c.writer, 0, c.writer.buffer.len - flate.history_len, true, true);
try c.bit_writer.output.rebase(0, 1); // ensure room for the final partial byte
c.bit_writer.byteAlign();
try c.hasher.writeFooter(c.bit_writer.output);
}
fn rebase(w: *Writer, preserve: usize, capacity: usize) Writer.Error!void {
return rebaseInner(w, preserve, capacity, false);
errdefer w.* = .failing;
return rebaseInner(w, preserve, capacity, false, false);
}
pub const rebase_min_preserve = flate.history_len;
pub const rebase_reserved_capacity = (token.max_length + 1) + seq_bytes;
fn rebaseInner(w: *Writer, preserve: usize, capacity: usize, eos: bool) Writer.Error!void {
if (!eos) {
fn rebaseInner(
w: *Writer,
preserve: usize,
capacity: usize,
is_flush: bool,
is_finish: bool,
) Writer.Error!void {
if (!is_flush) {
assert(@max(preserve, rebase_min_preserve) + (capacity + rebase_reserved_capacity) <= w.buffer.len);
assert(w.end >= flate.history_len + rebase_reserved_capacity); // Above assert should
// fail since rebase is only called when `capacity` is not present. This assertion is
// important because a full history is required at the end.
} else {
// Preserve is not considered for `matching_end`
assert(preserve == 0 and capacity == w.buffer.len - flate.history_len);
}
if (is_finish) assert(is_flush);
const c: *Compress = @fieldParentPtr("writer", w);
const buffered = w.buffered();
const start = @as(usize, flate.history_len) * @intFromBool(c.has_history);
const lit_end: usize = if (!eos)
const start: usize = c.history_len;
const hashable_len = buffered.len -| (seq_bytes - 1);
const matching_end: usize = if (!is_flush)
buffered.len - rebase_reserved_capacity - (preserve -| flate.history_len)
else
buffered.len -| (seq_bytes - 1);
hashable_len;
var i = start;
var last_unmatched = i;
// Read from `w.buffer` instead of `buffered` since the latter may not
// have enough bytes. If this is the case, this variable is not used.
var seq: Seq = mem.readInt(
std.meta.Int(.unsigned, (seq_bytes - 1) * 8),
w.buffer[i..][0 .. seq_bytes - 1],
.big,
);
if (buffered[i..].len < seq_bytes - 1) {
@branchHint(.unlikely);
assert(eos);
seq = undefined;
assert(i >= lit_end);
}
var seq: Seq = start_seq: {
if (c.history_end_unhashed) {
@branchHint(.unlikely);
while (i < lit_end) {
assert(i != 0);
i -|= seq_bytes - 1;
var seq: Seq = mem.readInt(
@Int(.unsigned, (seq_bytes - 1) * 8),
w.buffer[i..][0 .. seq_bytes - 1],
.big,
);
while (i < @min(start, hashable_len)) {
seq <<= 8;
seq |= buffered[i + (seq_bytes - 1)];
c.addHash(i, hash(seq));
i += 1;
}
if (i < start) {
@branchHint(.unlikely);
i = start;
assert(i >= hashable_len);
assert(i >= matching_end);
assert(is_flush);
break :start_seq undefined; // Unused
}
c.history_end_unhashed = false;
break :start_seq seq;
}
if (i >= hashable_len) {
@branchHint(.unlikely);
assert(i >= matching_end);
assert(is_flush);
break :start_seq undefined; // Unused
}
break :start_seq mem.readInt(
@Int(.unsigned, (seq_bytes - 1) * 8),
buffered[i..][0 .. seq_bytes - 1],
.big,
);
};
while (i < matching_end) {
var match_start = i;
seq <<= 8;
seq |= buffered[i + (seq_bytes - 1)];
@ -420,43 +494,50 @@ fn rebaseInner(w: *Writer, preserve: usize, capacity: usize, eos: bool) Writer.E
try c.outputBytes(buffered[last_unmatched..match_start]);
try c.outputMatch(@intCast(match.dist), @intCast(match.len - 3));
last_unmatched = match_start + match.len;
if (last_unmatched + seq_bytes >= w.end) {
@branchHint(.unlikely);
assert(eos);
i = undefined;
break;
}
while (true) {
while (i < hashable_len) {
seq <<= 8;
seq |= buffered[i + (seq_bytes - 1)];
_ = c.addHash(i, hash(seq));
c.addHash(i, hash(seq));
i += 1;
match_unadded -= 1;
if (match_unadded == 0) break;
} else {
@branchHint(.unlikely);
assert(is_flush);
// `c.history_end_unhashed` is set down below
break;
}
assert(i == match_start + match.len);
}
if (eos) {
i = undefined; // (from match hashing logic)
if (is_flush) {
try c.outputBytes(buffered[last_unmatched..]);
c.hasher.update(buffered[start..]);
try c.writeBlock(true);
return;
if (is_finish) {
try c.writeBlock(true);
return; // Other state does not need to be updated since the writer transitions to `.failing`
}
i = buffered.len;
c.history_end_unhashed = i != 0;
if (c.buffered_tokens.n != 0) {
try c.writeBlock(false);
}
} else {
try c.outputBytes(buffered[last_unmatched..i]);
c.hasher.update(buffered[start..i]);
}
try c.outputBytes(buffered[last_unmatched..i]);
c.hasher.update(buffered[start..i]);
const preserved = buffered[i - flate.history_len ..];
assert(preserved.len > @max(rebase_min_preserve, preserve));
c.history_len = @min(i, flate.history_len);
const preserved = buffered[i - c.history_len ..];
if (!is_flush) assert(preserved.len >= @max(rebase_min_preserve, preserve));
@memmove(w.buffer[0..preserved.len], preserved);
w.end = preserved.len;
c.has_history = true;
}
fn addHash(c: *Compress, i: usize, h: Hash) void {
@ -499,7 +580,7 @@ fn betterMatchLen(old: u16, prev: []const u8, bytes: []const u8) u16 {
assert(bytes.len >= token.min_length);
var i: u16 = 0;
const Block = std.meta.Int(.unsigned, @min(math.divCeil(
const Block = @Int(.unsigned, @min(math.divCeil(
comptime_int,
math.ceilPowerOfTwoAssert(usize, @bitSizeOf(usize)),
8,
@ -798,7 +879,6 @@ test buildClen {
fn writeBlock(c: *Compress, eos: bool) Writer.Error!void {
const toks = &c.buffered_tokens;
if (!eos) assert(toks.n == block_tokens);
assert(toks.lit_freqs[256] == 0);
toks.lit_freqs[256] = 1;
@ -1438,20 +1518,8 @@ fn testFuzzedCompressInput(fbufs: *const [2][65536]u8, smith: *std.testing.Smith
.chain = chain,
});
// It is ensured that more bytes are not written then this to ensure this run
// does not take too long and that `flate_buf` does not run out of space.
const flate_buf_blocks = flate_buf.len / block_tokens;
// Allow a max overhead of 64 bytes per block since the implementation does not gaurauntee it
// writes store blocks when optimal. This comes from taking less than 32 bytes to write an
// optimal dynamic block header of mostly bitlen 8 codes and the end of block literal plus
// `(65536 / 256) / 8`, which is is the maximum number of extra bytes from bitlen 9 codes. An
// extra 32 bytes is reserved on top of that for container headers and footers.
const max_size = flate_buf.len - (flate_buf_blocks * 64 + 32);
var max_output: usize = 32; // Headers / footer
while (!smith.eosWeightedSimple(7, 1)) {
const max_bytes = max_size -| expected_size;
if (max_bytes == 0) break;
const buffered = deflate_w.writer.buffered();
// Required for repeating patterns and since writing from `buffered` is illegal
var copy_buf: [512]u8 = undefined;
@ -1459,13 +1527,13 @@ fn testFuzzedCompressInput(fbufs: *const [2][65536]u8, smith: *std.testing.Smith
const bytes = bytes: switch (smith.valueRangeAtMost(
u2,
@intFromBool(buffered.len == 0),
2,
3,
)) {
0 => { // Copy
const start = smith.valueRangeLessThan(u32, 0, @intCast(buffered.len));
// Reuse the implementation's history; otherwise, our own would need maintained.
const from = buffered[start..];
const len = smith.valueRangeAtMost(u16, 1, @min(copy_buf.len, max_bytes));
const len = smith.valueRangeAtMost(u16, 1, copy_buf.len);
const history_bytes = from[0..@min(from.len, len)];
@memcpy(copy_buf[0..history_bytes.len], history_bytes);
@ -1485,7 +1553,7 @@ fn testFuzzedCompressInput(fbufs: *const [2][65536]u8, smith: *std.testing.Smith
.value(FreqBufIndex, .random, 1),
})
];
const len = smith.valueRangeAtMost(u32, 1, @min(fbuf.len, max_bytes));
const len = smith.valueRangeAtMost(u32, 1, fbuf.len);
const off = smith.valueRangeAtMost(u32, 0, @intCast(fbuf.len - len));
break :bytes fbuf[off..][0..len];
},
@ -1493,25 +1561,42 @@ fn testFuzzedCompressInput(fbufs: *const [2][65536]u8, smith: *std.testing.Smith
const rebaseable = bufsize - rebase_reserved_capacity;
const capacity = smith.valueRangeAtMost(u32, 1, rebaseable - rebase_min_preserve);
const preserve = smith.valueRangeAtMost(u32, 0, rebaseable - capacity);
try deflate_w.writer.rebase(preserve, capacity);
const failed = deflate_w.writer.rebase(preserve, capacity);
if (flate_w.buffered().len > max_output) return error.OverheadTooLarge;
failed catch return; // Wrote too much data and ran out of space
continue;
},
3 => { // Flush
max_output += 8; // Alignment data
const failed = deflate_w.writer.flush();
if (flate_w.buffered().len > max_output) return error.OverheadTooLarge;
failed catch return; // Wrote too much data and ran out of space
continue;
},
else => unreachable,
};
assert(bytes.len <= max_bytes);
try deflate_w.writer.writeAll(bytes);
// An overhead of 64 bytes is given for each block since the implementation does not
// guarantee it writes store blocks when optimal. This comes from taking less than 32
// bytes to write an optimal dynamic block header of mostly bitlen 8 codes and the end
// of block literal plus `(65536 / 256) / 8`, which is the maximum number of extra
// bytes from bitlen 9 codes.
max_output += bytes.len + ((bytes.len + flate_buf.len - 1) / block_tokens) * 64;
const failed = deflate_w.writer.writeAll(bytes);
if (flate_w.buffered().len > max_output) return error.OverheadTooLarge;
failed catch return; // Wrote too much data and ran out of space
expected_hash.update(bytes);
expected_size += @intCast(bytes.len);
}
try deflate_w.writer.flush();
const failed = deflate_w.finish();
if (flate_w.buffered().len > max_output) return error.OverheadTooLarge;
failed catch return; // Wrote too much data and ran out of space
try testingCheckDecompressedMatches(flate_w.buffered(), expected_size, expected_hash);
}
/// Does not compress data
pub const Raw = struct {
/// After `flush` is called, all vtable calls with result in `error.WriteFailed.`
/// After `finish` is called, all vtable calls will result in `error.WriteFailed`.
writer: Writer,
output: *Writer,
hasher: flate.Container.Hasher,
@ -1654,8 +1739,12 @@ pub const Raw = struct {
}
fn flush(w: *Writer) Writer.Error!void {
defer w.* = .failing;
try Raw.rebaseInner(w, 0, w.buffer.len, true);
errdefer w.* = .failing;
try Raw.rebaseInner(w, 0, w.buffer.len, false);
}
/// Ends the raw stream by rebasing the entire buffer with the
/// end-of-stream flag set, emitting the final block.
fn finish(r: *Raw) Writer.Error!void {
try Raw.rebaseInner(&r.writer, 0, r.writer.buffer.len, true);
}
fn rebase(w: *Writer, preserve: usize, capacity: usize) Writer.Error!void {
@ -1899,19 +1988,21 @@ fn testFuzzedRawInput(data_buf: *const [4 * 65536]u8, smith: *std.testing.Smith)
const Op = packed struct {
drain: bool = false,
add_vec: bool = false,
rebase: bool = false,
rebase: enum(u2) { none, rebase, flush } = .none,
pub const drain_only: @This() = .{ .drain = true };
pub const add_vec_only: @This() = .{ .add_vec = true };
pub const add_vec_and_drain: @This() = .{ .add_vec = true, .drain = true };
pub const drain_and_rebase: @This() = .{ .drain = true, .rebase = true };
pub const drain_and_rebase: @This() = .{ .drain = true, .rebase = .rebase };
pub const drain_and_flush: @This() = .{ .drain = true, .rebase = .flush };
};
const is_eos = expected_size == max_size or smith.eosWeightedSimple(7, 1);
var op: Op = if (!is_eos) smith.valueWeighted(Op, &.{
.value(Op, .add_vec_only, 6),
.value(Op, .add_vec_only, 5),
.value(Op, .add_vec_and_drain, 1),
.value(Op, .drain_and_rebase, 1),
.value(Op, .drain_and_flush, 1),
}) else .drain_only;
if (op.add_vec) {
@ -1965,16 +2056,20 @@ fn testFuzzedRawInput(data_buf: *const [4 * 65536]u8, smith: *std.testing.Smith)
vecs_n = 0;
}
if (op.rebase) {
const capacity = smith.valueRangeAtMost(u32, 0, raw_buf_len);
const preserve = smith.valueRangeAtMost(u32, 0, raw_buf_len - capacity);
try raw.writer.rebase(preserve, capacity);
switch (op.rebase) {
.none => {},
.rebase => {
const capacity = smith.valueRangeAtMost(u32, 0, raw_buf_len);
const preserve = smith.valueRangeAtMost(u32, 0, raw_buf_len - capacity);
try raw.writer.rebase(preserve, capacity);
},
.flush => try raw.writer.flush(),
}
if (is_eos) break;
}
try raw.writer.flush();
try raw.finish();
try output.writer.flush();
try std.testing.expectEqual(.end, output.state);
@ -1997,6 +2092,7 @@ fn testFuzzedRawInput(data_buf: *const [4 * 65536]u8, smith: *std.testing.Smith)
/// Only performs huffman compression on data, does no matching.
pub const Huffman = struct {
/// After `finish` is called, all vtable calls will result in `error.WriteFailed`.
writer: Writer,
bit_writer: BitWriter,
hasher: flate.Container.Hasher,
@ -2026,12 +2122,6 @@ pub const Huffman = struct {
}
fn drain(w: *Writer, data: []const []const u8, splat: usize) Writer.Error!usize {
{
//std.debug.print("drain {} (buffered)", .{w.buffered().len});
//for (data) |d| std.debug.print("\n\t+ {}", .{d.len});
//std.debug.print(" x {}\n\n", .{splat});
}
const h: *Huffman = @fieldParentPtr("writer", w);
const min_block = @min(w.buffer.len, max_tokens);
const pattern = data[data.len - 1];
@ -2238,9 +2328,15 @@ pub const Huffman = struct {
}
fn flush(w: *Writer) Writer.Error!void {
defer w.* = .failing;
errdefer w.* = .failing;
const h: *Huffman = @fieldParentPtr("writer", w);
try Huffman.rebaseInner(w, 0, w.buffer.len, true);
try Huffman.rebaseInner(w, 0, w.buffer.len, false);
try h.bit_writer.byteAlignBlocks();
}
fn finish(h: *Huffman) Writer.Error!void {
defer h.writer = .failing;
try Huffman.rebaseInner(&h.writer, 0, h.writer.buffer.len, true);
try h.bit_writer.output.rebase(0, 1);
h.bit_writer.byteAlign();
try h.hasher.writeFooter(h.bit_writer.output);
@ -2359,9 +2455,6 @@ pub const Huffman = struct {
break :n stored_align_bits + @as(u32, 32) + @as(u32, bytes) * 8;
};
//std.debug.print("@ {}{{{}}} ", .{ h.bit_writer.output.end, h.bit_writer.buffered_n });
//std.debug.print("#{} -> s {} f {} d {}\n", .{ bytes, stored_bitsize, fixed_bitsize, dynamic_bitsize });
if (stored_bitsize <= @min(dynamic_bitsize, fixed_bitsize)) {
try h.bit_writer.write(BlockHeader.int(.{ .kind = .stored, .final = eos }), 3);
try h.bit_writer.output.rebase(0, 5);
@ -2434,19 +2527,21 @@ fn testFuzzedHuffmanInput(fbufs: *const [2][65536]u8, smith: *std.testing.Smith)
const Op = packed struct {
drain: bool = false,
add_vec: bool = false,
rebase: bool = false,
rebase: enum(u2) { none, rebase, flush } = .none,
pub const drain_only: @This() = .{ .drain = true };
pub const add_vec_only: @This() = .{ .add_vec = true };
pub const add_vec_and_drain: @This() = .{ .add_vec = true, .drain = true };
pub const drain_and_rebase: @This() = .{ .drain = true, .rebase = true };
pub const drain_and_rebase: @This() = .{ .drain = true, .rebase = .rebase };
pub const drain_and_flush: @This() = .{ .drain = true, .rebase = .flush };
};
const is_eos = expected_size == max_size or smith.eosWeightedSimple(7, 1);
var op: Op = if (!is_eos) smith.valueWeighted(Op, &.{
.value(Op, .add_vec_only, 6),
.value(Op, .add_vec_only, 5),
.value(Op, .add_vec_and_drain, 1),
.value(Op, .drain_and_rebase, 1),
.value(Op, .drain_and_flush, 1),
}) else .drain_only;
if (op.add_vec) {
@ -2517,7 +2612,7 @@ fn testFuzzedHuffmanInput(fbufs: *const [2][65536]u8, smith: *std.testing.Smith)
vecs_n = 0;
}
if (op.rebase) {
if (op.rebase != .none) {
const capacity = smith.valueRangeAtMost(u32, 0, h_buf_len);
const preserve = smith.valueRangeAtMost(u32, 0, h_buf_len - capacity);
@ -2525,9 +2620,14 @@ fn testFuzzedHuffmanInput(fbufs: *const [2][65536]u8, smith: *std.testing.Smith)
h.writer.buffered().len,
flate_w.buffered().len,
false,
);
h.writer.rebase(preserve, capacity) catch
return if (max_space <= flate_w.buffer.len) error.OverheadTooLarge else {};
) + @as(usize, 8) * @intFromBool(op.rebase == .flush); // Overhead from byte alignment
switch (op.rebase) {
.none => unreachable,
.rebase => h.writer.rebase(preserve, capacity) catch
return if (max_space <= flate_w.buffer.len) error.OverheadTooLarge else {},
.flush => h.writer.flush() catch
return if (max_space <= flate_w.buffer.len) error.OverheadTooLarge else {},
}
if (flate_w.buffered().len > max_space) return error.OverheadTooLarge;
}
@ -2539,8 +2639,7 @@ fn testFuzzedHuffmanInput(fbufs: *const [2][65536]u8, smith: *std.testing.Smith)
flate_w.buffered().len,
true,
);
h.writer.flush() catch
return if (max_space <= flate_w.buffer.len) error.OverheadTooLarge else {};
h.finish() catch return if (max_space <= flate_w.buffer.len) error.OverheadTooLarge else {};
if (flate_w.buffered().len > max_space) return error.OverheadTooLarge;
try testingCheckDecompressedMatches(flate_w.buffered(), expected_size, expected_hash);

View file

@ -443,7 +443,7 @@ pub const JobQueue = struct {
// intentionally omitting the pointless trailer
//try archiver.finish();
compress.writer.flush() catch |err| switch (err) {
compress.finish() catch |err| switch (err) {
error.WriteFailed => return file_writer.err.?,
};
try file_writer.flush();