This post summarises some recent experiments and learnings around concurrency & Koka. There’s no immediate application yet, just a bunch of thoughts which might be interesting if you’re into concurrency, parallelism, or Koka. If you’ve never heard of Koka before that’s OK, you don’t really need any prior knowledge (but I wrote about it here).

Koka and concurrency:

There are a few active avenues of interest when it comes to Koka and concurrency.

First, there’s the async effect, currently implemented in the community stdlib. That effect allows a function to suspend execution and await the result of a callback, and to execute multiple async operations concurrently. It’s basically an implementation of the async / await semantics in JS and many other languages, but as a library (rather than built into the compiler). Currently this only supports Koka code compiled with the JS backend. Under the hood, an async operation is represented as a continuation function: when the operation is complete, that function is invoked and (from the perspective of the Koka code you write) the async code resumes from where it left off.

There’s also work going on to create libuv bindings for Koka. There’s a fully-featured attempt in this community repo, as well as a more minimal version in Koka itself with just the core scheduling primitives. This work allows execution of async Koka code in compiled binaries (the C target).

Both of these are currently limited to single-thread concurrency, i.e. concurrency-without-parallelism. This is nothing to sneeze at: it’s done well enough for NodeJS for more than a decade, and it’s a step up from many scripting languages which only support synchronous IO.

Koka and parallelism:

But at the same time, Koka has some great fundamentals when it comes to true parallelism (with multi-threading). The way to share mutable state is via a ref, which is already thread-safe. And Koka’s reference counting algorithm was designed to perform well for both single and multi-threaded environments.

All this is to say: single-threaded concurrency is cool and all, but there’s no technical reason Koka couldn’t support true parallel concurrency like Go, Rust, OCaml and Guile Scheme.

What’s Guile Scheme? Don’t ask me, I’ve never used it, but I think of it often when it comes to concurrency. Years ago, I read Andy Wingo’s excellent series on implementing Concurrent ML primitives in Guile Scheme. It’s stayed in the back of my mind as an interesting point in the concurrency design space, and seems to keep coming up as a lesser-known approach which ought to be more widely known and adopted. See also Concurrent ML has a branding problem, where the tl;dr is “Concurrent ML’s primitives are great but the terms used to describe it are confusing so people ignore it”.

Implementing CML primitives in Koka

Using Andy’s blog (and the guile fibers library) as a guide, I’ve been implementing CML primitives in Koka. I have a branch here if you’re curious, which works with the above libuv basics branch of Koka. I wouldn’t spend much time trying to read or understand it, though.

This has been a fun experiment, and I don’t expect it to go anywhere soon. But I’ve learnt some things from it, which I think are worth sharing.

Cancellation in asynchronous code

I care a lot about cancellation. To me, a language without robust support for cancellation of async operations is not a serious language. Any non-toy implementation of happy eyeballs (the “hello world of concurrency”) requires cancellation so that it doesn’t leak resources.

A while back I wrote a PR to make async cancellation first-class. The semantics here are based on Scala’s fs2, which I know fairly well.

But one problem which always bugs me is that if you’re waiting for the first of two operations, they might both happen. Say you’re waiting for input on two sockets:

val message = firstof(
	{ socket1.next-message() },
	{ socket2.next-message() }
)

When the first async branch completes, the second will be canceled, and cancellation hooks will run. But this isn’t atomic, and I don’t think it can be. If different messages arrive on socket1 and socket2, then they’ll both trigger completion of the overall async expression. Only one will win, and the other one will get canceled. But at that point it’s too late - we’ve already read the message off that socket and effectively discarded it.

In this case there are obviously ways around it, like feeding all messages from both sockets into a single channel and reading messages off that, or simply processing each socket in its own async loop.
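The same hazard is easy to reproduce outside Koka. As a minimal sketch (written in Python's asyncio rather than Koka so that it runs as written; `first_of` is a hypothetical helper, not Koka's `firstof`, and the two queues merely stand in for sockets), racing two reads when a message is already waiting on each consumes both messages but returns only one:

```python
import asyncio

async def first_of(*coros):
    """Hypothetical helper: return the first result and cancel the rest."""
    tasks = [asyncio.ensure_future(c) for c in coros]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()  # too late for any branch that has already finished
    # Pick a single winner (first in argument order); any other finished
    # result is silently dropped.
    winner = next(task for task in tasks if task in done)
    return winner.result()

async def demo():
    socket1, socket2 = asyncio.Queue(), asyncio.Queue()
    socket1.put_nowait("from socket1")
    socket2.put_nowait("from socket2")
    message = await first_of(socket1.get(), socket2.get())
    # Report the winner and how many messages are left on each "socket".
    return message, socket1.qsize(), socket2.qsize()

result = asyncio.run(demo())
print(result)
```

Both queues end up empty even though only one message reached the caller: the second read completed before its cancellation could take effect.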

Cancellation in CML (Concurrent ML)

Cancellation in CML seems much more elegant. In a CML select, there is a known set of operations which are awaiting completion, and only one will complete.

Specifically if you have two CML channels and you select over both of them, like so:

val message = select(
	channel-1.receive-op(),
	channel-2.receive-op()
)

Then it is guaranteed that only one message will be consumed. If messages arrive on channel-1 and channel-2 at the exact same time, they’ll race to commit these receive operations, and only one will succeed because the operations are both associated with a single opstate ref.

For whichever channel was not selected, the corresponding send-op remains suspended and is not lost; it just waits around for another receive-op to pair with.

This is good. This feels like a proper system, as opposed to “a bunch of things that happen”.

It also seems efficient, as many of the operations handle cancellation for free (e.g. a channel send operation will naturally clean up canceled receive operations as part of its execution logic).

Some CML operations don’t get cancellation for free, which somewhat blurs the line between a CML operation and an async evaluation. For example a timeout creates a runtime resource (a timer). If we don’t clean these up we may leak timers. They’d resolve to a harmless no-op so it’s not a correctness issue, but repeated use could result in a significant number of unwanted timers consuming resources.

What is this?

When dealing with a new concept, it can be helpful to explain it as “X but Y”. Having now implemented parts of CML, the simplest way I can describe a CML operation is:

  • a single async suspension (i.e. some code which will be resumed via a callback)
  • … with an opstate reference that’s shared among all operations in the current scope (i.e. all operations in a select())

This opstate is a reference which can have the values Waiting, Claimed and Done. Completing an operation involves atomically transitioning from Waiting -> Done, and there’s a half-committed Claimed state which serves as a transitory state when attempting to complete an operation in a way that may not succeed (i.e. could transition back to Waiting).

CML ensures that only one among a set of possible operations succeeds, by sharing the same opstate among all alternative operations. If you select between two receive operations on two different channels, then only one can be completed - code attempting to complete the other will be racing to modify the same opstate, and fail.
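To make the state machine concrete, here is a minimal sketch in Python (not Koka, and not the code from the branch mentioned earlier). The names `OpState`, `transition` and `try_complete` are made up for illustration, and a lock stands in for whatever atomic compare-and-swap a real implementation would use:

```python
import threading
from enum import Enum

class State(Enum):
    WAITING = "Waiting"
    CLAIMED = "Claimed"
    DONE = "Done"

class OpState:
    """One of these is shared by every alternative in a single select()."""

    def __init__(self):
        self._state = State.WAITING
        self._lock = threading.Lock()  # stands in for an atomic compare-and-swap

    def state(self):
        with self._lock:
            return self._state

    def transition(self, expected, new):
        """Atomically move expected -> new; False means someone else got there first."""
        with self._lock:
            if self._state is not expected:
                return False
            self._state = new
            return True

def try_complete(opstate, can_commit):
    """Claim the opstate, then either commit (Done) or back out (Waiting)."""
    if not opstate.transition(State.WAITING, State.CLAIMED):
        # Simplification: a fuller implementation would retry while the state
        # is Claimed, since that claim may still be rolled back.
        return False
    if can_commit():
        return opstate.transition(State.CLAIMED, State.DONE)
    opstate.transition(State.CLAIMED, State.WAITING)
    return False

# Two completers (think: senders on two channels) race on one shared opstate.
shared = OpState()
winners = []

def completer(name):
    if try_complete(shared, lambda: True):
        winners.append(name)

threads = [threading.Thread(target=completer, args=(name,))
           for name in ("channel-1", "channel-2")]
for t in threads:
    t.start()
for t in threads:
    t.join()

# A claim that cannot commit rolls back, leaving the operation available.
retry = OpState()
rolled_back = try_complete(retry, lambda: False)
committed_later = try_complete(retry, lambda: True)

print(winners, shared.state(), rolled_back, committed_later)
```

Which of the two threads wins is not deterministic, but exactly one of them does, because only one can move the shared state out of Waiting.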

Why doesn’t all async code have an opstate?

Now that I understand the key difference, it’s time to wonder why we don’t just do this all the time, if it’s so neat?

It turns out, using an opstate to select between alternatives is of limited use. In Koka, the async effect describes code which may suspend. Chaining together two async functions results in an async function, with an arbitrary number of suspension points.

e.g. the following code is async:

val message1 = channel.receive()

But the following code is also async:

val message1 = channel.receive()
val message2 = channel.receive()

The second code composes two async operations into another async operation. It’s impossible to tell how many async suspensions might be involved. But an opstate represents a single operation. It’s either Waiting or it’s Done. If a piece of async code suspends at 3 different points, which one represents Done?
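One way to see the mismatch is to model suspension explicitly. The sketch below uses Python generators as a stand-in for Koka's async effect (an analogy only; `receive_one`, `receive_two` and `run` are made-up names). From the outside both computations look the same, and only driving them reveals how many times each suspends:

```python
def receive_one(channel):
    message1 = yield ("receive", channel)  # one suspension point
    return message1

def receive_two(channel):
    message1 = yield ("receive", channel)  # first suspension point
    message2 = yield ("receive", channel)  # second suspension point
    return (message1, message2)

def run(computation, incoming):
    """Drive a computation to completion; return (result, number of suspensions).

    Assumes `incoming` holds at least as many messages as the computation needs.
    """
    messages = iter(incoming)
    suspensions = 0
    try:
        next(computation)  # run up to the first suspension
        while True:
            suspensions += 1
            computation.send(next(messages))  # resume with the next message
    except StopIteration as finished:
        return finished.value, suspensions

one = run(receive_one("channel"), ["a", "b"])
two = run(receive_two("channel"), ["a", "b"])
print(one, two)
```

A single Waiting -> Done transition maps cleanly onto `receive_one`, but there is no obvious point in `receive_two` at which a shared opstate should become Done.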

CML cannot compose operations

So if we want to use this approach for arbitrary async actions, we’d need to split them into individual operations, where each can suspend at most once, and the operation is Done when it resumes (or completes without suspending).

This is what CML does. It’s a little confusing, because CML is “composable” in that multiple operations can be composed into a single operation which chooses only one sub-operation (that’s the select operation). But this seems to be the only kind of composition possible, you can’t sequence CML operations into larger atomic operations.

Most operations don’t need to be atomic

Atomicity sounds great, but the more I’ve thought about it, the more it seems there’s quite a limited set of places where it matters. One is obviously channel receive operations, because taking a message from a channel only to have it go unused is clearly bad. And the same likely goes for channel send operations, although it’s less obvious when that would be a practical concern.

But… is that all? Maybe it is.

Many async operations either can’t be atomic, or don’t need to:

  • Remote interactions (like connecting to a server): it’s impossible for the counterpart to observe or respect the opstate, and typically you’ll dispose the entire connection if you abandon the code path that establishes a connection
  • Reading files / sockets: the read operation can’t easily “put the bytes back in the pipe” if the overall operation has already completed by the time they arrive.
    • To work around this you can stream the contents into a channel, and read from that.
  • Timeouts: the counterparty to the operation (the clock) doesn’t care if the operation happens or not. When the configured time passes, it generates an event. If that event goes unused, it has no effect on the clock.

There may be some circular logic in here - the advice “if you need the operation to be atomic, use a channel” incidentally ensures that only channels need to support atomic operations. But still, I don’t think the majority of async operations benefit from atomic guarantees in practice.

… But atomic operations should be completable by non-atomic ones

One last thing to mention is that even though a timeout doesn’t need to be atomic, it’s important that it can participate in an atomic operation. That is, this should work:

val result: maybe<string> = select([
	channel.receive().map(Just),
	timeout(1.second).as(Nothing)
])

If the channel receive happens to occur in parallel with the one-second timeout and the receive wins, then that’s fine, the timer won’t be offended. But if the timeout wins, we want the channel receive to not happen, so that we don’t lose a message sent to the channel.

So while only channel operations seem to need to be atomic, we do want arbitrary other async operations that we’ll select() over to be atomic-aware. If an atomic operation loses the race, we want to prevent its side effects from occurring.
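A minimal single-threaded sketch of what “atomic-aware” could mean, again in Python rather than Koka. Everything here is hypothetical: `Channel`, `ManualTimer` and `race` are illustrative names, the timer is fired by hand instead of by a clock, and the channel buffers unreceived messages for brevity (a real CML channel would leave the sender suspended instead):

```python
class OpState:
    """Shared by all alternatives of one select(); single-threaded, so no locking."""
    def __init__(self):
        self.done = False

    def try_commit(self):
        if self.done:
            return False
        self.done = True
        return True

class Channel:
    def __init__(self):
        self.waiting_receivers = []  # (opstate, callback) pairs
        self.unreceived = []         # messages with no live receiver yet

    def receive_op(self, opstate, on_message):
        if self.unreceived and opstate.try_commit():
            on_message(self.unreceived.pop(0))
        else:
            self.waiting_receivers.append((opstate, on_message))

    def send(self, message):
        while self.waiting_receivers:
            opstate, on_message = self.waiting_receivers.pop(0)
            if opstate.try_commit():
                on_message(message)
                return
            # Otherwise that receiver's select() already completed (e.g. via
            # the timeout): discard the stale receiver and keep looking.
        self.unreceived.append(message)

class ManualTimer:
    """Stand-in for a runtime timer; fire() plays the role of the clock."""
    def __init__(self):
        self.pending = []

    def timeout_op(self, opstate, on_timeout):
        self.pending.append((opstate, on_timeout))

    def fire(self):
        for opstate, on_timeout in self.pending:
            if opstate.try_commit():
                on_timeout()
        self.pending = []

def race(timer_first):
    channel, timer, opstate, results = Channel(), ManualTimer(), OpState(), []
    channel.receive_op(opstate, lambda message: results.append(("Just", message)))
    timer.timeout_op(opstate, lambda: results.append("Nothing"))
    if timer_first:
        timer.fire()
        channel.send("hello")
    else:
        channel.send("hello")
        timer.fire()
    return results, channel.unreceived

timeout_wins = race(timer_first=True)
receive_wins = race(timer_first=False)
print(timeout_wins, receive_wins)
```

When the timer fires first, the later send finds only a stale receiver, discards it, and keeps the message for a future receive. When the send happens first, the timer firing afterwards is a harmless no-op.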

What would this look like in Koka?

I wasn’t really sure where this experiment would take me when I started. How does the current async effect and the world of CML fit together, if at all? Should CML become the new basis for asynchronous code in Koka?

The answer is much narrower, which is kind of a relief. If we want to support atomic channel operations, they will need to be first class values, representing a single operation. And it should be possible to select between a combination of async expressions and channel operations.

// either an atomic operation or arbitrary async code
type operation<a>
	AtomicOp(...) // an atomic channel operation
	AsyncOp(f: () -> async a) // wrapper for arbitrary async code

// Return the result of the first operation, canceling all others.
// AtomicOps in this list will occur atomically, but AsyncOps will not.
fun select(operations: list<operation<a>>): async a
	...

Unfortunately, there’s no way to have this function be useful and have straightforward semantics. We want to support async operations, but it will also be possible to apply a single atomic operation, with a function like: fun perform(operation: operation<a>): async a. Which means that we can’t stop someone from doing this:

select([
	channel-1.receive-op(), // AtomicOp
	{ perform(channel-2.receive-op()) }, // AsyncOp
	timeout(1.second),
])

This will mostly work, but with the subtle problem that the receive-op on channel-2 is non-transactional, now that it’s placed within an opaque async expression. And code that wishes to perform multiple channel operations in sequence can only be written as an AsyncOp, again removing the ability to operate atomically.

These are difficult and subtle distinctions to convey to a user, unfortunately. It’s not clear that the benefits would be worth the confusion.

Alternative design: enforce single operations

A more drastic approach would be to remove the ability to race/select() over arbitrary async code, and require that all select operations occur on only singular operations like “channel receive”, “timeout”, etc. Without digging in too deeply, I think this is what Guile does. You can’t define an operation which is “run this async code”, because in Guile async code is initiated by spawning fibers. The operation you can select over is “join this fiber” (i.e. await its completion), which is a singular operation. So instead of select(fn1, fn2, fn3) you would first start all 3 functions in their own fibers, and then select(fiber1.join, fiber2.join, fiber3.join).

But this actually has the same problem I outlined above. If you were to spawn a fiber which just received a single message from a channel, then racing that fiber’s join event with other atomic events would not execute the channel receive atomically. It just seems like a less likely mistake to make by accident, because fibers are more tedious to use.

In addition to fibers being more tedious, they typically don’t play well with my other longstanding interest, Structured Concurrency. Koka already supports structured concurrency well, so I think encouraging a fibers-style API would be a big step backwards.

Summing up:

CML is interesting and elegant, and understanding how it works has given me some insights into atomic events in the face of parallel (threaded) concurrency. One day I hope Koka supports true parallel concurrency, at which point we’ll want something like CML’s approach for channels, at least. But exposing atomic cancellation semantics to the user while preserving ease-of-use for arbitrary compositions of async code will likely be a challenge.