Setup
ㆍJava 8 configuration for using lambda expressions

```groovy
android {
    compileOptions {
        sourceCompatibility JavaVersion.VERSION_1_8
        targetCompatibility JavaVersion.VERSION_1_8
    }
}
```

ㆍDependency configuration

```groovy
dependencies {
    implementation 'io.reactivex.rxjava2:rxjava:2.2.8'
}
```
Hello World
ㆍUsing a lambda expression (here, a method reference)

```java
package rxjava.examples;

import io.reactivex.*;

public class HelloWorld {
    public static void main(String[] args) {
        Flowable.just("Hello world").subscribe(System.out::println);
    }
}
```

ㆍUsing an anonymous inner class

```java
import io.reactivex.Flowable;
import io.reactivex.functions.Consumer;

Flowable.just("Hello world")
    .subscribe(new Consumer<String>() {
        @Override
        public void accept(String s) {
            System.out.println(s);
        }
    });
```
Base classes
ㆍio.reactivex.Flowable : emits 0..N items. Supports Reactive-Streams and backpressure
ㆍio.reactivex.Observable : emits 0..N items. No backpressure
ㆍio.reactivex.Single : emits exactly one item. Calls either onSuccess() or onError()
ㆍio.reactivex.Completable : emits no items and only signals the outcome. Calls either onError() or onComplete()
ㆍio.reactivex.Maybe : emits at most one item. Calls exactly one of onSuccess(), onError(), or onComplete()
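The callback contract of each base class can be observed with a small sketch. The class name `BaseTypesDemo` and the event labels are illustrative only; the sources are trivial in-memory values, so everything runs synchronously on the calling thread.

```java
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;

import java.util.ArrayList;
import java.util.List;

class BaseTypesDemo {
    // Subscribes to one instance of each base type and records which callbacks fire.
    static List<String> run() {
        List<String> events = new ArrayList<>();

        Flowable.just(1, 2, 3)
                .subscribe(v -> events.add("Flowable onNext " + v),
                           e -> events.add("Flowable onError"),
                           () -> events.add("Flowable onComplete"));

        Observable.just("a", "b")
                .subscribe(v -> events.add("Observable onNext " + v),
                           e -> events.add("Observable onError"),
                           () -> events.add("Observable onComplete"));

        // Single: exactly one of onSuccess / onError
        Single.just(42)
                .subscribe(v -> events.add("Single onSuccess " + v),
                           e -> events.add("Single onError"));

        // Completable: no item, only onComplete / onError
        Completable.complete()
                .subscribe(() -> events.add("Completable onComplete"),
                           e -> events.add("Completable onError"));

        // Maybe: an empty Maybe signals onComplete instead of onSuccess
        Maybe.<Integer>empty()
                .subscribe(v -> events.add("Maybe onSuccess " + v),
                           e -> events.add("Maybe onError"),
                           () -> events.add("Maybe onComplete"));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```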
Simple background computation
ㆍTypically, a computation or network request runs on a background thread and the result is shown on the UI thread.
ㆍFor clarity, the flow is first written step by step:

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

Flowable<String> source = Flowable.fromCallable(() -> {
    Thread.sleep(1000); // imitate expensive computation
    return "Done";
});

Flowable<String> runBackground = source.subscribeOn(Schedulers.io());

Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());

showForeground.subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000);
```

ㆍIn practice, the same flow is usually written in a fluent, builder-like style:

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

Flowable.fromCallable(() -> {
    Thread.sleep(1000); // imitate expensive computation
    return "Done";
})
    .subscribeOn(Schedulers.io())
    .observeOn(Schedulers.single())
    .subscribe(System.out::println, Throwable::printStackTrace);

Thread.sleep(2000); // <--- wait for the flow to finish
```

ㆍsubscribeOn runs the source's work on the specified scheduler.
ㆍobserveOn makes everything downstream of it, including the subscribe callbacks, run on the specified scheduler.
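To see the effect of the two operators, the following sketch records which thread runs the callable and which thread runs the downstream callback. The class name `ThreadHopDemo` is illustrative, and `blockingSubscribe()` is used here only so that the caller waits for the result.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.atomic.AtomicReference;

class ThreadHopDemo {
    // Returns { thread that ran the callable, thread that ran the downstream callback }.
    static String[] run() {
        AtomicReference<String> producerThread = new AtomicReference<>();
        AtomicReference<String> consumerThread = new AtomicReference<>();

        Flowable.fromCallable(() -> {
                    // Runs on the scheduler given to subscribeOn
                    producerThread.set(Thread.currentThread().getName());
                    return "Done";
                })
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.single())
                // Runs on the scheduler given to observeOn
                .doOnNext(v -> consumerThread.set(Thread.currentThread().getName()))
                .blockingSubscribe();

        return new String[] { producerThread.get(), consumerThread.get() };
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("callable ran on: " + threads[0]);
        System.out.println("callback ran on: " + threads[1]);
    }
}
```

The two names differ from each other and from the calling thread, which is the point of combining subscribeOn with observeOn.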
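Schedulers
ㆍSchedulers.computation() : scheduler for computation-intensive work. By default, its number of threads equals the number of available CPUs.
ㆍSchedulers.io() : scheduler for network calls and other blocking I/O work. It creates new threads whenever they are needed and reuses idle ones, so the pool is not bounded.
ㆍSchedulers.single() : runs work on a single dedicated thread in a sequential, FIFO manner.
ㆍSchedulers.trampoline() : runs work in a sequential, FIFO manner on the current thread without creating a new thread.
Some platforms have their own typical schedulers defined, such as AndroidSchedulers.mainThread() on Android, SwingScheduler.instance() for Swing, or JavaFXSchedulers.gui() for JavaFX. These come from platform-specific libraries, not from RxJava itself.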
In addition, there is an option to wrap an existing Executor (and its subtypes such as ExecutorService) into a Scheduler via Schedulers.from(Executor). This can be used, for example, to have a larger but still fixed pool of threads (unlike computation() and io() respectively).
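A minimal sketch of wrapping an executor, assuming a fixed pool of 8 threads (the pool size and the class name `CustomPoolDemo` are arbitrary choices for illustration). Because the executor's threads are owned by the caller, the sketch shuts the pool down once the flow has finished.

```java
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomPoolDemo {
    static List<Integer> run() {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        Scheduler fixedPool = Schedulers.from(pool);
        try {
            return Flowable.range(1, 5)
                    // Each inner flow is subscribed on the wrapped executor
                    .flatMap(v -> Flowable.just(v)
                            .subscribeOn(fixedPool)
                            .map(w -> w * 10))
                    .toList()
                    .blockingGet(); // the order of the items is not guaranteed
        } finally {
            // RxJava does not manage the lifecycle of a wrapped executor
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```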
The Thread.sleep(2000); at the end is no accident. In RxJava the default Schedulers run on daemon threads, which means once the Java main thread exits, they all get stopped and background computations may never happen. Sleeping for some time in example situations like this lets you see the output of the flow on the console with time to spare.
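The daemon status can be checked directly, and a blocking operator is one possible alternative to sleeping in small console programs. The sketch below is only an illustration (the class and method names are made up); blocking like this is generally not something to do on a UI thread.

```java
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.schedulers.Schedulers;

class DaemonThreadDemo {
    // Asks a thread of the io() scheduler whether it is a daemon thread.
    static boolean ioThreadIsDaemon() {
        return Single.fromCallable(() -> Thread.currentThread().isDaemon())
                .subscribeOn(Schedulers.io())
                .blockingGet();
    }

    // Blocks the caller until the first item arrives, so no Thread.sleep is needed.
    static String runAndWait() {
        return Flowable.fromCallable(() -> "Done")
                .subscribeOn(Schedulers.io())
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("io() thread is daemon: " + ioThreadIsDaemon());
        System.out.println(runAndWait());
    }
}
```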
Concurrency
Flows in RxJava are sequential in nature, split into processing stages that may run concurrently with each other:

```java
Flowable.range(1, 10)
    .observeOn(Schedulers.computation())
    .map(v -> v * v)
    .blockingSubscribe(System.out::println);
```
This example flow squares the numbers from 1 to 10 on the computation Scheduler and consumes the results on the "main" thread (more precisely, the caller thread of blockingSubscribe). However, the lambda v -> v * v doesn't run in parallel for this flow; it receives the values 1 to 10 on the same computation thread one after the other.
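That the mapping stage stays on one thread can be checked by collecting the thread names seen inside the lambda. This is a small sketch with an illustrative class name; it uses a concurrent set only so that reading it from the caller thread is safe.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class SequentialStageDemo {
    // Returns the names of all threads that executed the map lambda.
    static Set<String> mapThreads() {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Flowable.range(1, 10)
                .observeOn(Schedulers.computation())
                .map(v -> {
                    threads.add(Thread.currentThread().getName());
                    return v * v;
                })
                .blockingSubscribe();
        return threads;
    }

    public static void main(String[] args) {
        // A single name is expected: the stage is sequential, not parallel
        System.out.println(mapThreads());
    }
}
```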
Parallel processing

```java
Flowable.range(1, 10)
    .flatMap(v ->
        Flowable.just(v)
            .subscribeOn(Schedulers.computation())
            .map(w -> w * w)
    )
    .blockingSubscribe(System.out::println);
```

Practically, parallelism in RxJava means running independent flows and merging their results back into a single flow. The operator flatMap does this by first mapping each number from 1 to 10 into its own individual Flowable, then running them and merging the computed squares.
Note, however, that flatMap doesn't guarantee any order and the end result from the inner flows may end up interleaved. There are alternative operators:
ㆍconcatMap, which maps and runs one inner flow at a time, and
ㆍconcatMapEager, which runs all inner flows "at once" but emits their output in the order in which those inner flows were created.
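As a sketch of the ordered alternative, the flatMap example can be rewritten with concatMapEager. The inner flows still run on the computation scheduler, but the collected list keeps the source order. The class name is illustrative.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;

class OrderedParallelDemo {
    static List<Integer> squaresInOrder() {
        return Flowable.range(1, 10)
                // Inner flows are subscribed eagerly, results are emitted in source order
                .concatMapEager(v -> Flowable.just(v)
                        .subscribeOn(Schedulers.computation())
                        .map(w -> w * w))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(squaresInOrder());
    }
}
```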
Alternatively, the Flowable.parallel() operator and the ParallelFlowable type help achieve the same parallel processing pattern:
```java
Flowable.range(1, 10)
    .parallel()
    .runOn(Schedulers.computation())
    .map(v -> v * v)
    .sequential()
    .blockingSubscribe(System.out::println);
```
Dependent sub-flows
flatMap is a powerful operator and helps in a lot of situations. For example, given a service that returns a Flowable, we'd like to call another service with values emitted by the first service:
```java
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();

inventorySource
    // For each inventory item, call the second service with its id
    .flatMap(inventoryItem -> erp.getDemandAsync(inventoryItem.getId())
            // map must return a value, so build the message here and print it in subscribe
            .map(demand -> "Item " + inventoryItem.getName() + " has demand " + demand))
    .subscribe(System.out::println);
```
Continuations
Sometimes, when an item has become available, one would like to perform some dependent computations on it. This is sometimes called continuations and, depending on what should happen and what types are involved, may involve various operators to accomplish.
Dependent
The most typical scenario is, given a value, to invoke another service, await its result, and continue with it:

```java
service.apiCall()
    .flatMap(value -> service.anotherApiCall(value))
    .flatMap(next -> service.finalCall(next))
```
It is often the case also that later sequences would require values from earlier mappings. This can be achieved by moving the outer flatMap into the inner parts of the previous flatMap for example:
```java
service.apiCall()
    .flatMap(value ->
        service.anotherApiCall(value)
            .flatMap(next -> service.finalCallBoth(value, next))
    )
```
Here, the original value will be available inside the inner flatMap, courtesy of lambda variable capture.
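A runnable sketch of this nesting, assuming the three service calls return Single values. The methods `apiCall`, `anotherApiCall` and `finalCallBoth` below are hypothetical stand-ins with made-up return values, not part of any real service.

```java
import io.reactivex.Single;

class NestedFlatMapDemo {
    // Hypothetical stand-ins for the service calls used in the text
    static Single<Integer> apiCall() {
        return Single.just(2);
    }

    static Single<Integer> anotherApiCall(int value) {
        return Single.just(value * 10);
    }

    static Single<String> finalCallBoth(int value, int next) {
        return Single.just(value + " -> " + next);
    }

    static String run() {
        return apiCall()
                .flatMap(value -> anotherApiCall(value)
                        // 'value' is still in scope here because the inner lambda captures it
                        .flatMap(next -> finalCallBoth(value, next)))
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```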
Non-dependent
In other scenarios, the result(s) of the first source/dataflow is irrelevant and one would like to continue with another, quasi-independent source. Here, flatMap works as well:

```java
// Observable<?> avoids a raw type; the actual element type depends on someSingleSource
Observable<?> continued = sourceObservable.flatMapSingle(ignored -> someSingleSource);

continued.map(v -> v.toString())
    .subscribe(System.out::println, Throwable::printStackTrace);
```

However, the continuation in this case stays an Observable instead of the likely more appropriate Single. (This is understandable because, from the perspective of flatMapSingle, sourceObservable is a multi-valued source and thus the mapping may result in multiple values as well.)
Often, though, there is a way that is somewhat more expressive (and also has lower overhead): using Completable as the mediator and its operator andThen to resume with something else:

```java
sourceObservable
    .ignoreElements()           // returns Completable
    .andThen(someSingleSource)
    .map(v -> v.toString())
```
The only dependency between the sourceObservable and the someSingleSource is that the former should complete normally in order for the latter to be consumed.
Deferred-dependent
Sometimes, there is an implicit data dependency between the previous sequence and the new sequence that, for some reason, was not flowing through the "regular channels". One would be inclined to write such continuations as follows:
```java
AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
    .doOnNext(ignored -> count.incrementAndGet())
    .ignoreElements()
    .andThen(Single.just(count.get()))
    .subscribe(System.out::println);
```

Unfortunately, this prints 0 because Single.just(count.get()) is evaluated at assembly time when the dataflow hasn't even run yet. We need something that defers the evaluation of this Single source until runtime when the main source completes:

```java
AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
    .doOnNext(ignored -> count.incrementAndGet())
    .ignoreElements()
    .andThen(Single.defer(() -> Single.just(count.get())))
    .subscribe(System.out::println);
```

or

```java
AtomicInteger count = new AtomicInteger();

Observable.range(1, 10)
    .doOnNext(ignored -> count.incrementAndGet())
    .ignoreElements()
    .andThen(Single.fromCallable(() -> count.get()))
    .subscribe(System.out::println);
```
Type conversions
Sometimes, a source or service returns a different type than the flow that is supposed to work with it. For example, in the inventory example above, getDemandAsync could return a Single<DemandRecord>. If the code example is left unchanged, this will result in a compile-time error (often with a misleading error message about a missing overload).
In such situations, there are usually two options to fix the transformation: 1) convert to the desired type or 2) find and use an overload of the specific operator supporting the different type.
Converting to the desired type
Each reactive base class features operators that can perform such conversions, including the protocol conversions, to match some other type. The following matrix shows the available conversion options:
| | Flowable | Observable | Single | Maybe | Completable |
|---|---|---|---|---|---|
| Flowable | | toObservable | first, firstOrError, single, singleOrError, last, lastOrError(1) | firstElement, singleElement, lastElement | ignoreElements |
| Observable | toFlowable(2) | | first, firstOrError, single, singleOrError, last, lastOrError(1) | firstElement, singleElement, lastElement | ignoreElements |
| Single | toFlowable(3) | toObservable | | toMaybe | toCompletable |
| Maybe | toFlowable(3) | toObservable | toSingle | | ignoreElement |
| Completable | toFlowable | toObservable | toSingle | toMaybe | |
(1): When turning a multi-valued source into a single valued source, one should decide which of the many source values should be considered as the result.
(2): Turning an Observable into Flowable requires an additional decision: what to do with the potential unconstrained flow of the source Observable? There are several strategies available (such as buffering, dropping, keeping the latest) via the BackpressureStrategy parameter or via standard Flowable operators such as onBackpressureBuffer, onBackpressureDrop, onBackpressureLatest which also allow further customization of the backpressure behavior.
(3): When there is only (at most) one source item, there is no problem with backpressure as it can be always stored until the downstream is ready to consume.
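A few of the conversions from the matrix, written out as a sketch. The source values and the choice of BackpressureStrategy.BUFFER are arbitrary and only serve the illustration.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;

import java.util.ArrayList;
import java.util.List;

class ConversionDemo {
    static List<String> run() {
        List<String> out = new ArrayList<>();
        Observable<Integer> observable = Observable.range(1, 5);

        // Observable -> Flowable: a backpressure strategy has to be chosen, see note (2)
        Flowable<Integer> flowable = observable.toFlowable(BackpressureStrategy.BUFFER);
        // Flowable -> Single: decide which item is the result, see note (1)
        Single<Integer> first = flowable.firstOrError();
        // Flowable -> Maybe
        Maybe<Integer> last = flowable.lastElement();
        // Flowable -> Completable: items are dropped, only termination remains
        Completable done = flowable.ignoreElements();
        // Single -> Flowable: no strategy needed, see note (3)
        Flowable<Integer> back = first.toFlowable();

        out.add("first=" + first.blockingGet());
        out.add("last=" + last.blockingGet());
        done.blockingAwait();
        out.add("completed");
        out.add("back=" + back.toList().blockingGet());
        return out;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```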
Using an overload with the desired type
Many frequently used operators have overloads that can deal with the other types. These are usually named with the suffix of the target type:

| Operator | Overloads |
|---|---|
| flatMap | flatMapSingle, flatMapMaybe, flatMapCompletable, flatMapIterable |
| concatMap | concatMapSingle, concatMapMaybe, concatMapCompletable, concatMapIterable |
| switchMap | switchMapSingle, switchMapMaybe, switchMapCompletable |
The reason these operators have a suffix instead of simply having the same name with different signature is type erasure. Java doesn't consider signatures such as operator(Function<T, Single<R>>) and operator(Function<T, Maybe<R>>) different (unlike C#) and due to erasure, the two operators would end up as duplicate methods with the same signature.
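Both ways of fixing a type mismatch can be compared side by side. In this sketch, `getDemandAsync` is a hypothetical service method assumed to return a Single; the first variant converts the Single to a Flowable, the second uses the suffixed overload.

```java
import io.reactivex.Flowable;
import io.reactivex.Single;

import java.util.List;

class OverloadDemo {
    // Hypothetical service call, assumed to return a Single
    static Single<Integer> getDemandAsync(int itemId) {
        return Single.just(itemId * 100);
    }

    // Option 1: convert the inner Single to the type flatMap expects
    static List<Integer> viaConversion() {
        return Flowable.range(1, 3)
                .flatMap(id -> getDemandAsync(id).toFlowable())
                .toList()
                .blockingGet();
    }

    // Option 2: use the overload that accepts a Single directly
    static List<Integer> viaOverload() {
        return Flowable.range(1, 3)
                .flatMapSingle(id -> getDemandAsync(id))
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(viaConversion());
        System.out.println(viaOverload());
    }
}
```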
Operator naming conventions
Naming is one of the hardest things in programming, as names are expected to be short, expressive, and easily memorable. Unfortunately, the target language (and pre-existing conventions) may not give much help in this regard (unusable keywords, type erasure, type ambiguities, etc.).
Unusable keywords
In the original Rx.NET, the operator that emits a single item and then completes is called Return(T). Since the Java convention is to have a lowercase letter start a method name, this would have been return(T) which is a keyword in Java and thus not available. Therefore, RxJava chose to name this operator just(T). The same limitation exists for the operator Switch, which had to be named switchOnNext. Yet another example is Catch which was named onErrorResumeNext.
Type erasure
Many operators that expect the user to provide some function returning a reactive type can't be overloaded because the type erasure around a Function<T, X> turns such method signatures into duplicates. RxJava chose to name such operators by appending the type as suffix as well:
```java
Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)

Flowable<R> flatMapMaybe(Function<? super T, ? extends MaybeSource<? extends R>> mapper)
```
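The effect of erasure can be seen without RxJava at all. The following plain-Java sketch uses two made-up methods whose parameters differ only in the generic arguments of Function; reflection shows that both have the same erased parameter type, which is why they could not share one name.

```java
import java.lang.reflect.Method;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

class ErasureDemo {
    // These two methods differ only in the type arguments of Function.
    // Naming both "convert" would not compile: both erase to convert(Function).
    static String convertOptional(Function<String, Optional<Integer>> f) {
        return "optional";
    }

    static String convertList(Function<String, List<Integer>> f) {
        return "list";
    }

    // Looks the method up by its erased signature and returns the parameter type.
    static Class<?> erasedParameter(String methodName) throws NoSuchMethodException {
        Method m = ErasureDemo.class.getDeclaredMethod(methodName, Function.class);
        return m.getParameterTypes()[0];
    }

    public static void main(String[] args) throws NoSuchMethodException {
        System.out.println(erasedParameter("convertOptional"));
        System.out.println(erasedParameter("convertList"));
    }
}
```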
Type ambiguities
Even though certain operators have no problems from type erasure, their signatures may turn out to be ambiguous, especially if one uses Java 8 and lambdas. For example, there are several overloads of concatWith taking the various other reactive base types as arguments (for providing convenience and performance benefits in the underlying implementation):

```java
Flowable<T> concatWith(Publisher<? extends T> other);

Flowable<T> concatWith(SingleSource<? extends T> other);
```
Both Publisher and SingleSource appear as functional interfaces (types with one abstract method) and may encourage users to try to provide a lambda expression:
```java
someSource.concatWith(s -> Single.just(2))
    .subscribe(System.out::println, Throwable::printStackTrace);
```
Unfortunately, this approach doesn't work and the example does not print 2 at all. In fact, since version 2.1.10, it doesn't even compile because at least 4 concatWith overloads exist and the compiler finds the code above ambiguous.
The user in such situations probably wanted to defer some computation until the someSource has completed, thus the correct unambiguous operator should have been defer:
```java
someSource.concatWith(Single.defer(() -> Single.just(2)))
    .subscribe(System.out::println, Throwable::printStackTrace);
```
Sometimes, a suffix is added to avoid logical ambiguities that may compile but produce the wrong type in a flow:
```java
Flowable<T> merge(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> mergeArray(Publisher<? extends T>... sources);
```

This can also become ambiguous when functional interface types get involved as the type argument T.
Error handling
Dataflows can fail, at which point the error is emitted to the consumer(s). Sometimes though, multiple sources may fail, at which point there is a choice whether or not to wait for all of them to complete or fail. To indicate this opportunity, many operator names are suffixed with the DelayError words (while others feature a delayError or delayErrors boolean flag in one of their overloads):

```java
Flowable<T> concat(Publisher<? extends Publisher<? extends T>> sources);

Flowable<T> concatDelayError(Publisher<? extends Publisher<? extends T>> sources);
```
Of course, suffixes of various kinds may appear together:
```java
Flowable<T> concatArrayEagerDelayError(Publisher<? extends T>... sources);
```
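The difference a DelayError variant makes can be sketched with a failing source followed by a healthy one. The class name, event labels, and exception are chosen for illustration only.

```java
import io.reactivex.Flowable;

import java.util.ArrayList;
import java.util.List;

class DelayErrorDemo {
    // Records the signals of a flow as simple strings.
    static List<String> collect(Flowable<Integer> flow) {
        List<String> events = new ArrayList<>();
        flow.subscribe(v -> events.add("next " + v),
                       e -> events.add("error"),
                       () -> events.add("complete"));
        return events;
    }

    // Without DelayError, the first failure ends the whole flow immediately.
    static List<String> failFast() {
        return collect(Flowable.concatArray(
                Flowable.<Integer>error(new IllegalStateException("boom")),
                Flowable.just(1, 2)));
    }

    // With DelayError, the remaining sources still run and the error arrives last.
    static List<String> delayed() {
        return collect(Flowable.concatArrayDelayError(
                Flowable.<Integer>error(new IllegalStateException("boom")),
                Flowable.just(1, 2)));
    }

    public static void main(String[] args) {
        System.out.println(failFast());
        System.out.println(delayed());
    }
}
```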
Base class vs base type
The base classes can be considered heavy due to the sheer number of static and instance methods on them. RxJava 2's design was heavily influenced by the Reactive Streams specification; therefore, the library features a class and an interface for each reactive type:

| Type | Class | Interface | Consumer |
|---|---|---|---|
| 0..N backpressured | Flowable | Publisher(1) | Subscriber |
| 0..N unbounded | Observable | ObservableSource(2) | Observer |
| 1 element or error | Single | SingleSource | SingleObserver |
| 0..1 element or error | Maybe | MaybeSource | MaybeObserver |
| 0 element or error | Completable | CompletableSource | CompletableObserver |

(1): The org.reactivestreams.Publisher is part of the external Reactive Streams library. It is the main type to interact with other reactive libraries through a standardized mechanism governed by the Reactive Streams specification.
(2): The naming convention of the interface was to append Source to the semi-traditional class name. There is no FlowableSource since Publisher is provided by the Reactive Streams library (and subtyping it wouldn't have helped with interoperation either). These interfaces are, however, not standard in the sense of the Reactive Streams specification and are currently RxJava specific only.
Source
https://mindorks.com/android/store/RxJava/reactivex/rxjava