Hanbit the Developer

Kotlin Documentation | Channels 본문

Mobile/Kotlin

Kotlin Documentation | Channels

hanbikan 2023. 5. 26. 14:21

Kotlin Documentation 시리즈에 대해

Category: Official libraries - Coroutines

문서 링크: https://kotlinlang.org/docs/channels.html


Channel basics

BlockingQueue(”쓰레드 세이프하게 원자를 추가 및 삭제하는 자료구조로, 큐가 꽉 차있거나 비었을 때 쓰레드를 block할 수 있다.” - ChatGPT)와 개념적으로 유사하다. 차이점은 suspending send 함수, suspending receive 함수를 가졌다는 점이다.

val channel = Channel<Int>()
launch {
    // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
    for (x in 1..5) channel.send(x * x)
}
// here we print five received integers:
repeat(5) { println(channel.receive()) }
println("Done!")
1
4
9
16
25
Done!

Closing and iteration over channels

더이상 값이 없다면 채널을 닫을 수 있다. 리시버 입장에선 channel에 대해 for문을 돎으로써 쉽게 채널로부터 오는 값들을 읽을 수 있다.

val channel = Channel<Int>()
launch {
    for (x in 1..5) {
        channel.send(x * x)
        delay(100)
    }
    channel.close() // we're done sending
}
// here we print received values using `for` loop (until the channel is closed)
for (y in channel) println(y)
println("Done!")

Building channel producers

fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) send(x * x)
}

fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
}

Pipelines

파이프라인은 코루틴이 벨류 스트림을 무한정 생산할 수 있는 패턴이다.

fun main() = runBlocking {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    repeat(5) {
        println(squares.receive()) // print first five
    }
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
}

fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
}

An example: Prime numbers with pipeline

fun main() = runBlocking {
    var cur = numbersFrom(2)
    repeat(10) {
        val prime = cur.receive()
        println(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren() // cancel all children to let main finish    
}

fun CoroutineScope.numbersFrom(start: Int) = produce<Int> {
    var x = start
    while (true) send(x++) // infinite stream of integers from start
}

fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}

iterator coroutine builder로 대체할 수도 있다.(produce → iterator, send → yield, receive → next, ReceiveChannel → Iterator, remove coroutine scope) 하지만 파이프라인을 사용함으로써 Dispatches.Default를 사용한다면 CPU 코어를 여러 개 사용할 수 있다.

해당 예제는 실용적이지 않다. 실제로는 파이프라인이 다른 정지 함수(네트워크 호출 함수 등)를 포함하며, 이러한 파이프라인들은 완전한 비동기인 produce와는 달리, 임시 중단을 허용하지 않기 때문에 sequence / iterator로 빌드될 수 없다.

Fan-out

여러 코루틴이 같은 채널을 수신할 수 있다:

fun main() = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
}

fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        println("Processor #$id received $msg")
    }    
}
Processor #2 received 1
Processor #4 received 2
Processor #0 received 3
Processor #1 received 4
Processor #3 received 5
Processor #2 received 6
Processor #4 received 7
Processor #0 received 8
Processor #1 received 9
Processor #3 received 10

producer를 취소하면 코루틴이 수행 중인 채널을 통한 반복 또한 종료된다.

consumeEach와는 달리 for loop pattern은 멀티 코루틴에 완전히 안전하다. 만약 코루틴 하나가 실패한다고 해도 다른 코루틴들은 여전히 채널을 처리하게 된다. 반면 consumeEach로 처리되는 프로세서는 채널을 normal, abnormal completion으로 취소한다.

Fan-in

여러 코루틴이 같은 채널에 send를 할 수 있다.

val channel = Channel<String>()
launch { sendString(channel, "foo", 200L) }
launch { sendString(channel, "BAR!", 500L) }
repeat(6) { // receive first six
    println(channel.receive())
}
coroutineContext.cancelChildren() // cancel all children to let main finish
foo
foo
BAR!
foo
foo
BAR!

Buffered channels

*”Buffered channels을 사용함으로써 프로듀서와 컨슈머를 디커플링하여 blocking과 대기 시간을 줄일 수 있다. 다만 버퍼가 너무 작은 경우 blocking이 자주 발생하며, 너무 큰 경우 불필요한 메모리를 사용하게 된다.” - ChatGPT

버퍼가 없는 채널은 랑데부(sender, receiver가 서로 만날 때)가 발생했을 때 값을 전달한다. send가 먼저 실행되면 receive가 호출될 때까지 중지되며, receive가 먼저 실행되면 send가 호출될 때까지 중지된다.

Channel() 팩토리 함수와 produce 빌더는 버퍼 사이즈를 명시하는 capacity를 인자로 받는다. 버퍼는 sender가 중지되기 전에 여러 값을 send할 수 있게 해준다. 이는 버퍼가 가득차면 블락되는, capacity가 명시된 BlockingQueue와 유사하다.

val channel = Channel<Int>(4) // create buffered channel
val sender = launch { // launch sender coroutine
    repeat(10) {
        println("Sending $it") // print before sending each element
        channel.send(it) // will suspend when buffer is full
    }
}
// don't receive anything... just wait....
delay(1000)
sender.cancel() // cancel sender coroutine
Sending 0
Sending 1
Sending 2
Sending 3
Sending 4

처음 4개는 버퍼에 추가되었으며 5번째 값을 send하려고 할 때 sender가 중지되었다.

Channels are fair

FIFO를 따르기 때문에 공정하다고 할 수 있다.

data class Ball(var hits: Int)

fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        println("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}
ping Ball(hits=1)
pong Ball(hits=2)
ping Ball(hits=3)
pong Ball(hits=4)

Ticker channels

Ticker channel은 가장 마지막 consumption 이후 delay가 통과될 때마다 Unit을 produce하는 특별한 랑데부 채널이다. complex time-based produce pipeline, windowing(sender가 receive 여부를 체크하지 않고 지속적으로 보내는 기법) 작업, 또는 다른 시간에 의존하는 처리를 생성하는 데 유용하다. on tick 액션을 수행하기 위해 select 안에서 사용될 수 있다.

fun main() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // no initial delay

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")

    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } 
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // indicate that no more elements are needed
}
Initial element is available immediately: kotlin.Unit
Next element is not ready in 50 ms: null
Next element is ready in 100 ms: kotlin.Unit
Consumer pauses for 150ms
Next element is available immediately after large consumer delay: kotlin.Unit
Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit