Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,7 @@ set(ZIG_STAGE2_SOURCES
src/Type.zig
src/Value.zig
src/Zcu.zig
src/Zcu/PerThread.zig
src/arch/aarch64/CodeGen.zig
src/arch/aarch64/Emit.zig
src/arch/aarch64/Mir.zig
Expand Down
2 changes: 1 addition & 1 deletion lib/std/Progress.zig
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ pub const Node = struct {
}

fn init(free_index: Index, parent: Parent, name: []const u8, estimated_total_items: usize) Node {
assert(parent != .unused);
assert(parent == .none or @intFromEnum(parent) < node_storage_buffer_len);

const storage = storageByIndex(free_index);
storage.* = .{
Expand Down
7 changes: 6 additions & 1 deletion lib/std/Thread.zig
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,13 @@ pub fn getCurrentId() Id {
/// Errors that querying the logical CPU count can produce.
pub const CpuCountError = error{
    PermissionDenied,
    SystemResources,
    /// The target platform (e.g. WASI) has no way to query the CPU count.
    Unsupported,
    Unexpected,
};

/// Returns the platform's view on the number of logical CPU cores available.
pub fn getCpuCount() CpuCountError!usize {
    // `try` unwraps and re-wraps the result so the platform
    // implementation's (possibly smaller) error set and payload coerce
    // into `CpuCountError!usize`.
    return try Impl.getCpuCount();
}

/// Configuration options for hints on how to spawn threads.
Expand Down Expand Up @@ -782,6 +783,10 @@ const WasiThreadImpl = struct {
return tls_thread_id;
}

/// WASI threads cannot query a CPU count; this always fails.
/// The `noreturn` payload documents that no success value is ever produced.
fn getCpuCount() error{Unsupported}!noreturn {
    return error.Unsupported;
}

/// Returns the thread handle: the `tid` stored in the thread state,
/// loaded with sequentially-consistent ordering.
fn getHandle(self: Impl) ThreadHandle {
    return self.thread.tid.load(.seq_cst);
}
Expand Down
111 changes: 98 additions & 13 deletions lib/std/Thread/Pool.zig
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,45 @@ cond: std.Thread.Condition = .{},
run_queue: RunQueue = .{},
is_running: bool = true,
allocator: std.mem.Allocator,
threads: []std.Thread,
threads: if (builtin.single_threaded) [0]std.Thread else []std.Thread,
ids: if (builtin.single_threaded) struct {
inline fn deinit(_: @This(), _: std.mem.Allocator) void {}
fn getIndex(_: @This(), _: std.Thread.Id) usize {
return 0;
}
} else std.AutoArrayHashMapUnmanaged(std.Thread.Id, void),

// Intrusive singly-linked list of pending tasks; each node is embedded in a
// heap-allocated closure created by the spawn functions.
const RunQueue = std.SinglyLinkedList(Runnable);
const Runnable = struct {
    runFn: RunProto,
};

// Task entry point. `id` is the dense index of the executing thread when the
// pool tracks thread ids, otherwise null.
const RunProto = *const fn (*Runnable, id: ?usize) void;

/// Configuration for `Pool.init`.
pub const Options = struct {
    allocator: std.mem.Allocator,
    /// Number of worker threads to spawn; when null, the detected CPU count
    /// is used (minimum 1).
    n_jobs: ?usize = null,
    /// When true, the pool assigns each thread a dense `usize` id so that
    /// `spawnWgId` can pass it to spawned functions.
    track_ids: bool = false,
};

pub fn init(pool: *Pool, options: Options) !void {
const allocator = options.allocator;

pool.* = .{
.allocator = allocator,
.threads = &[_]std.Thread{},
.threads = if (builtin.single_threaded) .{} else &.{},
.ids = .{},
};

if (builtin.single_threaded) {
return;
}

const thread_count = options.n_jobs orelse @max(1, std.Thread.getCpuCount() catch 1);
if (options.track_ids) {
try pool.ids.ensureTotalCapacity(allocator, 1 + thread_count);
pool.ids.putAssumeCapacityNoClobber(std.Thread.getCurrentId(), {});
}

// kill and join any threads we spawned and free memory on error.
pool.threads = try allocator.alloc(std.Thread, thread_count);
Expand All @@ -49,6 +61,7 @@ pub fn init(pool: *Pool, options: Options) !void {

/// Shuts the pool down: joins every worker thread, frees the thread-id map,
/// and poisons the pool memory. The pool must not be used afterwards.
pub fn deinit(pool: *Pool) void {
    pool.join(pool.threads.len); // kill and join all threads.
    pool.ids.deinit(pool.allocator);
    // Aid detection of use-after-deinit in safe builds.
    pool.* = undefined;
}

Expand Down Expand Up @@ -96,7 +109,7 @@ pub fn spawnWg(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args
run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } },
wait_group: *WaitGroup,

fn runFn(runnable: *Runnable) void {
fn runFn(runnable: *Runnable, _: ?usize) void {
const run_node: *RunQueue.Node = @fieldParentPtr("data", runnable);
const closure: *@This() = @alignCast(@fieldParentPtr("run_node", run_node));
@call(.auto, func, closure.arguments);
Expand Down Expand Up @@ -134,6 +147,70 @@ pub fn spawnWg(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args
pool.cond.signal();
}

/// Runs `func` in the thread pool, calling `WaitGroup.start` beforehand, and
/// `WaitGroup.finish` after it returns.
///
/// The first argument passed to `func` is a dense `usize` thread id, the rest
/// of the arguments are passed from `args`. Requires the pool to have been
/// initialized with `.track_ids = true`.
///
/// In the case that queuing the function call fails to allocate memory, or the
/// target is single-threaded, the function is called directly.
/// Runs `func` in the thread pool, calling `WaitGroup.start` beforehand, and
/// `WaitGroup.finish` after it returns.
///
/// The first argument passed to `func` is a dense `usize` thread id, the rest
/// of the arguments are passed from `args`. Requires the pool to have been
/// initialized with `.track_ids = true`.
///
/// In the case that queuing the function call fails to allocate memory, or the
/// target is single-threaded, the function is called directly.
pub fn spawnWgId(pool: *Pool, wait_group: *WaitGroup, comptime func: anytype, args: anytype) void {
    wait_group.start();

    if (builtin.single_threaded) {
        // No worker threads exist; run inline with the only valid id, 0.
        @call(.auto, func, .{0} ++ args);
        wait_group.finish();
        return;
    }

    const Args = @TypeOf(args);
    const Closure = struct {
        arguments: Args,
        pool: *Pool,
        run_node: RunQueue.Node = .{ .data = .{ .runFn = runFn } },
        wait_group: *WaitGroup,

        fn runFn(runnable: *Runnable, id: ?usize) void {
            // Recover the closure from the intrusive queue node.
            const run_node: *RunQueue.Node = @fieldParentPtr("data", runnable);
            const closure: *@This() = @alignCast(@fieldParentPtr("run_node", run_node));
            // `id.?` trips a safety check if the pool was not initialized
            // with `.track_ids = true` (see the doc comment above).
            @call(.auto, func, .{id.?} ++ closure.arguments);
            closure.wait_group.finish();

            // The thread pool's allocator is protected by the mutex.
            const mutex = &closure.pool.mutex;
            mutex.lock();
            defer mutex.unlock();

            closure.pool.allocator.destroy(closure);
        }
    };

    {
        pool.mutex.lock();

        const closure = pool.allocator.create(Closure) catch {
            // Allocation failed: fall back to running the function directly
            // on the calling thread, using the caller's tracked id.
            // The mutex must be released before the (arbitrarily long) call.
            const id: ?usize = pool.ids.getIndex(std.Thread.getCurrentId());
            pool.mutex.unlock();
            @call(.auto, func, .{id.?} ++ args);
            wait_group.finish();
            return;
        };
        closure.* = .{
            .arguments = args,
            .pool = pool,
            .wait_group = wait_group,
        };

        pool.run_queue.prepend(&closure.run_node);
        pool.mutex.unlock();
    }

    // Notify waiting threads outside the lock to try and keep the critical section small.
    pool.cond.signal();
}

pub fn spawn(pool: *Pool, comptime func: anytype, args: anytype) !void {
if (builtin.single_threaded) {
@call(.auto, func, args);
Expand Down Expand Up @@ -181,14 +258,16 @@ fn worker(pool: *Pool) void {
pool.mutex.lock();
defer pool.mutex.unlock();

const id: ?usize = if (pool.ids.count() > 0) @intCast(pool.ids.count()) else null;
if (id) |_| pool.ids.putAssumeCapacityNoClobber(std.Thread.getCurrentId(), {});

while (true) {
while (pool.run_queue.popFirst()) |run_node| {
// Temporarily unlock the mutex in order to execute the run_node
pool.mutex.unlock();
defer pool.mutex.lock();

const runFn = run_node.data.runFn;
runFn(&run_node.data);
run_node.data.runFn(&run_node.data, id);
}

// Stop executing instead of waiting if the thread pool is no longer running.
Expand All @@ -201,17 +280,23 @@ fn worker(pool: *Pool) void {
}

/// Blocks until `wait_group` completes, executing queued tasks on the calling
/// thread while waiting instead of idling.
pub fn waitAndWork(pool: *Pool, wait_group: *WaitGroup) void {
    // Dense id of the calling thread, resolved lazily on the first task run.
    var id: ?usize = null;

    while (!wait_group.isDone()) {
        pool.mutex.lock();
        if (pool.run_queue.popFirst()) |run_node| {
            id = id orelse pool.ids.getIndex(std.Thread.getCurrentId());
            // Release the lock before executing the task.
            pool.mutex.unlock();
            run_node.data.runFn(&run_node.data, id);
            continue;
        }

        // Queue is empty: nothing left to steal, so block until done.
        pool.mutex.unlock();
        wait_group.wait();
        return;
    }
}

/// Number of distinct thread ids the pool can hand out: the spawning thread
/// plus one per worker thread.
pub fn getIdCount(pool: *Pool) usize {
    return @intCast(1 + pool.threads.len);
}
2 changes: 1 addition & 1 deletion lib/std/multi_array_list.zig
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ pub fn MultiArrayList(comptime T: type) type {
self.sortInternal(a, b, ctx, .unstable);
}

fn capacityInBytes(capacity: usize) usize {
pub fn capacityInBytes(capacity: usize) usize {
comptime var elem_bytes: usize = 0;
inline for (sizes.bytes) |size| elem_bytes += size;
return elem_bytes * capacity;
Expand Down
4 changes: 2 additions & 2 deletions src/Air.zig
Original file line number Diff line number Diff line change
Expand Up @@ -1563,12 +1563,12 @@ pub fn internedToRef(ip_index: InternPool.Index) Inst.Ref {
}

/// Returns `null` if runtime-known.
pub fn value(air: Air, inst: Inst.Ref, pt: Zcu.PerThread) !?Value {
    // Interned refs are already comptime-known values.
    if (inst.toInterned()) |ip_index| {
        return Value.fromInterned(ip_index);
    }
    const index = inst.toIndex().?;
    // A runtime instruction still has a value when its type has exactly one
    // possible value.
    return air.typeOfIndex(index, &pt.zcu.intern_pool).onePossibleValue(pt);
}

pub fn nullTerminatedString(air: Air, index: usize) [:0]const u8 {
Expand Down
Loading