Io.Select: add fn concurrent

This commit is contained in:
Brandon Black 2026-01-14 18:18:29 -06:00
parent 376320a5e9
commit 6b733537ab

View file

@ -1283,6 +1283,46 @@ pub fn Select(comptime U: type) type {
s.io.vtable.groupAsync(s.io.userdata, &s.group, @ptrCast(&context), .of(Context), Context.start);
}
/// Calls `function` with `args` concurrently. The resource spawned is
/// owned by the select.
///
/// `function` must have return type matching the `field` field of `Union`.
///
/// After this function returns successfully, it is guaranteed that
/// `function` has been assigned a unit of concurrency, and `await` or
/// `cancel` must be called before the select is deinitialized.
///
///
/// Threadsafe.
///
/// Related:
/// * `Io.concurrent`
/// * `Group.concurrent`
pub fn concurrent(
s: *S,
comptime field: Field,
function: anytype,
args: std.meta.ArgsTuple(@TypeOf(function)),
) ConcurrentError!void {
const Context = struct {
select: *S,
args: @TypeOf(args),
fn start(type_erased_context: *const anyopaque) Cancelable!void {
const context: *const @This() = @ptrCast(@alignCast(type_erased_context));
const raw_result = @call(.auto, function, context.args);
const elem = @unionInit(U, @tagName(field), raw_result);
context.select.queue.putOneUncancelable(context.select.io, elem) catch |err| switch (err) {
error.Closed => unreachable,
};
if (@typeInfo(@TypeOf(raw_result)) == .error_union)
raw_result catch |err| if (err == error.Canceled) return error.Canceled;
}
};
const context: Context = .{ .select = s, .args = args };
try s.io.vtable.groupConcurrent(s.io.userdata, &s.group, @ptrCast(&context), .of(Context), Context.start);
_ = @atomicRmw(usize, &s.outstanding, .Add, 1, .monotonic);
}
/// Blocks until another task of the select finishes.
///
/// Asserts there is at least one more `outstanding` task.