// 创建时指定容量 publicLinkedBlockingQueue(int capacity){ if (capacity <= 0) thrownew IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); }
public E poll(){ final AtomicInteger count = this.count; // 队列为空返回 null if (count.get() == 0) returnnull; E x = null; int c = -1; // 加锁 final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { if (count.get() > 0) { x = dequeue(); // 减少队列元素计数,返回的是旧值 c = count.getAndDecrement(); if (c > 1) // 旧值大于 1 ,就是当前大于 0 // 唤醒调用 notEmpty.await 等待的线程 notEmpty.signal(); } } finally { takeLock.unlock(); } if (c == capacity) // 如果旧值等于 capacity 说明当前空了一个位置 signalNotFull(); return x; }
public E take()throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { // 阻塞等待 while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
通过上面代码可以看出 poll 和 take 方法逻辑大致相同。区别就是在当前队列为空时的处理逻辑。poll 在当前队列为空时返回 null,take 会阻塞等待,知道当前队列中有元素。
poll 和 take 都试用 dequeue() 方法从队列中获取元素。
private E dequeue(){ // assert takeLock.isHeldByCurrentThread(); // assert head.item == null; Node<E> h = head; Node<E> first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
dequeue() 方法逻辑就是获取头节点,并将 head 指向下一个节点。
查看元素
public E peek(){ if (count.get() == 0) returnnull; final ReentrantLock takeLock = this.takeLock; takeLock.lock(); try { Node<E> first = head.next; if (first == null) returnnull; else return first.item; } finally { takeLock.unlock(); } }