日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線(xiàn)咨詢(xún)客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

 

阿里P7架構(gòu)師的RabbitMQ學(xué)習(xí)筆記告訴你什么叫做消息隊(duì)列王者

 

一、簡(jiǎn)單的發(fā)送與接收消息 HelloWorld

1. 發(fā)送消息

發(fā)送消息首先要獲取與rabbitmq-server的連接,然后從渠道(chann)中指定的queue發(fā)送消息 , 不能定義兩個(gè)queue名字相同,但屬性不同

示例:

package com.zf.rabbitmq01;
import JAVA.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 發(fā)送消息
 * @author zhanghuan
 *
 */
public class Sender01 {
    public static void main(String[] args) throws IOException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        //RabbitMQ-Server安裝在本機(jī),所以直接用127.0.0.1
        connFac.setHost("127.0.0.1");
        //創(chuàng)建一個(gè)連接
        Connection conn = connFac.newConnection() ;
        //創(chuàng)建一個(gè)渠道
        Channel channel = conn.createChannel() ;
        //定義Queue名稱(chēng)
        String queueName = "queue01" ;
        //為channel定義queue的屬性,queueName為Queue名稱(chēng)
        channel.queueDeclare( queueName , false, false, false, null) ;
        String msg = "Hello World!";
        //發(fā)送消息
        channel.basicPublish("", queueName , null , msg.getBytes());
        System.out.println("send message[" + msg + "] to "+ queueName +" success!");
        channel.close();
        conn.close();
    }
}

package com.zf.rabbitmq01;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
 * 接收消息
 * @author zhanghuan
 *
 */
public class Recv01 {
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        connFac.setHost("127.0.0.1");
        Connection conn = connFac.newConnection() ;
        Channel channel = conn.createChannel() ;
        String queueName = "queue01";
        channel.queueDeclare(queueName, false, false, false, null) ;
        //上面的部分,與Sender01是一樣的
        //配置好獲取消息的方式
        QueueingConsumer consumer = new QueueingConsumer(channel) ;
        channel.basicConsume(queueName, true, consumer) ;
        //循環(huán)獲取消息
        while(true){
            //獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞
            Delivery delivery = consumer.nextDelivery() ;
            String msg = new String(delivery.getBody()) ;
            System.out.println("received message[" + msg + "] from " + queueName);
        }
    }
}

 

阿里P7架構(gòu)師的RabbitMQ學(xué)習(xí)筆記告訴你什么叫做消息隊(duì)列王者

 

二、消息確認(rèn)與公平調(diào)度消費(fèi)者

從本節(jié)開(kāi)始稱(chēng)Sender為生產(chǎn)者 , Recv為消費(fèi)者

1. 消息確認(rèn)

為了確保消息一定被消費(fèi)者處理,rabbitMQ提供了消息確認(rèn)功能,就是在消費(fèi)者處理完任務(wù)之后,就給服務(wù)器一個(gè)回饋,服務(wù)器就會(huì)將該消息刪除,如果消費(fèi)者超時(shí)不回饋,那么服務(wù)器將就將該消息重新發(fā)送給其他消費(fèi)者默認(rèn)是開(kāi)啟的,在消費(fèi)者端通過(guò)下面的方式開(kāi)啟消息確認(rèn), 首先將autoAck自動(dòng)確認(rèn)關(guān)閉,等我們的任務(wù)執(zhí)行完成之后,手動(dòng)的去確認(rèn),類(lèi)似JDBC的autocommit一樣

QueueingConsumer consumer = new QueueingConsumer(channel);
Boolean autoAck = false;
channel.basicConsume("hello", autoAck, consumer);

在前面的例子中使用的是channel.basicConsume(channelName, true, consumer) ; 在接收到消息后,就會(huì)自動(dòng)反饋一個(gè)消息給服務(wù)器。

下面這個(gè)例子來(lái)測(cè)試消息確認(rèn)的功能。

package com.zf.rabbitmq03;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 發(fā)送消息
 * @author zhanghuan
 *
 */
public class Sender03 {
    public static void main(String[] args) throws IOException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        //RabbitMQ-Server安裝在本機(jī),所以直接用127.0.0.1
        connFac.setHost("127.0.0.1");
        //創(chuàng)建一個(gè)連接
        Connection conn = connFac.newConnection() ;
        //創(chuàng)建一個(gè)渠道
        Channel channel = conn.createChannel() ;
        //定義Queue名稱(chēng)
        String queueName = "queue01" ;
        //為channel定義queue的屬性,queueName為Queue名稱(chēng)
        channel.queueDeclare( queueName , false, false, false, null) ;
        String msg = "Hello World!";
        //發(fā)送消息
        channel.basicPublish("", queueName , null , msg.getBytes());
        System.out.println("send message[" + msg + "] to "+ queueName +" success!");
        channel.close();
        conn.close();
    }
}

與Sender01.java一樣,沒(méi)有什么區(qū)別。

package com.zf.rabbitmq03;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
 * 接收消息
 * @author zhanghuan
 *
 */
public class Recv03 {
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        connFac.setHost("127.0.0.1");
        Connection conn = connFac.newConnection() ;
        Channel channel = conn.createChannel() ;
        String channelName = "channel01";
        channel.queueDeclare(channelName, false, false, false, null) ;
        //配置好獲取消息的方式
        QueueingConsumer consumer = new QueueingConsumer(channel) ;
        //取消 autoAck
        Boolean autoAck = false ;
        channel.basicConsume(channelName, autoAck, consumer) ;
        //循環(huán)獲取消息
        while(true){
            //獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞
            Delivery delivery = consumer.nextDelivery() ;
            String msg = new String(delivery.getBody()) ;
            //確認(rèn)消息,已經(jīng)收到
            channel.basicAck(delivery.getEnvelope().getDeliveryTag()
                                , false);
            System.out.println("received message[" + msg + "] from " + channelName);
        }
    }
}

注意:一旦將autoAck關(guān)閉之后,一定要記得處理完消息之后,向服務(wù)器確認(rèn)消息。否則服務(wù)器將會(huì)一直轉(zhuǎn)發(fā)該消息如果將上面的channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);注釋掉, Sender03.java只需要運(yùn)行一次 , Recv03.java每次運(yùn)行將都會(huì)收到HelloWorld消息

注意:但是這樣還是不夠的,如果rabbitMQ-Server突然掛掉了,那么還沒(méi)有被讀取的消息還是會(huì)丟失 ,所以我們可以讓消息持久化。 只需要在定義Queue時(shí),設(shè)置持久化消息就可以了,方法如下:

boolean durable = true;channel.queueDeclare(channelName, durable, false, false, null);

這樣設(shè)置之后,服務(wù)器收到消息后就會(huì)立刻將消息寫(xiě)入到硬盤(pán),就可以防止突然服務(wù)器掛掉,而引起的數(shù)據(jù)丟失了。 但是服務(wù)器如果剛收到消息,還沒(méi)來(lái)得及寫(xiě)入到硬盤(pán),就掛掉了,這樣還是無(wú)法避免消息的丟失。

阿里P7架構(gòu)師的RabbitMQ學(xué)習(xí)筆記告訴你什么叫做消息隊(duì)列王者

 

2. 公平調(diào)度

上一個(gè)例子能夠?qū)崿F(xiàn)發(fā)送一個(gè)Message與接收一個(gè)Message

從上一個(gè)Recv01中可以看出,必須處理完一個(gè)消息,才會(huì)去接收下一個(gè)消息。如果生產(chǎn)者眾多,那么一個(gè)消費(fèi)者肯定是忙不過(guò)來(lái)的。此時(shí)就可以用多個(gè)消費(fèi)者來(lái)對(duì)同一個(gè)Channel的消息進(jìn)行處理,并且要公平的分配任務(wù)給多個(gè)消費(fèi)者。不能部分很忙,部分總是空閑

實(shí)現(xiàn)公平調(diào)度的方式就是讓每個(gè)消費(fèi)者在同一時(shí)刻會(huì)分配一個(gè)任務(wù)。 通過(guò)channel.basicQos(1);可以設(shè)置

列如:

當(dāng)有多個(gè)消費(fèi)者同時(shí)收取消息,且每個(gè)消費(fèi)者在接收消息的同時(shí),還要做其它的事情,且會(huì)消耗很長(zhǎng)的時(shí)間,在此過(guò)程中可能會(huì)出現(xiàn)一些意外,比如消息接收到一半的時(shí)候,一個(gè)消費(fèi)者宕掉了,這時(shí)候就要使用消息接收確認(rèn)機(jī)制,可以讓其它的消費(fèi)者再次執(zhí)行剛才宕掉的消費(fèi)者沒(méi)有完成的事情。另外,在默認(rèn)情況下,我們創(chuàng)建的消息隊(duì)列以及存放在隊(duì)列里面的消息,都是非持久化的,也就是說(shuō)當(dāng)RabbitMQ宕掉了或者是重啟了,創(chuàng)建的消息隊(duì)列以及消息都不會(huì)保存,為了解決這種情況,保證消息傳輸?shù)目煽啃?,我們可以使用RabbitMQ提供的消息隊(duì)列的持久化機(jī)制。

阿里P7架構(gòu)師的RabbitMQ學(xué)習(xí)筆記告訴你什么叫做消息隊(duì)列王者

 

生產(chǎn)者:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
public class ClientSend1 {
   public static final String queue_name="my_queue";
   public static final Boolean durable=true;
   //消息隊(duì)列持久化 
   public static void main(String[] args) 
   throws java.io.IOException{
       ConnectionFactory factory=new ConnectionFactory();
       //創(chuàng)建連接工廠(chǎng)
       factory.setHost("localhost");
       factory.setVirtualHost("my_mq");
       factory.setUsername("zhxia");
       factory.setPassword("123456");
       Connection connection=factory.newConnection();
       //創(chuàng)建連接
       Channel channel=connection.createChannel();
       //創(chuàng)建信道
       channel.queueDeclare(queue_name, durable, false, false, null);
       //聲明消息隊(duì)列,且為可持久化的
       String message="Hello world"+Math.random();
       //將隊(duì)列設(shè)置為持久化之后,還需要將消息也設(shè)為可持久化的,MessageProperties.PERSISTENT_TEXT_PLAIN
       channel.basicPublish("", queue_name, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
       System.out.println("Send message:"+message);
       channel.close();
       connection.close();
   }
}

說(shuō)明:行17 和行20 需要同時(shí)設(shè)置,也就是將隊(duì)列設(shè)置為持久化之后,還需要將發(fā)送的消息也要設(shè)置為持久化才能保證隊(duì)列和消息一直存在

消費(fèi)者:

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
public class ClientReceive1 {
    public static final String queue_name="my_queue";
    public static final Boolean autoAck=false;
    public static final Boolean durable=true;
    public static void main(String[] args)
    throws java.io.IOException,java.lang.InterruptedException{
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("my_mq");
        factory.setUsername("zhxia");
        factory.setPassword("123456");
        Connection connection=factory.newConnection();
        Channel channel=connection.createChannel();
        channel.queueDeclare(queue_name, durable, false, false, null);
        System.out.println("Wait for message");
        channel.basicQos(1);
        //消息分發(fā)處理
        QueueingConsumer consumer=new QueueingConsumer(channel);
        channel.basicConsume(queue_name, autoAck, consumer);
        while(true){
            Thread.sleep(500);
            QueueingConsumer.Delivery deliver=consumer.nextDelivery();
            String message=new String(deliver.getBody());
            System.out.println("Message received:"+message);
            channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
        }
    }
}

說(shuō)明:行22: 設(shè)置RabbitMQ調(diào)度分發(fā)消息的方式,也就是告訴RabbitMQ每次只給消費(fèi)者處理一條消息,也就是等待消費(fèi)者處理完并且已經(jīng)對(duì)剛才處理的消息進(jìn)行確認(rèn)之后, 才發(fā)送下一條消息,防止消費(fèi)者太過(guò)于忙碌。如下圖所示:

阿里P7架構(gòu)師的RabbitMQ學(xué)習(xí)筆記告訴你什么叫做消息隊(duì)列王者

 

三、發(fā)布/訂閱消息

前面都是一條消息只會(huì)被一個(gè)消費(fèi)者處理。

如果要每個(gè)消費(fèi)者都處理同一個(gè)消息,rabbitMq也提供了相應(yīng)的方法。

在以前的程序中,不管是生產(chǎn)者端還是消費(fèi)者端都必須知道一個(gè)指定的QueueName才能發(fā)送、獲取消息。 而rabbitMQ消息模型的核心思想是生產(chǎn)者不會(huì)將消息直接發(fā)送給隊(duì)列。

因?yàn)?,生產(chǎn)者通常不會(huì)知道消息將會(huì)被哪些消費(fèi)者接收。

生產(chǎn)者的消息雖然不是直接發(fā)送給Queue,但是消息會(huì)交給Exchange,所以需要定義Exchange的消息分發(fā)模式 ,之前的程序中,有如下一行代碼:

channel.basicPublish("", queueName , null , msg.getBytes());

第一個(gè)參數(shù)為空字符串,其實(shí)第一個(gè)參數(shù)就是ExchangeName,這里用空字符串,就表示消息會(huì)交給默認(rèn)的Exchange。

下面我們將自己定義Exchange的屬性。

package com.zf.rabbitmq04;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 發(fā)送消息
 * @author zhanghuan
 *
 */
public class Sender04 {
    public static void main(String[] args) throws IOException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        //RabbitMQ-Server安裝在本機(jī),所以直接用127.0.0.1
        connFac.setHost("127.0.0.1");
        //創(chuàng)建一個(gè)連接
        Connection conn = connFac.newConnection() ;
        //創(chuàng)建一個(gè)渠道
        Channel channel = conn.createChannel() ;
        //定義ExchangeName,第二個(gè)參數(shù)是Exchange的類(lèi)型,fanout表示消息將會(huì)分列發(fā)送給多賬戶(hù)
        String exchangeName = "news" ;
        channel.exchangeDeclare(exchangeName, "fanout") ;
        String msg = "Hello World!";
        //發(fā)送消息,這里與前面的不同,這里第一個(gè)參數(shù)不再是字符串,而是ExchangeName ,第二個(gè)參數(shù)也不再是queueName,而是空字符串
        channel.basicPublish( exchangeName , "" , null , msg.getBytes());
        System.out.println("send message[" + msg + "] to exchange "+ exchangeName +" success!");
        channel.close();
        conn.close();
    }
}

Send04.java 發(fā)送消息時(shí)沒(méi)有指定的queueName 用的空字符串代替的。 Exchange的類(lèi)型有direct, topic, headers 、 fanout四種,上面用的是fanout類(lèi)型

package com.zf.rabbitmq04;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
 * 接收消息
 * @author zhanghuan
 *
 */
public class Recv04_01 {
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        connFac.setHost("127.0.0.1");
        Connection conn = connFac.newConnection() ;
        Channel channel = conn.createChannel() ;
        String exchangeName = "news" ;
        channel.exchangeDeclare(exchangeName, "fanout") ;
        //這里使用沒(méi)有參數(shù)的queueDeclare方法創(chuàng)建Queue并獲取QueueName
        String queueName = channel.queueDeclare().getQueue() ;
        //將queue綁定到Exchange中
        channel.queueBind( queueName, exchangeName, "") ;
        //配置好獲取消息的方式
        QueueingConsumer consumer = new QueueingConsumer(channel) ;
        channel.basicConsume(queueName, true, consumer) ;
        //循環(huán)獲取消息
        while(true){
            //獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞
            Delivery delivery = consumer.nextDelivery() ;
            String msg = new String(delivery.getBody()) ;
            System.out.println("received message[" + msg + "] from " + queueName);
        }
    }
}

Recv04_01.java 使用channel.queueDeclare()方法創(chuàng)建了一個(gè)Queue,該Queue有系統(tǒng)創(chuàng)建,并分配了一個(gè)隨機(jī)的名稱(chēng)。 然后將該Queue與與Exchange綁定在一起。 該Queue就能從Exchange中后去消息了。

測(cè)試

將Recv04_01.java 文件復(fù)制幾份 Recv04_02.java Recv04_03.java,然后執(zhí)行Recv04_01 與 Recv04_02,接下來(lái)執(zhí)行Sender04發(fā)送消息,可以看到Recv04_01 與Recv04_02都接收到了消息。然后執(zhí)行Recv04_03,沒(méi)有獲取到任何消息。接下來(lái)再執(zhí)行Sender04發(fā)送消息,可以看到Recv04_01 、Recv04_02與Recv04_03都接收到了消息。

說(shuō)明Exchange在收到生產(chǎn)者的消息后,會(huì)將消息發(fā)送給當(dāng)前已經(jīng)與它綁定了的所有Queue 。 然后被移除。

阿里P7架構(gòu)師的RabbitMQ學(xué)習(xí)筆記告訴你什么叫做消息隊(duì)列王者

 

四、消息路由

生產(chǎn)者會(huì)生產(chǎn)出很多消息 , 但是不同的消費(fèi)者可能會(huì)有不同的需求,只需要接收指定的消息,其他的消息需要被過(guò)濾掉。 這時(shí)候就可以對(duì)消息進(jìn)行過(guò)濾了。 在消費(fèi)者端設(shè)置好需要接收的消息類(lèi)型。

如果不使用默認(rèn)的Exchange發(fā)送消息,而是使用我們自定定義的Exchange發(fā)送消息,那么下面這個(gè)方法的第二個(gè)參數(shù)就不是QueueName了,而是消息的類(lèi)型。

channel.basicPublish( exchangeName , messageType , null , msg.getBytes());

示例:

package com.zf.rabbitmq05;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 發(fā)送消息
 * @author zhanghuan
 *
 */
public class Sender05 {
    public static void main(String[] args) throws IOException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        //RabbitMQ-Server安裝在本機(jī),所以直接用127.0.0.1
        connFac.setHost("127.0.0.1");
        //創(chuàng)建一個(gè)連接
        Connection conn = connFac.newConnection() ;
        //創(chuàng)建一個(gè)渠道
        Channel channel = conn.createChannel() ;
        String exchangeName = "exchange02";
        String messageType = "type01";
        channel.exchangeDeclare(exchangeName, "direct") ;
        //定義Queue名
        String msg = "Hello World!";
        //發(fā)送消息
        channel.basicPublish( exchangeName , messageType , null , msg.getBytes());
        System.out.println("send message[" + msg + "] to "+ exchangeName +" success!");
        channel.close();
        conn.close();
    }
}
package com.zf.rabbitmq05;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
 * 接收消息
 * @author zhanghuan
 *
 */
public class Recv05_01 {
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        connFac.setHost("127.0.0.1");
        Connection conn = connFac.newConnection() ;
        Channel channel = conn.createChannel() ;
        String exchangeName = "exchange02";
        channel.exchangeDeclare(exchangeName, "direct") ;
        String queueName = channel.queueDeclare().getQueue() ;
        //第三個(gè)參數(shù)就是type,這里表示只接收type01類(lèi)型的消息。
        channel.queueBind(queueName, exchangeName, "type01") ;
        //也可以選擇接收多種類(lèi)型的消息。只需要再下面再綁定一次就可以了
        channel.queueBind(queueName, exchangeName, "type02") ;
        //配置好獲取消息的方式
        QueueingConsumer consumer = new QueueingConsumer(channel) ;
        channel.basicConsume(queueName, true, consumer) ;
        //循環(huán)獲取消息
        while(true){
            //獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞
            Delivery delivery = consumer.nextDelivery() ;
            String msg = new String(delivery.getBody()) ;
            System.out.println("received message[" + msg + "] from " + exchangeName);
        }
    }
}

這時(shí),啟動(dòng)Recv05_01.java 然后啟動(dòng)Sender05.java ,消費(fèi)者端就會(huì)收到消息。然后將Sender05.java 中的messageType分別改為type02 type03 然后發(fā)送消息 , 可以看到消費(fèi)者端能接收到type02的消息,但是不能接收到type03的消息。

阿里P7架構(gòu)師的RabbitMQ學(xué)習(xí)筆記告訴你什么叫做消息隊(duì)列王者

 

五、Topic類(lèi)型消息

上一節(jié)中使用了消息路由,消費(fèi)者可以選擇性的接收消息。 但是這樣還是不夠靈活。

比如某個(gè)消費(fèi)者要訂閱娛樂(lè)新聞消息 。 包括新浪、網(wǎng)易、騰訊的娛樂(lè)新聞。那么消費(fèi)者就需要綁定三次,分別綁定這三個(gè)網(wǎng)站的消息類(lèi)型。 如果新聞門(mén)戶(hù)更多了,那么消費(fèi)者將要綁定個(gè)更多的消息類(lèi)型, 其實(shí)消費(fèi)者只是需要訂閱娛樂(lè)新聞,不管是哪個(gè)網(wǎng)站的新聞,都需要。 那么在rabbitMQ中可以使用topic類(lèi)型。 模糊匹配消息類(lèi)型。

模糊匹配中的 *代表一個(gè) #代表零個(gè)或1個(gè)

示例:

package com.zf.rabbitmq06;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
/**
 * 接收消息
 * @author zhanghuan
 *
 */
public class Recv06_01 {
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        connFac.setHost("127.0.0.1");
        Connection conn = connFac.newConnection() ;
        Channel channel = conn.createChannel() ;
        String exchangeName = "exchange03";
        channel.exchangeDeclare(exchangeName, "topic") ;
        String queueName = channel.queueDeclare().getQueue() ;
        //第三個(gè)參數(shù)就是type,這里表示只接收type01類(lèi)型的消息。
        channel.queueBind(queueName, exchangeName, "#.type01") ;
        //配置好獲取消息的方式
        QueueingConsumer consumer = new QueueingConsumer(channel) ;
        channel.basicConsume(queueName, true, consumer) ;
        //循環(huán)獲取消息
        while(true){
            //獲取消息,如果沒(méi)有消息,這一步將會(huì)一直阻塞
            Delivery delivery = consumer.nextDelivery() ;
            String msg = new String(delivery.getBody()) ;
            System.out.println("received message[" + msg + "] from " + exchangeName);
        }
    }
}
package com.zf.rabbitmq06;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 發(fā)送消息
 * @author zhanghuan *
 */
public class Sender06 {
    public static void main(String[] args) throws IOException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        //RabbitMQ-Server安裝在本機(jī),所以直接用127.0.0.1
        connFac.setHost("127.0.0.1");
        //創(chuàng)建一個(gè)連接
        Connection conn = connFac.newConnection() ;
        //創(chuàng)建一個(gè)渠道
        Channel channel = conn.createChannel() ;
        String exchangeName = "exchange03";
        String messageType = "fs.type01";
        channel.exchangeDeclare(exchangeName, "topic") ;
        //定義Queue名
        String msg = "Hello World!";
        //發(fā)送消息
        channel.basicPublish( exchangeName , messageType , null , msg.getBytes());
        System.out.println("send message[" + msg + "] to "+ exchangeName +" success!");
        channel.close();
        conn.close();
    }
}

使用topic之后 。不管Sender端發(fā)送的消息類(lèi)型是fs.type01 還是 xx.type01 還是 type01 ,消費(fèi)者都會(huì)收到消息

六、RPC 遠(yuǎn)程過(guò)程調(diào)用

當(dāng)客戶(hù)端想要調(diào)用服務(wù)器的某個(gè)方法來(lái)完成某項(xiàng)功能時(shí),就可以使用rabbitMQ支持的PRC服務(wù)。

其實(shí)RPC服務(wù)與普通的收發(fā)消息的區(qū)別不大, RPC的過(guò)程其實(shí)就是客戶(hù)端向服務(wù)端定義好的Queue發(fā)送消息,其中攜帶的消息就應(yīng)該是服務(wù)端將要調(diào)用的方法的參數(shù) ,并使用Propertis告訴服務(wù)端將結(jié)果返回到指定的Queue。

示例:

package com.zf.rabbitmq07;
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
public class RPCServer {
    public static final String RPC_QUEUE_NAME = "rpc_queue";
    public static String sayHello(String name){
        return "hello " + name ;
    }
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        connFac.setHost("localhost");
        Connection conn = connFac.newConnection() ;
        Channel channel = conn.createChannel() ;
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null) ;
        QueueingConsumer consumer = new QueueingConsumer(channel);
        channel.basicConsume(RPC_QUEUE_NAME, false , consumer) ;
        while(true){
            System.out.println("服務(wù)端等待接收消息..");
            Delivery deliver = consumer.nextDelivery() ;
            System.out.println("服務(wù)端成功收到消息..");
            BasicProperties props =  deliver.getProperties() ;
            String message = new String(deliver.getBody() , "UTF-8") ;
            String responseMessage = sayHello(message) ;
            BasicProperties responseProps = new BasicProperties.Builder()
                        .correlationId(props.getCorrelationId())  
                        .build() ;
            //將結(jié)果返回到客戶(hù)端Queue
            channel.basicPublish("", props.getReplyTo() , responseProps , responseMessage.getBytes("UTF-8") ) ;
            //向客戶(hù)端確認(rèn)消息
            channel.basicAck(deliver.getEnvelope().getDeliveryTag(), false);
            System.out.println("服務(wù)端返回消息完成..");
        }
    }
}
package com.zf.rabbitmq07;
import java.io.IOException;
import java.util.UUID;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.QueueingConsumer.Delivery;
import com.rabbitmq.client.ShutdownSignalException;
public class RPCClient {
    public static final String RPC_QUEUE_NAME = "rpc_queue";
    public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
        ConnectionFactory connFac = new ConnectionFactory() ;
        connFac.setHost("localhost");
        Connection conn = connFac.newConnection() ;
        Channel channel = conn.createChannel() ;
        //響應(yīng)QueueName ,服務(wù)端將會(huì)把要返回的信息發(fā)送到該Queue
        String responseQueue = channel.queueDeclare().getQueue() ;
        String correlationId = UUID.randomUUID().toString() ;
        BasicProperties props = new BasicProperties.Builder()
                .replyTo(responseQueue)
                .correlationId(correlationId)
                .build();
        String message = "is_zhoufeng";
        channel.basicPublish( "" , RPC_QUEUE_NAME , props ,  message.getBytes("UTF-8"));
        QueueingConsumer consumer = new QueueingConsumer(channel)   ;
        channel.basicConsume( responseQueue , consumer) ;
        while(true){
            Delivery delivery = consumer.nextDelivery() ;
            if(delivery.getProperties().getCorrelationId().equals(correlationId)){
                String result = new String(delivery.getBody()) ;
                System.out.println(result);
            }
        }
    }
}

寫(xiě)在最后:

  • 針對(duì)于Java程序員,筆者最近整理了一些面試真題,思維導(dǎo)圖,程序人生等PDF學(xué)習(xí)資料;
  • 關(guān)注私信我"86",即可獲??!
  • 希望讀到這的您能點(diǎn)個(gè)小贊和關(guān)注下我,以后還會(huì)更新技術(shù)干貨,謝謝您的支持!
阿里P7架構(gòu)師的RabbitMQ學(xué)習(xí)筆記告訴你什么叫做消息隊(duì)列王者

分享到:
標(biāo)簽:阿里
用戶(hù)無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定