/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.AbstractPartitionGroup;
import org.apache.kafka.streams.processor.internals.PartitionGroup;
import org.apache.kafka.streams.processor.internals.RecordQueue;
import org.apache.kafka.streams.processor.internals.StampedRecord;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

public class PartitionGroupTest {
    // Not referenced in the visible part of this file (presumably consumed by getBasicGroup() — TODO confirm).
    private final long maxTaskIdleMs = -1L;
    // The "[test] " prefix appears at the start of the log messages asserted in the readiness tests below.
    private final LogContext logContext = new LogContext("[test] ");
    // Clock whose milliseconds() value is passed to every nextRecord(...) call.
    private final Time time = new MockTime();
    private final Serializer<Integer> intSerializer = new IntegerSerializer();
    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
    private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
    // Partition used by the "...IfPartitionUnknown" tests, together with the exception message they expect.
    private final TopicPartition unknownPartition = new TopicPartition("unknown-partition", 0);
    private final String errMessage = "Partition " + this.unknownPartition + " not found.";
    private final String[] topics = new String[]{"topic"};
    // NOTE: instance field initializers run in declaration order. createPartition1()/createPartition2() read
    // topics[0], and createQueue1()/createQueue2() read partition1/partition2, intDeserializer,
    // timestampExtractor and logContext, so all of those must stay declared above the fields that use them.
    private final TopicPartition partition1 = this.createPartition1();
    private final TopicPartition partition2 = this.createPartition2();
    private final RecordQueue queue1 = this.createQueue1();
    private final RecordQueue queue2 = this.createQueue2();
    // Serialized forms of the integers 10 and 1, used as value and key of the test records built below.
    private final byte[] recordValue = this.intSerializer.serialize(null, (Object)10);
    private final byte[] recordKey = this.intSerializer.serialize(null, (Object)1);
    private final Metrics metrics = new Metrics();
    // Sensor registered under a random UUID name; passed to every PartitionGroup constructed in this file.
    private final Sensor enforcedProcessingSensor = this.metrics.sensor(UUID.randomUUID().toString());
    // Metric name under which the tests read the last recorded lateness value back from `metrics`.
    private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", Utils.mkMap((Map.Entry[])new Map.Entry[0]));

    /**
     * Looks up (or creates) the sensor named after {@code metricName}, attaches a {@link Value}
     * stat to it under that metric name, and returns the sensor.
     *
     * @param metrics    registry the sensor is obtained from
     * @param metricName metric to register on the sensor; its name is also used as the sensor name
     * @return the sensor carrying the newly added metric
     */
    private static Sensor getValueSensor(Metrics metrics, MetricName metricName) {
        final String sensorName = metricName.name();
        final Sensor valueSensor = metrics.sensor(sensorName);
        final MeasurableStat stat = new Value();
        valueSensor.add(metricName, stat);
        return valueSensor;
    }

    /** Runs both time-tracking batches back to back against one shared group. */
    @Test
    public void testTimeTracking() {
        final PartitionGroup sharedGroup = getBasicGroup();
        // The second batch's buffer-size expectations build on what the first batch leaves behind.
        testFirstBatch(sharedGroup);
        testSecondBatch(sharedGroup);
    }

    /**
     * Builds a new {@link RecordQueue} for {@code partition1}, wired with a mock source node,
     * the shared timestamp extractor, a log-and-continue deserialization handler and a fresh
     * mock processor context.
     */
    private RecordQueue createQueue1() {
        final MockSourceNode<Integer, Integer> source = new MockSourceNode<>(intDeserializer, intDeserializer);
        final DeserializationExceptionHandler handler = new LogAndContinueExceptionHandler();
        final InternalMockProcessorContext context = new InternalMockProcessorContext();
        return new RecordQueue(partition1, source, timestampExtractor, handler, context, logContext);
    }

    /**
     * Builds a new {@link RecordQueue} for {@code partition2}, wired with a mock source node,
     * the shared timestamp extractor, a log-and-continue deserialization handler and a fresh
     * mock processor context.
     */
    private RecordQueue createQueue2() {
        final MockSourceNode<Integer, Integer> source = new MockSourceNode<>(intDeserializer, intDeserializer);
        final DeserializationExceptionHandler handler = new LogAndContinueExceptionHandler();
        final InternalMockProcessorContext context = new InternalMockProcessorContext();
        return new RecordQueue(partition2, source, timestampExtractor, handler, context, logContext);
    }

    /** Creates a new {@link TopicPartition} for partition number 1 of the first configured topic. */
    private TopicPartition createPartition1() {
        final String topicName = topics[0];
        final int partitionNumber = 1;
        return new TopicPartition(topicName, partitionNumber);
    }

    /** Creates a new {@link TopicPartition} for partition number 2 of the first configured topic. */
    private TopicPartition createPartition2() {
        final String topicName = topics[0];
        final int partitionNumber = 2;
        return new TopicPartition(topicName, partitionNumber);
    }

    /**
     * First half of {@link #testTimeTracking()}: buffers three records on each partition, drains the
     * first two, and after every step checks buffer sizes, partition timestamps, head record offsets,
     * stream time and the last-lateness metric.
     *
     * <p>The assertions expect each record's timestamp to equal its offset (presumably the behavior of
     * {@code MockTimestampExtractor} — TODO confirm).
     *
     * @param group the group under test; expected to be empty on entry
     */
    private void testFirstBatch(PartitionGroup group) {
        AbstractPartitionGroup.RecordInfo info = new AbstractPartitionGroup.RecordInfo();
        // Nothing is buffered before any records are added.
        MatcherAssert.assertThat((Object)group.numBuffered(), (Matcher)CoreMatchers.is((Object)0));
        // Partition 1 receives offsets 1, 3, 5; partition 2 receives offsets 2, 4, 6.
        List<ConsumerRecord> list1 = Arrays.asList(new ConsumerRecord("topic", 1, 1L, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 3L, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 5L, (Object)this.recordKey, (Object)this.recordValue));
        group.addRawRecords(this.partition1, list1);
        List<ConsumerRecord> list2 = Arrays.asList(new ConsumerRecord("topic", 2, 2L, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 2, 4L, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 2, 6L, (Object)this.recordKey, (Object)this.recordValue));
        group.addRawRecords(this.partition2, list2);
        // Six records buffered, nothing consumed yet: partition and stream times are still -1,
        // head offsets point at the first record of each partition, and no lateness has been recorded.
        this.verifyBuffered(6, 3, 3, group);
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition1), (Matcher)CoreMatchers.is((Object)-1L));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition2), (Matcher)CoreMatchers.is((Object)-1L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition1), (Matcher)CoreMatchers.is((Object)1L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition2), (Matcher)CoreMatchers.is((Object)2L));
        MatcherAssert.assertThat((Object)group.streamTime(), (Matcher)CoreMatchers.is((Object)-1L));
        MatcherAssert.assertThat((Object)this.metrics.metric(this.lastLatenessValue).metricValue(), (Matcher)CoreMatchers.is((Object)0.0));
        // 1st poll: expected to come from partition 1 (timestamp 1); its head moves on to offset 3.
        StampedRecord record = group.nextRecord(info, this.time.milliseconds());
        MatcherAssert.assertThat((Object)info.partition(), (Matcher)Matchers.equalTo((Object)this.partition1));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition1), (Matcher)CoreMatchers.is((Object)1L));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition2), (Matcher)CoreMatchers.is((Object)-1L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition1), (Matcher)CoreMatchers.is((Object)3L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition2), (Matcher)CoreMatchers.is((Object)2L));
        this.verifyTimes(record, 1L, 1L, group);
        MatcherAssert.assertThat((Object)this.metrics.metric(this.lastLatenessValue).metricValue(), (Matcher)CoreMatchers.is((Object)0.0));
        // 2nd poll: expected to come from partition 2 (timestamp 2); its head moves on to offset 4.
        record = group.nextRecord(info, this.time.milliseconds());
        MatcherAssert.assertThat((Object)info.partition(), (Matcher)Matchers.equalTo((Object)this.partition2));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition1), (Matcher)CoreMatchers.is((Object)1L));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition2), (Matcher)CoreMatchers.is((Object)2L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition1), (Matcher)CoreMatchers.is((Object)3L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition2), (Matcher)CoreMatchers.is((Object)4L));
        this.verifyTimes(record, 2L, 2L, group);
        // Two records remain on each partition; testSecondBatch continues from this state.
        this.verifyBuffered(4, 2, 2, group);
        Assert.assertEquals((Object)0.0, (Object)this.metrics.metric(this.lastLatenessValue).metricValue());
    }

    /**
     * Second half of {@link #testTimeTracking()}: appends two records with lower offsets (2 and 4) to
     * partition 1 and drains the group completely, checking buffer sizes, partition timestamps, head
     * record offsets, stream time and the last-lateness metric after every poll.
     *
     * <p>The initial expectations (stream time 2, head offsets 3 and 4, two records buffered per
     * partition) match the state {@code testFirstBatch} asserts on exit, so this method is meant to run
     * on the same group right after it.
     *
     * @param group the group under test, in the state left behind by {@code testFirstBatch}
     */
    private void testSecondBatch(PartitionGroup group) {
        AbstractPartitionGroup.RecordInfo info = new AbstractPartitionGroup.RecordInfo();
        // Append offsets 2 and 4 to partition 1, behind the already buffered offsets 3 and 5.
        List<ConsumerRecord> list3 = Arrays.asList(new ConsumerRecord("topic", 1, 2L, (Object)this.recordKey, (Object)this.recordValue), new ConsumerRecord("topic", 1, 4L, (Object)this.recordKey, (Object)this.recordValue));
        group.addRawRecords(this.partition1, list3);
        // Adding records alone is expected to change only the buffer counts, not times or head offsets.
        this.verifyBuffered(6, 4, 2, group);
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition1), (Matcher)CoreMatchers.is((Object)1L));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition2), (Matcher)CoreMatchers.is((Object)2L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition1), (Matcher)CoreMatchers.is((Object)3L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition2), (Matcher)CoreMatchers.is((Object)4L));
        MatcherAssert.assertThat((Object)group.streamTime(), (Matcher)CoreMatchers.is((Object)2L));
        MatcherAssert.assertThat((Object)this.metrics.metric(this.lastLatenessValue).metricValue(), (Matcher)CoreMatchers.is((Object)0.0));
        // Poll 1: partition 1, timestamp 3; stream time advances to 3.
        StampedRecord record = group.nextRecord(info, this.time.milliseconds());
        MatcherAssert.assertThat((Object)info.partition(), (Matcher)Matchers.equalTo((Object)this.partition1));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition1), (Matcher)CoreMatchers.is((Object)3L));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition2), (Matcher)CoreMatchers.is((Object)2L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition1), (Matcher)CoreMatchers.is((Object)5L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition2), (Matcher)CoreMatchers.is((Object)4L));
        this.verifyTimes(record, 3L, 3L, group);
        this.verifyBuffered(5, 3, 2, group);
        MatcherAssert.assertThat((Object)this.metrics.metric(this.lastLatenessValue).metricValue(), (Matcher)CoreMatchers.is((Object)0.0));
        // Poll 2: partition 2, timestamp 4; stream time advances to 4.
        record = group.nextRecord(info, this.time.milliseconds());
        MatcherAssert.assertThat((Object)info.partition(), (Matcher)Matchers.equalTo((Object)this.partition2));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition1), (Matcher)CoreMatchers.is((Object)3L));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition2), (Matcher)CoreMatchers.is((Object)4L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition1), (Matcher)CoreMatchers.is((Object)5L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition2), (Matcher)CoreMatchers.is((Object)6L));
        this.verifyTimes(record, 4L, 4L, group);
        this.verifyBuffered(4, 3, 1, group);
        MatcherAssert.assertThat((Object)this.metrics.metric(this.lastLatenessValue).metricValue(), (Matcher)CoreMatchers.is((Object)0.0));
        // Poll 3: partition 1, timestamp 5; stream time advances to 5 and the partition's head
        // becomes the out-of-order record at offset 2.
        record = group.nextRecord(info, this.time.milliseconds());
        MatcherAssert.assertThat((Object)info.partition(), (Matcher)Matchers.equalTo((Object)this.partition1));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition1), (Matcher)CoreMatchers.is((Object)5L));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition2), (Matcher)CoreMatchers.is((Object)4L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition1), (Matcher)CoreMatchers.is((Object)2L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition2), (Matcher)CoreMatchers.is((Object)6L));
        this.verifyTimes(record, 5L, 5L, group);
        this.verifyBuffered(3, 2, 1, group);
        MatcherAssert.assertThat((Object)this.metrics.metric(this.lastLatenessValue).metricValue(), (Matcher)CoreMatchers.is((Object)0.0));
        // Poll 4: partition 1, late record with timestamp 2. Partition and stream time stay at 5;
        // the expected lateness of 3.0 is consistent with stream time 5 minus record timestamp 2.
        record = group.nextRecord(info, this.time.milliseconds());
        MatcherAssert.assertThat((Object)info.partition(), (Matcher)Matchers.equalTo((Object)this.partition1));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition1), (Matcher)CoreMatchers.is((Object)5L));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition2), (Matcher)CoreMatchers.is((Object)4L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition1), (Matcher)CoreMatchers.is((Object)4L));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition2), (Matcher)CoreMatchers.is((Object)6L));
        this.verifyTimes(record, 2L, 5L, group);
        this.verifyBuffered(2, 1, 1, group);
        MatcherAssert.assertThat((Object)this.metrics.metric(this.lastLatenessValue).metricValue(), (Matcher)CoreMatchers.is((Object)3.0));
        // Poll 5: partition 1, late record with timestamp 4 (expected lateness 1.0). Partition 1 is
        // now drained, so its head record offset is null.
        record = group.nextRecord(info, this.time.milliseconds());
        MatcherAssert.assertThat((Object)info.partition(), (Matcher)Matchers.equalTo((Object)this.partition1));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition1), (Matcher)CoreMatchers.is((Object)5L));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition2), (Matcher)CoreMatchers.is((Object)4L));
        Assert.assertNull((Object)group.headRecordOffset(this.partition1));
        MatcherAssert.assertThat((Object)group.headRecordOffset(this.partition2), (Matcher)CoreMatchers.is((Object)6L));
        this.verifyTimes(record, 4L, 5L, group);
        this.verifyBuffered(1, 0, 1, group);
        MatcherAssert.assertThat((Object)this.metrics.metric(this.lastLatenessValue).metricValue(), (Matcher)CoreMatchers.is((Object)1.0));
        // Poll 6: partition 2, timestamp 6; stream time advances to 6, the lateness value returns to
        // 0.0 and both partitions are empty.
        record = group.nextRecord(info, this.time.milliseconds());
        MatcherAssert.assertThat((Object)info.partition(), (Matcher)Matchers.equalTo((Object)this.partition2));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition1), (Matcher)CoreMatchers.is((Object)5L));
        MatcherAssert.assertThat((Object)group.partitionTimestamp(this.partition2), (Matcher)CoreMatchers.is((Object)6L));
        Assert.assertNull((Object)group.headRecordOffset(this.partition1));
        Assert.assertNull((Object)group.headRecordOffset(this.partition2));
        this.verifyTimes(record, 6L, 6L, group);
        this.verifyBuffered(0, 0, 0, group);
        MatcherAssert.assertThat((Object)this.metrics.metric(this.lastLatenessValue).metricValue(), (Matcher)CoreMatchers.is((Object)0.0));
    }

    /**
     * Verifies the order in which records are handed out when a second partition is buffered
     * after the first one has already been partially consumed.
     *
     * <p>Partition 1 is loaded with offsets 1, 5, 3 and polled twice (expected timestamps 1 and 5).
     * Partition 2 is then loaded with offsets 2, 4, 6; the next two polls are expected to yield
     * timestamp 2 (partition 2's head) followed by timestamp 3 (partition 1's head).
     */
    @Test
    public void shouldChooseNextRecordBasedOnHeadTimestamp() {
        final PartitionGroup group = this.getBasicGroup();
        Assert.assertEquals(0L, (long) group.numBuffered());

        // Only partition 1 has data at first; note the out-of-order offsets 1, 5, 3.
        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 1L, this.recordKey, this.recordValue),
            new ConsumerRecord<>("topic", 1, 5L, this.recordKey, this.recordValue),
            new ConsumerRecord<>("topic", 1, 3L, this.recordKey, this.recordValue));
        group.addRawRecords(this.partition1, list1);
        this.verifyBuffered(3, 3, 0, group);
        Assert.assertEquals(-1L, (long) group.streamTime());
        Assert.assertEquals((Object) 0.0, this.metrics.metric(this.lastLatenessValue).metricValue());

        final AbstractPartitionGroup.RecordInfo info = new AbstractPartitionGroup.RecordInfo();

        // JUnit's assertEquals takes (expected, actual); keeping that order makes failure messages accurate.
        StampedRecord record = group.nextRecord(info, this.time.milliseconds());
        Assert.assertEquals(1L, (long) record.timestamp);
        record = group.nextRecord(info, this.time.milliseconds());
        Assert.assertEquals(5L, (long) record.timestamp);

        // Now buffer partition 2; its head (2) is older than partition 1's remaining head (3).
        final List<ConsumerRecord<byte[], byte[]>> list2 = Arrays.asList(
            new ConsumerRecord<>("topic", 2, 2L, this.recordKey, this.recordValue),
            new ConsumerRecord<>("topic", 2, 4L, this.recordKey, this.recordValue),
            new ConsumerRecord<>("topic", 2, 6L, this.recordKey, this.recordValue));
        group.addRawRecords(this.partition2, list2);

        record = group.nextRecord(info, this.time.milliseconds());
        Assert.assertEquals(2L, (long) record.timestamp);
        record = group.nextRecord(info, this.time.milliseconds());
        Assert.assertEquals(3L, (long) record.timestamp);
    }

    /**
     * Asserts that {@code record} carries the timestamp {@code recordTime} and that the group's
     * stream time equals {@code streamTime}.
     */
    private void verifyTimes(StampedRecord record, long recordTime, long streamTime, PartitionGroup group) {
        final long actualRecordTime = record.timestamp;
        MatcherAssert.assertThat(actualRecordTime, CoreMatchers.is(recordTime));

        final long actualStreamTime = group.streamTime();
        MatcherAssert.assertThat(actualStreamTime, CoreMatchers.is(streamTime));
    }

    /**
     * Asserts the number of buffered records: first the total across the group, then the count for
     * {@code partition1}, then the count for {@code partition2}.
     */
    private void verifyBuffered(int totalBuffered, int partitionOneBuffered, int partitionTwoBuffered, PartitionGroup group) {
        final long actualTotal = group.numBuffered();
        Assert.assertEquals((long) totalBuffered, actualTotal);

        final long actualPartitionOne = group.numBuffered(partition1);
        Assert.assertEquals((long) partitionOneBuffered, actualPartitionOne);

        final long actualPartitionTwo = group.numBuffered(partition2);
        Assert.assertEquals((long) partitionTwoBuffered, actualPartitionTwo);
    }

    /**
     * Setting a partition time is expected to be reflected by {@code partitionTimestamp}; the
     * stream time is expected to become 100 after the first call and to remain 100 after a
     * smaller time (50) is set on the other partition.
     */
    @Test
    public void shouldSetPartitionTimestampAndStreamTime() {
        final PartitionGroup group = getBasicGroup();

        group.setPartitionTime(partition1, 100L);
        final long partitionOneTime = group.partitionTimestamp(partition1);
        Assert.assertEquals(100L, partitionOneTime);
        final long streamTimeAfterFirstSet = group.streamTime();
        Assert.assertEquals(100L, streamTimeAfterFirstSet);

        group.setPartitionTime(partition2, 50L);
        final long partitionTwoTime = group.partitionTimestamp(partition2);
        Assert.assertEquals(50L, partitionTwoTime);
        final long streamTimeAfterSecondSet = group.streamTime();
        Assert.assertEquals(100L, streamTimeAfterSecondSet);
    }

    /**
     * {@code addRawRecords} on {@code unknownPartition} must throw an {@link IllegalStateException}
     * whose message equals {@code errMessage}.
     */
    @Test
    public void shouldThrowIllegalStateExceptionUponAddRecordsIfPartitionUnknown() {
        final PartitionGroup group = this.getBasicGroup();

        // The record batch is null: the call is expected to fail before the batch is ever used.
        final IllegalStateException exception = Assert.assertThrows(
            IllegalStateException.class,
            () -> group.addRawRecords(this.unknownPartition, null));

        // assertThat takes the actual value first and the expectation inside the matcher.
        MatcherAssert.assertThat(exception.getMessage(), Matchers.equalTo(this.errMessage));
    }

    /**
     * {@code numBuffered} on {@code unknownPartition} must throw an {@link IllegalStateException}
     * whose message equals {@code errMessage}.
     */
    @Test
    public void shouldThrowIllegalStateExceptionUponNumBufferedIfPartitionUnknown() {
        final PartitionGroup group = this.getBasicGroup();

        final IllegalStateException exception = Assert.assertThrows(
            IllegalStateException.class,
            () -> group.numBuffered(this.unknownPartition));

        // assertThat takes the actual value first and the expectation inside the matcher.
        MatcherAssert.assertThat(exception.getMessage(), Matchers.equalTo(this.errMessage));
    }

    /**
     * {@code setPartitionTime} on {@code unknownPartition} must throw an {@link IllegalStateException}
     * whose message equals {@code errMessage}.
     */
    @Test
    public void shouldThrowIllegalStateExceptionUponSetPartitionTimestampIfPartitionUnknown() {
        final PartitionGroup group = this.getBasicGroup();

        final IllegalStateException exception = Assert.assertThrows(
            IllegalStateException.class,
            () -> group.setPartitionTime(this.unknownPartition, 0L));

        // assertThat takes the actual value first and the expectation inside the matcher.
        MatcherAssert.assertThat(exception.getMessage(), Matchers.equalTo(this.errMessage));
    }

    /**
     * {@code partitionTimestamp} on {@code unknownPartition} must throw an
     * {@link IllegalStateException} whose message equals {@code errMessage}.
     */
    @Test
    public void shouldThrowIllegalStateExceptionUponGetPartitionTimestampIfPartitionUnknown() {
        final PartitionGroup group = this.getBasicGroup();

        final IllegalStateException exception = Assert.assertThrows(
            IllegalStateException.class,
            () -> group.partitionTimestamp(this.unknownPartition));

        // assertThat takes the actual value first and the expectation inside the matcher.
        MatcherAssert.assertThat(exception.getMessage(), Matchers.equalTo(this.errMessage));
    }

    /**
     * {@code headRecordOffset} on {@code unknownPartition} must throw an
     * {@link IllegalStateException} whose message equals {@code errMessage}.
     */
    @Test
    public void shouldThrowIllegalStateExceptionUponGetHeadRecordOffsetIfPartitionUnknown() {
        final PartitionGroup group = this.getBasicGroup();

        final IllegalStateException exception = Assert.assertThrows(
            IllegalStateException.class,
            () -> group.headRecordOffset(this.unknownPartition));

        // assertThat takes the actual value first and the expectation inside the matcher.
        MatcherAssert.assertThat(exception.getMessage(), Matchers.equalTo(this.errMessage));
    }

    /**
     * After {@code clear()}, the group is expected to report no buffered records, a stream time
     * and partition time of -1, no next record, and no fetched lag; adding records again
     * afterwards must still be possible.
     */
    @Test
    public void shouldEmptyPartitionsOnClear() {
        final Map<TopicPartition, RecordQueue> queues = Utils.mkMap(Utils.mkEntry(partition1, queue1));
        final Sensor latenessSensor = PartitionGroupTest.getValueSensor(metrics, lastLatenessValue);
        final PartitionGroup group = new PartitionGroup(
            logContext, queues, tp -> OptionalLong.of(0L), latenessSensor, enforcedProcessingSensor, 10L);

        final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 3L, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
        group.addRawRecords(partition1, records);

        // Consume two of the three records and refresh the lags so there is state to clear.
        for (int polls = 0; polls < 2; polls++) {
            group.nextRecord(new AbstractPartitionGroup.RecordInfo(), time.milliseconds());
        }
        group.updateLags();

        group.clear();

        MatcherAssert.assertThat(group.numBuffered(), Matchers.equalTo(0));
        MatcherAssert.assertThat(group.streamTime(), Matchers.equalTo(-1L));
        final StampedRecord afterClear = group.nextRecord(new AbstractPartitionGroup.RecordInfo(), time.milliseconds());
        MatcherAssert.assertThat(afterClear, Matchers.equalTo(null));
        MatcherAssert.assertThat(group.partitionTimestamp(partition1), Matchers.equalTo(-1L));
        hasNoFetchedLag(group, partition1);

        // The cleared group must still accept new records.
        group.addRawRecords(partition1, records);
    }

    /**
     * Shrinks the group from two partitions to partition 2 only. No queue may be created, the
     * records of partition 1 are expected to disappear from the buffer count, stream time is
     * expected to be kept, and partition 1 is expected to become unknown to the group.
     */
    @Test
    public void shouldUpdatePartitionQueuesShrink() {
        final PartitionGroup group = getBasicGroup();

        final List<ConsumerRecord<byte[], byte[]>> partitionOneRecords = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
        group.addRawRecords(partition1, partitionOneRecords);
        final List<ConsumerRecord<byte[], byte[]>> partitionTwoRecords = Arrays.asList(
            new ConsumerRecord<>("topic", 2, 2L, recordKey, recordValue),
            new ConsumerRecord<>("topic", 2, 4L, recordKey, recordValue),
            new ConsumerRecord<>("topic", 2, 6L, recordKey, recordValue));
        group.addRawRecords(partition2, partitionTwoRecords);

        final long expectedBeforeShrink = partitionOneRecords.size() + partitionTwoRecords.size();
        final long bufferedBeforeShrink = group.numBuffered();
        Assert.assertEquals(expectedBeforeShrink, bufferedBeforeShrink);
        Assert.assertTrue(group.allPartitionsBufferedLocally());
        group.nextRecord(new AbstractPartitionGroup.RecordInfo(), time.milliseconds());

        // Keep only partition 2; its queue already exists, so the factory must never be invoked.
        group.updatePartitions(Utils.mkSet(createPartition2()), p -> {
            Assert.fail("should not create any queues");
            return null;
        });

        Assert.assertTrue(group.allPartitionsBufferedLocally());
        final long expectedAfterShrink = partitionTwoRecords.size();
        final long bufferedAfterShrink = group.numBuffered();
        Assert.assertEquals(expectedAfterShrink, bufferedAfterShrink);
        final long streamTimeAfterShrink = group.streamTime();
        Assert.assertEquals(1L, streamTimeAfterShrink);
        Assert.assertThrows(IllegalStateException.class, () -> group.partitionTimestamp(partition1));
        final StampedRecord next = group.nextRecord(new AbstractPartitionGroup.RecordInfo(), time.milliseconds());
        MatcherAssert.assertThat(next, Matchers.notNullValue());
        MatcherAssert.assertThat(group.partitionTimestamp(partition2), Matchers.equalTo(2L));
    }

    /**
     * Expands a group that starts with partition 1 only to partitions 1 and 2. The queue factory
     * is expected to be asked for partition 2 only; the state of partition 1 is expected to be
     * kept, while partition 2 starts with a partition time of -1.
     */
    @Test
    public void shouldUpdatePartitionQueuesExpand() {
        final Map<TopicPartition, RecordQueue> queues = Utils.mkMap(Utils.mkEntry(partition1, queue1));
        final Sensor latenessSensor = PartitionGroupTest.getValueSensor(metrics, lastLatenessValue);
        final PartitionGroup group = new PartitionGroup(
            logContext, queues, tp -> OptionalLong.of(0L), latenessSensor, enforcedProcessingSensor, -1L);

        final List<ConsumerRecord<byte[], byte[]>> partitionOneRecords = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
        group.addRawRecords(partition1, partitionOneRecords);

        final long expectedBeforeExpand = partitionOneRecords.size();
        final long bufferedBeforeExpand = group.numBuffered();
        Assert.assertEquals(expectedBeforeExpand, bufferedBeforeExpand);
        Assert.assertTrue(group.allPartitionsBufferedLocally());
        group.nextRecord(new AbstractPartitionGroup.RecordInfo(), time.milliseconds());

        // Only the newly added partition 2 may be handed to the queue factory.
        group.updatePartitions(Utils.mkSet(createPartition1(), createPartition2()), p -> {
            Assert.assertEquals(createPartition2(), p);
            return createQueue2();
        });

        Assert.assertFalse(group.allPartitionsBufferedLocally());
        final long bufferedAfterExpand = group.numBuffered();
        Assert.assertEquals(1L, bufferedAfterExpand);
        final long streamTimeAfterExpand = group.streamTime();
        Assert.assertEquals(1L, streamTimeAfterExpand);
        MatcherAssert.assertThat(group.partitionTimestamp(partition1), Matchers.equalTo(1L));
        MatcherAssert.assertThat(group.partitionTimestamp(partition2), Matchers.equalTo(-1L));
        final StampedRecord next = group.nextRecord(new AbstractPartitionGroup.RecordInfo(), time.milliseconds());
        MatcherAssert.assertThat(next, Matchers.notNullValue());
    }

    /**
     * Replaces partition 1 with partition 2 in a single update. Partition 1's buffered records
     * are expected to be dropped and the partition to become unknown, stream time is expected to
     * be kept, and the new partition 2 starts empty with a partition time of -1.
     */
    @Test
    public void shouldUpdatePartitionQueuesShrinkAndExpand() {
        final Map<TopicPartition, RecordQueue> queues = Utils.mkMap(Utils.mkEntry(partition1, queue1));
        final Sensor latenessSensor = PartitionGroupTest.getValueSensor(metrics, lastLatenessValue);
        final PartitionGroup group = new PartitionGroup(
            logContext, queues, tp -> OptionalLong.of(0L), latenessSensor, enforcedProcessingSensor, -1L);

        final List<ConsumerRecord<byte[], byte[]>> partitionOneRecords = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
        group.addRawRecords(partition1, partitionOneRecords);

        final long expectedBeforeUpdate = partitionOneRecords.size();
        final long bufferedBeforeUpdate = group.numBuffered();
        Assert.assertEquals(expectedBeforeUpdate, bufferedBeforeUpdate);
        Assert.assertTrue(group.allPartitionsBufferedLocally());
        group.nextRecord(new AbstractPartitionGroup.RecordInfo(), time.milliseconds());

        // Partition 1 is dropped and partition 2 is new, so the factory is asked for partition 2.
        group.updatePartitions(Utils.mkSet(createPartition2()), p -> {
            Assert.assertEquals(createPartition2(), p);
            return createQueue2();
        });

        Assert.assertFalse(group.allPartitionsBufferedLocally());
        final long bufferedAfterUpdate = group.numBuffered();
        Assert.assertEquals(0L, bufferedAfterUpdate);
        final long streamTimeAfterUpdate = group.streamTime();
        Assert.assertEquals(1L, streamTimeAfterUpdate);
        Assert.assertThrows(IllegalStateException.class, () -> group.partitionTimestamp(partition1));
        MatcherAssert.assertThat(group.partitionTimestamp(partition2), Matchers.equalTo(-1L));
        final StampedRecord next = group.nextRecord(new AbstractPartitionGroup.RecordInfo(), time.milliseconds());
        MatcherAssert.assertThat(next, Matchers.nullValue());
    }

    /**
     * With the last constructor argument set to -1 and only partition 1 buffered, the group is
     * expected to be ready immediately and to log, at TRACE level, that max.task.idle.ms is disabled.
     */
    @Test
    public void shouldNeverWaitIfIdlingIsDisabled() {
        final Map<TopicPartition, RecordQueue> queues =
            Utils.mkMap(Utils.mkEntry(partition1, queue1), Utils.mkEntry(partition2, queue2));
        final Sensor latenessSensor = PartitionGroupTest.getValueSensor(metrics, lastLatenessValue);
        final PartitionGroup group = new PartitionGroup(
            logContext, queues, tp -> OptionalLong.of(0L), latenessSensor, enforcedProcessingSensor, -1L);

        final List<ConsumerRecord<byte[], byte[]>> partitionOneRecords = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
        group.addRawRecords(partition1, partitionOneRecords);
        MatcherAssert.assertThat(group.allPartitionsBufferedLocally(), CoreMatchers.is(false));

        final String expectedMessage = "[test] Ready for processing because max.task.idle.ms is disabled.\n\tThere may be out-of-order processing for this task as a result.\n\tBuffered partitions: [topic-1]\n\tNon-buffered partitions: [topic-2]";
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            appender.setClassLoggerToTrace(PartitionGroup.class);
            MatcherAssert.assertThat(group.readyToProcess(0L), CoreMatchers.is(true));
            MatcherAssert.assertThat(
                appender.getEvents(),
                Matchers.hasItem(Matchers.allOf(
                    Matchers.hasProperty("level", Matchers.equalTo("TRACE")),
                    Matchers.hasProperty("message", Matchers.equalTo(expectedMessage)))));
        }
    }

    /**
     * With records buffered on both partitions, the group is expected to be ready and to log, at
     * TRACE level, that all partitions were buffered locally.
     */
    @Test
    public void shouldBeReadyIfAllPartitionsAreBuffered() {
        final Map<TopicPartition, RecordQueue> queues =
            Utils.mkMap(Utils.mkEntry(partition1, queue1), Utils.mkEntry(partition2, queue2));
        final Sensor latenessSensor = PartitionGroupTest.getValueSensor(metrics, lastLatenessValue);
        final PartitionGroup group = new PartitionGroup(
            logContext, queues, tp -> OptionalLong.of(0L), latenessSensor, enforcedProcessingSensor, 0L);

        final List<ConsumerRecord<byte[], byte[]>> partitionOneRecords = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
        group.addRawRecords(partition1, partitionOneRecords);
        final List<ConsumerRecord<byte[], byte[]>> partitionTwoRecords = Arrays.asList(
            new ConsumerRecord<>("topic", 2, 1L, recordKey, recordValue),
            new ConsumerRecord<>("topic", 2, 5L, recordKey, recordValue));
        group.addRawRecords(partition2, partitionTwoRecords);
        MatcherAssert.assertThat(group.allPartitionsBufferedLocally(), CoreMatchers.is(true));

        final String expectedMessage = "[test] All partitions were buffered locally, so this task is ready for processing.";
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            appender.setClassLoggerToTrace(PartitionGroup.class);
            MatcherAssert.assertThat(group.readyToProcess(0L), CoreMatchers.is(true));
            MatcherAssert.assertThat(
                appender.getEvents(),
                Matchers.hasItem(Matchers.allOf(
                    Matchers.hasProperty("level", Matchers.equalTo("TRACE")),
                    Matchers.hasProperty("message", Matchers.equalTo(expectedMessage)))));
        }
    }

    /**
     * While the lag provider has no entry for the empty partition 2, the group is expected not to
     * be ready and to log that it is waiting to fetch data; once a lag of 0 is supplied and the
     * lags are refreshed, it is expected to become ready.
     */
    @Test
    public void shouldWaitForFetchesWhenMetadataIsIncomplete() {
        final Map<TopicPartition, OptionalLong> lags = new HashMap<>();
        final Map<TopicPartition, RecordQueue> queues =
            Utils.mkMap(Utils.mkEntry(partition1, queue1), Utils.mkEntry(partition2, queue2));
        final Sensor latenessSensor = PartitionGroupTest.getValueSensor(metrics, lastLatenessValue);
        final PartitionGroup group = new PartitionGroup(
            logContext,
            queues,
            tp -> lags.getOrDefault(tp, OptionalLong.empty()),
            latenessSensor,
            enforcedProcessingSensor,
            0L);

        final List<ConsumerRecord<byte[], byte[]>> partitionOneRecords = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue));
        group.addRawRecords(partition1, partitionOneRecords);
        MatcherAssert.assertThat(group.allPartitionsBufferedLocally(), CoreMatchers.is(false));

        final String expectedMessage = "[test] Waiting to fetch data for topic-2";
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            appender.setClassLoggerToTrace(PartitionGroup.class);
            MatcherAssert.assertThat(group.readyToProcess(0L), CoreMatchers.is(false));
            MatcherAssert.assertThat(
                appender.getEvents(),
                Matchers.hasItem(Matchers.allOf(
                    Matchers.hasProperty("level", Matchers.equalTo("TRACE")),
                    Matchers.hasProperty("message", Matchers.equalTo(expectedMessage)))));
        }

        // Supply the missing lag for partition 2 and refresh; the group should now be ready.
        lags.put(partition2, OptionalLong.of(0L));
        group.updateLags();
        MatcherAssert.assertThat(group.readyToProcess(0L), CoreMatchers.is(true));
    }

    /**
     * Checks readiness when an empty partition reports a non-zero lag.
     *
     * <p>Records are buffered for {@code partition1} only, while a lag of 1 is reported for
     * {@code partition2} and picked up via {@code updateLags()}. {@code readyToProcess(0L)}
     * must return {@code false} and emit the TRACE message stating that the lag for topic-2
     * is currently 1 but no data is buffered locally.
     */
    @Test
    public void shouldWaitForPollWhenLagIsNonzero() {
        HashMap<TopicPartition, OptionalLong> fetchedLags = new HashMap<TopicPartition, OptionalLong>();
        PartitionGroup partitionGroup = new PartitionGroup(
            this.logContext,
            Utils.mkMap(
                Utils.mkEntry(this.partition1, this.queue1),
                Utils.mkEntry(this.partition2, this.queue2)),
            tp -> fetchedLags.getOrDefault(tp, OptionalLong.empty()),
            PartitionGroupTest.getValueSensor(this.metrics, this.lastLatenessValue),
            this.enforcedProcessingSensor,
            0L);

        // Only partition1 receives data; partition2 stays empty locally.
        List<ConsumerRecord> bufferedRecords = Arrays.asList(
            new ConsumerRecord("topic", 1, 1L, this.recordKey, this.recordValue),
            new ConsumerRecord("topic", 1, 5L, this.recordKey, this.recordValue));
        partitionGroup.addRawRecords(this.partition1, bufferedRecords);

        // Report one outstanding record for partition2.
        fetchedLags.put(this.partition2, OptionalLong.of(1L));
        partitionGroup.updateLags();
        MatcherAssert.assertThat(partitionGroup.allPartitionsBufferedLocally(), CoreMatchers.is(false));

        String expectedMessage =
            "[test] Lag for topic-2 is currently 1, but no data is buffered locally. Waiting to buffer some records.";
        try (LogCaptureAppender logCapture = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            logCapture.setClassLoggerToTrace(PartitionGroup.class);
            MatcherAssert.assertThat(partitionGroup.readyToProcess(0L), CoreMatchers.is(false));
            MatcherAssert.assertThat(
                logCapture.getEvents(),
                Matchers.hasItem(Matchers.allOf(
                    Matchers.hasProperty("level", Matchers.equalTo("TRACE")),
                    Matchers.hasProperty("message", Matchers.equalTo(expectedMessage)))));
        }
    }

    /**
     * Checks that the group idles until the configured idle time has elapsed when the empty
     * partition reports a lag of zero.
     *
     * <p>The group is built with a lag provider that always returns zero and a final
     * constructor argument of {@code 1L} (the expected log output reports this value as
     * "Configured max.task.idle.ms: 1"). Records are buffered for {@code partition1} only.
     * <ul>
     *   <li>At wall-clock time 0 the group must not be ready and must log that it is waiting
     *       (deadline is 1).</li>
     *   <li>At wall-clock times 1 and 2 the group must be ready and must log that it continues
     *       processing although topic-2 is empty.</li>
     * </ul>
     *
     * <p>Each phase uses its own try-with-resources block so that every
     * {@link LogCaptureAppender} is closed before the next one is registered.
     */
    @Test
    public void shouldIdleAsSpecifiedWhenLagIsZero() {
        PartitionGroup group = new PartitionGroup(
            this.logContext,
            Utils.mkMap(
                Utils.mkEntry(this.partition1, this.queue1),
                Utils.mkEntry(this.partition2, this.queue2)),
            tp -> OptionalLong.of(0L),
            PartitionGroupTest.getValueSensor(this.metrics, this.lastLatenessValue),
            this.enforcedProcessingSensor,
            1L);
        group.updateLags();

        // Only partition1 receives data; partition2 stays empty.
        List<ConsumerRecord> list1 = Arrays.asList(
            new ConsumerRecord("topic", 1, 1L, this.recordKey, this.recordValue),
            new ConsumerRecord("topic", 1, 5L, this.recordKey, this.recordValue));
        group.addRawRecords(this.partition1, list1);
        MatcherAssert.assertThat(group.allPartitionsBufferedLocally(), CoreMatchers.is(false));

        // Time 0: the deadline (1) has not been reached yet, so the group keeps idling.
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            appender.setClassLoggerToTrace(PartitionGroup.class);
            MatcherAssert.assertThat(group.readyToProcess(0L), CoreMatchers.is(false));
            String expectedMessage = "[test] Lag for topic-2 is currently 0 and current time is 0."
                + " Waiting for new data to be produced for configured idle time 1 (deadline is 1).";
            MatcherAssert.assertThat(
                appender.getEvents(),
                Matchers.hasItem(Matchers.allOf(
                    Matchers.hasProperty("level", Matchers.equalTo("TRACE")),
                    Matchers.hasProperty("message", Matchers.equalTo(expectedMessage)))));
        }

        // Time 1: the deadline is reached, processing is expected to continue.
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            appender.setClassLoggerToTrace(PartitionGroup.class);
            MatcherAssert.assertThat(group.readyToProcess(1L), CoreMatchers.is(true));
            String expectedMessage = "[test] Continuing to process although some partitions are empty on the broker."
                + "\n\tThere may be out-of-order processing for this task as a result."
                + "\n\tPartitions with local data: [topic-1]."
                + "\n\tPartitions we gave up waiting for, with their corresponding deadlines: {topic-2=1}."
                + "\n\tConfigured max.task.idle.ms: 1."
                + "\n\tCurrent wall-clock time: 1.";
            MatcherAssert.assertThat(
                appender.getEvents(),
                Matchers.hasItem(Matchers.allOf(
                    Matchers.hasProperty("level", Matchers.equalTo("TRACE")),
                    Matchers.hasProperty("message", Matchers.equalTo(expectedMessage)))));
        }

        // Time 2: past the deadline, same outcome; the logged deadline is still 1.
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            appender.setClassLoggerToTrace(PartitionGroup.class);
            MatcherAssert.assertThat(group.readyToProcess(2L), CoreMatchers.is(true));
            String expectedMessage = "[test] Continuing to process although some partitions are empty on the broker."
                + "\n\tThere may be out-of-order processing for this task as a result."
                + "\n\tPartitions with local data: [topic-1]."
                + "\n\tPartitions we gave up waiting for, with their corresponding deadlines: {topic-2=1}."
                + "\n\tConfigured max.task.idle.ms: 1."
                + "\n\tCurrent wall-clock time: 2.";
            MatcherAssert.assertThat(
                appender.getEvents(),
                Matchers.hasItem(Matchers.allOf(
                    Matchers.hasProperty("level", Matchers.equalTo("TRACE")),
                    Matchers.hasProperty("message", Matchers.equalTo(expectedMessage)))));
        }
    }

    /**
     * Asserts that {@code group.readyToProcess(0L)} returns {@code false} and that a log event
     * with the message "[test] Waiting to fetch data for &lt;partition&gt;" was captured.
     *
     * @param group     the partition group under test
     * @param partition the partition whose string form is expected in the log message
     */
    private void hasNoFetchedLag(PartitionGroup group, TopicPartition partition) {
        try (LogCaptureAppender logCapture = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            logCapture.setClassLoggerToTrace(PartitionGroup.class);
            Assert.assertFalse(group.readyToProcess(0L));
            String expectedMessage = String.format("[test] Waiting to fetch data for %s", partition);
            MatcherAssert.assertThat(
                logCapture.getEvents(),
                Matchers.hasItem(Matchers.hasProperty("message", Matchers.equalTo(expectedMessage))));
        }
    }

    /**
     * Asserts that {@code group.readyToProcess(0L)} returns {@code false} and that a log event
     * was captured whose message starts with the "lag is currently 0 ... waiting for new data"
     * text for the given partition at current time 0.
     *
     * <p>Only the message prefix is matched; whatever follows it is not checked.
     *
     * @param group     the partition group under test
     * @param partition the partition whose string form is expected in the log message
     */
    private void hasZeroFetchedLag(PartitionGroup group, TopicPartition partition) {
        try (LogCaptureAppender logCapture = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            logCapture.setClassLoggerToTrace(PartitionGroup.class);
            Assert.assertFalse(group.readyToProcess(0L));
            String expectedPrefix = String.format("[test] Lag for %s is currently 0 and current time is %d. Waiting for new data to be produced for configured idle time", partition, 0L);
            MatcherAssert.assertThat(
                logCapture.getEvents(),
                Matchers.hasItem(Matchers.hasProperty("message", Matchers.startsWith(expectedPrefix))));
        }
    }

    /**
     * Asserts that {@code group.readyToProcess(0L)} returns {@code false} and that a log event
     * was captured stating that the given partition currently has the given lag but no data
     * is buffered locally.
     *
     * @param group     the partition group under test
     * @param partition the partition whose string form is expected in the log message
     * @param lag       the lag value expected in the log message
     */
    private void hasNonZeroFetchedLag(PartitionGroup group, TopicPartition partition, long lag) {
        try (LogCaptureAppender logCapture = LogCaptureAppender.createAndRegister(PartitionGroup.class)) {
            logCapture.setClassLoggerToTrace(PartitionGroup.class);
            Assert.assertFalse(group.readyToProcess(0L));
            String expectedMessage = String.format("[test] Lag for %s is currently %d, but no data is buffered locally. Waiting to buffer some records.", partition, lag);
            MatcherAssert.assertThat(
                logCapture.getEvents(),
                Matchers.hasItem(Matchers.hasProperty("message", Matchers.equalTo(expectedMessage))));
        }
    }

    /**
     * Checks that the group only observes changes of the lag provider after
     * {@code updateLags()} has been called.
     *
     * <p>The lag map backing the provider is mutated step by step. After every mutation the
     * group must still report the previously fetched state until {@code updateLags()} is
     * invoked, at which point it must report the new state.
     */
    @Test
    public void shouldUpdateLags() {
        HashMap<TopicPartition, OptionalLong> fetchedLags = new HashMap<TopicPartition, OptionalLong>();
        PartitionGroup partitionGroup = new PartitionGroup(
            this.logContext,
            Utils.mkMap(Utils.mkEntry(this.partition1, this.queue1)),
            tp -> fetchedLags.getOrDefault(tp, OptionalLong.empty()),
            PartitionGroupTest.getValueSensor(this.metrics, this.lastLatenessValue),
            this.enforcedProcessingSensor,
            10L);

        // Nothing reported yet.
        this.hasNoFetchedLag(partitionGroup, this.partition1);

        // A lag of 5 becomes available, but is not visible before updateLags().
        fetchedLags.put(this.partition1, OptionalLong.of(5L));
        this.hasNoFetchedLag(partitionGroup, this.partition1);
        partitionGroup.updateLags();
        this.hasNonZeroFetchedLag(partitionGroup, this.partition1, 5L);

        // The lag drops to zero.
        fetchedLags.put(this.partition1, OptionalLong.of(0L));
        partitionGroup.updateLags();
        this.hasZeroFetchedLag(partitionGroup, this.partition1);

        // The lag entry disappears; the old value stays in effect until updateLags().
        fetchedLags.remove(this.partition1);
        this.hasZeroFetchedLag(partitionGroup, this.partition1);
        partitionGroup.updateLags();
        this.hasNoFetchedLag(partitionGroup, this.partition1);
    }

    /**
     * Builds a {@link PartitionGroup} over {@code partition1}/{@code queue1} and
     * {@code partition2}/{@code queue2} whose lag provider always returns a lag of zero and
     * whose final constructor argument is {@code -1L}.
     *
     * @return a freshly constructed partition group
     */
    private PartitionGroup getBasicGroup() {
        PartitionGroup basicGroup = new PartitionGroup(
            this.logContext,
            Utils.mkMap(
                Utils.mkEntry(this.partition1, this.queue1),
                Utils.mkEntry(this.partition2, this.queue2)),
            tp -> OptionalLong.of(0L),
            PartitionGroupTest.getValueSensor(this.metrics, this.lastLatenessValue),
            this.enforcedProcessingSensor,
            -1L);
        return basicGroup;
    }
}

