std.Io.Select: add awaitMany function and unit test

and fix documentation. these functions are in fact threadsafe.
This commit is contained in:
Andrew Kelley 2026-02-20 17:28:33 -08:00
parent f9053f38e5
commit 311bba4af0
2 changed files with 73 additions and 22 deletions

View file

@ -1286,7 +1286,7 @@ pub fn Select(comptime U: type) type {
/// Blocks until another task of the select finishes.
///
/// Not threadsafe.
/// Threadsafe.
pub fn await(s: *S) Cancelable!U {
return s.queue.getOne(s.io) catch |err| switch (err) {
error.Canceled => |e| return e,
@ -1294,6 +1294,19 @@ pub fn Select(comptime U: type) type {
};
}
/// Blocks until at least `min` number of results have been copied
/// into `buffer`.
///
/// Asserts that `buffer.len >= min`.
///
/// Threadsafe.
pub fn awaitMany(s: *S, buffer: []U, min: usize) Cancelable!usize {
return s.queue.get(s.io, buffer, min) catch |err| switch (err) {
error.Canceled => |e| return e,
error.Closed => unreachable,
};
}
/// Equivalent to `await` but requests cancelation on all remaining
/// tasks owned by the select.
///
@ -1301,7 +1314,7 @@ pub fn Select(comptime U: type) type {
///
/// It is illegal to call `await` after this.
///
/// Idempotent. Not threadsafe.
/// Idempotent. Threadsafe.
pub fn cancel(s: *S) void {
s.group.cancel(s.io);
}
@ -1732,7 +1745,7 @@ pub const TypeErasedQueue = struct {
return if (slice.len > 0) slice else null;
}
fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, target: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize {
fn putLocked(q: *TypeErasedQueue, io: Io, elements: []const u8, min: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize {
// A closed queue cannot be added to, even if there is space in the buffer.
if (q.closed) return error.Closed;
@ -1768,12 +1781,12 @@ pub const TypeErasedQueue = struct {
if (n == elements.len) return elements.len;
}
// Don't block if we hit the target.
if (n >= target) return n;
// Don't block if we hit the min.
if (n >= min) return n;
var pending: Put = .{
.remaining = elements[n..],
.needed = target - n,
.needed = min - n,
.condition = .init,
.node = .{},
};
@ -1832,7 +1845,7 @@ pub const TypeErasedQueue = struct {
return if (slice.len > 0) slice else null;
}
fn getLocked(q: *TypeErasedQueue, io: Io, buffer: []u8, target: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize {
fn getLocked(q: *TypeErasedQueue, io: Io, buffer: []u8, min: usize, uncancelable: bool) (QueueClosedError || Cancelable)!usize {
// The ring buffer gets first priority, then data should come from any
// queued putters, then finally the ring buffer should be filled with
// data from putters so they can be resumed.
@ -1878,15 +1891,15 @@ pub const TypeErasedQueue = struct {
// No need to call `fillRingBufferFromPutters` from this point onwards,
// because we emptied the ring buffer *and* the putter queue!
// Don't block if we hit the target or if the queue is closed. Return how
// Don't block if we hit the min or if the queue is closed. Return how
// many elements we could get immediately, unless the queue was closed and
// empty, in which case report `error.Closed`.
if (n == 0 and q.closed) return error.Closed;
if (n >= target or q.closed) return n;
if (n >= min or q.closed) return n;
var pending: Get = .{
.remaining = buffer[n..],
.needed = target - n,
.needed = min - n,
.condition = .init,
.node = .{},
};
@ -1962,7 +1975,7 @@ pub fn Queue(Elem: type) type {
/// there is insufficient capacity. Returns when any one of the
/// following conditions is satisfied:
///
/// * At least `target` elements have been added to the queue
/// * At least `min` elements have been added to the queue
/// * The queue is closed
/// * The current task is canceled
///
@ -1971,16 +1984,16 @@ pub fn Queue(Elem: type) type {
///
/// If the queue is closed or the task is canceled, but some items were
/// already added before the closure or cancelation, then `put` may
/// return a number lower than `target`, in which case future calls are
/// return a number lower than `min`, in which case future calls are
/// guaranteed to return `error.Canceled` or `error.Closed`.
///
/// A return value of 0 is only possible if `target` is 0, in which case
/// A return value of 0 is only possible if `min` is 0, in which case
/// the call is guaranteed to queue as many of `elements` as is possible
/// *without* blocking.
///
/// Asserts that `elements.len >= target`.
pub fn put(q: *@This(), io: Io, elements: []const Elem, target: usize) (QueueClosedError || Cancelable)!usize {
return @divExact(try q.type_erased.put(io, @ptrCast(elements), target * @sizeOf(Elem)), @sizeOf(Elem));
/// Asserts that `elements.len >= min`.
pub fn put(q: *@This(), io: Io, elements: []const Elem, min: usize) (QueueClosedError || Cancelable)!usize {
return @divExact(try q.type_erased.put(io, @ptrCast(elements), min * @sizeOf(Elem)), @sizeOf(Elem));
}
/// Same as `put` but blocks until all elements have been added to the queue.
@ -2019,7 +2032,7 @@ pub fn Queue(Elem: type) type {
/// if there are insufficient elements currently in the queue. Returns when
/// any one of the following conditions is satisfied:
///
/// * At least `target` elements have been received from the queue
/// * At least `min` elements have been received from the queue
/// * The queue is closed and contains no buffered elements
/// * The current task is canceled
///
@ -2028,16 +2041,16 @@ pub fn Queue(Elem: type) type {
///
/// If the queue is closed or the task is canceled, but some items were
/// already received before the closure or cancelation, then `get` may
/// return a number lower than `target`, in which case future calls are
/// return a number lower than `min`, in which case future calls are
/// guaranteed to return `error.Canceled` or `error.Closed`.
///
/// A return value of 0 is only possible if `target` is 0, in which case
/// A return value of 0 is only possible if `min` is 0, in which case
/// the call is guaranteed to fill as much of `buffer` as is possible
/// *without* blocking.
///
/// Asserts that `buffer.len >= target`.
pub fn get(q: *@This(), io: Io, buffer: []Elem, target: usize) (QueueClosedError || Cancelable)!usize {
return @divExact(try q.type_erased.get(io, @ptrCast(buffer), target * @sizeOf(Elem)), @sizeOf(Elem));
/// Asserts that `buffer.len >= min`.
pub fn get(q: *@This(), io: Io, buffer: []Elem, min: usize) (QueueClosedError || Cancelable)!usize {
return @divExact(try q.type_erased.get(io, @ptrCast(buffer), min * @sizeOf(Elem)), @sizeOf(Elem));
}
/// Same as `get`, except does not introduce a cancelation point.

View file

@ -810,3 +810,41 @@ test "Event broadcast" {
try ctx.run();
}
test "Select" {
const S = struct {
fn foo() bool {
return true;
}
fn bar(io: Io) Io.Cancelable!void {
try io.sleep(.fromSeconds(300), .awake);
}
};
const io = testing.io;
const U = union(enum) {
foo: bool,
bar: Io.Cancelable!void,
};
var buffer: [4]U = undefined;
var select: Io.Select(U) = .init(io, &buffer);
defer select.cancel();
select.async(.foo, S.foo, .{});
select.concurrent(.bar, S.bar, .{io}) catch |err| switch (err) {
error.ConcurrencyUnavailable => return error.SkipZigTest,
};
switch (try select.await()) {
.foo => {},
.bar => return error.TestFailed, // should be sleeping
}
select.async(.foo, S.foo, .{});
select.async(.foo, S.foo, .{});
var finished_buffer: [3]U = undefined;
const finished = finished_buffer[0..try select.awaitMany(&finished_buffer, 2)];
try testing.expectEqualSlices(U, &.{ .{ .foo = true }, .{ .foo = true } }, finished);
}