diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 7b06ccfb6a..be54629b7d 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -242,7 +242,6 @@ pub const VTable = struct { netConnectUnix: *const fn (?*anyopaque, *const net.UnixAddress) net.UnixAddress.ConnectError!net.Socket.Handle, netSocketCreatePair: *const fn (?*anyopaque, net.Socket.CreatePairOptions) net.Socket.CreatePairError![2]net.Socket, netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) struct { ?net.Socket.SendError, usize }, - netReceive: *const fn (?*anyopaque, net.Socket.Handle, message_buffer: []net.IncomingMessage, data_buffer: []u8, net.ReceiveFlags, Timeout) struct { ?net.Socket.ReceiveTimeoutError, usize }, /// Returns 0 on end of stream. netRead: *const fn (?*anyopaque, src: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize, netWrite: *const fn (?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize, @@ -260,6 +259,7 @@ pub const Operation = union(enum) { /// On Windows this is NtDeviceIoControlFile. On POSIX this is ioctl. On /// other systems this tag is unreachable. device_io_control: DeviceIoControl, + net_receive: NetReceive, pub const Tag = @typeInfo(Operation).@"union".tag_type.?; @@ -349,6 +349,35 @@ pub const Operation = union(enum) { }, }; + pub const NetReceive = struct { + socket_handle: net.Socket.Handle, + message_buffer: []net.IncomingMessage, + data_buffer: []u8, + flags: net.ReceiveFlags, + + pub const Error = error{ + /// Insufficient memory or other resource internal to the operating system. + SystemResources, + /// Per-process limit on the number of open file descriptors has been reached. + ProcessFdQuotaExceeded, + /// System-wide limit on the total number of open files has been reached. + SystemFdQuotaExceeded, + /// Local end has been shut down on a connection-oriented socket, or + /// the socket was never connected. + SocketUnconnected, + /// The socket type requires that message be sent atomically, and the + /// size of the message to be sent made this impossible. The message + /// was not transmitted, or was partially transmitted. + MessageOversize, + /// Network connection was unexpectedly closed by sender. + ConnectionResetByPeer, + /// The local network interface used to reach the destination is offline. + NetworkDown, + } || Io.UnexpectedError; + + pub const Result = struct { ?net.Socket.ReceiveError, usize }; + }; + pub const Result = Result: { const operation_fields = @typeInfo(Operation).@"union".fields; var field_names: [operation_fields.len][]const u8 = undefined; @@ -416,6 +445,19 @@ pub fn operate(io: Io, operation: Operation) Cancelable!Operation.Result { return io.vtable.operate(io.userdata, operation); } +pub const OperateTimeoutError = Cancelable || Timeout.Error || ConcurrentError; + +/// Performs one `Operation` with provided `timeout`. +pub fn operateTimeout(io: Io, operation: Operation, timeout: Timeout) OperateTimeoutError!Operation.Result { + var storage: [1]Operation.Storage = undefined; + var batch: Batch = .init(&storage); + batch.addAt(0, operation); + try batch.awaitConcurrent(io, timeout); + const completion = batch.next().?; + assert(completion.index == 0); + return completion.result; +} + /// Submits many operations together without waiting for all of them to /// complete. /// @@ -1715,7 +1757,7 @@ pub const Event = enum(u32) { } /// Blocks until the logical boolean is `true`. - pub fn wait(event: *Event, io: Io) Io.Cancelable!void { + pub fn wait(event: *Event, io: Io) Cancelable!void { if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) { .unset => unreachable, .waiting => {}, diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 83bda2c865..22bb842015 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -61,7 +61,7 @@ disable_memory_mapping: bool, stderr_writer: File.Writer = .{ .io = undefined, - .interface = Io.File.Writer.initInterface(&.{}), + .interface = File.Writer.initInterface(&.{}), .file = if (is_windows) undefined else .stderr(), .mode = .streaming, }, @@ -160,7 +160,7 @@ pub const Environ = struct { }, }; - pub fn scan(environ: *Environ, allocator: std.mem.Allocator) void { + pub fn scan(environ: *Environ, allocator: Allocator) void { if (is_windows) { // This value expires with any call that modifies the environment, // which is outside of this Io implementation's control, so references @@ -1900,10 +1900,6 @@ pub fn io(t: *Threaded) Io { .windows => netSendWindows, else => netSendPosix, }, - .netReceive = switch (native_os) { - .windows => netReceiveWindows, - else => netReceivePosix, - }, .netInterfaceNameResolve = netInterfaceNameResolve, .netInterfaceName = netInterfaceName, .netLookup = netLookup, @@ -2035,7 +2031,6 @@ pub fn ioBasic(t: *Threaded) Io { .netWrite = netWriteUnavailable, .netWriteFile = netWriteFileUnavailable, .netSend = netSendUnavailable, - .netReceive = netReceiveUnavailable, .netInterfaceNameResolve = netInterfaceNameResolveUnavailable, .netInterfaceName = netInterfaceNameUnavailable, .netLookup = netLookupUnavailable, @@ -2636,6 +2631,15 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper }, }, .device_io_control => |*o| return .{ .device_io_control = try deviceIoControl(o) }, + .net_receive => |*o| return .{ .net_receive = o: { + if (!have_networking) break :o .{ error.NetworkDown, 0 }; + if (is_windows) break :o netReceiveWindows(t, o.socket_handle, o.message_buffer, o.data_buffer, o.flags); + netReceivePosix(o.socket_handle, &o.message_buffer[0], o.data_buffer, o.flags) catch |err| switch (err) { + error.Canceled => |e| return e, + else => |e| break :o .{ e, 0 }, + }; + break :o .{ null, 1 }; + } }, } } @@ -2660,11 +2664,19 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void { const submission = &b.storage[index.toIndex()].submission; switch (submission.operation) { .file_read_streaming => |o| { - poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.IN, .revents = 0 }; + poll_buffer[poll_len] = .{ + .fd = o.file.handle, + .events = posix.POLL.IN | posix.POLL.ERR, + .revents = 0, + }; poll_len += 1; }, .file_write_streaming => |o| { - poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.OUT, .revents = 0 }; + poll_buffer[poll_len] = .{ + .fd = o.file.handle, + .events = posix.POLL.OUT | posix.POLL.ERR, + .revents = 0, + }; poll_len += 1; }, .device_io_control => |o| { @@ -2675,6 +2687,14 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void { }; poll_len += 1; }, + .net_receive => |*o| { + poll_buffer[poll_len] = .{ + .fd = o.socket_handle, + .events = posix.POLL.IN | posix.POLL.ERR, + .revents = 0, + }; + poll_len += 1; + }, } index = submission.node.next; } @@ -2794,12 +2814,12 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout if (!have_poll) return error.ConcurrencyUnavailable; var poll_buffer: [poll_buffer_len]posix.pollfd = undefined; var poll_storage: struct { - gpa: std.mem.Allocator, + gpa: Allocator, batch: *Io.Batch, slice: []posix.pollfd, len: u32, - fn add(storage: *@This(), file: Io.File, events: @FieldType(posix.pollfd, "events")) Io.ConcurrentError!void { + fn add(storage: *@This(), fd: File.Handle, events: @FieldType(posix.pollfd, "events")) Io.ConcurrentError!void { const len = storage.len; if (len == poll_buffer_len) { const slice: []posix.pollfd = if (storage.batch.userdata) |batch_userdata| @@ -2814,7 +2834,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout storage.slice = slice; } storage.slice[len] = .{ - .fd = file.handle, + .fd = fd, .events = events, .revents = 0, }; @@ -2826,9 +2846,10 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout while (index != .none) { const submission = &b.storage[index.toIndex()].submission; switch (submission.operation) { - .file_read_streaming => |o| try poll_storage.add(o.file, posix.POLL.IN), - .file_write_streaming => |o| try poll_storage.add(o.file, posix.POLL.OUT), - .device_io_control => |o| try poll_storage.add(o.file, posix.POLL.IN | posix.POLL.OUT | posix.POLL.ERR), + .file_read_streaming => |o| try poll_storage.add(o.file.handle, posix.POLL.IN | posix.POLL.ERR), + .file_write_streaming => |o| try poll_storage.add(o.file.handle, posix.POLL.OUT | posix.POLL.ERR), + .device_io_control => |o| try poll_storage.add(o.file.handle, posix.POLL.IN | posix.POLL.OUT | posix.POLL.ERR), + .net_receive => |o| try poll_storage.add(o.socket_handle, posix.POLL.IN | posix.POLL.ERR), } index = submission.node.next; } @@ -2998,6 +3019,7 @@ fn batchApc( .file_read_streaming => .{ .file_read_streaming = ntReadFileResult(iosb) }, .file_write_streaming => .{ .file_write_streaming = ntWriteFileResult(iosb) }, .device_io_control => .{ .device_io_control = iosb.* }, + .net_receive => unreachable, // TODO }; storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } }; }, @@ -3199,6 +3221,11 @@ fn batchDrainSubmittedWindows(b: *Io.Batch, concurrency: bool) (Io.ConcurrentErr }; } }, + .net_receive => |o| { + if (concurrency) return error.ConcurrencyUnavailable; + _ = o; + @panic("TODO implement Batch NetReceive on Windows"); + }, } index = submission.node.next; } @@ -13188,70 +13215,42 @@ fn netSendMany( } fn netReceivePosix( - userdata: ?*anyopaque, - handle: net.Socket.Handle, - message_buffer: []net.IncomingMessage, + socket_handle: net.Socket.Handle, + message: *net.IncomingMessage, data_buffer: []u8, flags: net.ReceiveFlags, - timeout: Io.Timeout, -) struct { ?net.Socket.ReceiveTimeoutError, usize } { - if (!have_networking) return .{ error.NetworkDown, 0 }; - const t: *Threaded = @ptrCast(@alignCast(userdata)); - const t_io = io(t); - +) net.Socket.ReceiveError!void { // recvmmsg is useless, here's why: // * [timeout bug](https://bugzilla.kernel.org/show_bug.cgi?id=75371) // * it wants iovecs for each message but we have a better API: one data // buffer to handle all the messages. The better API cannot be lowered to // the split vectors though because reducing the buffer size might make // some messages unreceivable. - - // So the strategy instead is to use non-blocking recvmsg calls, calling - // poll() with timeout if the first one returns EAGAIN. const posix_flags: u32 = @as(u32, if (flags.oob) posix.MSG.OOB else 0) | @as(u32, if (flags.peek) posix.MSG.PEEK else 0) | @as(u32, if (flags.trunc) posix.MSG.TRUNC else 0) | - posix.MSG.DONTWAIT | posix.MSG.NOSIGNAL; + posix.MSG.NOSIGNAL; - var poll_fds: [1]posix.pollfd = .{ - .{ - .fd = handle, - .events = posix.POLL.IN, - .revents = undefined, - }, + var storage: PosixAddress = undefined; + var iov: posix.iovec = .{ .base = data_buffer.ptr, .len = data_buffer.len }; + var msg: posix.msghdr = .{ + .name = &storage.any, + .namelen = @sizeOf(PosixAddress), + .iov = (&iov)[0..1], + .iovlen = 1, + .control = message.control.ptr, + .controllen = @intCast(message.control.len), + .flags = undefined, }; - var message_i: usize = 0; - var data_i: usize = 0; - const deadline = timeout.toTimestamp(t_io); - - recv: while (true) { - if (message_buffer.len - message_i == 0) return .{ null, message_i }; - const message = &message_buffer[message_i]; - const remaining_data_buffer = data_buffer[data_i..]; - var storage: PosixAddress = undefined; - var iov: posix.iovec = .{ .base = remaining_data_buffer.ptr, .len = remaining_data_buffer.len }; - var msg: posix.msghdr = .{ - .name = &storage.any, - .namelen = @sizeOf(PosixAddress), - .iov = (&iov)[0..1], - .iovlen = 1, - .control = message.control.ptr, - .controllen = @intCast(message.control.len), - .flags = undefined, - }; - - const recv_rc = rc: { - const syscall = Syscall.start() catch |err| return .{ err, message_i }; - const rc = posix.system.recvmsg(handle, &msg, posix_flags); - syscall.finish(); - break :rc rc; - }; - switch (posix.errno(recv_rc)) { + const syscall = try Syscall.start(); + while (true) { + const rc = posix.system.recvmsg(socket_handle, &msg, posix_flags); + switch (posix.errno(rc)) { .SUCCESS => { - const data = remaining_data_buffer[0..@intCast(recv_rc)]; - data_i += data.len; + syscall.finish(); + const data = data_buffer[0..@intCast(rc)]; message.* = .{ .from = addressFromPosix(&storage), .data = data, @@ -13264,78 +13263,45 @@ fn netReceivePosix( .errqueue = if (@hasDecl(posix.MSG, "ERRQUEUE")) (msg.flags & posix.MSG.ERRQUEUE) != 0 else false, }, }; - message_i += 1; + return; + }, + .INTR => { + try syscall.checkCancel(); continue; }, - .AGAIN => while (true) { - if (message_i != 0) return .{ null, message_i }; - - const max_poll_ms = std.math.maxInt(u31); - const timeout_ms: u31 = if (deadline) |d| t: { - const duration = d.durationFromNow(t_io); - if (duration.raw.nanoseconds <= 0) return .{ error.Timeout, message_i }; - break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds())); - } else max_poll_ms; - - const syscall = Syscall.start() catch |err| return .{ err, message_i }; - const poll_rc = posix.system.poll(&poll_fds, poll_fds.len, timeout_ms); - syscall.finish(); - - switch (posix.errno(poll_rc)) { - .SUCCESS => { - if (poll_rc == 0) { - // Although spurious timeouts are OK, when no deadline - // is passed we must not return `error.Timeout`. - if (deadline == null) continue; - return .{ error.Timeout, message_i }; - } - continue :recv; - }, - .INTR => continue, - - .FAULT => |err| return .{ errnoBug(err), message_i }, - .INVAL => |err| return .{ errnoBug(err), message_i }, - .NOMEM => return .{ error.SystemResources, message_i }, - else => |err| return .{ posix.unexpectedErrno(err), message_i }, - } - }, - .INTR => continue, - - .BADF => |err| return .{ errnoBug(err), message_i }, - .NFILE => return .{ error.SystemFdQuotaExceeded, message_i }, - .MFILE => return .{ error.ProcessFdQuotaExceeded, message_i }, - .FAULT => |err| return .{ errnoBug(err), message_i }, - .INVAL => |err| return .{ errnoBug(err), message_i }, - .NOBUFS => return .{ error.SystemResources, message_i }, - .NOMEM => return .{ error.SystemResources, message_i }, - .NOTCONN => return .{ error.SocketUnconnected, message_i }, - .NOTSOCK => |err| return .{ errnoBug(err), message_i }, - .MSGSIZE => return .{ error.MessageOversize, message_i }, - .PIPE => return .{ error.SocketUnconnected, message_i }, - .OPNOTSUPP => |err| return .{ errnoBug(err), message_i }, - .CONNRESET => return .{ error.ConnectionResetByPeer, message_i }, - .NETDOWN => return .{ error.NetworkDown, message_i }, - else => |err| return .{ posix.unexpectedErrno(err), message_i }, + .NFILE => return syscall.fail(error.SystemFdQuotaExceeded), + .MFILE => return syscall.fail(error.ProcessFdQuotaExceeded), + .NOBUFS => return syscall.fail(error.SystemResources), + .NOMEM => return syscall.fail(error.SystemResources), + .NOTCONN => return syscall.fail(error.SocketUnconnected), + .MSGSIZE => return syscall.fail(error.MessageOversize), + .PIPE => return syscall.fail(error.SocketUnconnected), + .CONNRESET => return syscall.fail(error.ConnectionResetByPeer), + .NETDOWN => return syscall.fail(error.NetworkDown), + .AGAIN => |err| return syscall.errnoBug(err), + .BADF => |err| return syscall.errnoBug(err), + .FAULT => |err| return syscall.errnoBug(err), + .INVAL => |err| return syscall.errnoBug(err), + .NOTSOCK => |err| return syscall.errnoBug(err), + .OPNOTSUPP => |err| return syscall.errnoBug(err), + else => |err| return syscall.unexpectedErrno(err), } } } fn netReceiveWindows( - userdata: ?*anyopaque, - handle: net.Socket.Handle, + t: *Threaded, + socket_handle: net.Socket.Handle, message_buffer: []net.IncomingMessage, data_buffer: []u8, flags: net.ReceiveFlags, - timeout: Io.Timeout, -) struct { ?net.Socket.ReceiveTimeoutError, usize } { +) struct { ?net.Socket.ReceiveError, usize } { if (!have_networking) return .{ error.NetworkDown, 0 }; - const t: *Threaded = @ptrCast(@alignCast(userdata)); _ = t; - _ = handle; + _ = socket_handle; _ = message_buffer; _ = data_buffer; _ = flags; - _ = timeout; @panic("TODO implement netReceiveWindows"); } diff --git a/lib/std/Io/net.zig b/lib/std/Io/net.zig index 51a8d6647b..7a49996ff8 100644 --- a/lib/std/Io/net.zig +++ b/lib/std/Io/net.zig @@ -1109,25 +1109,7 @@ pub const Socket = struct { if (n != messages.len) return err.?; } - pub const ReceiveError = error{ - /// Insufficient memory or other resource internal to the operating system. - SystemResources, - /// Per-process limit on the number of open file descriptors has been reached. - ProcessFdQuotaExceeded, - /// System-wide limit on the total number of open files has been reached. - SystemFdQuotaExceeded, - /// Local end has been shut down on a connection-oriented socket, or - /// the socket was never connected. - SocketUnconnected, - /// The socket type requires that message be sent atomically, and the - /// size of the message to be sent made this impossible. The message - /// was not transmitted, or was partially transmitted. - MessageOversize, - /// Network connection was unexpectedly closed by sender. - ConnectionResetByPeer, - /// The local network interface used to reach the destination is offline. - NetworkDown, - } || Io.UnexpectedError || Io.Cancelable; + pub const ReceiveError = Io.Operation.NetReceive.Error || Io.Cancelable; /// Waits for data. Connectionless. /// @@ -1145,7 +1127,7 @@ pub const Socket = struct { return message; } - pub const ReceiveTimeoutError = ReceiveError || Io.Timeout.Error; + pub const ReceiveTimeoutError = ReceiveError || Io.Timeout.Error || Io.ConcurrentError; /// Waits for data. Connectionless. /// @@ -1161,7 +1143,12 @@ pub const Socket = struct { timeout: Io.Timeout, ) ReceiveTimeoutError!IncomingMessage { var message: IncomingMessage = .init; - const maybe_err, const count = io.vtable.netReceive(io.userdata, s.handle, (&message)[0..1], buffer, .{}, timeout); + const maybe_err, const count = (try io.operateTimeout(.{ .net_receive = .{ + .socket = s.handle, + .message_buffer = (&message)[0..1], + .data_buffer = buffer, + .flags = .{}, + } }, timeout)).net_receive; if (maybe_err) |err| return err; assert(1 == count); return message; @@ -1186,7 +1173,13 @@ pub const Socket = struct { flags: ReceiveFlags, timeout: Io.Timeout, ) struct { ?ReceiveTimeoutError, usize } { - return io.vtable.netReceive(io.userdata, s.handle, message_buffer, data_buffer, flags, timeout); + const result = io.operateTimeout(.{ .net_receive = .{ + .socket_handle = s.handle, + .message_buffer = message_buffer, + .data_buffer = data_buffer, + .flags = flags, + } }, timeout) catch |err| return .{ err, 0 }; + return result.net_receive; } pub const CreatePairError = error{