일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- android ar 개발
- 멤버변수
- 객체
- rxjava cold observable
- 플러터 설치 2022
- jvm 작동 원리
- 자바 다형성
- 클래스
- 스택 자바 코드
- 서비스 쓰레드 차이
- 안드로이드 유닛 테스트 예시
- 안드로이드 라이선스
- ar vr 차이
- 안드로이드 레트로핏 crud
- 2022 플러터 설치
- Rxjava Observable
- 안드로이드 유닛테스트란
- ANR이란
- 2022 플러터 안드로이드 스튜디오
- jvm이란
- rxjava disposable
- 안드로이드 os 구조
- 안드로이드 라이선스 종류
- 스택 큐 차이
- 안드로이드 유닛 테스트
- android retrofit login
- rxjava hot observable
- 서비스 vs 쓰레드
- 안드로이드 레트로핏 사용법
- 큐 자바 코드
- Today
- Total
나만을 위한 블로그
[코틀린 코루틴] 23. 플로우 처리 본문
map
Flow의 각 원소를 변환 함수에 따라 변환한다. 숫자들을 Flow로 갖고 있고 각 수의 제곱을 계산하는 연산이라면 생성된 Flow는 이 수들의 제곱을 값으로 갖는다.
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
suspend fun main() {
flowOf(1, 2, 3) // [1, 2, 3]
.map { it * it } // [1, 4, 9]
.collect { print(it) }// 149
}
map을 구현하려면 flow 빌더를 써서 새 Flow를 만들면 된다. 그리고 만들어진 Flow에서 원소들을 모으고 변형된 원소들을 하나씩 내보내면 된다.
map은 값을 꺼내거나 다른 형태로 바꾸는 등으로 사용할 수 있다.
// 입력 이벤트로부터 사용자 액션을 받음
fun actionsFlow(): Flow<UserAction> = observeInputEvents().map { toAction(it.code) }
// User를 UserJson으로 변환
fun getAllUser(): Flow<UserJson> = userRepository.getAllUsers()
.map { it.toUserJson() }
filter
원래 Flow에서 주어진 조건에 맞는 값들만 가진 Flow를 리턴한다.
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
suspend fun main() {
(1 .. 10).asFlow() // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
.filter { it <= 5 } // [1, 2, 3, 4, 5]
.filter { isEven(it) } // [2, 4]
.collect { print(it) } // 24
}
fun isEven(num: Int): Boolean = num % 2 == 0
take, drop
특정 수의 원소만 통과시키기 위해 take를 쓸 수 있다.
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.take
suspend fun main() {
('A' .. 'Z').asFlow()
.take(5) // [A, B, C, D, E]
.collect { print(it) } // ABCDE
}
drop을 쓰면 특정 수의 원소를 무시할 수 있다.
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.drop
suspend fun main() {
('A' .. 'Z').asFlow()
.drop(20) // [U, V, W, X, Y, Z]
.collect { print(it) } // UVWXYZ
}
merge, zip, combine
두 Flow를 하나의 Flow로 합치는 법은 여러 가지가 있다. 가장 간단한 방법은 두 Flow에서 생성된 원소들을 하나로 합치는 것이다. 어떤 변경도 필요없고 Flow의 원소가 어디서 왔는지도 중요하지 않다. 최상위 레벨 함수인 merge를 써서 이런 과정을 수행할 수 있다.
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
suspend fun main() {
val ints: Flow<Int> = flowOf(1, 2, 3)
val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
val together: Flow<Number> = merge(ints, doubles)
print(together.toList())
}
// [1, 2, 3, 0.1, 0.2, 0.3]
merge를 쓰면 한 Flow의 원소가 다른 Flow를 기다리지 않는다. merge는 여러 이벤트를 똑같은 방법으로 처리할 때 사용할 수 있다.
zip은 두 Flow로부터 쌍을 만든다. 각 원소는 한 쌍의 일부가 되므로 쌍이 될 원소를 기다려야 한다. 쌍을 이루지 못하고 남은 원소는 유실되므로 한 Flow에서 zipping이 완료되면 생성되는 Flow도 완료된다. 다른 Flow도 마찬가지다.
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.zip
suspend fun main() {
val flow1 = flowOf("A", "B", "C")
.onEach { delay(400) }
val flow2 = flowOf(1, 2, 3, 4)
.onEach { delay(1000) }
flow1.zip(flow2) { f1, f2 -> "${f1}_${f2}" }
.collect { println(it) }
}
// (1초 후)
// A_1
// (1초 후)
// B_2
// (1초 후)
// C_3
combine은 zip처럼 원소들로 쌍을 형성하기 때문에 첫 번째 쌍을 만들기 위해 느린 Flow를 기다려야 한다. combine을 쓰면 모든 새 원소가 전입자를 대체한다. 첫 번째 쌍이 이미 만들어졌다면 다른 Flow의 이전 원소와 함께 새 쌍이 만들어진다.
zip은 쌍을 필요로 하기 때문에 첫 Flow가 닫히면 함수도 끝난다. combine은 그런 제한이 없어서 두 Flow가 모두 닫힐 때까지 원소를 내보낸다.
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
suspend fun main() {
val flow1 = flowOf("A", "B", "C")
.onEach { delay(400) }
val flow2 = flowOf(1, 2, 3, 4)
.onEach { delay(1000) }
flow1.combine(flow2) { f1, f2 -> "${f1}_${f2}" }
.collect { println(it) }
}
// (1초 후)
// B_1
// (0.2초 후)
// C_1
// (0.8초 후)
// C_2
// (1초 후)
// C_3
// (1초 후)
// C_4
fold, scan
fold는 초기값부터 시작해서 주어진 원소 각각에 대해 두 값을 하나로 합치는 연산을 적용해서 컬렉션의 모든 값을 하나로 합친다. 첫 값은 0이고 덧셈 연산을 할 때 결과값은 모든 수를 더한 값이 된다.
초기값인 0을 먼저 가져오고 첫 원소인 1을 더한다. 결과값인 1에 두 번째 수인 2를 더하고, 결과값인 3에 세 번째 수인 3을 더해서 최종 결과값인 10이 fold로부터 리턴된다.
fun main() {
val list = listOf(1, 2, 3, 4)
val res = list.fold(0) { acc, i -> acc + i }
println(res)
val res2 = list.fold(1) { acc, i -> acc * i }
println(res2)
}
// 10
// 24
fold 대신 scan을 쓸 수도 있다. scan은 누적되는 과정의 모든 값을 생성하는 중간 연산이다.
scan은 이전 단계에서 값을 받은 즉시 새 값을 만들기 때문에 Flow에서 유용하게 쓰인다.
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.scan
suspend fun main() {
flowOf(1, 2, 3, 4)
.onEach { delay(1000) }
.scan(0) { acc, v -> acc + v }
.collect { println(it) }
}
// 0
// (1초 후)
// 1
// (1초 후)
// 3
// (1초 후)
// 6
// (1초 후)
// 10
flatMapConcat, flatMapMerge, flatMapLatest
flatMap은 map과 비슷하지만 변환 함수가 평탄화된 컬렉션을 리턴한다는 게 다르다.
부서 목록이 있고 각 부서가 사원 리스트를 가졌다면 flatMap을 써서 전체 부서의 사원 리스트를 만들 수 있다.
val allEmployees: List<Employee> = departments
.flatMap { department -> department.employees }
// 맵을 쓰면 리스트의 리스트를 얻는다
val listOfListsOfEmployee: List<List<Employee>> = departments
.map { department -> department.employees }
flatMapConcat은 생성된 Flow를 하나씩 처리한다. 그래서 2번째 플로우는 첫 플로우가 완료됐을 때 시작 가능하다.
아래 예에서 각 알파벳이 만드는 Flow는 알파벳과 함께 1~3을 가지며 생성 주기는 1초다.
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
fun flowFrom(elem: String) = flowOf(1, 2, 3)
.onEach { delay(1000) }
.map { "${it}_${elem}" }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapConcat { flowFrom(it) }
.collect { println(it) }
}
// (1초 후)
// 1_A
// (1초 후)
// 2_A
// (1초 후)
// 3_A
// (1초 후)
// 1_B
// (1초 후)
// 2_B
// (1초 후)
// 3_B
// (1초 후)
// 1_C
// (1초 후)
// 2_C
// (1초 후)
// 3_C
flatMapMerge는 만들어진 Flow를 동시에 처리한다.
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
fun flowFrom(elem: String) = flowOf(1, 2, 3)
.onEach { delay(1000) }
.map { "${it}_${elem}" }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapMerge { flowFrom(it) }
.collect { println(it) }
}
// (1초 후)
// 1_A
// 1_B
// 1_C
// (1초 후)
// 2_C
// 2_B
// 2_A
// (1초 후)
// 3_C
// 3_B
// 3_A
concurrency 인자를 써서 동시 처리 가능한 Flow 개수를 설정할 수 있다. 기본값은 16이지만 DEFAULT_CONCURRENCY_PROPERTY_NAME 프로퍼티로 바꿀 수 있다.
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
fun flowFrom(elem: String) = flowOf(1, 2, 3)
.onEach { delay(1000) }
.map { "${it}_${elem}" }
suspend fun main() {
flowOf("A", "B", "C")
.flatMapMerge(concurrency = 2) { flowFrom(it) }
.collect { println(it) }
}
// (1초 후)
// 1_A
// 1_B
// (1초 후)
// 2_C
// 2_B
// (1초 후)
// 3_C
// 3_B
// (1초 후)
// 1_C
// (1초 후)
// 2_C
// (1초 후)
// 3_C
flatMapMerge를 쓰면 2가지 이점이 있다.
- 동시성 인자를 제어하고 같은 시간에 얼마만큼의 종류를 처리할지 결정할 수 있다
- Flow를 리턴해서 데이터가 생성될 때마다 다음 원소를 내보낼 수 있다. 함수 사용 측면에서 보면 데이터를 즉시 처리할 수 있다
suspend fun getOffers(categories: List<Category>): Flow<Offer> = coroutineScope {
categories.map { async { api.requestOffers(it) } }
.flatMap { it.await() }
}
// 더 나은 방법
suspend fun getOffers(categories: List<Category>): Flow<Offer> = categories.asFlow()
.flatMapMerge(concurrency = 20) {
suspend { api.requestOffers(it) }.asFlow()
// 또는 flow { emit(api.requestOffers(it)) }
}
재시도(retry)
예외는 Flow를 따라 흐르면서 각 단계를 하나씩 종료한다. 종료된 단계는 비활성화되기 때문에 예외가 발생한 뒤 메시지 보내기는 불가능하지만 각 단계가 이전 단계에 대한 참조를 갖고 있으며 Flow를 재시작하기 위해 참조를 쓸 수 있다.
이 원리에 기반해서 코틀린은 retry, retryWhen 함수를 제공한다.
retryWhen은 Flow의 이전 단계에서 예외가 발생할 때마다 predicate를 확인한다. predicate는 예외가 무시되고 이전 단계가 재시작돼야 하는지 또는 Flow를 계속 종료해야 하는지를 정한다.
대부분 몇 번까지 재시도할지, 네트워크 연결 예외 같은 특정 예외 클래스가 발생했을 때만 처리할지를 명시한다. 아래는 retry 사용법이다.
suspend fun main() {
flow {
emit(1)
emit(2)
error("E")
emit(3)
}.retry(retries = 3) {
print(it.message)
true
}.collect { print(it) }
}
// 12E12E12E12Exception in thread "main" java.lang.IllegalStateException: E
중복 제거 함수
반복되는 원소가 동일하다고 판단되면 제거하는 distinctUntilChanged 함수도 유용하다. 이 함수는 바로 이전 원소와 동일한 원소만 제거한다.
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
suspend fun main() {
flowOf(1, 2, 2, 3, 2, 1, 1, 3)
.distinctUntilChanged()
.collect { print(it) } // 123213
}
// distinctUntilChanged의 구현
fun <T> Flow<T>.distinctUntilChanged(): Flow<T> = flow {
var previous: Any? = NOT_SET
collect {
if (previous == NOT_SET || previous != it) {
emit(it)
previous = it
}
}
}
private val NOT_SET = Any()
'책 > 코틀린 코루틴' 카테고리의 다른 글
[코틀린 코루틴] 26. 일반적인 사용 예제 (0) | 2024.05.01 |
---|---|
[코틀린 코루틴] 24. 공유 플로우, 상태 플로우 (0) | 2024.04.25 |
[코틀린 코루틴] 22. 플로우 생명주기 함수 (0) | 2024.04.16 |
[코틀린 코루틴] 21. 플로우 만들기 (0) | 2024.04.13 |
[코틀린 코루틴] 19. 플로우란 무엇인가 (0) | 2024.04.13 |