Hanbit the Developer

Kotlin Documentation | Asynchronous Flow 본문

Kotlin

Kotlin Documentation | Asynchronous Flow

hanbikan 2023. 5. 24. 19:01

Kotlin Documentation 시리즈에 대해

Category: Official libraries - Coroutines

문서 링크: https://kotlinlang.org/docs/flow.html


Representing multiple values

Sequences

*remind: lazily하게 작동하는 방식의 컬렉션 → 각 작업이 짧지 않은 시간이 소요될 경우 사용

fun simple(): Sequence<Int> = sequence { // sequence builder
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it
        yield(i) // yield next value
    }
}

fun main() {
    simple().forEach { value -> println(value) } 
}

Suspending functions

suspend fun simple(): List<Int> {
    delay(1000) // pretend we are doing something asynchronous here
    return listOf(1, 2, 3)
}

fun main() = runBlocking<Unit> {
    simple().forEach { value -> println(value) } 
}

Flows

fun simple(): Flow<Int> = flow { // flow builder
    for (i in 1..3) {
        delay(100) // pretend we are doing something useful here
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> {
    // Launch a concurrent coroutine to check if the main thread is blocked
    launch {
        for (k in 1..3) {
            println("I'm not blocked $k")
            delay(100)
        }
    }
    // Collect the flow
    simple().collect { value -> println(value) } 
}
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
  • flow()는 Flow의 빌더임
  • flow { … }에서 내부 블럭은 정지 함수임
  • emit() - collect()를 통해 값을 방출하고 수집할 수 있음

Flows are cold

sequences와 비슷하게, 플로우는 콜드 스트림이기 때문에, 플로우 빌더 내 코드는 플로우가 수집되기 전에 수행되지 않는다.

fun simple(): Flow<Int> = flow { 
    println("Flow started")
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    println("Calling simple function...")
    val flow = simple() // 여기서 블럭 내부 코드가 수행되지 않는다.
    println("Calling collect...")
    flow.collect { value -> println(value) } 
    println("Calling collect again...")
    flow.collect { value -> println(value) } 
}
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3

Flow cancellation basics

플로우가 cancellable suspending function(like delay) 내에서 정지되었을 때 취소될 수 있다.

fun simple(): Flow<Int> = flow { 
    for (i in 1..3) {
        delay(100)          
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    withTimeoutOrNull(250) { // Timeout after 250ms 
        simple().collect { value -> println(value) } 
    }
    println("Done")
}
Emitting 1
1
Emitting 2
2
Done

Flow builders

flow { … } 빌더는 가장 기본적인 내용이며 다른 빌더들이 있다:

  • flowOf(): 고정된 값들을 방출하는 플로우를 정의한다.
  • .asFlow(): 컬렉션과 시퀸스를 플로우로 변환한다.
(1..3).asFlow().collect { value -> println(value) }

Intermediate flow operations

Flow는 map과 같은 함수들을 통해 변환될 수 있다. 이 함수들은 cold이고 정지 함수가 아니다. 변환된 새 플로우의 정의를 빠르게 반환한다.

inline fun <T, R> Flow<T>.map(
    crossinline transform: suspend (value: T) -> R
): Flow<R>

map, filter처럼 친숙한 이름을 갖고 있으며 가장 중요한 차이점은, 블럭 내에서 정지 함수를 호출할 수 있다는 점이다.

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}

fun main() = runBlocking<Unit> {
    (1..3).asFlow() // a flow of requests
        .map { request -> performRequest(request) }
        .collect { response -> println(response) }
}

Transform operator

가장 일반적인 함수는 transform()이다. 해당 함수는 map, filter처럼 단순한 변형부터 복잡한 변환까지 구현할 수 있다.

(1..3).asFlow() // a flow of requests
    .transform { request ->
        emit("Making request $request") 
        emit(performRequest(request)) 
    }
    .collect { response -> println(response) }
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3

Size-limiting operators

take()를 통해 사이즈를 제한할 수 있다. 추가로 코루틴 취소는 예외를 던짐으로써 진행되기 때문에 try-finally 등 자원 관리 함수가 정상적으로 작동한다.

fun numbers(): Flow<Int> = flow {
    try {                          
        emit(1)
        emit(2) 
        println("This line will not execute")
        emit(3)    
    } finally {
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers() 
        .take(2) // take only the first two
        .collect { value -> println(value) }
}
1
2
Finally in numbers

Terminal flow operators

Terminal operators on flows are suspending functions.

  • collect()
  • toList(), toSet()
  • first(), single()
  • reduce(), fold()

Flows are sequential

(1..5).asFlow()
    .filter {
        println("Filter $it")
        it % 2 == 0              
    }              
    .map { 
        println("Map $it")
        "string $it"
    }.collect { 
        println("Collect $it")
    }
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5

Flow context

플로우는 호출하는 코루틴 컨텍스트에서 실행된다.

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}  

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") } 
}
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3

simple().collect { … }가 메인 쓰레드에서 호출되었기 때문에 simple()의 플로우 내부 블럭 또한 메인 쓰레드에서 호출된다.

A common pitfall when using withContext

CPU를 많이 사용하는 작업은 Dispatchers.Default에서, UI를 업데이트 하기 위해서는 Dispatchers.Main에서 실행되어야 하며, 이러한 경우를 위해 withContext()를 호출하여 컨텍스트를 바꾸곤 한다.

하지만 flow { … } 빌더 내부 코드는 컨텍스트 보존 속성을 따라야 하며 다른 컨텍스트에서의 emit()을 허용하지 않는다.

fun simple(): Flow<Int> = flow {
    // The WRONG way to change context for CPU-consuming code in flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing it in CPU-consuming way
            emit(i) // emit next value
        }
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> println(value) } 
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
		Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@794c5ee5, BlockingEventLoop@75eebb8b],
		but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@8ebc633, Dispatchers.Default].
		Please refer to 'flow' documentation or use 'flowOn' instead
 at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext (SafeCollector.common.kt:85) 
 at kotlinx.coroutines.flow.internal.SafeCollector.checkContext (SafeCollector.kt:106) 
 at kotlinx.coroutines.flow.internal.SafeCollector.emit (SafeCollector.kt:83)

flowOn operator

flowOn() 함수는 플로우 방출의 컨텍스트를 변경하는 데 쓰인다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing it in CPU-consuming way
        log("Emitting $i")
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value") 
    } 
}
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3

이때 값의 방출은 백그라운드에서 진행되는 반면 값 수집은 메인 쓰레드에서 진행된다.

또한 코루틴도 coroutine#1, coroutine#2 두 개로 나뉘었는데, flowOn()이 값 방출을 위해 새로운 코루틴을 생성했기 때문이다.

*추가 정보: withContext()를 사용하면 코루틴은 그대로고 쓰레드(Dispatchers)만 변경된다.

Buffering

without buffer:

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

fun main() = runBlocking<Unit> { 
    val time = measureTimeMillis {
        simple().collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
    }   
    println("Collected in $time ms")
}
1
2
3
Collected in 1234 ms

buffer() 함수는 simple()의 방출 코드를 병렬적으로 수행한다.

val time = measureTimeMillis {
    simple()
        .buffer() // buffer emissions, don't wait
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")
1
2
3
Collected in 1071 ms

Conflation

한번 시작된 데이터 소비는 끝날 때까지 하고, 데이터 소비가 끝난 시점에서의 가장 최신 데이터를 다시 소비한다.

val time = measureTimeMillis {
    simple()
        .conflate() // conflate emissions, don't process each one
        .collect { value -> 
            delay(300) // pretend we are processing it for 300 ms
            println(value) 
        } 
}   
println("Collected in $time ms")
1
3
Collected in 749 ms

Processing the latest value

val time = measureTimeMillis {
    simple()
        .collectLatest { value -> // cancel & restart on the latest value
            println("Collecting $value") 
            delay(300) // pretend we are processing it for 300 ms
            println("Done $value") 
        } 
}   
println("Collected in $time ms")
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 698 ms

Composing multiple flows

Zip

val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings 
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
    .collect { println(it) } // collect and print
1 -> one
2 -> two
3 -> three

Combine

zip:

val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time 
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    }
1 -> one at 446 ms from start
2 -> two at 846 ms from start
3 -> three at 1248 ms from start

combine:

val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms          
val startTime = System.currentTimeMillis() // remember the start time 
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    }
1 -> one at 439 ms from start
2 -> one at 639 ms from start
2 -> two at 840 ms from start
3 -> two at 939 ms from start
3 -> three at 1241 ms from start

Flattening flows

flatMapConcat, flattenConcat

플로우의 플로우를 연결해준다.

val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms 
    .flatMapConcat {
        flow {
            emit("$it: First") 
            delay(500) // wait 500 ms
            emit("$it: Second")  
        }
    }                                                                           
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    }
1: First at 129 ms from start
1: Second at 630 ms from start
2: First at 730 ms from start
2: Second at 1230 ms from start
3: First at 1331 ms from start
3: Second at 1831 ms from start

flatMapMerge, flattenMerge

병렬적으로 모든 플로우를 수집하고 수집된 값들을 하나의 플로우로 병합하여 값들이 가능한 빨리 방출되게 한다.

concurrency 인자로 최대 병렬 플로우 개수 제한을 걸 수도 있다.

val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
    .flatMapMerge {
        flow {
            emit("$it: First") 
            delay(500) // wait 500 ms
            emit("$it: Second")  
        }
    }                                                                           
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    }
1: First at 166 ms from start
2: First at 262 ms from start
3: First at 363 ms from start
1: Second at 666 ms from start
2: Second at 762 ms from start
3: Second at 864 ms from start

flatMapLatest

val startTime = System.currentTimeMillis() // remember the start time 
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms 
    .flatMapLatest {
        flow {
            emit("$it: First") 
            delay(500) // wait 500 ms
            emit("$it: Second")  
        }
    }                                                                           
    .collect { value -> // collect and print 
        println("$value at ${System.currentTimeMillis() - startTime} ms from start") 
    }
1: First at 170 ms from start
2: First at 292 ms from start
3: First at 394 ms from start
3: Second at 896 ms from start

Flow exceptions

Collector try and catch

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value ->         
            println(value)
            check(value <= 1) { "Collected $value" }
        }
    } catch (e: Throwable) {
        println("Caught $e")
    } 
}

Everything is caught

emitter, intermediate, terminal operator에서 발생하는 어떤 예외든 캐치된다.

Exception transparency

방출하는 쪽이 캡슐화하여 에러를 처리하려면 어떻게 해야하는가?

catch 연산자를 통해 예외 투명성을 보장할 수 있다. 예외를 포착했을 때 다른 방식으로 대응할 수 있다:

  • throw로 다시 throw를 던진다.
  • emit()
  • 따로 처리하지 않고 로그를 찍는다.
fun simple(): Flow<String> = 
    flow {
        for (i in 1..3) {
            println("Emitting $i")
            emit(i) // emit next value
        }
    }
    .map { value ->
        check(value <= 1) { "Crashed on $value" }                 
        "string $value"
    }

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> emit("Caught $e") } // emit on exception
        .collect { value -> println(value) }
}

Transparent catch

catch는 업스트림 예외만 처리할 수 있다.

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        println("Emitting $i")
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple()
        .catch { e -> println("Caught $e") } // does not catch downstream exceptions
        .collect { value ->
            **check(value <= 1) { "Collected $value" }**                 
            println(value) 
        }
}
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
 at FileKt$main$1$2.emit (File.kt:15) 
 at FileKt$main$1$2.emit (File.kt:14) 
 at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$2.emit (Errors.kt:158)

Catching declaratively

위 예시에서 collect의 내용을 onEach로 옮기고 그 뒤에 catch를 붙이면 처리가 가능하다.

simple()
    .onEach { value ->
        check(value <= 1) { "Collected $value" }                 
        println(value) 
    }
    .catch { e -> println("Caught $e") }
    .collect()
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2

Flow completion

플로우가 종료된 후 어떤 액션을 취해야 할 때가 있다.

Imperative finally block

fun simple(): Flow<Int> = (1..3).asFlow()

fun main() = runBlocking<Unit> {
    try {
        simple().collect { value -> println(value) }
    } finally {
        println("Done")
    }
}
1
2
3
Done

Declarative handling

simple()
    .onCompletion { println("Done") }
    .collect { value -> println(value) }
1
2
3
Done

특히 람다의 인자의 널 여부를 체크하여 정상적으로 종료되었는지를 체크할 수 있다.(null일 경우 성공)

fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}

fun main() = runBlocking<Unit> {
    simple()
        .onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
        .catch { cause -> println("Caught exception") }
        .collect { value -> println(value) }
}
1
Flow completed exceptionally
Caught exception

Imperative versus declarative

선호도나 코드 스타일 등에 의해 결정하면 된다.

Launching flow

특정 플로우를 돌면서 각 아이템 별로 동작을 수행하고자 하는 때가 있다.(addEventListener처럼) 이를 위해 onEach 이후에 collect를 수행할 수 있다:

// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .collect() // <--- Collecting the flow waits
    println("Done")
}
Event: 1
Event: 2
Event: 3
Done

이때 onEach는 intermediate operator이기 때문에 collect를 붙여야 동작한다.

collect를 launchIn이라는 terminal operator로 대체하면 별개의 코루틴에서 플로우 수집을 시작한다:

fun main() = runBlocking<Unit> {
    events()
        .onEach { event -> println("Event: $event") }
        .launchIn(this) // <--- Launching the flow in a separate coroutine
    println("Done")
}
Done
Event: 1
Event: 2
Event: 3

launchIn은 CoroutineScope를 명시해야 한다. 위 예시에서는 runBlocking의 스코프를 전달함으로써 메인 함수가 플로우 종료를 기다릴 수 있었다.

추가로 launchIn은 Job을 반환한다.

Flow cancellation checks

플로우 빌더는 방출되는 각각의 값들에 대해 ensureActive(코루틴 스코프가 활성화 되었는지 체크)로 체크하는 동작을 추가적으로 수행한다.

fun foo(): Flow<Int> = flow { 
    for (i in 1..5) {
        println("Emitting $i") 
        emit(i) 
    }
}

fun main() = runBlocking<Unit> {
    foo().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
 at kotlinx.coroutines.JobSupport.cancel (JobSupport.kt:1579) 
 at kotlinx.coroutines.CoroutineScopeKt.cancel (CoroutineScope.kt:287) 
 at kotlinx.coroutines.CoroutineScopeKt.cancel$default (CoroutineScope.kt:285)

하지만 대부분의 다른 플로우 함수들은 성능을 위해 캔슬 체크를 진행하지 않는다.

fun main() = runBlocking<Unit> {
    (1..5).asFlow().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23

Making busy flow cancellable

캔슬을 체크하기 위해서 .onEach { currentCoroutineContext().ensureActive() }를 추가할 수 있으나 이미 cancellable이라는 사용성이 좋은 함수가 있다:

fun main() = runBlocking<Unit> {
    (1..5).asFlow().cancellable().collect { value -> 
        if (value == 3) cancel()  
        println(value)
    } 
}
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365