std.Io.File.MultiReader: implementation fixes

This commit is contained in:
Andrew Kelley 2026-01-13 21:23:44 -08:00
parent 12cfc96e1b
commit e56563ce3f
4 changed files with 103 additions and 96 deletions

View file

@ -381,13 +381,15 @@ pub fn addError(step: *Step, comptime fmt: []const u8, args: anytype) error{OutO
pub const ZigProcess = struct {
child: std.process.Child,
multi_reader_buffer: Io.File.MultiReader.Buffer(2),
multi_reader: Io.File.MultiReader,
progress_ipc_fd: if (std.Progress.have_ipc) ?std.posix.fd_t else void,
pub const StreamEnum = enum { stdout, stderr };
pub fn deinit(zp: *ZigProcess, gpa: Allocator, io: Io) void {
_ = gpa;
pub fn deinit(zp: *ZigProcess, io: Io) void {
zp.child.kill(io);
zp.multi_reader.deinit();
zp.* = undefined;
}
};
@ -460,14 +462,18 @@ pub fn evalZigProcess(
.request_resource_usage_statistics = true,
.progress_node = prog_node,
}) catch |err| return s.fail("failed to spawn zig compiler {s}: {t}", .{ argv[0], err });
defer if (!watch) zp.child.kill(io);
zp.* = .{
.child = zp.child,
.multi_reader_buffer = undefined,
.multi_reader = undefined,
.progress_ipc_fd = if (std.Progress.have_ipc) prog_node.getIpcFd() else {},
};
zp.multi_reader.init(gpa, io, zp.multi_reader_buffer.toStreams(), &.{
zp.child.stdout.?, zp.child.stderr.?,
});
if (watch) s.setZigProcess(zp);
defer if (!watch) zp.deinit(gpa, io);
defer if (!watch) zp.deinit(io);
const result = try zigProcessUpdate(s, zp, watch, web_server, gpa);
@ -534,18 +540,18 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool, web_server: ?*Build.
var result: ?Path = null;
var multi_reader_buffer: Io.File.MultiReader.Buffer(2) = undefined;
var multi_reader: Io.File.MultiReader = undefined;
multi_reader.init(gpa, io, multi_reader_buffer.toStreams(), &.{ zp.child.stdout.?, zp.child.stderr.? });
defer multi_reader.deinit();
const stdout = multi_reader.reader(0);
const stderr = multi_reader.reader(1);
const stdout = zp.multi_reader.fileReader(0);
while (true) {
const Header = std.zig.Server.Message.Header;
const header = try stdout.takeStruct(Header, .little);
const body = try stdout.take(header.bytes_len);
const header = stdout.interface.takeStruct(Header, .little) catch |err| switch (err) {
error.EndOfStream => break,
error.ReadFailed => return stdout.err.?,
};
const body = stdout.interface.take(header.bytes_len) catch |err| switch (err) {
error.EndOfStream => |e| return e,
error.ReadFailed => return stdout.err.?,
};
switch (header.tag) {
.zig_version => {
if (!std.mem.eql(u8, builtin.zig_version_string, body)) {
@ -636,7 +642,7 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool, web_server: ?*Build.
s.result_duration_ns = timer.read();
const stderr_contents = stderr.buffered();
const stderr_contents = zp.multi_reader.reader(1).buffered();
if (stderr_contents.len > 0) {
try s.result_error_msgs.append(arena, try arena.dupe(u8, stderr_contents));
}

View file

@ -1695,11 +1695,9 @@ fn evalZigTest(
// The runner unexpectedly closed a stdio pipe, which means a crash. Make sure we've captured
// all available stderr to make our error output as useful as possible.
const stderr_fr = multi_reader.fileReader(1);
while (true) {
stderr_fr.interface.fillMore() catch |e| switch (e) {
error.ReadFailed => return stderr_fr.err.?,
error.EndOfStream => break,
};
while (stderr_fr.interface.fillMore()) |_| {} else |e| switch (e) {
error.ReadFailed => return stderr_fr.err.?,
error.EndOfStream => {},
}
run.step.result_stderr = try arena.dupe(u8, stderr_fr.interface.buffered());
@ -1905,7 +1903,7 @@ fn waitZigTest(
.clock = .awake,
} } else .none;
multi_reader.fill(timeout) catch |err| switch (err) {
multi_reader.fill(64, timeout) catch |err| switch (err) {
error.Timeout, error.EndOfStream => return .{ .no_poll = .{
.active_test_index = active_test_index,
.ns_elapsed = if (timer) |*t| t.read() else 0,
@ -2227,7 +2225,7 @@ fn evalGeneric(run: *Run, spawn_options: process.SpawnOptions) !EvalGenericResul
const stdout_reader = multi_reader.reader(0);
const stderr_reader = multi_reader.reader(1);
while (multi_reader.fill(.none)) |_| {
while (multi_reader.fill(64, .none)) |_| {
if (run.stdio_limit.toInt()) |limit| {
if (stdout_reader.buffered().len > limit)
return error.StdoutStreamTooLong;

View file

@ -28,18 +28,23 @@ pub const Streams = extern struct {
len: u32,
pub fn contexts(s: *Streams) []Context {
_ = s;
@panic("TODO");
const base: usize = @intFromPtr(s);
const ptr: [*]Context = @ptrFromInt(std.mem.alignForward(usize, base + @sizeOf(Streams), @alignOf(Context)));
return ptr[0..s.len];
}
pub fn ring(s: *Streams) []u32 {
_ = s;
@panic("TODO");
const prev = contexts(s);
const end = prev.ptr + prev.len;
const ptr: [*]u32 = @ptrFromInt(std.mem.alignForward(usize, @intFromPtr(end), @alignOf(u32)));
return ptr[0..s.len];
}
pub fn operations(s: *Streams) []Io.Operation {
_ = s;
@panic("TODO");
const prev = ring(s);
const end = prev.ptr + prev.len;
const ptr: [*]Io.Operation = @ptrFromInt(std.mem.alignForward(usize, @intFromPtr(end), @alignOf(Io.Operation)));
return ptr[0..s.len];
}
};
@ -51,6 +56,7 @@ pub fn Buffer(comptime n: usize) type {
operations: [n][@sizeOf(Io.Operation)]u8 align(@alignOf(Io.Operation)),
pub fn toStreams(b: *@This()) *Streams {
b.len = n;
return @ptrCast(b);
}
};
@ -157,35 +163,87 @@ fn stream(r: *Io.Reader, w: *Io.Writer, limit: Io.Limit) Io.Reader.StreamError!u
_ = w;
const fr: *File.Reader = @alignCast(@fieldParentPtr("interface", r));
const context: *Context = @fieldParentPtr("fr", fr);
const mr = context.mr;
return fillUntimed(mr, context);
try fillUntimed(context, 1);
return 0;
}
fn discard(r: *Io.Reader, limit: Io.Limit) Io.Reader.Error!usize {
_ = limit;
const fr: *File.Reader = @alignCast(@fieldParentPtr("interface", r));
const context: *Context = @fieldParentPtr("fr", fr);
const mr = context.mr;
return fillUntimed(mr, context);
try fillUntimed(context, 1);
return 0;
}
fn readVec(r: *Io.Reader, data: [][]u8) Io.Reader.Error!usize {
_ = data;
const fr: *File.Reader = @alignCast(@fieldParentPtr("interface", r));
const context: *Context = @fieldParentPtr("fr", fr);
const mr = context.mr;
return fillUntimed(mr, context);
try fillUntimed(context, 1);
return 0;
}
fn rebase(r: *Io.Reader, capacity: usize) Io.Reader.RebaseError!void {
const fr: *File.Reader = @alignCast(@fieldParentPtr("interface", r));
const context: *Context = @fieldParentPtr("fr", fr);
const mr = context.mr;
try fillUntimed(context, capacity);
}
return rebaseGrowing(mr, context, capacity) catch |err| {
context.err = err;
return error.ReadFailed;
fn fillUntimed(context: *Context, capacity: usize) Io.Reader.Error!void {
fill(context.mr, capacity, .none) catch |err| switch (err) {
error.Timeout, error.UnsupportedClock => unreachable,
error.Canceled, error.ConcurrencyUnavailable => |e| {
context.err = e;
return error.ReadFailed;
},
error.EndOfStream => |e| return e,
};
if (context.err != null) return error.ReadFailed;
if (context.eos) return error.EndOfStream;
}
pub const FillError = Io.Batch.WaitError || error{
/// `fill` was called when all streams already have failed or reached the
/// end.
EndOfStream,
};
/// Wait until at least one stream receives more data.
pub fn fill(mr: *MultiReader, unused_capacity: usize, timeout: Io.Timeout) FillError!void {
const contexts = mr.streams.contexts();
const operations = mr.streams.operations();
const io = contexts[0].fr.io;
var any_completed = false;
try mr.batch.wait(io, timeout);
while (mr.batch.next()) |i| {
any_completed = true;
const context = &contexts[i];
const operation = &operations[i];
const n = operation.file_read_streaming.status.result catch |err| {
context.err = err;
continue;
};
if (n == 0) {
context.eos = true;
continue;
}
const r = &context.fr.interface;
r.end += n;
if (r.buffer.len - r.end < unused_capacity) {
rebaseGrowing(mr, context, r.bufferedLen() + unused_capacity) catch |err| {
context.err = err;
continue;
};
assert(r.seek == 0);
}
context.vec[0] = r.buffer[r.end..];
operation.file_read_streaming.status = .{ .unstarted = {} };
mr.batch.add(i);
}
if (!any_completed) return error.EndOfStream;
}
fn rebaseGrowing(mr: *MultiReader, context: *Context, capacity: usize) Allocator.Error!void {
@ -209,65 +267,9 @@ fn rebaseGrowing(mr: *MultiReader, context: *Context, capacity: usize) Allocator
const data = r.buffer[r.seek..r.end];
const new = try gpa.alloc(u8, adjusted_capacity);
@memcpy(new[0..data.len], data);
gpa.free(r.buffer);
r.buffer = new;
r.seek = 0;
r.end = data.len;
}
}
pub const FillError = Io.Batch.WaitError || error{
/// `fill` was called when all streams already have failed or reached the
/// end.
EndOfStream,
};
/// Wait until at least one stream receives more data.
pub fn fill(mr: *MultiReader, timeout: Io.Timeout) FillError!void {
const contexts = mr.streams.contexts();
const operations = mr.streams.operations();
const io = contexts[0].fr.io;
var any_completed = false;
try mr.batch.wait(io, timeout);
while (mr.batch.next()) |i| {
any_completed = true;
const context = &contexts[i];
const operation = &operations[i];
const n = operation.file_read_streaming.status.result catch |err| {
context.err = err;
continue;
};
if (n == 0) {
context.eos = true;
continue;
}
const r = &context.fr.interface;
r.end += n;
if (r.buffer.len - r.end == 0) {
rebaseGrowing(mr, context, r.bufferedLen() + 1) catch |err| {
context.err = err;
continue;
};
assert(r.seek == 0);
context.vec[0] = r.buffer;
}
operation.file_read_streaming.status = .{ .unstarted = {} };
mr.batch.add(i);
}
if (!any_completed) return error.EndOfStream;
}
fn fillUntimed(mr: *MultiReader, context: *Context) Io.Reader.Error!usize {
fill(mr, .none) catch |err| switch (err) {
error.Timeout, error.UnsupportedClock => unreachable,
error.Canceled, error.ConcurrencyUnavailable => |e| {
context.err = e;
return error.ReadFailed;
},
error.EndOfStream => |e| return e,
};
if (context.err != null) return error.ReadFailed;
if (context.eos) return error.EndOfStream;
return 0;
}

View file

@ -336,10 +336,11 @@ pub fn init(input: *Reader, output: *Writer, options: Options) InitError!Client
// Ensure the input buffer pointer is stable in this scope.
input.rebase(tls.max_ciphertext_record_len) catch |err| switch (err) {
error.EndOfStream => {}, // We have assurance the remainder of stream can be buffered.
error.ReadFailed => |e| return e,
};
const record_header = input.peek(tls.record_header_len) catch |err| switch (err) {
error.EndOfStream => return error.TlsConnectionTruncated,
error.ReadFailed => return error.ReadFailed,
error.ReadFailed => |e| return e,
};
const record_ct = input.takeEnumNonexhaustive(tls.ContentType, .big) catch unreachable; // already peeked
input.toss(2); // legacy_version