<em id="vo9r9"><object id="vo9r9"><input id="vo9r9"></input></object></em>

<em id="vo9r9"><ruby id="vo9r9"><u id="vo9r9"></u></ruby></em>
<rp id="vo9r9"><acronym id="vo9r9"></acronym></rp>
<tbody id="vo9r9"><noscript id="vo9r9"></noscript></tbody>
<dd id="vo9r9"></dd>

[SPARK][CORE] 面試問題之 BypassMergeSortShuffleWriter的細節

發布時間:2022-05-21 06:39

BypassMergeSortShuffleWriter 就如其名,旁支的sort-baesd Shuffle, 他是采用Hash-style實現的Sort based Shuffle。在map階段records會按分區寫入不同的文件, 一個分區一個文件。然后鏈接這些分區文件形成一個output文件,并生成其index。reducer通過IndexShuffleBlockResolver查找消費輸出文件的不同分區。

在 BypassMergeSortShuffleWriter 中records是不會緩存在內存中,所有的records最終都會被flush到磁盤。

在寫入時,BypassMergeSortShuffleWriter 會同時為所有的分區打開單獨的序列化器和文件流,所以當reduce分區數量特別大的時候性能會非常低下。

ShuffleWriter 的調用是在ShuffleMapTask的runTask中進行調用,每個mapTask 都會調用一次runTask。

BypassMergeSortShuffleWriter 源碼解析

首先,我們來回顧下ShuffleWriter的過程。Shuffle發生與寬依賴的stage間,由于stage內的計算采用pipeline的方式。shuffle發生的上一個stage為map節點,下游的stage為reduce階段。而shuffle寫的過程就發生在map階段,shuffleWriter的調用主要是在ShuffleMapStage中,每個ShuffleMapStage包含多個ShuffleMapTask, mapTask個數和分區數相關。

這樣每個ShuffleMapTask都會在其runTask調用下Writer接口,其并非直接調用到具體的執行類。而是在劃分寬依賴時會獲取ShuffleManage,并注冊shuffle,這時會返回的具體ShuffleHandler。

在ShuffleMapTask調用Writer時,是先調用了ShuffleWriteProcessor ,主要控制了ShuffleWriter的生命周期。下面我們看下ShuffleWriteProcessor 中的Write的實現:

 1// ShuffleWriteProcessor
2def write(
3    rdd: RDD[_],
4    dep: ShuffleDependency[_, _, _],
5    mapId: Long,
6    context: TaskContext,
7    partition: Partition): MapStatus = {
8  var writer: ShuffleWriter[Any, Any] = null
9  try {
10    // [1] 通過SparkEnv獲取ShuffleManager, 并通過dep的shuffleHandle, 獲取對應的shuffleWriter的具體實現。
11    val manager = SparkEnv.get.shuffleManager
12    writer = manager.getWriter[Any, Any](
13      dep.shuffleHandle,
14      mapId,
15      context,
16      createMetricsReporter(context))
17    // [2] 調用shuffleWriter的write方法, 并將當前rdd的迭代器傳入
18    writer.write(
19      rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
20    // [3] shuffleWriter結束后,返回mapStatus,或清空數據
21    val mapStatus = writer.stop(success = true)
22    // [4] 如果shuffleWriter執行成功,初始化push-based shuffle, 后面再細講
23    if (mapStatus.isDefined) {
24      // Initiate shuffle push process if push based shuffle is enabled
25      // The map task only takes care of converting the shuffle data file into multiple
26      // block push requests. It delegates pushing the blocks to a different thread-pool -
27      // ShuffleBlockPusher.BLOCK_PUSHER_POOL.
28      if (dep.shuffleMergeEnabled && dep.getMergerLocs.nonEmpty && !dep.shuffleMergeFinalized) {
29        manager.shuffleBlockResolver match {
30          case resolver: IndexShuffleBlockResolver =>
31            val dataFile = resolver.getDataFile(dep.shuffleId, mapId)
32            new ShuffleBlockPusher(SparkEnv.get.conf)
33              .initiateBlockPush(dataFile, writer.getPartitionLengths(), dep, partition.index)
34          case _ =>
35        }
36      }
37    }
38    mapStatus.get
39  }
40...
41}

ShuffleWriteProcessor 中主要做了三件事:

  • [1] 通過SparkEnv獲取ShuffleManager, 并通過dep的shuffleHandle, 獲取對應的shuffleWriter的具體實現。

  • [2] 調用shuffleWriter的write方法, 并將當前rdd的迭代器傳入

  • [3] shuffleWriter結束后,返回mapStatus,或清空數據

可見每一個ShuffleMapTask執行結束后,就會返回一個mapStatus。Task 結果被封裝成 CompletionEvent發送到Driver DAG Scheduler 。判斷Task的類型是ShuffleMapTask,DagScheduler 會向 MapOutputTracker 注冊 MapOutput status 信息。

那么map中的數據是如何通過BypassMergeSortShuffleWriter寫入的?

 1// BypassMergeSortShuffleWriter
2@Override
3public void write(Iterator<Product2<K, V>> records) throws IOException {
4  assert (partitionWriters == null);
5  // [1] 創建處理mapTask所有分區數據commit提交writer
6  ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
7      .createMapOutputWriter(shuffleId, mapId, numPartitions);
8  try {
9    // 如果沒有數據,直接提交所有分區的commit, 并返回分區長度,獲取mapStatus
10    if (!records.hasNext()) {
11      partitionLengths = mapOutputWriter.commitAllPartitions(
12        ShuffleChecksumHelper.EMPTY_CHECKSUM_VALUE).getPartitionLengths();
13      mapStatus = MapStatus$.MODULE$.apply(
14        blockManager.shuffleServerId(), partitionLengths, mapId);
15      return;
16    }
17    final SerializerInstance serInstance = serializer.newInstance();
18    final long openStartTime = System.nanoTime();
19    // [2] 為每個分區創建一個DiskBlockObjectWriter寫入流和FileSegment文件段
20    partitionWriters = new DiskBlockObjectWriter[numPartitions];
21    partitionWriterSegments = new FileSegment[numPartitions];
22    for (int i = 0; i < numPartitions; i++) {
23      // [2.1] 每個分區創建個臨時file和blockid, 并生成維護一個寫入流
24      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
25          blockManager.diskBlockManager().createTempShuffleBlock();
26      final File file = tempShuffleBlockIdPlusFile._2();
27      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
28      DiskBlockObjectWriter writer =
29        blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
30      if (partitionChecksums.length > 0) {
31        writer.setChecksum(partitionChecksums[i]);
32      }
33      partitionWriters[i] = writer;
34    } 
35    // Creating the file to write to and creating a disk writer both involve interacting with
36    // the disk, and can take a long time in aggregate when we open many files, so should be
37    // included in the shuffle write time.
38    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
39    // [3] 依次將records寫入到對應分區的寫入流中, 并提交
40    while (records.hasNext()) {
41      final Product2<K, V> record = records.next();
42      final K key = record._1();
43      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
44    }
45
46    // [3.1]依次對每個分區提交和flush寫入流
47    for (int i = 0; i < numPartitions; i++) {
48      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
49        partitionWriterSegments[i] = writer.commitAndGet();
50      }
51    }
52    // [4] 遍歷所有分區的FileSegement, 并將其鏈接為一個文件,同時會調用writeMetadataFileAndCommit,為其生成索引文件
53    partitionLengths = writePartitionedData(mapOutputWriter);
54    mapStatus = MapStatus$.MODULE$.apply(
55      blockManager.shuffleServerId(), partitionLengths, mapId);
56  } catch (Exception e) {
57    try {
58      mapOutputWriter.abort(e);
59    } catch (Exception e2) {
60logger.error("Failed to abort the writer after failing to write map output.", e2);
61      e.addSuppressed(e2);
62    }
63    throw e;
64  }
65}

綜上,Bypass的writer步驟有四步:

  • [1] 創建處理mapTask所有分區數據commit提交writer

  • [2] 為每個分區創建一個DiskBlockObjectWriter寫入流和FileSegment文件段
    [2.1] 每個分區創建個臨時file和blockid, 并生成維護一個DiskBlockObjectWriter寫入流

  • [3] 依次將records寫入到對應分區的寫入流中, 并提交
    [3.1]依次對每個分區提交和flush寫入流

  • [4] 遍歷所有分區的FileSegement, 并將其鏈接為一個文件,同時會調用writeMetadataFileAndCommit,為其生成索引文件

所以說, Bypass在進行寫入時會為每個MapTask都會生成reduce分區個FileSegement, 寫入時會并發的為所有的分區都創建臨時文件和維護一個io的寫入流, 最終在鏈接為一個文件。所以如果分區數特別多的情況下,是會維護很多io流,所以Bypass限制了分區的閾值。另外通過源碼發現Bypass在實現過程中并沒有使用buffer, 而是直接將數據寫入到流中,這也就是為什么Bypass不能處理mapSide的預聚合的算子。

那么BypassMergeSortShuffleWriter 屬于sort-based Shuffle 到底有沒有排序呢?

接下來,我們再看下Bypass是如何將分區的FileSegement, 并將其鏈接為一個文件, 我們就需要詳細看下writePartitionedData是如何實現的。

 1private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
2  // Track location of the partition starts in the output file
3  if (partitionWriters != null) {
4    final long writeStartTime = System.nanoTime();
5    try {
6      for (int i = 0; i < numPartitions; i++) {
7        // [1] 獲取每個分區的 fileSegement 臨時文件,和writer寫出流
8        final File file = partitionWriterSegments[i].file();
9        ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
10        if (file.exists()) {
11          if (transferToEnabled) {
12            // Using WritableByteChannelWrapper to make resource closing consistent between
13            // this implementation and UnsafeShuffleWriter.
14            Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
15            if (maybeOutputChannel.isPresent()) {
16              writePartitionedDataWithChannel(file, maybeOutputChannel.get());
17            } else {
18              writePartitionedDataWithStream(file, writer);
19            }
20          } else {
21            // [2] 將fileSegement合并為一個文件
22            writePartitionedDataWithStream(file, writer);
23          }
24          if (!file.delete()) {
25logger.error("Unable to delete file for partition {}", i);
26          }
27        }
28      }
29    } finally {
30      writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
31    }
32    partitionWriters = null;
33  }
34  // [3] 提交所有的分區,傳入每個分區數據的長度, 調用 writeMetadataFileAndCommit生成索引文件,記錄每個分區的偏移量
35  return mapOutputWriter.commitAllPartitions(getChecksumValues(partitionChecksums))
36    .getPartitionLengths();
37}

writePartitionedData是如何實現,有三個步驟:

  • [1] 獲取每個分區的 fileSegement 臨時文件,和writer寫出流

  • [2] 將fileSegement合并為一個文件

  • [3] 提交所有的分區,傳入每個分區數據的長度, 調用 writeMetadataFileAndCommit生成索引文件,記錄每個分區的偏移量

總結, BypassMergeSortShuffleWriter 的實現是hash-style的方式,其中沒有sort, 沒有buffer,每一個mapTask都會生成分區數量個FileSegment, 最后再合并為一個File, 最終根據分區的長度為其生成索引文件。所以BypassMergeSortShuffleWriter在分區數量比較小的情況下,性能是比較佳的。其最終每個task會生成2個文件, 所以最終的生成文件數也是2 * M個文件。

今天就先到這里,通過上面的介紹,我們也留下些面試題:

  1. BypassMergeSortShuffleWriter和HashShuffle有什么區別?

  2. 為什么不保留HashShuffleManage, 而是將其作為SortShuffleManager中的一個Writer實現?


本文轉載自網絡,如有侵犯您的權益,請郵件聯系wyl860211@qq.com刪除

混跡職場

升職加薪

熱門推薦

人与嘼zozo免费观看

<em id="vo9r9"><object id="vo9r9"><input id="vo9r9"></input></object></em>

<em id="vo9r9"><ruby id="vo9r9"><u id="vo9r9"></u></ruby></em>
<rp id="vo9r9"><acronym id="vo9r9"></acronym></rp>
<tbody id="vo9r9"><noscript id="vo9r9"></noscript></tbody>
<dd id="vo9r9"></dd>