std.Io.Threaded: sever dependency on std.Thread Mutex and Condition

This commit is contained in:
Andrew Kelley 2026-02-01 16:13:51 -08:00
parent c2d4806d65
commit eb74e23e7b

View file

@ -29,8 +29,8 @@ const ws2_32 = std.os.windows.ws2_32;
/// * scanning environment variables on some targets
/// * memory-mapping when mmap or equivalent is not available
allocator: Allocator,
mutex: std.Thread.Mutex = .{},
cond: std.Thread.Condition = .{},
mutex: Io.Mutex = .init,
cond: Io.Condition = .init,
run_queue: std.SinglyLinkedList = .{},
join_requested: bool = false,
stack_size: usize,
@ -1486,8 +1486,8 @@ var global_single_threaded_instance: Threaded = .init_single_threaded;
pub const global_single_threaded: *Threaded = &global_single_threaded_instance;
pub fn setAsyncLimit(t: *Threaded, new_limit: Io.Limit) void {
t.mutex.lock();
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
t.async_limit = new_limit;
}
@ -1508,11 +1508,11 @@ pub fn deinit(t: *Threaded) void {
fn join(t: *Threaded) void {
if (builtin.single_threaded) return;
{
t.mutex.lock();
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
t.join_requested = true;
}
t.cond.broadcast();
condBroadcast(&t.cond);
t.wait_group.wait();
}
@ -1574,20 +1574,20 @@ fn worker(t: *Threaded) void {
defer t.wait_group.finish();
t.mutex.lock();
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
while (true) {
while (t.run_queue.popFirst()) |runnable_node| {
t.mutex.unlock();
mutexUnlock(&t.mutex);
thread.cancel_protection = .unblocked;
const runnable: *Runnable = @fieldParentPtr("node", runnable_node);
runnable.startFn(runnable, &thread, t);
t.mutex.lock();
mutexLockUncancelable(&t.mutex);
t.busy_count -= 1;
}
if (t.join_requested) break;
t.cond.wait(&t.mutex);
condWait(&t.cond, &t.mutex);
}
}
@ -2004,12 +2004,12 @@ fn async(
},
};
t.mutex.lock();
mutexLockUncancelable(&t.mutex);
const busy_count = t.busy_count;
if (busy_count >= @intFromEnum(t.async_limit)) {
t.mutex.unlock();
mutexUnlock(&t.mutex);
future.destroy(gpa);
start(context.ptr, result.ptr);
return null;
@ -2023,7 +2023,7 @@ fn async(
const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
t.wait_group.finish();
t.busy_count = busy_count;
t.mutex.unlock();
mutexUnlock(&t.mutex);
future.destroy(gpa);
start(context.ptr, result.ptr);
return null;
@ -2033,8 +2033,8 @@ fn async(
t.run_queue.prepend(&future.runnable.node);
t.mutex.unlock();
t.cond.signal();
mutexUnlock(&t.mutex);
condSignal(&t.cond);
return @ptrCast(future);
}
@ -2056,8 +2056,8 @@ fn concurrent(
};
errdefer future.destroy(gpa);
t.mutex.lock();
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
const busy_count = t.busy_count;
@ -2080,7 +2080,7 @@ fn concurrent(
t.run_queue.prepend(&future.runnable.node);
t.cond.signal();
condSignal(&t.cond);
return @ptrCast(future);
}
@ -2101,12 +2101,12 @@ fn groupAsync(
error.OutOfMemory => return groupAsyncEager(start, context.ptr),
};
t.mutex.lock();
mutexLockUncancelable(&t.mutex);
const busy_count = t.busy_count;
if (busy_count >= @intFromEnum(t.async_limit)) {
t.mutex.unlock();
mutexUnlock(&t.mutex);
task.destroy(gpa);
return groupAsyncEager(start, context.ptr);
}
@ -2119,7 +2119,7 @@ fn groupAsync(
const thread = std.Thread.spawn(.{ .stack_size = t.stack_size }, worker, .{t}) catch {
t.wait_group.finish();
t.busy_count = busy_count;
t.mutex.unlock();
mutexUnlock(&t.mutex);
task.destroy(gpa);
return groupAsyncEager(start, context.ptr);
};
@ -2136,8 +2136,8 @@ fn groupAsync(
}, .monotonic);
t.run_queue.prepend(&task.runnable.node);
t.mutex.unlock();
t.cond.signal();
mutexUnlock(&t.mutex);
condSignal(&t.cond);
}
fn groupAsyncEager(
start: *const fn (context: *const anyopaque) Io.Cancelable!void,
@ -2201,8 +2201,8 @@ fn groupConcurrent(
};
errdefer task.destroy(gpa);
t.mutex.lock();
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
const busy_count = t.busy_count;
@ -2233,7 +2233,7 @@ fn groupConcurrent(
}, .monotonic);
t.run_queue.prepend(&task.runnable.node);
t.cond.signal();
condSignal(&t.cond);
}
fn groupAwait(userdata: ?*anyopaque, type_erased: *Io.Group, initial_token: *anyopaque) Io.Cancelable!void {
@ -3838,8 +3838,8 @@ fn fileStatWindows(userdata: ?*anyopaque, file: File) File.StatError!File.Stat {
fn systemBasicInformation(t: *Threaded) ?*const windows.SYSTEM_BASIC_INFORMATION {
if (!t.system_basic_information.initialized.load(.acquire)) {
t.mutex.lock();
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
switch (windows.ntdll.NtQuerySystemInformation(
.SystemBasicInformation,
@ -14299,10 +14299,9 @@ const Wsa = struct {
};
fn initializeWsa(t: *Threaded) error{ NetworkDown, Canceled }!void {
const t_io = io(t);
const wsa = &t.wsa;
try wsa.mutex.lock(t_io);
defer wsa.mutex.unlock(t_io);
try mutexLock(&wsa.mutex);
defer mutexUnlock(&wsa.mutex);
switch (wsa.status) {
.uninitialized => {
var wsa_data: ws2_32.WSADATA = undefined;
@ -14373,8 +14372,8 @@ const WindowsEnvironStrings = struct {
};
fn scanEnviron(t: *Threaded) void {
t.mutex.lock();
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
if (t.environ.initialized) return;
t.environ.initialized = true;
@ -14729,8 +14728,8 @@ fn spawnPosix(t: *Threaded, options: process.SpawnOptions) process.SpawnError!Sp
fn getDevNullFd(t: *Threaded) !posix.fd_t {
{
t.mutex.lock();
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
if (t.null_file.fd != -1) return t.null_file.fd;
}
const mode: u32 = 0;
@ -14741,8 +14740,8 @@ fn getDevNullFd(t: *Threaded) !posix.fd_t {
.SUCCESS => {
syscall.finish();
const fresh_fd: posix.fd_t = @intCast(rc);
t.mutex.lock(); // Another thread might have won the race.
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
defer mutexUnlock(&t.mutex);
if (t.null_file.fd != -1) {
posix.close(fresh_fd);
return t.null_file.fd;
@ -15402,8 +15401,8 @@ fn processSpawnWindows(userdata: ?*anyopaque, options: process.SpawnOptions) pro
fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
{
t.mutex.lock();
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
if (t.random_file.handle) |handle| return handle;
}
@ -15437,8 +15436,8 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
)) {
.SUCCESS => {
syscall.finish();
t.mutex.lock(); // Another thread might have won the race.
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
defer mutexUnlock(&t.mutex);
if (t.random_file.handle) |prev_handle| {
windows.CloseHandle(fresh_handle);
return prev_handle;
@ -15458,8 +15457,8 @@ fn getCngHandle(t: *Threaded) Io.RandomSecureError!windows.HANDLE {
fn getNulHandle(t: *Threaded) !windows.HANDLE {
{
t.mutex.lock();
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
if (t.null_file.handle) |handle| return handle;
}
@ -15505,8 +15504,8 @@ fn getNulHandle(t: *Threaded) !windows.HANDLE {
)) {
.SUCCESS => {
syscall.finish();
t.mutex.lock(); // Another thread might have won the race.
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
defer mutexUnlock(&t.mutex);
if (t.null_file.handle) |prev_handle| {
windows.CloseHandle(fresh_handle);
return prev_handle;
@ -16551,15 +16550,15 @@ fn random(userdata: ?*anyopaque, buffer: []u8) void {
}
fn randomMainThread(t: *Threaded, buffer: []u8) void {
t.mutex.lock();
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
if (!t.csprng.isInitialized()) {
@branchHint(.unlikely);
var seed: [Csprng.seed_len]u8 = undefined;
{
t.mutex.unlock();
defer t.mutex.lock();
mutexUnlock(&t.mutex);
defer mutexLockUncancelable(&t.mutex);
const prev = swapCancelProtection(t, .blocked);
defer _ = swapCancelProtection(t, prev);
@ -16744,8 +16743,8 @@ fn randomSecure(userdata: ?*anyopaque, buffer: []u8) Io.RandomSecureError!void {
fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t {
{
t.mutex.lock();
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex);
defer mutexUnlock(&t.mutex);
if (t.random_file.fd == -2) return error.EntropyUnavailable;
if (t.random_file.fd != -1) return t.random_file.fd;
@ -16785,8 +16784,8 @@ fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t {
.SUCCESS => {
syscall.finish();
if (!statx.mask.TYPE) return error.EntropyUnavailable;
t.mutex.lock(); // Another thread might have won the race.
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
defer mutexUnlock(&t.mutex);
if (t.random_file.fd >= 0) {
posix.close(fd);
return t.random_file.fd;
@ -16813,8 +16812,8 @@ fn getRandomFd(t: *Threaded) Io.RandomSecureError!posix.fd_t {
switch (posix.errno(fstat_sym(fd, &stat))) {
.SUCCESS => {
syscall.finish();
t.mutex.lock(); // Another thread might have won the race.
defer t.mutex.unlock();
mutexLockUncancelable(&t.mutex); // Another thread might have won the race.
defer mutexUnlock(&t.mutex);
if (t.random_file.fd >= 0) {
posix.close(fd);
return t.random_file.fd;
@ -16878,13 +16877,13 @@ const parking_futex = struct {
/// avoid a race.
num_waiters: std.atomic.Value(u32),
/// Protects `waiters`.
mutex: std.Thread.Mutex,
mutex: Io.Mutex,
waiters: std.DoublyLinkedList,
/// Prevent false sharing between buckets.
_: void align(std.atomic.cache_line) = {},
const init: Bucket = .{ .num_waiters = .init(0), .mutex = .{}, .waiters = .{} };
const init: Bucket = .{ .num_waiters = .init(0), .mutex = .init, .waiters = .{} };
};
const Waiter = struct {
@ -16947,8 +16946,8 @@ const parking_futex = struct {
var status_buf: std.atomic.Value(Thread.Status) = undefined;
{
bucket.mutex.lock();
defer bucket.mutex.unlock();
mutexLockUncancelable(&bucket.mutex);
defer mutexUnlock(&bucket.mutex);
_ = bucket.num_waiters.fetchAdd(1, .acquire);
@ -17017,8 +17016,8 @@ const parking_futex = struct {
.parked => {
// We saw a timeout and updated our own status from `.parked` to `.none`. It is
// our responsibility to remove `waiter` from `bucket`.
bucket.mutex.lock();
defer bucket.mutex.unlock();
mutexLockUncancelable(&bucket.mutex);
defer mutexUnlock(&bucket.mutex);
bucket.waiters.remove(&waiter.node);
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
},
@ -17057,8 +17056,8 @@ const parking_futex = struct {
// of the critical section. This forms a singly-linked list of waiters using `Waiter.node.next`.
var waking_head: ?*std.DoublyLinkedList.Node = null;
{
bucket.mutex.lock();
defer bucket.mutex.unlock();
mutexLockUncancelable(&bucket.mutex);
defer mutexUnlock(&bucket.mutex);
var num_removed: u32 = 0;
var it = bucket.waiters.first;
@ -17113,8 +17112,8 @@ const parking_futex = struct {
fn removeCanceledWaiter(waiter: *Waiter) void {
const bucket = bucketForAddress(waiter.address);
bucket.mutex.lock();
defer bucket.mutex.unlock();
mutexLockUncancelable(&bucket.mutex);
defer mutexUnlock(&bucket.mutex);
bucket.waiters.remove(&waiter.node);
assert(bucket.num_waiters.fetchSub(1, .monotonic) > 0);
waiter.done.store(true, .release); // potentially invalidates `waiter.*`
@ -18102,3 +18101,123 @@ fn eventSet(event: *Io.Event) void {
.waiting => Thread.futexWake(@ptrCast(event), std.math.maxInt(u32)),
}
}
/// Same as `Io.Condition.broadcast` but avoids the VTable.
///
/// Raises `signals` up to the currently observed `waiters` count in a single
/// CAS, then bumps `epoch` and wakes the outstanding waiters sleeping on it.
/// Returns without doing anything when every waiter already has a signal.
fn condBroadcast(cond: *Io.Condition) void {
    var prev_state = cond.state.load(.monotonic);
    // Only act while there are waiters that have not yet been signaled.
    while (prev_state.waiters > prev_state.signals) {
        @branchHint(.unlikely);
        // On CAS failure `prev_state` is refreshed and the loop condition is
        // re-evaluated against the new state.
        prev_state = cond.state.cmpxchgWeak(prev_state, .{
            .waiters = prev_state.waiters,
            .signals = prev_state.waiters,
        }, .release, .monotonic) orelse {
            // Update the epoch to tell the waiting threads that there are new signals for them.
            // Note that a waiting thread could miss a take if *exactly* (1<<32)-1 wakes happen
            // between it observing the epoch and sleeping on it, but this is extraordinarily
            // unlikely due to the precise number of calls required.
            _ = cond.epoch.fetchAdd(1, .release); // `.release` to ensure ordered after `state` update
            Thread.futexWake(&cond.epoch.raw, prev_state.waiters - prev_state.signals);
            return;
        };
    }
}
/// Same as `Io.Condition.signal` but avoids the VTable.
///
/// Publishes exactly one additional signal (if any waiter is unsignaled),
/// then bumps `epoch` and wakes a single thread sleeping on it.
fn condSignal(cond: *Io.Condition) void {
    var prev_state = cond.state.load(.monotonic);
    // Only act while at least one waiter has not yet been signaled.
    while (prev_state.waiters > prev_state.signals) {
        @branchHint(.unlikely);
        // On CAS failure `prev_state` is refreshed and the loop condition is
        // re-evaluated against the new state.
        prev_state = cond.state.cmpxchgWeak(prev_state, .{
            .waiters = prev_state.waiters,
            .signals = prev_state.signals + 1,
        }, .release, .monotonic) orelse {
            // Update the epoch to tell the waiting threads that there are new signals for them.
            // Note that a waiting thread could miss a take if *exactly* (1<<32)-1 wakes happen
            // between it observing the epoch and sleeping on it, but this is extraordinarily
            // unlikely due to the precise number of calls required.
            _ = cond.epoch.fetchAdd(1, .release); // `.release` to ensure ordered after `state` update
            Thread.futexWake(&cond.epoch.raw, 1);
            return;
        };
    }
}
/// Same as `Io.Condition.waitUncancelable` but avoids the VTable.
///
/// Caller must hold `mutex`. The mutex is released while waiting and is
/// re-acquired (uncancelably, via the `defer`) before this function returns.
fn condWait(cond: *Io.Condition, mutex: *Io.Mutex) void {
    var epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before state load
    {
        // Register as a waiter *before* releasing the mutex so a signal
        // issued after the unlock is guaranteed to see this waiter.
        const prev_state = cond.state.fetchAdd(.{ .waiters = 1, .signals = 0 }, .monotonic);
        assert(prev_state.waiters < std.math.maxInt(u16)); // overflow caused by too many waiters
    }
    mutexUnlock(mutex);
    defer mutexLockUncancelable(mutex);
    while (true) {
        // Sleeps only while `epoch` is unchanged; a concurrent signal or
        // broadcast bumps the epoch, so the wait returns promptly then.
        Thread.futexWaitUncancelable(&cond.epoch.raw, epoch, null);
        epoch = cond.epoch.load(.acquire); // `.acquire` to ensure ordered before `state` load
        var prev_state = cond.state.load(.monotonic);
        // Try to take one of the published signals; on CAS failure re-check
        // the refreshed state, and if signals ran out go back to sleep.
        while (prev_state.signals > 0) {
            prev_state = cond.state.cmpxchgWeak(prev_state, .{
                .waiters = prev_state.waiters - 1,
                .signals = prev_state.signals - 1,
            }, .acquire, .monotonic) orelse {
                // We successfully consumed a signal.
                return;
            };
        }
    }
}
/// Same as `Io.Mutex.lock` but avoids the VTable.
///
/// Cancelable variant: propagates `error.Canceled` from `Thread.futexWait`
/// while blocked, in which case the mutex was not acquired by this call.
/// (The sibling `mutexLockUncancelable` is the uncancelable counterpart.)
fn mutexLock(m: *Io.Mutex) Io.Cancelable!void {
    // Fast path: uncontended acquire via a single weak CAS.
    const initial_state = m.state.cmpxchgWeak(
        .unlocked,
        .locked_once,
        .acquire,
        .monotonic,
    ) orelse {
        @branchHint(.likely);
        return;
    };
    // Already marked contended by someone else: sleep before spinning on the
    // swap loop below.
    if (initial_state == .contended) {
        try Thread.futexWait(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
    }
    // Acquire by swapping in `.contended` so that whoever unlocks next knows
    // a futex wake is required; a swap that returns `.unlocked` means we won.
    while (m.state.swap(.contended, .acquire) != .unlocked) {
        try Thread.futexWait(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
    }
}
/// Same as `Io.Mutex.lockUncancelable` but avoids the VTable.
///
/// Blocks until the mutex is acquired; cancellation requests do not interrupt
/// the wait (uses `Thread.futexWaitUncancelable`).
fn mutexLockUncancelable(m: *Io.Mutex) void {
    // Fast path: uncontended acquire via a single weak CAS.
    const initial_state = m.state.cmpxchgWeak(
        .unlocked,
        .locked_once,
        .acquire,
        .monotonic,
    ) orelse {
        @branchHint(.likely);
        return;
    };
    // Already marked contended by someone else: sleep before spinning on the
    // swap loop below.
    if (initial_state == .contended) {
        Thread.futexWaitUncancelable(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
    }
    // Acquire by swapping in `.contended` so that whoever unlocks next knows
    // a futex wake is required; a swap that returns `.unlocked` means we won.
    while (m.state.swap(.contended, .acquire) != .unlocked) {
        Thread.futexWaitUncancelable(@ptrCast(&m.state.raw), @intFromEnum(Io.Mutex.State.contended), null);
    }
}
/// Same as `Io.Mutex.unlock` but avoids the VTable.
///
/// Releases the mutex and, if any locker recorded contention, wakes one
/// thread sleeping on the state word.
fn mutexUnlock(m: *Io.Mutex) void {
    const prev_state = m.state.swap(.unlocked, .release);
    switch (prev_state) {
        .locked_once => {},
        .contended => {
            @branchHint(.unlikely);
            // At least one locker observed contention; hand the futex word
            // to exactly one of them.
            Thread.futexWake(@ptrCast(&m.state.raw), 1);
        },
        // Unlocking a mutex that was not held is illegal.
        .unlocked => unreachable,
    }
}