관리 메뉴

나만을 위한 블로그

[Rxjava] 연산자의 종류와 생성 연산자(create, interval, just) 본문

개인 공부/Rxjava

[Rxjava] 연산자의 종류와 생성 연산자(create, interval, just)

참깨빵위에참깨빵 2021. 5. 26. 17:56
728x90
반응형

Rxjava에는 수많은 연산자들이 있다.

 

http://reactivex.io/documentation/operators.html

 

ReactiveX - Operators

Introduction Each language-specific implementation of ReactiveX implements a set of operators. Although there is much overlap between implementations, there are also some operators that are only implemented in certain implementations. Also, each implementa

reactivex.io

공식 홈페이지에 나와 있는 연산자들만 세봐도 양이 많은 걸 알 수 있다.

그래서 연산자를 어떤 기준으로 나눌 수 있는지 확인하고, 나눠진 기준 별로 어떤 연산자들이 있는지 알아보고자 한다.

 

먼저 공식 홈페이지 기준으로 연산자는 아래의 카테고리들로 분류된다.

  • 생성 연산자
  • 변환 연산자
  • 필터 연산자
  • 결합 연산자
  • 조건 연산자
  • 에러 처리 연산자
  • 기타 연산자

이것들을 하나의 글에 다 담기에는 양이 너무 많으니 생성 연산자부터 기록하려고 한다.

먼저 생성 연산자는 뭘 하는 연산자일까?

새로운 Observable을 생성하는 연산자

공식 홈페이지에선 위와 같이 설명하고 있다. 이 생성 연산자의 종류는 아래와 같다.

  • create
  • defer
  • empty / never / throw
  • from
  • interval
  • just
  • range
  • repeat
  • start
  • timer

이 중에서 몇 개만 확인해보려고 한다. 먼저 create()는 observer 메서드를 프로그래밍 방식으로 호출해 처음부터 Observable을 생성하는 연산자라고 설명되어 있다. create()의 예제는 아래와 같다. 두 문자열을 발행하는 Observable을 생성하는 코드다.

import io.reactivex.rxjava3.core.Observable;

public class CreateOperator
{
    public static void main(String[] args)
    {
        Observable<String> source = Observable.create(emitter -> {
            emitter.onNext("Hello");
            emitter.onNext("Rxjava!");
            emitter.onComplete();
        });

        source.subscribe(System.out::println);
    }
}

위 코드를 실행한 결과는 아래와 같다.

 

emitter라는 걸 통해 hello와 Rxjava! 라는 문자열을 발행하는 걸 볼 수 있다. 그리고 이 source란 이름의 Observable을 구독하기 위해 subscribe()를 호출하고 그 안에 출력 메서드를 넣어 결과를 출력시킨다.

아이템의 발행이 끝나면 onComplete()를 호출해야 한다. 이 메서드가 호출된 이후 아이템이 추가로 발행되더라도 구독자는 추가 발행되는 데이터를 받을 수 없다.

오류가 발생한 경우 onError()를 호출해서 해당 에러를 처리해야 한다.

import io.reactivex.rxjava3.core.Observable;

public class CreateOperator
{
    public static void main(String[] args)
    {
        Observable<String> source = Observable.create(emitter -> {
            emitter.onNext("Hello");
            emitter.onError(new Throwable());
            emitter.onNext("Rxjava!");
            emitter.onComplete();
        });

        source.subscribe(System.out::println,
                throwable -> System.out.println("에러로 인해 종료되었습니다"));
    }
}

 

onError() 이후로는 onComplete()와 같이 추가로 발행한 데이터는 구독자가 받지 못하는 걸 볼 수 있다.

 

just() 연산자는 onNext()와 onComplete()를 쓰지 않고도 데이터 발행이 가능하다.

import io.reactivex.rxjava3.core.Observable;

public class JustOperator
{
    public static void main(String[] args)
    {
        Observable<String> source = Observable.just("Hello", "Rxjava!");
        source.subscribe(System.out::println);
    }
}

 

create() 연산자는 onNext()를 통해 결과값을 방출하지만 just() 연산자는 곧바로 데이터들을 방출한다. 방출하고 나면 옵저버의 역할은 끝난다.

just()를 쓸 때 주의할 건 구독자가 구독할 때 just()의 호출이 끝난 시점까지 쓰레드가 더 이상 진행되지 않고 pending된다는 것이다. 때문에 just()를 쓸 때 무거운 로직을 넣게 되면 성능상 좋지 않다고 한다. 자세한 내용은 아래 링크 참고.

https://softwaree.tistory.com/36

 

[RxJava] create, just, defer와 fromCallable 차이점 & 샘플코드

처음 RxJava에 입문하게 되면 많은 연산자로 인한 혼란의 연속일 것입니다. 그중에 가장 헷갈리는 것이 Observable를 생성하는 연산자인데 이 포스팅에서는 Observable를 기준으로 create, just, defer, fro

softwaree.tistory.com

 

interval()은 일정 시간 간격으로 정수 데이터 흐름을 만들어내는 연산자다. 구독한 시간을 기준으로 그 시간부터 주어진 시간 간격으로 0부터 1씩 증가하는 Long 객체를 발행한다.

import io.reactivex.rxjava3.core.Observable;

import java.util.concurrent.TimeUnit;

public class IntervalOperator
{
    public static void main(String[] args) throws InterruptedException
    {
        Observable<Long> source = Observable.interval(100L, TimeUnit.MILLISECONDS)
                .map(data -> (data + 1) * 100)
                .take(5);
        source.subscribe(System.out::println);
        Thread.sleep(1000);
    }
}

 

100부터 500까지 5개의 숫자들을 1초가 지난 다음 한 번에 출력한다.

여기서 잠깐 이전 글에서 다뤘던 Hot Observable 예제를 확인하고 가자.

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.observables.ConnectableObservable;

import java.util.concurrent.TimeUnit;

public class HotObservableMain
{
    public static void main(String[] args) throws InterruptedException
    {
        hotObservableTest();
    }

    private static void hotObservableTest() throws InterruptedException
    {
        Observable observable = Observable.interval(1, TimeUnit.SECONDS);
        ConnectableObservable connectableObservable = observable.publish();

        connectableObservable.subscribe(item -> System.out.println("Observer 1 : " + item));
        connectableObservable.connect();
        Thread.sleep(3000);

        connectableObservable.subscribe(item -> System.out.println("Observer 2 : " + item));
        Thread.sleep(5000);
    }
}

코드를 보면 sleep()을 써서 코드 진행을 지연시키는 걸 볼 수 있다. sleep()은 왜 쓰는 걸까?

왜냐면 interval()은 메인 쓰레드에서 실행되는 게 아니라 워커 쓰레드에서 실행되기 때문에, interval()로 만든 Observable이 워커 쓰레드에서 작동하면 메인 쓰레드는 자기가 할 일이 없는 줄 알고 프로그램을 종료시켜버린다. 그래서 메인 쓰레드가 프로그램을 종료시키지 않도록 sleep()을 거는 것이다.

 

참고로 위 코드에서 System.out::println이라 쓰인 부분이 있는데, 이것은 println()을 람다식으로 표현한 것이다. 이렇게도 쓸 수 있으니 참고하자.

 

참고한 링크)

https://choheeis.github.io/rxjava/2020/03/09/RxJavaOperatorUpgrade.html

 

[RxJava] 🏈 RxJava Operator 심화

chohee's develop & coding diary

choheeis.github.io

https://blog.yena.io/studynote/2020/10/23/Android-RxJava(2).html 

 

[Android] RxJava Observable 옵저버블

지난 포스트 [Android] RxJava 시작하기에서는 반응형 프로그래밍에 대한 개념을 설명하고 명령형 프로그램과의 차이를 서술했다. 이번 포스트에서는 Observable이 어떻게 동작하는지 알아보자. 그리

blog.yena.io

https://taeiim.tistory.com/entry/RxJava2-4-%EB%A6%AC%EC%95%A1%ED%8B%B0%EB%B8%8C-%EC%97%B0%EC%82%B0%EC%9E%90-%ED%99%9C%EC%9A%A9-%EC%83%9D%EC%84%B1-%EB%B3%80%ED%99%98-%EA%B2%B0%ED%95%A9-%EC%A1%B0%EA%B1%B4-%EC%97%B0%EC%82%B0%EC%9E%90

 

[RxJava2] #4. 리액티브 연산자 - 활용 (생성, 변환, 결합, 조건 연산자)

4. 리액티브 연산자 - 활용 4.1 연산자 분류 연산자 종류가 많아 카테고리 별로 나눔 (ReactiveX 홈페이지 기준) 연산자 종류 연산자 함수 생성 연산자 just(), fromXXX(), create(), interval(), range(), timer(..

taeiim.tistory.com

 

반응형
Comments