Merge pull request 'Io.Select: cancelation and concurrent' (#30836) from blblack/zig:select-stuff into master

Reviewed-on: https://codeberg.org/ziglang/zig/pulls/30836
Reviewed-by: Andrew Kelley <andrew@ziglang.org>
This commit is contained in:
Andrew Kelley 2026-02-21 01:29:06 +01:00
commit 5ac6ff43d4

View file

@ -1206,7 +1206,7 @@ pub fn Select(comptime U: type) type {
/// already been called and completed, or it has successfully been
/// assigned a unit of concurrency.
///
/// After this is called, `wait` or `cancel` must be called before the
/// After this is called, `await` or `cancel` must be called before the
/// select is deinitialized.
///
/// Threadsafe.
@ -1225,10 +1225,13 @@ pub fn Select(comptime U: type) type {
args: @TypeOf(args),
fn start(type_erased_context: *const anyopaque) Cancelable!void {
const context: *const @This() = @ptrCast(@alignCast(type_erased_context));
const elem = @unionInit(U, @tagName(field), @call(.auto, function, context.args));
const raw_result = @call(.auto, function, context.args);
const elem = @unionInit(U, @tagName(field), raw_result);
context.select.queue.putOneUncancelable(context.select.io, elem) catch |err| switch (err) {
error.Closed => unreachable,
};
if (@typeInfo(@TypeOf(raw_result)) == .error_union)
raw_result catch |err| if (err == error.Canceled) return error.Canceled;
}
};
const context: Context = .{ .select = s, .args = args };
@ -1236,6 +1239,46 @@ pub fn Select(comptime U: type) type {
s.io.vtable.groupAsync(s.io.userdata, &s.group, @ptrCast(&context), .of(Context), Context.start);
}
/// Calls `function` with `args` concurrently. The resource spawned is
/// owned by the select.
///
/// `function` must have return type matching the `field` field of `Union`.
///
/// After this function returns successfully, it is guaranteed that
/// `function` has been assigned a unit of concurrency, and `await` or
/// `cancel` must be called before the select is deinitialized.
///
///
/// Threadsafe.
///
/// Related:
/// * `Io.concurrent`
/// * `Group.concurrent`
pub fn concurrent(
s: *S,
comptime field: Field,
function: anytype,
args: std.meta.ArgsTuple(@TypeOf(function)),
) ConcurrentError!void {
const Context = struct {
select: *S,
args: @TypeOf(args),
fn start(type_erased_context: *const anyopaque) Cancelable!void {
const context: *const @This() = @ptrCast(@alignCast(type_erased_context));
const raw_result = @call(.auto, function, context.args);
const elem = @unionInit(U, @tagName(field), raw_result);
context.select.queue.putOneUncancelable(context.select.io, elem) catch |err| switch (err) {
error.Closed => unreachable,
};
if (@typeInfo(@TypeOf(raw_result)) == .error_union)
raw_result catch |err| if (err == error.Canceled) return error.Canceled;
}
};
const context: Context = .{ .select = s, .args = args };
try s.io.vtable.groupConcurrent(s.io.userdata, &s.group, @ptrCast(&context), .of(Context), Context.start);
_ = @atomicRmw(usize, &s.outstanding, .Add, 1, .monotonic);
}
/// Blocks until another task of the select finishes.
///
/// Asserts there is at least one more `outstanding` task.
@ -1249,12 +1292,12 @@ pub fn Select(comptime U: type) type {
};
}
/// Equivalent to `wait` but requests cancelation on all remaining
/// Equivalent to `await` but requests cancelation on all remaining
/// tasks owned by the select.
///
/// For a description of cancelation and cancelation points, see `Future.cancel`.
///
/// It is illegal to call `wait` after this.
/// It is illegal to call `await` after this.
///
/// Idempotent. Not threadsafe.
pub fn cancel(s: *S) void {