/**
 * Head node of a lock-free linked list.
 *
 * The head is itself a [LockFreeLinkedListNode]; emptiness is detected by checking
 * whether the head's `next` reference points back at the head.
 */
public actual open class LockFreeLinkedListHead : LockFreeLinkedListNode() {
    /**
     * `true` when `next` is this very head instance (compared by reference).
     */
    public actual val isEmpty: Boolean
        get() {
            return this === next
        }
}
/**
 * Node of a lock-free linked list.
 *
 * Only the atomic link fields are shown in this excerpt. A freshly created node
 * links to itself in both directions and has no removal marker.
 */
public actual open class LockFreeLinkedListNode {
    // Forward link; initially refers to this node itself. Typed as Any, so it can
    // hold values other than plain nodes.
    private val _next = atomic<Any>(this)

    // Backward link; initially refers to this node itself.
    private val _prev = atomic(this)

    // Holds a Removed marker or null; starts out as null.
    // NOTE(review): presumably caches the marker used when this node is removed — confirm.
    private val _removedRef = atomic<Removed?>(null)
}
/**
 * Receives an element from this channel.
 *
 * First attempts a non-suspending poll via [pollInternal]. If that yields an actual
 * element (neither `POLL_FAILED` nor a `Closed` marker) it is returned immediately.
 * In every other case the call is delegated to `receiveSuspend` with the
 * `RECEIVE_THROWS_ON_CLOSE` mode.
 *
 * @return the received element.
 */
public final override suspend fun receive(): E {
    // Fast path: try to take an element without suspending.
    val result = pollInternal()
    // Only a real element is returned from the fast path; a failed poll and a Closed
    // marker are both handled by the suspending path below.
    @Suppress("UNCHECKED_CAST")
    if (result !== POLL_FAILED && result !is Closed<*>) return result as E
    // Slow path: may suspend until an element or a close token arrives.
    return receiveSuspend(RECEIVE_THROWS_ON_CLOSE)
}
/**
 * Tries to take an element from the first queued sender without suspending.
 *
 * Loops until one of the following happens:
 *  - `takeFirstSendOrPeekClosed()` returns `null`: there is nothing to take and
 *    `POLL_FAILED` is returned;
 *  - the taken sender is successfully resumed: its `pollResult` is returned.
 *
 * A sender whose `tryResumeSend` returns `null` could not be resumed; it is notified
 * through `undeliveredElement()` and the loop retries with the next candidate.
 *
 * @return `POLL_FAILED`, or the `pollResult` of the resumed sender. Callers in this
 *   file also check the result for `Closed`, so a close marker may be returned too.
 */
protected open fun pollInternal(): Any? {
    while (true) {
        val send = takeFirstSendOrPeekClosed() ?: return POLL_FAILED
        val token = send.tryResumeSend(null)
        if (token != null) {
            // A successful resume is expected to always produce RESUME_TOKEN.
            assert { token === RESUME_TOKEN }
            send.completeResumeSend()
            return send.pollResult
        }
        // The sender could not be resumed: report its element as undelivered and retry.
        send.undeliveredElement()
    }
}
/**
 * Excerpt from `CancellableContinuationImpl`.
 *
 * Resumes the continuation: if `tryResume()` succeeds nothing more is done here,
 * otherwise the resumption is handed over to `dispatch` with the given [mode].
 *
 * @param mode resume mode forwarded to `dispatch`.
 */
private fun dispatchResume(mode: Int) {
    // tryResume() returning true means no dispatch is required from this method.
    if (tryResume()) return
    dispatch(mode)
}
/**
 * Dispatches this task for resumption in the given [mode].
 *
 * When the task is not being resumed undispatched, its delegate is a
 * `DispatchedContinuation`, and the cancellability of [mode] matches that of
 * `resumeMode`, the delegate's dispatcher decides how to run the task: it is either
 * scheduled through `dispatcher.dispatch(context, this)` or, when the dispatcher
 * reports that no dispatch is needed, resumed via `resumeUnconfined()`.
 *
 * NOTE(review): this is an abridged excerpt. The prologue (which presumably declares
 * `undispatched` and a local `delegate`) and the body of the `else` branch are
 * omitted — confirm against the full source.
 *
 * @param mode resume mode to dispatch with.
 */
internal fun <T> DispatchedTask<T>.dispatch(mode: Int) {
    // (prologue omitted in this excerpt)
    if (!undispatched && delegate is DispatchedContinuation<*> && mode.isCancellableMode == resumeMode.isCancellableMode) {
        val dispatcher = delegate.dispatcher
        val context = delegate.context
        if (dispatcher.isDispatchNeeded(context)) {
            // Schedule this task on the dispatcher.
            dispatcher.dispatch(context, this)
        } else {
            // The dispatcher does not require a dispatch for this context.
            resumeUnconfined()
        }
    } else {
        // (omitted in this excerpt)
    }
}
/**
 * Suspending slow path of a receive operation.
 *
 * Creates a receive waiter for the current continuation (with an undelivered-element
 * handler when `onUndeliveredElement` is set) and then loops:
 *  1. tries to enqueue the waiter; on success registers removal-on-cancel and leaves
 *     the continuation to be resumed later;
 *  2. otherwise polls once more and resumes the continuation right away if the poll
 *     produced a `Closed` marker or an element;
 *  3. if the poll failed as well, retries from step 1.
 *
 * @param receiveMode mode passed on to the created receive waiter.
 */
private suspend fun <R> receiveSuspend(receiveMode: Int): R = suspendCancellableCoroutineReusable sc@ { cont ->
    val waiterCont = cont as CancellableContinuation<Any?>
    val receive =
        if (onUndeliveredElement == null) {
            ReceiveElement(waiterCont, receiveMode)
        } else {
            ReceiveElementWithUndeliveredHandler(waiterCont, receiveMode, onUndeliveredElement)
        }
    while (true) {
        if (enqueueReceive(receive)) {
            // Waiter is queued; make sure it is removed if the continuation gets cancelled.
            removeReceiveOnCancel(cont, receive)
            return@sc
        }
        // Enqueue was refused, so try to take something directly.
        val polled = pollInternal()
        when {
            polled is Closed<*> -> {
                receive.resumeReceiveClosed(polled)
                return@sc
            }
            polled !== POLL_FAILED -> {
                cont.resume(receive.resumeValue(polled as E), receive.resumeOnCancellationFun(polled as E))
                return@sc
            }
        }
        // Poll failed too: loop and try to enqueue again.
    }
}
/**
 * Channel whose buffer state flags are fixed: it always reports its buffer as empty
 * and never as full. Elements that cannot be handed to a receiver immediately are
 * stored through `sendBuffered`.
 *
 * @param onUndeliveredElement optional handler forwarded to [AbstractChannel].
 */
internal open class LinkedListChannel<E>(onUndeliveredElement: OnUndeliveredElement<E>?) : AbstractChannel<E>(onUndeliveredElement) {
    protected final override val isBufferAlwaysEmpty: Boolean get() = true
    protected final override val isBufferEmpty: Boolean get() = true
    protected final override val isBufferAlwaysFull: Boolean get() = false
    protected final override val isBufferFull: Boolean get() = false

    /**
     * Offers [element], buffering it when the base implementation reports `OFFER_FAILED`.
     *
     * @return `OFFER_SUCCESS` once the element was delivered or buffered, or the
     *   `Closed` marker reported by the base implementation or by `sendBuffered`.
     */
    protected override fun offerInternal(element: E): Any {
        while (true) {
            val result = super.offerInternal(element)
            when {
                result === OFFER_SUCCESS -> return OFFER_SUCCESS
                result === OFFER_FAILED -> {
                    // try to buffer
                    when (val sendResult = sendBuffered(element)) {
                        null -> return OFFER_SUCCESS
                        is Closed<*> -> return sendResult
                    }
                    // otherwise there was receiver in queue, retry super.offerInternal
                }
                result is Closed<*> -> return result
                else -> error("Invalid offerInternal result $result")
            }
        }
    }

    /**
     * Select-aware variant of [offerInternal].
     *
     * When `hasReceiveOrClosed` is set the base implementation is used; otherwise the
     * element is buffered through an atomic select operation, whose `null` result is
     * mapped to `OFFER_SUCCESS`. `OFFER_FAILED` and `RETRY_ATOMIC` results cause
     * another iteration.
     *
     * @return `ALREADY_SELECTED`, `OFFER_SUCCESS`, or a `Closed` marker.
     */
    protected override fun offerSelectInternal(element: E, select: SelectInstance<*>): Any {
        while (true) {
            val result = if (hasReceiveOrClosed)
                super.offerSelectInternal(element, select)
            else
                (select.performAtomicTrySelect(describeSendBuffered(element)) ?: OFFER_SUCCESS)
            when {
                result === ALREADY_SELECTED -> return ALREADY_SELECTED
                result === OFFER_SUCCESS -> return OFFER_SUCCESS
                result === OFFER_FAILED -> {} // retry
                result === RETRY_ATOMIC -> {} // retry
                result is Closed<*> -> return result
                else -> error("Invalid result $result")
            }
        }
    }
}