RxJavaのlift
参考: RxJava学習のベストプラクティスっぽいもの
※あくまで個人的なメモなので、詳しいことは上記参考参照
まぁ要はOperatorを通す物って感じなのだろうか。やってみた
前提となるテスト
package sample;
import java.util.List;
import org.junit.Test;
import rx.Observable;
import rx.functions.Action1;
import static org.junit.Assert.*;
public class SampleServiceTest {
@Test
public void test() {
Observable<SampleService.Sample> observable = SampleService.getSamples();
/* 普段使う場合?
observable.doOnNext(new Action1<SampleService.Sample>() {
@Override
public void call(SampleService.Sample sample) {
System.out.println(sample.getName());
}
})
.subscribe();
*/
List<SampleService.Sample> samples = observable.toList()
.toBlocking()
.single();
assertEquals(3, samples.size());
assertEquals("hoge", samples.get(0).getName());
assertEquals("fuga", samples.get(1).getName());
assertEquals("foobar", samples.get(2).getName());
}
}
っていうようにObsrevable<SampleService.Sample>を返すメソッドを作って、テスト上なのでtoBlockingを使って取得してその内容をテストするっていうだけの単調な物
これが正しく作用するように実装書く
SampleService.java
前提として入力は文字列型。それをSampleService.Sample型で取得出来るようにする
package sample;
import rx.Observable;
import rx.Subscriber;
public class SampleService {
public static Observable<Sample> getSamples() {
return Observable.from(new String[] { "hoge", "fuga", "hogefuga" })
.lift(new SampleOperator());
}
public static class Sample {
private String name;
public Sample(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
private static class SampleOperator implements Observable.Operator<Sample, String> {
@Override
public Subscriber<String> call(final Subscriber<? super Sample> s) {
return new Subscriber<String>() {
@Override
public void onNext(String str) {
if (!s.isUnsubscribed()) {
s.onNext(new Sample(str));
}
}
@Override
public void onError(Throwable t) {
if (!s.isUnsubscribed()) {
s.onError(t);
}
}
@Override
public void onCompleted() {
if (!s.isUnsubscribed()) {
s.onCompleted();
}
}
};
}
}
}
入力が文字列なので、そこからストリームとして流れる経過途中でOperatorを使って文字列からSampleクラス型への変換を行う感じ。んまぁ適当な例だけど、これなら別に自前でOperator作らなくても普通にmapでも出来る
package sample;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
public class SampleService {
public static Observable<Sample> getSamples() {
return Observable.from(new String[] { "hoge", "fuga", "hogefuga" })
.map(new Func1<String, Sample>() {
@Override
public Sample call(String t1) {
return new Sample(t1);
}
});
}
public static class Sample {
private String name;
public Sample(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
}
なんだけど、昔からあるのかは分からないがObservable.Transformerっていうのを使っても出来る
package sample;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;
public class SampleService {
public static Observable<Sample> getSamples() {
return Observable.from(new String[] { "hoge", "fuga", "hogefuga" })
.compose(new SampleTransformer());
}
public static class Sample {
private String name;
public Sample(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
private static class SampleTransformer implements Observable.Transformer<String, Sample> {
@Override
public Observable<Sample> call(Observable<String> source) {
return source.map(new Func1<String, Sample>() {
@Override
public Sample call(String t1) {
return new Sample(t1);
}
});
}
}
}
まぁっていう感じでストリームとして流れるデータを処理するOperatorを差し込む事が出来るような感じな機能なのかなと。まだちょっと良く分かってない
ちなみにRxJava Toolbox for Android(通称:rxt4a)にあるOperatorAddCompositeSubscriptionっていうのソースを読むのも良いかと
もうちょい勉強して追記なりネタなり書くかも