日日操夜夜添-日日操影院-日日草夜夜操-日日干干-精品一区二区三区波多野结衣-精品一区二区三区高清免费不卡

公告:魔扣目錄網(wǎng)為廣大站長(zhǎng)提供免費(fèi)收錄網(wǎng)站服務(wù),提交前請(qǐng)做好本站友鏈:【 網(wǎng)站目錄:http://www.ylptlb.cn 】, 免友鏈快審服務(wù)(50元/站),

點(diǎn)擊這里在線咨詢客服
新站提交
  • 網(wǎng)站:51998
  • 待審:31
  • 小程序:12
  • 文章:1030137
  • 會(huì)員:747

在這個(gè)數(shù)據(jù)驅(qū)動(dòng)的時(shí)代,信息的處理和分析變得越來越重要。而在眾多的大數(shù)據(jù)處理框架中,「Apache Spark」以其獨(dú)特的優(yōu)勢(shì)脫穎而出。

本篇文章,我們將一起走進(jìn)Spark的世界,探索并理解其相關(guān)的基礎(chǔ)概念和使用方法。本文主要目標(biāo)是讓初學(xué)者能夠?qū)park有一個(gè)全面的認(rèn)識(shí),并能實(shí)際應(yīng)用到各類問題的解決之中。

Spark入門指南:從基礎(chǔ)概念到實(shí)踐應(yīng)用全解析

一、Spark是什么

學(xué)習(xí)一個(gè)東西之前先要知道這個(gè)東西是什么。

Spark 是一個(gè)開源的大數(shù)據(jù)處理引擎,它提供了一整套開發(fā) API,包括流計(jì)算和機(jī)器學(xué)習(xí)。它支持批處理和流處理。

Spark 的一個(gè)顯著特點(diǎn)是它能夠在內(nèi)存中進(jìn)行迭代計(jì)算,從而加快數(shù)據(jù)處理速度。盡管 Spark 是用 Scala 開發(fā)的,但它也為 JAVA、Scala、Python/ target=_blank class=infotextkey>Python 和 R 等高級(jí)編程語言提供了開發(fā)接口。

1.Spark組件

Spark提供了6大核心組件:

  • Spark Core
  • Spark SQL
  • Spark Streaming
  • Spark MLlib
  • Spark GraphX

Spark入門指南:從基礎(chǔ)概念到實(shí)踐應(yīng)用全解析

(1) Spark Core

Spark Core 是 Spark 的基礎(chǔ),它提供了內(nèi)存計(jì)算的能力,是分布式處理大數(shù)據(jù)集的基礎(chǔ)。它將分布式數(shù)據(jù)抽象為彈性分布式數(shù)據(jù)集(RDD),并為運(yùn)行在其上的上層組件提供 API。所有 Spark 的上層組件都建立在 Spark Core 的基礎(chǔ)之上。

(2) Spark SQL

Spark SQL 是一個(gè)用于處理結(jié)構(gòu)化數(shù)據(jù)的 Spark 組件。它允許使用 SQL 語句查詢數(shù)據(jù)。Spark 支持多種數(shù)據(jù)源,包括 Hive 表、Parquet 和 JSON 等。

(3) Spark Streaming

Spark Streaming 是一個(gè)用于處理動(dòng)態(tài)數(shù)據(jù)流的 Spark 組件。它能夠開發(fā)出強(qiáng)大的交互和數(shù)據(jù)查詢程序。在處理動(dòng)態(tài)數(shù)據(jù)流時(shí),流數(shù)據(jù)會(huì)被分割成微小的批處理,這些微小批處理將會(huì)在 Spark Core 上按時(shí)間順序快速執(zhí)行。

(4) Spark MLlib

Spark MLlib 是 Spark 的機(jī)器學(xué)習(xí)庫(kù)。它提供了常用的機(jī)器學(xué)習(xí)算法和實(shí)用程序,包括分類、回歸、聚類、協(xié)同過濾、降維等。MLlib 還提供了一些底層優(yōu)化原語和高層流水線 API,可以幫助開發(fā)人員更快地創(chuàng)建和調(diào)試機(jī)器學(xué)習(xí)流水線。

(5) Spark GraphX

Spark GraphX 是 Spark 的圖形計(jì)算庫(kù)。它提供了一種分布式圖形處理框架,可以幫助開發(fā)人員更快地構(gòu)建和分析大型圖形。

2.Spark的優(yōu)勢(shì)

Spark 有許多優(yōu)勢(shì),其中一些主要優(yōu)勢(shì)包括:

  • 速度:Spark 基于內(nèi)存計(jì)算,能夠比基于磁盤的計(jì)算快很多。對(duì)于迭代式算法和交互式數(shù)據(jù)挖掘任務(wù),這種速度優(yōu)勢(shì)尤為明顯。
  • 易用性:Spark 支持多種語言,包括 Java、Scala、Python 和 R。它提供了豐富的內(nèi)置 API,可以幫助開發(fā)人員更快地構(gòu)建和運(yùn)行應(yīng)用程序。
  • 通用性:Spark 提供了多種組件,可以支持不同類型的計(jì)算任務(wù),包括批處理、交互式查詢、流處理、機(jī)器學(xué)習(xí)和圖形處理等。
  • 兼容性:Spark 可以與多種數(shù)據(jù)源集成,包括 Hadoop 分布式文件系統(tǒng)(HDFS)、Apache Cassandra、Apache HBase 和 Amazon S3 等。
  • 容錯(cuò)性:Spark 提供了彈性分布式數(shù)據(jù)集(RDD)抽象,可以幫助開發(fā)人員更快地構(gòu)建容錯(cuò)應(yīng)用程序。

3.word Count

上手寫一個(gè)簡(jiǎn)單的代碼例子,下面是一個(gè)Word Count的Spark程序:

import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount {
  def mAIn (args:Array [String]): Unit = {
    //setMaster("local[9]") 表示在本地運(yùn)行 Spark 程序,使用 9 個(gè)線程。local[*] 表示使用所有可用的處理器核心。
   //這種模式通常用于本地測(cè)試和開發(fā)。
    val conf = new SparkConf ().setAppName ("Word Count").setMaster("local[9]");
    val sc = new SparkContext (conf);
    sc.setLogLevel("ERROR")

    val data = List("Hello World", "Hello Spark")
    val textFile = sc.parallelize(data)
    val wordCounts = textFile.flatMap (line => line.split (" ")).map (
      word => (word, 1)).reduceByKey ( (a, b) => a + b)
    wordCounts.collect().foreach(println)
  }
}

輸出:
(Hello,2)
(World,1)
(Spark,1)

程序首先創(chuàng)建了一個(gè) SparkConf 對(duì)象,用來設(shè)置應(yīng)用程序名稱和運(yùn)行模式。然后,它創(chuàng)建了一個(gè) SparkContext 對(duì)象,用來連接到 Spark 集群。

接下來,程序創(chuàng)建了一個(gè)包含兩個(gè)字符串的列表,并使用 parallelize 方法將其轉(zhuǎn)換為一個(gè) RDD。然后,它使用 flatMap 方法將每一行文本拆分成單詞,并使用 map 方法將每個(gè)單詞映射為一個(gè)鍵值對(duì)(key-value pair),其中鍵是單詞,值是 1。

最后,程序使用 reduceByKey 方法將具有相同鍵的鍵值對(duì)進(jìn)行合并,并對(duì)它們的值進(jìn)行求和。最終結(jié)果是一個(gè)包含每個(gè)單詞及其出現(xiàn)次數(shù)的 RDD。程序使用 collect 方法將結(jié)果收集到驅(qū)動(dòng)程序,并使用 foreach 方法打印出來。

二、Spark基本概念

Spark的理論較多,為了更有效地學(xué)習(xí)Spark,首先來理解下其基本概念。

1.Application

Application指的就是用戶編寫的Spark應(yīng)用程序。

如下,"Word Count"就是該應(yīng)用程序的名字。

import org.apache.spark.sql.SparkSession

object WordCount {
  def main(args: Array[String]) {
    // 創(chuàng)建 SparkSession 對(duì)象,它是 Spark Application 的入口
    val spark = SparkSession.builder.appName("Word Count").getOrCreate()
    // 讀取文本文件并創(chuàng)建 Dataset
    val textFile = spark.read.textFile("hdfs://...")
    // 使用 flatMap 轉(zhuǎn)換將文本分割為單詞,并使用 reduceByKey 轉(zhuǎn)換計(jì)算每個(gè)單詞的數(shù)量
    val counts = textFile.flatMap(line => line.split(" "))
                 .groupByKey(identity)
                 .count()
    // 將結(jié)果保存到文本文件中
    counts.write.text("hdfs://...")
    // 停止 SparkSession
    spark.stop()
  }
}

2.Driver

Driver 是運(yùn)行 Spark Application 的進(jìn)程,它負(fù)責(zé)創(chuàng)建 SparkSession 和 SparkContext 對(duì)象,并將代碼轉(zhuǎn)換和操作。

它還負(fù)責(zé)創(chuàng)建邏輯和物理計(jì)劃,并與集群管理器協(xié)調(diào)調(diào)度任務(wù)。

簡(jiǎn)而言之,Spark Application 是使用 Spark API 編寫的程序,而 Spark Driver 是負(fù)責(zé)運(yùn)行該程序并與集群管理器協(xié)調(diào)的進(jìn)程。

可以將Driver 理解為運(yùn)行 Spark Application main 方法的進(jìn)程。

driver的內(nèi)存大小可以進(jìn)行設(shè)置,配置如下:

# 設(shè)置 driver內(nèi)存大小
driver-memory 1024m

3.Master & Worker

在Spark中,Master是獨(dú)立集群的控制者,而Worker是工作者。

一個(gè)Spark獨(dú)立集群需要啟動(dòng)一個(gè)Master和多個(gè)Worker。Worker就是物理節(jié)點(diǎn),Worker上面可以啟動(dòng)Executor進(jìn)程。

4.Executor

在每個(gè)Worker上為某應(yīng)用啟動(dòng)的一個(gè)進(jìn)程,該進(jìn)程負(fù)責(zé)運(yùn)行Task,并且負(fù)責(zé)將數(shù)據(jù)存在內(nèi)存或者磁盤上。

每個(gè)任務(wù)都有各自獨(dú)立的Executor。Executor是一個(gè)執(zhí)行Task的容器。實(shí)際上它是一組計(jì)算資源(cpu核心、memory)的集合。

一個(gè)Worker節(jié)點(diǎn)可以有多個(gè)Executor。一個(gè)Executor可以運(yùn)行多個(gè)Task。

Executor創(chuàng)建成功后,在日志文件會(huì)顯示如下信息:

INFO Executor: Starting executor ID [executorId] on host [executorHostname]

5.RDD

RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集,是Spark中最基本的數(shù)據(jù)抽象,它代表一個(gè)不可變、可分區(qū)、里面的元素可并行計(jì)算的集合。

RDD的 Partition 是指數(shù)據(jù)集的分區(qū)。它是數(shù)據(jù)集中元素的集合,這些元素被分區(qū)到集群的節(jié)點(diǎn)上,可以并行操作。對(duì)于RDD來說,每個(gè)分片都會(huì)被一個(gè)計(jì)算任務(wù)處理,并決定并行計(jì)算的粒度。用戶可以在創(chuàng)建RDD時(shí)指定RDD的分片個(gè)數(shù),如果沒有指定,那么就會(huì)采用默認(rèn)值。默認(rèn)值就是程序所分配到的CPU Core的數(shù)目。

一個(gè)函數(shù)會(huì)被作用在每一個(gè)分區(qū)。Spark 中 RDD 的計(jì)算是以分片為單位的,compute 函數(shù)會(huì)被作用到每個(gè)分區(qū)上。

RDD的每次轉(zhuǎn)換都會(huì)生成一個(gè)新的RDD,所以RDD之間就會(huì)形成類似于流水線一樣的前后依賴關(guān)系。在部分分區(qū)數(shù)據(jù)丟失時(shí),Spark可以通過這個(gè)依賴關(guān)系重新計(jì)算丟失的分區(qū)數(shù)據(jù),而不是對(duì)RDD的所有分區(qū)進(jìn)行重新計(jì)算。

6.Job

一個(gè)Job包含多個(gè)RDD及作用于相應(yīng)RDD上的各種操作,每個(gè)Action的觸發(fā)就會(huì)生成一個(gè)job。用戶提交的Job會(huì)提交給DAG Scheduler,Job會(huì)被分解成Stage,Stage會(huì)被細(xì)化成Task。

7.Task

被發(fā)送到Executor上的工作單元。每個(gè)Task負(fù)責(zé)計(jì)算一個(gè)分區(qū)的數(shù)據(jù)。

8.Stage

在 Spark 中,一個(gè)作業(yè)(Job)會(huì)被劃分為多個(gè)階段(Stage)。同一個(gè) Stage 可以有多個(gè) Task 并行執(zhí)行(Task 數(shù)=分區(qū)數(shù))。

階段之間的劃分是根據(jù)數(shù)據(jù)的依賴關(guān)系來確定的。當(dāng)一個(gè) RDD 的分區(qū)依賴于另一個(gè) RDD 的分區(qū)時(shí),這兩個(gè) RDD 就屬于同一個(gè)階段。當(dāng)一個(gè) RDD 的分區(qū)依賴于多個(gè) RDD 的分區(qū)時(shí),這些 RDD 就屬于不同的階段。

Spark入門指南:從基礎(chǔ)概念到實(shí)踐應(yīng)用全解析

上圖中,Stage表示一個(gè)可以順滑完成的階段。曲線表示 Shuffle 過程。

如果Stage能夠復(fù)用前面的Stage的話,那么會(huì)顯示灰色。

Spark入門指南:從基礎(chǔ)概念到實(shí)踐應(yīng)用全解析

9.Shuffle

在 Spark 中,Shuffle 是指在不同階段之間重新分配數(shù)據(jù)的過程。它通常發(fā)生在需要對(duì)數(shù)據(jù)進(jìn)行聚合或分組操作的時(shí)候,例如 reduceByKey 或 groupByKey 等操作。

在 Shuffle 過程中,Spark 會(huì)將數(shù)據(jù)按照鍵值進(jìn)行分區(qū),并將屬于同一分區(qū)的數(shù)據(jù)發(fā)送到同一個(gè)計(jì)算節(jié)點(diǎn)上。這樣,每個(gè)計(jì)算節(jié)點(diǎn)就可以獨(dú)立地處理屬于它自己分區(qū)的數(shù)據(jù)。

10.Stage的劃分

Stage的劃分,簡(jiǎn)單來說是以寬依賴來劃分的。

對(duì)于窄依賴,Partition 的轉(zhuǎn)換處理在 Stage 中完成計(jì)算,不劃分(將窄依賴盡量放在在同一個(gè) Stage 中,可以實(shí)現(xiàn)流水線計(jì)算)。

對(duì)于寬依賴,由于有 Shuffle 的存在,只能在父 RDD 處理完成后,才能開始接下來的計(jì)算,也就是說需要?jiǎng)澐?Stage。

Spark 會(huì)根據(jù) Shuffle/寬依賴 使用回溯算法來對(duì) DAG 進(jìn)行 Stage 劃分,從后往前,遇到寬依賴就斷開,遇到窄依賴就把當(dāng)前的 RDD 加入到當(dāng)前的 Stage 階段中。

至于什么是窄依賴和寬依賴,下文馬上就會(huì)提及。

11.窄依賴 & 寬依賴

(1) 窄依賴

父 RDD 的一個(gè)分區(qū)只會(huì)被子 RDD 的一個(gè)分區(qū)依賴。比如:map,filter和union,這種依賴稱之為「窄依賴」。

Spark入門指南:從基礎(chǔ)概念到實(shí)踐應(yīng)用全解析

窄依賴的多個(gè)分區(qū)可以并行計(jì)算,并且窄依賴的一個(gè)分區(qū)的數(shù)據(jù)如果丟失只需要重新計(jì)算對(duì)應(yīng)的分區(qū)的數(shù)據(jù)就可以了。

(2) 寬依賴

指子RDD的分區(qū)依賴于父RDD的所有分區(qū),稱之為「寬依賴」。

Spark入門指南:從基礎(chǔ)概念到實(shí)踐應(yīng)用全解析

對(duì)于寬依賴,必須等到上一階段計(jì)算完成才能計(jì)算下一階段。

12.DAG

有向無環(huán)圖,其實(shí)說白了就是RDD之間的依賴關(guān)系圖。

  • 開始:通過 SparkContext 創(chuàng)建的 RDD。
  • 結(jié)束:觸發(fā) Action,一旦觸發(fā) Action 就形成了一個(gè)完整的 DAG(有幾個(gè) Action,就有幾個(gè) DAG)。

三、Spark執(zhí)行流程

Spark的執(zhí)行流程大致如下:

  • 構(gòu)建Spark Application的運(yùn)行環(huán)境(啟動(dòng)SparkContext),SparkContext向資源管理器(可以是Standalone、Mesos或YARN)注冊(cè)并申請(qǐng)運(yùn)行Executor資源。
  • 資源管理器為Executor分配資源并啟動(dòng)Executor進(jìn)程,Executor運(yùn)行情況將隨著“心跳”發(fā)送到資源管理器上。
  • SparkContext構(gòu)建DAG圖,將DAG圖分解成多個(gè)Stage,并把每個(gè)Stage的TaskSet(任務(wù)集)發(fā)送給Task Scheduler (任務(wù)調(diào)度器)。
  • Executor向SparkContext申請(qǐng)Task, Task Scheduler將Task發(fā)放給Executor,同時(shí),SparkContext將應(yīng)用程序代碼發(fā)放給Executor。
  • Task在Executor上運(yùn)行,把執(zhí)行結(jié)果反饋給Task Scheduler,然后再反饋給DAG Scheduler。
  • 當(dāng)一個(gè)階段完成后,Spark 會(huì)根據(jù)數(shù)據(jù)依賴關(guān)系將結(jié)果傳輸給下一個(gè)階段,并開始執(zhí)行下一個(gè)階段的任務(wù)。
  • 最后,當(dāng)所有階段都完成后,Spark 會(huì)將最終結(jié)果返回給驅(qū)動(dòng)程序,并完成作業(yè)的執(zhí)行。

四、Spark運(yùn)行模式

Spark 支持多種運(yùn)行模式,包括本地模式、獨(dú)立模式、Mesos 模式、YARN 模式和 Kube.NETes 模式。

  • 本地模式:在本地模式下,Spark 應(yīng)用程序會(huì)在單個(gè)機(jī)器上運(yùn)行,不需要連接到集群。這種模式適用于開發(fā)和測(cè)試,但不適用于生產(chǎn)環(huán)境。
  • 獨(dú)立模式:在獨(dú)立模式下,Spark 應(yīng)用程序會(huì)連接到一個(gè)獨(dú)立的 Spark 集群,并在集群中運(yùn)行。這種模式適用于小型集群,但不支持動(dòng)態(tài)資源分配。
  • Mesos 模式:在 Mesos 模式下,Spark 應(yīng)用程序會(huì)連接到一個(gè) Apache Mesos 集群,并在集群中運(yùn)行。這種模式支持動(dòng)態(tài)資源分配和細(xì)粒度資源共享,目前國(guó)內(nèi)使用較少。
  • YARN 模式:在 YARN 模式下,Spark 應(yīng)用程序會(huì)連接到一個(gè) Apache Hadoop YARN 集群,并在集群中運(yùn)行。這種模式支持動(dòng)態(tài)資源分配和與其他 Hadoop 生態(tài)系統(tǒng)組件的集成,Spark在Yarn模式下是不需要Master和Worker的。
  • Kubernetes 模式:在 Kubernetes 模式下,Spark 應(yīng)用程序會(huì)連接到一個(gè) Kubernetes 集群,并在集群中運(yùn)行。這種模式支持動(dòng)態(tài)資源分配和容器化部署。

五、RDD詳解

RDD的概念在Spark中十分重要,上面只是簡(jiǎn)單的介紹了一下,下面詳細(xì)的對(duì)RDD展開介紹。

RDD是“Resilient Distributed Dataset”的縮寫,從全稱就可以了解到RDD的一些典型特性:

  • Resilient(彈性):RDD之間會(huì)形成有向無環(huán)圖(DAG),如果RDD丟失了或者失效了,可以從父RDD重新計(jì)算得到。即容錯(cuò)性。
  • Distributed(分布式):RDD的數(shù)據(jù)是以邏輯分區(qū)的形式分布在集群的不同節(jié)點(diǎn)的。
  • Dataset(數(shù)據(jù)集):即RDD存儲(chǔ)的數(shù)據(jù)記錄,可以從外部數(shù)據(jù)生成RDD,例如Json文件,CSV文件,文本文件,數(shù)據(jù)庫(kù)等。

RDD里面的數(shù)據(jù)集會(huì)被邏輯分成若干個(gè)分區(qū),這些分區(qū)是分布在集群的不同節(jié)點(diǎn)的,基于這樣的特性,RDD才能在集群不同節(jié)點(diǎn)并行計(jì)算。

1.RDD特性

  • 內(nèi)存計(jì)算:Spark RDD運(yùn)算數(shù)據(jù)是在內(nèi)存中進(jìn)行的,在內(nèi)存足夠的情況下,不會(huì)把中間結(jié)果存儲(chǔ)在磁盤,所以計(jì)算速度非常高效。
  • 惰性求值:所有的轉(zhuǎn)換操作都是惰性的,也就是說不會(huì)立即執(zhí)行任務(wù),只是把對(duì)數(shù)據(jù)的轉(zhuǎn)換操作記錄下來而已。只有碰到action操作才會(huì)被真正的執(zhí)行。
  • 容錯(cuò)性:Spark RDD具備容錯(cuò)特性,在RDD失效或者數(shù)據(jù)丟失的時(shí)候,可以根據(jù)DAG從父RDD重新把數(shù)據(jù)集計(jì)算出來,以達(dá)到數(shù)據(jù)容錯(cuò)的效果。
  • 不變性:RDD是進(jìn)程安全的,因?yàn)镽DD是不可修改的。它可以在任何時(shí)間點(diǎn)被創(chuàng)建和查詢,使得緩存,共享,備份都非常簡(jiǎn)單。在計(jì)算過程中,是RDD的不可修改特性保證了數(shù)據(jù)的一致性。
  • 持久化:可以調(diào)用cache或者persist函數(shù),把RDD緩存在內(nèi)存、磁盤,下次使用的時(shí)候不需要重新計(jì)算而是直接使用。

2.RDD操作

RDD支持兩種操作:

  • 轉(zhuǎn)換操作(Transformation)。
  • 行動(dòng)操作(Actions)。

(1) 轉(zhuǎn)換操作(Transformation)

轉(zhuǎn)換操作以RDD做為輸入?yún)?shù),然后輸出一個(gè)或者多個(gè)RDD。轉(zhuǎn)換操作不會(huì)修改輸入RDD。Map()、Filter()這些都屬于轉(zhuǎn)換操作。

轉(zhuǎn)換操作是惰性求值操作,只有在碰到行動(dòng)操作(Actions)的時(shí)候,轉(zhuǎn)換操作才會(huì)真正實(shí)行。轉(zhuǎn)換操作分兩種:「窄依賴」和「寬依賴」。

下面是一些常見的轉(zhuǎn)換操作:

轉(zhuǎn)換操作

描述

map

將函數(shù)應(yīng)用于 RDD 中的每個(gè)元素,并返回一個(gè)新的 RDD

filter

返回一個(gè)新的 RDD,其中包含滿足給定謂詞的元素

flatMap

將函數(shù)應(yīng)用于 RDD 中的每個(gè)元素,并將返回的迭代器展平為一個(gè)新的 RDD

union

返回一個(gè)新的 RDD,其中包含兩個(gè) RDD 的元素

distinct

返回一個(gè)新的 RDD,其中包含原始 RDD 中不同的元素

groupByKey

將鍵值對(duì) RDD 中具有相同鍵的元素分組到一起,并返回一個(gè)新的 RDD

reduceByKey

將鍵值對(duì) RDD 中具有相同鍵的元素聚合到一起,并返回一個(gè)新的 RDD

sortByKey

返回一個(gè)新的鍵值對(duì) RDD,其中元素按照鍵排序

(2)行動(dòng)操作(Action)

Action是數(shù)據(jù)執(zhí)行部分,其通過執(zhí)行count,reduce,collect等方法真正執(zhí)行數(shù)據(jù)的計(jì)算部分。

Action 操作

描述

reduce

通過函數(shù)聚合 RDD 中的所有元素

collect

將 RDD 中的所有元素返回到驅(qū)動(dòng)程序

count

返回 RDD 中的元素個(gè)數(shù)

first

返回 RDD 中的第一個(gè)元素

take

返回 RDD 中的前 n 個(gè)元素

takeOrdered

返回 RDD 中的前 n 個(gè)元素,按照自然順序或指定的順序排序

saveAsTextFile

將 RDD 中的元素保存到文本文件中

foreach

將函數(shù)應(yīng)用于 RDD 中的每個(gè)元素

3.RDD 的創(chuàng)建方式

創(chuàng)建RDD有3種不同方式:

  • 從外部存儲(chǔ)系統(tǒng)。
  • 從其他RDD。
  • 由一個(gè)已經(jīng)存在的 Scala 集合創(chuàng)建。

(1) 從外部存儲(chǔ)系統(tǒng)

由外部存儲(chǔ)系統(tǒng)的數(shù)據(jù)集創(chuàng)建,包括本地的文件系統(tǒng),還有所有 Hadoop 支持的數(shù)據(jù)集,比如 HDFS、Cassandra、HBase 等:

val rdd1 = sc.textFile("hdfs://node1:8020/wordcount/input/words.txt")

(2) 從其他RDD

通過已有的 RDD 經(jīng)過算子轉(zhuǎn)換生成新的 RDD:

val rdd2=rdd1.flatMap(_.split(" "))

(3) 由一個(gè)已經(jīng)存在的 Scala 集合創(chuàng)建

val rdd3 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
或者
val rdd4 = sc.makeRDD(List(1,2,3,4,5,6,7,8))

其實(shí)makeRDD 方法底層調(diào)用了 parallelize 方法:

4.RDD 緩存機(jī)制

RDD 緩存是在內(nèi)存存儲(chǔ)RDD計(jì)算結(jié)果的一種優(yōu)化技術(shù)。把中間結(jié)果緩存起來以便在需要的時(shí)候重復(fù)使用,這樣才能有效減輕計(jì)算壓力,提升運(yùn)算性能。

要持久化一個(gè)RDD,只要調(diào)用其cache()或者persist()方法即可。在該RDD第一次被計(jì)算出來時(shí),就會(huì)直接緩存在每個(gè)節(jié)點(diǎn)中。而且Spark的持久化機(jī)制還是自動(dòng)容錯(cuò)的,如果持久化的RDD的任何partition丟失了,那么Spark會(huì)自動(dòng)通過其源RDD,使用transformation操作重新計(jì)算該partition。

val rdd1 = sc.textFile("hdfs://node01:8020/words.txt")
val rdd2 = rdd1.flatMap(x=>x.split(" ")).map((_,1)).reduceByKey(_+_)
rdd2.cache //緩存/持久化
rdd2.sortBy(_._2,false).collect//觸發(fā)action,會(huì)去讀取HDFS的文件,rdd2會(huì)真正執(zhí)行持久化
rdd2.sortBy(_._2,false).collect//觸發(fā)action,會(huì)去讀緩存中的數(shù)據(jù),執(zhí)行速度會(huì)比之前快,因?yàn)閞dd2已經(jīng)持久化到內(nèi)存中了

需要注意的是,在觸發(fā)action的時(shí)候,才會(huì)去執(zhí)行持久化。

cache()和persist()的區(qū)別在于,cache()是persist()的一種簡(jiǎn)化方式,cache()的底層就是調(diào)用的persist()的無參版本,就是調(diào)用persist(MEMORY_ONLY),將數(shù)據(jù)持久化到內(nèi)存中。

如果需要從內(nèi)存中去除緩存,那么可以使用unpersist()方法。

rdd.persist(StorageLevel.MEMORY_ONLY)
rdd.unpersist()

5.存儲(chǔ)級(jí)別

RDD存儲(chǔ)級(jí)別主要有以下幾種。

級(jí)別

使用空間

CPU時(shí)間

是否在內(nèi)存中

是否在磁盤上

備注

MEMORY_ONLY

使用未序列化的Java對(duì)象格式,將數(shù)據(jù)保存在內(nèi)存中。如果內(nèi)存不夠存放所有的數(shù)據(jù),則數(shù)據(jù)可能就不會(huì)進(jìn)行持久化。

MEMORY_ONLY_2

數(shù)據(jù)存2份

MEMORY_ONLY_SER

基本含義同MEMORY_ONLY。唯一的區(qū)別是,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化。這種方式更加節(jié)省內(nèi)存

MEMORY_ONLY_SER_2

數(shù)據(jù)序列化,數(shù)據(jù)存2份

MEMORY_AND_DISK

中等

部分

部分

如果數(shù)據(jù)在內(nèi)存中放不下,則溢寫到磁盤

MEMORY_AND_DISK_2

中等

部分

部分

數(shù)據(jù)存2份

MEMORY_AND_DISK_SER

部分

部分

基本含義同MEMORY_AND_DISK。唯一的區(qū)別是,會(huì)將RDD中的數(shù)據(jù)進(jìn)行序列化

MEMORY_AND_DISK_SER_2

部分

部分

數(shù)據(jù)存2份

DISK_ONLY

使用未序列化的Java對(duì)象格式,將數(shù)據(jù)全部寫入磁盤文件中。

DISK_ONLY_2

數(shù)據(jù)存2份

OFF_HEAP

 

 

 

 

這個(gè)目前是試驗(yàn)型選項(xiàng),類似MEMORY_ONLY_SER,但是數(shù)據(jù)是存儲(chǔ)在堆外內(nèi)存的。

對(duì)于上述任意一種持久化策略,如果加上后綴_2,代表的是將每個(gè)持久化的數(shù)據(jù),都復(fù)制一份副本,并將副本保存到其他節(jié)點(diǎn)上。

這種基于副本的持久化機(jī)制主要用于進(jìn)行容錯(cuò)。假如某個(gè)節(jié)點(diǎn)掛掉了,節(jié)點(diǎn)的內(nèi)存或磁盤中的持久化數(shù)據(jù)丟失了,那么后續(xù)對(duì)RDD計(jì)算時(shí)還可以使用該數(shù)據(jù)在其他節(jié)點(diǎn)上的副本。如果沒有副本的話,就只能將這些數(shù)據(jù)從源頭處重新計(jì)算一遍了。

6.RDD的血緣關(guān)系

血緣關(guān)系是指 RDD 之間的依賴關(guān)系。當(dāng)你對(duì)一個(gè) RDD 執(zhí)行轉(zhuǎn)換操作時(shí),Spark 會(huì)生成一個(gè)新的 RDD,并記錄這兩個(gè) RDD 之間的依賴關(guān)系。這種依賴關(guān)系就是血緣關(guān)系。

血緣關(guān)系可以幫助 Spark 在發(fā)生故障時(shí)恢復(fù)數(shù)據(jù)。當(dāng)一個(gè)分區(qū)丟失時(shí),Spark 可以根據(jù)血緣關(guān)系重新計(jì)算丟失的分區(qū),而不需要從頭開始重新計(jì)算整個(gè) RDD。

血緣關(guān)系還可以幫助 Spark 優(yōu)化計(jì)算過程。Spark 可以根據(jù)血緣關(guān)系合并多個(gè)連續(xù)的窄依賴轉(zhuǎn)換,減少數(shù)據(jù)傳輸和通信開銷。

我們可以執(zhí)行toDebugString打印RDD的依賴關(guān)系。

下面是一個(gè)簡(jiǎn)單的例子:

val conf = new SparkConf().setAppName("Lineage Example").setMaster("local")
val sc = new SparkContext(conf)

val data = sc.parallelize(List(1, 2, 3, 4, 5))
val mappedData = data.map(x => x + 1)
val filteredData = mappedData.filter(x => x % 2 == 0)

println(filteredData.toDebugString)

在這個(gè)例子中,我們首先創(chuàng)建了一個(gè)包含 5 個(gè)元素的 RDD,并對(duì)它執(zhí)行了兩個(gè)轉(zhuǎn)換操作:map 和 filter。然后,我們使用 toDebugString 方法打印了最終 RDD 的血緣關(guān)系。

運(yùn)行這段代碼后,你會(huì)看到類似下面的輸出:

(2) MapPartitionsRDD[2] at filter at <console>:26 []
 |  MapPartitionsRDD[1] at map at <console>:24 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:22 []

這個(gè)輸出表示最終的 RDD 是通過兩個(gè)轉(zhuǎn)換操作(map 和 filter)從原始的 ParallelCollectionRDD 轉(zhuǎn)換而來的。

六、CheckPoint

CheckPoint可以將RDD從其依賴關(guān)系中抽出來,保存到可靠的存儲(chǔ)系統(tǒng)(例如HDFS,S3等), 即它可以將數(shù)據(jù)和元數(shù)據(jù)保存到檢查指向目錄中。 因此,在程序發(fā)生崩潰的時(shí)候,Spark可以恢復(fù)此數(shù)據(jù),并從停止的任何地方開始。

CheckPoint分為兩類:

  • 高可用CheckPoint:容錯(cuò)性優(yōu)先。這種類型的檢查點(diǎn)可確保數(shù)據(jù)永久存儲(chǔ),如存儲(chǔ)在HDFS或其他分布式文件系統(tǒng)上。 這也意味著數(shù)據(jù)通常會(huì)在網(wǎng)絡(luò)中復(fù)制,這會(huì)降低檢查點(diǎn)的運(yùn)行速度。
  • 本地CheckPoint:性能優(yōu)先。 RDD持久保存到執(zhí)行程序中的本地文件系統(tǒng)。 因此,數(shù)據(jù)寫得更快,但本地文件系統(tǒng)也不是完全可靠的,一旦數(shù)據(jù)丟失,工作將無法恢復(fù)。

開發(fā)人員可以使用RDD.checkpoint()方法來設(shè)置檢查點(diǎn)。在使用檢查點(diǎn)之前,必須使用SparkContext.setCheckpointDir(directory: String)方法設(shè)置檢查點(diǎn)目錄。

下面是一個(gè)簡(jiǎn)單的例子:

import org.apache.spark.{SparkConf, SparkContext}

object CheckpointExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Checkpoint Example").setMaster("local")
    val sc = new SparkContext(conf)

    // 設(shè)置 checkpoint 目錄
    sc.setCheckpointDir("/tmp/checkpoint")

    val data = sc.parallelize(List(1, 2, 3, 4, 5))
    val mappedData = data.map(x => x + 1)
    val filteredData = mappedData.filter(x => x % 2 == 0)

    // 對(duì) RDD 進(jìn)行 checkpoint
    filteredData.checkpoint()
    // 觸發(fā) checkpoint
    filteredData.count()
  }
}

RDD的檢查點(diǎn)機(jī)制就好比Hadoop將中間計(jì)算值存儲(chǔ)到磁盤,即使計(jì)算中出現(xiàn)了故障,我們也可以輕松地從中恢復(fù)。通過對(duì) RDD 啟動(dòng)檢查點(diǎn)機(jī)制可以實(shí)現(xiàn)容錯(cuò)和高可用。

Persist VS CheckPoint

  • 位置:Persist 和 Cache 只能保存在本地的磁盤和內(nèi)存中(或者堆外內(nèi)存–實(shí)驗(yàn)中),而 Checkpoint 可以保存數(shù)據(jù)到 HDFS 這類可靠的存儲(chǔ)上。
  • 生命周期:Cache 和 Persist 的 RDD 會(huì)在程序結(jié)束后會(huì)被清除或者手動(dòng)調(diào)用 unpersist 方法,而 Checkpoint 的 RDD 在程序結(jié)束后依然存在,不會(huì)被刪除。CheckPoint將RDD持久化到HDFS或本地文件夾,如果不被手動(dòng)remove掉,是一直存在的,也就是說可以被下一個(gè)driver使用,而Persist不能被其他dirver使用。

七、Spark-Submit

1.詳細(xì)參數(shù)說明

參數(shù)名

參數(shù)說明

—master

master 的地址,提交任務(wù)到哪里執(zhí)行,例如 spark://host:port, yarn, local。具體指可參考下面關(guān)于Master_URL的列表

—deploy-mode

在本地 (client) 啟動(dòng) driver 或在 cluster 上啟動(dòng),默認(rèn)是 client

—class

應(yīng)用程序的主類,僅針對(duì) java 或 scala 應(yīng)用

—name

應(yīng)用程序的名稱

—jars

用逗號(hào)分隔的本地 jar 包,設(shè)置后,這些 jar 將包含在 driver 和 executor 的 classpath 下

—packages

包含在driver 和executor 的 classpath 中的 jar 的 maven 坐標(biāo)

—exclude-packages

為了避免沖突 而指定不包含的 package

—repositories

遠(yuǎn)程 repository

—conf PROP=VALUE

指定 spark 配置屬性的值, 例如 -conf spark.executor.extraJavaOptinotallow=”-XX:MaxPermSize=256m”

—properties-file

加載的配置文件,默認(rèn)為 conf/spark-defaults.conf

—driver-memory

Driver內(nèi)存,默認(rèn) 1G

—driver-java-options

傳給 driver 的額外的 Java 選項(xiàng)

—driver-library-path

傳給 driver 的額外的庫(kù)路徑

—driver-class-path

傳給 driver 的額外的類路徑

—driver-cores

Driver 的核數(shù),默認(rèn)是1。在 yarn 或者 standalone 下使用

—executor-memory

每個(gè) executor 的內(nèi)存,默認(rèn)是1G

—total-executor-cores

所有 executor 總共的核數(shù)。僅僅在 mesos 或者 standalone 下使用

—num-executors

啟動(dòng)的 executor 數(shù)量。默認(rèn)為2。在 yarn 下使用

—executor-core

每個(gè) executor 的核數(shù)。在yarn或者standalone下使用

2.Master_URL的值

Master URL

含義

local

使用1個(gè)worker線程在本地運(yùn)行Spark應(yīng)用程序

local[K]

使用K個(gè)worker線程在本地運(yùn)行Spark應(yīng)用程序

local[*]

使用所有剩余worker線程在本地運(yùn)行Spark應(yīng)用程序

spark://HOST:PORT

連接到Spark Standalone集群,以便在該集群上運(yùn)行Spark應(yīng)用程序

mesos://HOST:PORT

連接到Mesos集群,以便在該集群上運(yùn)行Spark應(yīng)用程序

yarn-client

以client方式連接到Y(jié)ARN集群,集群的定位由環(huán)境變量HADOOP_CONF_DIR定義,該方式driver在client運(yùn)行。

yarn-cluster

以cluster方式連接到Y(jié)ARN集群,集群的定位由環(huán)境變量HADOOP_CONF_DIR定義,該方式driver也在集群中運(yùn)行。

八、Spark 共享變量

一般情況下,當(dāng)一個(gè)傳遞給Spark操作(例如map和reduce)的函數(shù)在遠(yuǎn)程節(jié)點(diǎn)上面運(yùn)行時(shí),Spark操作實(shí)際上操作的是這個(gè)函數(shù)所用變量的一個(gè)獨(dú)立副本。

這些變量被復(fù)制到每臺(tái)機(jī)器上,并且這些變量在遠(yuǎn)程機(jī)器上的所有更新都不會(huì)傳遞回驅(qū)動(dòng)程序。通常跨任務(wù)的讀寫變量是低效的,所以,Spark提供了兩種共享變量:「廣播變量(broadcast variable)」和「累加器(accumulator)」。

1.廣播變量

廣播變量允許程序員緩存一個(gè)只讀的變量在每臺(tái)機(jī)器上面,而不是每個(gè)任務(wù)保存一份拷貝。說白了其實(shí)就是共享變量。

如果Executor端用到了Driver的變量,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。如果使用廣播變量在每個(gè)Executor中只有一份Driver端的變量副本。

一個(gè)廣播變量可以通過調(diào)用SparkContext.broadcast(v)方法從一個(gè)初始變量v中創(chuàng)建。廣播變量是v的一個(gè)包裝變量,它的值可以通過value方法訪問,下面的代碼說明了這個(gè)過程:

import org.apache.spark.{SparkConf, SparkContext}

object BroadcastExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Broadcast Example").setMaster("local")
    val sc = new SparkContext(conf)

    val data = sc.parallelize(List(1, 2, 3, 4, 5))

    // 創(chuàng)建一個(gè)廣播變量
    val factor = sc.broadcast(2)

    // 使用廣播變量
    val result = data.map(x => x * factor.value)

    result.collect().foreach(println)
  }
}

廣播變量創(chuàng)建以后,我們就能夠在集群的任何函數(shù)中使用它來代替變量v,這樣我們就不需要再次傳遞變量v到每個(gè)節(jié)點(diǎn)上。另外,為了保證所有的節(jié)點(diǎn)得到廣播變量具有相同的值,對(duì)象v不能在廣播之后被修改。

2.累加器

累加器是一種只能通過關(guān)聯(lián)操作進(jìn)行“加”操作的變量,因此它能夠高效的應(yīng)用于并行操作中。它們能夠用來實(shí)現(xiàn)counters和sums。

一個(gè)累加器可以通過調(diào)用SparkContext.accumulator(v)方法從一個(gè)初始變量v中創(chuàng)建。運(yùn)行在集群上的任務(wù)可以通過add方法或者使用+=操作來給它加值。然而,它們無法讀取這個(gè)值。只有驅(qū)動(dòng)程序可以使用value方法來讀取累加器的值。

示例代碼如下:

import org.apache.spark.{SparkConf, SparkContext}

object AccumulatorExample {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("AccumulatorExample")
    val sc = new SparkContext(conf)

    val accum = sc.longAccumulator("My Accumulator")

    sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))

    println(accum.value) // 輸出 10
  }
}

這個(gè)示例中,我們創(chuàng)建了一個(gè)名為 My Accumulator 的累加器,并使用 sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x)) 來對(duì)其進(jìn)行累加。最后,我們使用 println(accum.value) 來輸出累加器的值,結(jié)果為 10。

我們可以利用子類AccumulatorParam創(chuàng)建自己的累加器類型。AccumulatorParam接口有兩個(gè)方法:zero方法為你的數(shù)據(jù)類型提供一個(gè)“0 值”(zero value),addInPlace方法計(jì)算兩個(gè)值的和。例如,假設(shè)我們有一個(gè)Vector類代表數(shù)學(xué)上的向量,我們能夠如下定義累加器:

object VectorAccumulatorParam extends AccumulatorParam[Vector] {
  def zero(initialValue: Vector): Vector = {
    Vector.zeros(initialValue.size)
  }
  def addInPlace(v1: Vector, v2: Vector): Vector = {
    v1 += v2
  }
}
// Then, create an Accumulator of this type:
val vecAccum = sc.accumulator(new Vector(...))(VectorAccumulatorParam)

九、Spark SQL

Spark為結(jié)構(gòu)化數(shù)據(jù)處理引入了一個(gè)稱為Spark SQL的編程模塊。它提供了一個(gè)稱為DataFrame的編程抽象,并且可以充當(dāng)分布式SQL查詢引擎。

1.Spark SQL的特性

  • 集成:無縫地將SQL查詢與Spark程序混合。 Spark SQL允許將結(jié)構(gòu)化數(shù)據(jù)作為Spark中的分布式數(shù)據(jù)集(RDD)進(jìn)行查詢,在Python,Scala和Java中集成了API。這種緊密的集成使得可以輕松地運(yùn)行SQL查詢以及復(fù)雜的分析算法。
  • Hive兼容性:在現(xiàn)有倉(cāng)庫(kù)上運(yùn)行未修改的Hive查詢。 Spark SQL重用了Hive前端和MetaStore,提供與現(xiàn)有Hive數(shù)據(jù),查詢和UDF的完全兼容性。只需將其與Hive一起安裝即可。
  • 標(biāo)準(zhǔn)連接:通過JDBC或ODBC連接。 Spark SQL包括具有行業(yè)標(biāo)準(zhǔn)JDBC和ODBC連接的服務(wù)器模式。
  • 可擴(kuò)展性:對(duì)于交互式查詢和長(zhǎng)查詢使用相同的引擎。 Spark SQL利用RDD模型來支持中查詢?nèi)蒎e(cuò),使其能夠擴(kuò)展到大型作業(yè)。不要擔(dān)心為歷史數(shù)據(jù)使用不同的引擎。

2.Spark SQL 數(shù)據(jù)類型

Spark SQL 支持多種數(shù)據(jù)類型,包括數(shù)字類型、字符串類型、二進(jìn)制類型、布爾類型、日期時(shí)間類型和區(qū)間類型等。

數(shù)字類型包括:

  • ByteType:代表一個(gè)字節(jié)的整數(shù),范圍是 -128 到 127¹²。
  • ShortType:代表兩個(gè)字節(jié)的整數(shù),范圍是 -32768 到 32767¹²。
  • IntegerType:代表四個(gè)字節(jié)的整數(shù),范圍是 -2147483648 到 2147483647¹²。
  • LongType:代表八個(gè)字節(jié)的整數(shù),范圍是 -9223372036854775808 到 9223372036854775807¹²。
  • FloatType:代表四字節(jié)的單精度浮點(diǎn)數(shù)¹²。
  • DoubleType:代表八字節(jié)的雙精度浮點(diǎn)數(shù)¹²。
  • DecimalType:代表任意精度的十進(jìn)制數(shù)據(jù),通過內(nèi)部的 java.math.BigDecimal 支持。BigDecimal 由一個(gè)任意精度的整型非標(biāo)度值和一個(gè) 32 位整數(shù)組成¹²。

字符串類型包括:

  • StringType:代表字符字符串值。

二進(jìn)制類型包括:

  • BinaryType:代表字節(jié)序列值。

布爾類型包括:

  • BooleanType:代表布爾值。

日期時(shí)間類型包括:

  • TimestampType:代表包含字段年、月、日、時(shí)、分、秒的值,與會(huì)話本地時(shí)區(qū)相關(guān)。時(shí)間戳值表示絕對(duì)時(shí)間點(diǎn)。
  • DateType:代表包含字段年、月和日的值,不帶時(shí)區(qū)。

區(qū)間類型包括:

  • YearMonthIntervalType (startField, endField):表示由以下字段組成的連續(xù)子集組成的年月間隔:MONTH(月份),YEAR(年份)。
  • DayTimeIntervalType (startField, endField):表示由以下字段組成的連續(xù)子集組成的日時(shí)間間隔:SECOND(秒),MINUTE(分鐘),HOUR(小時(shí)),DAY(天)。

復(fù)合類型包括:

  • ArrayType (elementType, containsNull):代表由 elementType 類型元素組成的序列值。containsNull 用來指明 ArrayType 中的值是否有 null 值。
  • MapType (keyType, valueType, valueContainsNull):表示包括一組鍵值對(duì)的值。通過 keyType 表示 key 數(shù)據(jù)的類型,通過 valueType 表示 value 數(shù)據(jù)的類型。valueContainsNull 用來指明 MapType 中的值是否有 null 值。
  • StructType (fields):表示一個(gè)擁有 StructFields (fields) 序列結(jié)構(gòu)的值。
  • StructField (name, dataType, nullable):代表 StructType 中的一個(gè)字段,字段的名字通過 name 指定,dataType 指定 field 的數(shù)據(jù)類型,nullable 表示字段的值是否有 null 值。

3.DataFrame

DataFrame 是 Spark 中用于處理結(jié)構(gòu)化數(shù)據(jù)的一種數(shù)據(jù)結(jié)構(gòu)。它類似于關(guān)系數(shù)據(jù)庫(kù)中的表,具有行和列。每一列都有一個(gè)名稱和一個(gè)類型,每一行都是一條記錄。

DataFrame 支持多種數(shù)據(jù)源,包括結(jié)構(gòu)化數(shù)據(jù)文件、Hive 表、外部數(shù)據(jù)庫(kù)和現(xiàn)有的 RDD。它提供了豐富的操作,包括篩選、聚合、分組、排序等。

DataFrame 的優(yōu)點(diǎn)在于它提供了一種高級(jí)的抽象,使得用戶可以使用類似于 SQL 的語言進(jìn)行數(shù)據(jù)處理,而無需關(guān)心底層的實(shí)現(xiàn)細(xì)節(jié)。此外,Spark 會(huì)自動(dòng)對(duì) DataFrame 進(jìn)行優(yōu)化,以提高查詢性能。

下面是一個(gè)使用DataFrame的代碼例子:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("DataFrame Example").getOrCreate()
import spark.implicits._

val data = Seq(
  ("Alice", 25),
  ("Bob", 30),
  ("Charlie", 35)
)

val df = data.toDF("name", "age")

df.show()

在這個(gè)示例中,我們首先創(chuàng)建了一個(gè) SparkSession 對(duì)象,然后使用 toDF 方法將一個(gè)序列轉(zhuǎn)換為 DataFrame。最后,我們使用 show 方法來顯示 DataFrame 的內(nèi)容。

4.創(chuàng)建 DataFrame

在 Scala 中,可以通過以下幾種方式創(chuàng)建 DataFrame:

從現(xiàn)有的 RDD 轉(zhuǎn)換而來。例如:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()
import spark.implicits._

case class Person(name: String, age: Int)

val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val df = rdd.toDF()
df.show()

從外部數(shù)據(jù)源讀取。例如,從 JSON 文件中讀取數(shù)據(jù)并創(chuàng)建 DataFrame:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()

val df = spark.read.json("path/to/json/file")
df.show()

通過編程方式創(chuàng)建。例如,使用 createDataFrame 方法:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val spark = SparkSession.builder.appName("Create DataFrame").getOrCreate()

val schema = StructType(
  List(
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true)
  )
)

val data = Seq(Row("Alice", 25), Row("Bob", 30))
val rdd = spark.sparkContext.parallelize(data)

val df = spark.createDataFrame(rdd, schema)
df.show()

5.DSL & SQL

在 Spark 中,可以使用兩種方式對(duì) DataFrame 進(jìn)行查詢:「DSL(Domain-Specific Language)」和「 SQL」。

DSL 是一種特定領(lǐng)域語言,它提供了一組用于操作 DataFrame 的方法。例如,下面是一個(gè)使用 DSL 進(jìn)行查詢的例子:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()
import spark.implicits._

val df = Seq(
  ("Alice", 25),
  ("Bob", 30),
  ("Charlie", 35)
).toDF("name", "age")

df.select("name", "age")
  .filter($"age" > 25)
  .show()

SQL 是一種結(jié)構(gòu)化查詢語言,它用于管理關(guān)系數(shù)據(jù)庫(kù)系統(tǒng)。在 Spark 中,可以使用 SQL 對(duì) DataFrame 進(jìn)行查詢。例如,下面是一個(gè)使用 SQL 進(jìn)行查詢的例子:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("DSL and SQL").getOrCreate()
import spark.implicits._

val df = Seq(
  ("Alice", 25),
  ("Bob", 30),
  ("Charlie", 35)
).toDF("name", "age")

df.createOrReplaceTempView("people")

spark.sql("SELECT name, age FROM people WHERE age > 25").show()

DSL 和 SQL 的區(qū)別在于語法和風(fēng)格。DSL 使用方法調(diào)用鏈來構(gòu)建查詢,而 SQL 使用聲明式語言來描述查詢。選擇哪種方式取決于個(gè)人喜好和使用場(chǎng)景。

6.Spark SQL 數(shù)據(jù)源

Spark SQL 支持多種數(shù)據(jù)源,包括 Parquet、JSON、CSV、JDBC、Hive 等。

下面是示例代碼:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Data Sources Example").getOrCreate()
// Parquet
val df = spark.read.parquet("path/to/parquet/file")
// JSON 
val df = spark.read.json("path/to/json/file")
// CSV
val df = spark.read.option("header", "true").csv("path/to/csv/file")
// JDBC
val df = spark.read
  .format("jdbc")
  .option("url", "jdbc:MySQL://host:port/database")
  .option("dbtable", "table")
  .option("user", "username")
  .option("password", "password")
  .load()

df.show()

7.load & save

在 Spark 中,load 函數(shù)用于從外部數(shù)據(jù)源讀取數(shù)據(jù)并創(chuàng)建 DataFrame,而 save 函數(shù)用于將 DataFrame 保存到外部數(shù)據(jù)源。

下面是從 Parquet 文件中讀取數(shù)據(jù)并創(chuàng)建 DataFrame 的示例代碼:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()

val df = spark.read.load("path/to/parquet/file")
df.show()

下面是將 DataFrame 保存到 Parquet 文件的示例代碼:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Load and Save Example").getOrCreate()
import spark.implicits._

val df = Seq(
  ("Alice", 25),
  ("Bob", 30),
  ("Charlie", 35)
).toDF("name", "age")

df.write.save("path/to/parquet/file")

8.函數(shù)

Spark SQL 提供了豐富的內(nèi)置函數(shù),包括數(shù)學(xué)函數(shù)、字符串函數(shù)、日期時(shí)間函數(shù)、聚合函數(shù)等。你可以在 Spark SQL 的官方文檔中查看所有可用的內(nèi)置函數(shù)。

此外,Spark SQL 還支持「自定義函數(shù)(User-Defined Function,UDF)」,可以讓用戶編寫自己的函數(shù)并在查詢中使用。

下面是一個(gè)使用 SQL 語法編寫自定義函數(shù)的示例代碼:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

val spark = SparkSession.builder.appName("UDF Example").getOrCreate()
import spark.implicits._

val df = Seq(
  ("Alice", 25),
  ("Bob", 30),
  ("Charlie", 35)
).toDF("name", "age")

df.createOrReplaceTempView("people")

val square = udf((x: Int) => x * x)
spark.udf.register("square", square)

spark.sql("SELECT name, square(age) FROM people").show()

在這個(gè)示例中,我們首先定義了一個(gè)名為 square 的自定義函數(shù),它接受一個(gè)整數(shù)參數(shù)并返回它的平方。然后,我們使用 createOrReplaceTempView 方法創(chuàng)建一個(gè)臨時(shí)視圖,并使用 udf.register 方法注冊(cè)自定義函數(shù)。

最后,我們使用 spark.sql 方法執(zhí)行 SQL 查詢,并在查詢中調(diào)用自定義函數(shù)。

9.DataSet

DataSet 是 Spark 1.6 版本中引入的一種新的數(shù)據(jù)結(jié)構(gòu),它提供了 RDD 的強(qiáng)類型和 DataFrame 的查詢優(yōu)化能力。

10.創(chuàng)建DataSet

在 Scala 中,可以通過以下幾種方式創(chuàng)建 DataSet:

從現(xiàn)有的 RDD 轉(zhuǎn)換而來。例如:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._

case class Person(name: String, age: Int)

val rdd = spark.sparkContext.parallelize(Seq(Person("Alice", 25), Person("Bob", 30)))
val ds = rdd.toDS()
ds.show()

從外部數(shù)據(jù)源讀取。例如,從 JSON 文件中讀取數(shù)據(jù)并創(chuàng)建 DataSet:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._

case class Person(name: String, age: Long)

val ds = spark.read.json("path/to/json/file").as[Person]
ds.show()

通過編程方式創(chuàng)建。例如,使用 createDataset 方法:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Create DataSet").getOrCreate()
import spark.implicits._

case class Person(name: String, age: Int)

val data = Seq(Person("Alice", 25), Person("Bob", 30))
val ds = spark.createDataset(data)
ds.show()

11.DataSet VS DataFrame

DataSet 和 DataFrame 都是 Spark 中用于處理結(jié)構(gòu)化數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu)。它們都提供了豐富的操作,包括篩選、聚合、分組、排序等。

它們之間的主要區(qū)別在于類型安全性。DataFrame 是一種弱類型的數(shù)據(jù)結(jié)構(gòu),它的列只有在運(yùn)行時(shí)才能確定類型。這意味著,在編譯時(shí)無法檢測(cè)到類型錯(cuò)誤,只有在運(yùn)行時(shí)才會(huì)拋出異常。

而 DataSet 是一種強(qiáng)類型的數(shù)據(jù)結(jié)構(gòu),它的類型在編譯時(shí)就已經(jīng)確定。這意味著,如果你試圖對(duì)一個(gè)不存在的列進(jìn)行操作,或者對(duì)一個(gè)列進(jìn)行錯(cuò)誤的類型轉(zhuǎn)換,編譯器就會(huì)報(bào)錯(cuò)。

此外,DataSet 還提供了一些額外的操作,例如 map、flatMap、reduce 等。

12.RDD & DataFrame & Dataset 轉(zhuǎn)化

RDD、DataFrame、Dataset三者有許多共性,有各自適用的場(chǎng)景常常需要在三者之間轉(zhuǎn)換。

DataFrame/Dataset 轉(zhuǎn) RDD:

val rdd1=testDF.rdd
val rdd2=testDS.rdd

RDD 轉(zhuǎn) DataSet:

import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型
val testDS = rdd.map {line=>
      Coltest(line._1,line._2)
    }.toDS

可以注意到,定義每一行的類型(case class)時(shí),已經(jīng)給出了字段名和類型,后面只要往case class里面添加值即可。

Dataset 轉(zhuǎn) DataFrame:

import spark.implicits._
val testDF = testDS.toDF

DataFrame 轉(zhuǎn) Dataset:

import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定義字段名和類型
val testDS = testDF.as[Coltest]

這種方法就是在給出每一列的類型后,使用as方法,轉(zhuǎn)成Dataset,這在數(shù)據(jù)類型在DataFrame需要針對(duì)各個(gè)字段處理時(shí)極為方便。

注意:在使用一些特殊的操作時(shí),一定要加上 import spark.implicits._ 不然toDF、toDS無法使用。

十、Spark Streaming

Spark Streaming 的工作原理是將實(shí)時(shí)數(shù)據(jù)流拆分為小批量數(shù)據(jù),并使用 Spark 引擎對(duì)這些小批量數(shù)據(jù)進(jìn)行處理。這種微批處理(Micro-Batch Processing)的方式使得 Spark Streaming 能夠以近乎實(shí)時(shí)的延遲處理大規(guī)模的數(shù)據(jù)流。

下面是一個(gè)簡(jiǎn)單的 Spark Streaming 示例代碼:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("Spark Streaming Example")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

wordCounts.print()

ssc.start()
ssc.awaitTermination()

我們首先創(chuàng)建了一個(gè) StreamingContext 對(duì)象,并指定了批處理間隔為 1 秒。然后,我們使用 socketTextStream 方法從套接字源創(chuàng)建了一個(gè) DStream。接下來,我們對(duì) DStream 進(jìn)行了一系列操作,包括 flatMap、map 和 reduceByKey。最后,我們使用 print 方法打印出單詞計(jì)數(shù)的結(jié)果。

1.Spark Streaming 優(yōu)缺點(diǎn)

Spark Streaming 作為一種實(shí)時(shí)流處理框架,具有以下優(yōu)點(diǎn):

  • 高性能:Spark Streaming 基于 Spark 引擎,能夠快速處理大規(guī)模的數(shù)據(jù)流。
  • 易用性:Spark Streaming 提供了豐富的 API,可以讓開發(fā)人員快速構(gòu)建實(shí)時(shí)流處理應(yīng)用。
  • 容錯(cuò)性:Spark Streaming 具有良好的容錯(cuò)性,能夠在節(jié)點(diǎn)故障時(shí)自動(dòng)恢復(fù)。
  • 集成性:Spark Streaming 能夠與 Spark 生態(tài)系統(tǒng)中的其他組件(如 Spark SQL、MLlib 等)無縫集成。

但是,Spark Streaming 也有一些缺點(diǎn):

  • 延遲:由于 Spark Streaming 基于微批處理模型,因此它的延遲相對(duì)較高。對(duì)于需要極低延遲的應(yīng)用場(chǎng)景,Spark Streaming 可能不是最佳選擇。
  • 復(fù)雜性:Spark Streaming 的配置和調(diào)優(yōu)相對(duì)復(fù)雜,需要一定的經(jīng)驗(yàn)和技能。

2.DStream

DStream(離散化流)是 Spark Streaming 中用于表示實(shí)時(shí)數(shù)據(jù)流的一種抽象。它由一系列連續(xù)的 RDD 組成,每個(gè) RDD 包含一段時(shí)間內(nèi)收集到的數(shù)據(jù)。

在 Spark Streaming 中,可以通過以下幾種方式創(chuàng)建 DStream:

(1) 從輸入源創(chuàng)建。例如,從套接字源創(chuàng)建 DStream:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)
lines.print()

ssc.start()
ssc.awaitTermination()

(2) 通過轉(zhuǎn)換操作創(chuàng)建。例如,對(duì)現(xiàn)有的 DStream 進(jìn)行 map 操作:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
words.print()

ssc.start()
ssc.awaitTermination()

(3) 通過連接操作創(chuàng)建。例如,對(duì)兩個(gè) DStream 進(jìn)行 union 操作:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("DStream Example")
val ssc = new StreamingContext(conf, Seconds(1))

val lines1 = ssc.socketTextStream("localhost", 9999)
val lines2 = ssc.socketTextStream("localhost", 9998)
val lines = lines1.union(lines2)
lines.print()

ssc.start()
ssc.awaitTermination()

總結(jié):簡(jiǎn)單來說 DStream 就是對(duì) RDD 的封裝,你對(duì) DStream 進(jìn)行操作,就是對(duì) RDD 進(jìn)行操作。對(duì)于 DataFrame/DataSet/DStream 來說本質(zhì)上都可以理解成 RDD。

3.窗口函數(shù)

在 Spark Streaming 中,窗口函數(shù)用于對(duì) DStream 中的數(shù)據(jù)進(jìn)行窗口化處理。它允許你對(duì)一段時(shí)間內(nèi)的數(shù)據(jù)進(jìn)行聚合操作。

Spark Streaming 提供了多種窗口函數(shù),包括:

  • window:返回一個(gè)新的 DStream,它包含了原始 DStream 中指定窗口大小和滑動(dòng)間隔的數(shù)據(jù)。
  • countByWindow:返回一個(gè)新的單元素 DStream,它包含了原始 DStream 中指定窗口大小和滑動(dòng)間隔的元素個(gè)數(shù)。
  • reduceByWindow:返回一個(gè)新的 DStream,它包含了原始 DStream 中指定窗口大小和滑動(dòng)間隔的元素經(jīng)過 reduce 函數(shù)處理后的結(jié)果。
  • reduceByKeyAndWindow:類似于 reduceByWindow,但是在進(jìn)行 reduce 操作之前會(huì)先按照 key 進(jìn)行分組。

下面是一個(gè)使用窗口函數(shù)的示例代碼:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("Window Example")
val ssc = new StreamingContext(conf, Seconds(1))

val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))

wordCounts.print()

ssc.start()
ssc.awaitTermination()

在這個(gè)示例中,我們首先創(chuàng)建了一個(gè) DStream,并對(duì)其進(jìn)行了一系列轉(zhuǎn)換操作。然后,我們使用 reduceByKeyAndWindow 函數(shù)對(duì) DStream 進(jìn)行窗口化處理,指定了窗口大小為 30 秒,滑動(dòng)間隔為 10 秒。最后,我們使用 print 方法打印出單詞計(jì)數(shù)的結(jié)果。

4.輸出操作

Spark Streaming允許DStream的數(shù)據(jù)輸出到外部系統(tǒng),如數(shù)據(jù)庫(kù)或文件系統(tǒng),輸出的數(shù)據(jù)可以被外部系統(tǒng)所使用,該操作類似于RDD的輸出操作。Spark Streaming支持以下輸出操作:

  • **print() **: 打印DStream中每個(gè)RDD的前10個(gè)元素到控制臺(tái)。
  • **saveAsTextFiles(prefix, [suffix] **: 將此DStream中每個(gè)RDD的所有元素以文本文件的形式保存。每個(gè)批次的數(shù)據(jù)都會(huì)保存在一個(gè)單獨(dú)的目錄中,目錄名為:prefix-TIME_IN_MS[.suffix]。
  • **saveAsObjectFiles(prefix, [suffix])**: 將此DStream中每個(gè)RDD的所有元素以Java對(duì)象序列化的形式保存。每個(gè)批次的數(shù)據(jù)都會(huì)保存在一個(gè)單獨(dú)的目錄中,目錄名為:prefix-TIME_IN_MS[.suffix]。
  • **saveAsHadoopFiles(prefix, [suffix])**:將此DStream中每個(gè)RDD的所有元素以Hadoop文件(SequenceFile等)的形式保存。每個(gè)批次的數(shù)據(jù)都會(huì)保存在一個(gè)單獨(dú)的目錄中,目錄名為:prefix-TIME_IN_MS[.suffix]。
  • **foreachRDD(func)**:最通用的輸出操作,將函數(shù)func應(yīng)用于DStream中生成的每個(gè)RDD。通過此函數(shù),可以將數(shù)據(jù)寫入任何支持寫入操作的數(shù)據(jù)源。

十一、Structured Streaming

Structured Streaming 是 Spark 2.0 版本中引入的一種新的流處理引擎。它基于 Spark SQL 引擎,提供了一種聲明式的 API 來處理結(jié)構(gòu)化數(shù)據(jù)流。

與 Spark Streaming 相比,Structured Streaming 具有以下優(yōu)點(diǎn):

  • 易用性:Structured Streaming 提供了與 Spark SQL 相同的 API,可以讓開發(fā)人員快速構(gòu)建流處理應(yīng)用。
  • 高性能:Structured Streaming 基于 Spark SQL 引擎,能夠快速處理大規(guī)模的數(shù)據(jù)流。
  • 容錯(cuò)性:Structured Streaming 具有良好的容錯(cuò)性,能夠在節(jié)點(diǎn)故障時(shí)自動(dòng)恢復(fù)。
  • 端到端一致性:Structured Streaming 提供了端到端一致性保證,能夠確保數(shù)據(jù)不丟失、不重復(fù)。

下面是一個(gè)簡(jiǎn)單的 Structured Streaming 示例代碼:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

import spark.implicits._

val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

在這個(gè)示例中,我們首先創(chuàng)建了一個(gè) SparkSession 對(duì)象。然后,我們使用 readStream 方法從套接字源創(chuàng)建了一個(gè) DataFrame。接下來,我們對(duì) DataFrame 進(jìn)行了一系列操作,包括 flatMap、groupBy 和 count。最后,我們使用 writeStream 方法將結(jié)果輸出到控制臺(tái)。

Structured Streaming 同樣支持 DSL 和 SQL 語法。

DSL 語法:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

import spark.implicits._

val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

SQL 語法:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("Structured Streaming Example").getOrCreate()

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

lines.createOrReplaceTempView("lines")

val wordCounts = spark.sql(
  """
    |SELECT value, COUNT(*) as count
    |FROM (
    |    SELECT explode(split(value, ' ')) as value
    |    FROM lines
    |)
    |GROUP BY value
  """.stripMargin)

val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()

query.awaitTermination()

1.Source

Structured Streaming 支持多種輸入源,包括文件源(如文本文件、Parquet 文件、JSON 文件等)、Kafka、Socket 等。下面是一個(gè)使用 Scala 語言從 Kafka 中讀取數(shù)據(jù)的例子:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()

// 訂閱一個(gè)主題
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String)]

2.Output

Structured Streaming 支持多種輸出方式,包括控制臺(tái)輸出、內(nèi)存輸出、文件輸出、數(shù)據(jù)源輸出等。下面是將數(shù)據(jù)寫入到 Parquet 文件中的例子:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()

// 從 socket 中讀取數(shù)據(jù)
val lines = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// 將數(shù)據(jù)寫入到 Parquet 文件中
lines.writeStream
  .format("parquet")
  .option("path", "path/to/output/dir")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .start()

3.Output Mode

每當(dāng)結(jié)果表更新時(shí),我們都希望將更改后的結(jié)果行寫入外部接收器。

Output mode 指定了數(shù)據(jù)寫入輸出接收器的方式。Structured Streaming 支持以下三種 output mode:

Output Mode

描述

Append

只將流 DataFrame/Dataset 中的新行寫入接收器。

Complete

每當(dāng)有更新時(shí),將流 DataFrame/Dataset 中的所有行寫入接收器。

Update

每當(dāng)有更新時(shí),只將流 DataFrame/Dataset 中更新的行寫入接收器。

4.Output Sink

Output sink 指定了數(shù)據(jù)寫入的位置。Structured Streaming 支持多種輸出接收器,包括文件接收器、Kafka 接收器、Foreach 接收器、控制臺(tái)接收器和內(nèi)存接收器等。下面是一些使用 Scala 語言將數(shù)據(jù)寫入到不同輸出接收器中的例子:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("StructuredStreaming").getOrCreate()

// 從 socket 中讀取數(shù)據(jù)
val lines = spark
  .readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// 將數(shù)據(jù)寫入到 Parquet 文件中
lines.writeStream
  .format("parquet")
  .option("path", "path/to/output/dir")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .start()

// 將數(shù)據(jù)寫入到 Kafka 中
//selectExpr 是一個(gè) DataFrame 的轉(zhuǎn)換操作,它允許你使用 SQL 表達(dá)式來選擇 DataFrame 中的列。
//selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") 表示選擇 key 和 value 列,并將它們的類型轉(zhuǎn)換為字符串類型。
//這是因?yàn)?Kafka 接收器要求數(shù)據(jù)必須是字符串類型或二進(jìn)制類型。
lines.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()

// 將數(shù)據(jù)寫入到控制臺(tái)中
lines.writeStream
  .format("console")
  .start()

// 將數(shù)據(jù)寫入到內(nèi)存中
lines.writeStream
  .format("memory")
  .queryName("tableName")
  .start()

5.PV,UV統(tǒng)計(jì)

下面是用Structured Streaming實(shí)現(xiàn)PV,UV統(tǒng)計(jì)的例子,我們來感受實(shí)戰(zhàn)下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object PVUVExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("PVUVExample").getOrCreate()
    import spark.implicits._

    // 假設(shè)我們有一個(gè)包含用戶ID和訪問的URL的輸入流
    val lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
    val data = lines.as[String].map(line => {
      val parts = line.split(",")
      (parts(0), parts(1))
    }).toDF("user", "url")

    // 計(jì)算PV
    val pv = data.groupBy("url").count().withColumnRenamed("count", "pv")
    val pvQuery = pv.writeStream.outputMode("complete").format("console").start()

    // 計(jì)算UV
    val uv = data.dropDuplicates().groupBy("url").count().withColumnRenamed("count", "uv")
    val uvQuery = uv.writeStream.outputMode("complete").format("console").start()

    pvQuery.awaitTermination()
    uvQuery.awaitTermination()
  }
}

這段代碼演示了如何使用Structured Streaming對(duì)數(shù)據(jù)進(jìn)行PV和UV統(tǒng)計(jì)。它首先從一個(gè)socket源讀取數(shù)據(jù),然后使用groupBy和count對(duì)數(shù)據(jù)進(jìn)行PV統(tǒng)計(jì),最后使用dropDuplicates、groupBy和count對(duì)數(shù)據(jù)進(jìn)行UV統(tǒng)計(jì)。

假設(shè)我們?cè)诒镜貑?dòng)了一個(gè)socket服務(wù)器,并向其發(fā)送以下數(shù)據(jù):

user1,http://example.com/page1
user2,http://example.com/page1
user1,http://example.com/page2
user3,http://example.com/page1
user2,http://example.com/page2
user3,http://example.com/page2

那么程序?qū)⑤敵鲆韵陆Y(jié)果:

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---+
|                 url| pv|
+--------------------+---+
|http://example.co...|  3|
|http://example.co...|  3|
+--------------------+---+

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+---+
|                 url| uv|
+--------------------+---+
|http://example.co...|  2|
|http://example.co...|  3|
+--------------------+---+

總結(jié)

在此,我們對(duì)Spark的基本概念、使用方式以及部分原理進(jìn)行了簡(jiǎn)單的介紹。Spark以其強(qiáng)大的處理能力和靈活性,已經(jīng)成為大數(shù)據(jù)處理領(lǐng)域的一個(gè)重要工具。然而,這只是冰山一角。Spark的世界里還有許多深度和廣度等待著我們?nèi)ヌ剿鳌?/p>

作為初學(xué)者,你可能會(huì)覺得這個(gè)領(lǐng)域龐大且復(fù)雜。但請(qǐng)記住,每個(gè)都是從初學(xué)者開始的。不斷的學(xué)習(xí)和實(shí)踐,你將能夠更好的理解和掌握Spark,并將其應(yīng)用于解決實(shí)際問題。這篇文章可能不能涵蓋所有的知識(shí)點(diǎn),但我希望它能帶給你收獲和思考。

分享到:
標(biāo)簽:Spark
用戶無頭像

網(wǎng)友整理

注冊(cè)時(shí)間:

網(wǎng)站:5 個(gè)   小程序:0 個(gè)  文章:12 篇

  • 51998

    網(wǎng)站

  • 12

    小程序

  • 1030137

    文章

  • 747

    會(huì)員

趕快注冊(cè)賬號(hào),推廣您的網(wǎng)站吧!
最新入駐小程序

數(shù)獨(dú)大挑戰(zhàn)2018-06-03

數(shù)獨(dú)一種數(shù)學(xué)游戲,玩家需要根據(jù)9

答題星2018-06-03

您可以通過答題星輕松地創(chuàng)建試卷

全階人生考試2018-06-03

各種考試題,題庫(kù),初中,高中,大學(xué)四六

運(yùn)動(dòng)步數(shù)有氧達(dá)人2018-06-03

記錄運(yùn)動(dòng)步數(shù),積累氧氣值。還可偷

每日養(yǎng)生app2018-06-03

每日養(yǎng)生,天天健康

體育訓(xùn)練成績(jī)?cè)u(píng)定2018-06-03

通用課目體育訓(xùn)練成績(jī)?cè)u(píng)定