Scheme to the Spec Part I: Concurrent Cycle Collection
by Matthew Plant
Scheme to the Spec is a series on the more complex, often overlooked aspects of programming language
implementation.
In this series we will dive deep into my work-in-progress implementation of R6RS scheme, scheme-rs,
an implementation designed to integrate seamlessly with the async-rust ecosystem.
Our first article discusses how to implement Garbage-Collected smart pointers that we can use both within the interpreter
and the interfacing Rust code. In later articles we will discuss topics such as tail call optimizations, implementing continuations,
and syntax transformers. Our final article will be implementing on-stack replacement with LLVM.
Part I: Garbage Collected Smart Pointers via Concurrent Cycle Collection
Ten years ago I wrote an article on how to implement a conservative garbage collector for C, and I got a lot of constructive
criticism regarding the actual usefulness of the code presented. It only makes sense that on the ten-year anniversary of
publishing I should try to fix my error by writing a garbage collector that is precise, does not make assumptions, and (hopefully)
actually works.
1. The Gc
smart pointer
Before we can start writing code, we have to figure out what goals we want to accomplish. More specifically, what do we want our
Gc
type to do? How does it behave? What kind of data can it contain?
I want the Gc
type to work as follows:
- The API for the
Gc
should behave similarly to aArc<:sync::rwlock>>
; that is, it should support interior mutability
through a read/write lock and it should be clonable and sendable across threads. T
is allowed to be any data type that satisfies'static + Send + Sync
. This includesArc
.- When
Gc
no longer has any references to it reachable from the stack, then we should clean it up properly, including any cycles.
Why a RwLock
and not a Mutex
? RwLocks
are not always ideal as a lock, so we need a particularly good justification for
choosing them. Since read and write locks are given access in a FIFO queue and neither has a higher priority than the other, acquiring
a write lock can take a disproportionate amount of time. But, let’s consider the language we are implementing: a functional language
with relatively few concurrent writes.
In Scheme, writes occur most often when adding a variable to a lexical environment, and by and large that happens synchronously. For
example, in the following snippet
(define pi 3.1415) (define e 2.7182)
write locks are acquired on the environment sequentially, and thus the process does not need to wait.
RwLocks
would be a poor choice for Scheme code with multiple threads all reading and writing to a global variable. This pattern is
just not very common in functional programming where message passing is vastly preferred.
Now that we have an idea as to how our smart pointer should behave, we can begin our implementation.
1.1. Scaffolding and allocation
Our Gc type will be composed of three separate types:
Gc
: User-facing type, contains a pointer to our heap allocated memory.GcInner
: The inner data type, which containsT
and any other information we may need to keep track of our data.GcHeader
: Internal book-keeping information.
To begin, we have no extra information we need to store in GcHeader
, so we can knock it out quickly:
struct GcHeader;
GcInner
is just the header and the data, so it is equally simple:
struct GcInner<T> { header: GcHeader, data: T, }
We are ready to put together our Gc
type. One might define it rather simply as the following:
struct Gc<T> { ptr: *mut T, }
Unfortunately, this definition has two major problems:
1.1.1. Variance
This struct does not pass through subtyping relationships on T; i.e., this structure is invariant over T.
Ideally, for every two type parameters A
and B
, if A
is a subtype of B
we would also like Gc
to
be a subtype of Gc
because we want a Gc
to behave as close to a T
as possible. The reason this
occurs is because pointer types in Rust are invariant, and thus our wrapper type will be as well.
We can fix this by using a NonNull
pointer type, which is covariant over T
. Additionally, this
ensures that any Gc we successfully create will never contain a null pointer, which is a plus.
In reality, variance is not particularly useful for our struct since we only support types that are 'static
,
and most subtyping relationships in Rust regard references. However, there are a few times when it may come up ,
so there is no reason to not support it.
1.1.2. Drop checking
As it’s specified now, the Rust compiler is forced to assume that any Gc
will be strictly out-lived by the
underlying data. For our data type this not the case. Although a lot of Gc
s represent references to data,
some Gc
s represent the entire lifetime – they represent the data itself. Therefore, dropping a Gc
can potentially
drop the underlying T
value. The Rust compiler needs to know about this in order to perform its drop check
analysis, or else potentially unsound code can be constructed .
In order to indicate this information, we can use a PhantomData
, a neat wrapper type that indicates to the compiler that our data type should behave as if it has ownership over
a type T
.
With these two data types we can put together our Gc
pub struct Gc<T> { ptr: NonNull<GcInner<T>>, marker: PhantomData<GcInner<T>>, } impl<T> Gc<T> { pub fn new(data: T) -> Gc<T> { Self { ptr: NonNull::from(Box::leak(Box::new(GcInner { header: GcHeader::default(), data, }))), marker: PhantomData, } } }
The new
function is rather straightforward but worth commenting on. The easiest way to allocate a pointer in
Rust is to use the Box::new
function to allocate space and copy data onto the heap, and then use the Box::leak
function to consume the box without running its destructor and returning us a dangling pointer.
2. Thread-safe interior mutability
After our data is allocated, we need a way to read and write to it in a thread-safe manner. By default, Rust
assumes that for the lifetime an immutable reference is held, the data referenced to will not be modified. We would like to opt
out of that assumption. To that end, we must use the UnsafeCell
wrapper.
Of course, we want to ensure that if someone is trying to read the data behind a a Gc that no one is trying
to modify it at the same time, as that would be unsound.
Additionally, we are going to tell Rust that it should trust us to implement these rules in a thread-safe manner.
Our GcInner
type now looks like the following:
pub struct GcInner<T: ?Sized> { header: UnsafeCell<GcHeader>, data: UnsafeCell<T>, } unsafe impl<T: ?Sized + Send> Send for GcInner<T> {} unsafe impl<T: ?Sized + Sync> Sync for GcInner<T> {} unsafe impl Send for GcHeader {} unsafe impl Sync for GcHeader {}
This sets up our Gc
type to have thread-safe interior mutability. We must now implement it.
2.1. Semaphores
A Semaphore is a way to control access to a
resource. Essentially, it is an array of N slots that each process is allowed to claim ownership of.
If all N slots are claimed, then processes must queue up and wait for the processes with ownership
to relinquish them.
Acquire(Semaphore) -> Option Release(Permit)
(both of these operations are atomic)
The ordering of these slots is irrelevant. Processes are only concerned with having ownership of a slot or
not. If the Semaphore has only one slot, you can make this value behave exactly like a typical Mutex:
Lock(Semaphore) -> Permit: loop: match Acquire(Semaphore): Some(Permit) => return Permit,
If we were to change our semaphore to have N > 1 slots, we need to add another atomic operation in order to properly
mimic our Mutex
:
AcquireMany(Semaphore, NumSlots) -> Option
Locking is pretty much the same but instead of attempting to acquire one slot we attempt to acquire all N.
We don’t have to always lock a variable. We can use semaphores with N > 1 to mimic Rust’s safety rules – we can
have an unlimited number of immutable references OR one single mutable reference, and never both. The trick is
to have our reads only acquire one slot and to have our writes acquire all N.
To implement this, we’re going to use tokio’s Semaphore,
which interfaces well with async rust. Let’s add it to GcHeader
:
struct GcHeader { semaphore: tokio::sync::Semaphore, }
2.2. Read/write guards
We need some types to represent our acquired resources. In Rust, this done with Guards. Guards are
structs that:
- hold a reference and a permit of use for a resource
- are treated transparently as a reference to the resource (via
Deref
and/orDerefMut
) - release the permit when they are dropped
- has the lifetime of the struct used to create the guard (so cannot be returned from a function with the smart pointer as a local)
pub struct GcReadGuard<'a, T: ?Sized> { _permit: tokio::sync::SemaphorePermit<'a>, data: *const T, marker: PhantomData<&'a T>, } impl<T: ?Sized> Deref for GcReadGuard<'_, T> { type Target = T; fn deref(&self) -> &T { unsafe { &*self.data } } } impl<T: ?Sized> AsRef<T> for GcReadGuard<'_, T> { fn as_ref(&self) -> &T { self } } unsafe impl<T: ?Sized + Send> Send for GcReadGuard<'_, T> {} unsafe impl<T: ?Sized + Sync> Sync for GcReadGuard<'_, T> {} pub struct GcWriteGuard<'a, T: ?Sized> { _permit: tokio::sync::SemaphorePermit<'a>, data: *mut T, marker: PhantomData<&'a mut T>, } impl<T> Deref for GcWriteGuard<'_, T: ?Sized> { type Target = T; fn deref(&self) -> &T { unsafe { &*self.data } } } impl<T> DerefMut for GcWriteGuard<'_, T: ?Sized> { fn deref_mut(&mut self) -> &mut T { unsafe { &mut *self.data } } } unsafe impl<T: ?Sized + Send> Send for GcWriteGuard<'_, T> {} unsafe impl<T: ?Sized + Sync> Sync for GcWriteGuard<'_, T> {}
Again, you will notice that in both of these structs we use the same trick to ensure the desired variance
(covariance) over each of the guards type parameters.
Since we use the tokio semaphore, we can create futures that await the acquisition of a permit:
impl<T> Gc<T> { pub async fn read(&self) -> GcReadGuard<'_, T> { unsafe { let _permit = (*self.ptr.as_ref().header.get()) .semaphore .acquire() .await .unwrap(); let data = &*self.ptr.as_ref().data.get() as *const T; GcReadGuard { _permit, data, marker: PhantomData, } } } pub async fn write(&self) -> GcWriteGuard<'_, T> { unsafe { let _permit = (*self.ptr.as_ref().header.get()) .semaphore .acquire_many(MAX_READS) .await .unwrap(); let data = &mut *self.ptr.as_ref().data.get() as *mut T; GcWriteGuard { _permit, data, marker: PhantomData, } } } }
This gives us a heap allocated object with thread-safe interior mutability that lives forever.
Our last task is to make it mortal.
3. Garbage collection
There are two main techniques for determining if allocated objects should be freed; tracing and reference
counting. Besides practical differences in performance and memory overhead, they differ in what information they
take as input:
- Tracing algorithms require a set of roots; values that one is required to travese in order to reach heap
objects. This includes stack variables and globals. - Reference counting algorithms require the number of active references to an object.
Determining an object’s reference count in Rust is fairly straightforward. In Rust, an object’s reference count is equal to
the difference of clones and drops plus one. This is because Rust has an affine type system for objects that do not implement Copy
, which means that an object that does not implement Copy
can be moved
at most once. Therefore, moves do not affect the reference count of an object.
To contrast this, figuring out the root objects in Rust is difficult. It’s certainly possible, but reference counts are
much easier to work with. This is why Rust’s default automatic memory management types, Rc
and Arc, are both reference counted and can be easily implemented in user
code. Therefore, we will use reference counting.
But reference counting has a key problem. If it didn’t we wouldn’t have a reason to implement our own type.
3.1. Cycles
An object can be unreachable but still have a positive reference count. This is because our Gc
type allows for
the creation of cyclical data structures (see fig. 1). Such data structures are pretty common to, especially in functional languages like Scheme. In order to ensure that cyclical data structures are collected
appropriately, we will be implementing
Concurrent Cycle Collection in Reference Counted Systems by David F. Bacon and V.T. Rajan ,
an algorithm for automatically detecting and collecting cycles in reference counted data structures.
3.2. Synchronous cycle collection
Before we implement concurrent cycle detection, let’s start with synchronous. Here is the code
listing for the synchronous cycle collection algorithm:
Increment(s) RC(S) = RC(S) + 1 color(S) = black |
ScanRoots() for S in Roots Scan(s) |
Decrement(S) RC(S) = RC(S) - 1 if (RC(S) == 0) Release(S) else PossibleRoot(S) |
CollectRoots() for S in Roots remove S from Roots buffered(S) = false CollectWhite(S) |
Release(S) for T in children(S) Decrement(T) color(S) = black if (! buffered(S)) Free(S) |
MarkGray(S) if (color(S) != gray) color(S) = gray for T in children(S) Rc(T) = RC(T) - 1 MarkGray(T) |
PossibleRoot(S) if (color(S) != purple) color(S) = purple if (! buffered(S)) buffered(S) = true append S to Roots |
Scan(S) if (color(S) == gray) if (RC(S) > 0) ScanBlack(S) else color(S) = white for T in children(S) Scan(T) |
CollectCycles() MarkRoots() ScanRoots() CollectRoots() |
ScanBlack(S) color(S) = black for T in children(S) RC(T) = RC(T) + 1 if (color(T) != black) ScanBlack(T) |
MarkRoots() for S in Roots if (color(S) == purple) MarkGray(S) else buffered(S) = false remove S from Roots if (color(S) == black and RC(S) == 0) Free(S) |
CollectWhite(S) if (color(S) == white and !buffered(S)) color(S) = black for T in children(S) CollectWhite(T) Free(S) |
Everyone loves to trust code at face value, but how does this actually work? I encourage you to read the paper, but I
will summarize the operation leaving out some very important details.
Cycle Collection relies on this key insight: if you were to perform a drop on every node in a cycle, that cycle will be
garbage if the remaining ref count of every node in that cycle is zero. This kind of makes sense intuitively, what we’re
basically saying here is that if we somehow already knew that a data structure was cyclic, we could just manually sever
an outgoing reference for some random node and the cascade of decrementing reference counts would cleanly free the whole
data structure.
With this in mind, we can can construct an efficient algorithm to collect cyclical garbage. We’ll color the nodes in
different colors as they pass through different stages
Decrement
: When we decrement a reference count and the reference count is greater than zero, add it to our list of
possible roots. If we’ve already added it before performing a collection cycle (it’s been marked purple), skip this.Mark
: Go through the roots, and perform the test described in the previous paragraph. Practically, this works by
performing a depth first search on the root, marking each node gray and decrementing the reference count of each child
and repeating the process on them. If the child is already marked gray, we skip them.Scan
: Go through the roots and recursively check their children for reference counts greater than zero. If there is
any greater than zero, recursively mark all of their children black to indicate the data is live. Otherwise, the
structure is marked white, to indicate it is ready to be freed.Collect
: Go through the roots marked white and free them.
There is one thing in particular that is not immediately clear as to how we are going to implement. How do
we iterate over the children? We haven’t put any bounds on the T
in our Gc
beyond that it has to be 'static
. Well,
until Rust gains a more powerful reflection story, we are going to have to add a classic Trait plus derive macro combo.
3.3. The Trace trait and derive macro
Let’s define the trait we need to implement the above code. We need a function that matches the following form:
for T in children(S): F(T)
We can extract two type parameters from this statement: S, and F(T) where T == S. Therefore, the function we want will
have the form fn for_each_children(S, impl FnMut(S))
.
It’s not entirely obvious at first glance, but there is another function that we must pay attention to: free
.
The free
function presented in the above code listing does not correspond to how memory is freed in Rust. That is
because this code assumes that when we free code we are not running any of its members destructors. But we do want to
run our destructors. At least, we want to run the destructor if the type is not a Gc
. We’re going to have to add
a finalize
function to handle a custom drop routine:
unsafe trait Trace: 'static { unsafe fn visit_children(&self, visitor: unsafe fn(OpaqueGcPtr)); unsafe fn finalize(&mut self) { drop_in_place(self as *mut Self); } }
These two things are in fact the only two things our collection algorithm cares about in regards to Gc
’s data.
Therefore, whenever we’re in the context of the collection algorithm, we will cast our Gc’s into a trait object:
type OpaqueGc = GcInner<dyn Trace>; pub type OpaqueGcPtr = NonNull<OpaqueGc>; impl<T: Trace> Gc<T> { pub unsafe fn as_opaque(&self) -> OpaqueGcPtr { self.ptr as OpaqueGcPtr } }
With the Trace trait defined, we can implement it for some primitive types to give us some building blocks with which
to implement composite structures. A good starting point will be Rust’s std library, since those data structures are used
throughout the entire Rust ecosystem. I won’t provide all of the ones I implemented, but here
are a few:
unsafe trait GcOrTrace: 'static { unsafe fn visit_or_recurse(&self, visitor: unsafe fn(OpaqueGcPtr)); unsafe fn finalize_or_skip(&mut self); } unsafe impl<T: Trace> GcOrTrace for Gc<T> { unsafe fn visit_or_recurse(&self, visitor: unsafe fn(OpaqueGcPtr)) { visitor(self.as_opaque()) } unsafe fn finalize_or_skip(&mut self) {} } unsafe impl<T: Trace + ?Sized> GcOrTrace for T { unsafe fn visit_or_recurse(&self, visitor: unsafe fn(OpaqueGcPtr)) { self.visit_children(visitor); } unsafe fn finalize_or_skip(&mut self) { self.finalize(); } } unsafe impl<T> Trace for Vec<T> where T: GcOrTrace, { unsafe fn visit_children(&self, visitor: unsafe fn(OpaqueGcPtr)) { for child in self { child.visit_or_recurse(visitor); } } unsafe fn finalize(&mut self) { for mut child in std::mem::take(self).into_iter() { child.finalize_or_skip(); std::mem::forget(child); } } } unsafe impl<T> Trace for Option<T> where T: GcOrTrace, { unsafe fn visit_children(&self, visitor: unsafe fn(OpaqueGcPtr)) { if let Some(inner) = self { inner.visit_or_recurse(visitor); } } unsafe fn finalize(&mut self) { if let Some(inner) = self { inner.finalize_or_skip(); } } } unsafe impl<T> Trace for std::sync::Arc<T> where T: GcOrTrace, { unsafe fn visit_children(&self, visitor: unsafe fn(OpaqueGcPtr)) { self.as_ref().visit_or_recurse(visitor); } }
If you are able to immediately spot the error in the code above, you are much smarter than
I am. I hope you at least have the courtesy to be less good looking. Anyway, this code isn’t
correct with our current set of assumptions. And that’s due to the implementation for Arc,
which recurses into its data.
The problem is that we are basically disregarding the reference count of the Arc
. Consider the
following structure:
In this case, dropping A results in the immediate dropping of C while a dangling reference from B remains.
Essentially, the Arc
collapses all of the incoming references to C into one.
Here is the correct code for Arc
. Part of what makes this code correct is that we added an
explanation, so be sure to always do that.
unsafe impl<T> Trace for std::sync::Arc<T> where T: ?Sized + 'static, { unsafe fn visit_children(&self, _visitor: unsafe fn(OpaqueGcPtr)) { } }
Another important thing to discuss is the operation of finalize
for Vec
(and other container types):
unsafe impl<T> Trace for Vec<T> where T: GcOrTrace, { unsafe fn finalize(&mut self) { for mut child in std::mem::take(self).into_iter() { child.finalize_or_skip(); std::mem::forget(child); } } }
Our finalization here is pretty particular: we take the memory allocated by the Vec
, convert it into
and owned iterator so we can be sure there are no dangling references to child that drop
may be run on,
and drop the children.
I am not going to get too into the Trait derive proc macro code as it’s not particularly interesting and
quite long. But it’s important to note what it is actually doing for any given type T
:
visit_children
: for every field, if the type is aGc
, callvisitor
on theOpaqueGcPtr
of the field.
If the type is not aGc
, recursively callvisit_children
on it.finalize
: for every field that is not aGc
, callfinalize
on it.
If you would like to see how this is done, the code is available here.
3.4. Deallocation
The free
function needs to call std::alloc::dealloc
to
free
the allocated memory. This function takes a Layout (a size
and alignment). How do we find this for our opaque dyn Trace
pointer? Turns out the size and alignment of the
underlying data is part of Rust’s vtable. So, we can just call Layout::for_value
on our reference and create one.
Here’s the resulting free function:
unsafe fn trace<'a>(s: OpaqueGcPtr) -> &'a mut dyn Trace { &mut *s.as_ref().data.get() } unsafe fn free(s: OpaqueGcPtr) { let trace = trace(s); let layout = Layout::for_value(trace); trace.finalize(); std::alloc::dealloc(s.as_ptr() as *mut u8, layout); }
3.5. Extending from synchronous to concurrent
If we wish to move our collection into a separate thread, we are somehow going to have to deal
with two things:
- Increment and Decrement operations can happen in parallel while our algorithm is running. In
the synchronous algorithm, we could guarantee that reference counts would stay the same for the
lifetime ofCollectCycles
. - The mutation of the reference graph can causes the
MarkGray
function to incorrectly mark
a live object as garbage.
In order to explain how each of these things are dealt with, let’s take a look at our final GcHeader
:
pub struct GcHeader { rc: usize, crc: usize, color: Color, buffered: bool, semaphore: Semaphore, } enum Color { Black, Gray, White, Purple, Red, Orange, }
This header is quite large and therefore a lot of overhead. It can be made smaller with some tricks,
such as assuming that most nodes will not have more than 255 reference counts and storing overflowing
ref counts externally, but we will ignore those tricks for now.
3.5.1. The mutation buffer
You will notice that the ref counts for our Gc
type are not atomic. This is because our concurrent
algorithm requires a constraint: only one thread at a time can ever modify the ref count, or indeed
any field in the GcHeader
except for semaphore (which is already thread safe). This gives our
garbage collection process exclusive read and write access to most of the header.
What gives? How does that work? Isn’t this algorithm supposed to be concurrent? We don’t want
decrementing or incrementing ref counts to wait on the completion of the GC process. That would make
our drop and clone functions blocking, which would gum up our whole system.
The solution is the mutation buffer, an unbounded channel that lets us buffer incremements and
decrements to be handled by our collection process as it pleases.
This wil allow our collector to stop processing incremements and decrements at any time and effectively
have a snapshot of the system without causing any pauses in other threads. Since the buffer is unbounded,
calling send
is a non-blocking, sync operation:
#[derive(Copy, Clone)] struct Mutation { kind: MutationKind, gc: NonNull<OpaqueGc>, } impl Mutation { fn new(kind: MutationKind, gc: NonNull<OpaqueGc>) -> Self { Self { kind, gc } } } unsafe impl Send for Mutation {} unsafe impl Sync for Mutation {} #[derive(Copy, Clone)] enum MutationKind { Inc, Dec, } struct MutationBuffer { mutation_buffer_tx: UnboundedSender<Mutation>, mutation_buffer_rx: Mutex<Option<UnboundedReceiver<Mutation>>>, } unsafe impl Sync for MutationBuffer {} impl Default for MutationBuffer { fn default() -> Self { let (mutation_buffer_tx, mutation_buffer_rx) = unbounded_channel(); Self { mutation_buffer_tx, mutation_buffer_rx: Mutex::new(Some(mutation_buffer_rx)), } } } static MUTATION_BUFFER: OnceLock<MutationBuffer> = OnceLock::new(); fn inc_rc<T: Trace>(gc: NonNull<GcInner<T>>) { MUTATION_BUFFER .get_or_init(MutationBuffer::default) .mutation_buffer_tx .send(Mutation::new(MutationKind::Inc, gc as NonNull<OpaqueGc>)) .unwrap(); } fn dec_rc<T: Trace>(gc: NonNull<GcInner<T>>) { MUTATION_BUFFER .get_or_init(MutationBuffer::default) .mutation_buffer_tx .send(Mutation::new(MutationKind::Dec, gc as NonNull<OpaqueGc>)) .unwrap(); }
And now we finally can implement Clone
and Drop
for Gc
:
impl<T: Trace> Clone for Gc<T> { fn clone(&self) -> Gc<T> { inc_rc(self.ptr); Self { ptr: self.ptr, marker: PhantomData, } } } impl<T: Trace> Drop for Gc<T> { fn drop(&mut self) { dec_rc(self.ptr); } }
This also presents a structure for our collection process: wait for some increments and
decrements, process them, then decide if we want to try to perform a collection:
static COLLECTOR_TASK: OnceLock<JoinHandle<()>> = OnceLock::new(); pub fn init_gc() { let _ = MUTATION_BUFFER.get_or_init(MutationBuffer::default); let _ = COLLECTOR_TASK .get_or_init(|| tokio::task::spawn(async { unsafe { run_garbage_collector().await } })); } const MUTATIONS_BUFFER_DEFAULT_CAP: usize = 10_000; async unsafe fn run_garbage_collector() { let mut last_epoch = Instant::now(); let mut mutation_buffer_rx = MUTATION_BUFFER .get_or_init(MutationBuffer::default) .mutation_buffer_rx .lock() .take() .unwrap(); let mut mutation_buffer: Vec<_> = Vec::with_capacity(MUTATIONS_BUFFER_DEFAULT_CAP); loop { epoch(&mut last_epoch, &mut mutation_buffer).await; mutation_buffer.clear(); } } async unsafe fn epoch(last_epoch: &mut Instant, mutation_buffer: &mut Vec<Mutation>) { process_mutation_buffer(mutation_buffer).await; let duration_since_last_epoch = Instant::now() - *last_epoch; if duration_since_last_epoch > Duration::from_millis(100) { tokio::task::spawn_blocking(|| unsafe { process_cycles() }) .await .unwrap(); *last_epoch = Instant::now(); } } async unsafe fn process_mutation_buffer( mutation_buffer_rx: &mut UnboundedReceiver<Mutation>, mutation_buffer: &mut Vec<Mutation> ) { let to_recv = mutation_buffer_rx.len(); mutation_buffer_rx .recv_many(mutation_buffer, to_recv) .await; for mutation in mutation_buffer { match mutation.kind { MutationKind::Inc => increment(mutation.gc), MutationKind::Dec => decrement(mutation.gc), } } }
3.5.2. The Cyclical Reference Count
You will notice in the GcHeader
we added a second reference counting field called the CRC. This is distinguished
from the “true” reference count, the RC, by being the hypothetical reference count of the node that has perhaps
become invalidated during the epoch. We can modify the code listing to create the following functions that use the CRC
instead :
static mut ROOTS: Vec<OpaqueGcPtr> = Vec::new(); static mut CYCLE_BUFFER: Vec<Vec<OpaqueGcPtr>> = Vec::new(); static mut CURRENT_CYCLE: Vec<OpaqueGcPtr> = Vec::new(); unsafe fn increment(s: OpaqueGcPtr) { *rc(s) += 1; scan_black(s); } unsafe fn decrement(s: OpaqueGcPtr) { *rc(s) -= 1; if *rc(s) == 0 { release(s); } else { possible_root(s); } } unsafe fn release(s: OpaqueGcPtr) { for_each_child(s, decrement); *color(s) = Color::Black; if !*buffered(s) { free(s); } } unsafe fn possible_root(s: OpaqueGcPtr) { scan_black(s); *color(s) = Color::Purple; if !*buffered(s) { *buffered(s) = true; (&raw mut ROOTS).as_mut().unwrap().push(s); } } unsafe fn mark_roots() { let mut new_roots = Vec::new(); for s in (&raw const ROOTS).as_ref().unwrap().iter() { if *color(*s) == Color::Purple && *rc(*s) > 0 { mark_gray(*s); new_roots.push(*s); } else { *buffered(*s) = false; if *rc(*s) == 0 { free(*s); } } } ROOTS = new_roots; } unsafe fn scan_roots() { for s in (&raw const ROOTS).as_ref().unwrap().iter() { scan(*s) } } unsafe fn collect_roots() { for s in std::mem::take((&raw mut ROOTS).as_mut().unwrap()) { if *color(s) == Color::White { collect_white(s); let current_cycle = std::mem::take((&raw mut CURRENT_CYCLE).as_mut().unwrap()); (&raw mut CYCLE_BUFFER) .as_mut() .unwrap() .push(current_cycle); } else { *buffered(s) = false; } } } unsafe fn mark_gray(s: OpaqueGcPtr) { if *color(s) != Color::Gray { *color(s) = Color::Gray; *crc(s) = *rc(s) as isize; for_each_child(s, |t| { mark_gray(t); if *crc(t) > 0 { *crc(t) -= 1; } }); } } unsafe fn scan(s: OpaqueGcPtr) { if *color(s) == Color::Gray { if *crc(s) == 0 { *color(s) = Color::White; for_each_child(s, scan); } else { scan_black(s); } } } unsafe fn scan_black(s: OpaqueGcPtr) { if *color(s) != Color::Black { *color(s) = Color::Black; for_each_child(s, scan_black); } } unsafe fn collect_white(s: OpaqueGcPtr) { if *color(s) == Color::White { *color(s) = Color::Orange; *buffered(s) = true; (&raw mut CURRENT_CYCLE).as_mut().unwrap().push(s); for_each_child(s, collect_white); } }
3.5.3. The safety phase
Because of concurrent mutations to the edges of our graph, there is a possibility that our mark algorithm
will produce results that are incorrect. Therefore, we divide our collection algorithm into two phases:
- The marking phase, which produces candidate cycles.
- The safety phase, which determines if candidate cycles are garbage.
The correctness of this approach relies on another key insight: any mutation that occurs while we are
marking our cycles must appear in the next epoch. Therefore, we need to check that no external node adds
an edge to our potential cycle over the current and next epoch.
We have two tests for this, corresponding to the current and next epoch respectively:
3.5.3.1. Σ-test
The sigma test requires preparation. In essence preparation boils down to using the same algorithm as
MarkGray
, but we are restricting it to only the nodes of a given cycle:
unsafe fn sigma_preparation() { for c in (&raw const CYCLE_BUFFER).as_ref().unwrap() { for n in c { *color(*n) = Color::Red; *crc(*n) = *rc(*n); } for n in c { for_each_child(*n, |m| { if *color(m) == Color::Red && *crc(m) > 0 { *crc(m) -= 1; } }); } for n in c { *color(*n) = Color::Orange; } } }
At the end of prepartion, if any of the circular reference counts are greater than zero, the graph is live.
We check this in the next epoch:
unsafe fn sigma_test(c: &[OpaqueGcPtr]) -> bool { for n in c { if *crc(*n) > 0 { return false; } } true }
The reason that this test is called the “sigma” test is because the sum of the circular reference
counts will remain zero in a garbage cycle.
3.5.3.2. Δ-test
If any node’s reference count is incremented in the next epoch, it will be colored black and fail the
delta test:
unsafe fn delta_test(c: &[OpaqueGcPtr]) -> bool { for n in c { if *color(*n) != Color::Orange { return false; } } true }
3.5.4. Cleaning up
If a candidate cycle fails either of the tests, we want to make sure to properly re-color the nodes.
There’s an additional heuristic that adds some nodes back to the roots list.
unsafe fn refurbish(c: &[OpaqueGcPtr]) { for (i, n) in c.iter().enumerate() { match (i, *color(*n)) { (0, Color::Orange) | (_, Color::Purple) => { *color(*n) = Color::Purple; unsafe { (&raw mut ROOTS).as_mut().unwrap().push(*n); } } _ => { *color(*n) = Color::Black; *buffered(*n) = false; } } } }
If a candidate cycle passes both tests, we need to free it and decrement any outgoing reference counts.
This is pretty straightforward:
unsafe fn cyclic_decrement(m: OpaqueGcPtr) { if *color(m) != Color::Red { if *color(m) == Color::Orange { *rc(m) -= 1; *crc(m) -= 1; } else { decrement(m); } } } unsafe fn free_cycle(c: &[OpaqueGcPtr]) { for n in c { *color(*n) = Color::Red; } for n in c { for_each_child(*n, cyclic_decrement); } for n in c { free(*n); } }
3.5.5. Misc helper functions
Before we put everything together, let’s briefly talk about some of the helper functions:
unsafe fn color<'a>(s: OpaqueGcPtr) -> &'a mut Color { &mut (*s.as_ref().header.get()).color } unsafe fn rc<'a>(s: OpaqueGcPtr) -> &'a mut usize { &mut (*s.as_ref().header.get()).rc } unsafe fn buffered<'a>(s: OpaqueGcPtr) -> &'a mut bool { &mut (*s.as_ref().header.get()).buffered } unsafe fn semaphore<'a>(s: OpaqueGcPtr) -> &'a Semaphore { &(*s.as_ref().header.get()).semaphore } fn acquire_permit(semaphore: &'_ Semaphore) -> SemaphorePermit<'_> { loop { if let Ok(permit) = semaphore.try_acquire() { return permit; } } } unsafe fn for_each_child(s: OpaqueGcPtr, visitor: unsafe fn(OpaqueGcPtr)) { let permit = acquire_permit(semaphore(s)); (*s.as_ref().data.get()).visit_children(visitor); drop(permit); }
These are all pretty straightforward, but you may notice that we acquire a read lock on
the data for the Gc
when we call for_each_child
. I couldn’t find any mention of it in the
paper but it has the assumption that pointer store and loads are atomic.
The way this is implemented is not ideal as the permit is acquired for the lifetime of the
visit_children
function. A better approach would be to collect all of the OpaqueGcPtr
s into
a buffer and traverse them after releasing the permit.
3.6. Bringing it all together
Finally, we can write the last of our functions:
unsafe fn process_cycles() { free_cycles(); collect_cycles(); sigma_preparation(); } unsafe fn collect_cycles() { mark_roots(); scan_roots(); collect_roots(); }
4. Testing
Garbage Collection is very hard to properly test. Often times testing is done via metrics:
For example, there were x1 allocations time t1, there were x2 allocations
at time t2, t1 < t2 and x1 > x2
so the garbage collector did some work over t2 – t1.
Because we are building our garbage collector on top of a system that already supports reference
counted smart pointers, we can use an Arc
to analyse the operation of our collection algorithms.
By analyzing the strong reference count of an Arc
embedded in a garbage cyclical data structure,
we can determine that our cycle was properly destroyed if the strong reference count for the Arc
decreases.
We could also do this with our own Gc
type, but as we have established only one thread can ever
read the reference count. Therefore, if we want to adapt our unit tests to include the collector
running in parallel, we cannot use them.
This is a pretty nice consequence of allowing Arc
in our Gc
s, we can add a very simple unit
test to our code. It is insufficient amount of testing, but this article is already quite long, so
I will leave the rest up to you dear reader.
Consider the following structure:
The following unit test replicates it:
#[cfg(test)] mod test { use super::*; use crate::gc::*; use std::sync::Arc; #[tokio::test] async fn cycles() { #[derive(Default, Trace)] struct Cyclic { next: Option<Gc<Cyclic>>, out: Option<Arc<()>>, } let out_ptr = Arc::new(()); let a = Gc::new(Cyclic::default()); let b = Gc::new(Cyclic::default()); let c = Gc::new(Cyclic::default()); a.write().await.next = Some(b.clone()); b.write().await.next = Some(c.clone()); b.write().await.out = Some(out_ptr.clone()); c.write().await.next = Some(a.clone()); assert_eq!(Arc::strong_count(&out_ptr), 2); drop(a); drop(b); drop(c); let mut mutation_buffer = Vec::new(); unsafe { process_mutation_buffer(&mut mutation_buffer).await; process_cycles(); process_cycles(); } assert_eq!(Arc::strong_count(&out_ptr), 1); } }
Footnotes