std.Io: proof-of-concept "operations" API

This commit shows a proof-of-concept direction for std.Io.VTable to go,
which is to have general support for batching, timeouts, and
non-blocking.

I'm not sure if this is a good idea or not so I'm putting it up for
scrutiny.

This commit introduces `std.Io.operate`, `std.Io.Operation`, and
implements it experimentally for `FileReadStreaming`.

In `std.Io.Threaded`, the implementation is based on poll().

This commit shows how it can be used in `std.process.run` to collect
both stdout and stderr in a single-threaded program using
`std.Io.Threaded`.

It also demonstrates how to upgrade code that was previously using
`std.Io.poll` (*not* integrated with the interface!) to use concurrency instead.
This may not be ideal since it makes the build runner no longer support
single-threaded mode. There is still a needed abstraction for
conveniently reading multiple File streams concurrently without
io.concurrent, but this commit demonstrates that such an API can be
built on top of the new `std.Io.operate` functionality.
This commit is contained in:
Andrew Kelley 2026-01-07 18:38:19 -08:00
parent bd4b6d8b14
commit 0a0ecc4fb1
8 changed files with 272 additions and 72 deletions

View file

@ -381,10 +381,15 @@ pub fn addError(step: *Step, comptime fmt: []const u8, args: anytype) error{OutO
/// Handle to a spawned `zig` subprocess together with the poller used to
/// read its output streams and the optional progress IPC file descriptor.
pub const ZigProcess = struct {
child: std.process.Child,
poller: Io.Poller(StreamEnum),
progress_ipc_fd: if (std.Progress.have_ipc) ?std.posix.fd_t else void,
pub const StreamEnum = enum { stdout, stderr };
/// Kills the child process and invalidates `zp`.
/// NOTE(review): `gpa` is accepted but unused — presumably kept for
/// signature stability; confirm whether `poller` still needs freeing here.
pub fn deinit(zp: *ZigProcess, gpa: Allocator, io: Io) void {
_ = gpa;
zp.child.kill(io);
zp.* = undefined;
}
};
/// Assumes that argv contains `--listen=-` and that the process being spawned
@ -459,14 +464,10 @@ pub fn evalZigProcess(
zp.* = .{
.child = zp.child,
.poller = Io.poll(gpa, ZigProcess.StreamEnum, .{
.stdout = zp.child.stdout.?,
.stderr = zp.child.stderr.?,
}),
.progress_ipc_fd = if (std.Progress.have_ipc) prog_node.getIpcFd() else {},
};
if (watch) s.setZigProcess(zp);
defer if (!watch) zp.poller.deinit();
defer if (!watch) zp.deinit(gpa, io);
const result = try zigProcessUpdate(s, zp, watch, web_server, gpa);
@ -526,6 +527,9 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool, web_server: ?*Build.
const arena = b.allocator;
const io = b.graph.io;
var stderr_task = try io.concurrent(readStreamAlloc, .{ gpa, io, zp.child.stderr.?, .unlimited });
defer if (stderr_task.cancel(io)) |slice| gpa.free(slice) else |_| {};
var timer = try std.time.Timer.start();
try sendMessage(io, zp.child.stdin.?, .update);
@ -533,14 +537,18 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool, web_server: ?*Build.
var result: ?Path = null;
const stdout = zp.poller.reader(.stdout);
var stdout_buffer: [512]u8 = undefined;
var stdout_reader: Io.File.Reader = .initStreaming(zp.child.stdout.?, io, &stdout_buffer);
const stdout = &stdout_reader.interface;
poll: while (true) {
var body_buffer: std.ArrayList(u8) = .empty;
while (true) {
const Header = std.zig.Server.Message.Header;
while (stdout.buffered().len < @sizeOf(Header)) if (!try zp.poller.poll()) break :poll;
const header = stdout.takeStruct(Header, .little) catch unreachable;
while (stdout.buffered().len < header.bytes_len) if (!try zp.poller.poll()) break :poll;
const body = stdout.take(header.bytes_len) catch unreachable;
const header = try stdout.takeStruct(Header, .little);
body_buffer.clearRetainingCapacity();
try stdout.appendExact(gpa, &body_buffer, header.bytes_len);
const body = body_buffer.items;
switch (header.tag) {
.zig_version => {
if (!std.mem.eql(u8, builtin.zig_version_string, body)) {
@ -553,11 +561,11 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool, web_server: ?*Build.
.error_bundle => {
s.result_error_bundle = try std.zig.Server.allocErrorBundle(gpa, body);
// This message indicates the end of the update.
if (watch) break :poll;
if (watch) break;
},
.emit_digest => {
const EmitDigest = std.zig.Server.Message.EmitDigest;
const emit_digest = @as(*align(1) const EmitDigest, @ptrCast(body));
const emit_digest: *align(1) const EmitDigest = @ptrCast(body);
s.result_cached = emit_digest.flags.cache_hit;
const digest = body[@sizeOf(EmitDigest)..][0..Cache.bin_digest_len];
result = .{
@ -631,7 +639,8 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool, web_server: ?*Build.
s.result_duration_ns = timer.read();
const stderr_contents = try zp.poller.toOwnedSlice(.stderr);
const stderr_contents = try stderr_task.await(io);
defer gpa.free(stderr_contents);
if (stderr_contents.len > 0) {
try s.result_error_msgs.append(arena, try arena.dupe(u8, stderr_contents));
}
@ -639,6 +648,14 @@ fn zigProcessUpdate(s: *Step, zp: *ZigProcess, watch: bool, web_server: ?*Build.
return result;
}
/// Reads the remainder of `file`'s stream into a newly allocated buffer,
/// up to `limit` bytes. Caller owns the returned slice.
///
/// On a read failure, the concrete error recorded by the file reader is
/// returned instead of the generic `error.ReadFailed`.
fn readStreamAlloc(gpa: Allocator, io: Io, file: Io.File, limit: Io.Limit) ![]u8 {
    // Streaming reader with no internal buffer; `allocRemaining` reads
    // directly into the growing allocation.
    var stream_reader: Io.File.Reader = .initStreaming(file, io, &.{});
    const contents = stream_reader.interface.allocRemaining(gpa, limit);
    return contents catch |err| switch (err) {
        // Surface the underlying file error captured on the reader.
        error.ReadFailed => stream_reader.err.?,
        else => |e| e,
    };
}
pub fn getZigProcess(s: *Step) ?*ZigProcess {
return switch (s.id) {
.compile => s.cast(Compile).?.zig_process,

View file

@ -149,6 +149,8 @@ pub const VTable = struct {
futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void,
futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void,
operate: *const fn (?*anyopaque, []Operation, n_wait: usize, Timeout) OperateError!void,
dirCreateDir: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirError!void,
dirCreateDirPath: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirPathError!Dir.CreatePathStatus,
dirCreateDirPathOpen: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions, Dir.OpenOptions) Dir.CreateDirPathOpenError!Dir,
@ -184,8 +186,6 @@ pub const VTable = struct {
fileWriteFileStreaming: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit) File.Writer.WriteFileError!usize,
fileWriteFilePositional: *const fn (?*anyopaque, File, header: []const u8, *Io.File.Reader, Io.Limit, offset: u64) File.WriteFilePositionalError!usize,
/// Returns 0 on end of stream.
fileReadStreaming: *const fn (?*anyopaque, File, data: []const []u8) File.Reader.Error!usize,
/// Returns 0 on end of stream.
fileReadPositional: *const fn (?*anyopaque, File, data: []const []u8, offset: u64) File.ReadPositionalError!usize,
fileSeekBy: *const fn (?*anyopaque, File, relative_offset: i64) File.SeekError!void,
fileSeekTo: *const fn (?*anyopaque, File, absolute_offset: u64) File.SeekError!void,
@ -252,6 +252,38 @@ pub const VTable = struct {
netLookup: *const fn (?*anyopaque, net.HostName, *Queue(net.HostName.LookupResult), net.HostName.LookupOptions) net.HostName.LookupError!void,
};
/// A single I/O request that can be submitted to `operate`, batched together
/// with others. The outcome is reported via the payload's `result` field.
pub const Operation = union(enum) {
/// Performs no I/O; implementations skip this entry entirely.
noop,
file_read_streaming: FileReadStreaming,
pub const FileReadStreaming = struct {
/// The file to read from in streaming mode.
file: File,
/// Scatter list of destination buffers to fill.
data: []const []u8,
/// Causes `result` to return `error.WouldBlock` instead of blocking.
nonblocking: bool = false,
/// Returns 0 on end of stream.
/// Written by the `Io` implementation when the operation is attempted.
result: File.Reader.Error!usize,
};
};
/// Errors that pertain to the `operate` call as a whole; per-operation
/// failures are reported through each `Operation`'s `result` field.
pub const OperateError = error{ Canceled, Timeout };
/// Performs all `operations` in a non-deterministic order. Returns after all
/// `operations` have been attempted. The degree to which the operations are
/// performed concurrently is determined by the `Io` implementation.
///
/// `n_wait` is an amount of operations between `0` and `operations.len` that
/// determines how many attempted operations must complete before `operate`
/// returns. Operation completion is defined by returning a value other than
/// `error.WouldBlock`. If the operation cannot return `error.WouldBlock`, it
/// always counts as completing.
///
/// In the event `error.Canceled` is returned, any number of `operations` may
/// still have been completed successfully.
pub fn operate(io: Io, operations: []Operation, n_wait: usize, timeout: Timeout) OperateError!void {
return io.vtable.operate(io.userdata, operations, n_wait, timeout);
}
pub const Limit = enum(usize) {
nothing = 0,
unlimited = math.maxInt(usize),

View file

@ -554,7 +554,13 @@ pub fn setTimestampsNow(file: File, io: Io) SetTimestampsError!void {
/// See also:
/// * `reader`
pub fn readStreaming(file: File, io: Io, buffer: []const []u8) Reader.Error!usize {
return io.vtable.fileReadStreaming(io.userdata, file, buffer);
var operation: Io.Operation = .{ .file_read_streaming = .{
.file = file,
.data = buffer,
.result = undefined,
} };
io.vtable.operate(io.userdata, (&operation)[0..1], 1, .none) catch unreachable;
return operation.file_read_streaming.result;
}
pub const ReadPositionalError = error{

View file

@ -300,7 +300,7 @@ fn readVecStreaming(r: *Reader, data: [][]u8) Io.Reader.Error!usize {
const dest_n, const data_size = try r.interface.writableVector(&iovecs_buffer, data);
const dest = iovecs_buffer[0..dest_n];
assert(dest[0].len > 0);
const n = io.vtable.fileReadStreaming(io.userdata, r.file, dest) catch |err| {
const n = r.file.readStreaming(io, dest) catch |err| {
r.err = err;
return error.ReadFailed;
};
@ -355,7 +355,7 @@ fn discard(io_reader: *Io.Reader, limit: Io.Limit) Io.Reader.Error!usize {
const dest_n, const data_size = try r.interface.writableVector(&iovecs_buffer, &data);
const dest = iovecs_buffer[0..dest_n];
assert(dest[0].len > 0);
const n = io.vtable.fileReadStreaming(io.userdata, file, dest) catch |err| {
const n = file.readStreaming(io, dest) catch |err| {
r.err = err;
return error.ReadFailed;
};

View file

@ -315,6 +315,27 @@ pub fn allocRemainingAlignedSentinel(
}
}
pub const AppendExactError = Allocator.Error || Error;
/// Transfers exactly `n` bytes from the reader to the `ArrayList`.
///
/// Capacity for all `n` bytes is reserved up front, so the underlying
/// write can never fail once streaming begins.
///
/// See also:
/// * `appendRemaining`
pub fn appendExact(
    r: *Reader,
    gpa: Allocator,
    list: *ArrayList(u8),
    n: usize,
) AppendExactError!void {
    try list.ensureUnusedCapacity(gpa, n);
    var allocating = std.Io.Writer.Allocating.fromArrayList(gpa, list);
    // Hand the (possibly grown) buffer back to the caller's list on all paths.
    defer list.* = allocating.toArrayList();
    streamExact(r, &allocating.writer, n) catch |err| switch (err) {
        error.ReadFailed, error.EndOfStream => |e| return e,
        // Impossible: sufficient capacity was ensured above.
        error.WriteFailed => unreachable,
    };
}
/// Transfers all bytes from the current position to the end of the stream, up
/// to `limit`, appending them to `list`.
///

View file

@ -1586,6 +1586,8 @@ pub fn io(t: *Threaded) Io {
.futexWaitUncancelable = futexWaitUncancelable,
.futexWake = futexWake,
.operate = operate,
.dirCreateDir = dirCreateDir,
.dirCreateDirPath = dirCreateDirPath,
.dirCreateDirPathOpen = dirCreateDirPathOpen,
@ -1620,7 +1622,6 @@ pub fn io(t: *Threaded) Io {
.fileWritePositional = fileWritePositional,
.fileWriteFileStreaming = fileWriteFileStreaming,
.fileWriteFilePositional = fileWriteFilePositional,
.fileReadStreaming = fileReadStreaming,
.fileReadPositional = fileReadPositional,
.fileSeekBy = fileSeekBy,
.fileSeekTo = fileSeekTo,
@ -1746,6 +1747,8 @@ pub fn ioBasic(t: *Threaded) Io {
.futexWaitUncancelable = futexWaitUncancelable,
.futexWake = futexWake,
.operate = operate,
.dirCreateDir = dirCreateDir,
.dirCreateDirPath = dirCreateDirPath,
.dirCreateDirPathOpen = dirCreateDirPathOpen,
@ -1780,7 +1783,6 @@ pub fn ioBasic(t: *Threaded) Io {
.fileWritePositional = fileWritePositional,
.fileWriteFileStreaming = fileWriteFileStreaming,
.fileWriteFilePositional = fileWriteFilePositional,
.fileReadStreaming = fileReadStreaming,
.fileReadPositional = fileReadPositional,
.fileSeekBy = fileSeekBy,
.fileSeekTo = fileSeekTo,
@ -2447,6 +2449,87 @@ fn futexWake(userdata: ?*anyopaque, ptr: *const u32, max_waiters: u32) void {
Thread.futexWake(ptr, max_waiters);
}
/// Implements `Io.VTable.operate` using poll(2): blocking operations are
/// performed immediately; nonblocking file reads are multiplexed through a
/// single poll set until `n_wait` of them have completed.
fn operate(userdata: ?*anyopaque, operations: []Io.Operation, n_wait: usize, timeout: Io.Timeout) Io.OperateError!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
const t_io = ioBasic(t);
if (is_windows) @panic("TODO");
// A null deadline means "no deadline": poll() may block indefinitely.
const deadline = timeout.toDeadline(t_io) catch |err| switch (err) {
error.UnsupportedClock, error.Unexpected => null,
};
var poll_buffer: [100]posix.pollfd = undefined;
var map_buffer: [poll_buffer.len]u8 = undefined; // poll_buffer index to operations index
var poll_i: usize = 0;
var completed: usize = 0;
// Put all the file reads with nonblocking enabled into the poll set.
if (operations.len > poll_buffer.len) @panic("TODO");
// TODO if any operation is canceled, cancel the rest
for (operations, 0..) |*operation, operation_index| switch (operation.*) {
.noop => continue,
.file_read_streaming => |*o| {
if (o.nonblocking) {
// Tentative result; overwritten below if poll() reports readiness.
o.result = error.WouldBlock;
poll_buffer[poll_i] = .{
.fd = o.file.handle,
.events = posix.POLL.IN,
.revents = undefined,
};
map_buffer[poll_i] = @intCast(operation_index);
poll_i += 1;
} else {
// Blocking operations are performed (and thus complete) inline.
o.result = fileReadStreaming(o.file, o.data);
completed += 1;
}
},
};
if (poll_i == 0) {
@branchHint(.likely);
return;
}
const max_poll_ms = std.math.maxInt(i32);
while (completed < n_wait) {
// Clamp the remaining time to poll()'s i32 millisecond argument;
// -1 means wait without a timeout.
const timeout_ms: i32 = if (deadline) |d| t: {
const duration = d.durationFromNow(t_io) catch @panic("TODO make this unreachable");
if (duration.raw.nanoseconds <= 0) return error.Timeout;
break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds()));
} else -1;
const syscall = try Syscall.start();
const poll_rc = posix.system.poll(&poll_buffer, poll_i, timeout_ms);
syscall.finish();
switch (posix.errno(poll_rc)) {
.SUCCESS => {
if (poll_rc == 0) {
// Although spurious timeouts are OK, when no deadline
// is passed we must not return `error.Timeout`.
if (deadline == null) continue;
return error.Timeout;
}
for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, operation_index| {
if (poll_fd.revents == 0) continue;
// Negative fds are ignored by poll(), so this entry fires at most once.
poll_fd.fd = -1; // Disarm this operation.
switch (operations[operation_index]) {
.noop => unreachable,
.file_read_streaming => |*o| {
o.result = fileReadStreaming(o.file, o.data);
completed += 1;
},
}
}
},
.INTR => continue,
else => @panic("TODO handle unexpected error from poll()"),
}
}
// NOTE(review): if every armed fd becomes disarmed while completed < n_wait,
// this loop polls only negative fds forever — confirm callers cannot reach that.
}
const dirCreateDir = switch (native_os) {
.windows => dirCreateDirWindows,
.wasi => dirCreateDirWasi,

View file

@ -454,13 +454,17 @@ pub fn spawnPath(io: Io, dir: Io.Dir, options: SpawnOptions) SpawnError!Child {
}
pub const RunError = CurrentPathError || posix.ReadError || SpawnError || posix.PollError || error{
StdoutStreamTooLong,
StderrStreamTooLong,
StreamTooLong,
};
pub const RunOptions = struct {
argv: []const []const u8,
max_output_bytes: usize = 50 * 1024,
stderr_limit: Io.Limit = .unlimited,
stdout_limit: Io.Limit = .unlimited,
/// How many bytes to initially allocate for stderr.
stderr_reserve_amount: usize = 1,
/// How many bytes to initially allocate for stdout.
stdout_reserve_amount: usize = 1,
/// Set to change the current working directory when spawning the child process.
cwd: ?[]const u8 = null,
@ -486,6 +490,7 @@ pub const RunOptions = struct {
create_no_window: bool = true,
/// Darwin-only. Disable ASLR for the child process.
disable_aslr: bool = false,
timeout: Io.Timeout = .none,
};
pub const RunResult = struct {
@ -518,7 +523,17 @@ pub fn run(gpa: Allocator, io: Io, options: RunOptions) RunError!RunResult {
var stderr: std.ArrayList(u8) = .empty;
defer stderr.deinit(gpa);
try child.collectOutput(gpa, &stdout, &stderr, options.max_output_bytes);
try stdout.ensureUnusedCapacity(gpa, options.stdout_reserve_amount);
try stderr.ensureUnusedCapacity(gpa, options.stderr_reserve_amount);
try child.collectOutput(io, .{
.allocator = gpa,
.stdout = &stdout,
.stderr = &stderr,
.stdout_limit = options.stdout_limit,
.stderr_limit = options.stderr_limit,
.timeout = options.timeout,
});
const term = try child.wait(io);

View file

@ -9,7 +9,6 @@ const process = std.process;
const File = std.Io.File;
const assert = std.debug.assert;
const Allocator = std.mem.Allocator;
const ArrayList = std.ArrayList;
pub const Id = switch (native_os) {
.windows => std.os.windows.HANDLE,
@ -126,53 +125,80 @@ pub fn wait(child: *Child, io: Io) WaitError!Term {
return io.vtable.childWait(io.userdata, child);
}
/// Collect the output from the process's stdout and stderr. Will return once all output
/// has been collected. This does not mean that the process has ended. `wait` should still
/// be called to wait for and clean up the process.
/// Errors from `collectOutput`. `error.StreamTooLong` indicates an output
/// limit (or, with no allocator, the pre-existing capacity) was exceeded.
pub const CollectOutputError = error{
Timeout,
StreamTooLong,
} || Allocator.Error || Io.File.Reader.Error;
pub const CollectOutputOptions = struct {
/// Receives the child's stdout bytes, appended after any existing items.
stdout: *std.ArrayList(u8),
/// Receives the child's stderr bytes, appended after any existing items.
stderr: *std.ArrayList(u8),
/// Used for `stdout` and `stderr`. If not provided, only the existing
/// capacity will be used.
allocator: ?Allocator = null,
/// Maximum total length `stdout` may reach before `error.StreamTooLong`.
stdout_limit: Io.Limit = .unlimited,
/// Maximum total length `stderr` may reach before `error.StreamTooLong`.
stderr_limit: Io.Limit = .unlimited,
/// Applied to each wait for more output.
timeout: Io.Timeout = .none,
};
/// Collect the output from the process's stdout and stderr. Will return once
/// all output has been collected. This does not mean that the process has
/// ended. `wait` should still be called to wait for and clean up the process.
///
/// The process must have been started with stdout and stderr set to
/// `process.SpawnOptions.StdIo.pipe`.
pub fn collectOutput(
child: *const Child,
/// Used for `stdout` and `stderr`.
allocator: Allocator,
stdout: *ArrayList(u8),
stderr: *ArrayList(u8),
max_output_bytes: usize,
) !void {
var poller = std.Io.poll(allocator, enum { stdout, stderr }, .{
.stdout = child.stdout.?,
.stderr = child.stderr.?,
});
defer poller.deinit();
const stdout_r = poller.reader(.stdout);
stdout_r.buffer = stdout.allocatedSlice();
stdout_r.seek = 0;
stdout_r.end = stdout.items.len;
const stderr_r = poller.reader(.stderr);
stderr_r.buffer = stderr.allocatedSlice();
stderr_r.seek = 0;
stderr_r.end = stderr.items.len;
defer {
stdout.* = .{
.items = stdout_r.buffer[0..stdout_r.end],
.capacity = stdout_r.buffer.len,
};
stderr.* = .{
.items = stderr_r.buffer[0..stderr_r.end],
.capacity = stderr_r.buffer.len,
};
stdout_r.buffer = &.{};
stderr_r.buffer = &.{};
}
while (try poller.poll()) {
if (stdout_r.bufferedLen() > max_output_bytes)
return error.StdoutStreamTooLong;
if (stderr_r.bufferedLen() > max_output_bytes)
return error.StderrStreamTooLong;
/// Collect the output from the process's stdout and stderr. Will return once
/// all output has been collected. This does not mean that the process has
/// ended. `wait` should still be called to wait for and clean up the process.
///
/// The process must have been started with stdout and stderr set to
/// `process.SpawnOptions.StdIo.pipe`.
///
/// Fix: a stream whose nonblocking read returned `error.WouldBlock`
/// previously left `all_done` set, so the function could return (dropping
/// pending output) whenever the other stream reached end-of-stream in the
/// same round. Such a stream now keeps the collection loop alive.
pub fn collectOutput(child: *const Child, io: Io, options: CollectOutputOptions) CollectOutputError!void {
    const files: [2]Io.File = .{ child.stdout.?, child.stderr.? };
    const lists: [2]*std.ArrayList(u8) = .{ options.stdout, options.stderr };
    const limits: [2]Io.Limit = .{ options.stdout_limit, options.stderr_limit };
    var dones: [2]bool = .{ false, false };
    var reads: [2]Io.Operation = undefined;
    // Backing storage for each operation's scatter list; must outlive `operate`.
    var vecs: [2][1][]u8 = undefined;
    while (true) {
        // Re-arm a nonblocking read into each unfinished stream's unused capacity.
        for (&reads, &lists, &files, dones, &vecs) |*read, list, file, done, *vec| {
            if (done) {
                read.* = .noop;
                continue;
            }
            // Grow by at least one byte when an allocator is available;
            // otherwise only the caller-provided capacity is used.
            if (options.allocator) |gpa| try list.ensureUnusedCapacity(gpa, 1);
            const cap = list.unusedCapacitySlice();
            if (cap.len == 0) return error.StreamTooLong;
            vec[0] = cap;
            read.* = .{ .file_read_streaming = .{
                .file = file,
                .data = vec,
                .nonblocking = true,
                .result = undefined,
            } };
        }
        var all_done = true;
        var any_canceled = false;
        var other_err: (error{StreamTooLong} || Io.File.Reader.Error)!void = {};
        // Wait until at least one operation completes (or the timeout expires).
        const op_result = io.vtable.operate(io.userdata, &reads, 1, options.timeout);
        for (&reads, &lists, &limits, &dones) |*read, list, limit, *done| {
            if (done.*) continue;
            const n = read.file_read_streaming.result catch |err| switch (err) {
                error.Canceled => {
                    any_canceled = true;
                    continue;
                },
                error.WouldBlock => {
                    // No data this round, but the stream has not ended:
                    // collection cannot finish yet.
                    all_done = false;
                    continue;
                },
                else => |e| {
                    other_err = e;
                    continue;
                },
            };
            if (n == 0) {
                done.* = true;
            } else {
                all_done = false;
            }
            list.items.len += n;
            if (list.items.len > @intFromEnum(limit)) other_err = error.StreamTooLong;
        }
        if (any_canceled) return error.Canceled;
        try op_result; // could be error.Canceled or error.Timeout
        try other_err;
        if (all_done) return;
    }
}