欢迎来到皮皮网网首页

【net 源码查看】【一影随行源码】【vue评论回复源码】rxjava源码分析

来源:前端地球源码 时间:2025-01-11 19:29:19

1.Android 面试知识点记录——Rxjava与协程
2.RxJava3原理解析
3.okhttp,源码retrofit,android-async-http,volley应该选择哪一
4.深入理解 RxJava2:Scheduler(2)
5.我的RxJava源码解读笔记
6.Hystrix介绍

rxjava源码分析

Android 面试知识点记录——Rxjava与协程

       作者:浪人笔记

       RxJava协程与操作符

       在RxJava中,map操作符实现简单,分析它在原有Observable的源码基础上添加MapObservable观察者,将变换函数作为参数传递。分析MapObservable的源码onNext方法接收元素,传递给变换函数进行变换,分析net 源码查看结果作为新元素发射。源码

       而flatMap操作符实现相对复杂。分析它在原有Observable上添加FlatMapObservable观察者,源码并将变换函数作为参数传递。分析FlatMapObservable的源码onNext方法接收元素,传递给变换函数得到新Observable,分析注册到FlatMapSubscriber中等待下一次数据。源码数据处理完成后,分析调用FlatMapSubscriber的源码onComplete方法,合并所有得到的Observable成新Observable发射。

       RxJava1.0与2.0区别

       RxJava2.0在异常处理、背压支持、线程调度和性能等方面改进和提升。

       背压概念与解决方案

       背压是指数据产生速度大于消费速度,导致内存溢出等问题。RxJava1.x没有背压支持,使用onBackpressureBuffer、onBackpressureDrop缓解,而RxJava2.0引入Flowable支持背压,一影随行源码提供更多背压控制策略。

       Flowable类型支持背压,通过onBackpressureBuffer、onBackpressureDrop、onBackpressureLatest等方法处理背压问题。Flowable方式类似Observable,使用时需指定背压策略。

       subscribeOn与observeOn

       subscribeOn只影响第一次指定的线程,后续操作不影响。subscribeOn理解为管道入口,observeOn理解为出口,数据处理后出口策略可变。

       RxJava数据流回主线程

       使用observeOn(AndroidSchedulers.mainThread()),内部实现为Handler(Looper.getMainLooper())。

       协程、进程、线程、协程区别

       Kotlin协程基于挂起函数实现,不依赖操作系统和编译器,实现协程效果。

       处理回调地狱与协程

       协程通过挂起函数减少回调嵌套,提高代码可读性。使用withContext指定协程执行上下文,避免主线程阻塞。vue评论回复源码

       开发中选择调度器

       通常选择主线程、IO密集型和CPU密集型调度器,对应Kotlin协程中的上下文。

       Android核心知识点记录

       包含性能优化、车载技术、Framework底层原理、音视频开发、Jetpack全家桶、Kotlin、Gradle、OkHttp源码解析、Flutter等主题,涵盖Android开发者必备知识。

       Android面试题集锦

       整理多年面试题集,包括Android基础知识、性能优化、音视频、Jetpack全家桶、Kotlin、Gradle、OkHttp源码解析等内容,为开发者提供面试准备。

       Android音视频面试题集

       聚焦于Android音视频开发的关键问题,帮助开发者深入理解音视频技术。个股排序的源码

RxJava3原理解析

       RxJava3是一个用于构建异步和基于事件的程序的强大工具,官方定义为Java VM上的可观测序列库。本文以3.0.版本的源码为例,从基础使用开始,讲解如何与Retrofit结合,实现网络请求的链式操作。

       首先,我们通过一个简单的示例来演示如何构建Retrofit实例,定义API并发起网络请求,从而利用RxJava的链式操作。

       接着,我们从基础的just操作符开始理解订阅关系。Single.just(1)创建了一个SingleJust实例,RxJava的订阅过程主要由subscribeActual方法控制。SingleJust在实际订阅时,直接回调观察者的onSubscribe和onSuccess,没有错误处理,因为数据不包含失败状态。

       然后,我们探讨map操作符,它用于数据转换。map的实现是通过构建SingleMap,其订阅过程与just类似,只是指尖q将源码将上游的数据通过map操作进行转换后再传递给下游的观察者。

       框架结构方面,RxJava以操作符(如map)为核心,它们通过dispose方法来控制工作流程。dispose有多种情况,理解这些情况有助于更好地控制程序的执行。

       对于无后续操作的Single.just,如无延迟,dispose操作相对简单,因为任务很快完成。而Observable.interval和Single.delay则涉及后续任务和延迟,它们通过Disposable和调度器管理任务的执行和取消。

       线程切换是RxJava的关键功能,subscribeOn和observeOn分别用于指定操作的线程。例如,SingleSubscribeOn用于指定订阅操作的线程,而ObserveOnSingleObserver则在指定线程中执行观察者的方法。

       最后,Scheduler是控制线程执行的关键,如Schedulers.newThread、Schedulers.io和AndroidSchedulers.mainThread各有其用途。RxJava的这些核心特性使得它在Android开发中广泛应用,特别是处理异步操作和线程切换。

okputation 和 io。

       NewThreadWorker 在 computation、io 和 newThread 中都有涉及,下面简单了解一下这个类。NewThreadWorker 与 ScheduledThreadPoolExecutor 之间是一对一的关系,在构造函数中通过工厂方法创建一个 corePoolSize 为 1 的 ScheduledThreadPoolExecutor 对象并持有。

       ScheduledThreadPoolExecutor 从 JDK1.5 开始存在,这个类继承于 ThreadPoolExecutor,支持立即、延时和周期性任务。但是注意,在 ScheduledThreadPoolExecutor 中,maximumPoolSize 参数是无效的,corePoolSize 表示最大线程数,且它的队列是无界的。这里不再深入探讨该类,否则会涉及太多内容。

       有了这个类,RxJava2 在实现 Worker 时就站在了巨人的肩膀上,线程调度可以直接使用该类解决,唯一的麻烦之处就是封装一层 Disposable 的逻辑。

       ComputationScheduler 是计算密集型的 Scheduler,其线程数与 CPU 核心数密切相关。当线程数远超过 CPU 核心数目时,CPU 的时间更多地损耗在了线程的上下文切换。因此,保持最大线程数与 CPU 核心数一致是比较通用的方式。

       FixedSchedulerPool 可以看作是固定数量的真正 Worker 的缓存池。确定了 MAX_THREADS 后,在 ComputationScheduler 的构造函数中会创建 FixedSchedulerPool 对象,FixedSchedulerPool 内部会直接创建一个长度为 MAX_THREADS 的 PoolWorker 数组。PoolWorker 继承自 NewThreadWorker,但没有任何额外的代码。

       PoolWorker 的使用方法是从池子里取一个 PoolWorker 并返回。但是需要注意,每个 Worker 是独立的,每个 Worker 内部的任务是绑定在这个 Worker 中的。如果按照上述方法暴露 PoolWorker,会出现两个问题:

       为了解决上述问题,需要在 PoolWorker 外再包一层 EventLoopWorker。EventLoopWorker 是一个代理对象,它会将 Runnable 代理给 FixedSchedulerPool 中取到的 PoolWorker 来调度,并负责管理通过它创建的任务。当自身被取消时,会将创建的任务全部取消。

       与 ComputationScheduler 恰恰相反,IoScheduler 的线程数是无上限的。这是因为 IO 设备的速度远低于 CPU 速度,在等待 IO 操作时,CPU 往往是闲置的。因此,应该创建更多的线程让 CPU 尽可能地利用。当然,并不是线程越多越好,线程数目膨胀到一定程度会影响 CPU 的效率,也会消耗大量的内存。在 IoScheduler 中,每个 Worker 在空置一段时间后就会被清除以控制线程的数目。

       CachedWorkerPool 是一个变长并定期清理的 ThreadWorker 的缓存池,内部通过一个 ConcurrentLinkedQueue 维护。和 PoolWorker 类似,ThreadWorker 也是继承自 NewThreadWorker。仅仅是增加了一个 expirationTime 字段,用来标识这个 ThreadWorker 的超时时间。

       在 CachedWorkerPool 初始化时,会传入 Worker 的超时时间,目前是写死的 秒。这个超时时间表示 ThreadWorker 闲置后最大存活时间(实际中不保证 秒时被回收)。

       IoScheduler 中也存在一个 EventLoopWorker 类,它和 ComputationScheduler 中的作用类似。因为 CachedWorkerPool 是每隔 秒清理一次队列的,所以 ThreadWorker 的存活时间取决于入队的时机。如果一直没有被再次取出,其被实际清理的延迟在 - 秒之间。

       熟悉线程的读者会发现,ComputationScheduler 与 IoScheduler 很像某些参数下的 ThreadPoolExecutor。它们对线程的控制外在表现很相似,但实际的线程执行对象不一样。这两者的对比有助于我们更深刻地理解 Scheduler 设计的内在逻辑。

       Scheduler 是 RxJava 线程的核心概念,RxJava 基于此屏蔽了 Thread 相关的概念,只与 Scheduler/Worker/Runnable 打交道。

       本来计划继续基于 Scheduler 和大家一起探讨 subscribeOn 与 observeOn,但考虑到篇幅问题,这些留待下篇分享。

       感谢大家的阅读,欢迎关注笔者的公众号,可以第一时间获取更新,同时欢迎留言沟通。

我的RxJava源码解读笔记

       RxJava是一个用于处理异步任务的库,其主要功能包括观察者模式、数据发送与接收、切换线程、数据变换等。在学习RxJava源码时,梳理了其工作流程,包括创建Observable、创建观察者(使用Subscriber)、订阅(使用subscribe方法)、变换操作(如map、compose)、线程切换(通过subscribeOn和observeOn方法)等关键步骤。从源码角度深入理解了RxJava的工作原理,如Observable的创建、Subscriber的实现、OnSubscribe的作用、Subscription的生命周期管理、变换操作的具体实现以及线程控制机制。通过分析RxJava的源码,不仅加强了记忆,也为实际应用提供了清晰的指导。RxJava通过观察者模式实现了数据的高效异步处理,支持在线程间灵活切换,通过变换操作符实现了数据的转换,是处理异步编程和事件流的理想工具。

Hystrix介绍

        对Hystrix耳闻已久,最近刚好想在项目中使用这个神器就顺带研究了一把,很多细节来不及深入研究只能把宏观上的各个概念讲解一下,这个介绍的素材大都来自github上的Hystrix官网。

         所谓一图胜千言,但凡能够用图片来表示而且能够表示清楚的,就不多用文字描述了,看图肯定比看文字要让人来的更爽一些。当然我还是非常建议去github上的Hystrix官方wiki去看原汁原味的文档,在参考文献部分已经给出了链接。

         最后提一点,就是在Hystrix的实现当中大量使用了RxJava的开源包的技术,这个技术之前没怎么研究过,所以后面的很多源码的分析更多侧重过程分析而不会深入细节,有兴趣的可以自己深入研究下,我就准备哪天得空好好去研究一下,毕竟RxJava这个东西号称是一个通过使用可观察序列来编写异步和基于事件的程序的库。

        hystrix的出现即为解决雪崩效应,它通过四个方面的机制来解决这个问题

        Hystrix的隔离主要是为每个依赖组件提供一个隔离的线程环境,提供两种模式的隔离:

        Hystrix的熔断器其实可以理解为就是一个统计中心,统计一定时间窗口内访问次数,成功次数,失败次数等数值判定是否发生熔断。发生电路熔断的过程如下:

       hystrix工作原理-英文版

        hystrix工作原理-中文版

        关于RxJava的详解