/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.core.fs.local;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.CommitterFromPersistRecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.fs.local.LocalRecoverable;
import org.apache.flink.util.Preconditions;

/**
 * A recoverable output stream that writes into a temporary local file and, on commit, moves that
 * temporary file to the target file.
 *
 * <p>All bytes are written through {@link #fos}; position queries and syncing go through
 * {@link #fileChannel}.
 */
@Internal
public class LocalRecoverableFsDataOutputStream
extends CommitterFromPersistRecoverableFsDataOutputStream<LocalRecoverable> {
    /** Final destination the temp file is moved to on commit. */
    private final File targetFile;
    /** File that receives all writes until the commit happens. */
    private final File tempFile;
    /** Channel on the temp file; used for position, truncate and force. */
    private final FileChannel fileChannel;
    /** Stream through which all bytes are written. */
    private final OutputStream fos;

    /**
     * Creates a stream writing to a brand-new temp file.
     *
     * @param targetFile the file the data ends up in after commit; must not be null
     * @param tempFile the file written to until commit; must not be null and must not exist yet
     *     (it is opened with {@code CREATE_NEW})
     * @throws IOException if the temp file cannot be created or opened
     */
    public LocalRecoverableFsDataOutputStream(File targetFile, File tempFile) throws IOException {
        this.targetFile = Preconditions.checkNotNull(targetFile);
        this.tempFile = Preconditions.checkNotNull(tempFile);
        this.fileChannel = FileChannel.open(tempFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
        this.fos = Channels.newOutputStream(this.fileChannel);
    }

    /**
     * Re-opens the temp file described by {@code resumable} and cuts it back to the recorded
     * offset so that writing continues from there.
     *
     * @param resumable the state to resume from
     * @throws FileNotFoundException if the temp file does not exist
     * @throws IOException if the temp file is shorter than the recorded offset, or if opening or
     *     truncating it fails
     */
    LocalRecoverableFsDataOutputStream(LocalRecoverable resumable) throws IOException {
        this.targetFile = Preconditions.checkNotNull(resumable.targetFile());
        this.tempFile = Preconditions.checkNotNull(resumable.tempFile());
        if (!this.tempFile.exists()) {
            throw new FileNotFoundException("File Not Found: " + this.tempFile.getAbsolutePath());
        }

        final FileChannel channel =
                FileChannel.open(this.tempFile.toPath(), StandardOpenOption.WRITE, StandardOpenOption.APPEND);
        try {
            if (channel.position() < resumable.offset()) {
                throw new IOException("Missing data in tmp file: " + this.tempFile.getAbsolutePath());
            }
            // Drop anything written after the recorded offset.
            channel.truncate(resumable.offset());
        } catch (IOException | RuntimeException e) {
            // The constructor is about to fail, so nobody else can close the channel: do it here
            // to avoid leaking the file handle.
            try {
                channel.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.fileChannel = channel;
        this.fos = Channels.newOutputStream(channel);
    }

    /**
     * Test-only constructor that takes the channel and stream as given instead of opening them.
     *
     * @param targetFile the commit destination; must not be null
     * @param tempFile the temp file; must not be null
     * @param fileChannel the channel to use for position/sync (not null-checked)
     * @param fos the stream to write through (not null-checked)
     */
    @VisibleForTesting
    LocalRecoverableFsDataOutputStream(File targetFile, File tempFile, FileChannel fileChannel, OutputStream fos) {
        this.targetFile = Preconditions.checkNotNull(targetFile);
        this.tempFile = Preconditions.checkNotNull(tempFile);
        this.fileChannel = fileChannel;
        this.fos = fos;
    }

    /** Writes a single byte to the underlying stream. */
    @Override
    public void write(int b) throws IOException {
        this.fos.write(b);
    }

    /** Writes {@code len} bytes of {@code b}, starting at {@code off}, to the underlying stream. */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        this.fos.write(b, off, len);
    }

    /** Flushes the underlying stream. */
    @Override
    public void flush() throws IOException {
        this.fos.flush();
    }

    /** Forces the channel's content and metadata updates to storage ({@code force(true)}). */
    @Override
    public void sync() throws IOException {
        this.fileChannel.force(true);
    }

    /** Returns the current position of the file channel. */
    @Override
    public long getPos() throws IOException {
        return this.fileChannel.position();
    }

    /**
     * Flushes and syncs the stream, then captures target file, temp file and current position in
     * a {@link LocalRecoverable}.
     *
     * @return the recoverable describing the persisted state
     * @throws IOException if flushing, syncing or reading the position fails
     */
    @Override
    public LocalRecoverable persist() throws IOException {
        this.flush();
        this.sync();
        return new LocalRecoverable(this.targetFile, this.tempFile, this.getPos());
    }

    /** Wraps the given recoverable in a {@link LocalCommitter}. */
    @Override
    protected RecoverableFsDataOutputStream.Committer createCommitterFromResumeRecoverable(LocalRecoverable recoverable) {
        return new LocalCommitter(recoverable);
    }

    /**
     * Closes the underlying output stream. For the streams created by the non-test constructors
     * (via {@code Channels.newOutputStream}) this also closes the file channel.
     */
    @Override
    public void close() throws IOException {
        this.fos.close();
    }

    /** Committer that publishes the temp file of a {@link LocalRecoverable} as its target file. */
    static class LocalCommitter
    implements RecoverableFsDataOutputStream.Committer {
        private final LocalRecoverable recoverable;

        /**
         * @param recoverable the state to commit; must not be null
         */
        LocalCommitter(LocalRecoverable recoverable) {
            this.recoverable = Preconditions.checkNotNull(recoverable);
        }

        /**
         * Moves the temp file to the target file. An atomic move is attempted first; if atomic
         * moves are not supported, a plain {@link File#renameTo(File)} is used instead.
         *
         * @throws IOException if the temp file's length differs from the recorded offset, if the
         *     target already exists, or if the move/rename fails
         */
        @Override
        public void commit() throws IOException {
            final File src = this.recoverable.tempFile();
            final File dest = this.recoverable.targetFile();
            if (src.length() != this.recoverable.offset()) {
                throw new IOException("Cannot clean commit: File has trailing junk data.");
            }
            try {
                Files.move(src.toPath(), dest.toPath(), StandardCopyOption.ATOMIC_MOVE);
            }
            catch (UnsupportedOperationException | AtomicMoveNotSupportedException e) {
                // No atomic move available here: fall back to a regular rename.
                if (!src.renameTo(dest)) {
                    throw new IOException("Committing file failed, could not rename " + src + " -> " + dest);
                }
            }
            catch (FileAlreadyExistsException e) {
                // Keep the original exception as the cause so the failure stays diagnosable.
                throw new IOException("Committing file failed. Target file already exists: " + dest, e);
            }
        }

        /**
         * Commit variant used after recovery. If the temp file still exists it is truncated to the
         * recorded offset when it is too long, rejected when it is too short, and then moved
         * atomically to the target. If the temp file no longer exists this method does nothing.
         *
         * @throws IOException if the temp file is shorter than the recorded offset, or if
         *     truncating or moving it fails
         */
        @Override
        public void commitAfterRecovery() throws IOException {
            final File src = this.recoverable.tempFile();
            final File dest = this.recoverable.targetFile();
            final long expectedLength = this.recoverable.offset();

            if (!src.exists()) {
                // Temp file is gone. Whether or not the target exists, this is deliberately not
                // treated as an error here, so there is nothing to do.
                return;
            }

            final long actualLength = src.length();
            if (actualLength > expectedLength) {
                // Cut off the bytes written after the recorded offset.
                try (FileOutputStream fos = new FileOutputStream(src, true)) {
                    fos.getChannel().truncate(expectedLength);
                }
            } else if (actualLength < expectedLength) {
                throw new IOException("Missing data in tmp file: " + src);
            }
            // Unlike commit(), no rename fallback is applied here.
            Files.move(src.toPath(), dest.toPath(), StandardCopyOption.ATOMIC_MOVE);
        }

        /** Returns the recoverable this committer was created with. */
        @Override
        public RecoverableWriter.CommitRecoverable getRecoverable() {
            return this.recoverable;
        }
    }
}

