欢迎来真孝善网,为您提供真孝善正能量书籍故事!

Spark Shuffle工作原理及源码深度解析

时间:11-03 名人轶事 提交错误

大家好,关于Spark Shuffle工作原理及源码深度解析很多朋友都还不太明白,不过没关系,因为今天小编就来为大家分享关于的知识点,相信应该可以解决大家的一些困惑和问题,如果碰巧可以解决您的问题,还望关注下本站哦,希望对各位有所帮助!

//让用户指定随机播放管理器的短名称valshortShuffleMgrNames=Map("sort"-classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,"tungsten-sort"-classOf[org.apache.spark.shuffle .sort.SortShuffleManager].getName)valshuffleMgrName=conf.get(config.SHUFFLE_MANAGER)valshuffleMgrClass=ShortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)valshuffleManager=instantiateClass[ShuffleManager](shuffleMgrClass)

在这里你可以看到有两种洗牌,包括排序和钨排序。 ShuffleManager是通过反射创建的。 ShuffleManager 是一个特质。核心方法如下:

私人[spark]traitShuffleManager{/**

* 注册一个shuffle返回句柄

*/defregisterShuffle[K,V,C]( shuffleId:Int, dependency:ShuffleDependency[K,V,C]):ShuffleHandle/** 根据给定分区获取一个Writer,当executors执行map任务时调用*/defgetWriter[K,V ](handle:ShuffleHandle,mapId:Long,context:TaskContext,metrics:ShuffleWriteMetricsReporter):ShuffleWriter[K,V]/**

* 根据reduce分区范围获取Reader,执行器执行reduce任务时调用

*/defgetReader [K,C](handle:ShuffleHandle,startPartition:Int,endPartition:Int,context:TaskContext,metrics:ShuffleReadMetricsReporter):ShuffleReader [K,C] .}

2.SortShuffleManager

SortShuffleManager是ShuffleManager的唯一实现类。上述三种方法的实现如下:

2.1 注册Shuffle

/**

* 获取一个[[ShuffleHandle]]传递给任务。

*/overridedefregisterShuffle[K,V,C]( shuffleId:Int, dependency:ShuffleDependency[K,V,C]):ShuffleHandle={//1.首先检查是否符合BypassMergeSortif(SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {//If分区数量少于spark.shuffle.sort.bypassMergeThreshold,并且我们不需要//映射端聚合,然后直接写入numPartitions文件并在最后连接//它们。这避免了两次序列化和反序列化来将溢出的文件合并在一起,这在正常的代码路径中会发生。缺点是//一次打开多个文件,因此分配给缓冲区的内存更多。 newBypassMergeSortShuffleHandle[K,V]( shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V,V]])//否则,检查是否可以序列化}elseif(SortShuffleManager.canUseSerializedShuffle(dependency)) {//否则,尝试以序列化形式缓冲映射输出,因为这样效率更高:newSerializedShuffleHandle[K,V ]( shuffleId, dependency.asInstanceOf[ShuffleDependency[K,V] ,V]]) }else{//否则,buffer map以反序列化形式输出:newBaseShuffleHandle(shuffleId, dependency) } }

1.首先检查是否满足BypassMergeSort。这里需要满足两个条件。首先,当前的shuffle依赖中没有map端的聚合操作。其次,分区数量必须小于spark.shuffle.sort.bypassMergeThreshold的值。默认值为200。 如果满足这些条件, 两个条件将返回BypassMergeSortShuffleHandle 并启用旁路归并排序洗牌机制。

defshouldBypassMergeSort(conf:SparkConf, dep:ShuffleDependency[_, _, _]):Boolean={//如果需要进行地图端聚合,则无法绕过排序。if(dep.mapSideCombine) {false}else{//默认值为200valbypassMergeThreshold:Int=conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD) dep.partitioner.numPartitions=bypassMergeThreshold }}

2、如果不满足上述条件,检查是否满足canUseSerializedShuffle()方法。如果满足该方法中的三个条件,将返回SerializedShuffleHandle 并启用tungsten-sort shuffle 机制。

defcanUseSerializedShuffle(dependency:ShuffleDependency[_, _, _]):Boolean={valshufId=dependency.shuffleIdvalnumPartitions=dependency.partitioner.numPartitions//序列化器需要支持Relocationif(!dependency.serializer.supportsRelocationOfSerializedObjects) { log.debug(s"Can" t 使用序列化的shuffle进行shuffle$shufId因为序列化器"+s"${dependency.serializer.getClass.getName}不支持对象重定位")false//不能进行map端聚合操作}elseif(dependency.mapSideCombine ) { log.debug(s"不能使用序列化shuffle进行shuffle$shufId因为我们需要做"+s"map端聚合")false//分区数量不能大于16777215+1}elseif(numPartitions MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) { log.debug (s"不能对shuffle$shufId 使用序列化shuffle,因为它有多个"+s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODEpartitions")false}else{ log.debug(s"可以对shuffle$shufId 使用序列化shuffle" )真的}}

3、如果以上两个条件都不满足,则返回BaseShuffleHandle,并使用基本的sort shuffle机制。

2.2 获取读者

/**

* 获取一系列reduce 分区的读取器(startPartition 到endPartition-1,包括在内)。

* 通过reduce任务调用执行器。

*/overridedefgetReader [K,C](handle:ShuffleHandle,startPartition:Int,endPartition:Int,context:TaskContext,metrics:ShuffleReadMetricsReporter):ShuffleReader [K,C]={valblocksByAddress=SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(手le.shuffleId, startPartition, endPartition)newBlockStoreShuffleReader(handle.asInstanceOf [BaseShuffleHandle[K,_,C]],blocksByAddress,上下文,指标,shouldBatchFetch=canUseBatchFetch(startPartition,endPartition,context))}

这里返回BlockStoreShuffleReader

2.3 获取作家

/** 获取给定分区的写入器。由映射任务调用执行器。 */overridedefgetWriter[K,V](handle:ShuffleHandle,mapId:Long,context:TaskContext,metrics:ShuffleWriteMetricsReporter):ShuffleWriter[K,V]={valmapTaskIds=taskIdMapsForShuffle.computeIf Absent(handle .shuffleId, _=newOpenHashSet[Long](16)) mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }valenv=SparkEnv.get//获取不同的ShuffleWritehandlematch{caseunsafeShuffleHandle:SerializedShuffleHandle[K@unchecked,V@unchecked ]=newUnsafeShuffleWriter( env.blockManager, context.taskMemoryManager(), unsafeShuffleHandle, mapId, context , env.conf,指标,shuffleExecutorComponents)casebypassMergeSortHandle:BypassMergeSortShuffleHandle[K@unchecked,V@unchecked]=newBypassMergeSortShuffleWriter( env.blockManager,bypassMergeSortHandle ,mapId,env .conf,指标,shuffleExecutorComponents)caseother:BaseShuffleHandle[ K@未选中,V@未选中,_]=newSortShuffleWriter(shuffleBlockResolver, 其他, mapId, context, shuffleExecutorComponents) }}

这里会根据句柄得到不同的ShuffleWrite。如果是SerializedShuffleHandle,则使用UnsafeShuffleWriter。如果是BypassMergeSortShuffleHandle,则使用BypassMergeSortShuffleWriter。否则,使用SortShuffleWriter。

3. 三个Writer的实现

上面提到,当旁路机制开启时,会使用BypassMergeSortShuffleWriter。如果序列化器支持重定位,且map端没有聚合且分区数不大于16777215+1,满足三个条件,则使用UnsafeShuffleWriter,否则使用SortShuffleWriter。

3.1 绕过MergeSortShuffleWriter

BypassMergeSortShuffleWriter继承ShuffleWriter,用Java实现。它将map端的多个输出文件合并为一个文件并生成一个索引文件。索引记录到每个分区的起始地址。 write()方法如下:

@Overridepublic void write(Iteratorrecords)throwsIOException{ assert (partitionWriters==null);//创建新的ShuffleMapOutputWriterShuffleMapOutputWritermapOutputWriter=shuffleExecutorComponents .createMapOutputWriter(shuffleId, mapId, numPartitions);try{//如果没有数据if(!records.hasNext () ) {//返回所有分区的写入长度partitionLengths=mapOutputWriter.commitAllPartitions(); //更新mapStatus mapStatus=MapStatus$.MODULE$.apply( blockManager.shuffleServerId(),partitionLengths,mapId);返回; }finalSerializerInstanceserInstance=序列化器。 newInstance();finallong openStartTime=System.nanoTime();//创建与分区数相等的DiskBlockObjectWriter FileSegmentpartitionWriters=newDiskBlockObjectWriter[numPartitions]; partitionWriterSegments=newFileSegment[numPartitions];//对于每个分区for(int i=0; i numPartitions; i++) {//创建临时块finalTuple2 tempShuffleBlockIdPlusFile=blockManager.diskBlockManager().createTempShuffleBlock();//获取文件和id临时块的finalFilefile=tempShuffleBlockIdPlusFile._2();finalBlockIdblockId=tempShuffleBlockIdPlusFile._1();//为每个分区创建一个DiskBlockObjectWriterpartitionWriters[i]=blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics); } //创建要写入的文件和创建磁盘写入器都涉及与磁盘交互,并且当我们打开许多文件时,可能会花费很长时间,因此应该//包含在随机写入时间中。//创建文件和写入文件需要大量时间,也需要计入shuffle写入时间writeMetrics中。 incWriteTime(System.nanoTime() - openStartTime);//如果有数据while(records.hasNext()) {finalProduct2 record=reports.next();finalKkey=record._1();//对于每条数据,按key 写入对应分区对应的文件partitionWriters[partitioner.getPartition(key)].write(key, record._2()); }for(int i=0; i numPartitions; i++) {try(DiskBlockObjectWriterwriter=partitionWriters[i ]) {//提交partitionWriterSegments[i]=writer.commitAndGet(); } }//将所有分区文件合并为一个文件partitionLengths=writePartitionedData(mapOutputWriter); //更新mapStatusmapStatus=MapStatus$.MODULE$.apply( blockManager .shuffleServerId(),partitionLengths,mapId); }catch(Exceptione) {try{ mapOutputWriter.abort(e); }catch(Exceptione2) { logger.error("无法在写入映射输出后中止写入器。", e2); e.addSuppressed(e2); }扔; }}

合并文件的方法writePartitionedData()如下。默认使用零拷贝方法来合并文件:

privatelong[] writePartitionedData(ShuffleMapOutputWritermapOutputWriter) throwsIOException{//跟踪分区在输出文件中开始的位置if(partitionWriters !=null) {//开始时间finallong writeStartTime=System.nanoTime();try{for(int i=0; } i numPartitions; i++) {//获取每个文件FinalFilefile=partitionWriterSegments[i].file();ShufflePartitionWriterwriter=mapOutputWriter.getPartitionWriter(i);if(file.exists()) {//采用零拷贝方法if(transferToEnabled ) {//使用WritableByteChannelWrapper 使资源关闭保持一致//该实现与UnsafeShuffleWriter.Optional MaybeOutputChannel=writer.openChannelWrapper();//这里会调用Utils.copyFileStreamNIO 方法,最后会调用FileChannel.transferTo 方法复制文件if(maybeOutputChannel .isPresent()) { writePartitionedDataWithChannel(file, MaybeOutputChannel.get()); }其他{ writePartitionedDataWithStream(文件, writer); } }else{//否则,复制流writePartitionedDataWithStream(file, writer); }if(! file.delete()) { logger.error("无法删除分区{} 的文件", i); } } } }最后{ writeMetrics.incWriteTime(System.nanoTime() - writeStartTi

me);    }    partitionWriters =null;  }returnmapOutputWriter.commitAllPartitions();} 3.2 UnsafeShuffleWriter UnsafeShuffleWriter也是继承ShuffleWriter,用java实现,write方法如下: @Overridepublic void write(scala.collection.Iterator>records)throwsIOException{// Keep track of success so we know if we encountered an exception// We do this rather than a standard try/catch/re-throw to handle// generic throwables.// 跟踪异常boolean success =false;try{while(records.hasNext()) {// 将数据插入ShuffleExternalSorter进行外部排序insertRecordIntoSorter(records.next());    }// 合并并输出文件closeAndWriteOutput();    success =true;  }finally{if(sorter !=null) {try{        sorter.cleanupResources();      }catch(Exceptione) {// Only throw this error if we won"t be masking another// error.if(success) {throwe;        }else{          logger.error("In addition to a failure during writing, we failed during "+"cleanup.", e);        }      }    }  }} 这里主要有两个方法: 3.2.1 insertRecordIntoSorter()

@VisibleForTestingvoid insertRecordIntoSorter(Product2 record)throwsIOException{  assert(sorter !=null);// 获取key和分区finalKkey = record._1();finalint partitionId = partitioner.getPartition(key);// 重置缓冲区serBuffer.reset();// 将key和value写入缓冲区serOutputStream.writeKey(key,OBJECT_CLASS_TAG);  serOutputStream.writeValue(record._2(),OBJECT_CLASS_TAG);  serOutputStream.flush();// 获取序列化数据大小finalint serializedRecordSize = serBuffer.size();  assert (serializedRecordSize >0);// 将序列化后的数据插入ShuffleExternalSorter处理sorter.insertRecord(    serBuffer.getBuf(),Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);} 该方法会将数据进行序列化,并且将序列化后的数据通过insertRecord()方法插入外部排序器中,insertRecord()方法如下: public void insertRecord(ObjectrecordBase, long recordOffset, int length, int partitionId)throwsIOException{// for testsassert(inMemSorter !=null);// 如果数据条数超过溢写阈值,直接溢写磁盘if(inMemSorter.numRecords() >= numElementsForSpillThreshold) {    logger.info("Spilling data because number of spilledRecords crossed the threshold "+      numElementsForSpillThreshold);    spill();  }// Checks whether there is enough space to insert an additional record in to the sort pointer// array and grows the array if additional space is required. If the required space cannot be// obtained, then the in-memory data will be spilled to disk.// 检查是否有足够的空间插入额外的记录到排序指针数组中,如果需要额外的空间对数组进行扩容,如果空间不够,内存中的数据将会被溢写到磁盘上growPointerArrayIfNecessary();finalint uaoSize =UnsafeAlignedOffset.getUaoSize();// Need 4 or 8 bytes to store the record length.// 需要额外的4或8个字节存储数据长度finalint required = length + uaoSize;// 如果需要更多的内存,会想TaskMemoryManager申请新的pageacquireNewPageIfNecessary(required);  assert(currentPage !=null);finalObjectbase = currentPage.getBaseObject();//Given a memory page and offset within that page, encode this address into a 64-bit long.//This address will remain valid as long as the corresponding page has not been freed.// 通过给定的内存页和偏移量,将当前数据的逻辑地址编码成一个long型finallong recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);// 写长度值UnsafeAlignedOffset.putSize(base, pageCursor, length);// 移动指针pageCursor += uaoSize;// 写数据Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);// 移动指针pageCursor += length;// 将编码的逻辑地址和分区id传给ShuffleInMemorySorter进行排序inMemSorter.insertRecord(recordAddress, partitionId);} 在这里对于数据的缓存和溢写不借助于其他高级数据结构,而是直接操作内存空间 growPointerArrayIfNecessary()方法如下: /** * Checks whether there is enough space to insert an additional record in to the sort pointer * array and grows the array if additional space is required. If the required space cannot be * obtained, then the in-memory data will be spilled to disk. */privatevoid growPointerArrayIfNecessary()throwsIOException{  assert(inMemSorter !=null);// 如果没有空间容纳新的数据if(!inMemSorter.hasSpaceForAnotherRecord()) {// 获取当前内存使用量long used = inMemSorter.getMemoryUsage();LongArrayarray;try{// could trigger spilling// 分配给缓存原来两倍的容量array = allocateArray(used /8*2);    }catch(TooLargePageExceptione) {// The pointer array is too big to fix in a single page, spill.// 如果超出了一页的大小,直接溢写,溢写方法见后面// 一页的大小为128M,在PackedRecordPointer类中// static final int MAXIMUM_PAGE_SIZE_BYTES = 1<< 27;  // 128 megabytesspill();return;    }catch(SparkOutOfMemoryErrore) {// should have trigger spillingif(!inMemSorter.hasSpaceForAnotherRecord()) {        logger.error("Unable to grow the pointer array");throwe;      }return;    }// check if spilling is triggered or notif(inMemSorter.hasSpaceForAnotherRecord()) {// 如果有了剩余空间,则表明没必要扩容,释放分配的空间freeArray(array);    }else{// 否则把原来的数组复制到新的数组inMemSorter.expandPointerArray(array);    }  }} spill()方法如下: @Overridepublic long spill(long size,MemoryConsumertrigger)throwsIOException{if(trigger !=this|| inMemSorter ==null|| inMemSorter.numRecords() ==0) {return0L;  }  logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",Thread.currentThread().getId(),Utils.bytesToString(getMemoryUsage()),    spills.size(),    spills.size() >1?" times":" time");// Sorts the in-memory records and writes the sorted records to an on-disk file.// This method does not free the sort data structures.// 对内存中的数据进行排序并且将有序记录写到一个磁盘文件中,这个方法不会释放排序的数据结构writeSortedFile(false);finallong spillSize = freeMemory();// 重置ShuffleInMemorySorterinMemSorter.reset();// Reset the in-memory sorter"s pointer array only after freeing up the memory pages holding the// records. Otherwise, if the task is over allocated memory, then without freeing the memory// pages, we might not be able to get memory for the pointer array.taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);returnspillSize;} writeSortedFile()方法: privatevoid writeSortedFile(boolean isLastFile) {// This call performs the actual sort.// 返回一个排序好的迭代器finalShuffleInMemorySorter.ShuffleSorterIteratorsortedRecords =    inMemSorter.getSortedIterator();// If there are no sorted records, so we don"t need to create an empty spill file.if(!sortedRecords.hasNext()) {return;  }finalShuffleWriteMetricsReporterwriteMetricsToUse;// 如果为true,则为输出文件,否则为溢写文件if(isLastFile) {// We"re writing the final non-spill file, so we _do_ want to count this as shuffle bytes.writeMetricsToUse = writeMetrics;  }else{// We"re spilling, so bytes written should be counted towards spill rather than write.// Create a dummy WriteMetrics object to absorb these metrics, since we don"t want to count// them towards shuffle bytes written.writeMetricsToUse =newShuffleWriteMetrics();  }// Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn"t seem to// be an API to directly transfer bytes from managed memory to the disk writer, we buffer// data through a byte array. This array does not need to be large enough to hold a single// record;// 创建一个字节缓冲数组,大小为1mfinalbyte[] writeBuffer =newbyte[diskWriteBufferSize];// Because this output will be read during shuffle, its compression codec must be controlled by// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use// createTempShuffleBlock here; see SPARK-3426 for more details.// 创建一个临时的shuffle blockfinalTuple2 spilledFileInfo =    blockManager.diskBlockManager().createTempShuffleBlock();// 获取文件和idfinalFilefile = spilledFileInfo._2();finalTempShuffleBlockIdblockId = spilledFileInfo._1();finalSpillInfospillInfo =newSpillInfo(numPartitions, file, blockId);// Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.// Our write path doesn"t actually use this serializer (since we end up calling the `write()`// OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work// around this, we pass a dummy no-op serializer.// 不做任何转换的序列化器,因为需要一个实例来构造DiskBlockObjectWriterfinalSerializerInstanceser =DummySerializerInstance.INSTANCE;  int currentPartition =-1;finalFileSegmentcommittedSegment;try(DiskBlockObjectWriterwriter =      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) {finalint uaoSize =UnsafeAlignedOffset.getUaoSize();// 遍历while(sortedRecords.hasNext()) {      sortedRecords.loadNext();finalint partition = sortedRecords.packedRecordPointer.getPartitionId();      assert (partition >= currentPartition);if(partition != currentPartition) {// Switch to the new partition// 如果切换到了新的分区,提交当前分区,并且记录当前分区大小if(currentPartition !=-1) {finalFileSegmentfileSegment = writer.commitAndGet();          spillInfo.partitionLengths[currentPartition] = fileSegment.length();        }// 然后切换到下一个分区currentPartition = partition;      }// 获取指针,通过指针获取页号和偏移量finallong recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();finalObjectrecordPage = taskMemoryManager.getPage(recordPointer);finallong recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);// 获取剩余数据int dataRemaining =UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);// 跳过数据前面存储的长度long recordReadPosition = recordOffsetInPage + uaoSize;// skip over record lengthwhile(dataRemaining >0) {finalint toTransfer =Math.min(diskWriteBufferSize, dataRemaining);// 将数据拷贝到缓冲数组中Platform.copyMemory(          recordPage, recordReadPosition, writeBuffer,Platform.BYTE_ARRAY_OFFSET, toTransfer);// 从缓冲数组中转入DiskBlockObjectWriterwriter.write(writeBuffer,0, toTransfer);// 更新位置recordReadPosition += toTransfer;// 更新剩余数据dataRemaining -= toTransfer;      }      writer.recordWritten();    }// 提交committedSegment = writer.commitAndGet();  }// If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,// then the file might be empty. Note that it might be better to avoid calling// writeSortedFile() in that case.// 记录溢写文件的列表if(currentPartition !=-1) {    spillInfo.partitionLengths[currentPartition] = committedSegment.length();    spills.add(spillInfo);  }// 如果是溢写文件,更新溢写的指标if(!isLastFile) {      writeMetrics.incRecordsWritten(      ((ShuffleWriteMetrics)writeMetricsToUse).recordsWritten());    taskContext.taskMetrics().incDiskBytesSpilled(      ((ShuffleWriteMetrics)writeMetricsToUse).bytesWritten());  }} encodePageNumberAndOffset()方法如下: public long encodePageNumberAndOffset(MemoryBlockpage, long offsetInPage) {// 如果开启了堆外内存,偏移量为绝对地址,可能需要64位进行编码,由于页大小限制,将其减去当前页的基地址,变为相对地址if(tungstenMemoryMode ==MemoryMode.OFF_HEAP) {// In off-heap mode, an offset is an absolute address that may require a full 64 bits to// encode. Due to our page size limitation, though, we can convert this into an offset that"s// relative to the page"s base offset; this relative offset will fit in 51 bits.offsetInPage -= page.getBaseOffset();  }returnencodePageNumberAndOffset(page.pageNumber, offsetInPage);}@VisibleForTestingpublic static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) {  assert (pageNumber >=0) :"encodePageNumberAndOffset called with invalid page";// 高13位为页号,低51位为偏移量// 页号左移51位,再拼偏移量和上一个低51位都为1的掩码0x7FFFFFFFFFFFFLreturn(((long) pageNumber)< ShuffleInMemorySorter的insertRecord()方法如下: public void insertRecord(long recordPointer, int partitionId) {if(!hasSpaceForAnotherRecord()) {thrownewIllegalStateException("There is no space for new record");  }  array.set(pos,PackedRecordPointer.packPointer(recordPointer, partitionId));  pos++;} PackedRecordPointer.packPointer()方法: public static long packPointer(long recordPointer, int partitionId) {  assert (partitionId<=MAXIMUM_PARTITION_ID);// Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.// Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.// 将页号右移24位,和低27位拼在一起,这样逻辑地址被压缩成40位finallong pageNumber = (recordPointer &MASK_LONG_UPPER_13_BITS) >>>24;finallong compressedAddress = pageNumber | (recordPointer &MASK_LONG_LOWER_27_BITS);// 将分区号放在高24位上return(((long) partitionId)<<40) | compressedAddress;} getSortedIterator()方法: publicShuffleSorterIteratorgetSortedIterator() {  int offset =0;// 使用基数排序对内存分区ID进行排序。基数排序要快得多,但是在添加指针时需要额外的内存作为保留内存if(useRadixSort) {    offset =RadixSort.sort(      array, pos,PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX,false,false);// 否则采用timSort排序}else{MemoryBlockunused =newMemoryBlock(      array.getBaseObject(),      array.getBaseOffset() + pos *8L,      (array.size() - pos) *8L);LongArraybuffer =newLongArray(unused);Sorter sorter =newSorter<>(newShuffleSortDataFormat(buffer));    sorter.sort(array,0, pos,SORT_COMPARATOR);  }returnnewShuffleSorterIterator(pos, array, offset);}

3.2.2 closeAndWriteOutput() @VisibleForTestingvoid closeAndWriteOutput()throwsIOException{  assert(sorter !=null);  updatePeakMemoryUsed();  serBuffer =null;  serOutputStream =null;// 获取溢写文件finalSpillInfo[] spills = sorter.closeAndGetSpills();  sorter =null;finallong[] partitionLengths;try{// 合并溢写文件partitionLengths = mergeSpills(spills);  }finally{// 删除溢写文件for(SpillInfospill : spills) {if(spill.file.exists() && !spill.file.delete()) {        logger.error("Error while deleting spill file {}", spill.file.getPath());      }    }  }// 更新mapstatusmapStatus =MapStatus$.MODULE$.apply(    blockManager.shuffleServerId(), partitionLengths, mapId);} mergeSpills()方法: privatelong[] mergeSpills(SpillInfo[] spills)throwsIOException{  long[] partitionLengths;// 如果没有溢写文件,创建空的if(spills.length ==0) {finalShuffleMapOutputWritermapWriter = shuffleExecutorComponents        .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());returnmapWriter.commitAllPartitions();// 如果只有一个溢写文件,将它合并输出}elseif(spills.length ==1) {Optional maybeSingleFileWriter =        shuffleExecutorComponents.createSingleFileMapOutputWriter(shuffleId, mapId);if(maybeSingleFileWriter.isPresent()) {// Here, we don"t need to perform any metrics updates because the bytes written to this// output file would have already been counted as shuffle bytes written.partitionLengths = spills[0].partitionLengths;      maybeSingleFileWriter.get().transferMapSpillFile(spills[0].file, partitionLengths);    }else{      partitionLengths = mergeSpillsUsingStandardWriter(spills);    }// 如果有多个,合并输出,合并的时候有NIO和BIO两种方式}else{    partitionLengths = mergeSpillsUsingStandardWriter(spills);  }returnpartitionLengths;} 3.3 SortShuffleWriter SortShuffleWriter会使用PartitionedAppendOnlyMap或PartitionedPariBuffer在内存中进行排序,如果超过内存限制,会溢写到文件中,在全局输出有序文件的时候,对之前的所有输出文件和当前内存中的数据进行全局归并排序,对key相同的元素会使用定义的function进行聚合,入口为write()方法: overridedefwrite(records:Iterator[Product2[K,V]]):Unit= {// 创建一个外部排序器,如果map端有预聚合,就传入aggregator和keyOrdering,否则不需要传入sorter =if(dep.mapSideCombine) {newExternalSorter[K,V,C](      context, dep.aggregator,Some(dep.partitioner), dep.keyOrdering, dep.serializer)  }else{// In this case we pass neither an aggregator nor an ordering to the sorter, because we don"t// care whether the keys get sorted in each partition; that will be done on the reduce side// if the operation being run is sortByKey.newExternalSorter[K,V,V](      context, aggregator =None,Some(dep.partitioner), ordering =None, dep.serializer)  }// 将数据放入ExternalSorter进行排序sorter.insertAll(records)// Don"t bother including the time to open the merged output file in the shuffle write time,// because it just opens a single file, so is typically too fast to measure accurately// (see SPARK-3570).// 创建一个输出WrtiervalmapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(    dep.shuffleId, mapId, dep.partitioner.numPartitions)// 将外部排序的数据写入Writersorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)valpartitionLengths = mapOutputWriter.commitAllPartitions()// 更新mapstatusmapStatus =MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)} insertAll()方法: definsertAll(records:Iterator[Product2[K,V]]):Unit= {//TODO:stop combining if we find that the reduction factor isn"t highvalshouldCombine = aggregator.isDefined// 是否需要map端聚合if(shouldCombine) {// Combine values in-memory first using our AppendOnlyMap// 使用AppendOnlyMap在内存中聚合values// 获取mergeValue()函数,将新值合并到当前聚合结果中valmergeValue = aggregator.get.mergeValue// 获取createCombiner()函数,创建聚合初始值valcreateCombiner = aggregator.get.createCombinervarkv:Product2[K,V] =null// 如果一个key当前有聚合值,则合并,如果没有创建初始值valupdate = (hadValue:Boolean, oldValue:C) =>{if(hadValue) mergeValue(oldValue, kv._2)elsecreateCombiner(kv._2)    }// 遍历while(records.hasNext) {// 增加读取记录数addElementsRead()      kv = records.next()// map为PartitionedAppendOnlyMap,将分区和key作为key,聚合值作为valuemap.changeValue((getPartition(kv._1), kv._1), update)// 是否需要溢写到磁盘maybeSpillCollection(usingMap =true)    }// 如果不需要map端聚合}else{// Stick values into our bufferwhile(records.hasNext) {      addElementsRead()valkv = records.next()// buffer为PartitionedPairBuffer,将分区和key加进去buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])// 是否需要溢写到磁盘maybeSpillCollection(usingMap =false)    }  }} 该方法主要是判断在插入数据时,是否需要在map端进行预聚合,分别采用两种数据结构来保存 maybeSpillCollection()方法里面会调用maybeSpill()方法检查是否需要溢写,如果发生溢写,重新构造一个map或者buffer结构从头开始缓存,如下: privatedefmaybeSpillCollection(usingMap:Boolean):Unit= {varestimatedSize =0Lif(usingMap) {    estimatedSize = map.estimateSize()// 判断是否需要溢写if(maybeSpill(map, estimatedSize)) {      map =newPartitionedAppendOnlyMap[K,C]    }  }else{    estimatedSize = buffer.estimateSize()// 判断是否需要溢写if(maybeSpill(buffer, estimatedSize)) {      buffer =newPartitionedPairBuffer[K,C]    }  }if(estimatedSize >_peakMemoryUsedBytes) {    _peakMemoryUsedBytes = estimatedSize  }}protecteddefmaybeSpill(collection:C, currentMemory:Long):Boolean= {varshouldSpill =false// 如果读取的记录数是32的倍数,并且预估map或者buffer内存占用大于默认的5m阈值if(elementsRead %32==0&& currentMemory >= myMemoryThreshold) {// Claim up to double our current memory from the shuffle memory pool// 尝试申请2*currentMemory-5m的内存valamountToRequest =2* currentMemory - myMemoryThresholdvalgranted = acquireMemory(amountToRequest)// 更新阈值myMemoryThreshold += granted// If we were granted too little memory to grow further (either tryToAcquire returned 0,// or we already had more memory than myMemoryThreshold), spill the current collection// 判断,如果还是不够,确定溢写shouldSpill = currentMemory >= myMemoryThreshold    }// 如果shouldSpill为false,但是读取的记录数大于Integer.MAX_VALUE,也是需要溢写shouldSpill = shouldSpill || _elementsRead >numElementsForceSpillThreshold// Actually spillif(shouldSpill) {// 溢写次数+1_spillCount +=1logSpillage(currentMemory)// 溢写缓存的集合spill(collection)      _elementsRead =0_memoryBytesSpilled += currentMemory// 释放内存releaseMemory()    }    shouldSpill  } maybeSpill()方法里面会调用spill()进行溢写,如下: overrideprotected[this]defspill(collection:WritablePartitionedPairCollection[K,C]):Unit= {// 根据给定的比较器进行排序,返回排序结果的迭代器valinMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)// 将迭代器中的数据溢写到磁盘文件中valspillFile = spillMemoryIteratorToDisk(inMemoryIterator)// ArrayBuffer记录所有溢写的文件spills += spillFile  } spillMemoryIteratorToDisk()方法如下: private[this]defspillMemoryIteratorToDisk(inMemoryIterator:WritablePartitionedIterator)    :SpilledFile= {// Because these files may be read during shuffle, their compression must be controlled by// spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use// createTempShuffleBlock here; see SPARK-3426 for more context.// 创建一个临时块val(blockId, file) = diskBlockManager.createTempShuffleBlock()// These variables are reset after each flushvarobjectsWritten:Long=0valspillMetrics:ShuffleWriteMetrics=newShuffleWriteMetrics// 创建溢写文件的DiskBlockObjectWritervalwriter:DiskBlockObjectWriter=    blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)// List of batch sizes (bytes) in the order they are written to disk// 记录写入批次大小valbatchSizes =newArrayBuffer[Long]// How many elements we have in each partition// 记录每个分区条数valelementsPerPartition =newArray[Long](numPartitions)// Flush the disk writer"s contents to disk, and update relevant variables.// The writer is committed at the end of this process.// 将内存中的数据按批次刷写到磁盘中defflush():Unit= {valsegment = writer.commitAndGet()    batchSizes += segment.length    _diskBytesSpilled += segment.length    objectsWritten =0}varsuccess =falsetry{// 遍历map或者buffer中的记录while(inMemoryIterator.hasNext) {valpartitionId = inMemoryIterator.nextPartition()      require(partitionId >=0&& partitionId< numPartitions,s"partition Id:${partitionId}should be in the range [0,${numPartitions})")// 写入并更新计数值inMemoryIterator.writeNext(writer)      elementsPerPartition(partitionId) +=1objectsWritten +=1// 写入条数达到10000条时,将这批刷写到磁盘if(objectsWritten == serializerBatchSize) {        flush()      }    }// 遍历完以后,将剩余的刷写到磁盘if(objectsWritten >0) {      flush()    }else{      writer.revertPartialWritesAndClose()    }    success =true}finally{if(success) {      writer.close()    }else{// This code path only happens if an exception was thrown above before we set success;// close our stuff and let the exception be thrown furtherwriter.revertPartialWritesAndClose()if(file.exists()) {if(!file.delete()) {          logWarning(s"Error deleting${file}")        }      }    }  }// 返回溢写文件SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)} 接下来就是排序合并操作,调用ExternalSorter.writePartitionedMapOutput()方法: defwritePartitionedMapOutput(    shuffleId:Int,    mapId:Long,    mapOutputWriter:ShuffleMapOutputWriter):Unit= {varnextPartitionId =0// 如果没有发生溢写if(spills.isEmpty) {// Case where we only have in-memory datavalcollection =if(aggregator.isDefined) mapelsebuffer// 根据指定的比较器进行排序valit = collection.destructiveSortedWritablePartitionedIterator(comparator)while(it.hasNext()) {valpartitionId = it.nextPartition()varpartitionWriter:ShufflePartitionWriter=nullvarpartitionPairsWriter:ShufflePartitionPairsWriter=nullTryUtils.tryWithSafeFinally {        partitionWriter = mapOutputWriter.getPartitionWriter(partitionId)valblockId =ShuffleBlockId(shuffleId, mapId, partitionId)        partitionPairsWriter =newShufflePartitionPairsWriter(          partitionWriter,          serializerManager,          serInstance,          blockId,          context.taskMetrics().shuffleWriteMetrics)// 将分区内的数据依次取出while(it.hasNext && it.nextPartition() == partitionId) {          it.writeNext(partitionPairsWriter)        }      } {if(partitionPairsWriter !=null) {          partitionPairsWriter.close()        }      }      nextPartitionId = partitionId +1}// 如果发生溢写,将溢写文件和缓存数据进行归并排序,排序完成后按照分区依次写入ShufflePartitionPairsWriter}else{// We must perform merge-sort; get an iterator by partition and write everything directly.// 这里会进行归并排序for((id, elements)<-this.partitionedIterator) {valblockId =ShuffleBlockId(shuffleId, mapId, id)varpartitionWriter:ShufflePartitionWriter=nullvarpartitionPairsWriter:ShufflePartitionPairsWriter=nullTryUtils.tryWithSafeFinally {        partitionWriter = mapOutputWriter.getPartitionWriter(id)        partitionPairsWriter =newShufflePartitionPairsWriter(          partitionWriter,          serializerManager,          serInstance,          blockId,          context.taskMetrics().shuffleWriteMetrics)if(elements.hasNext) {for(elem<- elements) {            partitionPairsWriter.write(elem._1, elem._2)          }        }      } {if(partitionPairsWriter !=null) {          partitionPairsWriter.close()        }      }      nextPartitionId = id +1}  }  context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)  context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)  context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)} partitionedIterator()方法: defpartitionedIterator:Iterator[(Int,Iterator[Product2[K,C]])] = {valusingMap = aggregator.isDefinedvalcollection:WritablePartitionedPairCollection[K,C] =if(usingMap) mapelsebufferif(spills.isEmpty) {// Special case: if we have only in-memory data, we don"t need to merge streams, and perhaps// we don"t even need to sort by anything other than partition ID// 如果没有溢写,并且没有排序,只按照分区id排序if(ordering.isEmpty) {// The user hasn"t requested sorted keys, so only sort by partition ID, not keygroupByPartition(destructiveIterator(collection.partitionedDestructiveSortedIterator(None)))// 如果没有溢写但是排序,先按照分区id排序,再按key排序}else{// We do need to sort by both partition ID and keygroupByPartition(destructiveIterator(        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))    }  }else{// Merge spilled and in-memory data// 如果有溢写,就将溢写文件和内存中的数据归并排序merge(spills, destructiveIterator(      collection.partitionedDestructiveSortedIterator(comparator)))  }} 归并方法如下: privatedefmerge(spills:Seq[SpilledFile], inMemory:Iterator[((Int,K),C)])    :Iterator[(Int,Iterator[Product2[K,C]])] = {// 读取溢写文件valreaders = spills.map(newSpillReader(_))valinMemBuffered = inMemory.buffered// 遍历分区(0until numPartitions).iterator.map { p =>valinMemIterator =newIteratorForPartition(p, inMemBuffered)// 合并溢写文件和内存中的数据valiterators = readers.map(_.readNextPartition()) ++Seq(inMemIterator)// 如果有聚合逻辑,按分区聚合,对key按照keyComparator排序if(aggregator.isDefined) {// Perform partial aggregation across partitions(p, mergeWithAggregation(        iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))// 如果没有聚合,但是有排序逻辑,按照ordering做归并}elseif(ordering.isDefined) {// No aggregator given, but we have an ordering (e.g. used by reduce tasks in sortByKey);// sort the elements without trying to merge them(p, mergeSort(iterators, ordering.get))// 什么都没有直接归并}else{      (p, iterators.iterator.flatten)    }  }} 在write()方法中调用commitAllPartitions()方法输出数据,其中调用writeIndexFileAndCommit()方法写出数据和索引文件,如下:

好了,文章到这里就结束啦,如果本次分享的Spark Shuffle工作原理及源码深度解析和问题对您有所帮助,还望关注下本站哦!

用户评论

淡抹烟熏妆丶

看这标题,感觉是要深入了解 Spark 的 shuffle 机制吧!

    有6位网友表示赞同!

无所谓

想学习Spark框架背后的原理,这篇文章看起来挺不错的

    有5位网友表示赞同!

秒淘你心窝

最近在学习数据处理框架,这个机制很重要,源码解析会更有理解力吧。

    有13位网友表示赞同!

一纸愁肠。

想要知道Spark Shuffle是如何实现的,这篇帖子肯定能让我的代码更深入

    有16位网友表示赞同!

花容月貌

对大数据的处理方式一直很感兴趣,希望能从源码中了解它的运作流程

    有6位网友表示赞同!

£烟消云散

学习spark优化技术是一个不错的方向啊,这篇文章能提供很多实际案例吧!

    有17位网友表示赞同!

黑夜漫长

感觉Spark Shuffle机制在数据框架领域很重要,解析源码可以让我更全面地认识它。

    有10位网友表示赞同!

♂你那刺眼的温柔

最近在项目中遇到了Shuffle的问题,希望这篇分析能给我一些启发

    有9位网友表示赞同!

迷路的男人

对于新手来说,源码解析确实比较困难,但这篇文可能会有详细的图解和解释吧!

    有13位网友表示赞同!

封心锁爱

很佩服能把Spark源码研究透彻的人,希望可以从中学习到一些经验。

    有14位网友表示赞同!

孤者何惧

希望能看到源码中的关键逻辑和实现细节,方便我参考学习。

    有10位网友表示赞同!

﹏櫻之舞﹏

感觉阅读源码解析的过程很有挑战性,但也能让我更加清晰地理解技术的精髓

    有10位网友表示赞同!

◆残留德花瓣

期待这篇文能揭开Spark Shuffle机制的神秘面纱!

    有9位网友表示赞同!

此生一诺

平时学习代码更多是通过官方文档和博客,这种源码级别的分析还是比较少见的

    有19位网友表示赞同!

雪花ミ飞舞

想要深入Spark的底层实现,源码解析绝对是最好的途径

    有8位网友表示赞同!

安好如初

希望作者能够用通俗易懂的方式讲解,让即使没有太多经验的人也能理解。

    有17位网友表示赞同!

枫无痕

学习了Shuffle机制,以后可以更好地优化Spark程序的速度和效率吧!

    有7位网友表示赞同!

我就是这样一个人

感觉Spark是一个很强大的框架,源码解析能让我更全面地了解它的能力

    有14位网友表示赞同!

杰克

期待这种深入源码的分析文章越来越多,能够帮助我们学习到更多专业的知识。

    有8位网友表示赞同!

【Spark Shuffle工作原理及源码深度解析】相关文章:

1.蛤蟆讨媳妇【哈尼族民间故事】

2.米颠拜石

3.王羲之临池学书

4.清代敢于创新的“浓墨宰相”——刘墉

5.“巧取豪夺”的由来--米芾逸事

6.荒唐洁癖 惜砚如身(米芾逸事)

7.拜石为兄--米芾逸事

8.郑板桥轶事十则

9.王献之被公主抢亲后的悲惨人生

10.史上真实张三丰:在棺材中竟神奇复活