std.Io.Threaded.operate: handle poll buffer exceeded

This commit is contained in:
Andrew Kelley 2026-01-08 15:07:03 -08:00
parent 93f5c99149
commit 6a7fe61d74

View file

@ -2458,81 +2458,84 @@ fn operate(userdata: ?*anyopaque, operations: []Io.Operation) void {
var poll_buffer: [poll_buffer_len]posix.pollfd = undefined;
var map_buffer: [poll_buffer_len]u8 = undefined; // poll_buffer index to operations index
var poll_i: usize = 0;
var operation_index: usize = 0;
// Put all the file reads with nonblocking enabled into the poll set.
if (operations.len > poll_buffer.len) @panic("TODO");
for (operations, 0..) |*operation, operation_index| switch (operation.*) {
.noop => continue,
.file_read_streaming => |*o| {
if (o.nonblocking) {
o.result = error.WouldBlock;
poll_buffer[poll_i] = .{
.fd = o.file.handle,
.events = posix.POLL.IN,
.revents = undefined,
};
map_buffer[poll_i] = @intCast(operation_index);
poll_i += 1;
} else {
o.result = fileReadStreaming(o.file, o.data) catch |err| switch (err) {
error.Canceled => {
setOperationsCanceled(operations[operation_index..]);
return;
},
else => err,
};
while (operation_index < operations.len) {
var poll_i: usize = 0;
while (operation_index < operations.len) : (operation_index += 1) {
switch (operations[operation_index]) {
.noop => continue,
.file_read_streaming => |*o| {
if (o.nonblocking) {
o.result = error.WouldBlock;
poll_buffer[poll_i] = .{
.fd = o.file.handle,
.events = posix.POLL.IN,
.revents = 0,
};
if (map_buffer.len - poll_i == 0) break;
map_buffer[poll_i] = @intCast(operation_index);
poll_i += 1;
} else {
o.result = fileReadStreaming(o.file, o.data) catch |err| switch (err) {
error.Canceled => {
setOperationsError(operations[operation_index..], error.Canceled);
return;
},
else => err,
};
}
},
}
},
};
if (poll_i == 0) {
@branchHint(.likely);
return;
}
while (true) {
const syscall = Syscall.start() catch |err| switch (err) {
error.Canceled => {
setAllOperationsError(operations, map_buffer[0..poll_i], error.Canceled);
return;
},
};
const poll_rc = posix.system.poll(&poll_buffer, poll_i, -1);
syscall.finish();
switch (posix.errno(poll_rc)) {
.SUCCESS => {
if (poll_rc == 0) {
// Spurious timeout; handle same as INTR.
continue;
}
break;
},
.INTR => continue,
.NOMEM => {
setAllOperationsError(operations, map_buffer[0..poll_i], error.SystemResources);
return;
},
else => {
setAllOperationsError(operations, map_buffer[0..poll_i], error.Unexpected);
return;
},
}
}
for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, operation_index| {
if (poll_fd.revents == 0) continue;
switch (operations[operation_index]) {
.noop => unreachable,
.file_read_streaming => |*o| {
o.result = fileReadStreaming(o.file, o.data);
},
if (poll_i == 0) {
@branchHint(.likely);
return;
}
while (true) {
const syscall = Syscall.start() catch |err| switch (err) {
error.Canceled => {
setPollOperationsError(operations, map_buffer[0..poll_i], error.Canceled);
setOperationsError(operations[operation_index..], error.Canceled);
return;
},
};
const poll_rc = posix.system.poll(&poll_buffer, poll_i, -1);
syscall.finish();
switch (posix.errno(poll_rc)) {
.SUCCESS => {
if (poll_rc == 0) {
// Spurious timeout; handle same as INTR.
continue;
}
for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, i| {
if (poll_fd.revents == 0) continue;
switch (operations[i]) {
.noop => unreachable,
.file_read_streaming => |*o| {
o.result = fileReadStreaming(o.file, o.data);
},
}
}
break;
},
.INTR => continue,
.NOMEM => {
setPollOperationsError(operations, map_buffer[0..poll_i], error.SystemResources);
break;
},
else => {
setPollOperationsError(operations, map_buffer[0..poll_i], error.Unexpected);
break;
},
}
}
}
}
fn setAllOperationsError(
fn setPollOperationsError(
operations: []Io.Operation,
map: []const u8,
err: error{ Canceled, SystemResources, Unexpected },
@ -2543,10 +2546,10 @@ fn setAllOperationsError(
};
}
fn setOperationsCanceled(operations: []Io.Operation) void {
fn setOperationsError(operations: []Io.Operation, err: error{ Canceled, SystemResources, Unexpected }) void {
for (operations) |*op| switch (op.*) {
.noop => unreachable,
inline else => |*o| o.result = error.Canceled,
inline else => |*o| o.result = err,
};
}