1.Vue Router 源码学习笔记4 - pushState和replaceState的工具实现
2.ForkjoinPool -1
3.RocketMQ 5.0: POP 消费模式 原理详解 & 源码解析
Vue Router 源码学习笔记4 - pushState和replaceState的实现
在Vue Router中,HTML5History的源码push和replace操作主要通过util/push-state.js中的相应函数来执行,它们依赖window.history.pushState和window.history.replaceState API。代码对于HTML5History,工具如果浏览器支持,源码就按照标准流程进行,代码波段投资指标源码即利用pushState或replaceState改变浏览器的工具历史记录,而不会导致页面刷新。源码
对于HashHistory,代码浏览器支持与否对操作方式有影响。工具若支持,源码同样采用类似方法,代码通过pushState设置hash部分,工具replaceState则调用window.location.replace替换当前URL。源码然而,代码如果浏览器不支持pushState,会直接操作window.location更改URL,金盾飘零源码以#符号为标志。
MDN文档中提到,pushState需要三个参数:状态对象、标题(通常忽略)和可选的URL。而replaceState与pushState类似,只是替换当前历史项,而非新增,尽管它会在浏览器历史中生成新的eclipse引用源码路径记录。
当路由更改后,紧接着是视图的同步更新。详细了解这两个方法的使用,可以参考MDN文档:developer.mozilla.org/zh-CN/docs/Web/API/History/pushState。
继续深入学习,确保在实际项目中正确运用这些原理,实现无缝的路由切换。
ForkjoinPool -1
ForkJoinæ¯ç¨äºå¹¶è¡æ§è¡ä»»å¡çæ¡æ¶ï¼ æ¯ä¸ä¸ªæ大任å¡åå²æè¥å¹²ä¸ªå°ä»»å¡ï¼æç»æ±æ»æ¯ä¸ªå°ä»»å¡ç»æåå¾å°å¤§ä»»å¡ç»æçæ¡æ¶ãForkå°±æ¯æä¸ä¸ªå¤§ä»»å¡åå为è¥å¹²åä»»å¡å¹¶è¡çæ§è¡ï¼Joinå°±æ¯å并è¿äºåä»»å¡çæ§è¡ç»æï¼æåå¾å°è¿ä¸ªå¤§ä»»å¡çç»æãä¸é¢æ¯ä¸ä¸ªæ¯ä¸ä¸ªç®åçJoin/Fork计ç®è¿ç¨ï¼å°1âæ°åç¸å
é常è¿æ ·ä¸ªæ¨¡åï¼ä½ 们ä¼æ³å°ä»ä¹ï¼
Release Framework ï¼ å¸¸è§çå¤ç模åæ¯ä»ä¹ï¼ task pool - worker poolç模åã ä½æ¯Forkjoinpool éåäºå®å ¨ä¸åç模åã
ForkJoinPoolä¸ç§ExecutorServiceçå®ç°ï¼è¿è¡ForkJoinTaskä»»å¡ãForkJoinPoolåºå«äºå ¶å®ExecutorServiceï¼ä¸»è¦æ¯å 为å®éç¨äºä¸ç§å·¥ä½çªå(work-stealing)çæºå¶ãææ被ForkJoinPool管çç线ç¨å°è¯çªåæ交å°æ± åéçä»»å¡æ¥æ§è¡ï¼æ§è¡ä¸åå¯äº§çåä»»å¡æ交å°æ± åä¸ã
ForkJoinPoolç»´æ¤äºä¸ä¸ªWorkQueueçæ°ç»(æ°ç»é¿åº¦æ¯2çæ´æ°æ¬¡æ¹ï¼èªå¨å¢é¿)ãæ¯ä¸ªworkQueueé½æä»»å¡éå(ForkJoinTaskçæ°ç»)ï¼å¹¶ä¸ç¨baseãtopæåä»»å¡éåéå°¾åé头ãwork-stealingæºå¶å°±æ¯å·¥ä½çº¿ç¨æ¨ä¸ªæ«æä»»å¡éåï¼å¦æéåä¸ä¸ºç©ºååéå°¾çä»»å¡å¹¶æ§è¡ã示æå¾å¦ä¸
æµç¨å¾ï¼
poolå±æ§
workQueuesæ¯poolçå±æ§ï¼å®æ¯WorkQueueç±»åçæ°ç»ãexternalPushåexternalSubmitæå建çworkQueue没æowner(å³ä¸æ¯worker)ï¼ä¸ä¼è¢«æ¾å°workQueuesçå¶æ°ä½ç½®ï¼ècreateWorkerå建çworkQueueï¼å³workerï¼æownerï¼ä¸ä¼è¢«æ¾å°workQueuesçå¥æ°ä½ç½®ã
WorkQueueçå 个éè¦æååé说æå¦ä¸ï¼
è¿æ¯WorkQueueçconfigï¼é«ä½è·poolçconfigå¼ä¿æä¸è´ï¼èä½ä½åæ¯workQueueå¨workQueuesæ°ç»çä½ç½®ã
ä»workQueueså±æ§çä»ç»ä¸ï¼æ们ç¥éï¼ä¸æ¯ææworkQueueé½æworkerï¼æ²¡æworkerçworkQueueç§°ä¸ºå ¬å ±éåï¼shared queueï¼ï¼configç第ä½å°±æ¯ç¨æ¥å¤ææ¯å¦æ¯å ¬å ±éåçãå¨externalSubmitå建工ä½éåæ¶ï¼æï¼
q.config = k | SHARED_QUEUE;
å ¶ä¸qæ¯æ°å建çworkQueueï¼kå°±æ¯qå¨workQueuesæ°ç»ä¸çä½ç½®ï¼SHARED_QUEUE=1<<ï¼æ³¨æè¿éconfig没æä¿çmodeçä¿¡æ¯ã
èå¨registerWorkerä¸ï¼åæ¯è¿æ ·ç»workQueueçconfigèµå¼çï¼
w.config = i | mode;
wæ¯æ°å建çworkQueueï¼iæ¯å ¶å¨workQueuesæ°ç»ä¸çä½ç½®ï¼æ²¡æ设置SHARED_QUEUEæ è®°ä½
scanStateæ¯workQueueçå±æ§ï¼æ¯intç±»åçãscanStateçä½ä½å¯ä»¥ç¨æ¥å®ä½å½åworkerå¤äºworkQueuesæ°ç»çåªä¸ªä½ç½®ãæ¯ä¸ªworkerå¨è¢«å建æ¶ä¼å¨å ¶æé å½æ°ä¸è°ç¨poolçregisterWorkerï¼èregisterWorkerä¼ç»scanStateèµä¸ä¸ªåå§å¼ï¼è¿ä¸ªå¼æ¯å¥æ°ï¼å 为workeræ¯ç±createWorkerå建ï¼å¹¶ä¼è¢«æ¾å°WorkQueuesçå¥æ°ä½ç½®ï¼ècreateWorkerå建workeræ¶ä¼è°ç¨registerWorkerã
ç®è¨ä¹ï¼workerçscanStateåå§å¼æ¯å¥æ°ï¼éworkerçscanstateåå§å¼=INACTIVE=1<<ï¼å°äº0ï¼éworkerçworkQueueå¨externalSubmitä¸å建ï¼ã
å½æ¯æ¬¡è°ç¨signalWorkï¼ætryReleaseï¼å¤éworkeræ¶ï¼workerçé«ä½å°±ä¼å 1
å¦å¤ï¼scanState<0表示workeræªæ¿æ´»ï¼å½workerè°ç¨runtaskæ§è¡ä»»å¡æ¶ï¼scanStateä¼è¢«ç½®ä¸ºå¶æ°ï¼å³è®¾ç½®scanStateçæå³è¾¹ä¸ä½ä¸º0ã
workerä¼ç æ¶ï¼æ¯è¿æ ·åå¨ç
workerçå¤é类似è¿æ ·ï¼
å¨workerä¼ç ç4è¡ä¼ªç ä¸ï¼è®©ctlçä½ä½çå¼å为worker.scanStateï¼è¿æ ·ä¸æ¬¡å°±å¯ä»¥éè¿scanStateå¤é该workerãå¤é该workeræ¶ï¼æ该workerçpreStack设置为ctlä½ä½çå¼ï¼è¿æ ·ä¸ä¸æ¬¡å¤éçworkerå°±æ¯scanStateçäºè¯¥preStackçworkerã
è¿ééè¿preStackä¿åä¸ä¸ä¸ªworkerï¼è¿ä¸ªworkeræ¯å½åworkeræ´æ©å°å¨çå¾ ï¼æ以形æä¸ä¸ªåè¿å åºçæ ã
runStateæ¯intç±»åçå¼ï¼æ§å¶æ´ä¸ªpoolçè¿è¡ç¶æåçå½å¨æï¼æä¸é¢å 个å¼ï¼å¯ä»¥å¥½å 个å¼åæ¶åå¨ï¼ï¼
å¦ærunStateå¼ä¸º0ï¼è¡¨ç¤ºpoolå°æªåå§åã
RSLOCK表示éå®poolï¼å½æ·»å workeråpoolç»æ¢æ¶ï¼å°±è¦ä½¿ç¨RSLOCKéå®æ´ä¸ªpoolãå¦æç±äºrunState被éå®ï¼å¯¼è´å ¶ä»æä½çå¾ runState解éï¼é常ç¨waitè¿è¡çå¾ ï¼ï¼å½runState设置äºRSIGNALï¼è¡¨ç¤ºrunState解éï¼å¹¶éç¥ï¼notifyAllï¼çå¾ çæä½ã
å©ä¸4个å¼é½è·runStateçå½å¨ææå ³ï¼é½å¯ä»¥é¡¾åæä¹ï¼
å½éè¦åæ¢æ¶ï¼è®¾ç½®runStateçSTOPå¼ï¼è¡¨ç¤ºåå¤å ³éï¼è¿æ ·å ¶ä»æä½çå°è¿ä¸ªæ è®°ä½ï¼å°±ä¸ä¼ç»§ç»æä½ï¼æ¯å¦tryAddWorkerçå°STOPå°±ä¸ä¼åå建workerï¼
ètryTerminate对è¿äºçå½å¨æç¶æçå¤çåæ¯è¿æ ·çï¼
å½åtopåbaseçåå§å¼ä¸º INITIAL_QUEUE_CAPACITY >>>1= (1 << )>>>1 = /2ãç¶åpushä¸ä¸ªtaskä¹åï¼top+=1ï¼ä¹å°±æ¯è¯´ï¼top对åºçä½ç½®æ¯æ²¡ætaskçï¼æè¿pushè¿æ¥çtaskå¨top-1çä½ç½®ãèbaseçä½ç½®åè½å¯¹åºå°taskï¼base对åºæå æ¾è¿éåçtaskï¼top-1对åºæåæ¾è¿éåçtaskã
qlockå¼å«ä¹ï¼1: locked,macd柱状指标源码 < 0: terminate; else 0
å³å½qlockå¼ä½0æ¶ï¼å¯ä»¥æ£å¸¸æä½ï¼å¼=1æ¶ï¼è¡¨ç¤ºéå®
int SQMASK=0xeï¼åä»»ä½æ´æ°è·SQMASKä½ä¸åï¼å¾å°çæ°å°±æ¯å¶æ°ã
è¯æï¼
注æè¿éå为äºè¿å¶æ¯ ï¼å°¤å ¶æ³¨ææå³è¾¹ç¬¬ä¸ä½æ¯0ï¼ä»»ä½æ°è·æå³è¾¹ç¬¬ä¸ä½æ¯0çæ°ä½ä¸åï¼å¾å°çæ°å°±æ¯å¶æ°ï¼å 为ä½ä¸ä¹åï¼ç¬¬ä¸ä½å°±æ¯0ï¼æ¯å¦s=A&SQMASKï¼Aå¯ä»¥æ¯ä»»ææ´æ°ï¼ç¶åæsæäºè¿å¶è¿è¡å¤é¡¹å¼å±å¼ï¼åæs=2 n1+2 n2 â¦â¦+2^nnï¼è¿énâ¥1ï¼æ以så¯ä»¥è¢«2æ´é¤ï¼å³sæ¯å¶æ°ã
æ以ä¸ä¸ªæ°æ¯å¥æ°è¿æ¯å¶æ°ï¼çå ¶æå³è¾¹ç¬¬ä¸ä½å³å¯ã
æ们ç¥éworkQueueæexternalPushå建çåcreateWorkerå建çworkerï¼ä¸¤ç§æ¹å¼å建çworkQueueï¼å ¶æ¾ç½®å°workQueuesçä½ç½®æ¯ä¸åçï¼åè æ¾å°workQueueçå¶æ°ä½ç½®ï¼èåè åæ¾å°å¥æ°ä½ç½®ãä¸åworkQueueæ¾å°èªå·±å¨workQueuesçä½ç½®çç®æ³æç¹ä¸åã
ä¸é¢çä¸ä¸forkjoinæ¡æ¶è·åworkQueuesä¸çå¶æ°ä½ç½®çworkQueueçç®æ³ï¼
è¿æ ·å°±è½è·åworkQueuesçå¶æ°ä½ç½®çworkQueueãmä¿è¯m & r & SQMASKè¿æ´ä¸ªè¿ç®ç»æä¸ä¼è¶ åºworkQueuesçä¸æ ï¼SQMASKä¿è¯åå°çæ¯å¶æ°ä½ç½®çworkQueueãè¿éæä¸ä¸ªæ趣çç°è±¡ï¼å设0å°workQueues.length-1ä¹é´æn个å¶æ°ï¼m & r & SQMASKæ¯æ¬¡é½è½åå°å ¶ä¸ä¸ä¸ªå¶æ°ï¼èä¸è¿ç»n次åå°çå¶æ°ä¸ä¼åºç°éå¤å¼ï¼æ£åæ§é常好ãèä¸æ¯å¾ªç¯çï¼å³1å°n次ån个ä¸åå¶æ°ï¼n+1å°2nä¹æ¯ån次ä¸åå¶æ°ï¼æ¤æ¶n个å¶æ°æ¯ä¸ªé½è¢«éæ°åä¸æ¬¡ãä¸é¢åæä¸rå¼æä»ä¹ç§å¯ï¼ä¸ºä½è½ä¿è¯è¿æ ·çæ£åæ§
ThreadLocalRandomå æä¸å¸¸éPROBE_INCREMENT = 0x9eb9ï¼ä»¥åä¸ä¸ªéæçprobeGenerator =new AtomicInteger() ï¼ç¶åæ¯ä¸ªçº¿ç¨çprobe= probeGenerator.addAndGet(PROBE_INCREMENT)æ以第ä¸ä¸ªçº¿ç¨çprobeå¼æ¯0x9eb9ï¼ç¬¬äºä¸ªçº¿ç¨çå¼å°±æ¯0x9eb9+0x9eb9ï¼ç¬¬ä¸ä¸ªçº¿ç¨çå¼å°±æ¯0x9eb9+0x9eb9+0x9eb9以æ¤ç±»æ¨ï¼æ´ä¸ªå¼æ¯çº¿æ§çï¼å¯ä»¥ç¨y=kx表示ï¼å ¶ä¸k=0x9eb9ï¼x表示第å 个线ç¨ãè¿æ ·æ¯ä¸ªçº¿ç¨çprobeå¯ä»¥ä¿è¯ä¸ä¸æ ·ï¼èä¸å ·æå¾å¥½ç离æ£æ§ã
å®é ä¸ï¼å¯ä»¥ä¸ç¨0x9eb9è¿ä¸ªå¼ï¼ç¨ä»»æä¸ä¸ªå¥æ°é½æ¯å¯ä»¥çï¼æ¯å¦1ãå¦æç¨1çè¯ï¼probe+=1ï¼è¿æ ·æ¯ä¸ªçº¿ç¨çprobeå°±é½æ¯ä¸åçï¼èä¸å ·æå¾å¥½ç离æ£æ§ãä¹å°±æ¯è¯´ï¼å设æéå¶æ¡ä»¶probe<nï¼è¶ è¿nå产ç溢åºãåprobeèªå n次åæä¼å¼å§åºç°éå¤å¼ï¼n次åprobeæ¯æ¬¡èªå çå¼é½ä¸åãå®é ä¸ç¨ä»»æä¸ä¸ªå¥æ°ï¼é½å¯ä»¥ä¿è¯probeèªå n次åæä¼å¼å§åºç°éå¤å¼ï¼æå ´è¶£å¯çæ¬ææåéå½é¨åãç±äºå¥æ°ç离æ£æ§ï¼æ以åªè¦çº¿ç¨æ°å°äºmæè SQMASK两è ä¸çæå°å¼ï¼åæ¯ä¸ªçº¿ç¨é½è½å¯ä¸å°å æ®ä¸ä¸ªwsä¸çä¸ä¸ªä½ç½®
å½ä¸ä¸ªæä½æ¯å¨éForkjoinThreadç线ç¨ä¸è¿è¡çï¼å称该æä½ä¸ºå¤é¨æä½ãæ¯å¦æ们åé¢æ§è¡pool.invokeï¼invokeå åæ§è¡externalPushãç±äºinvokeæ¯å¨éForkjoinThread线ç¨ä¸è¿è¡çï¼è¿éæ¯å¨main线ç¨ä¸è¿è¡ï¼ï¼æ以æ¯ä¸ä¸ªå¤é¨æä½ï¼è°ç¨çæ¯externalPushãä¹åtaskçæ§è¡æ¯éè¿ForkJoinThreadæ¥æ§è¡çï¼æ以taskä¸çforkå°±æ¯å é¨æä½ï¼è°ç¨çæ¯pushï¼æä»»å¡æ交å°å·¥ä½éåãå ¶å®forkçå®ç°æ¯ç±»ä¼¼ä¸é¢è¿æ ·çï¼
å³forkä¼æ ¹æ®æ§è¡èªèº«ç线ç¨æ¯å¦æ¯ForkJoinThreadçå®ä¾æ¥å¤ææ¯å¤äºå¤é¨è¿æ¯å é¨ãé£ä¸ºä½è¦åºåå å¤é¨ï¼
ä»»ä½çº¿ç¨é½å¯ä»¥ä½¿ç¨ForkJoinæ¡æ¶ï¼ä½æ¯å¯¹äºéForkJoinThreadç线ç¨ï¼å®å°åºæ¯ææ ·çï¼ForkJoinæ æ³æ§å¶ï¼ä¹æ æ³å¯¹å ¶ä¼åãå æ¤åºååºå å¤é¨ï¼è¿æ ·æ¹ä¾¿ForkJoinæ¡æ¶å¯¹ä»»å¡çæ§è¡è¿è¡æ§å¶åä¼å
forkJoinPool.invoke(task)æ¯æä»»å¡æ¾å ¥å·¥ä½éåï¼å¹¶çå¾ ä»»å¡æ§è¡ãæºç å¦ä¸
è¿éexternalPushè´è´£ä»»å¡æ交ï¼externalPushæºç å¦ä¸ï¼
RocketMQ 5.0: POP 消费模式 原理详解 & 源码解析
RocketMQ 5.0 引入 Pop 消费模式,用于解决 Push 消费模式存在的痛点。Pop 消费模式将客户端的重平衡逻辑迁移至 Broker 端,使得消息消费过程更加高效,避免消息堆积和横向扩展能力受限的问题。引入轻量化客户端后,通过 gRPC 封装 Pop 消费接口,实现了多语言支持,无需在客户端实现重平衡逻辑。ocx软电源码
Pop 消费模式的原理在于客户端仅需发送 Pop 请求,由 Broker 端根据请求分配消息队列并返回消息。这样可以实现多客户端同时消费同一队列,避免单一客户端挂起导致消息堆积,同时也消除了频繁重平衡导致的消息积压问题。
Pop 消费流程涉及消息拉取、不可见时间管理、消费失败处理和消息重试等关键环节。消息拉取时,系统会为一批消息生成 CheckPoint,并在 Broker 内存中保存,以便与 ACK 消息匹配。消息不可见时间机制确保在规定时间内未被 ACK 的消息将被重试。消费失败时,客户端通过修改消息不可见时间来调整重试策略。当消费用时超过预设时间,Broker 也会将消息放入重试队列。通过定时消息,Broker 可以提前消费重试队列中的消息,与 ACK 消息匹配,实现高效消息处理。
在 Broker 端,重平衡逻辑也进行了优化。Pop 模式的重平衡允许多个消费者同时消费同一队列,通过 popShareQueueNum 参数配置额外的负载获取队列次数。Pop 消息处理涉及从队列中 POP 消息、生成 CheckPoint 用于匹配 ACK 消息、以及存储 CheckPoint 与 Ack 消息匹配。Broker 端还通过 PopBufferMergeService 线程实现内存与磁盘中的 CheckPoint 和 Ack 消息匹配,以及消息重试处理。
源码解析部分涉及 Broker 端的重平衡逻辑、Pop 消息处理、Ack 消息处理、CheckPoint 与 Ack 消息匹配逻辑等关键组件的实现细节,这些细节展示了 RocketMQ 5.0 如何通过优化消费模式和流程设计,提升消息消费的效率和稳定性。