Spark部署

Spark應用程序使用spark-submit(shell命令)來部署在集羣中的Spark應用程序。 它通過一個統一的接口採用全各自的集羣管理器。 因此,您不必每一個應用程序配置。

示例

讓我們同樣以計算字數爲例子,在使用之前,使用shell命令。 在這裏,我們考慮同樣 spark 應用程序的例子。

簡單輸入

下面的文字是輸入數據,並命名該文件爲 in.txt.

people are not as beautiful as they look,
as they walk or as they talk.
they are only as beautiful as they love,
as they care as they share. 

請看下面的程序 −

SparkWordCount.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark._

object SparkWordCount {
def main(args: Array[String]) {

  val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) 

  /\* local = master URL; Word Count = application name; \*/  
  /\* /usr/local/spark = Spark Home; Nil = jars; Map = environment \*/ 
  /\* Map = variables to work nodes \*/ 
  /\*creating an inputRDD to read text file (in.txt) through Spark context\*/ 
  val input = sc.textFile("in.txt") 
  /\* Transform the inputRDD into countRDD \*/ 

  valcount = input.flatMap(line ⇒ line.split(" ")) 
  .map(word ⇒ (word, 1)) 
  .reduceByKey(\_ + \_) 

  /\* saveAsTextFile method is an action that effects on the RDD \*/  
  count.saveAsTextFile("outfile") 
  System.out.println("OK"); 

}
}  

保存上述程序到指定的文件 SparkWordCount.scala 並將其放置在一個用戶定義的目錄名爲 spark-application.

注 − 雖然轉化 inputRDD 成 countRDD  我們使用 flatMap() 用於標記化(從文本文件),行成單詞, map() 方法統計詞頻和 reduceByKey() 方法計算每個單詞的重複。

使用以下步驟來提交應用程序。通過終端在 spark-application目錄中執行所有步驟。

第1步:下載 Spark Ja

Spark需要核心 jar 來編譯,因此,從下面的鏈接下載spark-core_2.10-1.3.0.jar 移動下載 jar 的文件到 spark-application 應用程序目錄。

第2步:編譯程序

使用下面給出的命令編譯上述程序。這個命令應該在spark-application應用程序目錄下執行。這裏,/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar ,Spark 採用了 Hadoop 的 jar 支持程序。

$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala 

第3步:創建 JAR

使用以下 spark 命令應用程序創建 jar 文件。這裏,wordcount 爲 jar 文件的文件名。

jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar

第4步:提交spark應用

使用以下命令提交 spark 應用 −

spark-submit --class SparkWordCount --master local wordcount.jar 

如果成功執行,那麼會發現有下面給出的輸出。在下面輸出的正常用戶識別,這是程序的最後一行。如果仔細閱讀下面的輸出,會發現不同的東西,比如 −

  • 在端口 42954 成功啓動服務 「sparkDriver」

  • MemoryStore 啓動使用容量267.3 MB

  • 啓動SparkUI在 http://192.168.1.217:4040

  • 添加JAR文件:/home/hadoop/piapplication/count.jar

  • ResultStage 1 (saveAsTextFile 在 SparkPi.scala:11) finished in 0.566 s

  • 停止 Spark web用戶界面在 http://192.168.1.217:4040

  • MemoryStore 清理

15/07/08 13:56:04 INFO Slf4jLogger: Slf4jLogger started
15/07/08 13:56:04 INFO Utils: Successfully started service 'sparkDriver' on port 42954.
15/07/08 13:56:04 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.1.217:42954]
15/07/08 13:56:04 INFO MemoryStore: MemoryStore started with capacity 267.3 MB
15/07/08 13:56:05 INFO HttpServer: Starting HTTP Server
15/07/08 13:56:05 INFO Utils: Successfully started service 'HTTP file server' on port 56707.
15/07/08 13:56:06 INFO SparkUI: Started SparkUI at http://192.168.1.217:4040
15/07/08 13:56:07 INFO SparkContext: Added JAR file:/home/hadoop/piapplication/count.jar at http://192.168.1.217:56707/jars/count.jar with timestamp 1436343967029
15/07/08 13:56:11 INFO Executor: Adding file:/tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af/userFiles-df4f4c20-a368-4cdd-a2a7-39ed45eb30cf/count.jar to class loader
15/07/08 13:56:11 INFO HadoopRDD: Input split: file:/home/hadoop/piapplication/in.txt:0+54
15/07/08 13:56:12 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2001 bytes result sent to driver
(MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11), which is now runnable
15/07/08 13:56:12 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (MapPartitionsRDD[5] at saveAsTextFile at SparkPi.scala:11)
15/07/08 13:56:13 INFO DAGScheduler: ResultStage 1 (saveAsTextFile at SparkPi.scala:11) finished in 0.566 s
15/07/08 13:56:13 INFO DAGScheduler: Job 0 finished: saveAsTextFile at SparkPi.scala:11, took 2.892996 s
OK
15/07/08 13:56:13 INFO SparkContext: Invoking stop() from shutdown hook
15/07/08 13:56:13 INFO SparkUI: Stopped Spark web UI at http://192.168.1.217:4040
15/07/08 13:56:13 INFO DAGScheduler: Stopping DAGScheduler
15/07/08 13:56:14 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/07/08 13:56:14 INFO Utils: path = /tmp/spark-45a07b83-42ed-42b3-b2c2823d8d99c5af/blockmgr-ccdda9e3-24f6-491b-b509-3d15a9e05818, already present as root for deletion.
15/07/08 13:56:14 INFO MemoryStore: MemoryStore cleared
15/07/08 13:56:14 INFO BlockManager: BlockManager stopped
15/07/08 13:56:14 INFO BlockManagerMaster: BlockManagerMaster stopped
15/07/08 13:56:14 INFO SparkContext: Successfully stopped SparkContext
15/07/08 13:56:14 INFO Utils: Shutdown hook called
15/07/08 13:56:14 INFO Utils: Deleting directory /tmp/spark-45a07b83-42ed-42b3b2c2-823d8d99c5af
15/07/08 13:56:14 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!

第5步:檢查輸出

成功執行程序後,會發現一個名爲outfile在spark-application應用程序目錄。

下面的命令用於在outfile目錄中打開和檢查文件列表。

$ cd outfile
$ ls
Part-00000 part-00001 _SUCCESS 

part-00000 文件檢查輸出命令 −

$ cat part-00000
(people,1)
(are,2)
(not,1)
(as,8)
(beautiful,2)
(they, 7)
(look,1) 

part-00001文件查看輸出命令 −

$ cat part-00001
(walk, 1)
(or, 1)
(talk, 1)
(only, 1)
(love, 1)
(care, 1)
(share, 1)

通過下面的部分更多地瞭解「spark-submit」命令。

Spark-submit 語法

spark-submit [options] <app jar | python file> [app arguments]

選項

下面給出描述選項列表 −

S.No

選項

描述

1

--master

spark://host:port, mesos://host:port, yarn, 或 local.

2

--deploy-mode

無論是在本地啓動驅動程序(client),或在工作人員的機器中的一個集羣內 ("cluster") (默認: client)

3

--class

應用程序的主類(適用於 Java/Scala 的應用程序)

4

--name

應用程序的名稱

5

--jars

以逗號分隔本地 jar 列表包括驅動器和執行者類路徑

6

--packages

逗號分隔 jar 的 Maven 座標系列表,包括驅動器和執行者類路徑

7

--repositories

逗號分隔額外遠程存儲庫列表搜索Maven給定的座標,使用 --packages

8

--py-files

用逗號分隔 .zip,.egg 或.py文件的列表放在Python路徑中的 Python 應用程序

9

--files

逗號分隔放置在每一個執行者的工作目錄中的文件的列表

10

--conf (prop=val)

任意 Spark 配置屬性

11

--properties-file

路徑從一個文件來加載額外屬性。如果沒有指定,這將在 conf/spark-defaults 尋找默認值

12

--driver-memory

存儲驅動程序 (e.g. 1000M, 2G) (默認: 512M)

13

--driver-java-options

額外的Java選項傳遞給驅動程序

14

--driver-library-path

額外的庫路徑條目傳遞給驅動程序

15

--driver-class-path

額外的類路徑條目傳遞給驅動程序

需要注意的是使用 --jars 添加 jar 會自動包含在類路徑中

16

--executor-memory

每個執行者的內存(e.g. 1000M, 2G) (默認: 1G)

17

--proxy-user

用戶在提交申請時模仿

18

--help, -h

顯示此幫助信息並退出

19

--verbose, -v

打印額外的調試輸出

20

--version

打印當前 Spark 版本

21

--driver-cores NUM

核心驅動程序(默認值:1)

22

--supervise

如果給定,重新啓動對故障的驅動程序

23

--kill

如果給定,殺死指定的驅動程序

24

--status

如果給定,請求指定的驅動程序的狀態

25

--total-executor-cores

爲所有執行者的核心總數

26

--executor-cores

每執行者內核的數量。 (默認值:1是YARN模式,或在獨立模式下,工人利用多內核)