皮皮网

【rtsp播放源码】【apache开源项目源码】【众信呼死你源码】linkedblockingqueue 源码

来源:firstchild源码 时间:2025-01-24 13:06:18

1.从源码全面解析 LinkedBlockingQueue的来龙去脉
2.还不了解Java的5大BlockingQueue阻塞队列源码,看这篇文章就够了
3.LinkedBlockingQueue
4.LinkedBlockingDeque
5.阿里P9整理Java 高频面试题聊一聊 JUC 下的 LinkedBlockingQueue
6.喜提JDK的BUG一枚!多线程的情况下请谨慎使用这个类的stream遍历。

linkedblockingqueue 源码

从源码全面解析 LinkedBlockingQueue的来龙去脉

       并发编程是互联网技术的核心,面试官常在此领域对求职者进行深入考察。为了帮助读者在面试中占据优势,rtsp播放源码本文将解析 LinkedBlockingQueue 的工作原理。

       阻塞队列是并发编程中常见的数据结构,它在生产者和消费者模型中扮演重要角色。生产者负责向队列中添加元素,而消费者则从队列中取出元素。LinkedBlockingQueue 是 Java 中的一种高效阻塞队列实现,它底层基于链表结构。

       在初始化阶段,LinkedBlockingQueue 不需要指定队列大小。除了基本成员变量,它还包含两把锁,分别用于读取和写入操作。有读者疑惑,为何需要两把锁,而其他队列只用一把?本文后续将揭晓答案。

       生产者使用 `add()`、`offer()`、`offer(time)` 和 `put()` 方法向队列中添加元素。消费者则通过 `remove()`、apache开源项目源码`poll()`、`poll(time)` 和 `take()` 方法从队列中获取元素。

       在解析源码时,发现 LinkedBlockingQueue 与 ArrayBlockingQueue 在锁的使用上有所不同。ArrayBlockingQueue 使用互斥锁,而 LinkedBlockingQueue 使用读锁和写锁。这是否意味着 ArrayBlockingQueue 可以使用相同类型的锁?答案是肯定的,且使用两把锁的 ArrayBlockingQueue 在性能上有所提升。

       流程图展示了 LinkedBlockingQueue 和 ArrayBlockingQueue 之间的相似之处。有兴趣的读者可以自行绘制。

       总结而言,LinkedBlockingQueue 是一种高效的阻塞队列实现,其底层结构基于链表。它通过读锁和写锁管理线程安全,为生产者和消费者提供了并发支持。通过优化锁的使用,LinkedBlockingQueue 在某些场景下展现出更好的性能。

       互联网寒冬虽在,但学习和分享是抵御寒冬的最佳方式。通过交流经验,可以减少弯路,提高效率。如果你对后端架构和中间件源码感兴趣,欢迎与我交流,众信呼死你源码共同进步。

还不了解Java的5大BlockingQueue阻塞队列源码,看这篇文章就够了

       引言

       本文将详细解读Java中常见的5种BlockingQueue阻塞队列,包括它们的优缺点、区别以及典型应用场景,以帮助深入理解这5种队列的独特性质和使用场合。

       常见的BlockingQueue有以下5种:

       1. **基于数组实现的阻塞队列**:创建时需指定容量大小,是有限队列。

       2. **基于链表实现的阻塞队列**:默认无界,可自定义容量。

       3. **无缓冲阻塞队列**:生产的数据需立即被消费,无缓冲。

       4. **优先级阻塞队列**:支持元素按照大小排序,无界。

       5. **延迟阻塞队列**:基于PriorityQueue实现,无界。

       **BlockingQueue简介

**

       BlockingQueue作为接口,定义了放数据和取数据的多组方法,适用于并发多线程环境,特别适合生产者-消费者模式。

       **应用场景

**

       BlockingQueue的作用类似于消息队列,用于解耦、异步处理和削峰,适用于线程池的软文营销平台源码核心功能实现。

       **区别与比较

**

       - **ArrayBlockingQueue**:基于数组实现,容量可自定义。

       - **LinkedBlockingQueue**:基于链表实现,无界或自定义容量。

       - **SynchronousQueue**:同步队列,生产者和消费者直接交互,无需缓冲。

       - **PriorityBlockingQueue**:实现优先级排序,无界队列。

       - **DelayQueue**:本地延迟队列,支持元素延迟执行。

       在选择使用哪种队列时,需考虑具体任务的特性、吞吐量需求以及是否需要优先级排序或延迟执行。

       本文旨在提供全面理解Java中BlockingQueue的指南,从源码剖析到应用场景,帮助开发者更好地应用这些工具于实际项目中。

LinkedBlockingQueue

        LinkedBlockingDeque在结构上有别于之前讲解过的阻塞队列,它不是Queue而是Deque,中文翻译成双端队列,双端队列指可以从任意一端入队或者出队元素的队列,实现了在队列头和队列尾的高效插入和移除

        LinkedBlockingDeque是链表实现的线程安全的无界的同时支持FIFO、LIFO的双端阻塞队列,可以回顾下之前的LinkedBlockingQueue阻塞队列特点,本质上是类似的,但是又有些不同:

        Queue和Deque的关系有点类似于单链表和双向链表,LinkedBlockingQueue和LinkedBlockingDeque的内部结点实现就是单链表和双向链表的区别,具体可参考源码。

        在第二点中可能有些人有些疑问,两个互斥锁和一个互斥锁的区别在哪里?我们可以考虑以下场景:

        A线程先进行入队操作,B线程随后进行出队操作,如果是LinkedBlockingQueue,A线程入队过程还未结束(已获得锁还未释放),B线程出队操作不会被阻塞等待(锁不同),如果是LinkedBlockingDeque则B线程会被阻塞等待(同一把锁)A线程完成操作才继续执行

        LinkedBlockingQueue一般的操作是获取一把锁就可以,但有些操作例如remove操作,则需要同时获取两把锁,之前的LinkedBlockingQueue讲解曾经说明过

        LinkedBlockingQueue 由于是单链表结构,只能一端操作,读只能在头,写只能在尾,因此两把锁效率更高。LinkedBlockingDeque 由于是双链表结构,两端头尾都能读写,因此只能用一把锁保证原子性。 当然效率也就更低

        ArrayBlockingQueue

        LinkedBlockingQueue

        问题,为什么ArrayBlockingQueue 不能用两把锁

        因为取出后,ArrayBlockingQueue 的元素需要向前移动。

        LinkedBlockingQueue内部由单链表实现,只能从head取元素,从tail添加元素。添加元素和获取元素都有独立的锁,也就是说LinkedBlockingQueue是读写分离的,读写操作可以并行执行。LinkedBlockingQueue采用可重入锁(ReentrantLock)来保证在并发情况下的线程安全。

        LinkedBlockingQueue一共有三个构造器,分别是无参构造器、可以指定容量的构造器、可以穿入一个容器的构造器。如果在创建实例的时候调用的是无参构造器,LinkedBlockingQueue的默认容量是Integer.MAX_VALUE,这样做很可能会导致队列还没有满,但是内存却已经满了的情况(内存溢出)。

        size()方法会遍历整个队列,时间复杂度为O(n),所以最好选用isEmtpy

        1.判断元素是否为null,为null抛出异常

        2.加锁(可中断锁)

        3.判断队列长度是否到达容量,如果到达一直等待

        4.如果没有队满,enqueue()在队尾加入元素

        5.队列长度加1,此时如果队列还没有满,调用signal唤醒其他堵塞队列

        1.加锁(依旧是ReentrantLock),注意这里的锁和写入是不同的两把锁

        2.判断队列是否为空,如果为空就一直等待

        3.通过dequeue方法取得数据

        3.取走元素后队列是否为空,如果不为空唤醒其他等待中的队列

        原理:在队尾插入一个元素, 如果队列没满,立即返回true; 如果队列满了,立即返回false。

        原理:如果没有元素,直接返回null;如果有元素,出队

        1、具体入队与出队的原理图:

        图中每一个节点前半部分表示封装的数据x,后边的表示指向的下一个引用。

        1.1、初始化

        初始化之后,初始化一个数据为null,且head和last节点都是这个节点。

        1.2、入队两个元素过后

        1.3、出队一个元素后

        表面上看,只是将头节点的next指针指向了要删除的x1.next,事实上这样我觉的就完全可以,但是jdk实际上是将原来的head节点删除了,而上边看到的这个head节点,正是刚刚出队的x1节点,只是其值被置空了。

        2、三种入队对比:

        3、三种出队对比:

LinkedBlockingDeque

        LinkedBlockingDeque: 由双向链表组成的有界阻塞队列,队列容量大小可选,默认大小为Integer.MAX_VALUE。队头部和队尾都可以写入和移除元素,因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半锁的竞争 。

        LinkedBlockingDeque是双向链表实现的阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除);

        在不能够插入元素时,它将阻塞住试图插入元素的线程;在不能够抽取元素时,它将阻塞住试图抽取的线程。;

        LinkedBlockingDeque还是可选容量的,防止过度膨胀,默认等于Integer.MAX_VALUE。;

        LinkedBlockingDuque没有进行读写锁的分离,因此同一时间只能有一个线程对其操作,因此在高并发应用中,它的性能要远远低于LinkedBlockingQueue。

        Deque特性: 队头和队尾都可以插入和移除元素,支持FIFO和FILO。

        相比于其他阻塞队列,LinkedBlockingDeque多了addFirst()、addLast()、peekFirst()、peekLast()等方法,以XXXFirst结尾的方法,表示插入、获取获移除双端队列的队头元素。以xxxLast结尾的方法,表示插入、获取获移除双端队列的队尾元素。

        假设:有多个消费者,每个消费者有自己的一个消息队列,生产者不断的生产数据扔到队列中,消费者消费数据有快有慢。为了提升效率,速度快的消费者可以从其它消费者队列的队尾出队元素放到自己的消息队列中,由于是从其它队列的队尾出队,这样可以减少并发冲突(其它消费者从队首出队元素),又能提升整个系统的吞吐量。这其实是一种“工作窃取算法”的思路。

        BlockingDeque相对于BlockingQueue,最大的特点就是增加了在队首入队/队尾出队的阻塞方法。下面是两个接口的比较:

        可以看first指向队首节点,last指向队尾节点。利用ReentrantLock来保证线程安全,所有对队列的修改操作都需要先获取全局锁

        初始:

        队首插入节点node:

        初始:

        队尾插入节点node:

        可响应中断阻塞式队首入队-void putFirst(E e)

        可响应中断阻塞式队尾入队-void putLast(E e)

        非阻塞式队首入队-boolean offerFirst(E e)

        非阻塞式队尾入队-boolean offerLast(E e)

        可响应中断限时阻塞队首入队-boolean offerFirst(E e, long timeout, TimeUnit unit)

        可响应中断限时阻塞队尾入队-boolean offerLast(E e, long timeout, TimeUnit unit)

        非阻塞队首入队(抛异常)-void addFirst(E e)

        非阻塞队尾入队(抛异常)-void addLast(E e)

        非阻塞式队尾入队-boolean add(E e)

        可响应中断阻塞式队尾入队-void put(E e)

        非阻塞式队尾入队-boolean offer(E e)

        可响应中断限时阻塞队尾入队-boolean offer(E,long,TimeUnit)

        初始:

        删除队首节点:

        从队尾出队last节点,一定会将prev指针指向自身,以区别于非出队时last指向具体节点(即prev指针不会指向自身,可能为真实节点或null)。

        初始:

        删除队尾节点:

        出队核心方法-void unlink(Node x)

        可响应中断阻塞式队首出队-E takeFirst()

        可响应中断阻塞式队尾出队-E takeLast()

        非阻塞式队首出队-E pollFirst()

        非阻塞式队尾出队-E pollLast()

        可响应中断限时阻塞队首出队- E pollFirst(long timeout, TimeUnit unit)

        可响应中断限时阻塞队尾出队- E pollLast(long timeout, TimeUnit unit)

        不移除元素队首出队-E peekFirst()

        不移除元素队尾出队-E peekLast()

        非阻塞队首出队(抛异常)-E removeFirst()

        非阻塞队尾出队(抛异常)–E removeLast()

        从队首向后移除指定元素-boolean removeFirstOccurrence(Object o)

        从队尾向前移除指定元素-boolean removeLastOccurrence(Object o)

        可响应中断阻塞式队首出队-E take()

        非阻塞式出队-E poll()

        阻塞式超时出队-E poll(timeout, unit)

        阻塞式出队-E peek()

        移除元素- E remove()

        此迭代器是弱一致性的。因为即使节点被删除, 迭代器也会照样返回被删除节点的item 。

        正向迭代器和反向迭代器 只需要实现2个抽象方法。

        LinkedBlockingDeque是一个 无界阻塞双端队列 ,相比普通队列,主要是多了【队尾出队元素】/【队首入队元素】的功能。

阿里P9整理Java 高频面试题聊一聊 JUC 下的 LinkedBlockingQueue

       本文将深入探讨Java并发包(JUC)中的LinkedBlockingQueue队列,首先介绍LinkedBlockingQueue的主要特性和实现方式。

       LinkedBlockingQueue是一个双端队列,其继承自AbstractQueue类,并实现了BlockingQueue接口。它具有以下特性:

       1. LinkedBlockingQueue允许线程安全地向队列中添加或删除元素。

       2. 队列中的亦众gps源码元素按插入顺序进行存储。

       3. 队列的容量可以设置。

       4. 支持元素的等待和通知操作。

       5. 可以被多个线程安全地访问。

       LinkedBlockingQueue的常用方法有put()和take()。下面将分别解析这两个方法的实现。

       put()方法用于将元素添加到队列中。其核心逻辑如下:

       1. 当队列未满时,直接将元素添加到队尾。

       2. 当队列已满时,调用线程将进入等待状态,等待队列中的元素被消费或队列容量增加。

       3. 当有元素被消费或队列容量增加时,等待线程将被唤醒并继续执行。

       offer()方法类似于put()方法,但不阻塞线程,如果队列已满,则返回false。

       take()方法用于从队列中获取元素。其核心逻辑如下:

       1. 当队列为空时,调用线程将进入等待状态,等待队列中有元素被添加。

       2. 当队列中有元素时,线程将被唤醒并获取队列中的元素。

       3. 如果需要,可以设置超时等待时间。

       LinkedBlockingQueue的其他方法,如poll()、peek()等,实现方式与put()和take()类似,提供不同类型的获取操作。

       总结,LinkedBlockingQueue是一个功能强大的并发队列,其通过实现put()和take()方法提供了线程安全的元素添加和获取操作。通过理解其内部机制,开发者可以更有效地使用并发技术,提高程序的性能和稳定性。希望本文的解析能够帮助你更好地理解LinkedBlockingQueue,并在面试或工作中发挥重要作用。

       为了进一步提升你的Java技能,我整理了一份涵盖基础知识、高级技术、面试技巧等内容的全面学习资料。这份资料包括性能优化、微服务架构、并发编程、开源框架、分布式系统等多个领域的内容,以及技术+人事面试的全面指导。通过系统学习这份资料,你将能够应对各大公司技术面试,甚至在求职过程中脱颖而出。获取这份资料的链接如下:[资料分享链接]。

喜提JDK的BUG一枚!多线程的情况下请谨慎使用这个类的stream遍历。

       在探讨问题之前,我们先回顾一下 LinkedBlockingQueue 的线程安全性。在传统的观点中,LinkedBlockingQueue 是线程安全的,因为它内部使用了 ReentrantLock。然而,就在 RocketMQ 的讨论版中,一个问题揭示了 LinkedBlockingQueue 在特定情况下的线程不安全性,引发了我们的好奇心。

       核心问题在于 LinkedBlockingQueue 的 stream 遍历方式,在多线程环境下可能出现死循环。我们通过一个简单的 demo 来深入分析这一现象。首先,引入了一个链接,其中详细展示了如何在多线程环境下复现这一 Bug。

       在分析代码之前,让我们先明确 demo 的基本逻辑:创建了 个线程,每个线程不断调用 offer 和 remove 方法。主线程则通过 stream 对 queue 进行遍历,目标是找到队列中的第一个非空元素。这看似是一个简单的遍历操作,但事实并非如此。

       关键点在于 tryAdvance 方法,看似平凡的遍历操作隐藏了陷阱。当运行代码时,预期的输出并未出现,而是陷入了一个死循环,控制台仅输出了一行信息或交替输出几次后停止。

       我们的疑问指向了 JDK 版本,尤其是 JDK 8。通过替换为 JDK ,我们观察到交替输出的效果。这使得我们大胆推测,这可能是 JDK 8 版本的 Bug。为了验证这一假设,我们进行了详细的分析。

       通过线程 dump 文件,我们发现主线程始终处于可运行状态,似乎没有被锁阻塞。然而,从控制台的输出来看,它似乎处于阻塞状态。这一现象让我们联想到一个经典的场景:线程陷入死循环。

       通过深入源码分析,我们发现了死循环的根源。在 stream 遍历的关键方法 tryAdvance 中,存在一个 while 循环,其条件始终满足,导致死循环。而问题的核心在于移除队列头部元素的代码逻辑,当有其他线程不断调用 remove 方法时,可能会形成特定的节点结构,触发死循环。

       经过详细的分析,我们揭示了这一 Bug 的原理,并通过简化代码演示了整个过程。通过将实例代码简化,我们揭示了死循环是如何在多线程环境下产生的。这不仅有助于理解 Bug 的本质,也为后续的 Bug 修复提供了思路。

       为了验证解决方案的正确性,我们对比了 JDK 8 和 JDK 的源码差异。在 JDK 中,通过引入了一个名为 succ 的方法,成功解决了死循环问题。这一方法通过确保节点不会指向自身,从而避免了死循环的产生。

       通过这篇文章的分析,我们不仅揭示了 LinkedBlockingQueue 在特定条件下的线程不安全性,还探讨了如何通过升级 JDK 版本、避免使用 stream 遍历,以及使用 synchronized 修饰符等方式来规避此类问题。同时,我们还延伸至其他数据结构,如 ConcurrentHashMap,讨论了它们在不同使用场景下的线程安全性问题。

       最后,我们再次强调在多线程环境下,LinkedBlockingQueue 的 stream 遍历方式可能存在一定的问题,可能会导致死循环。理解并解决这类 Bug,对于确保代码的健壮性和性能至关重要。

java多线程关于消费者和生产者,求源程序,求大神解答。愿意提高报酬

        自己看代码体会吧

import java.util.concurrent.BlockingQueue;

       import java.util.concurrent.ExecutorService;

       import java.util.concurrent.Executors;

       import java.util.concurrent.LinkedBlockingQueue;

       public class BlockingQueueTest {

        public static void main(String[] args) {

         ExecutorService service = Executors.newCachedThreadPool();

         BlockingQueue<String> blockingQueue = new LinkedBlockingQueue<String>();

         System.out.println("blockingQueue now contains "  + blockingQueue.size() + " unit");

         service.submit(new Consumer1(blockingQueue));

         gap(blockingQueue);

         service.submit(new Productor2(blockingQueue));

         gap(blockingQueue);

         service.submit(new Productor3(blockingQueue));

         gap(blockingQueue);

         service.submit(new Productor4(blockingQueue));

         gap(blockingQueue);

         service.submit(new Productor5(blockingQueue));

         gap(blockingQueue);

         

         service.shutdown();

        }

        private static void gap(BlockingQueue<String> blockingQueue) {

         try {

          Thread.sleep();

         } catch (InterruptedException e) {

          // TODO Auto-generated catch block

          e.printStackTrace();

         }

         System.out.println("blockingQueue now contains "  + blockingQueue.size() + " unit");

        }

       }

       class Consumer1 implements Runnable{

        BlockingQueue<String> blockingQueue;

        

        public Consumer1(BlockingQueue<String> blockingQueue) {

         super();

         this.blockingQueue = blockingQueue;

        }

        @Override

        public void run() {

         // TODO Auto-generated method stub

         System.out.println("Consumer1 start: need  units");

         for(int i = 0; i < ; i++){

          try {

           blockingQueue.take();

          } catch (InterruptedException e) {

           // TODO Auto-generated catch block

           e.printStackTrace();

          }

         }

         System.out.println("Consumer1 end: has got  units");

        }

        

       }

       class Productor2 implements Runnable{

        BlockingQueue<String> blockingQueue;

        

        public Productor2(BlockingQueue<String> blockingQueue) {

         super();

         this.blockingQueue = blockingQueue;

        }

        @Override

        public void run() {

         // TODO Auto-generated method stub

         System.out.println("Productor2 start: put 5 units");

         for(int i = 0; i < 5; i++){

          try {

           blockingQueue.put("Object");

          } catch (InterruptedException e) {

           // TODO Auto-generated catch block

           e.printStackTrace();

          }

         }

         System.out.println("Productor2 end: has put 5 units");

        }

        

       }

       class Productor3 implements Runnable{

        BlockingQueue<String> blockingQueue;

        

        public Productor3(BlockingQueue<String> blockingQueue) {

         super();

         this.blockingQueue = blockingQueue;

        }

        @Override

        public void run() {

         // TODO Auto-generated method stub

         System.out.println("Productor3 start: put 5 units");

         for(int i = 0; i < 5; i++){

          try {

           blockingQueue.put("Object");

          } catch (InterruptedException e) {

           // TODO Auto-generated catch block

           e.printStackTrace();

          }

         }

         System.out.println("Productor3 end: has put 5 units");

        }

        

       }

       class Productor4 implements Runnable{

        BlockingQueue<String> blockingQueue;

        

        public Productor4(BlockingQueue<String> blockingQueue) {

         super();

         this.blockingQueue = blockingQueue;

        }

        @Override

        public void run() {

         // TODO Auto-generated method stub

         System.out.println("Productor4 start: put  units");

         for(int i = 0; i < ; i++){

          try {

           blockingQueue.put("Object");

          } catch (InterruptedException e) {

           // TODO Auto-generated catch block

           e.printStackTrace();

          }

         }

         System.out.println("Productor4 end: has put  units");

        }

        

       }

       class Productor5 implements Runnable{

        BlockingQueue<String> blockingQueue;

        

        public Productor5(BlockingQueue<String> blockingQueue) {

         super();

         this.blockingQueue = blockingQueue;

        }

        @Override

        public void run() {

         // TODO Auto-generated method stub

         System.out.println("Productor5 start: put  units");

         for(int i = 0; i < ; i++){

          try {

           blockingQueue.put("Object");

          } catch (InterruptedException e) {

           // TODO Auto-generated catch block

           e.printStackTrace();

          }

         }

         System.out.println("Productor5 end: has put  units");

        }

        

       }

       每个线程是隔了1s启动的, 结果

       blockingQueue now contains 0 unit

       Consumer1 start: need units

       blockingQueue now contains 0 unit

       Productor2 start: put 5 units

       Productor2 end: has put 5 units

       blockingQueue now contains 0 unit

       Productor3 start: put 5 units

       Productor3 end: has put 5 units

       Consumer1 end: has got units

       blockingQueue now contains 0 unit

       Productor4 start: put units

       Productor4 end: has put units

       blockingQueue now contains unit

       Productor5 start: put units

       blockingQueue now contains unit