
作者:肖冠宇
來源:大數據DT(ID:hzdashuju)
內容摘編自《企業大數據處理:Spark、Druid、Flume與Kafka應用實踐》
導讀:Spark是由加州大學伯克利分校AMP實驗室開源的分布式大規模數據處理通用引擎,具有高吞吐、低延時、通用易擴展、高容錯等特點。Spark內部提供了豐富的開發庫,集成了數據分析引擎Spark SQL、圖計算框架GraphX、機器學習庫MLlib、流計算引擎Spark Streaming。
Spark在函數式編程語言Scala中實現,提供了豐富的開發API,支持Scala、Java、Python、R等多種開發語言。同時,Spark提供了多種運行模式,既可以采用獨立部署的方式運行,也可以依托Hadoop YARN、Apache Mesos等資源管理器調度任務運行。
目前,Spark已經在金融、交通、醫療、氣象等多種領域中廣泛使用。
01 Spark概述
1. 核心概念介紹
Spark架構示意圖如圖2-1所示,下面將分別介紹各核心組件。
Client:客戶端進程,負責提交作業。
Driver:一個Spark作業有一個Spark Context,一個Spark Context對應一個Driver進程,作業的main函數運行在Driver中。Driver主要負責Spark作業的解析,以及通過DAGScheduler劃分Stage,將Stage轉化成TaskSet提交給TaskScheduler任務調度器,進而調度Task到Executor上執行。
Executor:負責執行Driver分發的Task任務。集群中一個節點可以啟動多個Executor,每一個Executor可以執行多個Task任務。
Catche:Spark提供了對RDD不同級別的緩存策略,分別可以緩存到內存、磁盤、外部分布式內存存儲系統Tachyon等。
Application:提交的一個作業就是一個Application,一個Application只有一個Spark Context。
Job:RDD執行一次Action操作就會生成一個Job。
Task:Spark運行的基本單位,負責處理RDD的計算邏輯。
Stage:DAGScheduler將Job劃分為多個Stage,Stage的劃分界限為Shuffle的產生,Shuffle標志著上一個Stage的結束和下一個Stage的開始。
TaskSet:劃分的Stage會轉換成一組相關聯的任務集。
RDD(Resilient Distributed Dataset):彈性分布式數據集,可以理解為一種只讀的分布式多分區的數組,Spark計算操作都是基于RDD進行的,下面會有詳細介紹。
DAG(Directed Acyclic Graph):有向無環圖。Spark實現了DAG的計算模型,DAG計算模型是指將一個計算任務按照計算規則分解為若干子任務,這些子任務之間根據邏輯關系構建成有向無環圖。
▲圖2-1 Spark架構示意圖
2. RDD介紹
RDD從字面上理解有些困難,我們可以認為是一種分布式多分區只讀的數組,Spark計算操作都是基于RDD進行的。
RDD具有幾個特性:只讀、多分區、分布式,可以將HDFS塊文件轉換成RDD,也可以由一個或多個RDD轉換成新的RDD,失效自動重構?;谶@些特性,RDD在分布式環境下能夠被高效地并行處理。
(1)計算類型
在Spark中RDD提供Transformation和Action兩種計算類型。Transformation操作非常豐富,采用延遲執行的方式,在邏輯上定義了RDD的依賴關系和計算邏輯,但并不會真正觸發執行動作,只有等到Action操作才會觸發真正執行操作。Action操作常用于最終結果的輸出。
常用的Transformation操作及其描述:
map (func):接收一個處理函數并行處理源RDD中的每個元素,返回與源RDD元素一一對應的新RDD
filter (func):并行處理源RDD中的每個元素,接收一個處理函數,并根據定義的規則對RDD中的每個元素進行過濾處理,返回處理結果為true的元素重新組成新的RDD
flatMap (func):flatMap是map和flatten的組合操作,與map函數相似,不過map函數返回的新RDD包含的元素可能是嵌套類型,flatMap接收一個處理嵌套會將嵌套類型的元素展開映射成多個元素組成新的RDD
mapPartitions (func):與map函數應用于RDD中的每個元素不同,mapPartitions應用于RDD中的每個分區。mapPartitions函數接收的參數為func函數,func接收參數為每個分區的迭代器,返回值為每個分區元素處理之后組成的新的迭代器,func會作用于分區中的每一個元素。有一種典型的應用場景,比如待處理分區中的數據需要寫入到數據庫,如果使用map函數,每一個元素都會創建一個數據庫連接對象,非常耗時并且容易引起問題發生,如果使用mapPartitions函數只會在分區中創建一個數據庫連接對象,性能提高明顯
mapPartitionsWithIndex(func):作用與mapPartitions函數相同,只是接收的參數func函數需要傳入兩個參數,分區的索引作為第一個參數傳入,按照分區的索引對分區中元素進行處理
union (otherDataset):將兩個RDD進行合并,返回結果為RDD中元素(不去重)
intersection (otherDataset):對兩個RDD進行取交集運算,返回結果為RDD無重復元素
distinct ([numTasks])):對RDD中元素去重
groupByKey ([numTasks]):在KV類型的RDD中按Key分組,將相同Key的元素聚集到同一個分區內,此函數不能接收函數作為參數,只接收一個可選參數任務數,所以不能在RDD分區本地進行聚合計算,如需按Key對Value聚合計算,只能對groupByKey返回的新RDD繼續使用其他函數運算
reduceByKey (func, [numTasks]):對KV類型的RDD按Key分組,接收兩個參數,第一個參數為處理函數,第二個參數為可選參數設置reduce的任務數。reduceByKey函數能夠在RDD分區本地提前進行聚合運算,這有效減少了shuffle過程傳輸的數據量。相對于groupByKey函數更簡潔高效
aggregateByKey (zeroValue)(seqOp, combOp):對KV類型的RDD按Key分組進行reduce計算,可接收三個參數,第一個參數是初始化值,第二個參數是分區內處理函數,第三個參數是分區間處理函數
sortByKey ([ascending], [numTasks]):對KV類型的RDD內部元素按照Key進行排序,排序過程會涉及Shuffle
join (otherDataset, [numTasks]):對KV類型的RDD進行關聯,只能是兩個RDD之間關聯,超過兩個RDD關聯需要使用多次join函數,join函數只會關聯出具有相同Key的元素,相當于SQL語句中的inner join
cogroup (otherDataset, [numTasks]):對KV類型的RDD進行關聯,cogroup處理多個RDD關聯比join更加優雅,它可以同時傳入多個RDD作為參數進行關聯,產生的新RDD中的元素不會出現笛卡爾積的情況,使用fullOuterJoin函數會產生笛卡爾積
coalesce (numPartitions):對RDD重新分區,將RDD中的分區數減小到參數numPartitions個,不會產生shuffle。在較大的數據集中使用filer等過濾操作后可能會產生多個大小不等的中間結果數據文件,重新分區并減小分區可以提高作業的執行效率,是Spark中常用的一種優化手段
repartition (numPartitions):對RDD重新分區,接收一個參數——numPartitions分區數,是coalesce函數設置shuffle為true的一種實現形式
repartitionAndSortWithinPartitions (partitioner):接收一個分區對象(如Spark提供的分區類HashPartitioner)對RDD中元素重新分區并在分區內排序
常用的Action操作及其描述:
reduce(func):處理RDD兩兩之間元素的聚集操作
collect():返回RDD中所有數據元素
count():返回RDD中元素個數
first():返回RDD中的第一個元素
take(n):返回RDD中的前n個元素
saveAsTextFile(path):將RDD寫入文本文件,保存至本地文件系統或者HDFS中
saveAsSequenceFile(path):將KV類型的RDD寫入SequenceFile文件,保存至本地文件系統或者HDFS中
countByKey():返回KV類型的RDD每個Key包含的元素個數
foreach(func):遍歷RDD中所有元素,接收參數為func函數,常用操作是傳入println函數打印所有元素
從HDFS文件生成Spark RDD,經過map、filter、join等多次Transformation操作,最終調用saveAsTextFile Action操作將結果集輸出到HDFS,并以文件形式保存。RDD的流轉過程如圖2-2所示。
▲圖2-2 RDD的流轉過程示意圖
(2)緩存
在Spark中RDD可以緩存到內存或者磁盤上,提供緩存的主要目的是減少同一數據集被多次使用的網絡傳輸次數,提高Spark的計算性能。Spark提供對RDD的多種緩存級別,可以滿足不同場景對RDD的使用需求。RDD的緩存具有容錯性,如果有分區丟失,可以通過系統自動重新計算。
在代碼中可以使用persist()方法或cache()方法緩存RDD。cache()方法默認將RDD緩存到內存中,cache()方法和persist()方法都可以用unpersist()方法來取消RDD緩存。示例如下:
val fileDataRdd = sc.textFile("hdfs://data/hadoop/test.text")
fileDataRdd.cache() // 緩存RDD到內存
或者
fileDataRdd.persist(StorageLevel.MEMORY_ONLY)
fileDataRdd..unpersist() // 取消緩存
Spark的所有緩存級別定義在org.apache.spark.storage.StorageLevel對象中,如下所示。
object storageLevel extends scala.AnyRef with scala.Serializable {
val NONE : org.apache.spark.storage.StorageLevel
val DISK_ONLY : org.apache.spark.storage.StorageLevel
val DISK_ONLY_2 : org.apache.spark.storage.StorageLevel
val MEMORY_ONLY : org.apache.spark.storage.StorageLevel
val MEMORY_ONLY_2 : org.apache.spark.storage.StorageLevel
val MEMORY_ONLY_SER : org.apache.spark.storage.StorageLevel
val MEMORY_ONLY_SER_2 : org.apache.spark.storage.StorageLevel
val MEMORY_AND_DISK : org.apache.spark.storage.StorageLevel
val MEMORY_AND_DISK_2 : org.apache.spark.storage.StorageLevel
val MEMORY_AND_DISK_SER : org.apache.spark.storage.StorageLevel
val MEMORY_AND_DISK_SER_2 : org.apache.spark.storage.StorageLevel
val OFF_HEAP : org.apache.spark.storage.StorageLevel
Spark各緩存級別及其描述:
MEMORY_ONLY:RDD僅緩存一份到內存,此為默認級別
MEMORY_ONLY_2:將RDD分別緩存在集群的兩個節點上,RDD在集群內存中保存兩份
MEMORY_ONLY_SER:將RDD以Java序列化對象的方式緩存到內存中,有效減少了RDD在內存中占用的空間,不過讀取時會消耗更多的CPU資源
DISK_ONLY:RDD僅緩存一份到磁盤
MEMORY_AND_DISK:RDD僅緩存一份到內存,當內存中空間不足時會將部分RDD分區緩存到磁盤
MEMORY_AND_DISK_2:將RDD分別緩存在集群的兩個節點上,當內存中空間不足時會將部分RDD分區緩存到磁盤,RDD在集群內存中保存兩份
MEMORY_AND_DISK_SER:將RDD以Java序列化對象的方式緩存到內存中,當內存中空間不足時會將部分RDD分區緩存到磁盤,有效減少了RDD在內存中占用的空間,不過讀取時會消耗更多的CPU資源
OFF_HEAP:將RDD以序列化的方式緩存到JVM之外的存儲空間Tachyon中,與其他緩存模式相比,減少了JVM垃圾回收開銷。Spark執行程序失敗不會導致數據丟失,Spark與Tachyon已經能較好地兼容,使用起來方便穩定
(3)依賴關系
窄依賴(Narrow Dependency):父RDD的分區只對應一個子RDD的分區,如圖2-3所示,如果子RDD只有部分分區數據損壞或者丟失,只需要從對應的父RDD重新計算恢復。
▲圖2-3 窄依賴示意圖
寬依賴(Shuffle Dependency):子RDD分區依賴父RDD的所有分區,如圖2-4所示。如果子RDD部分分區甚至全部分區數據損壞或丟失,需要從所有父RDD重新計算,相對窄依賴而言付出的代價更高,所以應盡量避免寬依賴的使用。
▲圖2-4 寬依賴示意圖
Lineage:每個RDD都會記錄自己依賴的父RDD信息,一旦出現數據損壞或者丟失將從父RDD迅速重新恢復。
3. 運行模式
Spark運行模式主要有以下幾種:
Local模式:本地采用多線程的方式執行,主要用于開發測試。
On Yarn模式:Spark On Yarn有兩種模式,分別為yarn-client和yarn-cluster模式。yarn-client模式中,Driver運行在客戶端,其作業運行日志在客戶端查看,適合返回小數據量結果集交互式場景使用。yarn-cluster模式中,Driver運行在集群中的某個節點,節點的選擇由YARN調度,作業日志通過yarn管理名稱查看:yarn logs -applicationId,也可以在YARN的Web UI中查看,適合大數據量非交互式場景使用。
提交作業命令:
./bin/spark-submit --class package.MainClass \ # 作業執行主類,需要完成的包路徑
--master spark://host:port, mesos://host:port, yarn, or local\Maste
# 運行方式
---deploy-mode client,cluster\ # 部署模式,如果Master采用YARN模式則可以選擇使用clent模式或者cluster模式,默認client模式
--driver-memory 1g \ # Driver運行內存,默認1G
---driver-cores 1 \ # Driver分配的CPU核個數
--executor-memory 4g \ # Executor內存大小
--executor-cores 1 \ # Executor分配的CPU核個數
---num-executors \ # 作業執行需要啟動的Executor數
---jars \ # 作業程序依賴的外部jar包,這些jar包會從本地上傳到Driver然后分發到各Executor classpath中。
lib/spark-examples*.jar \ # 作業執行JAR包
[other application arguments ] # 程序運行需要傳入的參數
作業在yarn-cluster模式下的執行過程如圖2-5所示。
▲圖2-5 作業在yarn-cluster模式下的執行過程
Client在任何一臺能與Yarn通信的入口機向Yarn提交作業,提交的配置中可以設置申請的資源情況,如果沒有配置則將采用默認配置。
ResourceManager接收到Client的作業請求后,首先檢查程序啟動的ApplicationMaster需要的資源情況,然后向資源調度器申請選取一個能夠滿足資源要求的NodeManager節點用于啟動ApplicationMaster進程,ApplicationMaster啟動成功之后立即在該節點啟動Driver進程。
ApplicationMaster根據提交作業時設置的Executor相關配置參數或者默認配置參數與ResourceManager通信領取Executor資源信息,并與相關NodeManager通信啟動Executor進程。
Executor啟動成功之后與Driver通信領取Driver分發的任務。
Task執行,運行成功輸出結果。
02 Shuffle詳解
Shuffle最早出現于MapReduce框架中,負責連接Map階段的輸出與Reduce階段的輸入。Shuffle階段涉及磁盤IO、網絡傳輸、內存使用等多種資源的調用,所以Shuffle階段的執行效率影響整個作業的執行效率,大部分優化也都是針對Shuffle階段進行的。
Spark是實現了MapReduce原語的一種通用實時計算框架。Spark作業中Map階段的Shuffle稱為Shuffle Write,Reduce階段的Shuffle稱為Shuffle Read。
Shuffle Write階段會將Map Task中間結果數據寫入到本地磁盤,而在Shuffle Read階段中,Reduce Task從Shuffle Write階段拉取數據到內存中并行計算。Spark Shuffle階段的劃分方式如圖2-6所示。
▲圖2-6 Spark Shuffle階段的劃分方式
1. Shuffle Write實現方式
(1)基于Hash的實現(hash-based)
每個Map Task都會生成與Reduce Task數據相同的文件數,對Key取Hash值分別寫入對應的文件中,如圖2-7所示。
生成的文件數FileNum=MapTaskNum×ReduceTaskNum,如果Map Task和Reduce Task數都比較多就會生成大量的小文件,寫文件過程中,每個文件都要占用一部分緩沖區,總占用緩沖區大小TotalBufferSize=CoreNum×ReduceTaskNum×FileBufferSize,大量的小文件就會占用更多的緩沖區,造成不必要的內存開銷,同時,大量的隨機寫操作會大大降低磁盤IO的性能。
▲圖2-7 基于Hash的實現方式
由于簡單的基于Hash的實現方式擴展性較差,內存資源利用率低,過多的小文件在文件拉取過程中增加了磁盤IO和網絡開銷,所以需要對基于Hash的實現方式進行進一步優化,為此引入了Consolidate(合并)機制。
如圖2-8所示,將同一個Core中執行的Task輸出結果寫入到相同的文件中,生成的文件數FileNum=CoreNum×ReduceTaskNum,這種優化方式減少了生成的文件數目,提高了磁盤IO的吞吐量,但是文件緩存占用的空間并沒有減少,性能沒有得到明顯有效的提高。
▲圖2-8 優化后的基于Hash的實現方式
設置方式:
代碼中設置:conf.get("spark.shuffle.manager", "hash")
配置文件中設置:在conf/spark-default.conf配置文件中添加spark.shuffle.managerhash
基于Hash的實現方式的優缺點:
優點:實現簡單,小數量級數據處理操作方便。
缺點:產生小文件過多,內存利用率低,大量的隨機讀寫造成磁盤IO性能下降。
(2)基于Sort的實現方式(sort-based)
為了解決基于Hash的實現方式的諸多問題,Spark Shuffle引入了基于Sort的實現方式,如圖2-9所示。該方式中每個Map Task任務生成兩個文件,一個是數據文件,一個是索引文件,生成的文件數FileNum=MapTaskNum×2.
數據文件中的數據按照Key分區在不同分區之間排序,同一分區中的數據不排序,索引文件記錄了文件中每個分區的偏移量和范圍。當Reduce Task讀取數據時,先讀取索引文件找到對應的分區數據偏移量和范圍,然后從數據文件讀取指定的數據。
設置方式:
代碼中設置:conf.get("spark.shuffle.manager", "sort")
配置文件中設置:在conf/spark-default.conf配置文件中添加spark.shuffle.manager sort
▲圖2-9 基于Sort的實現方式
基于Sort的實現方式的優缺點:
優點:順序讀寫能夠大幅提高磁盤IO性能,不會產生過多小文件,降低文件緩存占用內存空間大小,提高內存使用率。
缺點:多了一次粗粒度的排序。
2. Shuffle Read實現方式
Shuffle Read階段中Task通過直接讀取本地Shuffle Write階段產生的中間結果數據或者通過HTTP的方式從遠程Shuffle Write階段拉取中間結果數據進行處理。Shuffle Write階段基于Hash和基于Sort兩種實現方式產生的中間結果數據在Shuffle Read階段采用同一種實現方式。
獲取需要拉取的數據信息,根據數據本地性原則判斷采用哪種級別的拉取方式。
判斷是否需要在Map端聚合(reduceByKey會在Map端預聚合)。
Shuffle Read階段Task拉取過來的數據如果涉及聚合或者排序,則會使用HashMap結構在內存中存儲,如果拉取過來的數據集在HashMap中已經存在相同的鍵則將數據聚合在一起。此時涉及一個比較重要的參數——spark.shuffle.spill,決定在內存被寫滿后是否將數據以文件的形式寫入到磁盤,默認值為true,如果設置為false,則有可能會發生OOM內存溢出的風險,建議開啟。
排序聚合之后的數據以文件形式寫入磁盤將產生大量的文件內數據有序的小文件,將這些小文件重新加載到內存中,隨后采用歸并排序的方式合并為一個大的數據文件。
本文摘編自《企業大數據處理:Spark、Druid、Flume與Kafka應用實踐》,經出版方授權發布。
數據分析咨詢請掃描二維碼
若不方便掃碼,搜微信號:CDAshujufenxi
CDA數據分析師證書考試體系(更新于2025年05月22日)
2025-05-26解碼數據基因:從數字敏感度到邏輯思維 每當看到超市貨架上商品的排列變化,你是否會聯想到背后的銷售數據波動?三年前在零售行 ...
2025-05-23在本文中,我們將探討 AI 為何能夠加速數據分析、如何在每個步驟中實現數據分析自動化以及使用哪些工具。 數據分析中的AI是什么 ...
2025-05-20當數據遇見人生:我的第一個分析項目 記得三年前接手第一個數據分析項目時,我面對Excel里密密麻麻的銷售數據手足無措。那些跳動 ...
2025-05-20在數字化運營的時代,企業每天都在產生海量數據:用戶點擊行為、商品銷售記錄、廣告投放反饋…… 這些數據就像散落的拼圖,而相 ...
2025-05-19在當今數字化營銷時代,小紅書作為國內領先的社交電商平臺,其銷售數據蘊含著巨大的商業價值。通過對小紅書銷售數據的深入分析, ...
2025-05-16Excel作為最常用的數據分析工具,有沒有什么工具可以幫助我們快速地使用excel表格,只要輕松幾步甚至輸入幾項指令就能搞定呢? ...
2025-05-15數據,如同無形的燃料,驅動著現代社會的運轉。從全球互聯網用戶每天產生的2.5億TB數據,到制造業的傳感器、金融交易 ...
2025-05-15大數據是什么_數據分析師培訓 其實,現在的大數據指的并不僅僅是海量數據,更準確而言是對大數據分析的方法。傳統的數 ...
2025-05-14CDA持證人簡介: 萬木,CDA L1持證人,某電商中廠BI工程師 ,5年數據經驗1年BI內訓師,高級數據分析師,擁有豐富的行業經驗。 ...
2025-05-13CDA持證人簡介: 王明月 ,CDA 數據分析師二級持證人,2年數據產品工作經驗,管理學博士在讀。 學習入口:https://edu.cda.cn/g ...
2025-05-12CDA持證人簡介: 楊貞璽 ,CDA一級持證人,鄭州大學情報學碩士研究生,某上市公司數據分析師。 學習入口:https://edu.cda.cn/g ...
2025-05-09CDA持證人簡介 程靖 CDA會員大咖,暢銷書《小白學產品》作者,13年頂級互聯網公司產品經理相關經驗,曾在百度、美團、阿里等 ...
2025-05-07相信很多做數據分析的小伙伴,都接到過一些高階的數據分析需求,實現的過程需要用到一些數據獲取,數據清洗轉換,建模方法等,這 ...
2025-05-06以下的文章內容來源于劉靜老師的專欄,如果您想閱讀專欄《10大業務分析模型突破業務瓶頸》,點擊下方鏈接 https://edu.cda.cn/g ...
2025-04-30CDA持證人簡介: 邱立峰 CDA 數據分析師二級持證人,數字化轉型專家,數據治理專家,高級數據分析師,擁有豐富的行業經驗。 ...
2025-04-29CDA持證人簡介: 程靖 CDA會員大咖,暢銷書《小白學產品》作者,13年頂級互聯網公司產品經理相關經驗,曾在百度,美團,阿里等 ...
2025-04-28CDA持證人簡介: 居瑜 ,CDA一級持證人國企財務經理,13年財務管理運營經驗,在數據分析就業和實踐經驗方面有著豐富的積累和經 ...
2025-04-27數據分析在當今信息時代發揮著重要作用。單因素方差分析(One-Way ANOVA)是一種關鍵的統計方法,用于比較三個或更多獨立樣本組 ...
2025-04-25CDA持證人簡介: 居瑜 ,CDA一級持證人國企財務經理,13年財務管理運營經驗,在數據分析就業和實踐經驗方面有著豐富的積累和經 ...
2025-04-25