From e56563ce3fb7ae2fb13f66ba6045ffb1f828ae08 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Tue, 13 Jan 2026 21:23:44 -0800 Subject: [PATCH] std.Io.File.MultiReader: implementation fixes --- lib/std/Build/Step.zig | 34 +++++--- lib/std/Build/Step/Run.zig | 12 ++- lib/std/Io/File/MultiReader.zig | 150 ++++++++++++++++---------------- lib/std/crypto/tls/Client.zig | 3 +- 4 files changed, 103 insertions(+), 96 deletions(-) diff --git a/lib/std/Build/Step.zig b/lib/std/Build/Step.zig index 40845f75c3..37fc2ca023 100644 --- a/lib/std/Build/Step.zig +++ b/lib/std/Build/Step.zig @@ -381,13 +381,15 @@ pub fn addError(step: *Step, comptime fmt: []const u8, args: anytype) error{OutO pub const ZigProcess = struct { child: std.process.Child, + multi_reader_buffer: Io.File.MultiReader.Buffer(2), + multi_reader: Io.File.MultiReader, progress_ipc_fd: if (std.Progress.have_ipc) ?std.posix.fd_t else void, pub const StreamEnum = enum { stdout, stderr }; - pub fn deinit(zp: *ZigProcess, gpa: Allocator, io: Io) void { - _ = gpa; + pub fn deinit(zp: *ZigProcess, io: Io) void { zp.child.kill(io); + zp.multi_reader.deinit(); zp.* = undefined; } }; @@ -460,14 +462,18 @@ pub fn evalZigProcess( .request_resource_usage_statistics = true, .progress_node = prog_node, }) catch |err| return s.fail("failed to spawn zig compiler {s}: {t}", .{ argv[0], err }); - defer if (!watch) zp.child.kill(io); zp.* = .{ .child = zp.child, + .multi_reader_buffer = undefined, + .multi_reader = undefined, .progress_ipc_fd = if (std.Progress.have_ipc) prog_node.getIpcFd() else {}, }; + zp.multi_reader.init(gpa, io, zp.multi_reader_buffer.toStreams(), &.{ + zp.child.stdout.?, zp.child.stderr.?, + }); if (watch) s.setZigProcess(zp); - defer if (!watch) zp.deinit(gpa, io); + defer if (!watch) zp.deinit(io); const result = try zigProcessUpdate(s, zp, watch, web_server, gpa); @@ -534,18 +540,18 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool, web_server: ?*Build. var result: ?Path = null; - var multi_reader_buffer: Io.File.MultiReader.Buffer(2) = undefined; - var multi_reader: Io.File.MultiReader = undefined; - multi_reader.init(gpa, io, multi_reader_buffer.toStreams(), &.{ zp.child.stdout.?, zp.child.stderr.? }); - defer multi_reader.deinit(); - - const stdout = multi_reader.reader(0); - const stderr = multi_reader.reader(1); + const stdout = zp.multi_reader.fileReader(0); while (true) { const Header = std.zig.Server.Message.Header; - const header = try stdout.takeStruct(Header, .little); - const body = try stdout.take(header.bytes_len); + const header = stdout.interface.takeStruct(Header, .little) catch |err| switch (err) { + error.EndOfStream => break, + error.ReadFailed => return stdout.err.?, + }; + const body = stdout.interface.take(header.bytes_len) catch |err| switch (err) { + error.EndOfStream => |e| return e, + error.ReadFailed => return stdout.err.?, + }; switch (header.tag) { .zig_version => { if (!std.mem.eql(u8, builtin.zig_version_string, body)) { @@ -636,7 +642,7 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool, web_server: ?*Build. s.result_duration_ns = timer.read(); - const stderr_contents = stderr.buffered(); + const stderr_contents = zp.multi_reader.reader(1).buffered(); if (stderr_contents.len > 0) { try s.result_error_msgs.append(arena, try arena.dupe(u8, stderr_contents)); } diff --git a/lib/std/Build/Step/Run.zig b/lib/std/Build/Step/Run.zig index 82c15531d0..c74286f61b 100644 --- a/lib/std/Build/Step/Run.zig +++ b/lib/std/Build/Step/Run.zig @@ -1695,11 +1695,9 @@ fn evalZigTest( // The runner unexpectedly closed a stdio pipe, which means a crash. Make sure we've captured // all available stderr to make our error output as useful as possible. const stderr_fr = multi_reader.fileReader(1); - while (true) { - stderr_fr.interface.fillMore() catch |e| switch (e) { - error.ReadFailed => return stderr_fr.err.?, - error.EndOfStream => break, - }; + while (stderr_fr.interface.fillMore()) |_| {} else |e| switch (e) { + error.ReadFailed => return stderr_fr.err.?, + error.EndOfStream => {}, } run.step.result_stderr = try arena.dupe(u8, stderr_fr.interface.buffered()); @@ -1905,7 +1903,7 @@ fn waitZigTest( .clock = .awake, } } else .none; - multi_reader.fill(timeout) catch |err| switch (err) { + multi_reader.fill(64, timeout) catch |err| switch (err) { error.Timeout, error.EndOfStream => return .{ .no_poll = .{ .active_test_index = active_test_index, .ns_elapsed = if (timer) |*t| t.read() else 0, @@ -2227,7 +2225,7 @@ fn evalGeneric(run: *Run, spawn_options: process.SpawnOptions) !EvalGenericResul const stdout_reader = multi_reader.reader(0); const stderr_reader = multi_reader.reader(1); - while (multi_reader.fill(.none)) |_| { + while (multi_reader.fill(64, .none)) |_| { if (run.stdio_limit.toInt()) |limit| { if (stdout_reader.buffered().len > limit) return error.StdoutStreamTooLong; diff --git a/lib/std/Io/File/MultiReader.zig b/lib/std/Io/File/MultiReader.zig index d4024ef914..a1ea42a7d8 100644 --- a/lib/std/Io/File/MultiReader.zig +++ b/lib/std/Io/File/MultiReader.zig @@ -28,18 +28,23 @@ pub const Streams = extern struct { len: u32, pub fn contexts(s: *Streams) []Context { - _ = s; - @panic("TODO"); + const base: usize = @intFromPtr(s); + const ptr: [*]Context = @ptrFromInt(std.mem.alignForward(usize, base + @sizeOf(Streams), @alignOf(Context))); + return ptr[0..s.len]; } pub fn ring(s: *Streams) []u32 { - _ = s; - @panic("TODO"); + const prev = contexts(s); + const end = prev.ptr + prev.len; + const ptr: [*]u32 = @ptrFromInt(std.mem.alignForward(usize, @intFromPtr(end), @alignOf(u32))); + return ptr[0..s.len]; } pub fn operations(s: *Streams) []Io.Operation { - _ = s; - @panic("TODO"); + const prev = ring(s); + const end = prev.ptr + prev.len; + const ptr: [*]Io.Operation = @ptrFromInt(std.mem.alignForward(usize, @intFromPtr(end), @alignOf(Io.Operation))); + return ptr[0..s.len]; } }; @@ -51,6 +56,7 @@ pub fn Buffer(comptime n: usize) type { operations: [n][@sizeOf(Io.Operation)]u8 align(@alignOf(Io.Operation)), pub fn toStreams(b: *@This()) *Streams { + b.len = n; return @ptrCast(b); } }; @@ -157,35 +163,87 @@ fn stream(r: *Io.Reader, w: *Io.Writer, limit: Io.Limit) Io.Reader.StreamError!u _ = w; const fr: *File.Reader = @alignCast(@fieldParentPtr("interface", r)); const context: *Context = @fieldParentPtr("fr", fr); - const mr = context.mr; - return fillUntimed(mr, context); + try fillUntimed(context, 1); + return 0; } fn discard(r: *Io.Reader, limit: Io.Limit) Io.Reader.Error!usize { _ = limit; const fr: *File.Reader = @alignCast(@fieldParentPtr("interface", r)); const context: *Context = @fieldParentPtr("fr", fr); - const mr = context.mr; - return fillUntimed(mr, context); + try fillUntimed(context, 1); + return 0; } fn readVec(r: *Io.Reader, data: [][]u8) Io.Reader.Error!usize { _ = data; const fr: *File.Reader = @alignCast(@fieldParentPtr("interface", r)); const context: *Context = @fieldParentPtr("fr", fr); - const mr = context.mr; - return fillUntimed(mr, context); + try fillUntimed(context, 1); + return 0; } fn rebase(r: *Io.Reader, capacity: usize) Io.Reader.RebaseError!void { const fr: *File.Reader = @alignCast(@fieldParentPtr("interface", r)); const context: *Context = @fieldParentPtr("fr", fr); - const mr = context.mr; + try fillUntimed(context, capacity); +} - return rebaseGrowing(mr, context, capacity) catch |err| { - context.err = err; - return error.ReadFailed; +fn fillUntimed(context: *Context, capacity: usize) Io.Reader.Error!void { + fill(context.mr, capacity, .none) catch |err| switch (err) { + error.Timeout, error.UnsupportedClock => unreachable, + error.Canceled, error.ConcurrencyUnavailable => |e| { + context.err = e; + return error.ReadFailed; + }, + error.EndOfStream => |e| return e, }; + if (context.err != null) return error.ReadFailed; + if (context.eos) return error.EndOfStream; +} + +pub const FillError = Io.Batch.WaitError || error{ + /// `fill` was called when all streams already have failed or reached the + /// end. + EndOfStream, +}; + +/// Wait until at least one stream receives more data. +pub fn fill(mr: *MultiReader, unused_capacity: usize, timeout: Io.Timeout) FillError!void { + const contexts = mr.streams.contexts(); + const operations = mr.streams.operations(); + const io = contexts[0].fr.io; + var any_completed = false; + + try mr.batch.wait(io, timeout); + + while (mr.batch.next()) |i| { + any_completed = true; + const context = &contexts[i]; + const operation = &operations[i]; + const n = operation.file_read_streaming.status.result catch |err| { + context.err = err; + continue; + }; + if (n == 0) { + context.eos = true; + continue; + } + const r = &context.fr.interface; + r.end += n; + if (r.buffer.len - r.end < unused_capacity) { + rebaseGrowing(mr, context, r.bufferedLen() + unused_capacity) catch |err| { + context.err = err; + continue; + }; + assert(r.seek == 0); + } + context.vec[0] = r.buffer[r.end..]; + operation.file_read_streaming.status = .{ .unstarted = {} }; + mr.batch.add(i); + } + + if (!any_completed) return error.EndOfStream; } fn rebaseGrowing(mr: *MultiReader, context: *Context, capacity: usize) Allocator.Error!void { @@ -209,65 +267,9 @@ fn rebaseGrowing(mr: *MultiReader, context: *Context, capacity: usize) Allocator const data = r.buffer[r.seek..r.end]; const new = try gpa.alloc(u8, adjusted_capacity); @memcpy(new[0..data.len], data); + gpa.free(r.buffer); + r.buffer = new; r.seek = 0; r.end = data.len; } } - -pub const FillError = Io.Batch.WaitError || error{ - /// `fill` was called when all streams already have failed or reached the - /// end. - EndOfStream, -}; - -/// Wait until at least one stream receives more data. -pub fn fill(mr: *MultiReader, timeout: Io.Timeout) FillError!void { - const contexts = mr.streams.contexts(); - const operations = mr.streams.operations(); - const io = contexts[0].fr.io; - var any_completed = false; - - try mr.batch.wait(io, timeout); - - while (mr.batch.next()) |i| { - any_completed = true; - const context = &contexts[i]; - const operation = &operations[i]; - const n = operation.file_read_streaming.status.result catch |err| { - context.err = err; - continue; - }; - if (n == 0) { - context.eos = true; - continue; - } - const r = &context.fr.interface; - r.end += n; - if (r.buffer.len - r.end == 0) { - rebaseGrowing(mr, context, r.bufferedLen() + 1) catch |err| { - context.err = err; - continue; - }; - assert(r.seek == 0); - context.vec[0] = r.buffer; - } - operation.file_read_streaming.status = .{ .unstarted = {} }; - mr.batch.add(i); - } - - if (!any_completed) return error.EndOfStream; -} - -fn fillUntimed(mr: *MultiReader, context: *Context) Io.Reader.Error!usize { - fill(mr, .none) catch |err| switch (err) { - error.Timeout, error.UnsupportedClock => unreachable, - error.Canceled, error.ConcurrencyUnavailable => |e| { - context.err = e; - return error.ReadFailed; - }, - error.EndOfStream => |e| return e, - }; - if (context.err != null) return error.ReadFailed; - if (context.eos) return error.EndOfStream; - return 0; -} diff --git a/lib/std/crypto/tls/Client.zig b/lib/std/crypto/tls/Client.zig index 44a73c344a..eeeb7d0537 100644 --- a/lib/std/crypto/tls/Client.zig +++ b/lib/std/crypto/tls/Client.zig @@ -336,10 +336,11 @@ pub fn init(input: *Reader, output: *Writer, options: Options) InitError!Client // Ensure the input buffer pointer is stable in this scope. input.rebase(tls.max_ciphertext_record_len) catch |err| switch (err) { error.EndOfStream => {}, // We have assurance the remainder of stream can be buffered. + error.ReadFailed => |e| return e, }; const record_header = input.peek(tls.record_header_len) catch |err| switch (err) { error.EndOfStream => return error.TlsConnectionTruncated, - error.ReadFailed => return error.ReadFailed, + error.ReadFailed => |e| return e, }; const record_ct = input.takeEnumNonexhaustive(tls.ContentType, .big) catch unreachable; // already peeked input.toss(2); // legacy_version