Hanbit the Developer
Kotlin Documentation | Asynchronous Flow 본문
Category: Official libraries - Coroutines
문서 링크: https://kotlinlang.org/docs/flow.html
Representing multiple values
Sequences
*remind: lazily하게 작동하는 방식의 컬렉션 → 각 작업이 짧지 않은 시간이 소요될 경우 사용
fun simple(): Sequence<Int> = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it
yield(i) // yield next value
}
}
fun main() {
simple().forEach { value -> println(value) }
}
Suspending functions
suspend fun simple(): List<Int> {
delay(1000) // pretend we are doing something asynchronous here
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach { value -> println(value) }
}
Flows
fun simple(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // pretend we are doing something useful here
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
// Launch a concurrent coroutine to check if the main thread is blocked
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Collect the flow
simple().collect { value -> println(value) }
}
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
- flow()는 Flow의 빌더임
- flow { … }에서 내부 블럭은 정지 함수임
- emit() - collect()를 통해 값을 방출하고 수집할 수 있음
Flows are cold
sequences와 비슷하게, 플로우는 콜드 스트림이기 때문에, 플로우 빌더 내 코드는 플로우가 수집되기 전에 수행되지 않는다.
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple() // 여기서 블럭 내부 코드가 수행되지 않는다.
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
Flow cancellation basics
플로우가 cancellable suspending function(like delay) 내에서 정지되었을 때 취소될 수 있다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // Timeout after 250ms
simple().collect { value -> println(value) }
}
println("Done")
}
Emitting 1
1
Emitting 2
2
Done
Flow builders
flow { … } 빌더는 가장 기본적인 내용이며 다른 빌더들이 있다:
- flowOf(): 고정된 값들을 방출하는 플로우를 정의한다.
- .asFlow(): 컬렉션과 시퀸스를 플로우로 변환한다.
(1..3).asFlow().collect { value -> println(value) }
Intermediate flow operations
Flow는 map과 같은 함수들을 통해 변환될 수 있다. 이 함수들은 cold이고 정지 함수가 아니다. 변환된 새 플로우의 정의를 빠르게 반환한다.
inline fun <T, R> Flow<T>.map(
crossinline transform: suspend (value: T) -> R
): Flow<R>
map, filter처럼 친숙한 이름을 갖고 있으며 가장 중요한 차이점은, 블럭 내에서 정지 함수를 호출할 수 있다는 점이다.
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map { request -> performRequest(request) }
.collect { response -> println(response) }
}
Transform operator
가장 일반적인 함수는 transform()이다. 해당 함수는 map, filter처럼 단순한 변형부터 복잡한 변환까지 구현할 수 있다.
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
Size-limiting operators
take()를 통해 사이즈를 제한할 수 있다. 추가로 코루틴 취소는 예외를 던짐으로써 진행되기 때문에 try-finally 등 자원 관리 함수가 정상적으로 작동한다.
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
1
2
Finally in numbers
Terminal flow operators
Terminal operators on flows are suspending functions.
- collect()
- toList(), toSet()
- first(), single()
- reduce(), fold()
Flows are sequential
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
Flow context
플로우는 호출하는 코루틴 컨텍스트에서 실행된다.
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
simple().collect { … }가 메인 쓰레드에서 호출되었기 때문에 simple()의 플로우 내부 블럭 또한 메인 쓰레드에서 호출된다.
A common pitfall when using withContext
CPU를 많이 사용하는 작업은 Dispatchers.Default에서, UI를 업데이트 하기 위해서는 Dispatchers.Main에서 실행되어야 하며, 이러한 경우를 위해 withContext()를 호출하여 컨텍스트를 바꾸곤 한다.
하지만 flow { … } 빌더 내부 코드는 컨텍스트 보존 속성을 따라야 하며 다른 컨텍스트에서의 emit()을 허용하지 않는다.
fun simple(): Flow<Int> = flow {
// The WRONG way to change context for CPU-consuming code in flow builder
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@794c5ee5, BlockingEventLoop@75eebb8b],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@8ebc633, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext (SafeCollector.common.kt:85)
at kotlinx.coroutines.flow.internal.SafeCollector.checkContext (SafeCollector.kt:106)
at kotlinx.coroutines.flow.internal.SafeCollector.emit (SafeCollector.kt:83)
flowOn operator
flowOn() 함수는 플로우 방출의 컨텍스트를 변경하는 데 쓰인다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
이때 값의 방출은 백그라운드에서 진행되는 반면 값 수집은 메인 쓰레드에서 진행된다.
또한 코루틴도 coroutine#1, coroutine#2 두 개로 나뉘었는데, flowOn()이 값 방출을 위해 새로운 코루틴을 생성했기 때문이다.
*추가 정보: withContext()를 사용하면 코루틴은 그대로고 쓰레드(Dispatchers)만 변경된다.
Buffering
without buffer:
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
1
2
3
Collected in 1234 ms
buffer() 함수는 simple()의 방출 코드를 병렬적으로 수행한다.
val time = measureTimeMillis {
simple()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
1
2
3
Collected in 1071 ms
Conflation
한번 시작된 데이터 소비는 끝날 때까지 하고, 데이터 소비가 끝난 시점에서의 가장 최신 데이터를 다시 소비한다.
val time = measureTimeMillis {
simple()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
1
3
Collected in 749 ms
Processing the latest value
val time = measureTimeMillis {
simple()
.collectLatest { value -> // cancel & restart on the latest value
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
println("Collected in $time ms")
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 698 ms
Composing multiple flows
Zip
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
1 -> one
2 -> two
3 -> three
Combine
zip:
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
1 -> one at 446 ms from start
2 -> two at 846 ms from start
3 -> three at 1248 ms from start
combine:
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
1 -> one at 439 ms from start
2 -> one at 639 ms from start
2 -> two at 840 ms from start
3 -> two at 939 ms from start
3 -> three at 1241 ms from start
Flattening flows
flatMapConcat, flattenConcat
플로우의 플로우를 연결해준다.
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
.flatMapConcat {
flow {
emit("$it: First")
delay(500) // wait 500 ms
emit("$it: Second")
}
}
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
1: First at 129 ms from start
1: Second at 630 ms from start
2: First at 730 ms from start
2: Second at 1230 ms from start
3: First at 1331 ms from start
3: Second at 1831 ms from start
flatMapMerge, flattenMerge
병렬적으로 모든 플로우를 수집하고 수집된 값들을 하나의 플로우로 병합하여 값들이 가능한 빨리 방출되게 한다.
concurrency 인자로 최대 병렬 플로우 개수 제한을 걸 수도 있다.
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge {
flow {
emit("$it: First")
delay(500) // wait 500 ms
emit("$it: Second")
}
}
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
1: First at 166 ms from start
2: First at 262 ms from start
3: First at 363 ms from start
1: Second at 666 ms from start
2: Second at 762 ms from start
3: Second at 864 ms from start
flatMapLatest
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapLatest {
flow {
emit("$it: First")
delay(500) // wait 500 ms
emit("$it: Second")
}
}
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
1: First at 170 ms from start
2: First at 292 ms from start
3: First at 394 ms from start
3: Second at 896 ms from start
Flow exceptions
Collector try and catch
fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
Everything is caught
emitter, intermediate, terminal operator에서 발생하는 어떤 예외든 캐치된다.
Exception transparency
방출하는 쪽이 캡슐화하여 에러를 처리하려면 어떻게 해야하는가?
catch 연산자를 통해 예외 투명성을 보장할 수 있다. 예외를 포착했을 때 다른 방식으로 대응할 수 있다:
- throw로 다시 throw를 던진다.
- emit()
- 따로 처리하지 않고 로그를 찍는다.
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> emit("Caught $e") } // emit on exception
.collect { value -> println(value) }
}
Transparent catch
catch는 업스트림 예외만 처리할 수 있다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> println("Caught $e") } // does not catch downstream exceptions
.collect { value ->
**check(value <= 1) { "Collected $value" }**
println(value)
}
}
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at FileKt$main$1$2.emit (File.kt:15)
at FileKt$main$1$2.emit (File.kt:14)
at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$2.emit (Errors.kt:158)
Catching declaratively
위 예시에서 collect의 내용을 onEach로 옮기고 그 뒤에 catch를 붙이면 처리가 가능하다.
simple()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
Flow completion
플로우가 종료된 후 어떤 액션을 취해야 할 때가 있다.
Imperative finally block
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} finally {
println("Done")
}
}
1
2
3
Done
Declarative handling
simple()
.onCompletion { println("Done") }
.collect { value -> println(value) }
1
2
3
Done
특히 람다의 인자의 널 여부를 체크하여 정상적으로 종료되었는지를 체크할 수 있다.(null일 경우 성공)
fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
1
Flow completed exceptionally
Caught exception
Imperative versus declarative
선호도나 코드 스타일 등에 의해 결정하면 된다.
Launching flow
특정 플로우를 돌면서 각 아이템 별로 동작을 수행하고자 하는 때가 있다.(addEventListener처럼) 이를 위해 onEach 이후에 collect를 수행할 수 있다:
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- Collecting the flow waits
println("Done")
}
Event: 1
Event: 2
Event: 3
Done
이때 onEach는 intermediate operator이기 때문에 collect를 붙여야 동작한다.
collect를 launchIn이라는 terminal operator로 대체하면 별개의 코루틴에서 플로우 수집을 시작한다:
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- Launching the flow in a separate coroutine
println("Done")
}
Done
Event: 1
Event: 2
Event: 3
launchIn은 CoroutineScope를 명시해야 한다. 위 예시에서는 runBlocking의 스코프를 전달함으로써 메인 함수가 플로우 종료를 기다릴 수 있었다.
추가로 launchIn은 Job을 반환한다.
Flow cancellation checks
플로우 빌더는 방출되는 각각의 값들에 대해 ensureActive(코루틴 스코프가 활성화 되었는지 체크)로 체크하는 동작을 추가적으로 수행한다.
fun foo(): Flow<Int> = flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
at kotlinx.coroutines.JobSupport.cancel (JobSupport.kt:1579)
at kotlinx.coroutines.CoroutineScopeKt.cancel (CoroutineScope.kt:287)
at kotlinx.coroutines.CoroutineScopeKt.cancel$default (CoroutineScope.kt:285)
하지만 대부분의 다른 플로우 함수들은 성능을 위해 캔슬 체크를 진행하지 않는다.
fun main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
println(value)
}
}
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@3327bd23
Making busy flow cancellable
캔슬을 체크하기 위해서 .onEach { currentCoroutineContext().ensureActive() }를 추가할 수 있으나 이미 cancellable이라는 사용성이 좋은 함수가 있다:
fun main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job="coroutine#1":BlockingCoroutine{Cancelled}@5ec0a365
'Kotlin' 카테고리의 다른 글
Kotlin Documentation | Coroutine exceptions handlings (0) | 2023.05.26 |
---|---|
Kotlin Documentation | Channels (0) | 2023.05.26 |
Kotlin Documentation | Coroutine context and dispatchers (0) | 2023.05.24 |
Kotlin Documentation | Composing suspending functions (0) | 2023.05.24 |
Kotlin Documentation | Cancellation and timeouts (0) | 2023.05.24 |