diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 60df1bdd81..048fcc415d 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -16873,29 +16873,37 @@ const parking_futex = struct { } const Bucket = struct { - /// Used as a fast check for `wake` to avoid having to acquire `mutex` to discover there are no - /// waiters. It is important for `wait` to increment this *before* checking the futex value to - /// avoid a race. - num_waiters: std.atomic.Value(u32), - /// Protects `waiters`. - mutex: Mutex, - waiters: std.DoublyLinkedList, + /// The alignment prevents false sharing between buckets. + waiters: [capacity]?*Waiter align(std.atomic.cache_line) = @splat(null), + const capacity = std.atomic.cache_line / @sizeOf(?*Waiter); - /// Prevent false sharing between buckets. - _: void align(std.atomic.cache_line) = {}, + /// Store the waiter into the bucket, atomic, lock-free. + fn add(b: *Bucket, w: *Waiter) void { + while (true) for (&b.waiters) |*slot| { + if (@cmpxchgWeak(?*Waiter, slot, null, w, .acq_rel, .monotonic) == null) { + return; + } + }; + } - const init: Bucket = .{ .num_waiters = .init(0), .mutex = .init, .waiters = .{} }; + /// Delete the waiter from the bucket, atomic, lock-free. + fn remove(b: *Bucket, w: *Waiter) void { + while (true) for (&b.waiters) |*slot| { + if (@cmpxchgWeak(?*Waiter, slot, w, null, .acq_rel, .monotonic) == null) { + return; + } + }; + } }; const Waiter = struct { - node: std.DoublyLinkedList.Node, + node: std.SinglyLinkedList.Node, address: usize, tid: std.Thread.Id, /// `thread_status.cancelation` is `.parked` while the thread is waiting. The single thread /// which atomically updates it (to `.none` or `.canceling`) is responsible for: /// /// * Removing the `Waiter` from `Bucket.waiters` - /// * Decrementing `Bucket.num_waiters` /// * Atomically setting `done` (after this, the `Waiter` may go out of scope at any time, /// so must not be referenced again) /// * Unparking the thread (last, so that the unparked thread definitely sees `done`) @@ -16911,7 +16919,7 @@ const parking_futex = struct { /// between different futexes. This length seems like it'll provide a reasonable balance /// between contention and memory usage: assuming a 128-byte `Bucket` (due to cache line /// alignment), this uses 32 KiB of memory. - var buckets: [256]Bucket = @splat(.init); + var buckets: [256]Bucket = @splat(.{}); }; // Here we use Fibonacci hashing: the golden ratio can be used to evenly redistribute input @@ -16947,16 +16955,7 @@ const parking_futex = struct { var status_buf: std.atomic.Value(Thread.Status) = undefined; { - mutexLockUncancelable(&bucket.mutex); - defer mutexUnlock(&bucket.mutex); - - _ = bucket.num_waiters.fetchAdd(1, .acquire); - - if (@atomicLoad(u32, ptr, .monotonic) != expect) { - assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); - return; - } - + if (@atomicLoad(u32, ptr, .monotonic) != expect) return; // This is in the critical section to avoid marking the thread as parked until we're // certain that we're actually going to park. waiter.thread_status = status: { @@ -16974,11 +16973,7 @@ const parking_futex = struct { ); switch (old_status.cancelation) { .none => {}, // status is now `.parked` - .canceling => { - // status is now `.canceled` - assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); - return error.Canceled; - }, + .canceling => return error.Canceled, // status is now `.canceled` .canceled => break :cancelable, // status is still `.canceled` .parked => unreachable, .blocked => unreachable, @@ -16996,7 +16991,7 @@ const parking_futex = struct { break :status &status_buf; }; - bucket.waiters.append(&waiter.node); + bucket.add(&waiter); } const deadline: ?Io.Clock.Timestamp = switch (timeout) { @@ -17017,10 +17012,7 @@ const parking_futex = struct { .parked => { // We saw a timeout and updated our own status from `.parked` to `.none`. It is // our responsibility to remove `waiter` from `bucket`. - mutexLockUncancelable(&bucket.mutex); - defer mutexUnlock(&bucket.mutex); - bucket.waiters.remove(&waiter.node); - assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); + bucket.remove(&waiter); }, .none, .canceling => { // Race condition: the timeout was reached, then `wake` or a cancelation tried @@ -17046,25 +17038,16 @@ const parking_futex = struct { const bucket = bucketForAddress(@intFromPtr(ptr)); - // To ensure the store to `ptr` is ordered before this check, we effectively want a `.release` - // load, but that doesn't exist in the C11 memory model, so emulate it with a non-mutating rmw. - if (bucket.num_waiters.fetchAdd(0, .release) == 0) { - @branchHint(.likely); - return; // no waiters - } - // Waiters removed from the linked list under the mutex so we can unpark their threads outside // of the critical section. This forms a singly-linked list of waiters using `Waiter.node.next`. - var waking_head: ?*std.DoublyLinkedList.Node = null; + var waking_head: ?*std.SinglyLinkedList.Node = null; { - mutexLockUncancelable(&bucket.mutex); - defer mutexUnlock(&bucket.mutex); - var num_removed: u32 = 0; - var it = bucket.waiters.first; - while (num_removed < max_waiters) { - const waiter: *Waiter = @fieldParentPtr("node", it orelse break); - it = waiter.node.next; + var i: usize = 0; + while (num_removed < max_waiters) : (i += 1) { + const waiter: *Waiter = while (bucket.waiters.len - i != 0) : (i += 1) { + break @atomicLoad(?*Waiter, &bucket.waiters[i], .monotonic) orelse continue; + } else break; if (waiter.address != @intFromPtr(ptr)) continue; const old_status = waiter.thread_status.fetchAnd( .{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones }, @@ -17072,7 +17055,7 @@ const parking_futex = struct { ); switch (old_status.cancelation) { .parked => {}, // state updated to `.none` - .none => continue, // race with timeout; they are about to lock `bucket.mutex` and remove themselves from the bucket + .none => continue, // race with timeout; they are about to remove themselves from the bucket .canceling => continue, // race with a canceler who hasn't called `removeCanceledWaiter` yet .canceled => unreachable, .blocked => unreachable, @@ -17081,13 +17064,11 @@ const parking_futex = struct { .blocked_canceling => unreachable, } // We're waking this waiter. Remove them from the bucket and add them to our local list. - bucket.waiters.remove(&waiter.node); + @atomicStore(?*Waiter, &bucket.waiters[i], null, .release); waiter.node.next = waking_head; waking_head = &waiter.node; num_removed += 1; } - - _ = bucket.num_waiters.fetchSub(num_removed, .monotonic); } var unpark_buf: [128]UnparkTid = undefined; @@ -17113,10 +17094,7 @@ const parking_futex = struct { fn removeCanceledWaiter(waiter: *Waiter) void { const bucket = bucketForAddress(waiter.address); - mutexLockUncancelable(&bucket.mutex); - defer mutexUnlock(&bucket.mutex); - bucket.waiters.remove(&waiter.node); - assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0); + bucket.remove(waiter); waiter.done.store(true, .release); // potentially invalidates `waiter.*` } };