2025年写给Rikka自己的RxJava2说明书

写给Rikka自己的RxJava2说明书目录 为了便于学习和检索 这里列出一个目录 1 前言 2 两句话概括 RxJava 3 一个 RxJava2 Retrofti 的示例 4 Single 是啥 有什么用 5 just 源码 6 map 操作符 7 onSubscribe Disponsable disposable 8 Disposable 9

大家好,我是讯享网,很高兴认识大家。

目录

为了便于学习和检索,这里列出一个目录:

1. 前言
2. 两句话概括RxJava
3. 一个RxJava2+Retrofti的示例
4. Single是啥?有什么用
5. just()源码
6. map操作符
7. onSubscribe(Disponsable disposable)
8. Disposable
9. subscribeOn
10. observerOn
11. Schedulers
12. 总结

前言

今天主要从实际开发出发,比如Retrofit+RxJava2的例子,然后从例子中去剖析RxJava的用法、特点。
尽量写的比较简单好懂(本来本篇blog讲的就不深,如果还写的天马行空,我自己都看不懂…)
至于源码那就随缘了,之前读过,有一个大致的了解,这篇blog应该还会继续去追究源码,多读一次更加深印象一次

两句话概括Rxjava

我用自己目前浅薄的知识来概括Rxjava,便于后面的学习:

  1. RxJava是响应式编程模型。(你就把它和 屏幕的点击事件:点一下,产生一个回调 一模一样)
  2. RxJava的用法和特点和优势都在在于异步的处理(就是线程调度) (所以RxJava和网络请求(Retrofit)有天作之合,因为网络请求都是要放在别的线程(就是后台)去做的,回调的结果又要返回到主线程(前台)中来,这种线程的切换其实很麻烦,所以网络线程上的调度就是RxJava发挥最多的地方)

一个RxJava2+Retrofit2的示例

PS:这里我没有找到一个很好的Api(那种要填post的Path的),只能拿到一个GET请求的API(= =!一堆网上的都过期了,我又不想申请,懒得找了淦,不过其实用法都大同小异,认真想一想的话POST其实很简单的)

按序号分步走:
1、导入RxJava、Gson、Retorfit包并申请权限
大概就是下面这样(版本可能老了一丢丢,但这是学习版,都是2.0,所以问题不大)

 api 'com.squareup.okhttp3:okhttp:3.8.0' api 'com.squareup.retrofit2:retrofit:2.3.0' api 'com.squareup.retrofit2:converter-gson:2.3.0' api 'com.squareup.retrofit2:adapter-rxjava2:2.3.0' api 'io.reactivex.rxjava2:rxjava:2.0.7' api 'io.reactivex.rxjava2:rxandroid:2.0.1' api 'com.jakewharton.rxbinding2:rxbinding:2.0.0' api 'com.google.code.gson:gson:2.8.5' 

讯享网

然后在Manifest中申请网络权限,不然申请网络权限会 permission denied

讯享网 <uses-permission android:name="android.permission.INTERNET"/> 

2、写一个api网络申请接口类、根据接口的回调写JavaBean
我们一般会把Retrofit网络申请放在一个接口类中,因为很多时候我们不止申请一个请求,而且解耦和封装更加符合编程规范(←重点)
我这里找的接口是 : https://www.bilibili.com/widget/getSearchDefaultWords#
这个接口是显示bilibili首页搜索栏的默认搜索关键字,如下:
在这里插入图片描述
讯享网
纳尼?jojo?那我先进去循环十遍先。。。
。。。
回来后,在Api中写下下面的接口类:

public interface Api { 
    @GET("widget/getSearchDefaultWords#") Single<List<Repo>> getSearchDefault(); } 

对Retrofit2不熟悉的人可能看不懂,但我这里也就简单的讲明一下用法,它有两行,一行注解,一行接口函数。
第一行很简单,就是做一个拼凑,把url拼凑成一个请求头
Android 深入Http(1)HTTP原理和机制
↑我的这篇文章里 讲过我们在浏览器 输入一个网址的时候,http首先做的事情就是把 一个URL 转换成 一个Http Request

那么@GET("widget/getSearchDefaultWords#")的含义就是将 widget/getSearchDefaultWords# 这个api的path, 拼接成:
GET /widget/getSearchDefaultWords# HTTP/1.1这么一个Http Request的请求行。

Single<List<Repo>> getSearchDefault();这一行则是Api的方法,我们通过调用这个 getSearchDefault() 的方法,就能返回一个 Single<List<Repo>>的返回体。
Single是什么后面会讲(当然也可以换成Observer),然后其里面是 Repo的List,因为我们申请接口的返回是存放在在一个List/Array中的,所以我们要用List去装这些Repo(等下会知道)。

而Repo就是返回的结果啦,我们一开始怎么知道返回的结果是什么?所以我们要去Postman或者能测试接口的软件,去看看这个接口返回的数据是什么。
在这里插入图片描述
那么上面的Body就是我们的返回Response,从大括号和中括号也可以看出来,我们这些Repo用中括号阔起来放在大括号里面,就说明大括号是个数组List,而里面的中括号则是这个List的一个元素,这也是为什么我们前面要用List来放Repo。

我们复制全部的Body,到Android Studio中新建一个类 Repo,然后键盘敲ALT+INS,然后选择GsonFormat,在转换框中粘贴进去,按下回车,就会变成下面这样:
在这里插入图片描述
这个插件就是把JSON格式转换成一个JavaBean的神器!(如果没有该插件的话可以去百度查查怎么在Android Studio下载,就几步,很快捷的)

这样的话,我们的api就算是封装好了。

3、构建Retrofit,并使用RxJava进行网络请求
最后一步来了,就是进行网络请求。
我们先建立一个Retrofit对象,并且让他添加RxJava和Gson的适配工厂

讯享网 String url = "https://www.bilibili.com/"; Gson gson = new Gson(); Retrofit retrofit = new Retrofit.Builder() .baseUrl(url) //添加RxJava2的适配 .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) //添加Gson的转换 .addConverterFactory(GsonConverterFactory.create(gson)) .build(); 

然后创建Api 的对象并且实现Api的方法:

 Api api = retrofit.create(Api.class); api.getSearchDefault() //将网络请求放在后台 .subscribeOn(Schedulers.io()) //将结果的返回放在前台 .observeOn(AndroidSchedulers.mainThread()) .subscribe(new SingleObserver<List<Repo>>() { 
    @Override public void onSubscribe(Disposable d) { 
    Log.d(TAG, "onSubscribe"); } @Override public void onSuccess(List<Repo> repo) { 
    Log.d(TAG, repo.get(0).toString()); } @Override public void onError(Throwable e) { 
    Log.e(TAG, e.getMessage()); } }); 

其中

讯享网 .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) 

这两个后面在解析,是有关于线程调度的,而且一定要加这两行,不让Retrofit是不会把网络请求放在后台的…那样的话网络请求可能会阻塞,一定会报错。

// subscirbe == enqueue .subscribe(new SingleObserver<List<Repo>>() { 
    ... }); 

至于上面这个 subscribe方法其实就相当于Okhttp的 enqueue方法,让这个网络去执行!
然后参数就是网络的回调,有 onSubscribeonSuccessonError方法,分别代表执行、网络请求成功并返回结果,网络请求失败。我们在success中打印结果。

接下来我们来运行程序,得到下面的结果:

在这里插入图片描述
Ok,看来请求成功辽。
从上面的图中可以分析一下:从onSubsrcibe到onSuccess,用了1s多的时间,这就是一个网络请求的时间。那么onSubscribe就是程序一开始执行所会回调的程序。

那我们就来仔细的去了解其中的几个方法,首先最开始的Single的介绍开始

Single是啥?有什么用

为了更好的介绍Single,我们可以来写一下Single的一个简单的示例:

讯享网 Single single = Single.just("1"); single.subscribe(new SingleObserver<String>() { 
    @Override public void onSubscribe(Disposable d) { 
    } @Override public void onSuccess(String s) { 
    Log.d(TAG, s); } @Override public void onError(Throwable e) { 
    } }); 

在这里,我们要引入 订阅者观察者 两个概念:

  • 订阅者/被观察者
    就是接收消息的一方,订阅事件的一端,在有事件传过来的时候,可以得到这些事件。然后可以对事件进行处理。
    在本例中,订阅者就是subscribe() 括号里面的东西,就是那个 SingleObserver 对象
    它所做的三个方法 ,就是对 发送方发送的 Disaponable、String、Throwable进行处理。
  • 观察者
    就是发送消息的一方,发送事件的一端,当执行完任务后,会得到一份结果,但自己不能处理,只能给订阅者处理,自己看着他处理(我自己烤了个鸡腿,但是我只能递给你,眼巴巴的看着你吃QAQ)
    所以这也是为什么:发送事件的一方叫做观察者,接收事件的一方又叫做被观察者
    在本例中,观察者就是Single single,它使用just()方法向 被观察者 发送了一个字符 “1”,然后使用了subscribe()方法来观察 被观察者 怎么处理这个“1”。

我们接下来来分析一下just() 是具体干了什么,从源码的角度分析。(这里开始刚源码啦)

just()源码

我们来追踪一下just()的源码:

 @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Single<T> just(final T item) { 
    //这一行是判空操作 ObjectHelper.requireNonNull(item, "value is null"); return RxJavaPlugins.onAssembly(new SingleJust<T>(item)); } 

那个return句子有两个东西, 一个是 onAssembly()的东西,另一个是它传入的SingleJust对象,,Assembly就是集合的意思,这说明它会对括号里的参数进行一个“装箱”。我们来看看onAssembly()方法:

讯享网 @NonNull public static <T> Single<T> onAssembly(@NonNull Single<T> source) { 
    //Fuction是一个转换器,把传入的对象 转换成一个 要处理的 传出的对象 //onSingleAssembly是一个转换后的结果..我们一般用不上这个参数 Function<? super Single, ? extends Single> f = onSingleAssembly; if (f != null) { 
    return apply(f, source); } return source; } 

这句话就是把传入的对象进行一个加工处理。
然后就出来了,也就是说,return语句的关键在于另一条SingleJust这个对象。

我们打开了SingleJust的类,把焦点聚集在下面这个方法上

 //SingleJsust 重写了下面方法 @Override protected void subscribeActual(SingleObserver<? super T> s) { 
    //woc!这不就是外面 subscribe调用的方法咩?它们绝壁有关联的 s.onSubscribe(Disposables.disposed()); s.onSuccess(value); } 

可以这么说,Single通过just() 方法创建了一个SingleJust的对象。
然后SingleJust里面有 subscribeActual() 方法,它会调用参数的onSubscribe()和onSuccess()方法,而这就和Single调用的subscribe()方法一样,有一个大胆的猜测就是我们调用 Single的 subscribe() 其实就是调用 SingleJust的 subscribeActual()方法。
抱着这个猜测,我们去查看subscrible()的源码:
在这里插入图片描述
第一、二、三行分别是判空和包装,不看!
try/catch只看try语句,所以到最后,它就是调用了 subscribeActual()这个方法!

那么在这里就可以总结下了:

  • Single通过just()来构造了一个对象,这个对象使用subscrible()方法的执行其实就使用了SingleJust的 subscribeActual()方法。他直接调用了onSubscribe啊onSuccess!
  • 所以我们 重写了 subscribe(),等于我们定制了 subscribleActual()方法

这就是 subscrible()的作用,它会直接调用我们重写的SingObserver的东西。
所以 当我们在网络请求搞完了,retrofit得到了一个response之后,我门就是拿着它去走 subscrible()方法的回调的~

接下来,我们先不讲解线程调度,也不讲回调方法,我们来看一下RxJava一个特有的东西:操作符

map操作符

map()方法的作用是:转换数据类型
比如说我得到的网络申请结果是一个Integer,但是我要发给 订阅者 的是String,那么我在每次发送之前就要把Integer转换成String,就要在map()里面做:

讯享网 Single single = Single.just(1); single.map(new Function<Integer, String>() { 
    @Override public String apply(Integer integer) throws Exception { 
    return String.valueOf(integer); } }); 

我们传入了一个1,然后在使用map()去构造Function然后重写了apply方法,最后在apply方法中,我们就是做类型的装换哒。

以上就是map操作符的用法,看起来是不是比较简单呢?但是如果你想深入一点研究这个map,会发现它还是有点用法的规矩的。

首先map在链式构造里面不等于建造模式,因为它不是返回this,比如:

 single.map(new Function<Integer, String>() { 
    .... }).subscribe(new SingleObserver() { 
    ... }); 

这是因为使用map后返回的对象已经不同了= =。我们来看看map()方法:
在这里插入图片描述
是通过传入的function来转换成一个 SingleMap对象。
然后返回的值和之前的 SingleJust都不同啦!结构都变了…后面如果使用了subscrible(),那调用的就不是SingleJust的subscribeActual()了,而是调用SingleMap()的subscribeActual()了,而这个方法的具体实现如下:

讯享网 @Override protected void subscribeActual(final SingleObserver<? super R> t) { 
    //帮SingleObserver去做网络请求了,所以map在中间相当于做了一个代理 source.subscribe(new MapSingleObserver<T, R>(t, mapper)); } 

实际上,SingleMapsubscribeActual()方法里面会创建一个MapSingleObserver,来调用Observer的那三个方法:
在这里插入图片描述
有两个参数:

  • SingleObserver t
    我们自己重写的SingleObserver(),里面有我们自己定制的onSuccess()、onSubscribe()、onError()
  • Funcation mapper
    这个是我们在map中重写的 apply()方法

在往下面看,发现MapSingleObserver也会走那三个方法,而更重要的是在 onSuccess()方法中对结果 value值进行了apply()的处理,然后返回这个结果。

到这里我们就大概知道map所做的事情了,我们可以用下面几幅图来描述一下map所做的事情:
①:当没有使用map的时候,就是调用SingleJust的subscribe()方法。
在这里插入图片描述
②:当我们使用了map操作符后
在这里插入图片描述
map就是做了个代理,我们使用了map的subscribeActual(),它会帮我们去做后台请求,又能把结果转换目标类型,这样看的话其实很简单。

了解了map这个操作符,我们就来了解一下 Disponsable这个对象,但是由于它出现在onSubscribe()里面,所以我们先看一下onSubscribe()这个方法是干什么的吧~

onSubscribe(Disponsable disposable)

我们在 onSubscribe(Disponsable disposable) 看到里面有个参数:Disponsable
它是做什么的呢?

用来取消订阅
Rxjava可以让订阅者和被订阅者断开,之后的内容不再发送;发消息的人也可以停下来。
可以通过调用Dispinsable的方法来取消后续的发送。比如:

 @Override public void onSubscribe(Disposable d) { 
    d.dispose(); Log.d(TAG, "onSubscribe"); } 

那它在执行完onSubscribe后,就不会再走 onSuccess() 或者 onError方法了。

那么Disponsable用途是干嘛的捏?
一般是用来防止内存、流量泄露。比如我(被观察者)已经拿完了并且处理完应该要做完的事件,但是这个时候观察者还在后台的不断的申请资源和发送事件,那一到我这个主线程来也会进入onSuccess()方法,我是不知道的,所以会多出很多不必要的东西来占用内存。
so,通过disposable来断开观察者和被观察者的连接。

Ok,Disposable的用处讲完了,那么我们就来从源码的角度分析Disposable吧。

Disposable

它是一个接口:

讯享网public interface Disposable { 
    void dispose(); boolean isDisposed(); } 

这两个方法都特别好理解。用在最上层的Observerable 就实现dispose()了。
它在走onNext()会判断当前自己的disposable()里面的机制会不会让自己停止,如果是的话,那就不会再走onNext()了。

Dispose会从上一直往下传,每一层如果停掉了当前了Disponse,那它也会停掉了上一层的传递。就是这样。

subscribeOn

subscribeOn也是一个操作符,但它操作的不是数据,而是线程。它把数据、请求放到了后台线程去。
所以我们还是要去看它的subscribeActual()做了什么。

 @Override protected void subscribeActual(final SingleObserver<? super T> s) { 
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source); s.onSubscribe(parent); //通过schedule做线程切换 Disposable f = scheduler.scheduleDirect(parent); parent.task.replace(f); } 

parent是最上层的Observer,并把它包装成了一个Runnable对象。而它重写了run方法就是:

讯享网 @Override public void run() { 
    //把请求网上调了 source.subscribe(this); } 

就相当于:
在这里插入图片描述
scheduler还是使用了Executor去进行线程切换,而不直接使用Executor的原因是scheduler的场景在RxJava2更多。

从图中也可以看出,如果多次使用subscribeOn(),则最上层(即第一行)的subscribeOn()才会起作用
Disposable f = scheduler.scheduleDirect(parent) 中的f作为Dsponsable可以被我所控制,如果我们在外边调用了dispose(),则里面也会调用这个f的dispose()。

至于切线程的具体细节,这里就不多讲了。

ObserverOn

研究它和研究SubscribeOn()一样,也是看subscribeActual():

 @Override protected void subscribeActual(final SingleObserver<? super T> s) { 
    source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler)); } 

它直接就把数据交往上层了,在数据的过去时没做任何操作:
在这里插入图片描述
所以它真正做的事情不在subscribeActual()里面。
因为ObserverOn是掌管数据回来的线程,所以用脚都能想到,它的处理,肯定是在数据回来,也就是Obseverable的onSuccess和onError()中:

讯享网 @Override public void onSuccess(T value) { 
    this.value = value; //scheduleDirect切换线程 Disposable d = scheduler.scheduleDirect(this); DisposableHelper.replace(this, d); } @Override public void onError(Throwable e) { 
    this.error = e; //scheduleDirect切换线程 Disposable d = scheduler.scheduleDirect(this); DisposableHelper.replace(this, d); } @Override public void run() { 
    Throwable ex = error; if (ex != null) { 
    actual.onError(ex); } else { 
    actual.onSuccess(value); } } 

因为onSuccess()中调用了scheduleDirect(),所以我们要去看run方法的实现。而run方法中,它就是执行了actual的回调方法。
它是这么走的:
在这里插入图片描述
所以从图中可以看出,如果多次使用ObserverOn,则最后一个使用ObserverOn()生效

那我们看到了它们且线程的流程,那我们来看下Schedulers具体做了什么事情。

Schedulers

 .subscribeOn(Schedulers.io()) .subscribeOn(Schedulers.newThread()) 

Schedulers.io()代表I/O线程,其实绝大多数的后台操作都是在这个线程上面做。
而和IO线程差不多的就是 newThread()线程。我们通过前面看到的 scheduleDirect()在做什么:

讯享网 @NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { 
    //Worker就是用Executor来做线程切换 //通过createWorker()来走创建一个Executor final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); w.schedule(new Runnable() { 
    @Override public void run() { 
    try { 
    decoratedRun.run(); } finally { 
    w.dispose(); } } }, delay, unit); return w; } 

Worker就是用executor来做线程切换,它来走run()方法。
io()和newThread()的区别是io是从Executor的线程中启用一个可用的线程来做事情。而newTreahd直接创建一个新线程,这两个其实都差不多。

切回主线程的方法:

.observeOn(AndroidSchedulers.mainThread()) 

点进去会到下面这里:
在这里插入图片描述
没错,看到了Handler,其实这里就是用Handler来做切换线程的。

到这里,其实框架就了解完了。

总结

RxJava就是异步,把请求数据调到想要的线程,把返回的数据返回到前台。

  • 通过subscribe(),最终会执行subscribeActual(xxObserverable)
  • 要把整个过程看成链式的,数据的传递、线程的切换会更加清晰
  • 操作符map和flatmap都是做返回体类型的转换
  • Disponsable的作用是切断观察者和被观察者的联系
  • subscribeOn和ObserverOn通过 Schedulers做线程切换,实质是通过Handler或Executor来切换

说实话,我连背压、FlatMap都没有讲,但是了解了RxJava的结构,这些还会难吗。

先这样,下次做RxJava估计是进行更加细致的总结。

小讯
上一篇 2025-02-14 07:20
下一篇 2025-01-05 09:52

相关推荐

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容,请联系我们,一经查实,本站将立刻删除。
如需转载请保留出处:https://51itzy.com/kjqy/63365.html