23 changes: 22 additions & 1 deletion gc/mmtk/mmtk.c
@@ -696,6 +696,27 @@ rb_mmtk_alloc_fast_path(struct objspace *objspace, struct MMTk_ractor_cache *rac
}
}

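/* Types whose dying instances may be freed by parallel GC workers;
 * objects of any other type are routed to the single non-parallel
 * obj_free queue. */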
static bool
obj_can_parallel_free_p(VALUE obj)
{
switch (RB_BUILTIN_TYPE(obj)) {
case T_ARRAY:
case T_BIGNUM:
case T_COMPLEX:
case T_FLOAT:
case T_HASH:
case T_OBJECT:
case T_RATIONAL:
case T_REGEXP:
case T_STRING:
case T_STRUCT:
case T_SYMBOL:
return true;
default:
return false;
}
}

VALUE
rb_gc_impl_new_obj(void *objspace_ptr, void *cache_ptr, VALUE klass, VALUE flags, bool wb_protected, size_t alloc_size)
{
@@ -732,7 +753,7 @@ rb_gc_impl_new_obj(void *objspace_ptr, void *cache_ptr, VALUE klass, VALUE flags
mmtk_post_alloc(ractor_cache->mutator, (void*)alloc_obj, alloc_size, MMTK_ALLOCATION_SEMANTICS_DEFAULT);

// TODO: only add when object needs obj_free to be called
mmtk_add_obj_free_candidate(alloc_obj);
mmtk_add_obj_free_candidate(alloc_obj, obj_can_parallel_free_p((VALUE)alloc_obj));

objspace->total_allocated_objects++;

2 changes: 1 addition & 1 deletion gc/mmtk/mmtk.h
@@ -122,7 +122,7 @@ void mmtk_post_alloc(MMTk_Mutator *mutator,
size_t bytes,
MMTk_AllocationSemantics semantics);

void mmtk_add_obj_free_candidate(MMTk_ObjectReference object);
void mmtk_add_obj_free_candidate(MMTk_ObjectReference object, bool can_parallel_free);

void mmtk_declare_weak_references(MMTk_ObjectReference object);

11 changes: 8 additions & 3 deletions gc/mmtk/src/api.rs
@@ -198,7 +198,10 @@ pub unsafe extern "C" fn mmtk_init_binding(
let mmtk_boxed = mmtk_init(&builder);
let mmtk_static = Box::leak(Box::new(mmtk_boxed));

let binding = RubyBinding::new(mmtk_static, &binding_options, upcalls);
let mut binding = RubyBinding::new(mmtk_static, &binding_options, upcalls);
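// Size the parallel obj_free candidate lists to match the number of GC workers.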
binding
.weak_proc
.init_parallel_obj_free_candidates(memory_manager::num_of_workers(binding.mmtk));

crate::BINDING
.set(binding)
@@ -296,8 +299,10 @@ pub unsafe extern "C" fn mmtk_post_alloc(

// TODO: Replace with buffered mmtk_add_obj_free_candidates
#[no_mangle]
pub extern "C" fn mmtk_add_obj_free_candidate(object: ObjectReference) {
binding().weak_proc.add_obj_free_candidate(object)
pub extern "C" fn mmtk_add_obj_free_candidate(object: ObjectReference, can_parallel_free: bool) {
binding()
.weak_proc
.add_obj_free_candidate(object, can_parallel_free)
}

// =============== Weak references ===============
135 changes: 96 additions & 39 deletions gc/mmtk/src/weak_proc.rs
@@ -1,3 +1,5 @@
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering;
use std::sync::Mutex;

use mmtk::{
@@ -9,10 +11,13 @@ use mmtk::{
use crate::{abi::GCThreadTLS, upcalls, Ruby};

pub struct WeakProcessor {
    /// Candidates whose `obj_free` must run on a single GC thread.
    non_parallel_obj_free_candidates: Mutex<Vec<ObjectReference>>,
    /// One candidate list per GC worker, each processed by its own work packet.
    parallel_obj_free_candidates: Vec<Mutex<Vec<ObjectReference>>>,
    /// Round-robin counter for spreading candidates across the parallel lists.
    parallel_obj_free_candidates_counter: AtomicUsize,

    /// Objects that need `obj_free` called when dying.
/// If it is a bottleneck, replace it with a lock-free data structure,
/// or add candidates in batch.
obj_free_candidates: Mutex<Vec<ObjectReference>>,
weak_references: Mutex<Vec<ObjectReference>>,
}

@@ -25,32 +30,59 @@ impl Default for WeakProcessor {
impl WeakProcessor {
pub fn new() -> Self {
Self {
obj_free_candidates: Mutex::new(Vec::new()),
non_parallel_obj_free_candidates: Mutex::new(Vec::new()),
parallel_obj_free_candidates: vec![Mutex::new(Vec::new())],
parallel_obj_free_candidates_counter: AtomicUsize::new(0),
weak_references: Mutex::new(Vec::new()),
}
}

/// Add an object as a candidate for `obj_free`.
///
/// Multiple mutators can call it concurrently, so it has `&self`.
pub fn add_obj_free_candidate(&self, object: ObjectReference) {
let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap();
obj_free_candidates.push(object);
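    /// Grow `parallel_obj_free_candidates` to one list per GC worker.
    /// Called once from `mmtk_init_binding`, hence `&mut self`.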
pub fn init_parallel_obj_free_candidates(&mut self, num_workers: usize) {
debug_assert_eq!(self.parallel_obj_free_candidates.len(), 1);

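        // `new()` starts with a single list, so add the remaining num_workers - 1 here.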
for _ in 1..num_workers {
self.parallel_obj_free_candidates
.push(Mutex::new(Vec::new()));
}
}

/// Add many objects as candidates for `obj_free`.
/// Add an object as a candidate for `obj_free`.
///
/// Multiple mutators can call it concurrently, so it has `&self`.
pub fn add_obj_free_candidates(&self, objects: &[ObjectReference]) {
let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap();
for object in objects.iter().copied() {
obj_free_candidates.push(object);
pub fn add_obj_free_candidate(&self, object: ObjectReference, can_parallel_free: bool) {
if can_parallel_free {
// Newly allocated objects are placed in parallel_obj_free_candidates using
// round-robin. This may not be ideal for load balancing.
let idx = self
.parallel_obj_free_candidates_counter
.fetch_add(1, Ordering::Relaxed)
% self.parallel_obj_free_candidates.len();

self.parallel_obj_free_candidates[idx]
.lock()
.unwrap()
.push(object);
} else {
self.non_parallel_obj_free_candidates
.lock()
.unwrap()
.push(object);
}
}

pub fn get_all_obj_free_candidates(&self) -> Vec<ObjectReference> {
let mut obj_free_candidates = self.obj_free_candidates.lock().unwrap();
std::mem::take(obj_free_candidates.as_mut())
        // Drain the non-parallel list, then every per-worker list.
        let mut all_obj_free_candidates =
            std::mem::take(&mut *self.non_parallel_obj_free_candidates.lock().unwrap());

        for candidates_mutex in &self.parallel_obj_free_candidates {
            all_obj_free_candidates.extend(std::mem::take(&mut *candidates_mutex.lock().unwrap()));
        }

        all_obj_free_candidates
}

pub fn add_weak_reference(&self, object: ObjectReference) {
@@ -63,7 +95,18 @@ impl WeakProcessor {
worker: &mut GCWorker<Ruby>,
_tracer_context: impl ObjectTracerContext<Ruby>,
) {
worker.add_work(WorkBucketStage::VMRefClosure, ProcessObjFreeCandidates);
worker.add_work(
WorkBucketStage::VMRefClosure,
            ProcessNonParallelObjFreeCandidates {},
);

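        // Enqueue one work packet per parallel candidate list so GC workers
        // can process them concurrently.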
for index in 0..self.parallel_obj_free_candidates.len() {
worker.add_work(
WorkBucketStage::VMRefClosure,
ProcessParallelObjFreeCandidates { index },
);
}

worker.add_work(WorkBucketStage::VMRefClosure, ProcessWeakReferences);

worker.add_work(WorkBucketStage::Prepare, UpdateFinalizerObjIdTables);
@@ -80,36 +123,50 @@
}
}

struct ProcessObjFreeCandidates;
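/// Common processing shared by the parallel and non-parallel work packets
/// below: call `obj_free` on unreachable candidates, and keep forwarded
/// reachable ones as candidates for the next GC.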
fn process_obj_free_candidates(obj_free_candidates: &mut Vec<ObjectReference>) {
// Process obj_free
let mut new_candidates = Vec::new();

for object in obj_free_candidates.iter().copied() {
if object.is_reachable() {
// Forward and add back to the candidate list.
let new_object = object.forward();
trace!("Forwarding obj_free candidate: {object} -> {new_object}");
new_candidates.push(new_object);
} else {
(upcalls().call_obj_free)(object);
}
}

*obj_free_candidates = new_candidates;
}

impl GCWork<Ruby> for ProcessObjFreeCandidates {
    fn do_work(&mut self, _worker: &mut GCWorker<Ruby>, _mmtk: &'static mmtk::MMTK<Ruby>) {
        // If it blocks, it is a bug.
        let mut obj_free_candidates = crate::binding()
            .weak_proc
            .obj_free_candidates
            .try_lock()
            .expect("It's GC time. No mutators should hold this lock at this time.");

        let n_cands = obj_free_candidates.len();

        debug!("Total: {n_cands} candidates");

        // Process obj_free
        let mut new_candidates = Vec::new();

        for object in obj_free_candidates.iter().copied() {
            if object.is_reachable() {
                // Forward and add back to the candidate list.
                let new_object = object.forward();
                trace!("Forwarding obj_free candidate: {object} -> {new_object}");
                new_candidates.push(new_object);
            } else {
                (upcalls().call_obj_free)(object);
            }
        }

        *obj_free_candidates = new_candidates;
    }
}

struct ProcessParallelObjFreeCandidates {
    index: usize,
}

impl GCWork<Ruby> for ProcessParallelObjFreeCandidates {
    fn do_work(&mut self, _worker: &mut GCWorker<Ruby>, _mmtk: &'static mmtk::MMTK<Ruby>) {
        // Mutators are stopped and each packet has a distinct index,
        // so the try_lock must succeed.
        let mut obj_free_candidates = crate::binding().weak_proc.parallel_obj_free_candidates
            [self.index]
            .try_lock()
            .expect("Lock for parallel_obj_free_candidates should not be held");

        process_obj_free_candidates(&mut obj_free_candidates);
    }
}

struct ProcessNonParallelObjFreeCandidates;

impl GCWork<Ruby> for ProcessNonParallelObjFreeCandidates {
    fn do_work(&mut self, _worker: &mut GCWorker<Ruby>, _mmtk: &'static mmtk::MMTK<Ruby>) {
        let mut obj_free_candidates = crate::binding()
            .weak_proc
            .non_parallel_obj_free_candidates
            .try_lock()
            .expect("Lock for non_parallel_obj_free_candidates should not be held");

        process_obj_free_candidates(&mut obj_free_candidates);
    }
}
