diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 4e7f2070f5..1450bea47a 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -2647,7 +2647,7 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); if (is_windows) { - batchDrainSubmittedWindows(b, false) catch |err| switch (err) { + batchDrainSubmittedWindows(t, b, false) catch |err| switch (err) { error.ConcurrencyUnavailable => unreachable, // passed concurrency=false else => |e| return e, }; @@ -2787,7 +2787,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout const t: *Threaded = @ptrCast(@alignCast(userdata)); if (is_windows) { const deadline: ?Io.Clock.Timestamp = timeout.toTimestamp(ioBasic(t)); - try batchDrainSubmittedWindows(b, true); + try batchDrainSubmittedWindows(t, b, true); while (b.pending.head != .none and b.completed.head == .none) { var delay_interval: windows.LARGE_INTEGER = interval: { const d = deadline orelse break :interval std.math.minInt(windows.LARGE_INTEGER); @@ -3003,6 +3003,31 @@ fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void { } } +fn batchCompleteBlockingWindows( + b: *Io.Batch, + operation_userdata: *WindowsBatchOperationUserdata, + result: Io.Operation.Result, +) void { + const erased_userdata = operation_userdata.toErased(); + const pending: *Io.Operation.Storage.Pending = @fieldParentPtr("userdata", erased_userdata); + switch (pending.node.prev) { + .none => b.pending.head = pending.node.next, + else => |prev_index| b.storage[prev_index.toIndex()].pending.node.next = pending.node.next, + } + switch (pending.node.next) { + .none => b.pending.tail = pending.node.prev, + else => |next_index| b.storage[next_index.toIndex()].pending.node.prev = pending.node.prev, + } + const storage: *Io.Operation.Storage = @fieldParentPtr("pending", pending); + const index: Io.Operation.OptionalIndex = .fromIndex(storage - b.storage.ptr); + switch (b.completed.tail) { + .none => b.completed.head = index, + else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index, + } + b.completed.tail = index; + storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; +} + fn batchApc( apc_context: ?*anyopaque, iosb: *windows.IO_STATUS_BLOCK, @@ -3042,7 +3067,7 @@ fn batchApc( .file_read_streaming => .{ .file_read_streaming = ntReadFileResult(iosb) }, .file_write_streaming => .{ .file_write_streaming = ntWriteFileResult(iosb) }, .device_io_control => .{ .device_io_control = iosb.* }, - .net_receive => unreachable, // TODO + .net_receive => unreachable, }; storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; }, @@ -3050,7 +3075,7 @@ fn batchApc( } /// If `concurrency` is false, `error.ConcurrencyUnavailable` is unreachable. -fn batchDrainSubmittedWindows(b: *Io.Batch, concurrency: bool) (Io.ConcurrentError || Io.Cancelable)!void { +fn batchDrainSubmittedWindows(t: *Threaded, b: *Io.Batch, concurrency: bool) (Io.ConcurrentError || Io.Cancelable)!void { var index = b.submitted.head; errdefer b.submitted.head = index; while (index != .none) { @@ -3244,10 +3269,12 @@ fn batchDrainSubmittedWindows(b: *Io.Batch, concurrency: bool) (Io.ConcurrentErr }; } }, - .net_receive => |o| { + .net_receive => |*o| { + // TODO integrate with overlapped I/O or equivalent to avoid this error if (concurrency) return error.ConcurrencyUnavailable; - _ = o; - @panic("TODO implement Batch NetReceive on Windows"); + batchCompleteBlockingWindows(b, operation_userdata, .{ + .net_receive = netReceiveWindows(t, o.socket_handle, o.message_buffer, o.data_buffer, o.flags), + }); }, } index = submission.node.next; @@ -13321,13 +13348,89 @@ fn netReceiveWindows( data_buffer: []u8, flags: net.ReceiveFlags, ) struct { ?net.Socket.ReceiveError, usize } { - if (!have_networking) return .{ error.NetworkDown, 0 }; - _ = t; - _ = socket_handle; - _ = message_buffer; - _ = data_buffer; - _ = flags; - @panic("TODO implement netReceiveWindows"); + netReceiveWindowsOne(t, socket_handle, &message_buffer[0], data_buffer, flags) catch |err| return .{ err, 0 }; + return .{ null, 1 }; +} + +fn netReceiveWindowsOne( + t: *Threaded, + socket_handle: net.Socket.Handle, + message: *net.IncomingMessage, + data_buffer: []u8, + flags: net.ReceiveFlags, +) net.Socket.ReceiveError!void { + comptime assert(have_networking); + + var windows_flags: u32 = + @as(u32, if (flags.oob) ws2_32.MSG.OOB else 0) | + @as(u32, if (flags.peek) ws2_32.MSG.PEEK else 0) | + @as(u32, if (flags.trunc) ws2_32.MSG.TRUNC else 0); + + var buf: ws2_32.WSABUF = .{ + .buf = data_buffer.ptr, + .len = std.math.cast(u32, data_buffer.len) orelse return error.MessageOversize, + }; + var n: u32 = undefined; + var syscall: Syscall = try .start(); + var from_storage: WsaAddress = undefined; + var from_storage_len: i32 = @sizeOf(WsaAddress); + + while (true) { + const rc = ws2_32.WSARecvFrom( + socket_handle, + (&buf)[0..1], + 1, + &n, + &windows_flags, + &from_storage.any, + &from_storage_len, + null, + null, + ); + if (rc != ws2_32.SOCKET_ERROR) { + syscall.finish(); + message.* = .{ + .from = addressFromWsa(&from_storage), + .data = data_buffer[0..n], + .control = &.{}, + .flags = .{ + .eor = false, + .trunc = (windows_flags & ws2_32.MSG.TRUNC) != 0, + .ctrunc = (windows_flags & ws2_32.MSG.CTRUNC) != 0, + .oob = false, + .errqueue = false, + }, + }; + return; + } + switch (ws2_32.WSAGetLastError()) { + .EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => { + try syscall.checkCancel(); + continue; + }, + .NOTINITIALISED => { + syscall.finish(); + try initializeWsa(t); + syscall = try .start(); + continue; + }, + + .ECONNRESET => return syscall.fail(error.ConnectionResetByPeer), + .ENETDOWN => return syscall.fail(error.NetworkDown), + .ENETRESET => return syscall.fail(error.ConnectionResetByPeer), + .ENOTCONN => return syscall.fail(error.SocketUnconnected), + .EFAULT => unreachable, // a pointer is not completely contained in user address space. + + else => |err| { + syscall.finish(); + switch (err) { + .EINVAL => return wsaErrorBug(err), + .EMSGSIZE => return wsaErrorBug(err), + else => return windows.unexpectedWSAError(err), + } + }, + } + } } fn netReceiveUnavailable( diff --git a/lib/std/os/windows/ws2_32.zig b/lib/std/os/windows/ws2_32.zig index d5c74e2212..0abfe64ec3 100644 --- a/lib/std/os/windows/ws2_32.zig +++ b/lib/std/os/windows/ws2_32.zig @@ -661,17 +661,20 @@ pub const IOC_OUT = 1073741824; pub const IOC_IN = 2147483648; pub const MSG = struct { - pub const TRUNC = 256; - pub const CTRUNC = 512; - pub const BCAST = 1024; - pub const MCAST = 2048; - pub const ERRQUEUE = 4096; + pub const OOB = 0x1; + pub const PEEK = 0x2; + pub const DONTROUTE = 0x4; + pub const WAITALL = 0x8; + pub const INTERRUPT = 0x10; + pub const PUSH_IMMEDIATE = 0x20; + + pub const TRUNC = 0x0100; + pub const CTRUNC = 0x0200; + pub const BCAST = 0x0400; + pub const MCAST = 0x0800; + + pub const PARTIAL = 0x8000; - pub const PEEK = 2; - pub const WAITALL = 8; - pub const PUSH_IMMEDIATE = 32; - pub const PARTIAL = 32768; - pub const INTERRUPT = 16; pub const MAXIOVLEN = 16; };