package org.apache.spark.shuffle.sort;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.spark.Partitioner;
import org.apache.spark.ShuffleDependency;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.scheduler.MapStatus;
import org.apache.spark.scheduler.MapStatus$;
import org.apache.spark.serializer.Serializer;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.IndexShuffleBlockResolver;
import org.apache.spark.shuffle.ShuffleWriter;
import org.apache.spark.storage.BlockId;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.storage.DiskBlockObjectWriter;
import org.apache.spark.storage.FileSegment;
import org.apache.spark.storage.TempShuffleBlockId;
import org.apache.spark.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.spark_project.guava.annotations.VisibleForTesting;
import org.spark_project.guava.io.Closeables;
import scala.None$;
import scala.Option;
import scala.Product2;
import scala.Tuple2;
import scala.collection.Iterator;

/* loaded from: input_file:org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.class */
/**
 * Shuffle writer that gives every reduce partition its own temporary file, routes each incoming
 * record to the file chosen by the {@link Partitioner}, and finally appends all per-partition
 * files, in partition order, into one data file that is committed together with its index through
 * {@link IndexShuffleBlockResolver#writeIndexFileAndCommit}.
 *
 * <p>One {@link DiskBlockObjectWriter} is opened per partition for the whole duration of
 * {@link #write}, so the number of simultaneously open writers equals {@code numPartitions}.
 *
 * @param <K> record key type
 * @param <V> record value type
 */
final class BypassMergeSortShuffleWriter<K, V> extends ShuffleWriter<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(BypassMergeSortShuffleWriter.class);

    /** Buffer size in bytes handed to each per-partition disk writer. */
    private final int fileBufferSize;
    /** Value of {@code spark.file.transferTo}; forwarded to {@code Utils.copyStream}. */
    private final boolean transferToEnabled;
    private final int numPartitions;
    private final BlockManager blockManager;
    private final Partitioner partitioner;
    private final ShuffleWriteMetrics writeMetrics;
    private final int shuffleId;
    private final int mapId;
    private final Serializer serializer;
    private final IndexShuffleBlockResolver shuffleBlockResolver;

    /** One writer per partition; {@code null} before {@link #write} and after a successful merge. */
    private DiskBlockObjectWriter[] partitionWriters;
    /** Segment returned by each partition writer's {@code commitAndGet()}, indexed by partition. */
    private FileSegment[] partitionWriterSegments;

    /** Set by {@link #write}; stays {@code null} until a write has completed. */
    @Nullable
    private MapStatus mapStatus;
    private long[] partitionLengths;

    /** Guards {@link #stop} so that only the first call does any work. */
    private boolean stopping = false;

    /**
     * Creates a writer for one map task.
     *
     * @param blockManager supplies temp shuffle blocks, disk writers and the shuffle server id
     * @param indexShuffleBlockResolver resolves the final data file and commits data + index
     * @param bypassMergeSortShuffleHandle handle whose dependency provides the shuffle id,
     *        partitioner and serializer
     * @param i the map id of this task
     * @param taskContext source of the {@link ShuffleWriteMetrics} to update
     * @param sparkConf read for {@code spark.shuffle.file.buffer} (default {@code 32k}) and
     *        {@code spark.file.transferTo} (default {@code true})
     */
    public BypassMergeSortShuffleWriter(BlockManager blockManager, IndexShuffleBlockResolver indexShuffleBlockResolver, BypassMergeSortShuffleHandle<K, V> bypassMergeSortShuffleHandle, int i, TaskContext taskContext, SparkConf sparkConf) {
        // The setting is read in KiB and converted to bytes.
        this.fileBufferSize = ((int) sparkConf.getSizeAsKb("spark.shuffle.file.buffer", "32k")) * 1024;
        this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true);
        this.blockManager = blockManager;
        final ShuffleDependency<K, V, V> dependency = bypassMergeSortShuffleHandle.dependency();
        this.mapId = i;
        this.shuffleId = dependency.shuffleId();
        this.partitioner = dependency.partitioner();
        this.numPartitions = this.partitioner.numPartitions();
        this.writeMetrics = taskContext.taskMetrics().shuffleWriteMetrics();
        this.serializer = dependency.serializer();
        this.shuffleBlockResolver = indexShuffleBlockResolver;
    }

    /**
     * Writes all records of this map task.
     *
     * <p>For an empty iterator only an all-zero index is committed (no data file is passed) and
     * the map status is recorded. Otherwise each record is appended to its partition's temporary
     * file, every writer is committed and closed, and the partition files are merged into a
     * temporary data file that is then committed together with the index.
     *
     * @param iterator the records to shuffle
     * @throws IOException if writing or merging the partition files fails
     */
    @Override
    public void write(Iterator<Product2<K, V>> iterator) throws IOException {
        assert partitionWriters == null;
        if (!iterator.hasNext()) {
            partitionLengths = new long[numPartitions];
            shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
            mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
            return;
        }

        final SerializerInstance serInstance = serializer.newInstance();
        final long openStartTime = System.nanoTime();
        partitionWriters = new DiskBlockObjectWriter[numPartitions];
        partitionWriterSegments = new FileSegment[numPartitions];
        for (int p = 0; p < numPartitions; p++) {
            final Tuple2<TempShuffleBlockId, File> tempBlock =
                blockManager.diskBlockManager().createTempShuffleBlock();
            final BlockId blockId = tempBlock._1();
            final File tempFile = tempBlock._2();
            partitionWriters[p] =
                blockManager.getDiskWriter(blockId, tempFile, serInstance, fileBufferSize, writeMetrics);
        }
        // Time spent creating the temp blocks and their writers is counted as shuffle write time.
        writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

        while (iterator.hasNext()) {
            final Product2<K, V> record = iterator.next();
            final K key = record._1();
            partitionWriters[partitioner.getPartition(key)].write(key, record._2());
        }

        for (int p = 0; p < numPartitions; p++) {
            final DiskBlockObjectWriter writer = partitionWriters[p];
            partitionWriterSegments[p] = writer.commitAndGet();
            writer.close();
        }

        final File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
        final File tmp = Utils.tempFileWith(output);
        try {
            partitionLengths = writePartitionedFile(tmp);
            shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
        } finally {
            // Whether the commit succeeded or failed, a temp file that is still present is removed.
            if (tmp.exists() && !tmp.delete()) {
                logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
            }
        }
        mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
    }

    /** Returns the per-partition byte lengths recorded by the last {@link #write} call. */
    @VisibleForTesting
    long[] getPartitionLengths() {
        return this.partitionLengths;
    }

    /**
     * Appends every existing per-partition file to {@code outputFile}, in partition order, and
     * deletes each partition file after it has been copied.
     *
     * @param outputFile file opened in append mode that receives the concatenated partitions
     * @return the number of bytes copied for each partition (zero where no file existed); an
     *         all-zero array if no partition writers were ever created
     * @throws IOException if opening, copying or closing a stream fails
     */
    private long[] writePartitionedFile(File outputFile) throws IOException {
        final long[] lengths = new long[numPartitions];
        if (partitionWriters == null) {
            return lengths;
        }

        final FileOutputStream out = new FileOutputStream(outputFile, true);
        final long writeStartTime = System.nanoTime();
        boolean threwException = true;
        try {
            for (int i = 0; i < numPartitions; i++) {
                final File partitionFile = partitionWriterSegments[i].file();
                if (partitionFile.exists()) {
                    final FileInputStream in = new FileInputStream(partitionFile);
                    boolean copyThrewException = true;
                    try {
                        lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
                        copyThrewException = false;
                    } finally {
                        // Always release the input stream. A close failure is only swallowed when
                        // the copy itself already failed, so the original exception is not masked.
                        Closeables.close(in, copyThrewException);
                    }
                    if (!partitionFile.delete()) {
                        logger.error("Unable to delete file for partition {}", i);
                    }
                }
            }
            threwException = false;
        } finally {
            // Same policy for the output stream: swallow close errors only on the failure path.
            Closeables.close(out, threwException);
            writeMetrics.incWriteTime(System.nanoTime() - writeStartTime);
        }
        // Writers are fully consumed; clearing the array tells stop(false) there is nothing to revert.
        partitionWriters = null;
        return lengths;
    }

    /**
     * Finishes this writer. Only the first invocation has any effect; later calls return an
     * empty option.
     *
     * @param z {@code true} if the task succeeded, {@code false} if it failed or was aborted
     * @return the map status when {@code z} is {@code true}; otherwise an empty option
     * @throws IllegalStateException if {@code z} is {@code true} but no map status was produced
     */
    @Override
    public Option<MapStatus> stop(boolean z) {
        if (stopping) {
            return None$.empty();
        }
        stopping = true;

        if (z) {
            if (mapStatus == null) {
                throw new IllegalStateException("Cannot call stop(true) without having called write()");
            }
            return Option.apply(mapStatus);
        }

        // Failure path: writers that are still registered have their partial output reverted
        // and the backing files removed.
        if (partitionWriters != null) {
            try {
                for (DiskBlockObjectWriter writer : partitionWriters) {
                    final File partialFile = writer.revertPartialWritesAndClose();
                    if (!partialFile.delete()) {
                        logger.error("Error while deleting file {}", partialFile.getAbsolutePath());
                    }
                }
            } finally {
                partitionWriters = null;
            }
        }
        return None$.empty();
    }
}
