/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.source;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.errors.ConnectFilePulseException;
import io.streamthoughts.kafka.connect.filepulse.filter.RecordFilterPipeline;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.FileInputReader;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.source.DelegateFileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectContext;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectKey;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.FileObjectOffset;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecordsPollingConsumer;
import io.streamthoughts.kafka.connect.filepulse.source.GenericFileObjectMeta;
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffsetPolicy;
import io.streamthoughts.kafka.connect.filepulse.source.StateListener;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.connect.source.SourceTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Polls records from a FIFO queue of object-file iterators.
 *
 * <p>Files are registered through {@link #addAll(List)} and consumed one at a time, always
 * working on the head of the queue. Each batch read from the current iterator is passed through
 * the configured {@link RecordFilterPipeline}. When a {@link StateListener} has been set it is
 * notified when a file is scheduled, found invalid, started, completed or failed.
 */
public class DefaultFileRecordsPollingConsumer
implements FileRecordsPollingConsumer<FileRecord<TypedStruct>> {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultFileRecordsPollingConsumer.class);
    private final Queue<DelegateFileInputIterator> queue;
    private final boolean ignoreCommittedOffsets;
    private final FileInputReader reader;
    private final RecordFilterPipeline<FileRecord<TypedStruct>> pipeline;
    private final SourceOffsetPolicy offsetPolicy;
    private StateListener listener;
    private final SourceTaskContext taskContext;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private FileRecord<TypedStruct> latestPolledRecord;
    private FileInputIterator<FileRecord<TypedStruct>> currentIterator;

    /**
     * Creates a new consumer with an empty queue.
     *
     * @param taskContext            the task context handed to the offset policy when looking up offsets.
     * @param reader                 the reader used to check, describe and read object files.
     * @param pipeline               the filter pipeline applied to every batch of records.
     * @param offsetPolicy           the policy used to compute file keys and to look up committed offsets.
     * @param ignoreCommittedOffsets {@code true} to always start from an empty offset.
     */
    DefaultFileRecordsPollingConsumer(SourceTaskContext taskContext, FileInputReader reader, RecordFilterPipeline<FileRecord<TypedStruct>> pipeline, SourceOffsetPolicy offsetPolicy, boolean ignoreCommittedOffsets) {
        this.queue = new LinkedBlockingQueue<>();
        this.ignoreCommittedOffsets = ignoreCommittedOffsets;
        this.reader = reader;
        this.pipeline = pipeline;
        this.offsetPolicy = offsetPolicy;
        this.taskContext = taskContext;
    }

    /**
     * Registers the given object files for processing.
     *
     * <p>Readable files are appended to the queue and reported through
     * {@code StateListener#onScheduled}. Files the reader cannot read are skipped and reported
     * through {@code StateListener#onInvalid}.
     *
     * @param files the URIs of the object files to schedule.
     * @throws IllegalStateException      if this consumer is closed.
     * @throws ConnectFilePulseException  if the key of an object file cannot be computed.
     */
    void addAll(List<URI> files) {
        if (this.isClosed()) {
            throw new IllegalStateException("Can't add new input files, consumer is closed");
        }
        List<DelegateFileInputIterator> iterables = new ArrayList<>(files.size());
        for (URI uri : files) {
            if (this.reader.canBeRead(uri)) {
                FileObjectMeta objectMeta;
                FileObjectKey key;
                try {
                    objectMeta = this.reader.getObjectMetadata(uri);
                    key = FileObjectKey.of((String) this.offsetPolicy.toPartitionJson(objectMeta));
                }
                catch (Exception e) {
                    throw new ConnectFilePulseException("Failed to compute object-file key while initializing processing for '" + uri + "'.  Connector must be restated.", e);
                }
                iterables.add(new DelegateFileInputIterator(key, uri, this.reader));
                if (this.hasListener()) {
                    this.listener.onScheduled(new FileObjectContext(key, objectMeta));
                }
            } else {
                try {
                    FileObjectMeta objectMeta = new GenericFileObjectMeta(uri);
                    FileObjectKey key = FileObjectKey.of((String) this.offsetPolicy.toPartitionJson(objectMeta));
                    // Always report the skipped file, even when nobody listens for state changes.
                    LOG.warn("Object-file does not exist or is not readable. Skip and continue '{}'", uri);
                    if (this.hasListener()) {
                        this.listener.onInvalid(new FileObjectContext(key, objectMeta));
                    }
                }
                catch (Exception e) {
                    throw new ConnectFilePulseException("Failed to compute object-file key while initializing processing for '" + uri + "'. Connector must be restated.", e);
                }
            }
        }
        this.queue.addAll(iterables);
    }

    /**
     * Returns the context of the iterator selected by the last call to {@link #next()}.
     *
     * <p>When at least one record has been polled, the returned context carries the source
     * offset of the latest polled record instead of the iterator's own offset.
     *
     * <p>NOTE(review): {@code latestPolledRecord} is not reset when the consumer moves on to
     * another file, so the offset may belong to a previous file until a record of the new
     * file is polled — confirm this is acceptable for callers.
     *
     * @return the current context, or {@code null} if no iterator has been selected yet.
     */
    public FileObjectContext context() {
        if (this.currentIterator == null) {
            return null;
        }
        FileObjectContext context = this.currentIterator.context();
        if (this.latestPolledRecord != null) {
            context = new FileObjectContext(context.key(), context.metadata(), this.latestPolledRecord.offset().toSourceOffset());
        }
        return context;
    }

    /**
     * Not supported by this consumer.
     *
     * @throws UnsupportedOperationException always.
     */
    public void seekTo(FileObjectOffset offset) {
        throw new UnsupportedOperationException();
    }

    /**
     * Reads the next batch of records from the file at the head of the queue and applies the
     * filter pipeline to it.
     *
     * <p>If reading or filtering fails, the current iterator is closed, the file is removed
     * from the queue and the listener (if any) is notified of the failure before the exception
     * is propagated.
     *
     * @return the filtered records; an empty iterable when there is nothing left to read.
     * @throws IllegalStateException      if this consumer is closed.
     * @throws ConnectFilePulseException  if reading or filtering the records fails.
     */
    public RecordsIterable<FileRecord<TypedStruct>> next() {
        if (this.isClosed()) {
            throw new IllegalStateException("FileRecordsPollingConsumer is closed, no more element can be returned");
        }
        if (!this.hasNext()) {
            return RecordsIterable.empty();
        }
        this.currentIterator = this.findNextFileObjectIterator();
        if (this.currentIterator == null) {
            return RecordsIterable.empty();
        }
        try {
            RecordsIterable<FileRecord<TypedStruct>> records = this.currentIterator.next();
            RecordsIterable<FileRecord<TypedStruct>> filtered = this.pipeline.apply(records, this.currentIterator.hasNext());
            if (!filtered.isEmpty()) {
                this.latestPolledRecord = filtered.last();
            }
            return filtered;
        }
        catch (ConnectFilePulseException e) {
            this.failCurrentIterator(e);
            throw e;
        }
        catch (Exception e) {
            this.failCurrentIterator(e);
            throw new ConnectFilePulseException(e);
        }
    }

    /**
     * Logs a processing failure and closes the current iterator with the given cause.
     */
    private void failCurrentIterator(Exception cause) {
        LOG.error("Stopped processing due to error during filter-chain execution for object-file: '{}'", this.currentIterator.context().metadata());
        this.closeIterator(this.currentIterator, cause);
    }

    /**
     * Walks the queue from its head until an iterator with remaining records is found.
     *
     * <p>Exhausted, invalid and already completed files are removed from the queue on the way.
     *
     * @return the iterator to read from, or {@code null} if the queue has been drained.
     */
    private FileInputIterator<FileRecord<TypedStruct>> findNextFileObjectIterator() {
        FileInputIterator<FileRecord<TypedStruct>> ret = null;
        while (ret == null && this.hasNext()) {
            DelegateFileInputIterator candidate = this.queue.peek();
            if (candidate.isOpen()) {
                ret = this.getOrCloseIteratorIfNoMoreRecord(candidate);
                continue;
            }
            FileObjectMeta objectMeta = new GenericFileObjectMeta(candidate.getObjectURI());
            FileObjectContext context = new FileObjectContext(candidate.key(), objectMeta);
            boolean valid;
            // Only validation and opening are guarded here. The listener callbacks below stay
            // outside so that a failing callback cannot trigger a second queue removal.
            try {
                valid = candidate.isValid();
                if (valid) {
                    ret = this.openAndGetIteratorOrNullIfCompleted(candidate);
                }
            }
            catch (Exception e) {
                LOG.error("Failed to open and initialize new iterator for object-file: {}.", candidate.getObjectURI());
                this.deleteFileQueueAndInvokeListener(context, e);
                throw e;
            }
            if (!valid) {
                LOG.warn("Object-file does not exist or is not readable. Skip and continue '{}'", candidate.getObjectURI());
                this.queue.remove();
                if (this.hasListener()) {
                    this.listener.onInvalid(context);
                }
            } else if (ret == null) {
                // The file was already fully processed: drop it and report completion.
                this.deleteFileQueueAndInvokeListener(context, null);
            }
        }
        return ret;
    }

    /**
     * @return {@code true} while at least one file remains in the queue.
     */
    public boolean hasNext() {
        return !this.queue.isEmpty();
    }

    /**
     * Closes every queued iterator and then the reader. Only the first call has an effect.
     *
     * <p>Failures while closing an individual iterator are logged and do not prevent the
     * remaining iterators and the reader from being closed.
     */
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        DelegateFileInputIterator monitor;
        while ((monitor = this.queue.poll()) != null) {
            try {
                monitor.close();
            }
            catch (Exception e) {
                // Best effort: keep closing the remaining iterators, but leave a trace.
                LOG.debug("Error while closing iterator for: '{}'", monitor.getObjectURI(), e);
            }
        }
        this.reader.close();
    }

    /**
     * @return {@code true} once {@link #close()} has been called.
     */
    public boolean isClosed() {
        return this.closed.get();
    }

    /**
     * Sets the listener notified of file state changes.
     *
     * @param listener the listener; {@code null} disables notifications.
     */
    public void setStateListener(StateListener listener) {
        this.listener = listener;
    }

    /**
     * Opens the given iterator and positions it at the committed offset.
     *
     * <p>If the committed offset cannot be loaded although the file metadata is available, a
     * warning is logged and reading starts from an empty offset. If the metadata itself cannot
     * be loaded, the exception is propagated.
     *
     * @param iterator the iterator to open.
     * @return the opened iterator, or {@code null} if committed offsets are honoured and the
     *         committed position already covers the whole content.
     */
    private FileInputIterator<FileRecord<TypedStruct>> openAndGetIteratorOrNullIfCompleted(DelegateFileInputIterator iterator) {
        FileObjectMeta metadata = null;
        FileObjectOffset committedOffset;
        try {
            metadata = iterator.getMetadata();
            committedOffset = this.ignoreCommittedOffsets
                ? FileObjectOffset.empty()
                : this.offsetPolicy.getOffsetFor(this.taskContext, metadata).orElse(FileObjectOffset.empty());
        }
        catch (Exception e) {
            if (metadata == null) {
                // Without metadata the file cannot be processed at all.
                throw e;
            }
            LOG.warn("Failed to load committed offset for object file {}. Previous offset will be ignored. Error: {}", iterator.getObjectURI(), e.getMessage());
            committedOffset = FileObjectOffset.empty();
        }
        boolean isAlreadyCompleted = committedOffset.position() >= metadata.contentLength();
        if (!this.ignoreCommittedOffsets && isAlreadyCompleted) {
            LOG.warn("Detected object-file already completed. Skip entry and continue '{}'", iterator.getObjectURI());
            return null;
        }
        iterator.open();
        iterator.seekTo(committedOffset);
        this.pipeline.init(iterator.context());
        if (this.hasListener()) {
            this.listener.onStart(iterator.context());
        }
        return iterator;
    }

    /**
     * Returns the given open iterator if it still has records; otherwise closes it, removes it
     * from the queue and returns {@code null}.
     */
    private FileInputIterator<FileRecord<TypedStruct>> getOrCloseIteratorIfNoMoreRecord(DelegateFileInputIterator iterable) {
        if (iterable.hasNext()) {
            return iterable;
        }
        this.closeIterator(iterable, null);
        return null;
    }

    /**
     * Closes the iterator selected by the last call to {@link #next()}.
     *
     * <p>Does nothing when no iterator has been selected yet.
     *
     * @param cause the failure that triggered the close, or {@code null} for a normal completion.
     */
    public void closeCurrentIterator(Exception cause) {
        if (this.currentIterator == null) {
            return;
        }
        this.closeIterator(this.currentIterator, cause);
    }

    /**
     * Closes the given iterator, then removes the head of the queue and notifies the listener.
     *
     * <p>The context is captured before closing. The queue removal and the listener
     * notification happen even if closing the iterator fails.
     */
    private void closeIterator(FileInputIterator<FileRecord<TypedStruct>> iterator, Exception cause) {
        FileObjectContext context = iterator.context();
        try {
            iterator.close();
        }
        catch (Exception e) {
            LOG.debug("Error while closing iterator for: '{}'", context.metadata(), e);
        }
        finally {
            this.deleteFileQueueAndInvokeListener(context, cause);
        }
    }

    /**
     * Removes the head of the queue and reports the outcome to the listener, if any.
     *
     * @param fileContext the context passed to the listener.
     * @param exception   the failure to report, or {@code null} to report a completion.
     */
    private void deleteFileQueueAndInvokeListener(FileObjectContext fileContext, Throwable exception) {
        this.queue.remove();
        if (!this.hasListener()) {
            return;
        }
        if (exception != null) {
            this.listener.onFailure(fileContext, exception);
        } else {
            this.listener.onCompleted(fileContext);
        }
    }

    /**
     * @return {@code true} if a state listener has been set.
     */
    private boolean hasListener() {
        return this.listener != null;
    }
}

