diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 8fdd2a58e9..e9e5ece521 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1340,8 +1340,6 @@ const AlertableSyscall = struct { } }; -fn noopApc(_: ?*anyopaque, _: *windows.IO_STATUS_BLOCK, _: windows.ULONG) callconv(.winapi) void {} - fn waitForApcOrAlert() void { const infinite_timeout: windows.LARGE_INTEGER = std.math.minInt(windows.LARGE_INTEGER); _ = windows.ntdll.NtDelayExecution(windows.TRUE, &infinite_timeout); @@ -2500,7 +2498,10 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Batch.AwaitAsyncError!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); if (is_windows) { - try batchAwaitWindows(b); + batchAwaitWindows(b, false) catch |err| switch (err) { + error.ConcurrencyUnavailable => unreachable, // passed concurrency=false + else => |e| return e, + }; const alertable_syscall = try AlertableSyscall.start(); while (b.pending.head != .none and b.completions.head == .none) waitForApcOrAlert(); alertable_syscall.finish(); @@ -2616,7 +2617,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout }, error.UnsupportedClock => |e| return e, }; - try batchAwaitWindows(b); + try batchAwaitWindows(b, true); while (b.pending.head != .none and b.completions.head == .none) { var delay_interval: windows.LARGE_INTEGER = interval: { const d = deadline orelse break :interval std.math.minInt(windows.LARGE_INTEGER); @@ -2810,7 +2811,8 @@ fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void { while (index != .none) { const pending = &b.storage[index.toIndex()].pending; const context: *WindowsBatchPendingOperationContext = .fromErased(&pending.context); - _ = windows.ntdll.NtCancelIoFile(context.file, &context.iosb); + var cancel_iosb: windows.IO_STATUS_BLOCK = undefined; + _ = windows.ntdll.NtCancelIoFileEx(context.file, &context.iosb, &cancel_iosb); index = pending.node.next; } while (b.pending.head != .none) waitForApcOrAlert(); @@ -2860,30 +2862,94 @@ fn batchApc(apc_context: ?*anyopaque, iosb: *windows.IO_STATUS_BLOCK, _: windows } } -fn batchAwaitWindows(b: *Io.Batch) Io.Cancelable!void { +/// If `concurrency` is false, `error.ConcurrencyUnavailable` is unreachable. +fn batchAwaitWindows(b: *Io.Batch, concurrency: bool) error{ Canceled, ConcurrencyUnavailable }!void { var index = b.submissions.head; errdefer b.submissions.head = index; while (index != .none) { const storage = &b.storage[index.toIndex()]; const submission = storage.submission; - errdefer storage.* = .{ .submission = submission }; storage.* = .{ .pending = .{ .node = .{ .prev = b.pending.tail, .next = .none }, .tag = submission.operation, .context = undefined, } }; - const context: *WindowsBatchPendingOperationContext = .fromErased(&storage.pending.context); - switch (submission.operation) { - .file_read_streaming => |o| { - context.file = o.file.handle; - try ntReadFile(o.file.handle, o.data, &batchApc, b, &context.iosb); - }, - } switch (b.pending.tail) { .none => b.pending.head = index, else => |tail_index| b.storage[tail_index.toIndex()].pending.node.next = index, } b.pending.tail = index; + const context: *WindowsBatchPendingOperationContext = .fromErased(&storage.pending.context); + errdefer { + context.iosb.u.Status = .CANCELLED; + batchApc(b, &context.iosb, 0); + } + switch (submission.operation) { + .file_read_streaming => |o| o: { + var data_index: usize = 0; + while (o.data.len - data_index != 0 and o.data[data_index].len == 0) data_index += 1; + if (o.data.len - data_index == 0) { + context.iosb = .{ + .u = .{ .Status = .SUCCESS }, + .Information = 0, + }; + batchApc(b, &context.iosb, 0); + break :o; + } + const buffer = o.data[data_index]; + const short_buffer_len = @min(std.math.maxInt(u32), buffer.len); + + if (o.file.flags.nonblocking) { + context.file = o.file.handle; + switch (windows.ntdll.NtReadFile( + o.file.handle, + null, // event + &batchApc, + b, + &context.iosb, + buffer.ptr, + short_buffer_len, + null, // byte offset + null, // key + )) { + .PENDING, .SUCCESS => {}, + .CANCELLED => unreachable, + else => |status| { + context.iosb.u.Status = status; + batchApc(b, &context.iosb, 0); + }, + } + } else { + if (concurrency) return error.ConcurrencyUnavailable; + + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtReadFile( + o.file.handle, + null, // event + null, // APC routine + null, // APC context + &context.iosb, + buffer.ptr, + short_buffer_len, + null, // byte offset + null, // key + )) { + .PENDING => unreachable, // unrecoverable: wrong File nonblocking flag + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + else => |status| { + syscall.finish(); + + context.iosb.u.Status = status; + batchApc(b, &context.iosb, 0); + break; + }, + }; + } + }, + } index = submission.node.next; } b.submissions = .{ .head = .none, .tail = .none }; @@ -8846,28 +8912,76 @@ fn fileReadStreamingPosix(file: File, data: []const []u8) File.ReadStreamingErro } fn fileReadStreamingWindows(file: File, data: []const []u8) File.ReadStreamingError!usize { - var io_status_block: windows.IO_STATUS_BLOCK = .{ - .u = .{ .Status = .PENDING }, - .Information = undefined, - }; - try ntReadFile(file.handle, data, &noopApc, null, &io_status_block); + var index: usize = 0; + while (data.len - index != 0 and data[index].len == 0) index += 1; + if (data.len - index == 0) return 0; + const buffer = data[index]; + const short_buffer_len = @min(std.math.maxInt(u32), buffer.len); - while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) { - // Once we get here we must not return from the function until the - // operation completes, thereby releasing reference to io_status_block. - const alertable_syscall = AlertableSyscall.start() catch |err| switch (err) { - error.Canceled => |e| { - _ = windows.ntdll.NtCancelIoFile(file.handle, &io_status_block); - while (@atomicLoad(windows.NTSTATUS, &io_status_block.u.Status, .acquire) == .PENDING) { - waitForApcOrAlert(); - } - return e; + var iosb: windows.IO_STATUS_BLOCK = undefined; + + if (!file.flags.nonblocking) { + const syscall: Syscall = try .start(); + while (true) switch (windows.ntdll.NtReadFile( + file.handle, + null, // event + null, // APC routine + null, // APC context + &iosb, + buffer.ptr, + short_buffer_len, + null, // byte offset + null, // key + )) { + .PENDING => unreachable, // unrecoverable: wrong File nonblocking flag + .CANCELLED => { + try syscall.checkCancel(); + continue; + }, + else => |status| { + syscall.finish(); + iosb.u.Status = status; + return ntReadFileResult(&iosb); }, }; - waitForApcOrAlert(); - alertable_syscall.finish(); } - return ntReadFileResult(&io_status_block); + + var done: bool = false; + + switch (windows.ntdll.NtReadFile( + file.handle, + null, // event + flagApc, + &done, // APC context + &iosb, + buffer.ptr, + short_buffer_len, + null, // byte offset + null, // key + )) { + // We must wait for the APC routine. + .PENDING, .SUCCESS => while (!done) { + // Once we get here we must not return from the function until the + // operation completes, thereby releasing reference to io_status_block. + const alertable_syscall = AlertableSyscall.start() catch |err| switch (err) { + error.Canceled => |e| { + var cancel_iosb: windows.IO_STATUS_BLOCK = undefined; + _ = windows.ntdll.NtCancelIoFileEx(file.handle, &iosb, &cancel_iosb); + while (!done) waitForApcOrAlert(); + return e; + }, + }; + waitForApcOrAlert(); + alertable_syscall.finish(); + }, + else => |status| iosb.u.Status = status, + } + return ntReadFileResult(&iosb); +} + +fn flagApc(userdata: ?*anyopaque, _: *windows.IO_STATUS_BLOCK, _: windows.ULONG) callconv(.winapi) void { + const flag: *bool = @ptrCast(userdata); + flag.* = true; } fn ntReadFileResult(io_status_block: *const windows.IO_STATUS_BLOCK) !usize { @@ -8883,52 +8997,6 @@ fn ntReadFileResult(io_status_block: *const windows.IO_STATUS_BLOCK) !usize { } } -fn ntReadFile( - handle: windows.HANDLE, - data: []const []u8, - apcRoutine: ?*const windows.IO_APC_ROUTINE, - apc_context: ?*anyopaque, - iosb: *windows.IO_STATUS_BLOCK, -) Io.Cancelable!void { - var index: usize = 0; - while (index < data.len and data[index].len == 0) index += 1; - if (index == data.len) { - iosb.* = .{ .u = .{ .Status = .SUCCESS }, .Information = 0 }; - if (apcRoutine) |routine| if (routine != &noopApc) { - _ = windows.ntdll.NtQueueApcThread(windows.current_process, routine, apc_context, iosb, null); - }; - return; - } - const buffer = data[index]; - - const syscall: Syscall = try .start(); - while (true) switch (windows.ntdll.NtReadFile( - handle, - null, // event - apcRoutine, - apc_context, - iosb, - buffer.ptr, - @min(std.math.maxInt(u32), buffer.len), - null, // byte offset - null, // key - )) { - .PENDING => { - syscall.finish(); - return; - }, - .CANCELLED => { - try syscall.checkCancel(); - continue; - }, - else => |status| { - syscall.finish(); - iosb.u.Status = status; - return; - }, - }; -} - fn fileReadPositionalPosix(file: File, data: []const []u8, offset: u64) File.ReadPositionalError!usize { if (!have_preadv) @compileError("TODO implement fileReadPositionalPosix for cursed operating systems that don't support preadv (it's only Haiku)"); diff --git a/lib/std/os/windows/ntdll.zig b/lib/std/os/windows/ntdll.zig index 195a457d3b..d9e68e54f9 100644 --- a/lib/std/os/windows/ntdll.zig +++ b/lib/std/os/windows/ntdll.zig @@ -614,5 +614,5 @@ pub extern "ntdll" fn NtCancelIoFileEx( pub extern "ntdll" fn NtCancelIoFile( FileHandle: HANDLE, - IoRequestToCancel: ?*IO_STATUS_BLOCK, + IoStatusBlock: *IO_STATUS_BLOCK, ) callconv(.winapi) NTSTATUS;