std.Io.Threaded: rework file reading to observe nonblocking flag

- batchAwaitAsync does blocking reads with NtReadFile (no APC, no event)
  when the nonblocking flag is unset, but still takes advantage of
  APCs when nonblocking flag is set.
- batchAwaitConcurrent returns error.ConcurrencyUnavailable when it
  encounters a file_read_streaming operation on a file in blocking mode.
- fileReadStreaming avoids pointlessly checking sync cancelation status
  when nonblocking flag is set, uses an APC with a done flag, and waits
  on that value to change in NtDelayExecution before returning.
- fix incorrect use of NtCancelIoFile (ntdll function prototype was
  wrong, leading to misuse)
This commit is contained in:
Andrew Kelley 2026-01-30 19:10:44 -08:00
parent 39a6d5d1c5
commit 25aef0dd87
2 changed files with 147 additions and 79 deletions

View file

@ -1340,8 +1340,6 @@ const AlertableSyscall = struct {
}
};
fn noopApc(_: ?*anyopaque, _: *windows.IO_STATUS_BLOCK, _: windows.ULONG) callconv(.winapi) void {}
fn waitForApcOrAlert() void {
const infinite_timeout: windows.LARGE_INTEGER = std.math.minInt(windows.LARGE_INTEGER);
_ = windows.ntdll.NtDelayExecution(windows.TRUE, &infinite_timeout);
@ -2500,7 +2498,10 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper
fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Batch.AwaitAsyncError!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
if (is_windows) {
try batchAwaitWindows(b);
batchAwaitWindows(b, false) catch |err| switch (err) {
error.ConcurrencyUnavailable => unreachable, // passed concurrency=false
else => |e| return e,
};
const alertable_syscall = try AlertableSyscall.start();
while (b.pending.head != .none and b.completions.head == .none) waitForApcOrAlert();
alertable_syscall.finish();
@ -2616,7 +2617,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
},
error.UnsupportedClock => |e| return e,
};
try batchAwaitWindows(b);
try batchAwaitWindows(b, true);
while (b.pending.head != .none and b.completions.head == .none) {
var delay_interval: windows.LARGE_INTEGER = interval: {
const d = deadline orelse break :interval std.math.minInt(windows.LARGE_INTEGER);
@ -2810,7 +2811,8 @@ fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void {
while (index != .none) {
const pending = &b.storage[index.toIndex()].pending;
const context: *WindowsBatchPendingOperationContext = .fromErased(&pending.context);
_ = windows.ntdll.NtCancelIoFile(context.file, &context.iosb);
var cancel_iosb: windows.IO_STATUS_BLOCK = undefined;
_ = windows.ntdll.NtCancelIoFileEx(context.file, &context.iosb, &cancel_iosb);
index = pending.node.next;
}
while (b.pending.head != .none) waitForApcOrAlert();
@ -2860,30 +2862,94 @@ fn batchApc(apc_context: ?*anyopaque, iosb: *windows.IO_STATUS_BLOCK, _: windows
}
}
fn batchAwaitWindows(b: *Io.Batch) Io.Cancelable!void {
/// If `concurrency` is false, `error.ConcurrencyUnavailable` is unreachable.
fn batchAwaitWindows(b: *Io.Batch, concurrency: bool) error{ Canceled, ConcurrencyUnavailable }!void {
var index = b.submissions.head;
errdefer b.submissions.head = index;
while (index != .none) {
const storage = &b.storage[index.toIndex()];
const submission = storage.submission;
errdefer storage.* = .{ .submission = submission };
storage.* = .{ .pending = .{
.node = .{ .prev = b.pending.tail, .next = .none },
.tag = submission.operation,
.context = undefined,
} };
const context: *WindowsBatchPendingOperationContext = .fromErased(&storage.pending.context);
switch (submission.operation) {
.file_read_streaming => |o| {
context.file = o.file.handle;
try ntReadFile(o.file.handle, o.data, &batchApc, b, &context.iosb);
},
}
switch (b.pending.tail) {
.none => b.pending.head = index,
else => |tail_index| b.storage[tail_index.toIndex()].pending.node.next = index,
}
b.pending.tail = index;
const context: *WindowsBatchPendingOperationContext = .fromErased(&storage.pending.context);
errdefer {
context.iosb.u.Status = .CANCELLED;
batchApc(b, &context.iosb, 0);
}
switch (submission.operation) {
.file_read_streaming => |o| o: {
var data_index: usize = 0;
while (o.data.len - data_index != 0 and o.data[data_index].len == 0) data_index += 1;
if (o.data.len - data_index == 0) {
context.iosb = .{
.u = .{ .Status = .SUCCESS },
.Information = 0,
};
batchApc(b, &context.iosb, 0);
break :o;
}
const buffer = o.data[data_index];
const short_buffer_len = @min(std.math.maxInt(u32), buffer.len);
if (o.file.flags.nonblocking) {
context.file = o.file.handle;
switch (windows.ntdll.NtReadFile(
o.file.handle,
null, // event
&batchApc,
b,
&context.iosb,
buffer.ptr,
short_buffer_len,
null, // byte offset
null, // key
)) {
.PENDING, .SUCCESS => {},
.CANCELLED => unreachable,
else => |status| {
context.iosb.u.Status = status;
batchApc(b, &context.iosb, 0);
},
}
} else {
if (concurrency) return error.ConcurrencyUnavailable;
const syscall: Syscall = try .start();
while (true) switch (windows.ntdll.NtReadFile(
o.file.handle,
null, // event
null, // APC routine
null, // APC context
&context.iosb,
buffer.ptr,
short_buffer_len,
null, // byte offset
null, // key
)) {
.PENDING => unreachable, // unrecoverable: wrong File nonblocking flag
.CANCELLED => {
try syscall.checkCancel();
continue;
},
else => |status| {
syscall.finish();
context.iosb.u.Status = status;
batchApc(b, &context.iosb, 0);
break;
},
};
}
},
}
index = submission.node.next;
}
b.submissions = .{ .head = .none, .tail = .none };
@ -8846,28 +8912,76 @@ fn fileReadStreamingPosix(file: File, data: []const []u8) File.ReadStreamingErro
}
fn fileReadStreamingWindows(file: File, data: []const []u8) File.ReadStreamingError!usize {
var io_status_block: windows.IO_STATUS_BLOCK = .{
.u = .{ .Status = .PENDING },
.Information = undefined,
};
try ntReadFile(file.handle, data, &noopApc, null, &io_status_block);
var index: usize = 0;
while (data.len - index != 0 and data[index].len == 0) index += 1;
if (data.len - index == 0) return 0;
const buffer = data[index];
const short_buffer_len = @min(std.math.maxInt(u32), buffer.len);
while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) {
// Once we get here we must not return from the function until the
// operation completes, thereby releasing reference to io_status_block.
const alertable_syscall = AlertableSyscall.start() catch |err| switch (err) {
error.Canceled => |e| {
_ = windows.ntdll.NtCancelIoFile(file.handle, &io_status_block);
while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) {
waitForApcOrAlert();
}
return e;
var iosb: windows.IO_STATUS_BLOCK = undefined;
if (!file.flags.nonblocking) {
const syscall: Syscall = try .start();
while (true) switch (windows.ntdll.NtReadFile(
file.handle,
null, // event
null, // APC routine
null, // APC context
&iosb,
buffer.ptr,
short_buffer_len,
null, // byte offset
null, // key
)) {
.PENDING => unreachable, // unrecoverable: wrong File nonblocking flag
.CANCELLED => {
try syscall.checkCancel();
continue;
},
else => |status| {
syscall.finish();
iosb.u.Status = status;
return ntReadFileResult(&iosb);
},
};
waitForApcOrAlert();
alertable_syscall.finish();
}
return ntReadFileResult(&io_status_block);
var done: bool = false;
switch (windows.ntdll.NtReadFile(
file.handle,
null, // event
flagApc,
&done, // APC context
&iosb,
buffer.ptr,
short_buffer_len,
null, // byte offset
null, // key
)) {
// We must wait for the APC routine.
.PENDING, .SUCCESS => while (!done) {
// Once we get here we must not return from the function until the
// operation completes, thereby releasing reference to io_status_block.
const alertable_syscall = AlertableSyscall.start() catch |err| switch (err) {
error.Canceled => |e| {
var cancel_iosb: windows.IO_STATUS_BLOCK = undefined;
_ = windows.ntdll.NtCancelIoFileEx(file.handle, &iosb, &cancel_iosb);
while (!done) waitForApcOrAlert();
return e;
},
};
waitForApcOrAlert();
alertable_syscall.finish();
},
else => |status| iosb.u.Status = status,
}
return ntReadFileResult(&iosb);
}
fn flagApc(userdata: ?*anyopaque, _: *windows.IO_STATUS_BLOCK, _: windows.ULONG) callconv(.winapi) void {
const flag: *bool = @ptrCast(userdata);
flag.* = true;
}
fn ntReadFileResult(io_status_block: *const windows.IO_STATUS_BLOCK) !usize {
@ -8883,52 +8997,6 @@ fn ntReadFileResult(io_status_block: *const windows.IO_STATUS_BLOCK) !usize {
}
}
fn ntReadFile(
handle: windows.HANDLE,
data: []const []u8,
apcRoutine: ?*const windows.IO_APC_ROUTINE,
apc_context: ?*anyopaque,
iosb: *windows.IO_STATUS_BLOCK,
) Io.Cancelable!void {
var index: usize = 0;
while (index < data.len and data[index].len == 0) index += 1;
if (index == data.len) {
iosb.* = .{ .u = .{ .Status = .SUCCESS }, .Information = 0 };
if (apcRoutine) |routine| if (routine != &noopApc) {
_ = windows.ntdll.NtQueueApcThread(windows.current_process, routine, apc_context, iosb, null);
};
return;
}
const buffer = data[index];
const syscall: Syscall = try .start();
while (true) switch (windows.ntdll.NtReadFile(
handle,
null, // event
apcRoutine,
apc_context,
iosb,
buffer.ptr,
@min(std.math.maxInt(u32), buffer.len),
null, // byte offset
null, // key
)) {
.PENDING => {
syscall.finish();
return;
},
.CANCELLED => {
try syscall.checkCancel();
continue;
},
else => |status| {
syscall.finish();
iosb.u.Status = status;
return;
},
};
}
fn fileReadPositionalPosix(file: File, data: []const []u8, offset: u64) File.ReadPositionalError!usize {
if (!have_preadv) @compileError("TODO implement fileReadPositionalPosix for cursed operating systems that don't support preadv (it's only Haiku)");

View file

@ -614,5 +614,5 @@ pub extern "ntdll" fn NtCancelIoFileEx(
pub extern "ntdll" fn NtCancelIoFile(
FileHandle: HANDLE,
IoRequestToCancel: ?*IO_STATUS_BLOCK,
IoStatusBlock: *IO_STATUS_BLOCK,
) callconv(.winapi) NTSTATUS;