/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.source;

import io.streamthoughts.kafka.connect.filepulse.config.TaskConfig;
import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.filter.DefaultRecordFilterPipeline;
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilterPipeline;
import io.streamthoughts.kafka.connect.filepulse.offset.OffsetManager;
import io.streamthoughts.kafka.connect.filepulse.offset.SimpleOffsetManager;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.source.DefaultFileRecordsPollingConsumer;
import io.streamthoughts.kafka.connect.filepulse.source.FileContext;
import io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.KafkaFileStateReporter;
import io.streamthoughts.kafka.connect.filepulse.source.SourceFile;
import io.streamthoughts.kafka.connect.filepulse.source.SourceMetadata;
import io.streamthoughts.kafka.connect.filepulse.source.SourceStatus;
import io.streamthoughts.kafka.connect.filepulse.state.FileStateBackingStore;
import io.streamthoughts.kafka.connect.filepulse.state.StateBackingStoreRegistry;
import io.streamthoughts.kafka.connect.filepulse.storage.StateBackingStore;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FilePulseSourceTask
extends SourceTask {
    private static final Logger LOG = LoggerFactory.getLogger(FilePulseSourceTask.class);
    private static final Integer NO_PARTITION = null;
    public TaskConfig config;
    private String topic;
    private DefaultFileRecordsPollingConsumer consumer;
    private OffsetManager offsetManager;
    private StateBackingStore<SourceFile> store;
    private KafkaFileStateReporter reporter;
    private volatile FileContext contextToBeCommitted;

    public String version() {
        return new FilePulseSourceConnector().version();
    }

    public void start(Map<String, String> props) {
        LOG.info("Starting task");
        this.config = new TaskConfig(props);
        this.offsetManager = new SimpleOffsetManager(this.config.offsetStrategy());
        this.store = this.getStateStatesBackingStore();
        this.topic = this.config.topic();
        this.consumer = this.newFileRecordsPollingConsumer();
        this.reporter = new KafkaFileStateReporter(this.store, this.offsetManager);
        this.consumer.setFileListener(this.reporter);
        this.consumer.addAll(this.config.files());
    }

    private DefaultFileRecordsPollingConsumer newFileRecordsPollingConsumer() {
        DefaultRecordFilterPipeline filter = new DefaultRecordFilterPipeline(this.config.filters());
        return new DefaultFileRecordsPollingConsumer(this.context, this.config.reader(), (RecordFilterPipeline<FileRecord<TypedStruct>>)filter, this.offsetManager, this.config.isReadCommittedFile());
    }

    private StateBackingStore<SourceFile> getStateStatesBackingStore() {
        String groupId = this.config.getTasksReporterGroupId();
        StateBackingStoreRegistry.instance().register(groupId, () -> {
            Map<String, Object> configs = this.config.getInternalKafkaReporterConfig();
            String stateStoreTopic = this.config.getTaskReporterTopic();
            FileStateBackingStore store = new FileStateBackingStore(stateStoreTopic, groupId, configs, true);
            store.start();
            return store;
        });
        return StateBackingStoreRegistry.instance().get(groupId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public List<SourceRecord> poll() throws InterruptedException {
        this.contextToBeCommitted = this.consumer.context();
        if (!this.consumer.hasNext()) {
            this.contextToBeCommitted = null;
            LOG.info("Orphan task detected - all scheduled files are now completed - waiting for new reconfiguration.");
            FilePulseSourceTask filePulseSourceTask = this;
            synchronized (filePulseSourceTask) {
                ((Object)((Object)this)).wait();
            }
            return null;
        }
        RecordsIterable<FileRecord<TypedStruct>> records = this.consumer.next();
        if (records.isEmpty() && this.consumer.hasNext()) {
            Thread.sleep(500L);
            records = this.consumer.next();
        }
        FileContext context = this.consumer.context();
        if (records != null && !records.isEmpty()) {
            return records.stream().map(r -> this.buildSourceRecord(context, (FileRecord<?>)r)).collect(Collectors.toList());
        }
        return null;
    }

    public void commit() {
        if (this.contextToBeCommitted != null) {
            this.reporter.notify(this.contextToBeCommitted.metadata(), this.contextToBeCommitted.offset(), SourceStatus.READING);
        }
    }

    private SourceRecord buildSourceRecord(FileContext context, FileRecord<?> record) {
        SourceMetadata metadata = context.metadata();
        Map sourcePartition = this.offsetManager.toPartitionMap(metadata);
        Map sourceOffsets = this.offsetManager.toOffsetMap(record.offset().toSourceOffset());
        return record.toSourceRecord(sourcePartition, sourceOffsets, context.metadata(), this.topic, NO_PARTITION);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        LOG.info("Stopping task.");
        FilePulseSourceTask filePulseSourceTask = this;
        synchronized (filePulseSourceTask) {
            if (this.consumer != null) {
                this.consumer.close();
                ((Object)((Object)this)).notify();
            }
            if (this.store != null) {
                StateBackingStoreRegistry.instance().release(this.config.getTasksReporterGroupId());
            }
        }
        LOG.info("Task stopped.");
    }
}

