std.Io: add test for batchAwaitAsync

and make it always work for all targets including WASI

This function is guaranteed not to introduce any additional failure modes.
This commit is contained in:
Andrew Kelley 2026-01-31 14:58:02 -08:00
parent e60ba21114
commit 3abc96a601
3 changed files with 175 additions and 97 deletions

View file

@ -369,7 +369,9 @@ pub const Batch = struct {
context: ?*anyopaque,
/// After calling this, it is safe to unconditionally defer a call to
/// `cancel`. `storage` is a pre-allocated buffer of undefined memory that
/// determines the maximum number of active operations that can be
/// submitted via `add` and `addAt`.
pub fn init(storage: []Operation.Storage) Batch {
var prev: Operation.OptionalIndex = .none;
for (storage, 0..) |*operation, index| {
@ -422,12 +424,20 @@ pub const Batch = struct {
b.submissions.tail = .fromIndex(index);
}
/// A single completed operation, as returned by `next`.
pub const Completion = struct {
/// The element within the provided operation storage that completed.
/// `addAt` can be used to re-arm the `Batch` using this `index`.
index: u32,
/// The return value of the operation.
result: Operation.Result,
};
/// After calling `awaitAsync`, `awaitConcurrent`, or `cancel`, this
/// function iterates over the completed operations.
///
/// Each completion returned from this function dequeues from the `Batch`.
/// It is not required to dequeue all completions before awaiting again.
pub fn next(b: *Batch) ?struct { index: u32, result: Operation.Result } {
pub fn next(b: *Batch) ?Completion {
const index = b.completions.head;
if (index == .none) return null;
const storage = &b.storage[index.toIndex()];

View file

@ -1938,6 +1938,10 @@ const have_mmap = switch (native_os) {
.wasi, .windows => false,
else => true,
};
/// Whether this target takes the `poll`-based readiness path; WASI and
/// Windows are handled by alternative code paths instead.
const have_poll = switch (native_os) {
.wasi, .windows => false,
else => true,
};
// Select the 64-bit-offset (`*64`) variants of open/openat when the libc
// ABI requires the LFS symbols.
const open_sym = if (posix.lfs64_abi) posix.system.open64 else posix.system.open;
const openat_sym = if (posix.lfs64_abi) posix.system.openat64 else posix.system.openat;
@ -2507,104 +2511,104 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void {
alertable_syscall.finish();
return;
}
if (native_os == .wasi and !builtin.link_libc) @panic("TODO");
var poll_buffer: [poll_buffer_len]posix.pollfd = undefined;
var poll_len: u32 = 0;
{
var index = b.submissions.head;
while (index != .none and poll_len < poll_buffer_len) {
const submission = &b.storage[index.toIndex()].submission;
switch (submission.operation) {
.file_read_streaming => |o| {
poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.IN, .revents = 0 };
poll_len += 1;
},
}
index = submission.node.next;
}
}
switch (poll_len) {
0 => return,
1 => {},
else => while (true) {
const timeout_ms: i32 = t: {
if (b.completions.head != .none) {
// It is legal to call batchWait with already completed
// operations in the ring. In such case, we need to avoid
// blocking in the poll syscall, but we can still take this
// opportunity to find additional ready operations.
break :t 0;
if (have_poll) {
var poll_buffer: [poll_buffer_len]posix.pollfd = undefined;
var poll_len: u32 = 0;
{
var index = b.submissions.head;
while (index != .none and poll_len < poll_buffer_len) {
const submission = &b.storage[index.toIndex()].submission;
switch (submission.operation) {
.file_read_streaming => |o| {
poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.IN, .revents = 0 };
poll_len += 1;
},
}
const max_poll_ms = std.math.maxInt(i32);
break :t max_poll_ms;
};
const syscall = try Syscall.start();
const rc = posix.system.poll(&poll_buffer, poll_len, timeout_ms);
syscall.finish();
switch (posix.errno(rc)) {
.SUCCESS => {
if (rc == 0) {
if (b.completions.head != .none) {
// Since there are already completions available in the
// queue, this is neither a timeout nor a case for
// retrying.
return;
}
continue;
}
var prev_index: Io.Operation.OptionalIndex = .none;
var index = b.submissions.head;
for (poll_buffer[0..poll_len]) |poll_entry| {
const storage = &b.storage[index.toIndex()];
const submission = &storage.submission;
const next_index = submission.node.next;
if (poll_entry.revents != 0) {
const result = try operate(t, submission.operation);
switch (prev_index) {
.none => b.submissions.head = next_index,
else => b.storage[prev_index.toIndex()].submission.node.next = next_index,
}
if (next_index == .none) b.submissions.tail = prev_index;
switch (b.completions.tail) {
.none => b.completions.head = index,
else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index,
}
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
b.completions.tail = index;
} else prev_index = index;
index = next_index;
}
assert(index == .none);
return;
},
.INTR => continue,
else => break,
index = submission.node.next;
}
},
}
{
var tail_index = b.completions.tail;
defer b.completions.tail = tail_index;
var index = b.submissions.head;
errdefer b.submissions.head = index;
while (index != .none) {
const storage = &b.storage[index.toIndex()];
const submission = &storage.submission;
const next_index = submission.node.next;
const result = try operate(t, submission.operation);
switch (tail_index) {
.none => b.completions.head = index,
else => b.storage[tail_index.toIndex()].completion.node.next = index,
}
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
tail_index = index;
index = next_index;
}
b.submissions = .{ .head = .none, .tail = .none };
switch (poll_len) {
0 => return,
1 => {},
else => while (true) {
const timeout_ms: i32 = t: {
if (b.completions.head != .none) {
// It is legal to call batchWait with already completed
// operations in the ring. In such case, we need to avoid
// blocking in the poll syscall, but we can still take this
// opportunity to find additional ready operations.
break :t 0;
}
const max_poll_ms = std.math.maxInt(i32);
break :t max_poll_ms;
};
const syscall = try Syscall.start();
const rc = posix.system.poll(&poll_buffer, poll_len, timeout_ms);
syscall.finish();
switch (posix.errno(rc)) {
.SUCCESS => {
if (rc == 0) {
if (b.completions.head != .none) {
// Since there are already completions available in the
// queue, this is neither a timeout nor a case for
// retrying.
return;
}
continue;
}
var prev_index: Io.Operation.OptionalIndex = .none;
var index = b.submissions.head;
for (poll_buffer[0..poll_len]) |poll_entry| {
const storage = &b.storage[index.toIndex()];
const submission = &storage.submission;
const next_index = submission.node.next;
if (poll_entry.revents != 0) {
const result = try operate(t, submission.operation);
switch (prev_index) {
.none => b.submissions.head = next_index,
else => b.storage[prev_index.toIndex()].submission.node.next = next_index,
}
if (next_index == .none) b.submissions.tail = prev_index;
switch (b.completions.tail) {
.none => b.completions.head = index,
else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index,
}
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
b.completions.tail = index;
} else prev_index = index;
index = next_index;
}
assert(index == .none);
return;
},
.INTR => continue,
else => break,
}
},
}
}
var tail_index = b.completions.tail;
defer b.completions.tail = tail_index;
var index = b.submissions.head;
errdefer b.submissions.head = index;
while (index != .none) {
const storage = &b.storage[index.toIndex()];
const submission = &storage.submission;
const next_index = submission.node.next;
const result = try operate(t, submission.operation);
switch (tail_index) {
.none => b.completions.head = index,
else => b.storage[tail_index.toIndex()].completion.node.next = index,
}
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
tail_index = index;
index = next_index;
}
b.submissions = .{ .head = .none, .tail = .none };
}
fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.AwaitConcurrentError!void {
@ -2644,7 +2648,11 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
}
return;
}
if (native_os == .wasi and !builtin.link_libc) @panic("TODO");
if (native_os == .wasi) {
// TODO call poll_oneoff
return error.ConcurrencyUnavailable;
}
if (!have_poll) return error.ConcurrencyUnavailable;
var poll_buffer: [poll_buffer_len]posix.pollfd = undefined;
var poll_storage: struct {
gpa: std.mem.Allocator,

View file

@ -656,3 +656,63 @@ test "memory mapping" {
try expectEqualStrings("this9is9my data123\x00\x00", mm.memory[0.."this9is9my data123\x00\x00".len]);
}
}
test "read from a file using Batch.awaitAsync API" {
    const io = testing.io;

    // Expected file contents, shared between the writes and the assertions.
    const eyes_data = "Heaven's been cheating the Hell out of me";
    const saviour_data = "Burn your thoughts, erase your will / to gods of suffering and tears";

    var tmp = tmpDir(.{});
    defer tmp.cleanup();

    try tmp.dir.writeFile(io, .{ .sub_path = "eyes.txt", .data = eyes_data });
    try tmp.dir.writeFile(io, .{ .sub_path = "saviour.txt", .data = saviour_data });

    var eyes_file = try tmp.dir.openFile(io, "eyes.txt", .{});
    defer eyes_file.close(io);
    var saviour_file = try tmp.dir.openFile(io, "saviour.txt", .{});
    defer saviour_file.close(io);

    var eyes_buf: [100]u8 = undefined;
    var saviour_buf: [100]u8 = undefined;

    // Two slots: one streaming read per file, re-armed by explicit index.
    var storage: [2]Io.Operation.Storage = undefined;
    var batch: Io.Batch = .init(&storage);
    batch.addAt(0, .{ .file_read_streaming = .{
        .file = eyes_file,
        .data = &.{&eyes_buf},
    } });
    batch.addAt(1, .{ .file_read_streaming = .{
        .file = saviour_file,
        .data = &.{&saviour_buf},
    } });

    // This API is supposed to *always* work even if the target has no
    // concurrency primitives available.
    try batch.awaitAsync(io);

    // Drain the completion queue, matching each result to its slot index.
    while (batch.next()) |completion| switch (completion.index) {
        0 => {
            const n = try completion.result.file_read_streaming;
            try expectEqualStrings(eyes_data[0..n], eyes_buf[0..n]);
        },
        1 => {
            const n = try completion.result.file_read_streaming;
            try expectEqualStrings(saviour_data[0..n], saviour_buf[0..n]);
        },
        else => return error.TestFailure,
    };
}