/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public abstract class WindowBytesStoreTest {
    static final long WINDOW_SIZE = 3L;
    static final long SEGMENT_INTERVAL = 60000L;
    static final long RETENTION_PERIOD = 120000L;
    WindowStore<Integer, String> windowStore;
    InternalMockProcessorContext context;
    final File baseDir = TestUtils.tempDirectory((String)"test");
    private final StateSerdes<Integer, String> serdes = new StateSerdes("", Serdes.Integer(), Serdes.String());
    final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<KeyValue<byte[], byte[]>>();
    private final Producer<byte[], byte[]> producer = new MockProducer(true, Serdes.ByteArray().serializer(), Serdes.ByteArray().serializer());

    abstract <K, V> WindowStore<K, V> buildWindowStore(long var1, long var3, boolean var5, Serde<K> var6, Serde<V> var7);

    abstract String getMetricsScope();

    abstract void setClassLoggerToDebug();

    private RecordCollectorImpl createRecordCollector(String name) {
        return new RecordCollectorImpl(name, new LogContext(name), (ProductionExceptionHandler)new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")){

            public <K1, V1> void send(String topic, K1 key, V1 value, Headers headers, Integer partition, Long timestamp, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) {
                WindowBytesStoreTest.this.changeLog.add((KeyValue<byte[], byte[]>)new KeyValue((Object)keySerializer.serialize(topic, headers, key), (Object)valueSerializer.serialize(topic, headers, value)));
            }
        };
    }

    @Before
    public void setup() {
        this.windowStore = this.buildWindowStore(120000L, 3L, false, Serdes.Integer(), Serdes.String());
        RecordCollectorImpl recordCollector = this.createRecordCollector(this.windowStore.name());
        recordCollector.init(this.producer);
        this.context = new InternalMockProcessorContext(this.baseDir, Serdes.String(), Serdes.Integer(), (RecordCollector)recordCollector, new ThreadCache(new LogContext("testCache"), 0L, (StreamsMetricsImpl)new MockStreamsMetrics(new Metrics())));
        this.windowStore.init((ProcessorContext)this.context, this.windowStore);
    }

    @After
    public void after() {
        this.windowStore.close();
    }

    @Test
    public void testRangeAndSinglePointFetch() {
        long startTime = 59996L;
        this.putFirstBatch(this.windowStore, 59996L, this.context);
        Assert.assertEquals((Object)"zero", (Object)this.windowStore.fetch((Object)0, 59996L));
        Assert.assertEquals((Object)"one", (Object)this.windowStore.fetch((Object)1, 59997L));
        Assert.assertEquals((Object)"two", (Object)this.windowStore.fetch((Object)2, 59998L));
        Assert.assertEquals((Object)"four", (Object)this.windowStore.fetch((Object)4, 60000L));
        Assert.assertEquals((Object)"five", (Object)this.windowStore.fetch((Object)5, 60001L));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("zero")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)0, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        this.putSecondBatch(this.windowStore, 59996L, this.context);
        Assert.assertEquals((Object)"two+1", (Object)this.windowStore.fetch((Object)2, 59999L));
        Assert.assertEquals((Object)"two+2", (Object)this.windowStore.fetch((Object)2, 60000L));
        Assert.assertEquals((Object)"two+3", (Object)this.windowStore.fetch((Object)2, 60001L));
        Assert.assertEquals((Object)"two+4", (Object)this.windowStore.fetch((Object)2, 60002L));
        Assert.assertEquals((Object)"two+5", (Object)this.windowStore.fetch((Object)2, 60003L));
        Assert.assertEquals((Object)"two+6", (Object)this.windowStore.fetch((Object)2, 60004L));
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59991L), Instant.ofEpochMilli(59997L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59992L), Instant.ofEpochMilli(59998L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59994L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2", "two+3")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59995L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2", "two+3", "two+4")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(60002L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2", "two+3", "two+4", "two+5")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60003L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2", "two+3", "two+4", "two+5", "two+6")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60004L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+1", "two+2", "two+3", "two+4", "two+5", "two+6")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59999L), Instant.ofEpochMilli(60005L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+2", "two+3", "two+4", "two+5", "two+6")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60000L), Instant.ofEpochMilli(60006L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+3", "two+4", "two+5", "two+6")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60001L), Instant.ofEpochMilli(60007L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+4", "two+5", "two+6")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60002L), Instant.ofEpochMilli(60008L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+5", "two+6")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60003L), Instant.ofEpochMilli(60009L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two+6")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60004L), Instant.ofEpochMilli(60010L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60005L), Instant.ofEpochMilli(60011L))));
        this.windowStore.flush();
        Map<Integer, Set<String>> entriesByKey = this.entriesByKey(this.changeLog, 59996L);
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"zero@0"}), entriesByKey.get(0));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"one@1"}), entriesByKey.get(1));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"}), entriesByKey.get(2));
        Assert.assertNull(entriesByKey.get(3));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"four@4"}), entriesByKey.get(4));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"five@5"}), entriesByKey.get(5));
        Assert.assertNull(entriesByKey.get(6));
    }

    @Test
    public void shouldGetAll() {
        long startTime = 59996L;
        this.putFirstBatch(this.windowStore, 59996L, this.context);
        KeyValue<Windowed<Integer>, String> zero = WindowBytesStoreTest.windowedPair(0, "zero", 59996L);
        KeyValue<Windowed<Integer>, String> one = WindowBytesStoreTest.windowedPair(1, "one", 59997L);
        KeyValue<Windowed<Integer>, String> two = WindowBytesStoreTest.windowedPair(2, "two", 59998L);
        KeyValue<Windowed<Integer>, String> four = WindowBytesStoreTest.windowedPair(4, "four", 60000L);
        KeyValue<Windowed<Integer>, String> five = WindowBytesStoreTest.windowedPair(5, "five", 60001L);
        Assert.assertEquals(new HashSet<KeyValue>(Arrays.asList(zero, one, two, four, five)), WindowBytesStoreTest.toSet(this.windowStore.all()));
    }

    @Test
    public void shouldFetchAllInTimeRange() {
        long startTime = 59996L;
        this.putFirstBatch(this.windowStore, 59996L, this.context);
        KeyValue<Windowed<Integer>, String> zero = WindowBytesStoreTest.windowedPair(0, "zero", 59996L);
        KeyValue<Windowed<Integer>, String> one = WindowBytesStoreTest.windowedPair(1, "one", 59997L);
        KeyValue<Windowed<Integer>, String> two = WindowBytesStoreTest.windowedPair(2, "two", 59998L);
        KeyValue<Windowed<Integer>, String> four = WindowBytesStoreTest.windowedPair(4, "four", 60000L);
        KeyValue<Windowed<Integer>, String> five = WindowBytesStoreTest.windowedPair(5, "five", 60001L);
        Assert.assertEquals(new HashSet<KeyValue>(Arrays.asList(one, two, four)), WindowBytesStoreTest.toSet(this.windowStore.fetchAll(Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(new HashSet<KeyValue>(Arrays.asList(zero, one, two)), WindowBytesStoreTest.toSet(this.windowStore.fetchAll(Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<KeyValue>(Arrays.asList(one, two, four, five)), WindowBytesStoreTest.toSet(this.windowStore.fetchAll(Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60001L))));
    }

    @Test
    public void testFetchRange() {
        long startTime = 59996L;
        this.putFirstBatch(this.windowStore, 59996L, this.context);
        KeyValue<Windowed<Integer>, String> zero = WindowBytesStoreTest.windowedPair(0, "zero", 59996L);
        KeyValue<Windowed<Integer>, String> one = WindowBytesStoreTest.windowedPair(1, "one", 59997L);
        KeyValue<Windowed<Integer>, String> two = WindowBytesStoreTest.windowedPair(2, "two", 59998L);
        KeyValue<Windowed<Integer>, String> four = WindowBytesStoreTest.windowedPair(4, "four", 60000L);
        KeyValue<Windowed<Integer>, String> five = WindowBytesStoreTest.windowedPair(5, "five", 60001L);
        Assert.assertEquals(new HashSet<KeyValue>(Arrays.asList(zero, one)), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)0, (Object)1, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<KeyValue<Windowed<Integer>, String>>(Collections.singletonList(one)), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)1, (Object)1, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<KeyValue>(Arrays.asList(one, two)), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)1, (Object)3, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<KeyValue>(Arrays.asList(zero, one, two)), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)0, (Object)5, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<KeyValue>(Arrays.asList(zero, one, two, four, five)), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)0, (Object)5, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(60004L))));
        Assert.assertEquals(new HashSet<KeyValue>(Arrays.asList(two, four, five)), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)0, (Object)5, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60004L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)4, (Object)5, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)0, (Object)3, Instant.ofEpochMilli(59999L), Instant.ofEpochMilli(60004L))));
    }

    @Test
    public void testPutAndFetchBefore() {
        long startTime = 59996L;
        this.putFirstBatch(this.windowStore, 59996L, this.context);
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("zero")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)0, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59996L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("one")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)1, Instant.ofEpochMilli(59994L), Instant.ofEpochMilli(59997L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59995L), Instant.ofEpochMilli(59998L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)3, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("four")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)4, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("five")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)5, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60001L))));
        this.putSecondBatch(this.windowStore, 59996L, this.context);
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59992L), Instant.ofEpochMilli(59995L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59996L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59994L), Instant.ofEpochMilli(59997L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59995L), Instant.ofEpochMilli(59998L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2", "two+3")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+1", "two+2", "two+3", "two+4")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59999L), Instant.ofEpochMilli(60002L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+2", "two+3", "two+4", "two+5")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60000L), Instant.ofEpochMilli(60003L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+3", "two+4", "two+5", "two+6")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60001L), Instant.ofEpochMilli(60004L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+4", "two+5", "two+6")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60002L), Instant.ofEpochMilli(60005L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+5", "two+6")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60003L), Instant.ofEpochMilli(60006L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two+6")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60004L), Instant.ofEpochMilli(60007L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60005L), Instant.ofEpochMilli(60008L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60006L), Instant.ofEpochMilli(60009L))));
        this.windowStore.flush();
        Map<Integer, Set<String>> entriesByKey = this.entriesByKey(this.changeLog, 59996L);
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"zero@0"}), entriesByKey.get(0));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"one@1"}), entriesByKey.get(1));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"}), entriesByKey.get(2));
        Assert.assertNull(entriesByKey.get(3));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"four@4"}), entriesByKey.get(4));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"five@5"}), entriesByKey.get(5));
        Assert.assertNull(entriesByKey.get(6));
    }

    @Test
    public void testPutAndFetchAfter() {
        long startTime = 59996L;
        this.putFirstBatch(this.windowStore, 59996L, this.context);
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("zero")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)0, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("one")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)1, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)3, Instant.ofEpochMilli(59999L), Instant.ofEpochMilli(60002L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("four")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)4, Instant.ofEpochMilli(60000L), Instant.ofEpochMilli(60003L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("five")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)5, Instant.ofEpochMilli(60001L), Instant.ofEpochMilli(60004L))));
        this.putSecondBatch(this.windowStore, 59996L, this.context);
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59994L), Instant.ofEpochMilli(59997L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59995L), Instant.ofEpochMilli(59998L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two", "two+1", "two+2", "two+3")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59998L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+1", "two+2", "two+3", "two+4")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(59999L), Instant.ofEpochMilli(60002L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+2", "two+3", "two+4", "two+5")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60000L), Instant.ofEpochMilli(60003L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+3", "two+4", "two+5", "two+6")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60001L), Instant.ofEpochMilli(60004L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+4", "two+5", "two+6")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60002L), Instant.ofEpochMilli(60005L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("two+5", "two+6")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60003L), Instant.ofEpochMilli(60006L))));
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("two+6")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60004L), Instant.ofEpochMilli(60007L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60005L), Instant.ofEpochMilli(60008L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60006L), Instant.ofEpochMilli(60009L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60007L), Instant.ofEpochMilli(60010L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)2, Instant.ofEpochMilli(60008L), Instant.ofEpochMilli(60011L))));
        this.windowStore.flush();
        Map<Integer, Set<String>> entriesByKey = this.entriesByKey(this.changeLog, 59996L);
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"zero@0"}), entriesByKey.get(0));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"one@1"}), entriesByKey.get(1));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"two@2", "two+1@3", "two+2@4", "two+3@5", "two+4@6", "two+5@7", "two+6@8"}), entriesByKey.get(2));
        Assert.assertNull(entriesByKey.get(3));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"four@4"}), entriesByKey.get(4));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"five@5"}), entriesByKey.get(5));
        Assert.assertNull(entriesByKey.get(6));
    }

    @Test
    public void testPutSameKeyTimestamp() {
        this.windowStore = this.buildWindowStore(120000L, 3L, true, Serdes.Integer(), Serdes.String());
        this.windowStore.init((ProcessorContext)this.context, this.windowStore);
        long startTime = 59996L;
        this.setCurrentTime(59996L);
        this.windowStore.put((Object)0, (Object)"zero");
        Assert.assertEquals(new HashSet<String>(Collections.singletonList("zero")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)0, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        this.windowStore.put((Object)0, (Object)"zero");
        this.windowStore.put((Object)0, (Object)"zero+");
        this.windowStore.put((Object)0, (Object)"zero++");
        Assert.assertEquals(new HashSet<String>(Arrays.asList("zero", "zero", "zero+", "zero++")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)0, Instant.ofEpochMilli(59993L), Instant.ofEpochMilli(59999L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("zero", "zero", "zero+", "zero++")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)0, Instant.ofEpochMilli(59994L), Instant.ofEpochMilli(60000L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("zero", "zero", "zero+", "zero++")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)0, Instant.ofEpochMilli(59995L), Instant.ofEpochMilli(60001L))));
        Assert.assertEquals(new HashSet<String>(Arrays.asList("zero", "zero", "zero+", "zero++")), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)0, Instant.ofEpochMilli(59996L), Instant.ofEpochMilli(60002L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), WindowBytesStoreTest.toSet(this.windowStore.fetch((Object)0, Instant.ofEpochMilli(59997L), Instant.ofEpochMilli(60003L))));
        this.windowStore.flush();
        Map<Integer, Set<String>> entriesByKey = this.entriesByKey(this.changeLog, 59996L);
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"zero@0", "zero@0", "zero+@0", "zero++@0"}), entriesByKey.get(0));
    }

    @Test
    public void shouldCloseOpenIteratorsWhenStoreIsClosedAndNotThrowInvalidStateStoreExceptionOnHasNext() {
        this.setCurrentTime(0L);
        this.windowStore.put((Object)1, (Object)"one", 1L);
        this.windowStore.put((Object)1, (Object)"two", 2L);
        this.windowStore.put((Object)1, (Object)"three", 3L);
        WindowStoreIterator iterator = this.windowStore.fetch((Object)1, Instant.ofEpochMilli(1L), Instant.ofEpochMilli(3L));
        Assert.assertTrue((boolean)iterator.hasNext());
        this.windowStore.close();
        Assert.assertFalse((boolean)iterator.hasNext());
    }

    @Test
    public void shouldFetchAndIterateOverExactKeys() {
        long windowSize = 0x7A00000000000000L;
        long retentionPeriod = 0x7A00000000000000L;
        WindowStore windowStore = this.buildWindowStore(0x7A00000000000000L, 0x7A00000000000000L, false, Serdes.String(), Serdes.String());
        windowStore.init((ProcessorContext)this.context, windowStore);
        windowStore.put((Object)"a", (Object)"0001", 0L);
        windowStore.put((Object)"aa", (Object)"0002", 0L);
        windowStore.put((Object)"a", (Object)"0003", 1L);
        windowStore.put((Object)"aa", (Object)"0004", 1L);
        windowStore.put((Object)"a", (Object)"0005", 0x79FFFFFFFFFFFFFFL);
        HashSet<String> expected = new HashSet<String>(Arrays.asList("0001", "0003", "0005"));
        MatcherAssert.assertThat(WindowBytesStoreTest.toSet(windowStore.fetch((Object)"a", Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE))), (Matcher)CoreMatchers.equalTo(expected));
        Set set = WindowBytesStoreTest.toSet(windowStore.fetch((Object)"a", (Object)"a", Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE)));
        MatcherAssert.assertThat(set, (Matcher)CoreMatchers.equalTo(new HashSet<KeyValue>(Arrays.asList(WindowBytesStoreTest.windowedPair("a", "0001", 0L, 0x7A00000000000000L), WindowBytesStoreTest.windowedPair("a", "0003", 1L, 0x7A00000000000000L), WindowBytesStoreTest.windowedPair("a", "0005", 0x79FFFFFFFFFFFFFFL, 0x7A00000000000000L)))));
        set = WindowBytesStoreTest.toSet(windowStore.fetch((Object)"aa", (Object)"aa", Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE)));
        MatcherAssert.assertThat(set, (Matcher)CoreMatchers.equalTo(new HashSet<KeyValue>(Arrays.asList(WindowBytesStoreTest.windowedPair("aa", "0002", 0L, 0x7A00000000000000L), WindowBytesStoreTest.windowedPair("aa", "0004", 1L, 0x7A00000000000000L)))));
    }

    @Test
    public void testDeleteAndUpdate() {
        long currentTime = 0L;
        this.setCurrentTime(0L);
        this.windowStore.put((Object)1, (Object)"one");
        this.windowStore.put((Object)1, (Object)"one v2");
        WindowStoreIterator iterator = this.windowStore.fetch((Object)1, 0L, 0L);
        Assert.assertEquals((Object)new KeyValue((Object)0L, (Object)"one v2"), (Object)iterator.next());
        this.windowStore.put((Object)1, null);
        iterator = this.windowStore.fetch((Object)1, 0L, 0L);
        Assert.assertFalse((boolean)iterator.hasNext());
    }

    @Test
    public void shouldReturnNullOnWindowNotFound() {
        Assert.assertNull((Object)this.windowStore.fetch((Object)1, 0L));
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnPutNullKey() {
        this.windowStore.put(null, (Object)"anyValue");
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnGetNullKey() {
        this.windowStore.fetch(null, Instant.ofEpochMilli(1L), Instant.ofEpochMilli(2L));
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnRangeNullFromKey() {
        this.windowStore.fetch(null, (Object)2, Instant.ofEpochMilli(1L), Instant.ofEpochMilli(2L));
    }

    @Test(expected=NullPointerException.class)
    public void shouldThrowNullPointerExceptionOnRangeNullToKey() {
        this.windowStore.fetch((Object)1, null, Instant.ofEpochMilli(1L), Instant.ofEpochMilli(2L));
    }

    @Test
    public void shouldFetchAndIterateOverExactBinaryKeys() {
        WindowStore windowStore = this.buildWindowStore(120000L, 3L, true, Serdes.Bytes(), Serdes.String());
        windowStore.init((ProcessorContext)this.context, windowStore);
        Bytes key1 = Bytes.wrap((byte[])new byte[]{0});
        Bytes key2 = Bytes.wrap((byte[])new byte[]{0, 0});
        Bytes key3 = Bytes.wrap((byte[])new byte[]{0, 0, 0});
        windowStore.put((Object)key1, (Object)"1", 0L);
        windowStore.put((Object)key2, (Object)"2", 0L);
        windowStore.put((Object)key3, (Object)"3", 0L);
        windowStore.put((Object)key1, (Object)"4", 1L);
        windowStore.put((Object)key2, (Object)"5", 1L);
        windowStore.put((Object)key3, (Object)"6", 59999L);
        windowStore.put((Object)key1, (Object)"7", 59999L);
        windowStore.put((Object)key2, (Object)"8", 59999L);
        windowStore.put((Object)key3, (Object)"9", 59999L);
        HashSet<String> expectedKey1 = new HashSet<String>(Arrays.asList("1", "4", "7"));
        MatcherAssert.assertThat(WindowBytesStoreTest.toSet(windowStore.fetch((Object)key1, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE))), (Matcher)CoreMatchers.equalTo(expectedKey1));
        HashSet<String> expectedKey2 = new HashSet<String>(Arrays.asList("2", "5", "8"));
        MatcherAssert.assertThat(WindowBytesStoreTest.toSet(windowStore.fetch((Object)key2, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE))), (Matcher)CoreMatchers.equalTo(expectedKey2));
        HashSet<String> expectedKey3 = new HashSet<String>(Arrays.asList("3", "6", "9"));
        MatcherAssert.assertThat(WindowBytesStoreTest.toSet(windowStore.fetch((Object)key3, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(Long.MAX_VALUE))), (Matcher)CoreMatchers.equalTo(expectedKey3));
    }

    @Test
    public void shouldReturnSameResultsForSingleKeyFetchAndEqualKeyRangeFetch() {
        this.windowStore.put((Object)1, (Object)"one", 0L);
        this.windowStore.put((Object)2, (Object)"two", 1L);
        this.windowStore.put((Object)2, (Object)"two", 2L);
        this.windowStore.put((Object)3, (Object)"three", 3L);
        WindowStoreIterator singleKeyIterator = this.windowStore.fetch((Object)2, 0L, 5L);
        KeyValueIterator keyRangeIterator = this.windowStore.fetch((Object)2, (Object)2, 0L, 5L);
        Assert.assertEquals((Object)((KeyValue)singleKeyIterator.next()).value, (Object)((KeyValue)keyRangeIterator.next()).value);
        Assert.assertEquals((Object)((KeyValue)singleKeyIterator.next()).value, (Object)((KeyValue)keyRangeIterator.next()).value);
        Assert.assertFalse((boolean)singleKeyIterator.hasNext());
        Assert.assertFalse((boolean)keyRangeIterator.hasNext());
    }

    @Test
    public void shouldNotThrowInvalidRangeExceptionWithNegativeFromKey() {
        this.setClassLoggerToDebug();
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        KeyValueIterator iterator = this.windowStore.fetch((Object)-1, (Object)1, 0L, 10L);
        Assert.assertFalse((boolean)iterator.hasNext());
        List<String> messages = appender.getMessages();
        MatcherAssert.assertThat(messages, (Matcher)CoreMatchers.hasItem((Object)"Returning empty iterator for fetch with invalid key range: from > to. This may be due to serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers"));
    }

    @Test
    public void shouldLogAndMeasureExpiredRecords() {
        this.setClassLoggerToDebug();
        LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
        this.windowStore.put((Object)1, (Object)"initial record", 240000L);
        this.windowStore.put((Object)1, (Object)"late record", 0L);
        this.windowStore.put((Object)1, (Object)"another on-time record", 120001L);
        LogCaptureAppender.unregister(appender);
        Map metrics = this.context.metrics().metrics();
        String metricScope = this.getMetricsScope();
        String threadId = Thread.currentThread().getName();
        Metric dropTotal = (Metric)metrics.get(new MetricName("expired-window-record-drop-total", "stream-" + metricScope + "-metrics", "The total number of occurrence of expired-window-record-drop operations.", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"client-id", (Object)threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0"), Utils.mkEntry((Object)(metricScope + "-id"), (Object)this.windowStore.name())})));
        Metric dropRate = (Metric)metrics.get(new MetricName("expired-window-record-drop-rate", "stream-" + metricScope + "-metrics", "The average number of occurrence of expired-window-record-drop operation per second.", Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"client-id", (Object)threadId), Utils.mkEntry((Object)"task-id", (Object)"0_0"), Utils.mkEntry((Object)(metricScope + "-id"), (Object)this.windowStore.name())})));
        Assert.assertEquals((Object)1.0, (Object)dropTotal.metricValue());
        Assert.assertNotEquals((Object)0.0, (Object)dropRate.metricValue());
        List<String> messages = appender.getMessages();
        MatcherAssert.assertThat(messages, (Matcher)CoreMatchers.hasItem((Object)"Skipping record for expired segment."));
    }

    @Test
    public void shouldNotThrowExceptionWhenFetchRangeIsExpired() {
        this.windowStore.put((Object)1, (Object)"one", 0L);
        this.windowStore.put((Object)1, (Object)"two", 480000L);
        WindowStoreIterator iterator = this.windowStore.fetch((Object)1, 0L, 10L);
        Assert.assertFalse((boolean)iterator.hasNext());
    }

    @Test
    public void testWindowIteratorPeek() {
        long currentTime = 0L;
        this.setCurrentTime(0L);
        this.windowStore.put((Object)1, (Object)"one");
        KeyValueIterator iterator = this.windowStore.fetchAll(0L, 0L);
        Assert.assertTrue((boolean)iterator.hasNext());
        Windowed nextKey = (Windowed)iterator.peekNextKey();
        Assert.assertEquals((Object)iterator.peekNextKey(), (Object)nextKey);
        Assert.assertEquals((Object)iterator.peekNextKey(), (Object)((KeyValue)iterator.next()).key);
        Assert.assertFalse((boolean)iterator.hasNext());
    }

    @Test
    public void testValueIteratorPeek() {
        this.windowStore.put((Object)1, (Object)"one", 0L);
        WindowStoreIterator iterator = this.windowStore.fetch((Object)1, 0L, 10L);
        Assert.assertTrue((boolean)iterator.hasNext());
        Long nextKey = (Long)iterator.peekNextKey();
        Assert.assertEquals((Object)iterator.peekNextKey(), (Object)nextKey);
        Assert.assertEquals((Object)iterator.peekNextKey(), (Object)((KeyValue)iterator.next()).key);
        Assert.assertFalse((boolean)iterator.hasNext());
    }

    @Test
    public void shouldNotThrowConcurrentModificationException() {
        long currentTime = 0L;
        this.setCurrentTime(currentTime);
        this.windowStore.put((Object)1, (Object)"one");
        this.setCurrentTime(currentTime += 30L);
        this.windowStore.put((Object)1, (Object)"two");
        KeyValueIterator iterator = this.windowStore.all();
        this.setCurrentTime(currentTime += 30L);
        this.windowStore.put((Object)1, (Object)"three");
        this.setCurrentTime(currentTime += 30L);
        this.windowStore.put((Object)2, (Object)"four");
        Assert.assertEquals(WindowBytesStoreTest.windowedPair(1, "one", 0L), (Object)iterator.next());
        Assert.assertEquals(WindowBytesStoreTest.windowedPair(1, "two", 30L), (Object)iterator.next());
        Assert.assertEquals(WindowBytesStoreTest.windowedPair(1, "three", 60L), (Object)iterator.next());
        Assert.assertEquals(WindowBytesStoreTest.windowedPair(2, "four", 90L), (Object)iterator.next());
        Assert.assertFalse((boolean)iterator.hasNext());
    }

    @Test
    public void testFetchDuplicates() {
        this.windowStore = this.buildWindowStore(120000L, 3L, true, Serdes.Integer(), Serdes.String());
        this.windowStore.init((ProcessorContext)this.context, this.windowStore);
        long currentTime = 0L;
        this.setCurrentTime(currentTime);
        this.windowStore.put((Object)1, (Object)"one");
        this.windowStore.put((Object)1, (Object)"one-2");
        this.setCurrentTime(currentTime += 30L);
        this.windowStore.put((Object)1, (Object)"two");
        this.windowStore.put((Object)1, (Object)"two-2");
        this.setCurrentTime(currentTime += 30L);
        this.windowStore.put((Object)1, (Object)"three");
        this.windowStore.put((Object)1, (Object)"three-2");
        WindowStoreIterator iterator = this.windowStore.fetch((Object)1, 0L, 30L);
        Assert.assertEquals((Object)new KeyValue((Object)0L, (Object)"one"), (Object)iterator.next());
        Assert.assertEquals((Object)new KeyValue((Object)0L, (Object)"one-2"), (Object)iterator.next());
        Assert.assertEquals((Object)new KeyValue((Object)30L, (Object)"two"), (Object)iterator.next());
        Assert.assertEquals((Object)new KeyValue((Object)30L, (Object)"two-2"), (Object)iterator.next());
        Assert.assertFalse((boolean)iterator.hasNext());
    }

    private void putFirstBatch(WindowStore<Integer, String> store, long startTime, InternalMockProcessorContext context) {
        context.setRecordContext(this.createRecordContext(startTime));
        store.put((Object)0, (Object)"zero");
        context.setRecordContext(this.createRecordContext(startTime + 1L));
        store.put((Object)1, (Object)"one");
        context.setRecordContext(this.createRecordContext(startTime + 2L));
        store.put((Object)2, (Object)"two");
        context.setRecordContext(this.createRecordContext(startTime + 4L));
        store.put((Object)4, (Object)"four");
        context.setRecordContext(this.createRecordContext(startTime + 5L));
        store.put((Object)5, (Object)"five");
    }

    private void putSecondBatch(WindowStore<Integer, String> store, long startTime, InternalMockProcessorContext context) {
        context.setRecordContext(this.createRecordContext(startTime + 3L));
        store.put((Object)2, (Object)"two+1");
        context.setRecordContext(this.createRecordContext(startTime + 4L));
        store.put((Object)2, (Object)"two+2");
        context.setRecordContext(this.createRecordContext(startTime + 5L));
        store.put((Object)2, (Object)"two+3");
        context.setRecordContext(this.createRecordContext(startTime + 6L));
        store.put((Object)2, (Object)"two+4");
        context.setRecordContext(this.createRecordContext(startTime + 7L));
        store.put((Object)2, (Object)"two+5");
        context.setRecordContext(this.createRecordContext(startTime + 8L));
        store.put((Object)2, (Object)"two+6");
    }

    protected static <E> Set<E> toSet(WindowStoreIterator<E> iterator) {
        HashSet<Object> set = new HashSet<Object>();
        while (iterator.hasNext()) {
            set.add(((KeyValue)iterator.next()).value);
        }
        return set;
    }

    protected static <K, V> Set<KeyValue<K, V>> toSet(Iterator<KeyValue<K, V>> iterator) {
        HashSet<KeyValue<K, V>> results = new HashSet<KeyValue<K, V>>();
        while (iterator.hasNext()) {
            results.add(iterator.next());
        }
        return results;
    }

    private Map<Integer, Set<String>> entriesByKey(List<KeyValue<byte[], byte[]>> changeLog, long startTime) {
        HashMap<Integer, Set<String>> entriesByKey = new HashMap<Integer, Set<String>>();
        for (KeyValue<byte[], byte[]> entry : changeLog) {
            long timestamp = WindowKeySchema.extractStoreTimestamp((byte[])((byte[])entry.key));
            Integer key = (Integer)WindowKeySchema.extractStoreKey((byte[])((byte[])entry.key), this.serdes);
            String value = entry.value == null ? null : (String)this.serdes.valueFrom((byte[])entry.value);
            Set entries = entriesByKey.computeIfAbsent(key, k -> new HashSet());
            entries.add(value + "@" + (timestamp - startTime));
        }
        return entriesByKey;
    }

    protected static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp) {
        return WindowBytesStoreTest.windowedPair(key, value, timestamp, 3L);
    }

    private static <K, V> KeyValue<Windowed<K>, V> windowedPair(K key, V value, long timestamp, long windowSize) {
        return KeyValue.pair((Object)new Windowed(key, (Window)WindowKeySchema.timeWindowForSize((long)timestamp, (long)windowSize)), value);
    }

    protected void setCurrentTime(long currentTime) {
        this.context.setRecordContext(this.createRecordContext(currentTime));
    }

    private ProcessorRecordContext createRecordContext(long time) {
        return new ProcessorRecordContext(time, 0L, 0, "topic", null);
    }
}

