Spark技术内幕:Shuffle Pluggable框架详解,你怎么开发自己的Shuffle Service?

  • 时间:
  • 浏览:10
  • 来源:uu快3电脑版_uu快3和值_礼金

2)       获得Shuffle Writer, 根据Shuffle Map Task的ID为其创建Shuffle Writer。

     shuffleId: Int,

ShuffleReader实现了下游的Task怎么才能 才能 读取上游的ShuffleMapTask的Shuffle输出的逻辑。你这人逻辑比较僵化 ,简单来说就说 通过org.apache.spark.MapOutputTracker获得数据的位置信息,为何让 为何让 数据在本地如此调用org.apache.spark.storage.BlockManager的getBlockData读取本地数据(实际上getBlockData最终会调用org.apache.spark.shuffle.ShuffleBlockManager的getBlockData)。具体的Shuffle Read的逻辑请查看下面的章节。

      endPartition: Int,

一般为何让 如此特殊的需求,还能否 使用下面的实现,实际上Hash BasedShuffle 和Sort BasedShuffle不是如此实现的。

6)       def stop(): Unit,停止Shuffle Manager。

Driver和每个Executor不是持有有还有一个ShuffleManager,你这人ShuffleManager还能否 通过配置项spark.shuffle.manager指定,为何让 由SparkEnv创建。Driver中的ShuffleManager负责注册Shuffle的元数据,比如Shuffle ID,map task的数量等。Executor中的ShuffleManager 则负责读和写Shuffle的数据。

每个接口的具体实现的例子,还能否 参照org.apache.spark.shuffle.sort.SortShuffleManager 和org.apache.spark.shuffle.hash.HashShuffleManager。

def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context:TaskContext): ShuffleWriter[K, V]

   defregisterShuffle[K, V, C](

      handle: ShuffleHandle,

Shuffle Map Task通过ShuffleWriter将Shuffle数据写入本地。你这人Writer主要通过ShuffleBlockManager来写入数据,为何让 它的功能是比较轻量级的。

      context: TaskContext): ShuffleReader[K,C]

      dependency: ShuffleDependency[K, V, C]):ShuffleHandle = {

3)       获得Shuffle Reader,根据Shuffle ID和partition的ID为其创建ShuffleReader。

  override def registerShuffle[K, V, C](

对于Hash BasedShuffle,请查看org.apache.spark.shuffle.hash.HashShuffleWriter;对于Sort Based Shuffle,请查看org.apache.spark.shuffle.sort.SortShuffleWriter。

      shuffleId: Int,



      startPartition: Int,

  }

怎么才能 才能 开发我本人的Shuffle机制?到这里你应该知道为何会 做了。别问我? 再看到吧。

2)         def stop(success: Boolean): Option[MapStatus],写入完成后提交本次写入。

还能否 实现的函数及其功能说明:

5)       defunregisterShuffle(shuffleId: Int): Boolean,删除本地的Shuffle的元数据。

主要使用从本地读取Shuffle数据的功能。哪几种接口不是通过org.apache.spark.storage.BlockManager调用的。

dependency:ShuffleDependency[K, V, C]): ShuffleHandle

3)       def stop(): Unit,停止该Manager。

4)       为数据成员shuffleBlockManager赋值,以保存实际的ShuffleBlockManager

1)       def getBytes(blockId: ShuffleBlockId):Option[ByteBuffer], 一般通过调用下有还有一个接口实现,只不过将ManagedBuffer转加进了ByteBuffer。

  def getReader[K, C](

对于Hash Based Shuffle,请查看org.apache.spark.shuffle.FileShuffleBlockManager;对于Sort Based Shuffle,请查看org.apache.spark.shuffle.IndexShuffleBlockManager。



为何让 您喜欢 本文,如此请动一下手指支持以下博客之星的评比吧。非常感谢您的投票。每天还能否 一票哦。

      numMaps: Int,

2)       def getBlockData(blockId:ShuffleBlockId): ManagedBuffer,核心读取逻辑。比如Hash Based Shuffle的从本地读取文件不是通过你这人接口实现的。为何让 不同的实现为何让 文件的组织最好的最好的依据是不一样的,比如Sort Based Shuffle还能否 通过先读取Index索引文件获得每个partition的起始位置后,能否 读取真正的数据文件。

     numMaps: Int,

首先介绍一下还能否 实现的接口。框架的类图如图所示(今天CSDN抽风,竟然上传不了图片。为何让 还能否 实现新的Shuffle机制,如此还能否 实现哪几种接口。

1)         def write(records: Iterator[_ <:Product2[K, V]]): Unit, 写入所有的数据。还能否 注意的是为何让 还能否 在Map端做聚合。(aggregate),如此写入前还能否 将records做聚合。

1)       def read():Iterator[Product2[K, C]]

1)       由Driver注册元数据信息

    new BaseShuffleHandle(shuffleId, numMaps,dependency)