mirror of
https://codeberg.org/ziglang/zig.git
synced 2026-03-08 01:24:49 +01:00
std.Io.Threaded: make parking_futex lock-free
And therefore no longer depend on a mutex API. The idea here is to rely on cache line operations being very fast. Buckets span exactly one cache line each, storing only waiter pointers. Additions and removals do linear weak cmpxchg scan over the cache line, repeating until sucess. If there are more parked threads with bucket hash collisions than fits into a cache line then wait() degrades into a spin lock.
This commit is contained in:
parent
e9eadee006
commit
2bc89a5198
1 changed files with 34 additions and 56 deletions
|
|
@ -16873,29 +16873,37 @@ const parking_futex = struct {
|
|||
}
|
||||
|
||||
const Bucket = struct {
|
||||
/// Used as a fast check for `wake` to avoid having to acquire `mutex` to discover there are no
|
||||
/// waiters. It is important for `wait` to increment this *before* checking the futex value to
|
||||
/// avoid a race.
|
||||
num_waiters: std.atomic.Value(u32),
|
||||
/// Protects `waiters`.
|
||||
mutex: Mutex,
|
||||
waiters: std.DoublyLinkedList,
|
||||
/// The alignment prevents false sharing between buckets.
|
||||
waiters: [capacity]?*Waiter align(std.atomic.cache_line) = @splat(null),
|
||||
const capacity = std.atomic.cache_line / @sizeOf(?*Waiter);
|
||||
|
||||
/// Prevent false sharing between buckets.
|
||||
_: void align(std.atomic.cache_line) = {},
|
||||
/// Store the waiter into the bucket, atomic, lock-free.
|
||||
fn add(b: *Bucket, w: *Waiter) void {
|
||||
while (true) for (&b.waiters) |*slot| {
|
||||
if (@cmpxchgWeak(?*Waiter, slot, null, w, .acq_rel, .monotonic) == null) {
|
||||
return;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
const init: Bucket = .{ .num_waiters = .init(0), .mutex = .init, .waiters = .{} };
|
||||
/// Delete the waiter from the bucket, atomic, lock-free.
|
||||
fn remove(b: *Bucket, w: *Waiter) void {
|
||||
while (true) for (&b.waiters) |*slot| {
|
||||
if (@cmpxchgWeak(?*Waiter, slot, w, null, .acq_rel, .monotonic) == null) {
|
||||
return;
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
|
||||
const Waiter = struct {
|
||||
node: std.DoublyLinkedList.Node,
|
||||
node: std.SinglyLinkedList.Node,
|
||||
address: usize,
|
||||
tid: std.Thread.Id,
|
||||
/// `thread_status.cancelation` is `.parked` while the thread is waiting. The single thread
|
||||
/// which atomically updates it (to `.none` or `.canceling`) is responsible for:
|
||||
///
|
||||
/// * Removing the `Waiter` from `Bucket.waiters`
|
||||
/// * Decrementing `Bucket.num_waiters`
|
||||
/// * Atomically setting `done` (after this, the `Waiter` may go out of scope at any time,
|
||||
/// so must not be referenced again)
|
||||
/// * Unparking the thread (last, so that the unparked thread definitely sees `done`)
|
||||
|
|
@ -16911,7 +16919,7 @@ const parking_futex = struct {
|
|||
/// between different futexes. This length seems like it'll provide a reasonable balance
|
||||
/// between contention and memory usage: assuming a 128-byte `Bucket` (due to cache line
|
||||
/// alignment), this uses 32 KiB of memory.
|
||||
var buckets: [256]Bucket = @splat(.init);
|
||||
var buckets: [256]Bucket = @splat(.{});
|
||||
};
|
||||
|
||||
// Here we use Fibonacci hashing: the golden ratio can be used to evenly redistribute input
|
||||
|
|
@ -16947,16 +16955,7 @@ const parking_futex = struct {
|
|||
var status_buf: std.atomic.Value(Thread.Status) = undefined;
|
||||
|
||||
{
|
||||
mutexLockUncancelable(&bucket.mutex);
|
||||
defer mutexUnlock(&bucket.mutex);
|
||||
|
||||
_ = bucket.num_waiters.fetchAdd(1, .acquire);
|
||||
|
||||
if (@atomicLoad(u32, ptr, .monotonic) != expect) {
|
||||
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
|
||||
return;
|
||||
}
|
||||
|
||||
if (@atomicLoad(u32, ptr, .monotonic) != expect) return;
|
||||
// This is in the critical section to avoid marking the thread as parked until we're
|
||||
// certain that we're actually going to park.
|
||||
waiter.thread_status = status: {
|
||||
|
|
@ -16974,11 +16973,7 @@ const parking_futex = struct {
|
|||
);
|
||||
switch (old_status.cancelation) {
|
||||
.none => {}, // status is now `.parked`
|
||||
.canceling => {
|
||||
// status is now `.canceled`
|
||||
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
|
||||
return error.Canceled;
|
||||
},
|
||||
.canceling => return error.Canceled, // status is now `.canceled`
|
||||
.canceled => break :cancelable, // status is still `.canceled`
|
||||
.parked => unreachable,
|
||||
.blocked => unreachable,
|
||||
|
|
@ -16996,7 +16991,7 @@ const parking_futex = struct {
|
|||
break :status &status_buf;
|
||||
};
|
||||
|
||||
bucket.waiters.append(&waiter.node);
|
||||
bucket.add(&waiter);
|
||||
}
|
||||
|
||||
const deadline: ?Io.Clock.Timestamp = switch (timeout) {
|
||||
|
|
@ -17017,10 +17012,7 @@ const parking_futex = struct {
|
|||
.parked => {
|
||||
// We saw a timeout and updated our own status from `.parked` to `.none`. It is
|
||||
// our responsibility to remove `waiter` from `bucket`.
|
||||
mutexLockUncancelable(&bucket.mutex);
|
||||
defer mutexUnlock(&bucket.mutex);
|
||||
bucket.waiters.remove(&waiter.node);
|
||||
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
|
||||
bucket.remove(&waiter);
|
||||
},
|
||||
.none, .canceling => {
|
||||
// Race condition: the timeout was reached, then `wake` or a cancelation tried
|
||||
|
|
@ -17046,25 +17038,16 @@ const parking_futex = struct {
|
|||
|
||||
const bucket = bucketForAddress(@intFromPtr(ptr));
|
||||
|
||||
// To ensure the store to `ptr` is ordered before this check, we effectively want a `.release`
|
||||
// load, but that doesn't exist in the C11 memory model, so emulate it with a non-mutating rmw.
|
||||
if (bucket.num_waiters.fetchAdd(0, .release) == 0) {
|
||||
@branchHint(.likely);
|
||||
return; // no waiters
|
||||
}
|
||||
|
||||
// Waiters removed from the linked list under the mutex so we can unpark their threads outside
|
||||
// of the critical section. This forms a singly-linked list of waiters using `Waiter.node.next`.
|
||||
var waking_head: ?*std.DoublyLinkedList.Node = null;
|
||||
var waking_head: ?*std.SinglyLinkedList.Node = null;
|
||||
{
|
||||
mutexLockUncancelable(&bucket.mutex);
|
||||
defer mutexUnlock(&bucket.mutex);
|
||||
|
||||
var num_removed: u32 = 0;
|
||||
var it = bucket.waiters.first;
|
||||
while (num_removed < max_waiters) {
|
||||
const waiter: *Waiter = @fieldParentPtr("node", it orelse break);
|
||||
it = waiter.node.next;
|
||||
var i: usize = 0;
|
||||
while (num_removed < max_waiters) : (i += 1) {
|
||||
const waiter: *Waiter = while (bucket.waiters.len - i != 0) : (i += 1) {
|
||||
break @atomicLoad(?*Waiter, &bucket.waiters[i], .monotonic) orelse continue;
|
||||
} else break;
|
||||
if (waiter.address != @intFromPtr(ptr)) continue;
|
||||
const old_status = waiter.thread_status.fetchAnd(
|
||||
.{ .cancelation = @enumFromInt(0b110), .awaitable = .all_ones },
|
||||
|
|
@ -17072,7 +17055,7 @@ const parking_futex = struct {
|
|||
);
|
||||
switch (old_status.cancelation) {
|
||||
.parked => {}, // state updated to `.none`
|
||||
.none => continue, // race with timeout; they are about to lock `bucket.mutex` and remove themselves from the bucket
|
||||
.none => continue, // race with timeout; they are about to remove themselves from the bucket
|
||||
.canceling => continue, // race with a canceler who hasn't called `removeCanceledWaiter` yet
|
||||
.canceled => unreachable,
|
||||
.blocked => unreachable,
|
||||
|
|
@ -17081,13 +17064,11 @@ const parking_futex = struct {
|
|||
.blocked_canceling => unreachable,
|
||||
}
|
||||
// We're waking this waiter. Remove them from the bucket and add them to our local list.
|
||||
bucket.waiters.remove(&waiter.node);
|
||||
@atomicStore(?*Waiter, &bucket.waiters[i], null, .release);
|
||||
waiter.node.next = waking_head;
|
||||
waking_head = &waiter.node;
|
||||
num_removed += 1;
|
||||
}
|
||||
|
||||
_ = bucket.num_waiters.fetchSub(num_removed, .monotonic);
|
||||
}
|
||||
|
||||
var unpark_buf: [128]UnparkTid = undefined;
|
||||
|
|
@ -17113,10 +17094,7 @@ const parking_futex = struct {
|
|||
|
||||
fn removeCanceledWaiter(waiter: *Waiter) void {
|
||||
const bucket = bucketForAddress(waiter.address);
|
||||
mutexLockUncancelable(&bucket.mutex);
|
||||
defer mutexUnlock(&bucket.mutex);
|
||||
bucket.waiters.remove(&waiter.node);
|
||||
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
|
||||
bucket.remove(waiter);
|
||||
waiter.done.store(true, .release); // potentially invalidates `waiter.*`
|
||||
}
|
||||
};
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue