From 6a7fe61d74f80456b32cb46ef21715bbffaf49a7 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Thu, 8 Jan 2026 15:07:03 -0800 Subject: [PATCH] std.Io.Threaded.operate: handle poll buffer exceeded --- lib/std/Io/Threaded.zig | 141 ++++++++++++++++++++-------------------- 1 file changed, 72 insertions(+), 69 deletions(-) diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 1f4bcf3478..e58d923793 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -2458,81 +2458,84 @@ fn operate(userdata: ?*anyopaque, operations: []Io.Operation) void { var poll_buffer: [poll_buffer_len]posix.pollfd = undefined; var map_buffer: [poll_buffer_len]u8 = undefined; // poll_buffer index to operations index - var poll_i: usize = 0; + var operation_index: usize = 0; - // Put all the file reads with nonblocking enabled into the poll set. - if (operations.len > poll_buffer.len) @panic("TODO"); - - for (operations, 0..) |*operation, operation_index| switch (operation.*) { - .noop => continue, - .file_read_streaming => |*o| { - if (o.nonblocking) { - o.result = error.WouldBlock; - poll_buffer[poll_i] = .{ - .fd = o.file.handle, - .events = posix.POLL.IN, - .revents = undefined, - }; - map_buffer[poll_i] = @intCast(operation_index); - poll_i += 1; - } else { - o.result = fileReadStreaming(o.file, o.data) catch |err| switch (err) { - error.Canceled => { - setOperationsCanceled(operations[operation_index..]); - return; - }, - else => err, - }; + while (operation_index < operations.len) { + var poll_i: usize = 0; + while (operation_index < operations.len) : (operation_index += 1) { + switch (operations[operation_index]) { + .noop => continue, + .file_read_streaming => |*o| { + if (o.nonblocking) { + o.result = error.WouldBlock; + poll_buffer[poll_i] = .{ + .fd = o.file.handle, + .events = posix.POLL.IN, + .revents = 0, + }; + if (map_buffer.len - poll_i == 0) break; + map_buffer[poll_i] = @intCast(operation_index); + poll_i += 1; + } else { + o.result = fileReadStreaming(o.file, o.data) catch |err| switch (err) { + error.Canceled => { + setOperationsError(operations[operation_index..], error.Canceled); + return; + }, + else => err, + }; + } + }, } - }, - }; - - if (poll_i == 0) { - @branchHint(.likely); - return; - } - - while (true) { - const syscall = Syscall.start() catch |err| switch (err) { - error.Canceled => { - setAllOperationsError(operations, map_buffer[0..poll_i], error.Canceled); - return; - }, - }; - const poll_rc = posix.system.poll(&poll_buffer, poll_i, -1); - syscall.finish(); - switch (posix.errno(poll_rc)) { - .SUCCESS => { - if (poll_rc == 0) { - // Spurious timeout; handle same as INTR. - continue; - } - break; - }, - .INTR => continue, - .NOMEM => { - setAllOperationsError(operations, map_buffer[0..poll_i], error.SystemResources); - return; - }, - else => { - setAllOperationsError(operations, map_buffer[0..poll_i], error.Unexpected); - return; - }, } - } - for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, operation_index| { - if (poll_fd.revents == 0) continue; - switch (operations[operation_index]) { - .noop => unreachable, - .file_read_streaming => |*o| { - o.result = fileReadStreaming(o.file, o.data); - }, + if (poll_i == 0) { + @branchHint(.likely); + return; + } + + while (true) { + const syscall = Syscall.start() catch |err| switch (err) { + error.Canceled => { + setPollOperationsError(operations, map_buffer[0..poll_i], error.Canceled); + setOperationsError(operations[operation_index..], error.Canceled); + return; + }, + }; + const poll_rc = posix.system.poll(&poll_buffer, poll_i, -1); + syscall.finish(); + switch (posix.errno(poll_rc)) { + .SUCCESS => { + if (poll_rc == 0) { + // Spurious timeout; handle same as INTR. + continue; + } + for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, i| { + if (poll_fd.revents == 0) continue; + switch (operations[i]) { + .noop => unreachable, + .file_read_streaming => |*o| { + o.result = fileReadStreaming(o.file, o.data); + }, + } + } + break; + }, + .INTR => continue, + .NOMEM => { + setPollOperationsError(operations, map_buffer[0..poll_i], error.SystemResources); + break; + }, + else => { + setPollOperationsError(operations, map_buffer[0..poll_i], error.Unexpected); + break; + }, + } } } } -fn setAllOperationsError( +fn setPollOperationsError( operations: []Io.Operation, map: []const u8, err: error{ Canceled, SystemResources, Unexpected }, @@ -2543,10 +2546,10 @@ fn setAllOperationsError( }; } -fn setOperationsCanceled(operations: []Io.Operation) void { +fn setOperationsError(operations: []Io.Operation, err: error{ Canceled, SystemResources, Unexpected }) void { for (operations) |*op| switch (op.*) { .noop => unreachable, - inline else => |*o| o.result = error.Canceled, + inline else => |*o| o.result = err, }; }