/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.CorruptedRecord;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class RecordQueueTest {
    // Serde helpers: the serializer builds the raw key/value bytes, the deserializer configures the source node.
    private final Serializer<Integer> intSerializer = new IntegerSerializer();
    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
    private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();

    // Metrics registry handed to both the mock processor context and the StreamsMetricsImpl
    // that testConsumedSensor uses to build the expected metric tag map.
    private final Metrics metrics = new Metrics();
    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, "mock", "latest", new MockTime());

    final InternalMockProcessorContext<Integer, Integer> context = new InternalMockProcessorContext<>(
        StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
        new MockRecordCollector(),
        metrics
    );

    private final MockSourceNode<Integer, Integer> mockSourceNodeWithMetrics =
        new MockSourceNode<>(intDeserializer, intDeserializer);

    // Default queue under test: fails on deserialization errors, uses the mock timestamp extractor.
    private final RecordQueue queue = new RecordQueue(
        new TopicPartition("topic", 1),
        mockSourceNodeWithMetrics,
        timestampExtractor,
        new LogAndFailExceptionHandler(),
        context,
        new LogContext()
    );

    // Same as above, but deserialization errors are logged and skipped instead of thrown.
    private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(
        new TopicPartition("topic", 1),
        mockSourceNodeWithMetrics,
        timestampExtractor,
        new LogAndContinueExceptionHandler(),
        context,
        new LogContext()
    );

    // Queue whose timestamp extractor logs and skips records with an invalid timestamp.
    private final RecordQueue queueThatSkipsInvalidTimestamps = new RecordQueue(
        new TopicPartition("topic", 1),
        mockSourceNodeWithMetrics,
        new LogAndSkipOnInvalidTimestamp(),
        new LogAndFailExceptionHandler(),
        context,
        new LogContext()
    );

    // Raw payloads shared by the tests: integer 10 as value, integer 1 as key.
    private final byte[] recordValue = intSerializer.serialize(null, 10);
    private final byte[] recordKey = intSerializer.serialize(null, 1);

    /** Initializes the shared mock source node with the mock processor context before each test. */
    @Before
    public void before() {
        final InternalProcessorContext<Integer, Integer> processorContext = context;
        mockSourceNodeWithMetrics.init(processorContext);
    }

    /** Closes the shared mock source node after each test. */
    @After
    public void after() {
        mockSourceNodeWithMetrics.close();
    }

    /**
     * Verifies the topic-level consumption metrics: after each {@code poll} the
     * "records-consumed-total" metric must have grown by one and the "bytes-consumed-total"
     * metric by the size of the corresponding raw record as computed by
     * {@code ClientUtils.consumerRecordSizeInBytes}.
     */
    @Test
    public void testConsumedSensor() {
        final String topic = "topic";
        final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
            new ConsumerRecord<>(topic, 1, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>(topic, 1, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>(topic, 1, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty())
        );

        queue.addRawRecords(records);

        final String threadId = Thread.currentThread().getName();
        final String taskId = context.taskId().toString();
        final String processorNodeId = mockSourceNodeWithMetrics.name();

        final Metric recordsConsumed = context.metrics().metrics().get(
            new MetricName(
                "records-consumed-total",
                "stream-topic-metrics",
                "The total number of records consumed from this topic",
                streamsMetrics.topicLevelTagMap(threadId, taskId, processorNodeId, topic)
            )
        );
        final Metric bytesConsumed = context.metrics().metrics().get(
            new MetricName(
                "bytes-consumed-total",
                "stream-topic-metrics",
                "The total number of bytes consumed from this topic",
                streamsMetrics.topicLevelTagMap(threadId, taskId, processorNodeId, topic)
            )
        );

        double totalBytes = 0.0;
        double totalRecords = 0.0;

        // Poll the records one at a time (wall-clock arguments 5, 6, 7) and check the running totals.
        for (int i = 0; i < records.size(); i++) {
            queue.poll(5L + i);

            totalBytes += ClientUtils.consumerRecordSizeInBytes(records.get(i));
            totalRecords += 1.0;

            MatcherAssert.assertThat(bytesConsumed.metricValue(), Matchers.equalTo(totalBytes));
            MatcherAssert.assertThat(recordsConsumed.metricValue(), Matchers.equalTo(totalRecords));
        }
    }

    /**
     * Walks the queue through add / poll / clear cycles and checks its size, head-record
     * timestamp and head-record offset after every step.
     *
     * <p>All records carry a CREATE_TIME timestamp of 0 and distinct offsets. The assertions
     * expect the head timestamp to equal the head record's offset, so the configured
     * {@code MockTimestampExtractor} is presumed to derive the timestamp from the offset.
     */
    @Test
    public void testTimeTracking() {
        // a fresh queue is empty and reports "unknown" (-1 / null) for its head record
        Assert.assertTrue(queue.isEmpty());
        Assert.assertEquals(0L, queue.size());
        Assert.assertEquals(-1L, queue.headRecordTimestamp());
        Assert.assertNull(queue.headRecordOffset());

        // add three out-of-order records: 2, 1, 3
        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>("topic", 1, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty())
        );
        queue.addRawRecords(list1);

        Assert.assertEquals(3L, queue.size());
        Assert.assertEquals(2L, queue.headRecordTimestamp());
        Assert.assertEquals(2L, queue.headRecordOffset().longValue());

        // poll the first record; remaining: 1, 3
        Assert.assertEquals(2L, queue.poll(0L).timestamp);
        Assert.assertEquals(2L, queue.size());
        Assert.assertEquals(1L, queue.headRecordTimestamp());
        Assert.assertEquals(1L, queue.headRecordOffset().longValue());

        // poll the second record; remaining: 3
        Assert.assertEquals(1L, queue.poll(0L).timestamp);
        Assert.assertEquals(1L, queue.size());
        Assert.assertEquals(3L, queue.headRecordTimestamp());
        Assert.assertEquals(3L, queue.headRecordOffset().longValue());

        // add three more out-of-order records: 4, 1, 2; queue now holds 3, 4, 1, 2
        final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 4L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>("topic", 1, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty())
        );
        queue.addRawRecords(list2);

        Assert.assertEquals(4L, queue.size());
        Assert.assertEquals(3L, queue.headRecordTimestamp());
        Assert.assertEquals(3L, queue.headRecordOffset().longValue());

        // poll the remaining records in insertion order: 3, 4, 1, 2
        Assert.assertEquals(3L, queue.poll(0L).timestamp);
        Assert.assertEquals(3L, queue.size());
        Assert.assertEquals(4L, queue.headRecordTimestamp());
        Assert.assertEquals(4L, queue.headRecordOffset().longValue());

        Assert.assertEquals(4L, queue.poll(0L).timestamp);
        Assert.assertEquals(1L, queue.headRecordTimestamp());
        Assert.assertEquals(1L, queue.headRecordOffset().longValue());

        Assert.assertEquals(1L, queue.poll(0L).timestamp);
        Assert.assertEquals(2L, queue.headRecordTimestamp());
        Assert.assertEquals(2L, queue.headRecordOffset().longValue());

        Assert.assertEquals(2L, queue.poll(0L).timestamp);
        Assert.assertTrue(queue.isEmpty());
        Assert.assertEquals(0L, queue.size());
        Assert.assertEquals(-1L, queue.headRecordTimestamp());
        Assert.assertNull(queue.headRecordOffset());

        // add three in-order records: 4, 5, 6
        final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 4L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>("topic", 1, 5L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>("topic", 1, 6L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty())
        );
        queue.addRawRecords(list3);

        Assert.assertEquals(3L, queue.size());
        Assert.assertEquals(4L, queue.headRecordTimestamp());
        Assert.assertEquals(4L, queue.headRecordOffset().longValue());

        // poll one record; remaining: 5, 6
        Assert.assertEquals(4L, queue.poll(0L).timestamp);
        Assert.assertEquals(2L, queue.size());
        Assert.assertEquals(5L, queue.headRecordTimestamp());
        Assert.assertEquals(5L, queue.headRecordOffset().longValue());

        // clearing the queue must also reset the head record and the partition time
        queue.clear();
        Assert.assertTrue(queue.isEmpty());
        Assert.assertEquals(0L, queue.size());
        Assert.assertEquals(-1L, queue.headRecordTimestamp());
        Assert.assertEquals(-1L, queue.partitionTime());
        Assert.assertNull(queue.headRecordOffset());

        // the cleared queue accepts the records 4, 5, 6 again
        queue.addRawRecords(list3);
        Assert.assertEquals(3L, queue.size());
        Assert.assertEquals(4L, queue.headRecordTimestamp());
        Assert.assertEquals(4L, queue.headRecordOffset().longValue());
    }

    /**
     * Verifies that the partition time is untouched by {@code addRawRecords}, only moves when a
     * record is polled, and does not go backwards when an out-of-order record is polled.
     */
    @Test
    public void shouldTrackPartitionTimeAsMaxProcessedTimestamp() {
        Assert.assertTrue(queue.isEmpty());
        MatcherAssert.assertThat(queue.size(), CoreMatchers.is(0));
        MatcherAssert.assertThat(queue.headRecordTimestamp(), CoreMatchers.is(-1L));
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(-1L));

        // add four out-of-order records: 2, 1, 3, 4
        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>("topic", 1, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>("topic", 1, 4L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty())
        );
        queue.addRawRecords(list1);

        // buffering alone must not advance the partition time
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(-1L));

        // polled 2 -> partition time 2
        queue.poll(0L);
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(2L));

        // polled 1 (out of order) -> partition time stays at 2
        queue.poll(0L);
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(2L));

        // polled 3 -> partition time 3
        queue.poll(0L);
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(3L));
    }

    /**
     * Verifies that {@code setPartitionTime} overrides the partition time directly, and that a
     * subsequent poll raises the partition time only when the polled record is newer than it.
     */
    @Test
    public void shouldSetTimestampAndRespectMaxTimestampPolicy() {
        Assert.assertTrue(queue.isEmpty());
        MatcherAssert.assertThat(queue.size(), CoreMatchers.is(0));
        MatcherAssert.assertThat(queue.headRecordTimestamp(), CoreMatchers.is(-1L));
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(-1L));

        queue.setPartitionTime(150L);
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(150L));

        // add four out-of-order records: 200, 100, 300, 400
        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 200L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>("topic", 1, 100L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>("topic", 1, 300L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>("topic", 1, 400L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty())
        );
        queue.addRawRecords(list1);

        // buffering alone must not change the explicitly set partition time
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(150L));

        // polled 200 > 150 -> partition time 200
        queue.poll(0L);
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(200L));

        queue.setPartitionTime(500L);
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(500L));

        // polled 100 < 500 -> partition time stays at 500
        queue.poll(0L);
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(500L));
    }

    /**
     * A key serialized as a long cannot be read by the integer deserializer; with the
     * fail-fast handler, {@code addRawRecords} must throw a {@link StreamsException} whose cause
     * is a {@link SerializationException}.
     */
    @Test
    public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
        final byte[] key = new LongSerializer().serialize("foo", 1L);
        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
            new ConsumerRecord<>("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, key, recordValue, new RecordHeaders(), Optional.empty())
        );

        final StreamsException exception = Assert.assertThrows(
            StreamsException.class,
            () -> queue.addRawRecords(records)
        );

        MatcherAssert.assertThat(exception.getCause(), IsInstanceOf.instanceOf(SerializationException.class));
    }

    /**
     * A value serialized as a long cannot be read by the integer deserializer; with the
     * fail-fast handler, {@code addRawRecords} must throw a {@link StreamsException} whose cause
     * is a {@link SerializationException}.
     */
    @Test
    public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
        final byte[] value = new LongSerializer().serialize("foo", 1L);
        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
            new ConsumerRecord<>("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, value, new RecordHeaders(), Optional.empty())
        );

        final StreamsException exception = Assert.assertThrows(
            StreamsException.class,
            () -> queue.addRawRecords(records)
        );

        MatcherAssert.assertThat(exception.getCause(), IsInstanceOf.instanceOf(SerializationException.class));
    }

    /**
     * With the log-and-continue handler, a record whose key cannot be deserialized must not
     * raise an exception; the queue is expected to report one entry and to hand it out as a
     * {@link CorruptedRecord} wrapping the raw record.
     */
    @Test
    public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() {
        final byte[] key = new LongSerializer().serialize("foo", 1L);
        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            "topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, key, recordValue, new RecordHeaders(), Optional.empty()
        );
        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(record);

        queueThatSkipsDeserializeErrors.addRawRecords(records);

        Assert.assertEquals(1L, queueThatSkipsDeserializeErrors.size());
        Assert.assertEquals(new CorruptedRecord(record), queueThatSkipsDeserializeErrors.poll(0L));
    }

    /**
     * With the log-and-continue handler, a record whose value cannot be deserialized must not
     * raise an exception; the queue is expected to report one entry and to hand it out as a
     * {@link CorruptedRecord} wrapping the raw record.
     */
    @Test
    public void shouldNotThrowStreamsExceptionWhenValueDeserializationFailsWithSkipHandler() {
        final byte[] value = new LongSerializer().serialize("foo", 1L);
        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            "topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, value, new RecordHeaders(), Optional.empty()
        );
        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(record);

        queueThatSkipsDeserializeErrors.addRawRecords(records);

        Assert.assertEquals(1L, queueThatSkipsDeserializeErrors.size());
        Assert.assertEquals(new CorruptedRecord(record), queueThatSkipsDeserializeErrors.poll(0L));
    }

    /**
     * A queue configured with {@link FailOnInvalidTimestamp} must reject a record whose
     * timestamp is -1 by throwing a {@link StreamsException} with the exact message below.
     */
    @Test
    public void shouldThrowOnNegativeTimestamp() {
        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
            new ConsumerRecord<>("topic", 1, 1L, -1L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty())
        );

        // Local queue (shadows the field) with the fail-fast timestamp extractor and a fresh mock context.
        final RecordQueue queue = new RecordQueue(
            new TopicPartition("topic", 1),
            mockSourceNodeWithMetrics,
            new FailOnInvalidTimestamp(),
            new LogAndContinueExceptionHandler(),
            new InternalMockProcessorContext<>(),
            new LogContext()
        );

        final StreamsException exception = Assert.assertThrows(
            StreamsException.class,
            () -> queue.addRawRecords(records)
        );

        // Compile-time constant concatenation; the resulting string is the full expected message.
        final String expectedMessage =
            "Input record ConsumerRecord(topic = topic, partition = 1, leaderEpoch = null, offset = 1, "
                + "CreateTime = -1, serialized key size = 0, serialized value size = 0, "
                + "headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = 10) "
                + "has invalid (negative) timestamp. "
                + "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, "
                + "or because the input topic was created before upgrading the Kafka cluster to 0.10+. "
                + "Use a different TimestampExtractor to process this data.";

        MatcherAssert.assertThat(exception.getMessage(), Matchers.equalTo(expectedMessage));
    }

    /**
     * A queue configured with {@link LogAndSkipOnInvalidTimestamp} must not throw for a record
     * whose timestamp is -1; the queue is expected to report one entry and to hand it out as a
     * {@link CorruptedRecord} wrapping the raw record.
     */
    @Test
    public void shouldDropOnNegativeTimestamp() {
        final ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
            "topic", 1, 1L, -1L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()
        );
        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(record);

        queueThatSkipsInvalidTimestamps.addRawRecords(records);

        Assert.assertEquals(1L, queueThatSkipsInvalidTimestamps.size());
        Assert.assertEquals(new CorruptedRecord(record), queueThatSkipsInvalidTimestamps.poll(0L));
    }

    /**
     * Verifies the partition time that the queue passes to its {@link TimestampExtractor}: the
     * tracking extractor records the last value it was given, and the assertions check that
     * value after the records are added and after each poll.
     */
    @Test
    public void shouldPassPartitionTimeToTimestampExtractor() {
        // Local extractor and queue (both shadow the fields of the same name).
        final PartitionTimeTrackingTimestampExtractor timestampExtractor = new PartitionTimeTrackingTimestampExtractor();
        final RecordQueue queue = new RecordQueue(
            new TopicPartition("topic", 1),
            mockSourceNodeWithMetrics,
            timestampExtractor,
            new LogAndFailExceptionHandler(),
            context,
            new LogContext()
        );

        Assert.assertTrue(queue.isEmpty());
        Assert.assertEquals(0L, queue.size());
        Assert.assertEquals(-1L, queue.headRecordTimestamp());

        // four out-of-order records: 2, 1, 3, 4
        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 2L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>("topic", 1, 3L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty()),
            new ConsumerRecord<>("topic", 1, 4L, 0L, TimestampType.CREATE_TIME, 0, 0, recordKey, recordValue, new RecordHeaders(), Optional.empty())
        );

        Assert.assertEquals(-1L, timestampExtractor.partitionTime);

        queue.addRawRecords(list1);

        // no known partition time has been passed to the extractor yet
        Assert.assertEquals(-1L, timestampExtractor.partitionTime);

        queue.poll(0L);
        Assert.assertEquals(2L, timestampExtractor.partitionTime);

        queue.poll(0L);
        Assert.assertEquals(2L, timestampExtractor.partitionTime);

        queue.poll(0L);
        Assert.assertEquals(3L, timestampExtractor.partitionTime);
    }

    private static class PartitionTimeTrackingTimestampExtractor
    implements TimestampExtractor {
        private long partitionTime = -1L;

        private PartitionTimeTrackingTimestampExtractor() {
        }

        public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
            if (partitionTime < this.partitionTime) {
                throw new IllegalStateException("Partition time should not decrease");
            }
            this.partitionTime = partitionTime;
            return record.offset();
        }
    }
}

