日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長提供免費收錄網(wǎng)站服務(wù),提交前請做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

目錄
  • 一、Docker拉取鏡像并啟動RabbitMQ
  • 二、Hello World
    • (一)依賴導(dǎo)入
    • (二)消息生產(chǎn)者
    • (三)消息消費者
  • 三、實現(xiàn)輪訓(xùn)分發(fā)消息
    • (一)抽取工具類
    • (二)啟動兩個工作線程
    • (三)啟動發(fā)送線程
  • 四、實現(xiàn)手動應(yīng)答
    • (一)消息應(yīng)答概念
    • (二)消息應(yīng)答的方法
    • (三)消息自動重新入隊 
    • (四)消息手動應(yīng)答代碼 
    • 1、生產(chǎn)者
    • 2、睡眠工具類模擬業(yè)務(wù)執(zhí)行
    • 3、消費者

一、Docker拉取鏡像并啟動RabbitMQ

拉取鏡像

docker pull rabbitmq:3.8.8-management

查看鏡像

docker images rabbitmq

Docker啟動RabbitMQ實現(xiàn)生產(chǎn)者與消費者的詳細(xì)過程

 啟動鏡像

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.8-management

Linux虛擬機記得開放5672端口或者關(guān)閉防火墻,在window通過 主機ip:15672 訪問rabbitmq控制臺

Docker啟動RabbitMQ實現(xiàn)生產(chǎn)者與消費者的詳細(xì)過程

 用戶名密碼默認(rèn)為guest

Docker啟動RabbitMQ實現(xiàn)生產(chǎn)者與消費者的詳細(xì)過程

二、Hello World

(一)依賴導(dǎo)入

<!--指定 jdk 編譯版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--rabbitmq 依賴客戶端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!--操作文件流的一個依賴-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

(二)消息生產(chǎn)者

工作原理

Docker啟動RabbitMQ實現(xiàn)生產(chǎn)者與消費者的詳細(xì)過程

  • Broker:接收和分發(fā)消息的應(yīng)用,RabbitMQ Server 就是 Message Broker
  • Connection:publisher/consumer 和 broker 之間的 TCP 連接
  • Channel:如果每一次訪問 RabbitMQ 都建立一個 Connection,在消息量大的時候建立 TCP Connection 的開銷將是巨大的,效率也較低。Channel 是在 connection 內(nèi)部建立的邏輯連接,如果應(yīng)用程序支持多線程,通常每個 thread 創(chuàng)建單獨的 channel 進(jìn)行通訊,AMQP method 包含了 channel id 幫助客戶端和 message broker 識別 channel,所以 channel 之間是完全隔離的。Channel 作為輕量級的 Connection 極大減少了操作系統(tǒng)建立 TCP connection 的開銷
  • Exchange:message 到達(dá) broker 的第一站,根據(jù)分發(fā)規(guī)則,匹配查詢表中的 routing key,分發(fā)消息到 queue 中去。常用的類型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue:消息最終被送到這里等待 consumer 取走

我們需要先獲取連接(Connection),然后通過連接獲取信道(Channel),這里我們演示簡單例子,可以直接跳過交換機(Exchange)發(fā)送隊列(Queue)

public class Producer {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        //創(chuàng)建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置主機ip
        factory.setHost("182.92.234.71");
        // 設(shè)置用戶名
        factory.setUsername("guest");
        // 設(shè)置密碼
        factory.setPassword("guest");
        //channel 實現(xiàn)了自動 close 接口 自動關(guān)閉 不需要顯示關(guān)閉
        Connection connection = factory.newConnection();
        // 獲取信道
        Channel channel = connection.createChannel();
        /*
         * 生成一個隊列
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments)
         * 1.隊列名稱
         * 2.隊列里面的消息是否持久化 默認(rèn)消息存儲在內(nèi)存中
         * 3.該隊列是否只供一個消費者進(jìn)行消費 是否進(jìn)行共享 true 可以多個消費者消費
         * 4.是否自動刪除 最后一個消費者端開連接以后 該隊列是否自動刪除 true 自動刪除
         * 5.其他參數(shù)
         **/
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello rabbitmq";
        /*
         * 發(fā)送一個消息
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 1.發(fā)送到哪個交換機
         * 2.路由的key是哪個
         * 3.其他的參數(shù)信息
         * 4.發(fā)送消息的消息體
         *
         **/
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("發(fā)送成功");
    }
}

(三)消息消費者

public class Consumer {
 
    private static final String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        //創(chuàng)建一個連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置主機ip
        factory.setHost("182.92.234.71");
        // 設(shè)置用戶名
        factory.setUsername("guest");
        // 設(shè)置密碼
        factory.setPassword("guest");
        //channel 實現(xiàn)了自動 close 接口 自動關(guān)閉 不需要顯示關(guān)閉
        Connection connection = factory.newConnection();
        // 獲取信道
        Channel channel = connection.createChannel();
 
        // 推送的消息如何進(jìn)行消費的回調(diào)接口
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        // 取消消費的一個回調(diào)接口,如在消費的時候隊列被刪除了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消費被中斷");
        };
        /*
         * 消費者消費消息
         * basicConsume(String queue, boolean autoAck, 
         * DeliverCallback deliverCallback, CancelCallback cancelCallback)
         * 1.消費哪個隊列
         * 2.消費成功之后是否要自動應(yīng)答 true 代表自動應(yīng)答 false 手動應(yīng)答
         * 3.消費者未成功消費的回調(diào)
         **/
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

三、實現(xiàn)輪訓(xùn)分發(fā)消息

(一)抽取工具類

可以發(fā)現(xiàn),上面獲取連接工廠,然后獲取連接,再獲取信道的步驟是一致的,我們可以抽取成一個工具類來調(diào)用,并使用單例模式-餓漢式完成信道的初始化

public class RabbitMqUtils {
 
    private static Channel channel;
 
    static {
        ConnectionFactory factory = new ConnectionFactory();
        // 設(shè)置ip地址
        factory.setHost("192.168.23.100");
        // 設(shè)置用戶名
        factory.setUsername("guest");
        // 設(shè)置密碼
        factory.setPassword("guest");
        try {
            // 創(chuàng)建連接
            Connection connection = factory.newConnection();
            // 獲取信道
            channel = connection.createChannel();
        } catch (Exception e) {
            System.out.println("創(chuàng)建信道失敗,錯誤信息:" + e.getMessage());
        }
    }
 
    public static Channel getChannel() {
        return channel;
    }
}

(二)啟動兩個工作線程

相當(dāng)于前面的消費者,我們只需要寫一個類,通過ideal實現(xiàn)多線程啟動即可模擬兩個線程

public class Worker01 {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            System.out.println("接受到消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消費者取消消費接口回調(diào)邏輯");
        };
        // 啟動兩次,第一次為C1, 第二次為C2
        System.out.println("C2消費者等待消費消息");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback);
    }
}

(三)啟動發(fā)送線程

public class Test01 {
 
    private final static String QUEUE_NAME = "hello";
 
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 通過控制臺輸入充當(dāng)消息,使輪訓(xùn)演示更明顯
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
            System.out.println("消息發(fā)送完成:" + message);
        }
    }
}

結(jié)果 

Docker啟動RabbitMQ實現(xiàn)生產(chǎn)者與消費者的詳細(xì)過程

四、實現(xiàn)手動應(yīng)答

(一)消息應(yīng)答概念

消費者完成一個任務(wù)可能需要一段時間,如果其中一個消費者處理一個長的任務(wù)并僅只完成 了部分突然它掛掉了,會發(fā)生什么情況。RabbitMQ 一旦向消費者傳遞了一條消息,便立即將該消 息標(biāo)記為刪除。在這種情況下,突然有個消費者掛掉了,我們將丟失正在處理的消息。以及后續(xù) 發(fā)送給該消費這的消息,因為它無法接收到。 為了保證消息在發(fā)送過程中不丟失,rabbitmq 引入消息應(yīng)答機制,消息應(yīng)答就是: 消費者在接 收到消息并且處理該消息之后,告訴 rabbitmq 它已經(jīng)處理了,rabbitmq 可以把該消息刪除了。

自動應(yīng)答:消費者發(fā)送后立即被認(rèn)為已經(jīng)傳送成功。這種模式需要在高吞吐量和數(shù)據(jù)傳輸安全性方面做權(quán),因為這種模式如果消息在接收到之前,消費者那邊出現(xiàn)連接或者 channel 關(guān)閉,那么消息就丟失了。

當(dāng)然另一方面這種模式消費者那邊可以傳遞過載的消息, 沒有對傳遞的消息數(shù)量進(jìn)行限制 , 當(dāng)然這樣有可能使得消費者這邊由于接收太多還來不及處理的消息,導(dǎo)致這些消息的積壓,最終 使得內(nèi)存耗盡,最終這些消費者線程被操作系統(tǒng)殺死,所以這種模式僅適用在消費者可以高效并 以某種速率能夠處理這些消息的情況下使用 。

手動應(yīng)答:消費者接受到消息并順利完成業(yè)務(wù)后再調(diào)用方法進(jìn)行確認(rèn),rabbitmq 才可以把該消息刪除

(二)消息應(yīng)答的方法

  • Channel.basicAck(用于肯定確認(rèn))
  • RabbitMQ 已知道該消息并且成功的處理消息,可以將其丟棄了
  • Channel.basicNack(用于否定確認(rèn))
  • Channel.basicReject(用于否定確認(rèn))
  • 與 Channel.basicNack 相比少一個參數(shù)Multiple
  • multiple 的 true 和 false 代表不同意思

        true 代表批量應(yīng)答 channel 上未應(yīng)答的消息
        比如說 channel 上有傳送 tag 的消息 5,6,7,8 當(dāng)前 tag 是 8 那么此時
        5-8 的這些還未應(yīng)答的消息都會被確認(rèn)收到消息應(yīng)答
        false 同上面相比
        只會應(yīng)答 tag=8 的消息 5,6,7 這三個消息依然不會被確認(rèn)收到消息應(yīng)答

  • 不處理該消息了直接拒絕,可以將其丟棄了

Docker啟動RabbitMQ實現(xiàn)生產(chǎn)者與消費者的詳細(xì)過程

(三)消息自動重新入隊 

如果消費者由于某些原因失去連接(其通道已關(guān)閉,連接已關(guān)閉或 TCP 連接丟失),導(dǎo)致消息未發(fā)送 ACK 確認(rèn),RabbitMQ 將了解到消息未完全處理,并將對其重新排隊。如果此時其他消費者可以處理,它將很快將其重新分發(fā)給另一個消費者。這樣,即使某個消費者偶爾死亡,也可以確 保不會丟失任何消息。

Docker啟動RabbitMQ實現(xiàn)生產(chǎn)者與消費者的詳細(xì)過程

(四)消息手動應(yīng)答代碼 

1、生產(chǎn)者

public class Test01 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
 
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
            System.out.println("消息發(fā)送完成:" + message);
        }
    }
}

2、睡眠工具類模擬業(yè)務(wù)執(zhí)行

public class SleepUtils {
 
    public static void sleep(int second) {
        try {
            Thread.sleep(1000 * second);
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

3、消費者

public class Worker01 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws Exception {
        System.out.println("C1,業(yè)務(wù)時間短");
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            SleepUtils.sleep(1);  // 模擬業(yè)務(wù)執(zhí)行1秒
            System.out.println("接受到消息:" + new String(message.getBody()));
            /*
             * 1、消息標(biāo)識
             * 2、是否啟動批量確認(rèn),false:否。
             *    啟用批量有可能造成消息丟失,比如未消費的消息提前被確然刪除,后面業(yè)務(wù)消費該消息
             *    時出現(xiàn)異常會導(dǎo)致該消息的丟失
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消費者取消消費接口回調(diào)邏輯");
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
    }
}
 
==============================================================================
public class Worker02 {
 
    private final static String QUEUE_NAME = "ack";
 
    public static void main(String[] args) throws Exception {
        System.out.println("C2,業(yè)務(wù)時間長");
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            SleepUtils.sleep(15);  // 模擬業(yè)務(wù)執(zhí)行15秒
            System.out.println("接受到消息:" + new String(message.getBody()));
            /*
             * 1、消息標(biāo)識
             * 2、是否啟動批量確認(rèn),false:否。
             *    啟用批量有可能造成消息丟失,比如未消費的消息提前被確然刪除,后面業(yè)務(wù)消費該消息
             *    時出現(xiàn)異常會導(dǎo)致該消息的丟失
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消費者取消消費接口回調(diào)邏輯");
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
    }
}

worker01業(yè)務(wù)時間短,worker02業(yè)務(wù)時間長,我們提前終止worker02模擬出異常,可以看到消息dd會被放回隊列由worker01接收處理。

注意:這里需要先啟動生產(chǎn)者聲明隊列ack,不然啟動消費者會報錯

Docker啟動RabbitMQ實現(xiàn)生產(chǎn)者與消費者的詳細(xì)過程

最后一個案例我們可以看到消息輪訓(xùn)+消息自動重新入隊+手動應(yīng)答。

分享到:
標(biāo)簽:啟動 服務(wù)器 消費者 生產(chǎn)者 過程
用戶無頭像

網(wǎng)友整理

注冊時間:

網(wǎng)站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨大挑戰(zhàn)2018-06-03

數(shù)獨一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學(xué)四六

運動步數(shù)有氧達(dá)人2018-06-03

記錄運動步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績評定2018-06-03

通用課目體育訓(xùn)練成績評定