/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.source;

import io.streamthoughts.kafka.connect.filepulse.Version;
import io.streamthoughts.kafka.connect.filepulse.clean.GenericFileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.config.SourceConnectorConfig;
import io.streamthoughts.kafka.connect.filepulse.fs.CompositeFileListFilter;
import io.streamthoughts.kafka.connect.filepulse.fs.DefaultFileSystemMonitor;
import io.streamthoughts.kafka.connect.filepulse.fs.DefaultTaskFileURIProvider;
import io.streamthoughts.kafka.connect.filepulse.fs.DelegateTaskFileURIProvider;
import io.streamthoughts.kafka.connect.filepulse.fs.FileListFilter;
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemListing;
import io.streamthoughts.kafka.connect.filepulse.fs.FileSystemMonitor;
import io.streamthoughts.kafka.connect.filepulse.source.FileObject;
import io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask;
import io.streamthoughts.kafka.connect.filepulse.source.FileSystemMonitorThread;
import io.streamthoughts.kafka.connect.filepulse.source.HashByURITaskPartitioner;
import io.streamthoughts.kafka.connect.filepulse.source.TaskPartitioner;
import io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreAccess;
import io.streamthoughts.kafka.connect.filepulse.state.internal.OpaqueMemoryResource;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FilePulseSourceConnector
extends SourceConnector {
    private static final Logger LOG = LoggerFactory.getLogger(FilePulseSourceConnector.class);
    private static final long DEFAULT_MAX_TIMEOUT = 5000L;
    private static final String CONNECT_NAME_CONFIG = "name";
    private Map<String, String> configProperties;
    private final AtomicInteger taskConfigsGeneration = new AtomicInteger(0);
    private FileSystemMonitorThread fsMonitorThread;
    private SourceConnectorConfig connectorConfig;
    private FileSystemMonitor monitor;
    private String connectorGroupName;
    private TaskPartitioner partitioner;
    private StateBackingStoreAccess sharedStore;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> props) {
        this.connectorGroupName = props.get(CONNECT_NAME_CONFIG);
        LOG.info("Starting FilePulse source connector: {}", (Object)this.connectorGroupName);
        try {
            this.configProperties = new HashMap<String, String>(props);
            this.configProperties.put("tasks.file.status.storage.name", this.connectorGroupName);
            this.configProperties.put("tasks.file.status.storage.consumer.enabled", "true");
            this.connectorConfig = new SourceConnectorConfig(this.configProperties);
        }
        catch (ConfigException e2) {
            throw new ConnectException("Failed to initialize FilePulseSourceConnector due to configuration error", (Throwable)e2);
        }
        try {
            this.sharedStore = new StateBackingStoreAccess(this.connectorGroupName, this.connectorConfig::getStateBackingStore, false);
            this.partitioner = this.connectorConfig.getTaskPartitioner();
            FileSystemListing<?> fileSystemListing = this.connectorConfig.getFileSystemListing();
            fileSystemListing.setFilter((FileListFilter)new CompositeFileListFilter(this.connectorConfig.getFileSystemListingFilter()));
            this.monitor = new DefaultFileSystemMonitor(this.connectorConfig.allowTasksReconfigurationAfterTimeoutMs(), fileSystemListing, (GenericFileCleanupPolicy)this.connectorConfig.getFsCleanupPolicy(), this.connectorConfig.getFsCleanupPolicyPredicate(), this.connectorConfig.getSourceOffsetPolicy(), (StateBackingStore<FileObject>)((StateBackingStore)((OpaqueMemoryResource)this.sharedStore.get()).getResource()), this.connectorConfig.getTaskFilerOrder());
            this.monitor.setFileSystemListingEnabled(!this.connectorConfig.isFileListingTaskDelegationEnabled());
            this.fsMonitorThread = new FileSystemMonitorThread(this.context, this.monitor, this.connectorConfig.getListingInterval());
            this.fsMonitorThread.setUncaughtExceptionHandler((t, e) -> {
                LOG.info("Uncaught error from file system monitoring thread [{}]", (Object)t.getName(), (Object)e);
                this.context.raiseError((Exception)new ConnectException("Unexpected error from FileSystemMonitorThread", e));
            });
            this.fsMonitorThread.start();
            LOG.info("Started FilePulse source connector: {}", (Object)this.connectorGroupName);
        }
        catch (Exception e3) {
            this.closeResources();
            throw e3;
        }
    }

    public Class<? extends Task> taskClass() {
        return FilePulseSourceTask.class;
    }

    public List<Map<String, String>> taskConfigs(int maxTasks) {
        LOG.info("Creating new tasks configurations (maxTasks={})", (Object)maxTasks);
        if (this.connectorConfig.isFileListingTaskDelegationEnabled()) {
            ArrayList<Map<String, String>> taskConfigs = new ArrayList<Map<String, String>>(maxTasks);
            IntStream.range(0, maxTasks).forEachOrdered(i -> taskConfigs.add(this.createTaskConfig(i, maxTasks, 0L, null)));
            return taskConfigs;
        }
        List<List<String>> partitioned = this.partitionAndGet(maxTasks);
        long taskConfigsGen = this.taskConfigsGeneration.getAndIncrement();
        ArrayList<Map<String, String>> taskConfigs = new ArrayList<Map<String, String>>(partitioned.size());
        if (partitioned.isEmpty()) {
            if (taskConfigsGen > 0L) {
                LOG.info("No object file was found - skip task reconfiguration.");
                return taskConfigs;
            }
            LOG.info("No object file was found - resetting all tasks with an empty config.");
            IntStream.range(0, maxTasks).forEachOrdered(i -> taskConfigs.add(this.createTaskConfig(i, maxTasks, taskConfigsGen, Collections.emptyList())));
        } else {
            IntStream.range(0, partitioned.size()).forEachOrdered(i -> taskConfigs.add(this.createTaskConfig(i, partitioned.size(), taskConfigsGen, (List)partitioned.get(i))));
        }
        IntStream.range(0, partitioned.size()).forEachOrdered(i -> LOG.info("Created config for task_id={} with '{}' object files (task_config_gen={}).", new Object[]{i, ((List)partitioned.get(i)).size(), taskConfigsGen}));
        return taskConfigs;
    }

    private List<List<String>> partitionAndGet(int maxTasks) {
        List files = this.monitor.listFilesToSchedule(this.connectorConfig.getMaxScheduledFiles());
        return this.partitioner.partition((Collection)files, maxTasks).stream().map(it -> it.stream().map(Object::toString).collect(Collectors.toList())).collect(Collectors.toList());
    }

    private Map<String, String> createTaskConfig(int taskId, int taskCount, long taskConfigGen, List<String> URIs) {
        HashMap<String, String> taskConfig = new HashMap<String, String>(this.configProperties);
        taskConfig.put("task.generation.id", String.valueOf(taskConfigGen));
        if (this.connectorConfig.isFileListingTaskDelegationEnabled()) {
            taskConfig.put("file.uris.provider", DelegateTaskFileURIProvider.class.getName());
            taskConfig.put("task.partitioner.class", HashByURITaskPartitioner.class.getName());
            taskConfig.put("task.id", String.valueOf(taskId));
            taskConfig.put("task.count", String.valueOf(taskCount));
            taskConfig.put("tasks.file.status.storage.consumer.enabled", "true");
        } else {
            taskConfig.put("file.uris.provider", DefaultTaskFileURIProvider.class.getName());
            taskConfig.put("file.object.uris", String.join((CharSequence)",", URIs));
            taskConfig.put("tasks.file.status.storage.consumer.enabled", "false");
        }
        return taskConfig;
    }

    public void stop() {
        LOG.info("Stopping FilePulse source connector");
        this.closeResources();
        LOG.info("Stopped FilePulse source connector");
    }

    private void closeResources() {
        if (this.closed.compareAndSet(false, true)) {
            LOG.info("Closing resources for FilePulse source connector");
            try {
                if (this.fsMonitorThread != null) {
                    try {
                        this.fsMonitorThread.shutdown(5000L);
                        this.fsMonitorThread.join(5000L);
                    }
                    catch (InterruptedException e) {
                        LOG.warn("Failed to close file-system monitoring thread. Error: {}", (Object)e.getMessage());
                    }
                }
                if (this.partitioner != null) {
                    try {
                        this.partitioner.close();
                    }
                    catch (Exception e) {
                        LOG.warn("Failed to close TaskPartition. Error: {}", (Object)e.getMessage());
                    }
                }
            }
            finally {
                this.closeSharedStateBackingStore();
            }
            LOG.info("Closed resources for FilePulse source connector");
        }
    }

    public ConfigDef config() {
        return SourceConnectorConfig.getConf();
    }

    private void closeSharedStateBackingStore() {
        try {
            if (this.sharedStore != null) {
                this.sharedStore.close();
            }
        }
        catch (Exception exception) {
            LOG.error("Failed to shared StateBackingStore '{}'", (Object)this.connectorGroupName);
        }
    }
}

