This article takes a deep look at how Spark Shuffle works, walking through the relevant source code step by step: how a shuffle is registered, how readers and writers are obtained, and how the three ShuffleWriter implementations behave.
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
  "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
  "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
val shuffleMgrClass =
  shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
Here you can see the two configured shuffle names, sort and tungsten-sort, and that the ShuffleManager is created via reflection. ShuffleManager is a trait; its core methods are as follows:
private[spark] trait ShuffleManager {

  /**
   * Register a shuffle with the manager and obtain a handle for it.
   */
  def registerShuffle[K, V, C](
      shuffleId: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  /** Get a writer for a given partition. Called on executors by map tasks. */
  def getWriter[K, V](
      handle: ShuffleHandle,
      mapId: Long,
      context: TaskContext,
      metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V]

  /**
   * Get a reader for a range of reduce partitions. Called on executors by reduce tasks.
   */
  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext,
      metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C]

  // ...
}
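As background for the snippet above, instantiateClass builds the manager from its class name by reflection. A simplified, purely illustrative sketch of what that amounts to (SparkEnv's real helper also falls back to other constructor shapes; this is not the actual method):

import org.apache.spark.SparkConf

// Illustrative sketch only: load the class by name and call its SparkConf constructor,
// which is the shape SortShuffleManager exposes.
def instantiateWithConf[T](className: String, conf: SparkConf): T =
  Class.forName(className)
    .getConstructor(classOf[SparkConf])
    .newInstance(conf)
    .asInstanceOf[T]

// Inside Spark's own package this would amount to something like:
// val manager = instantiateWithConf[ShuffleManager](
//   "org.apache.spark.shuffle.sort.SortShuffleManager", conf)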
2. SortShuffleManager
SortShuffleManager is the only implementation of ShuffleManager. The implementations of the three methods above are as follows:
2.1 registerShuffle
/**
 * Obtains a [[ShuffleHandle]] to pass to tasks.
 */
override def registerShuffle[K, V, C](
    shuffleId: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  // 1. First check whether the bypass-merge-sort path can be used
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  // Otherwise, check whether the serialized (tungsten-sort) path can be used
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, dependency)
  }
}
1. First it checks whether the bypass merge-sort path applies. Two conditions must both hold: the current shuffle dependency performs no map-side aggregation, and the number of partitions is not greater than the value of spark.shuffle.sort.bypassMergeThreshold (200 by default; a configuration sketch follows after point 3). If both conditions are met, a BypassMergeSortShuffleHandle is returned and the bypass merge-sort shuffle mechanism is enabled.
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  // We cannot bypass sorting if we need to do map-side aggregation.
  if (dep.mapSideCombine) {
    false
  } else {
    // The default value is 200
    val bypassMergeThreshold: Int = conf.get(config.SHUFFLE_SORT_BYPASS_MERGE_THRESHOLD)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}
2. If those conditions are not met, canUseSerializedShuffle() is checked next. If the three conditions inside that method are all satisfied, a SerializedShuffleHandle is returned and the tungsten-sort shuffle mechanism is enabled.
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val shufId = dependency.shuffleId
  val numPartitions = dependency.partitioner.numPartitions
  // The serializer must support relocation of serialized objects
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
      s"${dependency.serializer.getClass.getName}, does not support object relocation")
    false
  // There must be no map-side aggregation
  } else if (dependency.mapSideCombine) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
      s"map-side aggregation")
    false
  // The number of partitions cannot exceed 16777215 + 1
  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
      s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
    false
  } else {
    log.debug(s"Can use serialized shuffle for shuffle $shufId")
    true
  }
}
3. If neither of the above applies, a BaseShuffleHandle is returned and the basic sort shuffle mechanism is used.
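As a usage note for point 1, the bypass threshold is an ordinary Spark configuration key. A small illustrative sketch follows; the app name, the threshold value of 400 and the partition counts are arbitrary example choices, not defaults:

import org.apache.spark.{SparkConf, SparkContext}

// Raise spark.shuffle.sort.bypassMergeThreshold so that shuffles with up to 400 reduce
// partitions (and no map-side combine) qualify for the bypass handle from point 1 above.
val conf = new SparkConf()
  .setAppName("bypass-threshold-demo")
  .setMaster("local[*]")
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")
val sc = new SparkContext(conf)

// groupByKey performs no map-side combine, so with 300 output partitions this shuffle is
// eligible for the bypass path; reduceByKey, which does map-side combine, would not be,
// regardless of the threshold.
val pairs = sc.parallelize(0 until 100000).map(i => (i % 300, i))
pairs.groupByKey(300).count()

sc.stop()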
2.2 getReader
/**
 * Get a reader for a range of reduce partitions (startPartition to endPartition - 1, inclusive).
 * Called on executors by reduce tasks.
 */
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext,
    metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = {
  val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByExecutorId(
    handle.shuffleId, startPartition, endPartition)
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics,
    shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context))
}
This returns a BlockStoreShuffleReader, which fetches and reads the requested range of shuffle blocks.
2.3 getWriter
/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Long,
    context: TaskContext,
    metrics: ShuffleWriteMetricsReporter): ShuffleWriter[K, V] = {
  val mapTaskIds = taskIdMapsForShuffle.computeIfAbsent(
    handle.shuffleId, _ => new OpenHashSet[Long](16))
  mapTaskIds.synchronized { mapTaskIds.add(context.taskAttemptId()) }
  val env = SparkEnv.get
  // Pick a different ShuffleWriter depending on the handle
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        bypassMergeSortHandle,
        mapId,
        env.conf,
        metrics,
        shuffleExecutorComponents)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context, shuffleExecutorComponents)
  }
}
A different ShuffleWriter is chosen here depending on the handle: a SerializedShuffleHandle yields an UnsafeShuffleWriter, a BypassMergeSortShuffleHandle yields a BypassMergeSortShuffleWriter, and otherwise a SortShuffleWriter is used.
3. The three ShuffleWriter implementations
As noted above, BypassMergeSortShuffleWriter is used when the bypass mechanism applies. If the serializer supports relocation, there is no map-side aggregation, and the number of partitions is not greater than 16777215 + 1, then all three conditions for serialized shuffle hold and UnsafeShuffleWriter is used; otherwise SortShuffleWriter is used.
3.1 BypassMergeSortShuffleWriter
BypassMergeSortShuffleWriter extends ShuffleWriter and is implemented in Java. It merges the map task's per-partition output files into a single file and produces an index file recording the starting offset of each partition. Its write() method is as follows:
@Override
public void write(Iterator<Product2<K, V>> records) throws IOException {
  assert (partitionWriters == null);
  // Create a new ShuffleMapOutputWriter
  ShuffleMapOutputWriter mapOutputWriter = shuffleExecutorComponents
      .createMapOutputWriter(shuffleId, mapId, numPartitions);
  try {
    // If there is no data
    if (!records.hasNext()) {
      // Commit and get the lengths written for all partitions
      partitionLengths = mapOutputWriter.commitAllPartitions();
      // Update mapStatus
      mapStatus = MapStatus$.MODULE$.apply(
          blockManager.shuffleServerId(), partitionLengths, mapId);
      return;
    }
    final SerializerInstance serInstance = serializer.newInstance();
    final long openStartTime = System.nanoTime();
    // Create as many DiskBlockObjectWriters and FileSegments as there are partitions
    partitionWriters = new DiskBlockObjectWriter[numPartitions];
    partitionWriterSegments = new FileSegment[numPartitions];
    // For every partition
    for (int i = 0; i < numPartitions; i++) {
      // Create a temporary shuffle block
      final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
          blockManager.diskBlockManager().createTempShuffleBlock();
      // Get the file and id of the temporary block
      final File file = tempShuffleBlockIdPlusFile._2();
      final BlockId blockId = tempShuffleBlockIdPlusFile._1();
      // Create a DiskBlockObjectWriter for each partition
      partitionWriters[i] =
          blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
    }
    // Creating the file to write to and creating a disk writer both involve interacting with
    // the disk, and can take a long time in aggregate when we open many files, so this should
    // be included in the shuffle write time.
    writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

    // While there is data
    while (records.hasNext()) {
      final Product2<K, V> record = records.next();
      final K key = record._1();
      // Each record is written, by key, to the file of its corresponding partition
      partitionWriters[partitioner.getPartition(key)].write(key, record._2());
    }

    for (int i = 0; i < numPartitions; i++) {
      try (DiskBlockObjectWriter writer = partitionWriters[i]) {
        // Commit each per-partition writer
        partitionWriterSegments[i] = writer.commitAndGet();
      }
    }

    // Concatenate all per-partition files into a single output file
    partitionLengths = writePartitionedData(mapOutputWriter);
    // Update mapStatus
    mapStatus = MapStatus$.MODULE$.apply(
        blockManager.shuffleServerId(), partitionLengths, mapId);
  } catch (Exception e) {
    try {
      mapOutputWriter.abort(e);
    } catch (Exception e2) {
      logger.error("Failed to abort the writer after failing to write map output.", e2);
      e.addSuppressed(e2);
    }
    throw e;
  }
}
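To make the role of the index file concrete, here is a small illustrative sketch (not Spark source) of how the per-partition lengths reported by commitAllPartitions() translate into the offsets a reducer seeks to in the single merged data file:

// Illustrative sketch: the offset of each partition in the merged data file is the running
// sum of the lengths before it; a reader of partition i seeks to offsets(i) and reads
// lengths(i) bytes. The length values are hypothetical.
val lengths = Array[Long](120L, 0L, 305L, 88L)
val offsets = lengths.scanLeft(0L)(_ + _)          // 0, 120, 120, 425, 513
def segmentFor(partition: Int): (Long, Long) =
  (offsets(partition), lengths(partition))         // (start offset, length)

println(segmentFor(2))                             // (120, 305)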
The writePartitionedData() method that merges the files is shown below; by default it uses a zero-copy approach to merge them:
private long[] writePartitionedData(ShuffleMapOutputWriter mapOutputWriter) throws IOException {
  // Track location of the partition starts in the output file
  if (partitionWriters != null) {
    // Start time
    final long writeStartTime = System.nanoTime();
    try {
      for (int i = 0; i < numPartitions; i++) {
        // Get each per-partition file
        final File file = partitionWriterSegments[i].file();
        ShufflePartitionWriter writer = mapOutputWriter.getPartitionWriter(i);
        if (file.exists()) {
          // Use the zero-copy path
          if (transferToEnabled) {
            // Using WritableByteChannelWrapper to make resource closing consistent between
            // this implementation and UnsafeShuffleWriter.
            Optional<WritableByteChannelWrapper> maybeOutputChannel = writer.openChannelWrapper();
            // This ends up calling Utils.copyFileStreamNIO, which finally calls
            // FileChannel.transferTo to copy the file
            if (maybeOutputChannel.isPresent()) {
              writePartitionedDataWithChannel(file, maybeOutputChannel.get());
            } else {
              writePartitionedDataWithStream(file, writer);
            }
          } else {
            // Otherwise, copy the streams
            writePartitionedDataWithStream(file, writer);
          }
          if (!file.delete()) {
            logger.error("Unable to delete file for partition {}", i);
          }
        }
      }
    } finally {
      writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
    }
    partitionWriters = null;
  }
  return mapOutputWriter.commitAllPartitions();
}
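The zero-copy path above ultimately relies on FileChannel.transferTo. As an illustration of that primitive (plain NIO, not Spark's Utils.copyFileStreamNIO), appending one file to another without passing the bytes through a user-space buffer looks roughly like this:

import java.io.{File, FileInputStream, FileOutputStream}

// Illustrative sketch: the kernel moves the bytes directly between the two files.
def appendWithTransferTo(src: File, dest: File): Long = {
  val in = new FileInputStream(src).getChannel
  val out = new FileOutputStream(dest, /* append = */ true).getChannel
  try {
    var position = 0L
    val size = in.size()
    // transferTo may move fewer bytes than requested, so loop until done
    while (position < size) {
      position += in.transferTo(position, size - position, out)
    }
    position
  } finally {
    in.close()
    out.close()
  }
}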
3.2 UnsafeShuffleWriter

UnsafeShuffleWriter also extends ShuffleWriter and is implemented in Java. Its write() method is as follows:

@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws IOException {
  // Keep track of success so we know if we encountered an exception
  // We do this rather than a standard try/catch/re-throw to handle
  // generic throwables.
  boolean success = false;
  try {
    while (records.hasNext()) {
      // Insert each record into the ShuffleExternalSorter for external sorting
      insertRecordIntoSorter(records.next());
    }
    // Merge the spills and write the output file
    closeAndWriteOutput();
    success = true;
  } finally {
    if (sorter != null) {
      try {
        sorter.cleanupResources();
      } catch (Exception e) {
        // Only throw this error if we won't be masking another error.
        if (success) {
          throw e;
        } else {
          logger.error("In addition to a failure during writing, we failed during cleanup.", e);
        }
      }
    }
  }
}

Two methods do the main work here:

3.2.1 insertRecordIntoSorter()

@VisibleForTesting
void insertRecordIntoSorter(Product2<K, V> record) throws IOException {
  assert (sorter != null);
  // Get the key and its partition
  final K key = record._1();
  final int partitionId = partitioner.getPartition(key);
  // Reset the serialization buffer
  serBuffer.reset();
  // Write the key and value into the buffer
  serOutputStream.writeKey(key, OBJECT_CLASS_TAG);
  serOutputStream.writeValue(record._2(), OBJECT_CLASS_TAG);
  serOutputStream.flush();
  // Size of the serialized record
  final int serializedRecordSize = serBuffer.size();
  assert (serializedRecordSize > 0);
  // Hand the serialized bytes to the ShuffleExternalSorter
  sorter.insertRecord(
      serBuffer.getBuf(), Platform.BYTE_ARRAY_OFFSET, serializedRecordSize, partitionId);
}

This method serializes each record and inserts the serialized bytes into the external sorter via insertRecord(), shown below:

public void insertRecord(Object recordBase, long recordOffset, int length, int partitionId)
    throws IOException {
  // for tests
  assert (inMemSorter != null);
  // If the number of records exceeds the spill threshold, spill to disk immediately
  if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
    logger.info("Spilling data because number of spilledRecords crossed the threshold " +
        numElementsForSpillThreshold);
    spill();
  }
  // Checks whether there is enough space to insert an additional record in to the sort pointer
  // array and grows the array if additional space is required. If the required space cannot be
  // obtained, then the in-memory data will be spilled to disk.
  growPointerArrayIfNecessary();
  final int uaoSize = UnsafeAlignedOffset.getUaoSize();
  // Need 4 or 8 extra bytes to store the record length.
  final int required = length + uaoSize;
  // If more memory is needed, request a new page from the TaskMemoryManager
  acquireNewPageIfNecessary(required);

  assert (currentPage != null);
  final Object base = currentPage.getBaseObject();
  // Given a memory page and offset within that page, encode this address into a 64-bit long.
  // This address will remain valid as long as the corresponding page has not been freed.
  final long recordAddress = taskMemoryManager.encodePageNumberAndOffset(currentPage, pageCursor);
  // Write the record length
  UnsafeAlignedOffset.putSize(base, pageCursor, length);
  // Advance the cursor
  pageCursor += uaoSize;
  // Write the record bytes
  Platform.copyMemory(recordBase, recordOffset, base, pageCursor, length);
  // Advance the cursor
  pageCursor += length;
  // Hand the encoded logical address and the partition id to the ShuffleInMemorySorter
  inMemSorter.insertRecord(recordAddress, partitionId);
}

Note that buffering and spilling here do not go through any higher-level data structure; the sorter operates on raw memory directly. The growPointerArrayIfNecessary() method is as follows:

/**
 * Checks whether there is enough space to insert an additional record in to the sort pointer
 * array and grows the array if additional space is required. If the required space cannot be
 * obtained, then the in-memory data will be spilled to disk.
 */
private void growPointerArrayIfNecessary() throws IOException {
  assert (inMemSorter != null);
  // If there is no room for another record
  if (!inMemSorter.hasSpaceForAnotherRecord()) {
    // Current memory usage
    long used = inMemSorter.getMemoryUsage();
    LongArray array;
    try {
      // could trigger spilling
      // Allocate an array with twice the current capacity
      array = allocateArray(used / 8 * 2);
    } catch (TooLargePageException e) {
      // The pointer array is too big to fit in a single page, spill (the spill method is shown
      // below). A page is 128 MB, defined in PackedRecordPointer:
      // static final int MAXIMUM_PAGE_SIZE_BYTES = 1 << 27;  // 128 megabytes
      spill();
      return;
    } catch (SparkOutOfMemoryError e) {
      // should have trigger spilling
      if (!inMemSorter.hasSpaceForAnotherRecord()) {
        logger.error("Unable to grow the pointer array");
        throw e;
      }
      return;
    }
    // check if spilling is triggered or not
    if (inMemSorter.hasSpaceForAnotherRecord()) {
      // If there is free space now, growing is unnecessary; release the newly allocated array
      freeArray(array);
    } else {
      // Otherwise copy the old array into the new one
      inMemSorter.expandPointerArray(array);
    }
  }
}

The spill() method is as follows:

@Override
public long spill(long size, MemoryConsumer trigger) throws IOException {
  if (trigger != this || inMemSorter == null || inMemSorter.numRecords() == 0) {
    return 0L;
  }
  logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
      Thread.currentThread().getId(),
      Utils.bytesToString(getMemoryUsage()),
      spills.size(),
      spills.size() > 1 ? " times" : " time");
  // Sorts the in-memory records and writes the sorted records to an on-disk file.
  // This method does not free the sort data structures.
  writeSortedFile(false);
  final long spillSize = freeMemory();
  // Reset the ShuffleInMemorySorter
  inMemSorter.reset();
  // Reset the in-memory sorter's pointer array only after freeing up the memory pages holding the
  // records. Otherwise, if the task is over allocated memory, then without freeing the memory
  // pages, we might not be able to get memory for the pointer array.
  taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
  return spillSize;
}

The writeSortedFile() method:
private void writeSortedFile(boolean isLastFile) {
  // This call performs the actual sort.
  // It returns an iterator over the records in sorted order
  final ShuffleInMemorySorter.ShuffleSorterIterator sortedRecords =
      inMemSorter.getSortedIterator();

  // If there are no sorted records, we don't need to create an empty spill file.
  if (!sortedRecords.hasNext()) {
    return;
  }

  final ShuffleWriteMetricsReporter writeMetricsToUse;
  // If true this is the final output file, otherwise it is a spill file
  if (isLastFile) {
    // We're writing the final non-spill file, so we _do_ want to count this as shuffle bytes.
    writeMetricsToUse = writeMetrics;
  } else {
    // We're spilling, so bytes written should be counted towards spill rather than write.
    // Create a dummy WriteMetrics object to absorb these metrics, since we don't want to count
    // them towards shuffle bytes written.
    writeMetricsToUse = new ShuffleWriteMetrics();
  }

  // Small writes to DiskBlockObjectWriter will be fairly inefficient. Since there doesn't seem to
  // be an API to directly transfer bytes from managed memory to the disk writer, we buffer
  // data through a byte array. This array does not need to be large enough to hold a single
  // record.
  // Create a byte buffer to stage the copies
  final byte[] writeBuffer = new byte[diskWriteBufferSize];

  // Because this output will be read during shuffle, its compression codec must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more details.
  // Create a temporary shuffle block
  final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
      blockManager.diskBlockManager().createTempShuffleBlock();
  // Get the file and the block id
  final File file = spilledFileInfo._2();
  final TempShuffleBlockId blockId = spilledFileInfo._1();
  final SpillInfo spillInfo = new SpillInfo(numPartitions, file, blockId);

  // Unfortunately, we need a serializer instance in order to construct a DiskBlockObjectWriter.
  // Our write path doesn't actually use this serializer (since we end up calling the `write()`
  // OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
  // around this, we pass a dummy no-op serializer.
  final SerializerInstance ser = DummySerializerInstance.INSTANCE;

  int currentPartition = -1;
  final FileSegment committedSegment;
  try (DiskBlockObjectWriter writer =
      blockManager.getDiskWriter(blockId, file, ser, fileBufferSizeBytes, writeMetricsToUse)) {

    final int uaoSize = UnsafeAlignedOffset.getUaoSize();
    // Iterate over the sorted records
    while (sortedRecords.hasNext()) {
      sortedRecords.loadNext();
      final int partition = sortedRecords.packedRecordPointer.getPartitionId();
      assert (partition >= currentPartition);
      if (partition != currentPartition) {
        // Switch to the new partition: commit the current one and record its length
        if (currentPartition != -1) {
          final FileSegment fileSegment = writer.commitAndGet();
          spillInfo.partitionLengths[currentPartition] = fileSegment.length();
        }
        // Then move on to the next partition
        currentPartition = partition;
      }

      // Get the packed pointer, then the page and in-page offset it encodes
      final long recordPointer = sortedRecords.packedRecordPointer.getRecordPointer();
      final Object recordPage = taskMemoryManager.getPage(recordPointer);
      final long recordOffsetInPage = taskMemoryManager.getOffsetInPage(recordPointer);
      // Remaining bytes of the record
      int dataRemaining = UnsafeAlignedOffset.getSize(recordPage, recordOffsetInPage);
      // Skip over the record length stored in front of the data
      long recordReadPosition = recordOffsetInPage + uaoSize;  // skip over record length
      while (dataRemaining > 0) {
        final int toTransfer = Math.min(diskWriteBufferSize, dataRemaining);
        // Copy the bytes into the buffer array
        Platform.copyMemory(
            recordPage, recordReadPosition, writeBuffer, Platform.BYTE_ARRAY_OFFSET, toTransfer);
        // Then from the buffer array into the DiskBlockObjectWriter
        writer.write(writeBuffer, 0, toTransfer);
        // Advance the read position
        recordReadPosition += toTransfer;
        // Update the remaining byte count
        dataRemaining -= toTransfer;
      }
      writer.recordWritten();
    }
    // Commit
    committedSegment = writer.commitAndGet();
  }
  // If `writeSortedFile()` was called from `closeAndGetSpills()` and no records were inserted,
  // then the file might be empty. Note that it might be better to avoid calling
  // writeSortedFile() in that case.
  // Record the spill file in the list of spills
  if (currentPartition != -1) {
    spillInfo.partitionLengths[currentPartition] = committedSegment.length();
    spills.add(spillInfo);
  }

  // If this is a spill file, update the spill metrics
  if (!isLastFile) {
    writeMetrics.incRecordsWritten(
        ((ShuffleWriteMetrics) writeMetricsToUse).recordsWritten());
    taskContext.taskMetrics().incDiskBytesSpilled(
        ((ShuffleWriteMetrics) writeMetricsToUse).bytesWritten());
  }
}

The encodePageNumberAndOffset() method is as follows:
Due to our page size limitation, though, we can convert this into an offset that"s// relative to the page"s base offset; this relative offset will fit in 51 bits.offsetInPage -= page.getBaseOffset(); }returnencodePageNumberAndOffset(page.pageNumber, offsetInPage);}@VisibleForTestingpublic static long encodePageNumberAndOffset(int pageNumber, long offsetInPage) { assert (pageNumber >=0) :"encodePageNumberAndOffset called with invalid page";// 高13位为页号,低51位为偏移量// 页号左移51位,再拼偏移量和上一个低51位都为1的掩码0x7FFFFFFFFFFFFLreturn(((long) pageNumber)< ShuffleInMemorySorter的insertRecord()方法如下: public void insertRecord(long recordPointer, int partitionId) {if(!hasSpaceForAnotherRecord()) {thrownewIllegalStateException("There is no space for new record"); } array.set(pos,PackedRecordPointer.packPointer(recordPointer, partitionId)); pos++;} PackedRecordPointer.packPointer()方法: public static long packPointer(long recordPointer, int partitionId) { assert (partitionId<=MAXIMUM_PARTITION_ID);// Note that without word alignment we can address 2^27 bytes = 128 megabytes per page.// Also note that this relies on some internals of how TaskMemoryManager encodes its addresses.// 将页号右移24位,和低27位拼在一起,这样逻辑地址被压缩成40位finallong pageNumber = (recordPointer &MASK_LONG_UPPER_13_BITS) >>>24;finallong compressedAddress = pageNumber | (recordPointer &MASK_LONG_LOWER_27_BITS);// 将分区号放在高24位上return(((long) partitionId)<<40) | compressedAddress;} getSortedIterator()方法: publicShuffleSorterIteratorgetSortedIterator() { int offset =0;// 使用基数排序对内存分区ID进行排序。基数排序要快得多,但是在添加指针时需要额外的内存作为保留内存if(useRadixSort) { offset =RadixSort.sort( array, pos,PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX,false,false);// 否则采用timSort排序}else{MemoryBlockunused =newMemoryBlock( array.getBaseObject(), array.getBaseOffset() + pos *8L, (array.size() - pos) *8L);LongArraybuffer =newLongArray(unused);Sorter sorter =newSorter<>(newShuffleSortDataFormat(buffer)); sorter.sort(array,0, pos,SORT_COMPARATOR); }returnnewShuffleSorterIterator(pos, array, offset);} 3.2.2 closeAndWriteOutput() @VisibleForTestingvoid closeAndWriteOutput()throwsIOException{ assert(sorter !=null); updatePeakMemoryUsed(); serBuffer =null; serOutputStream =null;// 获取溢写文件finalSpillInfo[] spills = sorter.closeAndGetSpills(); sorter =null;finallong[] partitionLengths;try{// 合并溢写文件partitionLengths = mergeSpills(spills); }finally{// 删除溢写文件for(SpillInfospill : spills) {if(spill.file.exists() && !spill.file.delete()) { logger.error("Error while deleting spill file {}", spill.file.getPath()); } } }// 更新mapstatusmapStatus =MapStatus$.MODULE$.apply( blockManager.shuffleServerId(), partitionLengths, mapId);} mergeSpills()方法: privatelong[] mergeSpills(SpillInfo[] spills)throwsIOException{ long[] partitionLengths;// 如果没有溢写文件,创建空的if(spills.length ==0) {finalShuffleMapOutputWritermapWriter = shuffleExecutorComponents .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());returnmapWriter.commitAllPartitions();// 如果只有一个溢写文件,将它合并输出}elseif(spills.length ==1) {Optional maybeSingleFileWriter = shuffleExecutorComponents.createSingleFileMapOutputWriter(shuffleId, mapId);if(maybeSingleFileWriter.isPresent()) {// Here, we don"t need to perform any metrics updates because the bytes written to this// output file would have already been counted as shuffle bytes written.partitionLengths = spills[0].partitionLengths; maybeSingleFileWriter.get().transferMapSpillFile(spills[0].file, partitionLengths); }else{ partitionLengths = mergeSpillsUsingStandardWriter(spills); }// 
The getSortedIterator() method:

public ShuffleSorterIterator getSortedIterator() {
  int offset = 0;
  // Use radix sort on the in-memory partition ids. Radix sort is much faster, but it requires
  // additional memory to be reserved while the pointers are being added.
  if (useRadixSort) {
    offset = RadixSort.sort(
        array, pos,
        PackedRecordPointer.PARTITION_ID_START_BYTE_INDEX,
        PackedRecordPointer.PARTITION_ID_END_BYTE_INDEX, false, false);
  // Otherwise fall back to TimSort
  } else {
    MemoryBlock unused = new MemoryBlock(
        array.getBaseObject(),
        array.getBaseOffset() + pos * 8L,
        (array.size() - pos) * 8L);
    LongArray buffer = new LongArray(unused);
    Sorter<PackedRecordPointer, LongArray> sorter =
        new Sorter<>(new ShuffleSortDataFormat(buffer));
    sorter.sort(array, 0, pos, SORT_COMPARATOR);
  }
  return new ShuffleSorterIterator(pos, array, offset);
}

3.2.2 closeAndWriteOutput()

@VisibleForTesting
void closeAndWriteOutput() throws IOException {
  assert (sorter != null);
  updatePeakMemoryUsed();
  serBuffer = null;
  serOutputStream = null;
  // Get the spill files
  final SpillInfo[] spills = sorter.closeAndGetSpills();
  sorter = null;
  final long[] partitionLengths;
  try {
    // Merge the spill files
    partitionLengths = mergeSpills(spills);
  } finally {
    // Delete the spill files
    for (SpillInfo spill : spills) {
      if (spill.file.exists() && !spill.file.delete()) {
        logger.error("Error while deleting spill file {}", spill.file.getPath());
      }
    }
  }
  // Update mapStatus
  mapStatus = MapStatus$.MODULE$.apply(
      blockManager.shuffleServerId(), partitionLengths, mapId);
}

The mergeSpills() method:

private long[] mergeSpills(SpillInfo[] spills) throws IOException {
  long[] partitionLengths;
  // If there are no spill files, create an empty output
  if (spills.length == 0) {
    final ShuffleMapOutputWriter mapWriter = shuffleExecutorComponents
        .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
    return mapWriter.commitAllPartitions();
  // If there is exactly one spill file, transfer it into the output directly
  } else if (spills.length == 1) {
    Optional<SingleSpillShuffleMapOutputWriter> maybeSingleFileWriter =
        shuffleExecutorComponents.createSingleFileMapOutputWriter(shuffleId, mapId);
    if (maybeSingleFileWriter.isPresent()) {
      // Here, we don't need to perform any metrics updates because the bytes written to this
      // output file would have already been counted as shuffle bytes written.
      partitionLengths = spills[0].partitionLengths;
      maybeSingleFileWriter.get().transferMapSpillFile(spills[0].file, partitionLengths);
    } else {
      partitionLengths = mergeSpillsUsingStandardWriter(spills);
    }
  // If there are several spill files, merge them into the output; the merge can go through
  // either an NIO or a BIO path
  } else {
    partitionLengths = mergeSpillsUsingStandardWriter(spills);
  }
  return partitionLengths;
}

3.3 SortShuffleWriter

SortShuffleWriter sorts records in memory using a PartitionedAppendOnlyMap or a PartitionedPairBuffer. If the memory limit is exceeded, the data is spilled to files. When the globally ordered output file is produced, all previous spill files and the data currently in memory are merge-sorted, and elements with the same key are aggregated using the supplied function. The entry point is the write() method:

override def write(records: Iterator[Product2[K, V]]): Unit = {
  // Create an external sorter; if there is map-side aggregation, pass in the aggregator and
  // keyOrdering, otherwise neither is needed
  sorter = if (dep.mapSideCombine) {
    new ExternalSorter[K, V, C](
      context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
  } else {
    // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
    // care whether the keys get sorted in each partition; that will be done on the reduce side
    // if the operation being run is sortByKey.
    new ExternalSorter[K, V, V](
      context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
  }
  // Feed the records into the ExternalSorter
  sorter.insertAll(records)

  // Don't bother including the time to open the merged output file in the shuffle write time,
  // because it just opens a single file, so is typically too fast to measure accurately
  // (see SPARK-3570).
  // Create an output writer
  val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
    dep.shuffleId, mapId, dep.partitioner.numPartitions)
  // Write the externally sorted data through the writer
  sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
  val partitionLengths = mapOutputWriter.commitAllPartitions()
  // Update mapStatus
  mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
}

The insertAll() method:

def insertAll(records: Iterator[Product2[K, V]]): Unit = {
  // TODO: stop combining if we find that the reduction factor isn't high
  val shouldCombine = aggregator.isDefined
  // Is map-side aggregation needed?
  if (shouldCombine) {
    // Combine values in-memory first using our AppendOnlyMap
    // mergeValue() folds a new value into the current aggregate
    val mergeValue = aggregator.get.mergeValue
    // createCombiner() builds the initial aggregate from the first value
    val createCombiner = aggregator.get.createCombiner
    var kv: Product2[K, V] = null
    // If the key already has an aggregate, merge into it; otherwise create the initial value
    val update = (hadValue: Boolean, oldValue: C) => {
      if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
    }
    // Iterate over the records
    while (records.hasNext) {
      // Count the record as read
      addElementsRead()
      kv = records.next()
      // map is a PartitionedAppendOnlyMap; (partition, key) is the map key and the aggregate is
      // the value
      map.changeValue((getPartition(kv._1), kv._1), update)
      // Spill to disk if necessary
      maybeSpillCollection(usingMap = true)
    }
  // No map-side aggregation
  } else {
    // Stick values into our buffer
    while (records.hasNext) {
      addElementsRead()
      val kv = records.next()
      // buffer is a PartitionedPairBuffer; insert the partition, key and value
      buffer.insert(getPartition(kv._1), kv._1, kv._2.asInstanceOf[C])
      // Spill to disk if necessary
      maybeSpillCollection(usingMap = false)
    }
  }
}

This method decides, as records are inserted, whether map-side pre-aggregation is needed, and uses one of the two data structures to hold the records accordingly; a small sketch of the combine contract follows.
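The createCombiner/mergeValue contract used by insertAll() can be illustrated with a standalone sketch (plain Scala, a hypothetical sum aggregator, not Spark source):

// createCombiner builds the initial aggregate for a key's first value; mergeValue folds each
// further value into the existing aggregate, which is what insertAll() does per
// (partition, key) entry of its PartitionedAppendOnlyMap.
val createCombiner: Int => Long = v => v.toLong
val mergeValue: (Long, Int) => Long = (agg, v) => agg + v

val records = Seq(("a", 1), ("b", 2), ("a", 3))
val combined = records.foldLeft(Map.empty[String, Long]) { case (m, (k, v)) =>
  val agg = m.get(k) match {
    case Some(existing) => mergeValue(existing, v)  // key already has an aggregate
    case None           => createCombiner(v)        // first value seen for this key
  }
  m + (k -> agg)
}
println(combined)  // Map(a -> 4, b -> 2)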
maybeSpillCollection() calls maybeSpill() to check whether a spill is needed; if a spill happens, a fresh map or buffer is constructed and caching starts over:

private def maybeSpillCollection(usingMap: Boolean): Unit = {
  var estimatedSize = 0L
  if (usingMap) {
    estimatedSize = map.estimateSize()
    // Check whether we should spill
    if (maybeSpill(map, estimatedSize)) {
      map = new PartitionedAppendOnlyMap[K, C]
    }
  } else {
    estimatedSize = buffer.estimateSize()
    // Check whether we should spill
    if (maybeSpill(buffer, estimatedSize)) {
      buffer = new PartitionedPairBuffer[K, C]
    }
  }

  if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
  }
}

protected def maybeSpill(collection: C, currentMemory: Long): Boolean = {
  var shouldSpill = false
  // Every 32 records read, if the estimated size of the map or buffer exceeds the current
  // threshold (5 MB by default)
  if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
    // Claim up to double our current memory from the shuffle memory pool
    // Try to acquire (2 * currentMemory - myMemoryThreshold) additional memory
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted = acquireMemory(amountToRequest)
    // Update the threshold
    myMemoryThreshold += granted
    // If we were granted too little memory to grow further (either tryToAcquire returned 0,
    // or we already had more memory than myMemoryThreshold), spill the current collection
    shouldSpill = currentMemory >= myMemoryThreshold
  }
  // Even if shouldSpill is still false, spill when the number of records read exceeds
  // numElementsForceSpillThreshold (Integer.MAX_VALUE by default)
  shouldSpill = shouldSpill || _elementsRead > numElementsForceSpillThreshold
  // Actually spill
  if (shouldSpill) {
    // Increment the spill count
    _spillCount += 1
    logSpillage(currentMemory)
    // Spill the cached collection
    spill(collection)
    _elementsRead = 0
    _memoryBytesSpilled += currentMemory
    // Release the memory
    releaseMemory()
  }
  shouldSpill
}

maybeSpill() calls spill() to do the actual spilling:

override protected[this] def spill(collection: WritablePartitionedPairCollection[K, C]): Unit = {
  // Sort with the given comparator and return an iterator over the sorted result
  val inMemoryIterator = collection.destructiveSortedWritablePartitionedIterator(comparator)
  // Spill the iterator's contents to a disk file
  val spillFile = spillMemoryIteratorToDisk(inMemoryIterator)
  // The spills ArrayBuffer records every spilled file
  spills += spillFile
}

The spillMemoryIteratorToDisk() method is as follows:

private[this] def spillMemoryIteratorToDisk(inMemoryIterator: WritablePartitionedIterator)
    : SpilledFile = {
  // Because these files may be read during shuffle, their compression must be controlled by
  // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
  // createTempShuffleBlock here; see SPARK-3426 for more context.
  // Create a temporary block
  val (blockId, file) = diskBlockManager.createTempShuffleBlock()

  // These variables are reset after each flush
  var objectsWritten: Long = 0
  val spillMetrics: ShuffleWriteMetrics = new ShuffleWriteMetrics
  // Create the DiskBlockObjectWriter for the spill file
  val writer: DiskBlockObjectWriter =
    blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, spillMetrics)

  // List of batch sizes (bytes) in the order they are written to disk
  val batchSizes = new ArrayBuffer[Long]

  // How many elements we have in each partition
  val elementsPerPartition = new Array[Long](numPartitions)

  // Flush the disk writer's contents to disk, and update relevant variables.
  // The writer is committed at the end of this process.
  // Flushes the in-memory data to disk in batches
  def flush(): Unit = {
    val segment = writer.commitAndGet()
    batchSizes += segment.length
    _diskBytesSpilled += segment.length
    objectsWritten = 0
  }

  var success = false
  try {
    // Iterate over the records in the map or buffer
    while (inMemoryIterator.hasNext) {
      val partitionId = inMemoryIterator.nextPartition()
      require(partitionId >= 0 && partitionId < numPartitions,
        s"partition Id: ${partitionId} should be in the range [0, ${numPartitions})")
      // Write the record and update the counters
      inMemoryIterator.writeNext(writer)
      elementsPerPartition(partitionId) += 1
      objectsWritten += 1
      // Every 10000 records written, flush the batch to disk
      if (objectsWritten == serializerBatchSize) {
        flush()
      }
    }
    // After the loop, flush whatever remains
    if (objectsWritten > 0) {
      flush()
    } else {
      writer.revertPartialWritesAndClose()
    }
    success = true
  } finally {
    if (success) {
      writer.close()
    } else {
      // This code path only happens if an exception was thrown above before we set success;
      // close our stuff and let the exception be thrown further
      writer.revertPartialWritesAndClose()
      if (file.exists()) {
        if (!file.delete()) {
          logWarning(s"Error deleting ${file}")
        }
      }
    }
  }

  // Return the spilled file
  SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)
}

Next comes the sort-and-merge step, which calls ExternalSorter.writePartitionedMapOutput():
def writePartitionedMapOutput(
    shuffleId: Int,
    mapId: Long,
    mapOutputWriter: ShuffleMapOutputWriter): Unit = {
  var nextPartitionId = 0
  // If nothing was spilled
  if (spills.isEmpty) {
    // Case where we only have in-memory data
    val collection = if (aggregator.isDefined) map else buffer
    // Sort with the given comparator
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
    while (it.hasNext()) {
      val partitionId = it.nextPartition()
      var partitionWriter: ShufflePartitionWriter = null
      var partitionPairsWriter: ShufflePartitionPairsWriter = null
      TryUtils.tryWithSafeFinally {
        partitionWriter = mapOutputWriter.getPartitionWriter(partitionId)
        val blockId = ShuffleBlockId(shuffleId, mapId, partitionId)
        partitionPairsWriter = new ShufflePartitionPairsWriter(
          partitionWriter,
          serializerManager,
          serInstance,
          blockId,
          context.taskMetrics().shuffleWriteMetrics)
        // Write out the records of this partition one by one
        while (it.hasNext && it.nextPartition() == partitionId) {
          it.writeNext(partitionPairsWriter)
        }
      } {
        if (partitionPairsWriter != null) {
          partitionPairsWriter.close()
        }
      }
      nextPartitionId = partitionId + 1
    }
  // If spills happened, merge-sort the spill files together with the in-memory data, then write
  // each partition in turn into a ShufflePartitionPairsWriter
  } else {
    // We must perform merge-sort; get an iterator by partition and write everything directly.
    // The merge sort happens here
    for ((id, elements) <- this.partitionedIterator) {
      val blockId = ShuffleBlockId(shuffleId, mapId, id)
      var partitionWriter: ShufflePartitionWriter = null
      var partitionPairsWriter: ShufflePartitionPairsWriter = null
      TryUtils.tryWithSafeFinally {
        partitionWriter = mapOutputWriter.getPartitionWriter(id)
        partitionPairsWriter = new ShufflePartitionPairsWriter(
          partitionWriter,
          serializerManager,
          serInstance,
          blockId,
          context.taskMetrics().shuffleWriteMetrics)
        if (elements.hasNext) {
          for (elem <- elements) {
            partitionPairsWriter.write(elem._1, elem._2)
          }
        }
      } {
        if (partitionPairsWriter != null) {
          partitionPairsWriter.close()
        }
      }
      nextPartitionId = id + 1
    }
  }

  context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
  context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
  context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
}

The partitionedIterator method:

def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = {
  val usingMap = aggregator.isDefined
  val collection: WritablePartitionedPairCollection[K, C] = if (usingMap) map else buffer
  if (spills.isEmpty) {
    // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
    // we don't even need to sort by anything other than partition ID
    // No spills and no key ordering: sort by partition id only
    if (ordering.isEmpty) {
      // The user hasn't requested sorted keys, so only sort by partition ID, not key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(None)))
    // No spills but a key ordering was requested: sort by partition id first, then by key
    } else {
      // We do need to sort by both partition ID and key
      groupByPartition(destructiveIterator(
        collection.partitionedDestructiveSortedIterator(Some(keyComparator))))
    }
  } else {
    // Merge spilled and in-memory data
    // If there were spills, merge-sort the spill files with the in-memory data
    merge(spills, destructiveIterator(
      collection.partitionedDestructiveSortedIterator(comparator)))
  }
}

The merge method is as follows:
private def merge(spills: Seq[SpilledFile], inMemory: Iterator[((Int, K), C)])
    : Iterator[(Int, Iterator[Product2[K, C]])] = {
  // Open a reader for every spill file
  val readers = spills.map(new SpillReader(_))
  val inMemBuffered = inMemory.buffered
  // Iterate over the partitions
  (0 until numPartitions).iterator.map { p =>
    val inMemIterator = new IteratorForPartition(p, inMemBuffered)
    // Combine the spill files with the in-memory data
    val iterators = readers.map(_.readNextPartition()) ++ Seq(inMemIterator)
    // If there is aggregation logic, aggregate per partition, ordering keys with keyComparator
    if (aggregator.isDefined) {
      // Perform partial aggregation across partitions
      (p, mergeWithAggregation(
        iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))
    // No aggregation, but an ordering was given (e.g. used by reduce tasks in sortByKey):
    // merge according to that ordering
    } else if (ordering.isDefined) {
      // No aggregator given, but we have an ordering;
      // sort the elements without trying to merge them
      (p, mergeSort(iterators, ordering.get))
    // Neither aggregation nor ordering: just concatenate the iterators
    } else {
      (p, iterators.iterator.flatten)
    }
  }
}

Finally, back in write(), commitAllPartitions() is called to produce the map output; internally this calls writeIndexFileAndCommit() to write out the data file and its index file, which completes the shuffle write.