/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.scanner;

import io.streamthoughts.kafka.connect.filepulse.clean.BatchFileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.clean.DelegateBatchFileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicyResult;
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicyResultSet;
import io.streamthoughts.kafka.connect.filepulse.clean.GenericFileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.internal.KeyValuePair;
import io.streamthoughts.kafka.connect.filepulse.offset.OffsetManager;
import io.streamthoughts.kafka.connect.filepulse.scanner.FileSystemScanner;
import io.streamthoughts.kafka.connect.filepulse.scanner.local.FSDirectoryWalker;
import io.streamthoughts.kafka.connect.filepulse.source.SourceFile;
import io.streamthoughts.kafka.connect.filepulse.source.SourceMetadata;
import io.streamthoughts.kafka.connect.filepulse.source.SourceStatus;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.connector.ConnectorContext;
import org.apache.kafka.connect.util.ConnectorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalFileSystemScanner
implements FileSystemScanner {
    private static final Logger LOG = LoggerFactory.getLogger(LocalFileSystemScanner.class);
    private static final long READ_CONFIG_ON_START_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(30L);
    private static final Comparator<SourceMetadata> BY_LAST_MODIFIED = Comparator.comparingLong(SourceMetadata::lastModified);
    private final String sourceDirectoryPath;
    private final FSDirectoryWalker fsWalker;
    private final StateBackingStore<SourceFile> store;
    private final Map<String, SourceMetadata> scheduled = new ConcurrentHashMap<String, SourceMetadata>();
    private LinkedBlockingQueue<SourceFile> completed = new LinkedBlockingQueue();
    private StateSnapshot<SourceFile> fileState;
    private final OffsetManager offsetManager;
    private final BatchFileCleanupPolicy cleaner;
    private ScanStatus status;

    public LocalFileSystemScanner(String sourceDirectoryPath, FSDirectoryWalker fsWalker, GenericFileCleanupPolicy cleaner, final OffsetManager offsetManager, StateBackingStore<SourceFile> store) {
        Objects.requireNonNull(fsWalker, "fsWalker can't be null");
        Objects.requireNonNull(sourceDirectoryPath, "scanDirectoryPath can't be null");
        Objects.requireNonNull(cleaner, "cleaner can't be null");
        this.sourceDirectoryPath = sourceDirectoryPath;
        this.fsWalker = fsWalker;
        if (cleaner instanceof FileCleanupPolicy) {
            this.cleaner = new DelegateBatchFileCleanupPolicy((FileCleanupPolicy)cleaner);
        } else if (cleaner instanceof BatchFileCleanupPolicy) {
            this.cleaner = (BatchFileCleanupPolicy)cleaner;
        } else {
            throw new IllegalArgumentException("Cleaner must be one of 'FileCleanupPolicy', 'BatchFileCleanupPolicy', or the variants that are consumer aware and/or Acknowledging not " + cleaner.getClass().getName());
        }
        this.offsetManager = offsetManager;
        this.store = store;
        this.status = ScanStatus.CREATED;
        LOG.info("Creating local filesystem scanner");
        this.store.setUpdateListener(new StateBackingStore.UpdateListener<SourceFile>(){

            @Override
            public void onStateRemove(String key) {
            }

            @Override
            public void onStateUpdate(String key, SourceFile state) {
                SourceStatus status = state.status();
                if (status.isOneOf(SourceStatus.completed())) {
                    LocalFileSystemScanner.this.completed.add(state);
                } else if (status.isOneOf(new SourceStatus[]{SourceStatus.CLEANED})) {
                    String partition = offsetManager.toPartitionJson(state.metadata());
                    SourceMetadata remove = (SourceMetadata)LocalFileSystemScanner.this.scheduled.remove(partition);
                    if (remove == null) {
                        LOG.warn("Received cleaned status but no file currently scheduled for partition : '{}', this warn should only occurred during recovering step", (Object)partition);
                    }
                }
            }
        });
        if (!this.store.isStarted()) {
            this.store.start();
        } else {
            LOG.warn("The StateBackingStore used to synchronize this connector with tasks processing files is already started. You can ignore that warning if the connector  is recovering from a crash or resuming after being paused.");
        }
        this.readStatesToEnd(READ_CONFIG_ON_START_TIMEOUT_MS);
        this.recoverPreviouslyCompletedSources();
        this.status = ScanStatus.READY;
        LOG.info("Finished initializing local filesystem scanner");
    }

    private void recoverPreviouslyCompletedSources() {
        LOG.info("Recovering completed files from a previous execution");
        this.fileState.states().values().stream().filter(s -> s.status().isOneOf(SourceStatus.completed())).forEach(s -> this.completed.add((SourceFile)s));
        LOG.info("Finished recovering previously completed files : " + this.completed);
    }

    private boolean readStatesToEnd(long timeoutMs) {
        try {
            this.store.refresh(timeoutMs, TimeUnit.MILLISECONDS);
            this.fileState = this.store.snapshot();
            LOG.debug("Finished reading to end of log and updated states snapshot, new states log position: {}", (Object)this.fileState.offset());
            return true;
        }
        catch (TimeoutException e) {
            LOG.warn("Didn't reach end of states log quickly enough", (Throwable)e);
            return false;
        }
    }

    public void scan(ConnectorContext context) {
        this.cleanUpCompletedFiles();
        if (this.updateFiles()) {
            LOG.info("Requesting task reconfiguration");
            context.requestTaskReconfiguration();
        }
    }

    private void cleanUpCompletedFiles() {
        if (!this.completed.isEmpty()) {
            LOG.info("Cleaning up completed files '{}'", (Object)this.completed.size());
            ArrayList cleanable = new ArrayList(this.completed.size());
            this.completed.drainTo(cleanable);
            FileCleanupPolicyResultSet cleaned = (FileCleanupPolicyResultSet)this.cleaner.apply(cleanable);
            cleaned.forEach((source, result) -> {
                if (result.equals((Object)FileCleanupPolicyResult.SUCCEED)) {
                    String partition = this.offsetManager.toPartitionJson(source.metadata());
                    this.store.put(partition, source.withStatus(SourceStatus.CLEANED));
                } else {
                    LOG.info("Postpone clean up for file '{}'", source);
                    this.completed.add((SourceFile)source);
                }
            });
            LOG.info("Finished cleaning all completed source files");
        }
    }

    private synchronized boolean updateFiles() {
        if (this.scheduled.isEmpty()) {
            LOG.info("Scanning local file system directory '{}'", (Object)this.sourceDirectoryPath);
            Collection<File> files = this.fsWalker.listFiles(new File(this.sourceDirectoryPath));
            LOG.info("Completed scanned, number of files detected '{}' ", (Object)files.size());
            if (this.readStatesToEnd(TimeUnit.SECONDS.toMillis(5L))) {
                StateSnapshot<SourceFile> snapshot = this.store.snapshot();
                this.scheduled.putAll(this.toScheduled(files, snapshot));
                LOG.info("Finished lookup for new files : '{}' files selected", (Object)this.scheduled.size());
                this.notifyAll();
                return !this.scheduled.isEmpty() && this.status.equals((Object)ScanStatus.STARTED);
            }
            LOG.info("Finished scanning directory '{}'", (Object)this.sourceDirectoryPath);
        } else {
            LOG.info("Remaining in progress scheduled files: {}. Skip directory scan while waiting for tasks completion.", (Object)this.scheduled.size());
        }
        return false;
    }

    private Map<String, SourceMetadata> toScheduled(Collection<File> scanned, StateSnapshot<SourceFile> snapshot) {
        List toScheduled = scanned.stream().map(SourceMetadata::fromFile).map(metadata -> KeyValuePair.of((Object)this.offsetManager.toPartitionJson(metadata), (Object)metadata)).filter(kv -> this.maybeScheduled(snapshot, (String)kv.key)).collect(Collectors.toList());
        Stream<Map.Entry> entryStream = toScheduled.stream().collect(Collectors.groupingBy(kv -> (String)kv.key)).entrySet().stream().filter(entry -> ((List)entry.getValue()).size() > 1);
        Map<String, List> duplicates = entryStream.collect(Collectors.toMap(Map.Entry::getKey, e -> ((List)e.getValue()).stream().map(m -> ((SourceMetadata)m.value).absolutePath()).collect(Collectors.toList())));
        if (!duplicates.isEmpty()) {
            String strDuplicates = duplicates.entrySet().stream().map(e -> "offset=" + (String)e.getKey() + ", files=" + e.getValue()).collect(Collectors.joining("\n\t", "\n\t", "\n"));
            LOG.error("Duplicates detected in source files. Consider changing the configuration property \"offset.strategy\". Scan is ignored : {}", (Object)strDuplicates);
            return Collections.emptyMap();
        }
        return toScheduled.stream().collect(Collectors.toMap(kv -> (String)kv.key, kv -> (SourceMetadata)kv.value));
    }

    private boolean maybeScheduled(StateSnapshot<SourceFile> snapshot, String partition) {
        return !snapshot.contains(partition) || snapshot.getForKey(partition).status().isOneOf(SourceStatus.started());
    }

    public synchronized List<List<String>> partitionFilesAndGet(int maxGroups) {
        List partitions;
        long started;
        LOG.info("Retrieving source files to be scheduled found during last scan");
        long timeout = 15000L;
        long now = started = Time.SYSTEM.milliseconds();
        while (this.scheduled.isEmpty() && now - started < 15000L) {
            try {
                LOG.info("No file to be scheduled, waiting for next input directory scan execution");
                this.wait(15000L - (now - started));
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            now = Time.SYSTEM.milliseconds();
        }
        if (this.scheduled.isEmpty()) {
            LOG.warn("Directory could not be scanned quickly enough, or no file detected after connector started");
            partitions = Collections.emptyList();
        } else {
            int numGroups = Math.min(this.scheduled.size(), maxGroups);
            ArrayList<SourceMetadata> sources = new ArrayList<SourceMetadata>(this.scheduled.values());
            sources.sort(BY_LAST_MODIFIED);
            List paths = sources.stream().map(SourceMetadata::absolutePath).collect(Collectors.toList());
            partitions = ConnectorUtils.groupPartitions(paths, (int)numGroups);
        }
        this.status = ScanStatus.STARTED;
        return partitions;
    }

    public void close() {
        this.status = ScanStatus.STOPPED;
    }

    private static enum ScanStatus {
        CREATED,
        READY,
        STARTED,
        STOPPED;

    }
}

