관리 메뉴

나만을 위한 블로그

[코틀린 코루틴] 18. 핫 / 콜드 데이터 소스 본문

책/코틀린 코루틴

[코틀린 코루틴] 18. 핫 / 콜드 데이터 소스

참깨빵위에참깨빵 2024. 4. 9. 01:14
728x90
반응형

채널은 값을 핫 스트림으로 갖지만 콜드 스트림이 필요할 때가 있다. 우리가 쓰는 대부분의 소스는 2종류로 구분할 수 있어서 핫 / 콜드 스트림 데이터 차이를 이해하는 게 소프트웨어 측면에서 유용하다. List, Set 등의 컬렉션은 핫이고 Sequence, 자바 스트림은 콜드다. 채널은 핫이지만 Flow, Observable, Single 같은 RxJava 스트림은 콜드다.

 

핫 vs 콜드

 

핫 데이터 스트림은 열정적이라 데이터 소비와 무관하게 원소를 생성하지만 콜드 데이터 스트림은 게을러서 요청이 있을 때만 작업하고 아무것도 하지 않는다.

핫인 리스트, 콜드인 시퀀스를 쓸 때 그 차이가 나온다. 핫 데이터 스트림의 빌더와 연산은 즉시 실행된다. 콜드 데이터 스트림에선 원소가 필요할 때까지 실행되지 않는다.

 

fun main() {
    val l = buildList {
        repeat(3) {
            add("User $it")
            println("L : Added User")
        }
    }

    val l2 = l.map {
        println("L : Processing")
        "Processed $it"
    }

    val s = sequence {
        repeat(3) {
            yield("User $it")
            println("S : Added User")
        }
    }

    val s2 = s.map {
        println("S : Processing")
        "Processed $it"
    }
}

// L : Added User
// L : Added User
// L : Added User
// L : Processing
// L : Processing
// L : Processing

 

결과 시퀀스, 스트림, Flow 같은 콜드 데이터 스트림은

 

  • 무한할 수 있다
  • 최소한의 연산만 수행한다
  • 중간에 생성되는 값들을 보관할 필요가 없어서 메모리를 적게 쓴다

 

시퀀스는 원소를 지연 처리해서 더 적은 연산을 수행한다. 작동 방식은 map, filter 등 중간 연산은 이전에 만든 시퀀스에 새 연산을 추가할 뿐이다. 최종 연산이 모든 작업을 실행한다.

아래 예에서 시퀀스의 경우 find는 map의 결과에서 1번째 원소를 달라고 한다. sequenceOf에서 리턴된 시퀀스에 쿼리하고 map을 수행한 뒤 filter로 넘긴다. filter는 주어진 원소가 요구사항에 부합하는지 확인한다. 요구사항을 만족하지 못하면 filter는 적절한 원소를 찾을 때까지 계속 질의한다.

 

fun m(i: Int): Int {
    print("m$i ")
    return i * i
}

fun f(i: Int): Boolean {
    print("f$i ")
    return i >= 10
}

fun main() {
    listOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { m(it) }
        .find { f(it) }
        .let { println(it) }
    println()

    sequenceOf(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .map { m(it) }
        .find { f(it) }
        .let { print(it) }
}
// m1 m2 m3 m4 m5 m6 m7 m8 m9 m10 f1 f4 f9 f16 16
//
// m1 f1 m2 f4 m3 f9 m4 f16 16

 

리스트는 원소의 컬렉션이지만 시퀀스는 원소를 어떻게 계산할지 정의한 것이다. 핫 데이터 스트림은

 

  • 항상 사용 가능한 상태(각 연산이 최종 연산이 될 수 있음)
  • 여러 번 썼을 때 매번 결과를 다시 계산해야 함

 

자바의 스트림은 코틀린 시퀀스와 비슷한 특징을 갖고 있다. 모두 콜드 스트림 데이터다.

 

핫 채널, 콜드 플로우

 

Flow를 만드는 일반적인 방법은 produce 함수와 비슷한 형태의 빌더를 쓰는 것인데 이 빌더가 flow다.

 

fun main() = runBlocking {
    val channel = produce {
        while (true) {
            val x = computeNextValue()
            send(x)
        }
    }
    
    val flow = flow {
        while (true) {
            val x = computeNextValue()
            emit(x)
        }
    }
}

 

채널은 핫 데이터 스트림이라서 소비되는 것과 상관없이 값을 생성한 뒤에 갖게 된다. 수신자가 얼마나 많은지는 신경쓰지 않는다. 각 원소는 1번만 받을 수 있어서 첫 수신자가 모든 원소를 소비하면 2번째 소비자는 채널이 비어 있고 이미 닫힌 걸 알게 된다. 따라서 2번째 소비자는 어떤 원소도 받을 수 없다.

 

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay

private fun CoroutineScope.makeChannel() = produce {
    println("Channel started")
    for (i in 1 .. 3) {
        delay(1000)
        send(i)
    }
}

suspend fun main() = coroutineScope {
    val channel = makeChannel()

    delay(1000)
    println("Calling channel...")
    for (value in channel) {
        println(value)
    }

    println("Consuming again...")
    for (value in channel) {
        println(value)
    }
}

// Channel started
// (1초 후)
// Calling channel...
// 1
// (1초 후)
// 2
// (1초 후)
// 3
// Consuming again...

 

플로우는 콜드 데이터 스트림이라 값이 필요할 때만 생성한다. 따라서 flow는 빌더가 아니고 어떤 처리도 하지 않는다.

flow는 그저 collect 같은 최종 연산이 호출될 때 어떻게 생성돼야 하는지 정의한 것이다. 그래서 flow 빌더는 CoroutineScope가 필요하다.

flow 빌더는 빌더를 호출한 최종 연산의 스코프에서 실행되고, Flow의 각 최종 연산은 처음부터 데이터를 처리하기 시작한다.

 

import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow

private fun makeFlow() = flow {
    println("Flow started")
    for (i in 1 .. 3) {
        delay(1000)
        emit(i)
    }
}

suspend fun main() = coroutineScope {
    val flow = makeFlow()
    delay(1000)
    println("Calling flow...")
    flow.collect { value -> println(value) }
    println("Consuming again...")
    flow.collect { value -> println(value) }
}

// (1초 후)
// Calling flow...
// Flow started
// (1초 후)
// 1
// (1초 후)
// 2
// (1초 후)
// 3
// Consuming again...
// Flow started
// (1초 후)
// 1
// (1초 후)
// 2
// (1초 후)
// 3

 

반응형
Comments