/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FatalExitExceptionHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Periodically scans the remote storage for segment files that consumers are waiting for.
 *
 * <p>Callers register the segment they need through {@link #watchSegment}. Each scan round
 * ({@link #run()}) walks the watched segments, checks on the remote file system whether the
 * segment file exists and, if so, invokes the registered availability notifier. The scanner
 * re-schedules itself after every round on a single-threaded scheduled executor, doubling the
 * delay (up to {@link #MAX_SCAN_INTERVAL_MS}) whenever a round found nothing.
 */
public class RemoteStorageScanner implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteStorageScanner.class);

    /** Number of consecutive remote file system failures tolerated before a failure is thrown. */
    private static final int MAX_RETRY_TIME = 100;

    /** Delay before the first scan, and the delay used again after a round that found a segment. */
    private static final int INITIAL_SCAN_INTERVAL_MS = 100;

    /** Upper bound for the delay between two scan rounds. */
    private static final int MAX_SCAN_INTERVAL_MS = 10000;

    /** Executor (one thread) on which the scan rounds are scheduled. */
    private final ScheduledExecutorService scannerExecutor =
            Executors.newScheduledThreadPool(
                    1, new ExecutorThreadFactory("remote storage scanner"));

    /** Key: (partition id, subpartition id). Value: the segment id that is being waited for. */
    private final Map<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>, Integer>
            requiredSegmentIds;

    /**
     * Key: (partition id, subpartition id). Value: the segment id parsed from the single file
     * found in the segment finish directory during the last successful scan of that directory.
     */
    private final Map<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>, Integer>
            scannedMaxSegmentIds;

    private final String baseRemoteStoragePath;
    private final ScanStrategy scanStrategy;
    private final FileSystem remoteFileSystem;

    /** Callback invoked once a watched segment has been found; must be set before that happens. */
    @Nullable
    private BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId> availabilityNotifier;

    /** Delay (ms) that the next call to {@link #start()} uses for scheduling. */
    private int lastInterval = INITIAL_SCAN_INTERVAL_MS;

    /** Consecutive failed remote file system calls; reset to 0 by any successful call. */
    private int currentRetryTime = 0;

    /**
     * Creates a scanner for the given remote storage location.
     *
     * @param baseRemoteStoragePath base path of the remote storage; also used to resolve the
     *     file system
     * @throws RuntimeException if the file system for the path cannot be initialized
     */
    public RemoteStorageScanner(String baseRemoteStoragePath) {
        this.baseRemoteStoragePath = baseRemoteStoragePath;
        this.requiredSegmentIds = new ConcurrentHashMap<>();
        this.scannedMaxSegmentIds = new ConcurrentHashMap<>();
        this.scanStrategy = new ScanStrategy(MAX_SCAN_INTERVAL_MS);
        this.remoteFileSystem = createFileSystem();
    }

    /** Resolves the file system of the base path, rethrowing an {@link IOException} unchecked. */
    private FileSystem createFileSystem() {
        FileSystem fileSystem = null;
        try {
            fileSystem = new Path(baseRemoteStoragePath).getFileSystem();
        } catch (IOException e) {
            ExceptionUtils.rethrow(
                    e, "Failed to initialize file system on the path: " + baseRemoteStoragePath);
        }
        return fileSystem;
    }

    /**
     * Schedules the next scan round after {@code lastInterval} milliseconds.
     *
     * <p>Nothing is scheduled once the executor has been shut down. The check and the scheduling
     * are done under the executor's monitor, the same monitor {@link #close()} holds while
     * shutting down.
     */
    public void start() {
        synchronized (scannerExecutor) {
            if (!scannerExecutor.isShutdown()) {
                scannerExecutor.schedule(this, (long) lastInterval, TimeUnit.MILLISECONDS);
            }
        }
    }

    /**
     * Records {@code segmentId} as the segment waited for in the given subpartition.
     *
     * <p>The requirement is only recorded when no max segment id has been scanned yet for the
     * subpartition, or when the scanned max segment id is smaller than {@code segmentId}. A
     * previously recorded requirement for the same subpartition is overwritten.
     *
     * @param partitionId partition that owns the segment
     * @param subpartitionId subpartition that owns the segment
     * @param segmentId id of the segment to wait for
     */
    public void watchSegment(
            TieredStoragePartitionId partitionId,
            TieredStorageSubpartitionId subpartitionId,
            int segmentId) {
        Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId> key =
                Tuple2.of(partitionId, subpartitionId);
        // compute() is used only to run the check-and-put for this key inside the map's
        // per-key update; the scanned value itself is returned unchanged.
        scannedMaxSegmentIds.compute(
                key,
                (segmentKey, maxSegmentId) -> {
                    if (maxSegmentId == null || maxSegmentId < segmentId) {
                        requiredSegmentIds.put(segmentKey, segmentId);
                    }
                    return maxSegmentId;
                });
    }

    /**
     * Shuts the scanner executor down and waits up to five minutes for it to terminate.
     *
     * @throws RuntimeException wrapping a {@link TimeoutException} if the executor did not
     *     terminate in time, or an {@link InterruptedException} if the wait was interrupted (the
     *     calling thread's interrupt flag is restored in that case)
     */
    public void close() {
        synchronized (scannerExecutor) {
            scannerExecutor.shutdownNow();
        }
        try {
            if (!scannerExecutor.awaitTermination(5L, TimeUnit.MINUTES)) {
                throw new TimeoutException(
                        "Timeout to shutdown the remote storage scanner thread.");
            }
        } catch (InterruptedException e) {
            // Keep the interruption visible to the caller before converting it.
            Thread.currentThread().interrupt();
            ExceptionUtils.rethrow(e);
        } catch (TimeoutException e) {
            ExceptionUtils.rethrow(e);
        }
    }

    /**
     * Executes one scan round and schedules the next one.
     *
     * <p>For every watched segment: if the scanned max segment id of its subpartition has reached
     * the required id and the segment file exists, the entry is removed and the availability
     * notifier is invoked; otherwise the max segment id of the subpartition is scanned again.
     * Any {@link Throwable} raised during the round is handed to
     * {@link FatalExitExceptionHandler}.
     */
    @Override
    public void run() {
        try {
            Iterator<Map.Entry<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>, Integer>>
                    iterator = requiredSegmentIds.entrySet().iterator();
            boolean scanned = false;
            while (iterator.hasNext()) {
                Map.Entry<Tuple2<TieredStoragePartitionId, TieredStorageSubpartitionId>, Integer>
                        ids = iterator.next();
                TieredStoragePartitionId partitionId = ids.getKey().f0;
                TieredStorageSubpartitionId subpartitionId = ids.getKey().f1;
                int requiredSegmentId = ids.getValue();
                int maxSegmentId = scannedMaxSegmentIds.getOrDefault(ids.getKey(), -1);
                if (maxSegmentId >= requiredSegmentId
                        && checkSegmentExist(partitionId, subpartitionId, requiredSegmentId)) {
                    scanned = true;
                    iterator.remove();
                    Preconditions.checkNotNull(availabilityNotifier)
                            .accept(partitionId, subpartitionId);
                } else {
                    // Not available yet: refresh the scanned max segment id so that a later
                    // round can pass the check above.
                    scanMaxSegmentId(partitionId, subpartitionId);
                }
            }
            // Back off while nothing shows up; return to the initial interval after a hit.
            lastInterval =
                    scanned ? INITIAL_SCAN_INTERVAL_MS : scanStrategy.getInterval(lastInterval);
            start();
        } catch (Throwable throwable) {
            FatalExitExceptionHandler.INSTANCE.uncaughtException(Thread.currentThread(), throwable);
        }
    }

    /**
     * Sets the callback that is invoked with the partition and subpartition id of a watched
     * segment once that segment has been found on the remote storage.
     *
     * @param availabilityNotifier the callback to invoke
     */
    public void registerAvailabilityAndPriorityNotifier(
            BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId>
                    availabilityNotifier) {
        this.availabilityNotifier = availabilityNotifier;
    }

    /**
     * Lists the segment finish directory of the subpartition and, if it holds exactly one file,
     * stores that file's name (parsed as an int) as the scanned max segment id.
     *
     * <p>A missing directory (including a {@link FileNotFoundException} while listing) leaves
     * the state untouched. Other failures are counted as retries, see
     * {@link #tryThrowException}.
     */
    private void scanMaxSegmentId(
            TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId) {
        Path segmentFinishDir =
                SegmentPartitionFile.getSegmentFinishDirPath(
                        baseRemoteStoragePath, partitionId, subpartitionId.getSubpartitionId());
        FileStatus[] fileStatuses = new FileStatus[0];
        try {
            if (!remoteFileSystem.exists(segmentFinishDir)) {
                return;
            }
            fileStatuses = remoteFileSystem.listStatus(segmentFinishDir);
            currentRetryTime = 0;
        } catch (Throwable t) {
            if (t instanceof FileNotFoundException) {
                return;
            }
            ++currentRetryTime;
            tryThrowException(t, "Failed to list the segment finish file.");
        }
        if (fileStatuses.length != 1) {
            return;
        }
        scannedMaxSegmentIds.put(
                Tuple2.of(partitionId, subpartitionId),
                Integer.parseInt(fileStatuses[0].getPath().getName()));
    }

    /**
     * Checks whether the file of the given segment exists on the remote file system.
     *
     * @return {@code true} if the file exists; {@code false} if it does not exist or the check
     *     failed without exceeding the retry limit
     */
    private boolean checkSegmentExist(
            TieredStoragePartitionId partitionId,
            TieredStorageSubpartitionId subpartitionId,
            int segmentId) {
        Path segmentPath =
                SegmentPartitionFile.getSegmentPath(
                        baseRemoteStoragePath,
                        partitionId,
                        subpartitionId.getSubpartitionId(),
                        segmentId);
        boolean isExist = false;
        try {
            isExist = remoteFileSystem.exists(segmentPath);
            currentRetryTime = 0;
        } catch (Throwable t) {
            ++currentRetryTime;
            tryThrowException(t, "Failed to check the status of segment file.");
        }
        return isExist;
    }

    /**
     * Logs a failed remote file system call and fails once the retry limit is exceeded.
     *
     * @param t the failure; logged together with the message so the cause is not lost
     * @param logMessage description of the failed operation
     * @throws RuntimeException wrapping {@code t} if more than {@link #MAX_RETRY_TIME}
     *     consecutive failures have been counted
     */
    private void tryThrowException(Throwable t, String logMessage) {
        LOG.warn(logMessage, t);
        if (currentRetryTime > MAX_RETRY_TIME) {
            throw new RuntimeException(logMessage, t);
        }
    }

    /** Computes the delay before the next scan round: double the last one, capped at a maximum. */
    static class ScanStrategy {
        private final int maxScanInterval;

        /**
         * @param maxScanInterval upper bound for the returned interval; must be positive
         * @throws IllegalArgumentException if {@code maxScanInterval} is not positive
         */
        ScanStrategy(int maxScanInterval) {
            Preconditions.checkArgument(
                    maxScanInterval > 0,
                    "maxScanInterval must be positive, was %s",
                    maxScanInterval);
            this.maxScanInterval = maxScanInterval;
        }

        /**
         * @param lastInterval the previously used interval
         * @return {@code lastInterval * 2}, but never more than the configured maximum
         */
        int getInterval(int lastInterval) {
            // Double in long arithmetic so a large lastInterval cannot wrap to a negative value.
            return (int) Math.min(2L * lastInterval, (long) maxScanInterval);
        }
    }
}

