관리 메뉴

나만을 위한 블로그

[코틀린 코루틴] 24. 공유 플로우, 상태 플로우 본문

책/코틀린 코루틴

[코틀린 코루틴] 24. 공유 플로우, 상태 플로우

참깨빵위에참깨빵 2024. 4. 25. 22:28
728x90
반응형

일반적으로 Flow는 콜드 데이터라 요청 시마다 값이 계산된다. 하지만 여러 수신자가 하나의 데이터가 바뀌는지 감지하는 경우도 있다. 이 때 메일링 리스트와 비슷한 SharedFlow를 쓴다. StateFlow는 감지 가능한 값과 비슷하다.

 

SharedFlow

 

SharedFlow로 메시지를 내보내면 대기 중인 모든 코루틴이 수신한다.

 

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>(replay = 0)
    // 또는 MutableSharedFlow<String>()

    launch {
        mutableSharedFlow.collect {
            println("#1 received $it")
        }
    }

    launch {
        mutableSharedFlow.collect {
            println("#2 received $it")
        }
    }

    delay(1000)
    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
}

// (1초 후)
// #1 received Message1
// #1 received Message2
// #2 received Message1
// #2 received Message2
// 프로그램이 종료되지 않음

 

MutableSharedFlow는 메시지를 보내는 작업을 유지할 수도 있다. 기본값이 0인 replay 인자를 설정하면 마지막으로 전송한 값들이 정해진 수만큼 저장된다. 코루틴이 감지를 시작하면 저장된 값들을 먼저 받는다.

resetReplayCache를 쓰면 값을 저장한 캐시를 초기화할 수 있다.

 

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val mutableSharedFlow = MutableSharedFlow<String>(replay = 2)

    mutableSharedFlow.emit("Message1")
    mutableSharedFlow.emit("Message2")
    mutableSharedFlow.emit("Message3")

    println(mutableSharedFlow.replayCache)      // [Message2, Message3]

    launch {
        mutableSharedFlow.collect {
            println("#1 received $it")
            // #1 received Message2
            // #1 received Message3
        }
    }

    delay(100)
    mutableSharedFlow.resetReplayCache()
    println(mutableSharedFlow.replayCache)      // []
}

 

코틀린에선 감지만 하는 인터페이스, 변경하는 인터페이스를 구분하는 게 관행이다. 앞에서 SendChannel, ReceiveChannel, Channel로 구분하는 게 예시다.

MutableSharedFlow는 SharedFlow, FlowCollector 모두 상속한다. SharedFlow는 Flow를 상속하고 감지하는 목적으로 쓰고 FlowCollector는 값을 내보내기 위해 쓴다.

아래는 안드로이드에서 SharedFlow를 쓰는 예시다.

 

class UserProfileViewModel: ViewModel() {
    private val _userChanges = MutableSharedFlow<UserChange>()
    val userChanges: SharedFlow<UserChange> = _userChanges

    fun onCreate() {
        viewModelScope.launch {
            userChanges.collect(::applyUserChange)
        }
    }

    fun onNameChanged(newName: String) {
        // ...
        _userChanges.emit(NameChange(newName))
    }

    fun onPublicKeyChanged(newPublicKey: String) {
        // ...
        _userChanges.emit(PublicKeyChange(newPublicKey))
    }
}

 

shareIn

 

Flow는 유저 액션, DB 변경, 새 메시지 같은 변화를 감지할 때 주로 쓰인다.

다양한 클래스가 변화를 감지하는 상황에서 하나의 Flow로 여러 Flow를 만들려면 SharedFlow가 해결책이고 Flow를 SharedFlow로 바꾸는 가장 쉬운 방법이 shareIn 함수를 쓰는 것이다.

 

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C")
        .onEach { delay(1000) }

    val sharedFlow: SharedFlow<String> = flow.shareIn(
        scope = this,
        started = SharingStarted.Eagerly,
        // replay = 0 (default)
    )

    delay(500)

    launch {
        sharedFlow.collect { println("#1 $it") }
    }

    delay(1000)

    launch {
        sharedFlow.collect { println("#2 $it") }
    }

    delay(1000)

    launch {
        sharedFlow.collect { println("#3 $it") }
    }
}

// #1 A
// (1초 후)
// #2 B
// #1 B
// (1초 후)
// #1 C
// #2 C
// #3 C

 

shareIn은 SharedFlow를 만들고 Flow의 원소를 내보낸다. Flow의 원소를 모으는 코루틴을 시작하기 때문에 shareIn은 첫 인자로 코루틴 스코프를 받는다. 3번 인자는 기본값이 0인 replay고 2번 인자인 started는 리스너 수에 따라 값을 언제부터 감지할지 결정한다.

 

  • SharingStarted.Eagerly : 즉시 값을 감지하기 시작하고 Flow로 값을 전송한다. replay 값에 제한이 있고 감지 시작 전에 값이 나오면 일부를 유실할 수 있다. 0이면 먼저 들어온 값이 전부 유실된다

 

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C")
        .onEach { delay(1000) }

    val sharedFlow: SharedFlow<String> = flow.shareIn(
        scope = this,
        started = SharingStarted.Eagerly,
    )

    delay(100)

    launch {
        sharedFlow.collect { println("#1 $it") }
    }
    print("Done")
}

// (0.1초 후)
// Done

 

  • SharingStarted.Lazily : 첫 구독자가 나올 때 감지를 시작한다. 첫 구독자는 방출된 모든 값을 받는 게 보장되고 이후의 구독자는 replay 수만큼 가장 최근에 저장된 값을 받는다. 모든 구독자가 사라져도 업스트림 Flow는 active 상태지만 구독자가 없으면 replay 수만큼 가장 최신 값들만 캐싱한다

 

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C")
    val flow2 = flowOf("D")
        .onEach { delay(1000) }

    val sharedFlow = merge(flow, flow2).shareIn(
        scope = this,
        started = SharingStarted.Lazily,
    )

    delay(100)
    launch {
        sharedFlow.collect { println("#1 $it") }
    }

    delay(1000)
    launch {
        sharedFlow.collect { println("#2 $it") }
    }
}

// (0.1초 후)
// #1 A
// #1 B
// #1 C
// (1초 후)
// #2 D
// #1 D

 

  • WhileSubscribed() : 첫 구독자가 나올 때 감지하기 시작하고 마지막 구독자가 사라지면 Flow도 멈춘다. SharedFlow가 멈췄을 때 새 구독자가 나오면 Flow가 재시작된다. 기본값은 0이고 마지막 구독자가 사라진 뒤 감지할 시간을 나타내는 stopTimeoutMillis, replayExpirationMillis 파라미터를 추가로 갖고 있다

 

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C", "D")
        .onStart { println("Started") }
        .onCompletion { println("Finished") }
        .onEach { delay(1000) }

    val sharedFlow = flow.shareIn(
        scope = this,
        started = SharingStarted.WhileSubscribed(),
    )

    delay(3000)
    launch {
        println("#1 ${sharedFlow.first()}")
    }

    launch {
        println("#2 ${sharedFlow.take(2).toList()}")
    }

    delay(3000)
    launch {
        println("#3 ${sharedFlow.first()}")
    }
}

// (3초 후)
// Started
// (1초 후)
// #1 A
// (2초 후)
// #2 [A, B]
// Finished
// (1초 후)
// Started
// (1초 후)
// #3 A
// Finished

 

같은 변화를 감지하려는 서비스가 여럿일 때 shareIn을 쓰면 편하다. 저장소가 시간이 지나면서 어떻게 변하는지 감지하고 싶을 때가 있다.

여러 서비스가 위치에 의존한다면 각 서비스가 DB를 독자적으로 감지하는 건 최적화된 방법이 아니다. 대신 이런 변화를 감지하고 SharedFlow를 통해 감지된 변화를 공유하는 서비스를 만들 수 있는데 이 때 SharedFlow를 쓴다.

 

StateFlow

 

StateFlow는 SharedFlow의 개념을 확장시킨 것으로 replay 값이 1인 SharedFlow와 비슷하게 작동한다.

StateFlow는 value 프로퍼티로 접근 가능한 값 하나를 갖고 있다.

초기값은 생성자로 전달돼야 한다. value 프로퍼티로 값을 얻어올 수도 있고 설정할 수도 있다.

 

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val state = MutableStateFlow("A")
    println(state.value) // A
    launch {
        state.collect { println("Value changed to $it") }
        // Value changed to A
    }

    delay(1000)
    state.value = "B" // Value changed to B

    delay(1000)
    launch {
        state.collect { println("and now it is $it") }
        // and now it is B
    }

    delay(1000)
    state.value = "C" // Value changed to C -> and now it is C
}

 

안드로이드에선 LiveData를 대체하는 최신 방식으로 쓰인다. 코루틴을 완벽 지원하고 2번째로 초기값을 가져서 null일 필요가 없다.

따라서 StateFlow는 뷰모델에서 상태를 나타낼 때 주로 쓰인다. StateFlow의 상태를 감지할 수 있고 감지된 상태에 따라 뷰가 보여지고 갱신된다.

 

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
): ViewModel() {
    private val _uiState = MutableStateFlow<NewsState>(LoadingNews)
    val uiState: StateFlow<NewsState> = _uiState
    
    fun onCreate() {
        scope.launch {
            _uiState.value = NewsLoaded(newsRepository.getNews())
        }
    }
}

 

StateFlow는 데이터가 덮어씌워지기 때문에 관찰이 느린 경우 상태의 중간 변화를 받을 수 없는 경우도 있다.

모든 이벤트를 다 받으려면 SharedFlow를 써야 한다. 현재 상태만 갖는 특성은 설계적으로 의도된 것이다. StateFlow는 현재 상태만 나타내서 이전 상태에는 관심없다.

 

stateIn

 

Flow<T>를 StateFlow<T>로 바꾸는 함수다. 스코프에서만 호출 가능하지만 중단 함수기도 하다.

StateFlow는 항상 값을 가져야 해서 값을 명시하지 않은 경우 첫 값이 계산될 때까지 기다려야 한다.

 

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.stateIn

suspend fun main(): Unit = coroutineScope {
    val flow = flowOf("A", "B", "C")
        .onEach { delay(1000) }
        .onEach { println("Produced $it") }
    val stateFlow: StateFlow<String> = flow.stateIn(this)

    println("Listening")
    println(stateFlow.value)
    stateFlow.collect { println("Received $it") }
}

// (1초 후)
// Produced A
// Listening
// A
// Received A
// (1초 후)
// Produced B
// Received B
// (1초 후)
// Produced C
// Received C

 

stateIn의 2번째 형태는 중단 함수가 아니지만 초기값, started 모드를 지정해야 한다. started는 shareIn과 같은 옵션이다.

stateIn은 하나의 데이터 소스에서 값이 바뀐 걸 감지할 경우 주로 쓴다. StateFlow로 상태를 바꿀 수 잇고 결국 뷰가 변화를 감지할 수 있게 된다.

 

class LocationsViewModel(
    private val locationService: LocationService
): ViewModel() {
    private val location = locationService.observeLocations()
        .map { it.toLocationsDisplay() }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.Lazily,
            initialValue = emptyList()
        )
}
반응형
Comments