관리 메뉴

나만을 위한 블로그

[Rxjava] 생성 연산자(range/rangeLong, timer, defer) 본문

개인 공부/Rxjava

[Rxjava] 생성 연산자(range/rangeLong, timer, defer)

참깨빵위에참깨빵 2021. 11. 21. 23:07
728x90
반응형

※ 이 포스팅에서 사용하는 Rxjava 버전은 3.x이다. 2.x를 사용할 경우 import 과정에서 오류가 날 수 있는데 2.x 버전에서 사용할 수 있도록 import 구문을 바꿔주기만 하면 문제없이 작동한다.

 

예전에 Rxjava의 생성 연산자 중 create, interval, just에 대해 포스팅했던 적이 있다.

 

https://onlyfor-me-blog.tistory.com/329

 

[Rxjava] 연산자의 종류와 생성 연산자(create, interval, just)

Rxjava에는 수많은 연산자들이 있다. http://reactivex.io/documentation/operators.html ReactiveX - Operators Introduction Each language-specific implementation of ReactiveX implements a set of operator..

onlyfor-me-blog.tistory.com

 

이번 포스팅에선 다른 생성 연산자인 range, rangeLong, timer, defer에 대해 정리하려고 한다.

참고로 이번 포스팅에선 Observable, Flowable 중 아무거나 끌리는 걸로 예제 코드를 올리려고 한다. 둘의 쓰는 경우가 다르긴 하지만 예제 포스팅에서 둘을 엄격하게 구분해 쓰는 건 빈대 잡으려고 초가삼간 불태우는 것 같아서, 그냥 끌리는 거 쓴다.

Observable, Flowable의 차이가 궁금하다면 아래 포스팅을 참고하라.

https://onlyfor-me-blog.tistory.com/344?category=944665 

 

[Rxjava] Observable vs Flowable

Rxjava에는 2가지 타입이 있다. 이전에 포스팅한 Observable과 새로 소개할 Flowable이 그것이다. 그러니 2가지를 비교하기 전에 먼저 Flowable이 무엇인지 확인해보자. Flowable을 사전에 치면 아래의 뜻이

onlyfor-me-blog.tistory.com

 

range, rangeLong

 

range는 "다양성, 범위, A에서 B 사이, 포함하다, ~에 이르다" 정도의 뜻이 있다.

그럼 개발자가 입력한 범위 안에서 데이터를 만들어주는 생성자가 아닐까 생각해볼 수 있다.

아래는 공식 홈페이지에서 설명하는 range 연산자의 내용이다.

 

http://reactivex.io/documentation/operators/range.html

 

ReactiveX - Range operator

RxGroovy implements this operator as range. It accepts as its parameters the start value of the range and the number of items in the range. If you set that number of items to zero, the resulting Observable will emit no values (if you set it to a negative n

reactivex.io

특정 범위의 순차 정수를 방출하는 Observable을 생성한다. range 연산자는 범위의 시작, 길이를 선택하는 순서대로 일련의 정수 범위를 내보낸다

 

조금 더 밑으로 내리면 range() 자체는 1.x 버전부터 존재했는지 Rxjava 1.x 탭을 누르면 별도의 설명이 나온다.

 

Rxjava는 이 연산자를 범위로 구현한다. 범위의 시작 값, 아이템 수를 매개변수로 받는다. 아이템 수를 0으로 설정하면 결과 Observable은 값을 방출하지(emit) 않는다. 음수로 설정하면 범위에서 예외가 발생한다. 범위는 기본적으로 특정 스케줄러에서 작동하지 않지만 매개변수로 전달해서 스케줄러를 설정할 수 있는 변형이 있다

 

Rxjava Javadoc에선 아래와 같이 2종류가 있다고 말한다.

 

  • range(int start, int count) : 지정된 범위 안에서 일련의 정수를 내보내는 Observable을 반환한다
        start - 시퀀스의 첫 번째 정수 값
        count - 생성할 순차 정수의 개수
  • range(int start, int count, Scheduler scheduler) : 지정된 스케줄러에서 지정된 범위 안에서 일련의 정수를 내보내는 Observable을 반환한다
        start - 시퀀스의 첫 번째 정수 값
        count - 생성할 순차 정수의 개수
        scheduler - 생성 반복을 실행할 스케줄러

 

말만 들어선 뭔지 모르겠다. 간단한 예제 코드부터 확인해보자.

 

import io.reactivex.rxjava3.core.Observable;

public class Main {
    public static void main(String[] args) {
        Observable<Integer> observable = Observable.range(10, 3);
        observable.subscribe(System.out::println);
    }
}

 

이 코드는 어떤 결과를 보여줄까? 간단한 예제니까 그림을 보면서 예상해보자.

 

 

range(10, 3) 형태니까 가로로 긴 직사각형 안에서 n은 10이고 m은 3이 될 것이다.

또한 밑의 가로줄에 있는 n, n+1, n+2는 각각 10, 11, 12가 될 것이다. 이런 식으로 n+m-1에 해당하는 숫자를 방출할 때까지 계속되는데, 예제 코드로 생각해보면 10+3-1, 즉 12까지의 정수값을 방출할 것으로 예상된다.

콘솔을 확인해보면 예상대로 10~12까지의 정수값들이 출력되는 걸 볼 수 있다.

 

 

그럼 rangeLong은 뭘까? rangeLong은 range()의 매개변수로 넣어주는 값들이 Long 타입이라 이런 이름이 붙은 것이다.

매개변수의 자료형만 다를 뿐 사용법은 똑같기 때문에 rangeLong에 대한 설명은 생략한다.

 

timer

 

이름부터 시간이 몇 초 남았는지 알려주는 타이머를 연상케 하는 생성자다. 제한된 시간 안에 어떤 행동을 하든지, 일정 시간이 지난 다음에 뭘 하든지 둘 중 하나의 행동을 할 것 같다. 공식 문서를 바로 확인해보자.

 

http://reactivex.io/documentation/operators/timer.html

 

ReactiveX - Timer operator

In RxJS there are two versions of the timer operator. The first version of timer returns an Observable that emits a single item after a delay period you specify. You can specify the delay either as a Date object (which means, delay until that absolute mome

reactivex.io

timer()는 일정 시간 후에 특정 아이템을 발행하는 Observable을 생성한다

 

Rxjava 1.x 버전에선 아래와 같이 설명하고 있다.

 

timer는 지정된 시간 후에 단일 숫자 0을 내보내는 Observable을 반환한다. timer는 기본적으로 계산 스케줄러에서 작동하거나, 스케줄러를 최종 매개변수로 전달해서 이를 무시할 수 있다

 

이제 간단한 timer() 예제 코드도 확인해서 실제로 어떻게 작동하는지 확인해 보자.

 

import io.reactivex.rxjava3.core.Flowable;

import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.TimeUnit;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        // "분:초.밀리초" 문자열로 바꾸는 Formatter 설정
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("mm:ss.SSS");

        // 처리를 시작한 시각 출력
        System.out.println("시작한 시각 : " + LocalTime.now().format(formatter));

        // 1초 뒤 숫자 0을 통지하는 Flowable을 생성한다
        // 1번 인자 : 통지될 때까지 걸리는 시간
        // 2번 인자 : 통지할 때까지 대기 시간 단위(밀리초)
        Flowable<Long> flowable = Flowable.timer(1000L, TimeUnit.MILLISECONDS);

        // 발행되는 데이터를 구독 시작한다
        flowable.subscribe(
                data -> {
                    // 쓰레드 이름을 얻고
                    String threadName = Thread.currentThread().getName();
                    // 현재 시각을 Formatter에 정의한 형태대로 가져온다
                    String time = LocalTime.now().format(formatter);
                    System.out.println(threadName + " : " + time + " : data = " + data);
                    },
                error -> System.out.println("에러 : " + error),
                () -> System.out.println("데이터 발행 후 완료 통지 받음"));
        Thread.sleep(1500L);
    }
}

 

실행하면 1.5초 뒤 0을 발행하고 종료되는 걸 볼 수 있다.

 

defer

 

http://reactivex.io/documentation/operators/defer.html

 

ReactiveX - Defer operator

The Defer operator waits until an observer subscribes to it, and then it generates an Observable, typically with an Observable factory function. It does this afresh for each subscriber, so although each subscriber may think it is subscribing to the same Ob

reactivex.io

Observer가 구독할 때까지 Observable을 생성하지 않고, 각 Observer에 대해 새로운 Observable을 생성한다

defer() 연산자는 Observer가 구독할 때까지 기다린 다음 일반적으로 Observable 팩토리 함수를 써서 Observable을 생성한다. 각 Observer에 대해 이 작업을 수행하므로, 각 Observer는 동일한 Observable을 구독한다고 생각할 수 있지만 실제로 각 Observer는 고유한 개별 시퀀스를 갖는다. 어떤 상황에선 Observable을 생성하기 위해 마지막 순간(=구독 시간)까지 이 Observable이 최신 데이터를 포함하도록 할 수 있다

 

Rxjava 1.x 문단에서 설명하는 내용은 아래와 같다.

 

이 연산자는 선택한 Observable 팩토리 함수를 유일한 매개변수로 사용한다. 이 함수는 매개변수를 사용하지 않고 Observable을 반환한다. defer()는 기본적으로 특정 스케줄러에서 작동하지 않는다.

 

Rxjava Javadoc에서 특별히 설명하는 내용은 없어서 여기는 생략하고, 추가적으로 Rxjava 리액티브 프로그래밍에서 설명하는 내용을 첨부한다.

 

defer()는 구독이 발생할 때마다 함수형 인터페이스로 정의한 새로운 Flowable/Observable을 생성하는 연산자다. 생성한 Flowable/Observable이 통지하는 데이터는 함수형 인터페이스로 생성한 Flowable/Observable의 데이터다. 또한 just()와 달리 defer()는 선언 시점의 데이터를 통지하는 게 아닌 호출 시점에 데이터 생성이 필요하면 사용한다

 

아래는 defer()를 사용한 예제 코드다.

 

import io.reactivex.rxjava3.core.Flowable;

import java.time.LocalTime;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Flowable<LocalTime> flowable = Flowable.defer(() -> Flowable.just(LocalTime.now()));
        flowable.subscribe(System.out::print);
        System.out.println(" : 1번째 구독 완료");
        Thread.sleep(1500L);
        flowable.subscribe(System.out::print);
        System.out.println(" : 2번째 구독 완료");
    }
}

 

defer()의 함수형 인터페이스에서 만든 Flowable의 내용이 Subscriber에 통지된다. 그리고 1.5초 뒤 같은 Flowable을 다시 구독했을 때 데이터로 받은 현재 시간이 다르기 때문에 Flowable이 새로 생성된다는 걸 알 수 있다.

이 때 defer()가 아니라 just()로 Flowable을 생성할 경우, 통지되는 데이터는 두 번 구독하더라도 모두 같은 값이 된다. 아래 예시와 실행 결과로 확인할 수 있다.

 

import io.reactivex.rxjava3.core.Flowable;

import java.time.LocalTime;

public class Main {
    public static void main(String[] args) throws InterruptedException {
        Flowable<LocalTime> flowable = Flowable.just(LocalTime.now());
        flowable.subscribe(System.out::print);
        System.out.println(" : 1번째 구독 완료");
        Thread.sleep(1500L);
        flowable.subscribe(System.out::print);
        System.out.println(" : 2번째 구독 완료");
    }
}

 

실제로 defer를 just로 바꿔 보면 같은 값(시간)을 콘솔에 출력하는 걸 볼 수 있다.

반응형
Comments