/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.To;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.ToInternal;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KStreamSessionWindowAggregateProcessorTest {
    /** Inactivity gap, in milliseconds, of the session windows used by the aggregator under test. */
    private static final long GAP_MS = 300000L;
    /** Name of the session store shared by the aggregator and the store builders in this test. */
    private static final String STORE_NAME = "session-store";

    /** Name of the thread that instantiated this test; used as a tag value when building expected metric names. */
    private final String threadId = Thread.currentThread().getName();
    /** Scratch holder updated from the {@code To} argument of every captured {@code forward} call. */
    private final ToInternal toInternal = new ToInternal();
    /** Every new aggregate starts at zero. */
    private final Initializer<Long> initializer = () -> 0L;
    /** Counts records: each record adds one to the running aggregate. */
    private final Aggregator<String, String, Long> aggregator = (aggKey, value, aggregate) -> aggregate + 1L;
    /** Merging two sessions sums their aggregates. */
    private final Merger<String, Long> sessionMerger = (aggKey, aggOne, aggTwo) -> aggOne + aggTwo;
    /** Aggregator under test, configured with the {@link #GAP_MS} gap and the {@link #STORE_NAME} store. */
    private final KStreamSessionWindowAggregate<String, String, Long> sessionAggregator =
        new KStreamSessionWindowAggregate<>(
            SessionWindows.with(Duration.ofMillis(GAP_MS)),
            STORE_NAME,
            initializer,
            aggregator,
            sessionMerger);
    /** Records captured by the mock contexts' overridden {@code forward}, in forwarding order. */
    private final List<KeyValueTimestamp<Windowed<String>, Change<Long>>> results = new ArrayList<>();
    /** Processor obtained from {@link #sessionAggregator}; most tests drive this instance directly. */
    private final Processor<String, String> processor = sessionAggregator.get();
    private SessionStore<String, Long> sessionStore;
    private InternalMockProcessorContext context;
    private Metrics metrics;

    /**
     * Per-test fixture: creates a fresh metrics registry and a mock processor context whose
     * {@code forward} is overridden to append every forwarded record (with the timestamp carried
     * by the {@code To} argument) to {@code results}, then initialises a caching session store
     * and the processor under test against that context.
     */
    @Before
    public void initializeStore() {
        final File stateDir = TestUtils.tempDirectory();
        this.metrics = new Metrics();
        // Named differently from the field so the two registries cannot be confused.
        final MockStreamsMetrics streamsMetrics = new MockStreamsMetrics(this.metrics);
        this.context = new InternalMockProcessorContext(
            stateDir,
            Serdes.String(),
            Serdes.String(),
            streamsMetrics,
            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
            MockRecordCollector::new,
            new ThreadCache(new LogContext("testCache "), 100000L, streamsMetrics)
        ) {

            @Override
            @SuppressWarnings("unchecked")
            public void forward(final Object key, final Object value, final To to) {
                KStreamSessionWindowAggregateProcessorTest.this.toInternal.update(to);
                // Unchecked: the processor under test only forwards Windowed<String> keys with Change<Long> values.
                KStreamSessionWindowAggregateProcessorTest.this.results.add(
                    new KeyValueTimestamp<>(
                        (Windowed<String>) key,
                        (Change<Long>) value,
                        KStreamSessionWindowAggregateProcessorTest.this.toInternal.timestamp()));
            }
        };
        initStore(true);
        this.processor.init(this.context);
    }

    /**
     * Builds a persistent session store named {@code STORE_NAME} with String keys, Long values
     * and logging disabled, assigns it to {@code sessionStore} and initialises it against the
     * fixture {@code context}.
     *
     * NOTE(review): a store already held in {@code sessionStore} is replaced without being
     * closed here; only the most recently built store is closed by the {@code @After} hook.
     *
     * @param enableCaching whether caching is enabled on the builder before the store is built
     */
    private void initStore(final boolean enableCaching) {
        // Retention is three session gaps (900000 ms).
        final StoreBuilder<SessionStore<String, Long>> storeBuilder = Stores.sessionStoreBuilder(
                Stores.persistentSessionStore(STORE_NAME, Duration.ofMillis(GAP_MS * 3)),
                Serdes.String(),
                Serdes.Long())
            .withLoggingDisabled();
        if (enableCaching) {
            storeBuilder.withCachingEnabled();
        }
        this.sessionStore = storeBuilder.build();
        this.sessionStore.init(this.context, this.sessionStore);
    }

    /**
     * Closes the session store held by this fixture, if one was created.
     */
    @After
    public void closeStore() {
        // The store is still null if the @Before hook failed before building it; skipping the
        // close keeps a secondary NullPointerException from being reported alongside the real failure.
        if (this.sessionStore != null) {
            this.sessionStore.close();
        }
    }

    /**
     * Processes two records for the same key 500 ms apart and asserts that the first session
     * returned by the store for that key has an aggregate of 2.
     */
    @Test
    public void shouldCreateSingleSessionWhenWithinGap() {
        context.setTime(0L);
        processor.process("john", "first");
        context.setTime(500L);
        processor.process("john", "second");

        // Store iterators hold resources and must be closed after use.
        try (KeyValueIterator<Windowed<String>, Long> values = sessionStore.findSessions("john", 0L, 2000L)) {
            Assert.assertTrue(values.hasNext());
            Assert.assertEquals(Long.valueOf(2L), values.next().value);
        }
    }

    /**
     * Creates two sessions for one key at timestamps 0 and {@code GAP_MS + 1}, then processes a
     * third record at {@code GAP_MS / 2} and asserts that the store afterwards returns exactly
     * one session for the key, with an aggregate of 3.
     */
    @Test
    public void shouldMergeSessions() {
        final String sessionId = "mel";
        final long pastGap = GAP_MS + 1;

        context.setTime(0L);
        processor.process(sessionId, "first");
        try (KeyValueIterator<Windowed<String>, Long> sessions = sessionStore.findSessions(sessionId, 0L, 0L)) {
            Assert.assertTrue(sessions.hasNext());
        }

        // One millisecond beyond the gap: both the new and the earlier session must be findable.
        context.setTime(pastGap);
        processor.process(sessionId, "second");
        try (KeyValueIterator<Windowed<String>, Long> sessions = sessionStore.findSessions(sessionId, pastGap, pastGap)) {
            Assert.assertTrue(sessions.hasNext());
        }
        try (KeyValueIterator<Windowed<String>, Long> sessions = sessionStore.findSessions(sessionId, 0L, 0L)) {
            Assert.assertTrue(sessions.hasNext());
        }

        // A record halfway between the two sessions is within the gap of both.
        context.setTime(GAP_MS / 2);
        processor.process(sessionId, "third");
        try (KeyValueIterator<Windowed<String>, Long> merged = sessionStore.findSessions(sessionId, 0L, pastGap)) {
            final KeyValue<Windowed<String>, Long> kv = merged.next();
            Assert.assertEquals(Long.valueOf(3L), kv.value);
            Assert.assertFalse(merged.hasNext());
        }
    }

    /**
     * Processes two records for the same key at the same timestamp and asserts that the store
     * returns exactly one session for that key, with an aggregate of 2.
     */
    @Test
    public void shouldUpdateSessionIfTheSameTime() {
        context.setTime(0L);
        processor.process("mel", "first");
        processor.process("mel", "second");

        // Store iterators hold resources and must be closed after use.
        try (KeyValueIterator<Windowed<String>, Long> sessions = sessionStore.findSessions("mel", 0L, 0L)) {
            Assert.assertEquals(Long.valueOf(2L), sessions.next().value);
            Assert.assertFalse(sessions.hasNext());
        }
    }

    /**
     * Processes one, two and three records for the same key at three timestamps that are each
     * {@code GAP_MS + 1} apart, flushes the store, and asserts that three separate sessions with
     * aggregates 1, 2 and 3 were forwarded.
     */
    @Test
    public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() {
        final String sessionId = "mel";
        final long pastGap = GAP_MS + 1;
        long time = 0L;

        context.setTime(time);
        processor.process(sessionId, "first");

        time += pastGap;
        context.setTime(time);
        processor.process(sessionId, "second");
        processor.process(sessionId, "second");

        time += pastGap;
        context.setTime(time);
        processor.process(sessionId, "third");
        processor.process(sessionId, "third");
        processor.process(sessionId, "third");

        sessionStore.flush();

        Assert.assertEquals(
            Arrays.asList(
                new KeyValueTimestamp<>(
                    new Windowed<>(sessionId, new SessionWindow(0L, 0L)),
                    new Change<>(1L, null),
                    0L),
                new KeyValueTimestamp<>(
                    new Windowed<>(sessionId, new SessionWindow(pastGap, pastGap)),
                    new Change<>(2L, null),
                    pastGap),
                new KeyValueTimestamp<>(
                    new Windowed<>(sessionId, new SessionWindow(time, time)),
                    new Change<>(3L, null),
                    time)),
            results);
    }

    /**
     * Processes a record at time 0 and asserts the store holds session [0, 0] with aggregate 1;
     * then processes a second record at time 100 and asserts the store returns only the
     * session [0, 100] with aggregate 2 for the key.
     */
    @Test
    public void shouldRemoveMergedSessionsFromStateStore() {
        context.setTime(0L);
        processor.process("a", "1");
        try (KeyValueIterator<Windowed<String>, Long> a1 = sessionStore.findSessions("a", 0L, 0L)) {
            Assert.assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0L, 0L)), 1L), a1.next());
        }

        context.setTime(100L);
        processor.process("a", "2");
        try (KeyValueIterator<Windowed<String>, Long> a2 = sessionStore.findSessions("a", 0L, 100L)) {
            Assert.assertEquals(KeyValue.pair(new Windowed<>("a", new SessionWindow(0L, 100L)), 2L), a2.next());
            // No second entry: the original [0, 0] session must no longer be present.
            Assert.assertFalse(a2.hasNext());
        }
    }

    /**
     * Interleaves records for keys a, b, c and d at timestamps 0, {@code GAP_MS / 2},
     * {@code GAP_MS + 1} and {@code GAP_MS + 1 + GAP_MS / 2}, flushes the store, and asserts the
     * exact sequence of forwarded sessions and aggregates.
     */
    @Test
    public void shouldHandleMultipleSessionsAndMerging() {
        final long halfGap = GAP_MS / 2;
        final long pastGap = GAP_MS + 1;
        final long pastGapAndHalf = pastGap + halfGap;

        context.setTime(0L);
        processor.process("a", "1");
        processor.process("b", "1");
        processor.process("c", "1");
        processor.process("d", "1");

        context.setTime(halfGap);
        processor.process("d", "2");

        context.setTime(pastGap);
        processor.process("a", "2");
        processor.process("b", "2");

        context.setTime(pastGapAndHalf);
        processor.process("a", "3");
        processor.process("c", "3");

        sessionStore.flush();

        Assert.assertEquals(
            Arrays.asList(
                new KeyValueTimestamp<>(
                    new Windowed<>("a", new SessionWindow(0L, 0L)),
                    new Change<>(1L, null),
                    0L),
                new KeyValueTimestamp<>(
                    new Windowed<>("b", new SessionWindow(0L, 0L)),
                    new Change<>(1L, null),
                    0L),
                new KeyValueTimestamp<>(
                    new Windowed<>("c", new SessionWindow(0L, 0L)),
                    new Change<>(1L, null),
                    0L),
                new KeyValueTimestamp<>(
                    new Windowed<>("d", new SessionWindow(0L, halfGap)),
                    new Change<>(2L, null),
                    halfGap),
                new KeyValueTimestamp<>(
                    new Windowed<>("b", new SessionWindow(pastGap, pastGap)),
                    new Change<>(1L, null),
                    pastGap),
                new KeyValueTimestamp<>(
                    new Windowed<>("a", new SessionWindow(pastGap, pastGapAndHalf)),
                    new Change<>(2L, null),
                    pastGapAndHalf),
                new KeyValueTimestamp<>(
                    new Windowed<>("c", new SessionWindow(pastGapAndHalf, pastGapAndHalf)),
                    new Change<>(1L, null),
                    pastGapAndHalf)),
            results);
    }

    /**
     * Obtains a value getter from the aggregator's view, processes one record at time 0 and two
     * at {@code GAP_MS + 1} for the same key, and asserts that the getter returns aggregates
     * 1 and 2 for the sessions [0, 0] and [GAP_MS + 1, GAP_MS + 1] respectively.
     */
    @Test
    public void shouldGetAggregatedValuesFromValueGetter() {
        final long pastGap = GAP_MS + 1;
        final KTableValueGetter<Windowed<String>, Long> getter = sessionAggregator.view().get();
        getter.init(context);

        context.setTime(0L);
        processor.process("a", "1");
        context.setTime(pastGap);
        processor.process("a", "1");
        processor.process("a", "2");

        final long t0 = getter.get(new Windowed<>("a", new SessionWindow(0L, 0L))).value();
        final long t1 = getter.get(new Windowed<>("a", new SessionWindow(pastGap, pastGap))).value();
        Assert.assertEquals(1L, t0);
        Assert.assertEquals(2L, t1);
    }

    /**
     * Replaces the fixture store with a non-caching one, processes one record for each of three
     * keys at time 0, and asserts that three new sessions were forwarded without any flush.
     */
    @Test
    public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() {
        initStore(false);
        processor.init(context);

        context.setTime(0L);
        processor.process("a", "1");
        processor.process("b", "1");
        processor.process("c", "1");

        Assert.assertEquals(
            Arrays.asList(
                new KeyValueTimestamp<>(
                    new Windowed<>("a", new SessionWindow(0L, 0L)),
                    new Change<>(1L, null),
                    0L),
                new KeyValueTimestamp<>(
                    new Windowed<>("b", new SessionWindow(0L, 0L)),
                    new Change<>(1L, null),
                    0L),
                new KeyValueTimestamp<>(
                    new Windowed<>("c", new SessionWindow(0L, 0L)),
                    new Change<>(1L, null),
                    0L)),
            results);
    }

    /**
     * With a non-caching store, processes two records for the same key at times 0 and 5 and
     * asserts the forwarded sequence: the initial session [0, 0], a record with a null/null
     * change for that same session, and the merged session [0, 5] with aggregate 2.
     */
    @Test
    public void shouldImmediatelyForwardRemovedSessionsWhenMerging() {
        initStore(false);
        processor.init(context);

        context.setTime(0L);
        processor.process("a", "1");
        context.setTime(5L);
        processor.process("a", "1");

        Assert.assertEquals(
            Arrays.asList(
                new KeyValueTimestamp<>(
                    new Windowed<>("a", new SessionWindow(0L, 0L)),
                    new Change<>(1L, null),
                    0L),
                new KeyValueTimestamp<>(
                    new Windowed<>("a", new SessionWindow(0L, 0L)),
                    // Explicit type argument: two nulls give the compiler nothing to infer from.
                    new Change<Long>(null, null),
                    0L),
                new KeyValueTimestamp<>(
                    new Windowed<>("a", new SessionWindow(0L, 5L)),
                    new Change<>(2L, null),
                    5L)),
            results);
    }

    /** Runs the null-key scenario with the built-in metrics version set to "0.10.0-2.4". */
    @Test
    public void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetricsVersion0100To24() {
        final String builtInMetricsVersion = "0.10.0-2.4";
        shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics(builtInMetricsVersion);
    }

    /** Runs the null-key scenario with the built-in metrics version set to "latest". */
    @Test
    public void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetricsVersionLatest() {
        final String builtInMetricsVersion = "latest";
        shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics(builtInMetricsVersion);
    }

    /**
     * Processes a record with a null key against a context built for the given metrics version
     * and asserts that (a) the expected "Skipping record due to null key" message was logged by
     * {@code KStreamSessionWindowAggregate} and (b) the version-specific counter reads 1.0:
     * {@code skipped-records-total} for "0.10.0-2.4", otherwise {@code dropped-records-total}.
     *
     * @param builtInMetricsVersion built-in metrics version used to create the mock context
     */
    private void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics(final String builtInMetricsVersion) {
        // Distinct name so the fixture's context field is not shadowed.
        final InternalMockProcessorContext mockContext = createInternalMockProcessorContext(builtInMetricsVersion);
        processor.init(mockContext);
        mockContext.setRecordContext(new ProcessorRecordContext(-1L, -2L, -3, "topic", null));

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
            processor.process(null, "1");

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[-3] offset=[-2]"));
        }

        if ("0.10.0-2.4".equals(builtInMetricsVersion)) {
            Assert.assertEquals(
                1.0,
                StreamsTestUtils.getMetricByName(mockContext.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
        } else {
            Assert.assertEquals(
                1.0,
                StreamsTestUtils.getMetricByName(mockContext.metrics().metrics(), "dropped-records-total", "stream-task-metrics").metricValue());
        }
    }

    /** Runs the zero-grace late-record scenario with the built-in metrics version set to "latest". */
    @Test
    public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGraceWithBuiltInMetricsVersionLatest() {
        final String builtInMetricsVersion = "latest";
        shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace(builtInMetricsVersion);
    }

    /** Runs the zero-grace late-record scenario with the built-in metrics version set to "0.10.0-2.4". */
    @Test
    public void shouldLogAndMeterWhenSkippingLateRecordWithZeroGraceWithBuiltInMetricsVersion0100To24() {
        final String builtInMetricsVersion = "0.10.0-2.4";
        shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace(builtInMetricsVersion);
    }

    /**
     * Drives a processor configured with a 10 ms gap and 0 ms grace: record timestamps go
     * 0, 0, 1 and then a record for key "Late1" arrives at timestamp 0. Asserts that the
     * expected "Skipping record for expired window" message was logged and that the
     * version-specific drop counter reads 1.0 with a drop rate greater than zero.
     *
     * @param builtInMetricsVersion built-in metrics version used to create the mock context and
     *                              to select which metric names are checked
     */
    private void shouldLogAndMeterWhenSkippingLateRecordWithZeroGrace(final String builtInMetricsVersion) {
        // Distinct names so the fixture's context/processor fields are not shadowed.
        final InternalMockProcessorContext mockContext = createInternalMockProcessorContext(builtInMetricsVersion);
        final Processor<String, String> graceProcessor = new KStreamSessionWindowAggregate<String, String, Long>(
            SessionWindows.with(Duration.ofMillis(10L)).grace(Duration.ofMillis(0L)),
            STORE_NAME,
            initializer,
            aggregator,
            sessionMerger
        ).get();

        // NOTE(review): this store is initialised against the fixture context, not mockContext;
        // kept because it also swaps which store the @After hook closes — confirm it is still needed.
        initStore(false);
        graceProcessor.init(mockContext);

        mockContext.setRecordContext(new ProcessorRecordContext(0L, -2L, -3, "topic", null));
        graceProcessor.process("dummy", "dummy");
        mockContext.setRecordContext(new ProcessorRecordContext(0L, -2L, -3, "topic", null));
        graceProcessor.process("OnTime1", "1");
        mockContext.setRecordContext(new ProcessorRecordContext(1L, -2L, -3, "topic", null));
        graceProcessor.process("dummy", "dummy");

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
            mockContext.setRecordContext(new ProcessorRecordContext(0L, -2L, -3, "topic", null));
            graceProcessor.process("Late1", "1");

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record for expired window. key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[1]"));
        }

        final MetricName dropTotal;
        final MetricName dropRate;
        if ("0.10.0-2.4".equals(builtInMetricsVersion)) {
            final Map<String, String> tags = Utils.mkMap(
                Utils.mkEntry("client-id", threadId),
                Utils.mkEntry("task-id", "0_0"),
                Utils.mkEntry("processor-node-id", "TESTING_NODE"));
            dropTotal = new MetricName(
                "late-record-drop-total",
                "stream-processor-node-metrics",
                "The total number of late records dropped",
                tags);
            dropRate = new MetricName(
                "late-record-drop-rate",
                "stream-processor-node-metrics",
                "The average number of late records dropped per second",
                tags);
        } else {
            final Map<String, String> tags = Utils.mkMap(
                Utils.mkEntry("thread-id", threadId),
                Utils.mkEntry("task-id", "0_0"));
            dropTotal = new MetricName(
                "dropped-records-total",
                "stream-task-metrics",
                "The total number of dropped records",
                tags);
            dropRate = new MetricName(
                "dropped-records-rate",
                "stream-task-metrics",
                "The average number of dropped records per second",
                tags);
        }

        MatcherAssert.assertThat(metrics.metrics().get(dropTotal).metricValue(), CoreMatchers.is(1.0));
        MatcherAssert.assertThat((Double) metrics.metrics().get(dropRate).metricValue(), Matchers.greaterThan(0.0));
    }

    /** Runs the nonzero-grace late-record scenario with the built-in metrics version set to "latest". */
    @Test
    public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGraceWithBuiltInMetricsVersionLatest() {
        final String builtInMetricsVersion = "latest";
        shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace(builtInMetricsVersion);
    }

    /** Runs the nonzero-grace late-record scenario with the built-in metrics version set to "0.10.0-2.4". */
    @Test
    public void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGraceWithBuiltInMetricsVersion0100To24() {
        final String builtInMetricsVersion = "0.10.0-2.4";
        shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace(builtInMetricsVersion);
    }

    /**
     * Drives a processor configured with a 10 ms gap and 1 ms grace: record timestamps go
     * 0, 0, 1, 0, 2 and then a record for key "Late1" arrives at timestamp 0. Asserts that the
     * expected "Skipping record for expired window" message was logged and that the
     * version-specific drop counter reads 1.0 with a drop rate greater than zero.
     *
     * @param builtInMetricsVersion built-in metrics version used to create the mock context and
     *                              to select which metric names are checked
     */
    private void shouldLogAndMeterWhenSkippingLateRecordWithNonzeroGrace(final String builtInMetricsVersion) {
        // Distinct names so the fixture's context/processor fields are not shadowed.
        final InternalMockProcessorContext mockContext = createInternalMockProcessorContext(builtInMetricsVersion);
        final Processor<String, String> graceProcessor = new KStreamSessionWindowAggregate<String, String, Long>(
            SessionWindows.with(Duration.ofMillis(10L)).grace(Duration.ofMillis(1L)),
            STORE_NAME,
            initializer,
            aggregator,
            sessionMerger
        ).get();

        // NOTE(review): this store is initialised against the fixture context, not mockContext;
        // kept because it also swaps which store the @After hook closes — confirm it is still needed.
        initStore(false);
        graceProcessor.init(mockContext);

        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister(KStreamSessionWindowAggregate.class)) {
            mockContext.setRecordContext(new ProcessorRecordContext(0L, -2L, -3, "topic", null));
            graceProcessor.process("dummy", "dummy");
            mockContext.setRecordContext(new ProcessorRecordContext(0L, -2L, -3, "topic", null));
            graceProcessor.process("OnTime1", "1");
            mockContext.setRecordContext(new ProcessorRecordContext(1L, -2L, -3, "topic", null));
            graceProcessor.process("dummy", "dummy");
            // "OnTime2" has timestamp 0 after a record at timestamp 1; only "Late1" below is
            // expected to be dropped (the drop counter is asserted to equal exactly 1.0).
            mockContext.setRecordContext(new ProcessorRecordContext(0L, -2L, -3, "topic", null));
            graceProcessor.process("OnTime2", "1");
            mockContext.setRecordContext(new ProcessorRecordContext(2L, -2L, -3, "topic", null));
            graceProcessor.process("dummy", "dummy");
            mockContext.setRecordContext(new ProcessorRecordContext(0L, -2L, -3, "topic", null));
            graceProcessor.process("Late1", "1");

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record for expired window. key=[Late1] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0] expiration=[1] streamTime=[2]"));
        }

        final MetricName dropTotal;
        final MetricName dropRate;
        if ("0.10.0-2.4".equals(builtInMetricsVersion)) {
            final Map<String, String> tags = Utils.mkMap(
                Utils.mkEntry("client-id", threadId),
                Utils.mkEntry("task-id", "0_0"),
                Utils.mkEntry("processor-node-id", "TESTING_NODE"));
            dropTotal = new MetricName(
                "late-record-drop-total",
                "stream-processor-node-metrics",
                "The total number of late records dropped",
                tags);
            dropRate = new MetricName(
                "late-record-drop-rate",
                "stream-processor-node-metrics",
                "The average number of late records dropped per second",
                tags);
        } else {
            final Map<String, String> tags = Utils.mkMap(
                Utils.mkEntry("thread-id", threadId),
                Utils.mkEntry("task-id", "0_0"));
            dropTotal = new MetricName(
                "dropped-records-total",
                "stream-task-metrics",
                "The total number of dropped records",
                tags);
            dropRate = new MetricName(
                "dropped-records-rate",
                "stream-task-metrics",
                "The average number of dropped records per second",
                tags);
        }

        MatcherAssert.assertThat(metrics.metrics().get(dropTotal).metricValue(), CoreMatchers.is(1.0));
        MatcherAssert.assertThat((Double) metrics.metrics().get(dropRate).metricValue(), Matchers.greaterThan(0.0));
    }

    /**
     * Creates a mock processor context backed by a {@code StreamsMetricsImpl} for the given
     * built-in metrics version (sharing this test's {@code metrics} registry), whose
     * {@code forward} appends every forwarded record to {@code results}. Also registers the
     * dropped/skipped-records sensor for this thread and the context's task, and initialises a
     * non-caching persistent session store named {@code STORE_NAME} against the new context.
     *
     * NOTE(review): the store built here is not retained by the test and is therefore not
     * closed by the {@code @After} hook.
     *
     * @param builtInMetricsVersion built-in metrics version passed to {@code StreamsMetricsImpl}
     * @return the newly created context
     */
    private InternalMockProcessorContext createInternalMockProcessorContext(final String builtInMetricsVersion) {
        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(this.metrics, "test", builtInMetricsVersion);
        // Distinct name so the fixture's context field is not shadowed.
        final InternalMockProcessorContext mockContext = new InternalMockProcessorContext(
            TestUtils.tempDirectory(),
            Serdes.String(),
            Serdes.String(),
            streamsMetrics,
            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
            MockRecordCollector::new,
            new ThreadCache(new LogContext("testCache "), 100000L, streamsMetrics)
        ) {

            @Override
            @SuppressWarnings("unchecked")
            public void forward(final Object key, final Object value, final To to) {
                KStreamSessionWindowAggregateProcessorTest.this.toInternal.update(to);
                // Unchecked: the processors under test only forward Windowed<String> keys with Change<Long> values.
                KStreamSessionWindowAggregateProcessorTest.this.results.add(
                    new KeyValueTimestamp<>(
                        (Windowed<String>) key,
                        (Change<Long>) value,
                        KStreamSessionWindowAggregateProcessorTest.this.toInternal.timestamp()));
            }
        };
        TaskMetrics.droppedRecordsSensorOrSkippedRecordsSensor(this.threadId, mockContext.taskId().toString(), streamsMetrics);

        // Retention is three session gaps (900000 ms).
        final StoreBuilder<SessionStore<String, Long>> storeBuilder = Stores.sessionStoreBuilder(
                Stores.persistentSessionStore(STORE_NAME, Duration.ofMillis(GAP_MS * 3)),
                Serdes.String(),
                Serdes.Long())
            .withLoggingDisabled();
        final SessionStore<String, Long> contextStore = storeBuilder.build();
        contextStore.init(mockContext, contextStore);
        return mockContext;
    }
}

