관리 메뉴

나만을 위한 블로그

[코틀린 코루틴] 19. 플로우란 무엇인가 본문

책/코틀린 코루틴

[코틀린 코루틴] 19. 플로우란 무엇인가

참깨빵위에참깨빵 2024. 4. 13. 00:08
728x90
반응형

Flow는 비동기적으로 계산해야 할 값의 스트림을 나타낸다. Flow 인터페이스 자체는 떠다니는 원소들을 모으고 Flow의 끝에 도달할 때까지 각 값을 처리한다.

Flow의 collect는 컬렉션의 forEach와 비슷하다. 이 함수는 Flow의 유일한 멤버 함수고 다른 함수들은 확장 함수로 정의돼 있다. iterator만 멤버 함수로 가진 Iterable 또는 Sequence와 비슷하다.

 

플로우와 값들을 나타내는 다른 방법들 비교

 

Flow 개념은 RxJava, Reactor를 쓴다면 잘 알겠지만 그 외의 사람들에겐 아니다. 여러 값을 리턴하는 함수가 필요하다고 가정한다. 한 번에 모든 값을 만들 때는 List, Set 같은 컬렉션을 쓴다.

 

fun allUsers(): List<User> = api.getAllUsers().map { it.toUser() }

 

List, Set이 모든 원소의 계산이 완료된 컬렉션이란 걸 명심하라. 값들을 계산하는 과정에 시간이 걸려서 원소들이 채워질 때까지 모든 값이 생성되는 걸 기다려야 한다.

원소를 하나씩 계산할 때는 원소가 나오자마자 바로 얻을 수 있는 게 낫다. Sequence를 쓰는 게 한 가지 방법이다.

Sequence는 복잡한 결과값을 계산하는 등 CPU 집약적 연산 or 파일 읽기 등의 블로킹 연산을 수행할 때 필요할 때마다 값을 계산하는 Flow를 나타내기에 적절하다.

forEach 같은 Sequence의 최종 연산은 중단 함수가 아니라서 Sequence 빌더 내부에 중단점이 있다면 값을 기다리는 쓰레드가 블로킹된다. 따라서 Sequence 빌더의 스코프에선 SequenceScope의 리시버에서 호출되는 함수(yield, yieldAll) 외의 다른 중단 함수는 쓸 수 없다.

 

fun getSequence(): Sequence<String> = sequence {
    repeat(3) {
        delay(1000) // 컴파일 에러 발생
        yield("User $it")
    }
}

 

 

위 예시는 컴파일되더라도 forEach 같은 최종 연산이 코루틴을 중단시키는 대신 쓰레드를 블로킹해서 생각 못한 쓰레드 블로킹이 발생할 수 있다.

Sequence는 데이터 소스 개수가 많거나 원소가 무거운 경우, 원소를 필요할 때만 계산하거나 읽는 지연 연산을 수행하는 상황에서 사용하는 게 낫다.

 

아래 예시에선 같은 쓰레드에서 launch로 시작된 코루틴이 대기하며 한 코루틴이 다른 코루틴을 블로킹하게 된다.

 

import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext

fun getSequence(): Sequence<String> = sequence {
    repeat(3) {
        Thread.sleep(1000)
        // 여기에 delay(1000)을 쓰는 것과 같은 결과
        yield("User $it")
    }
}

suspend fun main() {
    withContext(newSingleThreadContext("main")) {
        launch {
            repeat(3) {
                delay(100)
                println("Processing on coroutine")
            }
        }

        val list = getSequence()
        list.forEach { println(it) }
    }
}

// (1초 후)
// User 0
// (1초 후)
// User 1
// (1초 후)
// User 2
// Processing on coroutine
// (0.1초 후)
// Processing on coroutine
// (0.1초 후)
// Processing on coroutine

 

 

이 상황에서 Sequence 대신 Flow를 써야 한다. Flow를 쓰면 코루틴이 연산 수행에 필요한 기능을 모두 쓸 수 있다.

Flow의 빌더, 연산은 중단 함수고 구조화된 동시성, 예외처리를 지원한다. Flow는 코루틴을 써야 하는 데이터 스트림으로 사용돼야 한다.

 

플로우의 특징

 

Flow의 최종 연산은 쓰레드를 블로킹하는 대신 코루틴을 중단시킨다. Flow는 코루틴 컨텍스트를 활용하고 예외처리 등의 코루틴 기능도 제공한다.

Flow 처리는 취소 가능하고 구조화된 동시성을 기본적으로 갖고 있다. 아래 예시는 CoroutineName 컨텍스트가 collect에서 flow 빌더의 람다식으로 전달되는 걸 보여준다.

 

import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

// flow 빌더는 중단 함수가 아니라서 CoroutineScope가 불필요
fun usersFlow(): Flow<String> = flow {
    repeat(3) {
        delay(1000)
        val ctx = currentCoroutineContext()
        val name = ctx[CoroutineName]?.name
        emit("User $it in $name")
    }
}

suspend fun main() {
    val users = usersFlow()
    withContext(CoroutineName("Name")) {
        val job = launch {
            users.collect { println(it) }
        }

        launch {
            delay(2100)
            println("I got enough")
            job.cancel()
        }
    }
}

// (1초 후)
// User 0 in Name
// (1초 후)
// User 1 in Name
// (0.1초 후)
// I got enough

 

사용 예

 

현업에선 채널보다 플로우가 필요한 경우가 더 많다. 데이터 스트림을 사용하면 대부분은 데이터가 필요할 때마다 요청한다. DB 변경, UI 위젯 변화 또는 센서 같이 이벤트를 감지해야 한다면 감지하는 모듈 각각이 이벤트를 받는다. 감지하는 모듈이 없다면 이벤트 받기를 멈춰야 한다. 따라서 이런 상황에선 채널보다 Flow가 낫다. 몇몇 경우엔 둘을 섞어 쓰기도 한다.

Flow가 쓰이는 전형적 예는 아래와 같다.

 

  • 웹소켓, RSocket 알림 같이 서버가 보낸 이벤트를 통해 전달된 메시지를 받을 때
  • 텍스트 입력, 클릭 같은 유저 액션이 감지된 경우
  • 센서, 위치, 지도 같은 기기의 정보 변경을 받는 경우
  • DB 변경 감지
반응형
Comments