관리 메뉴

나만을 위한 블로그

[코틀린 코루틴] 21. 플로우 만들기 본문

책/코틀린 코루틴

[코틀린 코루틴] 21. 플로우 만들기

참깨빵위에참깨빵_ 2024. 4. 13. 23:01
728x90
반응형
원시값을 갖는 플로우

 

Flow를 만드는 가장 간단한 방법은 플로우가 어떤 값을 가져야 할지 정의하는 flowOf()를 쓰는 것이다. listOf()와 비슷하다.

 

import kotlinx.coroutines.flow.flowOf

suspend fun main() {
    flowOf(1, 2, 3, 4, 5)
        .collect { println(it) }
}

// 1
// 2
// 3
// 4
// 5

 

값이 없는 Flow가 필요하면 emptyFlow()를 쓰면 된다. emptyList()와 비슷하다.

 

import kotlinx.coroutines.flow.emptyFlow

suspend fun main() {
    emptyFlow<Int>()
        .collect { println(it) }
}

// 아무것도 출력되지 않음

 

컨버터

 

asFlow()를 써서 Iterable, Iterator, Sequence를 Flow로 바꿀 수도 있다.

asFlow()는 즉시 사용 가능한 원소들의 Flow를 만들며 Flow 처리 함수를 만들어서 처리 가능한 원소들의 Flow를 만들 때 유용하다.

 

함수를 플로우로 바꾸기

 

Flow는 RxJava의 Single처럼 시간상 지연되는 하나의 값을 나타낼 때 자주 쓰인다. 따라서 중단 함수를 Flow로 바꾸는 것도 가능하다. 이 때 중단 함수의 결과가 Flow의 유일한 값이 된다. 중단 함수를 Flow로 바꾸려면 함수형의 확장 함수인 asFlow를 쓸 수 있다.

 

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.asFlow

suspend fun main() {
    val function = suspend {
        // 중담 함수를 람다식으로 만든 것
        delay(1000)
        "UserName"
    }

    function.asFlow()
        .collect { println(it) }
}

// (1초 후)
// UserName

 

플로우 빌더

 

Flow를 만들 때 가장 많이 쓰는 방법은 flow 빌더다. flow 빌더는 Sequence 빌더, produce 빌더와 비슷하게 작동한다.

빌더는 flow 함수를 먼저 호출하고 람다식 안에서 emit()을 써서 다음 값을 방출한다. Channel, Flow에서 모든 값을 방출하려면 emitAll()을 쓸 수 있다.

 

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

fun makeFlow(): Flow<Int> = flow {
    repeat(3) { num ->
        delay(1000)
        emit(num)
    }
}

suspend fun main() {
    makeFlow()
        .collect { println(it) }
}

// (1초 후)
// 0
// (1초 후)
// 1
// (1초 후)
// 2

 

callbackFlow

 

유저의 클릭, 행동 변화를 감지해야 하는 이벤트 플로우가 필요하다고 가정한다. 감지하는 프로세스는 이벤트 처리 프로세스와 독립적이어야 해서 channelFlow를 써도 좋다. 하지만 이 경우엔 callbackFlow를 쓰는 게 더 낫다.

오랫동안 channelFlow, callbackFlow는 큰 차이가 없었지만 1.3.4 버전에서 callbackFlow가 콜백 함수를 래핑하는 방식으로 바뀌었다. callbackFlow는 ProducerScope<T>에서 작동하며 아래는 callbackFlow가 쓰이는 전형적인 방법이다.

 

fun flowFrom(api: CallbackBaseApi): Flow<T> = callbackFlow { 
    val callback = object : Callback {
        override fun onNextValue(value: T) {
            trySendBlocking(value)
        }
        
        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("Api Error", cause))
        }
        
        override fun onCompleted() = channel.close()
    }
    api.register(callback)
    awaitClose { api.unregister(callback) }
}

 

반응형
Comments