The Ultimate Breakdown of Kotlin Coroutines. Part 4. Create, Start, Suspend, Intercept.
The previous parts mostly explained what the compiler does with coroutines. However, there is the other side of the coin: runtime support. In Kotlin's case, that is a set of standard library functions, including intrinsic ones, which are responsible for all the essential operations: creation, starting, suspension and interception. There is also a part responsible for resumption, but I already explained it in part 2.
Coroutine Intrinsics
Previous examples used an elementary coroutine builder with a so-called empty continuation. Let us now recreate kotlinx.coroutines' async function, which runs a coroutine on another thread and then, upon its completion, returns the result to the main thread.
First, we need a class which waits on the main thread:
class Async<T> {
suspend fun await(): T = TODO()
}
then the root continuation:
class AsyncContinuation<T>: Continuation<T> {
override val context = EmptyCoroutineContext
override fun resumeWith(result: Result<T>) {
TODO()
}
}
Now, we can piece these two classes together. In await, we should check whether the coroutine has already computed the result and, if so, return it by using the it.resume(value) trick from the resume with result section. Otherwise, we should save the continuation, so it can be resumed when the result is available. Inside the continuation's resumeWith, we should check whether someone is awaiting the result and, if so, resume the awaiting continuation with the computed result; otherwise, we save the result so that it will be accessible in await. In code, it looks like this:
class AsyncContinuation<T>: Continuation<T> {
var result: T? = null
var awaiting: Continuation<T>? = null
override val context: CoroutineContext
get() = EmptyCoroutineContext
override fun resumeWith(result: Result<T>) {
if (awaiting != null) {
awaiting?.resumeWith(result)
} else {
this.result = result.getOrThrow()
}
}
}
class Async<T>(val ac: AsyncContinuation<T>) {
suspend fun await(): T =
suspendCoroutine<T> {
val res = ac.result
if (res != null)
it.resume(res)
else
ac.awaiting = it
}
}
Finally, we can write the builder itself:
fun <T> async(c: suspend () -> T): Async<T> {
val ac = AsyncContinuation<T>()
c.startCoroutine(ac)
return Async(ac)
}
and a simple main function to test that everything works as expected (again, with a suspension, just to be sure we did not miss anything):
fun main() {
var c: Continuation<String>? = null
builder {
val async = async {
println("Async in thread ${Thread.currentThread().id}")
suspendCoroutine<String> { c = it }
}
println("Await in thread ${Thread.currentThread().id}")
println(async.await())
}
c?.resume("OK")
}
Upon running, it will print
Async in thread 1
Await in thread 1
OK
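For reference, here is the same example assembled into a single file that can be run on its own. It is a sketch under two assumptions: builder is the minimal root builder from the earlier parts (reproduced here with the standard Continuation factory function), and, instead of printing thread ids, it records events in a list so that their order is easy to check. The local variable is renamed to deferred only to avoid shadowing the async function.

```kotlin
import kotlin.coroutines.*

class AsyncContinuation<T> : Continuation<T> {
    var result: T? = null
    var awaiting: Continuation<T>? = null
    override val context: CoroutineContext get() = EmptyCoroutineContext

    override fun resumeWith(result: Result<T>) {
        val waiter = awaiting
        if (waiter != null) {
            waiter.resumeWith(result)          // someone already awaits: hand the result over
        } else {
            this.result = result.getOrThrow()  // nobody awaits yet: keep the value for await()
        }
    }
}

class Async<T>(private val ac: AsyncContinuation<T>) {
    suspend fun await(): T = suspendCoroutine { cont ->
        val res = ac.result
        if (res != null) {
            cont.resume(res)                   // value already computed: continue immediately
            return@suspendCoroutine
        }
        ac.awaiting = cont                     // not computed yet: park the awaiting continuation
    }
}

fun <T> async(c: suspend () -> T): Async<T> {
    val ac = AsyncContinuation<T>()
    c.startCoroutine(ac)
    return Async(ac)
}

// Assumed root builder from the earlier parts: starts the coroutine and rethrows failures
fun builder(c: suspend () -> Unit) {
    c.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { it.getOrThrow() })
}

fun runDemo(): List<String> {
    val log = mutableListOf<String>()
    var c: Continuation<String>? = null
    builder {
        val deferred = async {
            log += "async started"
            suspendCoroutine<String> { c = it }  // suspend until resumed from main
        }
        log += "awaiting"
        log += deferred.await()
    }
    c?.resume("OK")                              // completes async, which resumes the awaiter
    return log
}

fun main() = println(runDemo())
```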
Since it is not multithreaded yet, it will run async's coroutine in the main thread. However, before we make it multithreaded, we need to cover how the suspendCoroutine function works.
suspendCoroutine
After explaining in depth how resume works and which hoops the compiler jumps through to generate a (correct) state machine, let's see what happens when we call suspendCoroutine. We now know two things about the function: it somehow returns COROUTINE_SUSPENDED, and it provides access to the continuation parameter.
The function is defined as follows:
public suspend inline fun <T> suspendCoroutine(
crossinline block: (Continuation<T>) -> Unit
): T = suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
val safe = SafeContinuation(c.intercepted())
block(safe)
safe.getOrThrow()
}
So, it does five different things:
- accesses the continuation argument
- intercepts the continuation
- wraps it in SafeContinuation
- calls the lambda argument
- returns the result or COROUTINE_SUSPENDED, or throws an exception
suspendCoroutineUninterceptedOrReturn
First, let us examine how one can access a continuation argument without suspending the current execution. suspendCoroutineUninterceptedOrReturn is an intrinsic function that does only one thing: it inlines the provided lambda, passing the continuation parameter to it. Its purpose is to give access to the continuation argument, which is invisible in suspend functions and lambdas. Thus, it cannot be written in pure Kotlin; it has to be an intrinsic.
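To see both outcomes the lambda may produce, here is a small sketch. answerNow and answerLater are hypothetical names: the first returns a value directly, so its caller never suspends, while the second stores the raw (unintercepted) continuation and returns COROUTINE_SUSPENDED, so the caller stays suspended until someone resumes it.

```kotlin
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

// Returns a plain value: the caller continues without suspending
suspend fun answerNow(): Int = suspendCoroutineUninterceptedOrReturn { 42 }

// Hypothetical holder for the parked continuation
var parked: Continuation<Int>? = null

// Saves the continuation and signals suspension with the special marker
suspend fun answerLater(): Int = suspendCoroutineUninterceptedOrReturn { cont ->
    parked = cont
    COROUTINE_SUSPENDED
}

fun runDemo(): List<String> {
    val log = mutableListOf<String>()
    val block: suspend () -> Int = {
        log += "now=${answerNow()}"
        answerLater() + 1                      // suspends here
    }
    block.startCoroutine(Continuation<Int>(EmptyCoroutineContext) { r ->
        log += "done=${r.getOrThrow()}"
    })
    log += "startCoroutine returned"           // reached while the coroutine is suspended
    parked?.resume(41)                         // answerLater() now "returns" 41
    return log
}

fun main() = println(runDemo())
```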
Fun fact: since the lambda returns returnType | COROUTINE_SUSPENDED, the compiler does not check its return type, so there can be some funny CCEs (ClassCastExceptions) at runtime because of this unsoundness in the Kotlin type system:
import kotlin.coroutines.intrinsics.*
suspend fun returnsInt(): Int =
suspendCoroutineUninterceptedOrReturn { "Nope, it returns String" }
suspend fun main() {
1 + returnsInt()
}
will throw
Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to java.lang.Number
Furthermore, the runtime throws the CCE only upon the usage of the return value. So, if one just ignores the return value or, even better (I mean, worse), calls the function in a tail-call position (to enable tail-call optimization), no exception is thrown. So, the next example runs just fine:
import kotlin.coroutines.intrinsics.*
suspend fun returnsInt(): Int =
suspendCoroutineUninterceptedOrReturn { "Nope, it returns String" }
suspend fun alsoReturnsInt(): Int = returnsInt()
suspend fun main() {
returnsInt()
alsoReturnsInt()
}
SafeContinuation
Of course, there is a reason for SafeContinuation. Let's consider the following example:
fun builder(c: suspend () -> Unit) {
c.startCoroutine(object: Continuation<Unit> {
override val context = EmptyCoroutineContext
override fun resumeWith(result: Result<Unit>) {
result.getOrThrow()
}
})
}
fun main() {
builder {
suspendCoroutineUninterceptedOrReturn {
it.resumeWithException(IllegalStateException("Boo"))
}
}
}
One might assume that we will get an IllegalStateException, but this is not what happens here:
Exception in thread "main" kotlin.KotlinNullPointerException
at kc.jvm.internal.ContinuationImpl.releaseIntercepted(ContinuationImpl.kt:118)
at kc.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:39)
at kc.ContinuationKt.startCoroutine(Continuation.kt:114)
That is an example of undefined behavior.
So, what happens here, and why does it cause the KNPE? When we call resumeWithException, inside BaseContinuationImpl.resumeWith we call releaseIntercepted, where we set the intercepted field to CompletedContinuation:
protected override fun releaseIntercepted() {
val intercepted = intercepted
if (intercepted != null && intercepted !== this) {
context[ContinuationInterceptor]!!
.releaseInterceptedContinuation(intercepted)
}
this.intercepted = CompletedContinuation // just in case
}
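Then, when we throw the exception by calling getOrThrow, BaseContinuationImpl.resumeWith catches it (see the section about resume with exception) and calls releaseIntercepted again. This time, the intercepted field already holds CompletedContinuation, which is neither null nor this, so releaseIntercepted looks up the interceptor in context; since there is no continuation interceptor there, the !! operator throws the KNPE.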
That is essentially what SafeContinuation prevents. It intercepts the result (or the exception) passed to its resumeWith method and saves it until suspendCoroutine calls getOrThrow. Also, getOrThrow returns COROUTINE_SUSPENDED if the continuation has not been resumed yet. In other words, when the wrapped coroutine suspends, getOrThrow tells suspendCoroutine to suspend.
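With suspendCoroutine, which wraps the continuation in SafeContinuation, the example from the beginning of this section behaves as one would expect. The sketch below (again assuming a minimal builder like the one above) shows that an exception passed to resumeWithException before the lambda returns is rethrown at the call site, where it can be caught, and that resuming the same continuation a second time is rejected with an IllegalStateException.

```kotlin
import kotlin.coroutines.*

// Assumed minimal root builder, as in the earlier parts
fun builder(c: suspend () -> Unit) {
    c.startCoroutine(Continuation<Unit>(EmptyCoroutineContext) { it.getOrThrow() })
}

fun runDemo(): List<String> {
    val log = mutableListOf<String>()
    builder {
        // The exception is stored by SafeContinuation and rethrown by getOrThrow
        try {
            suspendCoroutine<Unit> { it.resumeWithException(IllegalStateException("Boo")) }
        } catch (e: IllegalStateException) {
            log += "caught ${e.message}"
        }
        // The first resume stores the value; the second one throws IllegalStateException
        try {
            suspendCoroutine<Int> { cont ->
                cont.resume(1)
                cont.resume(2)
            }
        } catch (e: IllegalStateException) {
            log += "second resume rejected"
        }
    }
    return log
}

fun main() = println(runDemo())
```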
startCoroutine
We have already covered how a coroutine suspends, what happens when it resumes and how the compiler handles it. However, we have never looked at how one can create or start a coroutine. In all previous examples, one could notice a call to startCoroutine. There are two versions of the function: one starts a suspend lambda without parameters, and the other starts a coroutine with either one parameter or a receiver. The parameterless one is defined as follows:
public fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T>) {
createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}
So, it
- creates a coroutine
- intercepts it
- starts it
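The create and start steps can also be separated. The standard library's createCoroutine creates (and intercepts) the coroutine but does not run anything until the returned continuation is resumed with the dummy Unit value, whereas startCoroutine does everything in one call. A minimal sketch:

```kotlin
import kotlin.coroutines.*

fun runDemo(): List<String> {
    val log = mutableListOf<String>()
    val body: suspend () -> Int = {
        log += "body runs"
        42
    }
    val completion = Continuation<Int>(EmptyCoroutineContext) {
        log += "completed with ${it.getOrThrow()}"
    }

    // Only creation: nothing is executed yet
    val coroutine: Continuation<Unit> = body.createCoroutine(completion)
    log += "created"
    // Starting = resuming with a dummy value that the first state ignores
    coroutine.resume(Unit)

    // Create + intercept + start in a single call
    body.startCoroutine(completion)
    return log
}

fun main() = println(runDemo())
```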
Once again, createCoroutineUnintercepted has two versions: without parameters and with exactly one parameter. All it does is call the suspend lambda's create function. After the interception, we resume the coroutine with a dummy value. As explained in the resume with value section, the state machine ignores the value passed to its first state. Thus, it is the perfect way to start a coroutine without calling invokeSuspend directly. However, the way we start callable references is different. Since they are tail calls, in other words, they do not have a continuation inside an object, we wrap them in a hand-written one.
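The other overload takes the receiver (or the single parameter) as its first argument. The sketch below starts a suspend lambda with a receiver, and then a callable reference to a suspend function, which has an ordinary suspend function type and is started the same way from the caller's point of view. Greeter and computeAnswer are hypothetical names used only for illustration.

```kotlin
import kotlin.coroutines.*

// Hypothetical receiver type
class Greeter(val name: String)

// Hypothetical suspend function used through a callable reference
suspend fun computeAnswer(): Int = 42

fun runDemo(): List<String> {
    val log = mutableListOf<String>()

    // Suspend lambda with a receiver: the receiver goes first
    val greet: suspend Greeter.() -> String = { "Hello, $name" }
    greet.startCoroutine(Greeter("Kotlin"), Continuation<String>(EmptyCoroutineContext) {
        log += it.getOrThrow()
    })

    // Callable reference to a suspend function
    val ref: suspend () -> Int = ::computeAnswer
    ref.startCoroutine(Continuation<Int>(EmptyCoroutineContext) {
        log += "answer=${it.getOrThrow()}"
    })
    return log
}

fun main() = println(runDemo())
```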
create
create is generated by the compiler, and it
- creates a copy of the lambda by calling its constructor with the captured variables
- puts create's arguments into parameter fields.
For example, if we have a lambda like
fun main() {
val i = 1
val lambda: suspend (Int) -> Int = { i + it }
}
the generated create will look roughly like
public fun create(value: Any?, completion: Continuation): Continuation {
    val result = main$lambda$1(this.$i, completion)
    result.I$0 = value as Int
    return result
}
Note that the constructor, in addition to the captured variables, accepts a completion object.
invoke
invoke is basically startCoroutine without the interception. In invoke, we call create and resume the new instance with a dummy value by calling invokeSuspend. We cannot just call invokeSuspend without calling the constructor first, because that would not create the continuation needed for the completion chain, as explained in the continuation-passing style section. Also, recursive calls of a suspend lambda would reset label's value.
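One observable consequence of invoke going through create is that a suspend lambda can call itself recursively, even with a suspension point inside, and every level still gets its own state. A sketch with a hypothetical factorial lambda (the suspendCoroutine call resumes immediately; it is there only to give each level a real suspension point):

```kotlin
import kotlin.coroutines.*

fun runDemo(): Int {
    var result = -1
    // lateinit local, so that the lambda can refer to itself
    lateinit var factorial: suspend (Int) -> Int
    factorial = { n ->
        val m = suspendCoroutine<Int> { it.resume(n) }  // suspension point on every level
        if (m <= 1) 1 else m * factorial(m - 1)          // each call creates a fresh instance
    }
    val start: suspend () -> Int = { factorial(5) }
    start.startCoroutine(Continuation<Int>(EmptyCoroutineContext) { result = it.getOrThrow() })
    return result
}

fun main() = println(runDemo())
```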
Interception
After all this boring theory, we can finally turn our async example from the previous section into a multithreaded one. In all previous examples, I used EmptyCoroutineContext as the context for root continuations. CoroutineContext, the type of the context property, is essentially a hash map from CoroutineContext.Key to CoroutineContext.Element. A programmer can store coroutine-local information in it, and here 'coroutine' is used in a broad sense to represent a lightweight thread, not just a suspend function or a suspend lambda. So, one can view context as a replacement for ThreadLocal. To access it, the user should use the coroutineContext intrinsic. Even a single context element is a context itself, so it forms a tree. The fact that an element of the context is a context itself comes in handy when we need to move a coroutine from one thread to another, i.e., intercept it. In order to do that, we need to provide the key and the element to the context. There is a special interface, ContinuationInterceptor, which extends CoroutineContext.Element and has a key property. Let us create an implementation:
object SingleThreadedInterceptor: ContinuationInterceptor {
override val key = ContinuationInterceptor.Key
override fun <T> interceptContinuation(
continuation: Continuation<T>
): Continuation<T> = SingleThreadedContinuation(continuation)
}
In its interceptContinuation method, we simply wrap the provided continuation with a new one, and in this continuation we can run the coroutine on a different thread:
class SingleThreadedContinuation<T>(val c: Continuation<T>): Continuation<T> {
override val context: CoroutineContext
get() = c.context
override fun resumeWith(result: Result<T>) {
thread {
c.resumeWith(result)
}
}
}
Inside the resumeWith function, as one can see, we simply resume the continuation on another thread.
Note that we pass the context of the provided continuation as our own, so our continuation inherits it from the wrapped one. That is not required, but since context is a replacement for ThreadLocal, we should keep it. All we are allowed to do is add additional infrastructural information, like ContinuationInterceptor, but we can never remove anything added by the user.
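The following sketch checks this behavior. A hypothetical RequestId element plays the role that a ThreadLocal value would play in blocking code, and an interceptor modeled on SingleThreadedInterceptor moves every resumption to a new thread. The value is still visible through coroutineContext after the hop. A CountDownLatch keeps the caller waiting until the coroutine completes on the worker thread.

```kotlin
import java.util.concurrent.CountDownLatch
import kotlin.concurrent.thread
import kotlin.coroutines.*

// Hypothetical coroutine-local value with a constant key (its companion object)
class RequestId(val value: String) : AbstractCoroutineContextElement(RequestId) {
    companion object Key : CoroutineContext.Key<RequestId>
}

// Same idea as SingleThreadedInterceptor: each resumption runs on a fresh thread
object NewThreadInterceptor : ContinuationInterceptor {
    override val key = ContinuationInterceptor.Key
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            // Inherit the wrapped continuation's context instead of replacing it
            override val context: CoroutineContext = continuation.context
            override fun resumeWith(result: Result<T>) {
                thread { continuation.resumeWith(result) }
            }
        }
}

fun runDemo(): Pair<String?, Boolean> {
    val caller = Thread.currentThread()
    val done = CountDownLatch(1)
    var seen: Pair<String?, Boolean> = null to false
    val block: suspend () -> Pair<String?, Boolean> = {
        // Read the coroutine-local value and note whether we left the caller's thread
        coroutineContext[RequestId]?.value to (Thread.currentThread() !== caller)
    }
    val context = RequestId("req-42") + NewThreadInterceptor
    block.startCoroutine(Continuation<Pair<String?, Boolean>>(context) { r ->
        seen = r.getOrThrow()
        done.countDown()
    })
    done.await()                               // wait for the worker thread to finish
    return seen
}

fun main() = println(runDemo())
```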
It is important to note that the key property should be constant. Otherwise, get on this key will return null, and there will be no interception.
Now, if we change AsyncContinuation, the async function and main to use the interceptor:
class AsyncContinuation<T>(
override val context: CoroutineContext
): Continuation<T> {
var result: T? = null
var awaiting: Continuation<T>? = null
override fun resumeWith(result: Result<T>) {
if (awaiting != null) {
awaiting?.resumeWith(result)
} else {
this.result = result.getOrThrow()
}
}
}
fun <T> async(
context: CoroutineContext = EmptyCoroutineContext,
c: suspend () -> T
): Async<T> {
val ac = AsyncContinuation<T>(context)
c.startCoroutine(ac)
return Async(ac)
}
fun main() {
var c: Continuation<String>? = null
builder {
val async = async(SingleThreadedInterceptor) {
println("Async in thread ${Thread.currentThread().id}")
suspendCoroutine<String> { c = it }
}
println("Await in thread ${Thread.currentThread().id}")
println(async.await())
}
c?.resume("OK")
}
and when we run the program, we get something like
Async in thread 11
Await in thread 1
OK
as expected.
But what part of the coroutine machinery calls the interceptor's interceptContinuation function? The function wraps the continuation, but who calls the function? Well, intercepted does. If we rewrite async as
fun <T> async(
context: CoroutineContext = EmptyCoroutineContext,
c: suspend () -> T
): Async<T> {
val ac = AsyncContinuation<T>(context)
c.createCoroutineUnintercepted(ac)
// .intercepted()
.resume(Unit)
return Async(ac)
}
(note that I commented out the intercepted call) and then run the example, we get
Async in thread 1
Await in thread 1
OK
since without interception, we do not wrap the continuation to run the coroutine on another thread.
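The same experiment can be made measurable with a hypothetical interceptor that only counts how often interceptContinuation is called and hands the continuation back unchanged. Resuming the result of createCoroutineUnintercepted directly never consults the interceptor, while calling intercepted first does:

```kotlin
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

// Hypothetical interceptor: counts wrap requests, does not switch threads
class CountingInterceptor : ContinuationInterceptor {
    override val key = ContinuationInterceptor.Key
    var calls = 0
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
        calls++
        return continuation
    }
}

fun countCalls(intercept: Boolean): Int {
    val interceptor = CountingInterceptor()
    val body: suspend () -> Unit = {}
    val completion = Continuation<Unit>(interceptor) { it.getOrThrow() }
    val coroutine = body.createCoroutineUnintercepted(completion)
    // With intercept = false this mirrors the commented-out .intercepted() above
    (if (intercept) coroutine.intercepted() else coroutine).resume(Unit)
    return interceptor.calls
}

fun main() = println("intercepted: ${countCalls(true)}, unintercepted: ${countCalls(false)}")
```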
But how does intercepted do that? Well, intercepted, after some indirection, does the following:
context[ContinuationInterceptor]?.interceptContinuation(this)
Remember, CoroutineContext.Element is itself a CoroutineContext with a single element, which returns itself on get if its key is the same as the provided one. That is why it is important to use constants as keys. We also cache the intercepted continuation in the intercepted field. This is the field that causes the KNPE when we do not wrap the continuation with SafeContinuation.
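A quick sketch of these lookup rules for ordinary context elements: a hypothetical RequestId with a constant key (its companion object) is found both on its own and inside a combined context, while a hypothetical Unfindable element, whose key is a fresh object per instance, is present in the context but cannot be retrieved with a key created elsewhere.

```kotlin
import kotlin.coroutines.*

// Hypothetical element with a constant key
class RequestId(val value: String) : AbstractCoroutineContextElement(RequestId) {
    companion object Key : CoroutineContext.Key<RequestId>
}

// Hypothetical element whose key is a new object for every instance
class Unfindable : CoroutineContext.Element {
    override val key: CoroutineContext.Key<*> = object : CoroutineContext.Key<Unfindable> {}
}

fun lookups(): List<Boolean> {
    val id = RequestId("req-1")
    val ctx: CoroutineContext = id + Unfindable()
    val freshKey = object : CoroutineContext.Key<Unfindable> {}
    return listOf(
        id[RequestId] === id,                 // a single element is a one-element context
        ctx[RequestId]?.value == "req-1",     // still found inside a combined context
        id[ContinuationInterceptor] == null,  // a different key does not match
        ctx[freshKey] == null,                // another key object never matches
        ctx.fold(0) { n, _ -> n + 1 } == 2    // even though both elements are there
    )
}

fun main() = println(lookups())
```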
Restricted Suspension
There are cases when we do not want to allow calling other suspend functions or lambdas from ours. For example, inside a lambda passed to the sequence function, we want to call only the yield and yieldAll functions, or functions that, in turn, call only yield or yieldAll. Furthermore, we do not want to intercept their continuations: we want to keep them on the calling thread. In this case, we use the @RestrictsSuspension annotation on classes or interfaces that contain the leaf suspend functions the lambda is allowed to call. If we look at sequence, the SequenceScope interface is annotated with this annotation.
Since we do not want to intercept the continuations, their contexts cannot be anything other than EmptyCoroutineContext.
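A small sketch with the standard sequence builder: yieldTwice is a hypothetical suspend extension on SequenceScope, which is allowed inside the lambda because it, too, may only call suspend members and extensions of the restricted receiver. Calling an unrelated suspend function, such as suspendCoroutine, inside the lambda would be rejected by the compiler.

```kotlin
// Allowed: a suspend extension on the restricted receiver that only calls yield
suspend fun SequenceScope<Int>.yieldTwice(x: Int) {
    yield(x)
    yield(x)
}

fun doubled(): List<Int> = sequence<Int> {
    for (i in 1..3) yieldTwice(i)
    // suspendCoroutine { ... } here would not compile: SequenceScope is @RestrictsSuspension
}.toList()

// Infinite but lazy: each yield suspends until the consumer asks for the next element
fun naturals(): Sequence<Int> = sequence<Int> {
    var i = 0
    while (true) yield(i++)
}

fun main() {
    println(doubled())
    println(naturals().take(4).toList())
}
```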
This concludes all the essential coroutine support in the standard library. Granted, there is still the Continuation interface, but I covered it briefly in part 2 of the series. In the next part, I will finally explain suspend functions, which I used in the examples but never properly explained.