/*
 * Decompiled with CFR 0.152.
 */
package com.facebook.presto.spiller;

import com.facebook.presto.execution.buffer.PagesSerde;
import com.facebook.presto.execution.buffer.PagesSerdeUtil;
import com.facebook.presto.execution.buffer.SerializedPage;
import com.facebook.presto.memory.LocalMemoryContext;
import com.facebook.presto.operator.SpillContext;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spiller.SingleStreamSpiller;
import com.facebook.presto.spiller.SpillerStats;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.slice.InputStreamSliceInput;
import io.airlift.slice.OutputStreamSliceOutput;
import io.airlift.slice.SliceInput;
import io.airlift.slice.SliceOutput;
import java.io.Closeable;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Iterator;
import java.util.Objects;
import javax.annotation.concurrent.NotThreadSafe;

/**
 * {@link SingleStreamSpiller} that serializes pages into a single temporary
 * file and reads them back from that file on request.
 *
 * <p>The temporary file is created in the constructor and deleted by
 * {@link #close()}. Writing happens on the supplied executor; only one spill
 * may be in flight at a time (enforced by {@link #checkNoSpillInProgress()}).
 */
@NotThreadSafe
public class FileSingleStreamSpiller
        implements SingleStreamSpiller {
    /** Size, in bytes, of the I/O buffers; also the amount reported to the memory context while a stream is open. */
    @VisibleForTesting
    static final int BUFFER_SIZE = 4096;

    private final Path targetFileName;
    private final Closer closer = Closer.create();
    private final PagesSerde serde;
    private final SpillerStats spillerStats;
    private final SpillContext localSpillContext;
    private final LocalMemoryContext memoryContext;
    private final ListeningExecutorService executor;
    private ListenableFuture<?> spillInProgress = Futures.immediateFuture(null);

    /**
     * Creates the spiller and its backing temporary file.
     *
     * @param serde serializer used to write and read pages
     * @param executor executor on which spill writes are run
     * @param spillPath directory in which the temporary spill file is created
     * @param spillerStats accumulator for the total number of spilled bytes
     * @param spillContext parent context from which a local spill context is derived
     * @param memoryContext memory context charged for the I/O buffer while a stream is open
     * @throws NullPointerException if any argument is null
     * @throws PrestoException if the temporary file cannot be created
     */
    public FileSingleStreamSpiller(PagesSerde serde, ListeningExecutorService executor, Path spillPath, SpillerStats spillerStats, SpillContext spillContext, LocalMemoryContext memoryContext) {
        this.serde = Objects.requireNonNull(serde, "serde is null");
        this.executor = Objects.requireNonNull(executor, "executor is null");
        Objects.requireNonNull(spillPath, "spillPath is null");
        this.spillerStats = Objects.requireNonNull(spillerStats, "spillerStats is null");
        this.localSpillContext = Objects.requireNonNull(spillContext, "spillContext is null").newLocalSpillContext();
        this.memoryContext = Objects.requireNonNull(memoryContext, "memoryContext can not be null");
        try {
            this.targetFileName = Files.createTempFile(spillPath, "spill", ".bin");
        }
        catch (IOException e) {
            throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Failed to create spill file", e);
        }
        // Register the deletion first: Closer closes in reverse registration order,
        // so the file is removed only after every stream opened on it has been closed.
        // Registering it exactly once also keeps a repeated close() from trying to
        // delete an already-deleted file.
        Path spillFile = this.targetFileName;
        this.closer.register(() -> Files.delete(spillFile));
    }

    /**
     * Starts writing the given pages to the spill file on the executor.
     *
     * @param pageIterator pages to spill
     * @return future that completes when the write task finishes
     * @throws IllegalStateException if a previous spill has not completed yet
     */
    @Override
    public ListenableFuture<?> spill(Iterator<Page> pageIterator) {
        checkNoSpillInProgress();
        this.spillInProgress = this.executor.submit(() -> writePages(pageIterator));
        return this.spillInProgress;
    }

    /**
     * Returns an iterator over the pages stored in the spill file.
     *
     * @throws IllegalStateException if a spill has not completed yet
     * @throws PrestoException if the spill file cannot be opened
     */
    @Override
    public Iterator<Page> getSpilledPages() {
        checkNoSpillInProgress();
        return readPages();
    }

    /**
     * Serializes every page from the iterator and appends it to the spill file,
     * reporting each serialized size to the local spill context and the stats.
     * The memory context is charged {@link #BUFFER_SIZE} bytes for the duration
     * of the write and reset to zero afterwards, whether or not the write succeeds.
     */
    private void writePages(Iterator<Page> pageIterator) {
        // The file is opened in append mode so successive spills accumulate in the same file.
        try (OutputStreamSliceOutput output = new OutputStreamSliceOutput(new FileOutputStream(this.targetFileName.toFile(), true), BUFFER_SIZE)) {
            this.memoryContext.setBytes(BUFFER_SIZE);
            while (pageIterator.hasNext()) {
                Page page = pageIterator.next();
                SerializedPage serializedPage = this.serde.serialize(page);
                long pageSize = serializedPage.getSizeInBytes();
                this.localSpillContext.updateBytes(pageSize);
                this.spillerStats.addToTotalSpilledBytes(pageSize);
                PagesSerdeUtil.writeSerializedPage(output, serializedPage);
            }
        }
        catch (IOException | UncheckedIOException e) {
            throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Failed to spill pages", e);
        }
        finally {
            this.memoryContext.setBytes(0L);
        }
    }

    /**
     * Opens the spill file and returns an iterator that deserializes pages from it.
     * The opened stream and the memory reservation are released by {@link #close()}.
     */
    private Iterator<Page> readPages() {
        try {
            FileInputStream input = new FileInputStream(this.targetFileName.toFile());
            // Hand the stream to the closer before anything else can throw,
            // so it is not leaked if the memory reservation below fails.
            this.closer.register(input);
            this.memoryContext.setBytes(BUFFER_SIZE);
            this.closer.register(() -> this.memoryContext.setBytes(0L));
            return PagesSerdeUtil.readPages(this.serde, new InputStreamSliceInput(input, BUFFER_SIZE));
        }
        catch (IOException e) {
            throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Failed to read spilled pages", e);
        }
    }

    /**
     * Closes the local spill context, releases resources registered by reads,
     * and deletes the spill file.
     *
     * @throws PrestoException if any of the registered resources fails to close
     */
    @Override
    public void close() {
        // Registered last so it is closed first; the file deletion registered in the
        // constructor runs last, after all read streams have been closed.
        this.closer.register((Closeable) this.localSpillContext);
        try {
            this.closer.close();
        }
        catch (Exception e) {
            throw new PrestoException(StandardErrorCode.GENERIC_INTERNAL_ERROR, "Failed to close spiller", e);
        }
    }

    /** Fails with {@link IllegalStateException} if the most recently submitted spill has not finished. */
    private void checkNoSpillInProgress() {
        Preconditions.checkState(this.spillInProgress.isDone(), "spill in progress");
    }
}

