std.Io.Threaded: super broken Windows impl of batch

this is a cry for help
This commit is contained in:
Andrew Kelley 2026-01-09 20:46:51 -08:00
parent c95c442b75
commit 078a19cf31

View file

@ -2366,14 +2366,13 @@ fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void {
fn batch(userdata: ?*anyopaque, operations: []Io.Operation) Io.ConcurrentError!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
if (operations.len == 1) {
@branchHint(.likely);
return operate(&operations[0]);
}
if (is_windows) @panic("TODO");
if (is_windows) return batchWindows(t, operations);
var poll_buffer: [poll_buffer_len]posix.pollfd = undefined;
var map_buffer: [poll_buffer_len]u8 = undefined; // poll_buffer index to operations index
@ -2397,7 +2396,7 @@ fn batch(userdata: ?*anyopaque, operations: []Io.Operation) Io.ConcurrentError!v
const map = map_buffer[0..poll_i];
var pending = poll_i;
while (pending > 1) {
while (pending > 0) {
const syscall = Syscall.start() catch |err| switch (err) {
error.Canceled => {
if (!setOperationsError(operations, polls, map, error.Canceled))
@ -2408,17 +2407,11 @@ fn batch(userdata: ?*anyopaque, operations: []Io.Operation) Io.ConcurrentError!v
const rc = posix.system.poll(polls.ptr, polls.len, -1);
syscall.finish();
switch (posix.errno(rc)) {
.SUCCESS => {
if (rc == 0) {
// Spurious timeout; handle the same as INTR.
continue;
}
for (polls, map) |*poll_fd, i| {
if (poll_fd.revents == 0) continue;
poll_fd.fd = -1;
pending -= 1;
operate(&operations[i]);
}
.SUCCESS => for (polls, map) |*poll_fd, i| {
if (poll_fd.revents == 0) continue;
poll_fd.fd = -1;
pending -= 1;
operate(&operations[i]);
},
.INTR => continue,
.NOMEM => {
@ -2431,11 +2424,67 @@ fn batch(userdata: ?*anyopaque, operations: []Io.Operation) Io.ConcurrentError!v
},
}
}
}
if (pending == 1) for (poll_buffer[0..poll_i], map_buffer[0..poll_i]) |*poll_fd, i| {
if (poll_fd.fd == -1) continue;
operate(&operations[i]);
fn batchWindows(t: *Threaded, operations: []Io.Operation) Io.ConcurrentError!void {
_ = t;
var overlapped_buffer: [poll_buffer_len]windows.OVERLAPPED = undefined;
var handles_buffer: [poll_buffer_len]windows.HANDLE = undefined;
var map_buffer: [poll_buffer_len]u8 = undefined; // handles_buffer index to operations index
var buffer_i: usize = 0;
for (operations, 0..) |*op, operation_index| switch (op.*) {
.noop => continue,
.file_read_streaming => |*o| {
if (handles_buffer.len - buffer_i == 0) return error.ConcurrencyUnavailable;
const overlapped = &overlapped_buffer[buffer_i];
overlapped.* = .{
.Internal = 0,
.InternalHigh = 0,
.DUMMYUNIONNAME = .{
.DUMMYSTRUCTNAME = .{
.Offset = 0,
.OffsetHigh = 0,
},
.Pointer = null,
},
.hEvent = null,
};
var n: windows.DWORD = undefined;
const buf = o.data[0];
if (windows.kernel32.ReadFile(o.file.handle, buf.ptr, buf.len, &n, overlapped) == 0) {
@panic("TODO");
}
handles_buffer[buffer_i] = o.file.handle;
map_buffer[buffer_i] = @intCast(operation_index);
buffer_i += 1;
},
};
const handles = handles_buffer[0..buffer_i];
const map = map_buffer[0..buffer_i];
var pending = buffer_i;
while (pending > 0) {
const syscall: Syscall = try .start();
const index = windows.WaitForMultipleObjectsEx(handles, false, windows.INFINITE, true);
syscall.finish();
var n: windows.DWORD = undefined;
if (0 == windows.kernel32.GetOverlappedResult(handles[index], overlapped_buffer[index], &n, 0)) {
switch (windows.GetLastError()) {
.BROKEN_PIPE => @panic("TODO"),
.OPERATION_ABORTED => @panic("TODO"),
else => @panic("TODO"),
}
} else switch (operations[map[index]]) {
.noop => unreachable,
.file_read_streaming => |*o| {
o.status = .{ .result = n };
pending -= 1;
},
}
}
}
fn setOperationsError(