/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.source;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
import io.streamthoughts.kafka.connect.filepulse.filter.FilterException;
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilterPipeline;
import io.streamthoughts.kafka.connect.filepulse.offset.OffsetManager;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputReader;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.source.FileContext;
import io.streamthoughts.kafka.connect.filepulse.source.FileInputIterable;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecordsPollingConsumer;
import io.streamthoughts.kafka.connect.filepulse.source.SourceMetadata;
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffset;
import io.streamthoughts.kafka.connect.filepulse.source.StateListener;
import java.io.File;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultFileRecordsPollingConsumer
implements FileRecordsPollingConsumer<FileRecord<TypedStruct>> {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultFileRecordsPollingConsumer.class);
    private final Queue<FileInputIterable> queue;
    private final boolean ignoreCommittedOffsets;
    private final FileInputReader reader;
    private final RecordFilterPipeline<FileRecord<TypedStruct>> pipeline;
    private final OffsetManager offsetManager;
    private StateListener listener;
    private final SourceTaskContext taskContext;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private FileRecord latestPollRecord;
    private FileInputIterator<FileRecord<TypedStruct>> currentIterator;

    DefaultFileRecordsPollingConsumer(SourceTaskContext taskContext, FileInputReader reader, RecordFilterPipeline<FileRecord<TypedStruct>> pipeline, OffsetManager offsetManager, boolean ignoreCommittedOffsets) {
        this.queue = new LinkedBlockingQueue<FileInputIterable>();
        this.ignoreCommittedOffsets = ignoreCommittedOffsets;
        this.reader = reader;
        this.pipeline = pipeline;
        this.offsetManager = offsetManager;
        this.taskContext = taskContext;
    }

    void addAll(List<String> files) {
        if (this.isClose()) {
            throw new IllegalStateException("Can't add new input files, consumer is closed");
        }
        List iterables = files.stream().map(this.toIterable()).filter(this.excludeUnreadableAndNotify()).peek(it -> {
            if (this.hasListener()) {
                this.listener.onScheduled(new FileContext(it.metadata()));
            }
        }).collect(Collectors.toList());
        this.queue.addAll(iterables);
    }

    private Function<String, FileInputIterable> toIterable() {
        return file -> new FileInputIterable(new File((String)file), this.reader);
    }

    private Predicate<FileInputIterable> excludeUnreadableAndNotify() {
        return it -> {
            boolean valid = it.isValid();
            if (!valid) {
                LOG.error("Invalid source, file doesn't exist or is not readable - ignore : {}", (Object)it.file().getAbsolutePath());
                this.listener.onInvalid(new FileContext(it.metadata()));
            }
            return valid;
        };
    }

    public FileContext context() {
        if (this.currentIterator != null) {
            FileContext context = this.currentIterator.context();
            if (this.latestPollRecord != null) {
                context = new FileContext(context.metadata(), this.latestPollRecord.offset().toSourceOffset());
            }
            return context;
        }
        return null;
    }

    public void seekTo(SourceOffset offset) {
        throw new UnsupportedOperationException();
    }

    public RecordsIterable<FileRecord<TypedStruct>> next() {
        if (this.queue.isEmpty()) {
            return RecordsIterable.empty();
        }
        do {
            FileInputIterable iterable;
            this.currentIterator = (iterable = this.queue.peek()).isOpen() ? this.getOrCloseIteratorIfNoMoreRecord(iterable) : this.openAndGetIteratorOrNullIfInvalid(this.taskContext, iterable);
        } while (!this.queue.isEmpty() && this.currentIterator == null);
        if (this.currentIterator == null) {
            return RecordsIterable.empty();
        }
        RecordsIterable records = this.currentIterator.next();
        Throwable exception = null;
        try {
            RecordsIterable filtered = this.pipeline.apply(records, this.currentIterator.hasNext());
            if (!filtered.isEmpty()) {
                this.latestPollRecord = (FileRecord)filtered.last();
            }
            RecordsIterable recordsIterable = filtered;
            return recordsIterable;
        }
        catch (FilterException e) {
            exception = e;
            RecordsIterable recordsIterable = RecordsIterable.empty();
            return recordsIterable;
        }
        catch (ConnectFilePulseException e) {
            exception = e;
            throw e;
        }
        catch (Exception e) {
            exception = e;
            throw new ConnectFilePulseException((Throwable)e);
        }
        finally {
            if (exception != null) {
                this.closeIterator(this.currentIterator, (Exception)exception);
            }
        }
    }

    public boolean hasNext() {
        return !this.queue.isEmpty();
    }

    public void close() {
        FileInputIterable monitor;
        while ((monitor = this.queue.poll()) != null) {
            try {
                monitor.close();
            }
            catch (Exception exception) {}
        }
        this.reader.close();
        this.closed.set(true);
    }

    public boolean isClose() {
        return this.closed.get();
    }

    public void setFileListener(StateListener listener) {
        this.listener = listener;
    }

    private FileInputIterator<FileRecord<TypedStruct>> openAndGetIteratorOrNullIfInvalid(SourceTaskContext context, FileInputIterable iterable) {
        FileInputIterator newIterator = null;
        SourceMetadata metadata = iterable.metadata();
        try {
            if (!iterable.isValid()) {
                LOG.error("File does not exist or is not readable, skip entry and continue '{}'", (Object)metadata.absolutePath());
                this.deleteFileQueueAndInvokeListener(new FileContext(metadata), null);
                return null;
            }
            SourceOffset committedOffset = !this.ignoreCommittedOffsets ? this.offsetManager.getOffsetFor(context, metadata).orElse(SourceOffset.empty()) : SourceOffset.empty();
            if (!this.ignoreCommittedOffsets && FileInputIterable.isAlreadyCompleted((SourceOffset)committedOffset, (SourceMetadata)metadata)) {
                LOG.warn("Detected source file already completed, skip entry and continue '{}'", (Object)metadata.absolutePath());
                this.deleteFileQueueAndInvokeListener(new FileContext(metadata, committedOffset), null);
            } else {
                newIterator = iterable.open(committedOffset);
                this.pipeline.init(newIterator.context());
                if (this.hasListener()) {
                    this.listener.onStart(newIterator.context());
                }
            }
        }
        catch (Exception e) {
            this.deleteFileQueueAndInvokeListener(new FileContext(metadata), e);
        }
        return newIterator;
    }

    private FileInputIterator<FileRecord<TypedStruct>> getOrCloseIteratorIfNoMoreRecord(FileInputIterable iterable) {
        FileInputIterator currItr = iterable.iterator();
        if (currItr.hasNext()) {
            return currItr;
        }
        this.closeIterator((FileInputIterator<FileRecord<TypedStruct>>)currItr, null);
        return null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeIterator(FileInputIterator<FileRecord<TypedStruct>> iterator, Exception cause) {
        try {
            iterator.close();
        }
        catch (Exception e) {
            LOG.debug("Error while closing file '{}'", (Object)iterator.context(), (Object)e);
        }
        finally {
            this.deleteFileQueueAndInvokeListener(iterator.context(), cause);
        }
    }

    private void deleteFileQueueAndInvokeListener(FileContext taskContext, Throwable exception) {
        this.queue.remove();
        if (this.hasListener()) {
            if (exception != null) {
                this.listener.onFailure(taskContext, exception);
            } else {
                this.listener.onCompleted(taskContext);
            }
        }
    }

    private boolean hasListener() {
        return this.listener != null;
    }
}

