mirror of
https://codeberg.org/ziglang/zig.git
synced 2026-03-08 04:04:44 +01:00
std.Io: move netReceive to become an Operation
Notably, the timeout becomes provided by the general-purpose Batch API rather than being special-purposed.
This commit is contained in:
parent
4fa465fc8f
commit
a14d5e1418
3 changed files with 145 additions and 144 deletions
|
|
@ -242,7 +242,6 @@ pub const VTable = struct {
|
|||
netConnectUnix: *const fn (?*anyopaque, *const net.UnixAddress) net.UnixAddress.ConnectError!net.Socket.Handle,
|
||||
netSocketCreatePair: *const fn (?*anyopaque, net.Socket.CreatePairOptions) net.Socket.CreatePairError![2]net.Socket,
|
||||
netSend: *const fn (?*anyopaque, net.Socket.Handle, []net.OutgoingMessage, net.SendFlags) struct { ?net.Socket.SendError, usize },
|
||||
netReceive: *const fn (?*anyopaque, net.Socket.Handle, message_buffer: []net.IncomingMessage, data_buffer: []u8, net.ReceiveFlags, Timeout) struct { ?net.Socket.ReceiveTimeoutError, usize },
|
||||
/// Returns 0 on end of stream.
|
||||
netRead: *const fn (?*anyopaque, src: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize,
|
||||
netWrite: *const fn (?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize,
|
||||
|
|
@ -260,6 +259,7 @@ pub const Operation = union(enum) {
|
|||
/// On Windows this is NtDeviceIoControlFile. On POSIX this is ioctl. On
|
||||
/// other systems this tag is unreachable.
|
||||
device_io_control: DeviceIoControl,
|
||||
net_receive: NetReceive,
|
||||
|
||||
pub const Tag = @typeInfo(Operation).@"union".tag_type.?;
|
||||
|
||||
|
|
@ -349,6 +349,35 @@ pub const Operation = union(enum) {
|
|||
},
|
||||
};
|
||||
|
||||
pub const NetReceive = struct {
|
||||
socket_handle: net.Socket.Handle,
|
||||
message_buffer: []net.IncomingMessage,
|
||||
data_buffer: []u8,
|
||||
flags: net.ReceiveFlags,
|
||||
|
||||
pub const Error = error{
|
||||
/// Insufficient memory or other resource internal to the operating system.
|
||||
SystemResources,
|
||||
/// Per-process limit on the number of open file descriptors has been reached.
|
||||
ProcessFdQuotaExceeded,
|
||||
/// System-wide limit on the total number of open files has been reached.
|
||||
SystemFdQuotaExceeded,
|
||||
/// Local end has been shut down on a connection-oriented socket, or
|
||||
/// the socket was never connected.
|
||||
SocketUnconnected,
|
||||
/// The socket type requires that message be sent atomically, and the
|
||||
/// size of the message to be sent made this impossible. The message
|
||||
/// was not transmitted, or was partially transmitted.
|
||||
MessageOversize,
|
||||
/// Network connection was unexpectedly closed by sender.
|
||||
ConnectionResetByPeer,
|
||||
/// The local network interface used to reach the destination is offline.
|
||||
NetworkDown,
|
||||
} || Io.UnexpectedError;
|
||||
|
||||
pub const Result = struct { ?net.Socket.ReceiveError, usize };
|
||||
};
|
||||
|
||||
pub const Result = Result: {
|
||||
const operation_fields = @typeInfo(Operation).@"union".fields;
|
||||
var field_names: [operation_fields.len][]const u8 = undefined;
|
||||
|
|
@ -416,6 +445,19 @@ pub fn operate(io: Io, operation: Operation) Cancelable!Operation.Result {
|
|||
return io.vtable.operate(io.userdata, operation);
|
||||
}
|
||||
|
||||
pub const OperateTimeoutError = Cancelable || Timeout.Error || ConcurrentError;
|
||||
|
||||
/// Performs one `Operation` with provided `timeout`.
|
||||
pub fn operateTimeout(io: Io, operation: Operation, timeout: Timeout) OperateTimeoutError!Operation.Result {
|
||||
var storage: [1]Operation.Storage = undefined;
|
||||
var batch: Batch = .init(&storage);
|
||||
batch.addAt(0, operation);
|
||||
try batch.awaitConcurrent(io, timeout);
|
||||
const completion = batch.next().?;
|
||||
assert(completion.index == 0);
|
||||
return completion.result;
|
||||
}
|
||||
|
||||
/// Submits many operations together without waiting for all of them to
|
||||
/// complete.
|
||||
///
|
||||
|
|
@ -1715,7 +1757,7 @@ pub const Event = enum(u32) {
|
|||
}
|
||||
|
||||
/// Blocks until the logical boolean is `true`.
|
||||
pub fn wait(event: *Event, io: Io) Io.Cancelable!void {
|
||||
pub fn wait(event: *Event, io: Io) Cancelable!void {
|
||||
if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) {
|
||||
.unset => unreachable,
|
||||
.waiting => {},
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ disable_memory_mapping: bool,
|
|||
|
||||
stderr_writer: File.Writer = .{
|
||||
.io = undefined,
|
||||
.interface = Io.File.Writer.initInterface(&.{}),
|
||||
.interface = File.Writer.initInterface(&.{}),
|
||||
.file = if (is_windows) undefined else .stderr(),
|
||||
.mode = .streaming,
|
||||
},
|
||||
|
|
@ -160,7 +160,7 @@ pub const Environ = struct {
|
|||
},
|
||||
};
|
||||
|
||||
pub fn scan(environ: *Environ, allocator: std.mem.Allocator) void {
|
||||
pub fn scan(environ: *Environ, allocator: Allocator) void {
|
||||
if (is_windows) {
|
||||
// This value expires with any call that modifies the environment,
|
||||
// which is outside of this Io implementation's control, so references
|
||||
|
|
@ -1900,10 +1900,6 @@ pub fn io(t: *Threaded) Io {
|
|||
.windows => netSendWindows,
|
||||
else => netSendPosix,
|
||||
},
|
||||
.netReceive = switch (native_os) {
|
||||
.windows => netReceiveWindows,
|
||||
else => netReceivePosix,
|
||||
},
|
||||
.netInterfaceNameResolve = netInterfaceNameResolve,
|
||||
.netInterfaceName = netInterfaceName,
|
||||
.netLookup = netLookup,
|
||||
|
|
@ -2035,7 +2031,6 @@ pub fn ioBasic(t: *Threaded) Io {
|
|||
.netWrite = netWriteUnavailable,
|
||||
.netWriteFile = netWriteFileUnavailable,
|
||||
.netSend = netSendUnavailable,
|
||||
.netReceive = netReceiveUnavailable,
|
||||
.netInterfaceNameResolve = netInterfaceNameResolveUnavailable,
|
||||
.netInterfaceName = netInterfaceNameUnavailable,
|
||||
.netLookup = netLookupUnavailable,
|
||||
|
|
@ -2636,6 +2631,15 @@ fn operate(userdata: ?*anyopaque, operation: Io.Operation) Io.Cancelable!Io.Oper
|
|||
},
|
||||
},
|
||||
.device_io_control => |*o| return .{ .device_io_control = try deviceIoControl(o) },
|
||||
.net_receive => |*o| return .{ .net_receive = o: {
|
||||
if (!have_networking) break :o .{ error.NetworkDown, 0 };
|
||||
if (is_windows) break :o netReceiveWindows(t, o.socket_handle, o.message_buffer, o.data_buffer, o.flags);
|
||||
netReceivePosix(o.socket_handle, &o.message_buffer[0], o.data_buffer, o.flags) catch |err| switch (err) {
|
||||
error.Canceled => |e| return e,
|
||||
else => |e| break :o .{ e, 0 },
|
||||
};
|
||||
break :o .{ null, 1 };
|
||||
} },
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -2660,11 +2664,19 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void {
|
|||
const submission = &b.storage[index.toIndex()].submission;
|
||||
switch (submission.operation) {
|
||||
.file_read_streaming => |o| {
|
||||
poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.IN, .revents = 0 };
|
||||
poll_buffer[poll_len] = .{
|
||||
.fd = o.file.handle,
|
||||
.events = posix.POLL.IN | posix.POLL.ERR,
|
||||
.revents = 0,
|
||||
};
|
||||
poll_len += 1;
|
||||
},
|
||||
.file_write_streaming => |o| {
|
||||
poll_buffer[poll_len] = .{ .fd = o.file.handle, .events = posix.POLL.OUT, .revents = 0 };
|
||||
poll_buffer[poll_len] = .{
|
||||
.fd = o.file.handle,
|
||||
.events = posix.POLL.OUT | posix.POLL.ERR,
|
||||
.revents = 0,
|
||||
};
|
||||
poll_len += 1;
|
||||
},
|
||||
.device_io_control => |o| {
|
||||
|
|
@ -2675,6 +2687,14 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void {
|
|||
};
|
||||
poll_len += 1;
|
||||
},
|
||||
.net_receive => |*o| {
|
||||
poll_buffer[poll_len] = .{
|
||||
.fd = o.socket_handle,
|
||||
.events = posix.POLL.IN | posix.POLL.ERR,
|
||||
.revents = 0,
|
||||
};
|
||||
poll_len += 1;
|
||||
},
|
||||
}
|
||||
index = submission.node.next;
|
||||
}
|
||||
|
|
@ -2794,12 +2814,12 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
|
|||
if (!have_poll) return error.ConcurrencyUnavailable;
|
||||
var poll_buffer: [poll_buffer_len]posix.pollfd = undefined;
|
||||
var poll_storage: struct {
|
||||
gpa: std.mem.Allocator,
|
||||
gpa: Allocator,
|
||||
batch: *Io.Batch,
|
||||
slice: []posix.pollfd,
|
||||
len: u32,
|
||||
|
||||
fn add(storage: *@This(), file: Io.File, events: @FieldType(posix.pollfd, "events")) Io.ConcurrentError!void {
|
||||
fn add(storage: *@This(), fd: File.Handle, events: @FieldType(posix.pollfd, "events")) Io.ConcurrentError!void {
|
||||
const len = storage.len;
|
||||
if (len == poll_buffer_len) {
|
||||
const slice: []posix.pollfd = if (storage.batch.userdata) |batch_userdata|
|
||||
|
|
@ -2814,7 +2834,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
|
|||
storage.slice = slice;
|
||||
}
|
||||
storage.slice[len] = .{
|
||||
.fd = file.handle,
|
||||
.fd = fd,
|
||||
.events = events,
|
||||
.revents = 0,
|
||||
};
|
||||
|
|
@ -2826,9 +2846,10 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
|
|||
while (index != .none) {
|
||||
const submission = &b.storage[index.toIndex()].submission;
|
||||
switch (submission.operation) {
|
||||
.file_read_streaming => |o| try poll_storage.add(o.file, posix.POLL.IN),
|
||||
.file_write_streaming => |o| try poll_storage.add(o.file, posix.POLL.OUT),
|
||||
.device_io_control => |o| try poll_storage.add(o.file, posix.POLL.IN | posix.POLL.OUT | posix.POLL.ERR),
|
||||
.file_read_streaming => |o| try poll_storage.add(o.file.handle, posix.POLL.IN | posix.POLL.ERR),
|
||||
.file_write_streaming => |o| try poll_storage.add(o.file.handle, posix.POLL.OUT | posix.POLL.ERR),
|
||||
.device_io_control => |o| try poll_storage.add(o.file.handle, posix.POLL.IN | posix.POLL.OUT | posix.POLL.ERR),
|
||||
.net_receive => |o| try poll_storage.add(o.socket_handle, posix.POLL.IN | posix.POLL.ERR),
|
||||
}
|
||||
index = submission.node.next;
|
||||
}
|
||||
|
|
@ -2998,6 +3019,7 @@ fn batchApc(
|
|||
.file_read_streaming => .{ .file_read_streaming = ntReadFileResult(iosb) },
|
||||
.file_write_streaming => .{ .file_write_streaming = ntWriteFileResult(iosb) },
|
||||
.device_io_control => .{ .device_io_control = iosb.* },
|
||||
.net_receive => unreachable, // TODO
|
||||
};
|
||||
storage.* = .{ .completion = .{ .node = .{ .next = .none }, .result = result } };
|
||||
},
|
||||
|
|
@ -3199,6 +3221,11 @@ fn batchDrainSubmittedWindows(b: *Io.Batch, concurrency: bool) (Io.ConcurrentErr
|
|||
};
|
||||
}
|
||||
},
|
||||
.net_receive => |o| {
|
||||
if (concurrency) return error.ConcurrencyUnavailable;
|
||||
_ = o;
|
||||
@panic("TODO implement Batch NetReceive on Windows");
|
||||
},
|
||||
}
|
||||
index = submission.node.next;
|
||||
}
|
||||
|
|
@ -13188,70 +13215,42 @@ fn netSendMany(
|
|||
}
|
||||
|
||||
fn netReceivePosix(
|
||||
userdata: ?*anyopaque,
|
||||
handle: net.Socket.Handle,
|
||||
message_buffer: []net.IncomingMessage,
|
||||
socket_handle: net.Socket.Handle,
|
||||
message: *net.IncomingMessage,
|
||||
data_buffer: []u8,
|
||||
flags: net.ReceiveFlags,
|
||||
timeout: Io.Timeout,
|
||||
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
|
||||
if (!have_networking) return .{ error.NetworkDown, 0 };
|
||||
const t: *Threaded = @ptrCast(@alignCast(userdata));
|
||||
const t_io = io(t);
|
||||
|
||||
) net.Socket.ReceiveError!void {
|
||||
// recvmmsg is useless, here's why:
|
||||
// * [timeout bug](https://bugzilla.kernel.org/show_bug.cgi?id=75371)
|
||||
// * it wants iovecs for each message but we have a better API: one data
|
||||
// buffer to handle all the messages. The better API cannot be lowered to
|
||||
// the split vectors though because reducing the buffer size might make
|
||||
// some messages unreceivable.
|
||||
|
||||
// So the strategy instead is to use non-blocking recvmsg calls, calling
|
||||
// poll() with timeout if the first one returns EAGAIN.
|
||||
const posix_flags: u32 =
|
||||
@as(u32, if (flags.oob) posix.MSG.OOB else 0) |
|
||||
@as(u32, if (flags.peek) posix.MSG.PEEK else 0) |
|
||||
@as(u32, if (flags.trunc) posix.MSG.TRUNC else 0) |
|
||||
posix.MSG.DONTWAIT | posix.MSG.NOSIGNAL;
|
||||
posix.MSG.NOSIGNAL;
|
||||
|
||||
var poll_fds: [1]posix.pollfd = .{
|
||||
.{
|
||||
.fd = handle,
|
||||
.events = posix.POLL.IN,
|
||||
.revents = undefined,
|
||||
},
|
||||
var storage: PosixAddress = undefined;
|
||||
var iov: posix.iovec = .{ .base = data_buffer.ptr, .len = data_buffer.len };
|
||||
var msg: posix.msghdr = .{
|
||||
.name = &storage.any,
|
||||
.namelen = @sizeOf(PosixAddress),
|
||||
.iov = (&iov)[0..1],
|
||||
.iovlen = 1,
|
||||
.control = message.control.ptr,
|
||||
.controllen = @intCast(message.control.len),
|
||||
.flags = undefined,
|
||||
};
|
||||
var message_i: usize = 0;
|
||||
var data_i: usize = 0;
|
||||
|
||||
const deadline = timeout.toTimestamp(t_io);
|
||||
|
||||
recv: while (true) {
|
||||
if (message_buffer.len - message_i == 0) return .{ null, message_i };
|
||||
const message = &message_buffer[message_i];
|
||||
const remaining_data_buffer = data_buffer[data_i..];
|
||||
var storage: PosixAddress = undefined;
|
||||
var iov: posix.iovec = .{ .base = remaining_data_buffer.ptr, .len = remaining_data_buffer.len };
|
||||
var msg: posix.msghdr = .{
|
||||
.name = &storage.any,
|
||||
.namelen = @sizeOf(PosixAddress),
|
||||
.iov = (&iov)[0..1],
|
||||
.iovlen = 1,
|
||||
.control = message.control.ptr,
|
||||
.controllen = @intCast(message.control.len),
|
||||
.flags = undefined,
|
||||
};
|
||||
|
||||
const recv_rc = rc: {
|
||||
const syscall = Syscall.start() catch |err| return .{ err, message_i };
|
||||
const rc = posix.system.recvmsg(handle, &msg, posix_flags);
|
||||
syscall.finish();
|
||||
break :rc rc;
|
||||
};
|
||||
switch (posix.errno(recv_rc)) {
|
||||
const syscall = try Syscall.start();
|
||||
while (true) {
|
||||
const rc = posix.system.recvmsg(socket_handle, &msg, posix_flags);
|
||||
switch (posix.errno(rc)) {
|
||||
.SUCCESS => {
|
||||
const data = remaining_data_buffer[0..@intCast(recv_rc)];
|
||||
data_i += data.len;
|
||||
syscall.finish();
|
||||
const data = data_buffer[0..@intCast(rc)];
|
||||
message.* = .{
|
||||
.from = addressFromPosix(&storage),
|
||||
.data = data,
|
||||
|
|
@ -13264,78 +13263,45 @@ fn netReceivePosix(
|
|||
.errqueue = if (@hasDecl(posix.MSG, "ERRQUEUE")) (msg.flags & posix.MSG.ERRQUEUE) != 0 else false,
|
||||
},
|
||||
};
|
||||
message_i += 1;
|
||||
return;
|
||||
},
|
||||
.INTR => {
|
||||
try syscall.checkCancel();
|
||||
continue;
|
||||
},
|
||||
.AGAIN => while (true) {
|
||||
if (message_i != 0) return .{ null, message_i };
|
||||
|
||||
const max_poll_ms = std.math.maxInt(u31);
|
||||
const timeout_ms: u31 = if (deadline) |d| t: {
|
||||
const duration = d.durationFromNow(t_io);
|
||||
if (duration.raw.nanoseconds <= 0) return .{ error.Timeout, message_i };
|
||||
break :t @intCast(@min(max_poll_ms, duration.raw.toMilliseconds()));
|
||||
} else max_poll_ms;
|
||||
|
||||
const syscall = Syscall.start() catch |err| return .{ err, message_i };
|
||||
const poll_rc = posix.system.poll(&poll_fds, poll_fds.len, timeout_ms);
|
||||
syscall.finish();
|
||||
|
||||
switch (posix.errno(poll_rc)) {
|
||||
.SUCCESS => {
|
||||
if (poll_rc == 0) {
|
||||
// Although spurious timeouts are OK, when no deadline
|
||||
// is passed we must not return `error.Timeout`.
|
||||
if (deadline == null) continue;
|
||||
return .{ error.Timeout, message_i };
|
||||
}
|
||||
continue :recv;
|
||||
},
|
||||
.INTR => continue,
|
||||
|
||||
.FAULT => |err| return .{ errnoBug(err), message_i },
|
||||
.INVAL => |err| return .{ errnoBug(err), message_i },
|
||||
.NOMEM => return .{ error.SystemResources, message_i },
|
||||
else => |err| return .{ posix.unexpectedErrno(err), message_i },
|
||||
}
|
||||
},
|
||||
.INTR => continue,
|
||||
|
||||
.BADF => |err| return .{ errnoBug(err), message_i },
|
||||
.NFILE => return .{ error.SystemFdQuotaExceeded, message_i },
|
||||
.MFILE => return .{ error.ProcessFdQuotaExceeded, message_i },
|
||||
.FAULT => |err| return .{ errnoBug(err), message_i },
|
||||
.INVAL => |err| return .{ errnoBug(err), message_i },
|
||||
.NOBUFS => return .{ error.SystemResources, message_i },
|
||||
.NOMEM => return .{ error.SystemResources, message_i },
|
||||
.NOTCONN => return .{ error.SocketUnconnected, message_i },
|
||||
.NOTSOCK => |err| return .{ errnoBug(err), message_i },
|
||||
.MSGSIZE => return .{ error.MessageOversize, message_i },
|
||||
.PIPE => return .{ error.SocketUnconnected, message_i },
|
||||
.OPNOTSUPP => |err| return .{ errnoBug(err), message_i },
|
||||
.CONNRESET => return .{ error.ConnectionResetByPeer, message_i },
|
||||
.NETDOWN => return .{ error.NetworkDown, message_i },
|
||||
else => |err| return .{ posix.unexpectedErrno(err), message_i },
|
||||
.NFILE => return syscall.fail(error.SystemFdQuotaExceeded),
|
||||
.MFILE => return syscall.fail(error.ProcessFdQuotaExceeded),
|
||||
.NOBUFS => return syscall.fail(error.SystemResources),
|
||||
.NOMEM => return syscall.fail(error.SystemResources),
|
||||
.NOTCONN => return syscall.fail(error.SocketUnconnected),
|
||||
.MSGSIZE => return syscall.fail(error.MessageOversize),
|
||||
.PIPE => return syscall.fail(error.SocketUnconnected),
|
||||
.CONNRESET => return syscall.fail(error.ConnectionResetByPeer),
|
||||
.NETDOWN => return syscall.fail(error.NetworkDown),
|
||||
.AGAIN => |err| return syscall.errnoBug(err),
|
||||
.BADF => |err| return syscall.errnoBug(err),
|
||||
.FAULT => |err| return syscall.errnoBug(err),
|
||||
.INVAL => |err| return syscall.errnoBug(err),
|
||||
.NOTSOCK => |err| return syscall.errnoBug(err),
|
||||
.OPNOTSUPP => |err| return syscall.errnoBug(err),
|
||||
else => |err| return syscall.unexpectedErrno(err),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn netReceiveWindows(
|
||||
userdata: ?*anyopaque,
|
||||
handle: net.Socket.Handle,
|
||||
t: *Threaded,
|
||||
socket_handle: net.Socket.Handle,
|
||||
message_buffer: []net.IncomingMessage,
|
||||
data_buffer: []u8,
|
||||
flags: net.ReceiveFlags,
|
||||
timeout: Io.Timeout,
|
||||
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
|
||||
) struct { ?net.Socket.ReceiveError, usize } {
|
||||
if (!have_networking) return .{ error.NetworkDown, 0 };
|
||||
const t: *Threaded = @ptrCast(@alignCast(userdata));
|
||||
_ = t;
|
||||
_ = handle;
|
||||
_ = socket_handle;
|
||||
_ = message_buffer;
|
||||
_ = data_buffer;
|
||||
_ = flags;
|
||||
_ = timeout;
|
||||
@panic("TODO implement netReceiveWindows");
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1109,25 +1109,7 @@ pub const Socket = struct {
|
|||
if (n != messages.len) return err.?;
|
||||
}
|
||||
|
||||
pub const ReceiveError = error{
|
||||
/// Insufficient memory or other resource internal to the operating system.
|
||||
SystemResources,
|
||||
/// Per-process limit on the number of open file descriptors has been reached.
|
||||
ProcessFdQuotaExceeded,
|
||||
/// System-wide limit on the total number of open files has been reached.
|
||||
SystemFdQuotaExceeded,
|
||||
/// Local end has been shut down on a connection-oriented socket, or
|
||||
/// the socket was never connected.
|
||||
SocketUnconnected,
|
||||
/// The socket type requires that message be sent atomically, and the
|
||||
/// size of the message to be sent made this impossible. The message
|
||||
/// was not transmitted, or was partially transmitted.
|
||||
MessageOversize,
|
||||
/// Network connection was unexpectedly closed by sender.
|
||||
ConnectionResetByPeer,
|
||||
/// The local network interface used to reach the destination is offline.
|
||||
NetworkDown,
|
||||
} || Io.UnexpectedError || Io.Cancelable;
|
||||
pub const ReceiveError = Io.Operation.NetReceive.Error || Io.Cancelable;
|
||||
|
||||
/// Waits for data. Connectionless.
|
||||
///
|
||||
|
|
@ -1145,7 +1127,7 @@ pub const Socket = struct {
|
|||
return message;
|
||||
}
|
||||
|
||||
pub const ReceiveTimeoutError = ReceiveError || Io.Timeout.Error;
|
||||
pub const ReceiveTimeoutError = ReceiveError || Io.Timeout.Error || Io.ConcurrentError;
|
||||
|
||||
/// Waits for data. Connectionless.
|
||||
///
|
||||
|
|
@ -1161,7 +1143,12 @@ pub const Socket = struct {
|
|||
timeout: Io.Timeout,
|
||||
) ReceiveTimeoutError!IncomingMessage {
|
||||
var message: IncomingMessage = .init;
|
||||
const maybe_err, const count = io.vtable.netReceive(io.userdata, s.handle, (&message)[0..1], buffer, .{}, timeout);
|
||||
const maybe_err, const count = (try io.operateTimeout(.{ .net_receive = .{
|
||||
.socket = s.handle,
|
||||
.message_buffer = (&message)[0..1],
|
||||
.data_buffer = buffer,
|
||||
.flags = .{},
|
||||
} }, timeout)).net_receive;
|
||||
if (maybe_err) |err| return err;
|
||||
assert(1 == count);
|
||||
return message;
|
||||
|
|
@ -1186,7 +1173,13 @@ pub const Socket = struct {
|
|||
flags: ReceiveFlags,
|
||||
timeout: Io.Timeout,
|
||||
) struct { ?ReceiveTimeoutError, usize } {
|
||||
return io.vtable.netReceive(io.userdata, s.handle, message_buffer, data_buffer, flags, timeout);
|
||||
const result = io.operateTimeout(.{ .net_receive = .{
|
||||
.socket_handle = s.handle,
|
||||
.message_buffer = message_buffer,
|
||||
.data_buffer = data_buffer,
|
||||
.flags = flags,
|
||||
} }, timeout) catch |err| return .{ err, 0 };
|
||||
return result.net_receive;
|
||||
}
|
||||
|
||||
pub const CreatePairError = error{
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue