일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |
- 스택 자바 코드
- 객체
- 안드로이드 레트로핏 사용법
- 안드로이드 유닛테스트란
- jvm이란
- 2022 플러터 안드로이드 스튜디오
- 안드로이드 레트로핏 crud
- 안드로이드 유닛 테스트
- 안드로이드 라이선스
- 플러터 설치 2022
- ar vr 차이
- 2022 플러터 설치
- android ar 개발
- 서비스 쓰레드 차이
- ANR이란
- 안드로이드 유닛 테스트 예시
- 큐 자바 코드
- Rxjava Observable
- android retrofit login
- 안드로이드 os 구조
- 스택 큐 차이
- 클래스
- rxjava cold observable
- 안드로이드 라이선스 종류
- rxjava disposable
- 멤버변수
- 서비스 vs 쓰레드
- rxjava hot observable
- jvm 작동 원리
- 자바 다형성
- Today
- Total
나만을 위한 블로그
[Rxjava] 연산자의 종류와 생성 연산자(create, interval, just) 본문
Rxjava에는 수많은 연산자들이 있다.
http://reactivex.io/documentation/operators.html
공식 홈페이지에 나와 있는 연산자들만 세봐도 양이 많은 걸 알 수 있다.
그래서 연산자를 어떤 기준으로 나눌 수 있는지 확인하고, 나눠진 기준 별로 어떤 연산자들이 있는지 알아보고자 한다.
먼저 공식 홈페이지 기준으로 연산자는 아래의 카테고리들로 분류된다.
- 생성 연산자
- 변환 연산자
- 필터 연산자
- 결합 연산자
- 조건 연산자
- 에러 처리 연산자
- 기타 연산자
이것들을 하나의 글에 다 담기에는 양이 너무 많으니 생성 연산자부터 기록하려고 한다.
먼저 생성 연산자는 뭘 하는 연산자일까?
새로운 Observable을 생성하는 연산자
공식 홈페이지에선 위와 같이 설명하고 있다. 이 생성 연산자의 종류는 아래와 같다.
- create
- defer
- empty / never / throw
- from
- interval
- just
- range
- repeat
- start
- timer
이 중에서 몇 개만 확인해보려고 한다. 먼저 create()는 observer 메서드를 프로그래밍 방식으로 호출해 처음부터 Observable을 생성하는 연산자라고 설명되어 있다. create()의 예제는 아래와 같다. 두 문자열을 발행하는 Observable을 생성하는 코드다.
import io.reactivex.rxjava3.core.Observable;
public class CreateOperator
{
public static void main(String[] args)
{
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("Hello");
emitter.onNext("Rxjava!");
emitter.onComplete();
});
source.subscribe(System.out::println);
}
}
위 코드를 실행한 결과는 아래와 같다.
emitter라는 걸 통해 hello와 Rxjava! 라는 문자열을 발행하는 걸 볼 수 있다. 그리고 이 source란 이름의 Observable을 구독하기 위해 subscribe()를 호출하고 그 안에 출력 메서드를 넣어 결과를 출력시킨다.
아이템의 발행이 끝나면 onComplete()를 호출해야 한다. 이 메서드가 호출된 이후 아이템이 추가로 발행되더라도 구독자는 추가 발행되는 데이터를 받을 수 없다.
오류가 발생한 경우 onError()를 호출해서 해당 에러를 처리해야 한다.
import io.reactivex.rxjava3.core.Observable;
public class CreateOperator
{
public static void main(String[] args)
{
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("Hello");
emitter.onError(new Throwable());
emitter.onNext("Rxjava!");
emitter.onComplete();
});
source.subscribe(System.out::println,
throwable -> System.out.println("에러로 인해 종료되었습니다"));
}
}
onError() 이후로는 onComplete()와 같이 추가로 발행한 데이터는 구독자가 받지 못하는 걸 볼 수 있다.
just() 연산자는 onNext()와 onComplete()를 쓰지 않고도 데이터 발행이 가능하다.
import io.reactivex.rxjava3.core.Observable;
public class JustOperator
{
public static void main(String[] args)
{
Observable<String> source = Observable.just("Hello", "Rxjava!");
source.subscribe(System.out::println);
}
}
create() 연산자는 onNext()를 통해 결과값을 방출하지만 just() 연산자는 곧바로 데이터들을 방출한다. 방출하고 나면 옵저버의 역할은 끝난다.
just()를 쓸 때 주의할 건 구독자가 구독할 때 just()의 호출이 끝난 시점까지 쓰레드가 더 이상 진행되지 않고 pending된다는 것이다. 때문에 just()를 쓸 때 무거운 로직을 넣게 되면 성능상 좋지 않다고 한다. 자세한 내용은 아래 링크 참고.
https://softwaree.tistory.com/36
interval()은 일정 시간 간격으로 정수 데이터 흐름을 만들어내는 연산자다. 구독한 시간을 기준으로 그 시간부터 주어진 시간 간격으로 0부터 1씩 증가하는 Long 객체를 발행한다.
import io.reactivex.rxjava3.core.Observable;
import java.util.concurrent.TimeUnit;
public class IntervalOperator
{
public static void main(String[] args) throws InterruptedException
{
Observable<Long> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
.map(data -> (data + 1) * 100)
.take(5);
source.subscribe(System.out::println);
Thread.sleep(1000);
}
}
100부터 500까지 5개의 숫자들을 1초가 지난 다음 한 번에 출력한다.
여기서 잠깐 이전 글에서 다뤘던 Hot Observable 예제를 확인하고 가자.
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;
import java.util.concurrent.TimeUnit;
public class HotObservableMain
{
public static void main(String[] args) throws InterruptedException
{
hotObservableTest();
}
private static void hotObservableTest() throws InterruptedException
{
Observable observable = Observable.interval(1, TimeUnit.SECONDS);
ConnectableObservable connectableObservable = observable.publish();
connectableObservable.subscribe(item -> System.out.println("Observer 1 : " + item));
connectableObservable.connect();
Thread.sleep(3000);
connectableObservable.subscribe(item -> System.out.println("Observer 2 : " + item));
Thread.sleep(5000);
}
}
코드를 보면 sleep()을 써서 코드 진행을 지연시키는 걸 볼 수 있다. sleep()은 왜 쓰는 걸까?
왜냐면 interval()은 메인 쓰레드에서 실행되는 게 아니라 워커 쓰레드에서 실행되기 때문에, interval()로 만든 Observable이 워커 쓰레드에서 작동하면 메인 쓰레드는 자기가 할 일이 없는 줄 알고 프로그램을 종료시켜버린다. 그래서 메인 쓰레드가 프로그램을 종료시키지 않도록 sleep()을 거는 것이다.
참고로 위 코드에서 System.out::println이라 쓰인 부분이 있는데, 이것은 println()을 람다식으로 표현한 것이다. 이렇게도 쓸 수 있으니 참고하자.
참고한 링크)
https://choheeis.github.io/rxjava/2020/03/09/RxJavaOperatorUpgrade.html
https://blog.yena.io/studynote/2020/10/23/Android-RxJava(2).html
'개인 공부 > Rxjava' 카테고리의 다른 글
[Rxjava] Single, Maybe, Completable이란? (0) | 2021.11.17 |
---|---|
[Rxjava] Observable vs Flowable (0) | 2021.06.21 |
[Rxjava] Observable이란? - 2 - (0) | 2021.05.26 |
[Rxjava] Observable이란? - 1 - (0) | 2021.05.26 |
[Rxjava] 인텔리제이에서 Rxjava 사용하기 (0) | 2021.05.25 |