/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.ozone.container.ozoneimpl;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerScannerConfiguration;
import org.apache.hadoop.ozone.container.ozoneimpl.OnDemandScannerMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Process-wide scanner that verifies the data of individual containers on request.
 *
 * <p>A single instance is created by {@link #init} and discarded by {@link #shutdown()}.
 * Scan requests are queued on a single-threaded executor, so at most one on-demand scan
 * runs at a time. A container that is already queued or being scanned is not queued again.
 *
 * <p>Every static entry point reads the {@code instance} field exactly once into a local
 * variable. {@link #shutdown()} may clear that field at any moment, so re-reading it
 * between a null check and a dereference could throw {@link NullPointerException}.
 */
public final class OnDemandContainerDataScanner {
    public static final Logger LOG = LoggerFactory.getLogger(OnDemandContainerDataScanner.class);
    private static volatile OnDemandContainerDataScanner instance;
    private final ExecutorService scanExecutor;
    private final ContainerController containerController;
    private final DataTransferThrottler throttler;
    private final Canceler canceler;
    /** IDs of containers that are currently queued for, or undergoing, an on-demand scan. */
    private final ConcurrentHashMap.KeySetView<Long, Boolean> containerRescheduleCheckSet;
    private final OnDemandScannerMetrics metrics;
    /** Value of {@code conf.getContainerScanMinGap()}, handed to {@code ContainerUtils.recentlyScanned}. */
    private final long minScanGap;

    private OnDemandContainerDataScanner(ContainerScannerConfiguration conf, ContainerController controller) {
        this.containerController = controller;
        this.throttler = new DataTransferThrottler(conf.getOnDemandBandwidthPerVolume());
        this.canceler = new Canceler();
        this.metrics = OnDemandScannerMetrics.create();
        this.scanExecutor = Executors.newSingleThreadExecutor();
        this.containerRescheduleCheckSet = ConcurrentHashMap.newKeySet();
        this.minScanGap = conf.getContainerScanMinGap();
    }

    /**
     * Creates the process-wide scanner. A second call while a scanner already exists only
     * logs a warning and leaves the existing scanner in place.
     *
     * @param conf scanner configuration (bandwidth limit and minimum gap between scans)
     * @param controller controller used to mark containers unhealthy and record scan times
     */
    public static synchronized void init(ContainerScannerConfiguration conf, ContainerController controller) {
        if (instance != null) {
            LOG.warn("Trying to initialize on demand scanner a second time on a datanode.");
            return;
        }
        instance = new OnDemandContainerDataScanner(conf, controller);
    }

    /**
     * Decides whether {@code container} should be scanned by {@code scanner}.
     *
     * @param scanner the scanner reference captured by the caller; may be {@code null}
     * @param container the candidate container; may be {@code null}
     * @return {@code false} when the container is {@code null}, the scanner is missing or is
     *     no longer the active instance, the container's volume has failed,
     *     {@code ContainerUtils.recentlyScanned} returns {@code true}, or
     *     {@code container.shouldScanData()} returns {@code false}; {@code true} otherwise
     */
    private static boolean shouldScan(OnDemandContainerDataScanner scanner, Container<?> container) {
        if (container == null) {
            return false;
        }
        ContainerData containerData = (ContainerData) container.getContainerData();
        long containerID = containerData.getContainerID();
        // A scanner that is no longer the active instance has been shut down; treat it the
        // same as a scanner that was never created.
        if (scanner == null || scanner != instance) {
            LOG.debug("Skipping on demand scan for container {} since scanner was not initialized.", containerID);
            return false;
        }
        HddsVolume containerVolume = containerData.getVolume();
        if (containerVolume.isFailed()) {
            LOG.debug("Skipping on demand scan for container {} since its volume {} has failed.", containerID, containerVolume);
            return false;
        }
        return !ContainerUtils.recentlyScanned(container, scanner.minScanGap, LOG) && container.shouldScanData();
    }

    /**
     * Queues an asynchronous data scan of {@code container} if it is eligible and not
     * already queued or being scanned.
     *
     * @param container the container to scan; may be {@code null}
     * @return the future of the queued scan, or an empty {@code Optional} when no scan was
     *     queued (not eligible, already pending, or the scanner is shutting down)
     */
    public static Optional<Future<?>> scanContainer(Container<?> container) {
        // Single read of the volatile field; see the class comment.
        final OnDemandContainerDataScanner scanner = instance;
        if (!shouldScan(scanner, container)) {
            return Optional.empty();
        }
        final long containerId = ((ContainerData) container.getContainerData()).getContainerID();
        if (!scanner.containerRescheduleCheckSet.add(containerId)) {
            // Already queued or in progress.
            return Optional.empty();
        }
        try {
            Future<?> resultFuture = scanner.scanExecutor.submit(() -> {
                try {
                    scanner.performOnDemandScan(container);
                } finally {
                    // Always release the ID, even if the scan threw, so the container
                    // can be scheduled again later.
                    scanner.containerRescheduleCheckSet.remove(containerId);
                }
            });
            return Optional.of(resultFuture);
        } catch (RejectedExecutionException e) {
            // The executor was shut down between the eligibility check and submit().
            scanner.containerRescheduleCheckSet.remove(containerId);
            LOG.debug("Skipping on demand scan for container {} since the scanner is shutting down.", containerId);
            return Optional.empty();
        }
    }

    /**
     * Runs the scan on the executor thread. Eligibility is checked again because the
     * container or the scanner may have changed while the task was waiting in the queue.
     *
     * @param container the container to scan
     */
    private void performOnDemandScan(Container<?> container) {
        if (!shouldScan(this, container)) {
            return;
        }
        ContainerData containerData = (ContainerData) container.getContainerData();
        long containerId = containerData.getContainerID();
        try {
            logScanStart(containerData);
            Container.ScanResult result = container.scanData(this.throttler, this.canceler);
            if (result.getFailureType() == Container.ScanResult.FailureType.DELETED_CONTAINER) {
                LOG.error("Container [{}] has been deleted.", containerId, result.getException());
                return;
            }
            if (!result.isHealthy()) {
                LOG.error("Corruption detected in container [{}].Marking it UNHEALTHY.", containerId, result.getException());
                this.metrics.incNumUnHealthyContainers();
                this.containerController.markContainerUnhealthy(containerId, result);
            }
            this.metrics.incNumContainersScanned();
            Instant now = Instant.now();
            logScanCompleted(containerData, now);
            this.containerController.updateDataScanTimestamp(containerId, now);
        } catch (IOException e) {
            LOG.warn("Unexpected exception while scanning container " + containerId, e);
        } catch (InterruptedException ex) {
            LOG.info("On demand container scan interrupted.");
            // Restore the interrupt status so code further up the stack can observe it.
            Thread.currentThread().interrupt();
        }
    }

    /** Logs, at debug level, that a scan is starting and when the container was last scanned. */
    private static void logScanStart(ContainerData containerData) {
        if (LOG.isDebugEnabled()) {
            Optional<Instant> scanTimestamp = containerData.lastDataScanTime();
            String lastScanTime = scanTimestamp.map(ts -> "at " + ts).orElse("never");
            LOG.debug("Scanning container {}, last scanned {}", containerData.getContainerID(), lastScanTime);
        }
    }

    /** Logs, at debug level, that the scan of the container completed at {@code timestamp}. */
    private static void logScanCompleted(ContainerData containerData, Instant timestamp) {
        LOG.debug("Completed scan of container {} at {}", containerData.getContainerID(), timestamp);
    }

    /**
     * @return the metrics of the active scanner
     * @throws NullPointerException if no scanner is currently initialized
     */
    public static OnDemandScannerMetrics getMetrics() {
        return OnDemandContainerDataScanner.instance.metrics;
    }

    /**
     * @return the throttler of the active scanner
     * @throws NullPointerException if no scanner is currently initialized
     */
    @VisibleForTesting
    public static DataTransferThrottler getThrottler() {
        return OnDemandContainerDataScanner.instance.throttler;
    }

    /**
     * @return the canceler of the active scanner
     * @throws NullPointerException if no scanner is currently initialized
     */
    @VisibleForTesting
    public static Canceler getCanceler() {
        return OnDemandContainerDataScanner.instance.canceler;
    }

    /** Shuts down the active scanner, if any. Does nothing when none is initialized. */
    public static synchronized void shutdown() {
        OnDemandContainerDataScanner scanner = instance;
        if (scanner == null) {
            return;
        }
        scanner.shutdownScanner();
    }

    /**
     * Clears the active instance, unregisters metrics, cancels via the canceler and stops
     * the executor. Waits up to five seconds for the executor to terminate before calling
     * {@code shutdownNow()}.
     */
    private synchronized void shutdownScanner() {
        // Clear the reference first so new scan requests are turned away immediately.
        instance = null;
        this.metrics.unregister();
        String shutdownMessage = "On-demand container scanner is shutting down.";
        LOG.info(shutdownMessage);
        this.canceler.cancel(shutdownMessage);
        if (!this.scanExecutor.isShutdown()) {
            this.scanExecutor.shutdown();
        }
        try {
            long timeoutSeconds = 5L;
            if (!this.scanExecutor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                LOG.warn("On demand scanner shut down forcefully after {} seconds", timeoutSeconds);
                this.scanExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            LOG.warn("On demand scanner interrupted while waiting for shut down.");
            this.scanExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

