std.Thread: remove ResetEvent and WaitGroup

* std.Thread.ResetEvent -> Io.Event
* std.Thread.WaitGroup -> Io.Group
This commit is contained in:
Andrew Kelley 2026-02-01 14:42:12 -08:00
parent 60ac4e78eb
commit b191e50be5
6 changed files with 226 additions and 355 deletions

View file

@ -410,7 +410,6 @@ set(ZIG_STAGE2_SOURCES
lib/std/Thread.zig
lib/std/Thread/Futex.zig
lib/std/Thread/Mutex.zig
lib/std/Thread/WaitGroup.zig
lib/std/array_hash_map.zig
lib/std/array_list.zig
lib/std/ascii.zig

View file

@ -35,7 +35,7 @@ run_queue: std.SinglyLinkedList = .{},
join_requested: bool = false,
stack_size: usize,
/// All threads are spawned detached; this is how we wait until they all exit.
wait_group: std.Thread.WaitGroup = .{},
wait_group: WaitGroup = .init,
async_limit: Io.Limit,
concurrent_limit: Io.Limit = .unlimited,
/// Error from calling `std.Thread.getCpuCount` in `init`.
@ -17987,3 +17987,62 @@ fn deviceIoControl(t: *Threaded, o: *const Io.Operation.DeviceIoControl) Io.Canc
}
}
}
const WaitGroup = struct {
state: std.atomic.Value(usize),
event: Io.Event,
const init: WaitGroup = .{ .state = .{ .raw = 0 }, .event = .unset };
const is_waiting: usize = 1 << 0;
const one_pending: usize = 1 << 1;
fn start(wg: *WaitGroup) void {
const prev_state = wg.state.fetchAdd(one_pending, .monotonic);
assert((prev_state / one_pending) < (std.math.maxInt(usize) / one_pending));
}
fn value(wg: *WaitGroup) usize {
return wg.state.load(.monotonic) / one_pending;
}
fn wait(wg: *WaitGroup) void {
const prev_state = wg.state.fetchAdd(is_waiting, .acquire);
assert(prev_state & is_waiting == 0);
if ((prev_state / one_pending) > 0) eventWait(&wg.event);
}
fn finish(wg: *WaitGroup) void {
const state = wg.state.fetchSub(one_pending, .acq_rel);
assert((state / one_pending) > 0);
if (state == (one_pending | is_waiting)) {
eventSet(&wg.event);
}
}
};
/// Same as `Io.Event.wait` but avoids the VTable.
fn eventWait(event: *Io.Event) void {
if (@cmpxchgStrong(Io.Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) {
.unset => unreachable,
.waiting => {},
.is_set => return,
};
while (true) {
Thread.futexWaitUncancelable(@ptrCast(event), @intFromEnum(Io.Event.waiting), null);
switch (@atomicLoad(Io.Event, event, .acquire)) {
.unset => unreachable, // `reset` called before pending `wait` returned
.waiting => continue,
.is_set => return,
}
}
}
/// Same as `Io.Event.set` but avoids the VTable.
fn eventSet(event: *Io.Event) void {
switch (@atomicRmw(Io.Event, event, .Xchg, .is_set, .release)) {
.unset, .is_set => {},
.waiting => Thread.futexWake(@ptrCast(event), std.math.maxInt(u32)),
}
}

View file

@ -716,3 +716,136 @@ test "read from a file using Batch.awaitAsync API" {
}
}
}
test "Event smoke test" {
const io = testing.io;
var event: Io.Event = .unset;
try testing.expectEqual(false, event.isSet());
// make sure the event gets set
event.set(io);
try testing.expectEqual(true, event.isSet());
// make sure the event gets unset again
event.reset();
try testing.expectEqual(false, event.isSet());
// waits should timeout as there's no other thread to set the event
try testing.expectError(error.Timeout, event.waitTimeout(io, .{ .duration = .{
.raw = .zero,
.clock = .awake,
} }));
try testing.expectError(error.Timeout, event.waitTimeout(io, .{ .duration = .{
.raw = .fromMilliseconds(1),
.clock = .awake,
} }));
// set the event again and make sure waits complete
event.set(io);
try event.wait(io);
try event.waitTimeout(io, .{ .duration = .{ .raw = .fromMilliseconds(1), .clock = .awake } });
try testing.expectEqual(true, event.isSet());
}
test "Event signaling" {
if (builtin.single_threaded) {
// This test requires spawning threads.
return error.SkipZigTest;
}
const io = testing.io;
const Context = struct {
in: Io.Event = .unset,
out: Io.Event = .unset,
value: usize = 0,
fn input(self: *@This()) !void {
// wait for the value to become 1
try self.in.wait(io);
self.in.reset();
try testing.expectEqual(self.value, 1);
// bump the value and wake up output()
self.value = 2;
self.out.set(io);
// wait for output to receive 2, bump the value and wake us up with 3
try self.in.wait(io);
self.in.reset();
try testing.expectEqual(self.value, 3);
// bump the value and wake up output() for it to see 4
self.value = 4;
self.out.set(io);
}
fn output(self: *@This()) !void {
// start with 0 and bump the value for input to see 1
try testing.expectEqual(self.value, 0);
self.value = 1;
self.in.set(io);
// wait for input to receive 1, bump the value to 2 and wake us up
try self.out.wait(io);
self.out.reset();
try testing.expectEqual(self.value, 2);
// bump the value to 3 for input to see (rhymes)
self.value = 3;
self.in.set(io);
// wait for input to bump the value to 4 and receive no more (rhymes)
try self.out.wait(io);
self.out.reset();
try testing.expectEqual(self.value, 4);
}
};
var ctx = Context{};
const thread = try std.Thread.spawn(.{}, Context.output, .{&ctx});
defer thread.join();
try ctx.input();
}
test "Event broadcast" {
if (builtin.single_threaded) {
// This test requires spawning threads.
return error.SkipZigTest;
}
const io = testing.io;
const num_threads = 10;
const Barrier = struct {
event: Io.Event = .unset,
counter: std.atomic.Value(usize) = std.atomic.Value(usize).init(num_threads),
fn wait(self: *@This()) void {
if (self.counter.fetchSub(1, .acq_rel) == 1) {
self.event.set(io);
}
}
};
const Context = struct {
start_barrier: Barrier = .{},
finish_barrier: Barrier = .{},
fn run(self: *@This()) void {
self.start_barrier.wait();
self.finish_barrier.wait();
}
};
var ctx = Context{};
var threads: [num_threads - 1]std.Thread = undefined;
for (&threads) |*t| t.* = try std.Thread.spawn(.{}, Context.run, .{&ctx});
defer for (threads) |t| t.join();
ctx.run();
}

View file

@ -19,129 +19,11 @@ pub const Mutex = @import("Thread/Mutex.zig");
pub const Semaphore = @import("Thread/Semaphore.zig");
pub const Condition = @import("Thread/Condition.zig");
pub const RwLock = @import("Thread/RwLock.zig");
pub const WaitGroup = @import("Thread/WaitGroup.zig");
pub const Pool = @compileError("deprecated; consider using 'std.Io.Group' with 'std.Io.Threaded'");
pub const use_pthreads = native_os != .windows and native_os != .wasi and builtin.link_libc;
/// A thread-safe logical boolean value which can be `set` and `unset`.
///
/// It can also block threads until the value is set with cancelation via timed
/// waits. Statically initializable; four bytes on all targets.
pub const ResetEvent = enum(u32) {
unset = 0,
waiting = 1,
is_set = 2,
/// Returns whether the logical boolean is `set`.
///
/// Once `reset` is called, this returns false until the next `set`.
///
/// The memory accesses before the `set` can be said to happen before
/// `isSet` returns true.
pub fn isSet(re: *const ResetEvent) bool {
if (builtin.single_threaded) return switch (re.*) {
.unset => false,
.waiting => unreachable,
.is_set => true,
};
// Acquire barrier ensures memory accesses before `set` happen before
// returning true.
return @atomicLoad(ResetEvent, re, .acquire) == .is_set;
}
/// Blocks the calling thread until `set` is called.
///
/// This is effectively a more efficient version of `while (!isSet()) {}`.
///
/// The memory accesses before the `set` can be said to happen before `wait` returns.
pub fn wait(re: *ResetEvent) void {
if (builtin.single_threaded) switch (re.*) {
.unset => unreachable, // Deadlock, no other threads to wake us up.
.waiting => unreachable, // Invalid state.
.is_set => return,
};
if (!re.isSet()) return timedWaitInner(re, null) catch |err| switch (err) {
error.Timeout => unreachable, // No timeout specified.
};
}
/// Blocks the calling thread until `set` is called, or until the
/// corresponding timeout expires, returning `error.Timeout`.
///
/// This is effectively a more efficient version of `while (!isSet()) {}`.
///
/// The memory accesses before the set() can be said to happen before
/// timedWait() returns without error.
pub fn timedWait(re: *ResetEvent, timeout_ns: u64) error{Timeout}!void {
if (builtin.single_threaded) switch (re.*) {
.unset => return error.Timeout,
.waiting => unreachable, // Invalid state.
.is_set => return,
};
if (!re.isSet()) return timedWaitInner(re, timeout_ns);
}
fn timedWaitInner(re: *ResetEvent, timeout: ?u64) error{Timeout}!void {
@branchHint(.cold);
// Try to set the state from `unset` to `waiting` to indicate to the
// `set` thread that others are blocked on the ResetEvent. Avoid using
// any strict barriers until we know the ResetEvent is set.
var state = @atomicLoad(ResetEvent, re, .acquire);
if (state == .unset) {
state = @cmpxchgStrong(ResetEvent, re, state, .waiting, .acquire, .acquire) orelse .waiting;
}
// Wait until the ResetEvent is set since the state is waiting.
if (state == .waiting) {
var futex_deadline = Futex.Deadline.init(timeout);
while (true) {
const wait_result = futex_deadline.wait(@ptrCast(re), @intFromEnum(ResetEvent.waiting));
// Check if the ResetEvent was set before possibly reporting error.Timeout below.
state = @atomicLoad(ResetEvent, re, .acquire);
if (state != .waiting) break;
try wait_result;
}
}
assert(state == .is_set);
}
/// Marks the logical boolean as `set` and unblocks any threads in `wait`
/// or `timedWait` to observe the new state.
///
/// The logical boolean stays `set` until `reset` is called, making future
/// `set` calls do nothing semantically.
///
/// The memory accesses before `set` can be said to happen before `isSet`
/// returns true or `wait`/`timedWait` return successfully.
pub fn set(re: *ResetEvent) void {
if (builtin.single_threaded) {
re.* = .is_set;
return;
}
if (@atomicRmw(ResetEvent, re, .Xchg, .is_set, .release) == .waiting) {
Futex.wake(@ptrCast(re), std.math.maxInt(u32));
}
}
/// Unmarks the ResetEvent as if `set` was never called.
///
/// Assumes no threads are blocked in `wait` or `timedWait`. Concurrent
/// calls to `set`, `isSet` and `reset` are allowed.
pub fn reset(re: *ResetEvent) void {
if (builtin.single_threaded) {
re.* = .unset;
return;
}
@atomicStore(ResetEvent, re, .unset, .monotonic);
}
};
const Thread = @This();
const Impl = if (native_os == .windows)
WindowsThreadImpl
@ -1676,16 +1558,16 @@ test "setName, getName" {
const io = testing.io;
const Context = struct {
start_wait_event: ResetEvent = .unset,
test_done_event: ResetEvent = .unset,
thread_done_event: ResetEvent = .unset,
start_wait_event: Io.Event = .unset,
test_done_event: Io.Event = .unset,
thread_done_event: Io.Event = .unset,
done: std.atomic.Value(bool) = std.atomic.Value(bool).init(false),
thread: Thread = undefined,
pub fn run(ctx: *@This()) !void {
// Wait for the main thread to have set the thread field in the context.
ctx.start_wait_event.wait();
try ctx.start_wait_event.wait(io);
switch (native_os) {
.windows => testThreadName(io, &ctx.thread) catch |err| switch (err) {
@ -1696,10 +1578,10 @@ test "setName, getName" {
}
// Signal our test is done
ctx.test_done_event.set();
ctx.test_done_event.set(io);
// wait for the thread to property exit
ctx.thread_done_event.wait();
try ctx.thread_done_event.wait(io);
}
};
@ -1707,8 +1589,8 @@ test "setName, getName" {
var thread = try spawn(.{}, Context.run, .{&context});
context.thread = thread;
context.start_wait_event.set();
context.test_done_event.wait();
context.start_wait_event.set(io);
try context.test_done_event.wait(io);
switch (native_os) {
.driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => {
@ -1722,31 +1604,32 @@ test "setName, getName" {
else => try testThreadName(io, &thread),
}
context.thread_done_event.set();
context.thread_done_event.set(io);
thread.join();
}
test {
_ = Futex;
_ = ResetEvent;
_ = Mutex;
_ = Semaphore;
_ = Condition;
_ = RwLock;
}
fn testIncrementNotify(value: *usize, event: *ResetEvent) void {
fn testIncrementNotify(io: Io, value: *usize, event: *Io.Event) void {
value.* += 1;
event.set();
event.set(io);
}
test join {
if (builtin.single_threaded) return error.SkipZigTest;
var value: usize = 0;
var event: ResetEvent = .unset;
const io = testing.io;
const thread = try Thread.spawn(.{}, testIncrementNotify, .{ &value, &event });
var value: usize = 0;
var event: Io.Event = .unset;
const thread = try Thread.spawn(.{}, testIncrementNotify, .{ io, &value, &event });
thread.join();
try std.testing.expectEqual(value, 1);
@ -1755,13 +1638,15 @@ test join {
test detach {
if (builtin.single_threaded) return error.SkipZigTest;
var value: usize = 0;
var event: ResetEvent = .unset;
const io = testing.io;
const thread = try Thread.spawn(.{}, testIncrementNotify, .{ &value, &event });
var value: usize = 0;
var event: Io.Event = .unset;
const thread = try Thread.spawn(.{}, testIncrementNotify, .{ io, &value, &event });
thread.detach();
event.wait();
try event.wait(io);
try std.testing.expectEqual(value, 1);
}
@ -1803,127 +1688,6 @@ fn testTls() !void {
if (x != 1235) return error.TlsBadEndValue;
}
test "ResetEvent smoke test" {
var event: ResetEvent = .unset;
try testing.expectEqual(false, event.isSet());
// make sure the event gets set
event.set();
try testing.expectEqual(true, event.isSet());
// make sure the event gets unset again
event.reset();
try testing.expectEqual(false, event.isSet());
// waits should timeout as there's no other thread to set the event
try testing.expectError(error.Timeout, event.timedWait(0));
try testing.expectError(error.Timeout, event.timedWait(std.time.ns_per_ms));
// set the event again and make sure waits complete
event.set();
event.wait();
try event.timedWait(std.time.ns_per_ms);
try testing.expectEqual(true, event.isSet());
}
test "ResetEvent signaling" {
// This test requires spawning threads
if (builtin.single_threaded) {
return error.SkipZigTest;
}
const Context = struct {
in: ResetEvent = .unset,
out: ResetEvent = .unset,
value: usize = 0,
fn input(self: *@This()) !void {
// wait for the value to become 1
self.in.wait();
self.in.reset();
try testing.expectEqual(self.value, 1);
// bump the value and wake up output()
self.value = 2;
self.out.set();
// wait for output to receive 2, bump the value and wake us up with 3
self.in.wait();
self.in.reset();
try testing.expectEqual(self.value, 3);
// bump the value and wake up output() for it to see 4
self.value = 4;
self.out.set();
}
fn output(self: *@This()) !void {
// start with 0 and bump the value for input to see 1
try testing.expectEqual(self.value, 0);
self.value = 1;
self.in.set();
// wait for input to receive 1, bump the value to 2 and wake us up
self.out.wait();
self.out.reset();
try testing.expectEqual(self.value, 2);
// bump the value to 3 for input to see (rhymes)
self.value = 3;
self.in.set();
// wait for input to bump the value to 4 and receive no more (rhymes)
self.out.wait();
self.out.reset();
try testing.expectEqual(self.value, 4);
}
};
var ctx = Context{};
const thread = try std.Thread.spawn(.{}, Context.output, .{&ctx});
defer thread.join();
try ctx.input();
}
test "ResetEvent broadcast" {
// This test requires spawning threads
if (builtin.single_threaded) {
return error.SkipZigTest;
}
const num_threads = 10;
const Barrier = struct {
event: ResetEvent = .unset,
counter: std.atomic.Value(usize) = std.atomic.Value(usize).init(num_threads),
fn wait(self: *@This()) void {
if (self.counter.fetchSub(1, .acq_rel) == 1) {
self.event.set();
}
}
};
const Context = struct {
start_barrier: Barrier = .{},
finish_barrier: Barrier = .{},
fn run(self: *@This()) void {
self.start_barrier.wait();
self.finish_barrier.wait();
}
};
var ctx = Context{};
var threads: [num_threads - 1]std.Thread = undefined;
for (&threads) |*t| t.* = try std.Thread.spawn(.{}, Context.run, .{&ctx});
defer for (threads) |t| t.join();
ctx.run();
}
/// Configures the per-thread alternative signal stack requested by `std.options.signal_stack_size`.
pub fn maybeAttachSignalStack() void {
const size = std.options.signal_stack_size orelse return;

View file

@ -1,87 +0,0 @@
const builtin = @import("builtin");
const std = @import("std");
const assert = std.debug.assert;
const WaitGroup = @This();
const is_waiting: usize = 1 << 0;
const one_pending: usize = 1 << 1;
state: std.atomic.Value(usize) = std.atomic.Value(usize).init(0),
event: std.Thread.ResetEvent = .unset,
pub fn start(self: *WaitGroup) void {
return startStateless(&self.state);
}
pub fn startStateless(state: *std.atomic.Value(usize)) void {
const prev_state = state.fetchAdd(one_pending, .monotonic);
assert((prev_state / one_pending) < (std.math.maxInt(usize) / one_pending));
}
pub fn startMany(self: *WaitGroup, n: usize) void {
const state = self.state.fetchAdd(one_pending * n, .monotonic);
assert((state / one_pending) < (std.math.maxInt(usize) / one_pending));
}
pub fn finish(self: *WaitGroup) void {
const state = self.state.fetchSub(one_pending, .acq_rel);
assert((state / one_pending) > 0);
if (state == (one_pending | is_waiting)) {
self.event.set();
}
}
pub fn finishStateless(state: *std.atomic.Value(usize), event: *std.Thread.ResetEvent) void {
const prev_state = state.fetchSub(one_pending, .acq_rel);
assert((prev_state / one_pending) > 0);
if (prev_state == (one_pending | is_waiting)) event.set();
}
pub fn wait(wg: *WaitGroup) void {
return waitStateless(&wg.state, &wg.event);
}
pub fn waitStateless(state: *std.atomic.Value(usize), event: *std.Thread.ResetEvent) void {
const prev_state = state.fetchAdd(is_waiting, .acquire);
assert(prev_state & is_waiting == 0);
if ((prev_state / one_pending) > 0) event.wait();
}
pub fn reset(self: *WaitGroup) void {
self.state.store(0, .monotonic);
self.event.reset();
}
pub fn isDone(wg: *WaitGroup) bool {
const state = wg.state.load(.acquire);
assert(state & is_waiting == 0);
return (state / one_pending) == 0;
}
pub fn value(wg: *WaitGroup) usize {
return wg.state.load(.monotonic) / one_pending;
}
// Spawns a new thread for the task. This is appropriate when the callee
// delegates all work.
pub fn spawnManager(
wg: *WaitGroup,
comptime func: anytype,
args: anytype,
) void {
if (builtin.single_threaded) {
@call(.auto, func, args);
return;
}
const Manager = struct {
fn run(wg_inner: *WaitGroup, args_inner: @TypeOf(args)) void {
defer wg_inner.finish();
@call(.auto, func, args_inner);
}
};
wg.start();
const t = std.Thread.spawn(.{}, Manager.run, .{ wg, args }) catch return Manager.run(wg, args);
t.detach();
}

View file

@ -1745,29 +1745,32 @@ test "open file with exclusive lock twice, make sure second lock waits" {
errdefer file.close(io);
const S = struct {
fn checkFn(inner_ctx: *TestContext, path: []const u8, started: *std.Thread.ResetEvent, locked: *std.Thread.ResetEvent) !void {
started.set();
fn checkFn(inner_ctx: *TestContext, path: []const u8, started: *Io.Event, locked: *Io.Event) !void {
started.set(inner_ctx.io);
const file1 = try inner_ctx.dir.createFile(inner_ctx.io, path, .{ .lock = .exclusive });
locked.set();
locked.set(inner_ctx.io);
file1.close(inner_ctx.io);
}
};
var started: std.Thread.ResetEvent = .unset;
var locked: std.Thread.ResetEvent = .unset;
var started: Io.Event = .unset;
var locked: Io.Event = .unset;
const t = try std.Thread.spawn(.{}, S.checkFn, .{ ctx, filename, &started, &locked });
defer t.join();
// Wait for the spawned thread to start trying to acquire the exclusive file lock.
// Then wait a bit to make sure that can't acquire it since we currently hold the file lock.
started.wait();
try expectError(error.Timeout, locked.timedWait(10 * std.time.ns_per_ms));
try started.wait(io);
try expectError(error.Timeout, locked.waitTimeout(io, .{ .duration = .{
.raw = .fromMilliseconds(10),
.clock = .awake,
} }));
// Release the file lock which should unlock the thread to lock it and set the locked event.
file.close(io);
locked.wait();
try locked.wait(io);
}
}.impl);
}