diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 2f34dc07e4..b503979fda 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -149,9 +149,8 @@ pub const VTable = struct { futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void, futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void, - batch: *const fn (?*anyopaque, []Operation) ConcurrentError!void, - batchSubmit: *const fn (?*anyopaque, *Batch) void, - batchWait: *const fn (?*anyopaque, *Batch, resubmissions: []const usize, Timeout) Batch.WaitError!usize, + operate: *const fn (?*anyopaque, *Operation) Cancelable!void, + batchWait: *const fn (?*anyopaque, *Batch, Timeout) Batch.WaitError!void, batchCancel: *const fn (?*anyopaque, *Batch) void, dirCreateDir: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirError!void, @@ -261,48 +260,50 @@ pub const Operation = union(enum) { pub const Noop = struct { reserved: [2]usize = .{ 0, 0 }, - status: Status(void) = .{ .result = {} }, + status: Status(void) = .{ .unstarted = {} }, }; /// Returns 0 on end of stream. pub const FileReadStreaming = struct { file: File, data: []const []u8, - status: Status(File.Reader.Error!usize) = .{ .unstarted = {} }, + status: Status(Error!usize) = .{ .unstarted = {} }, + + pub const Error = error{ + InputOutput, + SystemResources, + /// Trying to read a directory file descriptor as if it were a file. + IsDir, + BrokenPipe, + ConnectionResetByPeer, + /// File was not opened with read capability. + NotOpenForReading, + SocketUnconnected, + /// Non-blocking has been enabled, and reading from the file descriptor + /// would block. + WouldBlock, + /// In WASI, this error occurs when the file descriptor does + /// not hold the required rights to read from it. + AccessDenied, + /// Unable to read file due to lock. Depending on the `Io` implementation, + /// reading from a locked file may return this error, or may ignore the + /// lock. + LockViolation, + } || Io.UnexpectedError; }; pub fn Status(Result: type) type { return union { unstarted: void, - pending: usize, + pending: *Batch, result: Result, }; } }; -/// Performs all `operations` in an unspecified order, concurrently. -/// -/// Returns after all `operations` have been completed. If the operations could -/// not be completed concurrently, returns `error.ConcurrencyUnavailable`. -/// -/// With this API, it is rare for concurrency to not be available. Even a -/// single-threaded `Io` implementation can, for example, take advantage of -/// poll() to implement this. Note that poll() is fallible however. -/// -/// If `operations.len` is one, `error.ConcurrencyUnavailable` is unreachable. -/// -/// On entry, all operations must already have `.status = .unstarted` except -/// noops must have `.status = .{ .result = {} }`, to safety check the state -/// transitions. -/// -/// On return, all operations have `.status = .{ .result = ... }`. -pub fn batch(io: Io, operations: []Operation) ConcurrentError!void { - return io.vtable.batch(io.userdata, operations); -} - /// Performs one `Operation`. -pub fn operate(io: Io, operation: *Operation) void { - return io.vtable.batch(io.userdata, (operation)[0..1]) catch unreachable; +pub fn operate(io: Io, operation: *Operation) Cancelable!void { + return io.vtable.operate(io.userdata, operation) catch unreachable; } /// Submits many operations together without waiting for all of them to @@ -312,35 +313,107 @@ pub fn operate(io: Io, operation: *Operation) void { /// level API that operates on `Future`, see `Select`. pub const Batch = struct { operations: []Operation, - index: usize, - reserved: ?*anyopaque, + ring: [*]u32, + user: struct { + submit_tail: RingIndex, + complete_head: RingIndex, + complete_tail: RingIndex, + }, + impl: struct { + submit_head: RingIndex, + submit_tail: RingIndex, + complete_tail: RingIndex, + reserved: ?*anyopaque, + }, - pub fn init(operations: []Operation) Batch { - return .{ .operations = operations, .index = 0, .reserved = null }; - } + pub const RingIndex = enum(u32) { + _, - /// Submits all non-noop `operations`. - pub fn submit(b: *Batch, io: Io) void { - return io.vtable.batchSubmit(io.userdata, b); - } + pub fn index(ri: RingIndex, len: u31) u31 { + const i = @intFromEnum(ri); + assert(i < @as(u32, len) * 2); + return @intCast(if (i < len) i else i - len); + } + + pub fn prev(ri: RingIndex, len: u31) RingIndex { + const i = @intFromEnum(ri); + const double_len = @as(u32, len) * 2; + assert(i <= double_len); + return @enumFromInt((if (i > 0) i else double_len) - 1); + } + + pub fn next(ri: RingIndex, len: u31) RingIndex { + const i = @intFromEnum(ri) + 1; + const double_len = @as(u32, len) * 2; + assert(i <= double_len); + return @enumFromInt(if (i < double_len) i else 0); + } + }; pub const WaitError = ConcurrentError || Cancelable || Timeout.Error; - /// Resubmits the previously completed or noop-initialized `operations` at - /// indexes given by `resubmissions`. This set of indexes typically will be empty - /// on the first call to `await` since all operations have already been - /// submitted via `async`. - /// - /// Returns the index of a completed `Operation`, or `operations.len` if - /// all operations are completed. - /// - /// When `error.Canceled` is returned, all operations have already completed. - pub fn wait(b: *Batch, io: Io, resubmissions: []const usize, timeout: Timeout) WaitError!usize { - return io.vtable.batchWait(io.userdata, b, resubmissions, timeout); + pub fn init(operations: []Operation, ring: []u32) Batch { + const len: u31 = @intCast(operations.len); + assert(ring.len == len); + return .{ + .operations = operations, + .ring = ring.ptr, + .user = .{ + .submit_tail = @enumFromInt(0), + .complete_head = @enumFromInt(0), + .complete_tail = @enumFromInt(0), + }, + .impl = .{ + .submit_head = @enumFromInt(0), + .submit_tail = @enumFromInt(0), + .complete_tail = @enumFromInt(0), + .reserved = null, + }, + }; } - /// Returns after all `operations` have completed. Each operation - /// independently may or may not have been canceled. + /// Adds `b.operations[operation]` to the list of submitted operations + /// that will be performed when `wait` is called. + pub fn add(b: *Batch, operation: usize) void { + const tail = b.user.submit_tail; + const len: u31 = @intCast(b.operations.len); + b.user.submit_tail = tail.next(len); + b.ring[0..len][tail.index(len)] = @intCast(operation); + } + + fn flush(b: *Batch) void { + @atomicStore(RingIndex, &b.impl.submit_tail, b.user.submit_tail, .release); + } + + /// Returns `operation` such that `b.operations[operation]` has completed. + /// Returns `null` when `wait` should be called. + pub fn next(b: *Batch) ?u32 { + const head = b.user.complete_head; + if (head == b.user.complete_tail) { + @branchHint(.unlikely); + b.flush(); + const tail = @atomicLoad(RingIndex, &b.impl.complete_tail, .acquire); + if (head == tail) { + @branchHint(.unlikely); + return null; + } + assert(head != tail); + b.user.complete_tail = tail; + } + const len: u31 = @intCast(b.operations.len); + b.user.complete_head = head.next(len); + return b.ring[0..len][head.index(len)]; + } + + /// Starts work on any submitted operations and returns when at least one has completeed. + /// + /// Returns `error.Timeout` if `timeout` expires first. + pub fn wait(b: *Batch, io: Io, timeout: Timeout) WaitError!void { + return io.vtable.batchWait(io.userdata, b, timeout); + } + + /// Returns after all `operations` have completed. Operations which have not completed + /// after this function returns were successfully dropped and had no side effects. pub fn cancel(b: *Batch, io: Io) void { return io.vtable.batchCancel(io.userdata, b); } diff --git a/lib/std/Io/File.zig b/lib/std/Io/File.zig index f27f249975..cc7042d443 100644 --- a/lib/std/Io/File.zig +++ b/lib/std/Io/File.zig @@ -558,7 +558,7 @@ pub fn readStreaming(file: File, io: Io, buffer: []const []u8) Reader.Error!usiz .file = file, .data = buffer, } }; - io.operate(&operation); + try io.operate(&operation); return operation.file_read_streaming.status.result; } diff --git a/lib/std/Io/File/Reader.zig b/lib/std/Io/File/Reader.zig index d3d1c05e3f..7703521d7e 100644 --- a/lib/std/Io/File/Reader.zig +++ b/lib/std/Io/File/Reader.zig @@ -26,27 +26,7 @@ size_err: ?SizeError = null, seek_err: ?SeekError = null, interface: Io.Reader, -pub const Error = error{ - InputOutput, - SystemResources, - /// Trying to read a directory file descriptor as if it were a file. - IsDir, - BrokenPipe, - ConnectionResetByPeer, - /// File was not opened with read capability. - NotOpenForReading, - SocketUnconnected, - /// Non-blocking has been enabled, and reading from the file descriptor - /// would block. - WouldBlock, - /// In WASI, this error occurs when the file descriptor does - /// not hold the required rights to read from it. - AccessDenied, - /// Unable to read file due to lock. Depending on the `Io` implementation, - /// reading from a locked file may return this error, or may ignore the - /// lock. - LockViolation, -} || Io.Cancelable || Io.UnexpectedError; +pub const Error = Io.Operation.FileReadStreaming.Error || Io.Cancelable; pub const SizeError = File.StatError || error{ /// Occurs if, for example, the file handle is a network socket and therefore does not have a size. diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index f7b4b73b33..fefa8fe84d 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1587,8 +1587,7 @@ pub fn io(t: *Threaded) Io { .futexWaitUncancelable = futexWaitUncancelable, .futexWake = futexWake, - .batch = batch, - .batchSubmit = batchSubmit, + .operate = operate, .batchWait = batchWait, .batchCancel = batchCancel, @@ -1751,8 +1750,7 @@ pub fn ioBasic(t: *Threaded) Io { .futexWaitUncancelable = futexWaitUncancelable, .futexWake = futexWake, - .batch = batch, - .batchSubmit = batchSubmit, + .operate = operate, .batchWait = batchWait, .batchCancel = batchCancel, @@ -2456,59 +2454,82 @@ fn futexWake(userdata: ?*anyopaque, ptr: *const u32, max_waiters: u32) void { Thread.futexWake(ptr, max_waiters); } -fn batchSubmit(userdata: ?*anyopaque, b: *Io.Batch) void { +fn operate(userdata: ?*anyopaque, op: *Io.Operation) Io.Cancelable!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; - _ = b; - return; -} - -fn operate(op: *Io.Operation) void { switch (op.*) { - .noop => {}, - .file_read_streaming => |*o| o.status = .{ .result = fileReadStreaming(o.file, o.data) }, + .noop => |*o| { + _ = o.status.unstarted; + o.status = .{ .result = {} }; + }, + .file_read_streaming => |*o| { + _ = o.status.unstarted; + o.status = .{ .result = fileReadStreaming(o.file, o.data) catch |err| switch (err) { + error.Canceled => return error.Canceled, + else => |e| e, + } }; + }, } } -fn batchWait( - userdata: ?*anyopaque, - b: *Io.Batch, - resubmissions: []const usize, - timeout: Io.Timeout, -) Io.Batch.WaitError!usize { - _ = resubmissions; +fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.WaitError!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); const operations = b.operations; - if (operations.len == 1) { - operate(&operations[0]); - return b.operations.len; + const len: u31 = @intCast(operations.len); + const ring = b.ring[0..len]; + var submit_head = b.impl.submit_head; + const submit_tail = b.user.submit_tail; + b.impl.submit_tail = submit_tail; + var complete_tail = b.impl.complete_tail; + var map_buffer: [poll_buffer_len]u32 = undefined; // poll_buffer index to operations index + var poll_i: usize = 0; + defer { + for (map_buffer[0..poll_i]) |op| { + submit_head = submit_head.prev(len); + ring[submit_head.index(len)] = op; + } + b.impl.submit_head = submit_head; + b.impl.complete_tail = complete_tail; + b.user.complete_tail = complete_tail; } if (is_windows) @panic("TODO"); - var poll_buffer: [poll_buffer_len]posix.pollfd = undefined; - var map_buffer: [poll_buffer_len]u8 = undefined; // poll_buffer index to operations index - var poll_i: usize = 0; - - for (operations, 0..) |*op, operation_index| switch (op.*) { - .noop => continue, - .file_read_streaming => |*o| { - if (poll_buffer.len - poll_i == 0) return error.ConcurrencyUnavailable; - poll_buffer[poll_i] = .{ - .fd = o.file.handle, - .events = posix.POLL.IN, - .revents = 0, - }; - map_buffer[poll_i] = @intCast(operation_index); - poll_i += 1; + while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) { + const op = ring[submit_head.index(len)]; + const operation = &operations[op]; + switch (operation.*) { + else => { + try operate(t, operation); + ring[complete_tail.index(len)] = op; + complete_tail = complete_tail.next(len); + }, + .file_read_streaming => |*o| { + _ = o.status.unstarted; + if (poll_buffer.len - poll_i == 0) return error.ConcurrencyUnavailable; + poll_buffer[poll_i] = .{ + .fd = o.file.handle, + .events = posix.POLL.IN, + .revents = 0, + }; + map_buffer[poll_i] = op; + poll_i += 1; + }, + } + } + switch (poll_i) { + 0 => return, + 1 => if (timeout == .none) { + const op = map_buffer[0]; + try operate(t, &operations[op]); + ring[complete_tail.index(len)] = op; + complete_tail = complete_tail.next(len); + return; }, - }; - - if (poll_i == 0) return operations.len; - + else => {}, + } const t_io = ioBasic(t); const deadline = timeout.toDeadline(t_io) catch return error.UnsupportedClock; const max_poll_ms = std.math.maxInt(i32); - while (true) { const timeout_ms: i32 = if (deadline) |d| t: { const duration = d.durationFromNow(t_io) catch return error.UnsupportedClock; @@ -2526,11 +2547,24 @@ fn batchWait( if (deadline == null) continue; return error.Timeout; } - for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, i| { - if (poll_fd.revents == 0) continue; - operate(&operations[i]); - return i; + var canceled = false; + for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, op| { + if (poll_fd.revents == 0) { + submit_head = submit_head.prev(len); + ring[submit_head.index(len)] = op; + } else { + operate(t, &operations[op]) catch |err| switch (err) { + error.Canceled => { + canceled = true; + continue; + }, + }; + ring[complete_tail.index(len)] = op; + complete_tail = complete_tail.next(len); + } } + poll_i = 0; + return if (canceled) error.Canceled; }, .INTR => continue, else => return error.ConcurrencyUnavailable, @@ -2540,9 +2574,27 @@ fn batchWait( fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); - _ = t; - _ = b; - return; + const operations = b.operations; + const len: u31 = @intCast(operations.len); + const ring = b.ring[0..len]; + var submit_head = b.impl.submit_head; + const submit_tail = b.user.submit_tail; + b.impl.submit_tail = submit_tail; + var complete_tail = b.impl.complete_tail; + while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) { + const op = ring[submit_head.index(len)]; + switch (operations[op]) { + .noop => { + operate(t, &operations[op]) catch unreachable; + ring[complete_tail.index(len)] = op; + complete_tail = complete_tail.next(len); + }, + .file_read_streaming => |*o| _ = o.status.unstarted, + } + } + b.impl.submit_head = submit_tail; + b.impl.complete_tail = complete_tail; + b.user.complete_tail = complete_tail; } fn batch(userdata: ?*anyopaque, operations: []Io.Operation) Io.ConcurrentError!void { @@ -10352,6 +10404,7 @@ fn nowWasi(clock: Io.Clock) Io.Clock.Error!Io.Timestamp { fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); + if (timeout == .none) return; if (use_parking_sleep) return parking_sleep.sleep(try timeout.toDeadline(ioBasic(t))); if (native_os == .wasi) return sleepWasi(t, timeout); if (@TypeOf(posix.system.clock_nanosleep) != void) return sleepPosix(timeout); diff --git a/lib/std/process/Child.zig b/lib/std/process/Child.zig index 364c52446f..19c974ff9f 100644 --- a/lib/std/process/Child.zig +++ b/lib/std/process/Child.zig @@ -149,51 +149,45 @@ pub fn collectOutput(child: *const Child, io: Io, options: CollectOutputOptions) const files: [2]Io.File = .{ child.stdout.?, child.stderr.? }; const lists: [2]*std.ArrayList(u8) = .{ options.stdout, options.stderr }; const limits: [2]Io.Limit = .{ options.stdout_limit, options.stderr_limit }; - var dones: [2]bool = .{ false, false }; var reads: [2]Io.Operation = undefined; var vecs: [2][1][]u8 = undefined; - while (true) { - for (&reads, &lists, &files, dones, &vecs) |*read, list, file, done, *vec| { - if (done) { - read.* = .{ .noop = .{} }; - continue; - } - if (options.allocator) |gpa| try list.ensureUnusedCapacity(gpa, 1); - const cap = list.unusedCapacitySlice(); - if (cap.len == 0) return error.StreamTooLong; - vec[0] = cap; - read.* = .{ .file_read_streaming = .{ - .file = file, - .data = vec, - } }; + var ring: [2]u32 = undefined; + var batch: Io.Batch = .init(&reads, &ring); + defer { + batch.cancel(io); + while (batch.next()) |op| { + lists[op].items.len += reads[op].file_read_streaming.status.result catch continue; } - var all_done = true; - var any_canceled = false; - var other_err: (error{StreamTooLong} || Io.File.Reader.Error)!void = {}; - try io.vtable.batch(io.userdata, &reads); - for (&reads, &lists, &limits, &dones) |*read, list, limit, *done| { - if (done.*) continue; - const n = read.file_read_streaming.status.result catch |err| switch (err) { - error.Canceled => { - any_canceled = true; - continue; - }, - error.WouldBlock => continue, - else => |e| { - other_err = e; - continue; - }, - }; + } + var remaining: usize = 0; + for (0.., &reads, &lists, &files, &vecs) |op, *read, list, file, *vec| { + if (options.allocator) |gpa| try list.ensureUnusedCapacity(gpa, 1); + const cap = list.unusedCapacitySlice(); + if (cap.len == 0) return error.StreamTooLong; + vec[0] = cap; + read.* = .{ .file_read_streaming = .{ + .file = file, + .data = vec, + } }; + batch.add(op); + remaining += 1; + } + while (remaining > 0) { + try batch.wait(io, .none); + while (batch.next()) |op| { + const n = try reads[op].file_read_streaming.status.result; if (n == 0) { - done.* = true; + remaining -= 1; } else { - all_done = false; + lists[op].items.len += n; + if (lists[op].items.len > @intFromEnum(limits[op])) return error.StreamTooLong; + if (options.allocator) |gpa| try lists[op].ensureUnusedCapacity(gpa, 1); + const cap = lists[op].unusedCapacitySlice(); + if (cap.len == 0) return error.StreamTooLong; + vecs[op][0] = cap; + reads[op].file_read_streaming.status = .{ .unstarted = {} }; + batch.add(op); } - list.items.len += n; - if (list.items.len > @intFromEnum(limit)) other_err = error.StreamTooLong; } - if (any_canceled) return error.Canceled; - try other_err; - if (all_done) return; } }