std.Io.Kqueue: fix bitrot

This commit is contained in:
Andrew Kelley 2026-01-07 18:00:36 -08:00
parent c0092f5394
commit ce89006035
2 changed files with 91 additions and 181 deletions

View file

@ -1358,7 +1358,7 @@ pub const Mutex = extern struct {
pub const init: Mutex = .{ .state = .init(.unlocked) };
const State = enum(u32) {
pub const State = enum(u32) {
unlocked,
locked_once,
contended,

View file

@ -239,7 +239,7 @@ pub const CreateFileDescriptorError = error{
ProcessFdQuotaExceeded,
/// The system-wide limit on the total number of open files has been reached.
SystemFdQuotaExceeded,
} || Io.Unexpected;
} || Io.UnexpectedError;
pub fn createFileDescriptor() CreateFileDescriptorError!posix.fd_t {
const rc = posix.system.kqueue();
@ -494,14 +494,6 @@ const SwitchMessage = struct {
recycle: *Fiber,
register_awaiter: *?*Fiber,
register_select: []const *Io.AnyFuture,
mutex_lock: struct {
prev_state: Io.Mutex.State,
mutex: *Io.Mutex,
},
condition_wait: struct {
cond: *Io.Condition,
mutex: *Io.Mutex,
},
exit,
};
@ -537,59 +529,6 @@ const SwitchMessage = struct {
}
}
},
.mutex_lock => |mutex_lock| {
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
assert(prev_fiber.queue_next == null);
var prev_state = mutex_lock.prev_state;
while (switch (prev_state) {
else => next_state: {
prev_fiber.queue_next = @ptrFromInt(@intFromEnum(prev_state));
break :next_state @cmpxchgWeak(
Io.Mutex.State,
&mutex_lock.mutex.state,
prev_state,
@enumFromInt(@intFromPtr(prev_fiber)),
.release,
.acquire,
);
},
.unlocked => @cmpxchgWeak(
Io.Mutex.State,
&mutex_lock.mutex.state,
.unlocked,
.locked_once,
.acquire,
.acquire,
) orelse {
prev_fiber.queue_next = null;
k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
return;
},
}) |next_state| prev_state = next_state;
},
.condition_wait => |condition_wait| {
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.prev));
assert(prev_fiber.queue_next == null);
const cond_impl = prev_fiber.resultPointer(Condition);
cond_impl.* = .{
.tail = prev_fiber,
.event = .queued,
};
if (@cmpxchgStrong(
?*Fiber,
@as(*?*Fiber, @ptrCast(&condition_wait.cond.state)),
null,
prev_fiber,
.release,
.acquire,
)) |waiting_fiber| {
const waiting_cond_impl = waiting_fiber.?.resultPointer(Condition);
assert(waiting_cond_impl.tail.queue_next == null);
waiting_cond_impl.tail.queue_next = prev_fiber;
waiting_cond_impl.tail = prev_fiber;
}
condition_wait.mutex.unlock(k.io());
},
.exit => for (k.threads.allocated[0..@atomicLoad(u32, &k.threads.active, .acquire)]) |*each_thread| {
const changes = [_]posix.Kevent{
.{
@ -878,21 +817,13 @@ pub fn io(k: *Kqueue) Io {
.concurrent = concurrent,
.await = await,
.cancel = cancel,
.cancelRequested = cancelRequested,
.select = select,
.groupAsync = groupAsync,
.groupWait = groupWait,
.groupConcurrent = groupConcurrent,
.groupAwait = groupAwait,
.groupCancel = groupCancel,
.mutexLock = mutexLock,
.mutexLockUncancelable = mutexLockUncancelable,
.mutexUnlock = mutexUnlock,
.conditionWait = conditionWait,
.conditionWaitUncancelable = conditionWaitUncancelable,
.conditionWake = conditionWake,
.dirCreateDir = dirCreateDir,
.dirCreateDirPath = dirCreateDirPath,
.dirCreateDirPathOpen = dirCreateDirPathOpen,
@ -912,7 +843,6 @@ pub fn io(k: *Kqueue) Io {
.fileReadPositional = fileReadPositional,
.fileSeekBy = fileSeekBy,
.fileSeekTo = fileSeekTo,
.openExecutable = openExecutable,
.now = now,
.sleep = sleep,
@ -1037,25 +967,41 @@ fn cancelRequested(userdata: ?*anyopaque) bool {
fn groupAsync(
userdata: ?*anyopaque,
group: *Io.Group,
type_erased: *Io.Group,
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (*Io.Group, context: *const anyopaque) void,
context_alignment: Alignment,
start: *const fn (context: *const anyopaque) Io.Cancelable!void,
) void {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = group;
_ = type_erased;
_ = context;
_ = context_alignment;
_ = start;
@panic("TODO");
}
fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
fn groupConcurrent(
userdata: ?*anyopaque,
type_erased: *Io.Group,
context: []const u8,
context_alignment: Alignment,
start: *const fn (context: *const anyopaque) Io.Cancelable!void,
) Io.ConcurrentError!void {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = group;
_ = token;
_ = type_erased;
_ = context;
_ = context_alignment;
_ = start;
@panic("TODO");
}
fn groupAwait(userdata: ?*anyopaque, type_erased: *Io.Group, initial_token: *anyopaque) Io.Cancelable!void {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = type_erased;
_ = initial_token;
@panic("TODO");
}
@ -1074,102 +1020,58 @@ fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!u
@panic("TODO");
}
fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = prev_state;
_ = mutex;
@panic("TODO");
}
fn mutexLockUncancelable(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = prev_state;
_ = mutex;
@panic("TODO");
}
fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) void {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = prev_state;
_ = mutex;
@panic("TODO");
}
fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) Io.Cancelable!void {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
k.yield(null, .{ .condition_wait = .{ .cond = cond, .mutex = mutex } });
const thread = Thread.current();
const fiber = thread.currentFiber();
const cond_impl = fiber.resultPointer(Condition);
try mutex.lock(k.io());
switch (cond_impl.event) {
.queued => {},
.wake => |wake| if (fiber.queue_next) |next_fiber| switch (wake) {
.one => if (@cmpxchgStrong(
?*Fiber,
@as(*?*Fiber, @ptrCast(&cond.state)),
null,
next_fiber,
.release,
.acquire,
)) |old_fiber| {
const old_cond_impl = old_fiber.?.resultPointer(Condition);
assert(old_cond_impl.tail.queue_next == null);
old_cond_impl.tail.queue_next = next_fiber;
old_cond_impl.tail = cond_impl.tail;
},
.all => k.schedule(thread, .{ .head = next_fiber, .tail = cond_impl.tail }),
},
}
fiber.queue_next = null;
}
fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) void {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = cond;
_ = mutex;
@panic("TODO");
}
fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.Wake) void {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
const waiting_fiber = @atomicRmw(?*Fiber, @as(*?*Fiber, @ptrCast(&cond.state)), .Xchg, null, .acquire) orelse return;
waiting_fiber.resultPointer(Condition).event = .{ .wake = wake };
k.yield(waiting_fiber, .reschedule);
}
fn dirCreateDir(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, mode: Dir.Mode) Dir.CreateDirError!void {
fn dirCreateDir(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, permissions: Dir.Permissions) Dir.CreateDirError!void {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = dir;
_ = sub_path;
_ = mode;
_ = permissions;
@panic("TODO");
}
fn dirCreateDirPath(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, mode: Dir.Mode) Dir.CreateDirError!void {
fn dirCreateDirPath(
userdata: ?*anyopaque,
dir: Dir,
sub_path: []const u8,
permissions: Dir.Permissions,
) Dir.CreateDirPathError!Dir.CreatePathStatus {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = dir;
_ = sub_path;
_ = mode;
_ = permissions;
@panic("TODO");
}
fn dirCreateDirPathOpen(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, options: Dir.OpenOptions) Dir.CreateDirPathOpenError!Dir {
fn dirCreateDirPathOpen(
userdata: ?*anyopaque,
dir: Dir,
sub_path: []const u8,
permissions: Dir.Permissions,
options: Dir.OpenOptions,
) Dir.CreateDirPathOpenError!Dir {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = dir;
_ = sub_path;
_ = permissions;
_ = options;
@panic("TODO");
}
fn dirStat(userdata: ?*anyopaque, dir: Dir) Dir.StatError!Dir.Stat {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = dir;
@panic("TODO");
}
fn dirStatFile(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, options: Dir.StatPathOptions) Dir.StatFileError!File.Stat {
fn dirStatFile(
userdata: ?*anyopaque,
dir: Dir,
sub_path: []const u8,
options: Dir.StatFileOptions,
) Dir.StatFileError!File.Stat {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = dir;
@ -1209,10 +1111,10 @@ fn dirOpenDir(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, options: Di
_ = options;
@panic("TODO");
}
fn dirClose(userdata: ?*anyopaque, dir: Dir) void {
fn dirClose(userdata: ?*anyopaque, dirs: []const Dir) void {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = dir;
_ = dirs;
@panic("TODO");
}
fn fileStat(userdata: ?*anyopaque, file: File) File.StatError!File.Stat {
@ -1221,35 +1123,57 @@ fn fileStat(userdata: ?*anyopaque, file: File) File.StatError!File.Stat {
_ = file;
@panic("TODO");
}
fn fileClose(userdata: ?*anyopaque, file: File) void {
fn fileClose(userdata: ?*anyopaque, files: []const File) void {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = file;
_ = files;
@panic("TODO");
}
fn fileWriteStreaming(userdata: ?*anyopaque, file: File, buffer: [][]const u8) File.WriteStreamingError!usize {
fn fileWriteStreaming(
userdata: ?*anyopaque,
file: File,
header: []const u8,
data: []const []const u8,
splat: usize,
) File.Writer.Error!usize {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = file;
_ = buffer;
_ = header;
_ = data;
_ = splat;
@panic("TODO");
}
fn fileWritePositional(userdata: ?*anyopaque, file: File, buffer: [][]const u8, offset: u64) File.WritePositionalError!usize {
fn fileWritePositional(
userdata: ?*anyopaque,
file: File,
header: []const u8,
data: []const []const u8,
splat: usize,
offset: u64,
) File.WritePositionalError!usize {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = file;
_ = buffer;
_ = header;
_ = data;
_ = splat;
_ = offset;
@panic("TODO");
}
fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: [][]u8) File.Reader.Error!usize {
fn fileReadStreaming(userdata: ?*anyopaque, file: File, data: []const []u8) File.Reader.Error!usize {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = file;
_ = data;
@panic("TODO");
}
fn fileReadPositional(userdata: ?*anyopaque, file: File, data: [][]u8, offset: u64) File.ReadPositionalError!usize {
fn fileReadPositional(userdata: ?*anyopaque, file: File, data: []const []u8, offset: u64) File.ReadPositionalError!usize {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = file;
@ -1271,12 +1195,6 @@ fn fileSeekTo(userdata: ?*anyopaque, file: File, absolute_offset: u64) File.Seek
_ = absolute_offset;
@panic("TODO");
}
fn openExecutable(userdata: ?*anyopaque, file: File.OpenFlags) File.OpenExecutableError!File {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = file;
@panic("TODO");
}
fn now(userdata: ?*anyopaque, clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
@ -1576,10 +1494,10 @@ fn netWrite(userdata: ?*anyopaque, dest: net.Socket.Handle, header: []const u8,
@panic("TODO");
}
fn netClose(userdata: ?*anyopaque, handle: net.Socket.Handle) void {
fn netClose(userdata: ?*anyopaque, handles: []const net.Socket.Handle) void {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = handle;
_ = handles;
@panic("TODO");
}
@ -1611,13 +1529,13 @@ fn netInterfaceName(userdata: ?*anyopaque, interface: net.Interface) net.Interfa
fn netLookup(
userdata: ?*anyopaque,
host_name: net.HostName,
result: *Io.Queue(net.HostName.LookupResult),
resolved: *Io.Queue(net.HostName.LookupResult),
options: net.HostName.LookupOptions,
) void {
) net.HostName.LookupError!void {
const k: *Kqueue = @ptrCast(@alignCast(userdata));
_ = k;
_ = host_name;
_ = result;
_ = resolved;
_ = options;
@panic("TODO");
}
@ -1772,14 +1690,6 @@ fn checkCancel(k: *Kqueue) error{Canceled}!void {
if (cancelRequested(k)) return error.Canceled;
}
const Condition = struct {
tail: *Fiber,
event: union(enum) {
queued,
wake: Io.Condition.Wake,
},
};
pub const KEventError = error{
/// The process does not have permission to register a filter.
AccessDenied,