《Spark 官方文档》Spark独立模式

spark 1.6.0 [原文地址]

Spark独立模式

Spark除了可以在Mesos和YARN集群上运行之外,还支持一种简单的独立部署模式。独立部署模式下,你既可以手工启动(手动运行master和workers),也可以利用我们提供的启动脚本(launch scripts)。同时,独立部署模式下,你可以在单机上运行这些程序,以方便测试。

Spark集群独立安装

要独立安装Spark,你只需要将编译好的Spark包复制到集群中每一个节点上即可。你可以下载一个编译好的Spark版本,也可以在这里自己编译一个版本(build it yourself)。

手动启动集群

执行以下命令,启动独立部署的master server:

./sbin/start-master.sh

一旦启动完成,master会打印出master URL(spark://HOST:PORT),后续worker需要用这个URL来连接master,写代码的时候SparkContext中的master参数也需要设置成这个master URL。你还可以在master的web UI(默认为http://localhost:8080)上查看master URL。

类似地,你可以通过以下命令,启动一个或多个worker节点,并将其连接到master:

./sbin/start-slave.sh <master-spark-URL>

启动一个worker以后,刷新一下master的web UI(默认为http://localhost:8080),你应该可以在这里看到一个新的节点。

最后,以下配置选项将会被传给master和worker:

参数 含义
-h HOST--host HOST 监听的主机名
-i HOST--ip HOST 监听的主机名(已经废弃,请使用-h 或者 –host)
-p PORT--port PORT 服务监听的端口(master节点默认7077,worker节点随机)
--webui-port PORT web UI端口(master节点默认8080,worker节点默认8081)
-c CORES--cores CORES 单节点上,Spark应用能够使用的CPU core数上限(默认,等于CPU core的个数);仅worker节点有效
-m MEM--memory MEM 单节点上,Spark应用能够使用的内存上限,格式为1000M 或者 2G(默认为,机器上所有内存减去1G);仅worker节点有效
-d DIR--work-dir DIR 工作目录,同时job的日志也输出到该目录(默认:${SPAKR_HOME}/work);仅worker节点有效
--properties-file FILE 自定义Spark属性文件的加载路径(默认:conf/spark-defaults.conf)

集群启动脚本

要使用启动脚本,来启动一个Spark独立部署集群,首先你需要在Spark目录下创建一个文件conf/slaves,并且在文件中写入你需要作为worker节点启动的每一台机器主机名(或IP),每行一台机器。如果conf/slaves文件不存在,启动脚本会默认会用单机方式启动,这种方式对测试很有帮助。注意,master节点访问各个worker时使用ssh。默认情况下,你需要配置ssh免密码登陆(使用秘钥文件)。如果你没有设置免密码登陆,那么你也可以通过环境变量SPARK_SSH_FOREGROUND来一个一个地设置每个worker的密码。

设置好conf/slaves文件以后,你就可以用一下shell脚本来启动或停止集群了,类似于Hadoop的部署,这些脚本都在${SPARK_HOME}/sbin目录下:

  • sbin/start-master.sh – 在本机启动一个master实例
  • sbin/start-slaves.sh – 在conf/slaves文件所指定的每一台机器上都启动一个slave实例
  • sbin/start-slave.sh – 在本机启动一个slave实例
  • sbin/start-all.sh – 启动一个master和多个slave实例,详细见上面的描述。
  • sbin/stop-master.sh – 停止 start-master.sh所启动的master实例
  • sbin/stop-slaves.sh – 停止所有在conf/slaves中指定的slave实例
  • sbin/stop-all.sh – 停止master节点和所有slave节点,详细见上面的描述

注意,这些脚本都需要在你启动Spark master的机器上运行,而不是你的本地机器。

Spark独立部署集群的其他可选的环境变量见conf/spark-env.sh。你可以通过复制conf/spark-env.sh.template来创建这个文件,同时你还需要将配置好的文件复制到所有的worker节点上。以下是可用的设置:

环境变量 含义
SPARK_MASTER_IP master实例绑定的IP地址,例如,绑定到一个公网IP
SPARK_MASTER_PORT mater实例绑定的端口(默认7077)
SPARK_MASTER_WEBUI_PORT master web UI的端口(默认8080)
SPARK_MASTER_OPTS master专用配置属性,格式如”-Dx=y” (默认空),可能的选项请参考下面的列表。
SPARK_LOCAL_DIRS Spark的本地工作目录,包括:映射输出的临时文件和RDD保存到磁盘上的临时数据。这个目录需要快速访问,最好设成本地磁盘上的目录。也可以通过使用逗号分隔列表,将其设成多个磁盘上的不同路径。
SPARK_WORKER_CORES 本机上Spark应用可以使用的CPU core上限(默认所有CPU core)
SPARK_WORKER_MEMORY 本机上Spark应用可以使用的内存上限,如:1000m,2g(默认为本机所有内存减去1GB);注意每个应用单独使用的内存大小要用 spark.executor.memory 属性配置的。
SPARK_WORKER_PORT Spark worker绑定的端口(默认随机)
SPARK_WORKER_WEBUI_PORT worker web UI端口(默认8081)
SPARK_WORKER_INSTANCES 每个slave机器上启动的worker实例个数(默认:1)。如果你的slave机器非常强劲,可以把这个值设为大于1;相应的,你需要设置SPARK_WORKER_CORES参数来显式地限制每个worker实例使用的CPU个数,否则每个worker实例都会使用所有的CPU。
SPARK_WORKER_DIR Spark worker的工作目录,包括worker的日志以及临时存储空间(默认:${SPARK_HOME}/work)
SPARK_WORKER_OPTS worker的专用配置属性,格式为:”-Dx=y”,可能的选项请参考下面的列表。
SPARK_DAEMON_MEMORY Spark master和worker后台进程所使用的内存(默认:1g)
SPARK_DAEMON_JAVA_OPTS Spark master和workers后台进程所使用的JVM选项,格式为:”-Dx=y”(默认空)
SPARK_PUBLIC_DNS Spark master和workers使用的公共DNS(默认空)

注意: 启动脚本目前不支持Windows。如需在Windows上运行,请手工启动master和workers。

SPARK_MASTER_OPTS支持以下属性:

属性名 默认值 含义
spark.deploy.retainedApplications 200 web UI上最多展示几个已结束应用。更早的应用的数将被删除。
spark.deploy.retainedDrivers 200 web UI上最多展示几个已结束的驱动器。更早的驱动器进程数据将被删除。
spark.deploy.spreadOut true 独立部署集群的master是否应该尽可能将应用分布到更多的节点上;设为true,对数据本地性支持较好;设为false,计算会收缩到少数几台机器上,这对计算密集型任务比较有利。
spark.deploy.defaultCores (无限制) Spark独立模式下应用程序默认使用的CPU个数(没有设置spark.cores.max的情况下)。如果不设置,则为所有可用CPU个数(除非设置了spark.cores.max)。如果集群是共享的,最好将此值设小一些,以避免用户占满整个集群。
spark.worker.timeout 60 如果master没有收到worker的心跳,那么将在这么多秒之后,master将丢弃该worker。

SPARK_WORKER_OPTS支持以下属性:

属性名 默认值 含义
spark.worker.cleanup.enabled false 是否定期清理 worker 和应用的工作目录。注意,该设置仅在独立模式下有效,YARN有自己的清理方式;同时,只会清理已经结束的应用对应的目录。
spark.worker.cleanup.interval 1800 (30 minutes) worker清理本地应用工作目录的时间间隔(秒)
spark.worker.cleanup.appDataTtl 7 * 24 * 3600 (7 days) 清理多久以前的应用的工作目录。这个选项值将取决于你的磁盘总量。spark应用会将日志和jar包都放在其对应的工作目录下。随着时间流逝,应用的工作目录很快会占满磁盘,尤其是在你的应用提交比较频繁的情况下。

连接到集群

要在Spark集群上运行一个应用,只需把spark://IP:PORT这个master URL传给SparkContext(参考SparkContext constructor

如需要运行交互式的spark shell,运行如下命令:

./bin/spark-shell --master spark://IP:PORT

你也可以通过设置选线 –total-executor-cores <numCores> 来控制spark-shell在集群上使用的CPU总数。

启动Spark应用

spark-submit脚本(spark-submit script )是提交spark应用最简洁的方式。对于独立安装的集群来说,spark目前支持两种运行模式。客户端(client)模式下,驱动器进程(driver)将在提交应用的机器上启动。而在集群(cluster)模式下,驱动器(driver)将会在集群中的某一台worker上启动,同时提交应用的客户端在提交动作完成之后立即退出,而不会等到Spark应用运行结束。

如果你的应用时通过spark-submit提交启动的,那么应用对应的jar包会自动发布到所有的worker节点上。任何额外的依赖项jar包,都必须在–jars参数中指明,并以逗号分隔(如:–jars jar1,jar2)。

另外,独立安装的集群还支持异常退出(返回值非0)时自动重启。要启用这个特性,你需要在spark-submit时指定–supervise参数。其后,如果你需要杀掉一个重复失败的应用,你可能需要运行如下指令:

./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>

你可以在master web UI(http://<master url>:8080)上查看驱动器ID。

资源调度

独立安装集群目前只支持简单的先进先出(FIFO)调度器。这个调度器可以支持多用户,你可以控制每个应用所使用的最大资源。默认情况下,Spark应用会申请集群中所有的CPU,这不太合理,除非你的进群同一时刻只运行一个应用。你可以通过SparkConf中的spark.cores.max,来设置一个CPU帽子以限制其使用的CPU总数。例如:

val conf = new SparkConf()
             .setMaster(...)
             .setAppName(...)
             .set("spark.cores.max", "10")
val sc = new SparkContext(conf)

另外,你也可以通过conf/spark-env.sh中的spark.deploy.defaultCores设置应用默认使用的CPU个数(特别针对没有设置spark.cores.max的应用)。

export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"

在一些共享的集群上,用户很可能忘记单独设置一个最大CPU限制,那么这个参数将很有用。

监控和日志

Spark独立安装模式提供了一个基于web的集群监控用户界面。master和每个worker都有其对应的web UI,展示集群和Spark作业的统计数据。默认情况下,你可以在master机器的8080端口上访问到这个web UI。这个端口可以通过配置文件或者命令行来设置。

另外,每个作业的详细日志,将被输出到每个slave节点上的工作目录下(默认为:${SPARK_HOME}/work)。每个Spark作业下都至少有两个日志文件,stdout和stderr,这里将包含所有的输出到控制台的信息。

和Hadoop同时运行

你可以让Spark和已有的Hadoop在同一集群上同时运行,只需要将Spark作为独立的服务启动即可。这样Spark可以通过hdfs:// URL来访问Hadoop上的数据(通常情况下是,hdfs://<namenode>:9000/path,你可以在Hadoop Namenode的web UI上找到正确的链接)。当然,你也可以为Spark部署一个独立的集群,这时候Spark仍然可以通过网络访问HDFS上的数据;这会比访问本地磁盘慢一些,但如果Spark和Hadoop集群都在同一个本地局域网内的话,问题不大(例如,你可以在Hadoop集群的每个机架上新增一些部署Spark的机器)。

网络安全端口配置

Spark会大量使用网络资源,而有些环境会设置严密的防火墙设置,以严格限制网络访问。完整的端口列表,请参考这里:security page.

高可用性

默认情况下,独立调度的集群能够容忍worker节点的失败(在Spark本身来说,它能够将失败的工作移到其他worker节点上)。然而,调度器需要master做出调度决策,而这(默认行为)会造成单点失败:如果master挂了,任何应用都不能提交和调度。为了绕过这个单点问题,我们有两种高可用方案,具体如下:

基于Zookeeper的热备master

概要

利用Zookeeper来提供领导节点选举以及一些状态数据的存储,你可以在集群中启动多个master并连接到同一个Zookeeper。其中一个将被选举为“领导”,而另一个将处于备用(standby)状态。如果“领导”挂了,则另一个master会立即被选举,并从Zookeeper恢复已挂“领导”的状态,并继续调度。整个恢复流程(从“领导”挂开始计时)可能需要1到2分钟的时间。注意,整个延时只会影响新增应用 – 已经运行的应用不会受到影响。

更多关于Zookeeper信息请参考这里:here

配置

要启用这种恢复模式,你可以在spark-env中设置 SPARK_DAEMON_JAVA_OPTS,可用的属性如下:

系统属性 含义
spark.deploy.recoveryMode 设为ZOOKEEPER以启用热备master恢复模式(默认空)
spark.deploy.zookeeper.url Zookeeper集群URL(如:192.168.1.100:2181,192.168.1.101:2181)
spark.deploy.zookeeper.dir 用于存储可恢复状态的Zookeeper目录(默认 /spark)

可能的问题:如果你有多个master,但没有正确设置好master使用Zookeeper的配置,那么这些master彼此都不可见,并且每个master都认为自己是“领导”。这件会导致整个集群处于不稳定状态(多个master都会独立地进行调度)

详细

如果你已经有一个Zookeeper集群,那么启动高可用特性是很简单的。只需要在不同节点上启动多个master,并且配置相同的Zookeeper(包括Zookeeper URL和目录)即可。masters可以随时添加和删除。

在调度新提交的Spark应用或者新增worker节点时,需要知道当前”领导“的IP地址。你只需要将以前单个的master地址替换成多个master地址即可。例如,你可以在SparkContext中设置master URL为spark://host1:port1.host2:port2。这会导致SparkContext在两个master中都进行登记 – 那么这时候,如果host1挂了,这个应用的配置同样可以在新”领导“(host2)中找到。

”在master注册“和普通操作有一个显著的区别。在Spark应用或worker启动时,它们需要找当前的”领导“master,并在该master上注册。一旦注册成功,它们的状态将被存储到Zookeeper上。如果”老领导“挂了,”新领导“将会联系所有之前注册过的Spark应用和worker并通知它们领导权的变更,所以Spark应用和worker在启动时甚至没有必要知道”新领导“的存在。

由于这一特性,新的master可以在任何时间添加进来,你唯一需要关注的就是,新的应用或worker能够访问到这个master。总之,只要应用和worker注册成功,其他的你都不用管了。

基于本地文件系统的单点恢复

概要

利用Zookeeper当然是生成环境下高可用的最佳选择,但有时候你仍然希望在master挂了的时候能够重启之,FILESYSTEM模式能帮你实现这一需求。当应用和worker注册到master的时候,他们的状态都将被写入文件系统目录中,一旦master挂了,你只需要重启master,这些状态都能够恢复。

配置

要使用这种恢复模式,你需要在spark-env中设置SPARK_DAEMON_JAVA_OPTS,可用的属性如下:

系统属性 含义
spark.deploy.recoveryMode 设为FILESYSTEM以启用单点恢复模式(默认空)
spark.deploy.recoveryDirectory 用于存储可恢复状态数据的目录,master进程必须有访问权限

Details(详细)

  • 这个解决方案可以用于和一些监控、管理系统进行串联(如:monit),或者与手动重启相结合。
  • 至少,基于文件系统工单恢复总比不能恢复强;同时,这种恢复模式也会是开发或者实验场景下的不错的选择。在某些情况下,通过stop-master.sh杀死master可能不会清理其状态恢复目录下的数据,那么这时候你启动或重启master,将会进入恢复模式。这可能导致master的启动时间长达一分钟(master可能要等待之前注册的worker和客户端超时)。
  • 你也可以使用NFS目录来作为数据恢复目录(虽然这不是官方声明支持的)。如果老的master挂了,你可以在另一个节点上启动master,这个master只要能访问同一个NFS目录,它就能够正确地恢复状态数据,包括之前注册的worker和应用(等价于Zookeeper模式)。后续的应用必须使用新的master来进行注册。

原创文章,转载请注明: 转载自并发编程网 – ifeve.com本文链接地址: 《Spark 官方文档》Spark独立模式

  • Trackback 关闭
  • 评论 (1)
    • dlbrant
    • 2016/03/16 4:39下午

    发布了以后是不是就不能编辑了?

return top