/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.source;

import io.streamthoughts.kafka.connect.filepulse.Version;
import io.streamthoughts.kafka.connect.filepulse.clean.FileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.clean.GenericFileCleanupPolicy;
import io.streamthoughts.kafka.connect.filepulse.config.ConnectorConfig;
import io.streamthoughts.kafka.connect.filepulse.offset.OffsetStrategy;
import io.streamthoughts.kafka.connect.filepulse.offset.SimpleOffsetManager;
import io.streamthoughts.kafka.connect.filepulse.scanner.FileSystemScanner;
import io.streamthoughts.kafka.connect.filepulse.scanner.LocalFileSystemScanner;
import io.streamthoughts.kafka.connect.filepulse.scanner.local.FSDirectoryWalker;
import io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.CompositeFileListFilter;
import io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceTask;
import io.streamthoughts.kafka.connect.filepulse.source.FileSystemMonitorThread;
import io.streamthoughts.kafka.connect.filepulse.source.SourceFile;
import io.streamthoughts.kafka.connect.filepulse.state.FileStateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreRegistry;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FilePulseSourceConnector
extends SourceConnector {
    private static final Logger LOG = LoggerFactory.getLogger(FilePulseSourceConnector.class);
    private static final long MAX_TIMEOUT = 5000L;
    private static final String CONNECT_NAME_CONFIG = "name";
    private Map<String, String> configProperties;
    private AtomicInteger taskConfigsGeneration = new AtomicInteger(0);
    private FileSystemMonitorThread fsMonitorThread;
    private ConnectorConfig config;
    private FileSystemScanner scanner;
    private String connectorGroupName;

    public String version() {
        return Version.getVersion();
    }

    public void start(Map<String, String> props) {
        String connectName = props.get(CONNECT_NAME_CONFIG);
        LOG.info("Configuring connector : {}", (Object)connectName);
        try {
            this.configProperties = props;
            this.config = new ConnectorConfig(props);
        }
        catch (ConfigException e2) {
            throw new ConnectException("Couldn't init FilePulseSourceConnector due to configuration error", (Throwable)e2);
        }
        String tasksReporterGroupId = this.config.getTasksReporterGroupId();
        this.connectorGroupName = tasksReporterGroupId != null ? tasksReporterGroupId : connectName;
        StateBackingStoreRegistry.instance().register(this.connectorGroupName, () -> {
            String stateStoreTopic = this.config.getTaskReporterTopic();
            Map<String, Object> configs = this.config.getInternalKafkaReporterConfig();
            return new FileStateBackingStore(stateStoreTopic, this.connectorGroupName, configs, false);
        });
        FSDirectoryWalker directoryScanner = this.config.directoryScanner();
        directoryScanner.setFilter(new CompositeFileListFilter(this.config.filters()));
        FileCleanupPolicy cleaner = this.config.cleanupPolicy();
        OffsetStrategy strategy = this.config.offsetStrategy();
        StateBackingStore<SourceFile> store = StateBackingStoreRegistry.instance().get(this.connectorGroupName);
        try {
            this.scanner = new LocalFileSystemScanner(this.config.scanDirectoryPath(), directoryScanner, (GenericFileCleanupPolicy)cleaner, new SimpleOffsetManager(strategy), store);
            this.fsMonitorThread = new FileSystemMonitorThread(this.context, this.scanner, this.config.scanInternalMs());
            this.fsMonitorThread.setUncaughtExceptionHandler((t, e) -> {
                LOG.info("Uncaught error from file system monitoring thread [{}]", (Object)t.getName(), (Object)e);
                throw new ConnectException(e);
            });
            this.fsMonitorThread.start();
        }
        catch (Exception e3) {
            LOG.error("Closing resources due to an error thrown during initialization of connector {} ", (Object)this.connectorGroupName);
            StateBackingStoreRegistry.instance().release(this.connectorGroupName);
            if (this.fsMonitorThread != null) {
                this.fsMonitorThread.shutdown(0L);
            }
            throw e3;
        }
    }

    public Class<? extends Task> taskClass() {
        return FilePulseSourceTask.class;
    }

    public List<Map<String, String>> taskConfigs(int maxTasks) {
        LOG.info("Creating new tasks configurations (maxTasks={})", (Object)maxTasks);
        List groupFiles = this.scanner.partitionFilesAndGet(maxTasks);
        ArrayList<Map<String, String>> taskConfigs = new ArrayList<Map<String, String>>(groupFiles.size());
        if (!groupFiles.isEmpty()) {
            long taskConfigsGen = this.taskConfigsGeneration.getAndIncrement();
            for (List group : groupFiles) {
                HashMap<String, String> taskProps = new HashMap<String, String>(this.configProperties);
                taskProps.put("internal.kafka.reporter.id", this.connectorGroupName);
                taskProps.put("file.input.paths", String.join((CharSequence)",", group));
                taskConfigs.add(taskProps);
            }
            for (int i = 0; i < groupFiles.size(); ++i) {
                LOG.info("Created config for task_id={} with '{}' source files (task_config_gen={}).", new Object[]{i, ((List)groupFiles.get(i)).size(), taskConfigsGen});
            }
        } else {
            LOG.warn("Failed to create new task configs - no source files found.");
        }
        return taskConfigs;
    }

    public void stop() {
        LOG.info("Stopping connector");
        this.fsMonitorThread.shutdown();
        try {
            StateBackingStoreRegistry.instance().release(this.connectorGroupName);
            this.fsMonitorThread.join(5000L);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        LOG.info("Connector stopped");
    }

    public ConfigDef config() {
        return ConnectorConfig.getConf();
    }
}

