From 0da5d5f15063455eaab9a350352914a062cf18e2 Mon Sep 17 00:00:00 2001 From: Matthew Lugg Date: Tue, 23 Dec 2025 20:54:50 +0000 Subject: [PATCH] std.Io.Threaded: rework cancelation and groups The goal of this internal refactor is to fix some bugs in cancelation and allow group tasks to clean up their own resources eagerly. The latter will become a guarantee of the `std.Io` interface, which is important so that groups can be used to "detach" tasks. This commit changes the API which POSIX system calls use internally (the functions formerly called `beginSyscall` etc), but does not update the usage sites yet. --- lib/std/Io/Threaded.zig | 1452 +++++++++++++++++++++++++-------------- 1 file changed, 930 insertions(+), 522 deletions(-) diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index c9e6ba3fc7..f95738e35d 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -37,7 +37,7 @@ cpu_count_error: ?std.Thread.CpuCountError, /// available count, subtract this from either `async_limit` or /// `concurrent_limit`. busy_count: usize = 0, -main_thread: Thread, +worker_threads: std.atomic.Value(?*Thread), pid: Pid = .unknown, robust_cancel: RobustCancel, @@ -153,107 +153,465 @@ pub const UseFchmodat2 = if (have_fchmodat2 and !have_fchmodat_flags) enum { pub const default: UseFchmodat2 = .disabled; }; +const Runnable = struct { + node: std.SinglyLinkedList.Node, + startFn: *const fn (*Runnable, *Thread, *Threaded) void, +}; + +const Group = struct { + ptr: *Io.Group, + + /// Returns a correctly-typed pointer to the `Io.Group.token` field. + /// + /// The status indicates how many pending tasks are in the group, whether the group has been + /// canceled, and whether the group has been awaited. + /// + /// Note that the zero value of `Status` intentionally represents the initial group state (empty + /// with no awaiters). This is a requirement of `Io.Group`. + fn status(g: Group) *std.atomic.Value(Status) { + return @ptrCast(&g.ptr.token); + } + /// Returns a correctly-typed pointer to the `Io.Group.state` field. The double-pointer here is + /// intentional, because the `state` field itself stores a pointer, and this function returns a + /// pointer to that field. + /// + /// On completion of the whole group, if `status` indicates that there is an awaiter, the last + /// task must increment this `u32` and do a futex wake on it to signal that awaiter. + fn awaiter(g: Group) **std.atomic.Value(u32) { + return @ptrCast(&g.ptr.state); + } + + const Status = packed struct(usize) { + num_running: @Int(.unsigned, @bitSizeOf(usize) - 2), + have_awaiter: bool, + canceled: bool, + }; + + const Task = struct { + runnable: Runnable, + group: *Io.Group, + func: *const fn (*Io.Group, context: *const anyopaque) void, + context_alignment: Alignment, + alloc_len: usize, + + /// `Task.runnable.node` is `undefined` in the created `Task`. + fn create( + gpa: Allocator, + group: Group, + context: []const u8, + context_alignment: Alignment, + func: *const fn (*Io.Group, context: *const anyopaque) void, + ) Allocator.Error!*Task { + const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(Task); + const worst_case_context_offset = context_alignment.forward(@sizeOf(Task) + max_context_misalignment); + const alloc_len = worst_case_context_offset + context.len; + + const task: *Task = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(Task), alloc_len))); + errdefer comptime unreachable; + + task.* = .{ + .runnable = .{ + .node = undefined, + .startFn = &start, + }, + .group = group.ptr, + .func = func, + .context_alignment = context_alignment, + .alloc_len = alloc_len, + }; + @memcpy(task.contextPointer()[0..context.len], context); + return task; + } + + fn destroy(task: *Task, gpa: Allocator) void { + const base: [*]align(@alignOf(Task)) u8 = @ptrCast(task); + gpa.free(base[0..task.alloc_len]); + } + + fn contextPointer(task: *Task) [*]u8 { + const base: [*]u8 = @ptrCast(task); + const offset = task.context_alignment.forward(@intFromPtr(base) + @sizeOf(Task)) - @intFromPtr(base); + return base + offset; + } + + fn start(r: *Runnable, thread: *Thread, t: *Threaded) void { + const task: *Task = @fieldParentPtr("runnable", r); + const group: Group = .{ .ptr = task.group }; + + // This would be a simple store, but it's upgraded to an RMW so we can use `.acquire` to + // enforce the ordering between this and the `group.status().load` below. Paired with + // the `.release` rmw on `Thread.status` in `cancelThreads`, this creates a StoreLoad + // barrier which guarantees that when a group is canceled, either we see the cancelation + // in the group status, or the canceler sees our thread status so can directly notify us + // of the cancelation. + _ = thread.status.swap(.{ + .cancelation = .none, + .awaitable = .fromGroup(group.ptr), + }, .acquire); + if (group.status().load(.monotonic).canceled) { + thread.status.store(.{ + .cancelation = .canceling, + .awaitable = .fromGroup(group.ptr), + }, .monotonic); + } + + assertGroupResult(task.func(group.ptr, task.contextPointer())); + + thread.status.store(.{ .cancelation = .none, .awaitable = .null }, .monotonic); + const old_status = group.status().fetchSub(.{ + .num_running = 1, + .have_awaiter = false, + .canceled = false, + }, .acq_rel); // acquire `group.awaiter()`, release task results + assert(old_status.num_running > 0); + if (old_status.have_awaiter and old_status.num_running == 1) { + const to_signal = group.awaiter().*; + // `awaiter` should only be modified by us. For another thread to see `num_running` + // drop to 0 after this point would indicate that another task started up, meaning + // `async`/`cancel` was racing with awaited group completion. + group.awaiter().* = undefined; + _ = to_signal.fetchAdd(1, .release); // release results + Thread.futexWake(&to_signal.raw, 1); + } + + // Task completed. Self-destruct sequence initiated. + task.destroy(t.allocator); + } + }; + + /// Assumes the caller has already atomically updated the group status to indicate cancelation, + /// and notifies any already-running threads of this cancelation. + fn cancelThreads(g: Group, t: *Threaded) bool { + var any_blocked = false; + var it = t.worker_threads.load(.acquire); // acquire `Thread` values + while (it) |thread| : (it = thread.next) { + // This non-mutating RMW exists for ordering reasons: see comment in `Group.Task.start` for reasons. + _ = thread.status.fetchOr(.{ .cancelation = @enumFromInt(0), .awaitable = .null }, .release); + if (thread.cancelAwaitable(.fromGroup(g.ptr))) any_blocked = true; + } + return any_blocked; + } + + /// Uses `Thread.signalCanceledSyscall` to signal any threads which are still blocked in a + /// syscall for this group and have not observed a cancelation request yet. Returns `true` if + /// more signals may be necessary, in which case the caller must call this again after a delay. + fn signalAllCanceledSyscalls(g: Group, t: *Threaded) bool { + var any_signaled = false; + var it = t.worker_threads.load(.acquire); // acquire `Thread` values + while (it) |thread| : (it = thread.next) { + if (thread.signalCanceledSyscall(t, .fromGroup(g.ptr))) any_signaled = true; + } + return any_signaled; + } + + /// The caller has canceled `g`. Inform any threads working on that group of the cancelation if + /// necessary, and wait for `g` to finish (indicated by `num_completed` being incremented from 0 + /// to 1), while sending regular signals to threads if necessary for them to unblock from any + /// cancelable syscalls. + /// + /// `skip_signals` means it is already known that no threads are currently working on the group + /// so no notifications or signals are necessary. + fn waitForCancelWithSignaling( + g: Group, + t: *Threaded, + num_completed: *std.atomic.Value(u32), + skip_signals: bool, + ) void { + var need_signal: bool = !skip_signals and g.cancelThreads(t); + var timeout_ns: u64 = 1 << 10; + while (true) { + need_signal = need_signal and g.signalAllCanceledSyscalls(t) and t.robust_cancel == .enabled; + Thread.futexWaitTimed( + null, + &num_completed.raw, + 0, + if (need_signal) timeout_ns else null, + ) catch |err| switch (err) { + error.Canceled => unreachable, + }; + switch (num_completed.load(.acquire)) { // acquire task results + 0 => {}, + 1 => break, + else => unreachable, + } + timeout_ns <<|= 1; + } + } +}; + +/// Trailing data: +/// 1. context +/// 2. result +const Future = struct { + runnable: Runnable, + func: *const fn (context: *const anyopaque, result: *anyopaque) void, + status: std.atomic.Value(Status), + /// On completion, increment this `u32` and do a futex wake on it. + awaiter: *std.atomic.Value(u32), + context_alignment: Alignment, + result_offset: usize, + alloc_len: usize, + + const Status = packed struct(usize) { + /// The values of this enum are chosen so that await/cancel can just OR with 0b01 and 0b11 + /// respectively. That *does* clobber `.done`, but that's actually fine, because if the tag + /// is `.done` then only the awaiter is referencing this `Future` anyway. + tag: enum(u2) { + /// The future is queued or running (depending on whether `thread` is set). + pending = 0b00, + /// Like `pending`, but the future is being awaited. `Future.awaiter` is populated. + pending_awaited = 0b01, + /// Like `pending`, but the future is being canceled. `Future.awaiter` is populated. + pending_canceled = 0b11, + /// The future has already completed. `thread` is `null`. + done = 0b10, + }, + /// When the future begins execution, this is atomically updated from `null` to the thread running the + /// `Future`, so that cancelation knows which thread to cancel. + thread: Thread.PackedPtr, + }; + + /// `Future.runnable.node` is `undefined` in the created `Future`. + fn create( + gpa: Allocator, + result_len: usize, + result_alignment: Alignment, + context: []const u8, + context_alignment: Alignment, + func: *const fn (context: *const anyopaque, result: *anyopaque) void, + ) Allocator.Error!*Future { + const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(Future); + const worst_case_context_offset = context_alignment.forward(@sizeOf(Future) + max_context_misalignment); + const worst_case_result_offset = result_alignment.forward(worst_case_context_offset + context.len); + const alloc_len = worst_case_result_offset + result_len; + + const future: *Future = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(Future), alloc_len))); + errdefer comptime unreachable; + + const actual_context_addr = context_alignment.forward(@intFromPtr(future) + @sizeOf(Future)); + const actual_result_addr = result_alignment.forward(actual_context_addr + context.len); + const actual_result_offset = actual_result_addr - @intFromPtr(future); + future.* = .{ + .runnable = .{ + .node = undefined, + .startFn = &start, + }, + .func = func, + .status = .init(.{ + .tag = .pending, + .thread = .null, + }), + .awaiter = undefined, + .context_alignment = context_alignment, + .result_offset = actual_result_offset, + .alloc_len = alloc_len, + }; + @memcpy(future.contextPointer()[0..context.len], context); + return future; + } + + fn destroy(future: *Future, gpa: Allocator) void { + const base: [*]align(@alignOf(Future)) u8 = @ptrCast(future); + gpa.free(base[0..future.alloc_len]); + } + + fn resultPointer(future: *Future) [*]u8 { + const base: [*]u8 = @ptrCast(future); + return base + future.result_offset; + } + + fn contextPointer(future: *Future) [*]u8 { + const base: [*]u8 = @ptrCast(future); + const context_offset = future.context_alignment.forward(@intFromPtr(future) + @sizeOf(Future)) - @intFromPtr(future); + return base + context_offset; + } + + fn start(r: *Runnable, thread: *Thread, t: *Threaded) void { + _ = t; + const future: *Future = @fieldParentPtr("runnable", r); + + thread.status.store(.{ + .cancelation = .none, + .awaitable = .fromFuture(future), + }, .monotonic); + { + const old_status = future.status.fetchOr(.{ + .tag = .pending, + .thread = .pack(thread), + }, .release); + assert(old_status.thread == .null); + switch (old_status.tag) { + .pending, .pending_awaited => {}, + .pending_canceled => thread.status.store(.{ + .cancelation = .canceling, + .awaitable = .fromFuture(future), + }, .monotonic), + .done => unreachable, + } + } + + future.func(future.contextPointer(), future.resultPointer()); + + thread.status.store(.{ .cancelation = .none, .awaitable = .null }, .monotonic); + const old_status = future.status.swap(.{ + .tag = .done, + .thread = .null, + }, .acq_rel); // acquire `future.awaiter`, release results + switch (old_status.tag) { + .pending => {}, + .pending_awaited, .pending_canceled => { + const to_signal = future.awaiter; + _ = to_signal.fetchAdd(1, .release); // release results + Thread.futexWake(&to_signal.raw, 1); + }, + .done => unreachable, + } + } + + /// The caller has canceled `future`. `thread` is the thread currently running that future. + /// Inform `thread` of the cancelation if necessary, and wait for `future` to finish (indicated + /// by `num_completed` being incremented from 0 to 1), while sending regular signals to `thread` + /// if necessary for it to unblock from a cancelable syscall. + fn waitForCancelWithSignaling( + future: *Future, + t: *Threaded, + num_completed: *std.atomic.Value(u32), + thread: ?*Thread, + ) void { + var need_signal: bool = thread != null and thread.?.cancelAwaitable(.fromFuture(future)); + var timeout_ns: u64 = 1 << 10; + while (true) { + need_signal = need_signal and thread.?.signalCanceledSyscall(t, .fromFuture(future)) and t.robust_cancel == .enabled; + Thread.futexWaitTimed( + null, + &num_completed.raw, + 0, + if (need_signal) timeout_ns else null, + ) catch |err| switch (err) { + error.Canceled => unreachable, + }; + switch (num_completed.load(.acquire)) { // acquire task results + 0 => {}, + 1 => break, + else => unreachable, + } + timeout_ns <<|= 1; + } + } +}; + +/// A sequence of (ptr_bit_width - 3) bits which uniquely identifies a group or future. The bits are +/// the MSBs of the `*Io.Group` or `*Future`. These things do not necessarily have 3 zero bits at +/// the end (they are pointer-aligned, so on 32-bit targets only have 2), but because they both have +/// a *size* of at least 8 bytes, no two groups/futures in memory at the same time will have the +/// same value for all of these bits. In other words, given a group/future pointer, the next group +/// or future must be at least 8 bytes later, so its address will have a different value for one of +/// the top (ptr_bit_width - 3) bits. +const AwaitableId = enum(@Int(.unsigned, @bitSizeOf(usize) - 3)) { + comptime { + assert(@sizeOf(Future) >= 8); + assert(@sizeOf(Io.Group) >= 8); + } + null = 0, + all_ones = std.math.maxInt(@Int(.unsigned, @bitSizeOf(usize) - 3)), + _, + const Split = packed struct(usize) { low: u3, high: AwaitableId }; + fn fromGroup(g: *Io.Group) AwaitableId { + const split: Split = @bitCast(@intFromPtr(g)); + return split.high; + } + fn fromFuture(f: *Future) AwaitableId { + const split: Split = @bitCast(@intFromPtr(f)); + return split.high; + } +}; + const Thread = struct { + next: ?*Thread, /// The value that needs to be passed to pthread_kill or tgkill in order to /// send a signal. - signal_id: SignaleeId, - current_closure: ?*Closure, - /// Only populated if `current_closure != null`. Indicates the current cancel protection mode. + signalee_id: SignaleeId, + + status: std.atomic.Value(Status), + cancel_protection: Io.CancelProtection, + const Status = packed struct(usize) { + /// The specific values of these enum fields are chosen to simplify the implementation of + /// the transformations we need to apply to this state. + cancelation: enum(u3) { + /// The thread has not yet been canceled, and is not in a cancelable operation. + /// To request cancelation, just set the status to `.canceling`. + none = 0b000, + + /// The thread is parked in a cancelable futex wait or sleep. + /// Only applicable on Windows, NetBSD, and Illumos. + /// To request cancelation, set the status to `.canceling` and unpark the thread. + /// To unpark for another reason (futex wake), set the status to `.none` and unpark the thread. + parked = 0b001, + + /// The thread is blocked in a cancelable system call. + /// To request cancelation, set the status to `.blocked_canceling` and repeatedly interrupt the system call until the status changes. + blocked = 0b011, + + /// Windows-only: the thread is blocked on a DNS query. + /// To request cancelation, set the status to `.canceling` and call `DnsCancelQuery`. + blocked_windows_dns = 0b010, + + /// The thread has an outstanding cancelation request but is not in a cancelable operation. + /// When it acknowledges the cancelation, it will set the status to `.canceled`. + canceling = 0b110, + + /// The thread has received and acknowledged a cancelation request. + /// If `recancel` is called, the status will revert to `.canceling`, but otherwise, the status + /// will not change for the remainder of this task's execution. + canceled = 0b111, + + /// The thread is blocked in a cancelable system call, and is being canceled. The thread which triggered the cancelation will send signals to this thread + /// until its status changes. + blocked_canceling = 0b101, + }, + + /// We cannot turn this value back into a pointer. Instead, it exists so that a task can be + /// canceled by a cmpxchg on thread status: if it is running the task we want to cancel, + /// then update the `cancelation` field. + awaitable: AwaitableId, + }; + const SignaleeId = if (std.Thread.use_pthreads) std.c.pthread_t else std.Thread.Id; threadlocal var current: ?*Thread = null; - fn getCurrent(t: *Threaded) *Thread { - return current orelse return &t.main_thread; - } - - fn checkCancel(thread: *Thread) error{Canceled}!void { - const closure = thread.current_closure orelse return; - + /// The thread is neither in a syscall nor entering one, but we want to check for cancelation + /// anyway. If there is a pending cancel request, acknowledge it and return `error.Canceled`. + fn checkCancel() Io.Cancelable!void { + const thread = Thread.current orelse return; switch (thread.cancel_protection) { - .unblocked => {}, .blocked => return, - } - - switch (@cmpxchgStrong( - CancelStatus, - &closure.cancel_status, - .requested, - .acknowledged, - .acq_rel, - .acquire, - ) orelse return error.Canceled) { - .requested => unreachable, - .acknowledged => unreachable, - .none, _ => {}, - } - } - - fn beginSyscall(thread: *Thread) error{Canceled}!void { - const closure = thread.current_closure orelse return; - - switch (thread.cancel_protection) { .unblocked => {}, - .blocked => return, } - - switch (@cmpxchgStrong( - CancelStatus, - &closure.cancel_status, - .none, - .fromSignaleeId(thread.signal_id), - .acq_rel, - .acquire, - ) orelse return) { - .none => unreachable, - .requested => { - @atomicStore(CancelStatus, &closure.cancel_status, .acknowledged, .release); + // Here, unlike `Syscall.checkCancel`, it's not particularly likely that we're canceled, so + // it seems preferable to do a cheap atomic load and, in the unlikely case, a separate store + // to acknowledge. Besides, the state transitions we need here can't be done with one atomic + // OR/AND/XOR on `Status.cancelation`, so we don't actually have any other option. + const status = thread.status.load(.monotonic); + switch (status.cancelation) { + .parked => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, + .none, .canceled => {}, + .canceling => { + thread.status.store(.{ + .cancelation = .canceled, + .awaitable = status.awaitable, + }, .monotonic); return error.Canceled; }, - .acknowledged => return, - _ => unreachable, } } - fn endSyscall(thread: *Thread) void { - const closure = thread.current_closure orelse return; - - switch (thread.cancel_protection) { - .unblocked => {}, - .blocked => return, - } - - _ = @cmpxchgStrong( - CancelStatus, - &closure.cancel_status, - .fromSignaleeId(thread.signal_id), - .none, - .acq_rel, - .acquire, - ) orelse return; - } - - fn endSyscallErrnoBug(thread: *Thread, err: posix.E) Io.UnexpectedError { - @branchHint(.cold); - thread.endSyscall(); - return errnoBug(err); - } - - fn endSyscallUnexpectedErrno(thread: *Thread, err: posix.E) Io.UnexpectedError { - @branchHint(.cold); - thread.endSyscall(); - return posix.unexpectedErrno(err); - } - - /// inline to make error return traces slightly shallower. - inline fn endSyscallError(thread: *Thread, err: anytype) @TypeOf(err) { - thread.endSyscall(); - return err; - } - - fn currentSignalId() SignaleeId { + fn currentSignaleeId() SignaleeId { return if (std.Thread.use_pthreads) std.c.pthread_self() else std.Thread.getCurrentId(); } @@ -262,10 +620,7 @@ const Thread = struct { } fn futexWait(thread: *Thread, ptr: *const u32, expect: u32) Io.Cancelable!void { - return Thread.futexWaitTimed(thread, ptr, expect, null) catch |err| switch (err) { - error.Canceled => return error.Canceled, - error.Timeout => unreachable, - }; + return Thread.futexWaitTimed(thread, ptr, expect, null); } fn futexWaitTimed(thread: ?*Thread, ptr: *const u32, expect: u32, timeout_ns: ?u64) Io.Cancelable!void { @@ -543,6 +898,191 @@ const Thread = struct { }, } } + + /// Cancels `thread` if it is working on `awaitable`. + /// + /// It is possible that `thread` gets canceled by this function, but is blocked in a syscall. In + /// that case, the thread may need to be sent a signal to interrupt the call. This function will + /// return `true` to indicate this, in which case the caller must call `signalCanceledSyscall`. + fn cancelAwaitable(thread: *Thread, awaitable: AwaitableId) bool { + var status = thread.status.load(.monotonic); + while (true) { + if (status.awaitable != awaitable) return false; // thread is working on something else + status = switch (status.cancelation) { + .none => thread.status.cmpxchgWeak( + .{ .cancelation = .none, .awaitable = awaitable }, + .{ .cancelation = .canceling, .awaitable = awaitable }, + .monotonic, + .monotonic, + ) orelse return false, + + .parked => thread.status.cmpxchgWeak( + .{ .cancelation = .parked, .awaitable = awaitable }, + .{ .cancelation = .canceling, .awaitable = awaitable }, + .monotonic, + .monotonic, + ) orelse { + if (true) @panic("MLUGG TODO: unpark thread"); + return false; + }, + + .blocked => thread.status.cmpxchgWeak( + .{ .cancelation = .blocked, .awaitable = awaitable }, + .{ .cancelation = .blocked_canceling, .awaitable = awaitable }, + .monotonic, + .monotonic, + ) orelse return true, + + .blocked_windows_dns => thread.status.cmpxchgWeak( + .{ .cancelation = .blocked_windows_dns, .awaitable = awaitable }, + .{ .cancelation = .canceling, .awaitable = awaitable }, + .monotonic, + .monotonic, + ) orelse return false, + + .canceling, .canceled => { + // This can happen when the task start raced with the cancelation, so the thread + // saw the cancelation on the future/group *and* we are trying to signal the + // thread here. + return false; + }, + + .blocked_canceling => unreachable, + }; + } + } + + /// Sends a signal to `thread` if it is still blocked in a syscall (i.e. has not yet observed + /// the cancelation request from `cancelAwaitable`). + /// + /// Unfortunately, the signal could arrive before the syscall actually starts, so the interrupt + /// is missed. To handle this, we may need to send multiple signals. As such, if this function + /// returns `true`, then it should be called again after a short delay to send another signal if + /// the thread is still blocked. For the implementation, `Future.waitForCancelWithSignaling` and + /// `Group.waitForCancelWithSignaling`: they use exponential backoff starting at a 1us delay and + /// doubling each call. In practice, it is rare to send more than one signal. + fn signalCanceledSyscall(thread: *Thread, t: *Threaded, awaitable: AwaitableId) bool { + const bad_status: Status = .{ .cancelation = .blocked_canceling, .awaitable = awaitable }; + if (thread.status.load(.monotonic) != bad_status) return false; + + // The thread ID can be read non-atomically because it never changes and was released by the + // store that made `thread` available to us. + const signalee_id = thread.signalee_id; + + if (std.Thread.use_pthreads) { + if (std.c.pthread_kill(signalee_id, .IO) != 0) return false; + } else if (native_os == .linux) { + const pid: posix.pid_t = pid: { + const cached_pid = @atomicLoad(Pid, &t.pid, .monotonic); + if (cached_pid != .unknown) break :pid @intFromEnum(cached_pid); + const pid = std.os.linux.getpid(); + @atomicStore(Pid, &t.pid, @enumFromInt(pid), .monotonic); + break :pid pid; + }; + if (std.os.linux.tgkill(pid, @bitCast(signalee_id), .IO) != 0) return false; + } else { + @compileError("MLUGG TODO"); + } + + return true; + } + + /// Like a `*Thread`, but 2 bits smaller than a pointer (because the LSBs are always 0 due to + /// alignment) so that those two bits can be used in a `packed struct`. + const PackedPtr = enum(@Int(.unsigned, @bitSizeOf(usize) - 2)) { + null = 0, + all_ones = std.math.maxInt(@Int(.unsigned, @bitSizeOf(usize) - 2)), + _, + + const Split = packed struct(usize) { low: u2, high: PackedPtr }; + fn pack(ptr: *Thread) PackedPtr { + const split: Split = @bitCast(@intFromPtr(ptr)); + assert(split.low == 0); + return split.high; + } + fn unpack(ptr: PackedPtr) ?*Thread { + const split: Split = .{ .low = 0, .high = ptr }; + return @ptrFromInt(@as(usize, @bitCast(split))); + } + }; +}; + +const Syscall = struct { + thread: ?*Thread, + /// Marks entry to a syscall region. This should be tightly scoped around the actual syscall + /// to minimize races. The syscall must be marked as "finished" by `checkCancel`, `finish`, + /// or one of the wrappers of `finish`. + fn start() Io.Cancelable!Syscall { + const thread = Thread.current orelse return .{ .thread = null }; + switch (thread.cancel_protection) { + .blocked => return .{ .thread = null }, + .unblocked => {}, + } + switch (thread.status.fetchOr(.{ + .cancelation = @enumFromInt(0b011), + .awaitable = .null, + }, .monotonic).cancelation) { + .parked => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, + .none => return .{ .thread = thread }, // new status is `.blocked` + .canceling => return error.Canceled, // new status is `.canceled` + .canceled => return .{ .thread = null }, // new status is `.canceled` (unchanged) + } + } + /// Checks whether this syscall has been canceled. This should be called when a syscall is + /// interrupted through a mechanism which may indicate cancelation, or may be spurious. If + /// the syscall was canceled, it is finished and `error.Canceled` is returned. Otherwise, + /// the syscall is not marked finished, and the caller should retry. + fn checkCancel(s: Syscall) Io.Cancelable!void { + const thread = s.thread orelse return; + switch (thread.status.fetchOr(.{ + .cancelation = @enumFromInt(0b010), + .awaitable = .null, + }, .monotonic).cancelation) { + .none => unreachable, + .parked => unreachable, + .blocked_windows_dns => unreachable, + .canceling => unreachable, + .canceled => unreachable, + .blocked => {}, // new status is `.blocked` (unchanged) + .blocked_canceling => return error.Canceled, // new status is `.canceled` + } + } + /// Marks this syscall as finished. + fn finish(s: Syscall) void { + const thread = s.thread orelse return; + switch (thread.status.fetchXor(.{ + .cancelation = @enumFromInt(0b011), + .awaitable = .null, + }, .monotonic).cancelation) { + .none => unreachable, + .parked => unreachable, + .blocked_windows_dns => unreachable, + .canceling => unreachable, + .canceled => unreachable, + .blocked => {}, // new status is `.none` + .blocked_canceling => {}, // new status is `.canceling` + } + } + /// Convenience wrapper which calls `finish`, then returns `err`. + fn fail(s: Syscall, err: anytype) @TypeOf(err) { + s.finish(); + return err; + } + /// Convenience wrapper which calls `finish`, then calls `Threaded.errnoBug`. + fn errnoBug(s: Syscall, err: posix.E) Io.UnexpectedError { + @branchHint(.cold); + s.finish(); + return Threaded.errnoBug(err); + } + /// Convenience wrapper which calls `finish`, then calls `posix.unexpectedErrno`. + fn unexpectedErrno(s: Syscall, err: posix.E) Io.UnexpectedError { + @branchHint(.cold); + s.finish(); + return posix.unexpectedErrno(err); + } }; const max_iovecs_len = 8; @@ -552,114 +1092,6 @@ comptime { if (@TypeOf(posix.IOV_MAX) != void) assert(max_iovecs_len <= posix.IOV_MAX); } -const CancelStatus = enum(usize) { - /// Cancellation has neither been requested, nor checked. The async - /// operation will check status before entering a blocking syscall. - /// This is also the status used for uninteruptible tasks. - none = 0, - /// Cancellation has been requested and the status will be checked before - /// entering a blocking syscall. - requested = std.math.maxInt(usize) - 1, - /// Cancellation has been acknowledged and is in progress. Signals should - /// not be sent. - acknowledged = std.math.maxInt(usize), - /// Stores a `Thread.SignaleeId` and indicates that sending a signal to this thread - /// is needed in order to cancel. This state is set before going into - /// a blocking operation that needs to get unblocked via signal. - _, - - const Unpacked = union(enum) { - none, - requested, - acknowledged, - signal_id: Thread.SignaleeId, - }; - - fn unpack(cs: CancelStatus) Unpacked { - return switch (cs) { - .none => .none, - .requested => .requested, - .acknowledged => .acknowledged, - _ => |signal_id| .{ - .signal_id = if (std.Thread.use_pthreads) - @ptrFromInt(@intFromEnum(signal_id)) - else - @truncate(@intFromEnum(signal_id)), - }, - }; - } - - fn fromSignaleeId(signal_id: Thread.SignaleeId) CancelStatus { - return if (std.Thread.use_pthreads) - @enumFromInt(@intFromPtr(signal_id)) - else - @enumFromInt(signal_id); - } -}; - -const Closure = struct { - start: Start, - node: std.SinglyLinkedList.Node = .{}, - cancel_status: CancelStatus, - - const Start = *const fn (*Closure, *Threaded) void; - - fn requestCancel(closure: *Closure, t: *Threaded) void { - var signal_id = switch (@atomicRmw(CancelStatus, &closure.cancel_status, .Xchg, .requested, .monotonic).unpack()) { - .none, .acknowledged, .requested => return, - .signal_id => |signal_id| signal_id, - }; - // The task will enter a blocking syscall before checking for cancellation again. - // We can send a signal to interrupt the syscall, but if it arrives before - // the syscall instruction, it will be missed. Therefore, this code tries - // again until the cancellation request is acknowledged. - - // 1 << 10 ns is about 1 microsecond, approximately syscall overhead. - // 1 << 20 ns is about 1 millisecond. - // 1 << 30 ns is about 1 second. - // - // On a heavily loaded Linux 6.17.5, I observed a maximum of 20 - // attempts not acknowledged before the timeout (including exponential - // backoff) was sufficient, despite the heavy load. - const max_attempts = 22; - - for (0..max_attempts) |attempt_index| { - if (std.Thread.use_pthreads) { - if (std.c.pthread_kill(signal_id, .IO) != 0) return; - } else if (native_os == .linux) { - const pid: posix.pid_t = p: { - const cached_pid = @atomicLoad(Pid, &t.pid, .monotonic); - if (cached_pid != .unknown) break :p @intFromEnum(cached_pid); - const pid = std.os.linux.getpid(); - @atomicStore(Pid, &t.pid, @enumFromInt(pid), .monotonic); - break :p pid; - }; - if (std.os.linux.tgkill(pid, @bitCast(signal_id), .IO) != 0) return; - } else { - return; - } - - if (t.robust_cancel != .enabled) return; - - var timespec: posix.timespec = .{ - .sec = 0, - .nsec = @as(isize, 1) << @intCast(attempt_index), - }; - if (native_os == .linux) { - _ = std.os.linux.clock_nanosleep(posix.CLOCK.MONOTONIC, .{ .ABSTIME = false }, ×pec, ×pec); - } else { - _ = posix.system.nanosleep(×pec, ×pec); - } - - switch (@atomicRmw(CancelStatus, &closure.cancel_status, .Xchg, .requested, .monotonic).unpack()) { - .requested => continue, // Retry needed in case other thread hasn't yet entered the syscall. - .none, .acknowledged => return, - .signal_id => |new_signal_id| signal_id = new_signal_id, - } - } - } -}; - pub const InitOptions = struct { /// Affects how many bytes are memory-mapped for threads. stack_size: usize = std.Thread.SpawnConfig.default_stack_size, @@ -727,14 +1159,10 @@ pub fn init( .old_sig_io = undefined, .old_sig_pipe = undefined, .have_signal_handler = false, - .main_thread = .{ - .signal_id = Thread.currentSignalId(), - .current_closure = null, - .cancel_protection = .unblocked, - }, .argv0 = options.argv0, .environ = options.environ, .robust_cancel = options.robust_cancel, + .worker_threads = .init(null), }; if (posix.Sigaction != void) { @@ -768,14 +1196,10 @@ pub const init_single_threaded: Threaded = .{ .old_sig_io = undefined, .old_sig_pipe = undefined, .have_signal_handler = false, - .main_thread = .{ - .signal_id = undefined, - .current_closure = null, - .cancel_protection = .unblocked, - }, .robust_cancel = .disabled, .argv0 = .{}, .environ = .{}, + .worker_threads = .init(null), }; var global_single_threaded_instance: Threaded = .init_single_threaded; @@ -822,22 +1246,40 @@ fn join(t: *Threaded) void { fn worker(t: *Threaded) void { var thread: Thread = .{ - .signal_id = Thread.currentSignalId(), - .current_closure = null, + .next = undefined, + .signalee_id = Thread.currentSignaleeId(), + .status = .init(.{ + .cancelation = .none, + .awaitable = .null, + }), .cancel_protection = .unblocked, }; Thread.current = &thread; + { + var head = t.worker_threads.load(.monotonic); + while (true) { + thread.next = head; + head = t.worker_threads.cmpxchgWeak( + head, + &thread, + .release, + .monotonic, + ) orelse break; + } + } + defer t.wait_group.finish(); t.mutex.lock(); defer t.mutex.unlock(); while (true) { - while (t.run_queue.popFirst()) |closure_node| { + while (t.run_queue.popFirst()) |runnable_node| { t.mutex.unlock(); - const closure: *Closure = @fieldParentPtr("node", closure_node); - closure.start(closure, t); + thread.cancel_protection = .unblocked; + const runnable: *Runnable = @fieldParentPtr("node", runnable_node); + runnable.startFn(runnable, &thread, t); t.mutex.lock(); t.busy_count -= 1; } @@ -1145,103 +1587,6 @@ const linux_copy_file_range_use_c = std.c.versionCheck(if (builtin.abi.isAndroid }); const linux_copy_file_range_sys = if (linux_copy_file_range_use_c) std.c else std.os.linux; -/// Trailing data: -/// 1. context -/// 2. result -const AsyncClosure = struct { - closure: Closure, - func: *const fn (context: *anyopaque, result: *anyopaque) void, - event: Io.Event, - select_condition: ?*Io.Event, - context_alignment: Alignment, - result_offset: usize, - alloc_len: usize, - - const done_event: *Io.Event = @ptrFromInt(@alignOf(Io.Event)); - - fn start(closure: *Closure, t: *Threaded) void { - const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure)); - const current_thread = Thread.getCurrent(t); - - current_thread.current_closure = closure; - current_thread.cancel_protection = .unblocked; - - ac.func(ac.contextPointer(), ac.resultPointer()); - - current_thread.current_closure = null; - current_thread.cancel_protection = undefined; - - if (@atomicRmw(?*Io.Event, &ac.select_condition, .Xchg, done_event, .release)) |select_event| { - assert(select_event != done_event); - select_event.set(ioBasic(t)); - } - ac.event.set(ioBasic(t)); - } - - fn resultPointer(ac: *AsyncClosure) [*]u8 { - const base: [*]u8 = @ptrCast(ac); - return base + ac.result_offset; - } - - fn contextPointer(ac: *AsyncClosure) [*]u8 { - const base: [*]u8 = @ptrCast(ac); - const context_offset = ac.context_alignment.forward(@intFromPtr(ac) + @sizeOf(AsyncClosure)) - @intFromPtr(ac); - return base + context_offset; - } - - fn init( - gpa: Allocator, - result_len: usize, - result_alignment: Alignment, - context: []const u8, - context_alignment: Alignment, - func: *const fn (context: *const anyopaque, result: *anyopaque) void, - ) Allocator.Error!*AsyncClosure { - const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(AsyncClosure); - const worst_case_context_offset = context_alignment.forward(@sizeOf(AsyncClosure) + max_context_misalignment); - const worst_case_result_offset = result_alignment.forward(worst_case_context_offset + context.len); - const alloc_len = worst_case_result_offset + result_len; - - const ac: *AsyncClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(AsyncClosure), alloc_len))); - errdefer comptime unreachable; - - const actual_context_addr = context_alignment.forward(@intFromPtr(ac) + @sizeOf(AsyncClosure)); - const actual_result_addr = result_alignment.forward(actual_context_addr + context.len); - const actual_result_offset = actual_result_addr - @intFromPtr(ac); - ac.* = .{ - .closure = .{ - .cancel_status = .none, - .start = start, - }, - .func = func, - .context_alignment = context_alignment, - .result_offset = actual_result_offset, - .alloc_len = alloc_len, - .event = .unset, - .select_condition = null, - }; - @memcpy(ac.contextPointer()[0..context.len], context); - return ac; - } - - fn waitAndDeinit(ac: *AsyncClosure, t: *Threaded, result: []u8) void { - ac.event.wait(ioBasic(t)) catch |err| switch (err) { - error.Canceled => { - ac.closure.requestCancel(t); - ac.event.waitUncancelable(ioBasic(t)); - recancel(t); - }, - }; - @memcpy(result, ac.resultPointer()[0..result.len]); - ac.deinit(t.allocator); - } - - fn deinit(ac: *AsyncClosure, gpa: Allocator) void { - const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(ac); - gpa.free(base[0..ac.alloc_len]); - } -}; - fn async( userdata: ?*anyopaque, result: []u8, @@ -1255,10 +1600,13 @@ fn async( start(context.ptr, result.ptr); return null; } + const gpa = t.allocator; - const ac = AsyncClosure.init(gpa, result.len, result_alignment, context, context_alignment, start) catch { - start(context.ptr, result.ptr); - return null; + const future = Future.create(gpa, result.len, result_alignment, context, context_alignment, start) catch |err| switch (err) { + error.OutOfMemory => { + start(context.ptr, result.ptr); + return null; + }, }; t.mutex.lock(); @@ -1267,7 +1615,7 @@ fn async( if (busy_count >= @intFromEnum(t.async_limit)) { t.mutex.unlock(); - ac.deinit(gpa); + future.destroy(gpa); start(context.ptr, result.ptr); return null; } @@ -1281,17 +1629,18 @@ fn async( t.wait_group.finish(); t.busy_count = busy_count; t.mutex.unlock(); - ac.deinit(gpa); + future.destroy(gpa); start(context.ptr, result.ptr); return null; }; thread.detach(); } - t.run_queue.prepend(&ac.closure.node); + t.run_queue.prepend(&future.runnable.node); + t.mutex.unlock(); t.cond.signal(); - return @ptrCast(ac); + return @ptrCast(future); } fn concurrent( @@ -1307,9 +1656,10 @@ fn concurrent( const t: *Threaded = @ptrCast(@alignCast(userdata)); const gpa = t.allocator; - const ac = AsyncClosure.init(gpa, result_len, result_alignment, context, context_alignment, start) catch - return error.ConcurrencyUnavailable; - errdefer ac.deinit(gpa); + const future = Future.create(gpa, result_len, result_alignment, context, context_alignment, start) catch |err| switch (err) { + error.OutOfMemory => return error.ConcurrencyUnavailable, + }; + errdefer future.destroy(gpa); t.mutex.lock(); defer t.mutex.unlock(); @@ -1329,110 +1679,32 @@ fn concurrent( const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch return error.ConcurrencyUnavailable; + thread.detach(); } - t.run_queue.prepend(&ac.closure.node); + t.run_queue.prepend(&future.runnable.node); + t.cond.signal(); - return @ptrCast(ac); + return @ptrCast(future); } -const GroupClosure = struct { - closure: Closure, - group: *Io.Group, - /// Points to sibling `GroupClosure`. Used for walking the group to cancel all. - node: std.SinglyLinkedList.Node, - func: *const fn (*Io.Group, context: *anyopaque) Io.Cancelable!void, - context_alignment: Alignment, - alloc_len: usize, - - fn start(closure: *Closure, t: *Threaded) void { - const gc: *GroupClosure = @alignCast(@fieldParentPtr("closure", closure)); - const current_thread = Thread.getCurrent(t); - const group = gc.group; - const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); - const event: *Io.Event = @ptrCast(&group.context); - current_thread.current_closure = closure; - current_thread.cancel_protection = .unblocked; - - assertResult(closure, gc.func(group, gc.contextPointer())); - - current_thread.current_closure = null; - current_thread.cancel_protection = undefined; - - const prev_state = group_state.fetchSub(sync_one_pending, .acq_rel); - assert((prev_state / sync_one_pending) > 0); - if (prev_state == (sync_one_pending | sync_is_waiting)) event.set(ioBasic(t)); - } - - fn assertResult(closure: *Closure, result: Io.Cancelable!void) void { - if (result) |_| switch (closure.cancel_status.unpack()) { - .none, .requested => {}, - .acknowledged => unreachable, // task illegally swallowed error.Canceled - .signal_id => unreachable, - } else |err| switch (err) { - error.Canceled => assert(closure.cancel_status == .acknowledged), - } - } - - fn contextPointer(gc: *GroupClosure) [*]u8 { - const base: [*]u8 = @ptrCast(gc); - const context_offset = gc.context_alignment.forward(@intFromPtr(gc) + @sizeOf(GroupClosure)) - @intFromPtr(gc); - return base + context_offset; - } - - /// Does not initialize the `node` field. - fn init( - gpa: Allocator, - group: *Io.Group, - context: []const u8, - context_alignment: Alignment, - func: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void, - ) Allocator.Error!*GroupClosure { - const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(GroupClosure); - const worst_case_context_offset = context_alignment.forward(@sizeOf(GroupClosure) + max_context_misalignment); - const alloc_len = worst_case_context_offset + context.len; - - const gc: *GroupClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(GroupClosure), alloc_len))); - errdefer comptime unreachable; - - gc.* = .{ - .closure = .{ - .cancel_status = .none, - .start = start, - }, - .group = group, - .node = undefined, - .func = func, - .context_alignment = context_alignment, - .alloc_len = alloc_len, - }; - @memcpy(gc.contextPointer()[0..context.len], context); - return gc; - } - - fn deinit(gc: *GroupClosure, gpa: Allocator) void { - const base: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(gc); - gpa.free(base[0..gc.alloc_len]); - } - - const sync_is_waiting: usize = 1 << 0; - const sync_one_pending: usize = 1 << 1; -}; - fn groupAsync( userdata: ?*anyopaque, - group: *Io.Group, + type_erased: *Io.Group, context: []const u8, context_alignment: Alignment, start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void, ) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); - if (builtin.single_threaded) return start(group, context.ptr) catch unreachable; + const g: Group = .{ .ptr = type_erased }; + + if (builtin.single_threaded) return start(g.ptr, context.ptr) catch unreachable; const gpa = t.allocator; - const gc = GroupClosure.init(gpa, group, context, context_alignment, start) catch - return t.assertGroupResult(start(group, context.ptr)); + const task = Group.Task.create(gpa, g, context, context_alignment, start) catch |err| switch (err) { + error.OutOfMemory => return t.assertGroupResult(start(g.ptr, context.ptr)), + }; t.mutex.lock(); @@ -1440,8 +1712,8 @@ fn groupAsync( if (busy_count >= @intFromEnum(t.async_limit)) { t.mutex.unlock(); - gc.deinit(gpa); - return t.assertGroupResult(start(group, context.ptr)); + task.destroy(gpa); + return t.assertGroupResult(start(g.ptr, context.ptr)); } t.busy_count = busy_count + 1; @@ -1453,37 +1725,48 @@ fn groupAsync( t.wait_group.finish(); t.busy_count = busy_count; t.mutex.unlock(); - gc.deinit(gpa); - return t.assertGroupResult(start(group, context.ptr)); + task.destroy(gpa); + return t.assertGroupResult(start(g.ptr, context.ptr)); }; thread.detach(); } - // Append to the group linked list inside the mutex to make `Io.Group.async` thread-safe. - gc.node = .{ .next = @ptrCast(@alignCast(group.token.load(.monotonic))) }; - group.token.store(&gc.node, .monotonic); - - t.run_queue.prepend(&gc.closure.node); - - // This needs to be done before unlocking the mutex to avoid a race with - // the associated task finishing. - const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); - const prev_state = group_state.fetchAdd(GroupClosure.sync_one_pending, .monotonic); - assert((prev_state / GroupClosure.sync_one_pending) < (std.math.maxInt(usize) / GroupClosure.sync_one_pending)); + // TODO: if this logic is changed to be lock-free, this `fetchAdd` must be released by the queue + // prepend so that the task doesn't finish without observing this and try to decrement the count + // below zero. + _ = g.status().fetchAdd(.{ + .num_running = 1, + .have_awaiter = false, + .canceled = false, + }, .monotonic); + t.run_queue.prepend(&task.runnable.node); t.mutex.unlock(); t.cond.signal(); } -fn assertGroupResult(t: *Threaded, result: Io.Cancelable!void) void { - const current_thread: *Thread = .getCurrent(t); - const current_closure = current_thread.current_closure orelse return; - GroupClosure.assertResult(current_closure, result); +fn assertGroupResult(result: Io.Cancelable!void) void { + const cancel_acknowledged = if (Thread.current) |thread| + switch (thread.status.load(.monotonic).cancelation) { + .none, .canceling => false, + .canceled => true, + .parked => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, + } + else + false; + if (result) { + assert(!cancel_acknowledged); // group task acknowledged cancelation but did not return `error.Canceled` + } else |err| switch (err) { + error.Canceled => assert(cancel_acknowledged), // group task returned `error.Canceled` but was never canceled + } } fn groupConcurrent( userdata: ?*anyopaque, - group: *Io.Group, + type_erased: *Io.Group, context: []const u8, context_alignment: Alignment, start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void, @@ -1491,10 +1774,13 @@ fn groupConcurrent( if (builtin.single_threaded) return error.ConcurrencyUnavailable; const t: *Threaded = @ptrCast(@alignCast(userdata)); + const g: Group = .{ .ptr = type_erased }; const gpa = t.allocator; - const gc = GroupClosure.init(gpa, group, context, context_alignment, start) catch - return error.ConcurrencyUnavailable; + const task = Group.Task.create(gpa, g, context, context_alignment, start) catch |err| switch (err) { + error.OutOfMemory => return error.ConcurrencyUnavailable, + }; + errdefer task.destroy(gpa); t.mutex.lock(); defer t.mutex.unlock(); @@ -1514,102 +1800,126 @@ fn groupConcurrent( const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch return error.ConcurrencyUnavailable; + thread.detach(); } - // Append to the group linked list inside the mutex to make `Io.Group.concurrent` thread-safe. - gc.node = .{ .next = @ptrCast(@alignCast(group.token.load(.monotonic))) }; - group.token.store(&gc.node, .monotonic); - - t.run_queue.prepend(&gc.closure.node); - - // This needs to be done before unlocking the mutex to avoid a race with - // the associated task finishing. - const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); - const prev_state = group_state.fetchAdd(GroupClosure.sync_one_pending, .monotonic); - assert((prev_state / GroupClosure.sync_one_pending) < (std.math.maxInt(usize) / GroupClosure.sync_one_pending)); + // TODO: if this logic is changed to be lock-free, this `fetchAdd` must be released by the queue + // prepend so that the task doesn't finish without observing this and try to decrement the count + // below zero. + _ = g.status().fetchAdd(.{ + .num_running = 1, + .have_awaiter = false, + .canceled = false, + }, .monotonic); + t.run_queue.prepend(&task.runnable.node); t.cond.signal(); } -fn groupAwait(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque) Io.Cancelable!void { - const t: *Threaded = @ptrCast(@alignCast(userdata)); - const gpa = t.allocator; - +fn groupAwait(userdata: ?*anyopaque, type_erased: *Io.Group, initial_token: *anyopaque) Io.Cancelable!void { _ = initial_token; // we need to load `token` *after* the group finishes + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const g: Group = .{ .ptr = type_erased }; + const thread: *Thread = .getCurrent(t); - if (builtin.single_threaded) unreachable; // we never set `group.token` to non-`null` + var num_completed: std.atomic.Value(u32) = .init(0); + g.awaiter().* = &num_completed; - const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); - const event: *Io.Event = @ptrCast(&group.context); - const prev_state = group_state.fetchAdd(GroupClosure.sync_is_waiting, .acquire); - assert(prev_state & GroupClosure.sync_is_waiting == 0); - { - errdefer _ = group_state.fetchSub(GroupClosure.sync_is_waiting, .monotonic); - // This event.wait can return error.Canceled, in which case this logic does - // *not* propagate cancel requests to each group member. Instead, the user - // code will likely do this with a defered call to groupCancel, or, - // intentionally not do this. - if ((prev_state / GroupClosure.sync_one_pending) > 0) try event.wait(ioBasic(t)); + const pre_await_status = g.status().fetchOr(.{ + .num_running = 0, + .have_awaiter = true, + .canceled = false, + }, .acq_rel); // acquire results if complete; release `g.awaiter()` + + assert(!pre_await_status.have_awaiter); + assert(!pre_await_status.canceled); + if (pre_await_status.num_running == 0) { + // Already done. Since the group is finished, it's illegal to spawn more tasks in it + // until we return, so we can access `g.status()` non-atomically. + g.status().raw.have_awaiter = false; + return; } - // Since the group has now finished, it's illegal to add more tasks to it until we return. It's - // also illegal for us to race with another `await` or `cancel`. Therefore, we must be the only - // thread who can access `group` right now. - var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.raw)); - group.token.raw = null; - while (it) |node| { - it = node.next; // update `it` now, because `deinit` will invalidate `node` - const gc: *GroupClosure = @fieldParentPtr("node", node); - gc.deinit(gpa); + while (thread.futexWait(&num_completed.raw, 0)) { + switch (num_completed.load(.acquire)) { // acquire task results + 0 => continue, + 1 => break, + else => unreachable, // group was reused before `await` returned + } + } else |err| switch (err) { + error.Canceled => { + const pre_cancel_status = g.status().fetchOr(.{ + .num_running = 0, + .have_awaiter = false, + .canceled = true, + }, .acq_rel); // acquire results if complete; release `g.awaiter()` + assert(pre_cancel_status.have_awaiter); + assert(!pre_cancel_status.canceled); + + // Even if `pre_cancel_status.num_running == 0`, we still need to wait for the signal, + // because in that case the last member of the group is already trying to modify it. + // However, if we know everything is done, we *can* skip signaling blocked threads. + const skip_signals = pre_cancel_status.num_running == 0; + g.waitForCancelWithSignaling(t, &num_completed, skip_signals); + + // The group is finished, so it's illegal to spawn more tasks in it until we return, so + // we can access `g.status()` non-atomically. + g.status().raw.canceled = false; + g.status().raw.have_awaiter = false; + return error.Canceled; + }, } + + // The group is finished, so it's illegal to spawn more tasks in it until we return, so + // we can access `g.status()` non-atomically. + g.status().raw.have_awaiter = false; } -fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque) void { +fn groupCancel(userdata: ?*anyopaque, type_erased: *Io.Group, initial_token: *anyopaque) void { + _ = initial_token; const t: *Threaded = @ptrCast(@alignCast(userdata)); - const gpa = t.allocator; + const g: Group = .{ .ptr = type_erased }; - _ = initial_token; // we need to load `token` *after* the group finishes + var num_completed: std.atomic.Value(u32) = .init(0); + g.awaiter().* = &num_completed; - if (builtin.single_threaded) unreachable; // we never set `group.token` to non-`null` + const pre_cancel_status = g.status().fetchOr(.{ + .num_running = 0, + .have_awaiter = true, + .canceled = true, + }, .acq_rel); // acquire results if complete; release `g.awaiter()` - { - var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.load(.monotonic))); - while (it) |node| : (it = node.next) { - const gc: *GroupClosure = @fieldParentPtr("node", node); - gc.closure.requestCancel(t); - } + assert(!pre_cancel_status.have_awaiter); + assert(!pre_cancel_status.canceled); + if (pre_cancel_status.num_running == 0) { + // Already done. Since the group is finished, it's illegal to spawn more tasks in it + // until we return, so we can access `g.status()` non-atomically. + g.status().raw.have_awaiter = false; + g.status().raw.canceled = false; + return; } - const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); - const event: *Io.Event = @ptrCast(&group.context); - const prev_state = group_state.fetchAdd(GroupClosure.sync_is_waiting, .acquire); - assert(prev_state & GroupClosure.sync_is_waiting == 0); - if ((prev_state / GroupClosure.sync_one_pending) > 0) event.waitUncancelable(ioBasic(t)); + g.waitForCancelWithSignaling(t, &num_completed, false); - // Since the group has now finished, it's illegal to add more tasks to it until we return. It's - // also illegal for us to race with another `await` or `cancel`. Therefore, we must be the only - // thread who can access `group` right now. - var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.raw)); - group.token.raw = null; - while (it) |node| { - it = node.next; // update `it` now, because `deinit` will invalidate `node` - const gc: *GroupClosure = @fieldParentPtr("node", node); - gc.deinit(gpa); - } + g.status().raw = .{ .num_running = 0, .have_awaiter = false, .canceled = false }; } fn recancel(userdata: ?*anyopaque) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); const current_thread: *Thread = .getCurrent(t); - const cancel_status = ¤t_thread.current_closure.?.cancel_status; - switch (@atomicLoad(CancelStatus, cancel_status, .monotonic)) { - .none => unreachable, // called `recancel` when not canceled - .requested => unreachable, // called `recancel` when cancelation was already outstanding - .acknowledged => {}, - _ => unreachable, // invalid state: not in a syscall + switch (current_thread.status.fetchXor(.{ + .cancelation = @enumFromInt(0b001), + .awaitable = .null, + }, .monotonic).cancelation) { + .canceled => {}, + .none => unreachable, // called `recancel` but was not canceled + .canceling => unreachable, // called `recancel` but cancelation was already pending + .parked => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, } - @atomicStore(CancelStatus, cancel_status, .requested, .monotonic); } fn swapCancelProtection(userdata: ?*anyopaque, new: Io.CancelProtection) Io.CancelProtection { @@ -1622,7 +1932,8 @@ fn swapCancelProtection(userdata: ?*anyopaque, new: Io.CancelProtection) Io.Canc fn checkCancel(userdata: ?*anyopaque) Io.Cancelable!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); - return Thread.getCurrent(t).checkCancel(); + _ = t; + return Thread.checkCancel(); } fn await( @@ -1633,8 +1944,51 @@ fn await( ) void { _ = result_alignment; const t: *Threaded = @ptrCast(@alignCast(userdata)); - const closure: *AsyncClosure = @ptrCast(@alignCast(any_future)); - closure.waitAndDeinit(t, result); + const future: *Future = @ptrCast(@alignCast(any_future)); + const thread: *Thread = .getCurrent(t); + + var num_completed: std.atomic.Value(u32) = .init(0); + future.awaiter = &num_completed; + + const pre_await_status = future.status.fetchOr(.{ + .tag = .pending_awaited, + .thread = .null, + }, .acq_rel); // acquire results if complete; release `future.awaiter` + switch (pre_await_status.tag) { + .pending => while (thread.futexWait(&num_completed.raw, 0)) { + switch (num_completed.load(.acquire)) { // acquire task results + 0 => continue, + 1 => break, + else => unreachable, // group was reused before `await` returned + } + } else |err| switch (err) { + error.Canceled => { + const pre_cancel_status = future.status.fetchOr(.{ + .tag = .pending_canceled, + .thread = .null, + }, .acq_rel); // acquire results if complete; release `future.awaiter` + switch (pre_cancel_status.tag) { + .pending => unreachable, // invalid state: we already awaited + .pending_awaited => { + const working_thread = pre_cancel_status.thread.unpack(); + future.waitForCancelWithSignaling(t, &num_completed, @alignCast(working_thread)); + }, + .pending_canceled => unreachable, // `await` raced with `cancel` + .done => { + // The task just finished, but we still need to wait for the signal, because the + // task thread already figured out that they need to update `future.awaiter`. + future.waitForCancelWithSignaling(t, &num_completed, null); + }, + } + recancel(t); + }, + }, + .pending_awaited => unreachable, // `await` raced with `await` + .pending_canceled => unreachable, // `await` raced with `cancel` + .done => {}, + } + @memcpy(result, future.resultPointer()); + future.destroy(t.allocator); } fn cancel( @@ -1645,9 +1999,26 @@ fn cancel( ) void { _ = result_alignment; const t: *Threaded = @ptrCast(@alignCast(userdata)); - const ac: *AsyncClosure = @ptrCast(@alignCast(any_future)); - ac.closure.requestCancel(t); - ac.waitAndDeinit(t, result); + const future: *Future = @ptrCast(@alignCast(any_future)); + + var num_completed: std.atomic.Value(u32) = .init(0); + future.awaiter = &num_completed; + + const pre_cancel_status = future.status.fetchOr(.{ + .tag = .pending_canceled, + .thread = .null, + }, .acq_rel); // acquire results if complete; release `future.awaiter` + switch (pre_cancel_status.tag) { + .pending => { + const working_thread = pre_cancel_status.thread.unpack(); + future.waitForCancelWithSignaling(t, &num_completed, @alignCast(working_thread)); + }, + .pending_awaited => unreachable, // `await` raced with `await` + .pending_canceled => unreachable, // `await` raced with `cancel` + .done => {}, + } + @memcpy(result, future.resultPointer()); + future.destroy(t.allocator); } fn futexWait(userdata: ?*anyopaque, ptr: *const u32, expected: u32, timeout: Io.Timeout) Io.Cancelable!void { @@ -8555,32 +8926,69 @@ fn sleepPosix(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize { const t: *Threaded = @ptrCast(@alignCast(userdata)); - var event: Io.Event = .unset; + var num_completed: std.atomic.Value(u32) = .init(0); - for (futures, 0..) |future, i| { - const closure: *AsyncClosure = @ptrCast(@alignCast(future)); - if (@atomicRmw(?*Io.Event, &closure.select_condition, .Xchg, &event, .seq_cst) == AsyncClosure.done_event) { - for (futures[0..i]) |cleanup_future| { - const cleanup_closure: *AsyncClosure = @ptrCast(@alignCast(cleanup_future)); - if (@atomicRmw(?*Io.Event, &cleanup_closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_event) { - cleanup_closure.event.waitUncancelable(ioBasic(t)); // Ensure no reference to our stack-allocated event. - } - } - return i; + for (futures, 0..) |any_future, i| { + const future: *Future = @ptrCast(@alignCast(any_future)); + future.awaiter = &num_completed; + const old_status = future.status.fetchOr( + .{ .tag = .pending_awaited, .thread = .null }, + .release, // release `future.awaiter` + ); + switch (old_status.tag) { + .pending => {}, + .pending_awaited => unreachable, // `await` raced with `select` + .pending_canceled => unreachable, // `cancel` raced with `select` + .done => { + future.status.store(old_status, .monotonic); + _ = finishSelect(&num_completed, futures[0..i]); + return i; + }, } } - try event.wait(ioBasic(t)); + errdefer _ = finishSelect(&num_completed, futures); + const thread: *Thread = .getCurrent(t); - var result: ?usize = null; - for (futures, 0..) |future, i| { - const closure: *AsyncClosure = @ptrCast(@alignCast(future)); - if (@atomicRmw(?*Io.Event, &closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_event) { - closure.event.waitUncancelable(ioBasic(t)); // Ensure no reference to our stack-allocated event. - if (result == null) result = i; // In case multiple are ready, return first. + while (true) { + const n = num_completed.load(.acquire); + if (n > 0) break; + assert(n < futures.len); + try thread.futexWait(&num_completed.raw, n); + } + return finishSelect(&num_completed, futures).?; +} +fn finishSelect( + num_completed: *std.atomic.Value(u32), + futures: []const *Io.AnyFuture, +) ?usize { + var completed_index: ?usize = null; + var expect_completed: u32 = 0; + for (futures, 0..) |any_future, i| { + const future: *Future = @ptrCast(@alignCast(any_future)); + // This operation will convert `.pending_awaited` to `.pending`, or leave `.done` untouched. + switch (future.status.fetchAnd( + .{ .tag = @enumFromInt(0b10), .thread = .all_ones }, + .monotonic, + ).tag) { + .pending_awaited => {}, + .pending => unreachable, + .pending_canceled => unreachable, + .done => { + expect_completed += 1; + completed_index = i; + }, } } - return result.?; + // If any future has just finished, wait for it to signal `num_completed` to avoid dangling + // references to stack memory. + while (true) { + const n = num_completed.load(.acquire); + if (n == expect_completed) break; + assert(n < expect_completed); + Thread.futexWaitUncancelable(&num_completed.raw, n); + } + return completed_index; } fn netListenIpPosix(