
一、簡(jiǎn)單的發(fā)送與接收消息 HelloWorld
1. 發(fā)送消息
發(fā)送消息首先要獲取與rabbitmq-server的連接,然后從渠道(chann)中指定的queue發(fā)送消息 , 不能定義兩個(gè)queue名字相同,但屬性不同
示例:
package com.zf.rabbitmq01;
import JAVA.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 發(fā)送消息
* @author zhanghuan
*
*/
public class Sender01 {
public static void main(String[] args) throws IOException {
ConnectionFactory connFac = new ConnectionFactory() ;
//RabbitMQ-Server安裝在本機(jī),所以直接用127.0.0.1
connFac.setHost("127.0.0.1");
//創(chuàng)建一個(gè)連接
Connection conn = connFac.newConnection() ;
//創(chuàng)建一個(gè)渠道
Channel channel = conn.createChannel() ;
//定義Queue名稱(chēng)
String queueName = "queue01" ;
//為channel定義queue的屬性,queueName為Queue名稱(chēng)
channel.queueDeclare( queueName , false, false, false, null) ;
String msg = "Hello World!";
//發(fā)送消息
channel.basicPublish("", queueName , null , msg.getBytes());
System.out.println("send message[" + msg + "] to "+ queueName +" success!");
channel.close();
conn.close();
}
}
package com.zf.rabbitmq01;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
* 接收消息
* @author zhanghuan
*
*/
public class Recv01 {
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost("127.0.0.1");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
String queueName = "queue01";
channel.queueDeclare(queueName, false, false, false, null) ;
//上面的部分,與Sender01是一樣的
//配置好獲取消息的方式
QueueingConsumer consumer = new QueueingConsumer(channel) ;
channel.basicConsume(queueName, true, consumer) ;
//循環(huán)獲取消息
while(true){
//獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞
Delivery delivery = consumer.nextDelivery() ;
String msg = new String(delivery.getBody()) ;
System.out.println("received message[" + msg + "] from " + queueName);
}
}
}

二、消息確認(rèn)與公平調(diào)度消費(fèi)者
從本節(jié)開(kāi)始稱(chēng)Sender為生產(chǎn)者 , Recv為消費(fèi)者
1. 消息確認(rèn)
為了確保消息一定被消費(fèi)者處理,rabbitMQ提供了消息確認(rèn)功能,就是在消費(fèi)者處理完任務(wù)之后,就給服務(wù)器一個(gè)回饋,服務(wù)器就會(huì)將該消息刪除,如果消費(fèi)者超時(shí)不回饋,那么服務(wù)器將就將該消息重新發(fā)送給其他消費(fèi)者默認(rèn)是開(kāi)啟的,在消費(fèi)者端通過(guò)下面的方式開(kāi)啟消息確認(rèn), 首先將autoAck自動(dòng)確認(rèn)關(guān)閉,等我們的任務(wù)執(zhí)行完成之后,手動(dòng)的去確認(rèn),類(lèi)似JDBC的autocommit一樣
QueueingConsumer consumer = new QueueingConsumer(channel);
Boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);
在前面的例子中使用的是channel.basicConsume(channelName, true, consumer) ; 在接收到消息后,就會(huì)自動(dòng)反饋一個(gè)消息給服務(wù)器。
下面這個(gè)例子來(lái)測(cè)試消息確認(rèn)的功能。
package com.zf.rabbitmq03;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 發(fā)送消息
* @author zhanghuan
*
*/
public class Sender03 {
public static void main(String[] args) throws IOException {
ConnectionFactory connFac = new ConnectionFactory() ;
//RabbitMQ-Server安裝在本機(jī),所以直接用127.0.0.1
connFac.setHost("127.0.0.1");
//創(chuàng)建一個(gè)連接
Connection conn = connFac.newConnection() ;
//創(chuàng)建一個(gè)渠道
Channel channel = conn.createChannel() ;
//定義Queue名稱(chēng)
String queueName = "queue01" ;
//為channel定義queue的屬性,queueName為Queue名稱(chēng)
channel.queueDeclare( queueName , false, false, false, null) ;
String msg = "Hello World!";
//發(fā)送消息
channel.basicPublish("", queueName , null , msg.getBytes());
System.out.println("send message[" + msg + "] to "+ queueName +" success!");
channel.close();
conn.close();
}
}
與Sender01.java一樣,沒(méi)有什么區(qū)別。
package com.zf.rabbitmq03;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
* 接收消息
* @author zhanghuan
*
*/
public class Recv03 {
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost("127.0.0.1");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
String channelName = "channel01";
channel.queueDeclare(channelName, false, false, false, null) ;
//配置好獲取消息的方式
QueueingConsumer consumer = new QueueingConsumer(channel) ;
//取消 autoAck
Boolean autoAck = false ;
channel.basicConsume(channelName, autoAck, consumer) ;
//循環(huán)獲取消息
while(true){
//獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞
Delivery delivery = consumer.nextDelivery() ;
String msg = new String(delivery.getBody()) ;
//確認(rèn)消息,已經(jīng)收到
channel.basicAck(delivery.getEnvelope().getDeliveryTag()
, false);
System.out.println("received message[" + msg + "] from " + channelName);
}
}
}
注意:一旦將autoAck關(guān)閉之后,一定要記得處理完消息之后,向服務(wù)器確認(rèn)消息。否則服務(wù)器將會(huì)一直轉(zhuǎn)發(fā)該消息如果將上面的channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);注釋掉, Sender03.java只需要運(yùn)行一次 , Recv03.java每次運(yùn)行將都會(huì)收到HelloWorld消息
注意:但是這樣還是不夠的,如果rabbitMQ-Server突然掛掉了,那么還沒(méi)有被讀取的消息還是會(huì)丟失 ,所以我們可以讓消息持久化。 只需要在定義Queue時(shí),設(shè)置持久化消息就可以了,方法如下:
boolean durable = true;channel.queueDeclare(channelName, durable, false, false, null);
這樣設(shè)置之后,服務(wù)器收到消息后就會(huì)立刻將消息寫(xiě)入到硬盤(pán),就可以防止突然服務(wù)器掛掉,而引起的數(shù)據(jù)丟失了。 但是服務(wù)器如果剛收到消息,還沒(méi)來(lái)得及寫(xiě)入到硬盤(pán),就掛掉了,這樣還是無(wú)法避免消息的丟失。

2. 公平調(diào)度
上一個(gè)例子能夠?qū)崿F(xiàn)發(fā)送一個(gè)Message與接收一個(gè)Message
從上一個(gè)Recv01中可以看出,必須處理完一個(gè)消息,才會(huì)去接收下一個(gè)消息。如果生產(chǎn)者眾多,那么一個(gè)消費(fèi)者肯定是忙不過(guò)來(lái)的。此時(shí)就可以用多個(gè)消費(fèi)者來(lái)對(duì)同一個(gè)Channel的消息進(jìn)行處理,并且要公平的分配任務(wù)給多個(gè)消費(fèi)者。不能部分很忙,部分總是空閑
實(shí)現(xiàn)公平調(diào)度的方式就是讓每個(gè)消費(fèi)者在同一時(shí)刻會(huì)分配一個(gè)任務(wù)。 通過(guò)channel.basicQos(1);可以設(shè)置
列如:
當(dāng)有多個(gè)消費(fèi)者同時(shí)收取消息,且每個(gè)消費(fèi)者在接收消息的同時(shí),還要做其它的事情,且會(huì)消耗很長(zhǎng)的時(shí)間,在此過(guò)程中可能會(huì)出現(xiàn)一些意外,比如消息接收到一半的時(shí)候,一個(gè)消費(fèi)者宕掉了,這時(shí)候就要使用消息接收確認(rèn)機(jī)制,可以讓其它的消費(fèi)者再次執(zhí)行剛才宕掉的消費(fèi)者沒(méi)有完成的事情。另外,在默認(rèn)情況下,我們創(chuàng)建的消息隊(duì)列以及存放在隊(duì)列里面的消息,都是非持久化的,也就是說(shuō)當(dāng)RabbitMQ宕掉了或者是重啟了,創(chuàng)建的消息隊(duì)列以及消息都不會(huì)保存,為了解決這種情況,保證消息傳輸?shù)目煽啃?,我們可以使用RabbitMQ提供的消息隊(duì)列的持久化機(jī)制。

生產(chǎn)者:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class ClientSend1 {
public static final String queue_name="my_queue";
public static final Boolean durable=true;
//消息隊(duì)列持久化
public static void main(String[] args)
throws java.io.IOException{
ConnectionFactory factory=new ConnectionFactory();
//創(chuàng)建連接工廠(chǎng)
factory.setHost("localhost");
factory.setVirtualHost("my_mq");
factory.setUsername("zhxia");
factory.setPassword("123456");
Connection connection=factory.newConnection();
//創(chuàng)建連接
Channel channel=connection.createChannel();
//創(chuàng)建信道
channel.queueDeclare(queue_name, durable, false, false, null);
//聲明消息隊(duì)列,且為可持久化的
String message="Hello world"+Math.random();
//將隊(duì)列設(shè)置為持久化之后,還需要將消息也設(shè)為可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
System.out.println("Send message:"+message);
channel.close();
connection.close();
}
}
說(shuō)明:行17 和行20 需要同時(shí)設(shè)置,也就是將隊(duì)列設(shè)置為持久化之后,還需要將發(fā)送的消息也要設(shè)置為持久化才能保證隊(duì)列和消息一直存在
消費(fèi)者:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class ClientReceive1 {
public static final String queue_name="my_queue";
public static final Boolean autoAck=false;
public static final Boolean durable=true;
public static void main(String[] args)
throws java.io.IOException,java.lang.InterruptedException{
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("my_mq");
factory.setUsername("zhxia");
factory.setPassword("123456");
Connection connection=factory.newConnection();
Channel channel=connection.createChannel();
channel.queueDeclare(queue_name, durable, false, false, null);
System.out.println("Wait for message");
channel.basicQos(1);
//消息分發(fā)處理
QueueingConsumer consumer=new QueueingConsumer(channel);
channel.basicConsume(queue_name, autoAck, consumer);
while(true){
Thread.sleep(500);
QueueingConsumer.Delivery deliver=consumer.nextDelivery();
String message=new String(deliver.getBody());
System.out.println("Message received:"+message);
channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
}
}
}
說(shuō)明:行22: 設(shè)置RabbitMQ調(diào)度分發(fā)消息的方式,也就是告訴RabbitMQ每次只給消費(fèi)者處理一條消息,也就是等待消費(fèi)者處理完并且已經(jīng)對(duì)剛才處理的消息進(jìn)行確認(rèn)之后, 才發(fā)送下一條消息,防止消費(fèi)者太過(guò)于忙碌。如下圖所示:

三、發(fā)布/訂閱消息
前面都是一條消息只會(huì)被一個(gè)消費(fèi)者處理。
如果要每個(gè)消費(fèi)者都處理同一個(gè)消息,rabbitMq也提供了相應(yīng)的方法。
在以前的程序中,不管是生產(chǎn)者端還是消費(fèi)者端都必須知道一個(gè)指定的QueueName才能發(fā)送、獲取消息。 而rabbitMQ消息模型的核心思想是生產(chǎn)者不會(huì)將消息直接發(fā)送給隊(duì)列。
因?yàn)?,生產(chǎn)者通常不會(huì)知道消息將會(huì)被哪些消費(fèi)者接收。
生產(chǎn)者的消息雖然不是直接發(fā)送給Queue,但是消息會(huì)交給Exchange,所以需要定義Exchange的消息分發(fā)模式 ,之前的程序中,有如下一行代碼:
channel.basicPublish("", queueName , null , msg.getBytes());
第一個(gè)參數(shù)為空字符串,其實(shí)第一個(gè)參數(shù)就是ExchangeName,這里用空字符串,就表示消息會(huì)交給默認(rèn)的Exchange。
下面我們將自己定義Exchange的屬性。
package com.zf.rabbitmq04;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 發(fā)送消息
* @author zhanghuan
*
*/
public class Sender04 {
public static void main(String[] args) throws IOException {
ConnectionFactory connFac = new ConnectionFactory() ;
//RabbitMQ-Server安裝在本機(jī),所以直接用127.0.0.1
connFac.setHost("127.0.0.1");
//創(chuàng)建一個(gè)連接
Connection conn = connFac.newConnection() ;
//創(chuàng)建一個(gè)渠道
Channel channel = conn.createChannel() ;
//定義ExchangeName,第二個(gè)參數(shù)是Exchange的類(lèi)型,fanout表示消息將會(huì)分列發(fā)送給多賬戶(hù)
String exchangeName = "news" ;
channel.exchangeDeclare(exchangeName, "fanout") ;
String msg = "Hello World!";
//發(fā)送消息,這里與前面的不同,這里第一個(gè)參數(shù)不再是字符串,而是ExchangeName ,第二個(gè)參數(shù)也不再是queueName,而是空字符串
channel.basicPublish( exchangeName , "" , null , msg.getBytes());
System.out.println("send message[" + msg + "] to exchange "+ exchangeName +" success!");
channel.close();
conn.close();
}
}
Send04.java 發(fā)送消息時(shí)沒(méi)有指定的queueName 用的空字符串代替的。 Exchange的類(lèi)型有direct, topic, headers 、 fanout四種,上面用的是fanout類(lèi)型
package com.zf.rabbitmq04;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
* 接收消息
* @author zhanghuan
*
*/
public class Recv04_01 {
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost("127.0.0.1");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
String exchangeName = "news" ;
channel.exchangeDeclare(exchangeName, "fanout") ;
//這里使用沒(méi)有參數(shù)的queueDeclare方法創(chuàng)建Queue并獲取QueueName
String queueName = channel.queueDeclare().getQueue() ;
//將queue綁定到Exchange中
channel.queueBind( queueName, exchangeName, "") ;
//配置好獲取消息的方式
QueueingConsumer consumer = new QueueingConsumer(channel) ;
channel.basicConsume(queueName, true, consumer) ;
//循環(huán)獲取消息
while(true){
//獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞
Delivery delivery = consumer.nextDelivery() ;
String msg = new String(delivery.getBody()) ;
System.out.println("received message[" + msg + "] from " + queueName);
}
}
}
Recv04_01.java 使用channel.queueDeclare()方法創(chuàng)建了一個(gè)Queue,該Queue有系統(tǒng)創(chuàng)建,并分配了一個(gè)隨機(jī)的名稱(chēng)。 然后將該Queue與與Exchange綁定在一起。 該Queue就能從Exchange中后去消息了。
測(cè)試
將Recv04_01.java 文件復(fù)制幾份 Recv04_02.java Recv04_03.java,然后執(zhí)行Recv04_01 與 Recv04_02,接下來(lái)執(zhí)行Sender04發(fā)送消息,可以看到Recv04_01 與Recv04_02都接收到了消息。然后執(zhí)行Recv04_03,沒(méi)有獲取到任何消息。接下來(lái)再執(zhí)行Sender04發(fā)送消息,可以看到Recv04_01 、Recv04_02與Recv04_03都接收到了消息。
說(shuō)明Exchange在收到生產(chǎn)者的消息后,會(huì)將消息發(fā)送給當(dāng)前已經(jīng)與它綁定了的所有Queue 。 然后被移除。

四、消息路由
生產(chǎn)者會(huì)生產(chǎn)出很多消息 , 但是不同的消費(fèi)者可能會(huì)有不同的需求,只需要接收指定的消息,其他的消息需要被過(guò)濾掉。 這時(shí)候就可以對(duì)消息進(jìn)行過(guò)濾了。 在消費(fèi)者端設(shè)置好需要接收的消息類(lèi)型。
如果不使用默認(rèn)的Exchange發(fā)送消息,而是使用我們自定定義的Exchange發(fā)送消息,那么下面這個(gè)方法的第二個(gè)參數(shù)就不是QueueName了,而是消息的類(lèi)型。
channel.basicPublish( exchangeName , messageType , null , msg.getBytes());
示例:
package com.zf.rabbitmq05;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 發(fā)送消息
* @author zhanghuan
*
*/
public class Sender05 {
public static void main(String[] args) throws IOException {
ConnectionFactory connFac = new ConnectionFactory() ;
//RabbitMQ-Server安裝在本機(jī),所以直接用127.0.0.1
connFac.setHost("127.0.0.1");
//創(chuàng)建一個(gè)連接
Connection conn = connFac.newConnection() ;
//創(chuàng)建一個(gè)渠道
Channel channel = conn.createChannel() ;
String exchangeName = "exchange02";
String messageType = "type01";
channel.exchangeDeclare(exchangeName, "direct") ;
//定義Queue名
String msg = "Hello World!";
//發(fā)送消息
channel.basicPublish( exchangeName , messageType , null , msg.getBytes());
System.out.println("send message[" + msg + "] to "+ exchangeName +" success!");
channel.close();
conn.close();
}
}
package com.zf.rabbitmq05;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
* 接收消息
* @author zhanghuan
*
*/
public class Recv05_01 {
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost("127.0.0.1");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
String exchangeName = "exchange02";
channel.exchangeDeclare(exchangeName, "direct") ;
String queueName = channel.queueDeclare().getQueue() ;
//第三個(gè)參數(shù)就是type,這里表示只接收type01類(lèi)型的消息。
channel.queueBind(queueName, exchangeName, "type01") ;
//也可以選擇接收多種類(lèi)型的消息。只需要再下面再綁定一次就可以了
channel.queueBind(queueName, exchangeName, "type02") ;
//配置好獲取消息的方式
QueueingConsumer consumer = new QueueingConsumer(channel) ;
channel.basicConsume(queueName, true, consumer) ;
//循環(huán)獲取消息
while(true){
//獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞
Delivery delivery = consumer.nextDelivery() ;
String msg = new String(delivery.getBody()) ;
System.out.println("received message[" + msg + "] from " + exchangeName);
}
}
}
這時(shí),啟動(dòng)Recv05_01.java 然后啟動(dòng)Sender05.java ,消費(fèi)者端就會(huì)收到消息。然后將Sender05.java 中的messageType分別改為type02 type03 然后發(fā)送消息 , 可以看到消費(fèi)者端能接收到type02的消息,但是不能接收到type03的消息。

五、Topic類(lèi)型消息
上一節(jié)中使用了消息路由,消費(fèi)者可以選擇性的接收消息。 但是這樣還是不夠靈活。
比如某個(gè)消費(fèi)者要訂閱娛樂(lè)新聞消息 。 包括新浪、網(wǎng)易、騰訊的娛樂(lè)新聞。那么消費(fèi)者就需要綁定三次,分別綁定這三個(gè)網(wǎng)站的消息類(lèi)型。 如果新聞門(mén)戶(hù)更多了,那么消費(fèi)者將要綁定個(gè)更多的消息類(lèi)型, 其實(shí)消費(fèi)者只是需要訂閱娛樂(lè)新聞,不管是哪個(gè)網(wǎng)站的新聞,都需要。 那么在rabbitMQ中可以使用topic類(lèi)型。 模糊匹配消息類(lèi)型。
模糊匹配中的 *代表一個(gè) #代表零個(gè)或1個(gè)
示例:
package com.zf.rabbitmq06;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
* 接收消息
* @author zhanghuan
*
*/
public class Recv06_01 {
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost("127.0.0.1");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
String exchangeName = "exchange03";
channel.exchangeDeclare(exchangeName, "topic") ;
String queueName = channel.queueDeclare().getQueue() ;
//第三個(gè)參數(shù)就是type,這里表示只接收type01類(lèi)型的消息。
channel.queueBind(queueName, exchangeName, "#.type01") ;
//配置好獲取消息的方式
QueueingConsumer consumer = new QueueingConsumer(channel) ;
channel.basicConsume(queueName, true, consumer) ;
//循環(huán)獲取消息
while(true){
//獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞
Delivery delivery = consumer.nextDelivery() ;
String msg = new String(delivery.getBody()) ;
System.out.println("received message[" + msg + "] from " + exchangeName);
}
}
}
package com.zf.rabbitmq06;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 發(fā)送消息
* @author zhanghuan *
*/
public class Sender06 {
public static void main(String[] args) throws IOException {
ConnectionFactory connFac = new ConnectionFactory() ;
//RabbitMQ-Server安裝在本機(jī),所以直接用127.0.0.1
connFac.setHost("127.0.0.1");
//創(chuàng)建一個(gè)連接
Connection conn = connFac.newConnection() ;
//創(chuàng)建一個(gè)渠道
Channel channel = conn.createChannel() ;
String exchangeName = "exchange03";
String messageType = "fs.type01";
channel.exchangeDeclare(exchangeName, "topic") ;
//定義Queue名
String msg = "Hello World!";
//發(fā)送消息
channel.basicPublish( exchangeName , messageType , null , msg.getBytes());
System.out.println("send message[" + msg + "] to "+ exchangeName +" success!");
channel.close();
conn.close();
}
}
使用topic之后 。不管Sender端發(fā)送的消息類(lèi)型是fs.type01 還是 xx.type01 還是 type01 ,消費(fèi)者都會(huì)收到消息
六、RPC 遠(yuǎn)程過(guò)程調(diào)用
當(dāng)客戶(hù)端想要調(diào)用服務(wù)器的某個(gè)方法來(lái)完成某項(xiàng)功能時(shí),就可以使用rabbitMQ支持的PRC服務(wù)。
其實(shí)RPC服務(wù)與普通的收發(fā)消息的區(qū)別不大, RPC的過(guò)程其實(shí)就是客戶(hù)端向服務(wù)端定義好的Queue發(fā)送消息,其中攜帶的消息就應(yīng)該是服務(wù)端將要調(diào)用的方法的參數(shù) ,并使用Propertis告訴服務(wù)端將結(jié)果返回到指定的Queue。
示例:
package com.zf.rabbitmq07;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
public class RPCServer {
public static final String RPC_QUEUE_NAME = "rpc_queue";
public static String sayHello(String name){
return "hello " + name ;
}
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost("localhost");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) ;
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false , consumer) ;
while(true){
System.out.println("服務(wù)端等待接收消息..");
Delivery deliver = consumer.nextDelivery() ;
System.out.println("服務(wù)端成功收到消息..");
BasicProperties props = deliver.getProperties() ;
String message = new String(deliver.getBody() , "UTF-8") ;
String responseMessage = sayHello(message) ;
BasicProperties responseProps = new BasicProperties.Builder()
.correlationId(props.getCorrelationId())
.build() ;
//將結(jié)果返回到客戶(hù)端Queue
channel.basicPublish("", props.getReplyTo() , responseProps , responseMessage.getBytes("UTF-8") ) ;
//向客戶(hù)端確認(rèn)消息
channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
System.out.println("服務(wù)端返回消息完成..");
}
}
}
package com.zf.rabbitmq07;
import java.io.IOException;
import java.util.UUID;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
public class RPCClient {
public static final String RPC_QUEUE_NAME = "rpc_queue";
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
ConnectionFactory connFac = new ConnectionFactory() ;
connFac.setHost("localhost");
Connection conn = connFac.newConnection() ;
Channel channel = conn.createChannel() ;
//響應(yīng)QueueName ,服務(wù)端將會(huì)把要返回的信息發(fā)送到該Queue
String responseQueue = channel.queueDeclare().getQueue() ;
String correlationId = UUID.randomUUID().toString() ;
BasicProperties props = new BasicProperties.Builder()
.replyTo(responseQueue)
.correlationId(correlationId)
.build();
String message = "is_zhoufeng";
channel.basicPublish( "" , RPC_QUEUE_NAME , props , message.getBytes("UTF-8"));
QueueingConsumer consumer = new QueueingConsumer(channel) ;
channel.basicConsume( responseQueue , consumer) ;
while(true){
Delivery delivery = consumer.nextDelivery() ;
if(delivery.getProperties().getCorrelationId().equals(correlationId)){
String result = new String(delivery.getBody()) ;
System.out.println(result);
}
}
}
}
寫(xiě)在最后:
- 針對(duì)于Java程序員,筆者最近整理了一些面試真題,思維導(dǎo)圖,程序人生等PDF學(xué)習(xí)資料;
- 關(guān)注私信我"86",即可獲??!
- 希望讀到這的您能點(diǎn)個(gè)小贊和關(guān)注下我,以后還會(huì)更新技術(shù)干貨,謝謝您的支持!
