ReativeX는 Data를 스트림 형식으로 전달하면서 여러 이벤트를 발생하고 관찰자는 해당 이벤트일 때 어떤 행동을 할 것인지 정의할 수 있다. 그리고 그 스트림과 이벤트는 Rx의 BaseClasses에 따라 차이를 보인다.
Observable
Observable은 RxJava 1.x에서 유일하게 수의 제한 없는 (0.. N flows) Data의 흐름을 명시하기 위해 사용했던 클래스이다.
하지만 Observable은 자동으로 Backpressure
를 지원하지 않아서 별도의 함수 호출로 해당 기능을 지원하게 만들어야 했고, 그렇지만 Rx를 잘 알지 못하는 초보 개발자들을 위해 버전 2.x부터 Backpressure
를 자동으로 지원하는 Flowable
클래스가 나왔다. 그래서 요즘 최신 RxJava 글들을 보면 Observable보다 Flowable을 사용하는 글들이 많이 보이긴 하지만, Observable을 아예 사용하지 말자는 아닌 것 같다.
//source.operator1().operator2().subcribe()
public static void main(String[] args) {
String[] strings = new String[]{"딸기", "소스", "치킨"};
Observable.fromArray(strings)
.subscribe(next -> {
System.out.println(Thread.currentThread().getName() + " onNext: " + next);
}, error -> {
System.out.println(Thread.currentThread().getName() + " onError: " + error.getMessage());
}, () -> {
System.out.println(Thread.currentThread().getName() + " onComplete");
});
}
//출력
main onNext: 딸기
main onNext: 소스
main onNext: 치킨
main onComplete
Observable에서 발생하는 이벤트를 subscribe()
함수로 이벤트에 대한 작업을 명시할 수 있다.
- onNext : source에서 전달하는 0.. N의 Data를 순차적으로 받는 이벤트이다.
- onError : 데이터를 처리하는 과정에서 발생하는 ERROR를 캐치하여 전달받는 이벤트로 해당 이벤트 이후 Obserable은 흐름은 종료된다. 이 매개변수를 정의하지 않을 경우 ERROR 발생 시 Crash가 난다. (Android 앱 종료)
- onComplete : source로 부터 데이터 전달이 완료되었을 때를 의미하는 이벤트이다.
create()
operator로 Data 스트림을 직접 생성할 때, onComplete 호출 이후로 onNext를 호출하면 안 된다.
Flowable
Flowable는 RxJava 2.x 버전에서 Observable을 보안하기 위해 나온 클래스이며 Backpressure
기능을 자동으로 지원해준다.
우선 기본 흐름과 기능, 발생하는 이벤트는 Observable과 동일하며, 함수 호출에 필요한 매개변수 타입이 약간 다른 점이 있다.
//source.operator1().operator2().subcribe()
public static void main(String[] args) {
String[] strings = new String[]{"딸기", "소스", "치킨"};
Flowable.fromArray(strings)
.subscribe(next -> {
System.out.println(Thread.currentThread().getName() + " onNext: " + next);
}, error -> {
System.out.println(Thread.currentThread().getName() + " onError: " + error.getMessage());
}, () -> {
System.out.println(Thread.currentThread().getName() + " onComplete");
});
}
//출력
main onNext: 딸기
main onNext: 소스
main onNext: 치킨
main onComplete
Backpressure
그러면 이 Obserable과 Flowable의 차이점이라고 볼 수 있는 BackPressure
이란 무엇일까? 해당 기능은 흐름 제어라고 볼 수 있다.
Data를 발행하는 속도보다 해당 Data를 처리하는 속도가 느릴 때, 미처 처리하지 못하는 Data들이 버퍼에 쌓여 간다. 그 말은 불필요한 메모리를 잡아먹고 있다는 뜻이 되고 자칫 OutOfMemoryException
이 발생되어 앱이 종료될 수 있다. 이런 상황을 가만히 방관하지 않고 흐름 제어하는 것이 BackPressure
라고 볼 수 있다.
예시를 보자.
Observable.range(1, 1000_000_000)
.map(x -> {
try {
Thread.sleep(10L);
System.out.println(Thread.currentThread().getName() + " sender Data:" + x);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
})
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.subscribe(next -> {
try {
Thread.sleep(1000L);
System.out.println(Thread.currentThread().getName() + " onNext: " + next);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, error -> {
System.out.println(Thread.currentThread().getName() + " onError: " + error.getMessage());
}, () -> {
System.out.println(Thread.currentThread().getName() + " onComplete");
});
//observeOn에서 Worker 쓰레드로 선언안해줘서 sender가 1개만 됐음
//3십몇만 까지 계속 sender함
try {
Thread.sleep(80000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//출력
...(생략)
RxSingleScheduler-1 onNext: 3
RxComputationThreadPool-1 sender Data:285
RxComputationThreadPool-1 sender Data:286
RxComputationThreadPool-1 sender Data:287
RxComputationThreadPool-1 sender Data:288
RxComputationThreadPool-1 sender Data:289
RxComputationThreadPool-1 sender Data:290
RxComputationThreadPool-1 sender Data:291
RxComputationThreadPool-1 sender Data:292
RxComputationThreadPool-1 sender Data:293
RxComputationThreadPool-1 sender Data:294
...
Obserable은 range()
operator를 이용하여 1에서 10억까지의 Data의 스트림이 만들어진다. 그리고 Thread의 sleep로 너무 빠른 Data의 출력을 지연시키기 위해 데이터를 변환시키는 map()
operator를 이용한다.
subscribe
operator의 onNext
이벤트에서 Thread의 sleep 함수로 지연을 준다. 스트림의 데이터를 발행할 때는 sleep(10L)
, 데이터를 처리할 때는 sleep(1000L)
로 100배 느려지기 때문에 출력과 같은 상황이 발생할 것이다.
다음은 테스트하다가 며칠 고민한 부분이다. 스트림을 관찰하여 처리하는 스레드를 명시하는 operator observeOn()
, 또는 스트림의 Data를 발행하는 스레드를 명시하는 operator subscribeOn
둘 중에 최소 observeOn를 명시해야 Backpressure 예시 상황이 성립된다. 발행과 관찰이 같은 스레드에서 일어날 경우 발행을 할 때는 관찰이 막혀있고, 관찰을 할때는 발행이 막혀있어 예시에서 원하는 문제 상황이 발생하지 않는다.
이제 출력을 보면, 관찰자가 3번째 데이터를 처리했을 때, Data는 이미 300개 가깝게 발행되고 있다. 관찰자의 처리하는 속도로 보았을 때, 앞으로의 데이터 발행은 불필요하게 느껴질 것이다. 기본적인 Obserable에는 늘어나는 데이터를 저장하기 위해 버퍼의 사이즈가 점점 커지게 되고 10억까지 데이터를 차마 받지 못한 채 어느 시점에서 OutOfMemory
에러가 발생할 것이다.
그렇다면 Flowable은?
Flowable.range(1, 1000_000_000)
.onBackpressureBuffer()
.map(x -> {
try {
Thread.sleep(10L);
System.out.println(Thread.currentThread().getName() + " sender Data:" + x);
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
})
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.single())
.subscribe(next -> {
try {
Thread.sleep(1000L);
System.out.println(Thread.currentThread().getName() + " onNext: " + next);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, error -> {
System.out.println(Thread.currentThread().getName() + " onError: " + error.getMessage());
}, () -> {
System.out.println(Thread.currentThread().getName() + " onComplete");
});
//MainThread 가 종료되지 않아야 실행이 된다.
try {
Thread.sleep(80000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//출력
RxComputationThreadPool-1 sender Data:119
RxComputationThreadPool-1 sender Data:120
RxComputationThreadPool-1 sender Data:121
RxComputationThreadPool-1 sender Data:122
RxComputationThreadPool-1 sender Data:123
RxComputationThreadPool-1 sender Data:124
RxComputationThreadPool-1 sender Data:125
RxComputationThreadPool-1 sender Data:126
RxComputationThreadPool-1 sender Data:127
RxComputationThreadPool-1 sender Data:128
RxSingleScheduler-1 onNext: 2
RxSingleScheduler-1 onNext: 3
RxSingleScheduler-1 onNext: 4
RxSingleScheduler-1 onNext: 5
..(생략)
RxSingleScheduler-1 onNext: 50
RxComputationThreadPool-1 sender Data:129
Observable에서 Flowable로 변경했고 다른 변경한 부분은 없다. 그래서 출력 부분만 보면 된다.
Data가 128개까지 발행된 후 더 이상 발행을 멈추고 관찰자의 처리가 그 뒤를 이어가는 것을 볼 수 있다. 또 어느 정도 관찰자가 버퍼의 데이터를 처리하면 다시 데이터 발행을 시작한다.
public abstract class Flowable<@NonNull T> implements Publisher<T> {
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
BUFFER_SIZE = Math.max(1, Integer.getInteger("rx3.buffer-size", 128));
}
출력에서도 볼 수 있듯이 Flowable은 최소 버퍼 사이즈를 128로 명시하여 Backpressure을 지원한다.
Single
//Single.just("Hello", "RxJava") Error 발생
Single.just("Single RxJava")
.subscribe(success -> {
System.out.println(Thread.currentThread().getName() + " onSuccess: " + success);
}, error -> {
System.out.println(Thread.currentThread().getName() + " onError: " + error.getMessage());
});
//출력
main onSuccess: Single RxJava
Single
은 오직 1개의 데이터 스트림과 그에 따른 Error를 처리하는 클래스이다. 발생하는 이벤트로 성공을 의미하는 onSuccess
와 Error를 의미하는 onError
가 있다. 오직 하나의 Data 출력만을 의미하기에 just operator에 2개 이상의 Data를 넣을 경우 컴파일 오류를 발생한다
Completable
Completable.complete()
.delay(1, TimeUnit.MILLISECONDS)
.subscribe(() -> {
System.out.println(Thread.currentThread().getName() + " onCompleted");
}, error -> {
System.out.println(Thread.currentThread().getName() + " onError: " + error.getMessage());
});
try {
Thread.sleep(5000);
}catch (InterruptedException e) {
e.printStackTrace();
}
//출력
RxComputationThreadPool-1 onCompleted
Completable
은 Data 없이 오직 완료와 Error만 전달한다. 그에 따라 발생하는 이벤트로 완료를 의미하는 onCompleted
와 Error를 의미하는 onError
가 있다.
Maybe
Maybe.just("Hello Maybe")
.subscribe(success -> {
System.out.println(Thread.currentThread().getName() + " onSuccess: " + success);
}, error -> {
System.out.println(Thread.currentThread().getName() + " onError: " + error.getMessage());
}, () -> {
System.out.println(Thread.currentThread().getName() + " onCompleted");
} );
//출력
main onSuccess: Hello Maybe
Maybe.fromCallable(() -> {
String str = null;
return str;
}).subscribe(success -> {
System.out.println(Thread.currentThread().getName() + " onSuccess: " + success);
}, error -> {
System.out.println(Thread.currentThread().getName() + " onError: " + error.getMessage());
}, () -> {
System.out.println(Thread.currentThread().getName() + " onCompleted");
} );
//출력
main onCompleted
Maybe는 0개 또는 최대 1개의 Data를 발행한다. 발생하는 이벤트로는 Data가 있을 경우 onSuccess, Data가 없을 경우는 onComplte가 있다.
참고
'Language > RxJava' 카테고리의 다른 글
[RxJava] 여러개 Completable 결합 - andThen (0) | 2021.05.06 |
---|---|
[RxJava] Operators - 변환 (0) | 2021.01.24 |
[RxJava] Operators - 생성 (0) | 2021.01.24 |
[RxJava] Hot Observable (0) | 2020.11.05 |
Hello RxJava, Rx는 왜 사용할까? (3) | 2020.10.17 |