mirror of
https://codeberg.org/ziglang/zig.git
synced 2026-03-08 04:24:33 +01:00
While the general guidance remains useful, it is not the case that error.Canceled will always pass across the Group task function boundary. Remove the too-aggressive assertions and add unit test coverage. Closes #30096 Closes #31340 Closes #31358
1524 lines
52 KiB
Zig
1524 lines
52 KiB
Zig
const Kqueue = @This();
|
|
const builtin = @import("builtin");
|
|
|
|
const std = @import("../std.zig");
|
|
const Io = std.Io;
|
|
const Dir = std.Io.Dir;
|
|
const File = std.Io.File;
|
|
const net = std.Io.net;
|
|
const assert = std.debug.assert;
|
|
const Allocator = std.mem.Allocator;
|
|
const Alignment = std.mem.Alignment;
|
|
const IpAddress = std.Io.net.IpAddress;
|
|
const errnoBug = std.Io.Threaded.errnoBug;
|
|
const closeFd = std.Io.Threaded.closeFd;
|
|
const posix = std.posix;
|
|
|
|
/// Must be a thread-safe allocator.
|
|
gpa: Allocator,
|
|
mutex: Io.Mutex,
|
|
main_fiber_buffer: [@sizeOf(Fiber) + Fiber.max_result_size]u8 align(@alignOf(Fiber)),
|
|
threads: Thread.List,
|
|
|
|
/// Empirically saw >128KB being used by the self-hosted backend to panic.
|
|
const idle_stack_size = 256 * 1024;
|
|
|
|
const max_idle_search = 4;
|
|
const max_steal_ready_search = 4;
|
|
const max_iovecs_len = 8;
|
|
|
|
const changes_buffer_len = 64;
|
|
|
|
const Thread = struct {
|
|
thread: std.Thread,
|
|
idle_context: Io.fiber.Context,
|
|
current_context: *Io.fiber.Context,
|
|
ready_queue: ?*Fiber,
|
|
kq_fd: posix.fd_t,
|
|
idle_search_index: u32,
|
|
steal_ready_search_index: u32,
|
|
/// For ensuring multiple fibers waiting on the same file descriptor and
|
|
/// filter use the same kevent.
|
|
wait_queues: std.AutoArrayHashMapUnmanaged(WaitQueueKey, *Fiber),
|
|
|
|
const WaitQueueKey = struct {
|
|
ident: usize,
|
|
filter: i32,
|
|
};
|
|
|
|
const canceling: ?*Thread = @ptrFromInt(@alignOf(Thread));
|
|
|
|
threadlocal var self: *Thread = undefined;
|
|
|
|
fn current() *Thread {
|
|
return self;
|
|
}
|
|
|
|
fn currentFiber(thread: *Thread) *Fiber {
|
|
return @fieldParentPtr("context", thread.current_context);
|
|
}
|
|
|
|
const List = struct {
|
|
allocated: []Thread,
|
|
reserved: u32,
|
|
active: u32,
|
|
};
|
|
|
|
fn deinit(thread: *Thread, gpa: Allocator) void {
|
|
closeFd(thread.kq_fd);
|
|
assert(thread.wait_queues.count() == 0);
|
|
thread.wait_queues.deinit(gpa);
|
|
thread.* = undefined;
|
|
}
|
|
};
|
|
|
|
const Fiber = struct {
|
|
required_align: void align(4),
|
|
context: Io.fiber.Context,
|
|
awaiter: ?*Fiber,
|
|
queue_next: ?*Fiber,
|
|
cancel_thread: ?*Thread,
|
|
awaiting_completions: std.StaticBitSet(3),
|
|
|
|
const finished: ?*Fiber = @ptrFromInt(@alignOf(Thread));
|
|
|
|
const max_result_align: Alignment = .@"16";
|
|
const max_result_size = max_result_align.forward(64);
|
|
/// This includes any stack realignments that need to happen, and also the
|
|
/// initial frame return address slot and argument frame, depending on target.
|
|
const min_stack_size = 4 * 1024 * 1024;
|
|
const max_context_align: Alignment = .@"16";
|
|
const max_context_size = max_context_align.forward(1024);
|
|
const max_closure_size: usize = @sizeOf(AsyncClosure);
|
|
const max_closure_align: Alignment = .of(AsyncClosure);
|
|
const allocation_size = std.mem.alignForward(
|
|
usize,
|
|
max_closure_align.max(max_context_align).forward(
|
|
max_result_align.forward(@sizeOf(Fiber)) + max_result_size + min_stack_size,
|
|
) + max_closure_size + max_context_size,
|
|
std.heap.page_size_max,
|
|
);
|
|
|
|
fn allocate(k: *Kqueue) error{OutOfMemory}!*Fiber {
|
|
return @ptrCast(try k.gpa.alignedAlloc(u8, .of(Fiber), allocation_size));
|
|
}
|
|
|
|
fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 {
|
|
return @as([*]align(@alignOf(Fiber)) u8, @ptrCast(f))[0..allocation_size];
|
|
}
|
|
|
|
fn allocatedEnd(f: *Fiber) [*]u8 {
|
|
const allocated_slice = f.allocatedSlice();
|
|
return allocated_slice[allocated_slice.len..].ptr;
|
|
}
|
|
|
|
fn resultPointer(f: *Fiber, comptime Result: type) *Result {
|
|
return @ptrCast(@alignCast(f.resultBytes(.of(Result))));
|
|
}
|
|
|
|
fn resultBytes(f: *Fiber, alignment: Alignment) [*]u8 {
|
|
return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber)));
|
|
}
|
|
|
|
fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{Canceled}!void {
|
|
if (@cmpxchgStrong(
|
|
?*Thread,
|
|
&fiber.cancel_thread,
|
|
null,
|
|
thread,
|
|
.acq_rel,
|
|
.acquire,
|
|
)) |cancel_thread| {
|
|
assert(cancel_thread == Thread.canceling);
|
|
return error.Canceled;
|
|
}
|
|
}
|
|
|
|
fn exitCancelRegion(fiber: *Fiber, thread: *Thread) void {
|
|
if (@cmpxchgStrong(
|
|
?*Thread,
|
|
&fiber.cancel_thread,
|
|
thread,
|
|
null,
|
|
.acq_rel,
|
|
.acquire,
|
|
)) |cancel_thread| assert(cancel_thread == Thread.canceling);
|
|
}
|
|
|
|
const Queue = struct { head: *Fiber, tail: *Fiber };
|
|
};
|
|
|
|
fn recycle(k: *Kqueue, fiber: *Fiber) void {
|
|
std.log.debug("recyling {*}", .{fiber});
|
|
assert(fiber.queue_next == null);
|
|
k.gpa.free(fiber.allocatedSlice());
|
|
}
|
|
|
|
pub const InitOptions = struct {
|
|
n_threads: ?usize = null,
|
|
};
|
|
|
|
pub const InitError = Allocator.Error || CreateFileDescriptorError;
|
|
|
|
pub fn init(k: *Kqueue, gpa: Allocator, options: InitOptions) !void {
|
|
assert(options.n_threads != 0);
|
|
|
|
const n_threads = @max(1, options.n_threads orelse std.Thread.getCpuCount() catch 1);
|
|
const threads_size = n_threads * @sizeOf(Thread);
|
|
const idle_stack_end_offset = std.mem.alignForward(usize, threads_size + idle_stack_size, std.heap.page_size_max);
|
|
const allocated_slice = try gpa.alignedAlloc(u8, .of(Thread), idle_stack_end_offset);
|
|
errdefer gpa.free(allocated_slice);
|
|
k.* = .{
|
|
.gpa = gpa,
|
|
.mutex = .init,
|
|
.main_fiber_buffer = undefined,
|
|
.threads = .{
|
|
.allocated = @ptrCast(allocated_slice[0..threads_size]),
|
|
.reserved = 1,
|
|
.active = 1,
|
|
},
|
|
};
|
|
const main_fiber: *Fiber = @ptrCast(&k.main_fiber_buffer);
|
|
main_fiber.* = .{
|
|
.required_align = {},
|
|
.context = undefined,
|
|
.awaiter = null,
|
|
.queue_next = null,
|
|
.cancel_thread = null,
|
|
.awaiting_completions = .initEmpty(),
|
|
};
|
|
const main_thread = &k.threads.allocated[0];
|
|
Thread.self = main_thread;
|
|
const idle_stack_end: [*]align(16) usize = @ptrCast(@alignCast(allocated_slice[idle_stack_end_offset..].ptr));
|
|
(idle_stack_end - 1)[0..1].* = .{@intFromPtr(k)};
|
|
main_thread.* = .{
|
|
.thread = undefined,
|
|
.idle_context = switch (builtin.cpu.arch) {
|
|
.aarch64 => .{
|
|
.sp = @intFromPtr(idle_stack_end),
|
|
.fp = 0,
|
|
.pc = @intFromPtr(&mainIdleEntry),
|
|
},
|
|
.x86_64 => .{
|
|
.rsp = @intFromPtr(idle_stack_end - 1),
|
|
.rbp = 0,
|
|
.rip = @intFromPtr(&mainIdleEntry),
|
|
},
|
|
else => @compileError("unimplemented architecture"),
|
|
},
|
|
.current_context = &main_fiber.context,
|
|
.ready_queue = null,
|
|
.kq_fd = try createFileDescriptor(),
|
|
.idle_search_index = 1,
|
|
.steal_ready_search_index = 1,
|
|
.wait_queues = .empty,
|
|
};
|
|
errdefer closeFd(main_thread.kq_fd);
|
|
std.log.debug("created main idle {*}", .{&main_thread.idle_context});
|
|
std.log.debug("created main {*}", .{main_fiber});
|
|
}
|
|
|
|
pub fn deinit(k: *Kqueue) void {
|
|
const active_threads = @atomicLoad(u32, &k.threads.active, .acquire);
|
|
for (k.threads.allocated[0..active_threads]) |*thread| {
|
|
const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic);
|
|
assert(ready_fiber == null or ready_fiber == Fiber.finished); // pending async
|
|
}
|
|
k.yield(null, .exit);
|
|
const main_thread = &k.threads.allocated[0];
|
|
const gpa = k.gpa;
|
|
main_thread.deinit(gpa);
|
|
const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @ptrCast(@alignCast(k.threads.allocated.ptr));
|
|
const idle_stack_end_offset = std.mem.alignForward(usize, k.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
|
|
for (k.threads.allocated[1..active_threads]) |*thread| thread.thread.join();
|
|
gpa.free(allocated_ptr[0..idle_stack_end_offset]);
|
|
k.* = undefined;
|
|
}
|
|
|
|
pub const CreateFileDescriptorError = error{
|
|
/// The per-process limit on the number of open file descriptors has been reached.
|
|
ProcessFdQuotaExceeded,
|
|
/// The system-wide limit on the total number of open files has been reached.
|
|
SystemFdQuotaExceeded,
|
|
} || Io.UnexpectedError;
|
|
|
|
pub fn createFileDescriptor() CreateFileDescriptorError!posix.fd_t {
|
|
const rc = posix.system.kqueue();
|
|
switch (posix.errno(rc)) {
|
|
.SUCCESS => return @intCast(rc),
|
|
.MFILE => return error.ProcessFdQuotaExceeded,
|
|
.NFILE => return error.SystemFdQuotaExceeded,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
|
|
fn findReadyFiber(k: *Kqueue, thread: *Thread) ?*Fiber {
|
|
if (@atomicRmw(?*Fiber, &thread.ready_queue, .Xchg, Fiber.finished, .acquire)) |ready_fiber| {
|
|
@atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release);
|
|
ready_fiber.queue_next = null;
|
|
return ready_fiber;
|
|
}
|
|
const active_threads = @atomicLoad(u32, &k.threads.active, .acquire);
|
|
for (0..@min(max_steal_ready_search, active_threads)) |_| {
|
|
defer thread.steal_ready_search_index += 1;
|
|
if (thread.steal_ready_search_index == active_threads) thread.steal_ready_search_index = 0;
|
|
const steal_ready_search_thread = &k.threads.allocated[0..active_threads][thread.steal_ready_search_index];
|
|
if (steal_ready_search_thread == thread) continue;
|
|
const ready_fiber = @atomicLoad(?*Fiber, &steal_ready_search_thread.ready_queue, .acquire) orelse continue;
|
|
if (ready_fiber == Fiber.finished) continue;
|
|
if (@cmpxchgWeak(
|
|
?*Fiber,
|
|
&steal_ready_search_thread.ready_queue,
|
|
ready_fiber,
|
|
null,
|
|
.acquire,
|
|
.monotonic,
|
|
)) |_| continue;
|
|
@atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release);
|
|
ready_fiber.queue_next = null;
|
|
return ready_fiber;
|
|
}
|
|
// couldn't find anything to do, so we are now open for business
|
|
@atomicStore(?*Fiber, &thread.ready_queue, null, .monotonic);
|
|
return null;
|
|
}
|
|
|
|
fn yield(k: *Kqueue, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
|
|
const thread: *Thread = .current();
|
|
const ready_context = if (maybe_ready_fiber orelse k.findReadyFiber(thread)) |ready_fiber|
|
|
&ready_fiber.context
|
|
else
|
|
&thread.idle_context;
|
|
const message: SwitchMessage = .{
|
|
.contexts = .{
|
|
.old = thread.current_context,
|
|
.new = ready_context,
|
|
},
|
|
.pending_task = pending_task,
|
|
};
|
|
std.log.debug("switching from {*} to {*}", .{ message.contexts.old, message.contexts.new });
|
|
contextSwitch(&message).handle(k);
|
|
}
|
|
|
|
fn schedule(k: *Kqueue, thread: *Thread, ready_queue: Fiber.Queue) void {
|
|
{
|
|
var fiber = ready_queue.head;
|
|
while (true) {
|
|
std.log.debug("scheduling {*}", .{fiber});
|
|
fiber = fiber.queue_next orelse break;
|
|
}
|
|
assert(fiber == ready_queue.tail);
|
|
}
|
|
// shared fields of previous `Thread` must be initialized before later ones are marked as active
|
|
const new_thread_index = @atomicLoad(u32, &k.threads.active, .acquire);
|
|
for (0..@min(max_idle_search, new_thread_index)) |_| {
|
|
defer thread.idle_search_index += 1;
|
|
if (thread.idle_search_index == new_thread_index) thread.idle_search_index = 0;
|
|
const idle_search_thread = &k.threads.allocated[0..new_thread_index][thread.idle_search_index];
|
|
if (idle_search_thread == thread) continue;
|
|
if (@cmpxchgWeak(
|
|
?*Fiber,
|
|
&idle_search_thread.ready_queue,
|
|
null,
|
|
ready_queue.head,
|
|
.release,
|
|
.monotonic,
|
|
)) |_| continue;
|
|
const changes = [_]posix.Kevent{
|
|
.{
|
|
.ident = 0,
|
|
.filter = std.c.EVFILT.USER,
|
|
.flags = std.c.EV.ADD | std.c.EV.ONESHOT,
|
|
.fflags = std.c.NOTE.TRIGGER,
|
|
.data = 0,
|
|
.udata = @intFromEnum(Completion.UserData.wakeup),
|
|
},
|
|
};
|
|
// If an error occurs it only pessimises scheduling.
|
|
_ = kevent(idle_search_thread.kq_fd, &changes, &.{}, null) catch |err| {
|
|
// TODO handle EINTR for cancellation purposes
|
|
@panic(@errorName(err)); // TODO
|
|
};
|
|
return;
|
|
}
|
|
spawn_thread: {
|
|
// previous failed reservations must have completed before retrying
|
|
if (new_thread_index == k.threads.allocated.len or @cmpxchgWeak(
|
|
u32,
|
|
&k.threads.reserved,
|
|
new_thread_index,
|
|
new_thread_index + 1,
|
|
.acquire,
|
|
.monotonic,
|
|
) != null) break :spawn_thread;
|
|
const new_thread = &k.threads.allocated[new_thread_index];
|
|
const next_thread_index = new_thread_index + 1;
|
|
new_thread.* = .{
|
|
.thread = undefined,
|
|
.idle_context = undefined,
|
|
.current_context = &new_thread.idle_context,
|
|
.ready_queue = ready_queue.head,
|
|
.kq_fd = createFileDescriptor() catch |err| {
|
|
@atomicStore(u32, &k.threads.reserved, new_thread_index, .release);
|
|
// no more access to `thread` after giving up reservation
|
|
std.log.warn("unable to create worker thread due to kqueue init failure: {t}", .{err});
|
|
break :spawn_thread;
|
|
},
|
|
.idle_search_index = 0,
|
|
.steal_ready_search_index = 0,
|
|
.wait_queues = .empty,
|
|
};
|
|
new_thread.thread = std.Thread.spawn(.{
|
|
.stack_size = idle_stack_size,
|
|
.allocator = k.gpa,
|
|
}, threadEntry, .{ k, new_thread_index }) catch |err| {
|
|
closeFd(new_thread.kq_fd);
|
|
@atomicStore(u32, &k.threads.reserved, new_thread_index, .release);
|
|
// no more access to `thread` after giving up reservation
|
|
std.log.warn("unable to create worker thread due spawn failure: {s}", .{@errorName(err)});
|
|
break :spawn_thread;
|
|
};
|
|
// shared fields of `Thread` must be initialized before being marked active
|
|
@atomicStore(u32, &k.threads.active, next_thread_index, .release);
|
|
return;
|
|
}
|
|
// nobody wanted it, so just queue it on ourselves
|
|
while (@cmpxchgWeak(
|
|
?*Fiber,
|
|
&thread.ready_queue,
|
|
ready_queue.tail.queue_next,
|
|
ready_queue.head,
|
|
.acq_rel,
|
|
.acquire,
|
|
)) |old_head| ready_queue.tail.queue_next = old_head;
|
|
}
|
|
|
|
fn mainIdle(k: *Kqueue, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Io.fiber.Context)))) noreturn {
|
|
message.handle(k);
|
|
k.idle(&k.threads.allocated[0]);
|
|
k.yield(@ptrCast(&k.main_fiber_buffer), .nothing);
|
|
unreachable; // switched to dead fiber
|
|
}
|
|
|
|
fn threadEntry(k: *Kqueue, index: u32) void {
|
|
const thread: *Thread = &k.threads.allocated[index];
|
|
Thread.self = thread;
|
|
std.log.debug("created thread idle {*}", .{&thread.idle_context});
|
|
k.idle(thread);
|
|
thread.deinit(k.gpa);
|
|
}
|
|
|
|
const Completion = struct {
|
|
const UserData = enum(usize) {
|
|
unused,
|
|
wakeup,
|
|
cleanup,
|
|
exit,
|
|
/// *Fiber
|
|
_,
|
|
};
|
|
/// Corresponds to Kevent field.
|
|
flags: u16,
|
|
/// Corresponds to Kevent field.
|
|
fflags: u32,
|
|
/// Corresponds to Kevent field.
|
|
data: isize,
|
|
};
|
|
|
|
fn idle(k: *Kqueue, thread: *Thread) void {
|
|
var events_buffer: [changes_buffer_len]posix.Kevent = undefined;
|
|
var maybe_ready_fiber: ?*Fiber = null;
|
|
while (true) {
|
|
while (maybe_ready_fiber orelse k.findReadyFiber(thread)) |ready_fiber| {
|
|
k.yield(ready_fiber, .nothing);
|
|
maybe_ready_fiber = null;
|
|
}
|
|
const n = kevent(thread.kq_fd, &.{}, &events_buffer, null) catch |err| {
|
|
// TODO handle EINTR for cancellation purposes
|
|
@panic(@errorName(err)); // TODO
|
|
};
|
|
var maybe_ready_queue: ?Fiber.Queue = null;
|
|
for (events_buffer[0..n]) |event| switch (@as(Completion.UserData, @enumFromInt(event.udata))) {
|
|
.unused => unreachable, // bad submission queued?
|
|
.wakeup => {},
|
|
.cleanup => @panic("failed to notify other threads that we are exiting"),
|
|
.exit => {
|
|
assert(maybe_ready_fiber == null and maybe_ready_queue == null); // pending async
|
|
return;
|
|
},
|
|
_ => {
|
|
const event_head_fiber: *Fiber = @ptrFromInt(event.udata);
|
|
const event_tail_fiber = thread.wait_queues.fetchSwapRemove(.{
|
|
.ident = event.ident,
|
|
.filter = event.filter,
|
|
}).?.value;
|
|
assert(event_tail_fiber.queue_next == null);
|
|
|
|
// TODO reevaluate this logic
|
|
event_head_fiber.resultPointer(Completion).* = .{
|
|
.flags = event.flags,
|
|
.fflags = event.fflags,
|
|
.data = event.data,
|
|
};
|
|
|
|
queue_ready: {
|
|
const head: *Fiber = if (maybe_ready_fiber == null) f: {
|
|
maybe_ready_fiber = event_head_fiber;
|
|
const next = event_head_fiber.queue_next orelse break :queue_ready;
|
|
event_head_fiber.queue_next = null;
|
|
break :f next;
|
|
} else event_head_fiber;
|
|
|
|
if (maybe_ready_queue) |*ready_queue| {
|
|
ready_queue.tail.queue_next = head;
|
|
ready_queue.tail = event_tail_fiber;
|
|
} else {
|
|
maybe_ready_queue = .{ .head = head, .tail = event_tail_fiber };
|
|
}
|
|
}
|
|
},
|
|
};
|
|
if (maybe_ready_queue) |ready_queue| k.schedule(thread, ready_queue);
|
|
}
|
|
}
|
|
|
|
const SwitchMessage = struct {
|
|
contexts: Io.fiber.Switch,
|
|
pending_task: PendingTask,
|
|
|
|
const PendingTask = union(enum) {
|
|
nothing,
|
|
reschedule,
|
|
recycle: *Fiber,
|
|
register_awaiter: *?*Fiber,
|
|
exit,
|
|
};
|
|
|
|
fn handle(message: *const SwitchMessage, k: *Kqueue) void {
|
|
const thread: *Thread = .current();
|
|
thread.current_context = message.contexts.new;
|
|
switch (message.pending_task) {
|
|
.nothing => {},
|
|
.reschedule => if (message.contexts.old != &thread.idle_context) {
|
|
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old));
|
|
assert(prev_fiber.queue_next == null);
|
|
k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
|
|
},
|
|
.recycle => |fiber| {
|
|
k.recycle(fiber);
|
|
},
|
|
.register_awaiter => |awaiter| {
|
|
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old));
|
|
assert(prev_fiber.queue_next == null);
|
|
if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished)
|
|
k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
|
|
},
|
|
.exit => for (k.threads.allocated[0..@atomicLoad(u32, &k.threads.active, .acquire)]) |*each_thread| {
|
|
const changes = [_]posix.Kevent{
|
|
.{
|
|
.ident = 0,
|
|
.filter = std.c.EVFILT.USER,
|
|
.flags = std.c.EV.ADD | std.c.EV.ONESHOT,
|
|
.fflags = std.c.NOTE.TRIGGER,
|
|
.data = 0,
|
|
.udata = @intFromEnum(Completion.UserData.exit),
|
|
},
|
|
};
|
|
_ = kevent(each_thread.kq_fd, &changes, &.{}, null) catch |err| {
|
|
// TODO handle EINTR for cancellation purposes
|
|
@panic(@errorName(err)); // TODO
|
|
};
|
|
},
|
|
}
|
|
}
|
|
};
|
|
|
|
inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
|
|
return @fieldParentPtr("contexts", Io.fiber.contextSwitch(&message.contexts));
|
|
}
|
|
|
|
fn mainIdleEntry() callconv(.naked) void {
|
|
switch (builtin.cpu.arch) {
|
|
.x86_64 => asm volatile (
|
|
\\ movq (%%rsp), %%rdi
|
|
\\ jmp %[mainIdle:P]
|
|
:
|
|
: [mainIdle] "X" (&mainIdle),
|
|
),
|
|
.aarch64 => asm volatile (
|
|
\\ ldr x0, [sp, #-8]
|
|
\\ b %[mainIdle]
|
|
:
|
|
: [mainIdle] "X" (&mainIdle),
|
|
),
|
|
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
|
|
}
|
|
}
|
|
|
|
fn fiberEntry() callconv(.naked) void {
|
|
switch (builtin.cpu.arch) {
|
|
.x86_64 => asm volatile (
|
|
\\ leaq 8(%%rsp), %%rdi
|
|
\\ jmp %[AsyncClosure_call:P]
|
|
:
|
|
: [AsyncClosure_call] "X" (&AsyncClosure.call),
|
|
),
|
|
.aarch64 => asm volatile (
|
|
\\ mov x0, sp
|
|
\\ b %[AsyncClosure_call]
|
|
:
|
|
: [AsyncClosure_call] "X" (&AsyncClosure.call),
|
|
),
|
|
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
|
|
}
|
|
}
|
|
|
|
const AsyncClosure = struct {
|
|
kqueue: *Kqueue,
|
|
fiber: *Fiber,
|
|
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
|
|
result_align: Alignment,
|
|
already_awaited: bool,
|
|
|
|
fn contextPointer(closure: *AsyncClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
|
|
return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(AsyncClosure));
|
|
}
|
|
|
|
fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn {
|
|
message.handle(closure.kqueue);
|
|
const fiber = closure.fiber;
|
|
std.log.debug("{*} performing async", .{fiber});
|
|
closure.start(closure.contextPointer(), fiber.resultBytes(closure.result_align));
|
|
const awaiter = @atomicRmw(?*Fiber, &fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
|
|
const ready_awaiter = r: {
|
|
const a = awaiter orelse break :r null;
|
|
if (@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .acq_rel)) break :r null;
|
|
break :r a;
|
|
};
|
|
closure.kqueue.yield(ready_awaiter, .nothing);
|
|
unreachable; // switched to dead fiber
|
|
}
|
|
|
|
fn fromFiber(fiber: *Fiber) *AsyncClosure {
|
|
return @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward(
|
|
@intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
|
|
) - @sizeOf(AsyncClosure));
|
|
}
|
|
};
|
|
|
|
pub fn io(k: *Kqueue) Io {
|
|
return .{
|
|
.userdata = k,
|
|
.vtable = &.{
|
|
.async = async,
|
|
.concurrent = concurrent,
|
|
.await = await,
|
|
.cancel = cancel,
|
|
|
|
.groupAsync = groupAsync,
|
|
.groupConcurrent = groupConcurrent,
|
|
.groupAwait = groupAwait,
|
|
.groupCancel = groupCancel,
|
|
|
|
.dirCreateDir = dirCreateDir,
|
|
.dirCreateDirPath = dirCreateDirPath,
|
|
.dirCreateDirPathOpen = dirCreateDirPathOpen,
|
|
.dirStat = dirStat,
|
|
.dirStatFile = dirStatFile,
|
|
|
|
.fileStat = fileStat,
|
|
.dirAccess = dirAccess,
|
|
.dirCreateFile = dirCreateFile,
|
|
.dirOpenFile = dirOpenFile,
|
|
.dirOpenDir = dirOpenDir,
|
|
.dirClose = dirClose,
|
|
.fileClose = fileClose,
|
|
.fileWriteStreaming = fileWriteStreaming,
|
|
.fileWritePositional = fileWritePositional,
|
|
.fileReadStreaming = fileReadStreaming,
|
|
.fileReadPositional = fileReadPositional,
|
|
.fileSeekBy = fileSeekBy,
|
|
.fileSeekTo = fileSeekTo,
|
|
|
|
.now = now,
|
|
.sleep = sleep,
|
|
|
|
.netListenIp = netListenIp,
|
|
.netListenUnix = netListenUnix,
|
|
.netAccept = netAccept,
|
|
.netBindIp = netBindIp,
|
|
.netConnectIp = netConnectIp,
|
|
.netConnectUnix = netConnectUnix,
|
|
.netClose = netClose,
|
|
.netShutdown = netShutdown,
|
|
.netRead = netRead,
|
|
.netWrite = netWrite,
|
|
.netSend = netSend,
|
|
.netReceive = netReceive,
|
|
.netInterfaceNameResolve = netInterfaceNameResolve,
|
|
.netInterfaceName = netInterfaceName,
|
|
.netLookup = netLookup,
|
|
},
|
|
};
|
|
}
|
|
|
|
fn async(
|
|
userdata: ?*anyopaque,
|
|
result: []u8,
|
|
result_alignment: std.mem.Alignment,
|
|
context: []const u8,
|
|
context_alignment: std.mem.Alignment,
|
|
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
|
|
) ?*Io.AnyFuture {
|
|
return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
|
|
start(context.ptr, result.ptr);
|
|
return null;
|
|
};
|
|
}
|
|
|
|
fn concurrent(
|
|
userdata: ?*anyopaque,
|
|
result_len: usize,
|
|
result_alignment: Alignment,
|
|
context: []const u8,
|
|
context_alignment: Alignment,
|
|
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
|
|
) Io.ConcurrentError!*Io.AnyFuture {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO
|
|
assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
|
|
assert(result_len <= Fiber.max_result_size); // TODO
|
|
assert(context.len <= Fiber.max_context_size); // TODO
|
|
|
|
const fiber = Fiber.allocate(k) catch return error.ConcurrencyUnavailable;
|
|
std.log.debug("allocated {*}", .{fiber});
|
|
|
|
const closure: *AsyncClosure = .fromFiber(fiber);
|
|
fiber.* = .{
|
|
.required_align = {},
|
|
.context = switch (builtin.cpu.arch) {
|
|
.x86_64 => .{
|
|
.rsp = @intFromPtr(closure) - @sizeOf(usize),
|
|
.rbp = 0,
|
|
.rip = @intFromPtr(&fiberEntry),
|
|
},
|
|
.aarch64 => .{
|
|
.sp = @intFromPtr(closure),
|
|
.fp = 0,
|
|
.pc = @intFromPtr(&fiberEntry),
|
|
},
|
|
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
|
|
},
|
|
.awaiter = null,
|
|
.queue_next = null,
|
|
.cancel_thread = null,
|
|
.awaiting_completions = .initEmpty(),
|
|
};
|
|
closure.* = .{
|
|
.kqueue = k,
|
|
.fiber = fiber,
|
|
.start = start,
|
|
.result_align = result_alignment,
|
|
.already_awaited = false,
|
|
};
|
|
@memcpy(closure.contextPointer(), context);
|
|
|
|
k.schedule(.current(), .{ .head = fiber, .tail = fiber });
|
|
return @ptrCast(fiber);
|
|
}
|
|
|
|
fn await(
|
|
userdata: ?*anyopaque,
|
|
any_future: *Io.AnyFuture,
|
|
result: []u8,
|
|
result_alignment: std.mem.Alignment,
|
|
) void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
|
|
if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished)
|
|
k.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
|
|
@memcpy(result, future_fiber.resultBytes(result_alignment));
|
|
k.recycle(future_fiber);
|
|
}
|
|
|
|
fn cancel(
|
|
userdata: ?*anyopaque,
|
|
any_future: *Io.AnyFuture,
|
|
result: []u8,
|
|
result_alignment: std.mem.Alignment,
|
|
) void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = any_future;
|
|
_ = result;
|
|
_ = result_alignment;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn cancelRequested(userdata: ?*anyopaque) bool {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
return false; // TODO
|
|
}
|
|
|
|
fn groupAsync(
|
|
userdata: ?*anyopaque,
|
|
type_erased: *Io.Group,
|
|
context: []const u8,
|
|
context_alignment: Alignment,
|
|
start: *const fn (context: *const anyopaque) void,
|
|
) void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = type_erased;
|
|
_ = context;
|
|
_ = context_alignment;
|
|
_ = start;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn groupConcurrent(
|
|
userdata: ?*anyopaque,
|
|
type_erased: *Io.Group,
|
|
context: []const u8,
|
|
context_alignment: Alignment,
|
|
start: *const fn (context: *const anyopaque) void,
|
|
) Io.ConcurrentError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = type_erased;
|
|
_ = context;
|
|
_ = context_alignment;
|
|
_ = start;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn groupAwait(userdata: ?*anyopaque, type_erased: *Io.Group, initial_token: *anyopaque) Io.Cancelable!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = type_erased;
|
|
_ = initial_token;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = group;
|
|
_ = token;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn dirCreateDir(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, permissions: Dir.Permissions) Dir.CreateDirError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = permissions;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn dirCreateDirPath(
|
|
userdata: ?*anyopaque,
|
|
dir: Dir,
|
|
sub_path: []const u8,
|
|
permissions: Dir.Permissions,
|
|
) Dir.CreateDirPathError!Dir.CreatePathStatus {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = permissions;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn dirCreateDirPathOpen(
|
|
userdata: ?*anyopaque,
|
|
dir: Dir,
|
|
sub_path: []const u8,
|
|
permissions: Dir.Permissions,
|
|
options: Dir.OpenOptions,
|
|
) Dir.CreateDirPathOpenError!Dir {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = permissions;
|
|
_ = options;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn dirStat(userdata: ?*anyopaque, dir: Dir) Dir.StatError!Dir.Stat {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn dirStatFile(
|
|
userdata: ?*anyopaque,
|
|
dir: Dir,
|
|
sub_path: []const u8,
|
|
options: Dir.StatFileOptions,
|
|
) Dir.StatFileError!File.Stat {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = options;
|
|
@panic("TODO");
|
|
}
|
|
fn dirAccess(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, options: Dir.AccessOptions) Dir.AccessError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = options;
|
|
@panic("TODO");
|
|
}
|
|
fn dirCreateFile(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, flags: File.CreateFlags) File.OpenError!File {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = flags;
|
|
@panic("TODO");
|
|
}
|
|
fn dirOpenFile(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, flags: File.OpenFlags) File.OpenError!File {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = flags;
|
|
@panic("TODO");
|
|
}
|
|
fn dirOpenDir(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, options: Dir.OpenOptions) Dir.OpenError!Dir {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = options;
|
|
@panic("TODO");
|
|
}
|
|
fn dirClose(userdata: ?*anyopaque, dirs: []const Dir) void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dirs;
|
|
@panic("TODO");
|
|
}
|
|
fn fileStat(userdata: ?*anyopaque, file: File) File.StatError!File.Stat {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = file;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn fileClose(userdata: ?*anyopaque, files: []const File) void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = files;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn fileWriteStreaming(
|
|
userdata: ?*anyopaque,
|
|
file: File,
|
|
header: []const u8,
|
|
data: []const []const u8,
|
|
splat: usize,
|
|
) File.Writer.Error!usize {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = file;
|
|
_ = header;
|
|
_ = data;
|
|
_ = splat;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn fileWritePositional(
|
|
userdata: ?*anyopaque,
|
|
file: File,
|
|
header: []const u8,
|
|
data: []const []const u8,
|
|
splat: usize,
|
|
offset: u64,
|
|
) File.WritePositionalError!usize {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = file;
|
|
_ = header;
|
|
_ = data;
|
|
_ = splat;
|
|
_ = offset;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: []const []u8) File.Reader.Error!usize {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = file;
|
|
_ = data;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn fileReadPositional(userdata: ?*anyopaque, file: File, data: []const []u8, offset: u64) File.ReadPositionalError!usize {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = file;
|
|
_ = data;
|
|
_ = offset;
|
|
@panic("TODO");
|
|
}
|
|
fn fileSeekBy(userdata: ?*anyopaque, file: File, relative_offset: i64) File.SeekError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = file;
|
|
_ = relative_offset;
|
|
@panic("TODO");
|
|
}
|
|
fn fileSeekTo(userdata: ?*anyopaque, file: File, absolute_offset: u64) File.SeekError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = file;
|
|
_ = absolute_offset;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn now(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = clock;
|
|
@panic("TODO");
|
|
}
|
|
fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = timeout;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netListenIp(
|
|
userdata: ?*anyopaque,
|
|
address: net.IpAddress,
|
|
options: net.IpAddress.ListenOptions,
|
|
) net.IpAddress.ListenError!net.Server {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = address;
|
|
_ = options;
|
|
@panic("TODO");
|
|
}
|
|
fn netAccept(userdata: ?*anyopaque, server: net.Socket.Handle) net.Server.AcceptError!net.Stream {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = server;
|
|
@panic("TODO");
|
|
}
|
|
fn netBindIp(
|
|
userdata: ?*anyopaque,
|
|
address: *const net.IpAddress,
|
|
options: net.IpAddress.BindOptions,
|
|
) net.IpAddress.BindError!net.Socket {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
const family = Io.Threaded.posixAddressFamily(address);
|
|
const socket_fd = try openSocketPosix(k, family, options);
|
|
errdefer closeFd(socket_fd);
|
|
var storage: Io.Threaded.PosixAddress = undefined;
|
|
var addr_len = Io.Threaded.addressToPosix(address, &storage);
|
|
try posixBind(k, socket_fd, &storage.any, addr_len);
|
|
try posixGetSockName(k, socket_fd, &storage.any, &addr_len);
|
|
return .{
|
|
.handle = socket_fd,
|
|
.address = Io.Threaded.addressFromPosix(&storage),
|
|
};
|
|
}
|
|
fn netConnectIp(userdata: ?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.ConnectOptions) net.IpAddress.ConnectError!net.Stream {
|
|
if (options.timeout != .none) @panic("TODO");
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
const family = Io.Threaded.posixAddressFamily(address);
|
|
const socket_fd = try openSocketPosix(k, family, .{
|
|
.mode = options.mode,
|
|
.protocol = options.protocol,
|
|
});
|
|
errdefer closeFd(socket_fd);
|
|
var storage: Io.Threaded.PosixAddress = undefined;
|
|
var addr_len = Io.Threaded.addressToPosix(address, &storage);
|
|
try posixConnect(k, socket_fd, &storage.any, addr_len);
|
|
try posixGetSockName(k, socket_fd, &storage.any, &addr_len);
|
|
return .{ .socket = .{
|
|
.handle = socket_fd,
|
|
.address = Io.Threaded.addressFromPosix(&storage),
|
|
} };
|
|
}
|
|
|
|
fn posixConnect(k: *Kqueue, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
|
|
while (true) {
|
|
try k.checkCancel();
|
|
switch (posix.errno(posix.system.connect(socket_fd, addr, addr_len))) {
|
|
.SUCCESS => return,
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
.AGAIN => @panic("TODO"),
|
|
.INPROGRESS => return, // Due to TCP fast open, we find out possible error later.
|
|
|
|
.ADDRNOTAVAIL => return error.AddressUnavailable,
|
|
.AFNOSUPPORT => return error.AddressFamilyUnsupported,
|
|
.ALREADY => return error.ConnectionPending,
|
|
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
|
|
.CONNREFUSED => return error.ConnectionRefused,
|
|
.CONNRESET => return error.ConnectionResetByPeer,
|
|
.FAULT => |err| return errnoBug(err),
|
|
.ISCONN => |err| return errnoBug(err),
|
|
.HOSTUNREACH => return error.HostUnreachable,
|
|
.NETUNREACH => return error.NetworkUnreachable,
|
|
.NOTSOCK => |err| return errnoBug(err),
|
|
.PROTOTYPE => |err| return errnoBug(err),
|
|
.TIMEDOUT => return error.Timeout,
|
|
.CONNABORTED => |err| return errnoBug(err),
|
|
.ACCES => return error.AccessDenied,
|
|
.PERM => |err| return errnoBug(err),
|
|
.NOENT => |err| return errnoBug(err),
|
|
.NETDOWN => return error.NetworkDown,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn netListenUnix(
|
|
userdata: ?*anyopaque,
|
|
unix_address: *const net.UnixAddress,
|
|
options: net.UnixAddress.ListenOptions,
|
|
) net.UnixAddress.ListenError!net.Socket.Handle {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = unix_address;
|
|
_ = options;
|
|
@panic("TODO");
|
|
}
|
|
fn netConnectUnix(
|
|
userdata: ?*anyopaque,
|
|
unix_address: *const net.UnixAddress,
|
|
) net.UnixAddress.ConnectError!net.Socket.Handle {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = unix_address;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netSend(
|
|
userdata: ?*anyopaque,
|
|
handle: net.Socket.Handle,
|
|
outgoing_messages: []net.OutgoingMessage,
|
|
flags: net.SendFlags,
|
|
) struct { ?net.Socket.SendError, usize } {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
|
|
const posix_flags: u32 =
|
|
@as(u32, if (@hasDecl(posix.MSG, "CONFIRM") and flags.confirm) posix.MSG.CONFIRM else 0) |
|
|
@as(u32, if (@hasDecl(posix.MSG, "DONTROUTE") and flags.dont_route) posix.MSG.DONTROUTE else 0) |
|
|
@as(u32, if (@hasDecl(posix.MSG, "EOR") and flags.eor) posix.MSG.EOR else 0) |
|
|
@as(u32, if (@hasDecl(posix.MSG, "OOB") and flags.oob) posix.MSG.OOB else 0) |
|
|
@as(u32, if (@hasDecl(posix.MSG, "FASTOPEN") and flags.fastopen) posix.MSG.FASTOPEN else 0) |
|
|
posix.MSG.NOSIGNAL;
|
|
|
|
for (outgoing_messages, 0..) |*msg, i| {
|
|
netSendOne(k, handle, msg, posix_flags) catch |err| return .{ err, i };
|
|
}
|
|
|
|
return .{ null, outgoing_messages.len };
|
|
}
|
|
|
|
fn netSendOne(
|
|
k: *Kqueue,
|
|
handle: net.Socket.Handle,
|
|
message: *net.OutgoingMessage,
|
|
flags: u32,
|
|
) net.Socket.SendError!void {
|
|
var addr: Io.Threaded.PosixAddress = undefined;
|
|
var iovec: posix.iovec_const = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
|
|
const msg: posix.msghdr_const = .{
|
|
.name = &addr.any,
|
|
.namelen = Io.Threaded.addressToPosix(message.address, &addr),
|
|
.iov = (&iovec)[0..1],
|
|
.iovlen = 1,
|
|
// OS returns EINVAL if this pointer is invalid even if controllen is zero.
|
|
.control = if (message.control.len == 0) null else @constCast(message.control.ptr),
|
|
.controllen = @intCast(message.control.len),
|
|
.flags = 0,
|
|
};
|
|
while (true) {
|
|
try k.checkCancel();
|
|
const rc = posix.system.sendmsg(handle, &msg, flags);
|
|
switch (posix.errno(rc)) {
|
|
.SUCCESS => {
|
|
message.data_len = @intCast(rc);
|
|
return;
|
|
},
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
.AGAIN => @panic("TODO register kevent"),
|
|
|
|
.ACCES => return error.AccessDenied,
|
|
.ALREADY => return error.FastOpenAlreadyInProgress,
|
|
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
|
|
.CONNRESET => return error.ConnectionResetByPeer,
|
|
.DESTADDRREQ => |err| return errnoBug(err),
|
|
.FAULT => |err| return errnoBug(err),
|
|
.INVAL => |err| return errnoBug(err),
|
|
.ISCONN => |err| return errnoBug(err),
|
|
.MSGSIZE => return error.MessageOversize,
|
|
.NOBUFS => return error.SystemResources,
|
|
.NOMEM => return error.SystemResources,
|
|
.NOTSOCK => |err| return errnoBug(err),
|
|
.OPNOTSUPP => |err| return errnoBug(err),
|
|
.PIPE => return error.SocketUnconnected,
|
|
.AFNOSUPPORT => return error.AddressFamilyUnsupported,
|
|
.HOSTUNREACH => return error.HostUnreachable,
|
|
.NETUNREACH => return error.NetworkUnreachable,
|
|
.NOTCONN => return error.SocketUnconnected,
|
|
.NETDOWN => return error.NetworkDown,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn netReceive(
|
|
userdata: ?*anyopaque,
|
|
handle: net.Socket.Handle,
|
|
message_buffer: []net.IncomingMessage,
|
|
data_buffer: []u8,
|
|
flags: net.ReceiveFlags,
|
|
timeout: Io.Timeout,
|
|
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = handle;
|
|
_ = message_buffer;
|
|
_ = data_buffer;
|
|
_ = flags;
|
|
_ = timeout;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netRead(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
|
|
var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
|
|
var i: usize = 0;
|
|
for (data) |buf| {
|
|
if (iovecs_buffer.len - i == 0) break;
|
|
if (buf.len != 0) {
|
|
iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len };
|
|
i += 1;
|
|
}
|
|
}
|
|
const dest = iovecs_buffer[0..i];
|
|
assert(dest[0].len > 0);
|
|
|
|
while (true) {
|
|
try k.checkCancel();
|
|
const rc = posix.system.readv(fd, dest.ptr, @intCast(dest.len));
|
|
switch (posix.errno(rc)) {
|
|
.SUCCESS => return @intCast(rc),
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
.AGAIN => {
|
|
const thread: *Thread = .current();
|
|
const fiber = thread.currentFiber();
|
|
const ident: u32 = @bitCast(fd);
|
|
const filter = std.c.EVFILT.READ;
|
|
const gop = thread.wait_queues.getOrPut(k.gpa, .{
|
|
.ident = ident,
|
|
.filter = filter,
|
|
}) catch return error.SystemResources;
|
|
if (gop.found_existing) {
|
|
const tail_fiber = gop.value_ptr.*;
|
|
assert(tail_fiber.queue_next == null);
|
|
tail_fiber.queue_next = fiber;
|
|
gop.value_ptr.* = fiber;
|
|
} else {
|
|
gop.value_ptr.* = fiber;
|
|
const changes = [_]posix.Kevent{
|
|
.{
|
|
.ident = ident,
|
|
.filter = filter,
|
|
.flags = std.c.EV.ADD | std.c.EV.ONESHOT,
|
|
.fflags = 0,
|
|
.data = 0,
|
|
.udata = @intFromPtr(fiber),
|
|
},
|
|
};
|
|
assert(0 == (kevent(thread.kq_fd, &changes, &.{}, null) catch |err| {
|
|
// TODO handle EINTR for cancellation purposes
|
|
@panic(@errorName(err)); // TODO
|
|
}));
|
|
}
|
|
yield(k, null, .nothing);
|
|
continue;
|
|
},
|
|
|
|
.INVAL => |err| return errnoBug(err),
|
|
.FAULT => |err| return errnoBug(err),
|
|
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
|
|
.NOBUFS => return error.SystemResources,
|
|
.NOMEM => return error.SystemResources,
|
|
.NOTCONN => return error.SocketUnconnected,
|
|
.CONNRESET => return error.ConnectionResetByPeer,
|
|
.TIMEDOUT => return error.Timeout,
|
|
.PIPE => return error.SocketUnconnected,
|
|
.NETDOWN => return error.NetworkDown,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn netWrite(userdata: ?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dest;
|
|
_ = header;
|
|
_ = data;
|
|
_ = splat;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netClose(userdata: ?*anyopaque, handles: []const net.Socket.Handle) void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = handles;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netShutdown(userdata: ?*anyopaque, handle: net.Socket.Handle, how: net.ShutdownHow) net.ShutdownError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = handle;
|
|
_ = how;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netInterfaceNameResolve(
|
|
userdata: ?*anyopaque,
|
|
name: *const net.Interface.Name,
|
|
) net.Interface.Name.ResolveError!net.Interface {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = name;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netInterfaceName(userdata: ?*anyopaque, interface: net.Interface) net.Interface.NameError!net.Interface.Name {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = interface;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netLookup(
|
|
userdata: ?*anyopaque,
|
|
host_name: net.HostName,
|
|
resolved: *Io.Queue(net.HostName.LookupResult),
|
|
options: net.HostName.LookupOptions,
|
|
) net.HostName.LookupError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = host_name;
|
|
_ = resolved;
|
|
_ = options;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn openSocketPosix(
|
|
k: *Kqueue,
|
|
family: posix.sa_family_t,
|
|
options: IpAddress.BindOptions,
|
|
) error{
|
|
AddressFamilyUnsupported,
|
|
ProtocolUnsupportedBySystem,
|
|
ProcessFdQuotaExceeded,
|
|
SystemFdQuotaExceeded,
|
|
SystemResources,
|
|
ProtocolUnsupportedByAddressFamily,
|
|
SocketModeUnsupported,
|
|
OptionUnsupported,
|
|
Unexpected,
|
|
Canceled,
|
|
}!posix.socket_t {
|
|
const mode = Io.Threaded.posixSocketMode(options.mode);
|
|
const protocol = Io.Threaded.posixProtocol(options.protocol);
|
|
const socket_fd = while (true) {
|
|
try k.checkCancel();
|
|
const flags: u32 = mode | if (Io.Threaded.socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC;
|
|
const socket_rc = posix.system.socket(family, flags, protocol);
|
|
switch (posix.errno(socket_rc)) {
|
|
.SUCCESS => {
|
|
const fd: posix.fd_t = @intCast(socket_rc);
|
|
errdefer closeFd(fd);
|
|
if (Io.Threaded.socket_flags_unsupported) {
|
|
while (true) {
|
|
try k.checkCancel();
|
|
switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
|
|
.SUCCESS => break,
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
|
|
var fl_flags: usize = while (true) {
|
|
try k.checkCancel();
|
|
const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0));
|
|
switch (posix.errno(rc)) {
|
|
.SUCCESS => break @intCast(rc),
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
};
|
|
fl_flags |= @as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
|
|
while (true) {
|
|
try k.checkCancel();
|
|
switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) {
|
|
.SUCCESS => break,
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
}
|
|
break fd;
|
|
},
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
|
|
.AFNOSUPPORT => return error.AddressFamilyUnsupported,
|
|
.INVAL => return error.ProtocolUnsupportedBySystem,
|
|
.MFILE => return error.ProcessFdQuotaExceeded,
|
|
.NFILE => return error.SystemFdQuotaExceeded,
|
|
.NOBUFS => return error.SystemResources,
|
|
.NOMEM => return error.SystemResources,
|
|
.PROTONOSUPPORT => return error.ProtocolUnsupportedByAddressFamily,
|
|
.PROTOTYPE => return error.SocketModeUnsupported,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
};
|
|
errdefer closeFd(socket_fd);
|
|
|
|
if (options.ip6_only) {
|
|
if (posix.IPV6 == void) return error.OptionUnsupported;
|
|
try setSocketOption(k, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 0);
|
|
}
|
|
|
|
return socket_fd;
|
|
}
|
|
|
|
fn posixBind(
|
|
k: *Kqueue,
|
|
socket_fd: posix.socket_t,
|
|
addr: *const posix.sockaddr,
|
|
addr_len: posix.socklen_t,
|
|
) !void {
|
|
while (true) {
|
|
try k.checkCancel();
|
|
switch (posix.errno(posix.system.bind(socket_fd, addr, addr_len))) {
|
|
.SUCCESS => break,
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
|
|
.ADDRINUSE => return error.AddressInUse,
|
|
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
|
|
.INVAL => |err| return errnoBug(err), // invalid parameters
|
|
.NOTSOCK => |err| return errnoBug(err), // invalid `sockfd`
|
|
.AFNOSUPPORT => return error.AddressFamilyUnsupported,
|
|
.ADDRNOTAVAIL => return error.AddressUnavailable,
|
|
.FAULT => |err| return errnoBug(err), // invalid `addr` pointer
|
|
.NOMEM => return error.SystemResources,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn posixGetSockName(k: *Kqueue, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void {
|
|
while (true) {
|
|
try k.checkCancel();
|
|
switch (posix.errno(posix.system.getsockname(socket_fd, addr, addr_len))) {
|
|
.SUCCESS => break,
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
|
|
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
|
|
.FAULT => |err| return errnoBug(err),
|
|
.INVAL => |err| return errnoBug(err), // invalid parameters
|
|
.NOTSOCK => |err| return errnoBug(err), // always a race condition
|
|
.NOBUFS => return error.SystemResources,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn setSocketOption(k: *Kqueue, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void {
|
|
const o: []const u8 = @ptrCast(&option);
|
|
while (true) {
|
|
try k.checkCancel();
|
|
switch (posix.errno(posix.system.setsockopt(fd, level, opt_name, o.ptr, @intCast(o.len)))) {
|
|
.SUCCESS => return,
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
|
|
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
|
|
.NOTSOCK => |err| return errnoBug(err),
|
|
.INVAL => |err| return errnoBug(err),
|
|
.FAULT => |err| return errnoBug(err),
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn checkCancel(k: *Kqueue) error{Canceled}!void {
|
|
if (cancelRequested(k)) return error.Canceled;
|
|
}
|
|
|
|
pub const KEventError = error{
|
|
/// The process does not have permission to register a filter.
|
|
AccessDenied,
|
|
/// The event could not be found to be modified or deleted.
|
|
EventNotFound,
|
|
/// No memory was available to register the event.
|
|
SystemResources,
|
|
/// The specified process to attach to does not exist.
|
|
ProcessNotFound,
|
|
/// changelist or eventlist had too many items on it.
|
|
/// TODO remove this possibility
|
|
Overflow,
|
|
};
|
|
|
|
pub fn kevent(
|
|
kq: i32,
|
|
changelist: []const posix.Kevent,
|
|
eventlist: []posix.Kevent,
|
|
timeout: ?*const posix.timespec,
|
|
) KEventError!usize {
|
|
while (true) {
|
|
const rc = posix.system.kevent(
|
|
kq,
|
|
changelist.ptr,
|
|
std.math.cast(c_int, changelist.len) orelse return error.Overflow,
|
|
eventlist.ptr,
|
|
std.math.cast(c_int, eventlist.len) orelse return error.Overflow,
|
|
timeout,
|
|
);
|
|
switch (posix.errno(rc)) {
|
|
.SUCCESS => return @intCast(rc),
|
|
.ACCES => return error.AccessDenied,
|
|
.FAULT => unreachable, // TODO use error.Unexpected for these
|
|
.BADF => unreachable, // Always a race condition.
|
|
.INTR => continue, // TODO handle cancelation
|
|
.INVAL => unreachable,
|
|
.NOENT => return error.EventNotFound,
|
|
.NOMEM => return error.SystemResources,
|
|
.SRCH => return error.ProcessNotFound,
|
|
else => unreachable,
|
|
}
|
|
}
|
|
}
|