std.Io: introduce futex primitives

Co-authored-by: Andrew Kelley <andrew@ziglang.org>
This commit is contained in:
Matthew Lugg 2025-12-20 13:05:35 +00:00
parent 330e295bc4
commit 6ed5b62050
No known key found for this signature in database
GPG key ID: 3F5B7DCCBF4AF02E
2 changed files with 305 additions and 262 deletions

View file

@ -654,6 +654,10 @@ pub const VTable = struct {
/// that awaiting it will not block. Returns that index.
select: *const fn (?*anyopaque, futures: []const *AnyFuture) Cancelable!usize,
futexWait: *const fn (?*anyopaque, ptr: *const u32, expected: u32, Timeout) Cancelable!void,
futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void,
futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void,
mutexLock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) Cancelable!void,
mutexLockUncancelable: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void,
mutexUnlock: *const fn (?*anyopaque, prev_state: Mutex.State, mutex: *Mutex) void,
@ -1174,6 +1178,42 @@ pub fn Select(comptime U: type) type {
};
}
/// Atomically checks if the value at `ptr` equals `expected`, and if so, blocks until either:
///
/// * a matching (same `ptr` argument) `futexWake` call occurs, or
/// * a spurious ("random") wakeup occurs.
///
/// Typically, `futexWake` should be called immediately after updating the value at `ptr.*`, to
/// unblock tasks using `futexWait` to wait for the value to change from what it previously was.
///
/// The caller is responsible for identifying spurious wakeups if necessary, typically by checking
/// the value at `ptr.*`.
///
/// Asserts that `T` is 4 bytes in length and has a well-defined layout with no padding bits.
pub fn futexWait(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, expected: T) Cancelable!void {
return futexWaitTimeout(io, T, ptr, expected, .none);
}
/// Same as `futexWait`, except also unblocks if `timeout` expires. As with `futexWait`, spurious
/// wakeups are possible. It remains the caller's responsibility to differentiate between these
/// three possible wake-up reasons if necessary.
pub fn futexWaitTimeout(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, expected: T, timeout: Timeout) Cancelable!void {
comptime assert(@sizeOf(T) == 4);
const expected_raw: *align(1) const u32 = @ptrCast(&expected);
return io.vtable.futexWait(io.userdata, @ptrCast(ptr), expected_raw.*, timeout);
}
/// Same as `futexWait`, except is not affected by task cancelation.
pub fn futexWaitUncancelable(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, expected: T) void {
comptime assert(@sizeOf(T) == @sizeOf(u32));
const expected_raw: *align(1) const u32 = @ptrCast(&expected);
io.vtable.futexWaitUncancelable(io.userdata, @ptrCast(ptr), expected_raw.*);
}
/// Unblocks pending futex waits on `ptr`, up to a limit of `max_waiters` calls.
pub fn futexWake(io: Io, comptime T: type, ptr: *align(@alignOf(u32)) const T, max_waiters: u32) void {
comptime assert(@sizeOf(T) == @sizeOf(u32));
if (max_waiters == 0) return;
return io.vtable.futexWake(io.userdata, @ptrCast(ptr), max_waiters);
}
pub const Mutex = struct {
state: State,

View file

@ -155,6 +155,220 @@ const Thread = struct {
fn currentSignalId() SignaleeId {
return if (std.Thread.use_pthreads) std.c.pthread_self() else std.Thread.getCurrentId();
}
fn futexWaitUncancelable(ptr: *const u32, expect: u32) void {
return Thread.futexWaitTimed(null, ptr, expect, null) catch unreachable;
}
fn futexWait(thread: *Thread, ptr: *const u32, expect: u32) Io.Cancelable!void {
return Thread.futexWaitTimed(thread, ptr, expect, null) catch |err| switch (err) {
error.Canceled => return error.Canceled,
error.Timeout => unreachable,
};
}
fn futexWaitTimed(thread: ?*Thread, ptr: *const u32, expect: u32, timeout_ns: ?u64) Io.Cancelable!void {
@branchHint(.cold);
if (builtin.single_threaded) unreachable; // nobody would ever wake us
if (builtin.cpu.arch.isWasm()) {
comptime assert(builtin.cpu.has(.wasm, .atomics));
if (thread) |t| try t.checkCancel();
const to: i64 = if (timeout_ns) |ns| ns else -1;
const signed_expect: i32 = @bitCast(expect);
const result = asm volatile (
\\local.get %[ptr]
\\local.get %[expected]
\\local.get %[timeout]
\\memory.atomic.wait32 0
\\local.set %[ret]
: [ret] "=r" (-> u32),
: [ptr] "r" (ptr),
[expected] "r" (signed_expect),
[timeout] "r" (to),
);
switch (result) {
0 => {}, // ok
1 => {}, // expected != loaded
2 => {}, // timeout
else => assert(!is_debug),
}
} else switch (native_os) {
.linux => {
const linux = std.os.linux;
var ts_buffer: linux.timespec = undefined;
const ts: ?*linux.timespec = if (timeout_ns) |ns| ts: {
ts_buffer = timestampToPosix(ns);
break :ts &ts_buffer;
} else null;
if (thread) |t| try t.beginSyscall();
const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, ts);
if (thread) |t| t.endSyscall();
switch (linux.errno(rc)) {
.SUCCESS => {}, // notified by `wake()`
.INTR => {}, // caller's responsibility to retry
.AGAIN => {}, // ptr.* != expect
.INVAL => {}, // possibly timeout overflow
.TIMEDOUT => {}, // timeout
.FAULT => recoverableOsBugDetected(), // ptr was invalid
else => recoverableOsBugDetected(),
}
},
.driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => {
const c = std.c;
const flags: c.UL = .{
.op = .COMPARE_AND_WAIT,
.NO_ERRNO = true,
};
if (thread) |t| try t.beginSyscall();
const status = switch (darwin_supports_ulock_wait2) {
true => c.__ulock_wait2(flags, ptr, expect, ns: {
const ns = timeout_ns orelse break :ns 0;
if (ns == 0) break :ns 1;
break :ns ns;
}, 0),
false => c.__ulock_wait(flags, ptr, expect, us: {
const ns = timeout_ns orelse break :us 0;
const us = std.math.lossyCast(u32, ns / std.time.ns_per_us);
if (us == 0) break :us 1;
break :us us;
}),
};
if (thread) |t| t.endSyscall();
if (status >= 0) return;
switch (@as(c.E, @enumFromInt(-status))) {
.INTR => {}, // spurious wake
// Address of the futex was paged out. This is unlikely, but possible in theory, and
// pthread/libdispatch on darwin bother to handle it. In this case we'll return
// without waiting, but the caller should retry anyway.
.FAULT => {},
.TIMEDOUT => {}, // timeout
else => recoverableOsBugDetected(),
}
},
.windows => {
var timeout_value: windows.LARGE_INTEGER = undefined;
var timeout_ptr: ?*const windows.LARGE_INTEGER = null;
// NTDLL functions work with time in units of 100 nanoseconds.
// Positive values are absolute deadlines while negative values are relative durations.
if (timeout_ns) |delay| {
timeout_value = @as(windows.LARGE_INTEGER, @intCast(delay / 100));
timeout_value = -timeout_value;
timeout_ptr = &timeout_value;
}
if (thread) |t| try t.checkCancel();
switch (windows.ntdll.RtlWaitOnAddress(ptr, &expect, @sizeOf(@TypeOf(expect)), timeout_ptr)) {
.SUCCESS => {},
.CANCELLED => {},
.TIMEOUT => {}, // timeout
else => recoverableOsBugDetected(),
}
},
.freebsd => {
const flags = @intFromEnum(std.c.UMTX_OP.WAIT_UINT_PRIVATE);
var tm_size: usize = 0;
var tm: std.c._umtx_time = undefined;
var tm_ptr: ?*const std.c._umtx_time = null;
if (timeout_ns) |ns| {
tm_ptr = &tm;
tm_size = @sizeOf(@TypeOf(tm));
tm.flags = 0; // use relative time not UMTX_ABSTIME
tm.clockid = .MONOTONIC;
tm.timeout = timestampToPosix(ns);
}
if (thread) |t| try t.beginSyscall();
const rc = std.c._umtx_op(@intFromPtr(ptr), flags, @as(c_ulong, expect), tm_size, @intFromPtr(tm_ptr));
if (thread) |t| t.endSyscall();
if (is_debug) switch (posix.errno(rc)) {
.SUCCESS => {},
.FAULT => unreachable, // one of the args points to invalid memory
.INVAL => unreachable, // arguments should be correct
.TIMEDOUT => {}, // timeout
.INTR => {}, // spurious wake
else => unreachable,
};
},
else => @compileError("unimplemented: futexWait"),
}
}
fn futexWake(ptr: *const u32, max_waiters: u32) void {
@branchHint(.cold);
if (builtin.single_threaded) return; // nothing to wake up
if (builtin.cpu.arch.isWasm()) {
comptime assert(builtin.cpu.has(.wasm, .atomics));
assert(max_waiters != 0);
const woken_count = asm volatile (
\\local.get %[ptr]
\\local.get %[waiters]
\\memory.atomic.notify 0
\\local.set %[ret]
: [ret] "=r" (-> u32),
: [ptr] "r" (ptr),
[waiters] "r" (max_waiters),
);
_ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled
} else switch (native_os) {
.linux => {
const linux = std.os.linux;
switch (linux.errno(linux.futex_3arg(
ptr,
.{ .cmd = .WAKE, .private = true },
@min(max_waiters, std.math.maxInt(i32)),
))) {
.SUCCESS => return, // successful wake up
.INVAL => return, // invalid futex_wait() on ptr done elsewhere
.FAULT => return, // pointer became invalid while doing the wake
else => return recoverableOsBugDetected(), // deadlock due to operating system bug
}
},
.driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => {
const c = std.c;
const flags: c.UL = .{
.op = .COMPARE_AND_WAIT,
.NO_ERRNO = true,
.WAKE_ALL = max_waiters > 1,
};
while (true) {
const status = c.__ulock_wake(flags, ptr, 0);
if (status >= 0) return;
switch (@as(c.E, @enumFromInt(-status))) {
.INTR, .CANCELED => continue, // spurious wake()
.FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t
.NOENT => return, // nothing was woken up
.ALREADY => unreachable, // only for UL.Op.WAKE_THREAD
else => unreachable, // deadlock due to operating system bug
}
}
},
.windows => {
assert(max_waiters != 0);
switch (max_waiters) {
1 => windows.ntdll.RtlWakeAddressSingle(ptr),
else => windows.ntdll.RtlWakeAddressAll(ptr),
}
},
.freebsd => {
const rc = std.c._umtx_op(
@intFromPtr(ptr),
@intFromEnum(std.c.UMTX_OP.WAKE_PRIVATE),
@as(c_ulong, max_waiters),
0, // there is no timeout struct
0, // there is no timeout struct pointer
);
switch (posix.errno(rc)) {
.SUCCESS => {},
.FAULT => {}, // it's ok if the ptr doesn't point to valid memory
.INVAL => unreachable, // arguments should be correct
else => unreachable, // deadlock due to operating system bug
}
},
else => @compileError("unimplemented: futexWake"),
}
}
};
const max_iovecs_len = 8;
@ -403,6 +617,10 @@ pub fn io(t: *Threaded) Io {
.groupWait = groupWait,
.groupCancel = groupCancel,
.futexWait = futexWait,
.futexWaitUncancelable = futexWaitUncancelable,
.futexWake = futexWake,
.mutexLock = mutexLock,
.mutexLockUncancelable = mutexLockUncancelable,
.mutexUnlock = mutexUnlock,
@ -499,6 +717,10 @@ pub fn ioBasic(t: *Threaded) Io {
.groupWait = groupWait,
.groupCancel = groupCancel,
.futexWait = futexWait,
.futexWaitUncancelable = futexWaitUncancelable,
.futexWake = futexWake,
.mutexLock = mutexLock,
.mutexLockUncancelable = mutexLockUncancelable,
.mutexUnlock = mutexUnlock,
@ -1020,6 +1242,38 @@ fn cancel(
ac.waitAndDeinit(t, result);
}
fn futexWait(userdata: ?*anyopaque, ptr: *const u32, expected: u32, timeout: Io.Timeout) Io.Cancelable!void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
const current_thread = Thread.getCurrent(t);
const t_io = ioBasic(t);
const timeout_ns: ?u64 = ns: {
const d = (timeout.toDurationFromNow(t_io) catch break :ns 10) orelse break :ns null;
break :ns std.math.lossyCast(u64, d.raw.toNanoseconds());
};
switch (native_os) {
.illumos, .netbsd, .openbsd => @panic("TODO"),
else => try current_thread.futexWaitTimed(ptr, expected, timeout_ns),
}
}
fn futexWaitUncancelable(userdata: ?*anyopaque, ptr: *const u32, expected: u32) void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
switch (native_os) {
.illumos, .netbsd, .openbsd => @panic("TODO"),
else => Thread.futexWaitUncancelable(ptr, expected),
}
}
fn futexWake(userdata: ?*anyopaque, ptr: *const u32, max_waiters: u32) void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
_ = t;
switch (native_os) {
.illumos, .netbsd, .openbsd => @panic("TODO"),
else => Thread.futexWake(ptr, max_waiters),
}
}
fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex) Io.Cancelable!void {
if (builtin.single_threaded) unreachable; // Interface should have prevented this.
if (native_os == .netbsd) @panic("TODO");
@ -1027,10 +1281,10 @@ fn mutexLock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mutex
const t: *Threaded = @ptrCast(@alignCast(userdata));
const current_thread = Thread.getCurrent(t);
if (prev_state == .contended) {
try futexWait(current_thread, @ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
try current_thread.futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
}
while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) {
try futexWait(current_thread, @ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
try current_thread.futexWait(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
}
}
@ -1040,10 +1294,10 @@ fn mutexLockUncancelable(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mute
if (native_os == .openbsd) @panic("TODO");
_ = userdata;
if (prev_state == .contended) {
futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
Thread.futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
}
while (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .contended, .acquire) != .unlocked) {
futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
Thread.futexWaitUncancelable(@ptrCast(&mutex.state), @intFromEnum(Io.Mutex.State.contended));
}
}
@ -1054,7 +1308,7 @@ fn mutexUnlock(userdata: ?*anyopaque, prev_state: Io.Mutex.State, mutex: *Io.Mut
_ = userdata;
_ = prev_state;
if (@atomicRmw(Io.Mutex.State, &mutex.state, .Xchg, .unlocked, .release) == .contended) {
futexWake(@ptrCast(&mutex.state), 1);
Thread.futexWake(@ptrCast(&mutex.state), 1);
}
}
@ -1081,7 +1335,7 @@ fn conditionWaitUncancelable(userdata: ?*anyopaque, cond: *Io.Condition, mutex:
defer mutex.lockUncancelable(t_io);
while (true) {
futexWaitUncancelable(cond_epoch, epoch);
Thread.futexWaitUncancelable(@ptrCast(cond_epoch), epoch);
epoch = cond_epoch.load(.acquire);
state = cond_state.load(.monotonic);
while (state & signal_mask != 0) {
@ -1125,7 +1379,7 @@ fn conditionWait(userdata: ?*anyopaque, cond: *Io.Condition, mutex: *Io.Mutex) I
defer mutex.lockUncancelable(t_io);
while (true) {
try futexWait(current_thread, cond_epoch, epoch);
try current_thread.futexWait(@ptrCast(cond_epoch), epoch);
epoch = cond_epoch.load(.acquire);
state = cond_state.load(.monotonic);
@ -1198,7 +1452,7 @@ fn conditionWake(userdata: ?*anyopaque, cond: *Io.Condition, wake: Io.Condition.
_ = cond_epoch.fetchAdd(1, .release);
if (native_os == .netbsd) @panic("TODO");
if (native_os == .openbsd) @panic("TODO");
futexWake(cond_epoch, to_wake);
Thread.futexWake(@ptrCast(cond_epoch), to_wake);
return;
};
}
@ -6612,257 +6866,6 @@ fn copyCanon(canonical_name_buffer: *[HostName.max_len]u8, name: []const u8) Hos
/// ulock_wait2() uses 64-bit nano-second timeouts (with the same convention)
const darwin_supports_ulock_wait2 = builtin.os.version_range.semver.min.major >= 11;
fn futexWait(current_thread: *Thread, ptr: *const std.atomic.Value(u32), expect: u32) Io.Cancelable!void {
@branchHint(.cold);
if (builtin.cpu.arch.isWasm()) {
comptime assert(builtin.cpu.has(.wasm, .atomics));
try current_thread.checkCancel();
const timeout: i64 = -1;
const signed_expect: i32 = @bitCast(expect);
const result = asm volatile (
\\local.get %[ptr]
\\local.get %[expected]
\\local.get %[timeout]
\\memory.atomic.wait32 0
\\local.set %[ret]
: [ret] "=r" (-> u32),
: [ptr] "r" (&ptr.raw),
[expected] "r" (signed_expect),
[timeout] "r" (timeout),
);
switch (result) {
0 => {}, // ok
1 => {}, // expected != loaded
2 => assert(!is_debug), // timeout
else => assert(!is_debug),
}
} else switch (native_os) {
.linux => {
const linux = std.os.linux;
try current_thread.beginSyscall();
const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null);
current_thread.endSyscall();
switch (linux.errno(rc)) {
.SUCCESS => {}, // notified by `wake()`
.INTR => {}, // caller's responsibility to retry
.AGAIN => {}, // ptr.* != expect
.INVAL => {}, // possibly timeout overflow
.TIMEDOUT => recoverableOsBugDetected(),
.FAULT => recoverableOsBugDetected(), // ptr was invalid
else => recoverableOsBugDetected(),
}
},
.driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => {
const c = std.c;
const flags: c.UL = .{
.op = .COMPARE_AND_WAIT,
.NO_ERRNO = true,
};
try current_thread.beginSyscall();
const status = if (darwin_supports_ulock_wait2)
c.__ulock_wait2(flags, ptr, expect, 0, 0)
else
c.__ulock_wait(flags, ptr, expect, 0);
current_thread.endSyscall();
if (status >= 0) return;
if (is_debug) switch (@as(c.E, @enumFromInt(-status))) {
.INTR => {}, // spurious wake
// Address of the futex was paged out. This is unlikely, but possible in theory, and
// pthread/libdispatch on darwin bother to handle it. In this case we'll return
// without waiting, but the caller should retry anyway.
.FAULT => {},
.TIMEDOUT => unreachable,
else => unreachable,
};
},
.windows => {
try current_thread.checkCancel();
switch (windows.ntdll.RtlWaitOnAddress(ptr, &expect, @sizeOf(@TypeOf(expect)), null)) {
.SUCCESS => {},
.CANCELLED => return error.Canceled,
else => recoverableOsBugDetected(),
}
},
.freebsd => {
const flags = @intFromEnum(std.c.UMTX_OP.WAIT_UINT_PRIVATE);
try current_thread.beginSyscall();
const rc = std.c._umtx_op(@intFromPtr(&ptr.raw), flags, @as(c_ulong, expect), 0, 0);
current_thread.endSyscall();
if (is_debug) switch (posix.errno(rc)) {
.SUCCESS => {},
.FAULT => unreachable, // one of the args points to invalid memory
.INVAL => unreachable, // arguments should be correct
.TIMEDOUT => unreachable, // no timeout provided
.INTR => {}, // spurious wake
else => unreachable,
};
},
else => @compileError("unimplemented: futexWait"),
}
}
pub fn futexWaitUncancelable(ptr: *const std.atomic.Value(u32), expect: u32) void {
@branchHint(.cold);
if (builtin.cpu.arch.isWasm()) {
comptime assert(builtin.cpu.has(.wasm, .atomics));
const timeout: i64 = -1;
const signed_expect: i32 = @bitCast(expect);
const result = asm volatile (
\\local.get %[ptr]
\\local.get %[expected]
\\local.get %[timeout]
\\memory.atomic.wait32 0
\\local.set %[ret]
: [ret] "=r" (-> u32),
: [ptr] "r" (&ptr.raw),
[expected] "r" (signed_expect),
[timeout] "r" (timeout),
);
switch (result) {
0 => {}, // ok
1 => {}, // expected != loaded
2 => recoverableOsBugDetected(), // timeout
else => recoverableOsBugDetected(),
}
} else switch (native_os) {
.linux => {
const linux = std.os.linux;
const rc = linux.futex_4arg(ptr, .{ .cmd = .WAIT, .private = true }, expect, null);
switch (linux.errno(rc)) {
.SUCCESS => {}, // notified by `wake()`
.INTR => {}, // caller's responsibility to repeat
.AGAIN => {}, // ptr.* != expect
.INVAL => {}, // possibly timeout overflow
.TIMEDOUT => recoverableOsBugDetected(),
.FAULT => recoverableOsBugDetected(), // ptr was invalid
else => recoverableOsBugDetected(),
}
},
.driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => {
const c = std.c;
const flags: c.UL = .{
.op = .COMPARE_AND_WAIT,
.NO_ERRNO = true,
};
const status = if (darwin_supports_ulock_wait2)
c.__ulock_wait2(flags, ptr, expect, 0, 0)
else
c.__ulock_wait(flags, ptr, expect, 0);
if (status >= 0) return;
switch (@as(c.E, @enumFromInt(-status))) {
// Wait was interrupted by the OS or other spurious signalling.
.INTR => {},
// Address of the futex was paged out. This is unlikely, but possible in theory, and
// pthread/libdispatch on darwin bother to handle it. In this case we'll return
// without waiting, but the caller should retry anyway.
.FAULT => {},
.TIMEDOUT => recoverableOsBugDetected(),
else => recoverableOsBugDetected(),
}
},
.windows => {
switch (windows.ntdll.RtlWaitOnAddress(ptr, &expect, @sizeOf(@TypeOf(expect)), null)) {
.SUCCESS, .CANCELLED => {},
else => recoverableOsBugDetected(),
}
},
.freebsd => {
const flags = @intFromEnum(std.c.UMTX_OP.WAIT_UINT_PRIVATE);
const rc = std.c._umtx_op(@intFromPtr(&ptr.raw), flags, @as(c_ulong, expect), 0, 0);
switch (posix.errno(rc)) {
.SUCCESS => {},
.INTR => {}, // spurious wake
.FAULT => recoverableOsBugDetected(), // one of the args points to invalid memory
.INVAL => recoverableOsBugDetected(), // arguments should be correct
.TIMEDOUT => recoverableOsBugDetected(), // no timeout provided
else => recoverableOsBugDetected(),
}
},
else => @compileError("unimplemented: futexWaitUncancelable"),
}
}
pub fn futexWake(ptr: *const std.atomic.Value(u32), max_waiters: u32) void {
@branchHint(.cold);
if (builtin.cpu.arch.isWasm()) {
comptime assert(builtin.cpu.has(.wasm, .atomics));
assert(max_waiters != 0);
const woken_count = asm volatile (
\\local.get %[ptr]
\\local.get %[waiters]
\\memory.atomic.notify 0
\\local.set %[ret]
: [ret] "=r" (-> u32),
: [ptr] "r" (&ptr.raw),
[waiters] "r" (max_waiters),
);
_ = woken_count; // can be 0 when linker flag 'shared-memory' is not enabled
} else switch (native_os) {
.linux => {
const linux = std.os.linux;
switch (linux.errno(linux.futex_3arg(
&ptr.raw,
.{ .cmd = .WAKE, .private = true },
@min(max_waiters, std.math.maxInt(i32)),
))) {
.SUCCESS => return, // successful wake up
.INVAL => return, // invalid futex_wait() on ptr done elsewhere
.FAULT => return, // pointer became invalid while doing the wake
else => return recoverableOsBugDetected(), // deadlock due to operating system bug
}
},
.driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => {
const c = std.c;
const flags: c.UL = .{
.op = .COMPARE_AND_WAIT,
.NO_ERRNO = true,
.WAKE_ALL = max_waiters > 1,
};
while (true) {
const status = c.__ulock_wake(flags, ptr, 0);
if (status >= 0) return;
switch (@as(c.E, @enumFromInt(-status))) {
.INTR, .CANCELED => continue, // spurious wake()
.FAULT => unreachable, // __ulock_wake doesn't generate EFAULT according to darwin pthread_cond_t
.NOENT => return, // nothing was woken up
.ALREADY => unreachable, // only for UL.Op.WAKE_THREAD
else => unreachable, // deadlock due to operating system bug
}
}
},
.windows => {
assert(max_waiters != 0);
switch (max_waiters) {
1 => windows.ntdll.RtlWakeAddressSingle(ptr),
else => windows.ntdll.RtlWakeAddressAll(ptr),
}
},
.freebsd => {
const rc = std.c._umtx_op(
@intFromPtr(&ptr.raw),
@intFromEnum(std.c.UMTX_OP.WAKE_PRIVATE),
@as(c_ulong, max_waiters),
0, // there is no timeout struct
0, // there is no timeout struct pointer
);
switch (posix.errno(rc)) {
.SUCCESS => {},
.FAULT => {}, // it's ok if the ptr doesn't point to valid memory
.INVAL => unreachable, // arguments should be correct
else => unreachable, // deadlock due to operating system bug
}
},
else => @compileError("unimplemented: futexWake"),
}
}
/// A thread-safe logical boolean value which can be `set` and `unset`.
///
/// It can also block threads until the value is set with cancelation via timed
@ -6919,7 +6922,7 @@ const ResetEventFutex = enum(u32) {
}
const current_thread = Thread.getCurrent(t);
while (state == .waiting) {
try futexWait(current_thread, @ptrCast(ref), @intFromEnum(ResetEventFutex.waiting));
try current_thread.futexWait(@ptrCast(ref), @intFromEnum(ResetEventFutex.waiting));
state = @atomicLoad(ResetEventFutex, ref, .acquire);
}
assert(state == .is_set);
@ -6944,7 +6947,7 @@ const ResetEventFutex = enum(u32) {
state = @cmpxchgStrong(ResetEventFutex, ref, state, .waiting, .acquire, .acquire) orelse .waiting;
}
while (state == .waiting) {
futexWaitUncancelable(@ptrCast(ref), @intFromEnum(ResetEventFutex.waiting));
Thread.futexWaitUncancelable(@ptrCast(ref), @intFromEnum(ResetEventFutex.waiting));
state = @atomicLoad(ResetEventFutex, ref, .acquire);
}
assert(state == .is_set);
@ -6964,7 +6967,7 @@ const ResetEventFutex = enum(u32) {
return;
}
if (@atomicRmw(ResetEventFutex, ref, .Xchg, .is_set, .release) == .waiting) {
futexWake(@ptrCast(ref), std.math.maxInt(u32));
Thread.futexWake(@ptrCast(ref), std.math.maxInt(u32));
}
}