From 642f329ac91d69d02588ba15714edafb09e709da Mon Sep 17 00:00:00 2001
From: Andrew Kelley
Date: Fri, 9 Jan 2026 15:06:50 -0800
Subject: [PATCH] std.Io: exploring a different batch API proposal

---
 lib/std/Io.zig            |  95 ++++++++++++--
 lib/std/Io/File.zig       |   5 +-
 lib/std/Io/Threaded.zig   | 260 +++++++++++++++++++++++++-------------
 lib/std/process.zig       |  16 +--
 lib/std/process/Child.zig |  92 +++++++-------
 5 files changed, 314 insertions(+), 154 deletions(-)

diff --git a/lib/std/Io.zig b/lib/std/Io.zig
index c52b51e00a..d916bb6995 100644
--- a/lib/std/Io.zig
+++ b/lib/std/Io.zig
@@ -149,7 +149,10 @@ pub const VTable = struct {
     futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void,
     futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void,
 
-    operate: *const fn (?*anyopaque, []Operation) void,
+    batch: *const fn (?*anyopaque, []Operation) ConcurrentError!void,
+    batchSubmit: *const fn (?*anyopaque, *Batch) void,
+    batchWait: *const fn (?*anyopaque, *Batch, resubmissions: []const usize, Timeout) Batch.WaitError!usize,
+    batchCancel: *const fn (?*anyopaque, *Batch) void,
 
     dirCreateDir: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirError!void,
     dirCreateDirPath: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirPathError!Dir.CreatePathStatus,
@@ -253,26 +256,96 @@ pub const VTable = struct {
 };
 
 pub const Operation = union(enum) {
-    noop,
+    noop: Noop,
     file_read_streaming: FileReadStreaming,
 
+    pub const Noop = struct {
+        reserved: [2]usize,
+        status: Status(void) = .{ .result = {} },
+    };
+
+    /// The result stored in `status` is 0 on end of stream.
     pub const FileReadStreaming = struct {
         file: File,
         data: []const []u8,
-        /// Causes `result` to return `error.WouldBlock` instead of blocking.
-        nonblocking: bool = false,
-        /// Returns 0 on end of stream.
-        result: File.Reader.Error!usize,
+        status: Status(File.Reader.Error!usize) = .{ .unstarted = {} },
     };
+
+    pub fn Status(Result: type) type {
+        return union {
+            unstarted: void,
+            pending: usize,
+            result: Result,
+        };
+    }
 };
 
-/// Performs all `operations` in a non-deterministic order. Returns after all
-/// `operations` have been completed. The degree to which the operations are
-/// performed concurrently is determined by the `Io` implementation.
-pub fn operate(io: Io, operations: []Operation) void {
-    return io.vtable.operate(io.userdata, operations);
+/// Performs all `operations` in an unspecified order, concurrently.
+///
+/// Returns after all `operations` have been completed. If the operations could
+/// not be completed concurrently, returns `error.ConcurrencyUnavailable`.
+///
+/// With this API, it is rare for concurrency to not be available. Even a
+/// single-threaded `Io` implementation can, for example, take advantage of
+/// poll() to implement this. Note, however, that poll() is fallible.
+///
+/// If `operations.len` is one, `error.ConcurrencyUnavailable` is unreachable.
+///
+/// On entry, every operation must have `.status = .unstarted`, except noops,
+/// which must have `.status = .{ .result = {} }`. This allows the state
+/// transitions to be safety-checked.
+///
+/// On return, all operations have `.status = .{ .result = ... }`.
+pub fn batch(io: Io, operations: []Operation) ConcurrentError!void {
+    return io.vtable.batch(io.userdata, operations);
 }
 
+/// Performs one `Operation` as a single-element `batch`, returning once it has completed.
+pub fn operate(io: Io, operation: *Operation) void {
+    return io.vtable.batch(io.userdata, (operation)[0..1]) catch unreachable;
+}
+
+/// Submits many operations together without waiting for all of them to
+/// complete.
+///
+/// This is a low-level abstraction based on `Operation`. For a higher
+/// level API that operates on `Future`, see `Select`.
+pub const Batch = struct {
+    operations: []Operation,
+    index: usize,
+    reserved: ?*anyopaque,
+
+    pub fn init(operations: []Operation) Batch {
+        return .{ .operations = operations, .index = 0, .reserved = null };
+    }
+
+    /// Submits all non-noop `operations`.
+    pub fn submit(b: *Batch, io: Io) void {
+        return io.vtable.batchSubmit(io.userdata, b);
+    }
+
+    pub const WaitError = ConcurrentError || Cancelable || Timeout.Error;
+
+    /// Resubmits the previously completed or noop-initialized `operations` at
+    /// indexes given by `resubmissions`. This set of indexes typically will be empty
+    /// on the first call to `wait` since all operations have already been
+    /// submitted via `submit`.
+    ///
+    /// Returns the index of a completed `Operation`, or `operations.len` if
+    /// all operations are completed.
+    ///
+    /// When `error.Canceled` is returned, all operations have already completed.
+    pub fn wait(b: *Batch, io: Io, resubmissions: []const usize, timeout: Timeout) WaitError!usize {
+        return io.vtable.batchWait(io.userdata, b, resubmissions, timeout);
+    }
+
+    /// Returns after all `operations` have completed. Each operation
+    /// independently may or may not have been canceled.
+    pub fn cancel(b: *Batch, io: Io) void {
+        return io.vtable.batchCancel(io.userdata, b);
+    }
+};
+
 pub const Limit = enum(usize) {
     nothing = 0,
     unlimited = math.maxInt(usize),
diff --git a/lib/std/Io/File.zig b/lib/std/Io/File.zig
index 16663eb484..f27f249975 100644
--- a/lib/std/Io/File.zig
+++ b/lib/std/Io/File.zig
@@ -557,10 +557,9 @@ pub fn readStreaming(file: File, io: Io, buffer: []const []u8) Reader.Error!usiz
     var operation: Io.Operation = .{ .file_read_streaming = .{
         .file = file,
         .data = buffer,
-        .result = undefined,
     } };
-    io.vtable.operate(io.userdata, (&operation)[0..1]);
-    return operation.file_read_streaming.result;
+    io.operate(&operation);
+    return operation.file_read_streaming.status.result;
 }
 
 pub const ReadPositionalError = error{
diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig
index 55b3596ee7..0eda4e8fdc 100644
--- a/lib/std/Io/Threaded.zig
+++ b/lib/std/Io/Threaded.zig
@@ -1587,7 +1587,10 @@ pub fn io(t: *Threaded) Io {
             .futexWaitUncancelable = futexWaitUncancelable,
             .futexWake = futexWake,
 
-            .operate = operate,
+            .batch = batch,
+            .batchSubmit = batchSubmit,
+            .batchWait = batchWait,
+            .batchCancel = batchCancel,
 
             .dirCreateDir = dirCreateDir,
             .dirCreateDirPath = dirCreateDirPath,
@@ -1748,7 +1751,10 @@ pub fn ioBasic(t: *Threaded) Io {
             .futexWaitUncancelable = futexWaitUncancelable,
             .futexWake = futexWake,
 
-            .operate = operate,
+            .batch = batch,
+            .batchSubmit = batchSubmit,
+            .batchWait = batchWait,
+            .batchCancel = batchCancel,
 
             .dirCreateDir = dirCreateDir,
             .dirCreateDirPath = dirCreateDirPath,
@@ -2450,107 +2456,187 @@ fn futexWake(userdata: ?*anyopaque, ptr: *const u32, max_waiters: u32) void {
     Thread.futexWake(ptr, max_waiters);
 }
 
-fn operate(userdata: ?*anyopaque, operations: []Io.Operation) void {
+fn batchSubmit(userdata: ?*anyopaque, b: *Io.Batch) void {
     const t: *Threaded = @ptrCast(@alignCast(userdata));
     _ = t;
+    _ = b;
+    return;
+}
+
+fn operate(op: *Io.Operation) void {
+    switch (op.*) {
+        .noop => {},
+        .file_read_streaming => |*o| o.status = .{ .result = fileReadStreaming(o.file, o.data) },
+    }
+}
+
+fn batchWait(
+    userdata: ?*anyopaque,
+    b: *Io.Batch,
+    resubmissions: []const usize,
+    timeout: Io.Timeout,
+) Io.Batch.WaitError!usize {
+    _ = resubmissions; // NOTE(review): ignored here; every non-noop operation is polled on each call
+    const t: *Threaded = @ptrCast(@alignCast(userdata));
+    const operations = b.operations;
+    if (operations.len == 1) {
+        operate(&operations[0]);
+        return if (operations[0] == .noop) operations.len else 0; // index of the operation just performed, as in the poll path
+    }
+    if (is_windows) @panic("TODO");
+
+    var poll_buffer: [poll_buffer_len]posix.pollfd = undefined;
+    var map_buffer: [poll_buffer_len]u8 = undefined; // poll_buffer index to operations index
+    var poll_i: usize = 0;
+
+    for (operations, 0..) |*op, operation_index| switch (op.*) {
+        .noop => continue,
+        .file_read_streaming => |*o| {
+            if (poll_buffer.len - poll_i == 0) return error.ConcurrencyUnavailable;
+            poll_buffer[poll_i] = .{
+                .fd = o.file.handle,
+                .events = posix.POLL.IN,
+                .revents = 0,
+            };
+            map_buffer[poll_i] = @intCast(operation_index);
+            poll_i += 1;
+        },
+    };
+
+    if (poll_i == 0) return operations.len;
+
+    const t_io = ioBasic(t);
+    const deadline = timeout.toDeadline(t_io) catch return error.UnsupportedClock;
+    const max_poll_ms = std.math.maxInt(i32);
+
+    while (true) {
+        const timeout_ms: i32 = if (deadline) |d| t: {
+            const duration = d.durationFromNow(t_io) catch return error.UnsupportedClock;
+            if (duration.raw.nanoseconds <= 0) return error.Timeout;
+            break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds()));
+        } else -1;
+        const syscall = try Syscall.start();
+        const rc = posix.system.poll(&poll_buffer, poll_i, timeout_ms);
+        syscall.finish();
+        switch (posix.errno(rc)) {
+            .SUCCESS => {
+                if (rc == 0) {
+                    // Although spurious timeouts are OK, when no deadline is
+                    // passed we must not return `error.Timeout`.
+                    if (deadline == null) continue;
+                    return error.Timeout;
+                }
+                for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, i| {
+                    if (poll_fd.revents == 0) continue;
+                    operate(&operations[i]);
+                    return i; // only the first ready operation is performed and reported per call
+                }
+            },
+            .INTR => continue,
+            else => return error.ConcurrencyUnavailable, // any other poll() failure
+        }
+    }
+}
+
+fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void {
+    const t: *Threaded = @ptrCast(@alignCast(userdata));
+    _ = t;
+    _ = b;
+    return; // NOTE(review): stub; nothing is canceled or waited for here
+}
+
+fn batch(userdata: ?*anyopaque, operations: []Io.Operation) Io.ConcurrentError!void {
+    const t: *Threaded = @ptrCast(@alignCast(userdata));
+    _ = t;
+
+    if (operations.len == 1) {
+        @branchHint(.likely);
+        return operate(&operations[0]); // a single operation is performed directly, without poll()
+    }
 
     if (is_windows) @panic("TODO");
 
     var poll_buffer: [poll_buffer_len]posix.pollfd = undefined;
     var map_buffer: [poll_buffer_len]u8 = undefined; // poll_buffer index to operations index
-    var operation_index: usize = 0;
+    var poll_i: usize = 0; // number of poll_buffer/map_buffer entries filled in
 
-    while (operation_index < operations.len) {
-        var poll_i: usize = 0;
-        while (operation_index < operations.len) : (operation_index += 1) {
-            switch (operations[operation_index]) {
-                .noop => continue,
-                .file_read_streaming => |*o| {
-                    if (o.nonblocking) {
-                        o.result = error.WouldBlock;
-                        poll_buffer[poll_i] = .{
-                            .fd = o.file.handle,
-                            .events = posix.POLL.IN,
-                            .revents = 0,
-                        };
-                        if (map_buffer.len - poll_i == 0) break;
-                        map_buffer[poll_i] = @intCast(operation_index);
-                        poll_i += 1;
-                    } else {
-                        o.result = fileReadStreaming(o.file, o.data) catch |err| switch (err) {
-                            error.Canceled => {
-                                setOperationsError(operations[operation_index..], error.Canceled);
-                                return;
-                            },
-                            else => err,
-                        };
-                    }
-                },
-            }
-        }
-
-        if (poll_i == 0) {
-            @branchHint(.likely);
-            return;
-        }
-
-        while (true) {
-            const syscall = Syscall.start() catch |err| switch (err) {
-                error.Canceled => {
-                    setPollOperationsError(operations, map_buffer[0..poll_i], error.Canceled);
-                    setOperationsError(operations[operation_index..], error.Canceled);
-                    return;
-                },
+    for (operations, 0..) |*op, operation_index| switch (op.*) {
+        .noop => continue,
+        .file_read_streaming => |*o| {
+            if (poll_buffer.len - poll_i == 0) return error.ConcurrencyUnavailable;
+            poll_buffer[poll_i] = .{
+                .fd = o.file.handle,
+                .events = posix.POLL.IN,
+                .revents = 0,
             };
-            const poll_rc = posix.system.poll(&poll_buffer, poll_i, -1);
-            syscall.finish();
-            switch (posix.errno(poll_rc)) {
-                .SUCCESS => {
-                    if (poll_rc == 0) {
-                        // Spurious timeout; handle same as INTR.
-                        continue;
-                    }
-                    for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, i| {
-                        if (poll_fd.revents == 0) continue;
-                        switch (operations[i]) {
-                            .noop => unreachable,
-                            .file_read_streaming => |*o| {
-                                o.result = fileReadStreaming(o.file, o.data);
-                            },
-                        }
-                    }
-                    break;
-                },
-                .INTR => continue,
-                .NOMEM => {
-                    setPollOperationsError(operations, map_buffer[0..poll_i], error.SystemResources);
-                    break;
-                },
-                else => {
-                    setPollOperationsError(operations, map_buffer[0..poll_i], error.Unexpected);
-                    break;
-                },
-            }
+            map_buffer[poll_i] = @intCast(operation_index);
+            poll_i += 1;
+        },
+    };
+
+    const polls = poll_buffer[0..poll_i];
+    const map = map_buffer[0..poll_i];
+
+    var pending = poll_i; // polled operations that have not been performed yet
+    while (pending > 1) {
+        const syscall = Syscall.start() catch |err| switch (err) {
+            error.Canceled => {
+                if (!setOperationsError(operations, polls, map, error.Canceled))
+                    recancelInner();
+                return;
+            },
+        };
+        const rc = posix.system.poll(polls.ptr, polls.len, -1);
+        syscall.finish();
+        switch (posix.errno(rc)) {
+            .SUCCESS => {
+                if (rc == 0) {
+                    // Spurious timeout; handle the same as INTR.
+                    continue;
+                }
+                for (polls, map) |*poll_fd, i| {
+                    if (poll_fd.revents == 0) continue;
+                    poll_fd.fd = -1; // mark this entry as done; later loops skip entries whose fd is -1
+                    pending -= 1;
+                    operate(&operations[i]);
+                }
+            },
+            .INTR => continue,
+            .NOMEM => {
+                assert(setOperationsError(operations, polls, map, error.SystemResources));
+                return;
+            },
+            else => {
+                assert(setOperationsError(operations, polls, map, error.Unexpected));
+                return;
+            },
         }
     }
+
+    if (pending == 1) for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, i| {
+        if (poll_fd.fd == -1) continue;
+        operate(&operations[i]);
+    };
 }
 
-fn setPollOperationsError(
+fn setOperationsError(
     operations: []Io.Operation,
+    polls: []const posix.pollfd,
     map: []const u8,
     err: error{ Canceled, SystemResources, Unexpected },
-) void {
-    for (map) |operation_index| switch (operations[operation_index]) {
-        .noop => unreachable,
-        inline else => |*o| o.result = err,
-    };
-}
-
-fn setOperationsError(operations: []Io.Operation, err: error{ Canceled, SystemResources, Unexpected }) void {
-    for (operations) |*op| switch (op.*) {
-        .noop => unreachable,
-        inline else => |*o| o.result = err,
-    };
+) bool {
+    var marked = false; // becomes true once at least one operation received `err`
+    for (polls, map) |*poll_fd, i| {
+        if (poll_fd.fd == -1) continue;
+        switch (operations[i]) {
+            .noop => unreachable,
+            inline else => |*o| {
+                o.status = .{ .result = err };
+                marked = true;
+            },
+        }
+    }
+    return marked;
 }
 
 const dirCreateDir = switch (native_os) {
diff --git a/lib/std/process.zig b/lib/std/process.zig
index 4a021879a5..b5de41f5d8 100644
--- a/lib/std/process.zig
+++ b/lib/std/process.zig
@@ -453,9 +453,7 @@ pub fn spawnPath(io: Io, dir: Io.Dir, options: SpawnOptions) SpawnError!Child {
     return io.vtable.processSpawnPath(io.userdata, dir, options);
 }
 
-pub const RunError = CurrentPathError || posix.ReadError || SpawnError || posix.PollError || error{
-    StreamTooLong,
-};
+pub const RunError = SpawnError || Child.CollectOutputError;
 
 pub const RunOptions = struct {
     argv: []const []const u8,
@@ -535,13 +533,15 @@ pub fn run(gpa: Allocator, io: Io, options: RunOptions) RunError!RunResult {
 
     const term = try child.wait(io);
 
-    const owned_stdout = try stdout.toOwnedSlice(gpa);
-    errdefer gpa.free(owned_stdout);
-    const owned_stderr = try stderr.toOwnedSlice(gpa);
+    const stdout_slice = try stdout.toOwnedSlice(gpa);
+    errdefer gpa.free(stdout_slice);
+
+    const stderr_slice = try stderr.toOwnedSlice(gpa);
+    errdefer gpa.free(stderr_slice);
 
     return .{
-        .stdout = owned_stdout,
-        .stderr = owned_stderr,
+        .stdout = stdout_slice,
+        .stderr = stderr_slice,
         .term = term,
     };
 }
diff --git a/lib/std/process/Child.zig b/lib/std/process/Child.zig
index fc31014520..9d31b77080 100644
--- a/lib/std/process/Child.zig
+++ b/lib/std/process/Child.zig
@@ -125,7 +125,9 @@ pub fn wait(child: *Child, io: Io) WaitError!Term {
     return io.vtable.childWait(io.userdata, child);
 }
 
-pub const CollectOutputError = error{StreamTooLong} || Allocator.Error || Io.File.Reader.Error;
+pub const CollectOutputError = error{
+    StreamTooLong,
+} || Io.ConcurrentError || Allocator.Error || Io.File.Reader.Error || Io.Timeout.Error;
 
 pub const CollectOutputOptions = struct {
     stdout: *std.ArrayList(u8),
@@ -135,6 +137,7 @@ pub const CollectOutputOptions = struct {
     allocator: ?Allocator = null,
     stdout_limit: Io.Limit = .unlimited,
     stderr_limit: Io.Limit = .unlimited,
+    timeout: Io.Timeout = .none,
 };
 
 /// Collect the output from the process's stdout and stderr. Will return once
@@ -144,56 +147,55 @@ pub const CollectOutputOptions = struct {
 /// The process must have been started with stdout and stderr set to
 /// `process.SpawnOptions.StdIo.pipe`.
 pub fn collectOutput(child: *const Child, io: Io, options: CollectOutputOptions) CollectOutputError!void {
-    const files: [2]Io.File = .{ child.stdout.?, child.stderr.? };
     const lists: [2]*std.ArrayList(u8) = .{ options.stdout, options.stderr };
     const limits: [2]Io.Limit = .{ options.stdout_limit, options.stderr_limit };
-    var dones: [2]bool = .{ false, false };
-    var reads: [2]Io.Operation = undefined;
+
+    if (options.allocator) |gpa| {
+        for (lists) |list| try list.ensureUnusedCapacity(gpa, 1);
+    } else {
+        for (lists) |list| {
+            if (list.unusedCapacitySlice().len == 0)
+                return error.StreamTooLong;
+        }
+    }
+
     var vecs: [2][1][]u8 = undefined;
-    while (true) {
-        for (&reads, &lists, &files, dones, &vecs) |*read, list, file, done, *vec| {
-            if (done) {
-                read.* = .noop;
-                continue;
-            }
+    for (lists, &vecs) |list, *vec|
+        vec[0] = list.unusedCapacitySlice();
+
+    var operations: [2]Io.Operation = .{
+        .{ .file_read_streaming = .{
+            .file = child.stdout.?,
+            .data = &vecs[0],
+        } },
+        .{ .file_read_streaming = .{
+            .file = child.stderr.?,
+            .data = &vecs[1],
+        } },
+    };
+
+    var batch: Io.Batch = .init(&operations);
+    batch.submit(io);
+    defer batch.cancel(io);
+
+    var pending = operations.len; // streams that have not reached end of stream yet
+    var retry_index: ?usize = null;
+    while (pending > 0) {
+        const resubmissions: []const usize = if (retry_index) |i| &.{i} else &.{};
+        const index = try batch.wait(io, resubmissions, options.timeout);
+        const n = try operations[index].file_read_streaming.status.result;
+        retry_index = if (n == 0) null else index; // a stream at EOF must not be resubmitted, or its EOF is counted twice
+        if (n == 0) {
+            pending -= 1;
+        } else {
+            const list = lists[index];
+            const limit = limits[index];
+            list.items.len += n;
+            if (list.items.len >= @intFromEnum(limit)) return error.StreamTooLong;
             if (options.allocator) |gpa| try list.ensureUnusedCapacity(gpa, 1);
             const cap = list.unusedCapacitySlice();
             if (cap.len == 0) return error.StreamTooLong;
-            vec[0] = cap;
-            read.* = .{ .file_read_streaming = .{
-                .file = file,
-                .data = vec,
-                .nonblocking = true,
-                .result = undefined,
-            } };
+            vecs[index][0] = cap;
         }
-        var all_done = true;
-        var any_canceled = false;
-        var other_err: (error{StreamTooLong} || Io.File.Reader.Error)!void = {};
-        io.vtable.operate(io.userdata, &reads);
-        for (&reads, &lists, &limits, &dones) |*read, list, limit, *done| {
-            if (done.*) continue;
-            const n = read.file_read_streaming.result catch |err| switch (err) {
-                error.Canceled => {
-                    any_canceled = true;
-                    continue;
-                },
-                error.WouldBlock => continue,
-                else => |e| {
-                    other_err = e;
-                    continue;
-                },
-            };
-            if (n == 0) {
-                done.* = true;
-            } else {
-                all_done = false;
-            }
-            list.items.len += n;
-            if (list.items.len > @intFromEnum(limit)) other_err = error.StreamTooLong;
-        }
-        if (any_canceled) return error.Canceled;
-        try other_err;
-        if (all_done) return;
     }
 }