Transformer 用途
Transformer,顾名思义是转换器的意思。早在 RxJava1.x 版本就有了Observable.Transformer、Single.Transformer和Completable.Transformer,在2.x版本中变成了ObservableTransformer、SingleTransformer、CompletableTransformer、FlowableTransformer和MaybeTransformer。其中,FlowableTransformer和MaybeTransformer是新增的。由于 RxJava2 将Observable拆分成 Observable 和 Flowable,所以多了一个FlowableTransformer。同时,Maybe是 RxJava2 新增的一个类型,所以多了MaybeTransformer。
Transformer 能够将一个 Observable/Flowable/Single/Completable/Maybe 对象转换成另一个 Observable/Flowable/Single/Completable/Maybe 对象,和调用一系列的内联操作符是一模一样的。
举个简单的例子,写一个transformer()方法将一个发射整数的Observable转换为发射字符串的Observable。
public staticObservableTransformer transformer() { return new ObservableTransformer () { @Override public ObservableSource apply(@NonNull Observable upstream) { return upstream.map(new Function () { @Override public java.lang.String apply(@NonNull Integer integer) throws Exception { return java.lang.String.valueOf(integer); } }); } }; }复制代码
接下来是使用transformer()方法,通过标准的RxJava的操作。
Observable.just(123,456) .compose(transformer()) .subscribe(new Consumer() { @Override public void accept(@io.reactivex.annotations.NonNull String s) throws Exception { System.out.println("s="+s); } });复制代码
最后打印了二次,分别是
s=123s=456复制代码
通过这个例子,可以简单和直观地了解到Transformer的作用。
其实,在大名鼎鼎的图片加载框架 Glide 以及 Picasso 中也有类似的transform概念,能够将图形进行变换。
跟compose操作符相结合
compose操作于整个数据流中,能够从数据流中得到原始的Observable/Flowable...
当创建Observable/Flowable...时,compose操作符会立即执行,而不像其他的操作符需要在onNext()调用后才执行。关于compose操作符,老外的这篇文章不错
国内也有相应的翻译常用的场景
1. 切换到主线程
对于网络请求,我们经常会做如下的操作来切换线程。
.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())复制代码
于是,我做了一个简单的封装。
import io.reactivex.FlowableTransformerimport io.reactivex.ObservableTransformerimport io.reactivex.android.schedulers.AndroidSchedulersimport io.reactivex.schedulers.Schedulers/** * Created by Tony Shen on 2017/7/13. */object RxJavaUtils { @JvmStatic fun observableToMain():ObservableTransformer { return ObservableTransformer{ upstream -> upstream.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) } } @JvmStatic fun flowableToMain(): FlowableTransformer { return FlowableTransformer{ upstream -> upstream.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) } }}复制代码
上面这段代码是Kotlin写的,为啥不用Java?个人习惯把一些工具类来用Kotlin来编写,而且使用lambda表达式也更为直观。
对于Flowable切换到主线程的操作,可以这样使用
.compose(RxJavaUtils.flowableToMain())复制代码
2. RxLifecycle中的LifecycleTransformer
trello出品的能够配合Android的生命周期,防止App内存泄漏,其中就使用了LifecycleTransformer。
知乎也做了一个类似的,能够做同样的事情。在我的项目中也使用了知乎的RxLifecycle,根据个人的习惯和爱好,我对LifecycleTransformer稍微做了一些修改,将五个Transformer合并成了一个。
import org.reactivestreams.Publisher;import io.reactivex.Completable;import io.reactivex.CompletableSource;import io.reactivex.CompletableTransformer;import io.reactivex.Flowable;import io.reactivex.FlowableTransformer;import io.reactivex.Maybe;import io.reactivex.MaybeSource;import io.reactivex.MaybeTransformer;import io.reactivex.Observable;import io.reactivex.ObservableSource;import io.reactivex.ObservableTransformer;import io.reactivex.Single;import io.reactivex.SingleSource;import io.reactivex.SingleTransformer;import io.reactivex.annotations.NonNull;import io.reactivex.functions.Function;import io.reactivex.functions.Predicate;import io.reactivex.processors.BehaviorProcessor;/** * Created by Tony Shen on 2017/5/25. */public class LifecycleTransformerimplements ObservableTransformer , FlowableTransformer , SingleTransformer , MaybeTransformer , CompletableTransformer { private final BehaviorProcessor lifecycleBehavior; private LifecycleTransformer() throws IllegalAccessException { throw new IllegalAccessException(); } public LifecycleTransformer(@NonNull BehaviorProcessor lifecycleBehavior) { this.lifecycleBehavior = lifecycleBehavior; } @Override public CompletableSource apply(Completable upstream) { return upstream.ambWith( lifecycleBehavior.filter(new Predicate () { @Override public boolean test(@LifecyclePublisher.Event Integer event) throws Exception { return event == LifecyclePublisher.ON_DESTROY_VIEW || event == LifecyclePublisher.ON_DESTROY || event == LifecyclePublisher.ON_DETACH; } }).take(1).flatMapCompletable(new Function () { @Override public Completable apply(Integer flowable) throws Exception { return Completable.complete(); } }) ); } @Override public Publisher apply(final Flowable upstream) { return upstream.takeUntil( lifecycleBehavior.skipWhile(new Predicate () { @Override public boolean test(@LifecyclePublisher.Event Integer event) throws Exception { return event != LifecyclePublisher.ON_DESTROY_VIEW && event != LifecyclePublisher.ON_DESTROY && event != LifecyclePublisher.ON_DETACH; } }) ); } @Override public MaybeSource apply(Maybe upstream) { return upstream.takeUntil( lifecycleBehavior.skipWhile(new Predicate () { @Override public boolean test(@LifecyclePublisher.Event Integer event) throws Exception { return event != LifecyclePublisher.ON_DESTROY_VIEW && event != LifecyclePublisher.ON_DESTROY && event != LifecyclePublisher.ON_DETACH; } }) ); } @Override public ObservableSource apply(Observable upstream) { return upstream.takeUntil( lifecycleBehavior.skipWhile(new Predicate () { @Override public boolean test(@LifecyclePublisher.Event Integer event) throws Exception { return event != LifecyclePublisher.ON_DESTROY_VIEW && event != LifecyclePublisher.ON_DESTROY && event != LifecyclePublisher.ON_DETACH; } }).toObservable() ); } @Override public SingleSource apply(Single upstream) { return upstream.takeUntil( lifecycleBehavior.skipWhile(new Predicate () { @Override public boolean test(@LifecyclePublisher.Event Integer event) throws Exception { return event != LifecyclePublisher.ON_DESTROY_VIEW && event != LifecyclePublisher.ON_DESTROY && event != LifecyclePublisher.ON_DETACH; } }) ); }}复制代码
3. 缓存的使用
对于缓存,我们大致都会这样写
cache.put(key,value);复制代码
更优雅一点的做法是使用AOP,大致会这样写
@Cacheable(key = "...")getValue() { ....}复制代码
如果你想在RxJava的链式调用中也使用缓存,还可以考虑使用transformer的方式,下面我写了一个简单的方法
/** * Created by Tony Shen on 2017/7/13. */public class RxCache { public staticFlowableTransformer transformer(final String key, final Cache cache) { return new FlowableTransformer () { @Override public Publisher apply(@NonNull Flowable upstream) { return upstream.map(new Function () { @Override public T apply(@NonNull T t) throws Exception { cache.put(key,(Serializable) t); return t; } }); } }; }}复制代码
结合上述三种使用场景,封装了一个方法用于获取内容,在这里网络框架使用Retrofit。虽然Retrofit本身支持通过Interceptor的方式来添加Cache,但是可能某些业务场景下还是想用自己的Cache,那么可以采用下面类似的封装。
/** * 获取内容 * @param fragment * @param param * @param cacheKey * @return */ public FlowablegetContent(Fragment fragment,ContentParam param,String cacheKey) { return apiService.loadVideoContent(param) .compose(RxLifecycle.bind(fragment). toLifecycleTransformer()) .compose(RxJavaUtils. flowableToMain()) .compose(RxCache. transformer(cacheKey,App.getInstance().cache)); }复制代码
4. 追踪RxJava的使用
初学者可能会对RxJava内部的数据流向会感到困惑,所以我写了一个类用于追踪RxJava的使用,对于调试代码还蛮有帮助的。
先来看一个简单的例子
Observable.just("tony","cafei","aaron") .compose(RxTrace.logObservable("first",RxTrace.LOG_SUBSCRIBE|RxTrace.LOG_NEXT_DATA)) .subscribe(new Consumer () { @Override public void accept(@io.reactivex.annotations.NonNull String s) throws Exception { System.out.println("s="+s); } });复制代码
下图显示了上面代码中的数据流向。
然后,再刚才代码的基础上加一个map操作符,把小写的字符串都转换成大写。
Observable.just("tony","cafei","aaron") .compose(RxTrace.logObservable("first",RxTrace.LOG_SUBSCRIBE|RxTrace.LOG_NEXT_DATA)) .map(new Function () { @Override public String apply(@io.reactivex.annotations.NonNull String s) throws Exception { return s.toUpperCase(); } }) .compose(RxTrace. logObservable("second",RxTrace.LOG_NEXT_DATA)) .subscribe(new Consumer () { @Override public void accept(@io.reactivex.annotations.NonNull String s) throws Exception { System.out.println("s="+s); } });复制代码
看看这一次数据是怎样流向的,由于显示器不够大,其实截图还少了一点内容:(,但是能够看明白日志的展示。
最后,加上监测onComlete和OnTerminate
Observable.just("tony","cafei","aaron") .compose(RxTrace.logObservable("first",RxTrace.LOG_SUBSCRIBE|RxTrace.LOG_NEXT_DATA)) .map(new Function () { @Override public String apply(@io.reactivex.annotations.NonNull String s) throws Exception { return s.toUpperCase(); } }) .compose(RxTrace. logObservable("second",RxTrace.LOG_NEXT_DATA)) .compose(RxJavaUtils. observableToMain()) .compose(RxTrace. logObservable("third",RxTrace.LOG_COMPLETE|RxTrace.LOG_TERMINATE)) .subscribe(new Consumer () { @Override public void accept(@io.reactivex.annotations.NonNull String s) throws Exception { System.out.println("s="+s); } });复制代码
上面已经展示过的截图就不显示了,就展示最后的onComlete和OnTerminate。
最后,我已经把RxTrace的代码放到
为何不单独开一个repository呢?它只有一个类,我就懒得创建了:(总结
compose操作符和Transformer结合使用,一方面让代码看起来更加简洁化,另一方面能够提高代码的复用性。RxJava提倡链式调用,compose能够防止链式被打破。