2000字范文,分享全网优秀范文,学习好帮手!
2000字范文 > java多线程阻塞队列_阻塞队列和多线程消费者 如何知道何时停止

java多线程阻塞队列_阻塞队列和多线程消费者 如何知道何时停止

时间:2019-08-30 22:18:11

相关推荐

java多线程阻塞队列_阻塞队列和多线程消费者 如何知道何时停止

我有一个单线程生成器,它创建一些任务对象,然后添加到 ArrayBlockingQueue (具有固定大小) .

我也开始了一个多线程的消费者 . 这是一个固定的线程池( Executors.newFixedThreadPool(threadCount); ) . 然后我向这个threadPool提交了一些ConsumerWorker入口,每个ConsumerWorker都对上面提到的ArrayBlockingQueue实例进行了引用 .

每个这样的工作人员都会在队列中执行 take() 并处理该任务 .

我的问题是,当没有更多的工作要做时,让 Worker 知道的最佳方法是什么 . 换句话说,如何告诉Workers, 生产环境 者已经完成了对队列的添加,从这一点开始,每个工作人员在看到Queue为空时应该停止 .

我现在得到的是一个设置,其中我的Producer初始化了一个回调,当他完成它的工作(向队列中添加东西)时会触发回调 . 我还保留了我创建并提交给ThreadPool的所有ConsumerWorkers的列表 . 当Producer Callback告诉我 生产环境 者已完成时,我可以告诉每个 Worker . 此时,他们应该继续检查队列是否为空,当它变为空时,它们应该停止,从而允许我优雅地关闭ExecutorService线程池 . 就是这样的

public class ConsumerWorker implements Runnable{

private BlockingQueue inputQueue;

private volatile boolean isRunning = true;

public ConsumerWorker(BlockingQueue inputQueue) {

this.inputQueue = inputQueue;

}

@Override

public void run() {

//worker loop keeps taking en element from the queue as long as the producer is still running or as

//long as the queue is not empty:

while(isRunning || !inputQueue.isEmpty()) {

System.out.println("Consumer "+Thread.currentThread().getName()+" START");

try {

Object queueElement = inputQueue.take();

//process queueElement

} catch (Exception e) {

e.printStackTrace();

}

}

}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue

public void setRunning(boolean isRunning) {

this.isRunning = isRunning;

}

}

这里的问题是我有一个明显的竞争条件,有时 生产环境 者将完成,发出信号,消费者工作者将在消耗队列中的所有内容之前停止 .

我的问题是,同步这个的最佳方法是什么,以便一切正常?我应该同步整个部分来检查 生产环境 者是否正在运行加上如果队列是空的加上从队列中取出一些块(在队列对象上)?我应该只在ConsumerWorker实例上同步 isRunning 布尔的更新吗?还有其他建议吗?

更新,这里是我最终使用的工作实施:

public class ConsumerWorker implements Runnable{

private BlockingQueue inputQueue;

private final static Produced POISON = new Produced(-1);

public ConsumerWorker(BlockingQueue inputQueue) {

this.inputQueue = inputQueue;

}

@Override

public void run() {

//worker loop keeps taking en element from the queue as long as the producer is still running or as

//long as the queue is not empty:

while(true) {

System.out.println("Consumer "+Thread.currentThread().getName()+" START");

try {

Produced queueElement = inputQueue.take();

Thread.sleep(new Random().nextInt(100));

if(queueElement==POISON) {

break;

}

//process queueElement

} catch (Exception e) {

e.printStackTrace();

}

System.out.println("Consumer "+Thread.currentThread().getName()+" END");

}

}

//this is used to signal from the main thread that he producer has finished adding stuff to the queue

public void stopRunning() {

try {

inputQueue.put(POISON);

} catch (InterruptedException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

}

这很大程度上受到JohnVint在下面的回答的启发,只有一些小修改 .

===由于@ vendhan的评论而更新 .

谢谢你的观察 . 你是对的,这个问题的第一个代码片段(在其他问题中)是 while(isRunning || !inputQueue.isEmpty()) 没有真正意义的那个 .

在我实际的最终实现中,我做了一些更接近你更换“||”的建议 . (或)用“&&”(和),从某种意义上说,每个 Worker (消费者)现在只检查他从列表中得到的元素是否是毒丸,如果是的话就停止了(理论上我们可以说 Worker 有要运行并且队列不能为空) .

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。