RxAndroidをざっくり使ってみた

2015-06-11T16:49:40+09:00 Java Android RxJava

参考1: 【翻訳】AsyncTask と AsyncTaskLoader を rx.Observable に置き換える - RxJava Android Patterns

参考2: http://stablekernel.com/blog/replace-asynctask-asynctaskloader-rx-observable-rxjava-android-patterns (参考1の元)

参考3: 俺が RxAndroid について知っているいくつかのまとめ

大分前から色々話題になってるRxAndroidをちょっとざっくりと使ってみた

※あくまで個人的なメモなので詳しく知りたいなら上記の参考を読んだ方が良い

※0.24以降になるとAndroidScheduler以外は無くなるそうなので以下のAppObservableを使うには0.24より上のバージョンでは使えなくなるらしい

そもそもReactive Programmingって何よ?

http://www.slideshare.net/shinnosukekugimiya/reactive-android によると「反応的プログラミング」

まぁ使ってみたら分かるとは思うんですが、合ってるのか定かじゃないけどJavaScriptにあるPromiseの機構と似た仕組みを持つ物に近いのではないかと

検証の概要

AccountManager.getAuthTokenByFeaturesの結果で返ってくるGoogleアカウントを利用したトークンの取得結果をRxAndroid(もといRxJava)を介して通知させる

MainActivity.java

package sample.test;

import java.io.IOException;

import android.app.Activity;
import android.os.Bundle;
import android.accounts.AccountManager;
import android.accounts.AccountManagerCallback;
import android.accounts.AccountManagerFuture;
import android.widget.Toast;
import android.util.Log;

import rx.Observer;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.android.app.AppObservable;

import static rx.schedulers.Schedulers.*;

public class MainActivity extends Activity {

    private static final String SCOPE = "oauth2:https://www.googleapis.com/auth/userinfo.email";
    private Subscription subscription = Subscriptions.empty();

    @Override
    protected void onCreate(Bundle bundle) {
        super.onCreate(bundle);
        startAuthorization();
    }

    private void startAuthorization() {
        subscription = AppObservable.bindActivity(this, retrieveToken())
            .subscribe(new Observer<String>() {
                @Override
                public void onNext(String s) {
                    showToast("onNext: " + s);
                }

                @Override
                public void onCompleted() {
                    showToast("onCompleted");
                    Log.v("sample", "[subscribe.onCompleted] thread: " + Thread.currentThread());
                }

                @Override
                public void onError(Throwable e) {
                    showToast("onError: " + e.getMessage());
                }
            });
    }

    @Override
    public void onDestroy() {
        subscription.unsubscribe();
        super.onDestroy();
    }

    private void showToast(String text) {
        Toast.makeText(this, text, Toast.LENGTH_LONG).show();
    }

    private Observable<String> retrieveToken() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(final Subscriber<? super String> subscriber) {
                // ここではobserveOn?で指定したスレッドにより処理される
                Log.v("sample", "[retrieveToken]: " + Thread.currentThread());

                AccountManager accountManager = AccountManager.get(MainActivity.this);
                accountManager.getAuthTokenByFeatures(
                    "com.google",
                    SCOPE,
                    null,
                    MainActivity.this,
                    null,
                    null,
                    new AccountManagerCallback<Bundle>() {
                        @Override
                        public void run(AccountManagerFuture<Bundle> result) {
                            // ここはおそらくintentにより処理されるようになるはずなのでメインスレッドにより処理される

                            try {
                                String token = result.getResult().getString(AccountManager.KEY_AUTHTOKEN);
                                subscriber.onNext(token);
                            } catch (Exception e) {
                                e.printStackTrace();
                            } finally {
                                subscriber.onCompleted();
                            }
                        }
                    },
                    null
                );
            }
        })
        .observeOn(newThread());
    }
}

っていう感じで使えるのですけど、AccountManagerでトークンを取得した際に発生するコールバック上でsubscribe.onNextで値を伝播させるような形でそれをsubscribeした時にonNextで指定した値をObserver<String>.onNextの引数にて参照されるようになる。んまぁ良く解説しているサイトとかではストリームを扱うライブラリがRxJavaのような物みたいな表現しているけど、そのストリームを伝播する役割っていう所なんじゃないかと

ただちょっと色々罠があって

  • AppObservable.bindActivityは基本メインスレッドで動くようになっているのでobserveOnとかでnewThread等を指定するとエラーになる。なのでbindActivityの返り値にobserveOnするのではなく、その引数に指定するObservableに対してobservceOnしないとメインスレッドで動いて場合によってはエラーになるはず
  • subscribeしたSubscriptionは適切にunsubscribeをしないとメモリリークする恐れがあるとの事

ちなみに

あとobservceされる所で例外起きるとonError(Throwable)が発生するとの事。確認してませんが

んまぁちょっと余談が多いですが、この場合とかだと画面を回転させたりした場合とかでもretrieveTokenで指定した処理が行われてしまう事になる訳ですが、画面回転等の場合において部分的にキャッシュするような役割をもたせる事で回避したりする事も出来るとの事(上記参考の翻訳のやつに書いてる)

package sample.test;

import java.io.IOException;

import android.app.Activity;
import android.os.Bundle;
import android.accounts.AccountManager;
import android.accounts.AccountManagerCallback;
import android.accounts.AccountManagerFuture;
import android.widget.Toast;
import android.util.Log;

import rx.Observer;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.subjects.AsyncSubject;
import rx.subscriptions.Subscriptions;
import rx.android.app.AppObservable;

import static rx.schedulers.Schedulers.*;

public class MainActivity extends Activity {

    private static final String SCOPE = "oauth2:https://www.googleapis.com/auth/userinfo.email";
    private static AsyncSubject<String> cacheSubject;
    private Subscription subscription = Subscriptions.empty();

    @Override
    protected void onCreate(Bundle bundle) {
        super.onCreate(bundle);

        if (bundle == null) {
            if (cacheSubject != null) {
                cacheSubject = null;
            }
        }

        startAuthorization();
    }

    private void startAuthorization() {
        subscription = AppObservable.bindActivity(this, retrieveToken())
            .subscribe(new Observer<String>() {
                @Override
                public void onNext(String s) {
                    showToast("onNext: " + s);
                }

                @Override
                public void onCompleted() {
                    showToast("onCompleted");
                }

                @Override
                public void onError(Throwable e) {
                    showToast("onError: " + e.getMessage());
                }
            });
    }

    @Override
    public void onDestroy() {
        subscription.unsubscribe();
        super.onDestroy();
    }

    private void showToast(String text) {
        Toast.makeText(this, text, Toast.LENGTH_LONG).show();
    }

    private Observable<String> retrieveToken() {
        if (cacheSubject == null) {
            cacheSubject = AsyncSubject.create();

            Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(final Subscriber<? super String> subscriber) {
                    AccountManager accountManager = AccountManager.get(MainActivity.this);
                    accountManager.getAuthTokenByFeatures(
                        "com.google",
                        SCOPE,
                        null,
                        MainActivity.this,
                        null,
                        null,
                        new AccountManagerCallback<Bundle>() {
                            @Override
                            public void run(AccountManagerFuture<Bundle> result) {
                                String token = null;

                                try {
                                    token = result.getResult().getString(AccountManager.KEY_AUTHTOKEN);
                                    subscriber.onNext(token);
                                } catch (Exception e) {
                                    e.printStackTrace();
                                } finally {
                                    subscriber.onCompleted();
                                }
                            }
                        },
                        null
                    );
                }
            })
            .observeOn(newThread())
            .subscribeOn(newThread())
            .subscribe(cacheSubject);
        }

        return cacheSubject;
    }
}

というようにAsyncSubjectを利用してキャッシュしておいて、画面回転時等にはそれを利用してretrieveTokenでキャッシュされた値だけをストリームとして返すObservableようなのとして利用する感じかと

でなんだらかんだらでログを取って見てみると

[開始]
V/sample  ( 1233): bundle = null

# まだretrieveToken処理はされてないのでcacheはnull
V/sample  ( 1233): cache = null
V/sample  ( 1233): [startAuthorization] thread: Thread[main,5,main]

# retrieveToken自体のスレッドはnewThreadしているので別途なスレッド上で処理される(但し、AccountManager関連な部分でUIスレッドで動作する)
V/sample  ( 1233): [retrieveToken] thread: Thread[RxNewThreadScheduler-1,5,main]

# AppObservableでUIスレッドで処理されるのを強制されているはずなのでここはUIスレッドで処理される
V/sample  ( 1233): [subscribe.onCompleted] thread: Thread[main,5,main]
[画面回転]
V/sample  ( 1233): bundle = Bundle[{android:viewHierarchyState=Bundle[{android:Panels=android.util.SparseArray@40d21610, android:views=android.util.SparseArray@40d2da28, android:ActionBar=android.util.SparseArray@40d03a88}]}]

# 前に処理したキャッシュが残ってる(staticなので)
V/sample  ( 1233): cache = rx.subjects.AsyncSubject@40cf2df8

V/sample  ( 1233): [startAuthorization] thread: Thread[main,5,main]

# retrieveToken自体は実行されるがキャッシュがあるのでその値ストリームを使うのでそのコールバックまでは処理されずにキャッシュされたのが利用される
V/sample  ( 1233): [subscribe.onCompleted] thread: Thread[main,5,main]
[一度バックで終了してもう一度起動]
V/sample  ( 1233): bundle = null
V/sample  ( 1233): cache = null
V/sample  ( 1233): [startAuthorization] thread: Thread[main,5,main]
V/sample  ( 1233): [retrieveAccessToken] thread: Thread[RxNewThreadScheduler-3,5,main]
V/sample  ( 1233): [subscribe.onCompleted] thread: Thread[main,5,main]

っていうように画面回転時等にキャッシュしておいたストリームを利用するような事も可能。今回はAsyncSubjectを使ってるがPublishSubject等の他のもあるけど

てな感じでちょっとざっくりとして使い方でやってみて思ったのは

  • RxJavaを理解してないと正直キツい
  • ちゃんとプロファイル取ってチェックしましょう (当たり前なような事で実は出来てなかったりする)

っていう所ですかねと

ということで今後RxJavaに関する勉強はちょっとづつでも進めていく予定で

babel.jsでECMAScript6