/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.fs;

import io.streamthoughts.kafka.connect.filepulse.config.CommonSourceConfig;
import io.streamthoughts.kafka.connect.filepulse.fs.CompositeFileListFilter;
import io.streamthoughts.kafka.connect.filepulse.fs.FileListFilter;
import io.streamthoughts.kafka.connect.filepulse.fs.FileObjectCandidatesFilter;
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemListing;
import io.streamthoughts.kafka.connect.filepulse.fs.TaskFileOrder;
import io.streamthoughts.kafka.connect.filepulse.fs.TaskFileURIProvider;
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectStatus;
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffsetPolicy;
import io.streamthoughts.kafka.connect.filepulse.source.TaskPartitioner;
import io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreAccess;
import io.streamthoughts.kafka.connect.filepulse.state.internal.OpaqueMemoryResource;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.storage.StateSnapshot;
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.common.config.ConfigDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link TaskFileURIProvider} that lists objects through the configured
 * {@link FileSystemListing}, filters them against the latest snapshot of the
 * shared state store, orders them with the configured {@link TaskFileOrder}
 * and finally keeps only the URIs that the configured {@link TaskPartitioner}
 * assigns to this task.
 */
public class DelegateTaskFileURIProvider
implements TaskFileURIProvider {
    private static final Logger LOG = LoggerFactory.getLogger(DelegateTaskFileURIProvider.class);
    /** Maximum time to wait for the state store to be refreshed on each call. */
    private static final Duration DEFAULT_REFRESH_TIMEOUT = Duration.ofSeconds(30L);
    /** Configuration key holding the connector name; used to obtain the shared store. */
    private static final String CONNECT_NAME_CONFIG = "name";
    private Config config;
    private FileSystemListing<?> fileSystemListing;
    private TaskPartitioner partitioner;
    private SourceOffsetPolicy sourceOffsetPolicy;
    private StateBackingStoreAccess sharedStore;
    private TaskFileOrder taskFileOrder;
    /** Last successfully obtained snapshot; {@code null} until a refresh succeeds. */
    private StateSnapshot<FileObject> fileState;
    /** {@code true} until a listing has been evaluated against a state snapshot. */
    private boolean isFirstCall = true;

    /**
     * Configures this provider from the given properties.
     *
     * @param configs the configuration; must contain the {@code name} entry
     *                in addition to the settings read by {@link Config}.
     * @throws NullPointerException if the {@code name} entry is missing.
     */
    public void configure(Map<String, ?> configs) {
        this.config = new Config(configs);
        // Fail with an explicit message instead of an anonymous NPE on toString().
        Object connectorName = Objects.requireNonNull(
            configs.get(CONNECT_NAME_CONFIG),
            "Missing required configuration '" + CONNECT_NAME_CONFIG + "'"
        );
        this.sharedStore = new StateBackingStoreAccess(connectorName.toString(), this.config::getStateBackingStore, true);
        this.partitioner = this.config.getTaskPartitioner();
        this.fileSystemListing = this.config.getFileSystemListing();
        this.sourceOffsetPolicy = this.config.getSourceOffsetPolicy();
        this.fileSystemListing.setFilter((FileListFilter)new CompositeFileListFilter(this.config.getFileSystemListingFilter()));
        this.taskFileOrder = this.config.getTaskFilerOrder();
    }

    /**
     * Returns the URIs of the objects that this task should process next.
     *
     * <p>An object is kept as a candidate when no state is recorded for it,
     * when its recorded status is not "done", or when its status is
     * {@code COMPLETED} and this is the first evaluated call.
     * NOTE(review): the first-call exception for {@code COMPLETED} presumably
     * lets a restarted task finish post-processing of such files - confirm.
     *
     * <p>If no state snapshot has ever been obtained (the first refresh timed
     * out), an empty list is returned rather than evaluating candidates
     * without any state; the first-call flag is left untouched in that case.
     *
     * @return the URIs assigned to this task; possibly empty.
     */
    public List<URI> nextURIs() {
        this.refreshState();
        if (this.fileState == null) {
            // Without a snapshot we cannot tell which files were already processed.
            LOG.warn("No state snapshot available yet; skipping file listing until the state store can be refreshed");
            return Collections.emptyList();
        }
        Collection<FileObjectMeta> filtered = FileObjectCandidatesFilter.filter(this.sourceOffsetPolicy, fileObjectKey -> {
            FileObject fileObject = (FileObject)this.fileState.getForKey(fileObjectKey.original());
            if (fileObject == null) {
                return true;
            }
            FileObjectStatus status = fileObject.status();
            if (status == FileObjectStatus.COMPLETED) {
                return this.isFirstCall;
            }
            return !status.isDone();
        }, this.fileSystemListing.listObjects()).values();
        this.isFirstCall = false;
        List<FileObjectMeta> sorted = this.taskFileOrder.sort(filtered);
        return this.partitioner.partitionForTask(sorted, this.config.getTaskCount(), this.config.getTaskId());
    }

    /**
     * Refreshes the shared state store and captures a new snapshot.
     * On timeout the previous snapshot (possibly {@code null}) is retained.
     */
    private void refreshState() {
        try {
            StateBackingStore store = (StateBackingStore)((OpaqueMemoryResource)this.sharedStore.get()).getResource();
            store.refresh(DEFAULT_REFRESH_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
            this.fileState = store.snapshot();
        }
        catch (TimeoutException e) {
            LOG.debug("Failed to reach end of states log quickly enough", e);
        }
    }

    /**
     * @return always {@code true}.
     */
    public boolean hasMore() {
        return true;
    }

    /**
     * Closes this provider and its handle on the shared state store.
     */
    public void close() {
        LOG.info("Closing DelegateTaskFileURIProvider");
        this.closeSharedStateBackingStore();
        LOG.info("Closed DelegateTaskFileURIProvider");
    }

    /**
     * Closes the shared state store handle, if any. Failures are logged and
     * not propagated so that closing stays best-effort.
     */
    private void closeSharedStateBackingStore() {
        try {
            if (this.sharedStore != null) {
                this.sharedStore.close();
            }
        }
        catch (Exception exception) {
            // Trailing throwable argument makes SLF4J log the stack trace as well.
            LOG.warn("Failed to close shared StateBackingStore. Error: '{}'", exception.getMessage(), exception);
        }
    }

    /**
     * Configuration of {@link DelegateTaskFileURIProvider}: the common source
     * settings plus the id of the current task and the total task count.
     */
    public static final class Config
    extends CommonSourceConfig {
        public static final String TASK_ID_CONFIG = "task.id";
        private static final String TASK_ID_DOC = "The current task id";
        public static final String TASK_COUNT_CONFIG = "task.count";
        private static final String TASK_COUNT_DOC = "The total number tasks assigned.";

        /**
         * Creates a new configuration.
         *
         * @param originals the raw configuration properties.
         */
        public Config(Map<String, ?> originals) {
            super(Config.getConf(), originals);
        }

        /**
         * @return the common source definition extended with the
         *         {@code task.id} and {@code task.count} settings.
         */
        static ConfigDef getConf() {
            return CommonSourceConfig.getConfigDev()
                .define(TASK_ID_CONFIG, ConfigDef.Type.INT, ConfigDef.Importance.HIGH, TASK_ID_DOC)
                .define(TASK_COUNT_CONFIG, ConfigDef.Type.INT, ConfigDef.Importance.HIGH, TASK_COUNT_DOC);
        }

        /**
         * @return the value of {@code task.id}.
         */
        public int getTaskId() {
            return this.getInt(TASK_ID_CONFIG);
        }

        /**
         * @return the value of {@code task.count}.
         */
        public int getTaskCount() {
            return this.getInt(TASK_COUNT_CONFIG);
        }
    }
}

