/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category(value={IntegrationTest.class})
public class StoreUpgradeIntegrationTest {
    /** Name used for every state store that the tests in this class register and query. */
    private static final String STORE_NAME = "store";
    /** Name of the input topic; assigned per test in {@link #createTopics()}. */
    private String inputStream;
    /** Streams instance currently under test; closed and cleaned up in {@link #shutdown()} when non-null. */
    private KafkaStreams kafkaStreams;
    /**
     * Embedded cluster shared by all tests; started in {@link #startCluster()} and stopped in
     * {@link #closeCluster()}. NOTE(review): the argument 1 is presumably the broker count -- confirm.
     */
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    /** JUnit rule exposing the running test's name; used to derive unique topic and application names. */
    @Rule
    public TestName testName = new TestName();

    /**
     * Starts the shared embedded cluster once, before any test of this class runs.
     *
     * @throws IOException propagated from {@code CLUSTER.start()}
     */
    @BeforeClass
    public static void startCluster() throws IOException {
        StoreUpgradeIntegrationTest.CLUSTER.start();
    }

    /** Stops the shared embedded cluster once all tests of this class have finished. */
    @AfterClass
    public static void closeCluster() {
        StoreUpgradeIntegrationTest.CLUSTER.stop();
    }

    /**
     * Creates a dedicated input topic for the running test. The topic name is derived from the
     * test class and test method name.
     *
     * @throws Exception propagated from topic creation
     */
    @Before
    public void createTopics() throws Exception {
        String uniqueSuffix = IntegrationTestUtils.safeUniqueTestName(getClass(), testName);
        inputStream = "input-stream-" + uniqueSuffix;
        CLUSTER.createTopic(inputStream);
    }

    /**
     * Builds the Streams configuration for the running test: a test-specific application id, the
     * embedded cluster's bootstrap servers, a fresh temporary state directory, Integer serdes as
     * defaults, state store cache size 0, a commit interval of 1000 and "earliest" offset reset.
     *
     * @return a new {@link Properties} instance on every call
     */
    private Properties props() {
        Properties config = new Properties();
        Class<?> integerSerdeClass = Serdes.Integer().getClass();

        config.put("application.id", "app-" + IntegrationTestUtils.safeUniqueTestName(getClass(), testName));
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("statestore.cache.max.bytes", Integer.valueOf(0));
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        config.put("default.key.serde", integerSerdeClass);
        config.put("default.value.serde", integerSerdeClass);
        config.put("commit.interval.ms", Long.valueOf(1000L));
        config.put("auto.offset.reset", "earliest");
        return config;
    }

    /**
     * Closes the Streams instance left over from the test (waiting at most 30 seconds) and wipes
     * its local state. Does nothing when no instance is set.
     */
    @After
    public void shutdown() {
        if (kafkaStreams == null) {
            return;
        }
        kafkaStreams.close(Duration.ofSeconds(30L));
        kafkaStreams.cleanUp();
    }

    /** Runs the key-value store migration scenario with in-memory stores. */
    @Test
    public void shouldMigrateInMemoryKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception {
        final boolean persistentStore = false;
        shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(persistentStore);
    }

    /** Runs the key-value store migration scenario with persistent stores. */
    @Test
    public void shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception {
        final boolean persistentStore = true;
        shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(persistentStore);
    }

    /**
     * Shared scenario for the key-value migration tests. First runs a topology backed by a plain
     * key-value store, then restarts with a timestamped key-value store of the same name using the
     * same {@link Properties} (and therefore the same application id and state directory), and
     * verifies the store content after each produced record.
     *
     * NOTE(review): the statements are order-sensitive -- the expected timestamps are read from
     * {@code CLUSTER.time} between produce/verify steps, so do not reorder them.
     *
     * @param persistentStore {@code true} to use persistent store suppliers, {@code false} for in-memory ones
     * @throws Exception if producing fails or an expected store state is not reached in time
     */
    private void shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(boolean persistentStore) throws Exception {
        // Phase 1: topology with the old, non-timestamped store.
        StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
        streamsBuilderForOldStore.addStateStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)(persistentStore ? Stores.persistentKeyValueStore((String)STORE_NAME) : Stores.inMemoryKeyValueStore((String)STORE_NAME)), (Serde)Serdes.Integer(), (Serde)Serdes.Long())).stream(this.inputStream).process(() -> new KeyValueProcessor(), new String[]{STORE_NAME});
        Properties props = this.props();
        this.kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props);
        this.kafkaStreams.start();
        // Each call produces one record for the given key and waits for the full expected store content.
        this.processKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair((Object)1, (Object)1L)));
        this.processKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair((Object)1, (Object)2L)));
        // Timestamp expected for key 1 after migration: -1 for persistent stores, otherwise the
        // cluster clock reading minus one, taken right after the key's last update.
        // NOTE(review): presumably the clock advances on each read -- confirm.
        long lastUpdateKeyOne = persistentStore ? -1L : StoreUpgradeIntegrationTest.CLUSTER.time.milliseconds() - 1L;
        this.processKeyValueAndVerifyPlainCount(2, Arrays.asList(KeyValue.pair((Object)1, (Object)2L), KeyValue.pair((Object)2, (Object)1L)));
        long lastUpdateKeyTwo = persistentStore ? -1L : StoreUpgradeIntegrationTest.CLUSTER.time.milliseconds() - 1L;
        this.processKeyValueAndVerifyPlainCount(3, Arrays.asList(KeyValue.pair((Object)1, (Object)2L), KeyValue.pair((Object)2, (Object)1L), KeyValue.pair((Object)3, (Object)1L)));
        long lastUpdateKeyThree = persistentStore ? -1L : StoreUpgradeIntegrationTest.CLUSTER.time.milliseconds() - 1L;
        this.processKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair((Object)1, (Object)2L), KeyValue.pair((Object)2, (Object)1L), KeyValue.pair((Object)3, (Object)1L), KeyValue.pair((Object)4, (Object)1L)));
        this.processKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair((Object)1, (Object)2L), KeyValue.pair((Object)2, (Object)1L), KeyValue.pair((Object)3, (Object)1L), KeyValue.pair((Object)4, (Object)2L)));
        this.processKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair((Object)1, (Object)2L), KeyValue.pair((Object)2, (Object)1L), KeyValue.pair((Object)3, (Object)1L), KeyValue.pair((Object)4, (Object)3L)));
        long lastUpdateKeyFour = persistentStore ? -1L : StoreUpgradeIntegrationTest.CLUSTER.time.milliseconds() - 1L;
        this.kafkaStreams.close();
        // Cleared so that shutdown() skips the closed instance if creating the new one below fails.
        this.kafkaStreams = null;
        // Phase 2: restart with a timestamped store of the same name, reusing the same props.
        StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
        streamsBuilderForNewStore.addStateStore(Stores.timestampedKeyValueStoreBuilder((KeyValueBytesStoreSupplier)(persistentStore ? Stores.persistentTimestampedKeyValueStore((String)STORE_NAME) : Stores.inMemoryKeyValueStore((String)STORE_NAME)), (Serde)Serdes.Integer(), (Serde)Serdes.Long())).stream(this.inputStream).process(() -> new TimestampedKeyValueProcessor(), new String[]{STORE_NAME});
        this.kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), props);
        this.kafkaStreams.start();
        // The counts written in phase 1 must be readable through the timestamped store.
        this.verifyCountWithTimestamp(1, 2L, lastUpdateKeyOne);
        this.verifyCountWithTimestamp(2, 1L, lastUpdateKeyTwo);
        this.verifyCountWithTimestamp(3, 1L, lastUpdateKeyThree);
        this.verifyCountWithTimestamp(4, 3L, lastUpdateKeyFour);
        long currentTime = StoreUpgradeIntegrationTest.CLUSTER.time.milliseconds();
        // New records carry explicit timestamps; untouched keys keep their phase-1 timestamps.
        this.processKeyValueAndVerifyCountWithTimestamp(1, currentTime + 42L, Arrays.asList(KeyValue.pair((Object)1, (Object)ValueAndTimestamp.make((Object)3L, (long)(currentTime + 42L))), KeyValue.pair((Object)2, (Object)ValueAndTimestamp.make((Object)1L, (long)lastUpdateKeyTwo)), KeyValue.pair((Object)3, (Object)ValueAndTimestamp.make((Object)1L, (long)lastUpdateKeyThree)), KeyValue.pair((Object)4, (Object)ValueAndTimestamp.make((Object)3L, (long)lastUpdateKeyFour))));
        this.processKeyValueAndVerifyCountWithTimestamp(2, currentTime + 45L, Arrays.asList(KeyValue.pair((Object)1, (Object)ValueAndTimestamp.make((Object)3L, (long)(currentTime + 42L))), KeyValue.pair((Object)2, (Object)ValueAndTimestamp.make((Object)2L, (long)(currentTime + 45L))), KeyValue.pair((Object)3, (Object)ValueAndTimestamp.make((Object)1L, (long)lastUpdateKeyThree)), KeyValue.pair((Object)4, (Object)ValueAndTimestamp.make((Object)3L, (long)lastUpdateKeyFour))));
        this.processKeyValueAndVerifyCountWithTimestamp(4, currentTime + 21L, Arrays.asList(KeyValue.pair((Object)1, (Object)ValueAndTimestamp.make((Object)3L, (long)(currentTime + 42L))), KeyValue.pair((Object)2, (Object)ValueAndTimestamp.make((Object)2L, (long)(currentTime + 45L))), KeyValue.pair((Object)3, (Object)ValueAndTimestamp.make((Object)1L, (long)lastUpdateKeyThree)), KeyValue.pair((Object)4, (Object)ValueAndTimestamp.make((Object)4L, (long)(currentTime + 21L)))));
        this.processKeyValueAndVerifyCountWithTimestamp(4, currentTime + 42L, Arrays.asList(KeyValue.pair((Object)1, (Object)ValueAndTimestamp.make((Object)3L, (long)(currentTime + 42L))), KeyValue.pair((Object)2, (Object)ValueAndTimestamp.make((Object)2L, (long)(currentTime + 45L))), KeyValue.pair((Object)3, (Object)ValueAndTimestamp.make((Object)1L, (long)lastUpdateKeyThree)), KeyValue.pair((Object)4, (Object)ValueAndTimestamp.make((Object)5L, (long)(currentTime + 42L)))));
        // The record for key 4 carries an older timestamp (currentTime + 10): the count still
        // increases, but the expected stored timestamp stays at currentTime + 42.
        this.processKeyValueAndVerifyCountWithTimestamp(4, currentTime + 10L, Arrays.asList(KeyValue.pair((Object)1, (Object)ValueAndTimestamp.make((Object)3L, (long)(currentTime + 42L))), KeyValue.pair((Object)2, (Object)ValueAndTimestamp.make((Object)2L, (long)(currentTime + 45L))), KeyValue.pair((Object)3, (Object)ValueAndTimestamp.make((Object)1L, (long)lastUpdateKeyThree)), KeyValue.pair((Object)4, (Object)ValueAndTimestamp.make((Object)6L, (long)(currentTime + 42L)))));
        this.kafkaStreams.close();
    }

    /**
     * Verifies that a timestamped key-value store builder can be backed by a plain (non-timestamped)
     * persistent store supplier. Phase 1 populates a plain key-value store; phase 2 restarts with
     * {@code Stores.timestampedKeyValueStoreBuilder} wrapped around {@code Stores.persistentKeyValueStore}
     * and expects every entry to report the timestamp -1, including entries updated by records
     * that carry explicit timestamps.
     *
     * @throws Exception if producing fails or an expected store state is not reached in time
     */
    @Test
    public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception {
        // Phase 1: topology with the plain persistent store.
        StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
        streamsBuilderForOldStore.addStateStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)STORE_NAME), (Serde)Serdes.Integer(), (Serde)Serdes.Long())).stream(this.inputStream).process(() -> new KeyValueProcessor(), new String[]{STORE_NAME});
        Properties props = this.props();
        this.kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props);
        this.kafkaStreams.start();
        // Each call produces one record for the given key and waits for the full expected store content.
        this.processKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair((Object)1, (Object)1L)));
        this.processKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair((Object)1, (Object)2L)));
        this.processKeyValueAndVerifyPlainCount(2, Arrays.asList(KeyValue.pair((Object)1, (Object)2L), KeyValue.pair((Object)2, (Object)1L)));
        this.processKeyValueAndVerifyPlainCount(3, Arrays.asList(KeyValue.pair((Object)1, (Object)2L), KeyValue.pair((Object)2, (Object)1L), KeyValue.pair((Object)3, (Object)1L)));
        this.processKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair((Object)1, (Object)2L), KeyValue.pair((Object)2, (Object)1L), KeyValue.pair((Object)3, (Object)1L), KeyValue.pair((Object)4, (Object)1L)));
        this.processKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair((Object)1, (Object)2L), KeyValue.pair((Object)2, (Object)1L), KeyValue.pair((Object)3, (Object)1L), KeyValue.pair((Object)4, (Object)2L)));
        this.processKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair((Object)1, (Object)2L), KeyValue.pair((Object)2, (Object)1L), KeyValue.pair((Object)3, (Object)1L), KeyValue.pair((Object)4, (Object)3L)));
        this.kafkaStreams.close();
        // Cleared so that shutdown() skips the closed instance if creating the new one below fails.
        this.kafkaStreams = null;
        // Phase 2: timestamped store builder on top of the same plain persistent supplier.
        StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
        streamsBuilderForNewStore.addStateStore(Stores.timestampedKeyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)STORE_NAME), (Serde)Serdes.Integer(), (Serde)Serdes.Long())).stream(this.inputStream).process(() -> new TimestampedKeyValueProcessor(), new String[]{STORE_NAME});
        this.kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), props);
        this.kafkaStreams.start();
        // Existing counts must be visible with the surrogate timestamp -1.
        this.verifyCountWithSurrogateTimestamp(1, 2L);
        this.verifyCountWithSurrogateTimestamp(2, 1L);
        this.verifyCountWithSurrogateTimestamp(3, 1L);
        this.verifyCountWithSurrogateTimestamp(4, 3L);
        // Records are produced with explicit timestamps (42, 45, 21, 42, 10), yet every expected
        // entry still reports timestamp -1.
        this.processKeyValueAndVerifyCount(1, 42L, Arrays.asList(KeyValue.pair((Object)1, (Object)ValueAndTimestamp.make((Object)3L, (long)-1L)), KeyValue.pair((Object)2, (Object)ValueAndTimestamp.make((Object)1L, (long)-1L)), KeyValue.pair((Object)3, (Object)ValueAndTimestamp.make((Object)1L, (long)-1L)), KeyValue.pair((Object)4, (Object)ValueAndTimestamp.make((Object)3L, (long)-1L))));
        this.processKeyValueAndVerifyCount(2, 45L, Arrays.asList(KeyValue.pair((Object)1, (Object)ValueAndTimestamp.make((Object)3L, (long)-1L)), KeyValue.pair((Object)2, (Object)ValueAndTimestamp.make((Object)2L, (long)-1L)), KeyValue.pair((Object)3, (Object)ValueAndTimestamp.make((Object)1L, (long)-1L)), KeyValue.pair((Object)4, (Object)ValueAndTimestamp.make((Object)3L, (long)-1L))));
        this.processKeyValueAndVerifyCount(4, 21L, Arrays.asList(KeyValue.pair((Object)1, (Object)ValueAndTimestamp.make((Object)3L, (long)-1L)), KeyValue.pair((Object)2, (Object)ValueAndTimestamp.make((Object)2L, (long)-1L)), KeyValue.pair((Object)3, (Object)ValueAndTimestamp.make((Object)1L, (long)-1L)), KeyValue.pair((Object)4, (Object)ValueAndTimestamp.make((Object)4L, (long)-1L))));
        this.processKeyValueAndVerifyCount(4, 42L, Arrays.asList(KeyValue.pair((Object)1, (Object)ValueAndTimestamp.make((Object)3L, (long)-1L)), KeyValue.pair((Object)2, (Object)ValueAndTimestamp.make((Object)2L, (long)-1L)), KeyValue.pair((Object)3, (Object)ValueAndTimestamp.make((Object)1L, (long)-1L)), KeyValue.pair((Object)4, (Object)ValueAndTimestamp.make((Object)5L, (long)-1L))));
        this.processKeyValueAndVerifyCount(4, 10L, Arrays.asList(KeyValue.pair((Object)1, (Object)ValueAndTimestamp.make((Object)3L, (long)-1L)), KeyValue.pair((Object)2, (Object)ValueAndTimestamp.make((Object)2L, (long)-1L)), KeyValue.pair((Object)3, (Object)ValueAndTimestamp.make((Object)1L, (long)-1L)), KeyValue.pair((Object)4, (Object)ValueAndTimestamp.make((Object)6L, (long)-1L))));
        this.kafkaStreams.close();
    }

    /**
     * Produces a single record {@code (key, 0)} to the input topic and then polls the plain
     * key-value store until its full content equals {@code expectedStoreContent}.
     *
     * @param key                  key of the record to produce
     * @param expectedStoreContent entries the store must contain, in iteration order
     * @throws Exception if producing fails or the expected content is not observed in time
     */
    private <K, V> void processKeyValueAndVerifyPlainCount(K key, List<KeyValue<Integer, Object>> expectedStoreContent) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronously(
            inputStream,
            Collections.singletonList(KeyValue.pair(key, (Object)0)),
            TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class),
            (Time)CLUSTER.time);

        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyKeyValueStore queryable = (ReadOnlyKeyValueStore)IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.keyValueStore());
                if (queryable == null) {
                    return false;
                }
                // Snapshot the whole store, then compare it with the expectation.
                try (KeyValueIterator entries = queryable.all()) {
                    LinkedList<Object> snapshot = new LinkedList<Object>();
                    for (; entries.hasNext(); ) {
                        snapshot.add(entries.next());
                    }
                    return snapshot.equals(expectedStoreContent);
                }
            }
            catch (Exception swallow) {
                // Best effort while polling: report the failure and try again on the next round.
                swallow.printStackTrace();
                System.err.println(swallow.getMessage());
                return false;
            }
        }, (long)60000L, (String)"Could not get expected result in time.");
    }

    /**
     * Polls the timestamped key-value store until {@code key} maps to the given count and
     * timestamp.
     *
     * @param key       key to look up
     * @param value     expected count
     * @param timestamp expected timestamp stored alongside the count
     * @throws Exception if the expected entry is not observed in time
     */
    private <K> void verifyCountWithTimestamp(K key, long value, long timestamp) throws Exception {
        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyKeyValueStore store = (ReadOnlyKeyValueStore)IntegrationTestUtils.getStore(STORE_NAME, this.kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
                if (store == null) {
                    return false;
                }
                ValueAndTimestamp count = (ValueAndTimestamp)store.get(key);
                if (count == null) {
                    // Key not in the store (yet): keep polling instead of tripping a
                    // NullPointerException that would only be swallowed below.
                    return false;
                }
                return (Long)count.value() == value && count.timestamp() == timestamp;
            }
            catch (Exception swallow) {
                // Best effort while polling: report the failure and try again on the next round.
                swallow.printStackTrace();
                System.err.println(swallow.getMessage());
                return false;
            }
        }, (long)60000L, (String)"Could not get expected result in time.");
    }

    /**
     * Polls the timestamped key-value store until {@code key} maps to the given count with the
     * surrogate timestamp -1.
     *
     * @param key   key to look up
     * @param value expected count
     * @throws Exception if the expected entry is not observed in time
     */
    private <K> void verifyCountWithSurrogateTimestamp(K key, long value) throws Exception {
        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyKeyValueStore store = (ReadOnlyKeyValueStore)IntegrationTestUtils.getStore(STORE_NAME, this.kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
                if (store == null) {
                    return false;
                }
                ValueAndTimestamp count = (ValueAndTimestamp)store.get(key);
                if (count == null) {
                    // Key not in the store (yet): keep polling instead of tripping a
                    // NullPointerException that would only be swallowed below.
                    return false;
                }
                return (Long)count.value() == value && count.timestamp() == -1L;
            }
            catch (Exception swallow) {
                // Best effort while polling: report the failure and try again on the next round.
                swallow.printStackTrace();
                System.err.println(swallow.getMessage());
                return false;
            }
        }, (long)60000L, (String)"Could not get expected result in time.");
    }

    /**
     * Produces a single record {@code (key, 0)} with the given timestamp to the input topic and
     * then polls the timestamped key-value store until its full content equals
     * {@code expectedStoreContent}.
     *
     * @param key                  key of the record to produce
     * @param timestamp            timestamp assigned to the produced record
     * @param expectedStoreContent entries the store must contain, in iteration order
     * @throws Exception if producing fails or the expected content is not observed in time
     */
    private <K, V> void processKeyValueAndVerifyCount(K key, long timestamp, List<KeyValue<Integer, Object>> expectedStoreContent) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            inputStream,
            Collections.singletonList(KeyValue.pair(key, (Object)0)),
            TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class),
            timestamp);

        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyKeyValueStore queryable = (ReadOnlyKeyValueStore)IntegrationTestUtils.getStore(STORE_NAME, kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
                if (queryable == null) {
                    return false;
                }
                // Snapshot the whole store, then compare it with the expectation.
                try (KeyValueIterator entries = queryable.all()) {
                    LinkedList<Object> snapshot = new LinkedList<Object>();
                    for (; entries.hasNext(); ) {
                        snapshot.add(entries.next());
                    }
                    return snapshot.equals(expectedStoreContent);
                }
            }
            catch (Exception swallow) {
                // Best effort while polling: report the failure and try again on the next round.
                swallow.printStackTrace();
                System.err.println(swallow.getMessage());
                return false;
            }
        }, (long)60000L, (String)"Could not get expected result in time.");
    }

    /**
     * Produces a single record {@code (key, 0)} with the given timestamp to the input topic and
     * then polls the timestamped key-value store until its full content equals
     * {@code expectedStoreContent}.
     *
     * The steps are exactly those of {@link #processKeyValueAndVerifyCount}, so this method
     * delegates to it rather than keeping a second copy of the same body.
     *
     * @param key                  key of the record to produce
     * @param timestamp            timestamp assigned to the produced record
     * @param expectedStoreContent entries the store must contain, in iteration order
     * @throws Exception if producing fails or the expected content is not observed in time
     */
    private <K, V> void processKeyValueAndVerifyCountWithTimestamp(K key, long timestamp, List<KeyValue<Integer, Object>> expectedStoreContent) throws Exception {
        this.processKeyValueAndVerifyCount(key, timestamp, expectedStoreContent);
    }

    /**
     * Runs the window store migration scenario with in-memory stores: the old topology uses a
     * plain window store builder, the new one a timestamped window store builder, both on top of
     * an in-memory window store supplier with the same name and 1000 ms durations.
     */
    @Test
    public void shouldMigrateInMemoryWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception {
        Duration oneSecond = Duration.ofMillis(1000L);
        WindowBytesStoreSupplier oldSupplier = Stores.inMemoryWindowStore(STORE_NAME, oneSecond, oneSecond, false);
        WindowBytesStoreSupplier newSupplier = Stores.inMemoryWindowStore(STORE_NAME, oneSecond, oneSecond, false);

        StreamsBuilder oldTopology = new StreamsBuilder();
        oldTopology.addStateStore(Stores.windowStoreBuilder(oldSupplier, (Serde)Serdes.Integer(), (Serde)Serdes.Long()))
            .stream(inputStream)
            .process(() -> new WindowedProcessor(), new String[]{STORE_NAME});

        StreamsBuilder newTopology = new StreamsBuilder();
        newTopology.addStateStore(Stores.timestampedWindowStoreBuilder(newSupplier, (Serde)Serdes.Integer(), (Serde)Serdes.Long()))
            .stream(inputStream)
            .process(() -> new TimestampedWindowedProcessor(), new String[]{STORE_NAME});

        shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(oldTopology, newTopology, false);
    }

    /**
     * Runs the window store migration scenario with persistent stores: the old topology uses a
     * plain persistent window store, the new one a persistent timestamped window store with the
     * same name and 1000 ms durations.
     */
    @Test
    public void shouldMigratePersistentWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception {
        Duration oneSecond = Duration.ofMillis(1000L);
        WindowBytesStoreSupplier oldSupplier = Stores.persistentWindowStore(STORE_NAME, oneSecond, oneSecond, false);
        WindowBytesStoreSupplier newSupplier = Stores.persistentTimestampedWindowStore(STORE_NAME, oneSecond, oneSecond, false);

        StreamsBuilder oldTopology = new StreamsBuilder();
        oldTopology.addStateStore(Stores.windowStoreBuilder(oldSupplier, (Serde)Serdes.Integer(), (Serde)Serdes.Long()))
            .stream(inputStream)
            .process(() -> new WindowedProcessor(), new String[]{STORE_NAME});

        StreamsBuilder newTopology = new StreamsBuilder();
        newTopology.addStateStore(Stores.timestampedWindowStoreBuilder(newSupplier, (Serde)Serdes.Integer(), (Serde)Serdes.Long()))
            .stream(inputStream)
            .process(() -> new TimestampedWindowedProcessor(), new String[]{STORE_NAME});

        shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(oldTopology, newTopology, true);
    }

    /**
     * Shared scenario for the window store migration tests. Runs the old topology, verifies the
     * windowed counts after each produced record, then restarts with the new topology using the
     * same {@link Properties} and verifies counts together with their timestamps.
     *
     * NOTE(review): the statements are order-sensitive -- the expected timestamps are read from
     * {@code CLUSTER.time} between produce/verify steps, so do not reorder them.
     *
     * @param streamsBuilderForOldStore builder whose topology uses the plain window store
     * @param streamsBuilderForNewStore builder whose topology uses the timestamped window store
     * @param persistentStore           {@code true} when the stores are persistent; the expected
     *                                  timestamp of pre-existing entries is then -1
     * @throws Exception if producing fails or an expected store state is not reached in time
     */
    private void shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(StreamsBuilder streamsBuilderForOldStore, StreamsBuilder streamsBuilderForNewStore, boolean persistentStore) throws Exception {
        // Phase 1: run the topology with the old window store.
        Properties props = this.props();
        this.kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props);
        this.kafkaStreams.start();
        this.processWindowedKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)1L)));
        this.processWindowedKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)2L)));
        // Timestamp expected for key 1 after migration: -1 for persistent stores, otherwise the
        // cluster clock reading minus one, taken right after the key's last update.
        // NOTE(review): presumably the clock advances on each read -- confirm.
        long lastUpdateKeyOne = persistentStore ? -1L : StoreUpgradeIntegrationTest.CLUSTER.time.milliseconds() - 1L;
        this.processWindowedKeyValueAndVerifyPlainCount(2, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)2L), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)1L)));
        long lastUpdateKeyTwo = persistentStore ? -1L : StoreUpgradeIntegrationTest.CLUSTER.time.milliseconds() - 1L;
        this.processWindowedKeyValueAndVerifyPlainCount(3, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)2L), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)1L)));
        long lastUpdateKeyThree = persistentStore ? -1L : StoreUpgradeIntegrationTest.CLUSTER.time.milliseconds() - 1L;
        this.processWindowedKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)2L), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)1L)));
        this.processWindowedKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)2L), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)2L)));
        this.processWindowedKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)2L), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)3L)));
        long lastUpdateKeyFour = persistentStore ? -1L : StoreUpgradeIntegrationTest.CLUSTER.time.milliseconds() - 1L;
        this.kafkaStreams.close();
        // Cleared so that shutdown() skips the closed instance if creating the new one below fails.
        this.kafkaStreams = null;
        // Phase 2: restart with the timestamped window store, reusing the same props.
        this.kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), props);
        this.kafkaStreams.start();
        // The counts written in phase 1 must be readable through the timestamped store.
        this.verifyWindowedCountWithTimestamp(new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), 2L, lastUpdateKeyOne);
        this.verifyWindowedCountWithTimestamp(new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), 1L, lastUpdateKeyTwo);
        this.verifyWindowedCountWithTimestamp(new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), 1L, lastUpdateKeyThree);
        this.verifyWindowedCountWithTimestamp(new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), 3L, lastUpdateKeyFour);
        long currentTime = StoreUpgradeIntegrationTest.CLUSTER.time.milliseconds();
        // New records carry explicit timestamps; untouched keys keep their phase-1 timestamps.
        this.processKeyValueAndVerifyWindowedCountWithTimestamp(1, currentTime + 42L, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)3L, (long)(currentTime + 42L))), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)1L, (long)lastUpdateKeyTwo)), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)1L, (long)lastUpdateKeyThree)), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)3L, (long)lastUpdateKeyFour))));
        this.processKeyValueAndVerifyWindowedCountWithTimestamp(2, currentTime + 45L, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)3L, (long)(currentTime + 42L))), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)2L, (long)(currentTime + 45L))), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)1L, (long)lastUpdateKeyThree)), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)3L, (long)lastUpdateKeyFour))));
        this.processKeyValueAndVerifyWindowedCountWithTimestamp(4, currentTime + 21L, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)3L, (long)(currentTime + 42L))), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)2L, (long)(currentTime + 45L))), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)1L, (long)lastUpdateKeyThree)), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)4L, (long)(currentTime + 21L)))));
        this.processKeyValueAndVerifyWindowedCountWithTimestamp(4, currentTime + 42L, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)3L, (long)(currentTime + 42L))), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)2L, (long)(currentTime + 45L))), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)1L, (long)lastUpdateKeyThree)), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)5L, (long)(currentTime + 42L)))));
        // The record for key 4 carries an older timestamp (currentTime + 10): the count still
        // increases, but the expected stored timestamp stays at currentTime + 42.
        this.processKeyValueAndVerifyWindowedCountWithTimestamp(4, currentTime + 10L, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)3L, (long)(currentTime + 42L))), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)2L, (long)(currentTime + 45L))), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)1L, (long)lastUpdateKeyThree)), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)6L, (long)(currentTime + 42L)))));
        // A record far ahead in time for a new key: the expected content is only the entry for
        // key 10. NOTE(review): presumably the earlier windows have dropped out of retention
        // by then -- confirm against the processor and verification helper.
        this.processKeyValueAndVerifyWindowedCountWithTimestamp(10, currentTime + 100001L, Collections.singletonList(KeyValue.pair((Object)new Windowed((Object)10, (Window)new TimeWindow(100000L, 101000L)), (Object)ValueAndTimestamp.make((Object)1L, (long)(currentTime + 100001L)))));
        this.kafkaStreams.close();
    }

    /**
     * Runs a two-phase scenario against the same persistent window store name:
     * first with a plain window store written by {@link WindowedProcessor}, then,
     * after a restart with the same properties, with a timestamped window store
     * written by {@link TimestampedWindowedProcessor}.
     *
     * The second phase checks that the counts written in the first phase are
     * still readable and that every expected entry carries the timestamp -1L.
     */
    @Test
    public void shouldProxyWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception {
        // Phase 1: topology backed by a plain (non-timestamped) window store.
        StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
        streamsBuilderForOldStore.addStateStore(Stores.windowStoreBuilder((WindowBytesStoreSupplier)Stores.persistentWindowStore((String)STORE_NAME, (Duration)Duration.ofMillis(1000L), (Duration)Duration.ofMillis(1000L), (boolean)false), (Serde)Serdes.Integer(), (Serde)Serdes.Long())).stream(this.inputStream).process(() -> new WindowedProcessor(), new String[]{STORE_NAME});
        // The same properties object is reused for the second KafkaStreams instance below.
        Properties props = this.props();
        this.kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(), props);
        this.kafkaStreams.start();
        // Each call produces one record for the given key and waits until the
        // whole store content equals the expected list (keys 1..4 in window [0, 1000)).
        this.processWindowedKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)1L)));
        this.processWindowedKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)2L)));
        this.processWindowedKeyValueAndVerifyPlainCount(2, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)2L), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)1L)));
        this.processWindowedKeyValueAndVerifyPlainCount(3, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)2L), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)1L)));
        this.processWindowedKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)2L), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)1L)));
        this.processWindowedKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)2L), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)2L)));
        this.processWindowedKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)2L), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)1L), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)3L)));
        // Shut down the first instance before starting the second one on the same store name.
        this.kafkaStreams.close();
        this.kafkaStreams = null;
        // Phase 2: same store name and supplier settings, now declared as a timestamped window store.
        StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
        streamsBuilderForNewStore.addStateStore(Stores.timestampedWindowStoreBuilder((WindowBytesStoreSupplier)Stores.persistentWindowStore((String)STORE_NAME, (Duration)Duration.ofMillis(1000L), (Duration)Duration.ofMillis(1000L), (boolean)false), (Serde)Serdes.Integer(), (Serde)Serdes.Long())).stream(this.inputStream).process(() -> new TimestampedWindowedProcessor(), new String[]{STORE_NAME});
        this.kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(), props);
        this.kafkaStreams.start();
        // The final counts from phase 1 (2, 1, 1, 3) must be readable, each with timestamp -1L.
        this.verifyWindowedCountWithSurrogateTimestamp(new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), 2L);
        this.verifyWindowedCountWithSurrogateTimestamp(new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), 1L);
        this.verifyWindowedCountWithSurrogateTimestamp(new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), 1L);
        this.verifyWindowedCountWithSurrogateTimestamp(new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), 3L);
        // New records carry explicit timestamps (42, 45, 21, 42, 10), yet every expected
        // entry keeps timestamp -1L while the counts increase.
        // NOTE(review): presumably the proxied old-format store does not persist
        // timestamps, so reads always report -1L - confirm against the store implementation.
        this.processKeyValueAndVerifyWindowedCountWithTimestamp(1, 42L, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)3L, (long)-1L)), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)1L, (long)-1L)), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)1L, (long)-1L)), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)3L, (long)-1L))));
        this.processKeyValueAndVerifyWindowedCountWithTimestamp(2, 45L, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)3L, (long)-1L)), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)2L, (long)-1L)), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)1L, (long)-1L)), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)3L, (long)-1L))));
        this.processKeyValueAndVerifyWindowedCountWithTimestamp(4, 21L, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)3L, (long)-1L)), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)2L, (long)-1L)), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)1L, (long)-1L)), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)4L, (long)-1L))));
        this.processKeyValueAndVerifyWindowedCountWithTimestamp(4, 42L, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)3L, (long)-1L)), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)2L, (long)-1L)), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)1L, (long)-1L)), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)5L, (long)-1L))));
        this.processKeyValueAndVerifyWindowedCountWithTimestamp(4, 10L, Arrays.asList(KeyValue.pair((Object)new Windowed((Object)1, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)3L, (long)-1L)), KeyValue.pair((Object)new Windowed((Object)2, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)2L, (long)-1L)), KeyValue.pair((Object)new Windowed((Object)3, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)1L, (long)-1L)), KeyValue.pair((Object)new Windowed((Object)4, (Window)new TimeWindow(0L, 1000L)), (Object)ValueAndTimestamp.make((Object)6L, (long)-1L))));
        // Key 10 with timestamp 100001 is expected alone, in window [100000, 101000);
        // the entries of window [0, 1000) are no longer part of the expected content.
        this.processKeyValueAndVerifyWindowedCountWithTimestamp(10, 100001L, Collections.singletonList(KeyValue.pair((Object)new Windowed((Object)10, (Window)new TimeWindow(100000L, 101000L)), (Object)ValueAndTimestamp.make((Object)1L, (long)-1L))));
        this.kafkaStreams.close();
    }

    /**
     * Produces one record {@code (key, 0)} to the input topic and then polls the
     * plain window store until its full content equals {@code expectedStoreContent}.
     *
     * @param key key of the single record to produce
     * @param expectedStoreContent entries the store is expected to return from
     *        {@code all()}, compared in iteration order
     * @throws Exception if producing the record or waiting for the condition fails
     */
    private <K, V> void processWindowedKeyValueAndVerifyPlainCount(K key, List<KeyValue<Windowed<Integer>, Object>> expectedStoreContent) throws Exception {
        Properties producerConfig = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronously(
            this.inputStream,
            Collections.singletonList(KeyValue.pair(key, (Object)0)),
            producerConfig,
            (Time)StoreUpgradeIntegrationTest.CLUSTER.time);

        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyWindowStore windowStore = (ReadOnlyWindowStore)IntegrationTestUtils.getStore(STORE_NAME, this.kafkaStreams, QueryableStoreTypes.windowStore());
                if (windowStore != null) {
                    // Drain the iterator into a list so it can be compared as a whole.
                    try (KeyValueIterator entries = windowStore.all()) {
                        LinkedList<Object> actualContent = new LinkedList<Object>();
                        while (entries.hasNext()) {
                            actualContent.add(entries.next());
                        }
                        return actualContent.equals(expectedStoreContent);
                    }
                }
                return false;
            }
            catch (Exception swallow) {
                // Any failure counts as "not yet"; the condition is evaluated again.
                swallow.printStackTrace();
                System.err.println(swallow.getMessage());
                return false;
            }
        }, 60000L, "Could not get expected result in time.");
    }

    /**
     * Polls the timestamped window store until the entry for {@code key} holds
     * the count {@code value} together with the timestamp {@code -1L}.
     *
     * @param key windowed key to look up; its key and window start are used for the fetch
     * @param value expected count
     * @throws Exception if waiting for the condition fails
     */
    private <K> void verifyWindowedCountWithSurrogateTimestamp(Windowed<K> key, long value) throws Exception {
        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyWindowStore store = (ReadOnlyWindowStore)IntegrationTestUtils.getStore(STORE_NAME, this.kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
                if (store == null) {
                    return false;
                }
                ValueAndTimestamp count = (ValueAndTimestamp)store.fetch(key.key(), key.window().start());
                if (count == null) {
                    // Entry not present (yet): retry quietly instead of relying on a
                    // NullPointerException being caught and logged below.
                    return false;
                }
                return (Long)count.value() == value && count.timestamp() == -1L;
            }
            catch (Exception swallow) {
                // Any failure counts as "not yet"; the condition is evaluated again.
                swallow.printStackTrace();
                System.err.println(swallow.getMessage());
                return false;
            }
        }, (long)60000L, (String)"Could not get expected result in time.");
    }

    /**
     * Polls the timestamped window store until the entry for {@code key} holds
     * the count {@code value} together with the given {@code timestamp}.
     *
     * @param key windowed key to look up; its key and window start are used for the fetch
     * @param value expected count
     * @param timestamp expected timestamp stored alongside the count
     * @throws Exception if waiting for the condition fails
     */
    private <K> void verifyWindowedCountWithTimestamp(Windowed<K> key, long value, long timestamp) throws Exception {
        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyWindowStore store = (ReadOnlyWindowStore)IntegrationTestUtils.getStore(STORE_NAME, this.kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
                if (store == null) {
                    return false;
                }
                ValueAndTimestamp count = (ValueAndTimestamp)store.fetch(key.key(), key.window().start());
                if (count == null) {
                    // Entry not present (yet): retry quietly instead of relying on a
                    // NullPointerException being caught and logged below.
                    return false;
                }
                return (Long)count.value() == value && count.timestamp() == timestamp;
            }
            catch (Exception swallow) {
                // Any failure counts as "not yet"; the condition is evaluated again.
                swallow.printStackTrace();
                System.err.println(swallow.getMessage());
                return false;
            }
        }, (long)60000L, (String)"Could not get expected result in time.");
    }

    /**
     * Produces one record {@code (key, 0)} with the given record timestamp and then
     * polls the timestamped window store until its full content equals
     * {@code expectedStoreContent}.
     *
     * @param key key of the single record to produce
     * @param timestamp timestamp passed to the producer helper for the record
     * @param expectedStoreContent entries the store is expected to return from
     *        {@code all()}, compared in iteration order
     * @throws Exception if producing the record or waiting for the condition fails
     */
    private <K, V> void processKeyValueAndVerifyWindowedCountWithTimestamp(K key, long timestamp, List<KeyValue<Windowed<Integer>, Object>> expectedStoreContent) throws Exception {
        Properties producerConfig = TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            this.inputStream,
            Collections.singletonList(KeyValue.pair(key, (Object)0)),
            producerConfig,
            timestamp);

        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyWindowStore timestampedStore = (ReadOnlyWindowStore)IntegrationTestUtils.getStore(STORE_NAME, this.kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
                if (timestampedStore != null) {
                    // Drain the iterator into a list so it can be compared as a whole.
                    try (KeyValueIterator entries = timestampedStore.all()) {
                        LinkedList<Object> actualContent = new LinkedList<Object>();
                        while (entries.hasNext()) {
                            actualContent.add(entries.next());
                        }
                        return actualContent.equals(expectedStoreContent);
                    }
                }
                return false;
            }
            catch (Exception swallow) {
                // Any failure counts as "not yet"; the condition is evaluated again.
                swallow.printStackTrace();
                System.err.println(swallow.getMessage());
                return false;
            }
        }, 60000L, "Could not get expected result in time.");
    }

    /**
     * Processor that keeps a per-key record count plus a timestamp in the
     * timestamped window store named {@code STORE_NAME}.
     *
     * Keys below 10 are stored under window start 0, all other keys under
     * window start 100000. The stored timestamp is the larger of the previously
     * stored timestamp and the timestamp of the current record.
     */
    private static class TimestampedWindowedProcessor
    implements Processor<Integer, Integer, Void, Void> {
        private TimestampedWindowStore<Integer, Long> store;

        private TimestampedWindowedProcessor() {
        }

        /** Looks up the state store this processor reads from and writes to. */
        public void init(ProcessorContext<Void, Void> context) {
            this.store = (TimestampedWindowStore)context.getStateStore(StoreUpgradeIntegrationTest.STORE_NAME);
        }

        /** Increments the count for the record's key and updates its timestamp. */
        public void process(Record<Integer, Integer> record) {
            Integer key = record.key();
            long windowStart = key < 10 ? 0L : 100000L;
            ValueAndTimestamp<Long> previous = this.store.fetch(key, windowStart);

            // Defaults cover the first record seen for this key and window.
            long count = 1L;
            long timestamp = record.timestamp();
            if (previous != null) {
                count = previous.value() + 1L;
                timestamp = Math.max(previous.timestamp(), timestamp);
            }
            this.store.put(key, ValueAndTimestamp.make(count, timestamp), windowStart);
        }

        /** Nothing to release. */
        public void close() {
        }
    }

    /**
     * Processor that keeps a per-key record count in the plain window store
     * named {@code STORE_NAME}.
     *
     * Keys below 10 are stored under window start 0, all other keys under
     * window start 100000.
     */
    private static class WindowedProcessor
    implements Processor<Integer, Integer, Void, Void> {
        private WindowStore<Integer, Long> store;

        private WindowedProcessor() {
        }

        /** Looks up the state store this processor reads from and writes to. */
        public void init(ProcessorContext<Void, Void> context) {
            this.store = (WindowStore)context.getStateStore(StoreUpgradeIntegrationTest.STORE_NAME);
        }

        /** Increments the count stored for the record's key, starting at 1. */
        public void process(Record<Integer, Integer> record) {
            Integer key = record.key();
            long windowStart = key < 10 ? 0L : 100000L;
            Long previous = this.store.fetch(key, windowStart);
            long updated = previous == null ? 1L : previous + 1L;
            this.store.put(key, updated, windowStart);
        }

        /** Nothing to release. */
        public void close() {
        }
    }

    /**
     * Processor that keeps a per-key record count plus a timestamp in the
     * timestamped key-value store named {@code STORE_NAME}.
     *
     * The stored timestamp is the larger of the previously stored timestamp and
     * the timestamp of the current record.
     */
    private static class TimestampedKeyValueProcessor
    implements Processor<Integer, Integer, Void, Void> {
        private TimestampedKeyValueStore<Integer, Long> store;

        private TimestampedKeyValueProcessor() {
        }

        /** Looks up the state store this processor reads from and writes to. */
        public void init(ProcessorContext<Void, Void> context) {
            this.store = (TimestampedKeyValueStore)context.getStateStore(StoreUpgradeIntegrationTest.STORE_NAME);
        }

        /** Increments the count for the record's key and updates its timestamp. */
        public void process(Record<Integer, Integer> record) {
            Integer key = record.key();
            ValueAndTimestamp<Long> previous = this.store.get(key);

            // Defaults cover the first record seen for this key.
            long count = 1L;
            long timestamp = record.timestamp();
            if (previous != null) {
                count = previous.value() + 1L;
                timestamp = Math.max(previous.timestamp(), timestamp);
            }
            this.store.put(key, ValueAndTimestamp.make(count, timestamp));
        }

        /** Nothing to release. */
        public void close() {
        }
    }

    /**
     * Processor that keeps a per-key record count in the plain key-value store
     * named {@code STORE_NAME}.
     */
    private static class KeyValueProcessor
    implements Processor<Integer, Integer, Void, Void> {
        private KeyValueStore<Integer, Long> store;

        private KeyValueProcessor() {
        }

        /** Looks up the state store this processor reads from and writes to. */
        public void init(ProcessorContext<Void, Void> context) {
            this.store = (KeyValueStore)context.getStateStore(StoreUpgradeIntegrationTest.STORE_NAME);
        }

        /** Increments the count stored for the record's key, starting at 1. */
        public void process(Record<Integer, Integer> record) {
            Integer key = record.key();
            Long previous = this.store.get(key);
            long updated = previous == null ? 1L : previous + 1L;
            this.store.put(key, updated);
        }

        /** Nothing to release. */
        public void close() {
        }
    }
}

