diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index c9e6ba3fc7..f95738e35d 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -37,7 +37,7 @@ cpu_count_error: ?std.Thread.CpuCountError, /// available count, subtract this from either `async_limit` or /// `concurrent_limit`. busy_count: usize = 0, -main_thread: Thread, +worker_threads: std.atomic.Value(?*Thread), pid: Pid = .unknown, robust_cancel: RobustCancel, @@ -153,107 +153,465 @@ pub const UseFchmodat2 = if (have_fchmodat2 and !have_fchmodat_flags) enum { pub const default: UseFchmodat2 = .disabled; }; +const Runnable = struct { + node: std.SinglyLinkedList.Node, + startFn: *const fn (*Runnable, *Thread, *Threaded) void, +}; + +const Group = struct { + ptr: *Io.Group, + + /// Returns a correctly-typed pointer to the `Io.Group.token` field. + /// + /// The status indicates how many pending tasks are in the group, whether the group has been + /// canceled, and whether the group has been awaited. + /// + /// Note that the zero value of `Status` intentionally represents the initial group state (empty + /// with no awaiters). This is a requirement of `Io.Group`. + fn status(g: Group) *std.atomic.Value(Status) { + return @ptrCast(&g.ptr.token); + } + /// Returns a correctly-typed pointer to the `Io.Group.state` field. The double-pointer here is + /// intentional, because the `state` field itself stores a pointer, and this function returns a + /// pointer to that field. + /// + /// On completion of the whole group, if `status` indicates that there is an awaiter, the last + /// task must increment this `u32` and do a futex wake on it to signal that awaiter. + fn awaiter(g: Group) **std.atomic.Value(u32) { + return @ptrCast(&g.ptr.state); + } + + const Status = packed struct(usize) { + num_running: @Int(.unsigned, @bitSizeOf(usize) - 2), + have_awaiter: bool, + canceled: bool, + }; + + const Task = struct { + runnable: Runnable, + group: *Io.Group, + func: *const fn (*Io.Group, context: *const anyopaque) void, + context_alignment: Alignment, + alloc_len: usize, + + /// `Task.runnable.node` is `undefined` in the created `Task`. 
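+ ///
+ /// A rough sketch of the single allocation produced by `create` (illustrative only):
+ ///
+ ///   [ Task header | padding to `context_alignment` | context bytes ]
+ ///
+ /// `contextPointer` recomputes the context offset from `context_alignment`, and
+ /// `alloc_len` records the total size so `destroy` can free the whole block.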
+ fn create( + gpa: Allocator, + group: Group, + context: []const u8, + context_alignment: Alignment, + func: *const fn (*Io.Group, context: *const anyopaque) void, + ) Allocator.Error!*Task { + const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(Task); + const worst_case_context_offset = context_alignment.forward(@sizeOf(Task) + max_context_misalignment); + const alloc_len = worst_case_context_offset + context.len; + + const task: *Task = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(Task), alloc_len))); + errdefer comptime unreachable; + + task.* = .{ + .runnable = .{ + .node = undefined, + .startFn = &start, + }, + .group = group.ptr, + .func = func, + .context_alignment = context_alignment, + .alloc_len = alloc_len, + }; + @memcpy(task.contextPointer()[0..context.len], context); + return task; + } + + fn destroy(task: *Task, gpa: Allocator) void { + const base: [*]align(@alignOf(Task)) u8 = @ptrCast(task); + gpa.free(base[0..task.alloc_len]); + } + + fn contextPointer(task: *Task) [*]u8 { + const base: [*]u8 = @ptrCast(task); + const offset = task.context_alignment.forward(@intFromPtr(base) + @sizeOf(Task)) - @intFromPtr(base); + return base + offset; + } + + fn start(r: *Runnable, thread: *Thread, t: *Threaded) void { + const task: *Task = @fieldParentPtr("runnable", r); + const group: Group = .{ .ptr = task.group }; + + // This would be a simple store, but it's upgraded to an RMW so we can use `.acquire` to + // enforce the ordering between this and the `group.status().load` below. Paired with + // the `.release` rmw on `Thread.status` in `cancelThreads`, this creates a StoreLoad + // barrier which guarantees that when a group is canceled, either we see the cancelation + // in the group status, or the canceler sees our thread status so can directly notify us + // of the cancelation. + _ = thread.status.swap(.{ + .cancelation = .none, + .awaitable = .fromGroup(group.ptr), + }, .acquire); + if (group.status().load(.monotonic).canceled) { + thread.status.store(.{ + .cancelation = .canceling, + .awaitable = .fromGroup(group.ptr), + }, .monotonic); + } + + assertGroupResult(task.func(group.ptr, task.contextPointer())); + + thread.status.store(.{ .cancelation = .none, .awaitable = .null }, .monotonic); + const old_status = group.status().fetchSub(.{ + .num_running = 1, + .have_awaiter = false, + .canceled = false, + }, .acq_rel); // acquire `group.awaiter()`, release task results + assert(old_status.num_running > 0); + if (old_status.have_awaiter and old_status.num_running == 1) { + const to_signal = group.awaiter().*; + // `awaiter` should only be modified by us. For another thread to see `num_running` + // drop to 0 after this point would indicate that another task started up, meaning + // `async`/`cancel` was racing with awaited group completion. + group.awaiter().* = undefined; + _ = to_signal.fetchAdd(1, .release); // release results + Thread.futexWake(&to_signal.raw, 1); + } + + // Task completed. Self-destruct sequence initiated. + task.destroy(t.allocator); + } + }; + + /// Assumes the caller has already atomically updated the group status to indicate cancelation, + /// and notifies any already-running threads of this cancelation. + fn cancelThreads(g: Group, t: *Threaded) bool { + var any_blocked = false; + var it = t.worker_threads.load(.acquire); // acquire `Thread` values + while (it) |thread| : (it = thread.next) { + // This non-mutating RMW exists for ordering reasons: see comment in `Group.Task.start` for reasons. 
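+ // The operand is all zero bits (`.none` and `.null` are both zero), so the value is
+ // left unchanged; only the `.release` ordering of the RMW is wanted here.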
+ _ = thread.status.fetchOr(.{ .cancelation = @enumFromInt(0), .awaitable = .null }, .release); + if (thread.cancelAwaitable(.fromGroup(g.ptr))) any_blocked = true; + } + return any_blocked; + } + + /// Uses `Thread.signalCanceledSyscall` to signal any threads which are still blocked in a + /// syscall for this group and have not observed a cancelation request yet. Returns `true` if + /// more signals may be necessary, in which case the caller must call this again after a delay. + fn signalAllCanceledSyscalls(g: Group, t: *Threaded) bool { + var any_signaled = false; + var it = t.worker_threads.load(.acquire); // acquire `Thread` values + while (it) |thread| : (it = thread.next) { + if (thread.signalCanceledSyscall(t, .fromGroup(g.ptr))) any_signaled = true; + } + return any_signaled; + } + + /// The caller has canceled `g`. Inform any threads working on that group of the cancelation if + /// necessary, and wait for `g` to finish (indicated by `num_completed` being incremented from 0 + /// to 1), while sending regular signals to threads if necessary for them to unblock from any + /// cancelable syscalls. + /// + /// `skip_signals` means it is already known that no threads are currently working on the group + /// so no notifications or signals are necessary. + fn waitForCancelWithSignaling( + g: Group, + t: *Threaded, + num_completed: *std.atomic.Value(u32), + skip_signals: bool, + ) void { + var need_signal: bool = !skip_signals and g.cancelThreads(t); + var timeout_ns: u64 = 1 << 10; + while (true) { + need_signal = need_signal and g.signalAllCanceledSyscalls(t) and t.robust_cancel == .enabled; + Thread.futexWaitTimed( + null, + &num_completed.raw, + 0, + if (need_signal) timeout_ns else null, + ) catch |err| switch (err) { + error.Canceled => unreachable, + }; + switch (num_completed.load(.acquire)) { // acquire task results + 0 => {}, + 1 => break, + else => unreachable, + } + timeout_ns <<|= 1; + } + } +}; + +/// Trailing data: +/// 1. context +/// 2. result +const Future = struct { + runnable: Runnable, + func: *const fn (context: *const anyopaque, result: *anyopaque) void, + status: std.atomic.Value(Status), + /// On completion, increment this `u32` and do a futex wake on it. + awaiter: *std.atomic.Value(u32), + context_alignment: Alignment, + result_offset: usize, + alloc_len: usize, + + const Status = packed struct(usize) { + /// The values of this enum are chosen so that await/cancel can just OR with 0b01 and 0b11 + /// respectively. That *does* clobber `.done`, but that's actually fine, because if the tag + /// is `.done` then only the awaiter is referencing this `Future` anyway. + tag: enum(u2) { + /// The future is queued or running (depending on whether `thread` is set). + pending = 0b00, + /// Like `pending`, but the future is being awaited. `Future.awaiter` is populated. + pending_awaited = 0b01, + /// Like `pending`, but the future is being canceled. `Future.awaiter` is populated. + pending_canceled = 0b11, + /// The future has already completed. `thread` is `null`. + done = 0b10, + }, + /// When the future begins execution, this is atomically updated from `null` to the thread running the + /// `Future`, so that cancelation knows which thread to cancel. + thread: Thread.PackedPtr, + }; + + /// `Future.runnable.node` is `undefined` in the created `Future`. 
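+ ///
+ /// The returned `Future` stores `result_offset` explicitly because the context length is
+ /// not retained, so the result's position cannot be recomputed later; the context offset
+ /// itself is cheap to recompute from `context_alignment` (see `contextPointer`).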
+ fn create( + gpa: Allocator, + result_len: usize, + result_alignment: Alignment, + context: []const u8, + context_alignment: Alignment, + func: *const fn (context: *const anyopaque, result: *anyopaque) void, + ) Allocator.Error!*Future { + const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(Future); + const worst_case_context_offset = context_alignment.forward(@sizeOf(Future) + max_context_misalignment); + const worst_case_result_offset = result_alignment.forward(worst_case_context_offset + context.len); + const alloc_len = worst_case_result_offset + result_len; + + const future: *Future = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(Future), alloc_len))); + errdefer comptime unreachable; + + const actual_context_addr = context_alignment.forward(@intFromPtr(future) + @sizeOf(Future)); + const actual_result_addr = result_alignment.forward(actual_context_addr + context.len); + const actual_result_offset = actual_result_addr - @intFromPtr(future); + future.* = .{ + .runnable = .{ + .node = undefined, + .startFn = &start, + }, + .func = func, + .status = .init(.{ + .tag = .pending, + .thread = .null, + }), + .awaiter = undefined, + .context_alignment = context_alignment, + .result_offset = actual_result_offset, + .alloc_len = alloc_len, + }; + @memcpy(future.contextPointer()[0..context.len], context); + return future; + } + + fn destroy(future: *Future, gpa: Allocator) void { + const base: [*]align(@alignOf(Future)) u8 = @ptrCast(future); + gpa.free(base[0..future.alloc_len]); + } + + fn resultPointer(future: *Future) [*]u8 { + const base: [*]u8 = @ptrCast(future); + return base + future.result_offset; + } + + fn contextPointer(future: *Future) [*]u8 { + const base: [*]u8 = @ptrCast(future); + const context_offset = future.context_alignment.forward(@intFromPtr(future) + @sizeOf(Future)) - @intFromPtr(future); + return base + context_offset; + } + + fn start(r: *Runnable, thread: *Thread, t: *Threaded) void { + _ = t; + const future: *Future = @fieldParentPtr("runnable", r); + + thread.status.store(.{ + .cancelation = .none, + .awaitable = .fromFuture(future), + }, .monotonic); + { + const old_status = future.status.fetchOr(.{ + .tag = .pending, + .thread = .pack(thread), + }, .release); + assert(old_status.thread == .null); + switch (old_status.tag) { + .pending, .pending_awaited => {}, + .pending_canceled => thread.status.store(.{ + .cancelation = .canceling, + .awaitable = .fromFuture(future), + }, .monotonic), + .done => unreachable, + } + } + + future.func(future.contextPointer(), future.resultPointer()); + + thread.status.store(.{ .cancelation = .none, .awaitable = .null }, .monotonic); + const old_status = future.status.swap(.{ + .tag = .done, + .thread = .null, + }, .acq_rel); // acquire `future.awaiter`, release results + switch (old_status.tag) { + .pending => {}, + .pending_awaited, .pending_canceled => { + const to_signal = future.awaiter; + _ = to_signal.fetchAdd(1, .release); // release results + Thread.futexWake(&to_signal.raw, 1); + }, + .done => unreachable, + } + } + + /// The caller has canceled `future`. `thread` is the thread currently running that future. + /// Inform `thread` of the cancelation if necessary, and wait for `future` to finish (indicated + /// by `num_completed` being incremented from 0 to 1), while sending regular signals to `thread` + /// if necessary for it to unblock from a cancelable syscall. 
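+ ///
+ /// Signals are rate-limited with exponential backoff: the futex timeout starts at
+ /// 1 << 10 ns (roughly one microsecond) and doubles, saturating, after each attempt, so a
+ /// missed signal is retried quickly without spamming a long-blocked thread.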
+ fn waitForCancelWithSignaling( + future: *Future, + t: *Threaded, + num_completed: *std.atomic.Value(u32), + thread: ?*Thread, + ) void { + var need_signal: bool = thread != null and thread.?.cancelAwaitable(.fromFuture(future)); + var timeout_ns: u64 = 1 << 10; + while (true) { + need_signal = need_signal and thread.?.signalCanceledSyscall(t, .fromFuture(future)) and t.robust_cancel == .enabled; + Thread.futexWaitTimed( + null, + &num_completed.raw, + 0, + if (need_signal) timeout_ns else null, + ) catch |err| switch (err) { + error.Canceled => unreachable, + }; + switch (num_completed.load(.acquire)) { // acquire task results + 0 => {}, + 1 => break, + else => unreachable, + } + timeout_ns <<|= 1; + } + } +}; + +/// A sequence of (ptr_bit_width - 3) bits which uniquely identifies a group or future. The bits are +/// the MSBs of the `*Io.Group` or `*Future`. These things do not necessarily have 3 zero bits at +/// the end (they are pointer-aligned, so on 32-bit targets only have 2), but because they both have +/// a *size* of at least 8 bytes, no two groups/futures in memory at the same time will have the +/// same value for all of these bits. In other words, given a group/future pointer, the next group +/// or future must be at least 8 bytes later, so its address will have a different value for one of +/// the top (ptr_bit_width - 3) bits. +const AwaitableId = enum(@Int(.unsigned, @bitSizeOf(usize) - 3)) { + comptime { + assert(@sizeOf(Future) >= 8); + assert(@sizeOf(Io.Group) >= 8); + } + null = 0, + all_ones = std.math.maxInt(@Int(.unsigned, @bitSizeOf(usize) - 3)), + _, + const Split = packed struct(usize) { low: u3, high: AwaitableId }; + fn fromGroup(g: *Io.Group) AwaitableId { + const split: Split = @bitCast(@intFromPtr(g)); + return split.high; + } + fn fromFuture(f: *Future) AwaitableId { + const split: Split = @bitCast(@intFromPtr(f)); + return split.high; + } +}; + const Thread = struct { + next: ?*Thread, /// The value that needs to be passed to pthread_kill or tgkill in order to /// send a signal. - signal_id: SignaleeId, - current_closure: ?*Closure, - /// Only populated if `current_closure != null`. Indicates the current cancel protection mode. + signalee_id: SignaleeId, + + status: std.atomic.Value(Status), + cancel_protection: Io.CancelProtection, + const Status = packed struct(usize) { + /// The specific values of these enum fields are chosen to simplify the implementation of + /// the transformations we need to apply to this state. + cancelation: enum(u3) { + /// The thread has not yet been canceled, and is not in a cancelable operation. + /// To request cancelation, just set the status to `.canceling`. + none = 0b000, + + /// The thread is parked in a cancelable futex wait or sleep. + /// Only applicable on Windows, NetBSD, and Illumos. + /// To request cancelation, set the status to `.canceling` and unpark the thread. + /// To unpark for another reason (futex wake), set the status to `.none` and unpark the thread. + parked = 0b001, + + /// The thread is blocked in a cancelable system call. + /// To request cancelation, set the status to `.blocked_canceling` and repeatedly interrupt the system call until the status changes. + blocked = 0b011, + + /// Windows-only: the thread is blocked on a DNS query. + /// To request cancelation, set the status to `.canceling` and call `DnsCancelQuery`. + blocked_windows_dns = 0b010, + + /// The thread has an outstanding cancelation request but is not in a cancelable operation. 
+ /// When it acknowledges the cancelation, it will set the status to `.canceled`. + canceling = 0b110, + + /// The thread has received and acknowledged a cancelation request. + /// If `recancel` is called, the status will revert to `.canceling`, but otherwise, the status + /// will not change for the remainder of this task's execution. + canceled = 0b111, + + /// The thread is blocked in a cancelable system call, and is being canceled. The thread which triggered the cancelation will send signals to this thread + /// until its status changes. + blocked_canceling = 0b101, + }, + + /// We cannot turn this value back into a pointer. Instead, it exists so that a task can be + /// canceled by a cmpxchg on thread status: if it is running the task we want to cancel, + /// then update the `cancelation` field. + awaitable: AwaitableId, + }; + const SignaleeId = if (std.Thread.use_pthreads) std.c.pthread_t else std.Thread.Id; threadlocal var current: ?*Thread = null; - fn getCurrent(t: *Threaded) *Thread { - return current orelse return &t.main_thread; - } - - fn checkCancel(thread: *Thread) error{Canceled}!void { - const closure = thread.current_closure orelse return; - + /// The thread is neither in a syscall nor entering one, but we want to check for cancelation + /// anyway. If there is a pending cancel request, acknowledge it and return `error.Canceled`. + fn checkCancel() Io.Cancelable!void { + const thread = Thread.current orelse return; switch (thread.cancel_protection) { - .unblocked => {}, .blocked => return, - } - - switch (@cmpxchgStrong( - CancelStatus, - &closure.cancel_status, - .requested, - .acknowledged, - .acq_rel, - .acquire, - ) orelse return error.Canceled) { - .requested => unreachable, - .acknowledged => unreachable, - .none, _ => {}, - } - } - - fn beginSyscall(thread: *Thread) error{Canceled}!void { - const closure = thread.current_closure orelse return; - - switch (thread.cancel_protection) { .unblocked => {}, - .blocked => return, } - - switch (@cmpxchgStrong( - CancelStatus, - &closure.cancel_status, - .none, - .fromSignaleeId(thread.signal_id), - .acq_rel, - .acquire, - ) orelse return) { - .none => unreachable, - .requested => { - @atomicStore(CancelStatus, &closure.cancel_status, .acknowledged, .release); + // Here, unlike `Syscall.checkCancel`, it's not particularly likely that we're canceled, so + // it seems preferable to do a cheap atomic load and, in the unlikely case, a separate store + // to acknowledge. Besides, the state transitions we need here can't be done with one atomic + // OR/AND/XOR on `Status.cancelation`, so we don't actually have any other option. 
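+ // (For example, `.canceling` (0b110) -> `.canceled` (0b111) would be an OR with 0b001,
+ // but that same OR would also turn `.none` (0b000) into `.parked` (0b001).)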
+ const status = thread.status.load(.monotonic); + switch (status.cancelation) { + .parked => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, + .none, .canceled => {}, + .canceling => { + thread.status.store(.{ + .cancelation = .canceled, + .awaitable = status.awaitable, + }, .monotonic); return error.Canceled; }, - .acknowledged => return, - _ => unreachable, } } - fn endSyscall(thread: *Thread) void { - const closure = thread.current_closure orelse return; - - switch (thread.cancel_protection) { - .unblocked => {}, - .blocked => return, - } - - _ = @cmpxchgStrong( - CancelStatus, - &closure.cancel_status, - .fromSignaleeId(thread.signal_id), - .none, - .acq_rel, - .acquire, - ) orelse return; - } - - fn endSyscallErrnoBug(thread: *Thread, err: posix.E) Io.UnexpectedError { - @branchHint(.cold); - thread.endSyscall(); - return errnoBug(err); - } - - fn endSyscallUnexpectedErrno(thread: *Thread, err: posix.E) Io.UnexpectedError { - @branchHint(.cold); - thread.endSyscall(); - return posix.unexpectedErrno(err); - } - - /// inline to make error return traces slightly shallower. - inline fn endSyscallError(thread: *Thread, err: anytype) @TypeOf(err) { - thread.endSyscall(); - return err; - } - - fn currentSignalId() SignaleeId { + fn currentSignaleeId() SignaleeId { return if (std.Thread.use_pthreads) std.c.pthread_self() else std.Thread.getCurrentId(); } @@ -262,10 +620,7 @@ const Thread = struct { } fn futexWait(thread: *Thread, ptr: *const u32, expect: u32) Io.Cancelable!void { - return Thread.futexWaitTimed(thread, ptr, expect, null) catch |err| switch (err) { - error.Canceled => return error.Canceled, - error.Timeout => unreachable, - }; + return Thread.futexWaitTimed(thread, ptr, expect, null); } fn futexWaitTimed(thread: ?*Thread, ptr: *const u32, expect: u32, timeout_ns: ?u64) Io.Cancelable!void { @@ -543,6 +898,191 @@ const Thread = struct { }, } } + + /// Cancels `thread` if it is working on `awaitable`. + /// + /// It is possible that `thread` gets canceled by this function, but is blocked in a syscall. In + /// that case, the thread may need to be sent a signal to interrupt the call. This function will + /// return `true` to indicate this, in which case the caller must call `signalCanceledSyscall`. 
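+ ///
+ /// In terms of `Status.cancelation`, the transitions attempted are: `.none` and
+ /// `.blocked_windows_dns` become `.canceling`; `.parked` becomes `.canceling` and the
+ /// thread must additionally be unparked (currently a TODO); `.blocked` becomes
+ /// `.blocked_canceling`, the only case that returns `true`; `.canceling` and `.canceled`
+ /// are left untouched.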
+ fn cancelAwaitable(thread: *Thread, awaitable: AwaitableId) bool { + var status = thread.status.load(.monotonic); + while (true) { + if (status.awaitable != awaitable) return false; // thread is working on something else + status = switch (status.cancelation) { + .none => thread.status.cmpxchgWeak( + .{ .cancelation = .none, .awaitable = awaitable }, + .{ .cancelation = .canceling, .awaitable = awaitable }, + .monotonic, + .monotonic, + ) orelse return false, + + .parked => thread.status.cmpxchgWeak( + .{ .cancelation = .parked, .awaitable = awaitable }, + .{ .cancelation = .canceling, .awaitable = awaitable }, + .monotonic, + .monotonic, + ) orelse { + if (true) @panic("MLUGG TODO: unpark thread"); + return false; + }, + + .blocked => thread.status.cmpxchgWeak( + .{ .cancelation = .blocked, .awaitable = awaitable }, + .{ .cancelation = .blocked_canceling, .awaitable = awaitable }, + .monotonic, + .monotonic, + ) orelse return true, + + .blocked_windows_dns => thread.status.cmpxchgWeak( + .{ .cancelation = .blocked_windows_dns, .awaitable = awaitable }, + .{ .cancelation = .canceling, .awaitable = awaitable }, + .monotonic, + .monotonic, + ) orelse return false, + + .canceling, .canceled => { + // This can happen when the task start raced with the cancelation, so the thread + // saw the cancelation on the future/group *and* we are trying to signal the + // thread here. + return false; + }, + + .blocked_canceling => unreachable, + }; + } + } + + /// Sends a signal to `thread` if it is still blocked in a syscall (i.e. has not yet observed + /// the cancelation request from `cancelAwaitable`). + /// + /// Unfortunately, the signal could arrive before the syscall actually starts, so the interrupt + /// is missed. To handle this, we may need to send multiple signals. As such, if this function + /// returns `true`, then it should be called again after a short delay to send another signal if + /// the thread is still blocked. For the implementation, `Future.waitForCancelWithSignaling` and + /// `Group.waitForCancelWithSignaling`: they use exponential backoff starting at a 1us delay and + /// doubling each call. In practice, it is rare to send more than one signal. + fn signalCanceledSyscall(thread: *Thread, t: *Threaded, awaitable: AwaitableId) bool { + const bad_status: Status = .{ .cancelation = .blocked_canceling, .awaitable = awaitable }; + if (thread.status.load(.monotonic) != bad_status) return false; + + // The thread ID can be read non-atomically because it never changes and was released by the + // store that made `thread` available to us. + const signalee_id = thread.signalee_id; + + if (std.Thread.use_pthreads) { + if (std.c.pthread_kill(signalee_id, .IO) != 0) return false; + } else if (native_os == .linux) { + const pid: posix.pid_t = pid: { + const cached_pid = @atomicLoad(Pid, &t.pid, .monotonic); + if (cached_pid != .unknown) break :pid @intFromEnum(cached_pid); + const pid = std.os.linux.getpid(); + @atomicStore(Pid, &t.pid, @enumFromInt(pid), .monotonic); + break :pid pid; + }; + if (std.os.linux.tgkill(pid, @bitCast(signalee_id), .IO) != 0) return false; + } else { + @compileError("MLUGG TODO"); + } + + return true; + } + + /// Like a `*Thread`, but 2 bits smaller than a pointer (because the LSBs are always 0 due to + /// alignment) so that those two bits can be used in a `packed struct`. 
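+ ///
+ /// Illustrative example (assuming a `*Thread` at address 0x1000): `pack` keeps the high
+ /// bits, 0x1000 >> 2 = 0x400, and `unpack` restores 0x400 << 2 = 0x1000; the zero value
+ /// round-trips to a `null` pointer, matching the `.null` tag.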
+ const PackedPtr = enum(@Int(.unsigned, @bitSizeOf(usize) - 2)) { + null = 0, + all_ones = std.math.maxInt(@Int(.unsigned, @bitSizeOf(usize) - 2)), + _, + + const Split = packed struct(usize) { low: u2, high: PackedPtr }; + fn pack(ptr: *Thread) PackedPtr { + const split: Split = @bitCast(@intFromPtr(ptr)); + assert(split.low == 0); + return split.high; + } + fn unpack(ptr: PackedPtr) ?*Thread { + const split: Split = .{ .low = 0, .high = ptr }; + return @ptrFromInt(@as(usize, @bitCast(split))); + } + }; +}; + +const Syscall = struct { + thread: ?*Thread, + /// Marks entry to a syscall region. This should be tightly scoped around the actual syscall + /// to minimize races. The syscall must be marked as "finished" by `checkCancel`, `finish`, + /// or one of the wrappers of `finish`. + fn start() Io.Cancelable!Syscall { + const thread = Thread.current orelse return .{ .thread = null }; + switch (thread.cancel_protection) { + .blocked => return .{ .thread = null }, + .unblocked => {}, + } + switch (thread.status.fetchOr(.{ + .cancelation = @enumFromInt(0b011), + .awaitable = .null, + }, .monotonic).cancelation) { + .parked => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, + .none => return .{ .thread = thread }, // new status is `.blocked` + .canceling => return error.Canceled, // new status is `.canceled` + .canceled => return .{ .thread = null }, // new status is `.canceled` (unchanged) + } + } + /// Checks whether this syscall has been canceled. This should be called when a syscall is + /// interrupted through a mechanism which may indicate cancelation, or may be spurious. If + /// the syscall was canceled, it is finished and `error.Canceled` is returned. Otherwise, + /// the syscall is not marked finished, and the caller should retry. + fn checkCancel(s: Syscall) Io.Cancelable!void { + const thread = s.thread orelse return; + switch (thread.status.fetchOr(.{ + .cancelation = @enumFromInt(0b010), + .awaitable = .null, + }, .monotonic).cancelation) { + .none => unreachable, + .parked => unreachable, + .blocked_windows_dns => unreachable, + .canceling => unreachable, + .canceled => unreachable, + .blocked => {}, // new status is `.blocked` (unchanged) + .blocked_canceling => return error.Canceled, // new status is `.canceled` + } + } + /// Marks this syscall as finished. + fn finish(s: Syscall) void { + const thread = s.thread orelse return; + switch (thread.status.fetchXor(.{ + .cancelation = @enumFromInt(0b011), + .awaitable = .null, + }, .monotonic).cancelation) { + .none => unreachable, + .parked => unreachable, + .blocked_windows_dns => unreachable, + .canceling => unreachable, + .canceled => unreachable, + .blocked => {}, // new status is `.none` + .blocked_canceling => {}, // new status is `.canceling` + } + } + /// Convenience wrapper which calls `finish`, then returns `err`. + fn fail(s: Syscall, err: anytype) @TypeOf(err) { + s.finish(); + return err; + } + /// Convenience wrapper which calls `finish`, then calls `Threaded.errnoBug`. + fn errnoBug(s: Syscall, err: posix.E) Io.UnexpectedError { + @branchHint(.cold); + s.finish(); + return Threaded.errnoBug(err); + } + /// Convenience wrapper which calls `finish`, then calls `posix.unexpectedErrno`. 
+ fn unexpectedErrno(s: Syscall, err: posix.E) Io.UnexpectedError { + @branchHint(.cold); + s.finish(); + return posix.unexpectedErrno(err); + } }; const max_iovecs_len = 8; @@ -552,114 +1092,6 @@ comptime { if (@TypeOf(posix.IOV_MAX) != void) assert(max_iovecs_len <= posix.IOV_MAX); } -const CancelStatus = enum(usize) { - /// Cancellation has neither been requested, nor checked. The async - /// operation will check status before entering a blocking syscall. - /// This is also the status used for uninteruptible tasks. - none = 0, - /// Cancellation has been requested and the status will be checked before - /// entering a blocking syscall. - requested = std.math.maxInt(usize) - 1, - /// Cancellation has been acknowledged and is in progress. Signals should - /// not be sent. - acknowledged = std.math.maxInt(usize), - /// Stores a `Thread.SignaleeId` and indicates that sending a signal to this thread - /// is needed in order to cancel. This state is set before going into - /// a blocking operation that needs to get unblocked via signal. - _, - - const Unpacked = union(enum) { - none, - requested, - acknowledged, - signal_id: Thread.SignaleeId, - }; - - fn unpack(cs: CancelStatus) Unpacked { - return switch (cs) { - .none => .none, - .requested => .requested, - .acknowledged => .acknowledged, - _ => |signal_id| .{ - .signal_id = if (std.Thread.use_pthreads) - @ptrFromInt(@intFromEnum(signal_id)) - else - @truncate(@intFromEnum(signal_id)), - }, - }; - } - - fn fromSignaleeId(signal_id: Thread.SignaleeId) CancelStatus { - return if (std.Thread.use_pthreads) - @enumFromInt(@intFromPtr(signal_id)) - else - @enumFromInt(signal_id); - } -}; - -const Closure = struct { - start: Start, - node: std.SinglyLinkedList.Node = .{}, - cancel_status: CancelStatus, - - const Start = *const fn (*Closure, *Threaded) void; - - fn requestCancel(closure: *Closure, t: *Threaded) void { - var signal_id = switch (@atomicRmw(CancelStatus, &closure.cancel_status, .Xchg, .requested, .monotonic).unpack()) { - .none, .acknowledged, .requested => return, - .signal_id => |signal_id| signal_id, - }; - // The task will enter a blocking syscall before checking for cancellation again. - // We can send a signal to interrupt the syscall, but if it arrives before - // the syscall instruction, it will be missed. Therefore, this code tries - // again until the cancellation request is acknowledged. - - // 1 << 10 ns is about 1 microsecond, approximately syscall overhead. - // 1 << 20 ns is about 1 millisecond. - // 1 << 30 ns is about 1 second. - // - // On a heavily loaded Linux 6.17.5, I observed a maximum of 20 - // attempts not acknowledged before the timeout (including exponential - // backoff) was sufficient, despite the heavy load. 
- const max_attempts = 22; - - for (0..max_attempts) |attempt_index| { - if (std.Thread.use_pthreads) { - if (std.c.pthread_kill(signal_id, .IO) != 0) return; - } else if (native_os == .linux) { - const pid: posix.pid_t = p: { - const cached_pid = @atomicLoad(Pid, &t.pid, .monotonic); - if (cached_pid != .unknown) break :p @intFromEnum(cached_pid); - const pid = std.os.linux.getpid(); - @atomicStore(Pid, &t.pid, @enumFromInt(pid), .monotonic); - break :p pid; - }; - if (std.os.linux.tgkill(pid, @bitCast(signal_id), .IO) != 0) return; - } else { - return; - } - - if (t.robust_cancel != .enabled) return; - - var timespec: posix.timespec = .{ - .sec = 0, - .nsec = @as(isize, 1) << @intCast(attempt_index), - }; - if (native_os == .linux) { - _ = std.os.linux.clock_nanosleep(posix.CLOCK.MONOTONIC, .{ .ABSTIME = false }, ×pec, ×pec); - } else { - _ = posix.system.nanosleep(×pec, ×pec); - } - - switch (@atomicRmw(CancelStatus, &closure.cancel_status, .Xchg, .requested, .monotonic).unpack()) { - .requested => continue, // Retry needed in case other thread hasn't yet entered the syscall. - .none, .acknowledged => return, - .signal_id => |new_signal_id| signal_id = new_signal_id, - } - } - } -}; - pub const InitOptions = struct { /// Affects how many bytes are memory-mapped for threads. stack_size: usize = std.Thread.SpawnConfig.default_stack_size, @@ -727,14 +1159,10 @@ pub fn init( .old_sig_io = undefined, .old_sig_pipe = undefined, .have_signal_handler = false, - .main_thread = .{ - .signal_id = Thread.currentSignalId(), - .current_closure = null, - .cancel_protection = .unblocked, - }, .argv0 = options.argv0, .environ = options.environ, .robust_cancel = options.robust_cancel, + .worker_threads = .init(null), }; if (posix.Sigaction != void) { @@ -768,14 +1196,10 @@ pub const init_single_threaded: Threaded = .{ .old_sig_io = undefined, .old_sig_pipe = undefined, .have_signal_handler = false, - .main_thread = .{ - .signal_id = undefined, - .current_closure = null, - .cancel_protection = .unblocked, - }, .robust_cancel = .disabled, .argv0 = .{}, .environ = .{}, + .worker_threads = .init(null), }; var global_single_threaded_instance: Threaded = .init_single_threaded; @@ -822,22 +1246,40 @@ fn join(t: *Threaded) void { fn worker(t: *Threaded) void { var thread: Thread = .{ - .signal_id = Thread.currentSignalId(), - .current_closure = null, + .next = undefined, + .signalee_id = Thread.currentSignaleeId(), + .status = .init(.{ + .cancelation = .none, + .awaitable = .null, + }), .cancel_protection = .unblocked, }; Thread.current = &thread; + { + var head = t.worker_threads.load(.monotonic); + while (true) { + thread.next = head; + head = t.worker_threads.cmpxchgWeak( + head, + &thread, + .release, + .monotonic, + ) orelse break; + } + } + defer t.wait_group.finish(); t.mutex.lock(); defer t.mutex.unlock(); while (true) { - while (t.run_queue.popFirst()) |closure_node| { + while (t.run_queue.popFirst()) |runnable_node| { t.mutex.unlock(); - const closure: *Closure = @fieldParentPtr("node", closure_node); - closure.start(closure, t); + thread.cancel_protection = .unblocked; + const runnable: *Runnable = @fieldParentPtr("node", runnable_node); + runnable.startFn(runnable, &thread, t); t.mutex.lock(); t.busy_count -= 1; } @@ -1145,103 +1587,6 @@ const linux_copy_file_range_use_c = std.c.versionCheck(if (builtin.abi.isAndroid }); const linux_copy_file_range_sys = if (linux_copy_file_range_use_c) std.c else std.os.linux; -/// Trailing data: -/// 1. context -/// 2. 
result -const AsyncClosure = struct { - closure: Closure, - func: *const fn (context: *anyopaque, result: *anyopaque) void, - event: Io.Event, - select_condition: ?*Io.Event, - context_alignment: Alignment, - result_offset: usize, - alloc_len: usize, - - const done_event: *Io.Event = @ptrFromInt(@alignOf(Io.Event)); - - fn start(closure: *Closure, t: *Threaded) void { - const ac: *AsyncClosure = @alignCast(@fieldParentPtr("closure", closure)); - const current_thread = Thread.getCurrent(t); - - current_thread.current_closure = closure; - current_thread.cancel_protection = .unblocked; - - ac.func(ac.contextPointer(), ac.resultPointer()); - - current_thread.current_closure = null; - current_thread.cancel_protection = undefined; - - if (@atomicRmw(?*Io.Event, &ac.select_condition, .Xchg, done_event, .release)) |select_event| { - assert(select_event != done_event); - select_event.set(ioBasic(t)); - } - ac.event.set(ioBasic(t)); - } - - fn resultPointer(ac: *AsyncClosure) [*]u8 { - const base: [*]u8 = @ptrCast(ac); - return base + ac.result_offset; - } - - fn contextPointer(ac: *AsyncClosure) [*]u8 { - const base: [*]u8 = @ptrCast(ac); - const context_offset = ac.context_alignment.forward(@intFromPtr(ac) + @sizeOf(AsyncClosure)) - @intFromPtr(ac); - return base + context_offset; - } - - fn init( - gpa: Allocator, - result_len: usize, - result_alignment: Alignment, - context: []const u8, - context_alignment: Alignment, - func: *const fn (context: *const anyopaque, result: *anyopaque) void, - ) Allocator.Error!*AsyncClosure { - const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(AsyncClosure); - const worst_case_context_offset = context_alignment.forward(@sizeOf(AsyncClosure) + max_context_misalignment); - const worst_case_result_offset = result_alignment.forward(worst_case_context_offset + context.len); - const alloc_len = worst_case_result_offset + result_len; - - const ac: *AsyncClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(AsyncClosure), alloc_len))); - errdefer comptime unreachable; - - const actual_context_addr = context_alignment.forward(@intFromPtr(ac) + @sizeOf(AsyncClosure)); - const actual_result_addr = result_alignment.forward(actual_context_addr + context.len); - const actual_result_offset = actual_result_addr - @intFromPtr(ac); - ac.* = .{ - .closure = .{ - .cancel_status = .none, - .start = start, - }, - .func = func, - .context_alignment = context_alignment, - .result_offset = actual_result_offset, - .alloc_len = alloc_len, - .event = .unset, - .select_condition = null, - }; - @memcpy(ac.contextPointer()[0..context.len], context); - return ac; - } - - fn waitAndDeinit(ac: *AsyncClosure, t: *Threaded, result: []u8) void { - ac.event.wait(ioBasic(t)) catch |err| switch (err) { - error.Canceled => { - ac.closure.requestCancel(t); - ac.event.waitUncancelable(ioBasic(t)); - recancel(t); - }, - }; - @memcpy(result, ac.resultPointer()[0..result.len]); - ac.deinit(t.allocator); - } - - fn deinit(ac: *AsyncClosure, gpa: Allocator) void { - const base: [*]align(@alignOf(AsyncClosure)) u8 = @ptrCast(ac); - gpa.free(base[0..ac.alloc_len]); - } -}; - fn async( userdata: ?*anyopaque, result: []u8, @@ -1255,10 +1600,13 @@ fn async( start(context.ptr, result.ptr); return null; } + const gpa = t.allocator; - const ac = AsyncClosure.init(gpa, result.len, result_alignment, context, context_alignment, start) catch { - start(context.ptr, result.ptr); - return null; + const future = Future.create(gpa, result.len, result_alignment, context, context_alignment, start) 
catch |err| switch (err) { + error.OutOfMemory => { + start(context.ptr, result.ptr); + return null; + }, }; t.mutex.lock(); @@ -1267,7 +1615,7 @@ fn async( if (busy_count >= @intFromEnum(t.async_limit)) { t.mutex.unlock(); - ac.deinit(gpa); + future.destroy(gpa); start(context.ptr, result.ptr); return null; } @@ -1281,17 +1629,18 @@ fn async( t.wait_group.finish(); t.busy_count = busy_count; t.mutex.unlock(); - ac.deinit(gpa); + future.destroy(gpa); start(context.ptr, result.ptr); return null; }; thread.detach(); } - t.run_queue.prepend(&ac.closure.node); + t.run_queue.prepend(&future.runnable.node); + t.mutex.unlock(); t.cond.signal(); - return @ptrCast(ac); + return @ptrCast(future); } fn concurrent( @@ -1307,9 +1656,10 @@ fn concurrent( const t: *Threaded = @ptrCast(@alignCast(userdata)); const gpa = t.allocator; - const ac = AsyncClosure.init(gpa, result_len, result_alignment, context, context_alignment, start) catch - return error.ConcurrencyUnavailable; - errdefer ac.deinit(gpa); + const future = Future.create(gpa, result_len, result_alignment, context, context_alignment, start) catch |err| switch (err) { + error.OutOfMemory => return error.ConcurrencyUnavailable, + }; + errdefer future.destroy(gpa); t.mutex.lock(); defer t.mutex.unlock(); @@ -1329,110 +1679,32 @@ fn concurrent( const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch return error.ConcurrencyUnavailable; + thread.detach(); } - t.run_queue.prepend(&ac.closure.node); + t.run_queue.prepend(&future.runnable.node); + t.cond.signal(); - return @ptrCast(ac); + return @ptrCast(future); } -const GroupClosure = struct { - closure: Closure, - group: *Io.Group, - /// Points to sibling `GroupClosure`. Used for walking the group to cancel all. - node: std.SinglyLinkedList.Node, - func: *const fn (*Io.Group, context: *anyopaque) Io.Cancelable!void, - context_alignment: Alignment, - alloc_len: usize, - - fn start(closure: *Closure, t: *Threaded) void { - const gc: *GroupClosure = @alignCast(@fieldParentPtr("closure", closure)); - const current_thread = Thread.getCurrent(t); - const group = gc.group; - const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); - const event: *Io.Event = @ptrCast(&group.context); - current_thread.current_closure = closure; - current_thread.cancel_protection = .unblocked; - - assertResult(closure, gc.func(group, gc.contextPointer())); - - current_thread.current_closure = null; - current_thread.cancel_protection = undefined; - - const prev_state = group_state.fetchSub(sync_one_pending, .acq_rel); - assert((prev_state / sync_one_pending) > 0); - if (prev_state == (sync_one_pending | sync_is_waiting)) event.set(ioBasic(t)); - } - - fn assertResult(closure: *Closure, result: Io.Cancelable!void) void { - if (result) |_| switch (closure.cancel_status.unpack()) { - .none, .requested => {}, - .acknowledged => unreachable, // task illegally swallowed error.Canceled - .signal_id => unreachable, - } else |err| switch (err) { - error.Canceled => assert(closure.cancel_status == .acknowledged), - } - } - - fn contextPointer(gc: *GroupClosure) [*]u8 { - const base: [*]u8 = @ptrCast(gc); - const context_offset = gc.context_alignment.forward(@intFromPtr(gc) + @sizeOf(GroupClosure)) - @intFromPtr(gc); - return base + context_offset; - } - - /// Does not initialize the `node` field. 
- fn init( - gpa: Allocator, - group: *Io.Group, - context: []const u8, - context_alignment: Alignment, - func: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void, - ) Allocator.Error!*GroupClosure { - const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(GroupClosure); - const worst_case_context_offset = context_alignment.forward(@sizeOf(GroupClosure) + max_context_misalignment); - const alloc_len = worst_case_context_offset + context.len; - - const gc: *GroupClosure = @ptrCast(@alignCast(try gpa.alignedAlloc(u8, .of(GroupClosure), alloc_len))); - errdefer comptime unreachable; - - gc.* = .{ - .closure = .{ - .cancel_status = .none, - .start = start, - }, - .group = group, - .node = undefined, - .func = func, - .context_alignment = context_alignment, - .alloc_len = alloc_len, - }; - @memcpy(gc.contextPointer()[0..context.len], context); - return gc; - } - - fn deinit(gc: *GroupClosure, gpa: Allocator) void { - const base: [*]align(@alignOf(GroupClosure)) u8 = @ptrCast(gc); - gpa.free(base[0..gc.alloc_len]); - } - - const sync_is_waiting: usize = 1 << 0; - const sync_one_pending: usize = 1 << 1; -}; - fn groupAsync( userdata: ?*anyopaque, - group: *Io.Group, + type_erased: *Io.Group, context: []const u8, context_alignment: Alignment, start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void, ) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); - if (builtin.single_threaded) return start(group, context.ptr) catch unreachable; + const g: Group = .{ .ptr = type_erased }; + + if (builtin.single_threaded) return start(g.ptr, context.ptr) catch unreachable; const gpa = t.allocator; - const gc = GroupClosure.init(gpa, group, context, context_alignment, start) catch - return t.assertGroupResult(start(group, context.ptr)); + const task = Group.Task.create(gpa, g, context, context_alignment, start) catch |err| switch (err) { + error.OutOfMemory => return t.assertGroupResult(start(g.ptr, context.ptr)), + }; t.mutex.lock(); @@ -1440,8 +1712,8 @@ fn groupAsync( if (busy_count >= @intFromEnum(t.async_limit)) { t.mutex.unlock(); - gc.deinit(gpa); - return t.assertGroupResult(start(group, context.ptr)); + task.destroy(gpa); + return t.assertGroupResult(start(g.ptr, context.ptr)); } t.busy_count = busy_count + 1; @@ -1453,37 +1725,48 @@ fn groupAsync( t.wait_group.finish(); t.busy_count = busy_count; t.mutex.unlock(); - gc.deinit(gpa); - return t.assertGroupResult(start(group, context.ptr)); + task.destroy(gpa); + return t.assertGroupResult(start(g.ptr, context.ptr)); }; thread.detach(); } - // Append to the group linked list inside the mutex to make `Io.Group.async` thread-safe. - gc.node = .{ .next = @ptrCast(@alignCast(group.token.load(.monotonic))) }; - group.token.store(&gc.node, .monotonic); - - t.run_queue.prepend(&gc.closure.node); - - // This needs to be done before unlocking the mutex to avoid a race with - // the associated task finishing. - const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); - const prev_state = group_state.fetchAdd(GroupClosure.sync_one_pending, .monotonic); - assert((prev_state / GroupClosure.sync_one_pending) < (std.math.maxInt(usize) / GroupClosure.sync_one_pending)); + // TODO: if this logic is changed to be lock-free, this `fetchAdd` must be released by the queue + // prepend so that the task doesn't finish without observing this and try to decrement the count + // below zero. 
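+ // (Today that ordering is provided by `t.mutex`: both this increment and the prepend
+ // happen inside the critical section, and the worker pops under the same mutex.)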
+ _ = g.status().fetchAdd(.{ + .num_running = 1, + .have_awaiter = false, + .canceled = false, + }, .monotonic); + t.run_queue.prepend(&task.runnable.node); t.mutex.unlock(); t.cond.signal(); } -fn assertGroupResult(t: *Threaded, result: Io.Cancelable!void) void { - const current_thread: *Thread = .getCurrent(t); - const current_closure = current_thread.current_closure orelse return; - GroupClosure.assertResult(current_closure, result); +fn assertGroupResult(result: Io.Cancelable!void) void { + const cancel_acknowledged = if (Thread.current) |thread| + switch (thread.status.load(.monotonic).cancelation) { + .none, .canceling => false, + .canceled => true, + .parked => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, + } + else + false; + if (result) { + assert(!cancel_acknowledged); // group task acknowledged cancelation but did not return `error.Canceled` + } else |err| switch (err) { + error.Canceled => assert(cancel_acknowledged), // group task returned `error.Canceled` but was never canceled + } } fn groupConcurrent( userdata: ?*anyopaque, - group: *Io.Group, + type_erased: *Io.Group, context: []const u8, context_alignment: Alignment, start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void, @@ -1491,10 +1774,13 @@ fn groupConcurrent( if (builtin.single_threaded) return error.ConcurrencyUnavailable; const t: *Threaded = @ptrCast(@alignCast(userdata)); + const g: Group = .{ .ptr = type_erased }; const gpa = t.allocator; - const gc = GroupClosure.init(gpa, group, context, context_alignment, start) catch - return error.ConcurrencyUnavailable; + const task = Group.Task.create(gpa, g, context, context_alignment, start) catch |err| switch (err) { + error.OutOfMemory => return error.ConcurrencyUnavailable, + }; + errdefer task.destroy(gpa); t.mutex.lock(); defer t.mutex.unlock(); @@ -1514,102 +1800,126 @@ fn groupConcurrent( const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch return error.ConcurrencyUnavailable; + thread.detach(); } - // Append to the group linked list inside the mutex to make `Io.Group.concurrent` thread-safe. - gc.node = .{ .next = @ptrCast(@alignCast(group.token.load(.monotonic))) }; - group.token.store(&gc.node, .monotonic); - - t.run_queue.prepend(&gc.closure.node); - - // This needs to be done before unlocking the mutex to avoid a race with - // the associated task finishing. - const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); - const prev_state = group_state.fetchAdd(GroupClosure.sync_one_pending, .monotonic); - assert((prev_state / GroupClosure.sync_one_pending) < (std.math.maxInt(usize) / GroupClosure.sync_one_pending)); + // TODO: if this logic is changed to be lock-free, this `fetchAdd` must be released by the queue + // prepend so that the task doesn't finish without observing this and try to decrement the count + // below zero. 
+ _ = g.status().fetchAdd(.{ + .num_running = 1, + .have_awaiter = false, + .canceled = false, + }, .monotonic); + t.run_queue.prepend(&task.runnable.node); t.cond.signal(); } -fn groupAwait(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque) Io.Cancelable!void { - const t: *Threaded = @ptrCast(@alignCast(userdata)); - const gpa = t.allocator; - +fn groupAwait(userdata: ?*anyopaque, type_erased: *Io.Group, initial_token: *anyopaque) Io.Cancelable!void { _ = initial_token; // we need to load `token` *after* the group finishes + const t: *Threaded = @ptrCast(@alignCast(userdata)); + const g: Group = .{ .ptr = type_erased }; + const thread: *Thread = .getCurrent(t); - if (builtin.single_threaded) unreachable; // we never set `group.token` to non-`null` + var num_completed: std.atomic.Value(u32) = .init(0); + g.awaiter().* = &num_completed; - const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); - const event: *Io.Event = @ptrCast(&group.context); - const prev_state = group_state.fetchAdd(GroupClosure.sync_is_waiting, .acquire); - assert(prev_state & GroupClosure.sync_is_waiting == 0); - { - errdefer _ = group_state.fetchSub(GroupClosure.sync_is_waiting, .monotonic); - // This event.wait can return error.Canceled, in which case this logic does - // *not* propagate cancel requests to each group member. Instead, the user - // code will likely do this with a defered call to groupCancel, or, - // intentionally not do this. - if ((prev_state / GroupClosure.sync_one_pending) > 0) try event.wait(ioBasic(t)); + const pre_await_status = g.status().fetchOr(.{ + .num_running = 0, + .have_awaiter = true, + .canceled = false, + }, .acq_rel); // acquire results if complete; release `g.awaiter()` + + assert(!pre_await_status.have_awaiter); + assert(!pre_await_status.canceled); + if (pre_await_status.num_running == 0) { + // Already done. Since the group is finished, it's illegal to spawn more tasks in it + // until we return, so we can access `g.status()` non-atomically. + g.status().raw.have_awaiter = false; + return; } - // Since the group has now finished, it's illegal to add more tasks to it until we return. It's - // also illegal for us to race with another `await` or `cancel`. Therefore, we must be the only - // thread who can access `group` right now. - var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.raw)); - group.token.raw = null; - while (it) |node| { - it = node.next; // update `it` now, because `deinit` will invalidate `node` - const gc: *GroupClosure = @fieldParentPtr("node", node); - gc.deinit(gpa); + while (thread.futexWait(&num_completed.raw, 0)) { + switch (num_completed.load(.acquire)) { // acquire task results + 0 => continue, + 1 => break, + else => unreachable, // group was reused before `await` returned + } + } else |err| switch (err) { + error.Canceled => { + const pre_cancel_status = g.status().fetchOr(.{ + .num_running = 0, + .have_awaiter = false, + .canceled = true, + }, .acq_rel); // acquire results if complete; release `g.awaiter()` + assert(pre_cancel_status.have_awaiter); + assert(!pre_cancel_status.canceled); + + // Even if `pre_cancel_status.num_running == 0`, we still need to wait for the signal, + // because in that case the last member of the group is already trying to modify it. + // However, if we know everything is done, we *can* skip signaling blocked threads. 
+ const skip_signals = pre_cancel_status.num_running == 0; + g.waitForCancelWithSignaling(t, &num_completed, skip_signals); + + // The group is finished, so it's illegal to spawn more tasks in it until we return, so + // we can access `g.status()` non-atomically. + g.status().raw.canceled = false; + g.status().raw.have_awaiter = false; + return error.Canceled; + }, } + + // The group is finished, so it's illegal to spawn more tasks in it until we return, so + // we can access `g.status()` non-atomically. + g.status().raw.have_awaiter = false; } -fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, initial_token: *anyopaque) void { +fn groupCancel(userdata: ?*anyopaque, type_erased: *Io.Group, initial_token: *anyopaque) void { + _ = initial_token; const t: *Threaded = @ptrCast(@alignCast(userdata)); - const gpa = t.allocator; + const g: Group = .{ .ptr = type_erased }; - _ = initial_token; // we need to load `token` *after* the group finishes + var num_completed: std.atomic.Value(u32) = .init(0); + g.awaiter().* = &num_completed; - if (builtin.single_threaded) unreachable; // we never set `group.token` to non-`null` + const pre_cancel_status = g.status().fetchOr(.{ + .num_running = 0, + .have_awaiter = true, + .canceled = true, + }, .acq_rel); // acquire results if complete; release `g.awaiter()` - { - var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.load(.monotonic))); - while (it) |node| : (it = node.next) { - const gc: *GroupClosure = @fieldParentPtr("node", node); - gc.closure.requestCancel(t); - } + assert(!pre_cancel_status.have_awaiter); + assert(!pre_cancel_status.canceled); + if (pre_cancel_status.num_running == 0) { + // Already done. Since the group is finished, it's illegal to spawn more tasks in it + // until we return, so we can access `g.status()` non-atomically. + g.status().raw.have_awaiter = false; + g.status().raw.canceled = false; + return; } - const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state); - const event: *Io.Event = @ptrCast(&group.context); - const prev_state = group_state.fetchAdd(GroupClosure.sync_is_waiting, .acquire); - assert(prev_state & GroupClosure.sync_is_waiting == 0); - if ((prev_state / GroupClosure.sync_one_pending) > 0) event.waitUncancelable(ioBasic(t)); + g.waitForCancelWithSignaling(t, &num_completed, false); - // Since the group has now finished, it's illegal to add more tasks to it until we return. It's - // also illegal for us to race with another `await` or `cancel`. Therefore, we must be the only - // thread who can access `group` right now. 
- var it: ?*std.SinglyLinkedList.Node = @ptrCast(@alignCast(group.token.raw)); - group.token.raw = null; - while (it) |node| { - it = node.next; // update `it` now, because `deinit` will invalidate `node` - const gc: *GroupClosure = @fieldParentPtr("node", node); - gc.deinit(gpa); - } + g.status().raw = .{ .num_running = 0, .have_awaiter = false, .canceled = false }; } fn recancel(userdata: ?*anyopaque) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); const current_thread: *Thread = .getCurrent(t); - const cancel_status = ¤t_thread.current_closure.?.cancel_status; - switch (@atomicLoad(CancelStatus, cancel_status, .monotonic)) { - .none => unreachable, // called `recancel` when not canceled - .requested => unreachable, // called `recancel` when cancelation was already outstanding - .acknowledged => {}, - _ => unreachable, // invalid state: not in a syscall + switch (current_thread.status.fetchXor(.{ + .cancelation = @enumFromInt(0b001), + .awaitable = .null, + }, .monotonic).cancelation) { + .canceled => {}, + .none => unreachable, // called `recancel` but was not canceled + .canceling => unreachable, // called `recancel` but cancelation was already pending + .parked => unreachable, + .blocked => unreachable, + .blocked_windows_dns => unreachable, + .blocked_canceling => unreachable, } - @atomicStore(CancelStatus, cancel_status, .requested, .monotonic); } fn swapCancelProtection(userdata: ?*anyopaque, new: Io.CancelProtection) Io.CancelProtection { @@ -1622,7 +1932,8 @@ fn swapCancelProtection(userdata: ?*anyopaque, new: Io.CancelProtection) Io.Canc fn checkCancel(userdata: ?*anyopaque) Io.Cancelable!void { const t: *Threaded = @ptrCast(@alignCast(userdata)); - return Thread.getCurrent(t).checkCancel(); + _ = t; + return Thread.checkCancel(); } fn await( @@ -1633,8 +1944,51 @@ fn await( ) void { _ = result_alignment; const t: *Threaded = @ptrCast(@alignCast(userdata)); - const closure: *AsyncClosure = @ptrCast(@alignCast(any_future)); - closure.waitAndDeinit(t, result); + const future: *Future = @ptrCast(@alignCast(any_future)); + const thread: *Thread = .getCurrent(t); + + var num_completed: std.atomic.Value(u32) = .init(0); + future.awaiter = &num_completed; + + const pre_await_status = future.status.fetchOr(.{ + .tag = .pending_awaited, + .thread = .null, + }, .acq_rel); // acquire results if complete; release `future.awaiter` + switch (pre_await_status.tag) { + .pending => while (thread.futexWait(&num_completed.raw, 0)) { + switch (num_completed.load(.acquire)) { // acquire task results + 0 => continue, + 1 => break, + else => unreachable, // group was reused before `await` returned + } + } else |err| switch (err) { + error.Canceled => { + const pre_cancel_status = future.status.fetchOr(.{ + .tag = .pending_canceled, + .thread = .null, + }, .acq_rel); // acquire results if complete; release `future.awaiter` + switch (pre_cancel_status.tag) { + .pending => unreachable, // invalid state: we already awaited + .pending_awaited => { + const working_thread = pre_cancel_status.thread.unpack(); + future.waitForCancelWithSignaling(t, &num_completed, @alignCast(working_thread)); + }, + .pending_canceled => unreachable, // `await` raced with `cancel` + .done => { + // The task just finished, but we still need to wait for the signal, because the + // task thread already figured out that they need to update `future.awaiter`. 
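+ // Passing `null` for the thread skips signaling entirely; we only futex-wait for
+ // the completing task to bump `num_completed`.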
+ future.waitForCancelWithSignaling(t, &num_completed, null); + }, + } + recancel(t); + }, + }, + .pending_awaited => unreachable, // `await` raced with `await` + .pending_canceled => unreachable, // `await` raced with `cancel` + .done => {}, + } + @memcpy(result, future.resultPointer()); + future.destroy(t.allocator); } fn cancel( @@ -1645,9 +1999,26 @@ fn cancel( ) void { _ = result_alignment; const t: *Threaded = @ptrCast(@alignCast(userdata)); - const ac: *AsyncClosure = @ptrCast(@alignCast(any_future)); - ac.closure.requestCancel(t); - ac.waitAndDeinit(t, result); + const future: *Future = @ptrCast(@alignCast(any_future)); + + var num_completed: std.atomic.Value(u32) = .init(0); + future.awaiter = &num_completed; + + const pre_cancel_status = future.status.fetchOr(.{ + .tag = .pending_canceled, + .thread = .null, + }, .acq_rel); // acquire results if complete; release `future.awaiter` + switch (pre_cancel_status.tag) { + .pending => { + const working_thread = pre_cancel_status.thread.unpack(); + future.waitForCancelWithSignaling(t, &num_completed, @alignCast(working_thread)); + }, + .pending_awaited => unreachable, // `await` raced with `await` + .pending_canceled => unreachable, // `await` raced with `cancel` + .done => {}, + } + @memcpy(result, future.resultPointer()); + future.destroy(t.allocator); } fn futexWait(userdata: ?*anyopaque, ptr: *const u32, expected: u32, timeout: Io.Timeout) Io.Cancelable!void { @@ -8555,32 +8926,69 @@ fn sleepPosix(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void { fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize { const t: *Threaded = @ptrCast(@alignCast(userdata)); - var event: Io.Event = .unset; + var num_completed: std.atomic.Value(u32) = .init(0); - for (futures, 0..) |future, i| { - const closure: *AsyncClosure = @ptrCast(@alignCast(future)); - if (@atomicRmw(?*Io.Event, &closure.select_condition, .Xchg, &event, .seq_cst) == AsyncClosure.done_event) { - for (futures[0..i]) |cleanup_future| { - const cleanup_closure: *AsyncClosure = @ptrCast(@alignCast(cleanup_future)); - if (@atomicRmw(?*Io.Event, &cleanup_closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_event) { - cleanup_closure.event.waitUncancelable(ioBasic(t)); // Ensure no reference to our stack-allocated event. - } - } - return i; + for (futures, 0..) |any_future, i| { + const future: *Future = @ptrCast(@alignCast(any_future)); + future.awaiter = &num_completed; + const old_status = future.status.fetchOr( + .{ .tag = .pending_awaited, .thread = .null }, + .release, // release `future.awaiter` + ); + switch (old_status.tag) { + .pending => {}, + .pending_awaited => unreachable, // `await` raced with `select` + .pending_canceled => unreachable, // `cancel` raced with `select` + .done => { + future.status.store(old_status, .monotonic); + _ = finishSelect(&num_completed, futures[0..i]); + return i; + }, } } - try event.wait(ioBasic(t)); + errdefer _ = finishSelect(&num_completed, futures); + const thread: *Thread = .getCurrent(t); - var result: ?usize = null; - for (futures, 0..) |future, i| { - const closure: *AsyncClosure = @ptrCast(@alignCast(future)); - if (@atomicRmw(?*Io.Event, &closure.select_condition, .Xchg, null, .seq_cst) == AsyncClosure.done_event) { - closure.event.waitUncancelable(ioBasic(t)); // Ensure no reference to our stack-allocated event. - if (result == null) result = i; // In case multiple are ready, return first. 
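+ // Block until at least one future completes; the completing task bumps `num_completed`
+ // and does a futex wake on it.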
+ while (true) { + const n = num_completed.load(.acquire); + if (n > 0) break; + assert(n < futures.len); + try thread.futexWait(&num_completed.raw, n); + } + return finishSelect(&num_completed, futures).?; +} +fn finishSelect( + num_completed: *std.atomic.Value(u32), + futures: []const *Io.AnyFuture, +) ?usize { + var completed_index: ?usize = null; + var expect_completed: u32 = 0; + for (futures, 0..) |any_future, i| { + const future: *Future = @ptrCast(@alignCast(any_future)); + // This operation will convert `.pending_awaited` to `.pending`, or leave `.done` untouched. + switch (future.status.fetchAnd( + .{ .tag = @enumFromInt(0b10), .thread = .all_ones }, + .monotonic, + ).tag) { + .pending_awaited => {}, + .pending => unreachable, + .pending_canceled => unreachable, + .done => { + expect_completed += 1; + completed_index = i; + }, } } - return result.?; + // If any future has just finished, wait for it to signal `num_completed` to avoid dangling + // references to stack memory. + while (true) { + const n = num_completed.load(.acquire); + if (n == expect_completed) break; + assert(n < expect_completed); + Thread.futexWaitUncancelable(&num_completed.raw, n); + } + return completed_index; } fn netListenIpPosix(