관리 메뉴

나만을 위한 블로그

[Android] 코루틴 채널(Channel)이란? 본문

Android

[Android] 코루틴 채널(Channel)이란?

참깨빵위에참깨빵 2023. 6. 29. 22:48
728x90
반응형

코루틴으로 데이터를 처리, 전달할 때 Flow, Channel이라는 두 개념을 사용할 수 있다. 이 포스팅에선 Channel에 대해 확인한다. 채널이 뭔지 공식문서부터 확인한다.

 

https://kotlinlang.org/docs/channels.html

 

Channels | Kotlin

 

kotlinlang.org

채널은 개념적으로 BlockingQueue와 매우 유사하다. 중요한 차이점은 차단하는 넣기(put) 작업 대신 일시중지하는 보내기(suspending send)가 있고 차단하는 take 작업 대신 일시중지하는 수신(suspending receive)이 있다는 것이다
val channel = Channel<Int>()
launch {
    // CPU를 많이 쓰는 계산 or 비동기 로직이라 생각하고 사각형 5개만 사용
    for (x in 1..5) channel.send(x * x)
}
// 받은 정수 5개 출력
repeat(5) { println(channel.receive()) }
println("Done!")

// 1
// 4
// 9
// 16
// 25
// Done!
Queue와 다르게 채널은 더 이상 들어오는 요소가 없음을 나타내기 위해 닫힐 수 있다. 받는 측에선 일반 for 루프를 써서 채널에서 요소를 받는 게 편하다. 개념적으로 close는 채널에 특별한 close token을 보내는 것과 같다. close token이 수신되는 즉시 반복이 중지되므로 닫기 이전에 전송된 모든 요소를 받는 것이 보장된다...(중략)

 

https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/

 

Channel

Channel is a non-blocking primitive for communication between a sender (via SendChannel) and a receiver (via ReceiveChannel). Conceptually, a channel is similar to Java's java.util.concurrent.BlockingQueue, but it has suspending operations instead of block

kotlinlang.org

채널은 송신자(SendChannel을 통함)와 수신자(ReceiveChannel) 간의 통신을 위한 non-blocking primitive다. 개념적으로 채널은 자바의 BlockingQueue와 유사하지만 blocking 작업 대신 suspending operation이 있으며 닫을 수 있다...(중략)

 

아는 만큼 보인다지만 대부분의 공식문서는 봐도 뭐라는지 모르겠다. 하지만 아예 정보를 얻지 못한 것은 아니다.

 

  • 채널은 개념적으로 자바의 BlockingQueue와 유사한 형태다
  • 코루틴 기반이기 때문에 suspend 형태로 보내기, 받기가 가능하다

 

이제 다른 사람들은 어떻게 설명하는지 확인해 본다.

 

https://kt.academy/article/cc-channel

 

Channel in Kotlin Coroutines

What a channel is, how it works, and where we can use it.

kt.academy

Channel API는 코루틴 간 통신 프리미티브로 추가됐다. 많은 사람들이 채널을 파이프로 상상하지만 난 다른 비유를 좋아한다. 책 교환을 위한 공용 책장을 아는가? 한 사람은 다른 사람이 책을 찾을 수 있게 책을 가져와야 한다. 이는 채널이 작동하는 방식과 매우 유사하다. 채널은 모든 발신자, 수신자를 지원하며 채널로 전송되는 모든 값은 한 번만 수신된다

채널은 2개의 다른 인터페이스를 구현하는 인터페이스다

- SendChannel : 요소를 전송하고 채널을 닫는 데 사용
- ReceiveChannel : 요소를 받는(가져오는) 데 사용

수신을 시도할 때 채널에 요소가 없으면 요소를 사용 가능할 때까지 코루틴이 일시 중지된다. 책장과 같이 누가 책을 찾으러 책장에 갔지만 비어 있다면 가져간 사람이 거기에 요소를 둘 때까지 그 사람은 정지해야 한다. 반면 채널 용량이 다 차면 전송이 일시 중단된다. 책장에 누가 책을 선반에 올려 놓으려는데 가득 차면 누군가 책을 집어들고 공간을 만들 때까지 멈춰 있어야 한다
채널은 발신자, 수신자가 몇 개든 있을 수 있다. 그러나 가장 일반적인 상황은 채널 양쪽에 하나의 코루틴이 있는 경우다

https://www.kodeco.com/books/kotlin-coroutines-by-tutorials/v2.0/chapters/11-channels

 

Kotlin Coroutines by Tutorials, Chapter 11: Channels

Although experimental, channels are a very important API you can use with coroutines. In this chapter, you’ll create examples to understand what a channel is and how to act as a producer or consumer for it synchronously and asynchronously. You’ll under

www.kodeco.com

채널은 코루틴 간에 값 스트림을 전송하는 데 쓸 수 있는 간단한 추상화다. 컨텐츠를 수신하는 목적지로 컨텐츠를 전송하는 소스를 생각하라. 즉 요소는 생산자 코루틴에 의해 채널로 전송되고 소비자 코루틴에 의해 수신된다. 기본적으로 채널은 데이터를 비동기적으로 보내고 작동하는 BlockingQueue와 같다
채널의 기본 속성이자 이해해야 하는 중요한 개념은 채널이 버퍼에 포함할 수 있는 최대 요소 수를 정의하는 용량(capacity)이다. 용량이 N인 채널이 있다고 가정한다. 생산자는 채널에 값을 보낼 수 있지만 채널이 N 용량에 도달하면 생산자는 소비자가 같은 채널에서 읽기 시작할 때까지 일시 중단한다. 생산과 소비가 서로 다른 시간이 소요되는 작업인 경우 성능을 최적화하는 방법이다...(중략)...생산자 채널은 데이터가 필요한 소비자 채널이 있을 때까지 아무것도 생산하지 않는다. 기본적으로 버퍼가 없다. 요소는 생산자의 보내기 호출과 소비자의 수신 호출이 시간 내에 만날 때에만(랑데뷰) 생산자에서 소비자로 전송된다. 이 때문에 send()는 다른 코루틴이 receive()를 호출할 때까지 일시 중단되고 다른 코루틴이 send()를 호출할 때까지 receive()가 일시 중단된다. 이것이 랑데뷰란 이름이 붙은 이유다...(중략)

 

https://vtsen.hashnode.dev/introduction-to-kotlin-flows-and-channels

 

Introduction to Kotlin Flows and Channels

Summarizing the behaviors of using Kotlin flows (cold stream) and Kotlin channels (hot stream)

vtsen.hashnode.dev

채널 데이터는 스트림 외부에서 생성된다. 채널 생성과 hot stream으로의 데이터 전송은 분리된다. hot stream으로 데이터를 보내는 걸 채널의 발신자(sender)라고 한다

hot stream은 버퍼 용량이 있는 버퍼와 같다. 기본 버퍼 용량은 0이다. 버퍼가 오버플로우되면 수신자가 있기 전까지 데이터 전송이 일시 중지된다. 채널의 데이터를 수신하려면 Channel.receive()를 호출한다
버퍼 용량은 기본 0이기 때문에 첫 데이터를 스트림으로 보내는 즉시 버퍼 오버플로우가 발생한다. 따라서 수신자가 있기 전까지 첫 데이터 이후 데이터 전송이 중단된다. 이 동작은 lazy stream과 유사해 보이지만 그렇지 않다. 버퍼 용량이 0보다 크면 버퍼가 다 찰 때까지 수신자가 없어도 데이터 전송이 발생하기 때문에 여전히 eager stream이다. Flow와 달리 새 구독자 or 새 채널의 수신기가 있어도 새로운 스트림이 생성되지 않는다. 여전히 기존 스트림을 사용한다. 따라서 multicast stream이다

위 다이어그램을 기반으로 채널을 구독하는 수신자 #1, #2, #3이 이미 존재한다고 가정하면 채널의 발신자는 1, 2, 3을 hot stream으로 보낸다. 수신자 #1이 1을 받으면 수신자 #2, #3은 더 이상 1을 수신하지 않는다. 대신 #2는 2를 수신하고 #3은 3을 수신한다

 

여러 내용과 그림들을 찾을 수 있었지만 공통적인 내용은 아래와 같다.

 

  • 채널은 Hot Stream이다. 즉 데이터가 생성되면 수신자가 있든 없든 곧바로 전달된다
  • 채널은 일반적으로 두 코루틴 사이에 존재한다. 데이터를 만드는 코루틴을 생산자, 데이터를 받는 코루틴을 소비자라고 한다. 생산자, 소비자는 각각 하나 이상일 수 있다
  • 채널의 기본 용량은 0이다. 채널의 인스턴스를 만들 때 capacity 생성자 매개변수를 통해 내가 만들 채널의 기본 용량을 지정할 수 있다
  • 채널의 send, receive 함수는 모두 중단 함수기 때문에 기본 용량이 다 차면 수신자가 나타나기 전까지 데이터 전송이 일시 정지된다. 반대의 경우에도 마찬가지다

 

즉 채널은 데이터를 생성하는 코루틴과 데이터를 받는 코루틴 사이에서 데이터를 주고받을 때 사용하는 일종의 버퍼라고 볼 수 있다.

이제 코드로는 어떻게 채널을 사용할 수 있는지 확인한다.

 

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main() {
    val channel = Channel<Int>()
    println("시작")
    CoroutineScope(Dispatchers.IO).launch {
        for (i in 1 .. 5) {
            delay(1000L)
            channel.send(i * i)
        }
        channel.close()
    }

    for (j in channel) {
        println(j)
    }
    println("종료")
}
// 시작
// 1
// 4
// 9
// 16
// 25
// 종료

 

종료를 출력하는 println() 위의 for문에서 channel 변수를 다루려면 메인 함수에 suspend 키워드를 붙여야 한다. 그러지 않으면 컴파일 에러가 난다.

왜냐면 해당 for문이 channel 크기만큼 반복하면서 안에 들어있는 값을 출력하기 때문에 코루틴의 receive 연산을 내부적으로 수행하는 것과 같아서, 값을 읽을 때 긴 시간 차단될 수 있기 때문이다. 채널에서 데이터를 받는(receive) 것은 suspend 연산 중 하나기 때문에 해당 코드를 다루는 함수는 suspend 키워드를 사용해 선언돼야 한다.

 

아래는 코루틴 2개와 채널을 같이 사용하는 예시다.

 

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.channels.produce

@ExperimentalCoroutinesApi
suspend fun main() {
    // 채널로 값을 보내는 코루틴
    val numbers = CoroutineScope(Dispatchers.IO).produce {
        for (i in 1 .. 5) {
            delay(1000)
            send(i * i)
            println("i * i send! ${i * i}")
        }
    }

    // 채널에서 값을 받아 출력하는 코루틴
    CoroutineScope(Dispatchers.IO).launch {
        numbers.consumeEach {
            println(it)
        }
    }

    delay(5500) // 코루틴 2개가 완료될 때까지 충분한 여유 시간
}

// i * i send! 1
// 1
// i * i send! 4
// 4
// i * i send! 9
// 9
// i * i send! 16
// 16
// i * i send! 25
// 25

 

produce {}는 채널을 만들고, 그 채널에 값을 보내는 코루틴을 만드는 코루틴 빌더다. consumeEach {}는 채널에서 값을 받아서 사용하는 코루틴을 만드는 코루틴 빌더다.

먼저 produce {} 빌더에서 1초에 한 번씩 두 숫자를 곱해서 채널로 전송한다. i가 1이면 1x1, 2면 2x2,..., 5x5까지 이어진다. 채널로 전송할 때마다 어떤 숫자를 보냈는지 확인하기 위해 println()을 추가했다.

produce {} 빌더 이후 또 다른 CoroutineScope로 별도의 코루틴을 만들고, consumeEach {} 빌더를 numbers를 통해 호출해 채널을 통해 전달받은 값을 출력한다. 마지막으로 2개의 코루틴이 완료될 때까지 충분히 여유있는 시간값을 delay()에 넣어서 호출한다. 이 값이 작으면 produce {} 빌더 안에서 모든 숫자가 전송되기 전에 프로그램이 끝나 버리므로 이 숫자를 유지하던가, produce {} 안의 delay()에 넣은 숫자를 줄인다.

반응형
Comments