/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.filesystem;

import java.io.IOException;
import java.util.Arrays;
import java.util.UUID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FsCheckpointStreamFactory
implements CheckpointStreamFactory {
    private static final Logger LOG = LoggerFactory.getLogger(FsCheckpointStreamFactory.class);
    private static final int MAX_FILE_STATE_THRESHOLD = 0x100000;
    private static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
    private final int fileStateThreshold;
    private final Path checkpointDirectory;
    private final FileSystem filesystem;

    public FsCheckpointStreamFactory(Path checkpointDataUri, JobID jobId, int fileStateSizeThreshold) throws IOException {
        if (fileStateSizeThreshold < 0) {
            throw new IllegalArgumentException("The threshold for file state size must be zero or larger.");
        }
        if (fileStateSizeThreshold > 0x100000) {
            throw new IllegalArgumentException("The threshold for file state size cannot be larger than 1048576");
        }
        this.fileStateThreshold = fileStateSizeThreshold;
        Path basePath = checkpointDataUri;
        this.filesystem = basePath.getFileSystem();
        this.checkpointDirectory = this.createBasePath(this.filesystem, basePath, jobId);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Initialed file stream factory to URI {}.", (Object)this.checkpointDirectory);
        }
    }

    @Override
    public void close() throws Exception {
    }

    @Override
    public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
        this.checkFileSystemInitialized();
        Path checkpointDir = this.createCheckpointDirPath(this.checkpointDirectory, checkpointID);
        int bufferSize = Math.max(4096, this.fileStateThreshold);
        return new FsCheckpointStateOutputStream(checkpointDir, this.filesystem, bufferSize, this.fileStateThreshold);
    }

    private void checkFileSystemInitialized() throws IllegalStateException {
        if (this.filesystem == null || this.checkpointDirectory == null) {
            throw new IllegalStateException("filesystem has not been re-initialized after deserialization");
        }
    }

    protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException {
        Path dir = new Path(checkpointDirectory, jobID.toString());
        fs.mkdirs(dir);
        return dir;
    }

    protected Path createCheckpointDirPath(Path checkpointDirectory, long checkpointID) {
        return new Path(checkpointDirectory, "chk-" + checkpointID);
    }

    public String toString() {
        return "File Stream Factory @ " + this.checkpointDirectory;
    }

    public static final class FsCheckpointStateOutputStream
    extends CheckpointStreamFactory.CheckpointStateOutputStream {
        private final byte[] writeBuffer;
        private int pos;
        private FSDataOutputStream outStream;
        private final int localStateThreshold;
        private final Path basePath;
        private final FileSystem fs;
        private Path statePath;
        private volatile boolean closed;

        public FsCheckpointStateOutputStream(Path basePath, FileSystem fs, int bufferSize, int localStateThreshold) {
            if (bufferSize < localStateThreshold) {
                throw new IllegalArgumentException();
            }
            this.basePath = basePath;
            this.fs = fs;
            this.writeBuffer = new byte[bufferSize];
            this.localStateThreshold = localStateThreshold;
        }

        public void write(int b) throws IOException {
            if (this.pos >= this.writeBuffer.length) {
                this.flush();
            }
            this.writeBuffer[this.pos++] = (byte)b;
        }

        public void write(byte[] b, int off, int len) throws IOException {
            if (len < this.writeBuffer.length / 2) {
                int remaining = this.writeBuffer.length - this.pos;
                if (len > remaining) {
                    System.arraycopy(b, off, this.writeBuffer, this.pos, remaining);
                    off += remaining;
                    len -= remaining;
                    this.pos += remaining;
                    this.flush();
                }
                System.arraycopy(b, off, this.writeBuffer, this.pos, len);
                this.pos += len;
            } else {
                this.flush();
                this.outStream.write(b, off, len);
            }
        }

        public long getPos() throws IOException {
            return (long)this.pos + (this.outStream == null ? 0L : this.outStream.getPos());
        }

        public void flush() throws IOException {
            if (!this.closed) {
                if (this.outStream == null) {
                    this.createStream();
                }
                if (this.pos > 0) {
                    this.outStream.write(this.writeBuffer, 0, this.pos);
                    this.pos = 0;
                }
            } else {
                throw new IOException("closed");
            }
        }

        public void sync() throws IOException {
            this.outStream.sync();
        }

        public boolean isClosed() {
            return this.closed;
        }

        public void close() {
            if (!this.closed) {
                this.closed = true;
                this.pos = this.writeBuffer.length;
                if (this.outStream != null) {
                    try {
                        this.outStream.close();
                    }
                    catch (Throwable throwable) {
                        LOG.warn("Could not close the state stream for {}.", (Object)this.statePath, (Object)throwable);
                    }
                    finally {
                        try {
                            this.fs.delete(this.statePath, false);
                            try {
                                FileUtils.deletePathIfEmpty((FileSystem)this.fs, (Path)this.basePath);
                            }
                            catch (Exception ignored) {
                                LOG.debug("Could not delete the parent directory {}.", (Object)this.basePath, (Object)ignored);
                            }
                        }
                        catch (Exception e) {
                            LOG.warn("Cannot delete closed and discarded state stream for {}.", (Object)this.statePath, (Object)e);
                        }
                    }
                }
            }
        }

        /*
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public StreamStateHandle closeAndGetHandle() throws IOException {
            if (this.outStream == null && this.pos == 0) {
                return null;
            }
            FsCheckpointStateOutputStream fsCheckpointStateOutputStream = this;
            synchronized (fsCheckpointStateOutputStream) {
                if (this.closed) throw new IOException("Stream has already been closed and discarded.");
                if (this.outStream == null && this.pos <= this.localStateThreshold) {
                    this.closed = true;
                    byte[] bytes = Arrays.copyOf(this.writeBuffer, this.pos);
                    this.pos = this.writeBuffer.length;
                    return new ByteStreamStateHandle(this.createStatePath().toString(), bytes);
                }
                try {
                    this.flush();
                    this.pos = this.writeBuffer.length;
                    long size = -1L;
                    try {
                        size = this.outStream.getPos();
                    }
                    catch (Exception exception) {
                        // empty catch block
                    }
                    this.outStream.close();
                    FileStateHandle fileStateHandle = new FileStateHandle(this.statePath, size);
                    return fileStateHandle;
                }
                catch (Exception exception) {
                    try {
                        this.fs.delete(this.statePath, false);
                        try {
                            FileUtils.deletePathIfEmpty((FileSystem)this.fs, (Path)this.basePath);
                            throw new IOException("Could not flush and close the file system output stream to " + this.statePath + " in order to obtain the stream state handle", exception);
                        }
                        catch (Exception parentDirDeletionFailure) {
                            LOG.debug("Could not delete the parent directory {}.", (Object)this.basePath, (Object)parentDirDeletionFailure);
                            throw new IOException("Could not flush and close the file system output stream to " + this.statePath + " in order to obtain the stream state handle", exception);
                        }
                    }
                    catch (Exception deleteException) {
                        LOG.warn("Could not delete the checkpoint stream file {}.", (Object)this.statePath, (Object)deleteException);
                    }
                    throw new IOException("Could not flush and close the file system output stream to " + this.statePath + " in order to obtain the stream state handle", exception);
                }
                finally {
                    this.closed = true;
                }
            }
        }

        private Path createStatePath() {
            return new Path(this.basePath, UUID.randomUUID().toString());
        }

        private void createStream() throws IOException {
            this.fs.mkdirs(this.basePath);
            Exception latestException = null;
            for (int attempt = 0; attempt < 10; ++attempt) {
                try {
                    this.statePath = this.createStatePath();
                    this.outStream = this.fs.create(this.statePath, FileSystem.WriteMode.NO_OVERWRITE);
                    break;
                }
                catch (Exception e) {
                    latestException = e;
                    continue;
                }
            }
            if (this.outStream == null) {
                throw new IOException("Could not open output stream for state backend", latestException);
            }
        }
    }
}

