std.Io.Group: tweak documentation and vtable API

This commit is contained in:
Matthew Lugg 2025-12-30 21:46:57 +00:00
parent 2c395e326f
commit b8a09bcbd9
No known key found for this signature in database
GPG key ID: 3F5B7DCCBF4AF02E
2 changed files with 47 additions and 55 deletions

View file

@ -631,7 +631,7 @@ pub const VTable = struct {
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (*Group, context: *const anyopaque) Cancelable!void,
start: *const fn (context: *const anyopaque) Cancelable!void,
) void,
/// Thread-safe.
groupConcurrent: *const fn (
@ -642,7 +642,7 @@ pub const VTable = struct {
/// Copied and then passed to `start`.
context: []const u8,
context_alignment: std.mem.Alignment,
start: *const fn (*Group, context: *const anyopaque) Cancelable!void,
start: *const fn (context: *const anyopaque) Cancelable!void,
) ConcurrentError!void,
groupAwait: *const fn (?*anyopaque, *Group, token: *anyopaque) Cancelable!void,
groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void,
@ -1050,40 +1050,40 @@ pub fn Future(Result: type) type {
};
}
/// An unordered set of tasks which can only be awaited or canceled as a whole.
/// Tasks are spawned in the group with `Group.async` and `Group.concurrent`.
///
/// The resources associated with each task are *guaranteed* to be released when
/// the individual task returns, as opposed to when the whole group completes or
/// is awaited. For this reason, it is not a resource leak to have a long-lived
/// group which concurrent tasks are repeatedly added to. However, asynchronous
/// tasks are not guaranteed to run until `Group.await` or `Group.cancel` is
/// called, so adding async tasks to a group without ever awaiting it may leak
/// resources.
pub const Group = struct {
state: usize,
context: ?*anyopaque,
/// This value indicates whether or not a group has pending tasks. `null`
/// means there are no pending tasks, and no resources associated with the
/// group, so `await` and `cancel` return immediately without calling the
/// implementation. This means that `token` must be accessed atomically to
/// avoid racing with the check in `await` and `cancel`.
token: std.atomic.Value(?*anyopaque),
/// This value is available for the implementation to use as it wishes.
state: usize,
pub const init: Group = .{ .state = 0, .context = null, .token = .init(null) };
pub const init: Group = .{ .token = .init(null), .state = 0 };
/// Calls `function` with `args` asynchronously. The resource spawned is
/// owned by the group.
/// Equivalent to `Io.async`, except the task is spawned in this `Group`
/// instead of becoming associated with a `Future`.
///
/// `function` *may* be called immediately, before `async` returns.
/// The return type of `function` must be coercible to `Cancelable!void`.
///
/// When this function returns, it is guaranteed that `function` has
/// already been called and completed, or it has successfully been assigned
/// a unit of concurrency.
///
/// After this is called, `await` or `cancel` must be called before the
/// group is deinitialized.
///
/// Threadsafe.
///
/// See also:
/// * `concurrent`
/// * `Io.async`
/// Once this function is called, there are resources associated with the
/// group. To release those resources, `Group.await` or `Group.cancel` must
/// eventually be called.
pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void {
const Args = @TypeOf(args);
const TypeErased = struct {
fn start(group: *Group, context: *const anyopaque) Cancelable!void {
_ = group;
fn start(context: *const anyopaque) Cancelable!void {
const args_casted: *const Args = @ptrCast(@alignCast(context));
return @call(.auto, function, args_casted.*);
}
@ -1091,27 +1091,18 @@ pub const Group = struct {
io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start);
}
/// Calls `function` with `args`, such that the function is not guaranteed
/// to have returned until `await` is called, allowing the caller to
/// progress while waiting for any `Io` operations.
/// Equivalent to `Io.concurrent`, except the task is spawned in this
/// `Group` instead of becoming associated with a `Future`.
///
/// The resource spawned is owned by the group; after this is called,
/// `await` or `cancel` must be called before the group is deinitialized.
/// The return type of `function` must be coercible to `Cancelable!void`.
///
/// This has a stronger guarantee than `async`, placing restrictions on what
/// kinds of `Io` implementations are supported. By calling `async` instead, one
/// allows, for example, stackful single-threaded blocking I/O.
///
/// Threadsafe.
///
/// See also:
/// * `async`
/// * `Io.concurrent`
/// Once this function is called, there are resources associated with the
/// group. To release those resources, `Group.await` or `Group.cancel` must
/// eventually be called.
pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void {
const Args = @TypeOf(args);
const TypeErased = struct {
fn start(group: *Group, context: *const anyopaque) Cancelable!void {
_ = group;
fn start(context: *const anyopaque) Cancelable!void {
const args_casted: *const Args = @ptrCast(@alignCast(context));
return @call(.auto, function, args_casted.*);
}
@ -1263,19 +1254,20 @@ pub fn Select(comptime U: type) type {
function: anytype,
args: std.meta.ArgsTuple(@TypeOf(function)),
) void {
const Args = @TypeOf(args);
const TypeErased = struct {
fn start(group: *Group, context: *const anyopaque) Cancelable!void {
const args_casted: *const Args = @ptrCast(@alignCast(context));
const unerased_select: *S = @fieldParentPtr("group", group);
const elem = @unionInit(U, @tagName(field), @call(.auto, function, args_casted.*));
unerased_select.queue.putOneUncancelable(unerased_select.io, elem) catch |err| switch (err) {
const Context = struct {
select: *S,
args: @TypeOf(args),
fn start(type_erased_context: *const anyopaque) Cancelable!void {
const context: *const @This() = @ptrCast(@alignCast(type_erased_context));
const elem = @unionInit(U, @tagName(field), @call(.auto, function, context.args));
context.select.queue.putOneUncancelable(context.select.io, elem) catch |err| switch (err) {
error.Closed => unreachable,
};
}
};
const context: Context = .{ .select = s, .args = args };
_ = @atomicRmw(usize, &s.outstanding, .Add, 1, .monotonic);
s.io.vtable.groupAsync(s.io.userdata, &s.group, @ptrCast(&args), .of(Args), TypeErased.start);
s.io.vtable.groupAsync(s.io.userdata, &s.group, @ptrCast(&context), .of(Context), Context.start);
}
/// Blocks until another task of the select finishes.

View file

@ -182,7 +182,7 @@ const Group = struct {
const Task = struct {
runnable: Runnable,
group: *Io.Group,
func: *const fn (*Io.Group, context: *const anyopaque) void,
func: *const fn (context: *const anyopaque) void,
context_alignment: Alignment,
alloc_len: usize,
@ -192,7 +192,7 @@ const Group = struct {
group: Group,
context: []const u8,
context_alignment: Alignment,
func: *const fn (*Io.Group, context: *const anyopaque) void,
func: *const fn (context: *const anyopaque) void,
) Allocator.Error!*Task {
const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(Task);
const worst_case_context_offset = context_alignment.forward(@sizeOf(Task) + max_context_misalignment);
@ -247,7 +247,7 @@ const Group = struct {
}, .monotonic);
}
assertGroupResult(task.func(group.ptr, task.contextPointer()));
assertGroupResult(task.func(task.contextPointer()));
thread.status.store(.{ .cancelation = .none, .awaitable = .null }, .monotonic);
const old_status = group.status().fetchSub(.{
@ -1707,16 +1707,16 @@ fn groupAsync(
type_erased: *Io.Group,
context: []const u8,
context_alignment: Alignment,
start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void,
start: *const fn (context: *const anyopaque) Io.Cancelable!void,
) void {
const t: *Threaded = @ptrCast(@alignCast(userdata));
const g: Group = .{ .ptr = type_erased };
if (builtin.single_threaded) return start(g.ptr, context.ptr) catch unreachable;
if (builtin.single_threaded) return start(context.ptr) catch unreachable;
const gpa = t.allocator;
const task = Group.Task.create(gpa, g, context, context_alignment, start) catch |err| switch (err) {
error.OutOfMemory => return t.assertGroupResult(start(g.ptr, context.ptr)),
error.OutOfMemory => return t.assertGroupResult(start(context.ptr)),
};
t.mutex.lock();
@ -1726,7 +1726,7 @@ fn groupAsync(
if (busy_count >= @intFromEnum(t.async_limit)) {
t.mutex.unlock();
task.destroy(gpa);
return t.assertGroupResult(start(g.ptr, context.ptr));
return t.assertGroupResult(start(context.ptr));
}
t.busy_count = busy_count + 1;
@ -1739,7 +1739,7 @@ fn groupAsync(
t.busy_count = busy_count;
t.mutex.unlock();
task.destroy(gpa);
return t.assertGroupResult(start(g.ptr, context.ptr));
return t.assertGroupResult(start(context.ptr));
};
thread.detach();
}
@ -1782,7 +1782,7 @@ fn groupConcurrent(
type_erased: *Io.Group,
context: []const u8,
context_alignment: Alignment,
start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void,
start: *const fn (context: *const anyopaque) Io.Cancelable!void,
) Io.ConcurrentError!void {
if (builtin.single_threaded) return error.ConcurrencyUnavailable;