在這個(gè)數(shù)據(jù)驅(qū)動(dòng)的時(shí)代,信息的處理和分析變得越來越重要。而在眾多的大數(shù)據(jù)處理框架中,「Apache Spark」以其獨(dú)特的優(yōu)勢(shì)脫穎而出。
本篇文章,我們將一起走進(jìn)Spark的世界,探索并理解其相關(guān)的基礎(chǔ)概念和使用方法。本文主要目標(biāo)是讓初學(xué)者能夠?qū)park有一個(gè)全面的認(rèn)識(shí),并能實(shí)際應(yīng)用到各類問題的解決之中。
一、Spark是什么
學(xué)習(xí)一個(gè)東西之前先要知道這個(gè)東西是什么。
Spark 是一個(gè)開源的大數(shù)據(jù)處理引擎,它提供了一整套開發(fā) API,包括流計(jì)算和機(jī)器學(xué)習(xí)。它支持批處理和流處理。
Spark 的一個(gè)顯著特點(diǎn)是它能夠在內(nèi)存中進(jìn)行迭代計(jì)算,從而加快數(shù)據(jù)處理速度。盡管 Spark 是用 Scala 開發(fā)的,但它也為 JAVA、Scala、Python/ target=_blank class=infotextkey>Python 和 R 等高級(jí)編程語言提供了開發(fā)接口。
1.Spark組件
Spark提供了6大核心組件:
- Spark Core
- Spark SQL
- Spark Streaming
- Spark MLlib
- Spark GraphX
(1) Spark Core
Spark Core 是 Spark 的基礎(chǔ),它提供了內(nèi)存計(jì)算的能力,是分布式處理大數(shù)據(jù)集的基礎(chǔ)。它將分布式數(shù)據(jù)抽象為彈性分布式數(shù)據(jù)集(RDD),并為運(yùn)行在其上的上層組件提供 API。所有 Spark 的上層組件都建立在 Spark Core 的基礎(chǔ)之上。
(2) Spark SQL
Spark SQL 是一個(gè)用于處理結(jié)構(gòu)化數(shù)據(jù)的 Spark 組件。它允許使用 SQL 語句查詢數(shù)據(jù)。Spark 支持多種數(shù)據(jù)源,包括 Hive 表、Parquet 和 JSON 等。
(3) Spark Streaming
Spark Streaming 是一個(gè)用于處理動(dòng)態(tài)數(shù)據(jù)流的 Spark 組件。它能夠開發(fā)出強(qiáng)大的交互和數(shù)據(jù)查詢程序。在處理動(dòng)態(tài)數(shù)據(jù)流時(shí),流數(shù)據(jù)會(huì)被分割成微小的批處理,這些微小批處理將會(huì)在 Spark Core 上按時(shí)間順序快速執(zhí)行。
(4) Spark MLlib
Spark MLlib 是 Spark 的機(jī)器學(xué)習(xí)庫(kù)。它提供了常用的機(jī)器學(xué)習(xí)算法和實(shí)用程序,包括分類、回歸、聚類、協(xié)同過濾、降維等。MLlib 還提供了一些底層優(yōu)化原語和高層流水線 API,可以幫助開發(fā)人員更快地創(chuàng)建和調(diào)試機(jī)器學(xué)習(xí)流水線。
(5) Spark GraphX
Spark GraphX 是 Spark 的圖形計(jì)算庫(kù)。它提供了一種分布式圖形處理框架,可以幫助開發(fā)人員更快地構(gòu)建和分析大型圖形。
2.Spark的優(yōu)勢(shì)
Spark 有許多優(yōu)勢(shì),其中一些主要優(yōu)勢(shì)包括:
- 速度:Spark 基于內(nèi)存計(jì)算,能夠比基于磁盤的計(jì)算快很多。對(duì)于迭代式算法和交互式數(shù)據(jù)挖掘任務(wù),這種速度優(yōu)勢(shì)尤為明顯。
- 易用性:Spark 支持多種語言,包括 Java、Scala、Python 和 R。它提供了豐富的內(nèi)置 API,可以幫助開發(fā)人員更快地構(gòu)建和運(yùn)行應(yīng)用程序。
- 通用性:Spark 提供了多種組件,可以支持不同類型的計(jì)算任務(wù),包括批處理、交互式查詢、流處理、機(jī)器學(xué)習(xí)和圖形處理等。
- 兼容性:Spark 可以與多種數(shù)據(jù)源集成,包括 Hadoop 分布式文件系統(tǒng)(HDFS)、Apache Cassandra、Apache HBase 和 Amazon S3 等。
- 容錯(cuò)性:Spark 提供了彈性分布式數(shù)據(jù)集(RDD)抽象,可以幫助開發(fā)人員更快地構(gòu)建容錯(cuò)應(yīng)用程序。
3.word Count
上手寫一個(gè)簡(jiǎn)單的代碼例子,下面是一個(gè)Word Count的Spark程序:
import org.apache.spark.{SparkConf, SparkContext}
object SparkWordCount {
def mAIn (args:Array [String]): Unit = {
//setMaster("local[9]") 表示在本地運(yùn)行 Spark 程序,使用 9 個(gè)線程。local[*] 表示使用所有可用的處理器核心。
//這種模式通常用于本地測(cè)試和開發(fā)。
val conf = new SparkConf ().setAppName ("Word Count").setMaster("local[9]");
val sc = new SparkContext (conf);
sc.setLogLevel("ERROR")
val data = List("Hello World", "Hello Spark")
val textFile = sc.parallelize(data)
val wordCounts = textFile.flatMap (line => line.split (" ")).map (
word => (word, 1)).reduceByKey ( (a, b) => a + b)
wordCounts.collect().foreach(println)
}
}
輸出:
(Hello,2)
(World,1)
(Spark,1)
程序首先創(chuàng)建了一個(gè) SparkConf 對(duì)象,用來設(shè)置應(yīng)用程序名稱和運(yùn)行模式。然后,它創(chuàng)建了一個(gè) SparkContext 對(duì)象,用來連接到 Spark 集群。
接下來,程序創(chuàng)建了一個(gè)包含兩個(gè)字符串的列表,并使用 parallelize 方法將其轉(zhuǎn)換為一個(gè) RDD。然后,它使用 flatMap 方法將每一行文本拆分成單詞,并使用 map 方法將每個(gè)單詞映射為一個(gè)鍵值對(duì)(key-value pair),其中鍵是單詞,值是 1。
最后,程序使用 reduceByKey 方法將具有相同鍵的鍵值對(duì)進(jìn)行合并,并對(duì)它們的值進(jìn)行求和。最終結(jié)果是一個(gè)包含每個(gè)單詞及其出現(xiàn)次數(shù)的 RDD。程序使用 collect 方法將結(jié)果收集到驅(qū)動(dòng)程序,并使用 foreach 方法打印出來。
二、Spark基本概念
Spark的理論較多,為了更有效地學(xué)習(xí)Spark,首先來理解下其基本概念。
1.Application
Application指的就是用戶編寫的Spark應(yīng)用程序。
如下,"Word Count"就是該應(yīng)用程序的名字。
import org.apache.spark.sql.SparkSession
object WordCount {
def main(args: Array[String]) {
// 創(chuàng)建 SparkSession 對(duì)象,它是 Spark Application 的入口
val spark = SparkSession.builder.appName("Word Count").getOrCreate()
// 讀取文本文件并創(chuàng)建 Dataset
val textFile = spark.read.textFile("hdfs://...")
// 使用 flatMap 轉(zhuǎn)換將文本分割為單詞,并使用 reduceByKey 轉(zhuǎn)換計(jì)算每個(gè)單詞的數(shù)量
val counts = textFile.flatMap(line => line.split(" "))
.groupByKey(identity)
.count()
// 將結(jié)果保存到文本文件中
counts.write.text("hdfs://...")
// 停止 SparkSession
spark.stop()
}
}
2.Driver
Driver 是運(yùn)行 Spark Application 的進(jìn)程,它負(fù)責(zé)創(chuàng)建 SparkSession 和 SparkContext 對(duì)象,并將代碼轉(zhuǎn)換和操作。
它還負(fù)責(zé)創(chuàng)建邏輯和物理計(jì)劃,并與集群管理器協(xié)調(diào)調(diào)度任務(wù)。
簡(jiǎn)而言之,Spark Application 是使用 Spark API 編寫的程序,而 Spark Driver 是負(fù)責(zé)運(yùn)行該程序并與集群管理器協(xié)調(diào)的進(jìn)程。
可以將Driver 理解為運(yùn)行 Spark Application main 方法的進(jìn)程。
driver的內(nèi)存大小可以進(jìn)行設(shè)置,配置如下:
# 設(shè)置 driver內(nèi)存大小
driver-memory 1024m
3.Master & Worker
在Spark中,Master是獨(dú)立集群的控制者,而Worker是工作者。
一個(gè)Spark獨(dú)立集群需要啟動(dòng)一個(gè)Master和多個(gè)Worker。Worker就是物理節(jié)點(diǎn),Worker上面可以啟動(dòng)Executor進(jìn)程。
4.Executor
在每個(gè)Worker上為某應(yīng)用啟動(dòng)的一個(gè)進(jìn)程,該進(jìn)程負(fù)責(zé)運(yùn)行Task,并且負(fù)責(zé)將數(shù)據(jù)存在內(nèi)存或者磁盤上。
每個(gè)任務(wù)都有各自獨(dú)立的Executor。Executor是一個(gè)執(zhí)行Task的容器。實(shí)際上它是一組計(jì)算資源(cpu核心、memory)的集合。
一個(gè)Worker節(jié)點(diǎn)可以有多個(gè)Executor。一個(gè)Executor可以運(yùn)行多個(gè)Task。
Executor創(chuàng)建成功后,在日志文件會(huì)顯示如下信息:
INFO Executor: Starting executor ID [executorId] on host [executorHostname]
5.RDD
RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個(gè)不可變、可分區(qū)、里面的元素可并行計(jì)算的集合。
RDD的 Partition 是指數(shù)據(jù)集的分區(qū)。它是數(shù)據(jù)集中元素的集合,這些元素被分區(qū)到集群的節(jié)點(diǎn)上,可以并行操作。對(duì)于RDD來說,每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒有指定,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。
一個(gè)函數(shù)會(huì)被作用在每一個(gè)分區(qū)。Spark 中 RDD 的計(jì)算是以分片為單位的,compute 函數(shù)會(huì)被作用到每個(gè)分區(qū)上。
RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算。
6.Job
一個(gè)Job包含多個(gè)RDD及作用于相應(yīng)RDD上的各種操作,每個(gè)Action的觸發(fā)就會(huì)生成一個(gè)job。用戶提交的Job會(huì)提交給DAG Scheduler,Job會(huì)被分解成Stage,Stage會(huì)被細(xì)化成Task。
7.Task
被發(fā)送到Executor上的工作單元。每個(gè)Task負(fù)責(zé)計(jì)算一個(gè)分區(qū)的數(shù)據(jù)。
8.Stage
在 Spark 中,一個(gè)作業(yè)(Job)會(huì)被劃分為多個(gè)階段(Stage)。同一個(gè) Stage 可以有多個(gè) Task 并行執(zhí)行(Task 數(shù)=分區(qū)數(shù))。
階段之間的劃分是根據(jù)數(shù)據(jù)的依賴關(guān)系來確定的。當(dāng)一個(gè) RDD 的分區(qū)依賴于另一個(gè) RDD 的分區(qū)時(shí),這兩個(gè) RDD 就屬于同一個(gè)階段。當(dāng)一個(gè) RDD 的分區(qū)依賴于多個(gè) RDD 的分區(qū)時(shí),這些 RDD 就屬于不同的階段。
上圖中,Stage表示一個(gè)可以順滑完成的階段。曲線表示 Shuffle 過程。
如果Stage能夠復(fù)用前面的Stage的話,那么會(huì)顯示灰色。
9.Shuffle
在 Spark 中,Shuffle 是指在不同階段之間重新分配數(shù)據(jù)的過程。它通常發(fā)生在需要對(duì)數(shù)據(jù)進(jìn)行聚合或分組操作的時(shí)候,例如 reduceByKey 或 groupByKey 等操作。
在 Shuffle 過程中,Spark 會(huì)將數(shù)據(jù)按照鍵值進(jìn)行分區(qū),并將屬于同一分區(qū)的數(shù)據(jù)發(fā)送到同一個(gè)計(jì)算節(jié)點(diǎn)上。這樣,每個(gè)計(jì)算節(jié)點(diǎn)就可以獨(dú)立地處理屬于它自己分區(qū)的數(shù)據(jù)。
10.Stage的劃分
Stage的劃分,簡(jiǎn)單來說是以寬依賴來劃分的。
對(duì)于窄依賴,Partition 的轉(zhuǎn)換處理在 Stage 中完成計(jì)算,不劃分(將窄依賴盡量放在在同一個(gè) Stage 中,可以實(shí)現(xiàn)流水線計(jì)算)。
對(duì)于寬依賴,由于有 Shuffle 的存在,只能在父 RDD 處理完成后,才能開始接下來的計(jì)算,也就是說需要?jiǎng)澐?Stage。
Spark 會(huì)根據(jù) Shuffle/寬依賴 使用回溯算法來對(duì) DAG 進(jìn)行 Stage 劃分,從后往前,遇到寬依賴就斷開,遇到窄依賴就把當(dāng)前的 RDD 加入到當(dāng)前的 Stage 階段中。
至于什么是窄依賴和寬依賴,下文馬上就會(huì)提及。
11.窄依賴 & 寬依賴
(1) 窄依賴
父 RDD 的一個(gè)分區(qū)只會(huì)被子 RDD 的一個(gè)分區(qū)依賴。比如:map,filter和union,這種依賴稱之為「窄依賴」。
窄依賴的多個(gè)分區(qū)可以并行計(jì)算,并且窄依賴的一個(gè)分區(qū)的數(shù)據(jù)如果丟失只需要重新計(jì)算對(duì)應(yīng)的分區(qū)的數(shù)據(jù)就可以了。
(2) 寬依賴
指子RDD的分區(qū)依賴于父RDD的所有分區(qū),稱之為「寬依賴」。
對(duì)于寬依賴,必須等到上一階段計(jì)算完成才能計(jì)算下一階段。
12.DAG
有向無環(huán)圖,其實(shí)說白了就是RDD之間的依賴關(guān)系圖。
- 開始:通過 SparkContext 創(chuàng)建的 RDD。
- 結(jié)束:觸發(fā) Action,一旦觸發(fā) Action 就形成了一個(gè)完整的 DAG(有幾個(gè) Action,就有幾個(gè) DAG)。
三、Spark執(zhí)行流程
Spark的執(zhí)行流程大致如下:
- 構(gòu)建Spark Application的運(yùn)行環(huán)境(啟動(dòng)SparkContext),SparkContext向資源管理器(可以是Standalone、Mesos或YARN)注冊(cè)并申請(qǐng)運(yùn)行Executor資源。
- 資源管理器為Executor分配資源并啟動(dòng)Executor進(jìn)程,Executor運(yùn)行情況將隨著“心跳”發(fā)送到資源管理器上。
- SparkContext構(gòu)建DAG圖,將DAG圖分解成多個(gè)Stage,并把每個(gè)Stage的TaskSet(任務(wù)集)發(fā)送給Task Scheduler (任務(wù)調(diào)度器)。
- Executor向SparkContext申請(qǐng)Task, Task Scheduler將Task發(fā)放給Executor,同時(shí),SparkContext將應(yīng)用程序代碼發(fā)放給Executor。
- Task在Executor上運(yùn)行,把執(zhí)行結(jié)果反饋給Task Scheduler,然后再反饋給DAG Scheduler。
- 當(dāng)一個(gè)階段完成后,Spark 會(huì)根據(jù)數(shù)據(jù)依賴關(guān)系將結(jié)果傳輸給下一個(gè)階段,并開始執(zhí)行下一個(gè)階段的任務(wù)。
- 最后,當(dāng)所有階段都完成后,Spark 會(huì)將最終結(jié)果返回給驅(qū)動(dòng)程序,并完成作業(yè)的執(zhí)行。
四、Spark運(yùn)行模式
Spark 支持多種運(yùn)行模式,包括本地模式、獨(dú)立模式、Mesos 模式、YARN 模式和 Kube.NETes 模式。
- 本地模式:在本地模式下,Spark 應(yīng)用程序會(huì)在單個(gè)機(jī)器上運(yùn)行,不需要連接到集群。這種模式適用于開發(fā)和測(cè)試,但不適用于生產(chǎn)環(huán)境。
- 獨(dú)立模式:在獨(dú)立模式下,Spark 應(yīng)用程序會(huì)連接到一個(gè)獨(dú)立的 Spark 集群,并在集群中運(yùn)行。這種模式適用于小型集群,但不支持動(dòng)態(tài)資源分配。
- Mesos 模式:在 Mesos 模式下,Spark 應(yīng)用程序會(huì)連接到一個(gè) Apache Mesos 集群,并在集群中運(yùn)行。這種模式支持動(dòng)態(tài)資源分配和細(xì)粒度資源共享,目前國(guó)內(nèi)使用較少。
- YARN 模式:在 YARN 模式下,Spark 應(yīng)用程序會(huì)連接到一個(gè) Apache Hadoop YARN 集群,并在集群中運(yùn)行。這種模式支持動(dòng)態(tài)資源分配和與其他 Hadoop 生態(tài)系統(tǒng)組件的集成,Spark在Yarn模式下是不需要Master和Worker的。
- Kubernetes 模式:在 Kubernetes 模式下,Spark 應(yīng)用程序會(huì)連接到一個(gè) Kubernetes 集群,并在集群中運(yùn)行。這種模式支持動(dòng)態(tài)資源分配和容器化部署。
五、RDD詳解
RDD的概念在Spark中十分重要,上面只是簡(jiǎn)單的介紹了一下,下面詳細(xì)的對(duì)RDD展開介紹。
RDD是“Resilient Distributed Dataset”的縮寫,從全稱就可以了解到RDD的一些典型特性:
- Resilient(彈性):RDD之間會(huì)形成有向無環(huán)圖(DAG),如果RDD丟失了或者失效了,可以從父RDD重新計(jì)算得到。即容錯(cuò)性。
- Distributed(分布式):RDD的數(shù)據(jù)是以邏輯分區(qū)的形式分布在集群的不同節(jié)點(diǎn)的。
- Dataset(數(shù)據(jù)集):即RDD存儲(chǔ)的數(shù)據(jù)記錄,可以從外部數(shù)據(jù)生成RDD,例如Json文件,CSV文件,文本文件,數(shù)據(jù)庫(kù)等。
RDD里面的數(shù)據(jù)集會(huì)被邏輯分成若干個(gè)分區(qū),這些分區(qū)是分布在集群的不同節(jié)點(diǎn)的,基于這樣的特性,RDD才能在集群不同節(jié)點(diǎn)并行計(jì)算。
1.RDD特性
- 內(nèi)存計(jì)算:Spark RDD運(yùn)算數(shù)據(jù)是在內(nèi)存中進(jìn)行的,在內(nèi)存足夠的情況下,不會(huì)把中間結(jié)果存儲(chǔ)在磁盤,所以計(jì)算速度非常高效。
- 惰性求值:所有的轉(zhuǎn)換操作都是惰性的,也就是說不會(huì)立即執(zhí)行任務(wù),只是把對(duì)數(shù)據(jù)的轉(zhuǎn)換操作記錄下來而已。只有碰到action操作才會(huì)被真正的執(zhí)行。
- 容錯(cuò)性:Spark RDD具備容錯(cuò)特性,在RDD失效或者數(shù)據(jù)丟失的時(shí)候,可以根據(jù)DAG從父RDD重新把數(shù)據(jù)集計(jì)算出來,以達(dá)到數(shù)據(jù)容錯(cuò)的效果。
- 不變性:RDD是進(jìn)程安全的,因?yàn)镽DD是不可修改的。它可以在任何時(shí)間點(diǎn)被創(chuàng)建和查詢,使得緩存,共享,備份都非常簡(jiǎn)單。在計(jì)算過程中,是RDD的不可修改特性保證了數(shù)據(jù)的一致性。
- 持久化:可以調(diào)用cache或者persist函數(shù),把RDD緩存在內(nèi)存、磁盤,下次使用的時(shí)候不需要重新計(jì)算而是直接使用。
2.RDD操作
RDD支持兩種操作:
- 轉(zhuǎn)換操作(Transformation)。
- 行動(dòng)操作(Actions)。
(1) 轉(zhuǎn)換操作(Transformation)
轉(zhuǎn)換操作以RDD做為輸入?yún)?shù),然后輸出一個(gè)或者多個(gè)RDD。轉(zhuǎn)換操作不會(huì)修改輸入RDD。Map()、Filter()這些都屬于轉(zhuǎn)換操作。
轉(zhuǎn)換操作是惰性求值操作,只有在碰到行動(dòng)操作(Actions)的時(shí)候,轉(zhuǎn)換操作才會(huì)真正實(shí)行。轉(zhuǎn)換操作分兩種:「窄依賴」和「寬依賴」。
下面是一些常見的轉(zhuǎn)換操作:
轉(zhuǎn)換操作 |
描述 |
map |
將函數(shù)應(yīng)用于 RDD 中的每個(gè)元素,并返回一個(gè)新的 RDD |
filter |
返回一個(gè)新的 RDD,其中包含滿足給定謂詞的元素 |
flatMap |
將函數(shù)應(yīng)用于 RDD 中的每個(gè)元素,并將返回的迭代器展平為一個(gè)新的 RDD |
union |
返回一個(gè)新的 RDD,其中包含兩個(gè) RDD 的元素 |
distinct |
返回一個(gè)新的 RDD,其中包含原始 RDD 中不同的元素 |
groupByKey |
將鍵值對(duì) RDD 中具有相同鍵的元素分組到一起,并返回一個(gè)新的 RDD |
reduceByKey |
將鍵值對(duì) RDD 中具有相同鍵的元素聚合到一起,并返回一個(gè)新的 RDD |
sortByKey |
返回一個(gè)新的鍵值對(duì) RDD,其中元素按照鍵排序 |
(2)行動(dòng)操作(Action)
Action是數(shù)據(jù)執(zhí)行部分,其通過執(zhí)行count,reduce,collect等方法真正執(zhí)行數(shù)據(jù)的計(jì)算部分。
Action 操作 |
描述 |
reduce |
通過函數(shù)聚合 RDD 中的所有元素 |
collect |
將 RDD 中的所有元素返回到驅(qū)動(dòng)程序 |
count |
返回 RDD 中的元素個(gè)數(shù) |
first |
返回 RDD 中的第一個(gè)元素 |
take |
返回 RDD 中的前 n 個(gè)元素 |
takeOrdered |
返回 RDD 中的前 n 個(gè)元素,按照自然順序或指定的順序排序 |
saveAsTextFile |
將 RDD 中的元素保存到文本文件中 |
foreach |
將函數(shù)應(yīng)用于 RDD 中的每個(gè)元素 |
3.RDD 的創(chuàng)建方式
創(chuàng)建RDD有3種不同方式:
- 從外部存儲(chǔ)系統(tǒng)。
- 從其他RDD。
- 由一個(gè)已經(jīng)存在的 Scala 集合創(chuàng)建。
(1) 從外部存儲(chǔ)系統(tǒng)
由外部存儲(chǔ)系統(tǒng)的數(shù)據(jù)集創(chuàng)建,包括本地的文件系統(tǒng),還有所有 Hadoop 支持的數(shù)據(jù)集,比如 HDFS、Cassandra、HBase 等:
val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")
(2) 從其他RDD
通過已有的 RDD 經(jīng)過算子轉(zhuǎn)換生成新的 RDD:
val rdd2=rdd1.flatMap(_.split(" "))
(3) 由一個(gè)已經(jīng)存在的 Scala 集合創(chuàng)建
val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
或者
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))
其實(shí)makeRDD 方法底層調(diào)用了 parallelize 方法:
4.RDD 緩存機(jī)制
RDD 緩存是在內(nèi)存存儲(chǔ)RDD計(jì)算結(jié)果的一種優(yōu)化技術(shù)。把中間結(jié)果緩存起來以便在需要的時(shí)候重復(fù)使用,這樣才能有效減輕計(jì)算壓力,提升運(yùn)算性能。
要持久化一個(gè)RDD,只要調(diào)用其cache()或者persist()方法即可。在該RDD第一次被計(jì)算出來時(shí),就會(huì)直接緩存在每個(gè)節(jié)點(diǎn)中。而且Spark的持久化機(jī)制還是自動(dòng)容錯(cuò)的,如果持久化的RDD的任何partition丟失了,那么Spark會(huì)自動(dòng)通過其源RDD,使用transformation操作重新計(jì)算該partition。
val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //緩存/持久化
rdd2.sortBy(_._2,false).collect//觸發(fā)action,會(huì)去讀取HDFS的文件,rdd2會(huì)真正執(zhí)行持久化
rdd2.sortBy(_._2,false).collect//觸發(fā)action,會(huì)去讀緩存中的數(shù)據(jù),執(zhí)行速度會(huì)比之前快,因?yàn)閞dd2已經(jīng)持久化到內(nèi)存中了
需要注意的是,在觸發(fā)action的時(shí)候,才會(huì)去執(zhí)行持久化。
cache()和persist()的區(qū)別在于,cache()是persist()的一種簡(jiǎn)化方式,cache()的底層就是調(diào)用的persist()的無參版本,就是調(diào)用persist(MEMORY_ONLY),將數(shù)據(jù)持久化到內(nèi)存中。
如果需要從內(nèi)存中去除緩存,那么可以使用unpersist()方法。
rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.unpersist()
5.存儲(chǔ)級(jí)別
RDD存儲(chǔ)級(jí)別主要有以下幾種。
級(jí)別 |
使用空間 |
CPU時(shí)間 |
是否在內(nèi)存中 |
是否在磁盤上 |
備注 |
MEMORY_ONLY |
高 |
低 |
是 |
否 |
使用未序列化的Java對(duì)象格式,將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù),則數(shù)據(jù)可能就不會(huì)進(jìn)行持久化。 |
MEMORY_ONLY_2 |
高 |
低 |
是 |
否 |
數(shù)據(jù)存2份 |
MEMORY_ONLY_SER |
低 |
高 |
是 |
否 |
基本含義同MEMORY_ONLY。唯一的區(qū)別是,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化。這種方式更加節(jié)省內(nèi)存 |
MEMORY_ONLY_SER_2 |
低 |
高 |
是 |
否 |
數(shù)據(jù)序列化,數(shù)據(jù)存2份 |
MEMORY_AND_DISK |
高 |
中等 |
部分 |
部分 |
如果數(shù)據(jù)在內(nèi)存中放不下,則溢寫到磁盤 |
MEMORY_AND_DISK_2 |
高 |
中等 |
部分 |
部分 |
數(shù)據(jù)存2份 |
MEMORY_AND_DISK_SER |
低 |
高 |
部分 |
部分 |
基本含義同MEMORY_AND_DISK。唯一的區(qū)別是,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化 |
MEMORY_AND_DISK_SER_2 |
低 |
高 |
部分 |
部分 |
數(shù)據(jù)存2份 |
DISK_ONLY |
低 |
高 |
否 |
是 |
使用未序列化的Java對(duì)象格式,將數(shù)據(jù)全部寫入磁盤文件中。 |
DISK_ONLY_2 |
低 |
高 |
否 |
是 |
數(shù)據(jù)存2份 |
OFF_HEAP |
|
|
|
|
這個(gè)目前是試驗(yàn)型選項(xiàng),類似MEMORY_ONLY_SER,但是數(shù)據(jù)是存儲(chǔ)在堆外內(nèi)存的。 |
對(duì)于上述任意一種持久化策略,如果加上后綴_2,代表的是將每個(gè)持久化的數(shù)據(jù),都復(fù)制一份副本,并將副本保存到其他節(jié)點(diǎn)上。
這種基于副本的持久化機(jī)制主要用于進(jìn)行容錯(cuò)。假如某個(gè)節(jié)點(diǎn)掛掉了,節(jié)點(diǎn)的內(nèi)存或磁盤中的持久化數(shù)據(jù)丟失了,那么后續(xù)對(duì)RDD計(jì)算時(shí)還可以使用該數(shù)據(jù)在其他節(jié)點(diǎn)上的副本。如果沒有副本的話,就只能將這些數(shù)據(jù)從源頭處重新計(jì)算一遍了。
6.RDD的血緣關(guān)系
血緣關(guān)系是指 RDD 之間的依賴關(guān)系。當(dāng)你對(duì)一個(gè) RDD 執(zhí)行轉(zhuǎn)換操作時(shí),Spark 會(huì)生成一個(gè)新的 RDD,并記錄這兩個(gè) RDD 之間的依賴關(guān)系。這種依賴關(guān)系就是血緣關(guān)系。
血緣關(guān)系可以幫助 Spark 在發(fā)生故障時(shí)恢復(fù)數(shù)據(jù)。當(dāng)一個(gè)分區(qū)丟失時(shí),Spark 可以根據(jù)血緣關(guān)系重新計(jì)算丟失的分區(qū),而不需要從頭開始重新計(jì)算整個(gè) RDD。
血緣關(guān)系還可以幫助 Spark 優(yōu)化計(jì)算過程。Spark 可以根據(jù)血緣關(guān)系合并多個(gè)連續(xù)的窄依賴轉(zhuǎn)換,減少數(shù)據(jù)傳輸和通信開銷。
我們可以執(zhí)行toDebugString打印RDD的依賴關(guān)系。
下面是一個(gè)簡(jiǎn)單的例子:
val conf = new SparkConf().setAppName("Lineage Example").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.parallelize(List(1, 2, 3, 4, 5))
val mappedData = data.map(x => x + 1)
val filteredData = mappedData.filter(x => x % 2 == 0)
println(filteredData.toDebugString)
在這個(gè)例子中,我們首先創(chuàng)建了一個(gè)包含 5 個(gè)元素的 RDD,并對(duì)它執(zhí)行了兩個(gè)轉(zhuǎn)換操作:map 和 filter。然后,我們使用 toDebugString 方法打印了最終 RDD 的血緣關(guān)系。
運(yùn)行這段代碼后,你會(huì)看到類似下面的輸出:
(2) MapPartitionsRDD[2] at filter at <console>:26 []
| MapPartitionsRDD[1] at map at <console>:24 []
| ParallelCollectionRDD[0] at parallelize at <console>:22 []
這個(gè)輸出表示最終的 RDD 是通過兩個(gè)轉(zhuǎn)換操作(map 和 filter)從原始的 ParallelCollectionRDD 轉(zhuǎn)換而來的。
六、CheckPoint
CheckPoint可以將RDD從其依賴關(guān)系中抽出來,保存到可靠的存儲(chǔ)系統(tǒng)(例如HDFS,S3等), 即它可以將數(shù)據(jù)和元數(shù)據(jù)保存到檢查指向目錄中。 因此,在程序發(fā)生崩潰的時(shí)候,Spark可以恢復(fù)此數(shù)據(jù),并從停止的任何地方開始。
CheckPoint分為兩類:
- 高可用CheckPoint:容錯(cuò)性優(yōu)先。這種類型的檢查點(diǎn)可確保數(shù)據(jù)永久存儲(chǔ),如存儲(chǔ)在HDFS或其他分布式文件系統(tǒng)上。 這也意味著數(shù)據(jù)通常會(huì)在網(wǎng)絡(luò)中復(fù)制,這會(huì)降低檢查點(diǎn)的運(yùn)行速度。
- 本地CheckPoint:性能優(yōu)先。 RDD持久保存到執(zhí)行程序中的本地文件系統(tǒng)。 因此,數(shù)據(jù)寫得更快,但本地文件系統(tǒng)也不是完全可靠的,一旦數(shù)據(jù)丟失,工作將無法恢復(fù)。
開發(fā)人員可以使用RDD.checkpoint()方法來設(shè)置檢查點(diǎn)。在使用檢查點(diǎn)之前,必須使用SparkContext.setCheckpointDir(directory: String)方法設(shè)置檢查點(diǎn)目錄。
下面是一個(gè)簡(jiǎn)單的例子:
import org.apache.spark.{SparkConf, SparkContext}
object CheckpointExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Checkpoint Example").setMaster("local")
val sc = new SparkContext(conf)
// 設(shè)置 checkpoint 目錄
sc.setCheckpointDir("/tmp/checkpoint")
val data = sc.parallelize(List(1, 2, 3, 4, 5))
val mappedData = data.map(x => x + 1)
val filteredData = mappedData.filter(x => x % 2 == 0)
// 對(duì) RDD 進(jìn)行 checkpoint
filteredData.checkpoint()
// 觸發(fā) checkpoint
filteredData.count()
}
}
RDD的檢查點(diǎn)機(jī)制就好比Hadoop將中間計(jì)算值存儲(chǔ)到磁盤,即使計(jì)算中出現(xiàn)了故障,我們也可以輕松地從中恢復(fù)。通過對(duì) RDD 啟動(dòng)檢查點(diǎn)機(jī)制可以實(shí)現(xiàn)容錯(cuò)和高可用。
Persist VS CheckPoint
- 位置:Persist 和 Cache 只能保存在本地的磁盤和內(nèi)存中(或者堆外內(nèi)存–實(shí)驗(yàn)中),而 Checkpoint 可以保存數(shù)據(jù)到 HDFS 這類可靠的存儲(chǔ)上。
- 生命周期:Cache 和 Persist 的 RDD 會(huì)在程序結(jié)束后會(huì)被清除或者手動(dòng)調(diào)用 unpersist 方法,而 Checkpoint 的 RDD 在程序結(jié)束后依然存在,不會(huì)被刪除。CheckPoint將RDD持久化到HDFS或本地文件夾,如果不被手動(dòng)remove掉,是一直存在的,也就是說可以被下一個(gè)driver使用,而Persist不能被其他dirver使用。
七、Spark-Submit
1.詳細(xì)參數(shù)說明
參數(shù)名 |
參數(shù)說明 |
—master |
master 的地址,提交任務(wù)到哪里執(zhí)行,例如 spark://host:port, yarn, local。具體指可參考下面關(guān)于Master_URL的列表 |
—deploy-mode |
在本地 (client) 啟動(dòng) driver 或在 cluster 上啟動(dòng),默認(rèn)是 client |
—class |
應(yīng)用程序的主類,僅針對(duì) java 或 scala 應(yīng)用 |
—name |
應(yīng)用程序的名稱 |
—jars |
用逗號(hào)分隔的本地 jar 包,設(shè)置后,這些 jar 將包含在 driver 和 executor 的 classpath 下 |
—packages |
包含在driver 和executor 的 classpath 中的 jar 的 maven 坐標(biāo) |
—exclude-packages |
為了避免沖突 而指定不包含的 package |
—repositories |
遠(yuǎn)程 repository |
—conf PROP=VALUE |
指定 spark 配置屬性的值, 例如 -conf spark.executor.extraJavaOptinotallow=”-XX:MaxPermSize=256m” |
—properties-file |
加載的配置文件,默認(rèn)為 conf/spark-defaults.conf |
—driver-memory |
Driver內(nèi)存,默認(rèn) 1G |
—driver-java-options |
傳給 driver 的額外的 Java 選項(xiàng) |
—driver-library-path |
傳給 driver 的額外的庫(kù)路徑 |
—driver-class-path |
傳給 driver 的額外的類路徑 |
—driver-cores |
Driver 的核數(shù),默認(rèn)是1。在 yarn 或者 standalone 下使用 |
—executor-memory |
每個(gè) executor 的內(nèi)存,默認(rèn)是1G |
—total-executor-cores |
所有 executor 總共的核數(shù)。僅僅在 mesos 或者 standalone 下使用 |
—num-executors |
啟動(dòng)的 executor 數(shù)量。默認(rèn)為2。在 yarn 下使用 |
—executor-core |
每個(gè) executor 的核數(shù)。在yarn或者standalone下使用 |
2.Master_URL的值
Master URL |
含義 |
local |
使用1個(gè)worker線程在本地運(yùn)行Spark應(yīng)用程序 |
local[K] |
使用K個(gè)worker線程在本地運(yùn)行Spark應(yīng)用程序 |
local[*] |
使用所有剩余worker線程在本地運(yùn)行Spark應(yīng)用程序 |
spark://HOST:PORT |
連接到Spark Standalone集群,以便在該集群上運(yùn)行Spark應(yīng)用程序 |
mesos://HOST:PORT |
連接到Mesos集群,以便在該集群上運(yùn)行Spark應(yīng)用程序 |
yarn-client |
以client方式連接到Y(jié)ARN集群,集群的定位由環(huán)境變量HADOOP_CONF_DIR定義,該方式driver在client運(yùn)行。 |
yarn-cluster |
以cluster方式連接到Y(jié)ARN集群,集群的定位由環(huán)境變量HADOOP_CONF_DIR定義,該方式driver也在集群中運(yùn)行。 |
八、Spark 共享變量
一般情況下,當(dāng)一個(gè)傳遞給Spark操作(例如map和reduce)的函數(shù)在遠(yuǎn)程節(jié)點(diǎn)上面運(yùn)行時(shí),Spark操作實(shí)際上操作的是這個(gè)函數(shù)所用變量的一個(gè)獨(dú)立副本。
這些變量被復(fù)制到每臺(tái)機(jī)器上,并且這些變量在遠(yuǎn)程機(jī)器上的所有更新都不會(huì)傳遞回驅(qū)動(dòng)程序。通常跨任務(wù)的讀寫變量是低效的,所以,Spark提供了兩種共享變量:「廣播變量(broadcast variable)」和「累加器(accumulator)」。
1.廣播變量
廣播變量允許程序員緩存一個(gè)只讀的變量在每臺(tái)機(jī)器上面,而不是每個(gè)任務(wù)保存一份拷貝。說白了其實(shí)就是共享變量。
如果Executor端用到了Driver的變量,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。如果使用廣播變量在每個(gè)Executor中只有一份Driver端的變量副本。
一個(gè)廣播變量可以通過調(diào)用SparkContext.broadcast(v)方法從一個(gè)初始變量v中創(chuàng)建。廣播變量是v的一個(gè)包裝變量,它的值可以通過value方法訪問,下面的代碼說明了這個(gè)過程:
import org.apache.spark.{SparkConf, SparkContext}
object BroadcastExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("Broadcast Example").setMaster("local")
val sc = new SparkContext(conf)
val data = sc.parallelize(List(1, 2, 3, 4, 5))
// 創(chuàng)建一個(gè)廣播變量
val factor = sc.broadcast(2)
// 使用廣播變量
val result = data.map(x => x * factor.value)
result.collect().foreach(println)
}
}
廣播變量創(chuàng)建以后,我們就能夠在集群的任何函數(shù)中使用它來代替變量v,這樣我們就不需要再次傳遞變量v到每個(gè)節(jié)點(diǎn)上。另外,為了保證所有的節(jié)點(diǎn)得到廣播變量具有相同的值,對(duì)象v不能在廣播之后被修改。
2.累加器
累加器是一種只能通過關(guān)聯(lián)操作進(jìn)行“加”操作的變量,因此它能夠高效的應(yīng)用于并行操作中。它們能夠用來實(shí)現(xiàn)counters和sums。
一個(gè)累加器可以通過調(diào)用SparkContext.accumulator(v)方法從一個(gè)初始變量v中創(chuàng)建。運(yùn)行在集群上的任務(wù)可以通過add方法或者使用+=操作來給它加值。然而,它們無法讀取這個(gè)值。只有驅(qū)動(dòng)程序可以使用value方法來讀取累加器的值。
示例代碼如下:
import org.apache.spark.{SparkConf, SparkContext}
object AccumulatorExample {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("AccumulatorExample")
val sc = new SparkContext(conf)
val accum = sc.longAccumulator("My Accumulator")
sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
println(accum.value) // 輸出 10
}
}
這個(gè)示例中,我們創(chuàng)建了一個(gè)名為 My Accumulator 的累加器,并使用 sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) 來對(duì)其進(jìn)行累加。最后,我們使用 println(accum.value) 來輸出累加器的值,結(jié)果為 10。
我們可以利用子類AccumulatorParam創(chuàng)建自己的累加器類型。AccumulatorParam接口有兩個(gè)方法:zero方法為你的數(shù)據(jù)類型提供一個(gè)“0 值”(zero value),addInPlace方法計(jì)算兩個(gè)值的和。例如,假設(shè)我們有一個(gè)Vector類代表數(shù)學(xué)上的向量,我們能夠如下定義累加器:
object VectorAccumulatorParam extends AccumulatorParam[Vector] {
def zero(initialValue: Vector): Vector = {
Vector.zeros(initialValue.size)
}
def addInPlace(v1: Vector, v2: Vector): Vector = {
v1 += v2
}
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)
九、Spark SQL
Spark為結(jié)構(gòu)化數(shù)據(jù)處理引入了一個(gè)稱為Spark SQL的編程模塊。它提供了一個(gè)稱為DataFrame的編程抽象,并且可以充當(dāng)分布式SQL查詢引擎。
1.Spark SQL的特性
- 集成:無縫地將SQL查詢與Spark程序混合。 Spark SQL允許將結(jié)構(gòu)化數(shù)據(jù)作為Spark中的分布式數(shù)據(jù)集(RDD)進(jìn)行查詢,在Python,Scala和Java中集成了API。這種緊密的集成使得可以輕松地運(yùn)行SQL查詢以及復(fù)雜的分析算法。
- Hive兼容性:在現(xiàn)有倉(cāng)庫(kù)上運(yùn)行未修改的Hive查詢。 Spark SQL重用了Hive前端和MetaStore,提供與現(xiàn)有Hive數(shù)據(jù),查詢和UDF的完全兼容性。只需將其與Hive一起安裝即可。
- 標(biāo)準(zhǔn)連接:通過JDBC或ODBC連接。 Spark SQL包括具有行業(yè)標(biāo)準(zhǔn)JDBC和ODBC連接的服務(wù)器模式。
- 可擴(kuò)展性:對(duì)于交互式查詢和長(zhǎng)查詢使用相同的引擎。 Spark SQL利用RDD模型來支持中查詢?nèi)蒎e(cuò),使其能夠擴(kuò)展到大型作業(yè)。不要擔(dān)心為歷史數(shù)據(jù)使用不同的引擎。
2.Spark SQL 數(shù)據(jù)類型
Spark SQL 支持多種數(shù)據(jù)類型,包括數(shù)字類型、字符串類型、二進(jìn)制類型、布爾類型、日期時(shí)間類型和區(qū)間類型等。
數(shù)字類型包括:
- ByteType:代表一個(gè)字節(jié)的整數(shù),范圍是 -128 到 127¹²。
- ShortType:代表兩個(gè)字節(jié)的整數(shù),范圍是 -32768 到 32767¹²。
- IntegerType:代表四個(gè)字節(jié)的整數(shù),范圍是 -2147483648 到 2147483647¹²。
- LongType:代表八個(gè)字節(jié)的整數(shù),范圍是 -9223372036854775808 到 9223372036854775807¹²。
- FloatType:代表四字節(jié)的單精度浮點(diǎn)數(shù)¹²。
- DoubleType:代表八字節(jié)的雙精度浮點(diǎn)數(shù)¹²。
- DecimalType:代表任意精度的十進(jìn)制數(shù)據(jù),通過內(nèi)部的 java.math.BigDecimal 支持。BigDecimal 由一個(gè)任意精度的整型非標(biāo)度值和一個(gè) 32 位整數(shù)組成¹²。
字符串類型包括:
- StringType:代表字符字符串值。
二進(jìn)制類型包括:
- BinaryType:代表字節(jié)序列值。
布爾類型包括:
- BooleanType:代表布爾值。
日期時(shí)間類型包括:
- TimestampType:代表包含字段年、月、日、時(shí)、分、秒的值,與會(huì)話本地時(shí)區(qū)相關(guān)。時(shí)間戳值表示絕對(duì)時(shí)間點(diǎn)。
- DateType:代表包含字段年、月和日的值,不帶時(shí)區(qū)。
區(qū)間類型包括:
- YearMonthIntervalType (startField, endField):表示由以下字段組成的連續(xù)子集組成的年月間隔:MONTH(月份),YEAR(年份)。
- DayTimeIntervalType (startField, endField):表示由以下字段組成的連續(xù)子集組成的日時(shí)間間隔:SECOND(秒),MINUTE(分鐘),HOUR(小時(shí)),DAY(天)。
復(fù)合類型包括:
- ArrayType (elementType, containsNull):代表由 elementType 類型元素組成的序列值。containsNull 用來指明 ArrayType 中的值是否有 null 值。
- MapType (keyType, valueType, valueContainsNull):表示包括一組鍵值對(duì)的值。通過 keyType 表示 key 數(shù)據(jù)的類型,通過 valueType 表示 value 數(shù)據(jù)的類型。valueContainsNull 用來指明 MapType 中的值是否有 null 值。
- StructType (fields):表示一個(gè)擁有 StructFields (fields) 序列結(jié)構(gòu)的值。
- StructField (name, dataType, nullable):代表 StructType 中的一個(gè)字段,字段的名字通過 name 指定,dataType 指定 field 的數(shù)據(jù)類型,nullable 表示字段的值是否有 null 值。
3.DataFrame
DataFrame 是 Spark 中用于處理結(jié)構(gòu)化數(shù)據(jù)的一種數(shù)據(jù)結(jié)構(gòu)。它類似于關(guān)系數(shù)據(jù)庫(kù)中的表,具有行和列。每一列都有一個(gè)名稱和一個(gè)類型,每一行都是一條記錄。
DataFrame 支持多種數(shù)據(jù)源,包括結(jié)構(gòu)化數(shù)據(jù)文件、Hive 表、外部數(shù)據(jù)庫(kù)和現(xiàn)有的 RDD。它提供了豐富的操作,包括篩選、聚合、分組、排序等。
DataFrame 的優(yōu)點(diǎn)在于它提供了一種高級(jí)的抽象,使得用戶可以使用類似于 SQL 的語言進(jìn)行數(shù)據(jù)處理,而無需關(guān)心底層的實(shí)現(xiàn)細(xì)節(jié)。此外,Spark 會(huì)自動(dòng)對(duì) DataFrame 進(jìn)行優(yōu)化,以提高查詢性能。
下面是一個(gè)使用DataFrame的代碼例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()
import spark.implicits._
val data = Seq(
("Alice", 25),
("Bob", 30),
("Charlie", 35)
)
val df = data.toDF("name", "age")
df.show()
在這個(gè)示例中,我們首先創(chuàng)建了一個(gè) SparkSession 對(duì)象,然后使用 toDF 方法將一個(gè)序列轉(zhuǎn)換為 DataFrame。最后,我們使用 show 方法來顯示 DataFrame 的內(nèi)容。
4.創(chuàng)建 DataFrame
在 Scala 中,可以通過以下幾種方式創(chuàng)建 DataFrame:
從現(xiàn)有的 RDD 轉(zhuǎn)換而來。例如:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Int)
val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val df = rdd.toDF()
df.show()
從外部數(shù)據(jù)源讀取。例如,從 JSON 文件中讀取數(shù)據(jù)并創(chuàng)建 DataFrame:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()
val df = spark.read.json("path/to/json/file")
df.show()
通過編程方式創(chuàng)建。例如,使用 createDataFrame 方法:
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()
val schema = StructType(
List(
StructField("name", StringType, nullable = true),
StructField("age", IntegerType, nullable = true)
)
)
val data = Seq(Row("Alice", 25), Row("Bob", 30))
val rdd = spark.sparkContext.parallelize(data)
val df = spark.createDataFrame(rdd, schema)
df.show()
5.DSL & SQL
在 Spark 中,可以使用兩種方式對(duì) DataFrame 進(jìn)行查詢:「DSL(Domain-Specific Language)」和「 SQL」。
DSL 是一種特定領(lǐng)域語言,它提供了一組用于操作 DataFrame 的方法。例如,下面是一個(gè)使用 DSL 進(jìn)行查詢的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()
import spark.implicits._
val df = Seq(
("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.select("name", "age")
.filter($"age" > 25)
.show()
SQL 是一種結(jié)構(gòu)化查詢語言,它用于管理關(guān)系數(shù)據(jù)庫(kù)系統(tǒng)。在 Spark 中,可以使用 SQL 對(duì) DataFrame 進(jìn)行查詢。例如,下面是一個(gè)使用 SQL 進(jìn)行查詢的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()
import spark.implicits._
val df = Seq(
("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.createOrReplaceTempView("people")
spark.sql("SELECT name, age FROM people WHERE age > 25").show()
DSL 和 SQL 的區(qū)別在于語法和風(fēng)格。DSL 使用方法調(diào)用鏈來構(gòu)建查詢,而 SQL 使用聲明式語言來描述查詢。選擇哪種方式取決于個(gè)人喜好和使用場(chǎng)景。
6.Spark SQL 數(shù)據(jù)源
Spark SQL 支持多種數(shù)據(jù)源,包括 Parquet、JSON、CSV、JDBC、Hive 等。
下面是示例代碼:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Data Sources Example").getOrCreate()
// Parquet
val df = spark.read.parquet("path/to/parquet/file")
// JSON
val df = spark.read.json("path/to/json/file")
// CSV
val df = spark.read.option("header", "true").csv("path/to/csv/file")
// JDBC
val df = spark.read
.format("jdbc")
.option("url", "jdbc:MySQL://host:port/database")
.option("dbtable", "table")
.option("user", "username")
.option("password", "password")
.load()
df.show()
7.load & save
在 Spark 中,load 函數(shù)用于從外部數(shù)據(jù)源讀取數(shù)據(jù)并創(chuàng)建 DataFrame,而 save 函數(shù)用于將 DataFrame 保存到外部數(shù)據(jù)源。
下面是從 Parquet 文件中讀取數(shù)據(jù)并創(chuàng)建 DataFrame 的示例代碼:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()
val df = spark.read.load("path/to/parquet/file")
df.show()
下面是將 DataFrame 保存到 Parquet 文件的示例代碼:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()
import spark.implicits._
val df = Seq(
("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.write.save("path/to/parquet/file")
8.函數(shù)
Spark SQL 提供了豐富的內(nèi)置函數(shù),包括數(shù)學(xué)函數(shù)、字符串函數(shù)、日期時(shí)間函數(shù)、聚合函數(shù)等。你可以在 Spark SQL 的官方文檔中查看所有可用的內(nèi)置函數(shù)。
此外,Spark SQL 還支持「自定義函數(shù)(User-Defined Function,UDF)」,可以讓用戶編寫自己的函數(shù)并在查詢中使用。
下面是一個(gè)使用 SQL 語法編寫自定義函數(shù)的示例代碼:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
val spark = SparkSession.builder.appName("UDF Example").getOrCreate()
import spark.implicits._
val df = Seq(
("Alice", 25),
("Bob", 30),
("Charlie", 35)
).toDF("name", "age")
df.createOrReplaceTempView("people")
val square = udf((x: Int) => x * x)
spark.udf.register("square", square)
spark.sql("SELECT name, square(age) FROM people").show()
在這個(gè)示例中,我們首先定義了一個(gè)名為 square 的自定義函數(shù),它接受一個(gè)整數(shù)參數(shù)并返回它的平方。然后,我們使用 createOrReplaceTempView 方法創(chuàng)建一個(gè)臨時(shí)視圖,并使用 udf.register 方法注冊(cè)自定義函數(shù)。
最后,我們使用 spark.sql 方法執(zhí)行 SQL 查詢,并在查詢中調(diào)用自定義函數(shù)。
9.DataSet
DataSet 是 Spark 1.6 版本中引入的一種新的數(shù)據(jù)結(jié)構(gòu),它提供了 RDD 的強(qiáng)類型和 DataFrame 的查詢優(yōu)化能力。
10.創(chuàng)建DataSet
在 Scala 中,可以通過以下幾種方式創(chuàng)建 DataSet:
從現(xiàn)有的 RDD 轉(zhuǎn)換而來。例如:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Int)
val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val ds = rdd.toDS()
ds.show()
從外部數(shù)據(jù)源讀取。例如,從 JSON 文件中讀取數(shù)據(jù)并創(chuàng)建 DataSet:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Long)
val ds = spark.read.json("path/to/json/file").as[Person]
ds.show()
通過編程方式創(chuàng)建。例如,使用 createDataset 方法:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._
case class Person(name: String, age: Int)
val data = Seq(Person("Alice", 25), Person("Bob", 30))
val ds = spark.createDataset(data)
ds.show()
11.DataSet VS DataFrame
DataSet 和 DataFrame 都是 Spark 中用于處理結(jié)構(gòu)化數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)。它們都提供了豐富的操作,包括篩選、聚合、分組、排序等。
它們之間的主要區(qū)別在于類型安全性。DataFrame 是一種弱類型的數(shù)據(jù)結(jié)構(gòu),它的列只有在運(yùn)行時(shí)才能確定類型。這意味著,在編譯時(shí)無法檢測(cè)到類型錯(cuò)誤,只有在運(yùn)行時(shí)才會(huì)拋出異常。
而 DataSet 是一種強(qiáng)類型的數(shù)據(jù)結(jié)構(gòu),它的類型在編譯時(shí)就已經(jīng)確定。這意味著,如果你試圖對(duì)一個(gè)不存在的列進(jìn)行操作,或者對(duì)一個(gè)列進(jìn)行錯(cuò)誤的類型轉(zhuǎn)換,編譯器就會(huì)報(bào)錯(cuò)。
此外,DataSet 還提供了一些額外的操作,例如 map、flatMap、reduce 等。
12.RDD & DataFrame & Dataset 轉(zhuǎn)化
RDD、DataFrame、Dataset三者有許多共性,有各自適用的場(chǎng)景常常需要在三者之間轉(zhuǎn)換。
DataFrame/Dataset 轉(zhuǎn) RDD:
val rdd1=testDF.rdd
val rdd2=testDS.rdd
RDD 轉(zhuǎn) DataSet:
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型
val testDS = rdd.map {line=>
Coltest(line._1,line._2)
}.toDS
可以注意到,定義每一行的類型(case class)時(shí),已經(jīng)給出了字段名和類型,后面只要往case class里面添加值即可。
Dataset 轉(zhuǎn) DataFrame:
import spark.implicits._
val testDF = testDS.toDF
DataFrame 轉(zhuǎn) Dataset:
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型
val testDS = testDF.as[Coltest]
這種方法就是在給出每一列的類型后,使用as方法,轉(zhuǎn)成Dataset,這在數(shù)據(jù)類型在DataFrame需要針對(duì)各個(gè)字段處理時(shí)極為方便。
注意:在使用一些特殊的操作時(shí),一定要加上 import spark.implicits._ 不然toDF、toDS無法使用。
十、Spark Streaming
Spark Streaming 的工作原理是將實(shí)時(shí)數(shù)據(jù)流拆分為小批量數(shù)據(jù),并使用 Spark 引擎對(duì)這些小批量數(shù)據(jù)進(jìn)行處理。這種微批處理(Micro-Batch Processing)的方式使得 Spark Streaming 能夠以近乎實(shí)時(shí)的延遲處理大規(guī)模的數(shù)據(jù)流。
下面是一個(gè)簡(jiǎn)單的 Spark Streaming 示例代碼:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("Spark Streaming Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
我們首先創(chuàng)建了一個(gè) StreamingContext 對(duì)象,并指定了批處理間隔為 1 秒。然后,我們使用 socketTextStream 方法從套接字源創(chuàng)建了一個(gè) DStream。接下來,我們對(duì) DStream 進(jìn)行了一系列操作,包括 flatMap、map 和 reduceByKey。最后,我們使用 print 方法打印出單詞計(jì)數(shù)的結(jié)果。
1.Spark Streaming 優(yōu)缺點(diǎn)
Spark Streaming 作為一種實(shí)時(shí)流處理框架,具有以下優(yōu)點(diǎn):
- 高性能:Spark Streaming 基于 Spark 引擎,能夠快速處理大規(guī)模的數(shù)據(jù)流。
- 易用性:Spark Streaming 提供了豐富的 API,可以讓開發(fā)人員快速構(gòu)建實(shí)時(shí)流處理應(yīng)用。
- 容錯(cuò)性:Spark Streaming 具有良好的容錯(cuò)性,能夠在節(jié)點(diǎn)故障時(shí)自動(dòng)恢復(fù)。
- 集成性:Spark Streaming 能夠與 Spark 生態(tài)系統(tǒng)中的其他組件(如 Spark SQL、MLlib 等)無縫集成。
但是,Spark Streaming 也有一些缺點(diǎn):
- 延遲:由于 Spark Streaming 基于微批處理模型,因此它的延遲相對(duì)較高。對(duì)于需要極低延遲的應(yīng)用場(chǎng)景,Spark Streaming 可能不是最佳選擇。
- 復(fù)雜性:Spark Streaming 的配置和調(diào)優(yōu)相對(duì)復(fù)雜,需要一定的經(jīng)驗(yàn)和技能。
2.DStream
DStream(離散化流)是 Spark Streaming 中用于表示實(shí)時(shí)數(shù)據(jù)流的一種抽象。它由一系列連續(xù)的 RDD 組成,每個(gè) RDD 包含一段時(shí)間內(nèi)收集到的數(shù)據(jù)。
在 Spark Streaming 中,可以通過以下幾種方式創(chuàng)建 DStream:
(1) 從輸入源創(chuàng)建。例如,從套接字源創(chuàng)建 DStream:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
lines.print()
ssc.start()
ssc.awaitTermination()
(2) 通過轉(zhuǎn)換操作創(chuàng)建。例如,對(duì)現(xiàn)有的 DStream 進(jìn)行 map 操作:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
words.print()
ssc.start()
ssc.awaitTermination()
(3) 通過連接操作創(chuàng)建。例如,對(duì)兩個(gè) DStream 進(jìn)行 union 操作:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines1 = ssc.socketTextStream("localhost", 9999)
val lines2 = ssc.socketTextStream("localhost", 9998)
val lines = lines1.union(lines2)
lines.print()
ssc.start()
ssc.awaitTermination()
總結(jié):簡(jiǎn)單來說 DStream 就是對(duì) RDD 的封裝,你對(duì) DStream 進(jìn)行操作,就是對(duì) RDD 進(jìn)行操作。對(duì)于 DataFrame/DataSet/DStream 來說本質(zhì)上都可以理解成 RDD。
3.窗口函數(shù)
在 Spark Streaming 中,窗口函數(shù)用于對(duì) DStream 中的數(shù)據(jù)進(jìn)行窗口化處理。它允許你對(duì)一段時(shí)間內(nèi)的數(shù)據(jù)進(jìn)行聚合操作。
Spark Streaming 提供了多種窗口函數(shù),包括:
- window:返回一個(gè)新的 DStream,它包含了原始 DStream 中指定窗口大小和滑動(dòng)間隔的數(shù)據(jù)。
- countByWindow:返回一個(gè)新的單元素 DStream,它包含了原始 DStream 中指定窗口大小和滑動(dòng)間隔的元素個(gè)數(shù)。
- reduceByWindow:返回一個(gè)新的 DStream,它包含了原始 DStream 中指定窗口大小和滑動(dòng)間隔的元素經(jīng)過 reduce 函數(shù)處理后的結(jié)果。
- reduceByKeyAndWindow:類似于 reduceByWindow,但是在進(jìn)行 reduce 操作之前會(huì)先按照 key 進(jìn)行分組。
下面是一個(gè)使用窗口函數(shù)的示例代碼:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("Window Example")
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
wordCounts.print()
ssc.start()
ssc.awaitTermination()
在這個(gè)示例中,我們首先創(chuàng)建了一個(gè) DStream,并對(duì)其進(jìn)行了一系列轉(zhuǎn)換操作。然后,我們使用 reduceByKeyAndWindow 函數(shù)對(duì) DStream 進(jìn)行窗口化處理,指定了窗口大小為 30 秒,滑動(dòng)間隔為 10 秒。最后,我們使用 print 方法打印出單詞計(jì)數(shù)的結(jié)果。
4.輸出操作
Spark Streaming允許DStream的數(shù)據(jù)輸出到外部系統(tǒng),如數(shù)據(jù)庫(kù)或文件系統(tǒng),輸出的數(shù)據(jù)可以被外部系統(tǒng)所使用,該操作類似于RDD的輸出操作。Spark Streaming支持以下輸出操作:
- **print() **: 打印DStream中每個(gè)RDD的前10個(gè)元素到控制臺(tái)。
- **saveAsTextFiles(prefix, [suffix] **: 將此DStream中每個(gè)RDD的所有元素以文本文件的形式保存。每個(gè)批次的數(shù)據(jù)都會(huì)保存在一個(gè)單獨(dú)的目錄中,目錄名為:prefix-TIME_IN_MS[.suffix]。
- **saveAsObjectFiles(prefix, [suffix])**: 將此DStream中每個(gè)RDD的所有元素以Java對(duì)象序列化的形式保存。每個(gè)批次的數(shù)據(jù)都會(huì)保存在一個(gè)單獨(dú)的目錄中,目錄名為:prefix-TIME_IN_MS[.suffix]。
- **saveAsHadoopFiles(prefix, [suffix])**:將此DStream中每個(gè)RDD的所有元素以Hadoop文件(SequenceFile等)的形式保存。每個(gè)批次的數(shù)據(jù)都會(huì)保存在一個(gè)單獨(dú)的目錄中,目錄名為:prefix-TIME_IN_MS[.suffix]。
- **foreachRDD(func)**:最通用的輸出操作,將函數(shù)func應(yīng)用于DStream中生成的每個(gè)RDD。通過此函數(shù),可以將數(shù)據(jù)寫入任何支持寫入操作的數(shù)據(jù)源。
十一、Structured Streaming
Structured Streaming 是 Spark 2.0 版本中引入的一種新的流處理引擎。它基于 Spark SQL 引擎,提供了一種聲明式的 API 來處理結(jié)構(gòu)化數(shù)據(jù)流。
與 Spark Streaming 相比,Structured Streaming 具有以下優(yōu)點(diǎn):
- 易用性:Structured Streaming 提供了與 Spark SQL 相同的 API,可以讓開發(fā)人員快速構(gòu)建流處理應(yīng)用。
- 高性能:Structured Streaming 基于 Spark SQL 引擎,能夠快速處理大規(guī)模的數(shù)據(jù)流。
- 容錯(cuò)性:Structured Streaming 具有良好的容錯(cuò)性,能夠在節(jié)點(diǎn)故障時(shí)自動(dòng)恢復(fù)。
- 端到端一致性:Structured Streaming 提供了端到端一致性保證,能夠確保數(shù)據(jù)不丟失、不重復(fù)。
下面是一個(gè)簡(jiǎn)單的 Structured Streaming 示例代碼:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
import spark.implicits._
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
在這個(gè)示例中,我們首先創(chuàng)建了一個(gè) SparkSession 對(duì)象。然后,我們使用 readStream 方法從套接字源創(chuàng)建了一個(gè) DataFrame。接下來,我們對(duì) DataFrame 進(jìn)行了一系列操作,包括 flatMap、groupBy 和 count。最后,我們使用 writeStream 方法將結(jié)果輸出到控制臺(tái)。
Structured Streaming 同樣支持 DSL 和 SQL 語法。
DSL 語法:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
import spark.implicits._
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
SQL 語法:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
lines.createOrReplaceTempView("lines")
val wordCounts = spark.sql(
"""
|SELECT value, COUNT(*) as count
|FROM (
| SELECT explode(split(value, ' ')) as value
| FROM lines
|)
|GROUP BY value
""".stripMargin)
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
1.Source
Structured Streaming 支持多種輸入源,包括文件源(如文本文件、Parquet 文件、JSON 文件等)、Kafka、Socket 等。下面是一個(gè)使用 Scala 語言從 Kafka 中讀取數(shù)據(jù)的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
// 訂閱一個(gè)主題
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)]
2.Output
Structured Streaming 支持多種輸出方式,包括控制臺(tái)輸出、內(nèi)存輸出、文件輸出、數(shù)據(jù)源輸出等。下面是將數(shù)據(jù)寫入到 Parquet 文件中的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
// 從 socket 中讀取數(shù)據(jù)
val lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 將數(shù)據(jù)寫入到 Parquet 文件中
lines.writeStream
.format("parquet")
.option("path", "path/to/output/dir")
.option("checkpointLocation", "path/to/checkpoint/dir")
.start()
3.Output Mode
每當(dāng)結(jié)果表更新時(shí),我們都希望將更改后的結(jié)果行寫入外部接收器。
Output mode 指定了數(shù)據(jù)寫入輸出接收器的方式。Structured Streaming 支持以下三種 output mode:
Output Mode |
描述 |
Append |
只將流 DataFrame/Dataset 中的新行寫入接收器。 |
Complete |
每當(dāng)有更新時(shí),將流 DataFrame/Dataset 中的所有行寫入接收器。 |
Update |
每當(dāng)有更新時(shí),只將流 DataFrame/Dataset 中更新的行寫入接收器。 |
4.Output Sink
Output sink 指定了數(shù)據(jù)寫入的位置。Structured Streaming 支持多種輸出接收器,包括文件接收器、Kafka 接收器、Foreach 接收器、控制臺(tái)接收器和內(nèi)存接收器等。下面是一些使用 Scala 語言將數(shù)據(jù)寫入到不同輸出接收器中的例子:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()
// 從 socket 中讀取數(shù)據(jù)
val lines = spark
.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 將數(shù)據(jù)寫入到 Parquet 文件中
lines.writeStream
.format("parquet")
.option("path", "path/to/output/dir")
.option("checkpointLocation", "path/to/checkpoint/dir")
.start()
// 將數(shù)據(jù)寫入到 Kafka 中
//selectExpr 是一個(gè) DataFrame 的轉(zhuǎn)換操作,它允許你使用 SQL 表達(dá)式來選擇 DataFrame 中的列。
//selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 表示選擇 key 和 value 列,并將它們的類型轉(zhuǎn)換為字符串類型。
//這是因?yàn)?Kafka 接收器要求數(shù)據(jù)必須是字符串類型或二進(jìn)制類型。
lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()
// 將數(shù)據(jù)寫入到控制臺(tái)中
lines.writeStream
.format("console")
.start()
// 將數(shù)據(jù)寫入到內(nèi)存中
lines.writeStream
.format("memory")
.queryName("tableName")
.start()
5.PV,UV統(tǒng)計(jì)
下面是用Structured Streaming實(shí)現(xiàn)PV,UV統(tǒng)計(jì)的例子,我們來感受實(shí)戰(zhàn)下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object PVUVExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder.appName("PVUVExample").getOrCreate()
import spark.implicits._
// 假設(shè)我們有一個(gè)包含用戶ID和訪問的URL的輸入流
val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
val data = lines.as[String].map(line => {
val parts = line.split(",")
(parts(0), parts(1))
}).toDF("user", "url")
// 計(jì)算PV
val pv = data.groupBy("url").count().withColumnRenamed("count", "pv")
val pvQuery = pv.writeStream.outputMode("complete").format("console").start()
// 計(jì)算UV
val uv = data.dropDuplicates().groupBy("url").count().withColumnRenamed("count", "uv")
val uvQuery = uv.writeStream.outputMode("complete").format("console").start()
pvQuery.awaitTermination()
uvQuery.awaitTermination()
}
}
這段代碼演示了如何使用Structured Streaming對(duì)數(shù)據(jù)進(jìn)行PV和UV統(tǒng)計(jì)。它首先從一個(gè)socket源讀取數(shù)據(jù),然后使用groupBy和count對(duì)數(shù)據(jù)進(jìn)行PV統(tǒng)計(jì),最后使用dropDuplicates、groupBy和count對(duì)數(shù)據(jù)進(jìn)行UV統(tǒng)計(jì)。
假設(shè)我們?cè)诒镜貑?dòng)了一個(gè)socket服務(wù)器,并向其發(fā)送以下數(shù)據(jù):
user1,http://example.com/page1
user2,http://example.com/page1
user1,http://example.com/page2
user3,http://example.com/page1
user2,http://example.com/page2
user3,http://example.com/page2
那么程序?qū)⑤敵鲆韵陆Y(jié)果:
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---+
| url| pv|
+--------------------+---+
|http://example.co...| 3|
|http://example.co...| 3|
+--------------------+---+
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---+
| url| uv|
+--------------------+---+
|http://example.co...| 2|
|http://example.co...| 3|
+--------------------+---+
總結(jié)
在此,我們對(duì)Spark的基本概念、使用方式以及部分原理進(jìn)行了簡(jiǎn)單的介紹。Spark以其強(qiáng)大的處理能力和靈活性,已經(jīng)成為大數(shù)據(jù)處理領(lǐng)域的一個(gè)重要工具。然而,這只是冰山一角。Spark的世界里還有許多深度和廣度等待著我們?nèi)ヌ剿鳌?/p>
作為初學(xué)者,你可能會(huì)覺得這個(gè)領(lǐng)域龐大且復(fù)雜。但請(qǐng)記住,每個(gè)都是從初學(xué)者開始的。不斷的學(xué)習(xí)和實(shí)踐,你將能夠更好的理解和掌握Spark,并將其應(yīng)用于解決實(shí)際問題。這篇文章可能不能涵蓋所有的知識(shí)點(diǎn),但我希望它能帶給你收獲和思考。