Hello everyone and a merry advent time!
Today I’d like to showcase a neat little mechanism that allows building concurrent applications without having to worry much about the concurrency. I built this tool as part of the (yet to be released) Rakudo CI Bot. That’s a middleman that watches GitHub for things to test (both via API hooks that GitHub calls and via a regularly run poller), pushes test tasks to one or more CI backends (e.g. Azure), monitors those, pulls results once they are available and reports them back to GitHub. It has to deal with a lot of externally triggered, and thus naturally concurrent, events.
So there is a fundamental need to bring synchronization into the processing of these events, so things don’t step on each other’s toes. This is how I did it.
I’ve created components (simple singleton classes), each of which is responsible for one of the incoming event sources. Those are:
- The GitHubCITestRequester that receives CI test requests seen on GitHub, both via a poller and via a web hook that GitHub itself calls.
- The OBS backend that pushes test tasks to the Open Build Service and monitors it for test results.
- The CITestSetManager that connects the components. It reads test tasks from the DB and sends them off to the test backends. It also monitors the results and acts whenever all tests for a task complete.
Data is persisted in a DB. Each table and field lies in the responsibility of one of these components. So naturally, because the responsibility for each data unit is clear, the components are nicely separated. But the components often need to hand work over to one another. They do so by creating the respective DB entries (either new rows or filled-out fields) and then notifying the other component that data is waiting to be processed.
This notification needs to check a few boxes:
- The processing should start as soon as possible once work is waiting.
- There should only ever be a single worker processing the work of a component. This simplifies the design a lot, as I don’t have to synchronize workers.
- The notifiers shouldn’t be blocked.
I modelled the above via a method called process-worklist on each of the above-mentioned component classes. That method looks at the DB, retrieves all rows that are waiting to be processed and does what needs to be done.
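To make that concrete, here is a hedged sketch of what such a process-worklist method might look like. Everything in it is made up for illustration: the real bot reads rows from a DB, while this sketch drains a plain in-memory array.

```raku
# Hypothetical sketch only: the real bot pulls rows from a DB;
# here the worklist is just an in-memory array of strings.
class CITestSetManager {
    has @.worklist;

    method process-worklist() {
        # Drain everything that accumulated since the last run.
        while @!worklist.shift -> $task {
            say "processing $task";
        }
    }
}

my $mgr = CITestSetManager.new(worklist => ['test-set 1', 'test-set 2']);
$mgr.process-worklist;
```

The important property is that a single call processes all pending work, which is what later allows runs to coalesce.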
But this method can be called from multiple places, and there is no guarantee that no worker is active when such a call happens.
So I need a mechanism that ensures that
- a call never blocks,
- the method runs immediately when it’s called and no run is in progress,
- another run is queued right after the current one finishes when a run is already in progress,
- but runs don’t pile up beyond that, because a single run will see and process all the work that was added in the meantime.
I achieved all of the above with a method trait called is serial-dedup. Add that trait to a method and it behaves exactly as described above. This is the code:
unit module SerialDedup;

# Per-method state: a binary semaphore guarding the single worker
# and a flag noting whether a follow-up run was requested.
my class SerialDedupData {
    has Semaphore $.sem is rw .= new(1);
    has $.run-queued is rw = False;
}

# Mixed into objects on first call to hold the per-method state.
my role SerialDedupStore {
    has SerialDedupData %.serial-dedup-store-variable;
}

multi sub trait_mod:<is>(Method $r, :$serial-dedup!) is export {
    my Lock $setup-lock .= new;
    $r.wrap(my method ($obj:) {
        my $d;
        $setup-lock.protect: {
            $obj does SerialDedupStore unless $obj.does(SerialDedupStore);
            $d := $obj.serial-dedup-store-variable{$r.name} //= SerialDedupData.new;
        }
        if $d.sem.try_acquire() {
            # No run in progress: start one on a separate thread.
            my &next = nextcallee;
            $d.run-queued = False;
            start {
                &next($obj);
                $d.sem.release();
                # A concurrent call came in while we ran? Go again.
                $obj.&$r() if $d.run-queued;
            }
        }
        else {
            # A run is already active; just leave a note.
            $d.run-queued = True;
        }
    });
}
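A minimal usage sketch might look as follows. The Notifier class and its counter are made up for illustration; the point is simply that the trait is applied to a method and that method can then be called from anywhere, as often as you like, without blocking.

```raku
use SerialDedup;

class Notifier {
    has atomicint $.calls = 0;

    # All concurrent invocations coalesce into sequential runs.
    method process-worklist() is serial-dedup {
        $!calls⚛++;
    }
}

my Notifier $n .= new;
$n.process-worklist for ^100;   # never blocks; bursts coalesce
sleep 1;                        # give the worker thread time to finish
say $n.calls;                   # far fewer runs than calls, typically
```

Each call either starts a worker or merely flags that another run is wanted, so a burst of a hundred calls results in only a handful of actual runs.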
It works by wrapping the method the trait was added to. State is persisted in a field injected into the object by mixing in (doesing) a role holding that field. The rest of the code is a pretty straightforward implementation of the required semantics. It tracks whether a call is running via the $.sem semaphore, and notes whether another run was requested (by a concurrent call to the method) in the $.run-queued boolean. And start always runs the wrapped method in a separate thread.
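Stripped of the trait and role machinery, the core coalescing pattern can be sketched in isolation like this (an assumed simplification for illustration, not the bot's code):

```raku
my Semaphore $sem .= new(1);
my Bool $run-queued = False;

sub maybe-run(&work) {
    if $sem.try_acquire() {
        # We won the semaphore: run the work on another thread.
        $run-queued = False;
        start {
            work();
            $sem.release();
            # If someone asked again while we ran, go once more.
            maybe-run(&work) if $run-queued;
        }
    }
    else {
        # A run is active; just leave a note for it.
        $run-queued = True;
    }
}

maybe-run({ say 'working' }) for ^5;   # bursts coalesce instead of piling up
sleep 1;
```

try_acquire is the non-blocking heart of it: callers that lose the race return immediately after setting the flag, which satisfies the "never blocks" requirement.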
All in all, I love how straightforward, easy and short the code doing this is. Especially considering that this mechanism is at the heart of the concurrency handling of a non-trivial application.