std.Io: introduce Event implemented using futex

This commit is contained in:
Matthew Lugg 2025-12-15 14:02:20 +00:00
parent b4ee54b580
commit 0649f96da3
No known key found for this signature in database
GPG key ID: 3F5B7DCCBF4AF02E
2 changed files with 139 additions and 0 deletions

View file

@ -1386,6 +1386,109 @@ pub const Condition = struct {
}
};
/// Logical boolean flag which can be set and unset and supports a "wait until set" operation.
pub const Event = enum(u32) {
unset,
waiting,
is_set,
/// Returns whether the logical boolean is `true`.
pub fn isSet(event: *const Event) bool {
return switch (@atomicLoad(Event, event, .acquire)) {
.unset, .waiting => false,
.is_set => true,
};
}
/// Blocks until the logical boolean is `true`.
pub fn wait(event: *Event, io: Io) Io.Cancelable!void {
if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) {
.unset => unreachable,
.waiting => {},
.is_set => return,
};
errdefer {
// Ideally we would restore the event back to `.unset` instead of `.waiting`, but there
// might be other threads waiting on the event. In theory we could track the *number* of
// waiting threads in the unused bits of the `Event`, but that has its own problem: the
// waiters would wake up when a *new waiter* was added. So it's easiest to just leave
// the state at `.waiting`---at worst it causes one redundant call to `futexWake`.
}
while (true) {
try io.futexWait(Event, event, .waiting);
switch (@atomicLoad(Event, event, .acquire)) {
.unset => unreachable, // `reset` called before pending `wait` returned
.waiting => continue,
.is_set => return,
}
}
}
/// Same as `wait` except uninterruptible.
pub fn waitUncancelable(event: *Event, io: Io) void {
if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) {
.unset => unreachable,
.waiting => {},
.is_set => return,
};
while (true) {
io.futexWaitUncancelable(Event, event, .waiting);
switch (@atomicLoad(Event, event, .acquire)) {
.unset => unreachable, // `reset` called before pending `wait` returned
.waiting => continue,
.is_set => return,
}
}
}
/// Blocks the calling thread until either the logical boolean is set, the timeout expires, or a
/// spurious wakeup occurs. If the timeout expires or a spurious wakeup occurs, `error.Timeout`
/// is returned.
pub fn waitTimeout(event: *Event, io: Io, timeout: Timeout) (error{Timeout} || Cancelable)!void {
if (@cmpxchgStrong(Event, event, .unset, .waiting, .acquire, .acquire)) |prev| switch (prev) {
.unset => unreachable,
.waiting => assert(!builtin.single_threaded), // invalid state
.is_set => return,
};
errdefer {
// Ideally we would restore the event back to `.unset` instead of `.waiting`, but there
// might be other threads waiting on the event. In theory we could track the *number* of
// waiting threads in the unused bits of the `Event`, but that has its own problem: the
// waiters would wake up when a *new waiter* was added. So it's easiest to just leave
// the state at `.waiting`---at worst it causes one redundant call to `futexWake`.
}
io.futexWaitTimeout(Event, event, .waiting, timeout);
switch (@atomicLoad(Event, event, .acquire)) {
.unset => unreachable, // `reset` called before pending `wait` returned
.waiting => return error.Timeout,
.is_set => return,
}
}
/// Sets the logical boolean to true, and hence unblocks any pending calls to `wait`. The
/// logical boolean remains true until `reset` is called, so future calls to `set` have no
/// semantic effect.
///
/// Any memory accesses prior to a `set` call are "released", so that if this `set` call causes
/// `isSet` to return `true` or a wait to finish, those tasks will be able to observe those
/// memory accesses.
pub fn set(e: *Event, io: Io) void {
switch (@atomicRmw(Event, e, .Xchg, .is_set, .release)) {
.unset, .is_set => {},
.waiting => io.futexWake(Event, e, std.math.maxInt(u32)),
}
}
/// Sets the logical boolean to false.
///
/// Assumes that there is no pending call to `wait` or `waitUncancelable`.
///
/// However, concurrent calls to `isSet`, `set`, and `reset` are allowed.
pub fn reset(e: *Event) void {
@atomicStore(Event, e, .unset, .monotonic);
}
};
pub const TypeErasedQueue = struct {
mutex: Mutex,

View file

@ -255,3 +255,39 @@ test "Queue" {
try testQueue(4);
try testQueue(5);
}
test "Event" {
const global = struct {
fn waitAndRead(io: Io, event: *Io.Event, ptr: *const u32) Io.Cancelable!u32 {
try event.wait(io);
return ptr.*;
}
};
const io = std.testing.io;
var event: Io.Event = .unset;
var buffer: u32 = undefined;
{
var future = io.concurrent(global.waitAndRead, .{ io, &event, &buffer }) catch |err| switch (err) {
error.ConcurrencyUnavailable => return error.SkipZigTest,
};
buffer = 123;
event.set(io);
const result = try future.await(io);
try std.testing.expectEqual(123, result);
}
event.reset();
{
var future = io.concurrent(global.waitAndRead, .{ io, &event, &buffer }) catch |err| switch (err) {
error.ConcurrencyUnavailable => return error.SkipZigTest,
};
try std.testing.expectError(error.Canceled, future.cancel(io));
}
}