std.Io: make it safe to call Group.async within a group task

This commit is contained in:
Matthew Lugg 2025-12-21 14:51:06 +00:00
parent 49f17237b0
commit 4772f1a9f4
No known key found for this signature in database
GPG key ID: 3F5B7DCCBF4AF02E
2 changed files with 35 additions and 12 deletions

View file

@@ -1016,9 +1016,14 @@ pub fn Future(Result: type) type {
pub const Group = struct {
state: usize,
context: ?*anyopaque,
token: ?*anyopaque,
/// This value indicates whether or not a group has pending tasks. `null`
/// means there are no pending tasks, and no resources associated with the
/// group, so `await` and `cancel` return immediately without calling the
/// implementation. This means that `token` must be accessed atomically to
/// avoid racing with the check in `await` and `cancel`.
token: std.atomic.Value(?*anyopaque),
pub const init: Group = .{ .state = 0, .context = null, .token = null };
pub const init: Group = .{ .state = 0, .context = null, .token = .init(null) };
/// Calls `function` with `args` asynchronously. The resource spawned is
/// owned by the group.
@@ -1081,10 +1086,14 @@ pub const Group = struct {
/// cancellation requests propagate to all members of the group.
///
/// Idempotent. Not threadsafe.
///
/// It is safe to call this function concurrently with `Group.async` or
/// `Group.concurrent`, provided that the group does not complete until
/// the call to `Group.async` or `Group.concurrent` returns.
pub fn wait(g: *Group, io: Io) void {
const token = g.token orelse return;
g.token = null;
const token = g.token.load(.acquire) orelse return;
io.vtable.groupWait(io.userdata, g, token);
assert(g.token.raw == null);
}
/// Equivalent to `wait` but immediately requests cancellation on all
@@ -1093,10 +1102,14 @@ pub const Group = struct {
/// For a description of cancelation and cancelation points, see `Future.cancel`.
///
/// Idempotent. Not threadsafe.
///
/// It is safe to call this function concurrently with `Group.async` or
/// `Group.concurrent`, provided that the group does not complete until
/// the call to `Group.async` or `Group.concurrent` returns.
pub fn cancel(g: *Group, io: Io) void {
const token = g.token orelse return;
g.token = null;
const token = g.token.load(.acquire) orelse return;
io.vtable.groupCancel(io.userdata, g, token);
assert(g.token.raw == null);
}
};

View file

@@ -1117,8 +1117,8 @@ fn groupAsync(
}
// Append to the group linked list inside the mutex to make `Io.Group.async` thread-safe.
gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
group.token = &gc.node;
gc.node = .{ .next = @ptrCast(@alignCast(group.token.load(.monotonic))) };
group.token.store(&gc.node, .monotonic);
t.run_queue.prepend(&gc.closure.node);
@@ -1169,8 +1169,8 @@ fn groupConcurrent(
}
// Append to the group linked list inside the mutex to make `Io.Group.concurrent` thread-safe.
gc.node = .{ .next = @ptrCast(@alignCast(group.token)) };
group.token = &gc.node;
gc.node = .{ .next = @ptrCast(@alignCast(group.token.load(.monotonic))) };
group.token.store(&gc.node, .monotonic);
t.run_queue.prepend(&gc.closure.node);
@@ -1187,7 +1187,7 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
const gpa = t.allocator;
if (builtin.single_threaded) return;
if (builtin.single_threaded) unreachable; // we never set `group.token` to non-`null`
const group_state: *std.atomic.Value(usize) = @ptrCast(&group.state);
const event: *Io.Event = @ptrCast(&group.context);
@@ -1212,13 +1212,18 @@ fn groupWait(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
gc.deinit(gpa);
node = node_next orelse break;
}
// Since the group has now finished, it's illegal to add more tasks to it until we return. It's
// also illegal for us to race with another `await` or `cancel`. Therefore, we must be the only
// thread who can access `group` right now.
group.token.raw = null;
}
fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
const gpa = t.allocator;
if (builtin.single_threaded) return;
if (builtin.single_threaded) unreachable; // we never set `group.token` to non-`null`
{
var node: *std.SinglyLinkedList.Node = @ptrCast(@alignCast(token));
@@ -1244,6 +1249,11 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void
node = node_next orelse break;
}
}
// Since the group has now finished, it's illegal to add more tasks to it until we return. It's
// also illegal for us to race with another `await` or `cancel`. Therefore, we must be the only
// thread who can access `group` right now.
group.token.raw = null;
}
fn recancel(userdata: ?*anyopaque) void {