/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.ArrayDeque;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordDeserializer;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StampedRecord;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.slf4j.Logger;

/**
 * FIFO buffer of raw consumer records for a single {@link TopicPartition}.
 *
 * <p>Raw records are appended with {@link #addRawRecords(Iterable)}. The queue eagerly
 * deserializes the oldest buffered record and extracts its timestamp so that the "head"
 * record (and its timestamp) is available without further work. Records that fail to
 * deserialize (the deserializer returns {@code null}) or whose extracted timestamp is
 * negative are dropped while looking for the next head.
 *
 * <p>This class performs no synchronization of its own.
 */
public class RecordQueue {
    /** Sentinel timestamp meaning "no timestamp known yet". */
    static final long UNKNOWN = -1L;

    private final Logger log;
    private final SourceNode source;
    private final TopicPartition partition;
    // Kept as the internal context type so metrics() exposes skippedRecordsSensor() without a downcast.
    private final InternalProcessorContext processorContext;
    private final TimestampExtractor timestampExtractor;
    private final RecordDeserializer recordDeserializer;
    /** Raw records that have not been deserialized yet, oldest first. */
    private final ArrayDeque<ConsumerRecord<byte[], byte[]>> fifoQueue;
    /** The next record to hand out from {@link #poll()}, or {@code null} if none is ready. */
    private StampedRecord headRecord = null;
    /** Largest timestamp of any record that has become the head since creation or the last {@link #clear()}. */
    private long partitionTime = UNKNOWN;

    /**
     * Creates an empty queue.
     *
     * @param partition                       the partition this queue buffers records for
     * @param source                          the source node used for deserialization and reported in trace logs
     * @param timestampExtractor              extractor invoked on every successfully deserialized record
     * @param deserializationExceptionHandler handler passed on to the {@link RecordDeserializer}
     * @param processorContext                context supplying the metrics and passed to the deserializer
     * @param logContext                      factory for this queue's logger; also passed to the deserializer
     */
    RecordQueue(TopicPartition partition, SourceNode source, TimestampExtractor timestampExtractor, DeserializationExceptionHandler deserializationExceptionHandler, InternalProcessorContext processorContext, LogContext logContext) {
        this.source = source;
        this.partition = partition;
        this.fifoQueue = new ArrayDeque<>();
        this.timestampExtractor = timestampExtractor;
        this.recordDeserializer = new RecordDeserializer(source, deserializationExceptionHandler, logContext, processorContext.metrics().skippedRecordsSensor());
        this.processorContext = processorContext;
        this.log = logContext.logger(RecordQueue.class);
    }

    /**
     * @return the source node this queue was created with
     */
    public SourceNode source() {
        return this.source;
    }

    /**
     * @return the partition this queue was created with
     */
    public TopicPartition partition() {
        return this.partition;
    }

    /**
     * Appends the given raw records in iteration order and, if there is currently no head
     * record, tries to promote one.
     *
     * @param rawRecords the raw records to buffer
     * @return the size of the queue after the records were added (see {@link #size()})
     * @throws StreamsException if timestamp extraction fails while promoting a new head
     */
    int addRawRecords(Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
        for (ConsumerRecord<byte[], byte[]> rawRecord : rawRecords) {
            this.fifoQueue.addLast(rawRecord);
        }
        this.updateHead();
        return this.size();
    }

    /**
     * Removes and returns the current head record, then tries to promote the next one.
     *
     * @return the previous head record, or {@code null} if there was none
     * @throws StreamsException if timestamp extraction fails while promoting a new head
     */
    public StampedRecord poll() {
        StampedRecord recordToReturn = this.headRecord;
        this.headRecord = null;
        this.updateHead();
        return recordToReturn;
    }

    /**
     * @return the number of still-raw buffered records plus one if a head record is present
     */
    public int size() {
        return this.fifoQueue.size() + (this.headRecord == null ? 0 : 1);
    }

    /**
     * @return {@code true} if there is neither a head record nor any buffered raw record
     */
    public boolean isEmpty() {
        return this.fifoQueue.isEmpty() && this.headRecord == null;
    }

    /**
     * @return the timestamp of the head record, or {@link #UNKNOWN} if there is no head record
     */
    public long headRecordTimestamp() {
        return this.headRecord == null ? UNKNOWN : this.headRecord.timestamp;
    }

    /**
     * @return the largest timestamp of any record promoted to head so far, or {@link #UNKNOWN}
     *         if none has been promoted since creation or the last {@link #clear()}
     */
    long partitionTime() {
        return this.partitionTime;
    }

    /**
     * Drops all buffered records and the head record, and resets the partition time to
     * {@link #UNKNOWN}.
     */
    public void clear() {
        this.fifoQueue.clear();
        this.headRecord = null;
        this.partitionTime = UNKNOWN;
    }

    /**
     * Ensures a head record is present if one can be produced: repeatedly takes the oldest raw
     * record, deserializes it and extracts its timestamp, until a usable record is found or the
     * raw buffer is exhausted. Does nothing if a head record already exists.
     */
    private void updateHead() {
        while (this.headRecord == null && !this.fifoQueue.isEmpty()) {
            ConsumerRecord<byte[], byte[]> raw = this.fifoQueue.pollFirst();
            ConsumerRecord<Object, Object> deserialized = this.recordDeserializer.deserialize(this.processorContext, raw);
            if (deserialized == null) {
                // The deserializer returned nothing for this record; move on to the next raw record.
                continue;
            }

            long timestamp = this.extractTimestamp(deserialized);
            this.log.trace("Source node {} extracted timestamp {} for record {}", this.source.name(), timestamp, deserialized);

            if (timestamp < 0L) {
                this.log.warn("Skipping record due to negative extracted timestamp. topic=[{}] partition=[{}] offset=[{}] extractedTimestamp=[{}] extractor=[{}]", deserialized.topic(), deserialized.partition(), deserialized.offset(), timestamp, this.timestampExtractor.getClass().getCanonicalName());
                this.processorContext.metrics().skippedRecordsSensor().record();
                continue;
            }

            this.headRecord = new StampedRecord(deserialized, timestamp);
            // Partition time never moves backwards, even if this record is older than a previous head.
            this.partitionTime = Math.max(this.partitionTime, timestamp);
        }
    }

    /**
     * Runs the configured timestamp extractor on a record, passing the current partition time.
     *
     * @param deserialized the deserialized record
     * @return the extracted timestamp (may be negative)
     * @throws StreamsException rethrown unchanged if raised by the extractor, or wrapping any
     *                          other exception thrown by the extractor
     */
    private long extractTimestamp(ConsumerRecord<Object, Object> deserialized) {
        try {
            return this.timestampExtractor.extract(deserialized, this.partitionTime);
        } catch (StreamsException internalFatalExtractorException) {
            // Already a StreamsException: propagate as-is instead of double-wrapping.
            throw internalFatalExtractorException;
        } catch (Exception fatalUserException) {
            throw new StreamsException(String.format("Fatal user code error in TimestampExtractor callback for record %s.", deserialized), fatalUserException);
        }
    }
}

