The Ultimate Breakdown of Kotlin Coroutines. Part 2. Continuation Passing Style and Resumption.

This is a continuation of series about coroutines internals. In the first part I explained how the compiler turns sequential code into suspendable by turing it into state-machines and how suspension works. This part covers continuation passing style - the reason for two-world model, and the counterpart of suspension - resumption.

Continuation Passing Style

In the first blogpost I touched upon the COROUTINE_SUSPENDED marker and said that suspending functions and lambdas return the marker when they suspend. Consequently, every suspend function return returnType | COROUTINE_SUSPENDED union type. However, since neither Kotlin nor JVM support union types, every coroutine's return type is Any? (also known as java.lang.Object) at runtime.

Let's now look to resume process closely. Suppose we have a couple of coroutines, one of them calls the other:

fun main() {
    val a: suspend () -> Unit = { suspendMe() }
    val b: suspend () -> Unit = { a() }
    builder { b() }
    c?.resume(Unit)
}

suspendMe here, as in the previous example, suspends. Stack trace inside suspendMe look like (skipping non-relevant parts)

suspendMe
main$a$1.invokeSuspend
main$a$1.invoke
main$b$1.invokeSuspend
main$b$1.invoke
// ...
builder
main

as one can see, everything is as expected. main calls builder, which in turns calls b.invoke, and so on until suspendMe. Since suspendMe suspends, it returns COROUTINE_SUSPENDED to a's invokeSuspend. As explained in the state-machine section checks, the caller checks that suspendMe returns COROUTINE_SUSPENDED and, in turn, returns COROUTINE_SUSPENDED. The same happens in all functions in call-stack in reverse order.

With the suspension process explained and out of the way, its counterpart - resumption - is next. When we call c?.resume(Unit). c is, technically, a, since suspendMe is a tail-call function (more on that in the relevant section). resume calls BaseContinuationImpl.resumeWith. BaseContinuationImpl is a superclass of all coroutines, not user-accessible, but used for almost everything coroutines-related that requires a class. It is the core of coroutine machinery, responsible for the resumption process. BaseContinuationImpl, in turn, calls a's invokeSuspend.

So, when we call c?.resume(Unit), the stacktrace becomes

main$a$1.invokeSuspend
BaseContinuationImpl.resumeWith
main

Now a continues its execution and returns Unit. But the execution returns to BaseContinuationImpl.resumeWith. However, we need to continue the execution of b since b called a. In other words, we need to store a link to b somewhere in a, so then, inside BaseContinuationImpl.resumeWith, we can call b's resumeWith, which then resumes the execution of b. Remember, b is a coroutine, and all coroutines inherit BaseContinuationImpl, which has the method resumeWith. Thus, we need to pass b to a. The only place where we can pass b to a is the invoke function call. So, we add a parameter to invoke. a.invoke's signature becomes

fun invoke(c: Continuation<Unit>): Any?

Continuation is a superinterface of all coroutines (unlike BaseContinuationImpl, it is user-accessible), in this case, suspend lambdas. It is at the top of the inheritance chain. The type parameter of continuation is the old return type of suspending lambda. The type parameter is the same as the type parameter of resumeWith's Result parameter: resumeWith(result: Result<Unit>). One might recall from the builder example in the suspending lambda section, where we create a continuation object. The object overrides resumeWith with the same signature.

Adding the continuation parameter to suspend lambdas and functions is known as Continuation-Passing Style, the style actively used in lisps such as Scheme. For example, if a function returns a value in a continuation-passing style in Scheme, it passes the value to the continuation parameter. So, a function accepts the continuation parameter, and the caller passes the continuation by calling call/cc intrinsic. The same happens in Kotlin with passing return value to caller's continuation's resumeWith. However, unlike Scheme, Kotlin does not use something like call/cc. Every coroutine already has a continuation. The caller passes it to the callee as an argument. Since the coroutine passes the return value to resumeWith, its parameter has the same type as the coroutine's return type. Technically, the type is Result<T>, but it is just a union T | Throwable; in this case, T is Unit. The next section uses return types other than Unit to illustrate how to resume a coroutine with a value. The other part, Throwable, is for resuming a coroutine with an exception and is explained in the relevant section.

After we passed parent coroutine's continuation to a child coroutine, we need to store it somewhere. Since "parent coroutine's continuation" is quite long and mouthful for a name, we call it 'completion'. We chose this name because the coroutine calls it upon the completion.

Since we add a continuation parameter to each suspend function and lambda, we cannot call suspending functions or lambdas from ordinary functions, and we cannot call them by passing null as the parameter since the coroutine call resumeWith on it. Instead, we should use coroutine builders, which provide root continuation and start the coroutine. That is the reason for the two worlds model.

Resume With Result

Let us consider the following example with a suspending function, returning a value, instead of Unit:

import kotlin.coroutines.*

var c: Continuation<Int>? = null

suspend fun suspendMe(): Int = suspendCoroutine { continuation ->
    c = continuation
}

fun builder(c: suspend () -> Unit) {
    c.startCoroutine(object: Continuation<Unit> {
        override val context = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) {
            result.getOrThrow()
        }
    })
}

fun main() {
    val a: suspend () -> Unit = { println(suspendMe()) }
    builder { a() }
    c?.resume(42)
}

if one runs the program, it prints 42. However, suspendMe does not return 42. It just suspends and returns nothing. By the way, suspendMe's continuation has type Continuation<Int>, i.e., the compiler moves function's return to type argument of the Continuation interface I mentioned in the previous section (about continuation-passing style).

The state-machine section touched upon the $result variable inside the invokeSuspend function. The listing shows the invokeSuspend function of a, but, unlike the previous example, with its signature:

fun invokeSuspend($result: Any?): Any? {
    when(this.label) {
        0 -> {
            this.label = 1
            $result = suspendMe(this)
            if ($result == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
            goto 1
        }
        1 -> {
            println($result)
            return Unit
        }
        else -> {
            throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
        }
    }
}

The listing shows that the $result variable is both parameter of the function and result of suspending call. Thus, when we call c?.resume(42), the value 42 is passed to BaseContinuationImpl.resumeImpl, it calls invokeSuspend with it. Now, since label's value is 1 (suspendMe suspended), 42 is printed. Note that in the first state, we ignore the argument of invokeSuspend, and this becomes important when we see how we start a coroutine.

So, what happens, when we call resume inside suspendCoroutine? Like

suspendCoroutine<Int> { it.resume(42) }

Following the resume process, resume calls continuation's resumeWith, which calls invokeSuspend with value 42. $result then contains the value and work the same as if suspendMe returned 42. In other words, suspendCoroutine with an unconditional resume will not suspend the coroutine and is semantically the same as returning the value.

It is important to note that passing COROUTINE_SUSPENDED to continuation's resumeWith leads to undefined behavior.

Resume with Exception

After reading the previous section about resume with a value, one might assume that $result's type is Int | COROUTINE_SUSPENDED, but this is not completely true. It is Int | COROUTINE_SUSPENDED | Result$Failue(Throwable), or, more generally, it is returnType | COROUTINE_SUSPENDED | Result$Failue(Throwable). The section covers the last part: Result$Failue(Throwable).

Let us change the previous example to resume the coroutine with exception:

import kotlin.coroutines.*

var c: Continuation<Int>? = null

suspend fun suspendMe(): Int = suspendCoroutine { continuation ->
    c = continuation
}

fun builder(c: suspend () -> Unit) {
    c.startCoroutine(object: Continuation<Unit> {
        override val context = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) {
            println(result.exceptionOrNull())
        }
    })
}

fun main() {
    val a: suspend () -> Unit = { println(1 + suspendMe()) }
    builder { a() }
    c?.resumeWithException(IllegalStateException("BOO"))
}

which, upon running, will print the exception. Note, that it is printed inside the builder function (because of println(result.exceptionOrNull())). There are a couple of things happening here: inside the generated state machine and inside BaseContinuationImpl's resumeWith.

First, we change the generated state machine. As explained before, the type of $result variable is Int | COROUTINE_SUSPENDED | Result$Failue(Throwable), but when we resume, by convention, its type cannot be COROUTINE_SUSPENDED. Still, the type is Int | Result$Failure(Throwable), which we cannot just pass to plus, at least, without a check and CHECKCAST. Otherwise, we will get CCE at runtime. Thus, we check the $result variable and throw the exception if the variable holds it.

fun invokeSuspend($result: Any?): Any? {
    when(this.label) {
        0 -> {
            this.label = 1
            $result = suspendMe(this)
            if ($result == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
            goto 1
        }
        1 -> {
            $result.throwOnFailure()
            println(1 + $result)
            return Unit
        }
        else -> {
            throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
        }
    }
}

where throwOnFailure is a function that performs the check and throwing part for us.

Now, when we throw the exception, it should end up in the main function. However, as we saw from the example, it comes to builder's root continuation's resumeWith. The builder creates the root continuation, and, unlike other continuations, it has no completion. We expect it to reach root continuation, since when we call one suspending function or lambda from another, we want to propagate the exception through suspending stack (also known as an async stack), from callee to caller, regardless of whether there was a suspension, or not, unless, there is an explicit try-catch block. Thankfully, we can propagate the exception the same way as the execution upon coroutine's completion, through the chain of completion fields. We, after all, should pass it to the caller, just like the return value. When invokeSuspend throws an exception, BaseContinuationImpl.resumeWith catches it, wraps into Result inline class, which is essentially T | Result$Failure(Throwable), and calls completion's resumeWith with the result (simplified):

abstract class BaseContinuationImpl(
    private val completion: Continuation<Any?>
): Continuation<Any?> {
    public final override fun resumeWith(result: Result<Any?>) {
        val outcome = try {
            val outcome = invokeSuspend(result)
            if (outcome == COROUTINE_SUSPENDED) return
            Result.success(outcome)
        } catch (e: Throwable) {
            Result.failure(e)
        }
        completion.resumeWith(outcome)
    }

    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}

The function passes the exception to invokeSuspend, invokeSuspend calls throwOnFailure and throws it again, then the exception is caught in BaseContinuationImpl.resumeWith and wrapped again until it reaches root continuation's resumeWith, where, in this case, the coroutine builder prints it. By the way, resumeWithException works in release coroutines precisely in the same way (except the catching part): it wraps the exception into Result like in a burrito. It passes it to continuation's resumeWith. resume also wraps the argument into Result and passes it to resumeWith.

This concludes the three heads of the hydra - I covered suspension, resumption and state-machines. In the following blogpost I will explain more how a coroutine stores its data before suspension and restores them after resumption.

Comments

Popular posts from this blog

The Ultimate Breakdown of Kotlin Coroutines. Part 1. State-Machines and Suspension.

The Ultimate Breakdown of Kotlin Coroutines. Part 5. Suspend Functions.