diff --git a/lib/std/Io.zig b/lib/std/Io.zig index c3ba3575e4..16d056ed0f 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -369,7 +369,9 @@ pub const Batch = struct { context: ?*anyopaque, /// After calling this, it is safe to unconditionally defer a call to - /// `cancel`. + /// `cancel`. `storage` is a pre-allocated buffer of undefined memory that + /// determines the maximum number of active operations that can be + /// submitted via `add` and `addAt`. pub fn init(storage: []Operation.Storage) Batch { var prev: Operation.OptionalIndex = .none; for (storage, 0..) |*operation, index| { @@ -422,12 +424,20 @@ pub const Batch = struct { b.submissions.tail = .fromIndex(index); } + pub const Completion = struct { + /// The element within the provided operation storage that completed. + /// `addAt` can be used to re-arm the `Batch` using this `index`. + index: u32, + /// The return value of the operation. + result: Operation.Result, + }; + /// After calling `awaitAsync`, `awaitConcurrent`, or `cancel`, this /// function iterates over the completed operations. /// /// Each completion returned from this function dequeues from the `Batch`. /// It is not required to dequeue all completions before awaiting again. - pub fn next(b: *Batch) ?struct { index: u32, result: Operation.Result } { + pub fn next(b: *Batch) ?Completion { const index = b.completions.head; if (index == .none) return null; const storage = &b.storage[index.toIndex()]; diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index e9b62ca5f5..12521ea1af 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1938,6 +1938,10 @@ const have_mmap = switch (native_os) { .wasi, .windows => false, else => true, }; +const have_poll = switch (native_os) { + .wasi, .windows => false, + else => true, +}; const open_sym = if (posix.lfs64_abi) posix.system.open64 else posix.system.open; const openat_sym = if (posix.lfs64_abi) posix.system.openat64 else posix.system.openat; @@ -2507,104 +2511,104 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void { alertable_syscall.finish(); return; } - if (native_os == .wasi and !builtin.link_libc) @panic("TODO"); - var poll_buffer: [poll_buffer_len]posix.pollfd = undefined; - var poll_len: u32 = 0; - { - var index = b.submissions.head; - while (index != .none and poll_len < poll_buffer_len) { - const submission = &b.storage[index.toIndex()].submission; - switch (submission.operation) { - .file_read_streaming => |o| { - poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.IN, .revents = 0 }; - poll_len += 1; - }, - } - index = submission.node.next; - } - } - switch (poll_len) { - 0 => return, - 1 => {}, - else => while (true) { - const timeout_ms: i32 = t: { - if (b.completions.head != .none) { - // It is legal to call batchWait with already completed - // operations in the ring. In such case, we need to avoid - // blocking in the poll syscall, but we can still take this - // opportunity to find additional ready operations. - break :t 0; + if (have_poll) { + var poll_buffer: [poll_buffer_len]posix.pollfd = undefined; + var poll_len: u32 = 0; + { + var index = b.submissions.head; + while (index != .none and poll_len < poll_buffer_len) { + const submission = &b.storage[index.toIndex()].submission; + switch (submission.operation) { + .file_read_streaming => |o| { + poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.IN, .revents = 0 }; + poll_len += 1; + }, } - const max_poll_ms = std.math.maxInt(i32); - break :t max_poll_ms; - }; - const syscall = try Syscall.start(); - const rc = posix.system.poll(&poll_buffer, poll_len, timeout_ms); - syscall.finish(); - switch (posix.errno(rc)) { - .SUCCESS => { - if (rc == 0) { - if (b.completions.head != .none) { - // Since there are already completions available in the - // queue, this is neither a timeout nor a case for - // retrying. - return; - } - continue; - } - var prev_index: Io.Operation.OptionalIndex = .none; - var index = b.submissions.head; - for (poll_buffer[0..poll_len]) |poll_entry| { - const storage = &b.storage[index.toIndex()]; - const submission = &storage.submission; - const next_index = submission.node.next; - if (poll_entry.revents != 0) { - const result = try operate(t, submission.operation); - - switch (prev_index) { - .none => b.submissions.head = next_index, - else => b.storage[prev_index.toIndex()].submission.node.next = next_index, - } - if (next_index == .none) b.submissions.tail = prev_index; - - switch (b.completions.tail) { - .none => b.completions.head = index, - else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index, - } - storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; - b.completions.tail = index; - } else prev_index = index; - index = next_index; - } - assert(index == .none); - return; - }, - .INTR => continue, - else => break, + index = submission.node.next; } - }, - } - { - var tail_index = b.completions.tail; - defer b.completions.tail = tail_index; - var index = b.submissions.head; - errdefer b.submissions.head = index; - while (index != .none) { - const storage = &b.storage[index.toIndex()]; - const submission = &storage.submission; - const next_index = submission.node.next; - const result = try operate(t, submission.operation); - - switch (tail_index) { - .none => b.completions.head = index, - else => b.storage[tail_index.toIndex()].completion.node.next = index, - } - storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; - tail_index = index; - index = next_index; } - b.submissions = .{ .head = .none, .tail = .none }; + switch (poll_len) { + 0 => return, + 1 => {}, + else => while (true) { + const timeout_ms: i32 = t: { + if (b.completions.head != .none) { + // It is legal to call batchWait with already completed + // operations in the ring. In such case, we need to avoid + // blocking in the poll syscall, but we can still take this + // opportunity to find additional ready operations. + break :t 0; + } + const max_poll_ms = std.math.maxInt(i32); + break :t max_poll_ms; + }; + const syscall = try Syscall.start(); + const rc = posix.system.poll(&poll_buffer, poll_len, timeout_ms); + syscall.finish(); + switch (posix.errno(rc)) { + .SUCCESS => { + if (rc == 0) { + if (b.completions.head != .none) { + // Since there are already completions available in the + // queue, this is neither a timeout nor a case for + // retrying. + return; + } + continue; + } + var prev_index: Io.Operation.OptionalIndex = .none; + var index = b.submissions.head; + for (poll_buffer[0..poll_len]) |poll_entry| { + const storage = &b.storage[index.toIndex()]; + const submission = &storage.submission; + const next_index = submission.node.next; + if (poll_entry.revents != 0) { + const result = try operate(t, submission.operation); + + switch (prev_index) { + .none => b.submissions.head = next_index, + else => b.storage[prev_index.toIndex()].submission.node.next = next_index, + } + if (next_index == .none) b.submissions.tail = prev_index; + + switch (b.completions.tail) { + .none => b.completions.head = index, + else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index, + } + storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; + b.completions.tail = index; + } else prev_index = index; + index = next_index; + } + assert(index == .none); + return; + }, + .INTR => continue, + else => break, + } + }, + } } + + var tail_index = b.completions.tail; + defer b.completions.tail = tail_index; + var index = b.submissions.head; + errdefer b.submissions.head = index; + while (index != .none) { + const storage = &b.storage[index.toIndex()]; + const submission = &storage.submission; + const next_index = submission.node.next; + const result = try operate(t, submission.operation); + + switch (tail_index) { + .none => b.completions.head = index, + else => b.storage[tail_index.toIndex()].completion.node.next = index, + } + storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; + tail_index = index; + index = next_index; + } + b.submissions = .{ .head = .none, .tail = .none }; } fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.AwaitConcurrentError!void { @@ -2644,7 +2648,11 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout } return; } - if (native_os == .wasi and !builtin.link_libc) @panic("TODO"); + if (native_os == .wasi) { + // TODO call poll_oneoff + return error.ConcurrencyUnavailable; + } + if (!have_poll) return error.ConcurrencyUnavailable; var poll_buffer: [poll_buffer_len]posix.pollfd = undefined; var poll_storage: struct { gpa: std.mem.Allocator, diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig index b022b10e6e..930a176b01 100644 --- a/lib/std/Io/test.zig +++ b/lib/std/Io/test.zig @@ -656,3 +656,63 @@ test "memory mapping" { try expectEqualStrings("this9is9my data123\x00\x00", mm.memory[0.."this9is9my data123\x00\x00".len]); } } + +test "read from a file using Batch.awaitAsync API" { + const io = testing.io; + + var tmp = tmpDir(.{}); + defer tmp.cleanup(); + + try tmp.dir.writeFile(io, .{ + .sub_path = "eyes.txt", + .data = "Heaven's been cheating the Hell out of me", + }); + try tmp.dir.writeFile(io, .{ + .sub_path = "saviour.txt", + .data = "Burn your thoughts, erase your will / to gods of suffering and tears", + }); + + var eyes_file = try tmp.dir.openFile(io, "eyes.txt", .{}); + defer eyes_file.close(io); + + var saviour_file = try tmp.dir.openFile(io, "saviour.txt", .{}); + defer saviour_file.close(io); + + var eyes_buf: [100]u8 = undefined; + var saviour_buf: [100]u8 = undefined; + var storage: [2]Io.Operation.Storage = undefined; + var batch: Io.Batch = .init(&storage); + + batch.addAt(0, .{ .file_read_streaming = .{ + .file = eyes_file, + .data = &.{&eyes_buf}, + } }); + batch.addAt(1, .{ .file_read_streaming = .{ + .file = saviour_file, + .data = &.{&saviour_buf}, + } }); + + // This API is supposed to *always* work even if the target has no + // concurrency primitives available. + try batch.awaitAsync(io); + + while (batch.next()) |completion| { + switch (completion.index) { + 0 => { + const n = try completion.result.file_read_streaming; + try expectEqualStrings( + "Heaven's been cheating the Hell out of me"[0..n], + eyes_buf[0..n], + ); + }, + 1 => { + const n = try completion.result.file_read_streaming; + try expectEqualStrings( + "Burn your thoughts, erase your will / to gods of suffering and tears"[0..n], + saviour_buf[0..n], + ); + }, + else => return error.TestFailure, + } + } +}