Hanbit the Developer

Kotlin Documentation | Shared mutable state and concurrency 본문

Kotlin

Kotlin Documentation | Shared mutable state and concurrency

hanbikan 2023. 5. 26. 14:46

Kotlin Documentation 시리즈에 대해

Category: Official libraries - Coroutines

문서 링크: https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html


멀티 쓰레드 병렬 처리 문제는, 주로 shared mutable state 동기화에서 발생한다. 해결 방안은 멀티쓰레드에서의 그것과 유사하다.

The problem

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines 
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    println("Completed ${n * k} actions in $time ms")    
}

var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}
Completed 100000 actions in 53 ms
Counter = 96719

Counter가 100000이 아닌 값이 나오는 것은, 백 개의 코루틴이 여러 쓰레드에서 동기화 없이 값을 증가시켰기 때문이다.

Volatiles are of no help

counter에 @Volatile을 붙이면 문제가 해결된다는 오해가 있다. 하지만 volatile은 atomic read, write는 보장하지만 atomicity of larger actions을 보장하지 않기 때문에 언제나 100000을 얻지 못한다.

Thread-safe data structures

이를 처리하는 일반적인 방법은 쓰레드 세이프한 자료 구조를 사용하는 것이다.(StringBuffer, Hashtable 등)

val counter = **AtomicInteger()**

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.incrementAndGet()
        }
    }
    println("Counter = $counter")
}
Completed 100000 actions in 51 ms
Counter = 100000

이는 가장 빠른 솔루션이지만 쓰레드 세이프하지 않은 복잡한 상태나 복잡한 명령에 적용하는 것이 쉽지 않다.

Thread confinement fine-grained

Thread confinement는 하나의 쓰레드로 모든 접근은 제한하는 것이다. 이는 주로 UI 상태가 하나의 event-dispatch/application thread로 제한되는 UI 어플리케이션에서 쓰인다.

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // confine each increment to a single-threaded context
            withContext(counterContext) {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

하지만 매우 느리게 작동한다.(Completed 100000 actions in 2177 ms)

*newSingleThreadContext(): 단일 스레드를 사용하는 Dispatchers 생성

Thread confinement coarse-grained

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main() = runBlocking {
    // confine everything to a single-threaded context
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    println("Counter = $counter")
}

→ Completed 100000 actions in 46 ms

Mutual exclusion

절대로 동시 실행이 불가능한 임계구역을 사용할 수 있다. blocking 상황에서 synchronized 또는 ReentrantLock을 사용할 수 있다. Mutex는 lock, unlock을 통해 임계구역을 설정할 수 있다. 중요한 차이점은 lock 함수는 정지 함수라는 점이다.

withLock 확장 함수로 mutex.lock(); try { … } finally { mutex.unlock() } 패턴을 편하게 사용할 수도 있다:

val mutex = Mutex()
var counter = 0

fun main() = runBlocking {
    withContext(Dispatchers.Default) {
        massiveRun {
            // protect each increment with lock
            mutex.withLock {
                counter++
            }
        }
    }
    println("Counter = $counter")
}

해당 예제는 fine-grained이기 때문에 비용이 많이 든다. 하지만 주기적으로 shared state를 변경해야 하지만, 해당 상태를 confine할 쓰레드가 없을 때 좋은 선택이 된다.

-> Completed 100000 actions in 880 ms

Actors

actor는 코루틴, 해당 코루틴에 confine되고 캡슐화된 상태, 다른 코루틴들과 커뮤니케이션할 채널의 결합으로 만들어진다. 단순한 actor는 함수로 작성될 수 있으나 복잡한 것은 클래스로 작성된다.

액터 코루틴 빌더는, 메시지를 수신할 스코프의 액터의 메일박스 채널과 Job 객체를 결과로 보내는 송신 채널을 쉽게 결합한다. so that a single reference to the actor can be carried around as its handle.

액터가 처리할 메시지들의 클래스를 정의함으로써 액터를 사용하는 첫 스탭을 밟게 된다. sealed class가 이에 적합하다.

// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply

그 다음 액터 코루틴 빌더를 사용하여 액터를 실행하는 함수를 정의한다:

// This function launches a new counter actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

메인 함수는 다음과 같다:

fun main() = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter)
        }
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // shutdown the actor
}
Completed 100000 actions in 1050 ms
Counter = 100000

액터가 실행되는 컨텍스트는 크게 중요하지 않다. 액터는 코루틴이며, 코루틴은 순차적으로 실행된다. 따라서 특정 코루틴으로 한정되는 상태 제한은 shared mutable state에 대한 솔루션으로 작용한다. 액터는 자신의 private state를 변경하지만 메시지를 통해서만 다른 것에 영향을 줄 수 있다.(락에 대한 필요성을 회피)

액터는 언제나 할 일이 있고 다른 컨텍스트로 전환되지 않기 때문에 락킹 기법보다 효율적이다.