每次听到 Kotlin 协程时,您可能会想到一种简单、简洁且高性能的解决方案来处理异步任务(例如网络请求)。但这是他们唯一的目的吗?让我们考虑一下 Kotlin 协程在并发之外的用途。
Kotlin 协程原语
让我们首先了解协程底层机制是如何工作的。如果我们看一下 Kotlin Coroutines Primitives,它只涉及几个类和函数:
- Continuation
- CoroutineContext
- suspendCoroutine
- createCoroutine
- startCoroutine
这就是我们的kotlin-stdlib. 但它们有什么用呢?让我们更深入地了解 Kotlin 协程的设计工作原理。
Continuation
Continuation 只是一个具有两个成员的接口:context和resumeWith:
public interface Continuation<in T> {
public val context: CoroutineContext
public fun resumeWith(result: Result<T>)
}
它有什么作用以及目的是什么?Continuation– 字面意思是初级协程。它就像一个经过缓冲的回调,能够传播附加信息并向协程提供有用的执行结果。
我们还没有讨论过CoroutineContext,我们resumeWith暂时考虑一下。
resumeWith
与任何其他回调一样,这是一个在协程完成其工作时调用的函数。它用于kotlin.Result以安全的方式传播协程内部发生的任何异常。
Continuation因此,我们可以使用以下方式创建并发逻辑:
suspend fun executeNetworkRequest(): String =
suspendCoroutine { continuation ->
thread {
continuation.resumeWith(someBlockingRequest())
}
}
suspendCoroutine– 是 kotlin 协程代码和非基于协程的代码之间的桥梁。
或者使用现有的异步(伪代码):
suspend fun executeNetworkRequest(): String =
suspendCoroutine { continuation ->
apiService.getSomething()
// onSuccess & onFailure is a callback
.onSuccess(continuation::resume)
.onFailure(continuation::resumeWithException)
.executeAsync()
}
Continuation<in T>.resume(..)kotlin.Result是为了避免每次都通过的扩展。
因此,我们不仅可以实现并发逻辑,还可以使用现有的并发逻辑并使其与 Kotlin 协程一起使用。
startCoroutine
此外,我们可以使用从非挂起上下文启动挂起函数startCoroutine。在 Kotlin 中,如果你的main函数是suspend.
kotlinx.coroutines也用它来运行协程,但当然,那里的机制要困难得多。
import kotlin.coroutines.*
fun main() {
val operation: suspend () -> Unit = ::a
operation.startCoroutine(
object : Continuation<Unit> {
// we will talk about it lower
override val coroutineContext = EmptyCoroutineContext
// called when coroutine finished its work
override fun resumeWith(result: Result<Unit>) {
println(
if(result.isSuccess)
"Successfully done"
else "Error happenned: ${result.exceptionOrNull()}"
)
}
}
)
}
suspend fun a() {...}
但是,当然,您不能仅在kotlinx.coroutines以这种方式执行协程时调用挂起函数。
CoroutineContext
现在我们来到了另一位成员Continuation—— CoroutineContext。它是做什么用的?
在编程中,我们经常同时处理多个任务,有效地管理它们可能具有挑战性。KotlinCoroutineContext有助于解决这一挑战。它是一个将所需数据传播到协程的提供者。在现实世界中,通常是在复杂的协程链之间传递参数。
更清楚地说,CoroutineContext 代表kotlinx.coroutines结构化并发。
简单的例子
让我们根据之前的示例创建startCoroutine能够从中检索值的代码CoroutineContext:
import kotlin.coroutines.*
// define our container for data we need inside coroutine
// it should always inherit 'CoroutineContext.Element'
data class ExecutionContext(val someInt: Int) : CoroutineContext.Element {
override val key = EXECUTION_CONTEXT_KEY
}
// define type-safe key using which we will get our value
val EXECUTION_CONTEXT_KEY = object : CoroutineContext.Key<ExecutionContext> {}
// define coroutine context that we will pass into Continuation
private val myCoroutineContext = object : CoroutineContext {
private val values = mapOf<CoroutineContext.Key<*>, CoroutineContext.Element>(
EXECUTION_CONTEXT_KEY to ExecutionContext(10000)
)
override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
return values[key] as? E
}
// .. we omit other functions for simplicity
}
suspend fun a() {
// here we retrieve value from coroutine context
// coroutineContext is compiler intrinsic, can be called
// only from suspend world
val executionContext = coroutineContext[EXECUTION_CONTEXT_KEY]!!
println(executionContext.someInt!!)
}
fun main() {
val operation: suspend () -> Unit = ::a
operation.startCoroutine(
object : Continuation<Unit> {
override val context: CoroutineContext = myCoroutineContext
override fun resumeWith(result: Result<Unit>) {
println(
if(result.isSuccess)
"Successfully done"
else "Error happenned: ${result.exceptionOrNull()}"
)
}
}
)
}
CoroutineContext.Element– 是用于存储内部元素的抽象CoroutineContext
CoroutineContext.Key– 是 的标识符CoroutineContext.Element。
您可以在此处使用此代码。
实际项目示例
假设我们有 API 服务。通常,我们需要有一些授权层,所以让我们考虑下一个示例(为此,我采用了 gRPC):
// let's define an element that will persist in `CoroutineContext`
data class AuthorizationContext(
val accessHash: String?,
val provider: AuthorizationProvider,
) : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<AuthorizationContext>
override val key: CoroutineContext.Key<*> = Key
}
// now we define our interceptor
class AuthorizationInterceptor(
private val authorizationProvider: AuthorizationProvider,
) : CoroutineContextServerInterceptor() {
companion object {
private val ACCESS_TOKEN_METADATA_KEY: Metadata.Key<String> =
Metadata.Key.of("access-token", Metadata.ASCII_STRING_MARSHALLER)
}
override fun coroutineContext(call: ServerCall<*, *>, headers: Metadata): CoroutineContext {
return AuthorizationContext(
accessHash = headers.get(ACCESS_TOKEN_METADATA_KEY),
provider = authorizationProvider,
)
}
}
CoroutineContextinkotlinx.coroutines实际上只是 a Map<K : CoroutineContext.Key, V : CoroutineContext.Element>(更准确地说,它是JVM 上的ConcurrentHashMap),与上面的示例中的相同。但是,如果我们谈论kotlinx.coroutines,它会传播到所需协程内的所有子协程(我们没有这样的机制)。
所以,现在我们可以在子协程中获取它:
suspend inline fun provideAuthorization(block: (UserId) -> Unit) {
val authContext = coroutineContext[AuthorizationContext]
authContext.accessHash ?: throw StatusException(Status.UNAUTHORIZED)
val userId = authContext.provider.provide(authContext.accessHash)
return block(userId)
}
有趣的事实:coroutineContext是 Kotlin 中唯一具有suspend修饰符的属性。
对于 gRPC,我们还需要注册拦截器并编写 RPC。但这个 gRPC 解决方案的想法很简单——解耦逻辑并简化开发人员体验。
对于 Java,gRPC 使用ThreadLocal,因此我们也可以考虑CoroutineContext作为ThreadLocal. 我们不能ThreadLocal在协程中使用,因为通常协程不链接到特定线程(特别是当我们谈论时withContext)。协程更有可能在另一个线程上恢复(此外,不同的协程可以在单个线程上运行)。
但是,这是否意味着协程存在的唯一原因是并发性?让我解释。
Sequence
最常见的例子之一是 – kotlin.sequences.Sequence<T>。简而言之,它是一个惰性集合,仅在您开始使用元素时才进行迭代。您可以在这里阅读更多有关它们的信息。
如果您查看过SequenceScope源代码,它会在幕后使用挂起函数:
@RestrictSuspension
public abstract class SequenceScope<in T> internal constructor() {
/**
* Yields a value to the [Iterator] being built and suspends
* until the next value is requested.
*/
public abstract suspend fun yield(value: T)
// ... other
}
@RestrictSuspension禁止消费者调用非成员挂起函数。
所以,这个想法是元素被延迟消耗。您可以将它们用作常规集合并利用惰性迭代。
但它在幕后是如何工作的呢?我们看一下实现源码:
private typealias State = Int
private const val State_NotReady: State = 0
private const val State_ManyNotReady: State = 1
private const val State_ManyReady: State = 2
private const val State_Ready: State = 3
private const val State_Done: State = 4
private const val State_Failed: State = 5
private class SequenceBuilderIterator<T> : SequenceScope<T>(), Iterator<T>, Continuation<Unit> {
private var state = State_NotReady
private var nextValue: T? = null
private var nextIterator: Iterator<T>? = null
var nextStep: Continuation<Unit>? = null
override fun hasNext(): Boolean {
while (true) {
when (state) {
State_NotReady -> {}
State_ManyNotReady ->
if (nextIterator!!.hasNext()) {
state = State_ManyReady
return true
} else {
nextIterator = null
}
State_Done -> return false
State_Ready, State_ManyReady -> return true
else -> throw exceptionalState()
}
state = State_Failed
val step = nextStep!!
nextStep = null
step.resume(Unit) // IMPORTANT: it starts executing next yield
}
}
override fun next(): T {
when (state) {
State_NotReady, State_ManyNotReady -> return nextNotReady()
State_ManyReady -> {
state = State_ManyNotReady
return nextIterator!!.next()
}
State_Ready -> {
state = State_NotReady
@Suppress("UNCHECKED_CAST")
val result = nextValue as T
nextValue = null
return result
}
else -> throw exceptionalState()
}
}
private fun nextNotReady(): T {
if (!hasNext()) throw NoSuchElementException() else return next()
}
private fun exceptionalState(): Throwable = when (state) {
State_Done -> NoSuchElementException()
State_Failed -> IllegalStateException("Iterator has failed.")
else -> IllegalStateException("Unexpected state of the iterator: $state")
}
override suspend fun yield(value: T) {
nextValue = value
state = State_Ready
return suspendCoroutineUninterceptedOrReturn { c ->
nextStep = c
COROUTINE_SUSPENDED
}
}
override fun resumeWith(result: Result<Unit>) {
result.getOrThrow() // just rethrow exception if it is there
state = State_Done
}
override val context: CoroutineContext
get() = EmptyCoroutineContext
}
COROUTINE_SUSPENDED是 Kotlin 编译器内部使用的一个特殊常量,用于管理协程挂起和恢复。它不是开发人员通常直接交互的东西,而是充当协程机制内的内部信号。
看起来有点难读,不是吗?让我们一步一步来:
- 首先,我们从状态开始。我们有接下来的状态,让我们简单地讨论一下它们:State_NotReady:迭代器现在尚未准备好提供项目。它可能正在等待操作或进一步处理以使项目可用。State_ManyNotReady:迭代器已准备好提供多个项目,但它们尚未立即准备好。它正在等待物品已准备好消费的信号(基本上,等待终端操作员)。State_ManyReady:迭代器现在已准备好提供多个项目。它可以立即给出序列中的下一项。State_Ready:迭代器有一个可供提供的项目。当有人询问时,它会立即提供该物品。State_Done:迭代器没有更多项目可提供。它已经完成了从序列中生成元素的工作。当我们离开时我们会达到这种状态SequenceBuilderState_Failed:发生了意外情况,迭代器遇到了问题。通常,这种情况不应该发生。
- hasNext基于状态,在准备好使用时返回一个值或一组值。此外,它在 内部的每次迭代中开始执行序列while。因此,如果有State_NotReady,它会通过执行下一个收益来使其准备好。
- 该next函数根据当前状态从迭代器中检索下一项(与 相同hasNext)。如果没有下一个被调用hasNext,则可以到达nextNotReady()。在其他情况下,它只会返回值。
- yield函数只是改变序列迭代器实现的状态。当添加新元素时,它会更改为State_Ready. 使用suspendCoroutineUninterceptedOrReturn挂起协程(执行)并稍后恢复。当之前的协程(挂起点)完成时,它将启动。
为了完成我的解释,让我们以如何仅通过使用回调来实现相同的功能作为结束:
val sequence = sequence { yield(1) { yield(2) { yield(3) { /* ... */ } } } }
但是,看起来有点难以阅读,不是吗?这就是协程在这种特殊情况下有用的原因。
最后,它看起来并没有那么复杂,对吗?
DeepRecursiveScope
现在,我们来讨论 Kotlin 协程的另一个案例 – DeepRecursiveScope。您可能知道,通常当特定函数调用本身时,我们有可能会遇到这样的情况,StackOverflowError因为每次调用都会将其贡献给我们的堆栈。
例如,出于同样的目的,也存在tailrec语言构造。不同之处在于tailrec不能通过调用其他函数进行分支(条件检查)。
您可以在这里阅读更多内容。
因此,DeepRecursiveScope不依赖于传统的堆栈流,而是使用协程提供的所有功能。
我们不会停留在确切的实现细节上(您可以在此处DeepRecursiveScope查看它),因为它与支持所提供机制的附加行为具有相同的想法,但让我们讨论 Kotlin 协程如何解决这个特定问题。Sequence
内部协程
它究竟是如何解决问题的呢?正如我之前提到的,协程受到CPS(Continuation Passing Style)的启发,但这并不完全是 Kotlin 编译器有效处理协程的方法。
Kotlin 编译器使用优化组合来有效管理协程堆栈和执行。让我们检查一下它到底做了什么:
- 编译器转换:Kotlin 编译器生成状态机,与我们在Sequences的实现细节中看到的相同。它并没有消除所有堆栈调用,而是将其减少到足以避免遇到StackOverflowError.
- 连续的堆分配:在传统的回调链中,每个函数调用都会将数据推送到调用堆栈上。如果链很深,这可能会消耗大量堆栈空间。在协程方法中,当协程挂起时,其延续作为对象存储在堆上。该延续对象保存恢复协程执行所需的信息(堆栈、调度程序等)。这种堆存储允许更大的容量来处理深度调用链,而不会带来堆栈溢出的风险。
协程的具体机制如下:
- 序列化:挂起的协程的堆栈状态保存在堆分配的延续对象中。
- 恢复:当准备好恢复时,框架会设置一个本机堆栈来模拟捕获的状态。
- 内存复制:序列化堆栈状态从延续对象复制到本机堆栈。
- 上下文配置:配置执行上下文以匹配原始状态。
- 程序计数器:程序计数器设置为正确指令的保存值。
- 调用:使用 CPS 调用继续代码,恢复执行。
堆栈恢复还可以帮助我们恢复不同的线程,因为它们对协程的堆栈一无所知。
那么,从现在开始我们就可以了解协程内部是如何工作的了。让我们继续讨论 Kotlin 协程在并发之外的其他应用。
Jetpack 组合
如果您曾经使用过 Compose,例如处理指针事件,您可能已经提到过协程使用了一些技巧来监听更新:
@RestrictsSuspension // <---
@JvmDefaultWithCompatibility
interface AwaitPointerEventScope : Density {
// ...
suspend fun awaitPointerEvent(
pass: PointerEventPass = PointerEventPass.Main
): PointerEvent
suspend fun <T> withTimeoutOrNull(
timeMillis: Long,
block: suspend AwaitPointerEventScope.() -> T
): T? = block()
suspend fun <T> withTimeout(
timeMillis: Long,
block: suspend AwaitPointerEventScope.() -> T
): T = block()
// ...
}
因此,如您所见,用于处理指针事件的范围标记为@RestrictsSuspension。如果我们查看提供的文档,我们将看到以下内容:
This is a restricted suspension scope. Code in this scope is
always called un-dispatched and may only suspend for calls
to [awaitPointerEvent]. These functions resume synchronously and the caller may mutate the result
**before** the next await call to affect the next stage of the input processing pipeline.
awaitPointerEvent使用 kotlin 原语处理,没有kotlinx.coroutines. 在这种情况下,我们不需要任何kotlinx.coroutines逻辑(它只是在用户操作后从主循环线程调用的回调)。
安卓软件开发工具包
由于我们可以在所有需要回调的情况下使用suspend函数,因此还有其他有用的情况可以应用此类逻辑。例如,在请求权限时:
val result = Warden
.with(this)
.requestPermission(Manifest.permission.CALL_PHONE)
when (result) {
is PermissionState.Denied -> dialNumber(phoneNumber)
PermissionState.Granted -> startCall(phoneNumber)
}
例如,我们使用Warden库。
结论
总之,本文探讨了 Kotlin 协程的各个方面,强调了它们超越传统并发任务的多功能性。我们深入研究了协程原语的内部工作原理,讨论了它们在序列和复杂的问题解决场景(例如深度递归)中的使用,并研究了展示其广泛适用性的现实世界示例。标题“协程不仅仅是并发”,恰当地反映了 Kotlin 协程在现代软件开发中提供的多样化功能。
本文暂时没有评论,来添加一个吧(●'◡'●)