/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public abstract class AbstractWindowBytesStoreTest {
    /** Window size used by the default store built in {@code setup()}; tests use these values as epoch-millisecond offsets. */
    static final long WINDOW_SIZE = 3L;
    /** Segment interval, in the same unit as the timestamps used by the tests. */
    static final long SEGMENT_INTERVAL = 60_000L;
    /** Retention period: exactly two segment intervals (120000). */
    static final long RETENTION_PERIOD = 2L * SEGMENT_INTERVAL;

    /**
     * Base timestamp for the fixture records (59996).
     * NOTE(review): presumably chosen four ticks before SEGMENT_INTERVAL so the fixtures straddle that boundary — confirm.
     */
    final long defaultStartTime = SEGMENT_INTERVAL - 4L;

    // Fixture records, keyed 0..5. Note that "two" and "three" share the same timestamp (defaultStartTime + 2).
    final KeyValue<Windowed<Integer>, String> zero = windowedPair(0, "zero", defaultStartTime);
    final KeyValue<Windowed<Integer>, String> one = windowedPair(1, "one", defaultStartTime + 1L);
    final KeyValue<Windowed<Integer>, String> two = windowedPair(2, "two", defaultStartTime + 2L);
    final KeyValue<Windowed<Integer>, String> three = windowedPair(3, "three", defaultStartTime + 2L);
    final KeyValue<Windowed<Integer>, String> four = windowedPair(4, "four", defaultStartTime + 4L);
    final KeyValue<Windowed<Integer>, String> five = windowedPair(5, "five", defaultStartTime + 5L);

    /** Store under test; (re)created in {@code setup()} through {@code buildWindowStore}. */
    WindowStore<Integer, String> windowStore;
    /** Mock processor context the store is initialised with. */
    InternalMockProcessorContext context;
    /** Collector whose captured records the tests read back as the changelog. */
    MockRecordCollector recordCollector;

    /** Temporary directory handed to the mock context. */
    final File baseDir = TestUtils.tempDirectory("test");
    private final StateSerdes<Integer, String> serdes = new StateSerdes<>("", Serdes.Integer(), Serdes.String());

    /**
     * Creates the window store implementation under test; each concrete subclass supplies its own.
     *
     * @param var1 NOTE(review): {@code setup()} passes 120000 (== RETENTION_PERIOD) here, so this looks like the retention period — confirm against the implementations
     * @param var3 NOTE(review): {@code setup()} passes 3 (== WINDOW_SIZE) here, so this looks like the window size — confirm
     * @param var5 NOTE(review): presumably "retain duplicates"; {@code testPutSameKeyTimestamp} passes {@code true} and expects repeated puts to be kept — confirm
     * @param var6 serde for the store's keys
     * @param var7 serde for the store's values
     * @return a new, not yet initialised store (callers in this class invoke {@code init} themselves)
     */
    abstract <K, V> WindowStore<K, V> buildWindowStore(long var1, long var3, boolean var5, Serde<K> var6, Serde<V> var7);

    /**
     * Runs before every test: builds a fresh Integer/String store, a record collector and a
     * mock processor context (with a zero-sized thread cache), then initialises the store.
     */
    @Before
    public void setup() {
        this.windowStore = this.buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, false, Serdes.Integer(), Serdes.String());
        this.recordCollector = new MockRecordCollector();

        // Assemble the cache separately so the context construction stays readable.
        final LogContext cacheLogContext = new LogContext("testCache");
        final StreamsMetricsImpl cacheMetrics = (StreamsMetricsImpl) new MockStreamsMetrics(new Metrics());
        final ThreadCache cache = new ThreadCache(cacheLogContext, 0L, cacheMetrics);

        this.context = new InternalMockProcessorContext(
            this.baseDir, Serdes.String(), Serdes.Integer(), this.recordCollector, cache);
        this.context.setTime(1L);

        // The explicit StateStoreContext cast is kept from the original call.
        this.windowStore.init((StateStoreContext) this.context, this.windowStore);
    }

    /** Runs after every test and closes the store currently held in {@code windowStore}. */
    @After
    public void after() {
        this.windowStore.close();
    }

    /**
     * Checks single-timestamp fetches and time-range fetches for key 2, then checks the
     * changelog records captured by the record collector.
     *
     * <p>Timestamps are absolute; {@code defaultStartTime} is 59996 and every range below spans
     * 2 * WINDOW_SIZE (6). After the second batch the test expects key 2 to hold "two" at 59998
     * and "two+1".."two+6" at 59999..60004.
     */
    @Test
    public void testRangeAndSinglePointFetch() {
        this.putFirstBatch(this.windowStore, this.defaultStartTime, this.context);
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("zero")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(0, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));

        this.putSecondBatch(this.windowStore, this.defaultStartTime, this.context);

        // Single-point fetches: one value per exact timestamp.
        Assert.assertEquals("two+1", this.windowStore.fetch(2, 59999L));
        Assert.assertEquals("two+2", this.windowStore.fetch(2, 60000L));
        Assert.assertEquals("two+3", this.windowStore.fetch(2, 60001L));
        Assert.assertEquals("two+4", this.windowStore.fetch(2, 60002L));
        Assert.assertEquals("two+5", this.windowStore.fetch(2, 60003L));
        Assert.assertEquals("two+6", this.windowStore.fetch(2, 60004L));

        // Range fetches: slide a 6-wide range one tick at a time across the records of key 2.
        Assert.assertEquals(Collections.emptySet(),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59991L), Instant.ofEpochMilli(59997L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59992L), Instant.ofEpochMilli(59998L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59994L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2", "two+3")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59995L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2", "two+3", "two+4")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(60002L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2", "two+3", "two+4", "two+5")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60003L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60004L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59999L), Instant.ofEpochMilli(60005L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+2", "two+3", "two+4", "two+5", "two+6")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60000L), Instant.ofEpochMilli(60006L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+3", "two+4", "two+5", "two+6")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60001L), Instant.ofEpochMilli(60007L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+4", "two+5", "two+6")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60002L), Instant.ofEpochMilli(60008L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+5", "two+6")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60003L), Instant.ofEpochMilli(60009L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two+6")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60004L), Instant.ofEpochMilli(60010L))));
        Assert.assertEquals(Collections.emptySet(),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60005L), Instant.ofEpochMilli(60011L))));

        // Flush, then rebuild the changelog from whatever the record collector captured.
        this.windowStore.flush();
        final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
        for (final ProducerRecord<Object, Object> sent : this.recordCollector.collected()) {
            changeLog.add(new KeyValue<>(((Bytes) sent.key()).get(), (byte[]) sent.value()));
        }

        // NOTE(review): entries appear to be "value@(timestamp - defaultStartTime)" — confirm against entriesByKey.
        final Map<Integer, Set<String>> entriesByKey = this.entriesByKey(changeLog, this.defaultStartTime);
        Assert.assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
        Assert.assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
        Assert.assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
        Assert.assertEquals(Utils.mkSet("three@2"), entriesByKey.get(3));
        Assert.assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
        Assert.assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
        Assert.assertNull(entriesByKey.get(6));
    }

    /** After the first batch is written, {@code all()} must return the six fixture records zero..five in that order. */
    @Test
    public void shouldGetAll() {
        this.putFirstBatch(this.windowStore, this.defaultStartTime, this.context);

        final List<KeyValue<Windowed<Integer>, String>> expected =
            Arrays.asList(this.zero, this.one, this.two, this.three, this.four, this.five);
        final List<KeyValue<Windowed<Integer>, String>> actual = StreamsTestUtils.toList(this.windowStore.all());

        Assert.assertEquals(expected, actual);
    }

    /**
     * Writes five records, overwrites two of them (keys 1 and 3) with {@code null}, and expects
     * {@code all()} to return only the remaining three.
     */
    @Test
    public void shouldGetAllNonDeletedRecords() {
        // Five records at consecutive timestamps starting at defaultStartTime.
        this.windowStore.put(0, "zero", this.defaultStartTime);
        this.windowStore.put(1, "one", this.defaultStartTime + 1L);
        this.windowStore.put(2, "two", this.defaultStartTime + 2L);
        this.windowStore.put(3, "three", this.defaultStartTime + 3L);
        this.windowStore.put(4, "four", this.defaultStartTime + 4L);

        // A null value at the same key/timestamp is expected to remove the record.
        this.windowStore.put(1, null, this.defaultStartTime + 1L);
        this.windowStore.put(3, null, this.defaultStartTime + 3L);

        Assert.assertEquals(Arrays.asList(this.zero, this.two, this.four), StreamsTestUtils.toList(this.windowStore.all()));
    }

    /**
     * Inserts records out of timestamp order and expects {@code all()} to return them ordered
     * by timestamp.
     */
    @Test
    public void shouldGetAllReturnTimestampOrderedRecords() {
        // Deliberately shuffled insertion order.
        this.windowStore.put(4, "four", this.defaultStartTime + 4L);
        this.windowStore.put(0, "zero", this.defaultStartTime);
        this.windowStore.put(2, "two", this.defaultStartTime + 2L);
        this.windowStore.put(3, "three", this.defaultStartTime + 3L);
        this.windowStore.put(1, "one", this.defaultStartTime + 1L);

        // The shared "three" fixture sits at defaultStartTime + 2; this test stores key 3 one tick later,
        // so it needs its own expected pair (named distinctly to avoid shadowing the field).
        final KeyValue<Windowed<Integer>, String> threeAtOffsetThree =
            AbstractWindowBytesStoreTest.windowedPair(3, "three", this.defaultStartTime + 3L);

        Assert.assertEquals(
            Arrays.asList(this.zero, this.one, this.two, threeAtOffsetThree, this.four),
            StreamsTestUtils.toList(this.windowStore.all()));
    }

    /**
     * Closing an {@code all()} iterator after reading only part of it must not affect a later
     * {@code all()} call, which should still return every record.
     */
    @Test
    public void shouldEarlyClosedIteratorStillGetAllRecords() {
        this.windowStore.put(0, "zero", this.defaultStartTime);
        this.windowStore.put(1, "one", this.defaultStartTime + 1L);

        // try-with-resources closes the iterator even if the assertion fails, and still closes it
        // before the second all() call below.
        try (KeyValueIterator<Windowed<Integer>, String> partiallyRead = this.windowStore.all()) {
            Assert.assertEquals(this.zero, partiallyRead.next());
        }

        Assert.assertEquals(Arrays.asList(this.zero, this.one), StreamsTestUtils.toList(this.windowStore.all()));
    }

    /** After the first batch is written, {@code backwardAll()} must return the fixture records in the order five..zero. */
    @Test
    public void shouldGetBackwardAll() {
        this.putFirstBatch(this.windowStore, this.defaultStartTime, this.context);

        final List<KeyValue<Windowed<Integer>, String>> expected =
            Arrays.asList(this.five, this.four, this.three, this.two, this.one, this.zero);
        final List<KeyValue<Windowed<Integer>, String>> actual = StreamsTestUtils.toList(this.windowStore.backwardAll());

        Assert.assertEquals(expected, actual);
    }

    /**
     * {@code fetchAll(from, to)} must return every record whose timestamp lies in the inclusive
     * range, in forward order. Bounds are written as offsets from {@code defaultStartTime} (59996).
     */
    @Test
    public void shouldFetchAllInTimeRange() {
        final long start = this.defaultStartTime;
        this.putFirstBatch(this.windowStore, start, this.context);

        // [start+1, start+4]
        Assert.assertEquals(
            Arrays.asList(this.one, this.two, this.three, this.four),
            StreamsTestUtils.toList(this.windowStore.fetchAll(Instant.ofEpochMilli(start + 1L), Instant.ofEpochMilli(start + 4L))));

        // [start, start+3]
        Assert.assertEquals(
            Arrays.asList(this.zero, this.one, this.two, this.three),
            StreamsTestUtils.toList(this.windowStore.fetchAll(Instant.ofEpochMilli(start), Instant.ofEpochMilli(start + 3L))));

        // [start+1, start+5]
        Assert.assertEquals(
            Arrays.asList(this.one, this.two, this.three, this.four, this.five),
            StreamsTestUtils.toList(this.windowStore.fetchAll(Instant.ofEpochMilli(start + 1L), Instant.ofEpochMilli(start + 5L))));
    }

    /**
     * {@code backwardFetchAll(from, to)} must return every record whose timestamp lies in the
     * inclusive range, in reverse order. Bounds are written as offsets from
     * {@code defaultStartTime} (59996).
     */
    @Test
    public void shouldBackwardFetchAllInTimeRange() {
        final long start = this.defaultStartTime;
        this.putFirstBatch(this.windowStore, start, this.context);

        // [start+1, start+4]
        Assert.assertEquals(
            Arrays.asList(this.four, this.three, this.two, this.one),
            StreamsTestUtils.toList(this.windowStore.backwardFetchAll(Instant.ofEpochMilli(start + 1L), Instant.ofEpochMilli(start + 4L))));

        // [start, start+3]
        Assert.assertEquals(
            Arrays.asList(this.three, this.two, this.one, this.zero),
            StreamsTestUtils.toList(this.windowStore.backwardFetchAll(Instant.ofEpochMilli(start), Instant.ofEpochMilli(start + 3L))));

        // [start+1, start+5]
        Assert.assertEquals(
            Arrays.asList(this.five, this.four, this.three, this.two, this.one),
            StreamsTestUtils.toList(this.windowStore.backwardFetchAll(Instant.ofEpochMilli(start + 1L), Instant.ofEpochMilli(start + 5L))));
    }

    /**
     * Exercises key-range fetches ({@code fetch(keyFrom, keyTo, timeFrom, timeTo)}) over the
     * first batch, including {@code null} key bounds.
     *
     * <p>Timestamps are absolute; {@code defaultStartTime} is 59996.
     */
    @Test
    public void testFetchRange() {
        this.putFirstBatch(this.windowStore, this.defaultStartTime, this.context);

        // Bounded key ranges.
        Assert.assertEquals(Arrays.asList(this.zero, this.one),
            StreamsTestUtils.toList(this.windowStore.fetch(0, 1, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Collections.singletonList(this.one),
            StreamsTestUtils.toList(this.windowStore.fetch(1, 1, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Arrays.asList(this.one, this.two, this.three),
            StreamsTestUtils.toList(this.windowStore.fetch(1, 3, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Arrays.asList(this.zero, this.one, this.two, this.three),
            StreamsTestUtils.toList(this.windowStore.fetch(0, 5, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Arrays.asList(this.zero, this.one, this.two, this.three, this.four, this.five),
            StreamsTestUtils.toList(this.windowStore.fetch(0, 5, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(60004L))));
        Assert.assertEquals(Arrays.asList(this.two, this.three, this.four, this.five),
            StreamsTestUtils.toList(this.windowStore.fetch(0, 5, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60004L))));

        // Key range and time range that do not intersect any record.
        Assert.assertEquals(Collections.emptyList(),
            StreamsTestUtils.toList(this.windowStore.fetch(4, 5, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Collections.emptyList(),
            StreamsTestUtils.toList(this.windowStore.fetch(0, 3, Instant.ofEpochMilli(59999L), Instant.ofEpochMilli(60004L))));

        // Null key bounds: the expected results treat a null bound as "no bound on that side".
        Assert.assertEquals(Arrays.asList(this.zero, this.one, this.two),
            StreamsTestUtils.toList(this.windowStore.fetch(null, 2, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(Arrays.asList(this.two, this.three, this.four, this.five),
            StreamsTestUtils.toList(this.windowStore.fetch(2, null, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(60004L))));
        Assert.assertEquals(Arrays.asList(this.zero, this.one, this.two, this.three, this.four, this.five),
            StreamsTestUtils.toList(this.windowStore.fetch(null, null, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(60004L))));
    }

    /**
     * Exercises backward key-range fetches ({@code backwardFetch(keyFrom, keyTo, timeFrom, timeTo)})
     * over the first batch, including {@code null} key bounds. Expected results are in reverse order.
     *
     * <p>Timestamps are absolute; {@code defaultStartTime} is 59996.
     */
    @Test
    public void testBackwardFetchRange() {
        this.putFirstBatch(this.windowStore, this.defaultStartTime, this.context);

        // Bounded key ranges.
        Assert.assertEquals(Arrays.asList(this.one, this.zero),
            StreamsTestUtils.toList(this.windowStore.backwardFetch(0, 1, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Collections.singletonList(this.one),
            StreamsTestUtils.toList(this.windowStore.backwardFetch(1, 1, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Arrays.asList(this.three, this.two, this.one),
            StreamsTestUtils.toList(this.windowStore.backwardFetch(1, 3, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Arrays.asList(this.three, this.two, this.one, this.zero),
            StreamsTestUtils.toList(this.windowStore.backwardFetch(0, 5, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Arrays.asList(this.five, this.four, this.three, this.two, this.one, this.zero),
            StreamsTestUtils.toList(this.windowStore.backwardFetch(0, 5, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(60004L))));
        Assert.assertEquals(Arrays.asList(this.five, this.four, this.three, this.two),
            StreamsTestUtils.toList(this.windowStore.backwardFetch(0, 5, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60004L))));

        // Key range and time range that do not intersect any record.
        Assert.assertEquals(Collections.emptyList(),
            StreamsTestUtils.toList(this.windowStore.backwardFetch(4, 5, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(Collections.emptyList(),
            StreamsTestUtils.toList(this.windowStore.backwardFetch(0, 3, Instant.ofEpochMilli(59999L), Instant.ofEpochMilli(60004L))));

        // Null key bounds: the expected results treat a null bound as "no bound on that side".
        Assert.assertEquals(Arrays.asList(this.two, this.one, this.zero),
            StreamsTestUtils.toList(this.windowStore.backwardFetch(null, 2, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(Arrays.asList(this.five, this.four, this.three, this.two),
            StreamsTestUtils.toList(this.windowStore.backwardFetch(2, null, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(60004L))));
        Assert.assertEquals(Arrays.asList(this.five, this.four, this.three, this.two, this.one, this.zero),
            StreamsTestUtils.toList(this.windowStore.backwardFetch(null, null, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(60004L))));
    }

    /**
     * Fetches with ranges that end at a given timestamp and start WINDOW_SIZE (3) earlier,
     * first per key on the first batch, then sliding across the records of key 2 after the
     * second batch, and finally checks the captured changelog.
     *
     * <p>Timestamps are absolute; {@code defaultStartTime} is 59996.
     */
    @Test
    public void testPutAndFetchBefore() {
        this.putFirstBatch(this.windowStore, this.defaultStartTime, this.context);

        // One range per key, each ending at the timestamp the test expects that key's record at.
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("zero")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(0, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59996L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("one")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(1, Instant.ofEpochMilli(59994L), Instant.ofEpochMilli(59997L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59995L), Instant.ofEpochMilli(59998L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("three")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(3, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("four")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(4, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("five")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(5, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60001L))));

        this.putSecondBatch(this.windowStore, this.defaultStartTime, this.context);

        // Slide a 3-wide range one tick at a time across the records of key 2.
        Assert.assertEquals(Collections.emptySet(),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59992L), Instant.ofEpochMilli(59995L))));
        Assert.assertEquals(Collections.emptySet(),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59996L))));
        Assert.assertEquals(Collections.emptySet(),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59994L), Instant.ofEpochMilli(59997L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59995L), Instant.ofEpochMilli(59998L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2", "two+3")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+1", "two+2", "two+3", "two+4")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59999L), Instant.ofEpochMilli(60002L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+2", "two+3", "two+4", "two+5")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60000L), Instant.ofEpochMilli(60003L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+3", "two+4", "two+5", "two+6")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60001L), Instant.ofEpochMilli(60004L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+4", "two+5", "two+6")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60002L), Instant.ofEpochMilli(60005L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+5", "two+6")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60003L), Instant.ofEpochMilli(60006L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two+6")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60004L), Instant.ofEpochMilli(60007L))));
        Assert.assertEquals(Collections.emptySet(),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60005L), Instant.ofEpochMilli(60008L))));
        Assert.assertEquals(Collections.emptySet(),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60006L), Instant.ofEpochMilli(60009L))));

        // Flush, then rebuild the changelog from whatever the record collector captured.
        this.windowStore.flush();
        final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
        for (final ProducerRecord<Object, Object> sent : this.recordCollector.collected()) {
            changeLog.add(new KeyValue<>(((Bytes) sent.key()).get(), (byte[]) sent.value()));
        }

        // NOTE(review): entries appear to be "value@(timestamp - defaultStartTime)" — confirm against entriesByKey.
        final Map<Integer, Set<String>> entriesByKey = this.entriesByKey(changeLog, this.defaultStartTime);
        Assert.assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
        Assert.assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
        Assert.assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
        Assert.assertEquals(Utils.mkSet("three@2"), entriesByKey.get(3));
        Assert.assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
        Assert.assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
        Assert.assertNull(entriesByKey.get(6));
    }

    /**
     * Fetches with ranges that start at a given timestamp and end WINDOW_SIZE (3) later,
     * first per key on the first batch, then sliding across the records of key 2 after the
     * second batch, and finally checks the captured changelog.
     *
     * <p>Timestamps are absolute; {@code defaultStartTime} is 59996.
     */
    @Test
    public void testPutAndFetchAfter() {
        this.putFirstBatch(this.windowStore, this.defaultStartTime, this.context);

        // One range per key, each starting at defaultStartTime + key.
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("zero")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(0, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("one")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(1, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60001L))));
        // Key 3 is expected to be absent here: the "three" fixture sits at 59998, just before this range.
        Assert.assertEquals(Collections.emptySet(),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(3, Instant.ofEpochMilli(59999L), Instant.ofEpochMilli(60002L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("four")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(4, Instant.ofEpochMilli(60000L), Instant.ofEpochMilli(60003L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("five")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(5, Instant.ofEpochMilli(60001L), Instant.ofEpochMilli(60004L))));

        this.putSecondBatch(this.windowStore, this.defaultStartTime, this.context);

        // Slide a 3-wide range one tick at a time across the records of key 2.
        Assert.assertEquals(Collections.emptySet(),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59994L), Instant.ofEpochMilli(59997L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59995L), Instant.ofEpochMilli(59998L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2", "two+3")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+1", "two+2", "two+3", "two+4")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(59999L), Instant.ofEpochMilli(60002L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+2", "two+3", "two+4", "two+5")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60000L), Instant.ofEpochMilli(60003L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+3", "two+4", "two+5", "two+6")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60001L), Instant.ofEpochMilli(60004L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+4", "two+5", "two+6")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60002L), Instant.ofEpochMilli(60005L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+5", "two+6")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60003L), Instant.ofEpochMilli(60006L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two+6")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60004L), Instant.ofEpochMilli(60007L))));
        Assert.assertEquals(Collections.emptySet(),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60005L), Instant.ofEpochMilli(60008L))));
        Assert.assertEquals(Collections.emptySet(),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60006L), Instant.ofEpochMilli(60009L))));
        Assert.assertEquals(Collections.emptySet(),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60007L), Instant.ofEpochMilli(60010L))));
        Assert.assertEquals(Collections.emptySet(),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(60008L), Instant.ofEpochMilli(60011L))));

        // Flush, then rebuild the changelog from whatever the record collector captured.
        this.windowStore.flush();
        final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
        for (final ProducerRecord<Object, Object> sent : this.recordCollector.collected()) {
            changeLog.add(new KeyValue<>(((Bytes) sent.key()).get(), (byte[]) sent.value()));
        }

        // NOTE(review): entries appear to be "value@(timestamp - defaultStartTime)" — confirm against entriesByKey.
        final Map<Integer, Set<String>> entriesByKey = this.entriesByKey(changeLog, this.defaultStartTime);
        Assert.assertEquals(Utils.mkSet("zero@0"), entriesByKey.get(0));
        Assert.assertEquals(Utils.mkSet("one@1"), entriesByKey.get(1));
        Assert.assertEquals(Utils.mkSet("two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"), entriesByKey.get(2));
        Assert.assertEquals(Utils.mkSet("three@2"), entriesByKey.get(3));
        Assert.assertEquals(Utils.mkSet("four@4"), entriesByKey.get(4));
        Assert.assertEquals(Utils.mkSet("five@5"), entriesByKey.get(5));
        Assert.assertNull(entriesByKey.get(6));
    }

    /**
     * Replaces the default store with one built with the third {@code buildWindowStore} flag set
     * to {@code true}, writes several values under the same key and timestamp, and expects all
     * of them to be returned by range fetches and to appear in the captured changelog.
     *
     * <p>Timestamps are absolute; {@code defaultStartTime} is 59996.
     */
    @Test
    public void testPutSameKeyTimestamp() {
        // Swap the store created in setup() for one built with the flag enabled.
        this.windowStore.close();
        this.windowStore = this.buildWindowStore(RETENTION_PERIOD, WINDOW_SIZE, true, Serdes.Integer(), Serdes.String());
        this.windowStore.init((StateStoreContext) this.context, this.windowStore);

        this.windowStore.put(0, "zero", this.defaultStartTime);
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("zero")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(0, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));

        this.windowStore.put(0, "zero", this.defaultStartTime);
        this.windowStore.put(0, "zero+", this.defaultStartTime);
        this.windowStore.put(0, "zero++", this.defaultStartTime);

        // The comparison is set-based, so the repeated "zero" collapses to one element on both sides.
        Assert.assertEquals(new HashSet<String>(Arrays.asList("zero", "zero", "zero+", "zero++")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(0, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("zero", "zero", "zero+", "zero++")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(0, Instant.ofEpochMilli(59994L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("zero", "zero", "zero+", "zero++")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(0, Instant.ofEpochMilli(59995L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("zero", "zero", "zero+", "zero++")),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(0, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(60002L))));
        // The first range that starts after the record's timestamp must be empty.
        Assert.assertEquals(Collections.emptySet(),
            StreamsTestUtils.valuesToSet(this.windowStore.fetch(0, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60003L))));

        // Flush, then rebuild the changelog from whatever the record collector captured.
        this.windowStore.flush();
        final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>();
        for (final ProducerRecord<Object, Object> sent : this.recordCollector.collected()) {
            changeLog.add(new KeyValue<>(((Bytes) sent.key()).get(), (byte[]) sent.value()));
        }

        final Map<Integer, Set<String>> entriesByKey = this.entriesByKey(changeLog, this.defaultStartTime);
        Assert.assertEquals(Utils.mkSet("zero@0", "zero@0", "zero+@0", "zero++@0"), entriesByKey.get(0));
    }

    /**
     * Closes the store while a fetch iterator is still open and checks that the iterator
     * then reports {@code hasNext() == false} instead of throwing.
     */
    @Test
    public void shouldCloseOpenIteratorsWhenStoreIsClosedAndNotThrowInvalidStateStoreExceptionOnHasNext() {
        this.windowStore.put(1, "one", 1L);
        this.windowStore.put(1, "two", 2L);
        this.windowStore.put(1, "three", 3L);

        final Instant from = Instant.ofEpochMilli(1L);
        final Instant to = Instant.ofEpochMilli(3L);
        try (WindowStoreIterator openIterator = this.windowStore.fetch(1, from, to)) {
            // While the store is open the iterator sees the records written above.
            final boolean hasRecordsBeforeClose = openIterator.hasNext();
            Assert.assertTrue(hasRecordsBeforeClose);

            this.windowStore.close();

            // After the close the same iterator must simply be exhausted.
            final boolean hasRecordsAfterClose = openIterator.hasNext();
            Assert.assertFalse(hasRecordsAfterClose);
        }
    }

    /**
     * Checks that exact-key fetches return only records of the requested key: records are
     * written under {@code "a"} and {@code "aa"}, and both the single-key fetch and the
     * key-range fetch with identical bounds must not return records of the other key.
     * The store is built with a very large window size and retention period.
     */
    @Test
    public void shouldFetchAndIterateOverExactKeys() {
        final long windowSize = 0x7A00000000000000L;
        final long retentionPeriod = 0x7A00000000000000L;
        final Instant from = Instant.ofEpochMilli(0L);
        final Instant to = Instant.ofEpochMilli(Long.MAX_VALUE);

        WindowStore windowStore = this.buildWindowStore(retentionPeriod, windowSize, false, Serdes.String(), Serdes.String());
        windowStore.init((StateStoreContext)this.context, windowStore);
        // Close the locally built store even when an assertion fails, so a failing run
        // does not leave the store open.
        try {
            windowStore.put("a", "0001", 0L);
            windowStore.put("aa", "0002", 0L);
            windowStore.put("a", "0003", 1L);
            windowStore.put("aa", "0004", 1L);
            // Last timestamp that is still strictly below the window size.
            windowStore.put("a", "0005", windowSize - 1L);

            HashSet<String> expected = new HashSet<String>(Arrays.asList("0001", "0003", "0005"));
            MatcherAssert.assertThat(StreamsTestUtils.valuesToSet(windowStore.fetch("a", from, to)), (Matcher)CoreMatchers.equalTo(expected));

            Set set = StreamsTestUtils.toSet(windowStore.fetch("a", "a", from, to));
            MatcherAssert.assertThat(set, (Matcher)CoreMatchers.equalTo(new HashSet<KeyValue>(Arrays.asList(
                AbstractWindowBytesStoreTest.windowedPair("a", "0001", 0L, windowSize),
                AbstractWindowBytesStoreTest.windowedPair("a", "0003", 1L, windowSize),
                AbstractWindowBytesStoreTest.windowedPair("a", "0005", windowSize - 1L, windowSize)))));

            set = StreamsTestUtils.toSet(windowStore.fetch("aa", "aa", from, to));
            MatcherAssert.assertThat(set, (Matcher)CoreMatchers.equalTo(new HashSet<KeyValue>(Arrays.asList(
                AbstractWindowBytesStoreTest.windowedPair("aa", "0002", 0L, windowSize),
                AbstractWindowBytesStoreTest.windowedPair("aa", "0004", 1L, windowSize)))));
        } finally {
            windowStore.close();
        }
    }

    /**
     * Writes two values for the same key and timestamp and expects the fetch to return the
     * second one, then writes a {@code null} value and expects the fetch to return nothing.
     */
    @Test
    public void testDeleteAndUpdate() {
        final long currentTime = 0L;
        this.windowStore.put(1, "one", currentTime);
        this.windowStore.put(1, "one v2", currentTime);

        // Iterators are closed via try-with-resources, matching the other tests in this class.
        try (WindowStoreIterator iterator = this.windowStore.fetch(1, 0L, currentTime)) {
            Assert.assertEquals((Object)new KeyValue<Long, String>(currentTime, "one v2"), (Object)iterator.next());
        }

        // Writing a null value is expected to remove the entry.
        this.windowStore.put(1, null, currentTime);
        try (WindowStoreIterator iterator = this.windowStore.fetch(1, 0L, currentTime)) {
            Assert.assertFalse(iterator.hasNext());
        }
    }

    /**
     * A point lookup for key {@code 1} at timestamp {@code 0}, which this test never writes,
     * must return {@code null}.
     */
    @Test
    public void shouldReturnNullOnWindowNotFound() {
        final Object missingValue = this.windowStore.fetch(1, 0L);
        Assert.assertNull(missingValue);
    }

    /**
     * Writing a record with a {@code null} key must fail with a {@link NullPointerException}.
     */
    @Test
    public void shouldThrowNullPointerExceptionOnPutNullKey() {
        Assert.assertThrows(NullPointerException.class, () -> {
            this.windowStore.put(null, "anyValue", 0L);
        });
    }

    /**
     * Fetching with a {@code null} key must fail with a {@link NullPointerException}.
     */
    @Test
    public void shouldThrowNullPointerExceptionOnGetNullKey() {
        final Instant from = Instant.ofEpochMilli(1L);
        final Instant to = Instant.ofEpochMilli(2L);
        Assert.assertThrows(NullPointerException.class, () -> {
            this.windowStore.fetch(null, from, to);
        });
    }

    /**
     * Checks that fetching by a binary key returns only the values written under exactly that
     * key, using three keys made of one, two and three zero bytes (each a prefix of the next).
     */
    @Test
    public void shouldFetchAndIterateOverExactBinaryKeys() {
        final Instant from = Instant.ofEpochMilli(0L);
        final Instant to = Instant.ofEpochMilli(Long.MAX_VALUE);

        // NOTE(review): the boolean argument is presumably the retain-duplicates flag - confirm
        // against buildWindowStore.
        WindowStore windowStore = this.buildWindowStore(120000L, 3L, true, Serdes.Bytes(), Serdes.String());
        windowStore.init((StateStoreContext)this.context, windowStore);
        // Close the locally built store even when an assertion fails.
        try {
            Bytes key1 = Bytes.wrap(new byte[]{0});
            Bytes key2 = Bytes.wrap(new byte[]{0, 0});
            Bytes key3 = Bytes.wrap(new byte[]{0, 0, 0});

            windowStore.put(key1, "1", 0L);
            windowStore.put(key2, "2", 0L);
            windowStore.put(key3, "3", 0L);
            windowStore.put(key1, "4", 1L);
            windowStore.put(key2, "5", 1L);
            windowStore.put(key3, "6", 59999L);
            windowStore.put(key1, "7", 59999L);
            windowStore.put(key2, "8", 59999L);
            windowStore.put(key3, "9", 59999L);

            HashSet<String> expectedKey1 = new HashSet<String>(Arrays.asList("1", "4", "7"));
            MatcherAssert.assertThat(StreamsTestUtils.valuesToSet(windowStore.fetch(key1, from, to)), (Matcher)CoreMatchers.equalTo(expectedKey1));

            HashSet<String> expectedKey2 = new HashSet<String>(Arrays.asList("2", "5", "8"));
            MatcherAssert.assertThat(StreamsTestUtils.valuesToSet(windowStore.fetch(key2, from, to)), (Matcher)CoreMatchers.equalTo(expectedKey2));

            HashSet<String> expectedKey3 = new HashSet<String>(Arrays.asList("3", "6", "9"));
            MatcherAssert.assertThat(StreamsTestUtils.valuesToSet(windowStore.fetch(key3, from, to)), (Matcher)CoreMatchers.equalTo(expectedKey3));
        } finally {
            windowStore.close();
        }
    }

    /**
     * Fetching key {@code 2} directly and fetching the key range {@code [2, 2]} over the same
     * time range must yield the same two values in the same order and nothing more.
     */
    @Test
    public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeFetch() {
        this.windowStore.put(1, "one", 0L);
        this.windowStore.put(2, "two", 1L);
        this.windowStore.put(2, "two", 2L);
        this.windowStore.put(3, "three", 3L);

        try (WindowStoreIterator singleKeyIterator = this.windowStore.fetch(2, 0L, 5L);
             KeyValueIterator keyRangeIterator = this.windowStore.fetch(2, 2, 0L, 5L)) {
            // Key 2 was written twice, so both iterators must produce two matching values.
            for (int position = 0; position < 2; position++) {
                final Object singleKeyValue = ((KeyValue)singleKeyIterator.next()).value;
                final Object keyRangeValue = ((KeyValue)keyRangeIterator.next()).value;
                Assert.assertEquals(singleKeyValue, keyRangeValue);
            }
            Assert.assertFalse(singleKeyIterator.hasNext());
            Assert.assertFalse(keyRangeIterator.hasNext());
        }
    }

    /**
     * A key-range fetch from {@code -1} to {@code 1} must return an empty iterator and log a
     * warning about the invalid key range rather than throw.
     */
    @Test
    public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
        final String expectedWarning = "Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers";

        // The appender is registered before the fetch so that it captures the warning.
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
             KeyValueIterator iterator = this.windowStore.fetch(-1, 1, 0L, 10L)) {
            Assert.assertFalse(iterator.hasNext());

            List capturedMessages = appender.getMessages();
            MatcherAssert.assertThat((Object)capturedMessages, (Matcher)CoreMatchers.hasItem((Object)expectedWarning));
        }
    }

    /**
     * Writes three records to a freshly built store and checks that a
     * "Skipping record for expired segment." message is logged, that the task-level
     * {@code dropped-records-total} metric equals {@code 1.0} and that
     * {@code dropped-records-rate} is non-zero.
     */
    @Test
    public void shouldLogAndMeasureExpiredRecords() {
        Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        WindowStore windowStore = this.buildWindowStore(120000L, 3L, false, Serdes.Integer(), Serdes.String());
        InternalMockProcessorContext context = new InternalMockProcessorContext(TestUtils.tempDirectory(), new StreamsConfig((Map)streamsConfig), this.recordCollector);
        SystemTime time = new SystemTime();
        context.setSystemTimeMs(time.milliseconds());
        context.setTime(1L);
        windowStore.init(context, windowStore);
        // Close the locally built store even when an assertion fails.
        try {
            try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
                windowStore.put(1, "initial record", 240000L);
                windowStore.put(1, "late record", 0L);
                windowStore.put(1, "another on-time record", 120001L);
                List messages = appender.getMessages();
                MatcherAssert.assertThat((Object)messages, (Matcher)CoreMatchers.hasItem((Object)"Skipping record for expired segment."));
            }

            Map metrics = context.metrics().metrics();
            String threadId = Thread.currentThread().getName();
            // Both metrics are looked up with the same thread/task tags, so build them once.
            Map taskTags = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"thread-id", (Object)threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0")});
            Metric dropTotal = (Metric)metrics.get(new MetricName("dropped-records-total", "stream-task-metrics", "", taskTags));
            Metric dropRate = (Metric)metrics.get(new MetricName("dropped-records-rate", "stream-task-metrics", "", taskTags));

            // Exactly one of the three writes is expected to be counted as dropped.
            Assert.assertEquals((Object)1.0, (Object)dropTotal.metricValue());
            Assert.assertNotEquals((Object)0.0, (Object)dropRate.metricValue());
        } finally {
            windowStore.close();
        }
    }

    /**
     * After writing a record at timestamp {@code 0} and a much later one at {@code 480000},
     * fetching the early range {@code [0, 10]} must return an empty iterator without throwing.
     */
    @Test
    public void shouldNotThrowExceptionWhenFetchRangeIsExpired() {
        final long earlyTimestamp = 0L;
        final long muchLaterTimestamp = 480000L;
        this.windowStore.put(1, "one", earlyTimestamp);
        this.windowStore.put(1, "two", muchLaterTimestamp);

        try (WindowStoreIterator earlyRange = this.windowStore.fetch(1, 0L, 10L)) {
            final boolean hasAnyRecord = earlyRange.hasNext();
            Assert.assertFalse(hasAnyRecord);
        }
    }

    /**
     * Checks that {@code peekNextKey()} on a {@code fetchAll} iterator is repeatable and
     * returns the key of the record that the following {@code next()} call produces.
     */
    @Test
    public void testWindowIteratorPeek() {
        final long currentTime = 0L;
        this.windowStore.put(1, "one", currentTime);

        try (KeyValueIterator allInRange = this.windowStore.fetchAll(0L, currentTime)) {
            Assert.assertTrue(allInRange.hasNext());

            // Peeking twice must yield equal keys.
            final Windowed firstPeek = (Windowed)allInRange.peekNextKey();
            Assert.assertEquals((Object)allInRange.peekNextKey(), (Object)firstPeek);

            // Peek before consuming: the peeked key must equal the key of the consumed record.
            final Object peekedKey = allInRange.peekNextKey();
            final Object consumedKey = ((KeyValue)allInRange.next()).key;
            Assert.assertEquals(peekedKey, consumedKey);

            Assert.assertFalse(allInRange.hasNext());
        }
    }

    /**
     * Checks that {@code peekNextKey()} on a single-key fetch iterator is repeatable and
     * returns the key of the record that the following {@code next()} call produces.
     */
    @Test
    public void testValueIteratorPeek() {
        this.windowStore.put(1, "one", 0L);

        try (WindowStoreIterator valuesForKey = this.windowStore.fetch(1, 0L, 10L)) {
            Assert.assertTrue(valuesForKey.hasNext());

            // Peeking twice must yield equal keys.
            final Long firstPeek = (Long)valuesForKey.peekNextKey();
            Assert.assertEquals((Object)valuesForKey.peekNextKey(), (Object)firstPeek);

            // Peek before consuming: the peeked key must equal the key of the consumed record.
            final Object peekedKey = valuesForKey.peekNextKey();
            final Object consumedKey = ((KeyValue)valuesForKey.next()).key;
            Assert.assertEquals(peekedKey, consumedKey);

            Assert.assertFalse(valuesForKey.hasNext());
        }
    }

    /**
     * Writes to the store while an {@code all()} iterator is open and expects the iterator to
     * return all four records, including the two written after it was opened, without failing.
     */
    @Test
    public void shouldNotThrowConcurrentModificationException() {
        final long step = 30L;
        final long firstTimestamp = 0L;
        final long secondTimestamp = firstTimestamp + step;
        final long thirdTimestamp = secondTimestamp + step;
        final long fourthTimestamp = thirdTimestamp + step;

        this.windowStore.put(1, "one", firstTimestamp);
        this.windowStore.put(1, "two", secondTimestamp);

        try (KeyValueIterator everything = this.windowStore.all()) {
            // These writes happen while the iterator is already open.
            this.windowStore.put(1, "three", thirdTimestamp);
            this.windowStore.put(2, "four", fourthTimestamp);

            Assert.assertEquals(AbstractWindowBytesStoreTest.windowedPair(1, "one", 0L), (Object)everything.next());
            Assert.assertEquals(AbstractWindowBytesStoreTest.windowedPair(1, "two", 30L), (Object)everything.next());
            Assert.assertEquals(AbstractWindowBytesStoreTest.windowedPair(1, "three", 60L), (Object)everything.next());
            Assert.assertEquals(AbstractWindowBytesStoreTest.windowedPair(2, "four", 90L), (Object)everything.next());
            Assert.assertFalse(everything.hasNext());
        }
    }

    /**
     * Rebuilds the store, writes two values at each of three timestamps for the same key and
     * expects a fetch over the first two timestamps to return all four values in write order.
     */
    @Test
    public void testFetchDuplicates() {
        // Replace the shared store with a freshly built one.
        // NOTE(review): the boolean argument is presumably the retain-duplicates flag - confirm.
        this.windowStore.close();
        this.windowStore = this.buildWindowStore(120000L, 3L, true, Serdes.Integer(), Serdes.String());
        this.windowStore.init((StateStoreContext)this.context, this.windowStore);

        final long firstTimestamp = 0L;
        final long secondTimestamp = firstTimestamp + 30L;
        final long thirdTimestamp = secondTimestamp + 30L;

        this.windowStore.put(1, "one", firstTimestamp);
        this.windowStore.put(1, "one-2", firstTimestamp);
        this.windowStore.put(1, "two", secondTimestamp);
        this.windowStore.put(1, "two-2", secondTimestamp);
        this.windowStore.put(1, "three", thirdTimestamp);
        this.windowStore.put(1, "three-2", thirdTimestamp);

        final KeyValue[] expectedInOrder = {
            new KeyValue((Object)0L, (Object)"one"),
            new KeyValue((Object)0L, (Object)"one-2"),
            new KeyValue((Object)30L, (Object)"two"),
            new KeyValue((Object)30L, (Object)"two-2"),
        };

        // The fetch range [0, 30] excludes the two records written at the third timestamp.
        try (WindowStoreIterator fetched = this.windowStore.fetch(1, 0L, 30L)) {
            for (final KeyValue expected : expectedInOrder) {
                Assert.assertEquals((Object)expected, (Object)fetched.next());
            }
            Assert.assertFalse(fetched.hasNext());
        }
    }

    /**
     * Sets a record context for {@code startTime} on {@code context} and writes six records,
     * keys {@code 0} to {@code 5}, at fixed offsets from {@code startTime}.
     *
     * @param store     store to write to
     * @param startTime base timestamp that the per-record offsets are added to
     * @param context   context that receives the record context created for {@code startTime}
     */
    private void putFirstBatch(WindowStore<Integer, String> store, long startTime, InternalMockProcessorContext context) {
        context.setRecordContext(this.createRecordContext(startTime));

        // Index i holds the value and timestamp offset for key i; keys 2 and 3 share offset 2.
        final String[] values = {"zero", "one", "two", "three", "four", "five"};
        final long[] offsets = {0L, 1L, 2L, 2L, 4L, 5L};
        for (int key = 0; key < values.length; key++) {
            store.put(key, values[key], startTime + offsets[key]);
        }
    }

    /**
     * Writes six more records for key {@code 2}, {@code "two+1"} to {@code "two+6"}, at
     * {@code startTime + 3} through {@code startTime + 8}.
     *
     * @param store     store to write to
     * @param startTime base timestamp that the per-record offsets are added to
     * @param context   not used by this method
     */
    private void putSecondBatch(WindowStore<Integer, String> store, long startTime, InternalMockProcessorContext context) {
        // Record number n is written at offset n + 2 from the start time.
        for (int n = 1; n <= 6; n++) {
            store.put(2, "two+" + n, startTime + 2L + n);
        }
    }

    /**
     * Reads the timestamp out of a serialized store key by delegating to
     * {@link WindowKeySchema#extractStoreTimestamp(byte[])}.
     *
     * @param binaryKey serialized store key
     * @return the timestamp encoded in {@code binaryKey}
     */
    long extractStoreTimestamp(byte[] binaryKey) {
        final long storeTimestamp = WindowKeySchema.extractStoreTimestamp(binaryKey);
        return storeTimestamp;
    }

    /**
     * Reads the record key out of a serialized store key by delegating to
     * {@code WindowKeySchema.extractStoreKey}.
     *
     * @param binaryKey serialized store key
     * @param serdes    serdes passed through to the schema for decoding the key
     * @param <K>       type of the decoded key
     * @return the key encoded in {@code binaryKey}
     */
    <K> K extractStoreKey(byte[] binaryKey, StateSerdes<K, ?> serdes) {
        final K storeKey = (K)WindowKeySchema.extractStoreKey(binaryKey, serdes);
        return storeKey;
    }

    /**
     * Groups changelog entries by their decoded record key. Each entry is rendered as
     * {@code "<value>@<timestamp - startTime>"}; a {@code null} value renders as {@code "null"}.
     *
     * @param changeLog serialized key/value pairs to decode
     * @param startTime timestamp subtracted from each entry's store timestamp
     * @return map from record key to the set of rendered entries for that key
     */
    private Map<Integer, Set<String>> entriesByKey(List<KeyValue<byte[], byte[]>> changeLog, long startTime) {
        final Map<Integer, Set<String>> grouped = new HashMap<>();
        for (final KeyValue<byte[], byte[]> logEntry : changeLog) {
            final byte[] binaryKey = logEntry.key;
            final long relativeTimestamp = this.extractStoreTimestamp(binaryKey) - startTime;
            final Integer recordKey = this.extractStoreKey(binaryKey, this.serdes);

            String decodedValue = null;
            if (logEntry.value != null) {
                decodedValue = (String)this.serdes.valueFrom(logEntry.value);
            }

            grouped.computeIfAbsent(recordKey, ignored -> new HashSet<>())
                .add(decodedValue + "@" + relativeTimestamp);
        }
        return grouped;
    }

    /**
     * Builds an expected windowed key/value pair using a window size of {@code 3}.
     *
     * @param key       record key
     * @param value     record value
     * @param timestamp timestamp passed on as the window timestamp
     * @return the pair produced by the four-argument overload with window size {@code 3}
     */
    protected static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp) {
        final long defaultWindowSize = 3L;
        return AbstractWindowBytesStoreTest.windowedPair(key, value, timestamp, defaultWindowSize);
    }

    /**
     * Builds an expected windowed key/value pair whose window comes from
     * {@code WindowKeySchema.timeWindowForSize(timestamp, windowSize)}.
     *
     * @param key        record key
     * @param value      record value
     * @param timestamp  timestamp handed to {@code timeWindowForSize}
     * @param windowSize window size handed to {@code timeWindowForSize}
     * @return a pair of the windowed key and {@code value}
     */
    private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp, long windowSize) {
        final Window window = WindowKeySchema.timeWindowForSize(timestamp, windowSize);
        final Windowed<K> windowedKey = new Windowed<>(key, window);
        return KeyValue.pair(windowedKey, value);
    }

    /**
     * Creates a record context for the given time with otherwise fixed values: {@code 0L},
     * {@code 0}, the topic string {@code "topic"} and a new, empty {@link RecordHeaders}.
     *
     * @param time value passed as the first constructor argument
     * @return a new {@link ProcessorRecordContext}
     */
    private ProcessorRecordContext createRecordContext(long time) {
        final Headers emptyHeaders = new RecordHeaders();
        return new ProcessorRecordContext(time, 0L, 0, "topic", emptyHeaders);
    }
}

