专业的编程技术博客社区

网站首页 > 博客文章 正文

协程不仅仅是并发性(协程不仅仅是并发性还是并发)

baijin 2024-09-06 14:53:49 博客文章 7 ℃ 0 评论


每次听到 Kotlin 协程时,您可能会想到一种简单、简洁且高性能的解决方案来处理异步任务(例如网络请求)。但这是他们唯一的目的吗?让我们考虑一下 Kotlin 协程在并发之外的用途。

Kotlin 协程原语

让我们首先了解协程底层机制是如何工作的。如果我们看一下 Kotlin Coroutines Primitives,它只涉及几个类和函数:

  • Continuation
  • CoroutineContext
  • suspendCoroutine
  • createCoroutine
  • startCoroutine

这就是我们的kotlin-stdlib. 但它们有什么用呢?让我们更深入地了解 Kotlin 协程的设计工作原理。

Continuation

Continuation 只是一个具有两个成员的接口:context和resumeWith:

public interface Continuation<in T> {
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)
}

它有什么作用以及目的是什么?Continuation– 字面意思是初级协程。它就像一个经过缓冲的回调,能够传播附加信息并向协程提供有用的执行结果。

我们还没有讨论过CoroutineContext,我们resumeWith暂时考虑一下。

resumeWith

与任何其他回调一样,这是一个在协程完成其工作时调用的函数。它用于kotlin.Result以安全的方式传播协程内部发生的任何异常。

Continuation因此,我们可以使用以下方式创建并发逻辑:

suspend fun executeNetworkRequest(): String = 
    suspendCoroutine { continuation ->
        thread {
            continuation.resumeWith(someBlockingRequest())
        }
    }

suspendCoroutine– 是 kotlin 协程代码和非基于协程的代码之间的桥梁。

或者使用现有的异步(伪代码):

suspend fun executeNetworkRequest(): String =
    suspendCoroutine { continuation ->
        apiService.getSomething()
            // onSuccess & onFailure is a callback
            .onSuccess(continuation::resume)
            .onFailure(continuation::resumeWithException)
            .executeAsync()
    }

Continuation<in T>.resume(..)kotlin.Result是为了避免每次都通过的扩展。

因此,我们不仅可以实现并发逻辑,还可以使用现有的并发逻辑并使其与 Kotlin 协程一起使用。

startCoroutine

此外,我们可以使用从非挂起上下文启动挂起函数startCoroutine。在 Kotlin 中,如果你的main函数是suspend.

kotlinx.coroutines也用它来运行协程,但当然,那里的机制要困难得多。

import kotlin.coroutines.*

fun main() {
    val operation: suspend () -> Unit = ::a
    operation.startCoroutine(
        object : Continuation<Unit> {
            // we will talk about it lower
            override val coroutineContext = EmptyCoroutineContext

            // called when coroutine finished its work
            override fun resumeWith(result: Result<Unit>) {
                println(
                    if(result.isSuccess) 
                        "Successfully done" 
                    else "Error happenned: ${result.exceptionOrNull()}"
                )
            }
        }
    )
}

suspend fun a() {...}

但是,当然,您不能仅在kotlinx.coroutines以这种方式执行协程时调用挂起函数。

CoroutineContext

现在我们来到了另一位成员Continuation—— CoroutineContext。它是做什么用的?

在编程中,我们经常同时处理多个任务,有效地管理它们可能具有挑战性。KotlinCoroutineContext有助于解决这一挑战。它是一个将所需数据传播到协程的提供者。在现实世界中,通常是在复杂的协程链之间传递参数。

更清楚地说,CoroutineContext 代表kotlinx.coroutines结构化并发。

简单的例子

让我们根据之前的示例创建startCoroutine能够从中检索值的代码CoroutineContext:

import kotlin.coroutines.*

// define our container for data we need inside coroutine
// it should always inherit 'CoroutineContext.Element'
data class ExecutionContext(val someInt: Int) : CoroutineContext.Element {
    override val key = EXECUTION_CONTEXT_KEY
}

// define type-safe key using which we will get our value
val EXECUTION_CONTEXT_KEY = object : CoroutineContext.Key<ExecutionContext> {}

// define coroutine context that we will pass into Continuation
private val myCoroutineContext = object : CoroutineContext {
    private val values = mapOf<CoroutineContext.Key<*>, CoroutineContext.Element>(
        EXECUTION_CONTEXT_KEY to ExecutionContext(10000)
    )

    override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
        return values[key] as? E
    }

    // .. we omit other functions for simplicity
}

suspend fun a() {
    // here we retrieve value from coroutine context
    // coroutineContext is compiler intrinsic, can be called
    // only from suspend world
    val executionContext = coroutineContext[EXECUTION_CONTEXT_KEY]!!
    println(executionContext.someInt!!)
}

fun main() {
    val operation: suspend () -> Unit = ::a
    operation.startCoroutine(
        object : Continuation<Unit> {
            override val context: CoroutineContext = myCoroutineContext

            override fun resumeWith(result: Result<Unit>) {
                println(
                    if(result.isSuccess) 
                        "Successfully done" 
                    else "Error happenned: ${result.exceptionOrNull()}"
                )
            }
        }
    )
}

CoroutineContext.Element– 是用于存储内部元素的抽象CoroutineContext

CoroutineContext.Key– 是 的标识符CoroutineContext.Element。

您可以在此处使用此代码。

实际项目示例

假设我们有 API 服务。通常,我们需要有一些授权层,所以让我们考虑下一个示例(为此,我采用了 gRPC):

// let's define an element that will persist in `CoroutineContext`
data class AuthorizationContext(
    val accessHash: String?,
    val provider: AuthorizationProvider,
) : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<AuthorizationContext>

    override val key: CoroutineContext.Key<*> = Key
}

// now we define our interceptor
class AuthorizationInterceptor(
    private val authorizationProvider: AuthorizationProvider,
) : CoroutineContextServerInterceptor() {
    companion object {
        private val ACCESS_TOKEN_METADATA_KEY: Metadata.Key<String> =
            Metadata.Key.of("access-token", Metadata.ASCII_STRING_MARSHALLER)
    }

    override fun coroutineContext(call: ServerCall<*, *>, headers: Metadata): CoroutineContext {
        return AuthorizationContext(
            accessHash = headers.get(ACCESS_TOKEN_METADATA_KEY),
            provider = authorizationProvider,
        )
    }
}

CoroutineContextinkotlinx.coroutines实际上只是 a Map<K : CoroutineContext.Key, V : CoroutineContext.Element>(更准确地说,它是JVM 上的ConcurrentHashMap),与上面的示例中的相同。但是,如果我们谈论kotlinx.coroutines,它会传播到所需协程内的所有子协程(我们没有这样的机制)。

所以,现在我们可以在子协程中获取它:

suspend inline fun provideAuthorization(block: (UserId) -> Unit) {
    val authContext = coroutineContext[AuthorizationContext]
    authContext.accessHash ?: throw StatusException(Status.UNAUTHORIZED)

    val userId = authContext.provider.provide(authContext.accessHash)
    return block(userId)
}

有趣的事实:coroutineContext是 Kotlin 中唯一具有suspend修饰符的属性。

对于 gRPC,我们还需要注册拦截器并编写 RPC。但这个 gRPC 解决方案的想法很简单——解耦逻辑并简化开发人员体验。

对于 Java,gRPC 使用ThreadLocal,因此我们也可以考虑CoroutineContext作为ThreadLocal. 我们不能ThreadLocal在协程中使用,因为通常协程不链接到特定线程(特别是当我们谈论时withContext)。协程更有可能在另一个线程上恢复(此外,不同的协程可以在单个线程上运行)。

但是,这是否意味着协程存在的唯一原因是并发性?让我解释。

Sequence

最常见的例子之一是 – kotlin.sequences.Sequence<T>。简而言之,它是一个惰性集合,仅在您开始使用元素时才进行迭代。您可以在这里阅读更多有关它们的信息。

如果您查看过SequenceScope源代码,它会在幕后使用挂起函数:

@RestrictSuspension
public abstract class SequenceScope<in T> internal constructor() {
    /**
     * Yields a value to the [Iterator] being built and suspends
     * until the next value is requested.
     */
    public abstract suspend fun yield(value: T)

    // ... other
}

@RestrictSuspension禁止消费者调用非成员挂起函数。

所以,这个想法是元素被延迟消耗。您可以将它们用作常规集合并利用惰性迭代。

但它在幕后是如何工作的呢?我们看一下实现源码:

private typealias State = Int  

private const val State_NotReady: State = 0  
private const val State_ManyNotReady: State = 1  
private const val State_ManyReady: State = 2  
private const val State_Ready: State = 3  
private const val State_Done: State = 4  
private const val State_Failed: State = 5  

private class SequenceBuilderIterator<T> : SequenceScope<T>(), Iterator<T>, Continuation<Unit> {  
    private var state = State_NotReady  
    private var nextValue: T? = null  
    private var nextIterator: Iterator<T>? = null  
    var nextStep: Continuation<Unit>? = null  

    override fun hasNext(): Boolean {  
        while (true) {  
            when (state) {  
                State_NotReady -> {}  
                State_ManyNotReady ->  
                if (nextIterator!!.hasNext()) {  
                    state = State_ManyReady  
                    return true  
                } else {  
                    nextIterator = null  
                }  
                State_Done -> return false  
                State_Ready, State_ManyReady -> return true  
                else -> throw exceptionalState()  
            }  

            state = State_Failed  
            val step = nextStep!!  
            nextStep = null  
            step.resume(Unit) // IMPORTANT: it starts executing next yield 
        }  
    }  

    override fun next(): T {  
        when (state) {  
            State_NotReady, State_ManyNotReady -> return nextNotReady()  
            State_ManyReady -> {  
                state = State_ManyNotReady  
                return nextIterator!!.next()  
            }  
            State_Ready -> {  
                state = State_NotReady  
                @Suppress("UNCHECKED_CAST")  
                val result = nextValue as T  
                nextValue = null  
                return result  
            }  
            else -> throw exceptionalState()  
        }  
    }  

    private fun nextNotReady(): T {  
        if (!hasNext()) throw NoSuchElementException() else return next()  
    }  

    private fun exceptionalState(): Throwable = when (state) {  
        State_Done -> NoSuchElementException()  
        State_Failed -> IllegalStateException("Iterator has failed.")  
        else -> IllegalStateException("Unexpected state of the iterator: $state")  
    }  


    override suspend fun yield(value: T) {  
        nextValue = value  
        state = State_Ready  
        return suspendCoroutineUninterceptedOrReturn { c ->  
            nextStep = c  
            COROUTINE_SUSPENDED  
        }  
    }    
    override fun resumeWith(result: Result<Unit>) {  
        result.getOrThrow() // just rethrow exception if it is there  
        state = State_Done  
    }  

    override val context: CoroutineContext  
        get() = EmptyCoroutineContext  
}

COROUTINE_SUSPENDED是 Kotlin 编译器内部使用的一个特殊常量,用于管理协程挂起和恢复。它不是开发人员通常直接交互的东西,而是充当协程机制内的内部信号。

看起来有点难读,不是吗?让我们一步一步来:

  1. 首先,我们从状态开始。我们有接下来的状态,让我们简单地讨论一下它们:State_NotReady:迭代器现在尚未准备好提供项目。它可能正在等待操作或进一步处理以使项目可用。State_ManyNotReady:迭代器已准备好提供多个项目,但它们尚未立即准备好。它正在等待物品已准备好消费的信号(基本上,等待终端操作员)。State_ManyReady:迭代器现在已准备好提供多个项目。它可以立即给出序列中的下一项。State_Ready:迭代器有一个可供提供的项目。当有人询问时,它会立即提供该物品。State_Done:迭代器没有更多项目可提供。它已经完成了从序列中生成元素的工作。当我们离开时我们会达到这种状态SequenceBuilderState_Failed:发生了意外情况,迭代器遇到了问题。通常,这种情况不应该发生。
  2. hasNext基于状态,在准备好使用时返回一个值或一组值。此外,它在 内部的每次迭代中开始执行序列while。因此,如果有State_NotReady,它会通过执行下一个收益来使其准备好。
  3. 该next函数根据当前状态从迭代器中检索下一项(与 相同hasNext)。如果没有下一个被调用hasNext,则可以到达nextNotReady()。在其他情况下,它只会返回值。
  4. yield函数只是改变序列迭代器实现的状态。当添加新元素时,它会更改为State_Ready. 使用suspendCoroutineUninterceptedOrReturn挂起协程(执行)并稍后恢复。当之前的协程(挂起点)完成时,它将启动。

为了完成我的解释,让我们以如何仅通过使用回调来实现相同的功能作为结束:

val sequence = sequence { yield(1) { yield(2) { yield(3) { /* ... */ } } } }

但是,看起来有点难以阅读,不是吗?这就是协程在这种特殊情况下有用的原因。

最后,它看起来并没有那么复杂,对吗?

DeepRecursiveScope

现在,我们来讨论 Kotlin 协程的另一个案例 – DeepRecursiveScope。您可能知道,通常当特定函数调用本身时,我们有可能会遇到这样的情况,StackOverflowError因为每次调用都会将其贡献给我们的堆栈。

例如,出于同样的目的,也存在tailrec语言构造。不同之处在于tailrec不能通过调用其他函数进行分支(条件检查)。

您可以在这里阅读更多内容。

因此,DeepRecursiveScope不依赖于传统的堆栈流,而是使用协程提供的所有功能。

我们不会停留在确切的实现细节上(您可以在此处DeepRecursiveScope查看它),因为它与支持所提供机制的附加行为具有相同的想法,但让我们讨论 Kotlin 协程如何解决这个特定问题。Sequence

内部协程

它究竟是如何解决问题的呢?正如我之前提到的,协程受到CPS(Continuation Passing Style)的启发,但这并不完全是 Kotlin 编译器有效处理协程的方法。

Kotlin 编译器使用优化组合来有效管理协程堆栈和执行。让我们检查一下它到底做了什么:

  • 编译器转换:Kotlin 编译器生成状态机,与我们在Sequences的实现细节中看到的相同。它并没有消除所有堆栈调用,而是将其减少到足以避免遇到StackOverflowError.
  • 连续的堆分配:在传统的回调链中,每个函数调用都会将数据推送到调用堆栈上。如果链很深,这可能会消耗大量堆栈空间。在协程方法中,当协程挂起时,其延续作为对象存储在堆上。该延续对象保存恢复协程执行所需的信息(堆栈、调度程序等)。这种堆存储允许更大的容量来处理深度调用链,而不会带来堆栈溢出的风险。

协程的具体机制如下:

  1. 序列化:挂起的协程的堆栈状态保存在堆分配的延续对象中。
  2. 恢复:当准备好恢复时,框架会设置一个本机堆栈来模拟捕获的状态。
  3. 内存复制:序列化堆栈状态从延续对象复制到本机堆栈。
  4. 上下文配置:配置执行上下文以匹配原始状态。
  5. 程序计数器:程序计数器设置为正确指令的保存值。
  6. 调用:使用 CPS 调用继续代码,恢复执行。

堆栈恢复还可以帮助我们恢复不同的线程,因为它们对协程的堆栈一无所知。

那么,从现在开始我们就可以了解协程内部是如何工作的了。让我们继续讨论 Kotlin 协程在并发之外的其他应用。

Jetpack 组合

如果您曾经使用过 Compose,例如处理指针事件,您可能已经提到过协程使用了一些技巧来监听更新:

@RestrictsSuspension // <---
@JvmDefaultWithCompatibility
interface AwaitPointerEventScope : Density {
    // ...

    suspend fun awaitPointerEvent(
        pass: PointerEventPass = PointerEventPass.Main
    ): PointerEvent

    suspend fun <T> withTimeoutOrNull(
        timeMillis: Long,
        block: suspend AwaitPointerEventScope.() -> T
    ): T? = block()

    suspend fun <T> withTimeout(
        timeMillis: Long,
        block: suspend AwaitPointerEventScope.() -> T
    ): T = block()

    // ...
}

因此,如您所见,用于处理指针事件的范围标记为@RestrictsSuspension。如果我们查看提供的文档,我们将看到以下内容:

This is a restricted suspension scope. Code in this scope is
always called un-dispatched and may only suspend for calls 
to [awaitPointerEvent]. These functions resume synchronously and the caller may mutate the result
**before** the next await call to affect the next stage of the input processing pipeline.

awaitPointerEvent使用 kotlin 原语处理,没有kotlinx.coroutines. 在这种情况下,我们不需要任何kotlinx.coroutines逻辑(它只是在用户操作后从主循环线程调用的回调)。

安卓软件开发工具包

由于我们可以在所有需要回调的情况下使用suspend函数,因此还有其他有用的情况可以应用此类逻辑。例如,在请求权限时:

val result = Warden
    .with(this)
    .requestPermission(Manifest.permission.CALL_PHONE)

when (result) {
    is PermissionState.Denied -> dialNumber(phoneNumber)
    PermissionState.Granted -> startCall(phoneNumber)
}

例如,我们使用Warden库。

结论

总之,本文探讨了 Kotlin 协程的各个方面,强调了它们超越传统并发任务的多功能性。我们深入研究了协程原语的内部工作原理,讨论了它们在序列和复杂的问题解决场景(例如深度递归)中的使用,并研究了展示其广泛适用性的现实世界示例。标题“协程不仅仅是并发”,恰当地反映了 Kotlin 协程在现代软件开发中提供的多样化功能。

本文暂时没有评论,来添加一个吧(●'◡'●)

欢迎 发表评论:

最近发表
标签列表