Io: add ring to Batch API

This commit is contained in:
Jacob Young 2026-01-10 15:34:36 -05:00 committed by Andrew Kelley
parent 0a379513af
commit 8146ccfecc
5 changed files with 260 additions and 160 deletions

View file

@ -149,9 +149,8 @@ pub const VTable = struct {
futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void,
futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void,
batch: *const fn (?*anyopaque, []Operation) ConcurrentError!void,
batchSubmit: *const fn (?*anyopaque, *Batch) void,
batchWait: *const fn (?*anyopaque, *Batch, resubmissions: []const usize, Timeout) Batch.WaitError!usize,
operate: *const fn (?*anyopaque, *Operation) Cancelable!void,
batchWait: *const fn (?*anyopaque, *Batch, Timeout) Batch.WaitError!void,
batchCancel: *const fn (?*anyopaque, *Batch) void,
dirCreateDir: *const fn (?*anyopaque, Dir, []const u8, Dir.Permissions) Dir.CreateDirError!void,
@ -261,48 +260,50 @@ pub const Operation = union(enum) {
pub const Noop = struct {
reserved: [2]usize = .{ 0, 0 },
status: Status(void) = .{ .result = {} },
status: Status(void) = .{ .unstarted = {} },
};
/// Returns 0 on end of stream.
pub const FileReadStreaming = struct {
file: File,
data: []const []u8,
status: Status(File.Reader.Error!usize) = .{ .unstarted = {} },
status: Status(Error!usize) = .{ .unstarted = {} },
pub const Error = error{
InputOutput,
SystemResources,
/// Trying to read a directory file descriptor as if it were a file.
IsDir,
BrokenPipe,
ConnectionResetByPeer,
/// File was not opened with read capability.
NotOpenForReading,
SocketUnconnected,
/// Non-blocking has been enabled, and reading from the file descriptor
/// would block.
WouldBlock,
/// In WASI, this error occurs when the file descriptor does
/// not hold the required rights to read from it.
AccessDenied,
/// Unable to read file due to lock. Depending on the `Io` implementation,
/// reading from a locked file may return this error, or may ignore the
/// lock.
LockViolation,
} || Io.UnexpectedError;
};
pub fn Status(Result: type) type {
return union {
unstarted: void,
pending: usize,
pending: *Batch,
result: Result,
};
}
};
/// Performs all `operations` in an unspecified order, concurrently.
///
/// Returns after all `operations` have been completed. If the operations could
/// not be completed concurrently, returns `error.ConcurrencyUnavailable`.
///
/// With this API, it is rare for concurrency to not be available. Even a
/// single-threaded `Io` implementation can, for example, take advantage of
/// poll() to implement this. Note that poll() is fallible however.
///
/// If `operations.len` is one, `error.ConcurrencyUnavailable` is unreachable.
///
/// On entry, all operations must already have `.status = .unstarted` except
/// noops must have `.status = .{ .result = {} }`, to safety check the state
/// transitions.
///
/// On return, all operations have `.status = .{ .result = ... }`.
pub fn batch(io: Io, operations: []Operation) ConcurrentError!void {
return io.vtable.batch(io.userdata, operations);
}
/// Performs one `Operation`.
pub fn operate(io: Io, operation: *Operation) void {
return io.vtable.batch(io.userdata, (operation)[0..1]) catch unreachable;
pub fn operate(io: Io, operation: *Operation) Cancelable!void {
return io.vtable.operate(io.userdata, operation) catch unreachable;
}
/// Submits many operations together without waiting for all of them to
@ -312,35 +313,107 @@ pub fn operate(io: Io, operation: *Operation) void {
/// level API that operates on `Future`, see `Select`.
pub const Batch = struct {
operations: []Operation,
index: usize,
reserved: ?*anyopaque,
ring: [*]u32,
user: struct {
submit_tail: RingIndex,
complete_head: RingIndex,
complete_tail: RingIndex,
},
impl: struct {
submit_head: RingIndex,
submit_tail: RingIndex,
complete_tail: RingIndex,
reserved: ?*anyopaque,
},
pub fn init(operations: []Operation) Batch {
return .{ .operations = operations, .index = 0, .reserved = null };
}
pub const RingIndex = enum(u32) {
_,
/// Submits all non-noop `operations`.
pub fn submit(b: *Batch, io: Io) void {
return io.vtable.batchSubmit(io.userdata, b);
}
pub fn index(ri: RingIndex, len: u31) u31 {
const i = @intFromEnum(ri);
assert(i < @as(u32, len) * 2);
return @intCast(if (i < len) i else i - len);
}
pub fn prev(ri: RingIndex, len: u31) RingIndex {
const i = @intFromEnum(ri);
const double_len = @as(u32, len) * 2;
assert(i <= double_len);
return @enumFromInt((if (i > 0) i else double_len) - 1);
}
pub fn next(ri: RingIndex, len: u31) RingIndex {
const i = @intFromEnum(ri) + 1;
const double_len = @as(u32, len) * 2;
assert(i <= double_len);
return @enumFromInt(if (i < double_len) i else 0);
}
};
pub const WaitError = ConcurrentError || Cancelable || Timeout.Error;
/// Resubmits the previously completed or noop-initialized `operations` at
/// indexes given by `resubmissions`. This set of indexes typically will be empty
/// on the first call to `await` since all operations have already been
/// submitted via `async`.
///
/// Returns the index of a completed `Operation`, or `operations.len` if
/// all operations are completed.
///
/// When `error.Canceled` is returned, all operations have already completed.
pub fn wait(b: *Batch, io: Io, resubmissions: []const usize, timeout: Timeout) WaitError!usize {
return io.vtable.batchWait(io.userdata, b, resubmissions, timeout);
pub fn init(operations: []Operation, ring: []u32) Batch {
const len: u31 = @intCast(operations.len);
assert(ring.len == len);
return .{
.operations = operations,
.ring = ring.ptr,
.user = .{
.submit_tail = @enumFromInt(0),
.complete_head = @enumFromInt(0),
.complete_tail = @enumFromInt(0),
},
.impl = .{
.submit_head = @enumFromInt(0),
.submit_tail = @enumFromInt(0),
.complete_tail = @enumFromInt(0),
.reserved = null,
},
};
}
/// Returns after all `operations` have completed. Each operation
/// independently may or may not have been canceled.
/// Adds `b.operations[operation]` to the list of submitted operations
/// that will be performed when `wait` is called.
pub fn add(b: *Batch, operation: usize) void {
const tail = b.user.submit_tail;
const len: u31 = @intCast(b.operations.len);
b.user.submit_tail = tail.next(len);
b.ring[0..len][tail.index(len)] = @intCast(operation);
}
fn flush(b: *Batch) void {
@atomicStore(RingIndex, &b.impl.submit_tail, b.user.submit_tail, .release);
}
/// Returns `operation` such that `b.operations[operation]` has completed.
/// Returns `null` when `wait` should be called.
pub fn next(b: *Batch) ?u32 {
const head = b.user.complete_head;
if (head == b.user.complete_tail) {
@branchHint(.unlikely);
b.flush();
const tail = @atomicLoad(RingIndex, &b.impl.complete_tail, .acquire);
if (head == tail) {
@branchHint(.unlikely);
return null;
}
assert(head != tail);
b.user.complete_tail = tail;
}
const len: u31 = @intCast(b.operations.len);
b.user.complete_head = head.next(len);
return b.ring[0..len][head.index(len)];
}
    /// Starts work on any submitted operations and returns when at least one has completed.
///
/// Returns `error.Timeout` if `timeout` expires first.
pub fn wait(b: *Batch, io: Io, timeout: Timeout) WaitError!void {
return io.vtable.batchWait(io.userdata, b, timeout);
}
    /// Returns after all `operations` have either completed or been dropped. Operations that
    /// had not yet completed when this function returns were dropped and had no side effects.
pub fn cancel(b: *Batch, io: Io) void {
return io.vtable.batchCancel(io.userdata, b);
}

View file

@ -558,7 +558,7 @@ pub fn readStreaming(file: File, io: Io, buffer: []const []u8) Reader.Error!usiz
.file = file,
.data = buffer,
} };
io.operate(&operation);
try io.operate(&operation);
return operation.file_read_streaming.status.result;
}

View file

@ -26,27 +26,7 @@ size_err: ?SizeError = null,
seek_err: ?SeekError = null,
interface: Io.Reader,
pub const Error = error{
InputOutput,
SystemResources,
/// Trying to read a directory file descriptor as if it were a file.
IsDir,
BrokenPipe,
ConnectionResetByPeer,
/// File was not opened with read capability.
NotOpenForReading,
SocketUnconnected,
/// Non-blocking has been enabled, and reading from the file descriptor
/// would block.
WouldBlock,
/// In WASI, this error occurs when the file descriptor does
/// not hold the required rights to read from it.
AccessDenied,
/// Unable to read file due to lock. Depending on the `Io` implementation,
/// reading from a locked file may return this error, or may ignore the
/// lock.
LockViolation,
} || Io.Cancelable || Io.UnexpectedError;
pub const Error = Io.Operation.FileReadStreaming.Error || Io.Cancelable;
pub const SizeError = File.StatError || error{
/// Occurs if, for example, the file handle is a network socket and therefore does not have a size.

View file

@ -1587,8 +1587,7 @@ pub fn io(t: *Threaded) Io {
.futexWaitUncancelable = futexWaitUncancelable,
.futexWake = futexWake,
.batch = batch,
.batchSubmit = batchSubmit,
.operate = operate,
.batchWait = batchWait,
.batchCancel = batchCancel,
@ -1751,8 +1750,7 @@ pub fn ioBasic(t: *Threaded) Io {
.futexWaitUncancelable = futexWaitUncancelable,
.futexWake = futexWake,
.batch = batch,
.batchSubmit = batchSubmit,
.operate = operate,
.batchWait = batchWait,
.batchCancel = batchCancel,
@ -2456,59 +2454,82 @@ fn futexWake(userdata: ?*anyopaque, ptr: *const u32, max_waiters: u32) void {
Thread.futexWake(ptr, max_waiters);
}
fn batchSubmit(userdata: ?*anyopaque, b: *Io.Batch) void {
fn operate(userdata: ?*anyopaque, op: *Io.Operation) Io.Cancelable!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
_ = b;
return;
}
fn operate(op: *Io.Operation) void {
switch (op.*) {
.noop => {},
.file_read_streaming => |*o| o.status = .{ .result = fileReadStreaming(o.file, o.data) },
.noop => |*o| {
_ = o.status.unstarted;
o.status = .{ .result = {} };
},
.file_read_streaming => |*o| {
_ = o.status.unstarted;
o.status = .{ .result = fileReadStreaming(o.file, o.data) catch |err| switch (err) {
error.Canceled => return error.Canceled,
else => |e| e,
} };
},
}
}
fn batchWait(
userdata: ?*anyopaque,
b: *Io.Batch,
resubmissions: []const usize,
timeout: Io.Timeout,
) Io.Batch.WaitError!usize {
_ = resubmissions;
fn batchWait(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.WaitError!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
const operations = b.operations;
if (operations.len == 1) {
operate(&operations[0]);
return b.operations.len;
const len: u31 = @intCast(operations.len);
const ring = b.ring[0..len];
var submit_head = b.impl.submit_head;
const submit_tail = b.user.submit_tail;
b.impl.submit_tail = submit_tail;
var complete_tail = b.impl.complete_tail;
var map_buffer: [poll_buffer_len]u32 = undefined; // poll_buffer index to operations index
var poll_i: usize = 0;
defer {
for (map_buffer[0..poll_i]) |op| {
submit_head = submit_head.prev(len);
ring[submit_head.index(len)] = op;
}
b.impl.submit_head = submit_head;
b.impl.complete_tail = complete_tail;
b.user.complete_tail = complete_tail;
}
if (is_windows) @panic("TODO");
var poll_buffer: [poll_buffer_len]posix.pollfd = undefined;
var map_buffer: [poll_buffer_len]u8 = undefined; // poll_buffer index to operations index
var poll_i: usize = 0;
for (operations, 0..) |*op, operation_index| switch (op.*) {
.noop => continue,
.file_read_streaming => |*o| {
if (poll_buffer.len - poll_i == 0) return error.ConcurrencyUnavailable;
poll_buffer[poll_i] = .{
.fd = o.file.handle,
.events = posix.POLL.IN,
.revents = 0,
};
map_buffer[poll_i] = @intCast(operation_index);
poll_i += 1;
while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) {
const op = ring[submit_head.index(len)];
const operation = &operations[op];
switch (operation.*) {
else => {
try operate(t, operation);
ring[complete_tail.index(len)] = op;
complete_tail = complete_tail.next(len);
},
.file_read_streaming => |*o| {
_ = o.status.unstarted;
if (poll_buffer.len - poll_i == 0) return error.ConcurrencyUnavailable;
poll_buffer[poll_i] = .{
.fd = o.file.handle,
.events = posix.POLL.IN,
.revents = 0,
};
map_buffer[poll_i] = op;
poll_i += 1;
},
}
}
switch (poll_i) {
0 => return,
1 => if (timeout == .none) {
const op = map_buffer[0];
try operate(t, &operations[op]);
ring[complete_tail.index(len)] = op;
complete_tail = complete_tail.next(len);
return;
},
};
if (poll_i == 0) return operations.len;
else => {},
}
const t_io = ioBasic(t);
const deadline = timeout.toDeadline(t_io) catch return error.UnsupportedClock;
const max_poll_ms = std.math.maxInt(i32);
while (true) {
const timeout_ms: i32 = if (deadline) |d| t: {
const duration = d.durationFromNow(t_io) catch return error.UnsupportedClock;
@ -2526,11 +2547,24 @@ fn batchWait(
if (deadline == null) continue;
return error.Timeout;
}
for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, i| {
if (poll_fd.revents == 0) continue;
operate(&operations[i]);
return i;
var canceled = false;
for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, op| {
if (poll_fd.revents == 0) {
submit_head = submit_head.prev(len);
ring[submit_head.index(len)] = op;
} else {
operate(t, &operations[op]) catch |err| switch (err) {
error.Canceled => {
canceled = true;
continue;
},
};
ring[complete_tail.index(len)] = op;
complete_tail = complete_tail.next(len);
}
}
poll_i = 0;
return if (canceled) error.Canceled;
},
.INTR => continue,
else => return error.ConcurrencyUnavailable,
@ -2540,9 +2574,27 @@ fn batchWait(
fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
_ = b;
return;
const operations = b.operations;
const len: u31 = @intCast(operations.len);
const ring = b.ring[0..len];
var submit_head = b.impl.submit_head;
const submit_tail = b.user.submit_tail;
b.impl.submit_tail = submit_tail;
var complete_tail = b.impl.complete_tail;
while (submit_head != submit_tail) : (submit_head = submit_head.next(len)) {
const op = ring[submit_head.index(len)];
switch (operations[op]) {
.noop => {
operate(t, &operations[op]) catch unreachable;
ring[complete_tail.index(len)] = op;
complete_tail = complete_tail.next(len);
},
.file_read_streaming => |*o| _ = o.status.unstarted,
}
}
b.impl.submit_head = submit_tail;
b.impl.complete_tail = complete_tail;
b.user.complete_tail = complete_tail;
}
fn batch(userdata: ?*anyopaque, operations: []Io.Operation) Io.ConcurrentError!void {
@ -10352,6 +10404,7 @@ fn nowWasi(clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
if (timeout == .none) return;
if (use_parking_sleep) return parking_sleep.sleep(try timeout.toDeadline(ioBasic(t)));
if (native_os == .wasi) return sleepWasi(t, timeout);
if (@TypeOf(posix.system.clock_nanosleep) != void) return sleepPosix(timeout);

View file

@ -149,51 +149,45 @@ pub fn collectOutput(child: *const Child, io: Io, options: CollectOutputOptions)
const files: [2]Io.File = .{ child.stdout.?, child.stderr.? };
const lists: [2]*std.ArrayList(u8) = .{ options.stdout, options.stderr };
const limits: [2]Io.Limit = .{ options.stdout_limit, options.stderr_limit };
var dones: [2]bool = .{ false, false };
var reads: [2]Io.Operation = undefined;
var vecs: [2][1][]u8 = undefined;
while (true) {
for (&reads, &lists, &files, dones, &vecs) |*read, list, file, done, *vec| {
if (done) {
read.* = .{ .noop = .{} };
continue;
}
if (options.allocator) |gpa| try list.ensureUnusedCapacity(gpa, 1);
const cap = list.unusedCapacitySlice();
if (cap.len == 0) return error.StreamTooLong;
vec[0] = cap;
read.* = .{ .file_read_streaming = .{
.file = file,
.data = vec,
} };
var ring: [2]u32 = undefined;
var batch: Io.Batch = .init(&reads, &ring);
defer {
batch.cancel(io);
while (batch.next()) |op| {
lists[op].items.len += reads[op].file_read_streaming.status.result catch continue;
}
var all_done = true;
var any_canceled = false;
var other_err: (error{StreamTooLong} || Io.File.Reader.Error)!void = {};
try io.vtable.batch(io.userdata, &reads);
for (&reads, &lists, &limits, &dones) |*read, list, limit, *done| {
if (done.*) continue;
const n = read.file_read_streaming.status.result catch |err| switch (err) {
error.Canceled => {
any_canceled = true;
continue;
},
error.WouldBlock => continue,
else => |e| {
other_err = e;
continue;
},
};
}
var remaining: usize = 0;
for (0.., &reads, &lists, &files, &vecs) |op, *read, list, file, *vec| {
if (options.allocator) |gpa| try list.ensureUnusedCapacity(gpa, 1);
const cap = list.unusedCapacitySlice();
if (cap.len == 0) return error.StreamTooLong;
vec[0] = cap;
read.* = .{ .file_read_streaming = .{
.file = file,
.data = vec,
} };
batch.add(op);
remaining += 1;
}
while (remaining > 0) {
try batch.wait(io, .none);
while (batch.next()) |op| {
const n = try reads[op].file_read_streaming.status.result;
if (n == 0) {
done.* = true;
remaining -= 1;
} else {
all_done = false;
lists[op].items.len += n;
if (lists[op].items.len > @intFromEnum(limits[op])) return error.StreamTooLong;
if (options.allocator) |gpa| try lists[op].ensureUnusedCapacity(gpa, 1);
const cap = lists[op].unusedCapacitySlice();
if (cap.len == 0) return error.StreamTooLong;
vecs[op][0] = cap;
reads[op].file_read_streaming.status = .{ .unstarted = {} };
batch.add(op);
}
list.items.len += n;
if (list.items.len > @intFromEnum(limit)) other_err = error.StreamTooLong;
}
if (any_canceled) return error.Canceled;
try other_err;
if (all_done) return;
}
}