mirror of
https://codeberg.org/ziglang/zig.git
synced 2026-03-08 01:04:43 +01:00
std.Io: remove select function
This function works with a slice of futures and returns the index of a completed one. This doesn't work very well in practice because it's either too high level or too low level. At the lower level we have Io.Batch for doing this kind of thing at the Operation API layer. At the higher level we have Io.Select which is a convenience wrapper around an Io.Group and an Io.Queue.
This commit is contained in:
parent
c6eeae8a8c
commit
d9fc7fa04d
6 changed files with 2 additions and 273 deletions
|
|
@ -144,10 +144,6 @@ pub const VTable = struct {
|
|||
swapCancelProtection: *const fn (?*anyopaque, new: CancelProtection) CancelProtection,
|
||||
checkCancel: *const fn (?*anyopaque) Cancelable!void,
|
||||
|
||||
/// Blocks until one of the futures from the list has a result ready, such
|
||||
/// that awaiting it will not block. Returns that index.
|
||||
select: *const fn (?*anyopaque, futures: []const *AnyFuture) Cancelable!usize,
|
||||
|
||||
futexWait: *const fn (?*anyopaque, ptr: *const u32, expected: u32, Timeout) Cancelable!void,
|
||||
futexWaitUncancelable: *const fn (?*anyopaque, ptr: *const u32, expected: u32) void,
|
||||
futexWake: *const fn (?*anyopaque, ptr: *const u32, max_waiters: u32) void,
|
||||
|
|
@ -2120,40 +2116,6 @@ pub fn sleep(io: Io, duration: Duration, clock: Clock) Cancelable!void {
|
|||
} });
|
||||
}
|
||||
|
||||
/// Given a struct with each field a `*Future`, returns a union with the same
|
||||
/// fields, each field type the future's result.
|
||||
pub fn SelectUnion(S: type) type {
|
||||
const struct_fields = @typeInfo(S).@"struct".fields;
|
||||
var names: [struct_fields.len][]const u8 = undefined;
|
||||
var types: [struct_fields.len]type = undefined;
|
||||
for (struct_fields, &names, &types) |struct_field, *union_field_name, *UnionFieldType| {
|
||||
const FieldFuture = @typeInfo(struct_field.type).pointer.child;
|
||||
union_field_name.* = struct_field.name;
|
||||
UnionFieldType.* = @FieldType(FieldFuture, "result");
|
||||
}
|
||||
return @Union(.auto, std.meta.FieldEnum(S), &names, &types, &@splat(.{}));
|
||||
}
|
||||
|
||||
/// `s` is a struct with every field a `*Future(T)`, where `T` can be any type,
|
||||
/// and can be different for each field.
|
||||
pub fn select(io: Io, s: anytype) Cancelable!SelectUnion(@TypeOf(s)) {
|
||||
const U = SelectUnion(@TypeOf(s));
|
||||
const S = @TypeOf(s);
|
||||
const fields = @typeInfo(S).@"struct".fields;
|
||||
var futures: [fields.len]*AnyFuture = undefined;
|
||||
inline for (fields, &futures) |field, *any_future| {
|
||||
const future = @field(s, field.name);
|
||||
any_future.* = future.any_future orelse return @unionInit(U, field.name, future.result);
|
||||
}
|
||||
switch (try io.vtable.select(io.userdata, &futures)) {
|
||||
inline 0...(fields.len - 1) => |selected_index| {
|
||||
const field_name = fields[selected_index].name;
|
||||
return @unionInit(U, field_name, @field(s, field_name).await(io));
|
||||
},
|
||||
else => unreachable,
|
||||
}
|
||||
}
|
||||
|
||||
pub const LockedStderr = struct {
|
||||
file_writer: *File.Writer,
|
||||
terminal_mode: Terminal.Mode,
|
||||
|
|
|
|||
|
|
@ -123,7 +123,6 @@ const Fiber = struct {
|
|||
const Awaiting = enum(@Int(.unsigned, @bitSizeOf(usize) - shift)) {
|
||||
nothing = 0,
|
||||
group = 1,
|
||||
select = 2,
|
||||
_,
|
||||
|
||||
const shift = 1;
|
||||
|
|
@ -277,9 +276,6 @@ const Fiber = struct {
|
|||
ev.queue.async(fiber, &Fiber.@"resume");
|
||||
}
|
||||
},
|
||||
.select => if (@atomicRmw(i32, &fiber.await_count, .Add, 1, .monotonic) == -1) {
|
||||
ev.queue.async(fiber, &Fiber.@"resume");
|
||||
},
|
||||
_ => |awaiting| awaiting.toCancelable().async(),
|
||||
}
|
||||
}
|
||||
|
|
@ -370,8 +366,6 @@ pub fn io(ev: *Evented) Io {
|
|||
.swapCancelProtection = swapCancelProtection,
|
||||
.checkCancel = checkCancel,
|
||||
|
||||
.select = select,
|
||||
|
||||
.futexWait = futexWait,
|
||||
.futexWaitUncancelable = futexWaitUncancelable,
|
||||
.futexWake = futexWake,
|
||||
|
|
@ -1689,50 +1683,6 @@ fn futexForAddress(ev: *Evented, address: usize) *Futex {
|
|||
return &ev.futexes[hashed >> @clz(ev.futexes.len - 1)];
|
||||
}
|
||||
|
||||
fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize {
|
||||
const ev: *Evented = @ptrCast(@alignCast(userdata));
|
||||
const fiber = Thread.current().currentFiber();
|
||||
var await_count: u31, var result = for (futures, 0..) |future, future_index| {
|
||||
const future_fiber: *Fiber = @ptrCast(@alignCast(future));
|
||||
if (@atomicRmw(
|
||||
?*Fiber,
|
||||
&future_fiber.link.awaiter,
|
||||
.Xchg,
|
||||
fiber,
|
||||
.acq_rel,
|
||||
)) |awaiter| {
|
||||
assert(awaiter == Fiber.finished);
|
||||
break .{ @intCast(future_index), future_index };
|
||||
}
|
||||
} else result: {
|
||||
const await_count: u31 = @intCast(futures.len);
|
||||
ev.yield(.{ .await = 1 });
|
||||
break :result .{ await_count - 1, futures.len };
|
||||
};
|
||||
for (futures[0..result], 0..) |future, future_index| {
|
||||
const future_fiber: *Fiber = @ptrCast(@alignCast(future));
|
||||
const awaiter = @atomicRmw(?*Fiber, &future_fiber.link.awaiter, .Xchg, null, .monotonic);
|
||||
if (awaiter == Fiber.finished) {
|
||||
@atomicStore(?*Fiber, &future_fiber.link.awaiter, Fiber.finished, .monotonic);
|
||||
result = @min(future_index, result);
|
||||
} else {
|
||||
assert(awaiter == fiber);
|
||||
await_count -= 1;
|
||||
}
|
||||
}
|
||||
// Equivalent to `ev.yield(null, .{ .await = await_count });`,
|
||||
// but avoiding a context switch in the common case.
|
||||
switch (std.math.order(
|
||||
@atomicRmw(i32, &fiber.await_count, .Sub, await_count, .monotonic),
|
||||
await_count,
|
||||
)) {
|
||||
.lt => ev.yield(.{ .await = 0 }),
|
||||
.eq => {},
|
||||
.gt => unreachable,
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
fn futexWait(
|
||||
userdata: ?*anyopaque,
|
||||
ptr: *const u32,
|
||||
|
|
|
|||
|
|
@ -491,7 +491,6 @@ const SwitchMessage = struct {
|
|||
reschedule,
|
||||
recycle: *Fiber,
|
||||
register_awaiter: *?*Fiber,
|
||||
register_select: []const *Io.AnyFuture,
|
||||
exit,
|
||||
};
|
||||
|
||||
|
|
@ -514,19 +513,6 @@ const SwitchMessage = struct {
|
|||
if (@atomicRmw(?*Fiber, awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished)
|
||||
k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
|
||||
},
|
||||
.register_select => |futures| {
|
||||
const prev_fiber: *Fiber = @alignCast(@fieldParentPtr("context", message.contexts.old));
|
||||
assert(prev_fiber.queue_next == null);
|
||||
for (futures) |any_future| {
|
||||
const future_fiber: *Fiber = @ptrCast(@alignCast(any_future));
|
||||
if (@atomicRmw(?*Fiber, &future_fiber.awaiter, .Xchg, prev_fiber, .acq_rel) == Fiber.finished) {
|
||||
const closure: *AsyncClosure = .fromFiber(future_fiber);
|
||||
if (!@atomicRmw(bool, &closure.already_awaited, .Xchg, true, .seq_cst)) {
|
||||
k.schedule(thread, .{ .head = prev_fiber, .tail = prev_fiber });
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
.exit => for (k.threads.allocated[0..@atomicLoad(u32, &k.threads.active, .acquire)]) |*each_thread| {
|
||||
const changes = [_]posix.Kevent{
|
||||
.{
|
||||
|
|
@ -628,7 +614,6 @@ pub fn io(k: *Kqueue) Io {
|
|||
.concurrent = concurrent,
|
||||
.await = await,
|
||||
.cancel = cancel,
|
||||
.select = select,
|
||||
|
||||
.groupAsync = groupAsync,
|
||||
.groupConcurrent = groupConcurrent,
|
||||
|
|
@ -824,13 +809,6 @@ fn groupCancel(userdata: ?*anyopaque, group: *Io.Group, token: *anyopaque) void
|
|||
@panic("TODO");
|
||||
}
|
||||
|
||||
fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize {
|
||||
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
||||
_ = k;
|
||||
_ = futures;
|
||||
@panic("TODO");
|
||||
}
|
||||
|
||||
fn dirCreateDir(userdata: ?*anyopaque, dir: Dir, sub_path: []const u8, permissions: Dir.Permissions) Dir.CreateDirError!void {
|
||||
const k: *Kqueue = @ptrCast(@alignCast(userdata));
|
||||
_ = k;
|
||||
|
|
|
|||
|
|
@ -1772,7 +1772,6 @@ pub fn io(t: *Threaded) Io {
|
|||
.concurrent = concurrent,
|
||||
.await = await,
|
||||
.cancel = cancel,
|
||||
.select = select,
|
||||
|
||||
.groupAsync = groupAsync,
|
||||
.groupConcurrent = groupConcurrent,
|
||||
|
|
@ -1938,7 +1937,6 @@ pub fn ioBasic(t: *Threaded) Io {
|
|||
.concurrent = concurrent,
|
||||
.await = await,
|
||||
.cancel = cancel,
|
||||
.select = select,
|
||||
|
||||
.groupAsync = groupAsync,
|
||||
.groupConcurrent = groupConcurrent,
|
||||
|
|
@ -11727,74 +11725,6 @@ fn sleepNanosleep(t: *Threaded, timeout: Io.Timeout) Io.Cancelable!void {
|
|||
}
|
||||
}
|
||||
|
||||
fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize {
|
||||
const t: *Threaded = @ptrCast(@alignCast(userdata));
|
||||
_ = t;
|
||||
|
||||
var num_completed: std.atomic.Value(u32) = .init(0);
|
||||
|
||||
for (futures, 0..) |any_future, i| {
|
||||
const future: *Future = @ptrCast(@alignCast(any_future));
|
||||
future.awaiter = &num_completed;
|
||||
const old_status = future.status.fetchOr(
|
||||
.{ .tag = .pending_awaited, .thread = .null },
|
||||
.release, // release `future.awaiter`
|
||||
);
|
||||
switch (old_status.tag) {
|
||||
.pending => {},
|
||||
.pending_awaited => unreachable, // `await` raced with `select`
|
||||
.pending_canceled => unreachable, // `cancel` raced with `select`
|
||||
.done => {
|
||||
future.status.store(old_status, .monotonic);
|
||||
_ = finishSelect(&num_completed, futures[0..i]);
|
||||
return i;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
errdefer _ = finishSelect(&num_completed, futures);
|
||||
|
||||
while (true) {
|
||||
const n = num_completed.load(.acquire);
|
||||
if (n > 0) break;
|
||||
assert(n < futures.len);
|
||||
try Thread.futexWait(&num_completed.raw, n, null);
|
||||
}
|
||||
return finishSelect(&num_completed, futures).?;
|
||||
}
|
||||
fn finishSelect(
|
||||
num_completed: *std.atomic.Value(u32),
|
||||
futures: []const *Io.AnyFuture,
|
||||
) ?usize {
|
||||
var completed_index: ?usize = null;
|
||||
var expect_completed: u32 = 0;
|
||||
for (futures, 0..) |any_future, i| {
|
||||
const future: *Future = @ptrCast(@alignCast(any_future));
|
||||
// This operation will convert `.pending_awaited` to `.pending`, or leave `.done` untouched.
|
||||
switch (future.status.fetchAnd(
|
||||
.{ .tag = @enumFromInt(0b10), .thread = .all_ones },
|
||||
.monotonic,
|
||||
).tag) {
|
||||
.pending_awaited => {},
|
||||
.pending => unreachable,
|
||||
.pending_canceled => unreachable,
|
||||
.done => {
|
||||
expect_completed += 1;
|
||||
completed_index = i;
|
||||
},
|
||||
}
|
||||
}
|
||||
// If any future has just finished, wait for it to signal `num_completed` to avoid dangling
|
||||
// references to stack memory.
|
||||
while (true) {
|
||||
const n = num_completed.load(.acquire);
|
||||
if (n == expect_completed) break;
|
||||
assert(n < expect_completed);
|
||||
Thread.futexWaitUncancelable(&num_completed.raw, n, null);
|
||||
}
|
||||
return completed_index;
|
||||
}
|
||||
|
||||
fn netListenIpPosix(
|
||||
userdata: ?*anyopaque,
|
||||
address: IpAddress,
|
||||
|
|
|
|||
|
|
@ -175,7 +175,6 @@ const Fiber = struct {
|
|||
const Awaiting = enum(u31) {
|
||||
nothing = std.math.maxInt(u31),
|
||||
group = std.math.maxInt(u31) - 1,
|
||||
select = std.math.maxInt(u31) - 2,
|
||||
/// An io_uring fd.
|
||||
_,
|
||||
|
||||
|
|
@ -186,14 +185,14 @@ const Fiber = struct {
|
|||
fn fromIoUringFd(fd: fd_t) Awaiting {
|
||||
const awaiting: Awaiting = @enumFromInt(fd);
|
||||
switch (awaiting) {
|
||||
.nothing, .group, .select => unreachable,
|
||||
.nothing, .group => unreachable,
|
||||
_ => return awaiting,
|
||||
}
|
||||
}
|
||||
|
||||
fn toIoUringFd(awaiting: Awaiting) fd_t {
|
||||
switch (awaiting) {
|
||||
.nothing, .group, .select => unreachable,
|
||||
.nothing, .group => unreachable,
|
||||
_ => return @intFromEnum(awaiting),
|
||||
}
|
||||
}
|
||||
|
|
@ -376,9 +375,6 @@ const Fiber = struct {
|
|||
_ = ev.schedule(.current(), .{ .head = fiber, .tail = fiber });
|
||||
}
|
||||
},
|
||||
.select => if (@atomicRmw(i32, &fiber.await_count, .Add, 1, .monotonic) == -1) {
|
||||
_ = ev.schedule(.current(), .{ .head = fiber, .tail = fiber });
|
||||
},
|
||||
_ => |awaiting| {
|
||||
const awaiting_io_uring_fd = awaiting.toIoUringFd();
|
||||
const thread: *Thread = .current();
|
||||
|
|
@ -684,8 +680,6 @@ pub fn io(ev: *Evented) Io {
|
|||
.swapCancelProtection = swapCancelProtection,
|
||||
.checkCancel = checkCancel,
|
||||
|
||||
.select = select,
|
||||
|
||||
.futexWait = futexWait,
|
||||
.futexWaitUncancelable = futexWaitUncancelable,
|
||||
.futexWake = futexWake,
|
||||
|
|
@ -1928,57 +1922,6 @@ fn checkCancel(userdata: ?*anyopaque) Io.Cancelable!void {
|
|||
}
|
||||
}
|
||||
|
||||
fn select(userdata: ?*anyopaque, futures: []const *Io.AnyFuture) Io.Cancelable!usize {
|
||||
const ev: *Evented = @ptrCast(@alignCast(userdata));
|
||||
var cancel_region: CancelRegion = .init();
|
||||
defer cancel_region.deinit();
|
||||
var await_count: u31, var result = for (futures, 0..) |future, future_index| {
|
||||
const future_fiber: *Fiber = @ptrCast(@alignCast(future));
|
||||
if (@atomicRmw(
|
||||
?*Fiber,
|
||||
&future_fiber.link.awaiter,
|
||||
.Xchg,
|
||||
cancel_region.fiber,
|
||||
.acq_rel,
|
||||
)) |awaiter| {
|
||||
assert(awaiter == Fiber.finished);
|
||||
break .{ @intCast(future_index), future_index };
|
||||
}
|
||||
} else result: {
|
||||
const await_count: u31 = @intCast(futures.len);
|
||||
cancel_region.await(.select) catch |err| switch (err) {
|
||||
error.Canceled => |e| break :result .{ await_count + 1, e },
|
||||
};
|
||||
ev.yield(null, .{ .await = 1 });
|
||||
cancel_region.await(.nothing) catch |err| switch (err) {
|
||||
error.Canceled => |e| break :result .{ await_count, e },
|
||||
};
|
||||
break :result .{ await_count - 1, futures.len };
|
||||
};
|
||||
for (futures[0 .. result catch futures.len], 0..) |future, future_index| {
|
||||
const future_fiber: *Fiber = @ptrCast(@alignCast(future));
|
||||
const awaiter = @atomicRmw(?*Fiber, &future_fiber.link.awaiter, .Xchg, null, .monotonic);
|
||||
if (awaiter == Fiber.finished) {
|
||||
@atomicStore(?*Fiber, &future_fiber.link.awaiter, Fiber.finished, .monotonic);
|
||||
result = if (result) |finished_index| @min(future_index, finished_index) else |e| e;
|
||||
} else {
|
||||
assert(awaiter == cancel_region.fiber);
|
||||
await_count -= 1;
|
||||
}
|
||||
}
|
||||
// Equivalent to `ev.yield(null, .{ .await = await_count });`,
|
||||
// but avoiding a context switch in the common case.
|
||||
switch (std.math.order(
|
||||
@atomicRmw(i32, &cancel_region.fiber.await_count, .Sub, await_count, .monotonic),
|
||||
await_count,
|
||||
)) {
|
||||
.lt => ev.yield(null, .{ .await = 0 }),
|
||||
.eq => {},
|
||||
.gt => unreachable,
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
fn futexWait(
|
||||
userdata: ?*anyopaque,
|
||||
ptr: *const u32,
|
||||
|
|
|
|||
|
|
@ -282,40 +282,6 @@ test "Group.concurrent" {
|
|||
try testing.expectEqualSlices(usize, &.{ 45, 245 }, &results);
|
||||
}
|
||||
|
||||
test "select" {
|
||||
const io = testing.io;
|
||||
|
||||
var queue: Io.Queue(u8) = .init(&.{});
|
||||
|
||||
var get_a = io.concurrent(Io.Queue(u8).getOne, .{ &queue, io }) catch |err| switch (err) {
|
||||
error.ConcurrencyUnavailable => {
|
||||
try testing.expect(builtin.single_threaded);
|
||||
return;
|
||||
},
|
||||
};
|
||||
defer _ = get_a.cancel(io) catch {};
|
||||
|
||||
var get_b = try io.concurrent(Io.Queue(u8).getOne, .{ &queue, io });
|
||||
defer _ = get_b.cancel(io) catch {};
|
||||
|
||||
var timeout = io.async(Io.sleep, .{ io, .fromMilliseconds(1), .awake });
|
||||
defer timeout.cancel(io) catch {};
|
||||
|
||||
switch (try io.select(.{
|
||||
.get_a = &get_a,
|
||||
.get_b = &get_b,
|
||||
.timeout = &timeout,
|
||||
})) {
|
||||
.get_a => return error.TestFailure,
|
||||
.get_b => return error.TestFailure,
|
||||
.timeout => {
|
||||
queue.close(io);
|
||||
try testing.expectError(error.Closed, get_a.await(io));
|
||||
try testing.expectError(error.Closed, get_b.await(io));
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn testQueue(comptime len: usize) !void {
|
||||
const io = testing.io;
|
||||
var buf: [len]usize = undefined;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue