五、Spark Shuffle文件寻址

1、Shuffle文件寻址

1)、MapOutputTracker

MapOutputTracker是Spark架构中的一个模块,是一个主从架构。管理磁盘小文件的地址

2)、BlockManager

BlockManager块管理者,是Spark架构中的一个模块,也是一个主从架构。

BlockManagerMaster会在集群中有用到广播变量缓存数据或者删除缓存数据时候通知BlockManagerSlave传输或者删除数据

BlockManagerSlave会与BlockManagerSlave之间通信

3)、Shuffle文件寻址

4)、Shuffle文件寻址流程

  1. map task执行完成后,会将task执行情况和磁盘小文件的地址封装到MpStatus对象中,通过MapOutputTrackerWorker对象向Driver中的MapOutputTrackerMaster汇报。
  2. 在所有的map task执行完毕后,Driver中就掌握了所有的磁盘小文件的地址。
  3. reduce task执行之前,会通过Excutor中MapOutPutTrackerWorker向Driver端的MapOutputTrackerMaster获取磁盘小文件的地址。
  4. 获取到磁盘小文件的地址后,会通过BlockManager连接数据所在节点然后通过BlockTransferService行数据的传输
  5. BlockTransferService默认启动5个task节点取数据。默认情况下,5个task拉取数据量不能超过48M。

六、Spark 内存管理和Shuffle优化

1、Spark内存管理

Spark执行应用程序时,Spark集群会启动Driver和Executor两种JVM进程,Driver负责创建SparkContext上下文提交任务,task分发等。Executor负责task计算任务,并将结果返回给Driver。同时需要需要持久化的RDD提供储存。Driver端的内存管理比较简单这里所说的Spark内存管理针对Executor端的内存管理。

Spark内存管理分为静态内存管理和统一内存管理,Spark1.6之前使用的是静态内存管理,Spark1.6之后引入统一内存管理。

静态内存管理存储内存、执行内存和其他内存的大小在 Spark 应用程序运行期间均为固定的,但用户可以应用程序启动前进行配置

统一内存管理与静态内存管理的区别在于储存内存和执行内存共享同一块空间可以互相借用对方的空间。

Spark1.6以上版本默认使用的是统一内存管理,可以通过参数spark.memory.useLegacyMode 设置true(默认false)使用静态内存管理。

1)、静态内存管理分布图

2)、统一内存管理分布图

3)、reduce 中OOM如何处理

  1. 减少每次拉取的数据量
  2. 提高shuffle聚合的内存比例
  3. 提高Excutor的总内存

2、Shuffle调优

1)、SparkShuffle调优配置如何使用?

1、在代码中,推荐使用,硬编码

new SparkConf().set(“spark.shuffle.file.buffer”,”64”)

2、在提交spark任务时候推荐使用。

spark-submit --conf spark.shuffle.file.buffer=64 –conf ….

3、在conf下的spark-default.conf配置文件中,推荐,因为是写死后所有应用程序都要用。

2)、Shuffle调优附件

spark.reducer.maxSizeInFlight
默认值:48m
参数说明:该参数用于设置shuffle read taskbuffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小比如96m),从而减少拉取数据的次数,也就可以减少网络传输次数,进而提升性能。在实践发现合理调节该参数,性能会有1%~5%的提升

spark.shuffle.io.maxRetries
默认值:3
参数说明shuffle read taskshuffle write task所在节点拉取属于自己数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表可以重试最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败
调优建议:对于那些包含了特别耗时的shuffle操作作业建议增加重试最大次数比如60次),以避免由于JVM的full gc或者网络稳定因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程调节该参数可以大幅度提升稳定性。
shuffle file not find    taskScheduler不负责重试task,由DAGScheduler负责重试stage

spark.shuffle.io.retryWait
默认值:5s
参数说明:具体解释同上,该参数代表了每次重试取数据的等待间隔默认是5s。
调优建议建议加大间隔时长(比如60s),以增加shuffle操作稳定性。

spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并一个文件,并会创建单独的索引文件。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制map-side就不会进行排序了,减少了排序性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高

原文地址:https://blog.csdn.net/yaya_jn/article/details/134661647

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任

如若转载,请注明出处:http://www.7code.cn/show_33832.html

如若内容造成侵权/违法违规/事实不符,请联系代码007邮箱suwngjj01@126.com进行投诉反馈,一经查实,立即删除

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注