From d9fc7fa04db797d7b27dab2d9d6f56f63848da76 Mon Sep 17 00:00:00 2001 From: Andrew Kelley Date: Sat, 14 Feb 2026 18:31:48 -0800 Subject: [PATCH] std.Io: remove select function This function works with a slice of futures and returns the index of a completed one. This doesn't work very well in practice because it's either too high level or too low level. At the lower level we have Io.Batch for doing this kind of thing at the Operation API layer. At the higher level we have Io.Select which is a convenience wrapper around an Io.Group and an Io.Queue. --- lib/std/Io.zig | 38 ---------------------- lib/std/Io/Dispatch.zig | 50 ----------------------------- lib/std/Io/Kqueue.zig | 22 ------------- lib/std/Io/Threaded.zig | 70 ----------------------------------------- lib/std/Io/Uring.zig | 61 ++--------------------------------- lib/std/Io/test.zig | 34 -------------------- 6 files changed, 2 insertions(+), 273 deletions(-) diff --git a/lib/std/Io.zig b/lib/std/Io.zig index daced32fa8..eb0d28dff3 100644 --- a/lib/std/Io.zig +++ b/lib/std/Io.zig @@ -144,10 +144,6 @@ pub const VTable = struct { swapCancelProtection: *const fn (?*anyopaque, new: CancelProtection) CancelProtection, checkCancel: *const fn (?*anyopaque) Cancelable!void, - /// Blocks until one of the futures from the list has a result ready, such - /// that awaiting it will not block. Returns that index. - select: *const fn (?*anyopaque, futures: []const *AnyFuture) Cancelable!usize, - futexWait: *const fn (?*anyopaque, ptr: *const u32, expected: u32, Timeout) Cancelable!void, futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void, futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void, @@ -2120,40 +2116,6 @@ pub fn sleep(io: Io, duration: Duration, clock: Clock) Cancelable!void { } }); } -/// Given a struct with each field a `*Future`, returns a union with the same -/// fields, each field type the future's result. -pub fn SelectUnion(S: type) type { - const struct_fields = @typeInfo(S).@"struct".fields; - var names: [struct_fields.len][]const u8 = undefined; - var types: [struct_fields.len]type = undefined; - for (struct_fields, &names, &types) |struct_field, *union_field_name, *UnionFieldType| { - const FieldFuture = @typeInfo(struct_field.type).pointer.child; - union_field_name.* = struct_field.name; - UnionFieldType.* = @FieldType(FieldFuture, "result"); - } - return @Union(.auto, std.meta.FieldEnum(S), &names, &types, &@splat(.{})); -} - -/// `s` is a struct with every field a `*Future(T)`, where `T` can be any type, -/// and can be different for each field. -pub fn select(io: Io, s: anytype) Cancelable!SelectUnion(@TypeOf(s)) { - const U = SelectUnion(@TypeOf(s)); - const S = @TypeOf(s); - const fields = @typeInfo(S).@"struct".fields; - var futures: [fields.len]*AnyFuture = undefined; - inline for (fields, &futures) |field, *any_future| { - const future = @field(s, field.name); - any_future.* = future.any_future orelse return @unionInit(U, field.name, future.result); - } - switch (try io.vtable.select(io.userdata, &futures)) { - inline 0...(fields.len - 1) => |selected_index| { - const field_name = fields[selected_index].name; - return @unionInit(U, field_name, @field(s, field_name).await(io)); - }, - else => unreachable, - } -} - pub const LockedStderr = struct { file_writer: *File.Writer, terminal_mode: Terminal.Mode, diff --git a/lib/std/Io/Dispatch.zig b/lib/std/Io/Dispatch.zig index 959cb86421..6a6257a25a 100644 --- a/lib/std/Io/Dispatch.zig +++ b/lib/std/Io/Dispatch.zig @@ -123,7 +123,6 @@ const Fiber = struct { const Awaiting = enum(@Int(.unsigned, @bitSizeOf(usize) - shift)) { nothing = 0, group = 1, - select = 2, _, const shift = 1; @@ -277,9 +276,6 @@ const Fiber = struct { ev.queue.async(fiber, &Fiber.@"resume"); } }, - .select => if (@atomicRmw(i32, &fiber.await_count, .Add, 1, .monotonic) == -1) { - ev.queue.async(fiber, &Fiber.@"resume"); - }, _ => |awaiting| awaiting.toCancelable().async(), } } @@ -370,8 +366,6 @@ pub fn io(ev: *Evented) Io { .swapCancelProtection = swapCancelProtection, .checkCancel = checkCancel, - .select = select, - .futexWait = futexWait, .futexWaitUncancelable = futexWaitUncancelable, .futexWake = futexWake, @@ -1689,50 +1683,6 @@ fn futexForAddress(ev: *Evented, address: usize) *Futex { return &ev.futexes[hashed >> @clz(ev.futexes.len - 1)]; } -fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize { - const ev: *Evented = @ptrCast(@alignCast(userdata)); - const fiber = Thread.current().currentFiber(); - var await_count: u31, var result = for (futures, 0..) |future, future_index| { - const future_fiber: *Fiber = @ptrCast(@alignCast(future)); - if (@atomicRmw( - ?*Fiber, - &future_fiber.link.awaiter, - .Xchg, - fiber, - .acq_rel, - )) |awaiter| { - assert(awaiter == Fiber.finished); - break .{ @intCast(future_index), future_index }; - } - } else result: { - const await_count: u31 = @intCast(futures.len); - ev.yield(.{ .await = 1 }); - break :result .{ await_count - 1, futures.len }; - }; - for (futures[0..result], 0..) |future, future_index| { - const future_fiber: *Fiber = @ptrCast(@alignCast(future)); - const awaiter = @atomicRmw(?*Fiber, &future_fiber.link.awaiter, .Xchg, null, .monotonic); - if (awaiter == Fiber.finished) { - @atomicStore(?*Fiber, &future_fiber.link.awaiter, Fiber.finished, .monotonic); - result = @min(future_index, result); - } else { - assert(awaiter == fiber); - await_count -= 1; - } - } - // Equivalent to `ev.yield(null, .{ .await = await_count });`, - // but avoiding a context switch in the common case. - switch (std.math.order( - @atomicRmw(i32, &fiber.await_count, .Sub, await_count, .monotonic), - await_count, - )) { - .lt => ev.yield(.{ .await = 0 }), - .eq => {}, - .gt => unreachable, - } - return result; -} - fn futexWait( userdata: ?*anyopaque, ptr: *const u32, diff --git a/lib/std/Io/Kqueue.zig b/lib/std/Io/Kqueue.zig index 863bb0c22a..e70a3b1274 100644 --- a/lib/std/Io/Kqueue.zig +++ b/lib/std/Io/Kqueue.zig @@ -491,7 +491,6 @@ const SwitchMessage = struct { reschedule, recycle: *Fiber, register_awaiter: *?*Fiber, - register_select: []const *Io.AnyFuture, exit, }; @@ -514,19 +513,6 @@ const SwitchMessage = struct { if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber }); }, - .register_select => |futures| { - const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old)); - assert(prev_fiber.queue_next == null); - for (futures) |any_future| { - const future_fiber: *Fiber = @ptrCast(@alignCast(any_future)); - if (@atomicRmw(?*Fiber, &future_fiber.awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) { - const closure: *AsyncClosure = .fromFiber(future_fiber); - if (!@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .seq_cst)) { - k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber }); - } - } - } - }, .exit => for (k.threads.allocated[0..@atomicLoad(u32, &k.threads.active, .acquire)]) |*each_thread| { const changes = [_]posix.Kevent{ .{ @@ -628,7 +614,6 @@ pub fn io(k: *Kqueue) Io { .concurrent = concurrent, .await = await, .cancel = cancel, - .select = select, .groupAsync = groupAsync, .groupConcurrent = groupConcurrent, @@ -824,13 +809,6 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void @panic("TODO"); } -fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize { - const k: *Kqueue = @ptrCast(@alignCast(userdata)); - _ = k; - _ = futures; - @panic("TODO"); -} - fn dirCreateDir(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, permissions: Dir.Permissions) Dir.CreateDirError!void { const k: *Kqueue = @ptrCast(@alignCast(userdata)); _ = k; diff --git a/lib/std/Io/Threaded.zig b/lib/std/Io/Threaded.zig index 7902ee693b..485c35da57 100644 --- a/lib/std/Io/Threaded.zig +++ b/lib/std/Io/Threaded.zig @@ -1772,7 +1772,6 @@ pub fn io(t: *Threaded) Io { .concurrent = concurrent, .await = await, .cancel = cancel, - .select = select, .groupAsync = groupAsync, .groupConcurrent = groupConcurrent, @@ -1938,7 +1937,6 @@ pub fn ioBasic(t: *Threaded) Io { .concurrent = concurrent, .await = await, .cancel = cancel, - .select = select, .groupAsync = groupAsync, .groupConcurrent = groupConcurrent, @@ -11727,74 +11725,6 @@ fn sleepNanosleep(t: *Threaded, timeout: Io.Timeout) Io.Cancelable!void { } } -fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize { - const t: *Threaded = @ptrCast(@alignCast(userdata)); - _ = t; - - var num_completed: std.atomic.Value(u32) = .init(0); - - for (futures, 0..) |any_future, i| { - const future: *Future = @ptrCast(@alignCast(any_future)); - future.awaiter = &num_completed; - const old_status = future.status.fetchOr( - .{ .tag = .pending_awaited, .thread = .null }, - .release, // release `future.awaiter` - ); - switch (old_status.tag) { - .pending => {}, - .pending_awaited => unreachable, // `await` raced with `select` - .pending_canceled => unreachable, // `cancel` raced with `select` - .done => { - future.status.store(old_status, .monotonic); - _ = finishSelect(&num_completed, futures[0..i]); - return i; - }, - } - } - - errdefer _ = finishSelect(&num_completed, futures); - - while (true) { - const n = num_completed.load(.acquire); - if (n > 0) break; - assert(n < futures.len); - try Thread.futexWait(&num_completed.raw, n, null); - } - return finishSelect(&num_completed, futures).?; -} -fn finishSelect( - num_completed: *std.atomic.Value(u32), - futures: []const *Io.AnyFuture, -) ?usize { - var completed_index: ?usize = null; - var expect_completed: u32 = 0; - for (futures, 0..) |any_future, i| { - const future: *Future = @ptrCast(@alignCast(any_future)); - // This operation will convert `.pending_awaited` to `.pending`, or leave `.done` untouched. - switch (future.status.fetchAnd( - .{ .tag = @enumFromInt(0b10), .thread = .all_ones }, - .monotonic, - ).tag) { - .pending_awaited => {}, - .pending => unreachable, - .pending_canceled => unreachable, - .done => { - expect_completed += 1; - completed_index = i; - }, - } - } - // If any future has just finished, wait for it to signal `num_completed` to avoid dangling - // references to stack memory. - while (true) { - const n = num_completed.load(.acquire); - if (n == expect_completed) break; - assert(n < expect_completed); - Thread.futexWaitUncancelable(&num_completed.raw, n, null); - } - return completed_index; -} - fn netListenIpPosix( userdata: ?*anyopaque, address: IpAddress, diff --git a/lib/std/Io/Uring.zig b/lib/std/Io/Uring.zig index e668fe0426..d204c23d01 100644 --- a/lib/std/Io/Uring.zig +++ b/lib/std/Io/Uring.zig @@ -175,7 +175,6 @@ const Fiber = struct { const Awaiting = enum(u31) { nothing = std.math.maxInt(u31), group = std.math.maxInt(u31) - 1, - select = std.math.maxInt(u31) - 2, /// An io_uring fd. _, @@ -186,14 +185,14 @@ const Fiber = struct { fn fromIoUringFd(fd: fd_t) Awaiting { const awaiting: Awaiting = @enumFromInt(fd); switch (awaiting) { - .nothing, .group, .select => unreachable, + .nothing, .group => unreachable, _ => return awaiting, } } fn toIoUringFd(awaiting: Awaiting) fd_t { switch (awaiting) { - .nothing, .group, .select => unreachable, + .nothing, .group => unreachable, _ => return @intFromEnum(awaiting), } } @@ -376,9 +375,6 @@ const Fiber = struct { _ = ev.schedule(.current(), .{ .head = fiber, .tail = fiber }); } }, - .select => if (@atomicRmw(i32, &fiber.await_count, .Add, 1, .monotonic) == -1) { - _ = ev.schedule(.current(), .{ .head = fiber, .tail = fiber }); - }, _ => |awaiting| { const awaiting_io_uring_fd = awaiting.toIoUringFd(); const thread: *Thread = .current(); @@ -684,8 +680,6 @@ pub fn io(ev: *Evented) Io { .swapCancelProtection = swapCancelProtection, .checkCancel = checkCancel, - .select = select, - .futexWait = futexWait, .futexWaitUncancelable = futexWaitUncancelable, .futexWake = futexWake, @@ -1928,57 +1922,6 @@ fn checkCancel(userdata: ?*anyopaque) Io.Cancelable!void { } } -fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize { - const ev: *Evented = @ptrCast(@alignCast(userdata)); - var cancel_region: CancelRegion = .init(); - defer cancel_region.deinit(); - var await_count: u31, var result = for (futures, 0..) |future, future_index| { - const future_fiber: *Fiber = @ptrCast(@alignCast(future)); - if (@atomicRmw( - ?*Fiber, - &future_fiber.link.awaiter, - .Xchg, - cancel_region.fiber, - .acq_rel, - )) |awaiter| { - assert(awaiter == Fiber.finished); - break .{ @intCast(future_index), future_index }; - } - } else result: { - const await_count: u31 = @intCast(futures.len); - cancel_region.await(.select) catch |err| switch (err) { - error.Canceled => |e| break :result .{ await_count + 1, e }, - }; - ev.yield(null, .{ .await = 1 }); - cancel_region.await(.nothing) catch |err| switch (err) { - error.Canceled => |e| break :result .{ await_count, e }, - }; - break :result .{ await_count - 1, futures.len }; - }; - for (futures[0 .. result catch futures.len], 0..) |future, future_index| { - const future_fiber: *Fiber = @ptrCast(@alignCast(future)); - const awaiter = @atomicRmw(?*Fiber, &future_fiber.link.awaiter, .Xchg, null, .monotonic); - if (awaiter == Fiber.finished) { - @atomicStore(?*Fiber, &future_fiber.link.awaiter, Fiber.finished, .monotonic); - result = if (result) |finished_index| @min(future_index, finished_index) else |e| e; - } else { - assert(awaiter == cancel_region.fiber); - await_count -= 1; - } - } - // Equivalent to `ev.yield(null, .{ .await = await_count });`, - // but avoiding a context switch in the common case. - switch (std.math.order( - @atomicRmw(i32, &cancel_region.fiber.await_count, .Sub, await_count, .monotonic), - await_count, - )) { - .lt => ev.yield(null, .{ .await = 0 }), - .eq => {}, - .gt => unreachable, - } - return result; -} - fn futexWait( userdata: ?*anyopaque, ptr: *const u32, diff --git a/lib/std/Io/test.zig b/lib/std/Io/test.zig index 7b652a495e..d13cc802ce 100644 --- a/lib/std/Io/test.zig +++ b/lib/std/Io/test.zig @@ -282,40 +282,6 @@ test "Group.concurrent" { try testing.expectEqualSlices(usize, &.{ 45, 245 }, &results); } -test "select" { - const io = testing.io; - - var queue: Io.Queue(u8) = .init(&.{}); - - var get_a = io.concurrent(Io.Queue(u8).getOne, .{ &queue, io }) catch |err| switch (err) { - error.ConcurrencyUnavailable => { - try testing.expect(builtin.single_threaded); - return; - }, - }; - defer _ = get_a.cancel(io) catch {}; - - var get_b = try io.concurrent(Io.Queue(u8).getOne, .{ &queue, io }); - defer _ = get_b.cancel(io) catch {}; - - var timeout = io.async(Io.sleep, .{ io, .fromMilliseconds(1), .awake }); - defer timeout.cancel(io) catch {}; - - switch (try io.select(.{ - .get_a = &get_a, - .get_b = &get_b, - .timeout = &timeout, - })) { - .get_a => return error.TestFailure, - .get_b => return error.TestFailure, - .timeout => { - queue.close(io); - try testing.expectError(error.Closed, get_a.await(io)); - try testing.expectError(error.Closed, get_b.await(io)); - }, - } -} - fn testQueue(comptime len: usize) !void { const io = testing.io; var buf: [len]usize = undefined;