관리 메뉴

나만을 위한 블로그

[코틀린 코루틴] 26. 일반적인 사용 예제 본문

책/코틀린 코루틴

[코틀린 코루틴] 26. 일반적인 사용 예제

참깨빵위에참깨빵_ 2024. 5. 1. 17:04
728x90
반응형

대부분 앱은 3계층으로 구분할 수 있다.

 

  • 데이터 / 어댑터 계층
  • 도메인 계층
  • 표현 / api / UI 계층

 

각 계층에서 코루틴을 쓰는 일반적인 방법을 확인한다.

 

데이터 / 어댑터 계층

 

레포지토리, 프로바이더, 어댑터, 데이터 소스를 구현하는 계층이다.이 계층을 다루는 건 상대적으로 쉬운 편인데 인기 있는 여러 JVM 라이브러리가 기본적으로 or 몇 가지 의존성을 추가하는 걸로 코루틴을 지원하기 때문이다.

레트로핏으로 요청을 정의한 함수를 블로킹 함수 대신 중단 함수로 만들려면 suspend를 추가한다.

 

interface GithubApi {
    @GET("orgs/{organization}/repos?per_page=100")
    suspend fun getOrganizationRepos(
        @Path("organization") organization: String
    ): List<Repo>
}

 

Room도 중단 함수로 만들기 위해 suspend를 넣을 수 있고 테이블 상태 감지를 위한 Flow도 지원한다.

 

콜백 함수

 

코루틴을 지원하지 않는 라이브러리를 써서 콜백을 반드시 써야 한다면 suspendCancellableCoroutine을 써서 콜백 함수로 바꾼다. 콜백 함수가 노출되면 Continuation 객체의 resume()을 써서 코루틴을 재개해야 한다.

콜백 함수가 취소 가능하다면 invokeOnCancellation 람다식 안에서 취소해야 한다.

 

suspend fun requestNews(): News {
    return suspendCancellableCoroutine<News> { cont -> 
        val call = requestNewsApi { news ->
            cont.resume(news)
        }
        cont.invokeOnCancellation { 
            call.cancel()
        }
    }
}

 

성공, 실패 시 쓰는 함수를 구분하는 콜백을 구현하는 방법은 여러 가지 있다.

 

  • 콜백 함수를 래핑하고 Result를 리턴 타입으로 설정한 뒤, 코루틴을 Result.success 또는 Result.failure로 재개
  • nullable 값을 리턴한 뒤 결과값 or null로 코루틴 재개
  • 콜백 함수 성공 시 결과값을 리턴, 실패 시 중단점에서 예외 던짐

 

블로킹 함수

 

일반적인 중단 함수에선 절대 블로킹 함수를 호출하면 안 되지만 어쩔 수 없이 써야 하는 라이브러리도 있다.

코루틴에선 쓰레드를 정밀하게 사용해서 쓰레드가 블로킹되면 심각한 문제가 발생할 수 있다.

안드로이드에서 Dispatchers.Main 쓰레드가 블로킹되면 전체 앱 실행이 멈춘다. Dispatchers.Default 쓰레드를 블로킹하면 프로세서를 효율적으로 쓰지 못하게 된다. 따라서 디스패처를 명시하지 않고 블로킹 함수를 호출해선 안 된다.

블로킹 함수를 호출하려면 withContext를 써서 디스패처를 명시해야 한다. 대부분 앱에서 레포지토리를 구현할 땐 Dispatchers.IO를 쓰면 된다.

 

class DiscSaveRepository(
    private val discReader: DiscReader
): SaveRepository {
    override suspend fun loadSave(name: String): SaveData =
        withContext(Dispatchers.IO) {
            discReader.read("save/$name")
        }
}

 

IO 쓰레드를 쓰더라도 Dispatchers.IO의 쓰레드가 64개로 제한돼 있어서 백엔드, 안드로이드에서 충분하지 않을 수 있단 걸 알아야 한다.

모든 요청이 블로킹 함수를 호출하고 초당 수천 개의 요청을 처리해야 한다면 64개의 쓰레드를 기다리는 요청이 급격히 증가한다. 이 경우 Dispatchers.IO의 limitedParallelism을 써서 64개보다 많은 쓰레드를 활용하는 새 쓰레드를 만드는 것도 고려할 수 있다.

 

class LibraryGoogleAccountVerifier: GoogleAcountVerifier {
    private val dispatcher = Dispatchers.IO
        .limitedParallelism(100)

    private var verifier = GoogleIdTokenVerifier.Builder()
        .setAudience()
        .build()
    
    override suspend fun getUserData(
        googleToken: String
    ): GoogleUserData? = withContext(dispatcher) {
        verifier.verify(googleToken)
            ?.payload
            ?.let {
                GoogleUserData(
                    email = it.email,
                    name = it.getString("given_name"),
                    surname = it.getstring("family_name"),
                    imageUrl = it.getString("picture")
                )
            }
    }
}

 

CPU 집약적 연산은 Dispatchers.Default, 메인 뷰를 다루는 연산은 Dispatchers.Main.immediate에서 실행해야 한다.

디스패처 설정 시 withContext를 쓸 수도 있다.

 

suspend fun calculateModel() = withContext(Dispatchers.Default) {
    model.fit(
        dataSet = newTrain,
        epochs = 10,
        batchSize = 100,
        verbose = false
    )
}

suspend fun setUserName(name: String) = withContext(Dispatchers.Main.immediate) {
    userNameView.text = name
}

 

플로우로 감지하기

 

중단 함수는 하나의 값을 생성하고 가져오는 과정을 구현하기 적합하다. 하지만 여러 값을 다룰 경우 Flow를 써야 한다.

Room처럼 하나의 DB 연산 수행 시에는 중단 함수를 쓰고 테이블에서 변화를 감지할 땐 Flow를 쓴다.

네트워크 호출도 비슷하다. api로 하나의 값을 가져올 땐 중단 함수를 쓰는 게 가장 좋다.

하지만 웹소켓을 설정하고 메시지를 기다릴 땐 Flow를 써야 한다. 사용하는 라이브러리에서 값을 리턴할 수 없다면 Flow를 만들 때 callbackFlow 또는 channelFlow를 쓴다. flow 빌더 끝에는 awaitClose를 반드시 써야 한다.

 

fun listenMessages(): Flow<List<Message>> = callbackFlow { 
    socket.on("NewMessage") { args ->
        trySend(args.toMessage())
    }
    awaitClose()
}

 

버튼 클릭, 텍스트 변경 등의 UI 이벤트를 감지할 때 주로 Flow를 쓴다.

 

fun EditText.listenTextChange(): Flow<String> = callbackFlow { 
    val watcher = doAfterTextChanged { 
        trySendBlocking(it.toString())
    }
    awaitClose { removeTextChangedListener(watcher) }
}

 

Flow는 콜백 함수로 쓰일 수 있고 콜백 함수가 여러 값을 만들 때 써야 한다.

 

fun flowFrom(api: CallbackBasedApi): Flow<T> = callbackFlow { 
    val callback = object : Callback {
        override fun onNextValue(value: T) {
            trySendBlocking(value)
        }
        
        override fun onApiError(cause: Throwable) {
            cancel(CancellationException("Api Error", cause))
        }
        
        override fun onCompleted() = channel.close()
    }
    
    api.register(callback)
    awaitClose { api.unregister(callback) }
}

 

flow 빌더에서 특정 디스패처가 필요하면 생성된 Flow에서 flowOn을 쓰면 된다.

 

fun fibonacciFlow(): Flow<BigDecimal> = flow {
    var a = BigDecimal.ZERO
    var b = BigDecimal.ONE
    emit(a)
    emit(b)

    while (true) {
        var temp = a
        a = b
        b += temp
        emit(b)
    }
}.flowOn(Dispatchers.Default)

fun filesContentFlow(path: String): Flow<String> = channelFlow { 
    File(path).takeIf { it.exists() }
        ?.listFiles()
        ?.forEach { 
            send(it.readText())
        }
}.flowOn(Dispatchers.IO)

 

도메인 계층

 

이 계층에선 비즈니스 로직을 구현하고 유스케이스, 서비스, 퍼사드 객체를 정의한다.

이 계층에선 코루틴 스코프 객체에서 연산을 처리하거나 중단 함수를 노출시키면 절대 안 된다. 스코프 객체에서 코루틴을 시작하는 건 아래에 있는 표현 계층이 담당해야 한다. 도메인 계층에서 코루틴을 시작하려면 코루틴 스코프 함수를 써야 한다.

실제 예를 보면 도메인 계층에선 다른 중단 함수를 호출하는 중단 함수가 대부분이다.

 

class NetworkUserRepository(
    private val api: UserApi
): UserRepository {
    override suspend fun getUser(): User = api.getUser().toDomainUser()
}

class NetworkNewsService(
    private val newsRepo: NewsRepository,
    private val settings: SettingsRepository,
) {
    suspend fun getNews(): List<News> = newsRepo.getNews()
        .map { it.toDomainNews() }
    
    suspend fun getNewsSummary(): List<News> {
        val type = settings.getNewsSummaryType()
        return newsRepo.getNewsSummary(type)
    }
}

 

동시 호출

 

프로세스 2개를 병렬 실행하려면 함수 본체를 coroutineScope로 래핑하고 내부에서 async 빌더를 써서 각 프로세스를 비동기 실행해야 한다.

 

suspend fun produceCurrentUser(): User = coroutineScope { 
    val profile = async { repo.getProfile() }
    val friends = async { repo.getFriends() }
    User(profile.await(), friends.await())
}

 

두 프로세스를 병렬 실행하는 게 전부고 취소, 예외처리, 컨텍스트 전달 같은 다른 처리는 동일하게 유지돼야 한다.

2개의 비동기 프로세스를 시작한 뒤 완료되는 걸 기다리려면 async 함수를 써서 각 프로세스를 처리하는 코루틴을 만드는 게 방법이다. 하지만 하나의 프로세스만 async로 시작하고 나머지 하나는 기존 코루틴에서 실행해도 같은 결과를 얻을 수 있다.

 

suspend fun getArticlesForUser(
    userToken: String?
): List<ArticleJson> = coroutineScope { 
    val articles = async { articleRepository.getArticles() }
    val user = userService.getUser(userToken)
    articles.await()
        .filter { canSeeOnList(user, it) }
        .map { toArticleJson(it) }
}

 

각자 선호하는 방법을 쓰면 된다.

coroutineScope를 쓰면 자식 코루틴에서 발생한 예외는 coroutineScope가 생성한 코루틴을 중단하게 되어 모든 자식 코루틴을 취소한 뒤 예외를 던지게 만든다. 몇몇 경우엔 이 방식이 부적합할 수 있다.

서로 독립적인 작업 여러 개를 동시 시작하려면 자식 코루틴으로 예외 전파가 안 되는 supervisorScope를 써야 한다.

 

suspend fun notifyAnalytics(actions: List<UserAction>) = supervisorScope { 
    actions.forEach { action -> 
        launch { 
            notifyAnaltics(action)
        }
    }
}

 

표현 / api / UI 계층

 

주로 코루틴으로 시작하는 계층이다. 몇몇 앱에선 스프링부트, 케이터(Ktor) 같은 프레임워크가 모든 작업을 대신해서 표현 계층을 구현하는 게 가장 쉽다. 웹플럭스를 스프링부트와 같이 쓰면 컨트롤러 함수에 suspend만 추가하면 스프링은 함수를 코루틴으로 실행한다.

안드로이드에선 작업 스케줄링을 위해 WorkManager를 쓴다. CoroutineWorker 클래스를 써서 doWork()를 구현하면 작업이 수행해야 하는 것들을 명시할 수 있다. 해당 메서드는 중단 함수고 라이브러리가 코루틴으로 시작하기 때문에 직접 코루틴으로 시작할 필요가 없다.

 

class CoroutineDownloadWorker(
    context: Context,
    params: WorkerParameters
): CoroutineWorker(context, params) {
    override suspend fun doWork(): Result {
        val data = downloadSynchronously()
        saveData(data)

        return Result.success()
    }
}

 

하지만 특정 상황에선 코루틴을 직접 시작해야 할 수 있다. 이 때는 일반적으로 스코프 객체에서 launch를 쓴다.

안드로이드에선 lifecycle-viewmodel-ktx가 있어서 대부분 viewModelScope 또는 lifecycleScope를 쓰면 된다.

 

class UserProfileViewModel(
    private val loadProfileUseCase: LoadProfileUseCase,
    private val updateProfileUseCase: UpdateProfileUseCase,
): ViewModel() {
    private val userProfile = MutableSharedFlow<UserProfileData>()
    
    val userName: Flow<String> = userProfile.map { it.name }
    val userSurname: Flow<String> = userProfile.map { it.surname }
    // ...
    
    fun onCreate() {
        viewModelScope.launch { 
            val userProfileData = loadProfileUseCase.execute()
            userProfile.value = userProfileData
            // ...
        }
    }
    
    fun onNameChanged(newName: String) {
        viewModelScope.launch { 
            val newProfile = userProfile.copy(name = newName)
            userProfile.value = newProfile
            updateProfileUseCase.execute(newProfile)
        }
    }
}

 

커스텀 스코프 만들기

 

코루틴을 시작하거나 스코프를 만드는 라이브러리, 클래스가 없다면 커스텀 스코프를 만들고 스코프에서 코루틴을 시작해야 한다.

 

class NotificationsSender(
    private val client: NotificationsClient,
    private val notificationScope: CoroutineScope,
) {
    fun sendNotifications(notifications: List<Notification>) {
        for (n in notifications) {
            notificationScope.launch { 
                client.send(n)
            }
        }
    }
}

class LatestNewsViewModel(
    private val newsRepository: NewsRepository
): ViewModel() {
    private val _uiState = MutableStateFlow<NewsState>(LoadingNews)
    val uiState: StateFlow<NewsState> = _uiState
    
    fun onCreate() {
        scope.launch {
            _uiState.value = NewsLoaded(newsRepository.getNews())
        }
    }
}

 

CoroutineScope 함수를 써서 커스텀 코루틴 스코프를 정의한다. 스코프 안에서 SupervisorJob을 쓰는 게 일반적으로 통용되는 방식이다.

스코프를 정의할 때 디스패처나 예외 처리기를 명시할 수 있다. 스코프 객체를 취소할 수도 있다. 안드로이드에선 대부분 스코프가 특정 상황에서 취소되거나 자식 스코프를 취소할 수 있어서 '프로세스를 실행하려면 어떤 스코프를 써야 하는가?' 라는 질문을 '어떤 상황에서 프로세스를 취소해야 하는가?'로 요약할 수 있다.

뷰모델이 정리될 때 모델의 스코프는 취소된다. WorkManager는 연관된 작업이 취소될 때 스코프를 취소한다.

 

// 취소, 예외 처리기를 만든 예
abstract class BaseViewModel: ViewModel() {
    private val _failure: MutableLiveData<Throwable> = MutableLiveData()
    val failure: LiveData<Throwable> = _failure
    
    private val exceptionHandler = CoroutineExceptionHandler { _, throwable -> 
        _failure.value = throwable
    }
    
    private val context = Dispatchers.Main + SupervisorJob() + exceptionHandler
    
    protected val scope = CoroutineScope(context)

    override fun onCleared() {
        context.cancelChildren()
    }
}

 

플로우 활용하기

 

Flow를 쓸 때는 onEach에서 변경을 처리하고 launchIn으로 다른 코루틴에서 Flow를 시작하며 onStart로 Flow가 시작될 때 특정 행동을 정의하고, onCompletion으로 Flow가 완료됐을 때 행동을 정의하며 catch로 예외를 처리한다.

Flow에서 발생하는 모든 예외를 처리하려면 가장 마지막에 catch를 명시해야 한다.

 

fun updateNews() {
    newsFlow().onStart { showProgressBar() }
        .onCompletion { hideProgressBar() }
        .onEach { view.showNews(it) }
        .catch { view.handleError(it) }
        .launchIn(viewModelScope)
}

 

안드로이드에선 뷰모델의 MutableStateFlow 타입 프로퍼티에 앱 상태를 나타내는 방법을 자주 쓴다.

상태 변경에 따라 뷰를 갱신하는 코루틴이 해당 프로퍼티를 감지한다.

 

class NewsViewModel: BaseViewModel() {
    private val _loading = MutableStateFlow(false)
    val loading: StateFlow<Boolean> = _loading

    private val _news = MutableStateFlow(emptyList<News>())
    val news: StateFlow<List<News>> = _news

    fun onCreate() {
        newsFlow().onStart { _loading.value = true }
            .onCompletion { _loading.value = false }
            .onEach { _news.value = it }
            .catch { _failure.value = it }
            .launchIn(viewModelScope)
    }
}

class LatestNewsActivity: AppCompatActivity() {
    @Inject
    val newsViewModel: NewsViewModel

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        // ...
        launchOnstarted {
            newsViewModel.loading.collect {
                progressBar.visibility = if (it) View.VISIBLE else View.GONE
            }
        }
        
        launchOnStarted {
            newsViewModel.news.collect {
                newsList.adapter = NewsAdapter(it)
            }
        }
    }
}

 

상태를 나타내는 프로퍼티를 한 Flow에서만 쓴다면 stateIn()을 쓸 수 있다.

started 파라미터에 따라 Flow는 즉시(클래스가 초기화될 때), 지연되서(코루틴이 Flow 데이터를 모으기 시작할 때), 구독할 때 시작한다.

 

class NewsViewModel: BaseViewModel() {
    private val _loading = MutableStateFlow(false)
    val loading: StateFlow<Boolean> = _loading

    private val _news = MutableStateFlow(emptyList<News>())
    val newsState: StateFlow<List<News>> = newsFlow()
        .onStart { _loading.value = true }
        .onCompletion { _loading.value = false }
        .onEach { _news.value = it }
        .catch { _failure.value = it }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.Lazily,
            initialValue = emptyList()
        )
}

class LocationsViewModel(
    locationService: LocationService
): ViewModel() {
    private val location = locationService.observeLocations()
        .map { it.toLocationsDisplay() }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.Lazily,
            initialValue = LocationsDisplay.Loading,
        )
    // ...
}

 

StateFlow는 상태를 나타낼 때 사용한다. SharedFlow는 여러 코루틴이 이벤트나 데이터 변경을 감지할 경우 사용한다.

 

class UserProfileViewModel: ViewModel() {
    private val _userChanges = MutableSharedFlow<UserChange>()
    val userChanges: SharedFlow<UserChange> = _userChanges
    
    fun onCreate() {
        viewModelScope.launch { 
            userChanges.collect(::applyUserChange)
        }
    }
    
    fun onNameChnaged(newName: String) {
        // ...
        _userChanges.emit(NameChange(newName))
    }
    
    fun onPublicKeyChanged(newPublicKey: String) {
        // ...
        _userChanges.emit(PublicKeyChange(newPublicKey))
    }
}

 

 

반응형
Comments