diff --git a/CMakeLists.txt b/CMakeLists.txt index fe8660559c..05fb9c4805 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -410,7 +410,6 @@ set(ZIG_STAGE2_SOURCES lib/std/Thread.zig lib/std/Thread/Futex.zig lib/std/Thread/Mutex.zig - lib/std/Thread/WaitGroup.zig lib/std/array_hash_map.zig lib/std/array_list.zig lib/std/ascii.zig diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 0feec928fc..46ff3a7f38 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -35,7 +35,7 @@ run_queue: std.SinglyLinkedList = .{}, join_requested: bool = false, stack_size: usize, /// All threads are spawned detached; this is how we wait until they all exit. -wait_group: std.Thread.WaitGroup = .{}, +wait_group: WaitGroup = .init, async_limit: Io.Limit, concurrent_limit: Io.Limit = .unlimited, /// Error from calling `std.Thread.getCpuCount` in `init`. @@ -17987,3 +17987,62 @@ fn deviceIoControl(t: *Threaded, o: *const Io.Operation.DeviceIoControl) Io.Canc } } } + +const WaitGroup = struct { + state: std.atomic.Value(usize), + event: Io.Event, + + const init: WaitGroup = .{ .state = .{ .raw = 0 }, .event = .unset }; + + const is_waiting: usize = 1 << 0; + const one_pending: usize = 1 << 1; + + fn start(wg: *WaitGroup) void { + const prev_state = wg.state.fetchAdd(one_pending, .monotonic); + assert((prev_state / one_pending) < (std.math.maxInt(usize) / one_pending)); + } + + fn value(wg: *WaitGroup) usize { + return wg.state.load(.monotonic) / one_pending; + } + + fn wait(wg: *WaitGroup) void { + const prev_state = wg.state.fetchAdd(is_waiting, .acquire); + assert(prev_state & is_waiting == 0); + if ((prev_state / one_pending) > 0) eventWait(&wg.event); + } + + fn finish(wg: *WaitGroup) void { + const state = wg.state.fetchSub(one_pending, .acq_rel); + assert((state / one_pending) > 0); + + if (state == (one_pending | is_waiting)) { + eventSet(&wg.event); + } + } +}; + +/// Same as `Io.Event.wait` but avoids the VTable. +fn eventWait(event: *Io.Event) void { + if (@cmpxchgStrong(Io.Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) { + .unset => unreachable, + .waiting => {}, + .is_set => return, + }; + while (true) { + Thread.futexWaitUncancelable(@ptrCast(event), @intFromEnum(Io.Event.waiting), null); + switch (@atomicLoad(Io.Event, event, .acquire)) { + .unset => unreachable, // `reset` called before pending `wait` returned + .waiting => continue, + .is_set => return, + } + } +} + +/// Same as `Io.Event.set` but avoids the VTable. +fn eventSet(event: *Io.Event) void { + switch (@atomicRmw(Io.Event, event, .Xchg, .is_set, .release)) { + .unset, .is_set => {}, + .waiting => Thread.futexWake(@ptrCast(event), std.math.maxInt(u32)), + } +} diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig index 930a176b01..a9e2eb28d4 100644 --- a/lib/std/Io/test.zig +++ b/lib/std/Io/test.zig @@ -716,3 +716,136 @@ test "read from a file using Batch.awaitAsync API" { } } } + +test "Event smoke test" { + const io = testing.io; + + var event: Io.Event = .unset; + try testing.expectEqual(false, event.isSet()); + + // make sure the event gets set + event.set(io); + try testing.expectEqual(true, event.isSet()); + + // make sure the event gets unset again + event.reset(); + try testing.expectEqual(false, event.isSet()); + + // waits should timeout as there's no other thread to set the event + try testing.expectError(error.Timeout, event.waitTimeout(io, .{ .duration = .{ + .raw = .zero, + .clock = .awake, + } })); + try testing.expectError(error.Timeout, event.waitTimeout(io, .{ .duration = .{ + .raw = .fromMilliseconds(1), + .clock = .awake, + } })); + + // set the event again and make sure waits complete + event.set(io); + try event.wait(io); + try event.waitTimeout(io, .{ .duration = .{ .raw = .fromMilliseconds(1), .clock = .awake } }); + try testing.expectEqual(true, event.isSet()); +} + +test "Event signaling" { + if (builtin.single_threaded) { + // This test requires spawning threads. + return error.SkipZigTest; + } + + const io = testing.io; + + const Context = struct { + in: Io.Event = .unset, + out: Io.Event = .unset, + value: usize = 0, + + fn input(self: *@This()) !void { + // wait for the value to become 1 + try self.in.wait(io); + self.in.reset(); + try testing.expectEqual(self.value, 1); + + // bump the value and wake up output() + self.value = 2; + self.out.set(io); + + // wait for output to receive 2, bump the value and wake us up with 3 + try self.in.wait(io); + self.in.reset(); + try testing.expectEqual(self.value, 3); + + // bump the value and wake up output() for it to see 4 + self.value = 4; + self.out.set(io); + } + + fn output(self: *@This()) !void { + // start with 0 and bump the value for input to see 1 + try testing.expectEqual(self.value, 0); + self.value = 1; + self.in.set(io); + + // wait for input to receive 1, bump the value to 2 and wake us up + try self.out.wait(io); + self.out.reset(); + try testing.expectEqual(self.value, 2); + + // bump the value to 3 for input to see (rhymes) + self.value = 3; + self.in.set(io); + + // wait for input to bump the value to 4 and receive no more (rhymes) + try self.out.wait(io); + self.out.reset(); + try testing.expectEqual(self.value, 4); + } + }; + + var ctx = Context{}; + + const thread = try std.Thread.spawn(.{}, Context.output, .{&ctx}); + defer thread.join(); + + try ctx.input(); +} + +test "Event broadcast" { + if (builtin.single_threaded) { + // This test requires spawning threads. + return error.SkipZigTest; + } + + const io = testing.io; + + const num_threads = 10; + const Barrier = struct { + event: Io.Event = .unset, + counter: std.atomic.Value(usize) = std.atomic.Value(usize).init(num_threads), + + fn wait(self: *@This()) void { + if (self.counter.fetchSub(1, .acq_rel) == 1) { + self.event.set(io); + } + } + }; + + const Context = struct { + start_barrier: Barrier = .{}, + finish_barrier: Barrier = .{}, + + fn run(self: *@This()) void { + self.start_barrier.wait(); + self.finish_barrier.wait(); + } + }; + + var ctx = Context{}; + var threads: [num_threads - 1]std.Thread = undefined; + + for (&threads) |*t| t.* = try std.Thread.spawn(.{}, Context.run, .{&ctx}); + defer for (threads) |t| t.join(); + + ctx.run(); +} diff --git a/lib/std/Thread.zig b/lib/std/Thread.zig index 33fd4c0234..ef35422e1d 100644 --- a/lib/std/Thread.zig +++ b/lib/std/Thread.zig @@ -19,129 +19,11 @@ pub const Mutex = @import("Thread/Mutex.zig"); pub const Semaphore = @import("Thread/Semaphore.zig"); pub const Condition = @import("Thread/Condition.zig"); pub const RwLock = @import("Thread/RwLock.zig"); -pub const WaitGroup = @import("Thread/WaitGroup.zig"); pub const Pool = @compileError("deprecated; consider using 'std.Io.Group' with 'std.Io.Threaded'"); pub const use_pthreads = native_os != .windows and native_os != .wasi and builtin.link_libc; -/// A thread-safe logical boolean value which can be `set` and `unset`. -/// -/// It can also block threads until the value is set with cancelation via timed -/// waits. Statically initializable; four bytes on all targets. -pub const ResetEvent = enum(u32) { - unset = 0, - waiting = 1, - is_set = 2, - - /// Returns whether the logical boolean is `set`. - /// - /// Once `reset` is called, this returns false until the next `set`. - /// - /// The memory accesses before the `set` can be said to happen before - /// `isSet` returns true. - pub fn isSet(re: *const ResetEvent) bool { - if (builtin.single_threaded) return switch (re.*) { - .unset => false, - .waiting => unreachable, - .is_set => true, - }; - // Acquire barrier ensures memory accesses before `set` happen before - // returning true. - return @atomicLoad(ResetEvent, re, .acquire) == .is_set; - } - - /// Blocks the calling thread until `set` is called. - /// - /// This is effectively a more efficient version of `while (!isSet()) {}`. - /// - /// The memory accesses before the `set` can be said to happen before `wait` returns. - pub fn wait(re: *ResetEvent) void { - if (builtin.single_threaded) switch (re.*) { - .unset => unreachable, // Deadlock, no other threads to wake us up. - .waiting => unreachable, // Invalid state. - .is_set => return, - }; - if (!re.isSet()) return timedWaitInner(re, null) catch |err| switch (err) { - error.Timeout => unreachable, // No timeout specified. - }; - } - - /// Blocks the calling thread until `set` is called, or until the - /// corresponding timeout expires, returning `error.Timeout`. - /// - /// This is effectively a more efficient version of `while (!isSet()) {}`. - /// - /// The memory accesses before the set() can be said to happen before - /// timedWait() returns without error. - pub fn timedWait(re: *ResetEvent, timeout_ns: u64) error{Timeout}!void { - if (builtin.single_threaded) switch (re.*) { - .unset => return error.Timeout, - .waiting => unreachable, // Invalid state. - .is_set => return, - }; - if (!re.isSet()) return timedWaitInner(re, timeout_ns); - } - - fn timedWaitInner(re: *ResetEvent, timeout: ?u64) error{Timeout}!void { - @branchHint(.cold); - - // Try to set the state from `unset` to `waiting` to indicate to the - // `set` thread that others are blocked on the ResetEvent. Avoid using - // any strict barriers until we know the ResetEvent is set. - var state = @atomicLoad(ResetEvent, re, .acquire); - if (state == .unset) { - state = @cmpxchgStrong(ResetEvent, re, state, .waiting, .acquire, .acquire) orelse .waiting; - } - - // Wait until the ResetEvent is set since the state is waiting. - if (state == .waiting) { - var futex_deadline = Futex.Deadline.init(timeout); - while (true) { - const wait_result = futex_deadline.wait(@ptrCast(re), @intFromEnum(ResetEvent.waiting)); - - // Check if the ResetEvent was set before possibly reporting error.Timeout below. - state = @atomicLoad(ResetEvent, re, .acquire); - if (state != .waiting) break; - - try wait_result; - } - } - - assert(state == .is_set); - } - - /// Marks the logical boolean as `set` and unblocks any threads in `wait` - /// or `timedWait` to observe the new state. - /// - /// The logical boolean stays `set` until `reset` is called, making future - /// `set` calls do nothing semantically. - /// - /// The memory accesses before `set` can be said to happen before `isSet` - /// returns true or `wait`/`timedWait` return successfully. - pub fn set(re: *ResetEvent) void { - if (builtin.single_threaded) { - re.* = .is_set; - return; - } - if (@atomicRmw(ResetEvent, re, .Xchg, .is_set, .release) == .waiting) { - Futex.wake(@ptrCast(re), std.math.maxInt(u32)); - } - } - - /// Unmarks the ResetEvent as if `set` was never called. - /// - /// Assumes no threads are blocked in `wait` or `timedWait`. Concurrent - /// calls to `set`, `isSet` and `reset` are allowed. - pub fn reset(re: *ResetEvent) void { - if (builtin.single_threaded) { - re.* = .unset; - return; - } - @atomicStore(ResetEvent, re, .unset, .monotonic); - } -}; - const Thread = @This(); const Impl = if (native_os == .windows) WindowsThreadImpl @@ -1676,16 +1558,16 @@ test "setName, getName" { const io = testing.io; const Context = struct { - start_wait_event: ResetEvent = .unset, - test_done_event: ResetEvent = .unset, - thread_done_event: ResetEvent = .unset, + start_wait_event: Io.Event = .unset, + test_done_event: Io.Event = .unset, + thread_done_event: Io.Event = .unset, done: std.atomic.Value(bool) = std.atomic.Value(bool).init(false), thread: Thread = undefined, pub fn run(ctx: *@This()) !void { // Wait for the main thread to have set the thread field in the context. - ctx.start_wait_event.wait(); + try ctx.start_wait_event.wait(io); switch (native_os) { .windows => testThreadName(io, &ctx.thread) catch |err| switch (err) { @@ -1696,10 +1578,10 @@ test "setName, getName" { } // Signal our test is done - ctx.test_done_event.set(); + ctx.test_done_event.set(io); // wait for the thread to property exit - ctx.thread_done_event.wait(); + try ctx.thread_done_event.wait(io); } }; @@ -1707,8 +1589,8 @@ test "setName, getName" { var thread = try spawn(.{}, Context.run, .{&context}); context.thread = thread; - context.start_wait_event.set(); - context.test_done_event.wait(); + context.start_wait_event.set(io); + try context.test_done_event.wait(io); switch (native_os) { .driverkit, .ios, .maccatalyst, .macos, .tvos, .visionos, .watchos => { @@ -1722,31 +1604,32 @@ test "setName, getName" { else => try testThreadName(io, &thread), } - context.thread_done_event.set(); + context.thread_done_event.set(io); thread.join(); } test { _ = Futex; - _ = ResetEvent; _ = Mutex; _ = Semaphore; _ = Condition; _ = RwLock; } -fn testIncrementNotify(value: *usize, event: *ResetEvent) void { +fn testIncrementNotify(io: Io, value: *usize, event: *Io.Event) void { value.* += 1; - event.set(); + event.set(io); } test join { if (builtin.single_threaded) return error.SkipZigTest; - var value: usize = 0; - var event: ResetEvent = .unset; + const io = testing.io; - const thread = try Thread.spawn(.{}, testIncrementNotify, .{ &value, &event }); + var value: usize = 0; + var event: Io.Event = .unset; + + const thread = try Thread.spawn(.{}, testIncrementNotify, .{ io, &value, &event }); thread.join(); try std.testing.expectEqual(value, 1); @@ -1755,13 +1638,15 @@ test join { test detach { if (builtin.single_threaded) return error.SkipZigTest; - var value: usize = 0; - var event: ResetEvent = .unset; + const io = testing.io; - const thread = try Thread.spawn(.{}, testIncrementNotify, .{ &value, &event }); + var value: usize = 0; + var event: Io.Event = .unset; + + const thread = try Thread.spawn(.{}, testIncrementNotify, .{ io, &value, &event }); thread.detach(); - event.wait(); + try event.wait(io); try std.testing.expectEqual(value, 1); } @@ -1803,127 +1688,6 @@ fn testTls() !void { if (x != 1235) return error.TlsBadEndValue; } -test "ResetEvent smoke test" { - var event: ResetEvent = .unset; - try testing.expectEqual(false, event.isSet()); - - // make sure the event gets set - event.set(); - try testing.expectEqual(true, event.isSet()); - - // make sure the event gets unset again - event.reset(); - try testing.expectEqual(false, event.isSet()); - - // waits should timeout as there's no other thread to set the event - try testing.expectError(error.Timeout, event.timedWait(0)); - try testing.expectError(error.Timeout, event.timedWait(std.time.ns_per_ms)); - - // set the event again and make sure waits complete - event.set(); - event.wait(); - try event.timedWait(std.time.ns_per_ms); - try testing.expectEqual(true, event.isSet()); -} - -test "ResetEvent signaling" { - // This test requires spawning threads - if (builtin.single_threaded) { - return error.SkipZigTest; - } - - const Context = struct { - in: ResetEvent = .unset, - out: ResetEvent = .unset, - value: usize = 0, - - fn input(self: *@This()) !void { - // wait for the value to become 1 - self.in.wait(); - self.in.reset(); - try testing.expectEqual(self.value, 1); - - // bump the value and wake up output() - self.value = 2; - self.out.set(); - - // wait for output to receive 2, bump the value and wake us up with 3 - self.in.wait(); - self.in.reset(); - try testing.expectEqual(self.value, 3); - - // bump the value and wake up output() for it to see 4 - self.value = 4; - self.out.set(); - } - - fn output(self: *@This()) !void { - // start with 0 and bump the value for input to see 1 - try testing.expectEqual(self.value, 0); - self.value = 1; - self.in.set(); - - // wait for input to receive 1, bump the value to 2 and wake us up - self.out.wait(); - self.out.reset(); - try testing.expectEqual(self.value, 2); - - // bump the value to 3 for input to see (rhymes) - self.value = 3; - self.in.set(); - - // wait for input to bump the value to 4 and receive no more (rhymes) - self.out.wait(); - self.out.reset(); - try testing.expectEqual(self.value, 4); - } - }; - - var ctx = Context{}; - - const thread = try std.Thread.spawn(.{}, Context.output, .{&ctx}); - defer thread.join(); - - try ctx.input(); -} - -test "ResetEvent broadcast" { - // This test requires spawning threads - if (builtin.single_threaded) { - return error.SkipZigTest; - } - - const num_threads = 10; - const Barrier = struct { - event: ResetEvent = .unset, - counter: std.atomic.Value(usize) = std.atomic.Value(usize).init(num_threads), - - fn wait(self: *@This()) void { - if (self.counter.fetchSub(1, .acq_rel) == 1) { - self.event.set(); - } - } - }; - - const Context = struct { - start_barrier: Barrier = .{}, - finish_barrier: Barrier = .{}, - - fn run(self: *@This()) void { - self.start_barrier.wait(); - self.finish_barrier.wait(); - } - }; - - var ctx = Context{}; - var threads: [num_threads - 1]std.Thread = undefined; - - for (&threads) |*t| t.* = try std.Thread.spawn(.{}, Context.run, .{&ctx}); - defer for (threads) |t| t.join(); - - ctx.run(); -} - /// Configures the per-thread alternative signal stack requested by `std.options.signal_stack_size`. pub fn maybeAttachSignalStack() void { const size = std.options.signal_stack_size orelse return; diff --git a/lib/std/Thread/WaitGroup.zig b/lib/std/Thread/WaitGroup.zig deleted file mode 100644 index 8a9107192d..0000000000 --- a/lib/std/Thread/WaitGroup.zig +++ /dev/null @@ -1,87 +0,0 @@ -const builtin = @import("builtin"); -const std = @import("std"); -const assert = std.debug.assert; -const WaitGroup = @This(); - -const is_waiting: usize = 1 << 0; -const one_pending: usize = 1 << 1; - -state: std.atomic.Value(usize) = std.atomic.Value(usize).init(0), -event: std.Thread.ResetEvent = .unset, - -pub fn start(self: *WaitGroup) void { - return startStateless(&self.state); -} - -pub fn startStateless(state: *std.atomic.Value(usize)) void { - const prev_state = state.fetchAdd(one_pending, .monotonic); - assert((prev_state / one_pending) < (std.math.maxInt(usize) / one_pending)); -} - -pub fn startMany(self: *WaitGroup, n: usize) void { - const state = self.state.fetchAdd(one_pending * n, .monotonic); - assert((state / one_pending) < (std.math.maxInt(usize) / one_pending)); -} - -pub fn finish(self: *WaitGroup) void { - const state = self.state.fetchSub(one_pending, .acq_rel); - assert((state / one_pending) > 0); - - if (state == (one_pending | is_waiting)) { - self.event.set(); - } -} - -pub fn finishStateless(state: *std.atomic.Value(usize), event: *std.Thread.ResetEvent) void { - const prev_state = state.fetchSub(one_pending, .acq_rel); - assert((prev_state / one_pending) > 0); - if (prev_state == (one_pending | is_waiting)) event.set(); -} - -pub fn wait(wg: *WaitGroup) void { - return waitStateless(&wg.state, &wg.event); -} - -pub fn waitStateless(state: *std.atomic.Value(usize), event: *std.Thread.ResetEvent) void { - const prev_state = state.fetchAdd(is_waiting, .acquire); - assert(prev_state & is_waiting == 0); - if ((prev_state / one_pending) > 0) event.wait(); -} - -pub fn reset(self: *WaitGroup) void { - self.state.store(0, .monotonic); - self.event.reset(); -} - -pub fn isDone(wg: *WaitGroup) bool { - const state = wg.state.load(.acquire); - assert(state & is_waiting == 0); - - return (state / one_pending) == 0; -} - -pub fn value(wg: *WaitGroup) usize { - return wg.state.load(.monotonic) / one_pending; -} - -// Spawns a new thread for the task. This is appropriate when the callee -// delegates all work. -pub fn spawnManager( - wg: *WaitGroup, - comptime func: anytype, - args: anytype, -) void { - if (builtin.single_threaded) { - @call(.auto, func, args); - return; - } - const Manager = struct { - fn run(wg_inner: *WaitGroup, args_inner: @TypeOf(args)) void { - defer wg_inner.finish(); - @call(.auto, func, args_inner); - } - }; - wg.start(); - const t = std.Thread.spawn(.{}, Manager.run, .{ wg, args }) catch return Manager.run(wg, args); - t.detach(); -} diff --git a/lib/std/fs/test.zig b/lib/std/fs/test.zig index d0ec6b33e9..977a285fb4 100644 --- a/lib/std/fs/test.zig +++ b/lib/std/fs/test.zig @@ -1745,29 +1745,32 @@ test "open file with exclusive lock twice, make sure second lock waits" { errdefer file.close(io); const S = struct { - fn checkFn(inner_ctx: *TestContext, path: []const u8, started: *std.Thread.ResetEvent, locked: *std.Thread.ResetEvent) !void { - started.set(); + fn checkFn(inner_ctx: *TestContext, path: []const u8, started: *Io.Event, locked: *Io.Event) !void { + started.set(inner_ctx.io); const file1 = try inner_ctx.dir.createFile(inner_ctx.io, path, .{ .lock = .exclusive }); - locked.set(); + locked.set(inner_ctx.io); file1.close(inner_ctx.io); } }; - var started: std.Thread.ResetEvent = .unset; - var locked: std.Thread.ResetEvent = .unset; + var started: Io.Event = .unset; + var locked: Io.Event = .unset; const t = try std.Thread.spawn(.{}, S.checkFn, .{ ctx, filename, &started, &locked }); defer t.join(); // Wait for the spawned thread to start trying to acquire the exclusive file lock. // Then wait a bit to make sure that can't acquire it since we currently hold the file lock. - started.wait(); - try expectError(error.Timeout, locked.timedWait(10 * std.time.ns_per_ms)); + try started.wait(io); + try expectError(error.Timeout, locked.waitTimeout(io, .{ .duration = .{ + .raw = .fromMilliseconds(10), + .clock = .awake, + } })); // Release the file lock which should unlock the thread to lock it and set the locked event. file.close(io); - locked.wait(); + try locked.wait(io); } }.impl); }