您好,欢迎来到站长目录(28sn.com)!


深度分析:面试阿里,字节跳动,美团几乎都会被问到的阻塞队列

来源:网络整理 浏览:189次 时间:2021-02-24
基本概念

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。
1)支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。

2)支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空

深度分析:面试阿里,字节跳动,美团几乎都会被问到的阻塞队列

阻塞队列一共有7种,我们着重讲一下
ArrayBlockingQueue ,
LinkedBlockingQueue ,
DelayQueue,
SynchronousQueue
这四种阻塞队列

ArrayBlockingQueue

基于数组实现有界的阻塞队列(循环数组)

类的继承
public class ArrayBlockingQueue<E> extends AbstractQueue<E>        implements BlockingQueue<E>, java.io.Serializable
主要成员变量
 private static final long serialVersionUID = -817911632652898426L;    final Object[] items; //底层存储元素的数组    int takeIndex; //进行取操作时的下标    int putIndex;//进行放操作时的下标    int count;//队列中元素的数量    final ReentrantLock lock;//阻塞时用的锁    private final Condition notEmpty;//满时的condition队列    private final Condition notFull;//空时的condition队列
构造器

参数有容量和全局锁是否是公平锁

 public ArrayBlockingQueue(int capacity, boolean fair) {        if (capacity <= 0)            throw new IllegalArgumentException();        this.items = new Object[capacity];        lock = new ReentrantLock(fair);        notEmpty = lock.newCondition();        notFull =  lock.newCondition();    }

不用确定是否是公平锁,默认是非公平锁

public ArrayBlockingQueue(int capacity) {        this(capacity, false);    }

在第一个构造器的前提下,将整个集合移入阻塞队列

public ArrayBlockingQueue(int capacity, boolean fair,                              Collection<? extends E> c) {        this(capacity, fair);        final ReentrantLock lock = this.lock;        lock.lock(); // Lock only for visibility, not mutual exclusion        try {            int i = 0;            try {                for (E e : c) {                    checkNotNull(e);                    items[i++] = e;                }            } catch (ArrayIndexOutOfBoundsException ex) {                throw new IllegalArgumentException();            }            count = i;            putIndex = (i == capacity) ? 0 : i;        } finally {            lock.unlock();        }    }
主要方法

put()

public void put(E e) throws InterruptedException {        checkNotNull(e);        final ReentrantLock lock = this.lock;        lock.lockInterruptibly();        try {            while (count == items.length)                notFull.await();            enqueue(e);        } finally {            lock.unlock();        }    }

1.首先判断添加的是否非空,是空的会抛出异常。
2.给put方法上锁
3.当集合元素数量和集合的长度相等时,调用put方法的线程将会被放入notFull队列上等待。
4.如果不相等,则enqueue(),也就是让e进入队列。
我们再看看enqueue()方法(入队方法)

 private void enqueue(E x) {        // assert lock.getHoldCount() == 1;        // assert items[putIndex] == null;        final Object[] items = this.items;        items[putIndex] = x;        if (++putIndex == items.length)            putIndex = 0;        count++;        notEmpty.signal();    }

其实就是让该元素入队,并且唤醒因为集合空而等待的线程。

take()方法同理。

LinkedBlockingQueue

LinkedBlockingQueue底层是基于链表实现的,所以其基本成员变量和LinkedList差不多。

类的继承关系

public class LinkedBlockingQueue<E> extends AbstractQueue<E>        implements BlockingQueue<E>, java.io.Serializable 
构造器

无参构造器,默认容量为最大容量

public LinkedBlockingQueue() {        this(Integer.MAX_VALUE);    }

手动设定容量

public LinkedBlockingQueue(int capacity) {        if (capacity <= 0) throw new IllegalArgumentException();        this.capacity = capacity;        last = head = new Node<E>(null);    }

将整个集合挪入队列中,默认容量同样是最大容量。

public LinkedBlockingQueue(Collection<? extends E> c) {        this(Integer.MAX_VALUE);        final ReentrantLock putLock = this.putLock;        putLock.lock(); // Never contended, but necessary for visibility        try {            int n = 0;            for (E e : c) {                if (e == null)                    throw new NullPointerException();                if (n == capacity)                    throw new IllegalStateException("Queue full");                enqueue(new Node<E>(e));                ++n;            }            count.set(n);        } finally {            putLock.unlock();        }    }
主要成员变量

链表就一定会有节点
内部节点类
和ArrayBlockingQueue不同的是,它有两个全局锁,一个负责放元素,一个负责取元素。

static class Node<E> {        E item;        /**         * One of:         * - the real successor Node         * - this Node, meaning the successor is head.next         * - null, meaning there is no successor (this is the last node)         */        Node<E> next;        Node(E x) { item = x; }    }

除了节点之外。

private transient Node<E> last;//尾节点transient Node<E> head;//头节点private final AtomicInteger count = new AtomicInteger();//计算当前阻塞队列中的元素个数 private final int capacity;//容量 //获取并移除元素时使用的锁,如take, poll, etcprivate final ReentrantLock takeLock = new ReentrantLock();//notEmpty条件对象,当队列没有数据时用于挂起执行删除的线程private final Condition notEmpty = takeLock.newCondition();//添加元素时使用的锁如 put, offer, etcprivate final ReentrantLock putLock = new ReentrantLock();//notFull条件对象,当队列数据已满时用于挂起执行添加的线程private final Condition notFull = putLock.newCondition();
主要方法

put()方法

  public void put(E e) throws InterruptedException {        if (e == null) throw new NullPointerException();        // Note: convention in all put/take/etc is to preset local var        // holding count negative to indicate failure unless set.        int c = -1;        Node<E> node = new Node<E>(e);        final ReentrantLock putLock = this.putLock;        final AtomicInteger count = this.count;        putLock.lockInterruptibly();        try {            while (count.get() == capacity) {                notFull.await();            }            enqueue(node);            c = count.getAndIncrement();            if (c + 1 < capacity)                notFull.signal();        } finally {            putLock.unlock();        }        if (c == 0)            signalNotEmpty();    }

基本和ArrayBlockingQueue一样,只是锁的数量不同,导致有一些细微的区别。

代码示例
public class TestDemo16 {    private static LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10);    public static void main(String[] args) {        new Thread("put"){            @Override            public void run() {                //添加元素                for(int i=0; i<10; i++){                    System.out.println("put: "+i);                    try {                        queue.put(i);                        TimeUnit.MILLISECONDS.sleep(100);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            }        }.start();        new Thread("take"){            @Override            public void run() {                //获取元素                for(int i=0; i<10; i++){                    try {                        System.out.println("take: "+queue.take());                        TimeUnit.MILLISECONDS.sleep(100);                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }            }        }.start();    }}
DelayQueue

基于PriorityQueue 延时阻塞队列,DelayQueue中的元素只有当其延时时间到达,才能够去当前队列中获取到该元素,DelayQueue是一个***队列。主要用于缓存系统的设计、定时任务系统的设计。
实现DelayQueue的三个步骤

第一步:继承Delayed接口

第二步:实现getDelay(TimeUnit unit),该方法返回当前元素还需要延时多长时间,单位是纳秒

第三步:实现compareTo方法来指定元素的顺序
例如;

class Test implements Delayed {    private long time; //Test实例延时时间    public Test(long time, TimeUnit unit){        this.time = System.currentTimeMillis() + unit.toMillis(time);    }    @Override    public long getDelay(TimeUnit unit) {        return this.time - System.currentTimeMillis();    }    @Override    public int compareTo(Delayed o) {        long diff = this.time - ((Test)o).time;        if(diff <= 0){            return -1;        }else{            return 1;        }    }}
        DelayQueue<Test> queue = new DelayQueue<>();        queue.put(new Test(5, TimeUnit.SECONDS));        queue.put(new Test(10, TimeUnit.SECONDS));        queue.put(new Test(15, TimeUnit.SECONDS));        System.out.println("begin time: "+ LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));        for(int i=0; i<3; i++){            try {                Test test = queue.take();            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println("time: "+LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));        }
SynchronousQueue

SynchronousQueue是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。使用以下构造方法可以创建公平性访问的SynchronousQueue,如果设置为true,则等待的线程会采用先进先出的顺序访问队列

SynchronousQueue可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue的吞吐量高于LinkedBlockingQueue和ArrayBlockingQueue.

 public static void main(String[] args) throws InterruptedException {        SynchronousQueue queue=new SynchronousQueue();        LinkedBlockingQueue        new Thread("put"){            @Override            public void run() {                System.out.println("put has started");                for(int i=0;i<5;i++){                    System.out.println("put after takeThread");                    try {                        queue.put((int)((Math.random() * 100) + 1));                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }                System.out.println("put has ended");            }        }.start();        new Thread("take"){            @Override            public void run() {                System.out.println("take has started");                for(int i=0;i<5;i++){                    try {                        System.out.println("take from putThread"+queue.take());                    } catch (InterruptedException e) {                        e.printStackTrace();                    }                }                System.out.println("put has ended");            }        }.start();        }
总结

1:ArrayBlockingQueue和LinkedBlockingQueue的区别和联系?
1)数据存储容器不一样,ArrayBlockingQueue采用数组去存储数据、LinkedBlockingQueue采用链表去存储数据。
2)ArrayBlockingQueue(循环数组)采用数组去存储数据,不会产生额外的对象实例; LinkedBlockingQueue采用链表去存储数据,在插入和删除元素只与一个节点有关,需要去生成一个额外的Node对象,这可能长时间内需要并发处理大批量的数据,对于性能和后期GC会产生影响。
3)ArrayBlockingQueue是有界的,初始化时必须要指定容量;LinkedBlockingQueue默认是***的,Integer.MAX_VALUE, 当添加速度大于删除速度、有可能造成内存溢出。
4) ArrayBlockingQueue在读和写使用的锁是一样的,即添加操作和删除操作使用的是同一个ReentrantLock,没有实现锁分离;LinkedBlockingQueue实现了锁分离,添加的时候采用putLock、删除的时候采用takeLock,这样能提高队列的吞吐量。
2:ArrayBlockingQueue可以使用两把锁提高效率吗?
不能,主要原因是ArrayBlockingQueue底层循环数组来存储数据,LinkedBlockingQueue底层 链表来存储数据,链表队列的添加和删除,只是和某一个节点有关,为了防止head和last相互影响,就需要有一个原子性的计数器,每个添加操作先加入队列,计数器+1,这样是为了保证队列在移除的时候, 长度是大于等于计数器的,通过原子性的计数器,使得当前添加和移除互不干扰。对于循环数据来说,当我们走到最后一个位置需要返回到第一个位置,这样的操作是无法原子化,只能使用同一把锁来解决。

推荐站点

  • 我爱发烧音乐我爱发烧音乐

    我爱发烧音乐囊括了从流行音乐到古典音乐多个类型的音乐作品,专栏推荐最新的音乐,提供音乐排名榜单!可供免费线上收听音乐,歌曲流畅,音效极佳! 网站提供的钢琴以及二胡专栏,可供收听者,陶冶情操,改善心情,是难得的轻音乐典藏!

    www.520fs.com
  • 世纪音乐网世纪音乐网

    世纪音乐网是专业的在线音乐试听MP3下载网站。歌曲总计30余万首,收录了网上最新歌曲和流行音乐,DJ舞曲,非主流音乐,经典老歌,劲舞团歌曲,搞笑歌曲,儿童歌曲,英文歌曲等。是您上网听歌的最佳网站。

    www.ssjj.com
  • 杭州网杭州网

      杭州网是杭州地区唯一的新闻门户网站,由中共杭州市委宣传部、杭州日报报业集团和杭州广播电视集团共同组建的杭州网络传媒有限公司运营。

    www.hangzhou.com.cn
  • 深圳在线深圳在线

      深圳在线 www.szol.net是深圳本地最大、最早的地方生活资讯网站之一,网站名“深圳在线www.szol.net”由南方报业传媒集团编辑委员会总编辑、南方日报社总编辑、南方都市报总编辑、南方书画院名誉院长王春芙亲笔题名,深圳在线www.szol.net团队与深圳热线www.szonline.net、奥一网www.oeeee.com都源于全国最早成立于1996年的知名网络公司——深圳万用网。

    www.szol.net
  • 今题网今题网

     今题网- 中国领先的社区服务网,提供社区服务, 在线交友和商家推广服务,于2004年创建上线,公司现有员工超过百名。今题网自成立以来,凭借其独特的定位和丰富的社区交友功能, 凭借其团队超强的搜索引擎优化技术吸引超过千万的用户成为今题网的注册会员。

    www.jinti.com

鄂公网安备 42062502000001号