日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網為廣大站長提供免費收錄網站服務,提交前請做好本站友鏈:【 網站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(50元/站),

點擊這里在線咨詢客服
新站提交
  • 網站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會員:747

Apache Kafka 是一個開源的分布式事件流平臺,被眾多公司用于高性能數據管道、流分析、數據集成等。本章將討論 kafka 的自動化部署。

 

什么是事件流: “事件流” 是按時間排序的一系列業務事件。事件流處理平臺實時從事件源 (比如數據庫、移動設備、傳感器、應用程序等) 捕獲數據,持久化存儲這些事件流以供檢索、處理和響應事件流,并根據需要將事件流路由到不同的目標技術。因此,事件流可確保數據的連續性、正確解釋事件,從而使正確的信息在正確的時間出現在正確的位置。

 

本章包含以下主題:

 

  • Kafka 介紹
  • 部署環境介紹
  • 自動化部署 Kafka
  • 驗證
  • 總結
36.1 Kafka 介紹

 

Kafka 是一個分布式系統,由服務端和客戶端組成,通過 TCP 網絡協議通信。它可以部署在物理機、虛擬機或者容器上。

Servers:Kafka 以一個或多個 Servers 集群的形式運行,可以跨多個數據中心或云區域。提供存儲層的 Servers 被成為 Brokers。其他 Servers 運行 Kafka Connect,以事件流的形式持續導入和導出數據,將 Kafka 與現有的系統 (如關系型數據庫) 以及其他 Kafka 集群集成。Kafka 集群具有高度的可擴展性和容錯性,如果任何一個 Server 故障,其他 Server 將接管工作,以確保連續運行,沒有任何數據損失。

Clients:通過 Client 可以編寫分布式應用和微服務,以并行、大規模的方式讀取、寫入和處理事件流,即使在網絡問題或機器故障的情況下也能容錯。

36.1.1 主要概念和術語

Event (事件) 記錄了真實 “發生的事情”,在文檔中也被稱為記錄或消息。當向 Kafka 讀寫數據時,是以 event 的形式進行的。從概念上講,Event 包含鍵、值、時間戳和可選的元數據頭。如示例 36.1。

示例 36.1,事件示例:

Event key: "Official accounts" Event value: "sretech" Event timestamp: "Dec. 24, 2022 at 2:06 p.m."

Producers (生產者) 是向 Kafka 發布 (寫) events 的客戶端應用程序。Consumers 是訂閱 (讀和處理) events 的應用程序。在 Kafka 中,Producers 和 consumers 完全解耦,互不相干,這是實現 Kafka 高度可擴展性的一個關鍵設計因素。例如,Producers 從來不需要等待 consumers。

Topics (主題) 用于分類存儲 events。可以將 topics 想象成文件夾,而 events 就是文件夾中的文件。Kafka 中的 topics 可以多 producers 和多 consumers:一個 topic 可以有零個、一個或多個 producers 向其寫入事件,也可以有零個、一個或多個 consumers 訂閱這些事件。一個主題中的事件可以根據需要反復讀取——與傳統的消息傳遞系統不同,事件在消費后不會被刪除。可以通過每個主題的配置設置來定義 Kafka 應該保留事件多長時間,之后舊的事件會被丟棄。Kafka 的性能相對于數據大小來說是有效恒定的,所以存儲數據很長時間是完全可以的。

主題是分區 (partitioned) 的,一個主題被分散到位于不同 Kafka brokers 的若干 “桶” 中。這種分布式的數據放置對可擴展性非常重要,因為它允許客戶端應用程序同時從/向多個 brokers 讀取和寫入數據。當一個新的事件被發布到一個主題時,它實際上是被附加到主題的一個分區中。具有相同 key 的事件(例如,customer 或 vehicle ID)被寫入相同的分區,Kafka 保證一個給定的主題分區的任何消費者將始終以完全相同的順序讀取該分區的事件。


 

圖 36.1 圖片來自 Kafka 官網

這個例子的主題有四個分區 P1-P4。兩個不同的生產者客戶端通過網絡向主題的分區寫入事件,向主題發布新的事件,彼此獨立。具有相同密鑰的事件(在圖中用顏色表示)被寫到同一個分區中。注意,如果合適的話,兩個生產者都可以寫到同一個分區。

為了使數據具有容錯性和高可用性,每個主題都可以被復制,甚至跨地理區域或數據中心,這樣總有多個 Broker 擁有數據的副本,以防出錯。一個常見的生產設置是復制系數為3,也就是說,數據總是有三個副本。這種復制是在主題分區的層面上進行的。

36.2 部署環境介紹

完成本教程至少需要四個節點:一個節點作為 Ansible 控制節點,三個節點部署 zookeeper 集群及 Kafka 集群。

 

Kafka 可以基于 Zookeeper 部署,也可以基于 KRaft 協議部署。KRaft 的方式部署起來相對簡單,因此本教程僅演示基于 Zookeeper 的部署方式。
36.2.1 節點信息

 

四個節點分別為 2 核 CPU、2 GB 內存的虛擬機。

節點信息:

# 系統版本:Rocky linux release 9.1

ansible Inventory hosts:

[zookeeper] zk1.server.aiops.red zk2.server.aiops.red zk3.server.aiops.red [kafka] kafka1.server.aiops.red kafka2.server.aiops.red kafka3.server.aiops.red36.2.2 節點要求

為使安裝過程順利進行,節點應滿足以下要求。

36.2.2.1 時鐘同步

Zookeeper、Kafka 集群節點時鐘須保持同步。

要自動化實現時鐘同步,可以參考 “Linux 9 自動化部署 NTP 服務”。

36.2.2.2 主機名解析

Ansible 控制節點能夠解析 Zookeeper、Kafka 節點的主機名,通過主機名訪問節點。

要實現主機名稱解析,可以在 Ansible 控制節點的 /etc/hosts 文件中指定節點的 IP、主機名條目,或者參考 “Linux 9 自動化部署 DNS 服務” 一文配置 DNS 服務。

36.2.2.3 主機名解析

Ansible 控制節點可以免密登錄各節點,并能夠免密執行sudo。可以參考 “Linux 9 自動化部署 NTP 服務” 中的 “部署環境要求” 一節實現。

36.2.2.4 依賴的服務及組件

Zookeeper、Kafka 集群的安裝依賴 JDK。JDK 的自動化安裝可以參考 ”Linux 9 自動化部署 JDK“。

Zookeeper 集群的自動化安裝請參考 “Linux 9 自動化部署 Zookeeper 集群”。

36.3 自動化部署 Kafka

自動化部署 Kafka 集群通過兩個 Ansible Role 實現:一個用于下載、分發 Kafka 安裝包,一個用于部署 Kafka 集群。

使用獨立的用戶運行應用程序,能夠起到一定程度的隔離性與安全性。因此,首先創建運行 Kafka 服務的用戶。

36.3.1 創建用戶

創建用戶是一個常見的操作,因此為創建用戶編寫一個獨立的 Ansible Role。

創建 create_user Role:

ansible-galaxy role init --init-path ~/roles create_user cd ~/roles/create_user/


 

圖 36.2 提供創建用戶的角色

編輯 tasks/main.yml 文件,內容如下:

--- # tasks file for create_user - name: create user task ansible.builtin.user: name: "{{ item.0 }}" home: "{{ item.1 }}" shell: "{{ item.2 }}" with_together: - "{{ user_name }}" - "{{ user_dir }}" - "{{ user_shell }}"

ansible.builtin.user任務中包含了三個列表類型的變量,user_name 定義創建的用戶名稱,user_dir 定義創建的用戶家目錄,user_shell 定義創建的用戶 Shell。之所以使用列表類型的變量,是為了滿足一次性創建多個用戶的需求。可以為變量指定一個或多個值,對應創建一個或多個用戶。在指定多個值時,要注意三個變量的順序要一致。這三個變量,將在 Playbook 中提供。

36.3.2 下載并分發 Kafka 軟件包

Kafka 下載地址:https://kafka.apache.org/downloads,當前穩定版是 3.3.1。

創建下載 Kafka 軟件包的 Ansible Role。可以把該角色抽象為下載所有二進制 tarball,而不是只針對 Kafka:

ansible-galaxy role init --init-path ~/roles download_binary_tarball cd ~/roles/download_binary_tarball/


 

圖 36.3 創建下載二進制包的角色

編輯 download_binary_tarball 角色的 tasks/main.yml 文件,添加下載軟件包的任務,內容如下:

--- # tasks file for download_binary_tarball - name: create download and unarchive directory task ansible.builtin.file: path: "{{ item }}" state: directory with_items: "{{ unarchive_path }}" - name: download and unarchive tarball task ansible.builtin.unarchive: src: "{{ item.0 }}" dest: "{{ item.1 }}" remote_src: true extra_opts: - --strip-components=1 with_together: - "{{ package_url }}" - "{{ unarchive_path }}"

在任務的主配置文件中添加了兩個任務:ansible.builtin.file任務創建存儲下載軟件包的目錄;ansible.builtin.unarchive任務將軟件包解壓到ansible.builtin.file創建的對應目錄中。

任務使用了兩個列表類型的變量:unarchive_path 和 package_url,這樣既可以解壓單個軟件包,也可以同時解壓多個軟件包。下載、解壓多個軟件包時,為這兩個變量定義多個值,但它們之前的順序要對應。也就是說,package_url 第一個值指定的軟件包將被解壓到 unarchive_path 第一個值指定的目錄中。

這兩個變量在具體的 Playbook 中定義,這樣在部署任何二進制 tarball 時,都可以使用 download_binary_tarball 角色下載軟件包。

創建 ~/playbooks/deploy_kafka/ 目錄,在該目錄下創建 download_kafka.yaml Playbook 文件:

mkdir ~/playbooks/deploy_kafka cd ~/playbooks/deploy_kafka vim download_kafka.yaml


 

圖 36.4 創建下載 Kafka 的 Playbook

download_kafka.yaml 內容如下:

--- - name: download kafka binary tarball play hosts: localhost become: false gather_facts: false vars_files: - vars.yaml roles: - role: download_binary_tarball tags: download_binary_tarball - name: create user play hosts: kafka become: true gather_facts: false vars_files: - vars.yaml roles: - role: create_user tags: create_user - name: distribute binary packages play hosts: kafka become: true gather_facts: false vars_files: - vars.yaml tasks: - name: copy the installation file task ansible.posix.synchronize: src: "{{ unarchive_path.0 }}" dest: "{{ user_dir.0 }}" - name: set the file owner task ansible.builtin.shell: "chown {{ user_name.0 }}.{{ user_name.0 }} {{ user_dir.0 }} -R" ...

download_kafka.yaml Playbook 文件中包含了三個 Play。第一個 Play 使用 download_binary_tarball Role,將 Kafka 二進制文件下載到 localhost 主機。第二個 Play 使用 create_user Role 在托管節點上創建運行 Kafka 服務的用戶。第三個 Play 中包含兩個任務,作用是將 Kafka 文件分發到 Kafka 托管節點,并設置文件屬主及數組為運行 Kafka 的用戶。

創建 vars.yaml 變量文件,在文件中定義 unarchive_path 和 package_url 變量:

mkdir vars vim vars/vars.yaml


 

圖 36.5 創建變量文件

vars.yaml 文件內容如下:

package_url: - https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz unarchive_path: - ~/software/kafka_tools/kafka user_name: - sretech user_dir: - /opt/sretech user_shell: - /sbin/nologin

執行 download_kafka.yaml Playbook,下載 Kafka 軟件包:

ansible-playbook download_kafka.yaml


 

圖 36.6 下載并分發 Kafka 安裝文件

查看 Kafka 安裝文件:

ssh kafka1.server.aiops.red sudo tree -L 1 /opt/sretech/kafka


 

圖 36.7 查看 Kafka 安裝文件

36.3.3 部署 Kafka 集群

創建部署 Kafka 集群的 Ansible 角色:

ansible-galaxy role init --init-path ~/roles deploy_kafka cd ~/roles/deploy_kafka/


 

圖 36.8 創建部署 Kafka 的角色

36.3.3.1 創建日志目錄

為 Kafka 服務提供日志目錄。編輯 tasks/main.yml 文件,添加以下任務:

- name: create logs directory task ansible.builtin.file: path: "{{ logs_dir }}" state: directory owner: "{{ user_name.0 }}" group: "{{ user_name.0 }}"36.3.3.2 設置 Local Facts

在 Kafka 集群配置中,需要設置 broker.id,用以區分 broker。Kafka 在啟動時會在 zookeeper 中 /brokers/ids 路徑下創建一個以當前 broker.id 為名稱的虛節點,Kafka 的健康狀態檢查就依賴于此節點。當 broker 下線時,該虛節點會自動刪除,其他 broker 或者客戶端通過判斷 /brokers/ids 路徑下是否有此 broker 的 id 來確定該 broker 的健康狀態。

創建三個 Facts 文件:

for bid in $(seq 0 2); do echo -e "[kafka]nbroker_id=${bid}" > broker${bid}.fact; done


 

圖 36.9 創建 Local Facts 變量

在 Kafka 節點上創建 /etc/ansible/facts.d/ 目錄:

ansible kafka -m file -a "path=/etc/ansible/facts.d state=directory" -b

將三個文件拷貝到對應節點的 /etc/ansible/facts.d/ 目錄:

ansible kafka1.server.aiops.red -m copy -a "src=broker0.fact dest=/etc/ansible/facts.d/kf.fact" -b ansible kafka2.server.aiops.red -m copy -a "src=broker1.fact dest=/etc/ansible/facts.d/kf.fact" -b ansible kafka3.server.aiops.red -m copy -a "src=broker2.fact dest=/etc/ansible/facts.d/kf.fact" -b36.3.3.3 配置文件模板

為 Kafka 服務提供配置文件模板,在 templates/ 目錄下創建 server.properties.j2 文件,內容如下:

broker.id={{ ansible_local.kf.kafka.broker_id }} log.dirs={{ logs_dir }} zookeeper.connect={% for host in groups['zookeeper'] %}{{ host }}:{{ zookeeper_port }}{% if not loop.last %},{% endif %}{% endfor %}

在 tasks/main.yml 文件中新增任務,將 server.properties.j2 模板文件拷貝到托管節點:

- name: generate configuration files task ansible.builtin.template: src: server.properties.j2 dest: "{{ user_dir.0 }}/kafka/config/server.properties" owner: "{{ user_name.0 }}" group: "{{ user_name.0 }}" mode: 0644 notify: Restart kafka service handler

ansible.builtin.template任務用于生成 Kafka 配置文件,當文件有變化時,通過notify觸發 Handlers。

36.3.3.4 單元文件模板

在 templates/ 目錄下創建systemd單元模板文件 kafka.service.j2,內容如下:

[Unit] Description=Kafka Wants.NETwork.target After=network.target [Service] Type=simple User={{ user_name.0 }} Group= {{ user_name.0 }} ExecStart=/bin/sh -c '{{ user_dir.0 }}/kafka/bin/kafka-server-start.sh {{ user_dir.0 }}/kafka/config/server.properties > {{ logs_dir }}/start-kafka.log 2>&1' ExecStop={{ user_dir.0 }}/kafka/bin/kafka-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target

在 tasks/main.yml 文件中新增任務,將 kafka.service.j2 模板文件拷貝到托管節點:

- name: generate systemd unit files task ansible.builtin.template: src: kafka.service.j2 dest: /usr/lib/systemd/system/kafka.service mode: 064436.3.3.5 開啟防火墻

Kafka 啟動后會監聽 TCP 9092 端口。在 tasks/main.yml 文件中新增任務,為該端口開啟防火墻:

- name: turn on kafka ports in the firewalld task ansible.builtin.firewalld: port: "{{ kafka_port }}/tcp" permanent: true immediate: true state: enabled36.3.3.6 設置 JAVA_HOME

在 Kafka 的 kafka-run-class.sh 文件中添加 JAVA_HOME 變量。編輯 tasks/main.yml 文件中新增lineinfile任務:

- name: set JAVA_HOME in kafka-run-class.sh file task ansible.builtin.lineinfile: path: "{{ user_dir }}/kafka/bin/kafka-run-class.sh" line: "export JAVA_HOME=/opt/jdk19" insertafter: "/bin/bash"36.3.3.7 啟動 Kafka 服務

在 tasks/main.yml 文件中新增任務,啟動 Kafka 服務,并將其設置為開機啟動:

- name: started kafka service task ansible.builtin.systemd: name: kafka.service state: started enabled: true daemon_reload: true36.3.3.8 部署 Kafka 集群

在 ~/playbooks/deploy_kafka/ 目錄中創建 deploy_kafka.yaml Playbook 文件,內容如下:

--- - name: deploy kafka cluster play hosts: kafka become: true gather_facts: true vars_files: - vars.yaml roles: - role: deploy_kafka tags: deploy_kafka ...

在 vars/vars.yaml 文件中添加所需變量,最終的文件內容如下:

package_url: - https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz unarchive_path: - ~/software/kafka_tools/kafka user_name: - sretech user_dir: - /opt/sretech user_shell: - /sbin/nologin logs_dir: /var/log/kafka kafka_port: 9092 zookeeper_port: 2181

執行 deploy_kafka.yaml Playbook 文件,自動化完成 Kafka 集群的部署:

ansible-playbook deploy_kafka.yaml


 

圖 36.10 自動化部署 Kafka 集群

36.4 驗證

本小節對 Kafka 服務進行一些操作,以驗證集群是否可用。

36.4.1 創建 Topic

在 kafka1.server.aiops.red 上創建副本為 3,分區為 1,名稱為 aiops 的 topic:

~/software/kafka_tools/kafka/bin/kafka-topics.sh --create --bootstrap-server kafka1.server.aiops.red:9092 --replication-factor 3 --partitions 1 --topic aiops

圖 36.11 在 kafka1.server.aiops.red 上創建名為 aiops 的 Topic

創建完成后,Topic 會同步到集群的另外兩個 broker kafka2.server.aiops.red 和 kafka3.server.aiops.red 上。

36.4.2 查詢 Topic

通過--list命令查看 Broker 上的可用 Topic。查詢 kafka1|2|3.server.aiops.red 上的可用 Topic:

~/software/kafka_tools/kafka/bin/kafka-topics.sh --bootstrap-server kafka2.server.aiops.red:9092 --list ~/software/kafka_tools/kafka/bin/kafka-topics.sh --bootstrap-server kafka3.server.aiops.red:9092 --list


 

圖 36.12 查詢 Broker 上的可用 Topic

從輸出看出,集群中的三個 Broker 都包含名為 aiops 的 Topic。

36.4.3 生產消息

向 kafka1.server.aiops.red 的 aiops Topic 寫入消息:

~/software/kafka_tools/kafka/bin/kafka-console-producer.sh --broker-list kafka1.server.aiops.red:9092 --topic aiops

圖 36.13 向 Topic 寫入數據

36.4.4 消費消息

從 kafka2.server.aiops.red 的 aiops Topic 讀取消息:

~/software/kafka_tools/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka2.server.aiops.red:9092 --topic aiops --from-beginning


 

圖 36.14 從 Topic 讀取消息

36.5 總結

Kafka 是常用的消息隊列系統,本教程演示了 Kafka 集群的自動化部署及簡單操作。教程同樣適用于其他基于 RPM 的 Linux 發行版。

 

來源:魏文第

分享到:
標簽:Kafka
用戶無頭像

網友整理

注冊時間:

網站:5 個   小程序:0 個  文章:12 篇

  • 51998

    網站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會員

趕快注冊賬號,推廣您的網站吧!
最新入駐小程序

數獨大挑戰2018-06-03

數獨一種數學游戲,玩家需要根據9

答題星2018-06-03

您可以通過答題星輕松地創建試卷

全階人生考試2018-06-03

各種考試題,題庫,初中,高中,大學四六

運動步數有氧達人2018-06-03

記錄運動步數,積累氧氣值。還可偷

每日養生app2018-06-03

每日養生,天天健康

體育訓練成績評定2018-06-03

通用課目體育訓練成績評定