/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.core.fs;

import java.io.Closeable;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.BlockLocation;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class LimitedConnectionsFileSystem
extends FileSystem {
    /** Logger; used at debug level when a stream's progress cannot be checked. */
    private static final Logger LOG = LoggerFactory.getLogger(LimitedConnectionsFileSystem.class);
    /** The wrapped file system that all operations are forwarded to. */
    private final FileSystem originalFs;
    /** Lock guarding the stream sets and reservation counters; created as a fair lock. */
    private final ReentrantLock lock;
    /** Condition of {@link #lock}; signalled whenever a stream reservation is released. */
    private final Condition available;
    /** Maximum number of concurrently open output streams; 0 is treated as "no limit". */
    private final int maxNumOpenOutputStreams;
    /** Maximum number of concurrently open input streams; 0 is treated as "no limit". */
    private final int maxNumOpenInputStreams;
    /** Maximum number of concurrently open streams (input + output); 0 is treated as "no limit". */
    private final int maxNumOpenStreamsTotal;
    /** Maximum time (nanoseconds) an open request waits for a free slot; 0 means wait indefinitely. */
    private final long streamOpenTimeoutNanos;
    /** Time (nanoseconds) without progress after which a stream may be closed; 0 disables the check. */
    private final long streamInactivityTimeoutNanos;
    /** Output streams that were opened successfully and are not yet unregistered. */
    @GuardedBy(value="lock")
    private final HashSet<OutStream> openOutputStreams;
    /** Input streams that were opened successfully and are not yet unregistered. */
    @GuardedBy(value="lock")
    private final HashSet<InStream> openInputStreams;
    /** Reserved output slots; incremented before the stream is opened, so it is >= the set size. */
    @GuardedBy(value="lock")
    private int numReservedOutputStreams;
    /** Reserved input slots; incremented before the stream is opened, so it is >= the set size. */
    @GuardedBy(value="lock")
    private int numReservedInputStreams;

    /**
     * Creates a file system that limits only the total number of open streams, with no
     * per-direction limits and no timeouts (all passed as 0).
     *
     * @param originalFs the file system to wrap
     * @param maxNumOpenStreamsTotal the maximum number of concurrently open streams (0 = no limit)
     */
    public LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal) {
        this(originalFs, maxNumOpenStreamsTotal, 0, 0, 0L, 0L);
    }

    /**
     * Creates a file system that limits the total number of open streams and applies the given
     * timeouts; the per-direction limits are passed as 0 (no limit).
     *
     * @param originalFs the file system to wrap
     * @param maxNumOpenStreamsTotal the maximum number of concurrently open streams (0 = no limit)
     * @param streamOpenTimeout maximum milliseconds to wait for a free slot (0 = infinite)
     * @param streamInactivityTimeout milliseconds of inactivity before a stream may be closed
     *     (0 = never)
     */
    public LimitedConnectionsFileSystem(
            FileSystem originalFs,
            int maxNumOpenStreamsTotal,
            long streamOpenTimeout,
            long streamInactivityTimeout) {
        this(
                originalFs,
                maxNumOpenStreamsTotal,
                0,
                0,
                streamOpenTimeout,
                streamInactivityTimeout);
    }

    /**
     * Creates a file system that limits open streams in total and per direction.
     *
     * <p>A limit of 0 means "no limit"; a timeout of 0 means "infinite".
     *
     * @param originalFs the file system to wrap; must not be null
     * @param maxNumOpenStreamsTotal maximum number of concurrently open streams (input + output)
     * @param maxNumOpenOutputStreams maximum number of concurrently open output streams
     * @param maxNumOpenInputStreams maximum number of concurrently open input streams
     * @param streamOpenTimeout maximum milliseconds to wait for a free slot when opening a stream
     * @param streamInactivityTimeout milliseconds without progress after which a stream may be
     *     closed to free its slot
     * @throws IllegalArgumentException if any limit or timeout is negative
     */
    public LimitedConnectionsFileSystem(FileSystem originalFs, int maxNumOpenStreamsTotal, int maxNumOpenOutputStreams, int maxNumOpenInputStreams, long streamOpenTimeout, long streamInactivityTimeout) {
        Preconditions.checkArgument(maxNumOpenStreamsTotal >= 0, "maxNumOpenStreamsTotal must be >= 0");
        Preconditions.checkArgument(maxNumOpenOutputStreams >= 0, "maxNumOpenOutputStreams must be >= 0");
        Preconditions.checkArgument(maxNumOpenInputStreams >= 0, "maxNumOpenInputStreams must be >= 0");
        Preconditions.checkArgument(streamOpenTimeout >= 0L, "stream opening timeout must be >= 0 (0 means infinite timeout)");
        Preconditions.checkArgument(streamInactivityTimeout >= 0L, "stream inactivity timeout must be >= 0 (0 means infinite timeout)");
        this.originalFs = Preconditions.checkNotNull(originalFs, "originalFs");
        // Fair lock, so that waiting open requests are served in arrival order.
        this.lock = new ReentrantLock(true);
        this.available = this.lock.newCondition();
        this.openOutputStreams = new HashSet<>();
        this.openInputStreams = new HashSet<>();
        this.maxNumOpenStreamsTotal = maxNumOpenStreamsTotal;
        this.maxNumOpenOutputStreams = maxNumOpenOutputStreams;
        this.maxNumOpenInputStreams = maxNumOpenInputStreams;
        // TimeUnit conversion saturates at Long.MAX_VALUE on overflow. A manual multiplication
        // followed by a "result >= input" test misses products that wrap around to a positive
        // value and would silently store a bogus timeout.
        this.streamOpenTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(streamOpenTimeout);
        this.streamInactivityTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(streamInactivityTimeout);
    }

    /**
     * Gets the configured maximum number of concurrently open output streams.
     *
     * @return the output stream limit (0 means no limit)
     */
    public int getMaxNumOpenOutputStreams() {
        return maxNumOpenOutputStreams;
    }

    /**
     * Gets the configured maximum number of concurrently open input streams.
     *
     * @return the input stream limit (0 means no limit)
     */
    public int getMaxNumOpenInputStreams() {
        return maxNumOpenInputStreams;
    }

    /**
     * Gets the configured maximum number of concurrently open streams (input plus output).
     *
     * @return the total stream limit (0 means no limit)
     */
    public int getMaxNumOpenStreamsTotal() {
        return maxNumOpenStreamsTotal;
    }

    /**
     * Gets the stream opening timeout, converted from the internally stored nanoseconds.
     *
     * @return the timeout in milliseconds (0 means infinite)
     */
    public long getStreamOpenTimeout() {
        return TimeUnit.NANOSECONDS.toMillis(streamOpenTimeoutNanos);
    }

    /**
     * Gets the stream inactivity timeout, converted from the internally stored nanoseconds.
     *
     * @return the timeout in milliseconds (0 means inactive streams are never closed)
     */
    public long getStreamInactivityTimeout() {
        return TimeUnit.NANOSECONDS.toMillis(streamInactivityTimeoutNanos);
    }

    /**
     * Gets the number of currently reserved stream slots, input and output combined.
     *
     * @return the sum of reserved output and input slots, read under the lock
     */
    public int getTotalNumberOfOpenStreams() {
        lock.lock();
        try {
            return numReservedOutputStreams + numReservedInputStreams;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Gets the number of currently reserved output stream slots.
     *
     * @return the reserved output slot count, read under the lock
     */
    public int getNumberOfOpenOutputStreams() {
        lock.lock();
        try {
            return numReservedOutputStreams;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Gets the number of currently reserved input stream slots.
     *
     * <p>The counter is guarded by the lock, so it is read under the lock like the other
     * counters; an unlocked read has no visibility guarantee for concurrent updates.
     *
     * @return the reserved input slot count
     */
    public int getNumberOfOpenInputStreams() {
        this.lock.lock();
        try {
            return this.numReservedInputStreams;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Opens an output stream on the wrapped file system, subject to the connection limits.
     *
     * @param f the path of the file to create
     * @param overwriteMode the write mode forwarded to the wrapped file system
     * @return a limit-tracked output stream
     * @throws IOException if opening fails or no stream slot becomes available in time
     */
    @Override
    public FSDataOutputStream create(Path f, FileSystem.WriteMode overwriteMode) throws IOException {
        final SupplierWithException<FSDataOutputStream, IOException> opener =
                () -> originalFs.create(f, overwriteMode);
        return createOutputStream(opener);
    }

    /**
     * Opens an output stream on the wrapped file system, subject to the connection limits.
     * All arguments are forwarded unchanged to the wrapped file system.
     *
     * @return a limit-tracked output stream
     * @throws IOException if opening fails or no stream slot becomes available in time
     */
    @Override
    @Deprecated
    public FSDataOutputStream create(Path f, boolean overwrite, int bufferSize, short replication, long blockSize) throws IOException {
        final SupplierWithException<FSDataOutputStream, IOException> opener =
                () -> originalFs.create(f, overwrite, bufferSize, replication, blockSize);
        return createOutputStream(opener);
    }

    /**
     * Opens an input stream on the wrapped file system, subject to the connection limits.
     *
     * @param f the path of the file to open
     * @param bufferSize the buffer size forwarded to the wrapped file system
     * @return a limit-tracked input stream
     * @throws IOException if opening fails or no stream slot becomes available in time
     */
    @Override
    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
        final SupplierWithException<FSDataInputStream, IOException> opener =
                () -> originalFs.open(f, bufferSize);
        return createInputStream(opener);
    }

    /**
     * Opens an input stream on the wrapped file system, subject to the connection limits.
     *
     * @param f the path of the file to open
     * @return a limit-tracked input stream
     * @throws IOException if opening fails or no stream slot becomes available in time
     */
    @Override
    public FSDataInputStream open(Path f) throws IOException {
        final SupplierWithException<FSDataInputStream, IOException> opener =
                () -> originalFs.open(f);
        return createInputStream(opener);
    }

    /**
     * Wraps the given opener so that the opened stream is an {@link OutStream} bound to this file
     * system, then opens it through the limit-enforcing {@code createStream}.
     *
     * @param streamOpener opens the underlying output stream
     * @return the limit-tracked output stream
     * @throws IOException if opening fails or no stream slot becomes available in time
     */
    private FSDataOutputStream createOutputStream(SupplierWithException<FSDataOutputStream, IOException> streamOpener) throws IOException {
        // Fully parameterized (no raw type), so createStream's generic return type is preserved.
        final SupplierWithException<OutStream, IOException> wrappedStreamOpener = () -> new OutStream(streamOpener.get(), this);
        return this.createStream(wrappedStreamOpener, this.openOutputStreams, true);
    }

    /**
     * Wraps the given opener so that the opened stream is an {@link InStream} bound to this file
     * system, then opens it through the limit-enforcing {@code createStream}.
     *
     * @param streamOpener opens the underlying input stream
     * @return the limit-tracked input stream
     * @throws IOException if opening fails or no stream slot becomes available in time
     */
    private FSDataInputStream createInputStream(SupplierWithException<FSDataInputStream, IOException> streamOpener) throws IOException {
        // Fully parameterized (no raw type), so createStream's generic return type is preserved.
        final SupplierWithException<InStream, IOException> wrappedStreamOpener = () -> new InStream(streamOpener.get(), this);
        return this.createStream(wrappedStreamOpener, this.openInputStreams, false);
    }

    /** Returns the kind reported by the wrapped file system. */
    @Override
    public FileSystemKind getKind() {
        return originalFs.getKind();
    }

    /** Returns whether the wrapped file system reports itself as distributed. */
    @Override
    public boolean isDistributedFS() {
        return originalFs.isDistributedFS();
    }

    /** Returns the working directory of the wrapped file system. */
    @Override
    public Path getWorkingDirectory() {
        return originalFs.getWorkingDirectory();
    }

    /** Returns the home directory of the wrapped file system. */
    @Override
    public Path getHomeDirectory() {
        return originalFs.getHomeDirectory();
    }

    /** Returns the URI of the wrapped file system. */
    @Override
    public URI getUri() {
        return originalFs.getUri();
    }

    /**
     * Forwards the status lookup to the wrapped file system; no stream slot is reserved.
     *
     * @throws IOException if the wrapped file system fails
     */
    @Override
    public FileStatus getFileStatus(Path f) throws IOException {
        return originalFs.getFileStatus(f);
    }

    /**
     * Forwards the block location lookup to the wrapped file system; no stream slot is reserved.
     *
     * @throws IOException if the wrapped file system fails
     */
    @Override
    public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long len) throws IOException {
        return originalFs.getFileBlockLocations(file, start, len);
    }

    /**
     * Forwards the directory listing to the wrapped file system; no stream slot is reserved.
     *
     * @throws IOException if the wrapped file system fails
     */
    @Override
    public FileStatus[] listStatus(Path f) throws IOException {
        return originalFs.listStatus(f);
    }

    /**
     * Forwards the delete to the wrapped file system; no stream slot is reserved.
     *
     * @throws IOException if the wrapped file system fails
     */
    @Override
    public boolean delete(Path f, boolean recursive) throws IOException {
        return originalFs.delete(f, recursive);
    }

    /**
     * Forwards the directory creation to the wrapped file system; no stream slot is reserved.
     *
     * @throws IOException if the wrapped file system fails
     */
    @Override
    public boolean mkdirs(Path f) throws IOException {
        return originalFs.mkdirs(f);
    }

    /**
     * Forwards the rename to the wrapped file system; no stream slot is reserved.
     *
     * @throws IOException if the wrapped file system fails
     */
    @Override
    public boolean rename(Path src, Path dst) throws IOException {
        return originalFs.rename(src, dst);
    }

    /**
     * Forwards the existence check to the wrapped file system; no stream slot is reserved.
     *
     * @throws IOException if the wrapped file system fails
     */
    @Override
    public boolean exists(Path f) throws IOException {
        return originalFs.exists(f);
    }

    /** Returns the default block size reported by the wrapped file system. */
    @Override
    @Deprecated
    public long getDefaultBlockSize() {
        return originalFs.getDefaultBlockSize();
    }

    /**
     * Reserves a stream slot (waiting for one if necessary), opens the stream outside the lock,
     * and registers it in the given set.
     *
     * <p>The slot is reserved before the stream is opened so that the potentially slow open call
     * does not run while holding the lock. If opening fails, the reservation is released again
     * and waiters are signalled.
     *
     * @param streamOpener opens and wraps the underlying stream
     * @param openStreams the set to register the opened stream in
     * @param output {@code true} when opening an output stream, {@code false} for an input stream
     * @return the opened, registered stream
     * @throws IOException if the thread is interrupted while waiting, if no slot becomes
     *     available before the open timeout, or if the opener fails
     */
    private <T extends StreamWithTimeout> T createStream(SupplierWithException<T, IOException> streamOpener, HashSet<T> openStreams, boolean output) throws IOException {
        // Only the limit for the requested direction applies; a configured limit of 0 means
        // "unlimited".
        final int outputLimit = output && this.maxNumOpenOutputStreams > 0 ? this.maxNumOpenOutputStreams : Integer.MAX_VALUE;
        final int inputLimit = !output && this.maxNumOpenInputStreams > 0 ? this.maxNumOpenInputStreams : Integer.MAX_VALUE;
        final int totalLimit = this.maxNumOpenStreamsTotal > 0 ? this.maxNumOpenStreamsTotal : Integer.MAX_VALUE;
        final int outputCredit = output ? 1 : 0;
        final int inputCredit = output ? 0 : 1;

        // Waiting for a slot may take long, so the lock is acquired interruptibly and an
        // interruption is reported as an I/O error.
        try {
            this.lock.lockInterruptibly();
            try {
                assert (this.openOutputStreams.size() <= this.numReservedOutputStreams);
                assert (this.openInputStreams.size() <= this.numReservedInputStreams);
                this.waitForAvailability(totalLimit, outputLimit, inputLimit);
                this.numReservedOutputStreams += outputCredit;
                this.numReservedInputStreams += inputCredit;
            }
            finally {
                this.lock.unlock();
            }
        }
        catch (InterruptedException e) {
            // Restore the interruption flag and keep the interruption as the cause.
            Thread.currentThread().interrupt();
            throw new IOException("interrupted before opening stream", e);
        }

        boolean success = false;
        try {
            final T stream = streamOpener.get();
            this.lock.lock();
            try {
                openStreams.add(stream);
            }
            finally {
                this.lock.unlock();
            }
            success = true;
            return stream;
        }
        finally {
            if (!success) {
                // Give the reserved slot back. Acquired non-interruptibly because this release
                // must not be skipped.
                this.lock.lock();
                try {
                    this.numReservedOutputStreams -= outputCredit;
                    this.numReservedInputStreams -= inputCredit;
                    this.available.signalAll();
                }
                finally {
                    this.lock.unlock();
                }
            }
        }
    }

    /**
     * Blocks until a stream slot is available under the given limits, or the stream opening
     * timeout elapses. Must be called while holding the lock.
     *
     * <p>When an inactivity timeout is configured, each pass first tries to close one inactive
     * stream; it only waits on the condition if none could be closed, and then for at most about
     * half the inactivity timeout so that the check is repeated regularly.
     *
     * <p>The remaining time is computed from the elapsed time since entry rather than from an
     * absolute deadline: {@code System.nanoTime()} values are only meaningful as differences and
     * may be negative, so an absolute deadline cannot be validated by its sign.
     *
     * @param totalLimit effective limit for input plus output streams
     * @param outputLimit effective limit for output streams
     * @param inputLimit effective limit for input streams
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws IOException if the opening timeout expires without a slot becoming available
     * @throws IllegalStateException if the current thread does not hold the lock
     */
    @GuardedBy(value="lock")
    private void waitForAvailability(int totalLimit, int outputLimit, int inputLimit) throws InterruptedException, IOException {
        Preconditions.checkState(this.lock.isHeldByCurrentThread());

        final boolean waitForever = this.streamOpenTimeoutNanos == 0L;
        final boolean reclaimInactive = this.streamInactivityTimeoutNanos != 0L;
        final long checkIntervalNanos = (this.streamInactivityTimeoutNanos >>> 1) + 1L;
        final long startNanos = System.nanoTime();

        // Availability is checked first on every pass, so a slot that frees up exactly when the
        // timeout expires is still taken instead of reported as a timeout.
        while (!this.hasAvailability(totalLimit, outputLimit, inputLimit)) {
            final long now = System.nanoTime();
            final long timeLeft = waitForever ? Long.MAX_VALUE : this.streamOpenTimeoutNanos - (now - startNanos);
            if (timeLeft <= 0L) {
                throw new IOException(String.format("Timeout while waiting for an available stream/connection. limits: total=%d, input=%d, output=%d ; Open: input=%d, output=%d ; timeout: %d ms", this.maxNumOpenStreamsTotal, this.maxNumOpenInputStreams, this.maxNumOpenOutputStreams, this.numReservedInputStreams, this.numReservedOutputStreams, this.getStreamOpenTimeout()));
            }
            if (reclaimInactive && (this.closeInactiveStream(this.openOutputStreams, now) || this.closeInactiveStream(this.openInputStreams, now))) {
                // A stream was closed; re-check availability right away instead of waiting.
                continue;
            }
            final long timeToWait = reclaimInactive ? Math.min(checkIntervalNanos, timeLeft) : timeLeft;
            this.available.await(timeToWait, TimeUnit.NANOSECONDS);
        }
    }

    /**
     * Checks whether one more stream may be reserved without exceeding any of the given limits.
     *
     * @return {@code true} if the output, input and combined reservation counts are all strictly
     *     below their respective limits
     */
    @GuardedBy(value="lock")
    private boolean hasAvailability(int totalLimit, int outputLimit, int inputLimit) {
        if (numReservedOutputStreams >= outputLimit) {
            return false;
        }
        if (numReservedInputStreams >= inputLimit) {
            return false;
        }
        return numReservedOutputStreams + numReservedInputStreams < totalLimit;
    }

    /**
     * Looks for a stream that made no progress since its previous check and closes it.
     *
     * <p>The scan stops with {@code false} at the first stream that is already closed or whose
     * previous check is more recent than the inactivity timeout; remaining streams are not
     * examined in that pass. Streams whose progress check fails with an exception are skipped.
     *
     * <p>Closing a stream makes it unregister itself, which modifies {@code streams}. The method
     * therefore always returns right after the close attempt, even if the close threw; continuing
     * the iteration would fail with a {@code ConcurrentModificationException}.
     *
     * @param streams the registered streams to inspect
     * @param nowNanos the current {@code System.nanoTime()} reading
     * @return {@code true} if a stream was closed due to inactivity, {@code false} otherwise
     */
    @GuardedBy(value="lock")
    private boolean closeInactiveStream(HashSet<? extends StreamWithTimeout> streams, long nowNanos) {
        for (StreamWithTimeout stream : streams) {
            try {
                final StreamProgressTracker tracker = stream.getProgressTracker();
                // Compare elapsed time rather than "last + timeout", which overflows when the
                // timeout was clamped to Long.MAX_VALUE.
                if (stream.isClosed() || nowNanos - tracker.getLastCheckTimestampNanos() < this.streamInactivityTimeoutNanos) {
                    return false;
                }
                if (tracker.checkNewBytesAndMark(nowNanos)) {
                    continue;
                }
            }
            catch (StreamTimeoutException ignored) {
                // The stream was timed out concurrently; nothing to do for it here.
                continue;
            }
            catch (IOException e) {
                // Debug level only, to avoid flooding the log.
                LOG.debug("Could not check for stream progress to determine inactivity", e);
                continue;
            }

            try {
                stream.closeDueToTimeout();
            }
            catch (IOException e) {
                // The stream is marked closed and unregistered even when closing the underlying
                // stream fails, so its slot has still been released.
                LOG.debug("Error while closing stream due to inactivity timeout", e);
            }
            return true;
        }
        return false;
    }

    /**
     * Removes the given output stream from the registered set. If it was registered, its slot is
     * released and all waiting threads are signalled; otherwise nothing changes.
     *
     * @param stream the output stream being closed
     */
    void unregisterOutputStream(OutStream stream) {
        lock.lock();
        try {
            final boolean wasRegistered = openOutputStreams.remove(stream);
            if (!wasRegistered) {
                return;
            }
            numReservedOutputStreams--;
            available.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Removes the given input stream from the registered set. If it was registered, its slot is
     * released and all waiting threads are signalled; otherwise nothing changes.
     *
     * @param stream the input stream being closed
     */
    void unregisterInputStream(InStream stream) {
        lock.lock();
        try {
            final boolean wasRegistered = openInputStreams.remove(stream);
            if (!wasRegistered) {
                return;
            }
            numReservedInputStreams--;
            available.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Immutable holder for connection limiting settings: the three stream limits and the two
     * timeouts. All values are validated to be non-negative.
     */
    public static class ConnectionLimitingSettings {
        /** Limit for input plus output streams. */
        public final int limitTotal;
        /** Limit for input streams. */
        public final int limitInput;
        /** Limit for output streams. */
        public final int limitOutput;
        /** Stream opening timeout, as read from the configuration. */
        public final long streamOpenTimeout;
        /** Stream inactivity timeout, as read from the configuration. */
        public final long streamInactivityTimeout;

        /**
         * Creates the settings from the given values.
         *
         * @throws IllegalArgumentException if any argument is negative
         */
        public ConnectionLimitingSettings(
                int limitTotal,
                int limitInput,
                int limitOutput,
                long streamOpenTimeout,
                long streamInactivityTimeout) {
            Preconditions.checkArgument(limitTotal >= 0);
            Preconditions.checkArgument(limitInput >= 0);
            Preconditions.checkArgument(limitOutput >= 0);
            Preconditions.checkArgument(streamOpenTimeout >= 0L);
            Preconditions.checkArgument(streamInactivityTimeout >= 0L);

            this.limitTotal = limitTotal;
            this.limitInput = limitInput;
            this.limitOutput = limitOutput;
            this.streamOpenTimeout = streamOpenTimeout;
            this.streamInactivityTimeout = streamInactivityTimeout;
        }

        /**
         * Reads the connection limiting settings for a file system scheme from the configuration.
         *
         * @param config the configuration to read from
         * @param fsScheme the file system scheme whose options are looked up
         * @return the settings, or {@code null} if none of the three limits is greater than zero
         * @throws IllegalConfigurationException if a limit is below -1 or a timeout is negative
         */
        @Nullable
        public static ConnectionLimitingSettings fromConfig(Configuration config, String fsScheme) {
            Preconditions.checkNotNull(fsScheme, "fsScheme");
            Preconditions.checkNotNull(config, "config");

            final ConfigOption<Integer> totalOpt = CoreOptions.fileSystemConnectionLimit(fsScheme);
            final ConfigOption<Integer> inOpt = CoreOptions.fileSystemConnectionLimitIn(fsScheme);
            final ConfigOption<Integer> outOpt = CoreOptions.fileSystemConnectionLimitOut(fsScheme);

            final int total = config.get(totalOpt);
            final int in = config.get(inOpt);
            final int out = config.get(outOpt);

            checkLimit(total, totalOpt);
            checkLimit(in, inOpt);
            checkLimit(out, outOpt);

            final boolean anyLimitConfigured = total > 0 || in > 0 || out > 0;
            if (!anyLimitConfigured) {
                return null;
            }

            final ConfigOption<Long> openOpt = CoreOptions.fileSystemConnectionLimitTimeout(fsScheme);
            final ConfigOption<Long> inactivityOpt =
                    CoreOptions.fileSystemConnectionLimitStreamInactivityTimeout(fsScheme);

            final long open = config.get(openOpt);
            final long inactivity = config.get(inactivityOpt);

            checkTimeout(open, openOpt);
            checkTimeout(inactivity, inactivityOpt);

            // The limits were validated to be >= -1, so clamping at zero maps exactly -1 to 0.
            return new ConnectionLimitingSettings(
                    Math.max(total, 0),
                    Math.max(in, 0),
                    Math.max(out, 0),
                    open,
                    inactivity);
        }

        /** Rejects limit values below -1. */
        private static void checkLimit(int value, ConfigOption<Integer> option) {
            if (value >= -1) {
                return;
            }
            throw invalidValue(option, value);
        }

        /** Rejects negative timeout values. */
        private static void checkTimeout(long timeout, ConfigOption<Long> option) {
            if (timeout >= 0L) {
                return;
            }
            throw invalidValue(option, timeout);
        }

        /** Builds the exception reporting an invalid value for the given option. */
        private static IllegalConfigurationException invalidValue(ConfigOption<?> option, Object value) {
            return new IllegalConfigurationException("Invalid value for '" + option.key() + "': " + value);
        }
    }

    /**
     * Input stream wrapper that forwards all calls to the original stream and unregisters itself
     * from the owning file system when closed. Once the stream was closed due to a timeout,
     * I/O errors from the original stream are reported as {@link StreamTimeoutException}.
     */
    private static final class InStream extends FSDataInputStream implements StreamWithTimeout {
        private final FSDataInputStream originalStream;
        private final LimitedConnectionsFileSystem fs;
        /** Set when the stream is closed due to inactivity; {@code null} otherwise. */
        private volatile StreamTimeoutException timeoutException;
        private final StreamProgressTracker progressTracker;
        /** Ensures the close logic runs at most once. */
        private final AtomicBoolean closed = new AtomicBoolean();

        InStream(FSDataInputStream originalStream, LimitedConnectionsFileSystem fs) {
            this.originalStream = Preconditions.checkNotNull(originalStream);
            this.fs = Preconditions.checkNotNull(fs);
            this.progressTracker = new StreamProgressTracker(this);
        }

        @Override
        public int read() throws IOException {
            try {
                return originalStream.read();
            } catch (IOException e) {
                throw translate(e);
            }
        }

        @Override
        public int read(byte[] b) throws IOException {
            try {
                return originalStream.read(b);
            } catch (IOException e) {
                throw translate(e);
            }
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            try {
                return originalStream.read(b, off, len);
            } catch (IOException e) {
                throw translate(e);
            }
        }

        @Override
        public long skip(long n) throws IOException {
            try {
                return originalStream.skip(n);
            } catch (IOException e) {
                throw translate(e);
            }
        }

        @Override
        public int available() throws IOException {
            try {
                return originalStream.available();
            } catch (IOException e) {
                throw translate(e);
            }
        }

        @Override
        public void mark(int readlimit) {
            originalStream.mark(readlimit);
        }

        @Override
        public void reset() throws IOException {
            try {
                originalStream.reset();
            } catch (IOException e) {
                throw translate(e);
            }
        }

        @Override
        public boolean markSupported() {
            return originalStream.markSupported();
        }

        @Override
        public void seek(long desired) throws IOException {
            try {
                originalStream.seek(desired);
            } catch (IOException e) {
                throw translate(e);
            }
        }

        @Override
        public long getPos() throws IOException {
            try {
                return originalStream.getPos();
            } catch (IOException e) {
                throw translate(e);
            }
        }

        /**
         * Closes the original stream and unregisters this stream from the file system. Only the
         * first call has an effect; the unregistration happens even if closing fails.
         */
        @Override
        public void close() throws IOException {
            if (!closed.compareAndSet(false, true)) {
                return;
            }
            try {
                originalStream.close();
            } catch (IOException e) {
                throw translate(e);
            } finally {
                fs.unregisterInputStream(this);
            }
        }

        /** Records a timeout exception for later error reporting, then closes the stream. */
        @Override
        public void closeDueToTimeout() throws IOException {
            timeoutException = new StreamTimeoutException();
            close();
        }

        @Override
        public boolean isClosed() {
            return closed.get();
        }

        @Override
        public StreamProgressTracker getProgressTracker() {
            return progressTracker;
        }

        /**
         * Picks the exception to throw for a failure of the original stream: the failure itself,
         * or, if the stream was closed due to a timeout, a new {@link StreamTimeoutException}
         * that carries the failure as a suppressed exception.
         */
        private IOException translate(IOException failure) {
            final StreamTimeoutException recorded = timeoutException;
            if (recorded == null) {
                return failure;
            }
            final StreamTimeoutException wrapped = new StreamTimeoutException(recorded);
            wrapped.addSuppressed(failure);
            return wrapped;
        }
    }

    /**
     * Output stream wrapper that forwards all calls to the original stream and unregisters itself
     * from the owning file system when closed. Once the stream was closed due to a timeout,
     * I/O errors from the original stream are reported as {@link StreamTimeoutException}.
     */
    private static final class OutStream extends FSDataOutputStream implements StreamWithTimeout {
        private final FSDataOutputStream originalStream;
        private final LimitedConnectionsFileSystem fs;
        private final StreamProgressTracker progressTracker;
        /** Set when the stream is closed due to inactivity; {@code null} otherwise. */
        private volatile StreamTimeoutException timeoutException;
        /** Ensures the close logic runs at most once. */
        private final AtomicBoolean closed = new AtomicBoolean();

        OutStream(FSDataOutputStream originalStream, LimitedConnectionsFileSystem fs) {
            this.originalStream = Preconditions.checkNotNull(originalStream);
            this.fs = Preconditions.checkNotNull(fs);
            this.progressTracker = new StreamProgressTracker(this);
        }

        @Override
        public void write(int b) throws IOException {
            try {
                originalStream.write(b);
            } catch (IOException e) {
                throw translate(e);
            }
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            try {
                originalStream.write(b, off, len);
            } catch (IOException e) {
                throw translate(e);
            }
        }

        @Override
        public long getPos() throws IOException {
            try {
                return originalStream.getPos();
            } catch (IOException e) {
                throw translate(e);
            }
        }

        @Override
        public void flush() throws IOException {
            try {
                originalStream.flush();
            } catch (IOException e) {
                throw translate(e);
            }
        }

        @Override
        public void sync() throws IOException {
            try {
                originalStream.sync();
            } catch (IOException e) {
                throw translate(e);
            }
        }

        /**
         * Closes the original stream and unregisters this stream from the file system. Only the
         * first call has an effect; the unregistration happens even if closing fails.
         */
        @Override
        public void close() throws IOException {
            if (!closed.compareAndSet(false, true)) {
                return;
            }
            try {
                originalStream.close();
            } catch (IOException e) {
                throw translate(e);
            } finally {
                fs.unregisterOutputStream(this);
            }
        }

        /** Records a timeout exception for later error reporting, then closes the stream. */
        @Override
        public void closeDueToTimeout() throws IOException {
            timeoutException = new StreamTimeoutException();
            close();
        }

        @Override
        public boolean isClosed() {
            return closed.get();
        }

        @Override
        public StreamProgressTracker getProgressTracker() {
            return progressTracker;
        }

        /**
         * Picks the exception to throw for a failure of the original stream: the failure itself,
         * or, if the stream was closed due to a timeout, a new {@link StreamTimeoutException}
         * that carries the failure as a suppressed exception.
         */
        private IOException translate(IOException failure) {
            final StreamTimeoutException recorded = timeoutException;
            if (recorded == null) {
                return failure;
            }
            final StreamTimeoutException wrapped = new StreamTimeoutException(recorded);
            wrapped.addSuppressed(failure);
            return wrapped;
        }
    }

    /**
     * Remembers the position a stream had at its last check and when that check happened, so
     * that a later check can tell whether the stream advanced in between.
     */
    private static final class StreamProgressTracker {
        private final StreamWithTimeout stream;
        /** Position seen at the last check; starts at -1, i.e. before any check. */
        private volatile long lastCheckBytes = -1L;
        /** Timestamp passed to the last check. */
        private volatile long lastCheckTimestampNanos;

        StreamProgressTracker(StreamWithTimeout stream) {
            this.stream = stream;
        }

        /** Returns the timestamp that was passed to the most recent check. */
        public long getLastCheckTimestampNanos() {
            return lastCheckTimestampNanos;
        }

        /**
         * Records the check timestamp and compares the stream's current position with the one
         * remembered from the previous check.
         *
         * @param timestamp the timestamp to record for this check
         * @return {@code true} if the position advanced (the new position is then remembered),
         *     {@code false} otherwise
         * @throws IOException if the stream position cannot be obtained
         */
        public boolean checkNewBytesAndMark(long timestamp) throws IOException {
            lastCheckTimestampNanos = timestamp;

            final long currentPos = stream.getPos();
            final boolean advanced = currentPos > lastCheckBytes;
            if (advanced) {
                lastCheckBytes = currentPos;
            }
            return advanced;
        }
    }

    private static interface StreamWithTimeout
    extends Closeable {
        public StreamProgressTracker getProgressTracker();

        public long getPos() throws IOException;

        public void closeDueToTimeout() throws IOException;

        public boolean isClosed();
    }

    /** Exception reported for streams that were closed because of an inactivity timeout. */
    public static final class StreamTimeoutException extends IOException {
        private static final long serialVersionUID = -8790922066795901928L;

        private static final String INACTIVITY_MESSAGE =
                "Stream closed due to inactivity timeout. This is done to prevent inactive streams from blocking the full pool of limited connections";

        /** Creates the exception with the fixed inactivity message and no cause. */
        public StreamTimeoutException() {
            super(INACTIVITY_MESSAGE);
        }

        /**
         * Creates an exception that copies the message of the given one and has it as its cause.
         *
         * @param other the exception to copy the message from and to use as cause
         */
        public StreamTimeoutException(StreamTimeoutException other) {
            super(other.getMessage(), other);
        }
    }
}

