RxJava遇到的问题整理

编程中遇到的RxJava的错

1.Subject与toList()操作符

一个项目钟一个页面用PublistEvent发送消息(消息的内容是一个CalendarDay日期值),通知另一个页面,另一个页面在去数据库取数据,原来的代码是这样,但是打过log后,toList()后面的数据都没出来。

1
2
3
4
5
6
7
8
9
10
11
PublishEvent.SELECTION_DATE
.flatMap(calendarDay -> mPresenter.getHistoriesByDay(year, month, day)
.flatMap(histories -> Observable.from(histories)
.map(history -> new MineTrainDayItem(history))
.toList()
.compose(bindToLifecycle())
.compose(SchedulersCompat.applyComputationSchedulers())
.subscribe(mineTrainDayItems -> {
removeRangeItem(0, getItemCount());
addItem(mineTrainDayItems);
});

Goole一下,找到这个PublishSubject, 大意是,使用PublishSubject后调用toList()操作符 要用onCompelete才能走入其他输出。即onList()操作符是要在Observable.onComplete 之后才能走,不然会一直卡住。

修改成下面的代码:

1
2
3
4
5
6
7
8
9
10
11
ublishEvent.SELECTION_DATE
.flatMap(calendarDay -> mPresenter.getHistoriesByDay(year, month, day)
.flatMap(histories -> Observable.from(histories))
.map(history -> new MineTrainDayItem(history))
.toList())//放在内层做聚合
.compose(bindToLifecycle())
.compose(SchedulersCompat.applyComputationSchedulers())
.subscribe(mineTrainDayItems -> {
removeRangeItem(0, getItemCount());
addItem(mineTrainDayItems);
});

toList()放在最内层的Observable里面去做数据聚合,最外层就不用等待onComplete().这样就不会再外层的PublishSubject的Observable卡住。


2.Transform 与compose()操作符

看到Megear项目中很多代码都用到了compose()操作符,研究了一下compose()操作符的用法。

就目前来说compose主要是用来对一个Observable做整个变换,例如一个网络请求的过程,请求及数据处理都应该放在一个新线程中,而展示数据,要放到MainThread中,一般而言就平常做法就是subscribeOn()observeOn(),但是如果考虑到这样的请求有很多个,就要考虑到复用问题,不能每次都对每一个Observable对象都做一次subscribeOn()observeOn()操作,这样就不符合一个规范。

2.1Transform

Transformer实际上就是一个Func1<Observable<T>, Observable<R>>,换言之就是:可以通过它将一种类型的Observable转换成另一种类型的Observable,和调用一系列的内联操作符是一模一样的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//一般模式
<T> Transformer<T, T> applySchedulers() {
return new Transformer<T, T>() {
@Override
public Observable<T> call(Observable<T> observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
};
}
//lambda模式
<T> Transformer<T, T> applySchedulers() {
return observable -> observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}

compose()操作符一起用,就可以做到切换线程:

1
2
3
4
5
6
7
8
9
10
11
12
Observable.from(someSource)
.map(new Func1<Data, Data>() {
@Override public Data call(Data data) {
return manipulate(data);
}
})
.compose(this.<YourType>applySchedulers())
.subscribe(new Action1<Data>() {
@Override public void call(Data data) {
doSomething(data);
}
});

2.2重用Transform

上面代码每次都会new一个Transform,依然做不到重用,你可以创建一个实例化版本,节省不必要的实例化对象。毕竟,Transformers关乎着代码重用。

1
2
3
4
5
6
7
8
9
10
static final Observable.Transformer schedulersTransformer = new Observable.Transformer() {
@Override public Object call(Object observable) {
return ((Observable) observable).subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
}
};
<T> Observable.Transformer<T, T> applySchedulers() {
return (Observable.Transformer<T, T>) schedulersTransformer;
}