首先說一點,企業中最常用的實際上既不是RocketMQ,也不是Kafka,而是RabbitMQ。
RocketMQ很強大,但主要是阿里推廣自己的云產品而開源出來的一款消息隊列,其實中小企業用RocketMQ的沒有想象中那么多。
深層次的原因在于兔寶在中小企業普及更早,經受的考驗也更久,很容易產生「回頭客」,當初隨RabbitMQ成長的一批人才如今大部分都已成為企業中的中堅骨干,技術選型親睞RabbitMQ的幾率就更高。
至于Kafka,主要還是用在大數據和日志采集方面,除了一些公司有特定的需求會使用外,對消息收發準確率要求較高的公司依然是以RabbitMQ作為企業級消息隊列的首選。
工作這么多年我自身的感受是,RabbitMQ經久不衰,除非后續其他消息中間件有與眾不同的使用體驗,否則依然是RabbitMQ的占有率更高。
所以準備進入軟件行業的小伙伴,我建議有必要系統的先把RabbitMQ學好,然后再學習其他消息中間件擴展視野,他們的原理大同小異,是可以觸類旁通的。
RabbitMQ避免消息丟失的方法主要是利用消息確認機制和手動簽收機制,所以有必要把這兩個概念搞清楚。
主要是生產者使用的機制,用來確認消息是否被成功消費。
配置如下:
這樣,當你實現RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback這兩個接口的方法后,就可以針對性地進行消息確認的日志記錄,之后做進一步的消息發送補償,以達到接近100%投遞的目的。
偽代碼如下:
RabbitMQ的消息是自動簽收的,你可以理解為快遞簽收了,那么這個快遞的狀態就從發送變為已簽收,唯一的區別是快遞公司會對物流軌跡有記錄,而MQ簽收后就從隊列中刪除了。
企業級開發中,RabbitMQ我們基本都開啟手動簽收方式,這樣可以有效避免消息的丟失。
前文中已經在生產者開啟了手動簽收機制,那么作為消費方,也要設置手動簽收。
配置如下:
消費監聽時,手動簽收就一行代碼,偽代碼如下:
兩個概念搞清楚后,就可以來學習消息丟失的問題和處理方案了。
消息丟失的原因無非有三種:
1)、消息發出后,中途網絡故障,服務器沒收到;
2)、消息發出后,服務器收到了,還沒持久化,服務器宕機;
3)、消息發出后,服務器收到了,消費方還未處理業務邏輯,服務卻掛掉了,而消息也自動簽收,等于啥也沒干。
這三種情況,(1) 和 (2)是由于生產方未開啟消息確認機制導致,(3)是由于消費方未開啟手動簽收機制導致。
1)、生產方發送消息時,要try...catch,在catch中捕獲異常,并將MQ發送的關鍵內容記錄到日志表中,日志表中要有消息發送狀態,若發送失敗,由定時任務定期掃描重發并更新狀態;
2)、生產方publisher必須要加入確認回調機制,確認成功發送并簽收的消息,如果進入失敗回調方法,就修改數據庫消息的狀態,等待定時任務重發;
3)、消費方要開啟手動簽收ACK機制,消費成功才將消息移除,失敗或因異常情況而尚未處理,就重新入隊。
其實這就是前面闡述兩個概念時已經講過的內容,也是接近100%消息投遞的企業級方案之一,主要目的就是為了解決消息丟失的問題。
消息重復大體上有兩種情況會出現:
1)、消息消費成功,事務已提交,簽收時結果服務器宕機或網絡原因導致簽收失敗,消息狀態會由unack轉變為ready,重新發送給其他消費方;
2)、消息消費失敗,由于retry重試機制,重新入隊又將消息發送出去。
網上大體上能搜羅到的方法有三種:
1)、消費方業務接口做好冪等;
2)、消息日志表保存MQ發送時的唯一消息ID,消費方可以根據這個唯一ID進行判斷避免消息重復;
3)、消費方的Message對象有個getRedelivered()方法返回Boolean,為TRUE就表示重復發送過來的。
我這里只推薦第一種,業務方法冪等這是最直接有效的方式,(2)還要和數據庫產生交互,(3)有可能導致第一次消費失敗但第二次消費成功的情況被砍掉。
消息積壓出現的場景一般有兩種:
1)、消費方的服務掛掉,導致一直無法消費消息;
2)、消費方的服務節點太少,導致消費能力不足,從而出現積壓,這種情況極可能就是生產方的流量過大導致。
1)、既然消費能力不足,那就擴展更多消費節點,提升消費能力;
2)、建立專門的隊列消費服務,將消息批量取出并持久化,之后再慢慢消費。
(1)就是最直接的方式,也是消息積壓最常用的解決方案,但有些企業考慮到服務器成本壓力,會選擇第(2)種方案進行迂回,先通過一個獨立服務把要消費的消息存起來,比如存到數據庫,之后再慢慢處理這些消息即可。
這里單獨講一下本人在工作中使用RabbitMQ的一些心得,希望能有所幫助。
1)、消息丟失、消息重復、消息積壓三個問題中,實際上主要解決的還是消息丟失,因為大部分公司遇不到消息積壓的場景,而稍微有水準的公司核心業務都會解決冪等問題,所以幾乎不存在消息重復的可能;
2)、消息丟失的最常見企業級方案之一就是定時任務補償,因為不論是SOA還是微服務的架構,必然會有分布式任務調度的存在,自然也就成為MQ最直接的補償方式,如果MQ一定要實現100%投遞,這種是最普遍的方案。但我實際上不推薦中小企業使用該方案,因為憑空增加維護成本,而且沒有一定規模的項目完全沒必要,大家都小看了RabbitMQ本身的性能,比如我們公司,支撐一個三甲醫院,也就是三臺8核16G服務器的集群,上線至今3年毫無壓力;
3)、不要迷信網上和培訓機構講解的生產者消息確認機制,也就是前面兩個概念中講到的ConfirmCallback和ReturnCallback,這種機制十分降低MQ性能,我們團隊曾遇到過一次流量高峰期帶來的MQ傳輸及消費性能大幅降低的情況,后來發現是消息確認機制導致,關閉后立馬恢復正常,從此以后都不再使用這種機制,MQ運行十分順暢。同時我們會建立后臺管理實現人工補償,通過識別業務狀態判斷消費方是否處理了業務邏輯,畢竟這種情況都是少數,性能和運維成本,在這一塊我們選擇了性能;
4)、我工作這些年使用RabbitMQ沒見過自動簽收方式,一定是開啟手動簽收;
5)、手動簽收方式你在網上看到的教程幾乎都是處理完業務邏輯之后再手動簽收,但實際上這種用法是不科學的,在分布式的架構中,MQ用來解耦和轉發是非常常見的,如果是支付業務,往往在回調通知中通過MQ轉發到其他服務,其他服務如果業務處理不成功,那么手動簽收也不執行,這個消息又會入隊發給其他消費者,這樣就可能在流量洪峰階段因為偶然的業務處理失敗造成堵塞,甚至標題所講的三種問題同時出現,這樣就會得不償失。
不科學的用法:在處理完業務邏輯后再手動簽收,否則不簽收,就好比客人進店了你得買東西,否則不讓走。
科學的用法:不論業務邏輯是否處理成功,最終都要將消息手動簽收,MQ的使命不是保證客人進店了必須消費,不消費就不讓走,而是客人能進來就行,哪怕是隨便看看也算任務完成。
可能有人會問你這樣不是和自動簽收沒區別嗎,NO,你要知道如果自動簽收,出現消息丟失你連記錄日志的可能都沒有。
另外,為什么一定要這么做,因為MQ是中間件,本身就是輔助工具,就是一個滴滴司機,保證給你送到順便說個再見就行,沒必要還下車給你搬東西。
如果強加給MQ過多壓力,只會造成本身業務的畸形。我們使用MQ的目的就是解耦和轉發,不再做多余的事情,保證MQ本身是流暢的、職責單一的即可。
本篇主要講了RabbitMQ的三種常見問題及解決方案,同時分享了一些作者本人工作中使用的心得,我想網上是很難找到的,如果哪一天用到了,不妨再打開看看,也許能避免一些生產環境可能出現的問題。
我總結下來就是三點:
1)、消息100%投遞會增加運維成本,中小企業視情況使用,非必要不使用;
2)、消息確認機制影響性能,非必要不使用;
3)、消費者先保證消息能簽收,業務處理失敗可以人工補償。
工作中怕的永遠不是一個技術不會使用,而是遇到問題不知道有什么解決思路。
END
私信作者回復「資源」有更多驚喜內容哦!
消息隊列的優點:
沒有引入消息隊列:系統A發送數據給系統B,C;當有需求系統B不需要接受系統A的數據了,又要改系統A的代碼;或者系統D也新增需求,需要接收系統A的數據了,此時又要去改系統A的代碼,給系統D發送數據。頻繁地改代碼,系統間耦合度高。
引入消息隊列:系統A直接發布數據到消息隊列中間件,需要數據的系統直接訂閱MQ即可,不需要數據的不訂閱。系統之間沒有任何耦合度。
典型的就是秒殺系統,在短時間內大量的訪問會增加系統的壓力,導致系統崩潰。引入MQ后可以在系統之前加入緩沖,減少系統壓力。
用戶注冊后需要發送短信和郵件,此時可以使用MQ的異步消息。
應用1
應用2
ActiveMQ | RabbitMQ | RocketMQ | Kafka | |
性能(單臺) | 近萬級 | 萬級 | 十萬級 | 百萬級 |
消息持久化 | 支持 | 支持 | 支持 | 支持 |
多語言支持 | 支持 | 支持 | 很少 | 支持 |
社區活躍度 | 高 | 高 | 中 | 高 |
支持協議 | 多 (JMS,AMQP...) | 多( AMQP,STOMP,MQTT....) | 少 | 少 |
綜合評價 | 優點: 成熟,已經在很多公司得到應用。較多的文檔。各種協議支持較好,有多個語言的成熟客戶端。缺點:性能較弱。缺乏大規模吞吐的場景的應用有江河日下之感 | 優點:性能較好,管理界面較豐富,在互聯網公司也有較大規模的應用,有多個語言的成熟客戶端。缺點:內部機制很難了解,也意味很難定制和掌控。集群不支持動態擴用。 | 優點:模型簡單接口易用。在阿里有大規模應用分布式系統,性能很好,版本更新很快。缺點:文檔少支持的語言較少尚未主流 | 優點:天生分布式,性能最好,所以常見用于大數據領域。缺點:運維難度大有數據混亂的情況,對ZooKeeeper強依賴。多副本機制下對帶寬有定的要求。 |
開發語言:erlang
采用的是AMQP協議
默認端口:15672(http) 5672(amqp)
rabbitMQ的前臺控制頁面
交換機頁面
rabbitMQ的各個組成部分:
組成部分
消費流程:
生產者綁定到交換機(簡單工作隊列的交換機是默認的交換機)上,然后生產者通過channel信道發送消息是將消息發送到綁定到交換機上,交換機本身是不負責存儲消息的,它只是根據routingKey將消息轉發到指定的隊列中,由消費者監聽消費
常見的隊列標簽:
AD:自動刪除(消費者與隊列斷開連接后自動刪除)
D:隊列持久化
I:內部隊列
TTL:過期隊列
DLX:死信隊列
DLK:死信路由key
Lim:設置隊列的消息最大數量
rabbit的管理界面默認是五秒刷新一次,可以更改
簡單模式下也會有一個默認交換機,發送消息是交換機做的事
官方文檔上的圖
創建一個隊列
創建一個隊列
JAVA代碼演示:
/**
* simple模式生產者
*/
public class Producer {
private final static String HOST="10.211.55.4";
private final static String USERNAME="wangpeng";
private final static String PASSWORD="wangpeng";
//注意:使用docker啟動mq時,需要開放15672和5672兩個端口,頁面上使用的是15672(http)而代碼中使用的是5672(amqp)
private final static Integer PORT=5672;
private final static String QUEUE_NAME="helloQueue";
public static void main(String[] args) {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setVirtualHost("/");
Connection connection=null;
Channel channel=null;
try {
connection=connectionFactory.newConnection();
//一個連接中可以包含多個通道。channel是邏輯通道;連接耗時耗費資源
channel=connection.createChannel();
//durable:是否持久化,服務器重啟隊列是否還存在,設置為true,則隊列會持久化,但是里面的消息不會持久化,mq重啟,消息會丟失
//exclusive:是否排他
//autoDelete:隊列消費完是否自動刪除,即對列中沒有消息是否要刪除
//arguments:其他參數
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message="helloSimpleQueue";
//第三個是額外參數//arguments:其他參數,設置為MessageProperties.PERSISTENT_TEXT_PLAIN則表示這條消息會持久化隊列中
//沒有交換機就寫空就可以
channel.basicPublish("", message, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("消息發送成功");
} catch (Exception e) {
e.getMessage();
} finally {
//關閉連接
...
}
}
}
/**
* simple模式消費者
*/
public class Consumer {
//服務端沒有隊列,消費會報錯!!
private final static String HOST="10.211.55.4";
private final static String USERNAME="wangpeng";
private final static String PASSWORD="wangpeng";
//注意:使用docker啟動mq時,需要開放15672和5672兩個端口,頁面上使用的是15672而代碼中使用的是5672
private final static Integer PORT=5672;
private final static String QUEUE_NAME="helloQueue";
public static void main(String[] args) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setVirtualHost("/");
Connection connection=null;
Channel channel=null;
try {
connection=connectionFactory.newConnection();
channel=connection.createChannel();
channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
@Override
//消息消費成功回調
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息成功:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
//消息消費失敗回調
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失敗:" + consumerTag);
}
});
} catch (Exception e) {
e.getMessage();
} finally {
//關閉連接
...
}
}
}
輪詢:每個隊列收到的消息數量一致(99/3),輪詢模式下,消息會被均分給每個消費者,不論消費者處理時間快慢
公平分發:根據隊列的能力分發,誰快給誰
官方文檔上的圖
這種模式下需要將隊列綁定到交換機上,但是無routingkey,我們可以自己在頁面上創建一個指定模式的交換機
官方文檔上的圖
創建交換機:
創建交換機
將隊列綁定交換機:
將隊列綁定交換機
JAVA代碼演示:
/**
* fanout模式生產者
*/
public class Producer {
private final static String HOST="10.211.55.7";
private final static String USERNAME="wangpeng";
private final static String PASSWORD="wangpeng";
//注意:使用docker啟動mq時,需要開放15672和5672兩個端口,頁面上使用的是15672(http)而代碼中使用的是5672(amqp)
private final static Integer PORT=5672;
private final static String QUEUE_NAME="helloQueue";
private final static String EXCHANGE_NAME="fanout-Test";
public static void main(String[] args) {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setVirtualHost("/");
Connection connection=null;
Channel channel=null;
try {
connection=connectionFactory.newConnection("helloProducerConnection");
//一個連接中可以包含多個通道。channel是邏輯通道;連接耗時耗費資源
channel=connection.createChannel();
//durable:是否持久化,服務器重啟是否還存在
//exclusive:是否排他
//autoDelete:消費完是否自動刪除
//arguments:其他參數
//可以聲明多個隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueDeclare(QUEUE_NAME + "1", false, false, false, null);
channel.queueDeclare(QUEUE_NAME + "2", false, false, false, null);
//聲明交換機fanout模式,routingKey必須為空字符串而不是null
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//將隊列綁定到交換機上
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
channel.queueBind(QUEUE_NAME + "1", EXCHANGE_NAME, "");
channel.queueBind(QUEUE_NAME + "2", EXCHANGE_NAME, "");
String message="777";
//發布消息
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("消息發送成功");
} catch (Exception e) {
e.getStackTrace();
} finally {
}
}
}
/**
* fanout模式消費者
*/
public class Consumer {
//服務端沒有隊列,消費會報錯!!
private final static String HOST="10.211.55.7";
private final static String USERNAME="wangpeng";
private final static String PASSWORD="wangpeng";
//注意:使用docker啟動mq時,需要開放15672和5672兩個端口,頁面上使用的是15672而代碼中使用的是5672
private final static Integer PORT=5672;
private final static String QUEUE_NAME="helloQueue";
private final static String EXCHANGE_NAME="fanout-Test";
public static void main(String[] args) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setVirtualHost("/");
Connection connection=null;
Channel channel=null;
try {
connection=connectionFactory.newConnection("helloProducerConnection");
channel=connection.createChannel();
//將信道與交換機綁定
channel.exchangeBind(EXCHANGE_NAME, EXCHANGE_NAME, "");
//指定隊列接收消息
channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息成功:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失敗:" + consumerTag);
}
});
channel.basicConsume(QUEUE_NAME + 1, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息成功:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失敗:" + consumerTag);
}
});
channel.basicConsume(QUEUE_NAME + 2, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息成功:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失敗:" + consumerTag);
}
});
System.in.read();
} catch (Exception e) {
e.getMessage();
} finally {
}
}
}
既要綁定到交換機,又要設置routingkey,會根據routingkey分發到不同的隊列中
官方文檔上的圖
綁定交換機,與路由模式不同的是,routingkey支持模糊匹配(特殊字符 * 與 # ,用于做模糊匹配,其中 * 用于匹配一個單詞, #用于匹配多個單詞(可以是零個))
官方文檔上的圖
JAVA代碼演示:
/**
* topic模式生產者
*/
public class Producer {
private final static String HOST="10.211.55.7";
private final static String USERNAME="wangpeng";
private final static String PASSWORD="wangpeng";
//注意:使用docker啟動mq時,需要開放15672和5672兩個端口,頁面上使用的是15672(http)而代碼中使用的是5672(amqp)
private final static Integer PORT=5672;
private final static String QUEUE_NAME="helloQueue";
private final static String EXCHANGE_NAME="topic-Test";
public static void main(String[] args) {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setVirtualHost("/");
Connection connection=null;
Channel channel=null;
try {
connection=connectionFactory.newConnection("helloProducerConnection");
//一個連接中可以包含多個通道。channel是邏輯通道;連接耗時耗費資源
channel=connection.createChannel();
//durable:是否持久化,服務器重啟是否還存在
//exclusive:是否排他
//autoDelete:消費完是否自動刪除
//arguments:其他參數
//可以聲明多個隊列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueDeclare(QUEUE_NAME + "1", false, false, false, null);
channel.queueDeclare(QUEUE_NAME + "2", false, false, false, null);
//聲明交換機direct模式
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "topic.*");
channel.queueBind(QUEUE_NAME + "1", EXCHANGE_NAME, "topic.#");
channel.queueBind(QUEUE_NAME + "2", EXCHANGE_NAME, "*.111");
String message="topic is good";
//只給routingKey=directs的發布消息
channel.basicPublish(EXCHANGE_NAME, "topic.111.222", null, message.getBytes());
System.out.println("消息發送成功");
} catch (Exception e) {
e.getStackTrace();
} finally {
}
}
}
/**
* topic模式消費者
*/
public class Consumer {
//服務端沒有隊列,消費會報錯!!
private final static String HOST="10.211.55.7";
private final static String USERNAME="wangpeng";
private final static String PASSWORD="wangpeng";
//注意:使用docker啟動mq時,需要開放15672和5672兩個端口,頁面上使用的是15672而代碼中使用的是5672
private final static Integer PORT=5672;
private final static String QUEUE_NAME="helloQueue";
private final static String EXCHANGE_NAME="direct-Test";
public static void main(String[] args) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost(HOST);
connectionFactory.setPort(PORT);
connectionFactory.setUsername(USERNAME);
connectionFactory.setPassword(PASSWORD);
connectionFactory.setVirtualHost("/");
Connection connection=null;
Channel channel=null;
try {
connection=connectionFactory.newConnection("helloProducerConnection");
channel=connection.createChannel();
//交換機發的消息的routingKey是啥這里就寫啥,這里寫不寫都無所謂
//channel.exchangeBind(EXCHANGE_NAME,EXCHANGE_NAME,"topic.111.222");
channel.basicConsume(QUEUE_NAME, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息成功:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失敗:" + consumerTag);
}
});
channel.basicConsume(QUEUE_NAME + 1, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息成功:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失敗:" + consumerTag);
}
});
channel.basicConsume(QUEUE_NAME + 2, true, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("接收消息成功:" + new String(message.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String consumerTag) throws IOException {
System.out.println("消息接收失敗:" + consumerTag);
}
});
} catch (Exception e) {
e.getMessage();
} finally {
}
}
}
<!-- spring整合rabbitmq依賴-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
//定義一個配置類用于讀取yml中的自定義配置
@ConfigurationProperties(prefix="spring.rabbitmq")
public class ConnectionFactoryConfig {
private String username;
private String password;
private String host;
private Integer port;
private String virtualHost;
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username=username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password=password;
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host=host;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port=port;
}
public String getVirtualHost() {
return virtualHost;
}
public void setVirtualHost(String virtualHost) {
this.virtualHost=virtualHost;
}
@Override
public String toString() {
return "ConnectionFactoryConfig{" +
"username='" + username + '\'' +
", password='" + password + '\'' +
", host='" + host + '\'' +
", port=" + port +
", virtualHost='" + virtualHost + '\'' +
'}';
}
}
//配置mq的連接信息
@Configuration
//這里可以使用此注解將@ConfigurationProperties注解加入IOC容器,也可以直接在@ConfigurationProperties注解修飾的類上,加@Component注解加入spring的IOC容器
@EnableConfigurationProperties({ConnectionFactoryConfig.class})
public class RabbitMqConfig {
@Autowired
public ConnectionFactoryConfig config;
@Autowired
public RabbitTemplate rabbitTemplate;
//自己定義了一個連接工廠學習,日常中我們可以不定義,直接在yml中設置值就行——>spring.rabbitmq.username:
@Bean("connectionFactory")
public CachingConnectionFactory createConnectionFactory() {
CachingConnectionFactory cachingConnectionFactory=new CachingConnectionFactory();
cachingConnectionFactory.setHost(config.getHost());
cachingConnectionFactory.setPort(config.getPort());
cachingConnectionFactory.setUsername(config.getUsername());
cachingConnectionFactory.setPassword(config.getPassword());
cachingConnectionFactory.setVirtualHost(config.getVirtualHost());
//消息到達交換機是否確認
//或者在配置文件中配置publisher-confirms 為true
cachingConnectionFactory.setPublisherConfirms(true);
//消息到達隊列是否確認
//或者在配置文件中配置publisher-returns 為true
cachingConnectionFactory.setPublisherReturns(true);
return cachingConnectionFactory;
}
//封裝了對mq的管理操作
@Bean
public RabbitAdmin rabbitAdmin() {
return new RabbitAdmin(createConnectionFactory());
@PostConstruct
public void init() {
//設置消息未到達隊列是否回調setReturnCallback接口
//使用return-callback時必須設置mandatory為true,或者在配置中設置mandatory-expression的值為true
rabbitTemplate.setMandatory(true);
//消息到達交換機是否確認
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("消息是否到達交換機:" + ack);
}
});
//如果exchange到queue成功,則不回調return;如果exchange到queue失敗,則回調return(需設置mandatory=true,否則不回回調,消息就丟了)
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息是否沒有到達隊列..." + JSON.toJSONString(message));
}
});
}
}
//創建相關隊列和交換機并綁定隊列
//聲明消費者監聽相關隊列
public class RabbitMqTopicByRabbitAdminConfig {
//該類封裝了對mq的管理操作
private RabbitAdmin rabbitAdmin;
@Autowired
public void setRabbitAdmin(RabbitAdmin rabbitAdmin) {
this.rabbitAdmin=rabbitAdmin;
}
@Autowired
private CachingConnectionFactory connectionFactory;
@PostConstruct
public void init() {
rabbitAdmin.declareQueue(new Queue("rabbitAdminQueue", false, false, false));
rabbitAdmin.declareQueue(new Queue("rabbitAdminQueueAnnotation", false, false, false));
rabbitAdmin.declareExchange(new TopicExchange("rabbitAdminExchange", false, false));
rabbitAdmin.declareBinding(new Binding("rabbitAdminQueue", Binding.DestinationType.QUEUE, "rabbitAdminExchange", "*.queue.#", null));
rabbitAdmin.declareBinding(new Binding("rabbitAdminQueueAnnotation", Binding.DestinationType.QUEUE, "rabbitAdminExchange", "*.queueAnnotation.#", null));
}
//使用代碼配置的方式綁定消費者,也可以使用@RabbitListener(queues={"rabbitAdminQueue"})
@Bean
public SimpleMessageListenerContainer adminMessageContainer() {
SimpleMessageListenerContainer container=new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("rabbitAdminQueue");
container.setMessageListener(SpringUtil.getBean("topicExchangeAdminListener", TopicExchangeAdminListener.class));
//設置手動ack,消費者收到消息是否回執
//也可以設置在配置文件中:spring.rabbitmq.listener.simple.acknowledge-mode=manual
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//設置ack手動回執之后,保證消費者每次只拿到一條消息
container.setPrefetchCount(1);
return container;
}
}
/**
* @Author:wangpeng
* @Description:監聽相關隊列的消費者,設置手動回執ack后,如何給隊列回執ack
**/
@Component("topicExchangeAdminListener")
//當使用ChannelAwareMessageListener時,代表需要手動設置ack回執,在SimpleMessageListenerContainer中設置了手動回執
public class TopicExchangeAdminListener implements ChannelAwareMessageListener {
@Override
public void onMessage(Message message, Channel channel) throws Exception {
//表明消費者確認收到當前消息,第二個參數表示一次是否 ack 多條消息
String s=new String(message.getBody());
if (s.equals("666")) {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} else if (s.equals("777")) {
//第二個參數表示一次是否拒絕多條消息 第三個參數表示是否把當前消息從新入隊(false的話,會直接丟掉)
//同一個channel deliveryTag不一致
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else if (s.equals("888")) {
//表明消費者拒絕當前消息,第二個參數表示是否把當前消息從新入隊
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
} else {
throw new RuntimeException("不消費!!");
}
}
}
以上是關于rabbitMQ的一些常規基礎操作,后面還會有一些高級操作,像死信隊列、延遲隊列、如何保證順序消費等