diff --git a/lib/std/Io.zig b/lib/std/Io.zig index 6eea9789b4..cbf1a24847 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -1286,7 +1286,7 @@ pub fn Select(comptime U: type) type { /// Blocks until another task of the select finishes. /// - /// Not threadsafe. + /// Threadsafe. pub fn await(s: *S) Cancelable!U { return s.queue.getOne(s.io) catch |err| switch (err) { error.Canceled => |e| return e, @@ -1294,6 +1294,19 @@ pub fn Select(comptime U: type) type { }; } + /// Blocks until at least `min` number of results have been copied + /// into `buffer`. + /// + /// Asserts that `buffer.len >= min`. + /// + /// Threadsafe. + pub fn awaitMany(s: *S, buffer: []U, min: usize) Cancelable!usize { + return s.queue.get(s.io, buffer, min) catch |err| switch (err) { + error.Canceled => |e| return e, + error.Closed => unreachable, + }; + } + /// Equivalent to `await` but requests cancelation on all remaining /// tasks owned by the select. /// @@ -1301,7 +1314,7 @@ pub fn Select(comptime U: type) type { /// /// It is illegal to call `await` after this. /// - /// Idempotent. Not threadsafe. + /// Idempotent. Threadsafe. pub fn cancel(s: *S) void { s.group.cancel(s.io); } @@ -1732,7 +1745,7 @@ pub const TypeErasedQueue = struct { return if (slice.len > 0) slice else null; } - fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, target: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize { + fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize { // A closed queue cannot be added to, even if there is space in the buffer. if (q.closed) return error.Closed; @@ -1768,12 +1781,12 @@ pub const TypeErasedQueue = struct { if (n == elements.len) return elements.len; } - // Don't block if we hit the target. - if (n >= target) return n; + // Don't block if we hit the min. + if (n >= min) return n; var pending: Put = .{ .remaining = elements[n..], - .needed = target - n, + .needed = min - n, .condition = .init, .node = .{}, }; @@ -1832,7 +1845,7 @@ pub const TypeErasedQueue = struct { return if (slice.len > 0) slice else null; } - fn getLocked(q: *TypeErasedQueue, io: Io, buffer: []u8, target: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize { + fn getLocked(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize { // The ring buffer gets first priority, then data should come from any // queued putters, then finally the ring buffer should be filled with // data from putters so they can be resumed. @@ -1878,15 +1891,15 @@ pub const TypeErasedQueue = struct { // No need to call `fillRingBufferFromPutters` from this point onwards, // because we emptied the ring buffer *and* the putter queue! - // Don't block if we hit the target or if the queue is closed. Return how + // Don't block if we hit the min or if the queue is closed. Return how // many elements we could get immediately, unless the queue was closed and // empty, in which case report `error.Closed`. if (n == 0 and q.closed) return error.Closed; - if (n >= target or q.closed) return n; + if (n >= min or q.closed) return n; var pending: Get = .{ .remaining = buffer[n..], - .needed = target - n, + .needed = min - n, .condition = .init, .node = .{}, }; @@ -1962,7 +1975,7 @@ pub fn Queue(Elem: type) type { /// there is insufficient capacity. Returns when any one of the /// following conditions is satisfied: /// - /// * At least `target` elements have been added to the queue + /// * At least `min` elements have been added to the queue /// * The queue is closed /// * The current task is canceled /// @@ -1971,16 +1984,16 @@ pub fn Queue(Elem: type) type { /// /// If the queue is closed or the task is canceled, but some items were /// already added before the closure or cancelation, then `put` may - /// return a number lower than `target`, in which case future calls are + /// return a number lower than `min`, in which case future calls are /// guaranteed to return `error.Canceled` or `error.Closed`. /// - /// A return value of 0 is only possible if `target` is 0, in which case + /// A return value of 0 is only possible if `min` is 0, in which case /// the call is guaranteed to queue as many of `elements` as is possible /// *without* blocking. /// - /// Asserts that `elements.len >= target`. - pub fn put(q: *@This(), io: Io, elements: []const Elem, target: usize) (QueueClosedError || Cancelable)!usize { - return @divExact(try q.type_erased.put(io, @ptrCast(elements), target * @sizeOf(Elem)), @sizeOf(Elem)); + /// Asserts that `elements.len >= min`. + pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) (QueueClosedError || Cancelable)!usize { + return @divExact(try q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem)); } /// Same as `put` but blocks until all elements have been added to the queue. @@ -2019,7 +2032,7 @@ pub fn Queue(Elem: type) type { /// if there are insufficient elements currently in the queue. Returns when /// any one of the following conditions is satisfied: /// - /// * At least `target` elements have been received from the queue + /// * At least `min` elements have been received from the queue /// * The queue is closed and contains no buffered elements /// * The current task is canceled /// @@ -2028,16 +2041,16 @@ pub fn Queue(Elem: type) type { /// /// If the queue is closed or the task is canceled, but some items were /// already received before the closure or cancelation, then `get` may - /// return a number lower than `target`, in which case future calls are + /// return a number lower than `min`, in which case future calls are /// guaranteed to return `error.Canceled` or `error.Closed`. /// - /// A return value of 0 is only possible if `target` is 0, in which case + /// A return value of 0 is only possible if `min` is 0, in which case /// the call is guaranteed to fill as much of `buffer` as is possible /// *without* blocking. /// - /// Asserts that `buffer.len >= target`. - pub fn get(q: *@This(), io: Io, buffer: []Elem, target: usize) (QueueClosedError || Cancelable)!usize { - return @divExact(try q.type_erased.get(io, @ptrCast(buffer), target * @sizeOf(Elem)), @sizeOf(Elem)); + /// Asserts that `buffer.len >= min`. + pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) (QueueClosedError || Cancelable)!usize { + return @divExact(try q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem)); } /// Same as `get`, except does not introduce a cancelation point. diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig index d13cc802ce..4fc041c749 100644 --- a/lib/std/Io/test.zig +++ b/lib/std/Io/test.zig @@ -810,3 +810,41 @@ test "Event broadcast" { try ctx.run(); } + +test "Select" { + const S = struct { + fn foo() bool { + return true; + } + + fn bar(io: Io) Io.Cancelable!void { + try io.sleep(.fromSeconds(300), .awake); + } + }; + + const io = testing.io; + + const U = union(enum) { + foo: bool, + bar: Io.Cancelable!void, + }; + var buffer: [4]U = undefined; + var select: Io.Select(U) = .init(io, &buffer); + defer select.cancel(); + + select.async(.foo, S.foo, .{}); + select.concurrent(.bar, S.bar, .{io}) catch |err| switch (err) { + error.ConcurrencyUnavailable => return error.SkipZigTest, + }; + + switch (try select.await()) { + .foo => {}, + .bar => return error.TestFailed, // should be sleeping + } + select.async(.foo, S.foo, .{}); + select.async(.foo, S.foo, .{}); + + var finished_buffer: [3]U = undefined; + const finished = finished_buffer[0..try select.awaitMany(&finished_buffer, 2)]; + try testing.expectEqualSlices(U, &.{ .{ .foo = true }, .{ .foo = true } }, finished); +}