Revert "std.process.Child: rewrite using concurrent"

This reverts commit 76e1ba8f490812c6e2ebf6f6becd89a71275d21e.
This commit is contained in:
Andrew Kelley 2026-01-08 16:55:51 -08:00
parent 87408f8add
commit e2a266e744
2 changed files with 56 additions and 26 deletions

View file

@ -455,7 +455,6 @@ pub fn spawnPath(io: Io, dir: Io.Dir, options: SpawnOptions) SpawnError!Child {
pub const RunError = CurrentPathError || posix.ReadError || SpawnError || posix.PollError || error{
StreamTooLong,
ConcurrencyUnavailable,
};
pub const RunOptions = struct {

View file

@ -125,15 +125,14 @@ pub fn wait(child: *Child, io: Io) WaitError!Term {
return io.vtable.childWait(io.userdata, child);
}
pub const CollectOutputError = error{
StreamTooLong,
ConcurrencyUnavailable,
} || Allocator.Error || Io.File.Reader.Error;
pub const CollectOutputError = error{StreamTooLong} || Allocator.Error || Io.File.Reader.Error;
pub const CollectOutputOptions = struct {
stdout: *std.ArrayList(u8),
stderr: *std.ArrayList(u8),
allocator: Allocator,
/// Used for `stdout` and `stderr`. If not provided, only the existing
/// capacity will be used.
allocator: ?Allocator = null,
stdout_limit: Io.Limit = .unlimited,
stderr_limit: Io.Limit = .unlimited,
};
@ -145,24 +144,56 @@ pub const CollectOutputOptions = struct {
/// The process must have been started with stdout and stderr set to
/// `process.SpawnOptions.StdIo.pipe`.
pub fn collectOutput(child: *const Child, io: Io, options: CollectOutputOptions) CollectOutputError!void {
var stdout = try io.concurrent(collectStream, .{
io, options.allocator, child.stdout.?, options.stdout, options.stdout_limit,
});
defer stdout.cancel(io) catch {};
var stderr = try io.concurrent(collectStream, .{
io, options.allocator, child.stderr.?, options.stderr, options.stderr_limit,
});
defer stderr.cancel(io) catch {};
try stdout.await(io);
try stderr.await(io);
}
fn collectStream(io: Io, gpa: Allocator, file: File, list: *std.ArrayList(u8), limit: Io.Limit) CollectOutputError!void {
var fr = file.readerStreaming(io, &.{});
fr.interface.appendRemaining(gpa, list, limit) catch |err| switch (err) {
error.ReadFailed => return fr.err.?,
else => |e| return e,
};
const files: [2]Io.File = .{ child.stdout.?, child.stderr.? };
const lists: [2]*std.ArrayList(u8) = .{ options.stdout, options.stderr };
const limits: [2]Io.Limit = .{ options.stdout_limit, options.stderr_limit };
var dones: [2]bool = .{ false, false };
var reads: [2]Io.Operation = undefined;
var vecs: [2][1][]u8 = undefined;
while (true) {
for (&reads, &lists, &files, dones, &vecs) |*read, list, file, done, *vec| {
if (done) {
read.* = .noop;
continue;
}
if (options.allocator) |gpa| try list.ensureUnusedCapacity(gpa, 1);
const cap = list.unusedCapacitySlice();
if (cap.len == 0) return error.StreamTooLong;
vec[0] = cap;
read.* = .{ .file_read_streaming = .{
.file = file,
.data = vec,
.nonblocking = true,
.result = undefined,
} };
}
var all_done = true;
var any_canceled = false;
var other_err: (error{StreamTooLong} || Io.File.Reader.Error)!void = {};
io.vtable.operate(io.userdata, &reads);
for (&reads, &lists, &limits, &dones) |*read, list, limit, *done| {
if (done.*) continue;
const n = read.file_read_streaming.result catch |err| switch (err) {
error.Canceled => {
any_canceled = true;
continue;
},
error.WouldBlock => continue,
else => |e| {
other_err = e;
continue;
},
};
if (n == 0) {
done.* = true;
} else {
all_done = false;
}
list.items.len += n;
if (list.items.len > @intFromEnum(limit)) other_err = error.StreamTooLong;
}
if (any_canceled) return error.Canceled;
try other_err;
if (all_done) return;
}
}