관리 메뉴

나만을 위한 블로그

[코틀린 코루틴] 16. 채널 본문

책/코틀린 코루틴

[코틀린 코루틴] 16. 채널

참깨빵위에참깨빵 2024. 3. 24. 00:54
728x90
반응형

코루틴끼리의 통신을 위한 기본적 방법으로 채널 api가 추가됐다.

채널은 송신자, 수신자 수에 제한이 없고 채널로 전송된 모든 값은 단 한 번만 받을 수 있다. 채널은 서로 다른 2개의 인터페이스를 구현한 하나의 인터페이스다.

 

  • SendChannel : 원소를 보내거나 더하거나 채널을 닫기 위해 사용
  • ReceiveChannel : 원소를 받거나 꺼낼 때 사용

 

public interface SendChannel<in E> {
    public suspend fun send(element: E)
		public fun close(cause: Throwable? = null): Boolean
}

public interface ReceiveChannel<out E> {
    public suspend fun receive(): E
    public fun cancel(cause: CancellationException? = null)
}

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

 

두 인터페이스는 구분돼 있고 채널의 진입점 제한을 위해 ReceiveChannel, SendChannel 중 하나만 노출시킬 수도 있다.

send, receive 모두 중단 함수다. 원소를 보내고 받는 함수가 중단 함수인 것은 필수적인 특징이다.

 

  • receive를 호출했는데 채널에 원소가 없다면 코루틴은 원소가 들어올 때까지 중단된다
  • send는 Channel의 용량이 다 찼을 때 중단된다. 대부분의 채널은 용량이 제한돼 있다

 

채널은 송신자, 수신자 수에 제한이 없지만 채널 양 끝에 각각 하나의 코루틴만 있는 경우가 일반적이다.

아래는 생성자가 원소를 보내고 소비자가 원소를 받는 상황을 구현한 예다.

 

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>()
    launch {
        repeat(5) { index ->
            delay(1000)
            println("Producing next one")
            channel.send(index * 2)
        }
    }
    launch {
        repeat(5) {
            val received = channel.receive()
            println("received : $received")
        }
    }
}

// (1초 후)
// Producing next one
// received : 0
// (1초 후)
// Producing next one
// received : 2
// (1초 후)
// Producing next one
// received : 4
// (1초 후)
// Producing next one
// received : 6
// (1초 후)
// Producing next one
// received : 8

 

위 구현 방식은 불완전하다. 수신자는 얼마나 많은 원소를 보내는지 알아야 한다.

수신자가 이런 정보를 아는 경우는 별로 없어서 송신자가 보내는 만큼 수신자가 기다리는 방식을 선호한다. 채널이 닫힐 때까지 원소를 받기 위해 for, consumeEach 함수를 쓸 수 있다.

 

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>()
    launch {
        repeat(5) { index ->
            println("Producing next one")
            delay(1000)
            channel.send(index * 2)
        }
        channel.close()
    }
    launch {
        for (element in channel) {
            println("element : $element")
        }
//        channel.consumeEach { element ->
//            println("element : $element")
//        }
    }
}

 

이 방식의 문제는 예외 발생 시 채널을 닫는 걸 잊기 쉽단 것이다. 예외로 인해 코루틴이 원소를 보내는 걸 중단하면 다른 코루틴은 원소를 영원히 기다려야 한다.

ReceiveChannel을 리턴하는 코루틴 빌더 produce 함수를 쓰는 게 더 편하다.

 

fun CoroutineScope.produceNumbers(
    max: Int
): ReceiveChannel<Int> = produce {
    var x = 0
    while (x < 5) send(x++)
}

 

produce는 빌더로 시작된 코루틴이 어떻게 종료되든 상관없이 채널을 닫기 때문에 close를 반드시 호출한다.

 

채널 타입

 

설정한 용량 크기에 따라 채널을 4개로 구분할 수 있다.

 

  • Unlimited : 제한이 없는 용량 버퍼를 가진 Channel.UNLIMITED로 설정된 채널. send가 중단되지 않음
  • Buffered : 특정 용량 크기 or Channel.BUFFERED(기본값 64)로 설정된 채널
  • Rendezvous(랑데뷰) : 용량이 0이거나 Channel.RENDEZVOUS(용량이 0)인 채널. 송신자, 수신자가 만날 때만 원소를 교환함
  • Conflated(융합) : 버퍼 크기가 1인 Channel.CONFLATED를 가진 채널. 새 원소가 이전 원소를 대체함

 

채널이 가진 용량을 예시로 확인한다. 채널에 직접 설정할 수 있지만 produce 함수로 설정할 수도 있다.

송신자는 빠르게, 수신자를 느리게 만든다. 용량이 무제한이면 채널은 모든 원소를 받고 수신자가 하나씩 가져가게 된다.

 

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay

suspend fun main(): Unit = coroutineScope {
    val channel = produce(capacity = Channel.UNLIMITED) {
        repeat(5) { index ->
            send(index * 2)
            delay(100)
            println("보냄")
        }
    }

    delay(1000)
    for (element in channel) {
        println("element : $element")
        delay(1000)
    }
}

// 보냄
// (0.1초 후)
// 보냄
// (0.1초 후)
// 보냄
// (0.1초 후)
// 보냄
// (0.1초 후)
// 보냄
// (1 - 4 * 0.1 = 0.6초 후)
// element : 0
// element : 2
// element : 4
// element : 6
// element : 8

 

정해진 크기의 용량을 가졌다면 버퍼가 다 찰 때까지 원소가 생성된다. 이후 생성자는 수신자가 원소를 소비하길 기다린다.

 

버퍼 오버플로우일 때

 

채널을 커스텀하기 위해 버퍼가 다 찼을 때(onBufferOverFlow 매개변수)의 행동을 정의할 수 있다.

 

  • SUSPEND : 기본 옵션, 버퍼가 다 차면 send()가 중단됨
  • DROP_OLDEST : 버퍼가 다 차면 가장 오래된 원소가 제거됨
  • DROP_LATEST : 버퍼가 다 차면 가장 최근 원소가 제거됨

 

채널 용량 중 Channel.CONFLATED는 용량을 1로 설정하고 onBufferOverflow를 DROP_OLDEST로 설정한 걸 알 수 있다.

현재는 produce에서 onBufferOverflow를 설정할 수 없어서 오버플로 옵션을 바꾸려면 Channel 함수를 써서 채널을 정의해야 한다.

 

import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main(): Unit = coroutineScope {
    val channel = Channel<Int>(
        capacity = 2,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    launch {
        repeat(5) { index ->
            channel.send(index * 2)
            delay(100)
            println("보냄")
        }
        channel.close()
    }

    delay(1000)
    for (element in channel) {
        println("element : $element")
        delay(1000)
    }
}

// 보냄
// (0.1초 후)
// 보냄
// (0.1초 후)
// 보냄
// (0.1초 후)
// 보냄
// (0.1초 후)
// 보냄
// (1 - 4 * 0.1 = 0.6초 후)
// element : 6
// (1초 후)
// element : 8

 

통신의 기본 형태로서의 채널

 

채널은 서로 다른 코루틴이 통신할 때 유용하다. 충돌이 발생하지 않고 공평함을 보장한다.

여러 바리스타가 커피를 만드는 상황을 가정한다. 각 바리스타는 서로 독립적으로 작업하는 코루틴이라 할 수 있다. 커피 종류가 다르면 준비하는 데 걸리는 시간도 다르지만 주문 받은 순서대로 처리하고 싶을 수 있다.

이를 해결하는 가장 쉬운 방법은 주문을 채널로 받고 만들어진 커피를 다른 채널로 보내는 것이다.

 

suspend fun CoroutineScope.serveOrders(
    orders: ReceiveChannel<Order>,
    baristaName: String
): ReceiveChannel<CoffeeResult> = produce {
    for (order in orders) {
        val coffee = prepareCoffee(order.type)
        send(
            CoffeeResult(
                coffee = coffee,
                customer = order.customer,
                baristaName = baristaName
            )
        )
    }
}

 

반응형
Comments