日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

目錄
  • 概述
  • 環(huán)境準(zhǔn)備
    • Docker & Docker-Compose
    • Linux服務(wù)器
  • 步驟一:部署到開(kāi)發(fā)環(huán)境上
    • docker-compose.yml文件編寫(xiě)
    • 運(yùn)行啟動(dòng)腳本
    • (拓展)容器可視化頁(yè)面
    • (拓展)Kafka可視化頁(yè)面
    • 用腳本命令進(jìn)行測(cè)試
    • 整合Spring Boot應(yīng)用
  • 步驟二:部署到生產(chǎn)環(huán)境上
    • docker-compose.yml
    • docker-compose配置文件變化部分以及說(shuō)明
    • 啟動(dòng)Nginx容器
    • 最后一步:IP別名映射的重要性
    • extra_hosts配置
    • 開(kāi)發(fā)機(jī)修改hosts

概述

Kafka的環(huán)境配置花了我好幾天時(shí)間才搞明白,現(xiàn)整理一下。

原來(lái)公司對(duì)Kafka環(huán)境的安裝是這樣的:

  • 獲取(下載)Zookeeper和Kafka的Linux安裝包
  • 安裝Zookeeper前保證有Java環(huán)境
  • 修改配置文件
  • 根據(jù)官網(wǎng)教程逐步安裝

原來(lái)的環(huán)境部署方式已經(jīng)有些落后了,現(xiàn)在我們搭建環(huán)境基本都會(huì)采用Docker容器,不僅簡(jiǎn)化了部署流程,還方便管理。

在Kafka2.8版本之前,Kafka是強(qiáng)依賴于Zookeeper中間件的,這本身就很不合理,中間件依賴另一個(gè)中間件,搭建起來(lái)實(shí)在麻煩,并且Zookeeper需要搭建三個(gè)以上服務(wù)作為集群(不考慮掛掉的話一個(gè)也可以,但這就不叫集群了,只能算單節(jié)點(diǎn)),Kafka也要三個(gè)以上做集群(為了數(shù)據(jù)的高可用),所幸Kafka2.8之后推出了KRaft模式,即拋棄Zookeeper,由Kafka節(jié)點(diǎn)自己做Controller來(lái)選舉Leader,本篇文章內(nèi)容就是介紹如何在Docker-Compose中搭建Kafka KRaft環(huán)境。

環(huán)境準(zhǔn)備

Docker & Docker-Compose

需要提前準(zhǔn)備好Docker和Docker-Compose環(huán)境,如果沒(méi)有安裝是不行的

Linux服務(wù)器

可以是Windows下的VMware虛擬機(jī),也可以是云服務(wù)器

順帶說(shuō)明一下我當(dāng)前的環(huán)境吧:

  • 開(kāi)發(fā)機(jī)是Windows,因?yàn)樾枰狶inux環(huán)境,所以整了個(gè)虛擬機(jī)安裝CentOS 7(有Docker),在這上面搭建我開(kāi)發(fā)時(shí)需要的所有環(huán)境,比如MySQL、Redis等,這樣可以保證開(kāi)發(fā)和生產(chǎn)的環(huán)境一致,也不用另一套在Windows上搭建環(huán)境的學(xué)習(xí)成本。
  • 虛擬機(jī)上的CentOS,上面有說(shuō),在這里只安裝了Docker & Docker-Compose,然后搭建容器環(huán)境給開(kāi)發(fā)時(shí)用。
  • 云服務(wù)器,同樣也是CentOS7環(huán)境,畢竟光開(kāi)發(fā)是不夠的,應(yīng)用到生產(chǎn)環(huán)境上才算部署完整,因?yàn)榇蟾怕饰覀儠?huì)需要Nginx轉(zhuǎn)發(fā)等配置,和開(kāi)發(fā)環(huán)境會(huì)有些許不同,所以最終目的是要在云服務(wù)器上搭建完成才可以。

步驟一:部署到開(kāi)發(fā)環(huán)境上

首先讓我們?cè)陂_(kāi)發(fā)環(huán)境上面部署好Kafka環(huán)境,然后寫(xiě)一個(gè)Spring Boot應(yīng)用去連接。

docker-compose.yml文件編寫(xiě)

version: "3"
services:
  kafka1:
    image: 'bitnami/kafka:3.3.1'
    network_mode: mynetwork
    container_name: kafka11
    user: root
    ports:
      - 9192:9092
      - 9193:9093
    environment:
      ### 通用配置
      # 允許使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      # 指定供外部使用的控制類請(qǐng)求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 定義kafka服務(wù)端socket監(jiān)聽(tīng)端口
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # 定義安全協(xié)議
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 使用Kafka時(shí)的集群id,集群內(nèi)的Kafka都要用這個(gè)id做初始化,生成一個(gè)UUID即可
      - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka11:9093,2@kafka22:9093,3@kafka33:9093
      # 允許使用PLAINTEXT監(jiān)聽(tīng)器,默認(rèn)false,不建議在生產(chǎn)環(huán)境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 設(shè)置broker最大內(nèi)存,和初始內(nèi)存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M 
      # 不允許自動(dòng)創(chuàng)建主題
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
?
      ### broker配置
      # 定義外網(wǎng)訪問(wèn)地址(宿主機(jī)ip地址和端口)
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.54:9192
      # broker.id,必須唯一
      - KAFKA_BROKER_ID=1
    volumes:
      - /home/mycontainers/kafka1/kafka/kraft:/bitnami/kafka
    #extra_hosts:
      #- "kafka1:云服務(wù)器IP"
      #- "kafka2:云服務(wù)器IP"
      #- "kafka3:云服務(wù)器IP"
  kafka2:
    image: 'bitnami/kafka:3.3.1'
    network_mode: mynetwork
    container_name: kafka22
    user: root
    ports:
      - 9292:9092
      - 9293:9093
    environment:
      ### 通用配置
      # 允許使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      # 指定供外部使用的控制類請(qǐng)求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 定義kafka服務(wù)端socket監(jiān)聽(tīng)端口
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # 定義安全協(xié)議
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 使用Kafka時(shí)的集群id,集群內(nèi)的Kafka都要用這個(gè)id做初始化,生成一個(gè)UUID即可
      - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka11:9093,2@kafka22:9093,3@kafka33:9093
      # 允許使用PLAINTEXT監(jiān)聽(tīng)器,默認(rèn)false,不建議在生產(chǎn)環(huán)境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 設(shè)置broker最大內(nèi)存,和初始內(nèi)存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M 
      # 不允許自動(dòng)創(chuàng)建主題
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
?
      ### broker配置
      # 定義外網(wǎng)訪問(wèn)地址(宿主機(jī)ip地址和端口)
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.54:9292
      # broker.id,必須唯一
      - KAFKA_BROKER_ID=2
    volumes:
      - /home/mycontainers/kafka2/kafka/kraft:/bitnami/kafka
  kafka3:
    image: 'bitnami/kafka:3.3.1'
    network_mode: mynetwork
    container_name: kafka33
    user: root
    ports:
      - 9392:9092
      - 9393:9093
    environment:
      ### 通用配置
      # 允許使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      # 指定供外部使用的控制類請(qǐng)求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 定義kafka服務(wù)端socket監(jiān)聽(tīng)端口
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
      # 定義安全協(xié)議
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 使用Kafka時(shí)的集群id,集群內(nèi)的Kafka都要用這個(gè)id做初始化,生成一個(gè)UUID即可
      - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka11:9093,2@kafka22:9093,3@kafka33:9093
      # 允許使用PLAINTEXT監(jiān)聽(tīng)器,默認(rèn)false,不建議在生產(chǎn)環(huán)境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 設(shè)置broker最大內(nèi)存,和初始內(nèi)存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M 
      # 不允許自動(dòng)創(chuàng)建主題
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
?
      ### broker配置
      # 定義外網(wǎng)訪問(wèn)地址(宿主機(jī)ip地址和端口)
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://192.168.1.54:9392
      # broker.id,必須唯一
      - KAFKA_BROKER_ID=3
    volumes:
      - /home/mycontainers/kafka3/kafka/kraft:/bitnami/kafka

配置文件說(shuō)明: 如果你懂一些docker-compose的話,這個(gè)配置文件應(yīng)該很好明白,這里挑一些來(lái)說(shuō):

鏡像選擇:bitnami/kafka:3.3.1

這是寫(xiě)這篇文章時(shí)最新的版本

容器名是:kafka11、kafka22、kafka33

這個(gè)當(dāng)然隨便寫(xiě),我用11、22、33是為了后面演示生產(chǎn)環(huán)境時(shí)不沖突,先不用管,后面會(huì)說(shuō)

端口配置是9092和9093:

9092端口用于BROKER傳輸,即Kafka集群服務(wù)端口,我們用Kafka腳本或SpringBoot應(yīng)用時(shí),連接的就是這個(gè)端口;9093是CONTROLLER端口,前面說(shuō)過(guò),我們拋棄了Zookeeper,用Kafka來(lái)代替,這個(gè)9093就是充當(dāng)著原來(lái)Zookeeper集群的通訊端口

總結(jié)一下,9092用于外網(wǎng),因?yàn)槭荎afka要給外部訪問(wèn);9093用于內(nèi)網(wǎng),只用于集群通訊,用于內(nèi)網(wǎng)是因?yàn)榄h(huán)境都搭建在一個(gè)服務(wù)器的Docker容器內(nèi),相當(dāng)于公司服務(wù)器內(nèi)網(wǎng),所以不對(duì)外開(kāi)放,生產(chǎn)時(shí)一般會(huì)搭建在不同的服務(wù)器上面,可以是外網(wǎng)也可以是內(nèi)網(wǎng),集群的環(huán)境配置自由度很高,這點(diǎn)我就不多說(shuō)明了。

network_mode:

給容器加入網(wǎng)絡(luò),這樣才可以進(jìn)行容器間的通訊,可以讓容器1中能識(shí)別容器2的容器名稱,進(jìn)而解析出IP,很重要,不然只能手動(dòng)配置容器IP,不過(guò)這樣很蠢,容器IP不固定,會(huì)動(dòng)態(tài)變化。

當(dāng)然用–link也可以。

KAFKA_KRAFT_CLUSTER_ID:

KRaft模式下,要配置這個(gè)集群ID,這個(gè)ID可以用Kafka命令去生成:

kafka-storage.sh random-uuid

一開(kāi)始就用官網(wǎng)示例的LelM2dIFQkiUFvXCEcqRWA就可以了。

KAFKA_BROKER_ID:

broker的id,這個(gè)要唯一,搞過(guò)Kafka的懂的都懂

KAFKA_CFG_ADVERTISED_LISTENERS:

對(duì)外的訪問(wèn)地址,我配置的是192.168.1.54,因?yàn)槲疫@臺(tái)虛擬機(jī)用的是橋接網(wǎng)絡(luò),分配的IP就是192.168.1.54,這個(gè)和容器IP不一樣,容器IP只用于容器間的通訊,這個(gè)54的配置目的是可以在開(kāi)發(fā)機(jī)上(假如開(kāi)發(fā)機(jī)的IP是192.168.1.10)通過(guò):192.168.1.54:9092來(lái)連接Kafka,很重要

因?yàn)橛腥齻€(gè)Kafka容器,容器內(nèi)部都用9092和9093端口保持一致,所以端口映射就有所不同:

kafka1配置9192、9193;kafka2配置9292、9293;kafka3配置9392、9393

這個(gè)配置也很重要

extra_hosts:

這個(gè)配置的目的是修改容器內(nèi)的hosts文件,增加一個(gè)IP的別名映射,在開(kāi)發(fā)階段用不上,是后面部署生產(chǎn)環(huán)境時(shí)用的,暫時(shí)不用理會(huì),這時(shí)候可以先注釋掉。

其他的看注釋就可以了,還有的是在YML配置文件對(duì)Kafka的配置,比如KAFKA_ENABLE_KRAFT,是從Docker Hub的Kafka鏡像文檔上看的,這是一種快速配置手段,但我們同樣可以使用老辦法,將kafka的配置文件(server.properties)進(jìn)行路徑映射,如果熟悉docker應(yīng)該能理解。

運(yùn)行啟動(dòng)腳本

docker-compose -f docker-compose.yml up

如果沒(méi)有報(bào)錯(cuò),通過(guò)docker ps可以看到Kafka容器:

[root@localhost ~]# docker ps | grep kafka
80cf69513390   provectuslabs/kafka-ui:latest              "/bin/sh -c 'java $J…"   14 hours ago   Up 14 hours           0.0.0.0:17008->8080/tcp, :::17008->8080/tcp                                                          kafka-ui
c66b8c979abb   bitnami/kafka:3.3.1                        "/opt/bitnami/script…"   14 hours ago   Up 4 hours            0.0.0.0:9292->9092/tcp, :::9292->9092/tcp, 0.0.0.0:9293->9093/tcp, :::9293->9093/tcp                 kafka2
70f26172ba3e   bitnami/kafka:3.3.1                        "/opt/bitnami/script…"   14 hours ago   Up 4 hours            0.0.0.0:9392->9092/tcp, :::9392->9092/tcp, 0.0.0.0:9393->9093/tcp, :::9393->9093/tcp                 kafka3
0193e15cd92a   bitnami/kafka:3.3.1                        "/opt/bitnami/script…"   15 hours ago   Up 4 hours            0.0.0.0:9192->9092/tcp, :::9192->9092/tcp, 0.0.0.0:9193->9093/tcp, :::9193->9093/tcp                 kafka1

注意:在啟動(dòng)容器的時(shí)候,不出意外的話會(huì)出很多意外,檢查一下腳本有沒(méi)有問(wèn)題,重點(diǎn)排查KAFKA_BROKER_ID和KAFKA_CFG_ADVERTISED_LISTENERS。還有容器名稱不要沖突,區(qū)分好1、2、3,路徑映射也不要弄錯(cuò),kafka1、kafka2、kafka3。

(拓展)容器可視化頁(yè)面

如果嫌棄docker ps,可以安裝portainer來(lái)查看可視化容器頁(yè)面:

version: "3"
services:
  portainer:
    container_name: portainer
    ports:
      - 9000:9000
    restart: always
    network_mode: mynetwork
    volumes:
      - /etc/localtime:/etc/localtime
      - /var/run/docker.sock:/var/run/docker.sock
      - /home/mycontainers/portainer/data:/data:rw
    image: portainer/portainer

訪問(wèn):http://localhost:9000,即可,請(qǐng)自行修改端口和IP

(拓展)Kafka可視化頁(yè)面

kafka-ui是Apache出品的Kafka可視化容器,入門(mén)是夠用了,當(dāng)然還有很多可視化工具,由于篇幅關(guān)系,并且這不是本文重點(diǎn),所以就不細(xì)說(shuō)了。

version: "3"
services:
  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    network_mode: mynetwork
    container_name: kafka-ui
    restart: always
    ports:
      - 8080:8080
    volumes:
      - /etc/localtime:/etc/localtime
    environment:
      # 集群名稱
      - KAFKA_CLUSTERS_0_NAME=local
      # 集群地址
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka11:9092,kafka22:9092,kafka33:9092

訪問(wèn):http://localhost:8080,即可,請(qǐng)自行修改端口和IP

用腳本命令進(jìn)行測(cè)試

Kafka容器內(nèi)有自帶腳本可以使用,所以部署完Kafka環(huán)境后,最簡(jiǎn)單快速的方式就是進(jìn)入容器內(nèi)進(jìn)行測(cè)試:

進(jìn)入任意一個(gè)Kafka容器

docker exec -it kafka11 bash

創(chuàng)建一個(gè)主題,名稱為demo

kafka-topics.sh --create --topic demo --partitions 3 --replication-factor 3 --bootstrap-server kafka11:9092,kafka22:9092,kafka33:9092

查看所有主題

kafka-topics.sh --bootstrap-server kafka11:9092 --list

生產(chǎn)一些消息

kafka-console-producer.sh --bootstrap-server kafka11:9092 --topic demo

消費(fèi)一些消息

kafka-console-consumer.sh --bootstrap-server kafka11:9092 --topic demo

制造一些假數(shù)據(jù)

kafka-producer-perf-test.sh --topic demo --record-size 1024 --num-records 1000000 --throughput 10000 --producer-props bootstrap.servers=kafka11:9092 batch.size=16384 linger.ms=0

測(cè)試沒(méi)有問(wèn)題,命令可以使用,到kafka-ui的可視化頁(yè)面查看下,有我們剛剛創(chuàng)建的主題,也有我們制造的假數(shù)據(jù),三個(gè)Kafka節(jié)點(diǎn)沒(méi)有異常,Controller也進(jìn)行了選舉,說(shuō)明我們的Kafka已經(jīng)成功創(chuàng)建并運(yùn)行,還替代了原本Zookeeper的工作,至此,開(kāi)發(fā)環(huán)境搭建完成。

整合Spring Boot應(yīng)用

搭建完了Kafka環(huán)境,現(xiàn)在讓我們整合到Spring Boot中來(lái)使用

  • 創(chuàng)建一個(gè)Maven項(xiàng)目

配置pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
?
    <groupId>com.cc</groupId>
    <artifactId>kafkaDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
?
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.4.RELEASE</version>
        <relativePath/>
    </parent>
?
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.3.4.RELEASE</version>
        </dependency>
    </dependencies>
</project>

新建應(yīng)用程序啟動(dòng)類:

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

配置application.yml,去連接Kafka環(huán)境

server:
  port: 8888
?
spring:
  application:
    name: kafkaDemo
?
  kafka:
    bootstrap-servers: mylocalhost:9192,mylocalhost:9292,mylocalhost:9392
    producer:
      # 生產(chǎn)者序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # ack策略
      # 0:生產(chǎn)者發(fā)送消息就不管了,效率高,但是容易丟數(shù)據(jù),且沒(méi)有重試機(jī)制
      # 1:消息發(fā)送到Leader并落盤(pán)后就返回,如果Leader掛了并且Follower還沒(méi)有同步數(shù)據(jù)就會(huì)丟失數(shù)據(jù)
      # -1:消息要所有副本都羅盤(pán)才返回,保證數(shù)據(jù)不丟失(但是有可能重復(fù)消費(fèi))
      acks: -1
      # 失敗重試次數(shù)
      retries: 3
      # 批量提交的數(shù)據(jù)大小
      batch-size: 16384
      # 生產(chǎn)者暫存數(shù)據(jù)的緩沖區(qū)大小
      buffer-memory: 33554432
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 是否自動(dòng)提交偏移量,如果要手動(dòng)確認(rèn)消息,就要設(shè)置為false
      enable-auto-commit: false
      # 消費(fèi)消息后間隔多長(zhǎng)時(shí)間提交偏移量(ms)
      auto-commit-interval: 100
      # 默認(rèn)的消費(fèi)者組,如果不指定就會(huì)用這個(gè)
      group-id: mykafka
      # kafka意外宕機(jī)時(shí)的消息消費(fèi)策略
      # earliest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),從頭開(kāi)始消費(fèi)
      # latest:當(dāng)各分區(qū)下有已提交的offset時(shí),從提交的offset開(kāi)始消費(fèi);無(wú)提交的offset時(shí),消費(fèi)新產(chǎn)生的該分區(qū)下的數(shù)據(jù)
      # none:topic各分區(qū)都存在已提交的offset時(shí),從offset后開(kāi)始消費(fèi);只要有一個(gè)分區(qū)不存在已提交的offset,則拋出異常
      auto-offset-reset: latest
    listener:
      # 手動(dòng)確認(rèn)消息
      ack-mode: manual_immediate
      # 消費(fèi)者運(yùn)行的線程數(shù)
      concurrency: 2

我們只測(cè)試連接,不演示消息的生產(chǎn)和消費(fèi),現(xiàn)在關(guān)注application.yml配置文件,我們只看這個(gè)配置:

bootstrap-servers: mylocalhost:9192,mylocalhost:9292,mylocalhost:9392

mylocalhost是我在本機(jī)hosts中設(shè)置的IP別名映射,即:mylocalhost=192.168.1.54

9192、9292、9392是kafka集群容器映射 出來(lái)對(duì)外的端口,查看一下docker-compose.yml文件即可明白。

啟動(dòng)程序,應(yīng)用連接Kafka,出現(xiàn)了…JVM…字樣,表示成功連接Kafka。

步驟二:部署到生產(chǎn)環(huán)境上

生產(chǎn)環(huán)境和開(kāi)發(fā)環(huán)境有什么不同呢?不同在我們往往沒(méi)有那么多的IP資源可以對(duì)外開(kāi)放,則需要做Nginx轉(zhuǎn)發(fā)。

比如說(shuō)Kafka集群是在內(nèi)網(wǎng)機(jī)器中部署,最終由一臺(tái)代理機(jī)器轉(zhuǎn)發(fā)Kafka集群,當(dāng)然這個(gè)代理機(jī)器實(shí)際上也是要做負(fù)載均衡的,不然代理機(jī)器掛了整個(gè)集群就無(wú)了,不過(guò)Nginx的負(fù)載均衡不在本文篇幅中。

先來(lái)看看生產(chǎn)環(huán)境的docker-compose.yml:

docker-compose.yml

version: "3"
services:
  kafka1:
    image: 'bitnami/kafka:3.3.1'
    network_mode: mynetwork
    container_name: kafka1
    user: root
    environment:
      ### 通用配置
      # 允許使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      # 指定供外部使用的控制類請(qǐng)求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 定義kafka服務(wù)端socket監(jiān)聽(tīng)端口
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:17005,CONTROLLER://:9093
      # 定義安全協(xié)議
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      # 允許使用PLAINTEXT監(jiān)聽(tīng)器,默認(rèn)false,不建議在生產(chǎn)環(huán)境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 設(shè)置broker最大內(nèi)存,和初始內(nèi)存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M 
      # 不允許自動(dòng)創(chuàng)建主題
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      # 使用Kafka時(shí)的集群id,集群內(nèi)的Kafka都要用這個(gè)id做初始化,生成一個(gè)UUID即可
      - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
?
      ### broker配置
      # 定義外網(wǎng)訪問(wèn)地址(宿主機(jī)ip地址和端口)
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka1:17005
      # broker.id,必須唯一
      - KAFKA_BROKER_ID=1
    volumes:
      - /home/mycontainers/kafka1/kafka/kraft:/bitnami/kafka
  kafka2:
    image: 'bitnami/kafka:3.3.1'
    network_mode: mynetwork
    container_name: kafka2
    user: root
    environment:
      ### 通用配置
      # 允許使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      # 指定供外部使用的控制類請(qǐng)求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 定義kafka服務(wù)端socket監(jiān)聽(tīng)端口
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:17005,CONTROLLER://:9093
      # 定義安全協(xié)議
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      # 允許使用PLAINTEXT監(jiān)聽(tīng)器,默認(rèn)false,不建議在生產(chǎn)環(huán)境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 設(shè)置broker最大內(nèi)存,和初始內(nèi)存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M 
      # 不允許自動(dòng)創(chuàng)建主題
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      # 使用Kafka時(shí)的集群id,集群內(nèi)的Kafka都要用這個(gè)id做初始化,生成一個(gè)UUID即可
      - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
?
      ### broker配置
      # 定義外網(wǎng)訪問(wèn)地址(宿主機(jī)ip地址和端口)
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka2:17005
      # broker.id,必須唯一
      - KAFKA_BROKER_ID=2
?
    volumes:
      - /home/mycontainers/kafka2/kafka/kraft:/bitnami/kafka
  kafka3:
    image: 'bitnami/kafka:3.3.1'
    network_mode: mynetwork
    container_name: kafka3
    user: root
    environment:
      ### 通用配置
      # 允許使用kraft,即Kafka替代Zookeeper
      - KAFKA_ENABLE_KRAFT=yes
      # kafka角色,做broker,也要做controller
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      # 指定供外部使用的控制類請(qǐng)求信息
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # 定義kafka服務(wù)端socket監(jiān)聽(tīng)端口
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:17005,CONTROLLER://:9093
      # 定義安全協(xié)議
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      # 集群地址
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093
      # 允許使用PLAINTEXT監(jiān)聽(tīng)器,默認(rèn)false,不建議在生產(chǎn)環(huán)境使用
      - ALLOW_PLAINTEXT_LISTENER=yes
      # 設(shè)置broker最大內(nèi)存,和初始內(nèi)存
      - KAFKA_HEAP_OPTS=-Xmx512M -Xms256M 
      # 不允許自動(dòng)創(chuàng)建主題
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      # 使用Kafka時(shí)的集群id,集群內(nèi)的Kafka都要用這個(gè)id做初始化,生成一個(gè)UUID即可
      - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
?
      ### broker配置
      # 定義外網(wǎng)訪問(wèn)地址(宿主機(jī)ip地址和端口)
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka3:17005
      # broker.id,必須唯一
      - KAFKA_BROKER_ID=3
    volumes:
      - /home/mycontainers/kafka3/kafka/kraft:/bitnami/kafka

注意,這里我們的容器名是kafka1、kafka2、kafka3,和開(kāi)發(fā)環(huán)境有所區(qū)別。

接下來(lái)是重點(diǎn)部分,要理解重點(diǎn)配置才可以正常使用,生產(chǎn)環(huán)境這部分我花了兩天才搞通。

docker-compose配置文件變化部分以及說(shuō)明

先重新聲明一些題要:

  • 開(kāi)發(fā)機(jī)的IP是:192.168.1.10
  • 開(kāi)發(fā)環(huán)境的虛擬機(jī)IP是:192.168.1.54(與開(kāi)發(fā)機(jī)同網(wǎng)絡(luò))
  • Kafka集群處于內(nèi)網(wǎng)環(huán)境,比如公司的內(nèi)部服務(wù)器,無(wú)法對(duì)外訪問(wèn)
  • 云服務(wù)器的IP是:a.a.a.a(自行腦補(bǔ)),云服務(wù)器就是代理服務(wù)器,可以訪問(wèn)Kafka集群服務(wù)
  • 開(kāi)發(fā)環(huán)境的Kafka容器名是:kafka11、kafka22、kafka33
  • 生產(chǎn)環(huán)境的Kafka容器名是:kafka1、kafka2、kafka3

接下來(lái)對(duì)生產(chǎn)環(huán)境的配置變動(dòng)進(jìn)行說(shuō)明:

  • 去掉端口映射,因?yàn)槲覀兊葧?huì)會(huì)創(chuàng)建一個(gè)Ngnix容器來(lái)轉(zhuǎn)發(fā),容器間已經(jīng)可以進(jìn)行通訊,所以就不需要對(duì)外端口了,除非這個(gè)Nginx容器是另外一臺(tái)機(jī)器上,那么就需要對(duì)外端口。
  • KAFKA_CFG_ADVERTISED_LISTENERS的外網(wǎng)訪問(wèn)地址從實(shí)際IP改成了kafka1:17005, 在開(kāi)發(fā)環(huán)境中,我們的開(kāi)發(fā)機(jī)子可以通過(guò)虛擬機(jī)IP來(lái)訪問(wèn)容器,所以配置192.168.1.54,但是生產(chǎn)環(huán)境這里我們不能直接訪問(wèn)了(假設(shè)生產(chǎn)環(huán)境的Kafka集群是在內(nèi)網(wǎng)),我們只能訪問(wèn)代理服務(wù)器,讓代理服務(wù)器幫忙轉(zhuǎn)發(fā)請(qǐng)求,所以這里改的:kafka1:17005,必須是代理服務(wù)器可以訪問(wèn)到的,因?yàn)槲覀兇矸?wù)器和生產(chǎn)環(huán)境的Kafka集群是同一個(gè)容器組內(nèi),所以可以訪問(wèn),這是為了便于演示,實(shí)際上代理服務(wù)器和Kafka集群肯定不會(huì)在同一臺(tái)機(jī)器內(nèi),所以就不能用:kafka1:17005,而是要用:[代理服務(wù)器可以訪問(wèn)到的Kafka集群地址]:17005
  • 17005端口替換原來(lái)的9092端口,因?yàn)槲业脑品?wù)器安全組沒(méi)有開(kāi)放9092,所以改成17005,這個(gè)端口要和等會(huì)轉(zhuǎn)發(fā)用的Nginx端口保持一致,即Nginx容器也要開(kāi)放17005端口

啟動(dòng)Nginx容器

nginx.yml:

version: "3"
services:
  nginx:
    image: nginx:latest
    network_mode: mynetwork
    container_name: nginx
    restart: always
    ports:
      - 17005:9092
    volumes:
      - /etc/localtime:/etc/localtime
      - /home/mycontainers/nginx/nginx.conf:/etc/nginx/nginx.conf
      - /home/mycontainers/nginx/logs:/var/log/nginx
      - /home/mycontainers/nginx/conf.d/:/etc/nginx/conf.d

手動(dòng)修改nginx.conf:

stream {
    upstream kafka {
        server kafka1:17005;
        server kafka2:17005;
        server kafka3:17005;
    }
?
    server {
        listen 9092;
        proxy_pass kafka;
    }
}
?
http {
    ... 
    location / {
        ...
    }
    ...
}

啟動(dòng)Nginx容器后,修改一下配置,將Kaka集群配置進(jìn)來(lái),然后監(jiān)聽(tīng)9092端口進(jìn)行轉(zhuǎn)發(fā),當(dāng)然端口可以定制,這個(gè)不重要,重要的是Nginx容器對(duì)外的17005端口,流程是這樣的:

  • 外部訪問(wèn)17005端口,映射到Nginx的9092端口
  • Nginx的9092端口對(duì)應(yīng)了Kafka的集群,Kafka集群的端口是17005,所以Nginx的對(duì)外也要是17005,這樣要保證強(qiáng)一致

至此,生產(chǎn)環(huán)境搭建完成。

最后一步:IP別名映射的重要性

還記得上面搭建開(kāi)發(fā)環(huán)境時(shí)候先注釋掉的extra_hosts配置嗎:

# 剛剛我們有進(jìn)行聲明,云服務(wù)器IP是a.a.a.a,所以去掉注釋,自行替換
extra_hosts:
    - "kafka1:云服務(wù)器IP"
    - "kafka2:云服務(wù)器IP"
    - "kafka3:云服務(wù)器IP"

現(xiàn)在用上了,這個(gè)只配置到了開(kāi)發(fā)環(huán)境的kafka11容器中,kafka22是沒(méi)有的,我們先進(jìn)入kafka22容器中去連接生產(chǎn)環(huán)境的Kafka集群看看:

進(jìn)入kafka22容器

docker exec -it kafka22 bash

連接生產(chǎn)環(huán)境的Kafka集群(通過(guò)Nginx轉(zhuǎn)發(fā))

kafka-topics.sh --bootstrap-server a.a.a.a:17005 --list

會(huì)報(bào)錯(cuò):

[2023-01-11 07:29:07,495] WARN [AdminClient clientId=adminclient-1] Error connecting to node kafka3:17005 (id: 3 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: kafka3
    at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
    at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1519)
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1378)
    at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
    at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
    at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:510)
    at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:467)
    at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173)
    at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:990)
    at org.apache.kafka.clients.NetworkClient.ready(NetworkClient.java:301)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.sendEligibleCalls(KafkaAdminClient.java:1143)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.processRequests(KafkaAdminClient.java:1403)
    at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1346)
    at java.base/java.lang.Thread.run(Thread.java:829)

可以看到關(guān)鍵字:kafka3:17005,在開(kāi)發(fā)環(huán)境中連接生產(chǎn)環(huán)境報(bào)這個(gè)kafka3,而kafka3是在生產(chǎn)環(huán)境中配置的,我們開(kāi)發(fā)環(huán)境配置的是kafka33,所以很明顯,在連接Kafka的時(shí)候,會(huì)自動(dòng)去讀取集群地址,就是我們生產(chǎn)環(huán)境的docker-compose.yml中的:

      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka1:9093,2@kafka2:9093,3@kafka3:9093

雖然Kafka集群運(yùn)行起來(lái)了,Nginx轉(zhuǎn)發(fā)成功了,但是實(shí)際連接不上,因?yàn)椴恢肋@個(gè)kafka3是什么意思,所以我們還需要做最后一步:IP別名映射

extra_hosts配置

退出kafka22容器,恢復(fù)kafka11容器的extra_hosts配置:

extra_hosts:
    - "kafka1:a.a.a.a"
    - "kafka2:a.a.a.a"
    - "kafka3:a.a.a.a"

重新啟動(dòng)開(kāi)發(fā)環(huán)境的Kafka容器,然后進(jìn)入kafka11容器去連接生產(chǎn)環(huán)境,這時(shí)候就成功了,因?yàn)闀?huì)把kafka1、kafka2、kafka3都映射到a.a.a.a即代理服務(wù)器的公網(wǎng)IP,所以至此,生產(chǎn)環(huán)境搭建完成。

開(kāi)發(fā)機(jī)修改hosts

我們現(xiàn)在知道,連接生產(chǎn)環(huán)境的Kafka獲取的集群地址是kafka1、kafka2、kafka3,所以在開(kāi)發(fā)機(jī)中我們同樣需要修改hosts配置,映射實(shí)際的公網(wǎng)IP,不然無(wú)法識(shí)別。

以上就是詳細(xì)講解Docker-Compose部署Kafka KRaft集群環(huán)境的詳細(xì)內(nèi)容,更多關(guān)于Docker Compose部署Kafka KRaft的資料請(qǐng)關(guān)注其它相關(guān)文章!

分享到:
標(biāo)簽:服務(wù)器 環(huán)境 講解 部署 集群
用戶無(wú)頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過(guò)答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定