日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

這篇文章主要介紹了LaravelKafka的使用詳解,kafka是一個分布式消息隊列,具有高性能、持久化、多副本備份、橫向擴展能力,有對于消息隊列感興趣的同學可以參考下

本文并沒有kafka的安裝教程,本文是針對已經安裝kafka及其配置好kafka的php拓展并且使用laravel框架進行開發項目,配置一個可供laravel框架使用的生產及消費者類。

以下代碼修改自本站的YII框架關于kafka類的代碼,經過測試使用在本人的項目中,可正常運行,larvael版本:5.6 代碼放置larvael框架位置:app/Tools/Kafka.php

<?php
namespace App\Tools;
 
use Illuminate\Config\Repository;
 
use Illuminate\Support\Facades\DB;
use Monolog\Logger;
use Monolog\Handler\StreamHandler;
 
use Illuminate\Http\Request;
 
class Kafka
{
    public $broker_list = '127.0.0.1';//配置kafka,可以用逗號隔開多個kafka
    public $topic = 'test';//管道名稱
    public $partition = 0;
 
    protected $producer = null;
    protected $consumer = null;
 
    public function __construct()
    {
        if (empty($this->broker_list)) {
            throw new InvalidConfigException("broker not config");
        }
        $rk = new \RdKafka\Producer();
        if (empty($rk)) {
            throw new InvalidConfigException("producer error");
        }
        $rk->setLogLevel(LOG_DEBUG);
        if (!$rk->addBrokers($this->broker_list)) {
            throw new InvalidConfigException("producer error");
        }
        $this->producer = $rk;
    }
 
    /**
     * 生產者
     * @param array $messages
     * @return mixed
     */
    public function send($messages = [],$topic)
    {
        $topic = $this->producer->newTopic($topic);
        return $topic->produce(RD_KAFKA_PARTITION_UA, $this->partition, json_encode($messages));
    }
 
    /**
     * 消費者
     */
    public function consumer($object, $callback){
        $conf = new \RdKafka\Conf();
        $conf->set('group.id', 0);
        $conf->set('metadata.broker.list', $this->broker_list);
     
        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('auto.offset.reset', 'smallest');
     
        $conf->setDefaultTopicConf($topicConf);
     
        $consumer = new \RdKafka\KafkaConsumer($conf);
     
        $consumer->subscribe([$this->topic]);
     
        echo "waiting for messages.....\n";
        while(true) {
            $message = $consumer->consume(120*1000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo "message payload....";
                $object->$callback($message->payload);
                break;
            }
            sleep(1);
        }
    }
}
?>

在控制器中如何使用:

首先再頭部導入這個類:use App\Tools\Kafka;

下面是使用生產者實例:

public function test(){ 
    $topic = 'tool';//輸入使用管道名稱
    $data['shop_id'] = 58;
    $data['bar_code']=586;
    $data['goods_num'] = 1;
    $data['goods_unit'] = '個';
 
    $Kafka = new Kafka();
    $Error_Msg = $Kafka->send($data,$topic);//傳入數組會自動轉換json
    var_dump($Error_Msg); 
}

下面是消費者實例,消費者我這里使用了的是php腳本進行的操作:

<?php
 
$conf = new RdKafka\Conf();
 
$conf->set('group.id', 'myConsumerGroup');
 
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("localhost:9092");
 
$topicConf = new RdKafka\TopicConf();
$topicConf->set('auto.commit.interval.ms', 100);
$topicConf->set('offset.store.method', 'file');
$topicConf->set('offset.store.path', sys_get_temp_dir());
$topicConf->set('auto.offset.reset', 'smallest');
 
$topic = $rk->newTopic("tool", $topicConf);//讀取的管道
 
// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
 
while (true) {
    $message = $topic->consume(0, 120*10000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            //沒有錯誤打印信息
            $message = json_decode(json_encode($message),true);
            $data = json_decode($message['payload'],true);
            var_dump($data);
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "等待接收信息\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "超時\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
    sleep(1);
}
 
?>



分享到:
標簽:Laravel Kafka的使用
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定