diff --git a/lib/std/Io.zig b/lib/std/Io.zig index cf23796db2..fd463b8d55 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -631,7 +631,7 @@ pub const VTable = struct { /// Copied and then passed to `start`. context: []const u8, context_alignment: std.mem.Alignment, - start: *const fn (*Group, context: *const anyopaque) Cancelable!void, + start: *const fn (context: *const anyopaque) Cancelable!void, ) void, /// Thread-safe. groupConcurrent: *const fn ( @@ -642,7 +642,7 @@ pub const VTable = struct { /// Copied and then passed to `start`. context: []const u8, context_alignment: std.mem.Alignment, - start: *const fn (*Group, context: *const anyopaque) Cancelable!void, + start: *const fn (context: *const anyopaque) Cancelable!void, ) ConcurrentError!void, groupAwait: *const fn (?*anyopaque, *Group, token: *anyopaque) Cancelable!void, groupCancel: *const fn (?*anyopaque, *Group, token: *anyopaque) void, @@ -1050,40 +1050,40 @@ pub fn Future(Result: type) type { }; } +/// An unordered set of tasks which can only be awaited or canceled as a whole. +/// Tasks are spawned in the group with `Group.async` and `Group.concurrent`. +/// +/// The resources associated with each task are *guaranteed* to be released when +/// the individual task returns, as opposed to when the whole group completes or +/// is awaited. For this reason, it is not a resource leak to have a long-lived +/// group which concurrent tasks are repeatedly added to. However, asynchronous +/// tasks are not guaranteed to run until `Group.await` or `Group.cancel` is +/// called, so adding async tasks to a group without ever awaiting it may leak +/// resources. pub const Group = struct { - state: usize, - context: ?*anyopaque, /// This value indicates whether or not a group has pending tasks. `null` /// means there are no pending tasks, and no resources associated with the /// group, so `await` and `cancel` return immediately without calling the /// implementation. This means that `token` must be accessed atomically to /// avoid racing with the check in `await` and `cancel`. token: std.atomic.Value(?*anyopaque), + /// This value is available for the implementation to use as it wishes. + state: usize, - pub const init: Group = .{ .state = 0, .context = null, .token = .init(null) }; + pub const init: Group = .{ .token = .init(null), .state = 0 }; - /// Calls `function` with `args` asynchronously. The resource spawned is - /// owned by the group. + /// Equivalent to `Io.async`, except the task is spawned in this `Group` + /// instead of becoming associated with a `Future`. /// - /// `function` *may* be called immediately, before `async` returns. + /// The return type of `function` must be coercible to `Cancelable!void`. /// - /// When this function returns, it is guaranteed that `function` has - /// already been called and completed, or it has successfully been assigned - /// a unit of concurrency. - /// - /// After this is called, `await` or `cancel` must be called before the - /// group is deinitialized. - /// - /// Threadsafe. - /// - /// See also: - /// * `concurrent` - /// * `Io.async` + /// Once this function is called, there are resources associated with the + /// group. To release those resources, `Group.await` or `Group.cancel` must + /// eventually be called. pub fn async(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) void { const Args = @TypeOf(args); const TypeErased = struct { - fn start(group: *Group, context: *const anyopaque) Cancelable!void { - _ = group; + fn start(context: *const anyopaque) Cancelable!void { const args_casted: *const Args = @ptrCast(@alignCast(context)); return @call(.auto, function, args_casted.*); } @@ -1091,27 +1091,18 @@ pub const Group = struct { io.vtable.groupAsync(io.userdata, g, @ptrCast(&args), .of(Args), TypeErased.start); } - /// Calls `function` with `args`, such that the function is not guaranteed - /// to have returned until `await` is called, allowing the caller to - /// progress while waiting for any `Io` operations. + /// Equivalent to `Io.concurrent`, except the task is spawned in this + /// `Group` instead of becoming associated with a `Future`. /// - /// The resource spawned is owned by the group; after this is called, - /// `await` or `cancel` must be called before the group is deinitialized. + /// The return type of `function` must be coercible to `Cancelable!void`. /// - /// This has stronger guarantee than `async`, placing restrictions on what kind - /// of `Io` implementations are supported. By calling `async` instead, one - /// allows, for example, stackful single-threaded blocking I/O. - /// - /// Threadsafe. - /// - /// See also: - /// * `async` - /// * `Io.concurrent` + /// Once this function is called, there are resources associated with the + /// group. To release those resources, `Group.await` or `Group.cancel` must + /// eventually be called. pub fn concurrent(g: *Group, io: Io, function: anytype, args: std.meta.ArgsTuple(@TypeOf(function))) ConcurrentError!void { const Args = @TypeOf(args); const TypeErased = struct { - fn start(group: *Group, context: *const anyopaque) Cancelable!void { - _ = group; + fn start(context: *const anyopaque) Cancelable!void { const args_casted: *const Args = @ptrCast(@alignCast(context)); return @call(.auto, function, args_casted.*); } @@ -1263,19 +1254,20 @@ pub fn Select(comptime U: type) type { function: anytype, args: std.meta.ArgsTuple(@TypeOf(function)), ) void { - const Args = @TypeOf(args); - const TypeErased = struct { - fn start(group: *Group, context: *const anyopaque) Cancelable!void { - const args_casted: *const Args = @ptrCast(@alignCast(context)); - const unerased_select: *S = @fieldParentPtr("group", group); - const elem = @unionInit(U, @tagName(field), @call(.auto, function, args_casted.*)); - unerased_select.queue.putOneUncancelable(unerased_select.io, elem) catch |err| switch (err) { + const Context = struct { + select: *S, + args: @TypeOf(args), + fn start(type_erased_context: *const anyopaque) Cancelable!void { + const context: *const @This() = @ptrCast(@alignCast(type_erased_context)); + const elem = @unionInit(U, @tagName(field), @call(.auto, function, context.args)); + context.select.queue.putOneUncancelable(context.select.io, elem) catch |err| switch (err) { error.Closed => unreachable, }; } }; + const context: Context = .{ .select = s, .args = args }; _ = @atomicRmw(usize, &s.outstanding, .Add, 1, .monotonic); - s.io.vtable.groupAsync(s.io.userdata, &s.group, @ptrCast(&args), .of(Args), TypeErased.start); + s.io.vtable.groupAsync(s.io.userdata, &s.group, @ptrCast(&context), .of(Context), Context.start); } /// Blocks until another task of the select finishes. diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 361126dbec..fe1d8b07cb 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -182,7 +182,7 @@ const Group = struct { const Task = struct { runnable: Runnable, group: *Io.Group, - func: *const fn (*Io.Group, context: *const anyopaque) void, + func: *const fn (context: *const anyopaque) void, context_alignment: Alignment, alloc_len: usize, @@ -192,7 +192,7 @@ const Group = struct { group: Group, context: []const u8, context_alignment: Alignment, - func: *const fn (*Io.Group, context: *const anyopaque) void, + func: *const fn (context: *const anyopaque) void, ) Allocator.Error!*Task { const max_context_misalignment = context_alignment.toByteUnits() -| @alignOf(Task); const worst_case_context_offset = context_alignment.forward(@sizeOf(Task) + max_context_misalignment); @@ -247,7 +247,7 @@ const Group = struct { }, .monotonic); } - assertGroupResult(task.func(group.ptr, task.contextPointer())); + assertGroupResult(task.func(task.contextPointer())); thread.status.store(.{ .cancelation = .none, .awaitable = .null }, .monotonic); const old_status = group.status().fetchSub(.{ @@ -1707,16 +1707,16 @@ fn groupAsync( type_erased: *Io.Group, context: []const u8, context_alignment: Alignment, - start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void, + start: *const fn (context: *const anyopaque) Io.Cancelable!void, ) void { const t: *Threaded = @ptrCast(@alignCast(userdata)); const g: Group = .{ .ptr = type_erased }; - if (builtin.single_threaded) return start(g.ptr, context.ptr) catch unreachable; + if (builtin.single_threaded) return start(context.ptr) catch unreachable; const gpa = t.allocator; const task = Group.Task.create(gpa, g, context, context_alignment, start) catch |err| switch (err) { - error.OutOfMemory => return t.assertGroupResult(start(g.ptr, context.ptr)), + error.OutOfMemory => return t.assertGroupResult(start(context.ptr)), }; t.mutex.lock(); @@ -1726,7 +1726,7 @@ fn groupAsync( if (busy_count >= @intFromEnum(t.async_limit)) { t.mutex.unlock(); task.destroy(gpa); - return t.assertGroupResult(start(g.ptr, context.ptr)); + return t.assertGroupResult(start(context.ptr)); } t.busy_count = busy_count + 1; @@ -1739,7 +1739,7 @@ fn groupAsync( t.busy_count = busy_count; t.mutex.unlock(); task.destroy(gpa); - return t.assertGroupResult(start(g.ptr, context.ptr)); + return t.assertGroupResult(start(context.ptr)); }; thread.detach(); } @@ -1782,7 +1782,7 @@ fn groupConcurrent( type_erased: *Io.Group, context: []const u8, context_alignment: Alignment, - start: *const fn (*Io.Group, context: *const anyopaque) Io.Cancelable!void, + start: *const fn (context: *const anyopaque) Io.Cancelable!void, ) Io.ConcurrentError!void { if (builtin.single_threaded) return error.ConcurrencyUnavailable;