大家好,我是指北君。
(資料圖片僅供參考)
今天指北君帶領(lǐng)大家接著學(xué)習(xí)RabbitMQ,了解RabbitMQ的五大通信模型之一的Work模型;接下來(lái)還會(huì)有關(guān)于RabbitMQ的系列教程,對(duì)你有幫助的話(huà)記得關(guān)注哦~
回顧上一篇文章中,簡(jiǎn)單的介紹了一下RabbitMQ,以及安裝和hello world。
有的小伙伴留言說(shuō)看不懂其中的方法參數(shù),這里先解釋一下幾個(gè)基本的方法參數(shù)。
// 聲明隊(duì)列方法channel.queueDeclare(QUEUE_NAME, false, false, false, null);/** * param1:queue 隊(duì)列的名字 * param2:durable 是否持久化;比如現(xiàn)在發(fā)送到隊(duì)列里面的消息,如果沒(méi)有持久化,重啟這個(gè)隊(duì)列后數(shù) 據(jù)會(huì)丟失(false) true:重啟之后數(shù)據(jù)依然在 * param3:exclusive 是否排外(是否是當(dāng)前連接的專(zhuān)屬隊(duì)列),排外的意思是: * 1:連接關(guān)閉之后 這個(gè)隊(duì)列是否自動(dòng)刪除(false:不自動(dòng)刪除) * 2:是否允許其他通道來(lái)進(jìn)行訪問(wèn)這個(gè)數(shù)據(jù)(false:不允許) * param4:autoDelete 是否自動(dòng)刪除 * 就是當(dāng)最后一個(gè)連接斷開(kāi)的時(shí)候,是否自動(dòng)刪除這個(gè)隊(duì)列(false:不刪除) * param5:arguments(map) 聲明隊(duì)列的時(shí)候,附帶的一些參數(shù) */
// 發(fā)送數(shù)據(jù)到隊(duì)列channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "第一個(gè)隊(duì)列消息...".getBytes());/** * param1:exchange 交換機(jī) 沒(méi)有就設(shè)置為 "" 值就可以了 * param2:routingKey 路由的key 現(xiàn)在沒(méi)有設(shè)置key,直接使用隊(duì)列的名字 * param3:BasicProperties 發(fā)送數(shù)據(jù)到隊(duì)列的時(shí)候,是否要帶一些參數(shù)。 * MessageProperties.PERSISTENT_TEXT_PLAIN表示沒(méi)有帶任何參數(shù) * param4:body 向隊(duì)列中發(fā)送的消息數(shù)據(jù) */Work模型
work模型稱(chēng)為工作隊(duì)列或者競(jìng)爭(zhēng)消費(fèi)者模式,多個(gè)消費(fèi)者消費(fèi)的數(shù)據(jù)之和才是原來(lái)隊(duì)列中的所有數(shù)據(jù),適用于流量的削峰。
演示寫(xiě)個(gè)簡(jiǎn)單的測(cè)試:
生產(chǎn)者
public class Producer { private static final String QUEUE_NAME = "queue_work_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 100; i++) { channel.basicPublish("", QUEUE_NAME, null, ("work模型:" + i).getBytes()); } channel.close(); connection.close(); }}
消費(fèi)者
// 消費(fèi)者1public class Consumer { private static final String QUEUE_NAME = "queue_work_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // channel.basicQos(0, 1, false); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(System.currentTimeMillis() + "消費(fèi)者1接收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, false, defaultConsumer); }}
// 消費(fèi)者2public class Consumer2 { private static final String QUEUE_NAME = "queue_work_1"; public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); // channel.basicQos(0, 1, false); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println(System.currentTimeMillis() + "消費(fèi)者2接收到信息:" + new String(body)); channel.basicAck(envelope.getDeliveryTag(), false); // 這里加了個(gè)延遲,表示處理業(yè)務(wù)時(shí)間 try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } }; channel.basicConsume(QUEUE_NAME, false, defaultConsumer); }}
結(jié)果
可以看出來(lái):100條消息,消費(fèi)者之間是平分的,消費(fèi)者1 幾乎是瞬間完成,消費(fèi)者2 則是慢慢吞吞的運(yùn)行完畢,消費(fèi)者1大量時(shí)間處于空閑狀態(tài),消費(fèi)者2則一直忙碌。這顯然是不適用于實(shí)際開(kāi)發(fā)中。
我們需要遵從一個(gè)原則,就是能者多勞,消費(fèi)越快的人,消費(fèi)的越多;
現(xiàn)在我們把消費(fèi)者1和2的代碼中// channel.basicQos(0, 1, false);這行代碼取消注釋?zhuān)俅芜\(yùn)行;
現(xiàn)在的結(jié)果就比較符合能者多勞,雖然你干的多,但是工資是一樣的呀~
work模型的一個(gè)主要的方法是basicQos();這里也解釋一下其參數(shù):
// 設(shè)置限流機(jī)制channel.basicQos(0, 1, false);/** * param1: prefetchSize,消息本身的大小 如果設(shè)置為0 那么表示對(duì)消息本身的大小不限制 * param2: prefetchCount,告訴rabbitmq不要一次性給消費(fèi)者推送大于N個(gè)消息 * param3:global,是否將上面的設(shè)置應(yīng)用于整個(gè)通道,false表示只應(yīng)用于當(dāng)前消費(fèi)者 */小結(jié)
本文到這里就結(jié)束了,主要介紹了RabbitMQ通信模型中的work模型,適用于限流、削峰等應(yīng)用場(chǎng)景。
關(guān)鍵詞: 通信模型 發(fā)送數(shù)據(jù) 能者多勞 空閑狀態(tài)
X 關(guān)閉
X 關(guān)閉
- 廈門(mén)啟動(dòng)節(jié)日期間主要副食品供應(yīng)補(bǔ)貼和平價(jià)菜機(jī)制
- 平潭簽約恒鼎海上運(yùn)輸服務(wù)項(xiàng)目
- 當(dāng)前快報(bào):《平潭綜合實(shí)驗(yàn)區(qū)“十四五”美麗海灣建設(shè)工作方案》印發(fā)
- 春運(yùn)大幕開(kāi)啟 平潭高鐵站首日旅客同比增長(zhǎng)18.1%|每日消息
- 春運(yùn)啟動(dòng) 泉州公路部門(mén)多措并舉保暢通_當(dāng)前通訊
- 【世界快播報(bào)】2023年度家庭醫(yī)生簽約啟動(dòng) 市民可關(guān)注微信公眾號(hào)簽約
- 每日簡(jiǎn)訊:常斌、林瑞良任福建省副省長(zhǎng)
- 15G資費(fèi)不大降!三大運(yùn)營(yíng)商誰(shuí)提供的5G網(wǎng)速最快?中國(guó)信通院給出答案
- 2聯(lián)想拯救者Y70發(fā)布最新預(yù)告:售價(jià)2970元起 迄今最便宜的驍龍8+旗艦
- 3亞馬遜開(kāi)始大規(guī)模推廣掌紋支付技術(shù) 顧客可使用“揮手付”結(jié)賬
- 4現(xiàn)代和起亞上半年出口20萬(wàn)輛新能源汽車(chē)同比增長(zhǎng)30.6%
- 5如何讓居民5分鐘使用到各種設(shè)施?沙特“線(xiàn)性城市”來(lái)了
- 6AMD實(shí)現(xiàn)連續(xù)8個(gè)季度的增長(zhǎng) 季度營(yíng)收首次突破60億美元利潤(rùn)更是翻倍
- 7轉(zhuǎn)轉(zhuǎn)集團(tuán)發(fā)布2022年二季度手機(jī)行情報(bào)告:二手市場(chǎng)“飄香”
- 8充電寶100Wh等于多少毫安?鐵路旅客禁止、限制攜帶和托運(yùn)物品目錄
- 9好消息!京東與騰訊續(xù)簽三年戰(zhàn)略合作協(xié)議 加強(qiáng)技術(shù)創(chuàng)新與供應(yīng)鏈服務(wù)
- 10名創(chuàng)優(yōu)品擬通過(guò)香港IPO全球發(fā)售4100萬(wàn)股 全球發(fā)售所得款項(xiàng)有什么用處?