std.Io.Threaded: implement net_receive for Windows

This commit is contained in:
Andrew Kelley 2026-03-05 17:46:07 -08:00
parent ebb2752c07
commit 3ab20d5fe2
2 changed files with 130 additions and 24 deletions

View file

@ -2647,7 +2647,7 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper
fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
if (is_windows) {
batchDrainSubmittedWindows(b, false) catch |err| switch (err) {
batchDrainSubmittedWindows(t, b, false) catch |err| switch (err) {
error.ConcurrencyUnavailable => unreachable, // passed concurrency=false
else => |e| return e,
};
@ -2787,7 +2787,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
const t: *Threaded = @ptrCast(@alignCast(userdata));
if (is_windows) {
const deadline: ?Io.Clock.Timestamp = timeout.toTimestamp(ioBasic(t));
try batchDrainSubmittedWindows(b, true);
try batchDrainSubmittedWindows(t, b, true);
while (b.pending.head != .none and b.completed.head == .none) {
var delay_interval: windows.LARGE_INTEGER = interval: {
const d = deadline orelse break :interval std.math.minInt(windows.LARGE_INTEGER);
@ -3003,6 +3003,31 @@ fn batchCancel(userdata: ?*anyopaque, b: *Io.Batch) void {
}
}
fn batchCompleteBlockingWindows(
b: *Io.Batch,
operation_userdata: *WindowsBatchOperationUserdata,
result: Io.Operation.Result,
) void {
const erased_userdata = operation_userdata.toErased();
const pending: *Io.Operation.Storage.Pending = @fieldParentPtr("userdata", erased_userdata);
switch (pending.node.prev) {
.none => b.pending.head = pending.node.next,
else => |prev_index| b.storage[prev_index.toIndex()].pending.node.next = pending.node.next,
}
switch (pending.node.next) {
.none => b.pending.tail = pending.node.prev,
else => |next_index| b.storage[next_index.toIndex()].pending.node.prev = pending.node.prev,
}
const storage: *Io.Operation.Storage = @fieldParentPtr("pending", pending);
const index: Io.Operation.OptionalIndex = .fromIndex(storage - b.storage.ptr);
switch (b.completed.tail) {
.none => b.completed.head = index,
else => |tail_index| b.storage[tail_index.toIndex()].completion.node.next = index,
}
b.completed.tail = index;
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
}
fn batchApc(
apc_context: ?*anyopaque,
iosb: *windows.IO_STATUS_BLOCK,
@ -3042,7 +3067,7 @@ fn batchApc(
.file_read_streaming => .{ .file_read_streaming = ntReadFileResult(iosb) },
.file_write_streaming => .{ .file_write_streaming = ntWriteFileResult(iosb) },
.device_io_control => .{ .device_io_control = iosb.* },
.net_receive => unreachable, // TODO
.net_receive => unreachable,
};
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
},
@ -3050,7 +3075,7 @@ fn batchApc(
}
/// If `concurrency` is false, `error.ConcurrencyUnavailable` is unreachable.
fn batchDrainSubmittedWindows(b: *Io.Batch, concurrency: bool) (Io.ConcurrentError || Io.Cancelable)!void {
fn batchDrainSubmittedWindows(t: *Threaded, b: *Io.Batch, concurrency: bool) (Io.ConcurrentError || Io.Cancelable)!void {
var index = b.submitted.head;
errdefer b.submitted.head = index;
while (index != .none) {
@ -3244,10 +3269,12 @@ fn batchDrainSubmittedWindows(b: *Io.Batch, concurrency: bool) (Io.ConcurrentErr
};
}
},
.net_receive => |o| {
.net_receive => |*o| {
// TODO integrate with overlapped I/O or equivalent to avoid this error
if (concurrency) return error.ConcurrencyUnavailable;
_ = o;
@panic("TODO implement Batch NetReceive on Windows");
batchCompleteBlockingWindows(b, operation_userdata, .{
.net_receive = netReceiveWindows(t, o.socket_handle, o.message_buffer, o.data_buffer, o.flags),
});
},
}
index = submission.node.next;
@ -13321,13 +13348,89 @@ fn netReceiveWindows(
data_buffer: []u8,
flags: net.ReceiveFlags,
) struct { ?net.Socket.ReceiveError, usize } {
if (!have_networking) return .{ error.NetworkDown, 0 };
_ = t;
_ = socket_handle;
_ = message_buffer;
_ = data_buffer;
_ = flags;
@panic("TODO implement netReceiveWindows");
netReceiveWindowsOne(t, socket_handle, &message_buffer[0], data_buffer, flags) catch |err| return .{ err, 0 };
return .{ null, 1 };
}
fn netReceiveWindowsOne(
t: *Threaded,
socket_handle: net.Socket.Handle,
message: *net.IncomingMessage,
data_buffer: []u8,
flags: net.ReceiveFlags,
) net.Socket.ReceiveError!void {
comptime assert(have_networking);
var windows_flags: u32 =
@as(u32, if (flags.oob) ws2_32.MSG.OOB else 0) |
@as(u32, if (flags.peek) ws2_32.MSG.PEEK else 0) |
@as(u32, if (flags.trunc) ws2_32.MSG.TRUNC else 0);
var buf: ws2_32.WSABUF = .{
.buf = data_buffer.ptr,
.len = std.math.cast(u32, data_buffer.len) orelse return error.MessageOversize,
};
var n: u32 = undefined;
var syscall: Syscall = try .start();
var from_storage: WsaAddress = undefined;
var from_storage_len: i32 = @sizeOf(WsaAddress);
while (true) {
const rc = ws2_32.WSARecvFrom(
socket_handle,
(&buf)[0..1],
1,
&n,
&windows_flags,
&from_storage.any,
&from_storage_len,
null,
null,
);
if (rc != ws2_32.SOCKET_ERROR) {
syscall.finish();
message.* = .{
.from = addressFromWsa(&from_storage),
.data = data_buffer[0..n],
.control = &.{},
.flags = .{
.eor = false,
.trunc = (windows_flags & ws2_32.MSG.TRUNC) != 0,
.ctrunc = (windows_flags & ws2_32.MSG.CTRUNC) != 0,
.oob = false,
.errqueue = false,
},
};
return;
}
switch (ws2_32.WSAGetLastError()) {
.EINTR, .ECANCELLED, .E_CANCELLED, .OPERATION_ABORTED => {
try syscall.checkCancel();
continue;
},
.NOTINITIALISED => {
syscall.finish();
try initializeWsa(t);
syscall = try .start();
continue;
},
.ECONNRESET => return syscall.fail(error.ConnectionResetByPeer),
.ENETDOWN => return syscall.fail(error.NetworkDown),
.ENETRESET => return syscall.fail(error.ConnectionResetByPeer),
.ENOTCONN => return syscall.fail(error.SocketUnconnected),
.EFAULT => unreachable, // a pointer is not completely contained in user address space.
else => |err| {
syscall.finish();
switch (err) {
.EINVAL => return wsaErrorBug(err),
.EMSGSIZE => return wsaErrorBug(err),
else => return windows.unexpectedWSAError(err),
}
},
}
}
}
fn netReceiveUnavailable(

View file

@ -661,17 +661,20 @@ pub const IOC_OUT = 1073741824;
pub const IOC_IN = 2147483648;
pub const MSG = struct {
pub const TRUNC = 256;
pub const CTRUNC = 512;
pub const BCAST = 1024;
pub const MCAST = 2048;
pub const ERRQUEUE = 4096;
pub const OOB = 0x1;
pub const PEEK = 0x2;
pub const DONTROUTE = 0x4;
pub const WAITALL = 0x8;
pub const INTERRUPT = 0x10;
pub const PUSH_IMMEDIATE = 0x20;
pub const TRUNC = 0x0100;
pub const CTRUNC = 0x0200;
pub const BCAST = 0x0400;
pub const MCAST = 0x0800;
pub const PARTIAL = 0x8000;
pub const PEEK = 2;
pub const WAITALL = 8;
pub const PUSH_IMMEDIATE = 32;
pub const PARTIAL = 32768;
pub const INTERRUPT = 16;
pub const MAXIOVLEN = 16;
};