mirror of
https://codeberg.org/ziglang/zig.git
synced 2026-03-08 03:04:52 +01:00
std.Io: add RwLock and Semaphore sync primitives
and restore usage by std.debug.SelfInfo.Elf
This commit is contained in:
parent
550da1b676
commit
4c4e9d054e
5 changed files with 342 additions and 20 deletions
|
|
@ -47,6 +47,9 @@ pub const Dir = @import("Io/Dir.zig");
|
|||
pub const File = @import("Io/File.zig");
|
||||
pub const Terminal = @import("Io/Terminal.zig");
|
||||
|
||||
pub const RwLock = @import("Io/RwLock.zig");
|
||||
pub const Semaphore = @import("Io/Semaphore.zig");
|
||||
|
||||
pub const VTable = struct {
|
||||
/// If it returns `null` it means `result` has been already populated and
|
||||
/// `await` will be a no-op.
|
||||
|
|
@ -882,7 +885,7 @@ pub const Timeout = union(enum) {
|
|||
|
||||
pub const Error = error{ Timeout, UnsupportedClock };
|
||||
|
||||
pub fn toDeadline(t: Timeout, io: Io) Clock.Error!?Clock.Timestamp {
|
||||
pub fn toTimestamp(t: Timeout, io: Io) Clock.Error!?Clock.Timestamp {
|
||||
return switch (t) {
|
||||
.none => null,
|
||||
.duration => |d| try .fromNow(io, d),
|
||||
|
|
@ -890,6 +893,14 @@ pub const Timeout = union(enum) {
|
|||
};
|
||||
}
|
||||
|
||||
pub fn toDeadline(t: Timeout, io: Io) Timeout {
|
||||
return switch (t) {
|
||||
.none => .none,
|
||||
.duration => |d| .{ .deadline = Clock.Timestamp.fromNow(io, d) catch @panic("TODO") },
|
||||
.deadline => |d| .{ .deadline = d },
|
||||
};
|
||||
}
|
||||
|
||||
pub fn toDurationFromNow(t: Timeout, io: Io) Clock.Error!?Clock.Duration {
|
||||
return switch (t) {
|
||||
.none => null,
|
||||
|
|
@ -2153,5 +2164,7 @@ test {
|
|||
_ = Writer;
|
||||
_ = Evented;
|
||||
_ = Threaded;
|
||||
_ = RwLock;
|
||||
_ = Semaphore;
|
||||
_ = @import("Io/test.zig");
|
||||
}
|
||||
|
|
|
|||
238
lib/std/Io/RwLock.zig
Normal file
238
lib/std/Io/RwLock.zig
Normal file
|
|
@ -0,0 +1,238 @@
|
|||
//! A lock that supports one writer or many readers.
|
||||
const RwLock = @This();
|
||||
|
||||
const builtin = @import("builtin");
|
||||
|
||||
const std = @import("../std.zig");
|
||||
const Io = std.Io;
|
||||
const assert = std.debug.assert;
|
||||
const testing = std.testing;
|
||||
|
||||
state: usize,
|
||||
mutex: Io.Mutex,
|
||||
semaphore: Io.Semaphore,
|
||||
|
||||
pub const init: RwLock = .{
|
||||
.state = 0,
|
||||
.mutex = .init,
|
||||
.semaphore = .{},
|
||||
};
|
||||
|
||||
const is_writing: usize = 1;
|
||||
const writer: usize = 1 << 1;
|
||||
const reader: usize = 1 << (1 + @bitSizeOf(Count));
|
||||
const writer_mask: usize = std.math.maxInt(Count) << @ctz(writer);
|
||||
const reader_mask: usize = std.math.maxInt(Count) << @ctz(reader);
|
||||
const Count = @Int(.unsigned, @divFloor(@bitSizeOf(usize) - 1, 2));
|
||||
|
||||
pub fn tryLock(rl: *RwLock, io: Io) bool {
|
||||
if (rl.mutex.tryLock()) {
|
||||
const state = @atomicLoad(usize, &rl.state, .seq_cst);
|
||||
if (state & reader_mask == 0) {
|
||||
_ = @atomicRmw(usize, &rl.state, .Or, is_writing, .seq_cst);
|
||||
return true;
|
||||
}
|
||||
|
||||
rl.mutex.unlock(io);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
pub fn lockUncancelable(rl: *RwLock, io: Io) void {
|
||||
_ = @atomicRmw(usize, &rl.state, .Add, writer, .seq_cst);
|
||||
rl.mutex.lockUncancelable(io);
|
||||
|
||||
const state = @atomicRmw(usize, &rl.state, .Add, is_writing -% writer, .seq_cst);
|
||||
if (state & reader_mask != 0)
|
||||
rl.semaphore.waitUncancelable(io);
|
||||
}
|
||||
|
||||
pub fn unlock(rl: *RwLock, io: Io) void {
|
||||
_ = @atomicRmw(usize, &rl.state, .And, ~is_writing, .seq_cst);
|
||||
rl.mutex.unlock(io);
|
||||
}
|
||||
|
||||
pub fn tryLockShared(rl: *RwLock, io: Io) bool {
|
||||
const state = @atomicLoad(usize, &rl.state, .seq_cst);
|
||||
if (state & (is_writing | writer_mask) == 0) {
|
||||
_ = @cmpxchgStrong(
|
||||
usize,
|
||||
&rl.state,
|
||||
state,
|
||||
state + reader,
|
||||
.seq_cst,
|
||||
.seq_cst,
|
||||
) orelse return true;
|
||||
}
|
||||
|
||||
if (rl.mutex.tryLock()) {
|
||||
_ = @atomicRmw(usize, &rl.state, .Add, reader, .seq_cst);
|
||||
rl.mutex.unlock(io);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
pub fn lockSharedUncancelable(rl: *RwLock, io: Io) void {
|
||||
var state = @atomicLoad(usize, &rl.state, .seq_cst);
|
||||
while (state & (is_writing | writer_mask) == 0) {
|
||||
state = @cmpxchgWeak(
|
||||
usize,
|
||||
&rl.state,
|
||||
state,
|
||||
state + reader,
|
||||
.seq_cst,
|
||||
.seq_cst,
|
||||
) orelse return;
|
||||
}
|
||||
|
||||
rl.mutex.lockUncancelable(io);
|
||||
_ = @atomicRmw(usize, &rl.state, .Add, reader, .seq_cst);
|
||||
rl.mutex.unlock(io);
|
||||
}
|
||||
|
||||
pub fn unlockShared(rl: *RwLock, io: Io) void {
|
||||
const state = @atomicRmw(usize, &rl.state, .Sub, reader, .seq_cst);
|
||||
|
||||
if ((state & reader_mask == reader) and (state & is_writing != 0))
|
||||
rl.semaphore.post(io);
|
||||
}
|
||||
|
||||
test "internal state" {
|
||||
const io = testing.io;
|
||||
|
||||
var rl: Io.RwLock = .init;
|
||||
|
||||
// The following failed prior to the fix for Issue #13163,
|
||||
// where the WRITER flag was subtracted by the lock method.
|
||||
|
||||
rl.lockUncancelable(io);
|
||||
rl.unlock(io);
|
||||
try testing.expectEqual(rl, Io.RwLock.init);
|
||||
}
|
||||
|
||||
test "smoke test" {
|
||||
const io = testing.io;
|
||||
|
||||
var rl: Io.RwLock = .init;
|
||||
|
||||
rl.lockUncancelable(io);
|
||||
try testing.expect(!rl.tryLock(io));
|
||||
try testing.expect(!rl.tryLockShared(io));
|
||||
rl.unlock(io);
|
||||
|
||||
try testing.expect(rl.tryLock(io));
|
||||
try testing.expect(!rl.tryLock(io));
|
||||
try testing.expect(!rl.tryLockShared(io));
|
||||
rl.unlock(io);
|
||||
|
||||
rl.lockSharedUncancelable(io);
|
||||
try testing.expect(!rl.tryLock(io));
|
||||
try testing.expect(rl.tryLockShared(io));
|
||||
rl.unlockShared(io);
|
||||
rl.unlockShared(io);
|
||||
|
||||
try testing.expect(rl.tryLockShared(io));
|
||||
try testing.expect(!rl.tryLock(io));
|
||||
try testing.expect(rl.tryLockShared(io));
|
||||
rl.unlockShared(io);
|
||||
rl.unlockShared(io);
|
||||
|
||||
rl.lockUncancelable(io);
|
||||
rl.unlock(io);
|
||||
}
|
||||
|
||||
test "concurrent access" {
|
||||
if (builtin.single_threaded) return;
|
||||
|
||||
const io = testing.io;
|
||||
const num_writers: usize = 2;
|
||||
const num_readers: usize = 4;
|
||||
const num_writes: usize = 1000;
|
||||
const num_reads: usize = 2000;
|
||||
|
||||
const Runner = struct {
|
||||
const Runner = @This();
|
||||
|
||||
io: Io,
|
||||
|
||||
rl: Io.RwLock,
|
||||
writes: usize,
|
||||
reads: std.atomic.Value(usize),
|
||||
|
||||
val_a: usize,
|
||||
val_b: usize,
|
||||
|
||||
fn reader(run: *Runner, thread_idx: usize) !void {
|
||||
var prng = std.Random.DefaultPrng.init(thread_idx);
|
||||
const rnd = prng.random();
|
||||
while (true) {
|
||||
run.rl.lockSharedUncancelable(run.io);
|
||||
defer run.rl.unlockShared(run.io);
|
||||
|
||||
try testing.expect(run.writes <= num_writes);
|
||||
if (run.reads.fetchAdd(1, .monotonic) >= num_reads) break;
|
||||
|
||||
// We use `volatile` accesses so that we can make sure the memory is accessed either
|
||||
// side of a yield, maximising chances of a race.
|
||||
const a_ptr: *const volatile usize = &run.val_a;
|
||||
const b_ptr: *const volatile usize = &run.val_b;
|
||||
|
||||
const old_a = a_ptr.*;
|
||||
if (rnd.boolean()) try std.Thread.yield();
|
||||
const old_b = b_ptr.*;
|
||||
try testing.expect(old_a == old_b);
|
||||
}
|
||||
}
|
||||
|
||||
fn writer(run: *Runner, thread_idx: usize) !void {
|
||||
var prng = std.Random.DefaultPrng.init(thread_idx);
|
||||
const rnd = prng.random();
|
||||
while (true) {
|
||||
run.rl.lockUncancelable(run.io);
|
||||
defer run.rl.unlock(run.io);
|
||||
|
||||
try testing.expect(run.writes <= num_writes);
|
||||
if (run.writes == num_writes) break;
|
||||
|
||||
// We use `volatile` accesses so that we can make sure the memory is accessed either
|
||||
// side of a yield, maximising chances of a race.
|
||||
const a_ptr: *volatile usize = &run.val_a;
|
||||
const b_ptr: *volatile usize = &run.val_b;
|
||||
|
||||
const new_val = rnd.int(usize);
|
||||
|
||||
const old_a = a_ptr.*;
|
||||
a_ptr.* = new_val;
|
||||
if (rnd.boolean()) try std.Thread.yield();
|
||||
const old_b = b_ptr.*;
|
||||
b_ptr.* = new_val;
|
||||
try testing.expect(old_a == old_b);
|
||||
|
||||
run.writes += 1;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
var run: Runner = .{
|
||||
.io = io,
|
||||
.rl = .init,
|
||||
.writes = 0,
|
||||
.reads = .init(0),
|
||||
.val_a = 0,
|
||||
.val_b = 0,
|
||||
};
|
||||
var write_threads: [num_writers]std.Thread = undefined;
|
||||
var read_threads: [num_readers]std.Thread = undefined;
|
||||
|
||||
for (&write_threads, 0..) |*t, i| t.* = try .spawn(.{}, Runner.writer, .{ &run, i });
|
||||
for (&read_threads, num_writers..) |*t, i| t.* = try .spawn(.{}, Runner.reader, .{ &run, i });
|
||||
|
||||
for (write_threads) |t| t.join();
|
||||
for (read_threads) |t| t.join();
|
||||
|
||||
try testing.expect(run.writes == num_writes);
|
||||
try testing.expect(run.reads.raw >= num_reads);
|
||||
}
|
||||
65
lib/std/Io/Semaphore.zig
Normal file
65
lib/std/Io/Semaphore.zig
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
//! An unsigned integer that blocks the kernel thread if the number would
|
||||
//! become negative.
|
||||
//!
|
||||
//! This API supports static initialization and does not require deinitialization.
|
||||
const Semaphore = @This();
|
||||
|
||||
const builtin = @import("builtin");
|
||||
|
||||
const std = @import("../std.zig");
|
||||
const Io = std.Io;
|
||||
const testing = std.testing;
|
||||
|
||||
mutex: Io.Mutex = .init,
|
||||
cond: Io.Condition = .init,
|
||||
/// It is OK to initialize this field to any value.
|
||||
permits: usize = 0,
|
||||
|
||||
pub fn wait(s: *Semaphore, io: Io) Io.Cancelable!void {
|
||||
try s.mutex.lock(io);
|
||||
defer s.mutex.unlock(io);
|
||||
while (s.permits == 0) try s.cond.wait(io, &s.mutex);
|
||||
s.permits -= 1;
|
||||
if (s.permits > 0) s.cond.signal(io);
|
||||
}
|
||||
|
||||
pub fn waitUncancelable(s: *Semaphore, io: Io) void {
|
||||
s.mutex.lockUncancelable(io);
|
||||
defer s.mutex.unlock(io);
|
||||
while (s.permits == 0) s.cond.waitUncancelable(io, &s.mutex);
|
||||
s.permits -= 1;
|
||||
if (s.permits > 0) s.cond.signal(io);
|
||||
}
|
||||
|
||||
pub fn post(s: *Semaphore, io: Io) void {
|
||||
s.mutex.lockUncancelable(io);
|
||||
defer s.mutex.unlock(io);
|
||||
|
||||
s.permits += 1;
|
||||
s.cond.signal(io);
|
||||
}
|
||||
|
||||
test Semaphore {
|
||||
if (builtin.single_threaded) return error.SkipZigTest;
|
||||
const io = testing.io;
|
||||
|
||||
const TestContext = struct {
|
||||
sem: *Semaphore,
|
||||
n: *i32,
|
||||
fn worker(ctx: *@This()) !void {
|
||||
try ctx.sem.wait(io);
|
||||
ctx.n.* += 1;
|
||||
ctx.sem.post(io);
|
||||
}
|
||||
};
|
||||
const num_threads = 3;
|
||||
var sem: Semaphore = .{ .permits = 1 };
|
||||
var threads: [num_threads]std.Thread = undefined;
|
||||
var n: i32 = 0;
|
||||
var ctx = TestContext{ .sem = &sem, .n = &n };
|
||||
|
||||
for (&threads) |*t| t.* = try std.Thread.spawn(.{}, TestContext.worker, .{&ctx});
|
||||
for (threads) |t| t.join();
|
||||
try sem.wait(io);
|
||||
try testing.expect(n == num_threads);
|
||||
}
|
||||
|
|
@ -2655,7 +2655,7 @@ fn batchAwaitAsync(userdata: ?*anyopaque, b: *Io.Batch) Io.Cancelable!void {
|
|||
fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout) Io.Batch.AwaitConcurrentError!void {
|
||||
const t: *Threaded = @ptrCast(@alignCast(userdata));
|
||||
if (is_windows) {
|
||||
const deadline: ?Io.Clock.Timestamp = timeout.toDeadline(ioBasic(t)) catch |err| switch (err) {
|
||||
const deadline: ?Io.Clock.Timestamp = timeout.toTimestamp(ioBasic(t)) catch |err| switch (err) {
|
||||
error.Unexpected => deadline: {
|
||||
recoverableOsBugDetected();
|
||||
break :deadline .{ .raw = .{ .nanoseconds = 0 }, .clock = .awake };
|
||||
|
|
@ -2754,7 +2754,7 @@ fn batchAwaitConcurrent(userdata: ?*anyopaque, b: *Io.Batch, timeout: Io.Timeout
|
|||
else => {},
|
||||
}
|
||||
const t_io = ioBasic(t);
|
||||
const deadline = timeout.toDeadline(t_io) catch return error.UnsupportedClock;
|
||||
const deadline = timeout.toTimestamp(t_io) catch return error.UnsupportedClock;
|
||||
while (true) {
|
||||
const timeout_ms: i32 = t: {
|
||||
if (b.completions.head != .none) {
|
||||
|
|
@ -10918,7 +10918,7 @@ fn nowWasi(clock: Io.Clock) Io.Clock.Error!Io.Timestamp {
|
|||
fn sleep(userdata: ?*anyopaque, timeout: Io.Timeout) Io.SleepError!void {
|
||||
const t: *Threaded = @ptrCast(@alignCast(userdata));
|
||||
if (timeout == .none) return;
|
||||
if (use_parking_sleep) return parking_sleep.sleep(try timeout.toDeadline(ioBasic(t)));
|
||||
if (use_parking_sleep) return parking_sleep.sleep(try timeout.toTimestamp(ioBasic(t)));
|
||||
if (native_os == .wasi) return sleepWasi(t, timeout);
|
||||
if (@TypeOf(posix.system.clock_nanosleep) != void) return sleepPosix(timeout);
|
||||
return sleepNanosleep(t, timeout);
|
||||
|
|
@ -12630,7 +12630,7 @@ fn netReceivePosix(
|
|||
var message_i: usize = 0;
|
||||
var data_i: usize = 0;
|
||||
|
||||
const deadline = timeout.toDeadline(t_io) catch |err| return .{ err, message_i };
|
||||
const deadline = timeout.toTimestamp(t_io) catch |err| return .{ err, message_i };
|
||||
|
||||
recv: while (true) {
|
||||
if (message_buffer.len - message_i == 0) return .{ null, message_i };
|
||||
|
|
|
|||
|
|
@ -1,4 +1,4 @@
|
|||
mutex: Io.Mutex,
|
||||
rwlock: Io.RwLock,
|
||||
|
||||
modules: std.ArrayList(Module),
|
||||
ranges: std.ArrayList(Module.Range),
|
||||
|
|
@ -6,7 +6,7 @@ ranges: std.ArrayList(Module.Range),
|
|||
unwind_cache: if (can_unwind) ?[]Dwarf.SelfUnwinder.CacheEntry else ?noreturn,
|
||||
|
||||
pub const init: SelfInfo = .{
|
||||
.mutex = .init,
|
||||
.rwlock = .init,
|
||||
.modules = .empty,
|
||||
.ranges = .empty,
|
||||
.unwind_cache = null,
|
||||
|
|
@ -30,7 +30,7 @@ pub fn deinit(si: *SelfInfo, gpa: Allocator) void {
|
|||
|
||||
pub fn getSymbol(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!std.debug.Symbol {
|
||||
const module = try si.findModule(gpa, io, address, .exclusive);
|
||||
defer si.mutex.unlock(io);
|
||||
defer si.rwlock.unlock(io);
|
||||
|
||||
const vaddr = address - module.load_offset;
|
||||
|
||||
|
|
@ -75,13 +75,13 @@ pub fn getSymbol(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!st
|
|||
}
|
||||
pub fn getModuleName(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error![]const u8 {
|
||||
const module = try si.findModule(gpa, io, address, .shared);
|
||||
defer si.mutex.unlock(io);
|
||||
defer si.rwlock.unlockShared(io);
|
||||
if (module.name.len == 0) return error.MissingDebugInfo;
|
||||
return module.name;
|
||||
}
|
||||
pub fn getModuleSlide(si: *SelfInfo, gpa: Allocator, io: Io, address: usize) Error!usize {
|
||||
const module = try si.findModule(gpa, io, address, .shared);
|
||||
defer si.mutex.unlock(io);
|
||||
defer si.rwlock.unlockShared(io);
|
||||
return module.load_offset;
|
||||
}
|
||||
|
||||
|
|
@ -183,8 +183,8 @@ pub fn unwindFrame(si: *SelfInfo, gpa: Allocator, io: Io, context: *UnwindContex
|
|||
comptime assert(can_unwind);
|
||||
|
||||
{
|
||||
try si.mutex.lock(io);
|
||||
defer si.mutex.unlock(io);
|
||||
si.rwlock.lockSharedUncancelable(io);
|
||||
defer si.rwlock.unlockShared(io);
|
||||
if (si.unwind_cache) |cache| {
|
||||
if (Dwarf.SelfUnwinder.CacheEntry.find(cache, context.pc)) |entry| {
|
||||
return context.next(gpa, entry);
|
||||
|
|
@ -193,7 +193,7 @@ pub fn unwindFrame(si: *SelfInfo, gpa: Allocator, io: Io, context: *UnwindContex
|
|||
}
|
||||
|
||||
const module = try si.findModule(gpa, io, context.pc, .exclusive);
|
||||
defer si.mutex.unlock(io);
|
||||
defer si.rwlock.unlock(io);
|
||||
|
||||
if (si.unwind_cache == null) {
|
||||
si.unwind_cache = try gpa.alloc(Dwarf.SelfUnwinder.CacheEntry, 2048);
|
||||
|
|
@ -378,8 +378,8 @@ const Module = struct {
|
|||
fn findModule(si: *SelfInfo, gpa: Allocator, io: Io, address: usize, lock: enum { shared, exclusive }) Error!*Module {
|
||||
// With the requested lock, scan the module ranges looking for `address`.
|
||||
switch (lock) {
|
||||
.shared => try si.mutex.lock(io),
|
||||
.exclusive => try si.mutex.lock(io),
|
||||
.shared => si.rwlock.lockSharedUncancelable(io),
|
||||
.exclusive => si.rwlock.lockUncancelable(io),
|
||||
}
|
||||
for (si.ranges.items) |*range| {
|
||||
if (address >= range.start and address < range.start + range.len) {
|
||||
|
|
@ -389,12 +389,15 @@ fn findModule(si: *SelfInfo, gpa: Allocator, io: Io, address: usize, lock: enum
|
|||
// The address wasn't in a known range. We will rebuild the module/range lists, since it's possible
|
||||
// a new module was loaded. Upgrade to an exclusive lock if necessary.
|
||||
switch (lock) {
|
||||
.shared => {},
|
||||
.shared => {
|
||||
si.rwlock.unlockShared(io);
|
||||
si.rwlock.lockUncancelable(io);
|
||||
},
|
||||
.exclusive => {},
|
||||
}
|
||||
// Rebuild module list with the exclusive lock.
|
||||
{
|
||||
errdefer si.mutex.unlock(io);
|
||||
errdefer si.rwlock.unlock(io);
|
||||
for (si.modules.items) |*mod| {
|
||||
unwind: {
|
||||
const u = &(mod.unwind orelse break :unwind catch break :unwind);
|
||||
|
|
@ -412,7 +415,10 @@ fn findModule(si: *SelfInfo, gpa: Allocator, io: Io, address: usize, lock: enum
|
|||
}
|
||||
// Downgrade the lock back to shared if necessary.
|
||||
switch (lock) {
|
||||
.shared => {},
|
||||
.shared => {
|
||||
si.rwlock.unlock(io);
|
||||
si.rwlock.lockSharedUncancelable(io);
|
||||
},
|
||||
.exclusive => {},
|
||||
}
|
||||
// Scan the newly rebuilt module ranges.
|
||||
|
|
@ -423,8 +429,8 @@ fn findModule(si: *SelfInfo, gpa: Allocator, io: Io, address: usize, lock: enum
|
|||
}
|
||||
// Still nothing; unlock and error.
|
||||
switch (lock) {
|
||||
.shared => si.mutex.unlock(io),
|
||||
.exclusive => si.mutex.unlock(io),
|
||||
.shared => si.rwlock.unlockShared(io),
|
||||
.exclusive => si.rwlock.unlock(io),
|
||||
}
|
||||
return error.MissingDebugInfo;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue