The Ultimate Breakdown of Kotlin Coroutines. Part 4. Create, Start, Suspend, Intercept.

The previous parts mostly explained what the compiler does with coroutines. However, there is the other side of the coin: runtime support. In Kotlin's case, it consists of standard library functions, including intrinsic ones, which are responsible for all the essential operations: creation, starting, suspension, and interception. There is also a part responsible for resumption, but I already explained it in part 2.

Coroutine Intrinsics

Previous examples had an elementary coroutine builder. They used a so-called empty continuation. Let us now recreate kotlinx.coroutines' async function, which runs a coroutine on another thread and then, upon its completion, returns the result to the main thread.

First, we need a class which waits on the main thread:

class Async<T> {
    suspend fun await(): T = TODO()
}

then the root continuation:

class AsyncContinuation<T>: Continuation<T> {
    override val context = EmptyCoroutineContext

    override fun resumeWith(result: Result<T>) {
        TODO()
    }
}

Now, we can piece these two classes together. In await, we should check whether the coroutine has already computed the result and, if so, return it by using the it.resume(value) trick from the "resume with result" section. Otherwise, we should save the continuation so that it can be resumed when the result is available. Inside the continuation's resumeWith, we should check whether somebody already awaits the result and, if so, resume the awaiting continuation with the computed result; otherwise, we save the result so that it is accessible in await. In code, it looks like this:

class AsyncContinuation<T>: Continuation<T> {
    var result: T? = null
    var awaiting: Continuation<T>? = null
    override val context: CoroutineContext
        get() = EmptyCoroutineContext

    override fun resumeWith(result: Result<T>) {
        if (awaiting != null) {
            awaiting?.resumeWith(result)
        } else {
            this.result = result.getOrThrow()
        }
    }
}

class Async<T>(val ac: AsyncContinuation<T>) {
    suspend fun await(): T =
        suspendCoroutine<T> {
            val res = ac.result
            if (res != null)
                it.resume(res)
            else
                ac.awaiting = it
        }
}

Finally, we can write the builder itself:

fun <T> async(c: suspend () -> T): Async<T> {
    val ac = AsyncContinuation<T>()
    c.startCoroutine(ac)
    return Async(ac)
}

and a simple main function to test that everything works as expected (again, we check it with a suspension, just to be sure we did not miss anything):

fun main() {
    var c: Continuation<String>? = null
    builder {
        val async = async {
            println("Async in thread ${Thread.currentThread().id}")
            suspendCoroutine<String> { c = it }
        }
        println("Await in thread ${Thread.currentThread().id}")
        println(async.await())
    }
    c?.resume("OK")
}

Upon running, it will print

Async in thread 1
Await in thread 1
OK

Since it is not multithreaded yet, it will run async's coroutine in the main thread. However, before we make it multithreaded, we need to cover how the suspendCoroutine function works.
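One caveat of the simple version above: it uses null as the "no result yet" marker and calls getOrThrow inside resumeWith, so a coroutine that returns null or fails is never delivered to await. A minimal sketch of one possible fix is to store the whole Result instead. The names ResultAsyncContinuation, ResultAsync, resultAsync and demoNullResult are hypothetical and exist only for this illustration; the sketch assumes Kotlin 1.5 or newer, where Result can be used as a property type.

```kotlin
import kotlin.coroutines.*

// Hypothetical variant of AsyncContinuation: keeps the whole Result,
// so null values and exceptions both reach await.
class ResultAsyncContinuation<T> : Continuation<T> {
    var result: Result<T>? = null
    var awaiting: Continuation<T>? = null
    override val context: CoroutineContext
        get() = EmptyCoroutineContext

    override fun resumeWith(result: Result<T>) {
        val waiter = awaiting
        if (waiter != null) {
            waiter.resumeWith(result)
        } else {
            this.result = result
        }
    }
}

class ResultAsync<T>(private val ac: ResultAsyncContinuation<T>) {
    suspend fun await(): T = suspendCoroutine { cont ->
        val res = ac.result
        if (res != null) {
            cont.resumeWith(res) // success or failure, passed through as is
        } else {
            ac.awaiting = cont
        }
    }
}

fun <T> resultAsync(block: suspend () -> T): ResultAsync<T> {
    val ac = ResultAsyncContinuation<T>()
    block.startCoroutine(ac)
    return ResultAsync(ac)
}

// Awaits a coroutine whose result is null and reports what await returned.
fun demoNullResult(): String {
    var seen = "not resumed"
    val body: suspend () -> Unit = {
        val value: String? = resultAsync<String?> { null }.await()
        seen = "got $value"
    }
    body.startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() })
    return seen
}
```

Like the original, this sketch is not thread-safe; it only removes the ambiguity between "no result yet" and "the result is null".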

suspendCoroutine

After explaining in depth how resume works and which hoops the compiler jumps through to generate a (correct) state machine, let's see what happens when we call suspendCoroutine. We now know two things about the function: it somehow returns COROUTINE_SUSPENDED, and it provides access to the continuation parameter. The function is defined as follows:

public suspend inline fun <T> suspendCoroutine(
        crossinline block: (Continuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
    val safe = SafeContinuation(c.intercepted())
    block(safe)
    safe.getOrThrow()
}

So, it does five different things:

  1. accesses continuation argument
  2. intercepts continuation
  3. wraps it in SafeContinuation
  4. calls the lambda argument
  5. returns result, COROUTINE_SUSPENDED or throws an exception
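To make the last step a bit more tangible, here is a small sketch of the case where the lambda resumes the continuation right away. In that case getOrThrow already has a value, so suspendCoroutine returns it and the coroutine never actually suspends. The function name demoFastPath and the log list are only for illustration.

```kotlin
import kotlin.coroutines.*

// Records the order of events when the block resumes synchronously.
fun demoFastPath(): List<String> {
    val log = mutableListOf<String>()
    val body: suspend () -> Unit = {
        // The continuation is resumed inside the block, before it returns.
        val v = suspendCoroutine<String> { it.resume("sync") }
        log += "got $v"
    }
    body.startCoroutine(Continuation(EmptyCoroutineContext) { log += "completed" })
    // By the time startCoroutine returns, the whole coroutine has finished.
    log += "after start"
    return log
}
```

The log ends up as "got sync", "completed", "after start", which shows that everything ran inside the startCoroutine call.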

suspendCoroutineUninterceptedOrReturn

First, let us examine how one can access the continuation argument without suspending the current execution. suspendCoroutineUninterceptedOrReturn is an intrinsic function that does only one thing: it inlines the provided lambda, passing the continuation parameter to it. Its purpose is to give access to the continuation argument, which is invisible in suspend functions and lambdas. Thus, we cannot write it in pure Kotlin; it has to be an intrinsic.
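As a rough sketch of the two ways the lambda can finish, the following example first returns a plain value (no suspension) and then returns COROUTINE_SUSPENDED after saving the continuation, which is resumed later by hand. The names demoIntrinsic, saved and log are made up for the illustration.

```kotlin
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

fun demoIntrinsic(): List<String> {
    val log = mutableListOf<String>()
    var saved: Continuation<Int>? = null
    val body: suspend () -> Unit = {
        // Returning a plain value: the call completes without suspending.
        val a = suspendCoroutineUninterceptedOrReturn<Int> { 1 }
        log += "immediate $a"
        // Returning the marker: the coroutine suspends until `saved` is resumed.
        val b = suspendCoroutineUninterceptedOrReturn<Int> { cont ->
            saved = cont
            COROUTINE_SUSPENDED
        }
        log += "resumed $b"
    }
    body.startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() })
    log += "suspended"
    saved?.resume(2)
    return log
}
```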

Fun fact: since the lambda returns returnType | COROUTINE_SUSPENDED, the compiler does not check its return type, so there can be some funny CCEs (ClassCastExceptions) at runtime because of this unsoundness in the Kotlin type system:

import kotlin.coroutines.intrinsics.*

suspend fun returnsInt(): Int =
        suspendCoroutineUninterceptedOrReturn { "Nope, it returns String" }

suspend fun main() {
    1 + returnsInt()
}

will throw

Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot
be cast to java.lang.Number

Furthermore, the runtime throws the CCE upon the usage of the return value. So, if one just ignores its return value or even better (I mean, worse) calls the function in tail-call position (to enable tail-call optimization), no exception is thrown. So, the next example runs just fine:

import kotlin.coroutines.intrinsics.*

suspend fun returnsInt(): Int = 
        suspendCoroutineUninterceptedOrReturn { "Nope, it returns String" }

suspend fun alsoReturnsInt(): Int = returnsInt()

suspend fun main() {
    returnsInt()
    alsoReturnsInt()
}

SafeContinuation

Of course, there is a reason for SafeContinuation. Let's consider the following example:

fun builder(c: suspend () -> Unit) {
    c.startCoroutine(object: Continuation<Unit> {
        override val context = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) {
            result.getOrThrow()
        }
    })
}

fun main() {
    builder {
        suspendCoroutineUninterceptedOrReturn {
            it.resumeWithException(IllegalStateException("Boo"))
        }
    }
}

One might assume that we will get IllegalStateException, but this is not what happens here:

Exception in thread "main" kotlin.KotlinNullPointerException
   at kc.jvm.internal.ContinuationImpl.releaseIntercepted(ContinuationImpl.kt:118)
   at kc.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:39)
   at kc.ContinuationKt.startCoroutine(Continuation.kt:114)

That is an example of undefined behavior.

So, what happens here, and why does it cause the KNPE? When we call resumeWithException, BaseContinuationImpl.resumeWith calls releaseIntercepted, which sets the intercepted field to CompletedContinuation:

protected override fun releaseIntercepted() {
    val intercepted = intercepted
    if (intercepted != null && intercepted !== this) {
        context[ContinuationInterceptor]!!
                .releaseInterceptedContinuation(intercepted)
    }
    this.intercepted = CompletedContinuation // just in case
}

Then, when we throw the exception by calling getOrThrow, BaseContinuationImpl.resumeWith catches it (see the section about resume with exception), and calls releaseIntercepted again, but since there is no continuation interceptor in context, we get the KNPE.

That is essentially what SafeContinuation prevents. It catches an exception inside its resumeWith method and saves it until suspendCoroutine calls getOrThrow. Also, getOrThrow returns COROUTINE_SUSPENDED for not-yet-finished coroutines. In other words, when a wrapped coroutine suspends, getOrThrow tells suspendCoroutine to suspend.
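For comparison, here is a sketch of the same scenario written with suspendCoroutine instead of the unintercepted intrinsic. Since the continuation is wrapped, the exception is stored and rethrown by getOrThrow, so the root continuation receives the IllegalStateException as a failed Result. The name demoSafe and the outcome variable are only for illustration.

```kotlin
import kotlin.coroutines.*

fun demoSafe(): String {
    var outcome = "none"
    val body: suspend () -> Unit = {
        // Resuming with an exception inside the block is fine here.
        suspendCoroutine<Unit> {
            it.resumeWithException(IllegalStateException("Boo"))
        }
    }
    body.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
        // Inspect the Result instead of rethrowing it.
        outcome = result.exceptionOrNull()?.toString() ?: "success"
    })
    return outcome
}
```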

startCoroutine

We have already covered how a coroutine suspends, what happens when it resumes and how the compiler handles it. However, we have never looked at how one can create or start a coroutine. In all previous examples, one could notice a call to startCoroutine. There are two versions of the function: to start a suspend lambda without parameters and to start a coroutine with either one parameter or a receiver. It is defined as follows:

public fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

So, it

  1. creates a coroutine
  2. intercepts it
  3. starts it

Once again, createCoroutineUnintercepted has two versions: without parameters and with exactly one parameter. All it does is call the suspend lambda's create function. After the interception, we resume the coroutine with a dummy value. As explained in the "resume with the value" section, the state machine ignores the value passed to its first state. Thus, it is the perfect way to start a coroutine without calling invokeSuspend directly. However, the way we start callable references is different. Since they are tail-call, in other words, they do not have a continuation inside an object, we wrap them in a hand-written one.
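To show that creation and starting really are separate steps, here is a small sketch that performs them by hand with the public intrinsics and logs when the body actually runs. The function name demoCreateThenStart and the log list are assumptions made for the example.

```kotlin
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

fun demoCreateThenStart(): List<String> {
    val log = mutableListOf<String>()
    val body: suspend () -> String = {
        log += "body runs"
        "done"
    }
    val completion = Continuation<String>(EmptyCoroutineContext) {
        log += "completed with ${it.getOrThrow()}"
    }
    // Steps 1 and 2: create and intercept. Nothing runs yet.
    val coroutine: Continuation<Unit> =
        body.createCoroutineUnintercepted(completion).intercepted()
    log += "created"
    // Step 3: start by resuming with the dummy Unit value.
    coroutine.resume(Unit)
    return log
}
```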

create

create is generated by the compiler, and it

  1. creates a copy of the lambda by calling a constructor with captured variables
  2. puts create's arguments into parameter fields.

For example, if we have a lambda like

fun main() {
    val i = 1
    val lambda: suspend (Int) -> Int = { i + it }
}

the generated create will look like

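public fun create(value: Any?, completion: Continuation): Continuation {
    // copy the lambda: captured variable plus the completion object
    val result = main$lambda$1(this.$i, completion)
    // store create's argument in the parameter field
    result.I$0 = value as Int
    return result
}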

Note that the constructor, in addition to captured parameters, accepts a completion object.

invoke

invoke is basically startCoroutine without an interception. In invoke, we call create and resume the new instance with a dummy value by calling invokeSuspend. We cannot just call invokeSuspend without calling the constructor first because that would not create the continuation needed for the completion chain, as explained in the continuation-passing style section. Also, recursive suspend lambda calls would reset the label's value.
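The point about recursion can be checked with a small experiment: a suspend lambda that calls itself and suspends at every level. If all the calls shared one state machine instance, the labels would get mixed up; since every invoke creates a fresh instance, the recursion unwinds correctly. This is only a sketch, and the names demoRecursiveLambda, depth and pending are hypothetical.

```kotlin
import kotlin.coroutines.*

fun demoRecursiveLambda(): Int {
    // Continuations saved at each suspension point, resumed by hand below.
    val pending = ArrayDeque<Continuation<Unit>>()
    lateinit var depth: suspend (Int) -> Int
    depth = { n ->
        if (n == 0) {
            0
        } else {
            suspendCoroutine<Unit> { pending.addLast(it) }
            // Each recursive call goes through invoke and gets its own instance.
            depth(n - 1) + 1
        }
    }
    var answer = -1
    val root: suspend () -> Unit = { answer = depth(3) }
    root.startCoroutine(Continuation(EmptyCoroutineContext) { it.getOrThrow() })
    while (pending.isNotEmpty()) {
        pending.removeFirst().resume(Unit)
    }
    return answer
}
```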

Interception

After all this boring theory, we can finally turn our async example from the previous section into a multithreaded one. In all previous examples, I used EmptyCoroutineContext as the context for root continuations. CoroutineContext, the type of the context property, is essentially a hash map from CoroutineContext.Key to CoroutineContext.Element. A programmer can store coroutine-local information in it, and here 'coroutine' is used in a broad sense to represent a lightweight thread, not just a suspend function or a suspend lambda. So, one can view the context as a replacement for ThreadLocal. To access it, the user should use the coroutineContext intrinsic.

Even a single context element is a context itself, so it forms a tree. The fact that one element of the context is a context itself comes in handy when we need to move a coroutine from one thread to another, i.e., intercept it. In order to do that, we need to provide the key and the element to the context. There is a special interface, ContinuationInterceptor, which extends CoroutineContext.Element and has a key property. Let us create one:

object SingleThreadedInterceptor: ContinuationInterceptor {
    override val key = ContinuationInterceptor.Key

    override fun <T> interceptContinuation(
            continuation: Continuation<T>
    ): Continuation<T> = SingleThreadedContinuation(continuation)
}

In its interceptContinuation method, we simply wrap the provided continuation with a new one, and in this new continuation, we can run the coroutine on a different thread:

import kotlin.concurrent.thread

class SingleThreadedContinuation<T>(val c: Continuation<T>): Continuation<T> {
    override val context: CoroutineContext
        get() = c.context

    override fun resumeWith(result: Result<T>) {
        // every resumption of the wrapped continuation runs on a new thread
        thread {
            c.resumeWith(result)
        }
    }
}

Inside the resumeWith function, as one can see, we simply resume the continuation on another thread.

Note that we pass the context of the provided continuation as our own, so our continuation inherits it from the wrapped one. That is not required, but since the context is a replacement for ThreadLocal, we should keep it. All we are allowed to do is add additional infrastructural information, like ContinuationInterceptor, but we can never remove anything added by the user.

It is important to note that the key property should be constant. Otherwise, get on this key will return null, and there will be no interception.
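To illustrate how lookup by key works, here is a sketch with a user-defined context element. The UserName class and the demoContextLookup function are hypothetical; the pattern of using the companion object as the (constant) key is the usual one for context elements.

```kotlin
import kotlin.coroutines.*

// Hypothetical element that stores a user name in the context.
class UserName(val name: String) : AbstractCoroutineContextElement(UserName) {
    // The companion object is a singleton, so the key is always the same object.
    companion object Key : CoroutineContext.Key<UserName>
}

fun demoContextLookup(): List<String?> {
    // A single element is already a complete context.
    val context: CoroutineContext = UserName("alice")
    var seenInside: String? = null
    val body: suspend () -> Unit = {
        // Inside the coroutine, the context is available via coroutineContext.
        seenInside = coroutineContext[UserName]?.name
    }
    body.startCoroutine(Continuation(context) { it.getOrThrow() })
    return listOf(
        context[UserName]?.name,                       // found by its own key
        context[ContinuationInterceptor]?.toString(),  // nothing stored under this key
        seenInside
    )
}
```

If the key were created anew on every access, the comparison inside get would fail, and the lookup would return null even for the element's own key.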

Now, if we change AsyncContinuation, async function and main to use the interceptor:

class AsyncContinuation<T>(
        override val context: CoroutineContext
): Continuation<T> {
    var result: T? = null
    var awaiting: Continuation<T>? = null

    override fun resumeWith(result: Result<T>) {
        if (awaiting != null) {
            awaiting?.resumeWith(result)
        } else {
            this.result = result.getOrThrow()
        }
    }
}

fun <T> async(
        context: CoroutineContext = EmptyCoroutineContext,
        c: suspend () -> T
): Async<T> {
    val ac = AsyncContinuation<T>(context)
    c.startCoroutine(ac)
    return Async(ac)
}

fun main() {
    var c: Continuation<String>? = null
    builder {
        val async = async(SingleThreadedInterceptor) {
            println("Async in thread ${Thread.currentThread().id}")
            suspendCoroutine<String> { c = it }
        }
        println("Await in thread ${Thread.currentThread().id}")
        println(async.await())
    }
    c?.resume("OK")
}

and when we run the program, we get something like

Async in thread 11
Await in thread 1
OK

as expected.

But what part of the coroutine machinery calls the interceptor's interceptContinuation function? The function wraps the continuation, but who calls the function? Well, intercepted does. If we rewrite async as

fun <T> async(
        context: CoroutineContext = EmptyCoroutineContext,
        c: suspend () -> T
): Async<T> {
    val ac = AsyncContinuation<T>(context)
    c.createCoroutineUnintercepted(ac)
//        .intercepted()
        .resume(Unit)
    return Async(ac)
}

(note that I commented out the intercepted call) and then run the example, we get

Async in thread 1
Await in thread 1
OK

since without interception, we do not wrap the continuation to run the coroutine on another thread.

But how does intercepted do that? Well, intercepted, after some indirections, does the following:

context[ContinuationInterceptor]?.interceptContinuation(this)

Remember, CoroutineContext.Element is itself a CoroutineContext with a single element, which returns itself on get if its key is the same as the provided one. That is why it is important to use constants as keys. We also cache the intercepted continuation in the intercepted field. That is the field that causes the KNPE when we do not wrap the continuation with SafeContinuation.
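The caching can be observed with a sketch of an interceptor that counts calls instead of switching threads: interceptContinuation should run once per coroutine, while the wrapper's resumeWith runs on every start and resumption. CountingInterceptor and demoInterceptCaching are hypothetical names used only here.

```kotlin
import kotlin.coroutines.*

// Hypothetical interceptor that only counts what happens.
class CountingInterceptor : ContinuationInterceptor {
    override val key: CoroutineContext.Key<*> = ContinuationInterceptor.Key
    var intercepts = 0
    var resumes = 0

    override fun <T> interceptContinuation(
        continuation: Continuation<T>
    ): Continuation<T> {
        intercepts++
        return object : Continuation<T> {
            override val context: CoroutineContext
                get() = continuation.context

            override fun resumeWith(result: Result<T>) {
                resumes++
                continuation.resumeWith(result)
            }
        }
    }
}

fun demoInterceptCaching(): Pair<Int, Int> {
    val interceptor = CountingInterceptor()
    val saved = mutableListOf<Continuation<Unit>>()
    val body: suspend () -> Unit = {
        suspendCoroutine<Unit> { saved += it }
        suspendCoroutine<Unit> { saved += it }
    }
    body.startCoroutine(Continuation(interceptor) { it.getOrThrow() })
    while (saved.isNotEmpty()) {
        saved.removeAt(0).resume(Unit)
    }
    // One interception, three trips through the wrapper (start + two resumes).
    return interceptor.intercepts to interceptor.resumes
}
```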

Restricted Suspension

There are cases when we do not want to allow calling arbitrary suspend functions or lambdas from ours. For example, inside a lambda passed to the sequence function, we want to call only the yield and yieldAll functions, or functions that, in turn, call only yield or yieldAll. Furthermore, we do not want to intercept their continuations. We want to limit them to the calling thread (the main thread in our examples). In this case, we use the @RestrictsSuspension annotation on classes or interfaces that contain the leaf suspend functions the lambda is allowed to call. If we look at sequence, its SequenceScope class is annotated with this annotation.

Since we do not want to intercept the continuations, their contexts cannot be other than EmptyCoroutineContext.
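As a short sketch of what the restriction permits, the lambda below calls yield and yieldAll directly and also a helper that is an extension on SequenceScope, which is allowed because it can itself only call the scope's functions. The helper yieldTwice and the function demoSequence are hypothetical names for this example.

```kotlin
// Hypothetical helper: allowed inside sequence { } because it is
// an extension on the restricted scope and only calls yield.
suspend fun SequenceScope<Int>.yieldTwice(x: Int) {
    yield(x)
    yield(x)
}

fun demoSequence(): List<Int> = sequence {
    yield(1)
    yieldAll(listOf(2, 3))
    yieldTwice(4)
    // Calling an unrelated suspend function here would be a compile-time error.
}.toList()
```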

This concludes all the essential coroutine support in the standard library. Granted, there is still the Continuation interface, but I covered it briefly in part 2 of the series. In the next part, I will finally explain suspend functions, which I used in the examples but never explained properly.
