更新時(shí)間:2022-07-29 10:58:53 來(lái)源:動(dòng)力節(jié)點(diǎn) 瀏覽2011次
在本Java教程中,動(dòng)力節(jié)點(diǎn)小編將通過(guò)相同的生產(chǎn)者/消費(fèi)者概念來(lái)解釋BlockingQueue in Java.
Ajava.util.Queue 支持在檢索元素時(shí)等待隊(duì)列變?yōu)榉强眨⒃诖鎯?chǔ)元素時(shí)等待隊(duì)列中的空間變?yōu)榭捎玫牟僮鳌?/p>
我們需要?jiǎng)?chuàng)建四個(gè) Java 類:
CrunchifyMessage.java 放置和獲取消息
CrunchifyBlockingProducer.java將消息放入隊(duì)列
CrunchifyBlockingConsumer.java 從隊(duì)列中獲取消息
CrunchifyBlockingMain.java 開(kāi)始測(cè)試
BlockingQueue 實(shí)現(xiàn)是thread-safe. 所有排隊(duì)方法本質(zhì)上都是原子的并且使用內(nèi)部鎖。
讓我們開(kāi)始在 Java 中實(shí)現(xiàn)線程安全的 BlockingQueue
第1步
創(chuàng)建類 CrunchifyMessage.java。這是簡(jiǎn)單的Java 對(duì)象。
package com.crunchify.example;
/**
* @author Crunchify.com
* simple Message class to put and get message into queue
*/
public class CrunchifyMessage {
private String crunchifyMsg;
public CrunchifyMessage(String string) {
this.crunchifyMsg = string;
}
public String getMsg() {
return crunchifyMsg;
}
}
第2步
CrunchifyBlockingProducer.java 創(chuàng)建創(chuàng)建簡(jiǎn)單味精并將其放入隊(duì)列的生產(chǎn)者 。
package com.crunchify.example;
import java.util.concurrent.BlockingQueue;
/**
* @author Crunchify.com
*
*/
public class CrunchifyBlockingProducer implements Runnable {
private BlockingQueue<CrunchifyMessage> crunchQueue;
public CrunchifyBlockingProducer(BlockingQueue<CrunchifyMessage> queue) {
this.crunchQueue = queue;
}
@Override
public void run() {
// producing CrunchifyMessage messages
for (int i = 1; i <= 5; i++) {
CrunchifyMessage msg = new CrunchifyMessage("i'm msg " + i);
try {
Thread.sleep(10);
crunchQueue.put(msg);
System.out.println("CrunchifyBlockingProducer: Message - " + msg.getMsg() + " produced.");
} catch (Exception e) {
System.out.println("Exception:" + e);
}
}
// adding exit message
CrunchifyMessage msg = new CrunchifyMessage("All done from Producer side. Produced 50 CrunchifyMessages");
try {
crunchQueue.put(msg);
System.out.println("CrunchifyBlockingProducer: Exit Message - " + msg.getMsg());
} catch (Exception e) {
System.out.println("Exception:" + e);
}
}
}
第3步
創(chuàng)建 CrunchifyBlockingConsumer.java 從隊(duì)列中消費(fèi)消息的類。
package com.crunchify.example;
import java.util.concurrent.BlockingQueue;
/**
* @author Crunchify.com
*
*/
public class CrunchifyBlockingConsumer implements Runnable {
private BlockingQueue<CrunchifyMessage> queue;
public CrunchifyBlockingConsumer(BlockingQueue<CrunchifyMessage> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
CrunchifyMessage msg;
// consuming messages until exit message is received
while ((msg = queue.take()).getMsg() != "exit") {
Thread.sleep(10);
System.out.println("CrunchifyBlockingConsumer: Message - " + msg.getMsg() + " consumed.");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
第4步
創(chuàng)建 CrunchifyBlockingMain.java 運(yùn)行 BlockingQueue測(cè)試的簡(jiǎn)單方法。運(yùn)行這個(gè)程序來(lái)檢查 BlockingQueue 的行為。
package com.crunchify.example;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
/**
* @author Crunchify.com
*
*/
public class CrunchifyBlockingMain {
public static void main(String[] args) {
// Creating BlockingQueue of size 10
// BlockingQueue supports operations that wait for the queue to become non-empty when retrieving an element, and
// wait for space to become available in the queue when storing an element.
BlockingQueue<CrunchifyMessage> crunchQueue = new ArrayBlockingQueue<>(10);
CrunchifyBlockingProducer crunchProducer = new CrunchifyBlockingProducer(crunchQueue);
CrunchifyBlockingConsumer crunchConsumer = new CrunchifyBlockingConsumer(crunchQueue);
// starting producer to produce messages in queue
new Thread(crunchProducer).start();
// starting consumer to consume messages from queue
new Thread(crunchConsumer).start();
System.out.println("Let's get started. Producer / Consumer Test Started.\n");
}
}
BlockingQueue不接受空元素。在嘗試添加、放置或提供null時(shí),實(shí)現(xiàn)會(huì)拋出NullPointerException。
null用作標(biāo)記值以指示輪詢操作失敗。
結(jié)果:
Let's get started. Producer / Consumer Test Started.
CrunchifyBlockingProducer: Message - i'm msg 1 produced.
CrunchifyBlockingProducer: Message - i'm msg 2 produced.
CrunchifyBlockingConsumer: Message - i'm msg 1 consumed.
CrunchifyBlockingConsumer: Message - i'm msg 2 consumed.
CrunchifyBlockingProducer: Message - i'm msg 3 produced.
CrunchifyBlockingConsumer: Message - i'm msg 3 consumed.
CrunchifyBlockingProducer: Message - i'm msg 4 produced.
CrunchifyBlockingConsumer: Message - i'm msg 4 consumed.
CrunchifyBlockingProducer: Message - i'm msg 5 produced.
CrunchifyBlockingProducer: Exit Message - All done from Producer side. Produced 50 CrunchifyMessages
CrunchifyBlockingConsumer: Message - i'm msg 5 consumed.
CrunchifyBlockingConsumer: Message - All done from Producer side. Produced 50 CrunchifyMessages consumed.
當(dāng)您想限制某種傳入請(qǐng)求時(shí),您應(yīng)該使用相同的
生產(chǎn)者可以通過(guò)無(wú)限隊(duì)列遠(yuǎn)遠(yuǎn)領(lǐng)先于消費(fèi)者。如果消費(fèi)者沒(méi)有趕上生產(chǎn)者,那么它可能會(huì)導(dǎo)致 OutOfMemoryError. 在這種情況下,最好向潛在的生產(chǎn)者發(fā)出隊(duì)列已滿的信號(hào),并在失敗后迅速放棄。
換句話說(shuō):生產(chǎn)者自然受到限制。
阻塞隊(duì)列通常用于并發(fā)應(yīng)用程序
它提供了正確的、線程安全的實(shí)現(xiàn)
內(nèi)存消耗也應(yīng)該受到限制
相關(guān)閱讀
0基礎(chǔ) 0學(xué)費(fèi) 15天面授
有基礎(chǔ) 直達(dá)就業(yè)
業(yè)余時(shí)間 高薪轉(zhuǎn)行
工作1~3年,加薪神器
工作3~5年,晉升架構(gòu)
提交申請(qǐng)后,顧問(wèn)老師會(huì)電話與您溝通安排學(xué)習(xí)