/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.BufferValue;
import org.apache.kafka.streams.state.internals.ContextualRecord;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
import org.apache.kafka.streams.state.internals.Maybe;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import org.apache.kafka.test.MockInternalProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<String, String>> {
    // Headers holding a single "v" header whose value is the byte 2; shouldFlush expects these
    // headers on the non-null changelog records produced by flush().
    private static final RecordHeaders V_2_CHANGELOG_HEADERS = new RecordHeaders(new Header[]{new RecordHeader("v", new byte[]{2})});
    // Value set for "application.id" in makeContext().
    private static final String APP_ID = "test-app";
    // Factory that builds the buffer under test from a store name (one entry per parameter set).
    private final Function<String, B> bufferSupplier;
    // Parameter display name plus a random suffix; passed to bufferSupplier as the store name.
    private final String testName;

    /**
     * Supplies the parameter sets for the parameterized runner: a display name plus a factory
     * that builds the buffer under test from a store name.
     *
     * <p>The factory is first assigned to a typed {@link Function} local because a lambda cannot
     * be written directly inside an {@code Object[]} initializer: {@code Object} is not a
     * functional interface, so the lambda would have no target type.
     *
     * @return a single-element collection describing the in-memory buffer configuration
     */
    @Parameterized.Parameters(name = "{index}: test={0}")
    public static Collection<Object[]> parameters() {
        final Function<String, TimeOrderedKeyValueBuffer<String, String>> inMemoryBufferFactory =
            name -> new InMemoryTimeOrderedKeyValueBuffer.Builder<>(
                name,
                Serdes.String(),
                // The value serde serializes through NullRejectingStringSerializer.
                Serdes.serdeFrom(new NullRejectingStringSerializer(), new StringDeserializer())
            ).build();
        return Collections.singletonList(new Object[] {"in-memory buffer", inMemoryBufferFactory});
    }

    public TimeOrderedKeyValueBufferTest(String testName, Function<String, B> bufferSupplier) {
        this.testName = testName + "_" + new Random().nextInt(Integer.MAX_VALUE);
        this.bufferSupplier = bufferSupplier;
    }

    /**
     * Builds a fresh mock processor context for task (0, 0), backed by a new temporary state
     * directory and a {@link MockRecordCollector}.
     *
     * @return the configured mock context
     */
    private static MockInternalProcessorContext makeContext() {
        final Properties config = new Properties();
        config.setProperty("bootstrap.servers", "");
        config.setProperty("application.id", APP_ID);

        final MockInternalProcessorContext mockContext =
            new MockInternalProcessorContext(config, new TaskId(0, 0), TestUtils.tempDirectory());
        mockContext.setRecordCollector(new MockRecordCollector());
        return mockContext;
    }

    /**
     * Closes the buffer and then deletes the context's state directory.
     *
     * @param context the context whose state directory is removed
     * @param buffer  the buffer to close
     * @throws RuntimeException wrapping any {@link IOException} raised while cleaning up
     */
    private static void cleanup(MockInternalProcessorContext context, TimeOrderedKeyValueBuffer<String, String> buffer) {
        try {
            buffer.close();
            final File stateDirectory = context.stateDir();
            Utils.delete(stateDirectory);
        } catch (IOException ioFailure) {
            throw new RuntimeException(ioFailure);
        }
    }

    /** Verifies that a freshly built buffer can be initialized against a mock context and cleaned up. */
    @Test
    public void shouldInit() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();

        buffer.init(context, buffer);

        cleanup(context, buffer);
    }

    /** Verifies that a single record can be put into an initialized buffer without an exception. */
    @Test
    public void shouldAcceptData() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);

        putRecord(buffer, context, 0L, 0L, "asdf", "2p93nf");

        cleanup(context, buffer);
    }

    /**
     * Verifies that putting a {@code null} change into the buffer raises a
     * {@link NullPointerException}.
     *
     * <p>Cleanup runs in a {@code finally} block so the buffer is closed and the temporary state
     * directory is deleted even when the expectation fails.
     */
    @Test
    public void shouldRejectNullValues() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);
        try {
            buffer.put(0L, "asdf", null, getContext(0L));
            Assert.fail("expected an exception");
        } catch (final NullPointerException expected) {
            // expected: this is the outcome the test is looking for
        } finally {
            cleanup(context, buffer);
        }
    }

    /** Verifies that an always-true eviction predicate drains the single buffered record. */
    @Test
    public void shouldRemoveData() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);

        putRecord(buffer, context, 0L, 0L, "asdf", "qwer");
        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(1));

        // Evict unconditionally and ignore the evicted records.
        buffer.evictWhile(() -> true, kv -> { });
        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(0));

        cleanup(context, buffer);
    }

    /**
     * Verifies that eviction stops as soon as the predicate turns false: with two records
     * buffered and a predicate of "more than one record", only the record put at time 0 is evicted.
     */
    @Test
    public void shouldRespectEvictionPredicate() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);

        putRecord(buffer, context, 0L, 0L, "asdf", "eyt");
        putRecord(buffer, context, 1L, 0L, "zxcv", "rtg");
        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(2));

        final List<TimeOrderedKeyValueBuffer.Eviction<String, String>> evicted = new LinkedList<>();
        buffer.evictWhile(() -> buffer.numRecords() > 1, evicted::add);
        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(1));

        final List<TimeOrderedKeyValueBuffer.Eviction<String, String>> expected = Collections.singletonList(
            new TimeOrderedKeyValueBuffer.Eviction<>("asdf", new Change<>("eyt", null), getContext(0L)));
        MatcherAssert.assertThat(evicted, Matchers.is(expected));

        cleanup(context, buffer);
    }

    /**
     * Verifies the record count: a second put for an already-buffered key leaves the count
     * unchanged, while a put for a new key increments it.
     */
    @Test
    public void shouldTrackCount() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);

        putRecord(buffer, context, 0L, 0L, "asdf", "oin");
        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(1));

        // Same key again: still one record.
        putRecord(buffer, context, 1L, 0L, "asdf", "wekjn");
        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(1));

        putRecord(buffer, context, 0L, 0L, "zxcv", "24inf");
        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(2));

        cleanup(context, buffer);
    }

    /**
     * Verifies the reported buffer size after each put: 43 after the first record, 39 after the
     * same key is put again with a shorter value, and 82 once a second key is added.
     */
    @Test
    public void shouldTrackSize() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);

        putRecord(buffer, context, 0L, 0L, "asdf", "23roni");
        MatcherAssert.assertThat(buffer.bufferSize(), Matchers.is(43L));

        putRecord(buffer, context, 1L, 0L, "asdf", "3l");
        MatcherAssert.assertThat(buffer.bufferSize(), Matchers.is(39L));

        putRecord(buffer, context, 0L, 0L, "zxcv", "qfowin");
        MatcherAssert.assertThat(buffer.bufferSize(), Matchers.is(82L));

        cleanup(context, buffer);
    }

    /** Verifies that the minimum timestamp follows the smallest time passed to {@code put}. */
    @Test
    public void shouldTrackMinTimestamp() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);

        putRecord(buffer, context, 1L, 0L, "asdf", "2093j");
        MatcherAssert.assertThat(buffer.minTimestamp(), Matchers.is(1L));

        // An earlier time lowers the minimum.
        putRecord(buffer, context, 0L, 0L, "zxcv", "3gon4i");
        MatcherAssert.assertThat(buffer.minTimestamp(), Matchers.is(0L));

        cleanup(context, buffer);
    }

    /**
     * Verifies eviction order and the statistics visible while evicting.
     *
     * <p>Two records are buffered ("zxcv" at time 1, "asdf" at time 0). Evicting everything must
     * invoke the callback first for "asdf" and then for "zxcv". During each callback the count,
     * size and minimum timestamp are expected to still include the record being handed over.
     * After eviction the buffer reports 0 records, size 0 and {@link Long#MAX_VALUE} as minimum.
     *
     * <p>The buffer is declared with its type arguments so that the callback parameter is an
     * {@code Eviction<String, String>}; with a raw buffer type it would be {@code Object} and
     * {@code kv.key()} would not resolve.
     */
    @Test
    public void shouldEvictOldestAndUpdateSizeAndCountAndMinTimestamp() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);

        putRecord(buffer, context, 1L, 0L, "zxcv", "o23i4");
        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(1));
        MatcherAssert.assertThat(buffer.bufferSize(), Matchers.is(42L));
        MatcherAssert.assertThat(buffer.minTimestamp(), Matchers.is(1L));

        putRecord(buffer, context, 0L, 0L, "asdf", "3ng");
        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(2));
        MatcherAssert.assertThat(buffer.bufferSize(), Matchers.is(82L));
        MatcherAssert.assertThat(buffer.minTimestamp(), Matchers.is(0L));

        final AtomicInteger callbackCount = new AtomicInteger(0);
        buffer.evictWhile(() -> true, kv -> {
            switch (callbackCount.incrementAndGet()) {
                case 1: {
                    // First eviction: the record buffered at time 0.
                    MatcherAssert.assertThat(kv.key(), Matchers.is("asdf"));
                    MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(2));
                    MatcherAssert.assertThat(buffer.bufferSize(), Matchers.is(82L));
                    MatcherAssert.assertThat(buffer.minTimestamp(), Matchers.is(0L));
                    break;
                }
                case 2: {
                    // Second eviction: the record buffered at time 1.
                    MatcherAssert.assertThat(kv.key(), Matchers.is("zxcv"));
                    MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(1));
                    MatcherAssert.assertThat(buffer.bufferSize(), Matchers.is(42L));
                    MatcherAssert.assertThat(buffer.minTimestamp(), Matchers.is(1L));
                    break;
                }
                default: {
                    Assert.fail("too many invocations");
                }
            }
        });

        MatcherAssert.assertThat(callbackCount.get(), Matchers.is(2));
        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(0));
        MatcherAssert.assertThat(buffer.bufferSize(), Matchers.is(0L));
        MatcherAssert.assertThat(buffer.minTimestamp(), Matchers.is(Long.MAX_VALUE));

        cleanup(context, buffer);
    }

    /**
     * Verifies that asking for the prior value of a key that was never buffered yields
     * {@code Maybe.undefined()}.
     *
     * <p>The buffer and its temporary state directory are released at the end, as in the other tests.
     */
    @Test
    public void shouldReturnUndefinedOnPriorValueForNotBufferedKey() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);

        MatcherAssert.assertThat(buffer.priorValueForBuffered("ASDF"), Matchers.is(Maybe.undefined()));

        cleanup(context, buffer);
    }

    /**
     * Verifies the prior value reported for buffered keys: for key "A", put with the change
     * ("new-value", "old-value"), it is "old-value" with timestamp -1; for key "B", put with a
     * {@code null} second change component, it is a defined {@code null}.
     *
     * <p>The buffer and its temporary state directory are released at the end, as in the other tests.
     */
    @Test
    public void shouldReturnPriorValueForBufferedKey() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);

        final ProcessorRecordContext recordContext = getContext(0L);
        context.setRecordContext(recordContext);
        buffer.put(1L, "A", new Change<>("new-value", "old-value"), recordContext);
        buffer.put(1L, "B", new Change<>("new-value", null), recordContext);

        MatcherAssert.assertThat(
            buffer.priorValueForBuffered("A"),
            Matchers.is(Maybe.defined(ValueAndTimestamp.make("old-value", -1L))));
        MatcherAssert.assertThat(buffer.priorValueForBuffered("B"), Matchers.is(Maybe.defined(null)));

        cleanup(context, buffer);
    }

    /**
     * Verifies what {@code flush()} hands to the record collector.
     *
     * <p>Three records are buffered and the one put at time 0 ("deleteme") is evicted before
     * flushing. The collected records are decoded into (time, {@link BufferValue}) pairs and
     * compared with the expectation: a null-valued record with empty headers for "deleteme",
     * followed by "zxcv" and "asdf" carrying {@code V_2_CHANGELOG_HEADERS}.
     */
    @Test
    public void shouldFlush() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);
        putRecord(buffer, context, 2L, 0L, "asdf", "2093j");
        putRecord(buffer, context, 1L, 1L, "zxcv", "3gon4i");
        putRecord(buffer, context, 0L, 2L, "deleteme", "deadbeef");

        // Drop everything buffered before time 1, discarding the evicted records.
        buffer.evictWhile(() -> buffer.minTimestamp() < 1L, kv -> { });
        buffer.flush();

        final MockRecordCollector collector = (MockRecordCollector) context.recordCollector();
        final List<ProducerRecord<String, KeyValue<Long, BufferValue>>> collected = collector.collected()
            .stream()
            .map(pr -> {
                KeyValue<Long, BufferValue> decodedValue = null;
                if (pr.value() != null) {
                    // Serialized layout read here: the buffer value first, then the 8-byte time.
                    final ByteBuffer serialized = ByteBuffer.wrap((byte[]) pr.value());
                    final BufferValue bufferValue = BufferValue.deserialize(serialized);
                    final long time = serialized.getLong();
                    decodedValue = new KeyValue<>(time, bufferValue);
                }
                return new ProducerRecord<String, KeyValue<Long, BufferValue>>(
                    pr.topic(),
                    pr.partition(),
                    pr.timestamp(),
                    pr.key().toString(),
                    decodedValue,
                    pr.headers());
            })
            .collect(Collectors.toList());

        final String changelogTopic = "test-app-" + testName + "-changelog";
        final List<ProducerRecord<String, KeyValue<Long, BufferValue>>> expected = Arrays.asList(
            new ProducerRecord<>(changelogTopic, 0, null, "deleteme", null, new RecordHeaders()),
            new ProducerRecord<>(
                changelogTopic, 0, null, "zxcv",
                new KeyValue<>(1L, getBufferValue("3gon4i", 1L)),
                V_2_CHANGELOG_HEADERS),
            new ProducerRecord<>(
                changelogTopic, 0, null, "asdf",
                new KeyValue<>(2L, getBufferValue("2093j", 0L)),
                V_2_CHANGELOG_HEADERS));
        MatcherAssert.assertThat(collected, Matchers.is(expected));

        cleanup(context, buffer);
    }

    /**
     * Verifies restoration from changelog records that carry no version header.
     *
     * <p>Each record value is built as an 8-byte time followed by a legacy-formatted serialized
     * change. After restoring four records (two of them for key "zxcv") and then a null-valued
     * record for "todelete", the test checks count, minimum timestamp, size, prior values and
     * the evictions, whose record contexts are expected to reflect the changelog record's own
     * timestamp, offset, partition and topic.
     */
    @Test
    public void shouldRestoreOldFormat() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);

        final RecordBatchingStateRestoreCallback stateRestoreCallback =
            (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
        context.setRecordContext(new ProcessorRecordContext(0L, 0L, 0, "", null));

        final FullChangeSerde<String> serializer = FullChangeSerde.wrap(Serdes.String());
        final byte[] todeleteValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(
            serializer.serializeParts(null, new Change<>("doomed", null)));
        final byte[] asdfValue = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(
            serializer.serializeParts(null, new Change<>("qwer", null)));
        final byte[] zxcvValue1 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(
            serializer.serializeParts(null, new Change<>("eo4im", "previous")));
        final byte[] zxcvValue2 = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(
            serializer.serializeParts(null, new Change<>("next", "eo4im")));

        // Changelog payloads: buffer time first, then the serialized change.
        final byte[] todeletePayload = ByteBuffer.allocate(8 + todeleteValue.length).putLong(0L).put(todeleteValue).array();
        final byte[] asdfPayload = ByteBuffer.allocate(8 + asdfValue.length).putLong(2L).put(asdfValue).array();
        final byte[] zxcvPayload1 = ByteBuffer.allocate(8 + zxcvValue1.length).putLong(1L).put(zxcvValue1).array();
        final byte[] zxcvPayload2 = ByteBuffer.allocate(8 + zxcvValue2.length).putLong(1L).put(zxcvValue2).array();

        stateRestoreCallback.restoreBatch(Arrays.asList(
            new ConsumerRecord<>(
                "changelog-topic", 0, 0L, 0L, TimestampType.CREATE_TIME, -1L, -1, -1,
                "todelete".getBytes(StandardCharsets.UTF_8), todeletePayload),
            new ConsumerRecord<>(
                "changelog-topic", 0, 1L, 1L, TimestampType.CREATE_TIME, -1L, -1, -1,
                "asdf".getBytes(StandardCharsets.UTF_8), asdfPayload),
            new ConsumerRecord<>(
                "changelog-topic", 0, 2L, 2L, TimestampType.CREATE_TIME, -1L, -1, -1,
                "zxcv".getBytes(StandardCharsets.UTF_8), zxcvPayload1),
            new ConsumerRecord<>(
                "changelog-topic", 0, 3L, 3L, TimestampType.CREATE_TIME, -1L, -1, -1,
                "zxcv".getBytes(StandardCharsets.UTF_8), zxcvPayload2)));

        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(3));
        MatcherAssert.assertThat(buffer.minTimestamp(), Matchers.is(0L));
        MatcherAssert.assertThat(buffer.bufferSize(), Matchers.is(172L));

        // A null-valued changelog record for "todelete" follows.
        final ConsumerRecord<byte[], byte[]> tombstone = new ConsumerRecord<>(
            "changelog-topic", 0, 3L, 3L, TimestampType.CREATE_TIME, -1L, -1, -1,
            "todelete".getBytes(StandardCharsets.UTF_8), null);
        stateRestoreCallback.restoreBatch(Collections.singletonList(tombstone));

        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(2));
        MatcherAssert.assertThat(buffer.minTimestamp(), Matchers.is(1L));
        MatcherAssert.assertThat(buffer.bufferSize(), Matchers.is(115L));

        MatcherAssert.assertThat(buffer.priorValueForBuffered("todelete"), Matchers.is(Maybe.undefined()));
        MatcherAssert.assertThat(buffer.priorValueForBuffered("asdf"), Matchers.is(Maybe.defined(null)));
        MatcherAssert.assertThat(
            buffer.priorValueForBuffered("zxcv"),
            Matchers.is(Maybe.defined(ValueAndTimestamp.make("previous", -1L))));

        final List<TimeOrderedKeyValueBuffer.Eviction<String, String>> evicted = new LinkedList<>();
        buffer.evictWhile(() -> true, evicted::add);

        final List<TimeOrderedKeyValueBuffer.Eviction<String, String>> expected = Arrays.asList(
            new TimeOrderedKeyValueBuffer.Eviction<>(
                "zxcv",
                new Change<>("next", "eo4im"),
                new ProcessorRecordContext(3L, 3L, 0, "changelog-topic", new RecordHeaders())),
            new TimeOrderedKeyValueBuffer.Eviction<>(
                "asdf",
                new Change<>("qwer", null),
                new ProcessorRecordContext(1L, 1L, 0, "changelog-topic", new RecordHeaders())));
        MatcherAssert.assertThat(evicted, Matchers.is(expected));

        cleanup(context, buffer);
    }

    /**
     * Verifies restoration from changelog records flagged with header {@code "v" = 1}.
     *
     * <p>Each record value is built as an 8-byte time followed by a serialized
     * {@link ContextualRecord}. After restoring four records (two of them for key "zxcv") and
     * then a null-valued record for "todelete", the test checks count, minimum timestamp, size,
     * prior values and the evictions, whose record contexts are expected to equal the contexts
     * that were serialized into the values.
     */
    @Test
    public void shouldRestoreV1Format() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);

        final RecordBatchingStateRestoreCallback stateRestoreCallback =
            (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
        context.setRecordContext(new ProcessorRecordContext(0L, 0L, 0, "", null));

        final RecordHeaders v1FlagHeaders =
            new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {1})});

        final byte[] todeleteValue = getContextualRecord("doomed", 0L).serialize(0).array();
        final byte[] asdfValue = getContextualRecord("qwer", 1L).serialize(0).array();
        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
        final byte[] zxcvValue1 = new ContextualRecord(
            FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(
                fullChangeSerde.serializeParts(null, new Change<>("3o4im", "previous"))),
            getContext(2L)
        ).serialize(0).array();
        final byte[] zxcvValue2 = new ContextualRecord(
            FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(
                fullChangeSerde.serializeParts(null, new Change<>("next", "3o4im"))),
            getContext(3L)
        ).serialize(0).array();

        // Changelog payloads: buffer time first, then the serialized contextual record.
        final byte[] todeletePayload = ByteBuffer.allocate(8 + todeleteValue.length).putLong(0L).put(todeleteValue).array();
        final byte[] asdfPayload = ByteBuffer.allocate(8 + asdfValue.length).putLong(2L).put(asdfValue).array();
        final byte[] zxcvPayload1 = ByteBuffer.allocate(8 + zxcvValue1.length).putLong(1L).put(zxcvValue1).array();
        final byte[] zxcvPayload2 = ByteBuffer.allocate(8 + zxcvValue2.length).putLong(1L).put(zxcvValue2).array();

        stateRestoreCallback.restoreBatch(Arrays.asList(
            new ConsumerRecord<>(
                "changelog-topic", 0, 0L, 999L, TimestampType.CREATE_TIME, -1L, -1, -1,
                "todelete".getBytes(StandardCharsets.UTF_8), todeletePayload, v1FlagHeaders),
            new ConsumerRecord<>(
                "changelog-topic", 0, 1L, 9999L, TimestampType.CREATE_TIME, -1L, -1, -1,
                "asdf".getBytes(StandardCharsets.UTF_8), asdfPayload, v1FlagHeaders),
            new ConsumerRecord<>(
                "changelog-topic", 0, 2L, 99L, TimestampType.CREATE_TIME, -1L, -1, -1,
                "zxcv".getBytes(StandardCharsets.UTF_8), zxcvPayload1, v1FlagHeaders),
            new ConsumerRecord<>(
                "changelog-topic", 0, 3L, 100L, TimestampType.CREATE_TIME, -1L, -1, -1,
                "zxcv".getBytes(StandardCharsets.UTF_8), zxcvPayload2, v1FlagHeaders)));

        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(3));
        MatcherAssert.assertThat(buffer.minTimestamp(), Matchers.is(0L));
        MatcherAssert.assertThat(buffer.bufferSize(), Matchers.is(142L));

        // A null-valued changelog record for "todelete" follows.
        final ConsumerRecord<byte[], byte[]> tombstone = new ConsumerRecord<>(
            "changelog-topic", 0, 3L, 3L, TimestampType.CREATE_TIME, -1L, -1, -1,
            "todelete".getBytes(StandardCharsets.UTF_8), null);
        stateRestoreCallback.restoreBatch(Collections.singletonList(tombstone));

        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(2));
        MatcherAssert.assertThat(buffer.minTimestamp(), Matchers.is(1L));
        MatcherAssert.assertThat(buffer.bufferSize(), Matchers.is(95L));

        MatcherAssert.assertThat(buffer.priorValueForBuffered("todelete"), Matchers.is(Maybe.undefined()));
        MatcherAssert.assertThat(buffer.priorValueForBuffered("asdf"), Matchers.is(Maybe.defined(null)));
        MatcherAssert.assertThat(
            buffer.priorValueForBuffered("zxcv"),
            Matchers.is(Maybe.defined(ValueAndTimestamp.make("previous", -1L))));

        final List<TimeOrderedKeyValueBuffer.Eviction<String, String>> evicted = new LinkedList<>();
        buffer.evictWhile(() -> true, evicted::add);

        final List<TimeOrderedKeyValueBuffer.Eviction<String, String>> expected = Arrays.asList(
            new TimeOrderedKeyValueBuffer.Eviction<>("zxcv", new Change<>("next", "3o4im"), getContext(3L)),
            new TimeOrderedKeyValueBuffer.Eviction<>("asdf", new Change<>("qwer", null), getContext(1L)));
        MatcherAssert.assertThat(evicted, Matchers.is(expected));

        cleanup(context, buffer);
    }

    /**
     * Verifies restoration from changelog records flagged with header {@code "v" = 2}.
     *
     * <p>Each record value is built as a serialized {@link BufferValue} followed by an 8-byte
     * time (the reverse of the layout used by the other restore tests). After restoring four
     * records (two of them for key "zxcv") and then a null-valued record for "todelete", the
     * test checks count, minimum timestamp, size, prior values and the evictions.
     */
    @Test
    public void shouldRestoreV2Format() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);

        final RecordBatchingStateRestoreCallback stateRestoreCallback =
            (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
        context.setRecordContext(new ProcessorRecordContext(0L, 0L, 0, "", null));

        final RecordHeaders v2FlagHeaders =
            new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {2})});

        final byte[] todeleteValue = getBufferValue("doomed", 0L).serialize(0).array();
        final byte[] asdfValue = getBufferValue("qwer", 1L).serialize(0).array();
        // The assertions below expect "previous" back as the prior value and ("next", "3o4im")
        // as the evicted change, i.e. the second restored "zxcv" value determines the eviction.
        final byte[] zxcvValue1 = new BufferValue(
            Serdes.String().serializer().serialize(null, "previous"),
            Serdes.String().serializer().serialize(null, "IGNORED"),
            Serdes.String().serializer().serialize(null, "3o4im"),
            getContext(2L)
        ).serialize(0).array();
        final byte[] zxcvValue2 = new BufferValue(
            Serdes.String().serializer().serialize(null, "previous"),
            Serdes.String().serializer().serialize(null, "3o4im"),
            Serdes.String().serializer().serialize(null, "next"),
            getContext(3L)
        ).serialize(0).array();

        // Changelog payloads: serialized buffer value first, then the buffer time.
        final byte[] todeletePayload = ByteBuffer.allocate(8 + todeleteValue.length).put(todeleteValue).putLong(0L).array();
        final byte[] asdfPayload = ByteBuffer.allocate(8 + asdfValue.length).put(asdfValue).putLong(2L).array();
        final byte[] zxcvPayload1 = ByteBuffer.allocate(8 + zxcvValue1.length).put(zxcvValue1).putLong(1L).array();
        final byte[] zxcvPayload2 = ByteBuffer.allocate(8 + zxcvValue2.length).put(zxcvValue2).putLong(1L).array();

        stateRestoreCallback.restoreBatch(Arrays.asList(
            new ConsumerRecord<>(
                "changelog-topic", 0, 0L, 999L, TimestampType.CREATE_TIME, -1L, -1, -1,
                "todelete".getBytes(StandardCharsets.UTF_8), todeletePayload, v2FlagHeaders),
            new ConsumerRecord<>(
                "changelog-topic", 0, 1L, 9999L, TimestampType.CREATE_TIME, -1L, -1, -1,
                "asdf".getBytes(StandardCharsets.UTF_8), asdfPayload, v2FlagHeaders),
            new ConsumerRecord<>(
                "changelog-topic", 0, 2L, 99L, TimestampType.CREATE_TIME, -1L, -1, -1,
                "zxcv".getBytes(StandardCharsets.UTF_8), zxcvPayload1, v2FlagHeaders),
            new ConsumerRecord<>(
                "changelog-topic", 0, 2L, 100L, TimestampType.CREATE_TIME, -1L, -1, -1,
                "zxcv".getBytes(StandardCharsets.UTF_8), zxcvPayload2, v2FlagHeaders)));

        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(3));
        MatcherAssert.assertThat(buffer.minTimestamp(), Matchers.is(0L));
        MatcherAssert.assertThat(buffer.bufferSize(), Matchers.is(142L));

        // A null-valued changelog record for "todelete" follows.
        final ConsumerRecord<byte[], byte[]> tombstone = new ConsumerRecord<>(
            "changelog-topic", 0, 3L, 3L, TimestampType.CREATE_TIME, -1L, -1, -1,
            "todelete".getBytes(StandardCharsets.UTF_8), null);
        stateRestoreCallback.restoreBatch(Collections.singletonList(tombstone));

        MatcherAssert.assertThat(buffer.numRecords(), Matchers.is(2));
        MatcherAssert.assertThat(buffer.minTimestamp(), Matchers.is(1L));
        MatcherAssert.assertThat(buffer.bufferSize(), Matchers.is(95L));

        MatcherAssert.assertThat(buffer.priorValueForBuffered("todelete"), Matchers.is(Maybe.undefined()));
        MatcherAssert.assertThat(buffer.priorValueForBuffered("asdf"), Matchers.is(Maybe.defined(null)));
        MatcherAssert.assertThat(
            buffer.priorValueForBuffered("zxcv"),
            Matchers.is(Maybe.defined(ValueAndTimestamp.make("previous", -1L))));

        final List<TimeOrderedKeyValueBuffer.Eviction<String, String>> evicted = new LinkedList<>();
        buffer.evictWhile(() -> true, evicted::add);

        final List<TimeOrderedKeyValueBuffer.Eviction<String, String>> expected = Arrays.asList(
            new TimeOrderedKeyValueBuffer.Eviction<>("zxcv", new Change<>("next", "3o4im"), getContext(3L)),
            new TimeOrderedKeyValueBuffer.Eviction<>("asdf", new Change<>("qwer", null), getContext(1L)));
        MatcherAssert.assertThat(evicted, Matchers.is(expected));

        cleanup(context, buffer);
    }

    /**
     * Verifies that restoring a changelog record whose {@code "v"} header holds the byte -1
     * fails with an {@link IllegalArgumentException}. Cleanup runs regardless of the outcome.
     */
    @Test
    public void shouldNotRestoreUnrecognizedVersionRecord() {
        final TimeOrderedKeyValueBuffer<String, String> buffer = bufferSupplier.apply(testName);
        final MockInternalProcessorContext context = makeContext();
        buffer.init(context, buffer);

        final RecordBatchingStateRestoreCallback stateRestoreCallback =
            (RecordBatchingStateRestoreCallback) context.stateRestoreCallback(testName);
        context.setRecordContext(new ProcessorRecordContext(0L, 0L, 0, "", null));

        final RecordHeaders unknownFlagHeaders =
            new RecordHeaders(new Header[] {new RecordHeader("v", new byte[] {-1})});
        final byte[] todeleteValue = getBufferValue("doomed", 0L).serialize(0).array();
        final byte[] todeletePayload =
            ByteBuffer.allocate(8 + todeleteValue.length).putLong(0L).put(todeleteValue).array();

        try {
            final ConsumerRecord<byte[], byte[]> unrecognized = new ConsumerRecord<>(
                "changelog-topic", 0, 0L, 999L, TimestampType.CREATE_TIME, -1L, -1, -1,
                "todelete".getBytes(StandardCharsets.UTF_8), todeletePayload, unknownFlagHeaders);
            stateRestoreCallback.restoreBatch(Collections.singletonList(unrecognized));
            Assert.fail("expected an exception");
        } catch (final IllegalArgumentException expected) {
            // expected: nothing to do
        } finally {
            cleanup(context, buffer);
        }
    }

    /**
     * Puts one record into the buffer, first installing a matching record context on the mock
     * processor context.
     *
     * @param buffer          the buffer to write to
     * @param context         the mock context whose current record context is replaced
     * @param streamTime      the time passed to {@code buffer.put}
     * @param recordTimestamp the timestamp of the record context created for this put
     * @param key             the record key
     * @param value           the first component of the stored change; the second is {@code null}
     */
    private static void putRecord(TimeOrderedKeyValueBuffer<String, String> buffer, MockInternalProcessorContext context, long streamTime, long recordTimestamp, String key, String value) {
        final ProcessorRecordContext recordContext = getContext(recordTimestamp);
        final Change<String> change = new Change<>(value, null);
        context.setRecordContext(recordContext);
        buffer.put(streamTime, key, change, recordContext);
    }

    /**
     * Builds a {@link BufferValue} whose first two byte-array components are {@code null} and
     * whose third is the string-serialized {@code value}.
     *
     * @param value     the string to serialize into the buffer value
     * @param timestamp the timestamp of the attached record context
     * @return the assembled buffer value
     */
    private static BufferValue getBufferValue(String value, long timestamp) {
        final byte[] serializedValue = Serdes.String().serializer().serialize(null, value);
        final ProcessorRecordContext recordContext = getContext(timestamp);
        return new BufferValue(null, null, serializedValue, recordContext);
    }

    /**
     * Builds a {@link ContextualRecord} holding the legacy-formatted serialization of the change
     * ({@code value}, {@code null}).
     *
     * @param value     the first component of the serialized change
     * @param timestamp the timestamp of the attached record context
     * @return the assembled contextual record
     */
    private static ContextualRecord getContextualRecord(String value, long timestamp) {
        final FullChangeSerde<String> fullChangeSerde = FullChangeSerde.wrap(Serdes.String());
        final Change<byte[]> serializedParts = fullChangeSerde.serializeParts(null, new Change<>(value, null));
        final byte[] legacyBytes = FullChangeSerde.mergeChangeArraysIntoSingleLegacyFormattedArray(serializedParts);
        return new ContextualRecord(legacyBytes, getContext(timestamp));
    }

    /**
     * Creates a record context with the given timestamp; the remaining constructor arguments
     * are fixed at {@code 0L}, {@code 0}, {@code "topic"} and {@code null}.
     *
     * @param recordTimestamp the timestamp to carry in the context
     * @return the new record context
     */
    private static ProcessorRecordContext getContext(long recordTimestamp) {
        final ProcessorRecordContext recordContext =
            new ProcessorRecordContext(recordTimestamp, 0L, 0, "topic", null);
        return recordContext;
    }

    public static final class NullRejectingStringSerializer
    extends StringSerializer {
        public byte[] serialize(String topic, String data) {
            if (data == null) {
                throw new IllegalArgumentException();
            }
            return super.serialize(topic, data);
        }
    }
}

