관리 메뉴

나만을 위한 블로그

[Rxjava] Disposable이란? CompositeDisposable이란? 본문

개인 공부/Rxjava

[Rxjava] Disposable이란? CompositeDisposable이란?

참깨빵위에참깨빵 2021. 11. 21. 01:09
728x90
반응형

Rxjava는 데이터를 발행하는 생산자, 발행된 데이터를 받아 처리하는 소비자로 나눠진 형태로 구성된 Reactive Streams를 바탕으로 하는 라이브러리다. 또한 옵저버 패턴을 확장해서 관찰 대상 객체의 상태가 변하면 이걸 관찰하는 객체에 알려주는데, 결국은 누가 누군가를 구독해야 한다.

그럼 구독 해제는 어떻게 할까? 관심없어지면 당연히 구독 해제도 할 수 있어야 하는데 이걸 위해선 어떤 걸 사용하면 좋을까? 이걸 도와주는 것이 Disposable이다.

그 전에 왜 Disposable이란 이름이 붙었을까? 이 단어의 사전적 정의는 아래와 같다.

 

사용 후 버리게 되어 있는, 일회용의 / 이용 가능한

 

이 Disposable에 대한 설명을 찾아봤는데 공식 홈페이지로 보이는 Rxjava Javadoc에서는 Disposable을 매우 간결하게 설명하고 있다.

 

http://reactivex.io/RxJava/javadoc/io/reactivex/disposables/Disposable.html

 

Disposable (RxJava Javadoc 2.2.21)

dispose void dispose() Dispose the resource, the operation should be idempotent.

reactivex.io

1회용 리소스를 나타낸다

 

이 페이지를 더 확인해 보니 Disposable은 인터페이스로 정의되어 있고 2개의 메서드가 정의되어 있다. 각 메서드의 원형과 설명은 아래와 같다.

 

  • void dispose() : 리소스를 삭제한다. 작업은 멱등원이어야 한다
  • boolean isDisposed() : 이 리소스가 삭제된 경우 true를 리턴한다

 

딱히 뭐 없다. 그래서 바로 다른 사람들은 어떻게 설명하는지 찾아봤다.

 

https://blog.yena.io/studynote/2020/12/06/Android-RxJava(4).html 

 

[Android] RxJava Disposable

RxJava 1 포스트 [Android] RxJava 시작하기 RxJava 2 포스트 [Android] RxJava Observable 옵저버블 RxJava 3 포스트 [Android] RxJava Cold Observable, Hot Observable Disposable 이전 포스트들에서 Observable 객체에서 발행할 아이

blog.yena.io

만약 Observable이 발행하는 아이템 개수가 정해져 있다면 모두 발행된 후 onComplete()가 호출되고 안전하게 종료될 것이다. 하지만 아이템을 무한 발행하거나 오래 실행되는 Observable의 경우, 제대로 종료하지 않으면 메모리 누수가 발생할 수 있다. 더 이상 Observable의 구독이 필요없을 때는 이를 폐기(dispose)하는 게 효율적이다. Disposable.dispose()를 호출해 언제든 아이템 발행을 중단할 수 있다.

 

https://www.raywenderlich.com/3983802-working-with-rxjava-disposables-in-kotlin

 

Working with RxJava Disposables in Kotlin

In this tutorial, you’ll learn the basics of RxJava Disposables. You will be building an app from scratch that retrieves real-time cryptocurrency data.

www.raywenderlich.com

Disposable은 단기적인 편의(short-term convenience)를 의미한다. 또한 수명이 짧거나 사용 후 폐기돼야 함을 의미한다. 같은 아이디어가 Rxjava의 Disposables에서 전달된다. Observer가 Emitter 또는 Observable을 구독하면 스트림을 생성한다. 이 스트림은 나중에 1회용 고체 폐기물(solid waste)이 되는 자원을 차지한다. 스트림이 오래 실행될 경우 이 낭비를 처리해야 한다. Observable에는 호출될 때 처리를 수행하는 onComplete()가 있다. 하지만 많은 경우 구독을 쉽고 빠르게 취소할 수 있는 기능이 있는 게 더 편리하다. 스트림은 끝없이 실행되며 메모리 누수를 방지하기 위해 Disposable을 써서 스트림을 처리한다. Disposable은 Observable, Observer 사이의 링크다.

 

번외로 이 Disposable의 사용은 안드로이드의 경우 onDestroy()에서 수행하는 사람도 있다고 한다. onSuccess() 또는 onNext()에서 하지 않는 이유는 여러 번 방출되는 경우도 있고 합리적이지 않기 때문이라고 한다. 자세한 건 아래 링크 참고.

 

https://stackoverflow.com/questions/57246068/rxjava-is-it-correct-to-clear-disposable-in-subscribe

 

RxJava, Is it correct to clear disposable in subscribe?

Usually, I call "disposable.clear()" on "onStop/onDestroy" function. But today, I saw that my co-worker using the below code that calling the clear function in the subscribe function like: val

stackoverflow.com

 

이제 간단하게 Disposable을 어떻게 사용하는지 확인해보자.

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;

import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
        Disposable disposable = observable.subscribe(System.out::println);

        new Thread(() -> {
            try {
                Thread.sleep(2500);
            }   catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("리소스가 폐기됐는가? : " + disposable.isDisposed());
            disposable.dispose();
            System.out.println("disposable.dispose() 호출");
            System.out.println("리소스가 폐기됐는가? : " + disposable.isDisposed());
        }).start();
    }
}

 

interval()은 일정 시간마다 아이템을 발행시키는 생성 연산자다. 그래서 1초마다 아이템 1개가 발행된다.

그리고 구독 해제를 수행할 Disposable을 선언 후 Observable과 매핑시킨다. Observable은 println의 영향으로 1초마다 발행되는 하나의 아이템을 콘솔에 출력하게 된다.

이것은 2.5초 동안 이뤄지기 때문에 2초 동안 0, 1이라는 2개의 값만 내보낸 뒤 try-catch 이후의 코드들이 실행된다.

dispose() 호출 이전에는 아직 호출되지 않았기 때문에 false, 그 이후에는 구독이 해제됐을 테니 true가 나올 것이다.

실제로 위 코드를 실행하면 아래의 화면이 나온다.

 

 

Rxjava 2부터 Observer는 언제든 Disposable을 호출해 구독을 해제시킬 수 있다. onNext(), onError(), onComplete()에서 데이터 방출이 더 이상 필요하지 않다면 언제든 구독을 해제해서 데이터를 안 받을 수 있단 뜻이다.

 

CompositeDisposable이란?

 

이제 Disposable 사용법은 알겠고, 나머진 응용해서 사용해보면 될 것 같다.

그러나 구독자가 하나가 아니라 여러 경우라면 어떻게 해야 할까? Disposable을 일대일 대응되게 만들어서 처리해야 하나? 개발자들이 이딴 귀찮은 작업을 절대로 좋아할 리가 없다. 이 때를 위해 Rxjava 개발자들이 만들어 둔 것이 CompositeDisposable이란 것이다. Composite의 사전적 정의는 아래와 같다.

 

합성의, 합성물

 

대충 예상해 보면 Disposable들이 여러 개 들어 있어서 합성된 것 같아 이런 이름을 붙인 게 아닐까싶다.

CompositeDisposable의 사용법과 실행 결과는 아래와 같다.

 

import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.CompositeDisposable;
import io.reactivex.rxjava3.disposables.Disposable;

import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) {
        Observable<Long> observable = Observable.interval(1, TimeUnit.SECONDS);
        Disposable disposable = observable.subscribe(System.out::println);
        Disposable disposable2 = observable.subscribe(System.out::println);
        Disposable disposable3 = observable.subscribe(System.out::println);
        Disposable disposable4 = observable.subscribe(System.out::println);
        Disposable disposable5 = observable.subscribe(System.out::println);
        CompositeDisposable compositeDisposable = new CompositeDisposable();
        compositeDisposable.add(disposable);
        compositeDisposable.add(disposable2);
        compositeDisposable.add(disposable3);
        compositeDisposable.addAll(disposable4, disposable5);

        new Thread(() -> {
            try {
                Thread.sleep(2500);
            }   catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("리소스가 폐기됐는가? : " + compositeDisposable.isDisposed());
            System.out.println("리소스가 폐기됐는가? : " + disposable.isDisposed());
            System.out.println("리소스가 폐기됐는가? : " + disposable2.isDisposed());
            System.out.println("리소스가 폐기됐는가? : " + disposable3.isDisposed());
            System.out.println("리소스가 폐기됐는가? : " + disposable4.isDisposed());
            System.out.println("리소스가 폐기됐는가? : " + disposable5.isDisposed());
            compositeDisposable.dispose();
            System.out.println("compositeDisposable.dispose() 호출");
            System.out.println("리소스가 폐기됐는가? : " + compositeDisposable.isDisposed());
            System.out.println("리소스가 폐기됐는가? : " + disposable.isDisposed());
            System.out.println("리소스가 폐기됐는가? : " + disposable2.isDisposed());
            System.out.println("리소스가 폐기됐는가? : " + disposable3.isDisposed());
            System.out.println("리소스가 폐기됐는가? : " + disposable4.isDisposed());
            System.out.println("리소스가 폐기됐는가? : " + disposable5.isDisposed());
        }).start();
    }
}

 

add()로 disposable 객체들을 낱개로 담아도 되고, addAll()로 여러 개를 한 번에 담아도 되는 걸 확인할 수 있다.

이렇게 여러 개가 담긴 compositeDisposable 객체를 통해 dispose()를 호출하고 그 전, 그 후로 isDisposed()를 걸어보면 정상 작동하는 중이란 것 또한 확인할 수 있다.

 

안드로이드라면 이걸 어떻게 사용할 수 있을까? 레트로핏과 Rxjava를 같이 사용해서 서버 통신을 한 화면에서 여러 번 수행하는 경우, 사용자가 화면 이탈 시 한꺼번에 구독 해제시키기 위해 CompositeDisposable에 넣고 onDestroy() 쯤에서 dispose()를 호출하는 방법도 있겠다.

반응형
Comments