Apache Kafka 是一個(gè)開(kāi)源的分布式事件流平臺(tái),被眾多公司用于高性能數(shù)據(jù)管道、流分析、數(shù)據(jù)集成等。本章將討論 kafka 的自動(dòng)化部署。
什么是事件流: “事件流” 是按時(shí)間排序的一系列業(yè)務(wù)事件。事件流處理平臺(tái)實(shí)時(shí)從事件源 (比如數(shù)據(jù)庫(kù)、移動(dòng)設(shè)備、傳感器、應(yīng)用程序等) 捕獲數(shù)據(jù),持久化存儲(chǔ)這些事件流以供檢索、處理和響應(yīng)事件流,并根據(jù)需要將事件流路由到不同的目標(biāo)技術(shù)。因此,事件流可確保數(shù)據(jù)的連續(xù)性、正確解釋事件,從而使正確的信息在正確的時(shí)間出現(xiàn)在正確的位置。
本章包含以下主題:
- Kafka 介紹
- 部署環(huán)境介紹
- 自動(dòng)化部署 Kafka
- 驗(yàn)證
- 總結(jié)
Kafka 是一個(gè)分布式系統(tǒng),由服務(wù)端和客戶端組成,通過(guò) TCP 網(wǎng)絡(luò)協(xié)議通信。它可以部署在物理機(jī)、虛擬機(jī)或者容器上。
Servers:Kafka 以一個(gè)或多個(gè) Servers 集群的形式運(yùn)行,可以跨多個(gè)數(shù)據(jù)中心或云區(qū)域。提供存儲(chǔ)層的 Servers 被成為 Brokers。其他 Servers 運(yùn)行 Kafka Connect,以事件流的形式持續(xù)導(dǎo)入和導(dǎo)出數(shù)據(jù),將 Kafka 與現(xiàn)有的系統(tǒng) (如關(guān)系型數(shù)據(jù)庫(kù)) 以及其他 Kafka 集群集成。Kafka 集群具有高度的可擴(kuò)展性和容錯(cuò)性,如果任何一個(gè) Server 故障,其他 Server 將接管工作,以確保連續(xù)運(yùn)行,沒(méi)有任何數(shù)據(jù)損失。
Clients:通過(guò) Client 可以編寫分布式應(yīng)用和微服務(wù),以并行、大規(guī)模的方式讀取、寫入和處理事件流,即使在網(wǎng)絡(luò)問(wèn)題或機(jī)器故障的情況下也能容錯(cuò)。
36.1.1 主要概念和術(shù)語(yǔ)
Event (事件) 記錄了真實(shí) “發(fā)生的事情”,在文檔中也被稱為記錄或消息。當(dāng)向 Kafka 讀寫數(shù)據(jù)時(shí),是以 event 的形式進(jìn)行的。從概念上講,Event 包含鍵、值、時(shí)間戳和可選的元數(shù)據(jù)頭。如示例 36.1。
示例 36.1,事件示例:
Event key: "Official accounts" Event value: "sretech" Event timestamp: "Dec. 24, 2022 at 2:06 p.m."
Producers (生產(chǎn)者) 是向 Kafka 發(fā)布 (寫) events 的客戶端應(yīng)用程序。Consumers 是訂閱 (讀和處理) events 的應(yīng)用程序。在 Kafka 中,Producers 和 consumers 完全解耦,互不相干,這是實(shí)現(xiàn) Kafka 高度可擴(kuò)展性的一個(gè)關(guān)鍵設(shè)計(jì)因素。例如,Producers 從來(lái)不需要等待 consumers。
Topics (主題) 用于分類存儲(chǔ) events。可以將 topics 想象成文件夾,而 events 就是文件夾中的文件。Kafka 中的 topics 可以多 producers 和多 consumers:一個(gè) topic 可以有零個(gè)、一個(gè)或多個(gè) producers 向其寫入事件,也可以有零個(gè)、一個(gè)或多個(gè) consumers 訂閱這些事件。一個(gè)主題中的事件可以根據(jù)需要反復(fù)讀取——與傳統(tǒng)的消息傳遞系統(tǒng)不同,事件在消費(fèi)后不會(huì)被刪除。可以通過(guò)每個(gè)主題的配置設(shè)置來(lái)定義 Kafka 應(yīng)該保留事件多長(zhǎng)時(shí)間,之后舊的事件會(huì)被丟棄。Kafka 的性能相對(duì)于數(shù)據(jù)大小來(lái)說(shuō)是有效恒定的,所以存儲(chǔ)數(shù)據(jù)很長(zhǎng)時(shí)間是完全可以的。
主題是分區(qū) (partitioned) 的,一個(gè)主題被分散到位于不同 Kafka brokers 的若干 “桶” 中。這種分布式的數(shù)據(jù)放置對(duì)可擴(kuò)展性非常重要,因?yàn)樗试S客戶端應(yīng)用程序同時(shí)從/向多個(gè) brokers 讀取和寫入數(shù)據(jù)。當(dāng)一個(gè)新的事件被發(fā)布到一個(gè)主題時(shí),它實(shí)際上是被附加到主題的一個(gè)分區(qū)中。具有相同 key 的事件(例如,customer 或 vehicle ID)被寫入相同的分區(qū),Kafka 保證一個(gè)給定的主題分區(qū)的任何消費(fèi)者將始終以完全相同的順序讀取該分區(qū)的事件。
圖 36.1 圖片來(lái)自 Kafka 官網(wǎng)
這個(gè)例子的主題有四個(gè)分區(qū) P1-P4。兩個(gè)不同的生產(chǎn)者客戶端通過(guò)網(wǎng)絡(luò)向主題的分區(qū)寫入事件,向主題發(fā)布新的事件,彼此獨(dú)立。具有相同密鑰的事件(在圖中用顏色表示)被寫到同一個(gè)分區(qū)中。注意,如果合適的話,兩個(gè)生產(chǎn)者都可以寫到同一個(gè)分區(qū)。
為了使數(shù)據(jù)具有容錯(cuò)性和高可用性,每個(gè)主題都可以被復(fù)制,甚至跨地理區(qū)域或數(shù)據(jù)中心,這樣總有多個(gè) Broker 擁有數(shù)據(jù)的副本,以防出錯(cuò)。一個(gè)常見(jiàn)的生產(chǎn)設(shè)置是復(fù)制系數(shù)為3,也就是說(shuō),數(shù)據(jù)總是有三個(gè)副本。這種復(fù)制是在主題分區(qū)的層面上進(jìn)行的。
36.2 部署環(huán)境介紹
完成本教程至少需要四個(gè)節(jié)點(diǎn):一個(gè)節(jié)點(diǎn)作為 Ansible 控制節(jié)點(diǎn),三個(gè)節(jié)點(diǎn)部署 zookeeper 集群及 Kafka 集群。
Kafka 可以基于 Zookeeper 部署,也可以基于 KRaft 協(xié)議部署。KRaft 的方式部署起來(lái)相對(duì)簡(jiǎn)單,因此本教程僅演示基于 Zookeeper 的部署方式。36.2.1 節(jié)點(diǎn)信息
四個(gè)節(jié)點(diǎn)分別為 2 核 CPU、2 GB 內(nèi)存的虛擬機(jī)。
節(jié)點(diǎn)信息:
# 系統(tǒng)版本:Rocky linux release 9.1
ansible Inventory hosts:
[zookeeper] zk1.server.aiops.red zk2.server.aiops.red zk3.server.aiops.red [kafka] kafka1.server.aiops.red kafka2.server.aiops.red kafka3.server.aiops.red
36.2.2 節(jié)點(diǎn)要求
為使安裝過(guò)程順利進(jìn)行,節(jié)點(diǎn)應(yīng)滿足以下要求。
36.2.2.1 時(shí)鐘同步
Zookeeper、Kafka 集群節(jié)點(diǎn)時(shí)鐘須保持同步。
要自動(dòng)化實(shí)現(xiàn)時(shí)鐘同步,可以參考 “Linux 9 自動(dòng)化部署 NTP 服務(wù)”。
36.2.2.2 主機(jī)名解析
Ansible 控制節(jié)點(diǎn)能夠解析 Zookeeper、Kafka 節(jié)點(diǎn)的主機(jī)名,通過(guò)主機(jī)名訪問(wèn)節(jié)點(diǎn)。
要實(shí)現(xiàn)主機(jī)名稱解析,可以在 Ansible 控制節(jié)點(diǎn)的 /etc/hosts 文件中指定節(jié)點(diǎn)的 IP、主機(jī)名條目,或者參考 “Linux 9 自動(dòng)化部署 DNS 服務(wù)” 一文配置 DNS 服務(wù)。
36.2.2.3 主機(jī)名解析
Ansible 控制節(jié)點(diǎn)可以免密登錄各節(jié)點(diǎn),并能夠免密執(zhí)行sudo。可以參考 “Linux 9 自動(dòng)化部署 NTP 服務(wù)” 中的 “部署環(huán)境要求” 一節(jié)實(shí)現(xiàn)。
36.2.2.4 依賴的服務(wù)及組件
Zookeeper、Kafka 集群的安裝依賴 JDK。JDK 的自動(dòng)化安裝可以參考 ”Linux 9 自動(dòng)化部署 JDK“。
Zookeeper 集群的自動(dòng)化安裝請(qǐng)參考 “Linux 9 自動(dòng)化部署 Zookeeper 集群”。
36.3 自動(dòng)化部署 Kafka
自動(dòng)化部署 Kafka 集群通過(guò)兩個(gè) Ansible Role 實(shí)現(xiàn):一個(gè)用于下載、分發(fā) Kafka 安裝包,一個(gè)用于部署 Kafka 集群。
使用獨(dú)立的用戶運(yùn)行應(yīng)用程序,能夠起到一定程度的隔離性與安全性。因此,首先創(chuàng)建運(yùn)行 Kafka 服務(wù)的用戶。
36.3.1 創(chuàng)建用戶
創(chuàng)建用戶是一個(gè)常見(jiàn)的操作,因此為創(chuàng)建用戶編寫一個(gè)獨(dú)立的 Ansible Role。
創(chuàng)建 create_user Role:
ansible-galaxy role init --init-path ~/roles create_user cd ~/roles/create_user/
圖 36.2 提供創(chuàng)建用戶的角色
編輯 tasks/main.yml 文件,內(nèi)容如下:
--- # tasks file for create_user - name: create user task ansible.builtin.user: name: "{{ item.0 }}" home: "{{ item.1 }}" shell: "{{ item.2 }}" with_together: - "{{ user_name }}" - "{{ user_dir }}" - "{{ user_shell }}"
在ansible.builtin.user任務(wù)中包含了三個(gè)列表類型的變量,user_name 定義創(chuàng)建的用戶名稱,user_dir 定義創(chuàng)建的用戶家目錄,user_shell 定義創(chuàng)建的用戶 Shell。之所以使用列表類型的變量,是為了滿足一次性創(chuàng)建多個(gè)用戶的需求。可以為變量指定一個(gè)或多個(gè)值,對(duì)應(yīng)創(chuàng)建一個(gè)或多個(gè)用戶。在指定多個(gè)值時(shí),要注意三個(gè)變量的順序要一致。這三個(gè)變量,將在 Playbook 中提供。
36.3.2 下載并分發(fā) Kafka 軟件包
Kafka 下載地址:https://kafka.apache.org/downloads,當(dāng)前穩(wěn)定版是 3.3.1。
創(chuàng)建下載 Kafka 軟件包的 Ansible Role。可以把該角色抽象為下載所有二進(jìn)制 tarball,而不是只針對(duì) Kafka:
ansible-galaxy role init --init-path ~/roles download_binary_tarball cd ~/roles/download_binary_tarball/
圖 36.3 創(chuàng)建下載二進(jìn)制包的角色
編輯 download_binary_tarball 角色的 tasks/main.yml 文件,添加下載軟件包的任務(wù),內(nèi)容如下:
--- # tasks file for download_binary_tarball - name: create download and unarchive directory task ansible.builtin.file: path: "{{ item }}" state: directory with_items: "{{ unarchive_path }}" - name: download and unarchive tarball task ansible.builtin.unarchive: src: "{{ item.0 }}" dest: "{{ item.1 }}" remote_src: true extra_opts: - --strip-components=1 with_together: - "{{ package_url }}" - "{{ unarchive_path }}"
在任務(wù)的主配置文件中添加了兩個(gè)任務(wù):ansible.builtin.file任務(wù)創(chuàng)建存儲(chǔ)下載軟件包的目錄;ansible.builtin.unarchive任務(wù)將軟件包解壓到ansible.builtin.file創(chuàng)建的對(duì)應(yīng)目錄中。
任務(wù)使用了兩個(gè)列表類型的變量:unarchive_path 和 package_url,這樣既可以解壓?jiǎn)蝹€(gè)軟件包,也可以同時(shí)解壓多個(gè)軟件包。下載、解壓多個(gè)軟件包時(shí),為這兩個(gè)變量定義多個(gè)值,但它們之前的順序要對(duì)應(yīng)。也就是說(shuō),package_url 第一個(gè)值指定的軟件包將被解壓到 unarchive_path 第一個(gè)值指定的目錄中。
這兩個(gè)變量在具體的 Playbook 中定義,這樣在部署任何二進(jìn)制 tarball 時(shí),都可以使用 download_binary_tarball 角色下載軟件包。
創(chuàng)建 ~/playbooks/deploy_kafka/ 目錄,在該目錄下創(chuàng)建 download_kafka.yaml Playbook 文件:
mkdir ~/playbooks/deploy_kafka cd ~/playbooks/deploy_kafka vim download_kafka.yaml
圖 36.4 創(chuàng)建下載 Kafka 的 Playbook
download_kafka.yaml 內(nèi)容如下:
--- - name: download kafka binary tarball play hosts: localhost become: false gather_facts: false vars_files: - vars.yaml roles: - role: download_binary_tarball tags: download_binary_tarball - name: create user play hosts: kafka become: true gather_facts: false vars_files: - vars.yaml roles: - role: create_user tags: create_user - name: distribute binary packages play hosts: kafka become: true gather_facts: false vars_files: - vars.yaml tasks: - name: copy the installation file task ansible.posix.synchronize: src: "{{ unarchive_path.0 }}" dest: "{{ user_dir.0 }}" - name: set the file owner task ansible.builtin.shell: "chown {{ user_name.0 }}.{{ user_name.0 }} {{ user_dir.0 }} -R" ...
download_kafka.yaml Playbook 文件中包含了三個(gè) Play。第一個(gè) Play 使用 download_binary_tarball Role,將 Kafka 二進(jìn)制文件下載到 localhost 主機(jī)。第二個(gè) Play 使用 create_user Role 在托管節(jié)點(diǎn)上創(chuàng)建運(yùn)行 Kafka 服務(wù)的用戶。第三個(gè) Play 中包含兩個(gè)任務(wù),作用是將 Kafka 文件分發(fā)到 Kafka 托管節(jié)點(diǎn),并設(shè)置文件屬主及數(shù)組為運(yùn)行 Kafka 的用戶。
創(chuàng)建 vars.yaml 變量文件,在文件中定義 unarchive_path 和 package_url 變量:
mkdir vars vim vars/vars.yaml
圖 36.5 創(chuàng)建變量文件
vars.yaml 文件內(nèi)容如下:
package_url: - https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz unarchive_path: - ~/software/kafka_tools/kafka user_name: - sretech user_dir: - /opt/sretech user_shell: - /sbin/nologin
執(zhí)行 download_kafka.yaml Playbook,下載 Kafka 軟件包:
ansible-playbook download_kafka.yaml
圖 36.6 下載并分發(fā) Kafka 安裝文件
查看 Kafka 安裝文件:
ssh kafka1.server.aiops.red sudo tree -L 1 /opt/sretech/kafka
圖 36.7 查看 Kafka 安裝文件
36.3.3 部署 Kafka 集群
創(chuàng)建部署 Kafka 集群的 Ansible 角色:
ansible-galaxy role init --init-path ~/roles deploy_kafka cd ~/roles/deploy_kafka/
圖 36.8 創(chuàng)建部署 Kafka 的角色
36.3.3.1 創(chuàng)建日志目錄
為 Kafka 服務(wù)提供日志目錄。編輯 tasks/main.yml 文件,添加以下任務(wù):
- name: create logs directory task ansible.builtin.file: path: "{{ logs_dir }}" state: directory owner: "{{ user_name.0 }}" group: "{{ user_name.0 }}"
36.3.3.2 設(shè)置 Local Facts
在 Kafka 集群配置中,需要設(shè)置 broker.id,用以區(qū)分 broker。Kafka 在啟動(dòng)時(shí)會(huì)在 zookeeper 中 /brokers/ids 路徑下創(chuàng)建一個(gè)以當(dāng)前 broker.id 為名稱的虛節(jié)點(diǎn),Kafka 的健康狀態(tài)檢查就依賴于此節(jié)點(diǎn)。當(dāng) broker 下線時(shí),該虛節(jié)點(diǎn)會(huì)自動(dòng)刪除,其他 broker 或者客戶端通過(guò)判斷 /brokers/ids 路徑下是否有此 broker 的 id 來(lái)確定該 broker 的健康狀態(tài)。
創(chuàng)建三個(gè) Facts 文件:
for bid in $(seq 0 2); do echo -e "[kafka]nbroker_id=${bid}" > broker${bid}.fact; done
圖 36.9 創(chuàng)建 Local Facts 變量
在 Kafka 節(jié)點(diǎn)上創(chuàng)建 /etc/ansible/facts.d/ 目錄:
ansible kafka -m file -a "path=/etc/ansible/facts.d state=directory" -b
將三個(gè)文件拷貝到對(duì)應(yīng)節(jié)點(diǎn)的 /etc/ansible/facts.d/ 目錄:
ansible kafka1.server.aiops.red -m copy -a "src=broker0.fact dest=/etc/ansible/facts.d/kf.fact" -b ansible kafka2.server.aiops.red -m copy -a "src=broker1.fact dest=/etc/ansible/facts.d/kf.fact" -b ansible kafka3.server.aiops.red -m copy -a "src=broker2.fact dest=/etc/ansible/facts.d/kf.fact" -b
36.3.3.3 配置文件模板
為 Kafka 服務(wù)提供配置文件模板,在 templates/ 目錄下創(chuàng)建 server.properties.j2 文件,內(nèi)容如下:
broker.id={{ ansible_local.kf.kafka.broker_id }} log.dirs={{ logs_dir }} zookeeper.connect={% for host in groups['zookeeper'] %}{{ host }}:{{ zookeeper_port }}{% if not loop.last %},{% endif %}{% endfor %}
在 tasks/main.yml 文件中新增任務(wù),將 server.properties.j2 模板文件拷貝到托管節(jié)點(diǎn):
- name: generate configuration files task ansible.builtin.template: src: server.properties.j2 dest: "{{ user_dir.0 }}/kafka/config/server.properties" owner: "{{ user_name.0 }}" group: "{{ user_name.0 }}" mode: 0644 notify: Restart kafka service handler
ansible.builtin.template任務(wù)用于生成 Kafka 配置文件,當(dāng)文件有變化時(shí),通過(guò)notify觸發(fā) Handlers。
36.3.3.4 單元文件模板
在 templates/ 目錄下創(chuàng)建systemd單元模板文件 kafka.service.j2,內(nèi)容如下:
[Unit] Description=Kafka Wants.NETwork.target After=network.target [Service] Type=simple User={{ user_name.0 }} Group= {{ user_name.0 }} ExecStart=/bin/sh -c '{{ user_dir.0 }}/kafka/bin/kafka-server-start.sh {{ user_dir.0 }}/kafka/config/server.properties > {{ logs_dir }}/start-kafka.log 2>&1' ExecStop={{ user_dir.0 }}/kafka/bin/kafka-server-stop.sh Restart=on-abnormal [Install] WantedBy=multi-user.target
在 tasks/main.yml 文件中新增任務(wù),將 kafka.service.j2 模板文件拷貝到托管節(jié)點(diǎn):
- name: generate systemd unit files task ansible.builtin.template: src: kafka.service.j2 dest: /usr/lib/systemd/system/kafka.service mode: 0644
36.3.3.5 開(kāi)啟防火墻
Kafka 啟動(dòng)后會(huì)監(jiān)聽(tīng) TCP 9092 端口。在 tasks/main.yml 文件中新增任務(wù),為該端口開(kāi)啟防火墻:
- name: turn on kafka ports in the firewalld task ansible.builtin.firewalld: port: "{{ kafka_port }}/tcp" permanent: true immediate: true state: enabled
36.3.3.6 設(shè)置 JAVA_HOME
在 Kafka 的 kafka-run-class.sh 文件中添加 JAVA_HOME 變量。編輯 tasks/main.yml 文件中新增lineinfile任務(wù):
- name: set JAVA_HOME in kafka-run-class.sh file task ansible.builtin.lineinfile: path: "{{ user_dir }}/kafka/bin/kafka-run-class.sh" line: "export JAVA_HOME=/opt/jdk19" insertafter: "/bin/bash"
36.3.3.7 啟動(dòng) Kafka 服務(wù)
在 tasks/main.yml 文件中新增任務(wù),啟動(dòng) Kafka 服務(wù),并將其設(shè)置為開(kāi)機(jī)啟動(dòng):
- name: started kafka service task ansible.builtin.systemd: name: kafka.service state: started enabled: true daemon_reload: true
36.3.3.8 部署 Kafka 集群
在 ~/playbooks/deploy_kafka/ 目錄中創(chuàng)建 deploy_kafka.yaml Playbook 文件,內(nèi)容如下:
--- - name: deploy kafka cluster play hosts: kafka become: true gather_facts: true vars_files: - vars.yaml roles: - role: deploy_kafka tags: deploy_kafka ...
在 vars/vars.yaml 文件中添加所需變量,最終的文件內(nèi)容如下:
package_url: - https://downloads.apache.org/kafka/3.3.1/kafka_2.13-3.3.1.tgz unarchive_path: - ~/software/kafka_tools/kafka user_name: - sretech user_dir: - /opt/sretech user_shell: - /sbin/nologin logs_dir: /var/log/kafka kafka_port: 9092 zookeeper_port: 2181
執(zhí)行 deploy_kafka.yaml Playbook 文件,自動(dòng)化完成 Kafka 集群的部署:
ansible-playbook deploy_kafka.yaml
圖 36.10 自動(dòng)化部署 Kafka 集群
36.4 驗(yàn)證
本小節(jié)對(duì) Kafka 服務(wù)進(jìn)行一些操作,以驗(yàn)證集群是否可用。
36.4.1 創(chuàng)建 Topic
在 kafka1.server.aiops.red 上創(chuàng)建副本為 3,分區(qū)為 1,名稱為 aiops 的 topic:
~/software/kafka_tools/kafka/bin/kafka-topics.sh --create --bootstrap-server kafka1.server.aiops.red:9092 --replication-factor 3 --partitions 1 --topic aiops
圖 36.11 在 kafka1.server.aiops.red 上創(chuàng)建名為 aiops 的 Topic
創(chuàng)建完成后,Topic 會(huì)同步到集群的另外兩個(gè) broker kafka2.server.aiops.red 和 kafka3.server.aiops.red 上。
36.4.2 查詢 Topic
通過(guò)--list命令查看 Broker 上的可用 Topic。查詢 kafka1|2|3.server.aiops.red 上的可用 Topic:
~/software/kafka_tools/kafka/bin/kafka-topics.sh --bootstrap-server kafka2.server.aiops.red:9092 --list ~/software/kafka_tools/kafka/bin/kafka-topics.sh --bootstrap-server kafka3.server.aiops.red:9092 --list
圖 36.12 查詢 Broker 上的可用 Topic
從輸出看出,集群中的三個(gè) Broker 都包含名為 aiops 的 Topic。
36.4.3 生產(chǎn)消息
向 kafka1.server.aiops.red 的 aiops Topic 寫入消息:
~/software/kafka_tools/kafka/bin/kafka-console-producer.sh --broker-list kafka1.server.aiops.red:9092 --topic aiops
圖 36.13 向 Topic 寫入數(shù)據(jù)
36.4.4 消費(fèi)消息
從 kafka2.server.aiops.red 的 aiops Topic 讀取消息:
~/software/kafka_tools/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka2.server.aiops.red:9092 --topic aiops --from-beginning
圖 36.14 從 Topic 讀取消息
36.5 總結(jié)
Kafka 是常用的消息隊(duì)列系統(tǒng),本教程演示了 Kafka 集群的自動(dòng)化部署及簡(jiǎn)單操作。教程同樣適用于其他基于 RPM 的 Linux 發(fā)行版。
來(lái)源:魏文第