관리 메뉴

나만을 위한 블로그

[코틀린 코루틴] 23. 플로우 처리 본문

책/코틀린 코루틴

[코틀린 코루틴] 23. 플로우 처리

참깨빵위에참깨빵_ 2024. 4. 22. 00:03
728x90
반응형
map

 

Flow의 각 원소를 변환 함수에 따라 변환한다. 숫자들을 Flow로 갖고 있고 각 수의 제곱을 계산하는 연산이라면 생성된 Flow는 이 수들의 제곱을 값으로 갖는다.

 

import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map

suspend fun main() {
    flowOf(1, 2, 3) // [1, 2, 3]
        .map { it * it }      // [1, 4, 9]
        .collect { print(it) }// 149
}

 

map을 구현하려면 flow 빌더를 써서 새 Flow를 만들면 된다. 그리고 만들어진 Flow에서 원소들을 모으고 변형된 원소들을 하나씩 내보내면 된다.

map은 값을 꺼내거나 다른 형태로 바꾸는 등으로 사용할 수 있다.

 

// 입력 이벤트로부터 사용자 액션을 받음
fun actionsFlow(): Flow<UserAction> = observeInputEvents().map { toAction(it.code) }

// User를 UserJson으로 변환
fun getAllUser(): Flow<UserJson> = userRepository.getAllUsers()
    .map { it.toUserJson() }

 

filter

 

원래 Flow에서 주어진 조건에 맞는 값들만 가진 Flow를 리턴한다.

 

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter

suspend fun main() {
    (1 .. 10).asFlow()   // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        .filter { it <= 5 }     // [1, 2, 3, 4, 5]
        .filter { isEven(it) }  // [2, 4]
        .collect { print(it) }  // 24
}

fun isEven(num: Int): Boolean = num % 2 == 0

 

take, drop

 

특정 수의 원소만 통과시키기 위해 take를 쓸 수 있다.

 

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.take

suspend fun main() {
    ('A' .. 'Z').asFlow()
        .take(5)         // [A, B, C, D, E]
        .collect { print(it) }  // ABCDE
}

 

drop을 쓰면 특정 수의 원소를 무시할 수 있다.

 

import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.drop

suspend fun main() {
    ('A' .. 'Z').asFlow()
        .drop(20)         // [U, V, W, X, Y, Z]
        .collect { print(it) }  // UVWXYZ
}

 

merge, zip, combine

 

두 Flow를 하나의 Flow로 합치는 법은 여러 가지가 있다. 가장 간단한 방법은 두 Flow에서 생성된 원소들을 하나로 합치는 것이다. 어떤 변경도 필요없고 Flow의 원소가 어디서 왔는지도 중요하지 않다. 최상위 레벨 함수인 merge를 써서 이런 과정을 수행할 수 있다.

 

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList

suspend fun main() {
    val ints: Flow<Int> = flowOf(1, 2, 3)
    val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)

    val together: Flow<Number> = merge(ints, doubles)
    print(together.toList())
}

// [1, 2, 3, 0.1, 0.2, 0.3]

 

merge를 쓰면 한 Flow의 원소가 다른 Flow를 기다리지 않는다. merge는 여러 이벤트를 똑같은 방법으로 처리할 때 사용할 수 있다.

zip은 두 Flow로부터 쌍을 만든다. 각 원소는 한 쌍의 일부가 되므로 쌍이 될 원소를 기다려야 한다. 쌍을 이루지 못하고 남은 원소는 유실되므로 한 Flow에서 zipping이 완료되면 생성되는 Flow도 완료된다. 다른 Flow도 마찬가지다.

 

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.zip

suspend fun main() {
    val flow1 = flowOf("A", "B", "C")
        .onEach { delay(400) }
    val flow2 = flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
    flow1.zip(flow2) { f1, f2 -> "${f1}_${f2}" }
        .collect { println(it) }
}

// (1초 후)
// A_1
// (1초 후)
// B_2
// (1초 후)
// C_3

 

combine은 zip처럼 원소들로 쌍을 형성하기 때문에 첫 번째 쌍을 만들기 위해 느린 Flow를 기다려야 한다. combine을 쓰면 모든 새 원소가 전입자를 대체한다. 첫 번째 쌍이 이미 만들어졌다면 다른 Flow의 이전 원소와 함께 새 쌍이 만들어진다.

zip은 쌍을 필요로 하기 때문에 첫 Flow가 닫히면 함수도 끝난다. combine은 그런 제한이 없어서 두 Flow가 모두 닫힐 때까지 원소를 내보낸다.

 

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach

suspend fun main() {
    val flow1 = flowOf("A", "B", "C")
        .onEach { delay(400) }
    val flow2 = flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
    flow1.combine(flow2) { f1, f2 -> "${f1}_${f2}" }
        .collect { println(it) }
}

// (1초 후)
// B_1
// (0.2초 후)
// C_1
// (0.8초 후)
// C_2
// (1초 후)
// C_3
// (1초 후)
// C_4

 

fold, scan

 

fold는 초기값부터 시작해서 주어진 원소 각각에 대해 두 값을 하나로 합치는 연산을 적용해서 컬렉션의 모든 값을 하나로 합친다. 첫 값은 0이고 덧셈 연산을 할 때 결과값은 모든 수를 더한 값이 된다.

초기값인 0을 먼저 가져오고 첫 원소인 1을 더한다. 결과값인 1에 두 번째 수인 2를 더하고, 결과값인 3에 세 번째 수인 3을 더해서 최종 결과값인 10이 fold로부터 리턴된다.

 

fun main() {
    val list = listOf(1, 2, 3, 4)
    val res = list.fold(0) { acc, i -> acc + i }
    println(res)

    val res2 = list.fold(1) { acc, i -> acc * i }
    println(res2)
}

// 10
// 24

 

fold 대신 scan을 쓸 수도 있다. scan은 누적되는 과정의 모든 값을 생성하는 중간 연산이다.

scan은 이전 단계에서 값을 받은 즉시 새 값을 만들기 때문에 Flow에서 유용하게 쓰인다.

 

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.scan

suspend fun main() {
    flowOf(1, 2, 3, 4)
        .onEach { delay(1000) }
        .scan(0) { acc, v -> acc + v }
        .collect { println(it) }
}

// 0
// (1초 후)
// 1
// (1초 후)
// 3
// (1초 후)
// 6
// (1초 후)
// 10

 

flatMapConcat, flatMapMerge, flatMapLatest

 

 

flatMap은 map과 비슷하지만 변환 함수가 평탄화된 컬렉션을 리턴한다는 게 다르다.

부서 목록이 있고 각 부서가 사원 리스트를 가졌다면 flatMap을 써서 전체 부서의 사원 리스트를 만들 수 있다.

 

val allEmployees: List<Employee> = departments
    .flatMap { department -> department.employees }

// 맵을 쓰면 리스트의 리스트를 얻는다
val listOfListsOfEmployee: List<List<Employee>> = departments
    .map { department -> department.employees }

 

flatMapConcat은 생성된 Flow를 하나씩 처리한다. 그래서 2번째 플로우는 첫 플로우가 완료됐을 때 시작 가능하다.

아래 예에서 각 알파벳이 만드는 Flow는 알파벳과 함께 1~3을 가지며 생성 주기는 1초다.

 

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach

fun flowFrom(elem: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${elem}" }

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapConcat { flowFrom(it) }
        .collect { println(it) }
}

// (1초 후)
// 1_A
// (1초 후)
// 2_A
// (1초 후)
// 3_A
// (1초 후)
// 1_B
// (1초 후)
// 2_B
// (1초 후)
// 3_B
// (1초 후)
// 1_C
// (1초 후)
// 2_C
// (1초 후)
// 3_C

 

flatMapMerge는 만들어진 Flow를 동시에 처리한다.

 

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach

fun flowFrom(elem: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${elem}" }

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapMerge { flowFrom(it) }
        .collect { println(it) }
}

// (1초 후)
// 1_A
// 1_B
// 1_C
// (1초 후)
// 2_C
// 2_B
// 2_A
// (1초 후)
// 3_C
// 3_B
// 3_A

 

concurrency 인자를 써서 동시 처리 가능한 Flow 개수를 설정할 수 있다. 기본값은 16이지만 DEFAULT_CONCURRENCY_PROPERTY_NAME 프로퍼티로 바꿀 수 있다.

 

import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach

fun flowFrom(elem: String) = flowOf(1, 2, 3)
    .onEach { delay(1000) }
    .map { "${it}_${elem}" }

suspend fun main() {
    flowOf("A", "B", "C")
        .flatMapMerge(concurrency = 2) { flowFrom(it) }
        .collect { println(it) }
}

// (1초 후)
// 1_A
// 1_B
// (1초 후)
// 2_C
// 2_B
// (1초 후)
// 3_C
// 3_B
// (1초 후)
// 1_C
// (1초 후)
// 2_C
// (1초 후)
// 3_C

 

flatMapMerge를 쓰면 2가지 이점이 있다.

 

  • 동시성 인자를 제어하고 같은 시간에 얼마만큼의 종류를 처리할지 결정할 수 있다
  • Flow를 리턴해서 데이터가 생성될 때마다 다음 원소를 내보낼 수 있다. 함수 사용 측면에서 보면 데이터를 즉시 처리할 수 있다

 

suspend fun getOffers(categories: List<Category>): Flow<Offer> = coroutineScope { 
    categories.map { async { api.requestOffers(it) } }
        .flatMap { it.await() }
}

// 더 나은 방법
suspend fun getOffers(categories: List<Category>): Flow<Offer> = categories.asFlow()
    .flatMapMerge(concurrency = 20) {
        suspend { api.requestOffers(it) }.asFlow()
        // 또는 flow { emit(api.requestOffers(it)) } 
    }

 

재시도(retry)

 

예외는 Flow를 따라 흐르면서 각 단계를 하나씩 종료한다. 종료된 단계는 비활성화되기 때문에 예외가 발생한 뒤 메시지 보내기는 불가능하지만 각 단계가 이전 단계에 대한 참조를 갖고 있으며 Flow를 재시작하기 위해 참조를 쓸 수 있다.

이 원리에 기반해서 코틀린은 retry, retryWhen 함수를 제공한다.

retryWhen은 Flow의 이전 단계에서 예외가 발생할 때마다 predicate를 확인한다. predicate는 예외가 무시되고 이전 단계가 재시작돼야 하는지  또는 Flow를 계속 종료해야 하는지를 정한다.

대부분 몇 번까지 재시도할지, 네트워크 연결 예외 같은 특정 예외 클래스가 발생했을 때만 처리할지를 명시한다. 아래는 retry 사용법이다.

 

suspend fun main() {
    flow {
        emit(1)
        emit(2)
        error("E")
        emit(3)
    }.retry(retries = 3) {
        print(it.message)
        true
    }.collect { print(it) }
}

// 12E12E12E12Exception in thread "main" java.lang.IllegalStateException: E

 

중복 제거 함수

 

반복되는 원소가 동일하다고 판단되면 제거하는 distinctUntilChanged 함수도 유용하다. 이 함수는 바로 이전 원소와 동일한 원소만 제거한다.

 

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf

suspend fun main() {
    flowOf(1, 2, 2, 3, 2, 1, 1, 3)
        .distinctUntilChanged()
        .collect { print(it) } // 123213
}

// distinctUntilChanged의 구현
fun <T> Flow<T>.distinctUntilChanged(): Flow<T> = flow {
    var previous: Any? = NOT_SET
    collect {
        if (previous == NOT_SET || previous != it) {
            emit(it)
            previous = it
        }
    }
}

private val NOT_SET = Any()

 

반응형
Comments