/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.FetchMetricsAggregator;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.IsolationLevel;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.KafkaException;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.errors.SerializationException;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.message.FetchResponseData;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.record.ControlRecordType;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.record.Record;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.record.RecordBatch;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.record.TimestampType;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.requests.FetchResponse;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.utils.BufferSupplier;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.utils.CloseableIterator;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.utils.LogContext;
import org.apache.flink.avro.registry.confluent.shaded.org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

class CompletedFetch<K, V> {
    // Topic partition whose fetched data this object wraps.
    final TopicPartition partition;
    // Raw per-partition fetch response payload; the record batches and aborted transactions are read from it.
    final FetchResponseData.PartitionData partitionData;
    // Request version supplied by the creator; stored but not otherwise used inside this class.
    final short requestVersion;
    // Offset of the next record to hand out. Records with a smaller offset are skipped by nextFetchedRecord().
    long nextFetchOffset;
    // Leader epoch of the most recently loaded batch; empty when the batch reports -1.
    Optional<Integer> lastEpoch;
    // Set by drain(); once true, fetchRecords() returns an empty list.
    boolean isConsumed = false;
    // Never touched inside this class beyond its default value — presumably maintained by callers (TODO confirm).
    boolean initialized = false;
    private final Logger log;
    private final SubscriptionState subscriptions;
    private final FetchConfig<K, V> fetchConfig;
    // Passed to RecordBatch.streamingIterator() when a batch's records are opened.
    private final BufferSupplier decompressionBufferSupplier;
    // Iterator over the record batches contained in partitionData.
    private final Iterator<? extends RecordBatch> batches;
    // Producer ids currently considered aborted (only maintained under READ_COMMITTED isolation).
    private final Set<Long> abortedProducerIds;
    // Aborted transactions ordered by first offset; null when the response carried none.
    private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions;
    private final FetchMetricsAggregator metricAggregator;
    // Number of records returned so far by fetchRecords(); reported to the metrics aggregator on drain().
    private int recordsRead;
    // Sum of sizeInBytes() of the records returned so far; reported to the metrics aggregator on drain().
    private int bytesRead;
    // Batch currently being iterated; null until the first batch has been loaded.
    private RecordBatch currentBatch;
    // Last record obtained from nextFetchedRecord(); retried by fetchRecords() while cachedRecordException is set.
    private Record lastRecord;
    // Open record stream of currentBatch, or null when no stream is open.
    private CloseableIterator<Record> records;
    // Exception raised by the previous fetchRecords() call, if any; cleared after a successful parse or on drain().
    private Exception cachedRecordException = null;
    // True while (or after) nextFetchedRecord() failed before returning; makes later fetchRecords() calls throw.
    private boolean corruptLastRecord = false;

    /**
     * Creates a holder for the fetched data of a single partition.
     *
     * @param logContext                  used to obtain this class's logger
     * @param subscriptions               subscription state notified when the fetch is drained
     * @param fetchConfig                 fetch settings (CRC checking, isolation level, deserializers)
     * @param decompressionBufferSupplier buffer supplier handed to the batch streaming iterator
     * @param partition                   partition the data belongs to
     * @param partitionData               raw partition payload of the fetch response
     * @param metricAggregator            receives the byte/record totals when the fetch is drained
     * @param fetchOffset                 first offset to hand out; must not be {@code null} (it is unboxed)
     * @param requestVersion              request version, stored as-is
     */
    CompletedFetch(LogContext logContext, SubscriptionState subscriptions, FetchConfig<K, V> fetchConfig, BufferSupplier decompressionBufferSupplier, TopicPartition partition, FetchResponseData.PartitionData partitionData, FetchMetricsAggregator metricAggregator, Long fetchOffset, short requestVersion) {
        // Collaborators and configuration.
        this.log = logContext.logger(CompletedFetch.class);
        this.subscriptions = subscriptions;
        this.fetchConfig = fetchConfig;
        this.decompressionBufferSupplier = decompressionBufferSupplier;
        this.metricAggregator = metricAggregator;

        // Identity of this fetch.
        this.partition = partition;
        this.partitionData = partitionData;
        this.requestVersion = requestVersion;

        // Iteration state.
        this.batches = FetchResponse.recordsOrFail(partitionData).batches().iterator();
        this.nextFetchOffset = fetchOffset.longValue();
        this.lastEpoch = Optional.empty();

        // Transaction bookkeeping.
        this.abortedProducerIds = new HashSet<>();
        this.abortedTransactions = this.abortedTransactions(partitionData);
    }

    /**
     * Forwards the given totals for this fetch's partition to the metrics aggregator.
     *
     * @param bytes   number of bytes to report
     * @param records number of records to report
     */
    void recordAggregatedMetrics(int bytes, int records) {
        final TopicPartition reportedPartition = partition;
        metricAggregator.record(reportedPartition, bytes, records);
    }

    /**
     * Marks this fetch as consumed. Does nothing if it has already been drained.
     *
     * <p>Closes any open record stream, forgets the cached exception, reports the
     * accumulated byte/record counts, and, if any bytes were read, asks the
     * subscription state to move this partition to the end.
     */
    void drain() {
        if (this.isConsumed) {
            return;
        }

        this.maybeCloseRecordStream();
        this.cachedRecordException = null;
        this.isConsumed = true;
        this.recordAggregatedMetrics(this.bytesRead, this.recordsRead);

        boolean readAnything = this.bytesRead > 0;
        if (readAnything) {
            this.subscriptions.movePartitionToEnd(this.partition);
        }
    }

    /**
     * Validates the given batch when CRC checking is enabled and the batch magic is at least 2.
     *
     * @param batch the batch to validate
     * @throws KafkaException if the batch reports corruption; the original
     *         {@link CorruptRecordException} is attached as the cause
     */
    private void maybeEnsureValid(RecordBatch batch) {
        if (this.fetchConfig.checkCrcs && batch.magic() >= 2) {
            try {
                batch.ensureValid();
            }
            catch (CorruptRecordException e) {
                // Keep the original exception as the cause so its stack trace is not lost.
                throw new KafkaException("Record batch for partition " + this.partition + " at offset " + batch.baseOffset() + " is invalid, cause: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Validates the given record when CRC checking is enabled.
     *
     * @param record the record to validate
     * @throws KafkaException if the record reports corruption; the original
     *         {@link CorruptRecordException} is attached as the cause
     */
    private void maybeEnsureValid(Record record) {
        if (this.fetchConfig.checkCrcs) {
            try {
                record.ensureValid();
            }
            catch (CorruptRecordException e) {
                // Keep the original exception as the cause so its stack trace is not lost.
                throw new KafkaException("Record for partition " + this.partition + " at offset " + record.offset() + " is invalid, cause: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Closes the currently open record stream, if there is one, and clears the reference to it.
     */
    private void maybeCloseRecordStream() {
        CloseableIterator<Record> openStream = this.records;
        if (openStream == null) {
            return;
        }
        openStream.close();
        this.records = null;
    }

    /**
     * Advances through the fetched batches and returns the next record that should be
     * handed to the caller, or {@code null} once every batch has been exhausted (in which
     * case this fetch is drained).
     *
     * <p>Records with an offset below {@code nextFetchOffset} are skipped, records of control
     * batches are never returned, and under READ_COMMITTED isolation batches belonging to
     * aborted producers are skipped entirely.
     *
     * @return the next user-visible record, or {@code null} when nothing is left
     */
    private Record nextFetchedRecord() {
        while (true) {
            // No open record stream, or the current one is exhausted: move on to the next batch.
            if (this.records == null || !this.records.hasNext()) {
                this.maybeCloseRecordStream();
                if (!this.batches.hasNext()) {
                    // All batches consumed: position the next fetch right after the last batch seen
                    // (uses the batch's own nextOffset() rather than the last record returned).
                    if (this.currentBatch != null) {
                        this.nextFetchOffset = this.currentBatch.nextOffset();
                    }
                    this.drain();
                    return null;
                }
                this.currentBatch = this.batches.next();
                this.lastEpoch = this.maybeLeaderEpoch(this.currentBatch.partitionLeaderEpoch());
                this.maybeEnsureValid(this.currentBatch);
                if (this.fetchConfig.isolationLevel == IsolationLevel.READ_COMMITTED && this.currentBatch.hasProducerId()) {
                    // Register as aborted every producer whose aborted transaction starts at or
                    // before this batch's last offset.
                    this.consumeAbortedTransactionsUpTo(this.currentBatch.lastOffset());
                    long producerId = this.currentBatch.producerId();
                    if (this.containsAbortMarker(this.currentBatch)) {
                        // An abort marker ends the producer's aborted transaction.
                        this.abortedProducerIds.remove(producerId);
                    } else if (this.isBatchAborted(this.currentBatch)) {
                        this.log.debug("Skipping aborted record batch from partition {} with producerId {} and offsets {} to {}", new Object[]{this.partition, producerId, this.currentBatch.baseOffset(), this.currentBatch.lastOffset()});
                        // Skip the whole batch without opening its record stream.
                        this.nextFetchOffset = this.currentBatch.nextOffset();
                        continue;
                    }
                }
                this.records = this.currentBatch.streamingIterator(this.decompressionBufferSupplier);
                continue;
            }
            Record record = (Record)this.records.next();
            // Skip records that lie before the requested position.
            if (record.offset() < this.nextFetchOffset) continue;
            // Validation is only performed for records that are not skipped.
            this.maybeEnsureValid(record);
            if (!this.currentBatch.isControlBatch()) {
                return record;
            }
            // Control records are not returned; just advance past them.
            this.nextFetchOffset = record.offset() + 1L;
        }
    }

    /**
     * Parses and returns up to {@code maxRecords} records from this fetch.
     *
     * <p>If a previous call failed while advancing to the next record, this call fails
     * immediately with the cached exception as the cause. If a previous call failed while
     * parsing a record, that same record is retried first. When an error occurs after at
     * least one record has been collected, the collected records are returned and the
     * error is cached for the next call.
     *
     * @param maxRecords maximum number of records to return
     * @return the parsed records; empty if this fetch is already consumed
     * @throws SerializationException if parsing fails before any record was collected
     * @throws KafkaException if advancing to the next record fails before any record was
     *         collected, or if a previous call left the last record marked as corrupt
     */
    List<ConsumerRecord<K, V>> fetchRecords(int maxRecords) {
        if (this.corruptLastRecord) {
            throw new KafkaException("Received exception when fetching the next record from " + this.partition + ". If needed, please seek past the record to continue consumption.", this.cachedRecordException);
        }
        if (this.isConsumed) {
            return Collections.emptyList();
        }

        ArrayList<ConsumerRecord<K, V>> fetched = new ArrayList<>();
        try {
            for (int remaining = maxRecords; remaining > 0; --remaining) {
                // Only advance when the previous attempt did not fail; otherwise retry lastRecord.
                if (this.cachedRecordException == null) {
                    // The flag stays set if nextFetchedRecord() throws.
                    this.corruptLastRecord = true;
                    this.lastRecord = this.nextFetchedRecord();
                    this.corruptLastRecord = false;
                }
                if (this.lastRecord == null) {
                    break;
                }

                Optional<Integer> batchEpoch = this.maybeLeaderEpoch(this.currentBatch.partitionLeaderEpoch());
                TimestampType batchTimestampType = this.currentBatch.timestampType();
                ConsumerRecord<K, V> parsed = this.parseRecord(this.partition, batchEpoch, batchTimestampType, this.lastRecord);

                fetched.add(parsed);
                ++this.recordsRead;
                this.bytesRead += this.lastRecord.sizeInBytes();
                this.nextFetchOffset = this.lastRecord.offset() + 1L;
                this.cachedRecordException = null;
            }
        }
        catch (SerializationException se) {
            this.cachedRecordException = se;
            if (fetched.isEmpty()) {
                throw se;
            }
        }
        catch (KafkaException e) {
            this.cachedRecordException = e;
            if (fetched.isEmpty()) {
                throw new KafkaException("Received exception when fetching the next record from " + this.partition + ". If needed, please seek past the record to continue consumption.", e);
            }
        }
        return fetched;
    }

    /**
     * Deserializes the key and value of a raw record and wraps them in a {@link ConsumerRecord}.
     *
     * @param partition     partition the record belongs to
     * @param leaderEpoch   leader epoch to attach to the resulting record
     * @param timestampType timestamp type to attach to the resulting record
     * @param record        the raw record to parse
     * @return the parsed consumer record; a {@code null} key or value is kept as {@code null}
     *         and reported with a serialized size of -1
     * @throws RecordDeserializationException if any runtime exception occurs while parsing
     */
    ConsumerRecord<K, V> parseRecord(TopicPartition partition, Optional<Integer> leaderEpoch, TimestampType timestampType, Record record) {
        try {
            long offset = record.offset();
            long timestamp = record.timestamp();
            RecordHeaders headers = new RecordHeaders(record.headers());

            ByteBuffer keyBytes = record.key();
            byte[] keyByteArray = keyBytes == null ? null : Utils.toArray(keyBytes);
            // Typed as K/V (not Object) so the result is assignable to ConsumerRecord<K, V>.
            K key = keyBytes == null ? null : this.fetchConfig.keyDeserializer.deserialize(partition.topic(), headers, keyByteArray);

            ByteBuffer valueBytes = record.value();
            byte[] valueByteArray = valueBytes == null ? null : Utils.toArray(valueBytes);
            V value = valueBytes == null ? null : this.fetchConfig.valueDeserializer.deserialize(partition.topic(), headers, valueByteArray);

            int serializedKeySize = keyByteArray == null ? -1 : keyByteArray.length;
            int serializedValueSize = valueByteArray == null ? -1 : valueByteArray.length;
            return new ConsumerRecord<>(partition.topic(), partition.partition(), offset, timestamp, timestampType, serializedKeySize, serializedValueSize, key, value, headers, leaderEpoch);
        }
        catch (RuntimeException e) {
            throw new RecordDeserializationException(partition, record.offset(), "Error deserializing key/value for partition " + partition + " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
        }
    }

    /**
     * Converts a raw leader epoch into an {@link Optional}.
     *
     * @param leaderEpoch the raw epoch value, where -1 means "no epoch"
     * @return an empty optional for -1, otherwise an optional holding the epoch
     */
    private Optional<Integer> maybeLeaderEpoch(int leaderEpoch) {
        if (leaderEpoch == -1) {
            return Optional.empty();
        }
        return Optional.of(leaderEpoch);
    }

    /**
     * Removes from the aborted-transaction queue every entry whose first offset is at or
     * before {@code offset} and records its producer id as aborted. Does nothing when the
     * fetch response carried no aborted transactions.
     *
     * @param offset inclusive upper bound on the first offset of the transactions to consume
     */
    private void consumeAbortedTransactionsUpTo(long offset) {
        PriorityQueue<FetchResponseData.AbortedTransaction> pending = this.abortedTransactions;
        if (pending == null) {
            return;
        }
        // peek() returns null exactly when the queue is empty (PriorityQueue rejects null elements).
        FetchResponseData.AbortedTransaction head;
        while ((head = pending.peek()) != null && head.firstOffset() <= offset) {
            pending.poll();
            this.abortedProducerIds.add(head.producerId());
        }
    }

    /**
     * Tells whether the given batch is transactional and was written by a producer that is
     * currently registered as aborted.
     *
     * @param batch the batch to inspect
     * @return {@code true} if the batch should be treated as aborted
     */
    private boolean isBatchAborted(RecordBatch batch) {
        if (!batch.isTransactional()) {
            return false;
        }
        return this.abortedProducerIds.contains(batch.producerId());
    }

    /**
     * Builds a queue of the partition's aborted transactions ordered by ascending first offset.
     *
     * @param partition the fetched partition data
     * @return the ordered queue, or {@code null} when the partition lists no aborted transactions
     */
    private PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions(FetchResponseData.PartitionData partition) {
        List<FetchResponseData.AbortedTransaction> aborted = partition.abortedTransactions();
        boolean nothingAborted = aborted == null || aborted.isEmpty();
        if (nothingAborted) {
            return null;
        }

        Comparator<FetchResponseData.AbortedTransaction> byFirstOffset = Comparator.comparingLong(FetchResponseData.AbortedTransaction::firstOffset);
        PriorityQueue<FetchResponseData.AbortedTransaction> ordered = new PriorityQueue<>(aborted.size(), byFirstOffset);
        ordered.addAll(aborted);
        return ordered;
    }

    /**
     * Tells whether the given batch is a control batch whose first record is an ABORT marker.
     *
     * @param batch the batch to inspect
     * @return {@code true} only for a non-empty control batch whose first record key parses
     *         to {@link ControlRecordType#ABORT}
     */
    private boolean containsAbortMarker(RecordBatch batch) {
        if (!batch.isControlBatch()) {
            return false;
        }
        // Typed iterator instead of a raw Iterator plus cast.
        Iterator<Record> batchIterator = batch.iterator();
        if (!batchIterator.hasNext()) {
            return false;
        }
        Record firstRecord = batchIterator.next();
        return ControlRecordType.ABORT == ControlRecordType.parse(firstRecord.key());
    }
}

