2000字范文,分享全网优秀范文,学习好帮手!
2000字范文 > java多线程-阻塞队列BlockingQueue

java多线程-阻塞队列BlockingQueue

时间:2022-03-05 17:39:09

相关推荐

java多线程-阻塞队列BlockingQueue

大纲

BlockingQueue接口ArrayBlockingQueue

一、BlockingQueue接口

public interface BlockingQueue<E> extends Queue<E> {//add、offer向队列插值,返回插入是否成功boolean add(E e);boolean offer(E e);//向队列插值,队列满则阻塞至队列非满void put(E e) throws InterruptedException;//offer方法加上超时,超时返回falseboolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;//从队列拿值,队列空则阻塞至队列非空E take() throws InterruptedException;//向队列拿值,返回是否成功,超时返回falseE poll(long timeout, TimeUnit unit) throws InterruptedException;//队列剩余空间int remainingCapacity();//删除boolean remove(Object o);//是否包含boolean contains(Object o);//移除队列中的值到集合中int drainTo(Collection<? super E> c);//maxElements移除最大个数int drainTo(Collection<? super E> c, int maxElements);}

二、ArrayBlockingQueue

主要成员变量

/** 队列 */final Object[] items;/** 拿值的索引,用于 take, poll, peek, remove 方法*/int takeIndex;/** 存值的索引, 用于 next put, offer, or, add 方法*/int putIndex;/** 队列中元素数量 */int count;/** 锁,两个condition都是由这个锁创建 */final ReentrantLock lock;/** 可以拿值的Condition(不空可拿) */private final Condition notEmpty;/** 可以存值的Condition(不满可存) */private final Condition notFull;

构造函数

public ArrayBlockingQueue(int capacity) {this(capacity, false);}public ArrayBlockingQueue(int capacity, boolean fair) {//初始化锁与condition,默认非公平所if (capacity <= 0)throw new IllegalArgumentException();this.items = new Object[capacity];lock = new ReentrantLock(fair);notEmpty = lock.newCondition();notFull = lock.newCondition();}public ArrayBlockingQueue(int capacity, boolean fair,Collection<? extends E> c) {this(capacity, fair);//将集合中元素拷贝至队列final ReentrantLock lock = this.lock;lock.lock(); // Lock only for visibility, not mutual exclusiontry {int i = 0;try {for (E e : c) {checkNotNull(e);items[i++] = e;}} catch (ArrayIndexOutOfBoundsException ex) {throw new IllegalArgumentException();}//队列数量等于集合数量count = i;//存索引赋值:队列满赋0,不满赋集合数量putIndex = (i == capacity) ? 0 : i;} finally {lock.unlock();}}

主要方法

取:

//非阻塞public E poll() {final ReentrantLock lock = this.lock;lock.lock();try {return (count == 0) ? null : dequeue();} finally {lock.unlock();}}//阻塞public E take() throws InterruptedException {final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {while (count == 0)//队列大小为0拿值线程阻塞notEmpty.await();return dequeue();} finally {lock.unlock();}}//出队private E dequeue() {final Object[] items = this.items;E x = (E) items[takeIndex];items[takeIndex] = null;//对应putindexif (++takeIndex == items.length)takeIndex = 0;count--;if (itrs != null)itrs.elementDequeued();//唤醒存值线程 notFull.signal();return x;}

存:

//入队private void enqueue(E x) {// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items = this.items;items[putIndex] = x;if (++putIndex == items.length)putIndex = 0;count++;notEmpty.signal();}//非阻塞public boolean offer(E e) {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lock();try {if (count == items.length)return false;else {enqueue(e);return true;}} finally {lock.unlock();}}//阻塞public void put(E e) throws InterruptedException {checkNotNull(e);final ReentrantLock lock = this.lock;lock.lockInterruptibly();try {//队列满则阻塞while (count == items.length)notFull.await();enqueue(e);} finally {lock.unlock();}}

阻塞队列阻塞的存取值方法通过2个condition的await和signal方法完成:当队列空时notEmpty.await取值阻塞,当队列满时notFull.await存值阻塞;入队操作完成时调用notEmpty.signal唤醒取值线程,出队才做完成时唤醒存值线程。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。