今天我們將研究 Java BlockingQueue。`java.util.concurrent.BlockingQueue` 是一個支持等待操作的 Java Queue,在檢索和移除元素時等待佇列變為非空,並在添加元素時等待佇列中有空間可用。
Java BlockingQueue
Java BlockingQueue 不接受 `null` 值,如果您嘗試在佇列中存儲 null 值,將會拋出 `NullPointerException`。Java BlockingQueue 實現是線程安全的。所有的排隊方法在本質上都是原子性的,並使用內部鎖或其他形式的並發控制。Java BlockingQueue 接口是 Java 集合框架的一部分,主要用於實現生產者消費者問題。我們不需要擔心為生產者等待空間可用或為消費者等待對象可用,因為這由 BlockingQueue 的實現類處理。Java 提供了幾種 BlockingQueue 實現,例如 `ArrayBlockingQueue`、`LinkedBlockingQueue`、`PriorityBlockingQueue`、`SynchronousQueue` 等等。在實現 BlockingQueue 的生產者消費者問題時,我們將使用 ArrayBlockingQueue 實現。以下是一些您應該了解的重要方法。
put(E e)
: 此方法用於將元素插入佇列。如果佇列已滿,則等待空間可用。E take()
: This method retrieves and remove the element from the head of the queue. If queue is empty it waits for the element to be available.
現在讓我們使用 Java BlockingQueue 來實現生產者消費者問題。
Java BlockingQueue 示例 – 訊息
這只是一個普通的 Java 物件,將由生產者生成並添加到佇列中。您也可以稱其為有效載荷或佇列訊息。
package com.journaldev.concurrency;
public class Message {
private String msg;
public Message(String str){
this.msg=str;
}
public String getMsg() {
return msg;
}
}
Java BlockingQueue 示例 – 生產者
生產者類別將創建訊息並將其放入佇列中。
package com.journaldev.concurrency;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable {
private BlockingQueue queue;
public Producer(BlockingQueue q){
this.queue=q;
}
@Override
public void run() {
//生成訊息
for(int i=0; i<100; i++){
Message msg = new Message(""+i);
try {
Thread.sleep(i);
queue.put(msg);
System.out.println("Produced "+msg.getMsg());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//添加退出訊息
Message msg = new Message("exit");
try {
queue.put(msg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Java BlockingQueue 示例 – 消費者
消費者類別將處理來自佇列的訊息,並在收到退出訊息時終止。
package com.journaldev.concurrency;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable{
private BlockingQueue queue;
public Consumer(BlockingQueue q){
this.queue=q;
}
@Override
public void run() {
try{
Message msg;
//消費訊息直到收到退出訊息
while((msg = queue.take()).getMsg() !="exit"){
Thread.sleep(10);
System.out.println("Consumed "+msg.getMsg());
}
}catch(InterruptedException e) {
e.printStackTrace();
}
}
}
Java BlockingQueue 示例 – 服务
最后,我们必须为生产者和消费者创建BlockingQueue服务。这个生产者消费者服务将创建一个固定大小的BlockingQueue,并与生产者和消费者共享。该服务将启动生产者和消费者线程,然后退出。
package com.journaldev.concurrency;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerService {
public static void main(String[] args) {
//创建大小为10的BlockingQueue
BlockingQueue queue = new ArrayBlockingQueue<>(10);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
//启动生产者以将消息放入队列
new Thread(producer).start();
//启动消费者以从队列中消耗消息
new Thread(consumer).start();
System.out.println("Producer and Consumer has been started");
}
}
上述Java BlockingQueue示例程序的输出如下所示。
Producer and Consumer has been started
Produced 0
Produced 1
Produced 2
Produced 3
Produced 4
Consumed 0
Produced 5
Consumed 1
Produced 6
Produced 7
Consumed 2
Produced 8
...
Java线程休眠在生产者和消费者中被用于以一定延迟产生和消耗消息。
Source:
https://www.digitalocean.com/community/tutorials/java-blockingqueue-example