The Ultimate Breakdown of Kotlin Coroutines. Part 2. Continuation Passing Style and Resumption.
This is a continuation of the series about coroutine internals. In the first part I explained how the compiler turns sequential code into suspendable code by turning it into state machines, and how suspension works. This part covers continuation-passing style, which is the reason for the two-world model, and the counterpart of suspension: resumption.
Continuation Passing Style
In the first blog post I touched upon the COROUTINE_SUSPENDED marker and said that suspending functions and lambdas return the marker when they suspend. Consequently, every suspend function returns a returnType | COROUTINE_SUSPENDED union type. However, since neither Kotlin nor the JVM supports union types, every coroutine's return type is Any? (also known as java.lang.Object) at runtime.
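The union can be made visible with a hand-written function in the compiled shape. This is only a minimal sketch: computeOrSuspend, describe and ignoring are hypothetical names invented for illustration, and only Continuation and COROUTINE_SUSPENDED come from the standard library.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED

// Hypothetical hand-written analogue of a compiled `suspend fun f(): Int`:
// the declared return type is Any?, because the result is either an Int
// or the COROUTINE_SUSPENDED marker.
fun computeOrSuspend(shouldSuspend: Boolean, continuation: Continuation<Int>): Any? =
    if (shouldSuspend) COROUTINE_SUSPENDED else 42

// The caller has to tell the two halves of the union apart at runtime.
fun describe(result: Any?): String =
    if (result === COROUTINE_SUSPENDED) "suspended" else "returned $result"

// A continuation that ignores its result; it is never resumed in this sketch.
val ignoring = object : Continuation<Int> {
    override val context: CoroutineContext = EmptyCoroutineContext
    override fun resumeWith(result: Result<Int>) {}
}

fun main() {
    println(describe(computeOrSuspend(true, ignoring)))   // suspended
    println(describe(computeOrSuspend(false, ignoring)))  // returned 42
}
```

The identity check against the marker is the same kind of check the generated state machine performs after every suspending call.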
Let's now look at the resume process closely. Suppose we have a couple of coroutines, one of which calls the other:
fun main() {
    val a: suspend () -> Unit = { suspendMe() }
    val b: suspend () -> Unit = { a() }
    builder { b() }
    c?.resume(Unit)
}
suspendMe here, as in the previous example, suspends. The stack trace inside suspendMe looks like this (skipping irrelevant parts):
suspendMe
main$a$1.invokeSuspend
main$a$1.invoke
main$b$1.invokeSuspend
main$b$1.invoke
// ...
builder
main
As one can see, everything is as expected. main calls builder, which in turn calls b.invoke, and so on until suspendMe. Since suspendMe suspends, it returns COROUTINE_SUSPENDED to a's invokeSuspend. As explained in the state-machine section, the caller checks that suspendMe returns COROUTINE_SUSPENDED and, in turn, returns COROUTINE_SUSPENDED. The same happens in all functions in the call stack, in reverse order.
With the suspension process explained and out of the way, its counterpart, resumption, is next. Consider what happens when we call c?.resume(Unit). Here c is, technically, a, since suspendMe is a tail-call function (more on that in the relevant section). resume calls BaseContinuationImpl.resumeWith. BaseContinuationImpl is a superclass of all coroutines; it is not user-accessible, but it is used for almost everything coroutine-related that requires a class. It is the core of the coroutine machinery, responsible for the resumption process. BaseContinuationImpl, in turn, calls a's invokeSuspend.
So, when we call c?.resume(Unit), the stack trace becomes
main$a$1.invokeSuspend
BaseContinuationImpl.resumeWith
main
Now a continues its execution and returns Unit. But the execution returns to BaseContinuationImpl.resumeWith. However, we need to continue the execution of b, since b called a. In other words, we need to store a link to b somewhere in a, so that, inside BaseContinuationImpl.resumeWith, we can call b's resumeWith, which then resumes the execution of b. Remember, b is a coroutine, and all coroutines inherit BaseContinuationImpl, which has the method resumeWith. Thus, we need to pass b to a. The only place where we can pass b to a is the invoke function call. So, we add a parameter to invoke. a.invoke's signature becomes
fun invoke(c: Continuation<Unit>): Any?
Continuation is a superinterface of all coroutines (unlike BaseContinuationImpl, it is user-accessible), in this case, suspend lambdas. It is at the top of the inheritance chain. The type parameter of the continuation is the old return type of the suspending lambda. It is the same as the type parameter of resumeWith's Result parameter: resumeWith(result: Result<Unit>). One might recall the builder example in the suspending lambda section, where we create a continuation object. The object overrides resumeWith with the same signature.
Adding the continuation parameter to suspend lambdas and functions is known as Continuation-Passing Style, the style actively used in Lisps such as Scheme. For example, if a function returns a value in continuation-passing style in Scheme, it passes the value to the continuation parameter. So, a function accepts the continuation parameter, and the caller passes the continuation by calling the call/cc intrinsic. The same happens in Kotlin: the return value is passed to the resumeWith of the caller's continuation. However, unlike Scheme, Kotlin does not use something like call/cc. Every coroutine already has a continuation. The caller passes it to the callee as an argument. Since the coroutine passes the return value to resumeWith, its parameter has the same type as the coroutine's return type. Technically, the type is Result<T>, but it is just a union T | Throwable; in this case, T is Unit. The next section uses return types other than Unit to illustrate how to resume a coroutine with a value. The other part, Throwable, is for resuming a coroutine with an exception and is explained in the relevant section.
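To make the difference between the two styles concrete, here is a minimal sketch of the same addition written in direct style and in continuation-passing style. The names addDirect, addCps and addViaContinuation are hypothetical; only Continuation and resume are taken from the standard library.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume

// Direct style: the result travels back through the return value.
fun addDirect(x: Int, y: Int): Int = x + y

// Continuation-passing style: the result is handed to the continuation.
fun addCps(x: Int, y: Int, continuation: Continuation<Int>) {
    continuation.resume(x + y)
}

// Hypothetical helper: calls addCps and captures what the continuation receives.
fun addViaContinuation(x: Int, y: Int): Int? {
    var received: Int? = null
    addCps(x, y, object : Continuation<Int> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Int>) {
            received = result.getOrThrow()
        }
    })
    return received
}

fun main() {
    println(addDirect(1, 2))           // 3
    println(addViaContinuation(1, 2))  // 3
}
```

Note that the type argument of the continuation, Int, is exactly the return type of the direct-style version.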
After we pass the parent coroutine's continuation to a child coroutine, we need to store it somewhere. Since "parent coroutine's continuation" is quite a mouthful for a name, we call it 'completion'. We chose this name because the coroutine calls it upon completion.
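The chain of completions can be modelled by hand. The sketch below is an illustration under assumptions, not the real machinery: NamedStep and runChain are hypothetical names, and each step does nothing except log and forward the result to its completion.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume

// Hypothetical stand-in for a compiled coroutine: it remembers the caller's
// continuation as `completion` and resumes it when its own body is finished.
class NamedStep(
    private val name: String,
    private val log: MutableList<String>,
    private val completion: Continuation<Any?>
) : Continuation<Any?> {
    override val context: CoroutineContext get() = completion.context

    override fun resumeWith(result: Result<Any?>) {
        log += "$name finished"
        // Hand the outcome over to whoever called us.
        completion.resumeWith(result)
    }
}

fun runChain(): List<String> {
    val log = mutableListOf<String>()
    // The root continuation has no completion of its own.
    val root = object : Continuation<Any?> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Any?>) {
            log += "root got ${result.getOrNull()}"
        }
    }
    val b = NamedStep("b", log, root)  // b was started by the builder
    val a = NamedStep("a", log, b)     // a was called by b
    a.resume(Unit)                     // resuming the innermost coroutine
    return log
}

fun main() {
    runChain().forEach(::println)
}
```

Resuming a walks the chain outwards: a finishes, then b, then the root continuation receives the final value.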
Since we add a continuation parameter to each suspend function and lambda, we cannot call suspending functions or lambdas from ordinary functions, and we cannot call them by passing null as the parameter, since the coroutine calls resumeWith on it. Instead, we should use coroutine builders, which provide the root continuation and start the coroutine. That is the reason for the two-world model.
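As a minimal sketch of crossing from the ordinary world into the suspending one, the following uses the standard startCoroutine function to supply a root continuation. The names answer and runFromOrdinaryWorld are hypothetical, and the example assumes the coroutine never stays suspended, so the result is available as soon as startCoroutine returns.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine

suspend fun answer(): Int = 42

fun runFromOrdinaryWorld(): Int {
    var outcome = -1
    // answer() cannot be called here directly: an ordinary function
    // has no continuation to pass as the hidden parameter.
    val block: suspend () -> Int = { answer() }
    // startCoroutine takes the root continuation and starts the lambda.
    block.startCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Int>) {
            outcome = result.getOrThrow()
        }
    })
    return outcome
}

fun main() {
    println(runFromOrdinaryWorld())  // 42
}
```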
Resume With Result
Let us consider the following example with a suspending function that returns a value instead of Unit:
import kotlin.coroutines.*

var c: Continuation<Int>? = null

suspend fun suspendMe(): Int = suspendCoroutine { continuation ->
    c = continuation
}

fun builder(c: suspend () -> Unit) {
    c.startCoroutine(object: Continuation<Unit> {
        override val context = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) {
            result.getOrThrow()
        }
    })
}

fun main() {
    val a: suspend () -> Unit = { println(suspendMe()) }
    builder { a() }
    c?.resume(42)
}
If one runs the program, it prints 42. However, suspendMe does not return 42. It just suspends and returns nothing. By the way, suspendMe's continuation has type Continuation<Int>, i.e., the compiler moves the function's return type into the type argument of the Continuation interface I mentioned in the previous section (about continuation-passing style).
The state-machine section touched upon the $result variable inside the invokeSuspend function. The listing shows the invokeSuspend function of a, but, unlike the previous example, with its signature:
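// Pseudocode of the generated state machine, not valid Kotlin.
fun invokeSuspend($result: Any?): Any? {
    when(this.label) {
        0 -> {
            this.label = 1
            // The incoming $result is ignored in the first state.
            $result = suspendMe(this)
            if ($result == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
            goto 1
        }
        1 -> {
            // Here $result is either the returned or the resumed value.
            println($result)
            return Unit
        }
        else -> {
            throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
        }
    }
}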
The listing shows that the $result variable is both the parameter of the function and the result of the suspending call. Thus, when we call c?.resume(42), the value 42 is passed to BaseContinuationImpl.resumeWith, which calls invokeSuspend with it. Now, since label's value is 1 (suspendMe suspended), 42 is printed. Note that in the first state, we ignore the argument of invokeSuspend, and this becomes important when we see how we start a coroutine.
So, what happens when we call resume inside suspendCoroutine? For example:
suspendCoroutine<Int> { it.resume(42) }
Following the resume process, resume calls the continuation's resumeWith, which calls invokeSuspend with the value 42. $result then contains the value, and everything works the same as if suspendMe returned 42. In other words, suspendCoroutine with an unconditional resume will not suspend the coroutine and is semantically the same as returning the value.
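This equivalence can be observed from the outside. In the sketch below, immediate, plain and runToCompletion are hypothetical names; the helper returns a non-null value only if the coroutine has already completed by the time startCoroutine returns, which is the case for both variants.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Resumes unconditionally inside the suspendCoroutine block.
suspend fun immediate(): Int = suspendCoroutine { it.resume(42) }

// Simply returns the value.
suspend fun plain(): Int = 42

// Hypothetical helper: starts the block and reports the result
// if it is already available once startCoroutine returns.
fun runToCompletion(block: suspend () -> Int): Int? {
    var outcome: Int? = null
    block.startCoroutine(object : Continuation<Int> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<Int>) {
            outcome = result.getOrThrow()
        }
    })
    return outcome
}

fun main() {
    println(runToCompletion { immediate() })  // 42
    println(runToCompletion { plain() })      // 42
}
```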
It is important to note that passing COROUTINE_SUSPENDED to a continuation's resumeWith leads to undefined behavior.
Resume with Exception
After reading the previous section about resuming with a value, one might assume that $result's type is Int | COROUTINE_SUSPENDED, but this is not completely true. It is Int | COROUTINE_SUSPENDED | Result$Failure(Throwable), or, more generally, it is returnType | COROUTINE_SUSPENDED | Result$Failure(Throwable). This section covers the last part: Result$Failure(Throwable).
Let us change the previous example to resume the coroutine with an exception:
import kotlin.coroutines.*

var c: Continuation<Int>? = null

suspend fun suspendMe(): Int = suspendCoroutine { continuation ->
    c = continuation
}

fun builder(c: suspend () -> Unit) {
    c.startCoroutine(object: Continuation<Unit> {
        override val context = EmptyCoroutineContext
        override fun resumeWith(result: Result<Unit>) {
            println(result.exceptionOrNull())
        }
    })
}

fun main() {
    val a: suspend () -> Unit = { println(1 + suspendMe()) }
    builder { a() }
    c?.resumeWithException(IllegalStateException("BOO"))
}
which, upon running, will print the exception. Note that it is printed inside the builder function (because of println(result.exceptionOrNull())). There are a couple of things happening here: one inside the generated state machine and one inside BaseContinuationImpl's resumeWith.
First, we change the generated state machine. As explained before, the type of the $result variable is Int | COROUTINE_SUSPENDED | Result$Failure(Throwable), but when we resume, by convention, its value cannot be COROUTINE_SUSPENDED. Still, the type is Int | Result$Failure(Throwable), which we cannot just pass to plus, at least not without a check and a CHECKCAST. Otherwise, we will get a ClassCastException (CCE) at runtime. Thus, we check the $result variable and throw the exception if the variable holds one.
// Pseudocode of the generated state machine, not valid Kotlin.
fun invokeSuspend($result: Any?): Any? {
    when(this.label) {
        0 -> {
            this.label = 1
            $result = suspendMe(this)
            if ($result == COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
            goto 1
        }
        1 -> {
            // Rethrow if the coroutine was resumed with an exception.
            $result.throwOnFailure()
            println(1 + $result)
            return Unit
        }
        else -> {
            throw IllegalStateException("call to 'resume' before 'invoke' with coroutine")
        }
    }
}
where throwOnFailure is a function that performs the check and throwing part for us.
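The check-then-use step can be approximated with the public Result API. This is a sketch under assumptions: stateOne is a hypothetical stand-in for state 1 of the listing, and getOrThrow plays the combined role of throwOnFailure and the cast, since the real throwOnFailure is not part of the public API.

```kotlin
// Hypothetical model of state 1: either unwrap the resumed value
// or rethrow the exception the coroutine was resumed with.
fun stateOne(result: Result<Int>): Int {
    val value = result.getOrThrow()  // throws if result holds a failure
    return 1 + value
}

fun main() {
    println(stateOne(Result.success(41)))  // 42

    // Resuming with an exception makes the same code throw it.
    val failure = runCatching {
        stateOne(Result.failure(IllegalStateException("BOO")))
    }
    println(failure.exceptionOrNull()?.message)  // BOO
}
```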
Now, when we throw the exception, one would expect it to end up in the main function. However, as we saw from the example, it comes to the resumeWith of builder's root continuation. The builder creates the root continuation, and, unlike other continuations, it has no completion. We expect the exception to reach the root continuation, since when we call one suspending function or lambda from another, we want to propagate the exception through the suspending stack (also known as an async stack), from callee to caller, regardless of whether there was a suspension or not, unless there is an explicit try-catch block. Thankfully, we can propagate the exception the same way as the execution upon a coroutine's completion: through the chain of completion fields. We, after all, should pass it to the caller, just like the return value. When invokeSuspend throws an exception, BaseContinuationImpl.resumeWith catches it, wraps it into the Result inline class, which is essentially T | Result$Failure(Throwable), and calls completion's resumeWith with the result (simplified):
abstract class BaseContinuationImpl(
    private val completion: Continuation<Any?>
): Continuation<Any?> {
    public final override fun resumeWith(result: Result<Any?>) {
        val outcome = try {
            val outcome = invokeSuspend(result)
            if (outcome == COROUTINE_SUSPENDED) return
            Result.success(outcome)
        } catch (e: Throwable) {
            Result.failure(e)
        }
        completion.resumeWith(outcome)
    }

    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
The function passes the exception to invokeSuspend, invokeSuspend calls throwOnFailure and throws it again, then the exception is caught in BaseContinuationImpl.resumeWith and wrapped again, and so on until it reaches the root continuation's resumeWith, where, in this case, the coroutine builder prints it. By the way, resumeWithException works in release coroutines in precisely the same way (except for the catching part): it wraps the exception into Result, like in a burrito, and passes it to the continuation's resumeWith. resume also wraps its argument into Result and passes it to resumeWith.
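The wrapping can be checked with a continuation that records what reaches its resumeWith. Recording and recordAll are hypothetical names for this sketch; resume, resumeWithException, Result.success and Result.failure are the standard library functions.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical continuation that logs every Result it is resumed with.
class Recording<T> : Continuation<T> {
    val seen = mutableListOf<String>()
    override val context: CoroutineContext = EmptyCoroutineContext
    override fun resumeWith(result: Result<T>) {
        seen += result.fold({ "success $it" }, { "failure ${it.message}" })
    }
}

fun recordAll(): List<String> {
    val r = Recording<Int>()
    r.resume(42)                                    // wraps into Result.success
    r.resumeWith(Result.success(42))                // the explicit equivalent
    r.resumeWithException(IllegalStateException("BOO"))         // wraps into Result.failure
    r.resumeWith(Result.failure(IllegalStateException("BOO")))  // the explicit equivalent
    return r.seen
}

fun main() {
    recordAll().forEach(::println)
}
```

Each pair of calls produces the same log entry, which is the point: the two extension functions are thin wrappers around resumeWith.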
This concludes the three heads of the hydra: I have covered suspension, resumption and state machines. In the following blog post I will explain in more detail how a coroutine stores its data before suspension and restores it after resumption.