cao死我好湿好紧好爽动态视屏|精选久久久久久久久久|中文无码精品一区二区三区四季|AAA国语精品刺激对白视频|

當前位置:首頁 > 軟件開放 > 正文內(nèi)容

如何閱讀spark源碼(spark源碼看什么書)

軟件開放2年前 (2023-02-24)1276

本篇文章給大家談?wù)勅绾伍喿xspark源碼,以及spark源碼看什么書對應(yīng)的知識點,希望對各位有所幫助,不要忘了收藏本站喔。

本文目錄一覽:

如何成為Spark高手

  第一階段:熟練掌握Scala語言

1,spark框架是采用scala語言寫的,精致優(yōu)雅。想要成為spark高手,你就必須閱讀spark源碼,就必須掌握scala。?

2,雖然現(xiàn)在的spark可以使用多種語言開發(fā),java,python,但是最快速和支持最好的API依然并將永遠是Scala的API,所以必須掌握scala來編寫復(fù)雜的和高性能的spark分布式程序。?

3尤其是熟練掌握Scala的trait,apply,函數(shù)式編程,泛型,逆變,與協(xié)變等。

第二階段:精通spark平臺本身提供給開發(fā)折的API

1,掌握spark中面向RDD的開發(fā)模式,掌握各種transformation和action函數(shù)的使用。?

2,掌握Spark中的款依賴和窄依賴,lineage機制。?

3,掌握RDD的計算流程,如Stage的劃分,spark應(yīng)用程序提交給集群的基礎(chǔ)過程和Work節(jié)點基礎(chǔ)的工作原理。

  第三階段:深入Spark內(nèi)核

此階段主要是通過Spark框架的源碼研讀來深入Spark內(nèi)核部分:?

1,通過源碼掌握Spark的任務(wù)提交,?

2,通過源碼掌握Spark的集群的任務(wù)調(diào)度,?

3,尤其要精通DAGScheduler,TaskScheduler和Worker節(jié)點內(nèi)部的工作的每一步細節(jié)。

第四階段:掌握Spark上的核心框架的使用

Spark作為云計算大數(shù)據(jù)時代的集大成者,在實時流式處理,圖技術(shù),機器學(xué)習(xí),nosql查詢等方面具有明顯的優(yōu)勢,我們使用Spark的時候大部分時間都是在使用其框架:?

sparksql,spark streaming等?

1,spark streaming是出色的實時流失處理框架,要掌握,DStream,transformation和checkpoint等。?

2,spark sql是離線統(tǒng)計分析工具,shark已經(jīng)沒落。?

3,對于spark中的機器學(xué)習(xí)和Graphx等要掌握其原理和用法。

  第五階段:做商業(yè)級的spark項目

通過一個完整的具有代表性的spark項目來貫穿spark的方方面面,包括項目的框架設(shè)計,用到的技術(shù)的剖析,開始實現(xiàn),運維等,完善掌握其中的每一個階段和細節(jié),以后你就可以從容的面對絕大多數(shù)spark項目。

  第六階段:提供spark解決方案

1,徹底掌握spark框架源碼的每一個細節(jié),?

2,根據(jù)步同的業(yè)務(wù)場景的需要提供spark在不同場景的解決方案,?

3,根據(jù)實際需要,在spark框架基礎(chǔ)上經(jīng)行2次開發(fā),打造自己的spark框架。

可能是全網(wǎng)最詳細的 Spark Sql Aggregate 源碼剖析

縱觀 Spark Sql 源碼,聚合的實現(xiàn)是其中較為復(fù)雜的部分,本文希望能以例子結(jié)合流程圖的方式來說清楚整個過程。這里僅關(guān)注 Aggregate 在物理執(zhí)行計劃相關(guān)的內(nèi)容,之前的 parse、analyze 及 optimize 階段暫不做分析。在 Spark Sql 中,有一個專門的 Aggregation strategy 用來處理聚合,我們先來看看這個策略。

本文暫不討論 distinct Aggregate 的實現(xiàn)(有興趣的可以看看另一篇博文 ),我們來看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理執(zhí)行計劃的

創(chuàng)建聚合分為兩個階段:

AggregateExpression 共有以下幾種 mode:

Q:是否支持使用 hash based agg 是如何判斷的?

摘自我另一篇文章:

為了說明最常用也是最復(fù)雜的的 hash based agg,本小節(jié)暫時將示例 sql 改為

這樣就能進入 HashAggregateExec 的分支

構(gòu)造函數(shù)主要工作就是對 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 進行了初始化

在 enable code gen 的情況下,會調(diào)用 HashAggregateExec#inputRDDs 來生成 RDD,為了分析 HashAggregateExec 是如何生成 RDD 的,我們設(shè)置 spark.sql.codegen.wholeStage 為 false 來 disable code gen,這樣就會調(diào)用 HashAggregateExec#doExecute 來生成 RDD,如下:

可以看到,關(guān)鍵的部分就是根據(jù) child.execute() 生成的 RDD 的每一個 partition 的迭代器轉(zhuǎn)化生成一個新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各個 partition。由于 TungstenAggregationIterator 涉及內(nèi)容非常多,我們單開一大節(jié)來進行介紹。

此迭代器:

注:UnsafeKVExternalSorter 的實現(xiàn)可以參考:

UnsafeRow 是 InternalRow(表示一行記錄) 的 unsafe 實現(xiàn),由原始內(nèi)存(byte array)而不是 Java 對象支持,由三個區(qū)域組成:

使用 UnsafeRow 的收益:

構(gòu)造函數(shù)的主要流程已在上圖中說明,需要注意的是:當內(nèi)存不足時(畢竟每個 grouping 對應(yīng)的 agg buffer 直接占用內(nèi)存,如果 grouping 非常多,或者 agg buffer 較大,容易出現(xiàn)內(nèi)存用盡)會從 hash based aggregate 切換為 sort based aggregate(會 spill 數(shù)據(jù)到磁盤),后文會進行詳述。先來看看最關(guān)鍵的 processInputs 方法的實現(xiàn)

上圖中,需要注意的是:hashMap 中 get 一個 groupingKey 對應(yīng)的 agg buffer 時,若已經(jīng)存在該 buffer 則直接返回;若不存在,嘗試申請內(nèi)存新建一個:

上圖中,用于真正處理一條 row 的 AggregationIterator#processRow 還需進一步展開分析。在此之前,我們先來看看 AggregateFunction 的分類

AggregateFunction 可以分為 DeclarativeAggregate 和 ImperativeAggregate 兩大類,具體的聚合函數(shù)均為這兩類的子類。

DeclarativeAggregate 是一類直接由 Catalyst 中的 Expressions 構(gòu)成的聚合函數(shù),主要邏輯通過調(diào)用 4 個表達式完成,分別是:

我們再次以容易理解的 Count 來舉例說明:

通常來講,實現(xiàn)一個基于 Expressions 的 DeclarativeAggregate 函數(shù)包含以下幾個重要的組成部分:

再來看看 AggregationIterator#processRow

AggregationIterator#processRow 會調(diào)用

生成用于處理一行數(shù)據(jù)(row)的函數(shù)

說白了 processRow 生成了函數(shù)才是直接用來接受一條 input row 來更新對應(yīng)的 agg buffer,具體是根據(jù) mode 及 aggExpression 中的 aggFunction 的類型調(diào)用其 updateExpressions 或 mergeExpressions 方法:

比如,對于 aggFunction 為 DeclarativeAggregate 類型的 Partial 下的 Count 來說就是調(diào)用其 updateExpressions 方法,即:

對于 Final 的 Count 來說就是調(diào)用其 mergeExpressions 方法,即:

對于 aggFunction 為 ImperativeAggregate 類型的 Partial 下的 Collect 來說就是調(diào)用其 update 方法,即:

對于 Final 的 Collect 來說就是調(diào)用其 merge 方法,即:

我們都知道,讀取一個迭代器的數(shù)據(jù),是要不斷調(diào)用 hasNext 方法進行 check 是否還有數(shù)據(jù),當該方法返回 true 的時候再調(diào)用 next 方法取得下一條數(shù)據(jù)。所以要知道如何讀取 TungstenAggregationIterator 的數(shù)據(jù),就得分析其這兩個方法。

分為兩種情況,分別是:

Agg 的實現(xiàn)確實復(fù)雜,本文雖然篇幅已經(jīng)很長,但還有很多方面沒有 cover 到,但基本最核心、最復(fù)雜的點都詳細介紹了,如果對于未 cover 的部分有興趣,請自行閱讀源碼進行分析~

spark sql 2.3 源碼解讀 - Execute (7)

終于到了最后一步執(zhí)行了:

最關(guān)鍵的兩個函數(shù)便是 doPrepare和 doExecute了。

還是以上一章的sql語句為例,其最終生成的sparkplan為:

看一下SortExec的doPrepare 和 doExecute方法:

下面看child也就是ShuffleExchangeExec:

先看沒有exchangeCoordinator的情況,

首先執(zhí)行:

上面的方法會返回一個ShuffleDependency,ShuffleDependency中最重要的是rddWithPartitionIds,它決定了每一條InternalRow shuffle后的partition id:

接下來:

返回結(jié)果是ShuffledRowRDD:

CoalescedPartitioner的邏輯:

再看有exchangeCoordinator的情況:

同樣返回的是ShuffledRowRDD:

再看doEstimationIfNecessary:

estimatePartitionStartIndices 函數(shù)得到了 partitionStartIndices:

有exchangeCoordinator的情況就生成了partitionStartIndices,從而對分區(qū)進行了調(diào)整。

最后來一個例子:

未開啟exchangeCoordinator的plan:

開啟exchangeCoordinator的plan:

不同之處是 兩個Exchange都帶了coordinator,且都是同一個coordinator。

執(zhí)行withExchangeCoordinator前:

執(zhí)行withExchangeCoordinator后:

生成了coordinator,且執(zhí)行了 doPrepare后,可以看到兩個exchange都向其注冊了。

doExecute后:

原先的numPartitions是200,經(jīng)過執(zhí)行后,生成的partitionStartIndices為[1],也就是只有1個partition,顯然在測試數(shù)據(jù)量很小的情況下,1個partition是更為合理的。這就是ExchangeCoordinator的功勞。

execute 最終的輸出是rdd,剩下的結(jié)果便是spark對rdd的運算了。其實 spark sql 最終的目標便也是生成rdd,交給spark core來運算。

spark sql的介紹到這里就結(jié)束了。

怎么用Eclipse搭建Spark源碼閱讀環(huán)境

應(yīng)該說這個和是不是Spark項目沒什么關(guān)系。

建議你使用intellij idea,在spark目錄下執(zhí)行"sbt/sbt gen-idea",會自動生成.idea項目,導(dǎo)入即可。

idea我不熟,還需要做一些其他的插件配置(python, sbt等)和環(huán)境設(shè)置。

你也可以使用Eclipse看,Eclipse有scala IDE,把Spark項目當maven工程導(dǎo)入。但是子項目之間的依賴會有點問題,會報錯。

推薦使用前者,向Databricks的開發(fā)者看齊;我使用的是后者,我直接依賴了編譯好的包就不會報錯了,純讀源碼的話也勉強可以跟蹤和調(diào)試。

另外,我也看有的Committer用vim看spark代碼的,所以怎么看源碼都無所謂,你熟悉就好,而且這和是不是Spark項目也沒什么關(guān)系。:)

怎么在Idea IDE里面打開Spark源碼而不報錯

首先我們先點擊一個工程的Project Structure菜單,這時候會彈出一個對話框,仔細的用戶肯定會發(fā)現(xiàn)里面列出來的模塊(Module)居然沒有yarn!就是這個原因?qū)е聐arn模塊相關(guān)的代碼老是報錯!只需要將yarn模塊加入到這里即可。

步驟依次選擇 Add-Import Module-選擇pom.xml,然后一步一步點擊確定,這時候會在對話框里面多了spark-yarn_2.10模塊,

然后點擊Maven Projects里面的Reimport All Maven Projects,等yarn模塊里面的所有依賴全部下載完的時候,我們就可以看到這個模塊里面的代碼終于不再報錯了?。?/p>

Spark源碼分析之SparkSubmit的流程

本文主要對SparkSubmit的任務(wù)提交流程源碼進行分析。 Spark源碼版本為2.3.1。

首先閱讀一下啟動腳本,看看首先加載的是哪個類,我們看一下 spark-submit 啟動腳本中的具體內(nèi)容。

可以看到這里加載的類是org.apache.spark.deploy.SparkSubmit,并且把啟動相關(guān)的參數(shù)也帶過去了。下面我們跟一下源碼看看整個流程是如何運作的...

SparkSubmit的main方法如下

這里我們由于我們是提交作業(yè),所有會走上面的submit(appArgs, uninitLog)方法

可以看到submit方法首先會準備任務(wù)提交的環(huán)境,調(diào)用了prepareSubmitEnvironment,該方法會返回四元組,該方法中會調(diào)用doPrepareSubmitEnvironment,這里我們重點注意 childMainClass類具體是什么 ,因為這里涉及到后面啟動我們主類的過程。

以下是doPrepareSubmitEnvironment方法的源碼...

可以看到該方法首先是解析相關(guān)的參數(shù),如jar包,mainClass的全限定名,系統(tǒng)配置,校驗一些參數(shù),等等,之后的關(guān)鍵點就是根據(jù)我們 deploy-mode 參數(shù)來判斷是如何運行我們的mainClass,這里主要是通過childMainClass這個參數(shù)來決定下一步首先啟動哪個類。

childMainClass根據(jù)部署模型有不同的值:

之后該方法會把準備好的四元組返回,我們接著看之前的submit方法

可以看到這里最終會調(diào)用doRunMain()方法去進行下一步。

doRunMain的實現(xiàn)如下...

doRunMain方法中會判斷是否需要一個代理用戶,然后無論需不需要都會執(zhí)行runMain方法,我們接下來看看runMain方法是如何實現(xiàn)的。

這里我們只假設(shè)以集群模式啟動,首先會加載類,將我們的childMainClass加載為字節(jié)碼對象mainClass ,然后將mainClass 映射成SparkApplication對象,因為我們以集群模式啟動,那么上一步返回四元組中的childMainClass的參數(shù)為ClientApp的全限定名,而這里會調(diào)用app實例的start方法因此,這里最終調(diào)用的是ClientApp的start方法。

ClientApp的start方法如下...

可以看到這里和之前我們的master啟動流程有些相似。

可以參考我上一篇文章 Spark源碼分析之Master的啟動流程 對這一流程加深理解。

首先是準備rpcEnv環(huán)境,之后通過master的地址獲取masterEndpoints端點相關(guān)信息,因為這里運行start方法時會將之前配置的相關(guān)參數(shù)都傳進來,之后就會通過rpcEnv注冊相關(guān)clientEndPoint端點信息,同時需要注意,這里會把masterEndpoints端點信息也作為構(gòu)造ClientEndpoint端點的參數(shù),也就是說這個ClientEndpoint會和masterEndpoints通信。

而在我上一篇文章中說過,只要是setupEndpoint方法被調(diào)用,一定會調(diào)用相關(guān)端點的的onStart方法,而這會調(diào)用clientEndPoint的onStart方法。

ClientEndPoint類中的onStart方法會匹配launch事件。源碼如下

onStart中匹配我們的launch的過程,這個過程是啟動driverWrapper的過程,可以看到上面源碼中封裝了mainClass ,該參數(shù)對應(yīng)DriverWrapper類的全限定名,之后將mainClass封裝到command中,然后封裝到driverDescription中,向Master申請啟動Driver。

這個過程會向Mster發(fā)送消息,是通過rpcEnv來實現(xiàn)發(fā)射消息的,而這里就涉及到outbox信箱,會調(diào)用postToOutbox方法,向outbox信箱中添加消息,然后通過TransportClient的send或sendRpc方法發(fā)送消息。發(fā)件箱以及發(fā)送過程是在同一個線程中進行。

而細心的同學(xué)會注意到這里調(diào)用的方法名為SendToMasterAndForwardReply,見名之意,發(fā)送消息到master并且期待回應(yīng)。

下面是rpcEnv來實現(xiàn)向遠端發(fā)送消息的一個調(diào)用流程,最終會通過netty中的TransportClient來寫出。

之后,Master端會觸發(fā)receiveAndReply函數(shù),匹配RequestSubmitDriver樣例類,完成模式匹配執(zhí)行后續(xù)流程。

可以看到這里首先將Driver信息封裝成DriverInfo,然后添加待調(diào)度列表waitingDrivers中,然后調(diào)用通用的schedule函數(shù)。

由于waitingDrivers不為空,則會走LaunchDriver的流程,當前的application申請資源,這時會向worker發(fā)送消息,觸發(fā)Worker的receive方法。

Worker的receive方法中,當Worker遇到LaunchDriver指令時,創(chuàng)建并啟動一個DriverRunner,DriverRunner啟動一個線程,異步的處理Driver啟動工作。這里說啟動的Driver就是剛才說的org.apache.spark.deploy.worker.DriverWrapper

可以看到上面在DriverRunner中是開辟線程異步的處理Driver啟動工作,不會阻塞主進程的執(zhí)行,而prepareAndRunDriver方法中最終調(diào)用 runDriver..

runDriver中主要先做了一些初始化工作,接著就開始啟動driver了。

上述Driver啟動工作主要分為以下幾步:

下面我們直接看DriverWrapper的實現(xiàn)

DriverWrapper,會創(chuàng)建了一個RpcEndpoint與RpcEnv,RpcEndpoint為WorkerWatcher,主要目的為監(jiān)控Worker節(jié)點是否正常,如果出現(xiàn)異常就直接退出,然后當前的ClassLoader加載userJar,同時執(zhí)行userMainClass,在執(zhí)行用戶的main方法后關(guān)閉workerWatcher。

以上就是SparkSubmit的流程,下一篇我會對SparkContext的源碼進行解析。

歡迎關(guān)注...

如何閱讀spark源碼的介紹就聊到這里吧,感謝你花時間閱讀本站內(nèi)容,更多關(guān)于spark源碼看什么書、如何閱讀spark源碼的信息別忘了在本站進行查找喔。

掃描二維碼推送至手機訪問。

版權(quán)聲明:本文由飛速云SEO網(wǎng)絡(luò)優(yōu)化推廣發(fā)布,如需轉(zhuǎn)載請注明出處。

本文鏈接:http://m.smallwaterjetsystem.com/post/9796.html

“如何閱讀spark源碼(spark源碼看什么書)” 的相關(guān)文章

軟件開發(fā)詳細設(shè)計文檔(軟件開發(fā)詳細設(shè)計文檔自動生成工具)

軟件開發(fā)詳細設(shè)計文檔(軟件開發(fā)詳細設(shè)計文檔自動生成工具)

本篇文章給大家談?wù)勡浖_發(fā)詳細設(shè)計文檔,以及軟件開發(fā)詳細設(shè)計文檔自動生成工具對應(yīng)的知識點,希望對各位有所幫助,不要忘了收藏本站喔。 本文目錄一覽: 1、軟件開發(fā)文檔的分類 2、軟件開發(fā)中詳細設(shè)計文檔現(xiàn)在是必須的么?如果不是用什么取代? 3、軟件開發(fā)需要編寫哪些文檔? 軟件開發(fā)文檔的分類...

2萬粉絲一天收入(1萬粉絲一天收入)

2萬粉絲一天收入(1萬粉絲一天收入)

本篇文章給大家談?wù)?萬粉絲一天收入,以及1萬粉絲一天收入對應(yīng)的知識點,希望對各位有所幫助,不要忘了收藏本站喔。 本文目錄一覽: 1、抖音上有2千萬粉絲可以賺多少錢 2、162萬粉絲看直播2萬人,一月收入多少 3、抖音二萬多粉絲能賣多少錢 4、抖音粉絲二千萬收入多少 5、兩萬粉絲快手號...

游咔破解版(嗶咔破解游戲)

游咔破解版(嗶咔破解游戲)

今天給各位分享游咔破解版的知識,其中也會對嗶咔破解游戲進行解釋,如果能碰巧解決你現(xiàn)在面臨的問題,別忘了關(guān)注本站,現(xiàn)在開始吧!本文目錄一覽: 1、游咔游戲盒子為什么一直要刷新 2、游咔加載不出來為什么 3、游咔密碼到底是什么 4、游咔怎么更新 5、游咔游戲盒app打不開 游咔游戲盒子...

手機瀏覽器顯示不全網(wǎng)頁內(nèi)容(手機瀏覽器不能顯示全部網(wǎng)頁內(nèi)容)

手機瀏覽器顯示不全網(wǎng)頁內(nèi)容(手機瀏覽器不能顯示全部網(wǎng)頁內(nèi)容)

本篇文章給大家談?wù)勈謾C瀏覽器顯示不全網(wǎng)頁內(nèi)容,以及手機瀏覽器不能顯示全部網(wǎng)頁內(nèi)容對應(yīng)的知識點,希望對各位有所幫助,不要忘了收藏本站喔。 本文目錄一覽: 1、手機網(wǎng)頁顯示不完整怎么辦? 2、手機瀏覽電腦版網(wǎng)頁顯示不全 手機瀏覽電腦版網(wǎng)頁顯示不全怎么辦 3、存龍網(wǎng)站手機打開不全 4、手機網(wǎng)頁...

少兒編程課程收費標準(少兒編程課程收費標準表)

少兒編程課程收費標準(少兒編程課程收費標準表)

本篇文章給大家談?wù)勆賰壕幊陶n程收費標準,以及少兒編程課程收費標準表對應(yīng)的知識點,希望對各位有所幫助,不要忘了收藏本站喔。 本文目錄一覽: 1、少兒編程培訓(xùn)需要多少錢? 2、核桃編程收費標準? 3、學(xué)編程要多少錢學(xué)費 4、少兒學(xué)編程要多少錢?大家伙認可的是? 少兒編程培訓(xùn)需要多少錢? 不...

體育直播app大全免費下載(中國體育直播app下載)

體育直播app大全免費下載(中國體育直播app下載)

本篇文章給大家談?wù)勼w育直播app大全免費下載,以及中國體育直播app下載對應(yīng)的知識點,希望對各位有所幫助,不要忘了收藏本站喔。 本文目錄一覽: 1、體育直播的軟件有哪些?用哪個最好,求推薦 2、請問體育直播的網(wǎng)站有哪些啊 3、有什么手機軟件可以看nba直播 體育直播的軟件有哪些?用哪個最好...