RxJava의 다양한 연산자를 알고 가기 전, 꼭 알아야 하는 개념이 있다고 한다. 바로 Hot Observable
과 Cold Observable
이다.
Cold Observable
데이터 소스에서 발행되는 Data들은 구독자가 구독을 하고 나서부터 발행을 시작한다. 즉, 구독자는 데이터 소스의 Data를 모두 받음을 보장한다.
사실 Cold Observable은 이전 포스팅에서 다루었던 Observable, Flowable, Single 등 거의 대부분 해당된다. 하지만 이번 포스팅에서 '데이터를 모두 받음을 보장한다는 것이 뭘까'에 대해 알아보자.
String[] strings = new String[]{"딸기", "소스", "치킨"};
Flowable<String> source = Flowable.fromArray(strings).subscribeOn(Schedulers.computation()).observeOn(Schedulers.single());
//첫번째 구독자
source.subscribe(next -> {
System.out.println(Thread.currentThread().getName() + " Subscriber # 1: " + next);
});
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
//두번째 구독자
source.subscribe(next -> {
System.out.println(Thread.currentThread().getName() + " Subscriber # 2: " + next);
});
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
//결과
RxSingleScheduler-1 Subscriber # 1: 딸기
RxSingleScheduler-1 Subscriber # 1: 소스
RxSingleScheduler-1 Subscriber # 1: 치킨
RxSingleScheduler-1 Subscriber # 2: 딸기
RxSingleScheduler-1 Subscriber # 2: 소스
RxSingleScheduler-1 Subscriber # 2: 치킨
- Flowable
- RxJava 2.x에서 추가된 1..N Data flow 흐름을 명시하는 클래스. fromArray로 배열을 받아 Flowable을 생성하였다. 데이터 흐름에 따라 onNext(), onComplete(), onError() 이벤트를 발생한다.
- Subscriber
- 데이터 소스에 subcribe 함수를 호출하여 데이터 소스에서 발생하는 이벤트를 구독한다. 매개변수가 하나 일 경우 onNext 이벤트 발생에 대한 명시이다.
2개의 subscribe()는 Thread.sleep()
으로 인해 다른 시간에 구독이 된다. 데이터의 발행은 현재 지연이 없으므로 순식간에 발행되는 데 1초가 지나고도 두 번째 구독자는 모든 데이터를 이벤트로 받는다.
Hot Observable
구독자의 유무 상관없이 데이터 생성되는 시점에 Data를 발행한다. 그래서 구독자들은 Observable에 대해 모든 Data를 받는 것을 보장받지 못한다.
BaseClasses 들은 모두 Cold Observable이므로 Hot Observable로 변경해야 한다. 그 방법으로는 2가지가 있다.
Subjects
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
/**
* Returns true if the subject has any Observers.
* <p>The method is thread-safe.
* @return true if the subject has any Observers
*/
Subject의 특징은 데이터의 발행과 구독이 모두 가능하다는 점이다. 그 이유는 Observable
과 Observer
를 모두 구현했기 때문이다.
Subject는 클래스 그대로 쓰이지 않고 Subject를 구현한 자식 클래스들을 사용한다. 하나씩 알아보자.
- AsyncSubject
데이터 소스에서 onComplete 호출 이전 마지막 Data만 가져온다.
AsyncSubject<String> subject = AsyncSubject.create();
subject.subscribe(data ->
System.out.println(Thread.currentThread().getName() + " Subscriber # 1 = > " + data));
subject.onNext("1");
subject.onNext("3");
subject.subscribe(data ->
System.out.println(Thread.currentThread().getName() + " Subscriber # 2 = > " + data));
subject.onNext("5");
subject.onComplete();
//아래 Data는 받을 수 없다.
subject.onNext("8");
//결과
main Subscriber # 1 = > 5
main Subscriber # 2 = > 5
- 데이터 소스는 총 3개의 Data를 발행한다. 하지만
AsyncSubject
는 onComplete가 호출되기 전 마지막 Data인 "5"만 이벤트로 받는다. onComplete 호출이 되면 구독자의 dispose() 함수가 호출되면서 연결이 끊킨다. - BehaviorSubject 첫 번째 구독자는 이전 Data 발행이 없으므로
createDefault
로 설정한 기본값 "6" Data를 받는다. 두 번째 구독자는 구독하는 시점의 가장 최근 발행된 "3" Data를 받는다.
BehaviorSubject<String> subject =
BehaviorSubject.createDefault("6"); //구독 이전 데이터 발행이 없으므로 기본값 6을 받음
subject.subscribe(data ->
System.out.println(Thread.currentThread().getName() + " Subscriber # 1 = > " + data));
subject.onNext("1");
subject.onNext("3");
subject.subscribe(data ->
System.out.println(Thread.currentThread().getName() + " Subscriber # 2 = > " + data));
subject.onNext("5");
subject.onComplete();
//결과
main Subscriber # 1 = > 6
main Subscriber # 1 = > 1
main Subscriber # 1 = > 3
main Subscriber # 2 = > 3
main Subscriber # 1 = > 5
main Subscriber # 2 = > 5
- 구독한 시점의 가장 최근의 값 혹은 기본 값 Data부터 받아온다.
- PublishSubject두 번째 구독자의 구독 시점에는 Data "1", "3"은 이미 발행된 후로 두번째 구독자는 "5" Data만 받게 된다.
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(data ->
System.out.println(Thread.currentThread().getName() + " Subscriber # 1 = > " + data));
subject.onNext("1");
subject.onNext("3");
subject.subscribe(data ->
System.out.println(Thread.currentThread().getName() + " Subscriber # 2 = > " + data));
subject.onNext("5");
subject.onComplete();
//결과
main Subscriber # 1 = > 1
main Subscriber # 1 = > 3
main Subscriber # 1 = > 5
main Subscriber # 2 = > 5
- 가장 평범한 Subject로 구독한 시점부터 발행된 데이터를 구독한다.
- ReplaySubjectHot Observable의 특성과 맞게 Data 생성고 동시에 발행되지만 모든 구독자에게 모든 Data 발행을 보장한다. 따라서 Backpressure 관리가 필요하다.
- 이 Subject는 특별하게도 Cold Observable과 비슷하게 구독 시점과 상관없이 모든 Data를 받는다.
ReplaySubject<String> subject = ReplaySubject.create();
subject.subscribe(data ->
System.out.println(Thread.currentThread().getName() + " Subscriber # 1 = > " + data));
subject.onNext("1");
subject.onNext("3");
subject.subscribe(data ->
System.out.println(Thread.currentThread().getName() + " Subscriber # 2 = > " + data));
subject.onNext("5");
subject.onComplete();
//결과
main Subscriber # 1 = > 1
main Subscriber # 1 = > 3
main Subscriber # 2 = > 1
main Subscriber # 2 = > 3
main Subscriber # 1 = > 5
main Subscriber # 2 = > 5
두 번째 구독자 이전에 "1", "3"의 Data를 발행했음에도 Cold Observable과 동일하게 모든 Data를 받는다.
ConnectableObservable
Cold Observable을 Hot Observable로 만드는 두 번째 방법이다.
String[] dt = {"1", "2", "3"};
Observable<String> balls = Observable.interval(100L, TimeUnit.MILLISECONDS) //Observable<Long>
.map(Long::intValue)
.map(i -> dt[i])
.take(dt.length);
ConnectableObservable<String> source = balls.publish();
source.subscribe(data -> System.out.println(Thread.currentThread().getName() + " Subscriber # 1 = > " + data));
source.subscribe(data -> System.out.println(Thread.currentThread().getName() + " Subscriber # 2 = > " + data));
source.connect();//데이터 발행 시작
try {
Thread.sleep(250L);
source.subscribe(data -> System.out.println(Thread.currentThread().getName() + " Subscriber # 3 = > " + data));
Thread.sleep(100L);
}catch (InterruptedException e) {
e.printStackTrace();
}
//결과
RxComputationThreadPool-1 Subscriber # 1 = > 1
RxComputationThreadPool-1 Subscriber # 2 = > 1
RxComputationThreadPool-1 Subscriber # 1 = > 2
RxComputationThreadPool-1 Subscriber # 2 = > 2
RxComputationThreadPool-1 Subscriber # 1 = > 3
RxComputationThreadPool-1 Subscriber # 2 = > 3
RxComputationThreadPool-1 Subscriber # 3 = > 3
코드가 제일 복잡한 것 같다. 하나씩 보자.
- Observable.interval(long period, TimeUnit..)
- Observable을 생성하는 interval 연산자로 period 만큼 지연되면서 Data를 발행한다. 발행되는 Data는 0,1,2,3,4...이다.
- map
- 위 interval 연산자로 생성되는 Observable은 Long형이므로 dt Data의 인덱스를 의미하기 위해 Integer로 변환한다.
- take(int size)
- Observable의 최대 Data 발행 수를 size 수만큼 제한한다.
- ConnectableObservable
- Hot Observable로 Observable의 publish() 호출로 생성된다. 데이터의 발행은 connect함수 호출 후로 이뤄지는 특징이 있다.
첫 번째와 두 번째 구독자가 구독을 시작하는 시점에는 ConnectableObservable의 connect가 호출되지 않았다. 따라서 Data가 아직 발행되지 않았으므로 첫 번째와 두 번째 구독자는 모든 Data를 받는다.
connect() 호출 이후, 데이터 발행이 시작된다. interval 연산자로 인해 데이터 발행은 매번 100L 0.1초
지연되고 나서 발행된다.
세 번째 구독자는 connect() 호출 이후, Thread.sleep 함수로 250L 0.25초
지연된다. 데이터 발행의 지연과 비교했을 때 두 번의 Data가 이미 발행됐을 시간이다. 결과를 봐도 세 번째 Data만 받는 것을 알 수 있다.
정리하며
실제로 많이 사용되는 경우는 Cold Observable이다. 앱을 예시로 들면 네트워크 비동기 통신을 이해할 수 있다. 데이터를 요청하는 앱 즉 구독자는 서버로부터 모든 Data를 받는 것을 보장받아야 하기 때문이다. 하지만 모든 Data를 받는다는 것은 Data를 어딘가, 버퍼에 저장해두는 것으로 OutOfMemory를 주의해야 한다. 따라서 적은 Data 흐름을 생성할 때, 주로 사용한다.
Hot Observable도 필요한 곳이 있다. 예시를 들면 클릭 이벤트 등이다. 안드로이드 개발을 해보면 사용자가 버튼을 순간적으로 여러 번 눌렀을 때 한 번만 요청하도록 고민한다. 버튼의 enable을 조작하여 위의 문제 등을 해결하기도 했다. RxJava의 Hot Observable을 이용하면 쉽게 해결이 가능하다.
참고
- RxJava 프로그래밍
'Language > RxJava' 카테고리의 다른 글
[RxJava] 여러개 Completable 결합 - andThen (0) | 2021.05.06 |
---|---|
[RxJava] Operators - 변환 (0) | 2021.01.24 |
[RxJava] Operators - 생성 (0) | 2021.01.24 |
[RxJava] BaseClasses (0) | 2020.10.26 |
Hello RxJava, Rx는 왜 사용할까? (3) | 2020.10.17 |