mirror of
https://codeberg.org/ziglang/zig.git
synced 2026-03-08 08:44:47 +01:00
This function works with a slice of futures and returns the index of a completed one. This doesn't work very well in practice because it's either too high level or too low level. At the lower level we have Io.Batch for doing this kind of thing at the Operation API layer. At the higher level we have Io.Select which is a convenience wrapper around an Io.Group and an Io.Queue.
1524 lines
52 KiB
Zig
1524 lines
52 KiB
Zig
const Kqueue = @This();
|
|
const builtin = @import("builtin");
|
|
|
|
const std = @import("../std.zig");
|
|
const Io = std.Io;
|
|
const Dir = std.Io.Dir;
|
|
const File = std.Io.File;
|
|
const net = std.Io.net;
|
|
const assert = std.debug.assert;
|
|
const Allocator = std.mem.Allocator;
|
|
const Alignment = std.mem.Alignment;
|
|
const IpAddress = std.Io.net.IpAddress;
|
|
const errnoBug = std.Io.Threaded.errnoBug;
|
|
const closeFd = std.Io.Threaded.closeFd;
|
|
const posix = std.posix;
|
|
|
|
/// Must be a thread-safe allocator.
|
|
gpa: Allocator,
|
|
mutex: Io.Mutex,
|
|
main_fiber_buffer: [@sizeOf(Fiber) + Fiber.max_result_size]u8 align(@alignOf(Fiber)),
|
|
threads: Thread.List,
|
|
|
|
/// Empirically saw >128KB being used by the self-hosted backend to panic.
|
|
const idle_stack_size = 256 * 1024;
|
|
|
|
const max_idle_search = 4;
|
|
const max_steal_ready_search = 4;
|
|
const max_iovecs_len = 8;
|
|
|
|
const changes_buffer_len = 64;
|
|
|
|
const Thread = struct {
|
|
thread: std.Thread,
|
|
idle_context: Io.fiber.Context,
|
|
current_context: *Io.fiber.Context,
|
|
ready_queue: ?*Fiber,
|
|
kq_fd: posix.fd_t,
|
|
idle_search_index: u32,
|
|
steal_ready_search_index: u32,
|
|
/// For ensuring multiple fibers waiting on the same file descriptor and
|
|
/// filter use the same kevent.
|
|
wait_queues: std.AutoArrayHashMapUnmanaged(WaitQueueKey, *Fiber),
|
|
|
|
const WaitQueueKey = struct {
|
|
ident: usize,
|
|
filter: i32,
|
|
};
|
|
|
|
const canceling: ?*Thread = @ptrFromInt(@alignOf(Thread));
|
|
|
|
threadlocal var self: *Thread = undefined;
|
|
|
|
fn current() *Thread {
|
|
return self;
|
|
}
|
|
|
|
fn currentFiber(thread: *Thread) *Fiber {
|
|
return @fieldParentPtr("context", thread.current_context);
|
|
}
|
|
|
|
const List = struct {
|
|
allocated: []Thread,
|
|
reserved: u32,
|
|
active: u32,
|
|
};
|
|
|
|
fn deinit(thread: *Thread, gpa: Allocator) void {
|
|
closeFd(thread.kq_fd);
|
|
assert(thread.wait_queues.count() == 0);
|
|
thread.wait_queues.deinit(gpa);
|
|
thread.* = undefined;
|
|
}
|
|
};
|
|
|
|
const Fiber = struct {
|
|
required_align: void align(4),
|
|
context: Io.fiber.Context,
|
|
awaiter: ?*Fiber,
|
|
queue_next: ?*Fiber,
|
|
cancel_thread: ?*Thread,
|
|
awaiting_completions: std.StaticBitSet(3),
|
|
|
|
const finished: ?*Fiber = @ptrFromInt(@alignOf(Thread));
|
|
|
|
const max_result_align: Alignment = .@"16";
|
|
const max_result_size = max_result_align.forward(64);
|
|
/// This includes any stack realignments that need to happen, and also the
|
|
/// initial frame return address slot and argument frame, depending on target.
|
|
const min_stack_size = 4 * 1024 * 1024;
|
|
const max_context_align: Alignment = .@"16";
|
|
const max_context_size = max_context_align.forward(1024);
|
|
const max_closure_size: usize = @sizeOf(AsyncClosure);
|
|
const max_closure_align: Alignment = .of(AsyncClosure);
|
|
const allocation_size = std.mem.alignForward(
|
|
usize,
|
|
max_closure_align.max(max_context_align).forward(
|
|
max_result_align.forward(@sizeOf(Fiber)) + max_result_size + min_stack_size,
|
|
) + max_closure_size + max_context_size,
|
|
std.heap.page_size_max,
|
|
);
|
|
|
|
fn allocate(k: *Kqueue) error{OutOfMemory}!*Fiber {
|
|
return @ptrCast(try k.gpa.alignedAlloc(u8, .of(Fiber), allocation_size));
|
|
}
|
|
|
|
fn allocatedSlice(f: *Fiber) []align(@alignOf(Fiber)) u8 {
|
|
return @as([*]align(@alignOf(Fiber)) u8, @ptrCast(f))[0..allocation_size];
|
|
}
|
|
|
|
fn allocatedEnd(f: *Fiber) [*]u8 {
|
|
const allocated_slice = f.allocatedSlice();
|
|
return allocated_slice[allocated_slice.len..].ptr;
|
|
}
|
|
|
|
fn resultPointer(f: *Fiber, comptime Result: type) *Result {
|
|
return @ptrCast(@alignCast(f.resultBytes(.of(Result))));
|
|
}
|
|
|
|
fn resultBytes(f: *Fiber, alignment: Alignment) [*]u8 {
|
|
return @ptrFromInt(alignment.forward(@intFromPtr(f) + @sizeOf(Fiber)));
|
|
}
|
|
|
|
fn enterCancelRegion(fiber: *Fiber, thread: *Thread) error{Canceled}!void {
|
|
if (@cmpxchgStrong(
|
|
?*Thread,
|
|
&fiber.cancel_thread,
|
|
null,
|
|
thread,
|
|
.acq_rel,
|
|
.acquire,
|
|
)) |cancel_thread| {
|
|
assert(cancel_thread == Thread.canceling);
|
|
return error.Canceled;
|
|
}
|
|
}
|
|
|
|
fn exitCancelRegion(fiber: *Fiber, thread: *Thread) void {
|
|
if (@cmpxchgStrong(
|
|
?*Thread,
|
|
&fiber.cancel_thread,
|
|
thread,
|
|
null,
|
|
.acq_rel,
|
|
.acquire,
|
|
)) |cancel_thread| assert(cancel_thread == Thread.canceling);
|
|
}
|
|
|
|
const Queue = struct { head: *Fiber, tail: *Fiber };
|
|
};
|
|
|
|
fn recycle(k: *Kqueue, fiber: *Fiber) void {
|
|
std.log.debug("recyling {*}", .{fiber});
|
|
assert(fiber.queue_next == null);
|
|
k.gpa.free(fiber.allocatedSlice());
|
|
}
|
|
|
|
pub const InitOptions = struct {
|
|
n_threads: ?usize = null,
|
|
};
|
|
|
|
pub const InitError = Allocator.Error || CreateFileDescriptorError;
|
|
|
|
pub fn init(k: *Kqueue, gpa: Allocator, options: InitOptions) !void {
|
|
assert(options.n_threads != 0);
|
|
|
|
const n_threads = @max(1, options.n_threads orelse std.Thread.getCpuCount() catch 1);
|
|
const threads_size = n_threads * @sizeOf(Thread);
|
|
const idle_stack_end_offset = std.mem.alignForward(usize, threads_size + idle_stack_size, std.heap.page_size_max);
|
|
const allocated_slice = try gpa.alignedAlloc(u8, .of(Thread), idle_stack_end_offset);
|
|
errdefer gpa.free(allocated_slice);
|
|
k.* = .{
|
|
.gpa = gpa,
|
|
.mutex = .init,
|
|
.main_fiber_buffer = undefined,
|
|
.threads = .{
|
|
.allocated = @ptrCast(allocated_slice[0..threads_size]),
|
|
.reserved = 1,
|
|
.active = 1,
|
|
},
|
|
};
|
|
const main_fiber: *Fiber = @ptrCast(&k.main_fiber_buffer);
|
|
main_fiber.* = .{
|
|
.required_align = {},
|
|
.context = undefined,
|
|
.awaiter = null,
|
|
.queue_next = null,
|
|
.cancel_thread = null,
|
|
.awaiting_completions = .initEmpty(),
|
|
};
|
|
const main_thread = &k.threads.allocated[0];
|
|
Thread.self = main_thread;
|
|
const idle_stack_end: [*]align(16) usize = @ptrCast(@alignCast(allocated_slice[idle_stack_end_offset..].ptr));
|
|
(idle_stack_end - 1)[0..1].* = .{@intFromPtr(k)};
|
|
main_thread.* = .{
|
|
.thread = undefined,
|
|
.idle_context = switch (builtin.cpu.arch) {
|
|
.aarch64 => .{
|
|
.sp = @intFromPtr(idle_stack_end),
|
|
.fp = 0,
|
|
.pc = @intFromPtr(&mainIdleEntry),
|
|
},
|
|
.x86_64 => .{
|
|
.rsp = @intFromPtr(idle_stack_end - 1),
|
|
.rbp = 0,
|
|
.rip = @intFromPtr(&mainIdleEntry),
|
|
},
|
|
else => @compileError("unimplemented architecture"),
|
|
},
|
|
.current_context = &main_fiber.context,
|
|
.ready_queue = null,
|
|
.kq_fd = try createFileDescriptor(),
|
|
.idle_search_index = 1,
|
|
.steal_ready_search_index = 1,
|
|
.wait_queues = .empty,
|
|
};
|
|
errdefer closeFd(main_thread.kq_fd);
|
|
std.log.debug("created main idle {*}", .{&main_thread.idle_context});
|
|
std.log.debug("created main {*}", .{main_fiber});
|
|
}
|
|
|
|
pub fn deinit(k: *Kqueue) void {
|
|
const active_threads = @atomicLoad(u32, &k.threads.active, .acquire);
|
|
for (k.threads.allocated[0..active_threads]) |*thread| {
|
|
const ready_fiber = @atomicLoad(?*Fiber, &thread.ready_queue, .monotonic);
|
|
assert(ready_fiber == null or ready_fiber == Fiber.finished); // pending async
|
|
}
|
|
k.yield(null, .exit);
|
|
const main_thread = &k.threads.allocated[0];
|
|
const gpa = k.gpa;
|
|
main_thread.deinit(gpa);
|
|
const allocated_ptr: [*]align(@alignOf(Thread)) u8 = @ptrCast(@alignCast(k.threads.allocated.ptr));
|
|
const idle_stack_end_offset = std.mem.alignForward(usize, k.threads.allocated.len * @sizeOf(Thread) + idle_stack_size, std.heap.page_size_max);
|
|
for (k.threads.allocated[1..active_threads]) |*thread| thread.thread.join();
|
|
gpa.free(allocated_ptr[0..idle_stack_end_offset]);
|
|
k.* = undefined;
|
|
}
|
|
|
|
pub const CreateFileDescriptorError = error{
|
|
/// The per-process limit on the number of open file descriptors has been reached.
|
|
ProcessFdQuotaExceeded,
|
|
/// The system-wide limit on the total number of open files has been reached.
|
|
SystemFdQuotaExceeded,
|
|
} || Io.UnexpectedError;
|
|
|
|
pub fn createFileDescriptor() CreateFileDescriptorError!posix.fd_t {
|
|
const rc = posix.system.kqueue();
|
|
switch (posix.errno(rc)) {
|
|
.SUCCESS => return @intCast(rc),
|
|
.MFILE => return error.ProcessFdQuotaExceeded,
|
|
.NFILE => return error.SystemFdQuotaExceeded,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
|
|
fn findReadyFiber(k: *Kqueue, thread: *Thread) ?*Fiber {
|
|
if (@atomicRmw(?*Fiber, &thread.ready_queue, .Xchg, Fiber.finished, .acquire)) |ready_fiber| {
|
|
@atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release);
|
|
ready_fiber.queue_next = null;
|
|
return ready_fiber;
|
|
}
|
|
const active_threads = @atomicLoad(u32, &k.threads.active, .acquire);
|
|
for (0..@min(max_steal_ready_search, active_threads)) |_| {
|
|
defer thread.steal_ready_search_index += 1;
|
|
if (thread.steal_ready_search_index == active_threads) thread.steal_ready_search_index = 0;
|
|
const steal_ready_search_thread = &k.threads.allocated[0..active_threads][thread.steal_ready_search_index];
|
|
if (steal_ready_search_thread == thread) continue;
|
|
const ready_fiber = @atomicLoad(?*Fiber, &steal_ready_search_thread.ready_queue, .acquire) orelse continue;
|
|
if (ready_fiber == Fiber.finished) continue;
|
|
if (@cmpxchgWeak(
|
|
?*Fiber,
|
|
&steal_ready_search_thread.ready_queue,
|
|
ready_fiber,
|
|
null,
|
|
.acquire,
|
|
.monotonic,
|
|
)) |_| continue;
|
|
@atomicStore(?*Fiber, &thread.ready_queue, ready_fiber.queue_next, .release);
|
|
ready_fiber.queue_next = null;
|
|
return ready_fiber;
|
|
}
|
|
// couldn't find anything to do, so we are now open for business
|
|
@atomicStore(?*Fiber, &thread.ready_queue, null, .monotonic);
|
|
return null;
|
|
}
|
|
|
|
fn yield(k: *Kqueue, maybe_ready_fiber: ?*Fiber, pending_task: SwitchMessage.PendingTask) void {
|
|
const thread: *Thread = .current();
|
|
const ready_context = if (maybe_ready_fiber orelse k.findReadyFiber(thread)) |ready_fiber|
|
|
&ready_fiber.context
|
|
else
|
|
&thread.idle_context;
|
|
const message: SwitchMessage = .{
|
|
.contexts = .{
|
|
.old = thread.current_context,
|
|
.new = ready_context,
|
|
},
|
|
.pending_task = pending_task,
|
|
};
|
|
std.log.debug("switching from {*} to {*}", .{ message.contexts.old, message.contexts.new });
|
|
contextSwitch(&message).handle(k);
|
|
}
|
|
|
|
fn schedule(k: *Kqueue, thread: *Thread, ready_queue: Fiber.Queue) void {
|
|
{
|
|
var fiber = ready_queue.head;
|
|
while (true) {
|
|
std.log.debug("scheduling {*}", .{fiber});
|
|
fiber = fiber.queue_next orelse break;
|
|
}
|
|
assert(fiber == ready_queue.tail);
|
|
}
|
|
// shared fields of previous `Thread` must be initialized before later ones are marked as active
|
|
const new_thread_index = @atomicLoad(u32, &k.threads.active, .acquire);
|
|
for (0..@min(max_idle_search, new_thread_index)) |_| {
|
|
defer thread.idle_search_index += 1;
|
|
if (thread.idle_search_index == new_thread_index) thread.idle_search_index = 0;
|
|
const idle_search_thread = &k.threads.allocated[0..new_thread_index][thread.idle_search_index];
|
|
if (idle_search_thread == thread) continue;
|
|
if (@cmpxchgWeak(
|
|
?*Fiber,
|
|
&idle_search_thread.ready_queue,
|
|
null,
|
|
ready_queue.head,
|
|
.release,
|
|
.monotonic,
|
|
)) |_| continue;
|
|
const changes = [_]posix.Kevent{
|
|
.{
|
|
.ident = 0,
|
|
.filter = std.c.EVFILT.USER,
|
|
.flags = std.c.EV.ADD | std.c.EV.ONESHOT,
|
|
.fflags = std.c.NOTE.TRIGGER,
|
|
.data = 0,
|
|
.udata = @intFromEnum(Completion.UserData.wakeup),
|
|
},
|
|
};
|
|
// If an error occurs it only pessimises scheduling.
|
|
_ = kevent(idle_search_thread.kq_fd, &changes, &.{}, null) catch |err| {
|
|
// TODO handle EINTR for cancellation purposes
|
|
@panic(@errorName(err)); // TODO
|
|
};
|
|
return;
|
|
}
|
|
spawn_thread: {
|
|
// previous failed reservations must have completed before retrying
|
|
if (new_thread_index == k.threads.allocated.len or @cmpxchgWeak(
|
|
u32,
|
|
&k.threads.reserved,
|
|
new_thread_index,
|
|
new_thread_index + 1,
|
|
.acquire,
|
|
.monotonic,
|
|
) != null) break :spawn_thread;
|
|
const new_thread = &k.threads.allocated[new_thread_index];
|
|
const next_thread_index = new_thread_index + 1;
|
|
new_thread.* = .{
|
|
.thread = undefined,
|
|
.idle_context = undefined,
|
|
.current_context = &new_thread.idle_context,
|
|
.ready_queue = ready_queue.head,
|
|
.kq_fd = createFileDescriptor() catch |err| {
|
|
@atomicStore(u32, &k.threads.reserved, new_thread_index, .release);
|
|
// no more access to `thread` after giving up reservation
|
|
std.log.warn("unable to create worker thread due to kqueue init failure: {t}", .{err});
|
|
break :spawn_thread;
|
|
},
|
|
.idle_search_index = 0,
|
|
.steal_ready_search_index = 0,
|
|
.wait_queues = .empty,
|
|
};
|
|
new_thread.thread = std.Thread.spawn(.{
|
|
.stack_size = idle_stack_size,
|
|
.allocator = k.gpa,
|
|
}, threadEntry, .{ k, new_thread_index }) catch |err| {
|
|
closeFd(new_thread.kq_fd);
|
|
@atomicStore(u32, &k.threads.reserved, new_thread_index, .release);
|
|
// no more access to `thread` after giving up reservation
|
|
std.log.warn("unable to create worker thread due spawn failure: {s}", .{@errorName(err)});
|
|
break :spawn_thread;
|
|
};
|
|
// shared fields of `Thread` must be initialized before being marked active
|
|
@atomicStore(u32, &k.threads.active, next_thread_index, .release);
|
|
return;
|
|
}
|
|
// nobody wanted it, so just queue it on ourselves
|
|
while (@cmpxchgWeak(
|
|
?*Fiber,
|
|
&thread.ready_queue,
|
|
ready_queue.tail.queue_next,
|
|
ready_queue.head,
|
|
.acq_rel,
|
|
.acquire,
|
|
)) |old_head| ready_queue.tail.queue_next = old_head;
|
|
}
|
|
|
|
fn mainIdle(k: *Kqueue, message: *const SwitchMessage) callconv(.withStackAlign(.c, @max(@alignOf(Thread), @alignOf(Io.fiber.Context)))) noreturn {
|
|
message.handle(k);
|
|
k.idle(&k.threads.allocated[0]);
|
|
k.yield(@ptrCast(&k.main_fiber_buffer), .nothing);
|
|
unreachable; // switched to dead fiber
|
|
}
|
|
|
|
fn threadEntry(k: *Kqueue, index: u32) void {
|
|
const thread: *Thread = &k.threads.allocated[index];
|
|
Thread.self = thread;
|
|
std.log.debug("created thread idle {*}", .{&thread.idle_context});
|
|
k.idle(thread);
|
|
thread.deinit(k.gpa);
|
|
}
|
|
|
|
const Completion = struct {
|
|
const UserData = enum(usize) {
|
|
unused,
|
|
wakeup,
|
|
cleanup,
|
|
exit,
|
|
/// *Fiber
|
|
_,
|
|
};
|
|
/// Corresponds to Kevent field.
|
|
flags: u16,
|
|
/// Corresponds to Kevent field.
|
|
fflags: u32,
|
|
/// Corresponds to Kevent field.
|
|
data: isize,
|
|
};
|
|
|
|
fn idle(k: *Kqueue, thread: *Thread) void {
|
|
var events_buffer: [changes_buffer_len]posix.Kevent = undefined;
|
|
var maybe_ready_fiber: ?*Fiber = null;
|
|
while (true) {
|
|
while (maybe_ready_fiber orelse k.findReadyFiber(thread)) |ready_fiber| {
|
|
k.yield(ready_fiber, .nothing);
|
|
maybe_ready_fiber = null;
|
|
}
|
|
const n = kevent(thread.kq_fd, &.{}, &events_buffer, null) catch |err| {
|
|
// TODO handle EINTR for cancellation purposes
|
|
@panic(@errorName(err)); // TODO
|
|
};
|
|
var maybe_ready_queue: ?Fiber.Queue = null;
|
|
for (events_buffer[0..n]) |event| switch (@as(Completion.UserData, @enumFromInt(event.udata))) {
|
|
.unused => unreachable, // bad submission queued?
|
|
.wakeup => {},
|
|
.cleanup => @panic("failed to notify other threads that we are exiting"),
|
|
.exit => {
|
|
assert(maybe_ready_fiber == null and maybe_ready_queue == null); // pending async
|
|
return;
|
|
},
|
|
_ => {
|
|
const event_head_fiber: *Fiber = @ptrFromInt(event.udata);
|
|
const event_tail_fiber = thread.wait_queues.fetchSwapRemove(.{
|
|
.ident = event.ident,
|
|
.filter = event.filter,
|
|
}).?.value;
|
|
assert(event_tail_fiber.queue_next == null);
|
|
|
|
// TODO reevaluate this logic
|
|
event_head_fiber.resultPointer(Completion).* = .{
|
|
.flags = event.flags,
|
|
.fflags = event.fflags,
|
|
.data = event.data,
|
|
};
|
|
|
|
queue_ready: {
|
|
const head: *Fiber = if (maybe_ready_fiber == null) f: {
|
|
maybe_ready_fiber = event_head_fiber;
|
|
const next = event_head_fiber.queue_next orelse break :queue_ready;
|
|
event_head_fiber.queue_next = null;
|
|
break :f next;
|
|
} else event_head_fiber;
|
|
|
|
if (maybe_ready_queue) |*ready_queue| {
|
|
ready_queue.tail.queue_next = head;
|
|
ready_queue.tail = event_tail_fiber;
|
|
} else {
|
|
maybe_ready_queue = .{ .head = head, .tail = event_tail_fiber };
|
|
}
|
|
}
|
|
},
|
|
};
|
|
if (maybe_ready_queue) |ready_queue| k.schedule(thread, ready_queue);
|
|
}
|
|
}
|
|
|
|
const SwitchMessage = struct {
|
|
contexts: Io.fiber.Switch,
|
|
pending_task: PendingTask,
|
|
|
|
const PendingTask = union(enum) {
|
|
nothing,
|
|
reschedule,
|
|
recycle: *Fiber,
|
|
register_awaiter: *?*Fiber,
|
|
exit,
|
|
};
|
|
|
|
fn handle(message: *const SwitchMessage, k: *Kqueue) void {
|
|
const thread: *Thread = .current();
|
|
thread.current_context = message.contexts.new;
|
|
switch (message.pending_task) {
|
|
.nothing => {},
|
|
.reschedule => if (message.contexts.old != &thread.idle_context) {
|
|
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old));
|
|
assert(prev_fiber.queue_next == null);
|
|
k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
|
|
},
|
|
.recycle => |fiber| {
|
|
k.recycle(fiber);
|
|
},
|
|
.register_awaiter => |awaiter| {
|
|
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old));
|
|
assert(prev_fiber.queue_next == null);
|
|
if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished)
|
|
k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
|
|
},
|
|
.exit => for (k.threads.allocated[0..@atomicLoad(u32, &k.threads.active, .acquire)]) |*each_thread| {
|
|
const changes = [_]posix.Kevent{
|
|
.{
|
|
.ident = 0,
|
|
.filter = std.c.EVFILT.USER,
|
|
.flags = std.c.EV.ADD | std.c.EV.ONESHOT,
|
|
.fflags = std.c.NOTE.TRIGGER,
|
|
.data = 0,
|
|
.udata = @intFromEnum(Completion.UserData.exit),
|
|
},
|
|
};
|
|
_ = kevent(each_thread.kq_fd, &changes, &.{}, null) catch |err| {
|
|
// TODO handle EINTR for cancellation purposes
|
|
@panic(@errorName(err)); // TODO
|
|
};
|
|
},
|
|
}
|
|
}
|
|
};
|
|
|
|
inline fn contextSwitch(message: *const SwitchMessage) *const SwitchMessage {
|
|
return @fieldParentPtr("contexts", Io.fiber.contextSwitch(&message.contexts));
|
|
}
|
|
|
|
fn mainIdleEntry() callconv(.naked) void {
|
|
switch (builtin.cpu.arch) {
|
|
.x86_64 => asm volatile (
|
|
\\ movq (%%rsp), %%rdi
|
|
\\ jmp %[mainIdle:P]
|
|
:
|
|
: [mainIdle] "X" (&mainIdle),
|
|
),
|
|
.aarch64 => asm volatile (
|
|
\\ ldr x0, [sp, #-8]
|
|
\\ b %[mainIdle]
|
|
:
|
|
: [mainIdle] "X" (&mainIdle),
|
|
),
|
|
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
|
|
}
|
|
}
|
|
|
|
fn fiberEntry() callconv(.naked) void {
|
|
switch (builtin.cpu.arch) {
|
|
.x86_64 => asm volatile (
|
|
\\ leaq 8(%%rsp), %%rdi
|
|
\\ jmp %[AsyncClosure_call:P]
|
|
:
|
|
: [AsyncClosure_call] "X" (&AsyncClosure.call),
|
|
),
|
|
.aarch64 => asm volatile (
|
|
\\ mov x0, sp
|
|
\\ b %[AsyncClosure_call]
|
|
:
|
|
: [AsyncClosure_call] "X" (&AsyncClosure.call),
|
|
),
|
|
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
|
|
}
|
|
}
|
|
|
|
const AsyncClosure = struct {
|
|
kqueue: *Kqueue,
|
|
fiber: *Fiber,
|
|
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
|
|
result_align: Alignment,
|
|
already_awaited: bool,
|
|
|
|
fn contextPointer(closure: *AsyncClosure) [*]align(Fiber.max_context_align.toByteUnits()) u8 {
|
|
return @alignCast(@as([*]u8, @ptrCast(closure)) + @sizeOf(AsyncClosure));
|
|
}
|
|
|
|
fn call(closure: *AsyncClosure, message: *const SwitchMessage) callconv(.withStackAlign(.c, @alignOf(AsyncClosure))) noreturn {
|
|
message.handle(closure.kqueue);
|
|
const fiber = closure.fiber;
|
|
std.log.debug("{*} performing async", .{fiber});
|
|
closure.start(closure.contextPointer(), fiber.resultBytes(closure.result_align));
|
|
const awaiter = @atomicRmw(?*Fiber, &fiber.awaiter, .Xchg, Fiber.finished, .acq_rel);
|
|
const ready_awaiter = r: {
|
|
const a = awaiter orelse break :r null;
|
|
if (@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .acq_rel)) break :r null;
|
|
break :r a;
|
|
};
|
|
closure.kqueue.yield(ready_awaiter, .nothing);
|
|
unreachable; // switched to dead fiber
|
|
}
|
|
|
|
fn fromFiber(fiber: *Fiber) *AsyncClosure {
|
|
return @ptrFromInt(Fiber.max_context_align.max(.of(AsyncClosure)).backward(
|
|
@intFromPtr(fiber.allocatedEnd()) - Fiber.max_context_size,
|
|
) - @sizeOf(AsyncClosure));
|
|
}
|
|
};
|
|
|
|
pub fn io(k: *Kqueue) Io {
|
|
return .{
|
|
.userdata = k,
|
|
.vtable = &.{
|
|
.async = async,
|
|
.concurrent = concurrent,
|
|
.await = await,
|
|
.cancel = cancel,
|
|
|
|
.groupAsync = groupAsync,
|
|
.groupConcurrent = groupConcurrent,
|
|
.groupAwait = groupAwait,
|
|
.groupCancel = groupCancel,
|
|
|
|
.dirCreateDir = dirCreateDir,
|
|
.dirCreateDirPath = dirCreateDirPath,
|
|
.dirCreateDirPathOpen = dirCreateDirPathOpen,
|
|
.dirStat = dirStat,
|
|
.dirStatFile = dirStatFile,
|
|
|
|
.fileStat = fileStat,
|
|
.dirAccess = dirAccess,
|
|
.dirCreateFile = dirCreateFile,
|
|
.dirOpenFile = dirOpenFile,
|
|
.dirOpenDir = dirOpenDir,
|
|
.dirClose = dirClose,
|
|
.fileClose = fileClose,
|
|
.fileWriteStreaming = fileWriteStreaming,
|
|
.fileWritePositional = fileWritePositional,
|
|
.fileReadStreaming = fileReadStreaming,
|
|
.fileReadPositional = fileReadPositional,
|
|
.fileSeekBy = fileSeekBy,
|
|
.fileSeekTo = fileSeekTo,
|
|
|
|
.now = now,
|
|
.sleep = sleep,
|
|
|
|
.netListenIp = netListenIp,
|
|
.netListenUnix = netListenUnix,
|
|
.netAccept = netAccept,
|
|
.netBindIp = netBindIp,
|
|
.netConnectIp = netConnectIp,
|
|
.netConnectUnix = netConnectUnix,
|
|
.netClose = netClose,
|
|
.netShutdown = netShutdown,
|
|
.netRead = netRead,
|
|
.netWrite = netWrite,
|
|
.netSend = netSend,
|
|
.netReceive = netReceive,
|
|
.netInterfaceNameResolve = netInterfaceNameResolve,
|
|
.netInterfaceName = netInterfaceName,
|
|
.netLookup = netLookup,
|
|
},
|
|
};
|
|
}
|
|
|
|
fn async(
|
|
userdata: ?*anyopaque,
|
|
result: []u8,
|
|
result_alignment: std.mem.Alignment,
|
|
context: []const u8,
|
|
context_alignment: std.mem.Alignment,
|
|
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
|
|
) ?*Io.AnyFuture {
|
|
return concurrent(userdata, result.len, result_alignment, context, context_alignment, start) catch {
|
|
start(context.ptr, result.ptr);
|
|
return null;
|
|
};
|
|
}
|
|
|
|
fn concurrent(
|
|
userdata: ?*anyopaque,
|
|
result_len: usize,
|
|
result_alignment: Alignment,
|
|
context: []const u8,
|
|
context_alignment: Alignment,
|
|
start: *const fn (context: *const anyopaque, result: *anyopaque) void,
|
|
) Io.ConcurrentError!*Io.AnyFuture {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
assert(result_alignment.compare(.lte, Fiber.max_result_align)); // TODO
|
|
assert(context_alignment.compare(.lte, Fiber.max_context_align)); // TODO
|
|
assert(result_len <= Fiber.max_result_size); // TODO
|
|
assert(context.len <= Fiber.max_context_size); // TODO
|
|
|
|
const fiber = Fiber.allocate(k) catch return error.ConcurrencyUnavailable;
|
|
std.log.debug("allocated {*}", .{fiber});
|
|
|
|
const closure: *AsyncClosure = .fromFiber(fiber);
|
|
fiber.* = .{
|
|
.required_align = {},
|
|
.context = switch (builtin.cpu.arch) {
|
|
.x86_64 => .{
|
|
.rsp = @intFromPtr(closure) - @sizeOf(usize),
|
|
.rbp = 0,
|
|
.rip = @intFromPtr(&fiberEntry),
|
|
},
|
|
.aarch64 => .{
|
|
.sp = @intFromPtr(closure),
|
|
.fp = 0,
|
|
.pc = @intFromPtr(&fiberEntry),
|
|
},
|
|
else => |arch| @compileError("unimplemented architecture: " ++ @tagName(arch)),
|
|
},
|
|
.awaiter = null,
|
|
.queue_next = null,
|
|
.cancel_thread = null,
|
|
.awaiting_completions = .initEmpty(),
|
|
};
|
|
closure.* = .{
|
|
.kqueue = k,
|
|
.fiber = fiber,
|
|
.start = start,
|
|
.result_align = result_alignment,
|
|
.already_awaited = false,
|
|
};
|
|
@memcpy(closure.contextPointer(), context);
|
|
|
|
k.schedule(.current(), .{ .head = fiber, .tail = fiber });
|
|
return @ptrCast(fiber);
|
|
}
|
|
|
|
fn await(
|
|
userdata: ?*anyopaque,
|
|
any_future: *Io.AnyFuture,
|
|
result: []u8,
|
|
result_alignment: std.mem.Alignment,
|
|
) void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
|
|
if (@atomicLoad(?*Fiber, &future_fiber.awaiter, .acquire) != Fiber.finished)
|
|
k.yield(null, .{ .register_awaiter = &future_fiber.awaiter });
|
|
@memcpy(result, future_fiber.resultBytes(result_alignment));
|
|
k.recycle(future_fiber);
|
|
}
|
|
|
|
fn cancel(
|
|
userdata: ?*anyopaque,
|
|
any_future: *Io.AnyFuture,
|
|
result: []u8,
|
|
result_alignment: std.mem.Alignment,
|
|
) void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = any_future;
|
|
_ = result;
|
|
_ = result_alignment;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn cancelRequested(userdata: ?*anyopaque) bool {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
return false; // TODO
|
|
}
|
|
|
|
fn groupAsync(
|
|
userdata: ?*anyopaque,
|
|
type_erased: *Io.Group,
|
|
context: []const u8,
|
|
context_alignment: Alignment,
|
|
start: *const fn (context: *const anyopaque) Io.Cancelable!void,
|
|
) void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = type_erased;
|
|
_ = context;
|
|
_ = context_alignment;
|
|
_ = start;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn groupConcurrent(
|
|
userdata: ?*anyopaque,
|
|
type_erased: *Io.Group,
|
|
context: []const u8,
|
|
context_alignment: Alignment,
|
|
start: *const fn (context: *const anyopaque) Io.Cancelable!void,
|
|
) Io.ConcurrentError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = type_erased;
|
|
_ = context;
|
|
_ = context_alignment;
|
|
_ = start;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn groupAwait(userdata: ?*anyopaque, type_erased: *Io.Group, initial_token: *anyopaque) Io.Cancelable!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = type_erased;
|
|
_ = initial_token;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = group;
|
|
_ = token;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn dirCreateDir(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, permissions: Dir.Permissions) Dir.CreateDirError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = permissions;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn dirCreateDirPath(
|
|
userdata: ?*anyopaque,
|
|
dir: Dir,
|
|
sub_path: []const u8,
|
|
permissions: Dir.Permissions,
|
|
) Dir.CreateDirPathError!Dir.CreatePathStatus {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = permissions;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn dirCreateDirPathOpen(
|
|
userdata: ?*anyopaque,
|
|
dir: Dir,
|
|
sub_path: []const u8,
|
|
permissions: Dir.Permissions,
|
|
options: Dir.OpenOptions,
|
|
) Dir.CreateDirPathOpenError!Dir {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = permissions;
|
|
_ = options;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn dirStat(userdata: ?*anyopaque, dir: Dir) Dir.StatError!Dir.Stat {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn dirStatFile(
|
|
userdata: ?*anyopaque,
|
|
dir: Dir,
|
|
sub_path: []const u8,
|
|
options: Dir.StatFileOptions,
|
|
) Dir.StatFileError!File.Stat {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = options;
|
|
@panic("TODO");
|
|
}
|
|
fn dirAccess(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, options: Dir.AccessOptions) Dir.AccessError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = options;
|
|
@panic("TODO");
|
|
}
|
|
fn dirCreateFile(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, flags: File.CreateFlags) File.OpenError!File {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = flags;
|
|
@panic("TODO");
|
|
}
|
|
fn dirOpenFile(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, flags: File.OpenFlags) File.OpenError!File {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = flags;
|
|
@panic("TODO");
|
|
}
|
|
fn dirOpenDir(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, options: Dir.OpenOptions) Dir.OpenError!Dir {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dir;
|
|
_ = sub_path;
|
|
_ = options;
|
|
@panic("TODO");
|
|
}
|
|
fn dirClose(userdata: ?*anyopaque, dirs: []const Dir) void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dirs;
|
|
@panic("TODO");
|
|
}
|
|
fn fileStat(userdata: ?*anyopaque, file: File) File.StatError!File.Stat {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = file;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn fileClose(userdata: ?*anyopaque, files: []const File) void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = files;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn fileWriteStreaming(
|
|
userdata: ?*anyopaque,
|
|
file: File,
|
|
header: []const u8,
|
|
data: []const []const u8,
|
|
splat: usize,
|
|
) File.Writer.Error!usize {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = file;
|
|
_ = header;
|
|
_ = data;
|
|
_ = splat;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn fileWritePositional(
|
|
userdata: ?*anyopaque,
|
|
file: File,
|
|
header: []const u8,
|
|
data: []const []const u8,
|
|
splat: usize,
|
|
offset: u64,
|
|
) File.WritePositionalError!usize {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = file;
|
|
_ = header;
|
|
_ = data;
|
|
_ = splat;
|
|
_ = offset;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: []const []u8) File.Reader.Error!usize {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = file;
|
|
_ = data;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn fileReadPositional(userdata: ?*anyopaque, file: File, data: []const []u8, offset: u64) File.ReadPositionalError!usize {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = file;
|
|
_ = data;
|
|
_ = offset;
|
|
@panic("TODO");
|
|
}
|
|
fn fileSeekBy(userdata: ?*anyopaque, file: File, relative_offset: i64) File.SeekError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = file;
|
|
_ = relative_offset;
|
|
@panic("TODO");
|
|
}
|
|
fn fileSeekTo(userdata: ?*anyopaque, file: File, absolute_offset: u64) File.SeekError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = file;
|
|
_ = absolute_offset;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn now(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = clock;
|
|
@panic("TODO");
|
|
}
|
|
fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = timeout;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netListenIp(
|
|
userdata: ?*anyopaque,
|
|
address: net.IpAddress,
|
|
options: net.IpAddress.ListenOptions,
|
|
) net.IpAddress.ListenError!net.Server {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = address;
|
|
_ = options;
|
|
@panic("TODO");
|
|
}
|
|
fn netAccept(userdata: ?*anyopaque, server: net.Socket.Handle) net.Server.AcceptError!net.Stream {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = server;
|
|
@panic("TODO");
|
|
}
|
|
fn netBindIp(
|
|
userdata: ?*anyopaque,
|
|
address: *const net.IpAddress,
|
|
options: net.IpAddress.BindOptions,
|
|
) net.IpAddress.BindError!net.Socket {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
const family = Io.Threaded.posixAddressFamily(address);
|
|
const socket_fd = try openSocketPosix(k, family, options);
|
|
errdefer closeFd(socket_fd);
|
|
var storage: Io.Threaded.PosixAddress = undefined;
|
|
var addr_len = Io.Threaded.addressToPosix(address, &storage);
|
|
try posixBind(k, socket_fd, &storage.any, addr_len);
|
|
try posixGetSockName(k, socket_fd, &storage.any, &addr_len);
|
|
return .{
|
|
.handle = socket_fd,
|
|
.address = Io.Threaded.addressFromPosix(&storage),
|
|
};
|
|
}
|
|
fn netConnectIp(userdata: ?*anyopaque, address: *const net.IpAddress, options: net.IpAddress.ConnectOptions) net.IpAddress.ConnectError!net.Stream {
|
|
if (options.timeout != .none) @panic("TODO");
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
const family = Io.Threaded.posixAddressFamily(address);
|
|
const socket_fd = try openSocketPosix(k, family, .{
|
|
.mode = options.mode,
|
|
.protocol = options.protocol,
|
|
});
|
|
errdefer closeFd(socket_fd);
|
|
var storage: Io.Threaded.PosixAddress = undefined;
|
|
var addr_len = Io.Threaded.addressToPosix(address, &storage);
|
|
try posixConnect(k, socket_fd, &storage.any, addr_len);
|
|
try posixGetSockName(k, socket_fd, &storage.any, &addr_len);
|
|
return .{ .socket = .{
|
|
.handle = socket_fd,
|
|
.address = Io.Threaded.addressFromPosix(&storage),
|
|
} };
|
|
}
|
|
|
|
fn posixConnect(k: *Kqueue, socket_fd: posix.socket_t, addr: *const posix.sockaddr, addr_len: posix.socklen_t) !void {
|
|
while (true) {
|
|
try k.checkCancel();
|
|
switch (posix.errno(posix.system.connect(socket_fd, addr, addr_len))) {
|
|
.SUCCESS => return,
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
.AGAIN => @panic("TODO"),
|
|
.INPROGRESS => return, // Due to TCP fast open, we find out possible error later.
|
|
|
|
.ADDRNOTAVAIL => return error.AddressUnavailable,
|
|
.AFNOSUPPORT => return error.AddressFamilyUnsupported,
|
|
.ALREADY => return error.ConnectionPending,
|
|
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
|
|
.CONNREFUSED => return error.ConnectionRefused,
|
|
.CONNRESET => return error.ConnectionResetByPeer,
|
|
.FAULT => |err| return errnoBug(err),
|
|
.ISCONN => |err| return errnoBug(err),
|
|
.HOSTUNREACH => return error.HostUnreachable,
|
|
.NETUNREACH => return error.NetworkUnreachable,
|
|
.NOTSOCK => |err| return errnoBug(err),
|
|
.PROTOTYPE => |err| return errnoBug(err),
|
|
.TIMEDOUT => return error.Timeout,
|
|
.CONNABORTED => |err| return errnoBug(err),
|
|
.ACCES => return error.AccessDenied,
|
|
.PERM => |err| return errnoBug(err),
|
|
.NOENT => |err| return errnoBug(err),
|
|
.NETDOWN => return error.NetworkDown,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn netListenUnix(
|
|
userdata: ?*anyopaque,
|
|
unix_address: *const net.UnixAddress,
|
|
options: net.UnixAddress.ListenOptions,
|
|
) net.UnixAddress.ListenError!net.Socket.Handle {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = unix_address;
|
|
_ = options;
|
|
@panic("TODO");
|
|
}
|
|
fn netConnectUnix(
|
|
userdata: ?*anyopaque,
|
|
unix_address: *const net.UnixAddress,
|
|
) net.UnixAddress.ConnectError!net.Socket.Handle {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = unix_address;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netSend(
|
|
userdata: ?*anyopaque,
|
|
handle: net.Socket.Handle,
|
|
outgoing_messages: []net.OutgoingMessage,
|
|
flags: net.SendFlags,
|
|
) struct { ?net.Socket.SendError, usize } {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
|
|
const posix_flags: u32 =
|
|
@as(u32, if (@hasDecl(posix.MSG, "CONFIRM") and flags.confirm) posix.MSG.CONFIRM else 0) |
|
|
@as(u32, if (@hasDecl(posix.MSG, "DONTROUTE") and flags.dont_route) posix.MSG.DONTROUTE else 0) |
|
|
@as(u32, if (@hasDecl(posix.MSG, "EOR") and flags.eor) posix.MSG.EOR else 0) |
|
|
@as(u32, if (@hasDecl(posix.MSG, "OOB") and flags.oob) posix.MSG.OOB else 0) |
|
|
@as(u32, if (@hasDecl(posix.MSG, "FASTOPEN") and flags.fastopen) posix.MSG.FASTOPEN else 0) |
|
|
posix.MSG.NOSIGNAL;
|
|
|
|
for (outgoing_messages, 0..) |*msg, i| {
|
|
netSendOne(k, handle, msg, posix_flags) catch |err| return .{ err, i };
|
|
}
|
|
|
|
return .{ null, outgoing_messages.len };
|
|
}
|
|
|
|
fn netSendOne(
|
|
k: *Kqueue,
|
|
handle: net.Socket.Handle,
|
|
message: *net.OutgoingMessage,
|
|
flags: u32,
|
|
) net.Socket.SendError!void {
|
|
var addr: Io.Threaded.PosixAddress = undefined;
|
|
var iovec: posix.iovec_const = .{ .base = @constCast(message.data_ptr), .len = message.data_len };
|
|
const msg: posix.msghdr_const = .{
|
|
.name = &addr.any,
|
|
.namelen = Io.Threaded.addressToPosix(message.address, &addr),
|
|
.iov = (&iovec)[0..1],
|
|
.iovlen = 1,
|
|
// OS returns EINVAL if this pointer is invalid even if controllen is zero.
|
|
.control = if (message.control.len == 0) null else @constCast(message.control.ptr),
|
|
.controllen = @intCast(message.control.len),
|
|
.flags = 0,
|
|
};
|
|
while (true) {
|
|
try k.checkCancel();
|
|
const rc = posix.system.sendmsg(handle, &msg, flags);
|
|
switch (posix.errno(rc)) {
|
|
.SUCCESS => {
|
|
message.data_len = @intCast(rc);
|
|
return;
|
|
},
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
.AGAIN => @panic("TODO register kevent"),
|
|
|
|
.ACCES => return error.AccessDenied,
|
|
.ALREADY => return error.FastOpenAlreadyInProgress,
|
|
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
|
|
.CONNRESET => return error.ConnectionResetByPeer,
|
|
.DESTADDRREQ => |err| return errnoBug(err),
|
|
.FAULT => |err| return errnoBug(err),
|
|
.INVAL => |err| return errnoBug(err),
|
|
.ISCONN => |err| return errnoBug(err),
|
|
.MSGSIZE => return error.MessageOversize,
|
|
.NOBUFS => return error.SystemResources,
|
|
.NOMEM => return error.SystemResources,
|
|
.NOTSOCK => |err| return errnoBug(err),
|
|
.OPNOTSUPP => |err| return errnoBug(err),
|
|
.PIPE => return error.SocketUnconnected,
|
|
.AFNOSUPPORT => return error.AddressFamilyUnsupported,
|
|
.HOSTUNREACH => return error.HostUnreachable,
|
|
.NETUNREACH => return error.NetworkUnreachable,
|
|
.NOTCONN => return error.SocketUnconnected,
|
|
.NETDOWN => return error.NetworkDown,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn netReceive(
|
|
userdata: ?*anyopaque,
|
|
handle: net.Socket.Handle,
|
|
message_buffer: []net.IncomingMessage,
|
|
data_buffer: []u8,
|
|
flags: net.ReceiveFlags,
|
|
timeout: Io.Timeout,
|
|
) struct { ?net.Socket.ReceiveTimeoutError, usize } {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = handle;
|
|
_ = message_buffer;
|
|
_ = data_buffer;
|
|
_ = flags;
|
|
_ = timeout;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netRead(userdata: ?*anyopaque, fd: net.Socket.Handle, data: [][]u8) net.Stream.Reader.Error!usize {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
|
|
var iovecs_buffer: [max_iovecs_len]posix.iovec = undefined;
|
|
var i: usize = 0;
|
|
for (data) |buf| {
|
|
if (iovecs_buffer.len - i == 0) break;
|
|
if (buf.len != 0) {
|
|
iovecs_buffer[i] = .{ .base = buf.ptr, .len = buf.len };
|
|
i += 1;
|
|
}
|
|
}
|
|
const dest = iovecs_buffer[0..i];
|
|
assert(dest[0].len > 0);
|
|
|
|
while (true) {
|
|
try k.checkCancel();
|
|
const rc = posix.system.readv(fd, dest.ptr, @intCast(dest.len));
|
|
switch (posix.errno(rc)) {
|
|
.SUCCESS => return @intCast(rc),
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
.AGAIN => {
|
|
const thread: *Thread = .current();
|
|
const fiber = thread.currentFiber();
|
|
const ident: u32 = @bitCast(fd);
|
|
const filter = std.c.EVFILT.READ;
|
|
const gop = thread.wait_queues.getOrPut(k.gpa, .{
|
|
.ident = ident,
|
|
.filter = filter,
|
|
}) catch return error.SystemResources;
|
|
if (gop.found_existing) {
|
|
const tail_fiber = gop.value_ptr.*;
|
|
assert(tail_fiber.queue_next == null);
|
|
tail_fiber.queue_next = fiber;
|
|
gop.value_ptr.* = fiber;
|
|
} else {
|
|
gop.value_ptr.* = fiber;
|
|
const changes = [_]posix.Kevent{
|
|
.{
|
|
.ident = ident,
|
|
.filter = filter,
|
|
.flags = std.c.EV.ADD | std.c.EV.ONESHOT,
|
|
.fflags = 0,
|
|
.data = 0,
|
|
.udata = @intFromPtr(fiber),
|
|
},
|
|
};
|
|
assert(0 == (kevent(thread.kq_fd, &changes, &.{}, null) catch |err| {
|
|
// TODO handle EINTR for cancellation purposes
|
|
@panic(@errorName(err)); // TODO
|
|
}));
|
|
}
|
|
yield(k, null, .nothing);
|
|
continue;
|
|
},
|
|
|
|
.INVAL => |err| return errnoBug(err),
|
|
.FAULT => |err| return errnoBug(err),
|
|
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
|
|
.NOBUFS => return error.SystemResources,
|
|
.NOMEM => return error.SystemResources,
|
|
.NOTCONN => return error.SocketUnconnected,
|
|
.CONNRESET => return error.ConnectionResetByPeer,
|
|
.TIMEDOUT => return error.Timeout,
|
|
.PIPE => return error.SocketUnconnected,
|
|
.NETDOWN => return error.NetworkDown,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn netWrite(userdata: ?*anyopaque, dest: net.Socket.Handle, header: []const u8, data: []const []const u8, splat: usize) net.Stream.Writer.Error!usize {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = dest;
|
|
_ = header;
|
|
_ = data;
|
|
_ = splat;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netClose(userdata: ?*anyopaque, handles: []const net.Socket.Handle) void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = handles;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netShutdown(userdata: ?*anyopaque, handle: net.Socket.Handle, how: net.ShutdownHow) net.ShutdownError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = handle;
|
|
_ = how;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netInterfaceNameResolve(
|
|
userdata: ?*anyopaque,
|
|
name: *const net.Interface.Name,
|
|
) net.Interface.Name.ResolveError!net.Interface {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = name;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netInterfaceName(userdata: ?*anyopaque, interface: net.Interface) net.Interface.NameError!net.Interface.Name {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = interface;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn netLookup(
|
|
userdata: ?*anyopaque,
|
|
host_name: net.HostName,
|
|
resolved: *Io.Queue(net.HostName.LookupResult),
|
|
options: net.HostName.LookupOptions,
|
|
) net.HostName.LookupError!void {
|
|
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
|
_ = k;
|
|
_ = host_name;
|
|
_ = resolved;
|
|
_ = options;
|
|
@panic("TODO");
|
|
}
|
|
|
|
fn openSocketPosix(
|
|
k: *Kqueue,
|
|
family: posix.sa_family_t,
|
|
options: IpAddress.BindOptions,
|
|
) error{
|
|
AddressFamilyUnsupported,
|
|
ProtocolUnsupportedBySystem,
|
|
ProcessFdQuotaExceeded,
|
|
SystemFdQuotaExceeded,
|
|
SystemResources,
|
|
ProtocolUnsupportedByAddressFamily,
|
|
SocketModeUnsupported,
|
|
OptionUnsupported,
|
|
Unexpected,
|
|
Canceled,
|
|
}!posix.socket_t {
|
|
const mode = Io.Threaded.posixSocketMode(options.mode);
|
|
const protocol = Io.Threaded.posixProtocol(options.protocol);
|
|
const socket_fd = while (true) {
|
|
try k.checkCancel();
|
|
const flags: u32 = mode | if (Io.Threaded.socket_flags_unsupported) 0 else posix.SOCK.CLOEXEC;
|
|
const socket_rc = posix.system.socket(family, flags, protocol);
|
|
switch (posix.errno(socket_rc)) {
|
|
.SUCCESS => {
|
|
const fd: posix.fd_t = @intCast(socket_rc);
|
|
errdefer closeFd(fd);
|
|
if (Io.Threaded.socket_flags_unsupported) {
|
|
while (true) {
|
|
try k.checkCancel();
|
|
switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFD, @as(usize, posix.FD_CLOEXEC)))) {
|
|
.SUCCESS => break,
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
|
|
var fl_flags: usize = while (true) {
|
|
try k.checkCancel();
|
|
const rc = posix.system.fcntl(fd, posix.F.GETFL, @as(usize, 0));
|
|
switch (posix.errno(rc)) {
|
|
.SUCCESS => break @intCast(rc),
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
};
|
|
fl_flags |= @as(usize, 1 << @bitOffsetOf(posix.O, "NONBLOCK"));
|
|
while (true) {
|
|
try k.checkCancel();
|
|
switch (posix.errno(posix.system.fcntl(fd, posix.F.SETFL, fl_flags))) {
|
|
.SUCCESS => break,
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
}
|
|
break fd;
|
|
},
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
|
|
.AFNOSUPPORT => return error.AddressFamilyUnsupported,
|
|
.INVAL => return error.ProtocolUnsupportedBySystem,
|
|
.MFILE => return error.ProcessFdQuotaExceeded,
|
|
.NFILE => return error.SystemFdQuotaExceeded,
|
|
.NOBUFS => return error.SystemResources,
|
|
.NOMEM => return error.SystemResources,
|
|
.PROTONOSUPPORT => return error.ProtocolUnsupportedByAddressFamily,
|
|
.PROTOTYPE => return error.SocketModeUnsupported,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
};
|
|
errdefer closeFd(socket_fd);
|
|
|
|
if (options.ip6_only) {
|
|
if (posix.IPV6 == void) return error.OptionUnsupported;
|
|
try setSocketOption(k, socket_fd, posix.IPPROTO.IPV6, posix.IPV6.V6ONLY, 0);
|
|
}
|
|
|
|
return socket_fd;
|
|
}
|
|
|
|
fn posixBind(
|
|
k: *Kqueue,
|
|
socket_fd: posix.socket_t,
|
|
addr: *const posix.sockaddr,
|
|
addr_len: posix.socklen_t,
|
|
) !void {
|
|
while (true) {
|
|
try k.checkCancel();
|
|
switch (posix.errno(posix.system.bind(socket_fd, addr, addr_len))) {
|
|
.SUCCESS => break,
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
|
|
.ADDRINUSE => return error.AddressInUse,
|
|
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
|
|
.INVAL => |err| return errnoBug(err), // invalid parameters
|
|
.NOTSOCK => |err| return errnoBug(err), // invalid `sockfd`
|
|
.AFNOSUPPORT => return error.AddressFamilyUnsupported,
|
|
.ADDRNOTAVAIL => return error.AddressUnavailable,
|
|
.FAULT => |err| return errnoBug(err), // invalid `addr` pointer
|
|
.NOMEM => return error.SystemResources,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn posixGetSockName(k: *Kqueue, socket_fd: posix.fd_t, addr: *posix.sockaddr, addr_len: *posix.socklen_t) !void {
|
|
while (true) {
|
|
try k.checkCancel();
|
|
switch (posix.errno(posix.system.getsockname(socket_fd, addr, addr_len))) {
|
|
.SUCCESS => break,
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
|
|
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
|
|
.FAULT => |err| return errnoBug(err),
|
|
.INVAL => |err| return errnoBug(err), // invalid parameters
|
|
.NOTSOCK => |err| return errnoBug(err), // always a race condition
|
|
.NOBUFS => return error.SystemResources,
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn setSocketOption(k: *Kqueue, fd: posix.fd_t, level: i32, opt_name: u32, option: u32) !void {
|
|
const o: []const u8 = @ptrCast(&option);
|
|
while (true) {
|
|
try k.checkCancel();
|
|
switch (posix.errno(posix.system.setsockopt(fd, level, opt_name, o.ptr, @intCast(o.len)))) {
|
|
.SUCCESS => return,
|
|
.INTR => continue,
|
|
.CANCELED => return error.Canceled,
|
|
|
|
.BADF => |err| return errnoBug(err), // File descriptor used after closed.
|
|
.NOTSOCK => |err| return errnoBug(err),
|
|
.INVAL => |err| return errnoBug(err),
|
|
.FAULT => |err| return errnoBug(err),
|
|
else => |err| return posix.unexpectedErrno(err),
|
|
}
|
|
}
|
|
}
|
|
|
|
fn checkCancel(k: *Kqueue) error{Canceled}!void {
|
|
if (cancelRequested(k)) return error.Canceled;
|
|
}
|
|
|
|
pub const KEventError = error{
|
|
/// The process does not have permission to register a filter.
|
|
AccessDenied,
|
|
/// The event could not be found to be modified or deleted.
|
|
EventNotFound,
|
|
/// No memory was available to register the event.
|
|
SystemResources,
|
|
/// The specified process to attach to does not exist.
|
|
ProcessNotFound,
|
|
/// changelist or eventlist had too many items on it.
|
|
/// TODO remove this possibility
|
|
Overflow,
|
|
};
|
|
|
|
pub fn kevent(
|
|
kq: i32,
|
|
changelist: []const posix.Kevent,
|
|
eventlist: []posix.Kevent,
|
|
timeout: ?*const posix.timespec,
|
|
) KEventError!usize {
|
|
while (true) {
|
|
const rc = posix.system.kevent(
|
|
kq,
|
|
changelist.ptr,
|
|
std.math.cast(c_int, changelist.len) orelse return error.Overflow,
|
|
eventlist.ptr,
|
|
std.math.cast(c_int, eventlist.len) orelse return error.Overflow,
|
|
timeout,
|
|
);
|
|
switch (posix.errno(rc)) {
|
|
.SUCCESS => return @intCast(rc),
|
|
.ACCES => return error.AccessDenied,
|
|
.FAULT => unreachable, // TODO use error.Unexpected for these
|
|
.BADF => unreachable, // Always a race condition.
|
|
.INTR => continue, // TODO handle cancelation
|
|
.INVAL => unreachable,
|
|
.NOENT => return error.EventNotFound,
|
|
.NOMEM => return error.SystemResources,
|
|
.SRCH => return error.ProcessNotFound,
|
|
else => unreachable,
|
|
}
|
|
}
|
|
}
|