Hanbit the Developer

Coroutines Flow | SharingStarted 본문

Kotlin

Coroutines Flow | SharingStarted

hanbikan 2024. 2. 6. 18:19

배경

구현을 하다보면 Room이나 DataStore에서의 Flow를 stateIn 함수를 통해 StateFlow로 변환하여 사용하는 경우가 잦다.

public fun <T> Flow<T>.stateIn(
    scope: CoroutineScope,
    started: SharingStarted,
    initialValue: T
)

scope, initialValue는 어떤 것인지 명확하게 알 수 있으나 started는 아니다. started 인자의 타입인 SharingStarted에 대해서 알아보고자 한다.

Eagerly

SharingStarted에는 3가지 companion object가 있다.(Eagerly, Lazily, WhileSubscribed) 첫번째는 Eagerly이다.

Sharing is started immediately and never stops.

구독자가 없어도 처음부터 데이터 방출을 시작해서 멈추지 않는다는 것이다. 완전히 hot stream처럼 동작하게 되는 것이다.

사용 사례는 다음과 같다:

“이 전략은 데이터 스트림이 애플리케이션의 생명주기 동안 계속 활성화되어 있어야 하고, 항상 최신 상태를 유지해야 할 때 사용됩니다. 예를 들어, 애플리케이션의 전반적인 상태를 관리하거나, 백그라운드 작업의 결과를 실시간으로 반영해야 하는 경우 적합합니다.” - GPT 4

Lazily

Sharing is started when the first subscriber appears and never stops.

Eagerly와 비슷하나 첫 구독자가 생기면 데이터 방출을 시작한다.

사용 사례는 다음과 같다:

“리소스 사용을 최소화하면서 데이터 스트림을 필요할 때만 활성화하고 싶을 때 사용합니다. 예를 들어, 사용자가 특정 기능에 접근하는 시점에만 데이터를 로딩하고 싶은 경우에 적합합니다. 이 전략은 초기화 비용이 높거나 리소스 소모가 큰 작업에 특히 유용할 수 있습니다.” - GPT 4

WhileSubscribed

Sharing is started when the first subscriber appears, immediately stops when the last subscriber disappears (by default), keeping the replay cache forever (by default).

구독자가 있을 때만 데이터를 방출한다. 리소스를 효율적으로 관리하게 되어 개인적으로 선호하고 있다.

사용 사례는 다음과 같다:

“이 전략은 UI 구성 요소가 활성 상태일 때만 데이터를 수신해야 하는 경우에 적합합니다. 예를 들어, 화면이 사용자에게 보여질 때만 실시간 데이터를 업데이트하고, 화면이 사라지면 업데이트를 중단하고 싶은 경우입니다. 선택적 지연 시간을 통해 화면 전환 중에 발생할 수 있는 잠깐의 구독 해제 상황에서도 데이터 스트림을 유지할 수 있습니다.” - GPT 4

주석에 replay cache라는 워딩이 있는데 이후 replayExpirationMillis 인자 설명에서 자세히 다루겠다.

stopTimeoutMillis

configures a delay (in milliseconds) between the disappearance of the last subscriber and the stopping of the sharing coroutine. It defaults to zero (stop immediately).

이 값을 500으로 설정하면 구독자가 모두 사라지고 0.5초 뒤에 sharing coroutine이 중단되는 것이다. 이때 sharing coroutine은 Flow를 StateFlow로 전환하는 데 쓰이는 코루틴이다.

만약 구독자가 다 사라진 뒤에 300ms 만에 새 구독자가 생긴다면 기존에 사용하던 sharing coroutine을 그대로 사용함으로써 코루틴 생성 같은 초기화 비용을 절감하게 되는 것이다.

replayExpirationMillis

replayExpirationMillis — configures a delay (in milliseconds) between the stopping of the sharing coroutine and the resetting of the replay cache (which makes the cache empty for the shareIn operator and resets the cached value to the original initialValue for the stateIn operator). It defaults to Long.MAX_VALUE (keep replay cache forever, never reset buffer). Use zero value to expire the cache immediately.

구독자가 다 제거되어도 기존의 값을 replayExpirationMillis 만큼 캐시하고 다음에 구독자가 붙으면 초기값 대신 캐시된 값을 사용한다고 한다.

실제로 동작시켜보자. 아래 코드의 대략적인 흐름은 [수집 시작 → 0.5초 후 수집 중단 → 0.5초 후 수집 시작]이다.

val _data = MutableStateFlow(1)
val data: StateFlow<Int> = _data
    .stateIn(viewModelScope, SharingStarted.WhileSubscribed(replayExpirationMillis = 0), 0)

init {
    // Collect 시작
    val job = viewModelScope.launch {
        Log.e("data", "START TO COLLECT")
        data.collectLatest {
            Log.e("data", it.toString())
        }
    }

    viewModelScope.launch {
        delay(500)
        // Collect 중단
        Log.e("data", "-----------------------------")
        Log.e("data", "CANCEL")
        job.cancel()
        delay(500)
        // Collect 시작
        Log.e("data", "-----------------------------")
        Log.e("data", "START TO COLLECT")
        data.collectLatest {
            Log.e("data", it.toString())
        }
    }
}

왼쪽 사진은 replayExpirationMillis를 무한대로 놓았을 때(해당 인자의 기본값이 무한대임), 오른쪽 사진은 replayExpirationMillis를 0으로 놓았을 때이다. 즉 캐시를 사용하지 않으니 기존의 값을 잃어버리고 stateIn에 명시한 initialValue로 돌아가게 된 것이다.