/*
 * Decompiled with CFR 0.152.
 */
package io.streamthoughts.kafka.connect.filepulse.reader;

import io.streamthoughts.kafka.connect.filepulse.data.TypedStruct;
import io.streamthoughts.kafka.connect.filepulse.reader.AbstractFileInputIterator;
import io.streamthoughts.kafka.connect.filepulse.reader.IteratorManager;
import io.streamthoughts.kafka.connect.filepulse.reader.ReaderException;
import io.streamthoughts.kafka.connect.filepulse.reader.RecordsIterable;
import io.streamthoughts.kafka.connect.filepulse.reader.RowFileRecordOffset;
import io.streamthoughts.kafka.connect.filepulse.reader.internal.NonBlockingBufferReader;
import io.streamthoughts.kafka.connect.filepulse.reader.internal.ReversedInputFileReader;
import io.streamthoughts.kafka.connect.filepulse.reader.internal.TextBlock;
import io.streamthoughts.kafka.connect.filepulse.source.FileContext;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecord;
import io.streamthoughts.kafka.connect.filepulse.source.FileRecordOffset;
import io.streamthoughts.kafka.connect.filepulse.source.SourceOffset;
import io.streamthoughts.kafka.connect.filepulse.source.TypedFileRecord;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RowFileInputIterator
extends AbstractFileInputIterator<TypedStruct> {
    private static final Logger LOG = LoggerFactory.getLogger(RowFileInputIterator.class);
    private static final String HEADERS_RECORD_FIELD = "headers";
    private static final String FOOTERS_RECORD_FIELD = "footers";
    private final NonBlockingBufferReader reader;
    private int minNumReadRecords = 0;
    private int skipHeaders = 0;
    private int skipFooters = 0;
    private List<TextBlock> headers;
    private List<String> headerStrings;
    private List<TextBlock> footers;
    private List<String> footersStrings;
    private long offsetLines = 0L;
    private final Charset charset;
    private long maxWaitMs = 0L;
    private long lastObservedRecords = -1L;
    private AtomicBoolean initialized = new AtomicBoolean(false);

    private RowFileInputIterator(FileContext context, NonBlockingBufferReader reader, IteratorManager iteratorManager, Charset charset) {
        super(iteratorManager, context);
        Objects.requireNonNull(reader, "reader can't be null");
        Objects.requireNonNull(iteratorManager, "iteratorManager can't be null");
        Objects.requireNonNull(charset, "charset can't be null");
        this.reader = reader;
        this.charset = charset;
    }

    private void setMinNumReadRecords(int minNumReadRecords) {
        this.minNumReadRecords = minNumReadRecords;
    }

    private void setSkipHeaders(int skipHeaders) {
        this.skipHeaders = skipHeaders;
    }

    private void setSkipFooters(int skipFooters) {
        this.skipFooters = skipFooters;
    }

    private void setMaxWaitMs(long maxWaitMs) {
        this.maxWaitMs = maxWaitMs;
    }

    public void seekTo(SourceOffset offset) {
        Objects.requireNonNull(offset, "offset can't be null");
        if (offset.position() != -1L) {
            this.offsetLines = offset.rows();
            this.reader.seekTo(offset.position());
        }
        this.updateContext();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public RecordsIterable<FileRecord<TypedStruct>> next() {
        try {
            this.initializeIfNeeded();
            this.mayWaitForLinesToBeAvailable();
            LinkedList<FileRecord<TypedStruct>> records = new LinkedList<FileRecord<TypedStruct>>();
            List<TextBlock> lines = this.reader.readLines(this.minNumReadRecords, false);
            if (lines != null) {
                for (TextBlock line : lines) {
                    ++this.offsetLines;
                    if (!this.isNotLineFooter(line) || !this.isNotLineHeader(line)) continue;
                    records.add(this.createOutputRecord(line));
                }
            }
            if (!records.isEmpty() && this.canWaitForMoreRecords()) {
                this.lastObservedRecords = Time.SYSTEM.milliseconds();
            }
            RecordsIterable recordsIterable = new RecordsIterable(records);
            return recordsIterable;
        }
        catch (IOException iOException) {
        }
        finally {
            this.updateContext();
        }
        return null;
    }

    private void mayWaitForLinesToBeAvailable() {
        if (!this.reader.hasNext()) {
            LOG.debug("Waiting for more bytes from file {} (timeout={}ms)", (Object)this.context.metadata(), (Object)this.maxWaitMs);
            long timeout = this.lastObservedRecords + this.maxWaitMs;
            while (!this.reader.hasNext() && this.canWaitForMoreRecords()) {
                Time.SYSTEM.sleep(Math.min(100L, Math.abs(timeout - Time.SYSTEM.milliseconds())));
            }
        }
        if (!this.reader.hasNext() && !this.canWaitForMoreRecords()) {
            LOG.info("Timeout after waiting for more bytes from file {} after '{}ms'.", (Object)this.context.metadata(), (Object)this.maxWaitMs);
            if (this.reader.remaining()) {
                LOG.info("Remaining buffered bytes detected");
                this.reader.enableAutoFlush();
            }
        }
    }

    private void updateContext() {
        SourceOffset offset = new SourceOffset(this.reader.position(), this.offsetLines, Time.SYSTEM.milliseconds());
        this.context = this.context.withOffset(offset);
    }

    private FileRecord<TypedStruct> createOutputRecord(TextBlock record) {
        TypedStruct struct = TypedStruct.create();
        struct.put("message", record.data());
        if (this.skipHeaders > 0) {
            struct.put(HEADERS_RECORD_FIELD, this.headerStrings);
        }
        if (this.skipFooters > 0) {
            struct.put(FOOTERS_RECORD_FIELD, this.footersStrings);
        }
        RowFileRecordOffset offset = RowFileRecordOffset.with(record.startOffset(), record.endOffset()).withSize(record.size()).withRowNumber(this.offsetLines);
        return new TypedFileRecord((FileRecordOffset)offset, struct);
    }

    private void initializeIfNeeded() {
        if (!this.initialized.get()) {
            this.mayReadHeaders();
            this.mayReadFooters();
            this.lastObservedRecords = Time.SYSTEM.milliseconds();
            this.initialized.set(true);
        }
    }

    public boolean hasNext() {
        return this.reader.hasNext() || this.reader.remaining() || this.canWaitForMoreRecords();
    }

    private boolean canWaitForMoreRecords() {
        return this.lastObservedRecords + this.maxWaitMs > Time.SYSTEM.milliseconds();
    }

    @Override
    public void close() {
        if (!this.isClose()) {
            if (this.reader != null) {
                this.reader.close();
            }
            super.close();
        }
    }

    private void mayReadFooters() {
        String fileName = this.context.metadata().name();
        String path = this.context.metadata().absolutePath();
        if (this.skipFooters > 0) {
            LOG.info("Starting to read footer lines ({}) from file {}", (Object)this.skipFooters, (Object)fileName);
            try (ReversedInputFileReader reversedReader = new ReversedInputFileReader(path, this.charset);){
                this.footers = reversedReader.readLines(this.skipFooters);
            }
            catch (Exception e) {
                throw new RuntimeException("", e);
            }
            if (this.footers.size() < this.skipFooters) {
                throw new ReaderException("Not enough value for reading footer lines from file " + path + " (available=" + this.footers.size() + ", expecting=" + this.skipFooters + ")");
            }
            Collections.reverse(this.footers);
            this.footersStrings = this.footers.stream().map(TextBlock::data).collect(Collectors.toList());
        }
    }

    private void mayReadHeaders() {
        String fileName = this.context.metadata().name();
        String path = this.context.metadata().absolutePath();
        if (this.skipHeaders > 0) {
            LOG.info("Starting to read header lines ({}) from file {}", (Object)this.skipHeaders, (Object)fileName);
            try (NonBlockingBufferReader sequentialReader = new NonBlockingBufferReader(new File(path), this.charset);){
                this.headers = sequentialReader.readLines(this.skipHeaders, true);
                this.headerStrings = this.headers.stream().map(TextBlock::data).collect(Collectors.toList());
            }
            catch (Exception e) {
                throw new RuntimeException("", e);
            }
            if (this.headers.size() < this.skipHeaders) {
                throw new ReaderException(String.format("Not enough value for reading header lines from file %s (available=%d, expecting=%d)", path, this.headers.size(), this.skipHeaders));
            }
        }
    }

    private boolean isNotLineHeader(TextBlock source) {
        return this.skipHeaders <= 0 || source.startOffset() > this.headers.get(this.skipHeaders - 1).startOffset();
    }

    private boolean isNotLineFooter(TextBlock source) {
        return this.skipFooters <= 0 || source.startOffset() < this.footers.get(0).startOffset();
    }

    static Builder newBuilder() {
        return new Builder();
    }

    public static class Builder {
        private Charset charset = StandardCharsets.UTF_8;
        private int minNumReadRecords = 1;
        private FileContext context;
        private int initialBufferSize = 4096;
        private int skipHeaders;
        private int skipFooters;
        private long waitMaxMs;
        private IteratorManager iteratorManager;

        private Builder() {
        }

        Builder withContext(FileContext context) {
            this.context = context;
            return this;
        }

        Builder withSkipHeaders(int skipHeaders) {
            this.skipHeaders = skipHeaders;
            return this;
        }

        Builder withSkipFooters(int skipFooters) {
            this.skipFooters = skipFooters;
            return this;
        }

        Builder withMinNumReadRecords(int minNumReadRecords) {
            this.minNumReadRecords = minNumReadRecords;
            return this;
        }

        Builder withMaxWaitMs(long maxWaitMs) {
            this.waitMaxMs = maxWaitMs;
            return this;
        }

        Builder withInitialBufferSize(int initialBufferSize) {
            this.initialBufferSize = initialBufferSize;
            return this;
        }

        Builder withCharset(Charset charset) {
            this.charset = charset;
            return this;
        }

        Builder withIteratorManager(IteratorManager iteratorManager) {
            this.iteratorManager = iteratorManager;
            return this;
        }

        RowFileInputIterator build() {
            this.validateNotNull(this.context, "context");
            NonBlockingBufferReader reader = new NonBlockingBufferReader(this.context.file(), this.initialBufferSize, this.charset);
            reader.disableAutoFlush();
            RowFileInputIterator iterator = new RowFileInputIterator(this.context, reader, this.iteratorManager, this.charset);
            iterator.setSkipFooters(this.skipFooters);
            iterator.setSkipHeaders(this.skipHeaders);
            iterator.setMinNumReadRecords(this.minNumReadRecords);
            iterator.setMaxWaitMs(this.waitMaxMs);
            return iterator;
        }

        private void validateNotNull(Object o, String property) {
            if (o == null) {
                throw new IllegalStateException("Error while building new RowFileInputIterator. The property " + property + " is null.");
            }
        }
    }
}

