博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Transformer 在RxJava中的使用
阅读量:6859 次
发布时间:2019-06-26

本文共 10354 字,大约阅读时间需要 34 分钟。

Transformer.jpeg

Transformer 用途

Transformer,顾名思义是转换器的意思。早在 RxJava1.x 版本就有了Observable.Transformer、Single.Transformer和Completable.Transformer,在2.x版本中变成了ObservableTransformer、SingleTransformer、CompletableTransformer、FlowableTransformer和MaybeTransformer。其中,FlowableTransformer和MaybeTransformer是新增的。由于 RxJava2 将Observable拆分成 Observable 和 Flowable,所以多了一个FlowableTransformer。同时,Maybe是 RxJava2 新增的一个类型,所以多了MaybeTransformer。

Transformer 能够将一个 Observable/Flowable/Single/Completable/Maybe 对象转换成另一个 Observable/Flowable/Single/Completable/Maybe 对象,和调用一系列的内联操作符是一模一样的。

举个简单的例子,写一个transformer()方法将一个发射整数的Observable转换为发射字符串的Observable。

public static 
ObservableTransformer
transformer() { return new ObservableTransformer
() { @Override public ObservableSource
apply(@NonNull Observable
upstream) { return upstream.map(new Function
() { @Override public java.lang.String apply(@NonNull Integer integer) throws Exception { return java.lang.String.valueOf(integer); } }); } }; }复制代码

接下来是使用transformer()方法,通过标准的RxJava的操作。

Observable.just(123,456)       .compose(transformer())       .subscribe(new Consumer
() { @Override public void accept(@io.reactivex.annotations.NonNull String s) throws Exception { System.out.println("s="+s); } });复制代码

最后打印了二次,分别是

s=123s=456复制代码

通过这个例子,可以简单和直观地了解到Transformer的作用。

其实,在大名鼎鼎的图片加载框架 Glide 以及 Picasso 中也有类似的transform概念,能够将图形进行变换。

跟compose操作符相结合

compose操作于整个数据流中,能够从数据流中得到原始的Observable/Flowable...

当创建Observable/Flowable...时,compose操作符会立即执行,而不像其他的操作符需要在onNext()调用后才执行。

关于compose操作符,老外的这篇文章不错

国内也有相应的翻译

常用的场景

1. 切换到主线程

对于网络请求,我们经常会做如下的操作来切换线程。

.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())复制代码

于是,我做了一个简单的封装。

import io.reactivex.FlowableTransformerimport io.reactivex.ObservableTransformerimport io.reactivex.android.schedulers.AndroidSchedulersimport io.reactivex.schedulers.Schedulers/** * Created by Tony Shen on 2017/7/13. */object RxJavaUtils {    @JvmStatic    fun  observableToMain():ObservableTransformer {        return ObservableTransformer{            upstream ->            upstream.subscribeOn(Schedulers.io())                    .observeOn(AndroidSchedulers.mainThread())        }    }    @JvmStatic    fun  flowableToMain(): FlowableTransformer {        return FlowableTransformer{            upstream ->            upstream.subscribeOn(Schedulers.io())                    .observeOn(AndroidSchedulers.mainThread())        }    }}复制代码

上面这段代码是Kotlin写的,为啥不用Java?个人习惯把一些工具类来用Kotlin来编写,而且使用lambda表达式也更为直观。

对于Flowable切换到主线程的操作,可以这样使用

.compose(RxJavaUtils.flowableToMain())复制代码

2. RxLifecycle中的LifecycleTransformer

trello出品的能够配合Android的生命周期,防止App内存泄漏,其中就使用了LifecycleTransformer。

知乎也做了一个类似的,能够做同样的事情。

在我的项目中也使用了知乎的RxLifecycle,根据个人的习惯和爱好,我对LifecycleTransformer稍微做了一些修改,将五个Transformer合并成了一个。

import org.reactivestreams.Publisher;import io.reactivex.Completable;import io.reactivex.CompletableSource;import io.reactivex.CompletableTransformer;import io.reactivex.Flowable;import io.reactivex.FlowableTransformer;import io.reactivex.Maybe;import io.reactivex.MaybeSource;import io.reactivex.MaybeTransformer;import io.reactivex.Observable;import io.reactivex.ObservableSource;import io.reactivex.ObservableTransformer;import io.reactivex.Single;import io.reactivex.SingleSource;import io.reactivex.SingleTransformer;import io.reactivex.annotations.NonNull;import io.reactivex.functions.Function;import io.reactivex.functions.Predicate;import io.reactivex.processors.BehaviorProcessor;/** * Created by Tony Shen on 2017/5/25. */public class LifecycleTransformer
implements ObservableTransformer
, FlowableTransformer
, SingleTransformer
, MaybeTransformer
, CompletableTransformer { private final BehaviorProcessor
lifecycleBehavior; private LifecycleTransformer() throws IllegalAccessException { throw new IllegalAccessException(); } public LifecycleTransformer(@NonNull BehaviorProcessor
lifecycleBehavior) { this.lifecycleBehavior = lifecycleBehavior; } @Override public CompletableSource apply(Completable upstream) { return upstream.ambWith( lifecycleBehavior.filter(new Predicate
() { @Override public boolean test(@LifecyclePublisher.Event Integer event) throws Exception { return event == LifecyclePublisher.ON_DESTROY_VIEW || event == LifecyclePublisher.ON_DESTROY || event == LifecyclePublisher.ON_DETACH; } }).take(1).flatMapCompletable(new Function
() { @Override public Completable apply(Integer flowable) throws Exception { return Completable.complete(); } }) ); } @Override public Publisher
apply(final Flowable
upstream) { return upstream.takeUntil( lifecycleBehavior.skipWhile(new Predicate
() { @Override public boolean test(@LifecyclePublisher.Event Integer event) throws Exception { return event != LifecyclePublisher.ON_DESTROY_VIEW && event != LifecyclePublisher.ON_DESTROY && event != LifecyclePublisher.ON_DETACH; } }) ); } @Override public MaybeSource
apply(Maybe
upstream) { return upstream.takeUntil( lifecycleBehavior.skipWhile(new Predicate
() { @Override public boolean test(@LifecyclePublisher.Event Integer event) throws Exception { return event != LifecyclePublisher.ON_DESTROY_VIEW && event != LifecyclePublisher.ON_DESTROY && event != LifecyclePublisher.ON_DETACH; } }) ); } @Override public ObservableSource
apply(Observable
upstream) { return upstream.takeUntil( lifecycleBehavior.skipWhile(new Predicate
() { @Override public boolean test(@LifecyclePublisher.Event Integer event) throws Exception { return event != LifecyclePublisher.ON_DESTROY_VIEW && event != LifecyclePublisher.ON_DESTROY && event != LifecyclePublisher.ON_DETACH; } }).toObservable() ); } @Override public SingleSource
apply(Single
upstream) { return upstream.takeUntil( lifecycleBehavior.skipWhile(new Predicate
() { @Override public boolean test(@LifecyclePublisher.Event Integer event) throws Exception { return event != LifecyclePublisher.ON_DESTROY_VIEW && event != LifecyclePublisher.ON_DESTROY && event != LifecyclePublisher.ON_DETACH; } }) ); }}复制代码

3. 缓存的使用

对于缓存,我们大致都会这样写

cache.put(key,value);复制代码

更优雅一点的做法是使用AOP,大致会这样写

@Cacheable(key = "...")getValue() {    ....}复制代码

如果你想在RxJava的链式调用中也使用缓存,还可以考虑使用transformer的方式,下面我写了一个简单的方法

/** * Created by Tony Shen on 2017/7/13. */public class RxCache {    public static 
FlowableTransformer
transformer(final String key, final Cache cache) { return new FlowableTransformer
() { @Override public Publisher
apply(@NonNull Flowable
upstream) { return upstream.map(new Function
() { @Override public T apply(@NonNull T t) throws Exception { cache.put(key,(Serializable) t); return t; } }); } }; }}复制代码

结合上述三种使用场景,封装了一个方法用于获取内容,在这里网络框架使用Retrofit。虽然Retrofit本身支持通过Interceptor的方式来添加Cache,但是可能某些业务场景下还是想用自己的Cache,那么可以采用下面类似的封装。

/**     * 获取内容     * @param fragment     * @param param     * @param cacheKey     * @return     */    public Flowable
getContent(Fragment fragment,ContentParam param,String cacheKey) { return apiService.loadVideoContent(param) .compose(RxLifecycle.bind(fragment).
toLifecycleTransformer()) .compose(RxJavaUtils.
flowableToMain()) .compose(RxCache.
transformer(cacheKey,App.getInstance().cache)); }复制代码

4. 追踪RxJava的使用

初学者可能会对RxJava内部的数据流向会感到困惑,所以我写了一个类用于追踪RxJava的使用,对于调试代码还蛮有帮助的。

先来看一个简单的例子

Observable.just("tony","cafei","aaron")                .compose(RxTrace.
logObservable("first",RxTrace.LOG_SUBSCRIBE|RxTrace.LOG_NEXT_DATA)) .subscribe(new Consumer
() { @Override public void accept(@io.reactivex.annotations.NonNull String s) throws Exception { System.out.println("s="+s); } });复制代码

下图显示了上面代码中的数据流向。

第一次做Trace.png

然后,再刚才代码的基础上加一个map操作符,把小写的字符串都转换成大写。

Observable.just("tony","cafei","aaron")                .compose(RxTrace.
logObservable("first",RxTrace.LOG_SUBSCRIBE|RxTrace.LOG_NEXT_DATA)) .map(new Function
() { @Override public String apply(@io.reactivex.annotations.NonNull String s) throws Exception { return s.toUpperCase(); } }) .compose(RxTrace.
logObservable("second",RxTrace.LOG_NEXT_DATA)) .subscribe(new Consumer
() { @Override public void accept(@io.reactivex.annotations.NonNull String s) throws Exception { System.out.println("s="+s); } });复制代码

看看这一次数据是怎样流向的,由于显示器不够大,其实截图还少了一点内容:(,但是能够看明白日志的展示。

第二次做Trace.png

最后,加上监测onComlete和OnTerminate

Observable.just("tony","cafei","aaron")                .compose(RxTrace.
logObservable("first",RxTrace.LOG_SUBSCRIBE|RxTrace.LOG_NEXT_DATA)) .map(new Function
() { @Override public String apply(@io.reactivex.annotations.NonNull String s) throws Exception { return s.toUpperCase(); } }) .compose(RxTrace.
logObservable("second",RxTrace.LOG_NEXT_DATA)) .compose(RxJavaUtils.
observableToMain()) .compose(RxTrace.
logObservable("third",RxTrace.LOG_COMPLETE|RxTrace.LOG_TERMINATE)) .subscribe(new Consumer
() { @Override public void accept(@io.reactivex.annotations.NonNull String s) throws Exception { System.out.println("s="+s); } });复制代码

上面已经展示过的截图就不显示了,就展示最后的onComlete和OnTerminate。

第三次做Trace.png

最后,我已经把RxTrace的代码放到

为何不单独开一个repository呢?它只有一个类,我就懒得创建了:(

总结

compose操作符和Transformer结合使用,一方面让代码看起来更加简洁化,另一方面能够提高代码的复用性。RxJava提倡链式调用,compose能够防止链式被打破。

转载地址:http://ejxyl.baihongyu.com/

你可能感兴趣的文章
使用Nginx搭建Tomcat9集群,Redis实现Session共享
查看>>
Extjs4.1 序列化和反序列化
查看>>
git 最常用命令
查看>>
iOS self 和 super 学习
查看>>
利用deadline_timer实现定时器Timer
查看>>
分布式日志收集系统:Facebook Scribe
查看>>
数据挖掘的方法有哪些?-转
查看>>
js的闭包的一个示例说明
查看>>
ARCGIS10如何修改图例的大小
查看>>
bin/sh failed with exit code 1
查看>>
Novell推出针对SAP所有应用而优化Linux平台
查看>>
《梦幻西游》打响反盗号战役:为2亿玩家提供360安全武器
查看>>
Silverlight面向客户端,HTML5面向Web
查看>>
微软拟向互联网开发商提供免费IIS 服务器
查看>>
seajs和requirejs对比;node初识
查看>>
JS函数
查看>>
(转)linux下vi命令修改文件及保存的使用方法
查看>>
循环中else的用法
查看>>
Reverse String
查看>>
linux安装ffmpeg
查看>>