diff --git a/lib/std/Io.zig b/lib/std/Io.zig index eb0d28dff3..ff58bb400e 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -1206,7 +1206,7 @@ pub fn Select(comptime U: type) type { /// already been called and completed, or it has successfully been /// assigned a unit of concurrency. /// - /// After this is called, `wait` or `cancel` must be called before the + /// After this is called, `await` or `cancel` must be called before the /// select is deinitialized. /// /// Threadsafe. @@ -1225,10 +1225,13 @@ pub fn Select(comptime U: type) type { args: @TypeOf(args), fn start(type_erased_context: *const anyopaque) Cancelable!void { const context: *const @This() = @ptrCast(@alignCast(type_erased_context)); - const elem = @unionInit(U, @tagName(field), @call(.auto, function, context.args)); + const raw_result = @call(.auto, function, context.args); + const elem = @unionInit(U, @tagName(field), raw_result); context.select.queue.putOneUncancelable(context.select.io, elem) catch |err| switch (err) { error.Closed => unreachable, }; + if (@typeInfo(@TypeOf(raw_result)) == .error_union) + raw_result catch |err| if (err == error.Canceled) return error.Canceled; } }; const context: Context = .{ .select = s, .args = args }; @@ -1236,6 +1239,46 @@ pub fn Select(comptime U: type) type { s.io.vtable.groupAsync(s.io.userdata, &s.group, @ptrCast(&context), .of(Context), Context.start); } + /// Calls `function` with `args` concurrently. The resource spawned is + /// owned by the select. + /// + /// `function` must have return type matching the `field` field of `Union`. + /// + /// After this function returns successfully, it is guaranteed that + /// `function` has been assigned a unit of concurrency, and `await` or + /// `cancel` must be called before the select is deinitialized. + /// + /// + /// Threadsafe. + /// + /// Related: + /// * `Io.concurrent` + /// * `Group.concurrent` + pub fn concurrent( + s: *S, + comptime field: Field, + function: anytype, + args: std.meta.ArgsTuple(@TypeOf(function)), + ) ConcurrentError!void { + const Context = struct { + select: *S, + args: @TypeOf(args), + fn start(type_erased_context: *const anyopaque) Cancelable!void { + const context: *const @This() = @ptrCast(@alignCast(type_erased_context)); + const raw_result = @call(.auto, function, context.args); + const elem = @unionInit(U, @tagName(field), raw_result); + context.select.queue.putOneUncancelable(context.select.io, elem) catch |err| switch (err) { + error.Closed => unreachable, + }; + if (@typeInfo(@TypeOf(raw_result)) == .error_union) + raw_result catch |err| if (err == error.Canceled) return error.Canceled; + } + }; + const context: Context = .{ .select = s, .args = args }; + try s.io.vtable.groupConcurrent(s.io.userdata, &s.group, @ptrCast(&context), .of(Context), Context.start); + _ = @atomicRmw(usize, &s.outstanding, .Add, 1, .monotonic); + } + /// Blocks until another task of the select finishes. /// /// Asserts there is at least one more `outstanding` task. @@ -1249,12 +1292,12 @@ pub fn Select(comptime U: type) type { }; } - /// Equivalent to `wait` but requests cancelation on all remaining + /// Equivalent to `await` but requests cancelation on all remaining /// tasks owned by the select. /// /// For a description of cancelation and cancelation points, see `Future.cancel`. /// - /// It is illegal to call `wait` after this. + /// It is illegal to call `await` after this. /// /// Idempotent. Not threadsafe. pub fn cancel(s: *S) void {