/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.StateQueryResult;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category(value={IntegrationTest.class})
public class VersionedKeyValueStoreIntegrationTest {
    private static final String STORE_NAME = "versioned-store";
    private static final long HISTORY_RETENTION = 3600000L;
    private String inputStream;
    private String globalTableTopic;
    private String outputStream;
    private long baseTimestamp;
    private KafkaStreams kafkaStreams;
    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
    @Rule
    public TestName testName = new TestName();

    @BeforeClass
    public static void before() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void after() {
        CLUSTER.stop();
    }

    @Before
    public void beforeTest() throws InterruptedException {
        String uniqueTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        this.inputStream = "input-stream-" + uniqueTestName;
        this.globalTableTopic = "global-table-" + uniqueTestName;
        this.outputStream = "output-stream-" + uniqueTestName;
        CLUSTER.createTopic(this.inputStream);
        CLUSTER.createTopic(this.outputStream);
        this.baseTimestamp = VersionedKeyValueStoreIntegrationTest.CLUSTER.time.milliseconds();
    }

    @After
    public void afterTest() {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close(Duration.ofSeconds(30L));
            this.kafkaStreams.cleanUp();
        }
    }

    @Test
    public void shouldPutGetAndDelete() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.versionedKeyValueStoreBuilder((VersionedBytesStoreSupplier)Stores.persistentVersionedKeyValueStore((String)STORE_NAME, (Duration)Duration.ofMillis(3600000L)), (Serde)Serdes.Integer(), (Serde)Serdes.String())).stream(this.inputStream, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).process(() -> new VersionedStoreContentCheckerProcessor(true), new String[]{STORE_NAME}).to(this.outputStream, Produced.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer()));
        Properties props = this.props();
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        this.kafkaStreams.start();
        int numRecordsProduced = 0;
        numRecordsProduced += this.produceDataToTopic(this.inputStream, this.baseTimestamp, KeyValue.pair((Object)1, (Object)"a0"), KeyValue.pair((Object)2, (Object)"b0"), KeyValue.pair((Object)3, null));
        numRecordsProduced += this.produceDataToTopic(this.inputStream, this.baseTimestamp + 5L, KeyValue.pair((Object)1, (Object)"a5"), KeyValue.pair((Object)2, null), KeyValue.pair((Object)3, (Object)"c5"));
        numRecordsProduced += this.produceDataToTopic(this.inputStream, this.baseTimestamp + 2L, KeyValue.pair((Object)1, (Object)"a2"), KeyValue.pair((Object)2, (Object)"b2"), KeyValue.pair((Object)3, null));
        numRecordsProduced += this.produceDataToTopic(this.inputStream, this.baseTimestamp + 5L, KeyValue.pair((Object)1, (Object)"a5_new"), KeyValue.pair((Object)2, (Object)"b5"), KeyValue.pair((Object)3, null));
        List receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class), this.outputStream, numRecordsProduced += this.produceDataToTopic(this.inputStream, this.baseTimestamp + 7L, KeyValue.pair((Object)1, (Object)"delete"), KeyValue.pair((Object)2, (Object)"delete"), KeyValue.pair((Object)3, (Object)"delete")));
        for (KeyValue receivedRecord : receivedRecords) {
            MatcherAssert.assertThat((Object)receivedRecord.value, (Matcher)CoreMatchers.equalTo((Object)0));
        }
    }

    @Test
    public void shouldSetChangelogTopicProperties() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.versionedKeyValueStoreBuilder((VersionedBytesStoreSupplier)Stores.persistentVersionedKeyValueStore((String)STORE_NAME, (Duration)Duration.ofMillis(3600000L)), (Serde)Serdes.Integer(), (Serde)Serdes.String())).stream(this.inputStream, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).process(() -> new VersionedStoreContentCheckerProcessor(false), new String[]{STORE_NAME}).to(this.outputStream, Produced.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer()));
        Properties props = this.props();
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        this.kafkaStreams.start();
        this.produceDataToTopic(this.inputStream, this.baseTimestamp, KeyValue.pair((Object)0, (Object)"foo"));
        IntegrationTestUtils.waitUntilMinRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class), this.outputStream, 1);
        String changelogTopic = "app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog";
        Properties changelogTopicConfig = CLUSTER.getLogConfig("app-VersionedKeyValueStoreIntegrationTestshouldSetChangelogTopicProperties-versioned-store-changelog");
        MatcherAssert.assertThat((Object)changelogTopicConfig.getProperty("cleanup.policy"), (Matcher)CoreMatchers.equalTo((Object)"compact"));
        MatcherAssert.assertThat((Object)changelogTopicConfig.getProperty("min.compaction.lag.ms"), (Matcher)CoreMatchers.equalTo((Object)Long.toString(90000000L)));
    }

    @Test
    public void shouldRestore() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.versionedKeyValueStoreBuilder((VersionedBytesStoreSupplier)Stores.persistentVersionedKeyValueStore((String)STORE_NAME, (Duration)Duration.ofMillis(3600000L)), (Serde)Serdes.Integer(), (Serde)Serdes.String())).stream(this.inputStream, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).process(() -> new VersionedStoreContentCheckerProcessor(true), new String[]{STORE_NAME}).to(this.outputStream, Produced.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer()));
        Properties props = this.props();
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        this.kafkaStreams.start();
        DataTracker data = new DataTracker();
        int initialRecordsProduced = 0;
        initialRecordsProduced += this.produceDataToTopic(this.inputStream, data, this.baseTimestamp, KeyValue.pair((Object)1, (Object)"a0"), KeyValue.pair((Object)2, (Object)"b0"), KeyValue.pair((Object)3, null));
        initialRecordsProduced += this.produceDataToTopic(this.inputStream, data, this.baseTimestamp + 5L, KeyValue.pair((Object)1, (Object)"a5"), KeyValue.pair((Object)2, null), KeyValue.pair((Object)3, (Object)"c5"));
        initialRecordsProduced += this.produceDataToTopic(this.inputStream, data, this.baseTimestamp + 2L, KeyValue.pair((Object)1, (Object)"a2"), KeyValue.pair((Object)2, (Object)"b2"), KeyValue.pair((Object)3, null));
        initialRecordsProduced += this.produceDataToTopic(this.inputStream, data, this.baseTimestamp + 5L, KeyValue.pair((Object)1, (Object)"a5_new"), KeyValue.pair((Object)2, (Object)"b5"), KeyValue.pair((Object)3, null));
        initialRecordsProduced += this.produceDataToTopic(this.inputStream, data, this.baseTimestamp + 7L, KeyValue.pair((Object)1, (Object)"delete"), KeyValue.pair((Object)2, (Object)"delete"), KeyValue.pair((Object)3, (Object)"delete"));
        IntegrationTestUtils.waitUntilMinRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class), this.outputStream, initialRecordsProduced += this.produceDataToTopic(this.inputStream, data, this.baseTimestamp + 10L, KeyValue.pair((Object)1, (Object)"a10"), KeyValue.pair((Object)2, (Object)"b10"), KeyValue.pair((Object)3, (Object)"c10")));
        this.kafkaStreams.close();
        this.kafkaStreams.cleanUp();
        streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.versionedKeyValueStoreBuilder((VersionedBytesStoreSupplier)Stores.persistentVersionedKeyValueStore((String)STORE_NAME, (Duration)Duration.ofMillis(3600000L)), (Serde)Serdes.Integer(), (Serde)Serdes.String())).stream(this.inputStream, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).process(() -> new VersionedStoreContentCheckerProcessor(true, data), new String[]{STORE_NAME}).to(this.outputStream, Produced.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer()));
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        this.kafkaStreams.start();
        int additionalRecordsProduced = this.produceDataToTopic(this.inputStream, this.baseTimestamp + 12L, KeyValue.pair((Object)1, (Object)"a12"), KeyValue.pair((Object)2, (Object)"b12"), KeyValue.pair((Object)3, (Object)"c12"));
        List receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class), this.outputStream, initialRecordsProduced + additionalRecordsProduced);
        for (KeyValue receivedRecord : receivedRecords) {
            MatcherAssert.assertThat((Object)receivedRecord.value, (Matcher)CoreMatchers.equalTo((Object)0));
        }
    }

    @Test
    public void shouldAllowCustomIQv2ForCustomStoreImplementations() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.versionedKeyValueStoreBuilder((VersionedBytesStoreSupplier)new CustomIQv2VersionedStoreSupplier(), (Serde)Serdes.Integer(), (Serde)Serdes.String())).stream(this.inputStream, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).process(() -> new VersionedStoreContentCheckerProcessor(false), new String[]{STORE_NAME});
        Properties props = this.props();
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        this.kafkaStreams.start();
        StateQueryRequest request = StateQueryRequest.inStore((String)STORE_NAME).withQuery((Query)new TestQuery()).withPartitions(Collections.singleton(0));
        StateQueryResult result = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request);
        MatcherAssert.assertThat((Object)result.getOnlyPartitionResult().getResult(), (Matcher)CoreMatchers.equalTo((Object)"success"));
    }

    @Test
    public void shouldManualUpgradeFromNonVersionedTimestampedToVersioned() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.timestampedKeyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentTimestampedKeyValueStore((String)STORE_NAME), (Serde)Serdes.Integer(), (Serde)Serdes.String())).stream(this.inputStream, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).process(TimestampedStoreContentCheckerProcessor::new, new String[]{STORE_NAME}).to(this.outputStream, Produced.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer()));
        this.shouldManualUpgradeFromNonVersionedToVersioned(streamsBuilder.build());
    }

    @Test
    public void shouldManualUpgradeFromNonVersionedNonTimestampedToVersioned() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)STORE_NAME), (Serde)Serdes.Integer(), (Serde)Serdes.String())).stream(this.inputStream, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).process(KeyValueStoreContentCheckerProcessor::new, new String[]{STORE_NAME}).to(this.outputStream, Produced.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer()));
        this.shouldManualUpgradeFromNonVersionedToVersioned(streamsBuilder.build());
    }

    private void shouldManualUpgradeFromNonVersionedToVersioned(Topology originalTopology) throws Exception {
        Properties props = this.props();
        props.put("min.compaction.lag.ms", (Object)60000L);
        this.kafkaStreams = new KafkaStreams(originalTopology, props);
        this.kafkaStreams.start();
        DataTracker data = new DataTracker();
        int initialRecordsProduced = 0;
        initialRecordsProduced += this.produceDataToTopic(this.inputStream, data, this.baseTimestamp, KeyValue.pair((Object)1, (Object)"a0"), KeyValue.pair((Object)2, (Object)"b0"), KeyValue.pair((Object)3, null));
        initialRecordsProduced += this.produceDataToTopic(this.inputStream, data, this.baseTimestamp + 5L, KeyValue.pair((Object)1, (Object)"a5"), KeyValue.pair((Object)2, null), KeyValue.pair((Object)3, (Object)"c5"));
        List receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class), this.outputStream, initialRecordsProduced += this.produceDataToTopic(this.inputStream, data, this.baseTimestamp + 2L, KeyValue.pair((Object)1, (Object)"a2"), KeyValue.pair((Object)2, (Object)"b2"), KeyValue.pair((Object)3, null)));
        for (KeyValue receivedRecord : receivedRecords) {
            MatcherAssert.assertThat((Object)receivedRecord.value, (Matcher)CoreMatchers.equalTo((Object)0));
        }
        this.kafkaStreams.close();
        this.kafkaStreams.cleanUp();
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.versionedKeyValueStoreBuilder((VersionedBytesStoreSupplier)Stores.persistentVersionedKeyValueStore((String)STORE_NAME, (Duration)Duration.ofMillis(3600000L)), (Serde)Serdes.Integer(), (Serde)Serdes.String())).stream(this.inputStream, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).process(() -> new VersionedStoreContentCheckerProcessor(true, data), new String[]{STORE_NAME}).to(this.outputStream, Produced.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer()));
        props = this.props();
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        this.kafkaStreams.start();
        int additionalRecordsProduced = this.produceDataToTopic(this.inputStream, this.baseTimestamp + 12L, KeyValue.pair((Object)1, (Object)"a12"), KeyValue.pair((Object)2, (Object)"b12"), KeyValue.pair((Object)3, (Object)"c12"));
        receivedRecords = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class), this.outputStream, initialRecordsProduced + additionalRecordsProduced);
        for (KeyValue receivedRecord : receivedRecords) {
            MatcherAssert.assertThat((Object)receivedRecord.value, (Matcher)CoreMatchers.equalTo((Object)0));
        }
    }

    private Properties props() {
        Properties streamsConfiguration = new Properties();
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        streamsConfiguration.put("application.id", "app-" + safeTestName);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("commit.interval.ms", (Object)1000L);
        streamsConfiguration.put("auto.offset.reset", "earliest");
        return streamsConfiguration;
    }

    @SafeVarargs
    private final int produceDataToTopic(String topic, long timestamp, KeyValue<Integer, String> ... keyValues) {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(topic, Arrays.asList(keyValues), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class), timestamp);
        return keyValues.length;
    }

    @SafeVarargs
    private final int produceDataToTopic(String topic, DataTracker dataTracker, long timestamp, KeyValue<Integer, String> ... keyValues) {
        this.produceDataToTopic(topic, timestamp, keyValues);
        for (KeyValue<Integer, String> keyValue : keyValues) {
            dataTracker.add((Integer)keyValue.key, timestamp, (String)keyValue.value);
        }
        return keyValues.length;
    }

    private static class CustomIQv2VersionedStoreSupplier
    implements VersionedBytesStoreSupplier {
        private CustomIQv2VersionedStoreSupplier() {
        }

        public String name() {
            return VersionedKeyValueStoreIntegrationTest.STORE_NAME;
        }

        public KeyValueStore<Bytes, byte[]> get() {
            return new VersionedKeyValueToBytesStoreAdapter((VersionedKeyValueStore)new CustomIQv2VersionedStore());
        }

        public String metricsScope() {
            return "metrics-scope";
        }

        public long historyRetentionMs() {
            return 3600000L;
        }

        private static class CustomIQv2VersionedStore
        implements VersionedKeyValueStore<Bytes, byte[]> {
            private CustomIQv2VersionedStore() {
            }

            public <R> QueryResult<R> query(Query<R> query, PositionBound positionBound, QueryConfig config) {
                if (query instanceof TestQuery) {
                    return QueryResult.forResult((Object)"success");
                }
                throw new UnsupportedOperationException();
            }

            public long put(Bytes key, byte[] value, long timestamp) {
                throw new UnsupportedOperationException();
            }

            public VersionedRecord<byte[]> delete(Bytes key, long timestamp) {
                throw new UnsupportedOperationException();
            }

            public VersionedRecord<byte[]> get(Bytes key) {
                throw new UnsupportedOperationException();
            }

            public VersionedRecord<byte[]> get(Bytes key, long asOfTimestamp) {
                throw new UnsupportedOperationException();
            }

            public String name() {
                return VersionedKeyValueStoreIntegrationTest.STORE_NAME;
            }

            @Deprecated
            public void init(org.apache.kafka.streams.processor.ProcessorContext context, StateStore root) {
                throw new UnsupportedOperationException();
            }

            public void init(StateStoreContext context, StateStore root) {
                context.register(root, (key, value) -> {});
            }

            public void flush() {
            }

            public void close() {
            }

            public boolean persistent() {
                return false;
            }

            public boolean isOpen() {
                return true;
            }
        }
    }

    private static class TestQuery
    implements Query<String> {
        private TestQuery() {
        }
    }

    private static class DataTracker {
        static final String DELETE_VALUE_KEYWORD = "delete";
        final Map<Integer, Map<Long, Optional<String>>> data = new HashMap<Integer, Map<Long, Optional<String>>>();

        private DataTracker() {
        }

        void add(Integer key, long timestamp, String value) {
            this.data.computeIfAbsent(key, k -> new HashMap());
            if (DELETE_VALUE_KEYWORD.equals(value)) {
                this.data.get(key).put(timestamp, Optional.empty());
            } else {
                this.data.get(key).put(timestamp, Optional.ofNullable(value));
            }
        }
    }

    private static class KeyValueStoreContentCheckerProcessor
    implements Processor<Integer, String, Integer, Integer> {
        private ProcessorContext<Integer, Integer> context;
        private KeyValueStore<Integer, String> store;
        private final Map<Integer, Optional<String>> data = new HashMap<Integer, Optional<String>>();

        KeyValueStoreContentCheckerProcessor() {
        }

        public void init(ProcessorContext<Integer, Integer> context) {
            this.context = context;
            this.store = (KeyValueStore)context.getStateStore(VersionedKeyValueStoreIntegrationTest.STORE_NAME);
        }

        public void process(Record<Integer, String> record) {
            if ("delete".equals(record.value())) {
                throw new IllegalArgumentException("Using 'delete' keyword for KeyValueStoreContentCheckerProcessor will result in the record timestamp being ignored. Use regular put with null value instead.");
            }
            this.store.put(record.key(), record.value());
            this.data.put((Integer)record.key(), Optional.ofNullable(record.value()));
            int failedChecks = this.checkStoreContents();
            this.context.forward(record.withValue((Object)failedChecks));
        }

        private int checkStoreContents() {
            int failedChecks = 0;
            for (Map.Entry<Integer, Optional<String>> keyValue : this.data.entrySet()) {
                Integer expectedKey = keyValue.getKey();
                String expectedValue = keyValue.getValue().orElse(null);
                String foundValue = (String)this.store.get((Object)expectedKey);
                if (Objects.equals(foundValue, expectedValue)) continue;
                ++failedChecks;
            }
            return failedChecks;
        }
    }

    private static class TimestampedStoreContentCheckerProcessor
    implements Processor<Integer, String, Integer, Integer> {
        private ProcessorContext<Integer, Integer> context;
        private TimestampedKeyValueStore<Integer, String> store;
        private final Map<Integer, Optional<ValueAndTimestamp<String>>> data = new HashMap<Integer, Optional<ValueAndTimestamp<String>>>();

        TimestampedStoreContentCheckerProcessor() {
        }

        public void init(ProcessorContext<Integer, Integer> context) {
            this.context = context;
            this.store = (TimestampedKeyValueStore)context.getStateStore(VersionedKeyValueStoreIntegrationTest.STORE_NAME);
        }

        public void process(Record<Integer, String> record) {
            if ("delete".equals(record.value())) {
                throw new IllegalArgumentException("Using 'delete' keyword for TimestampedStoreContentCheckerProcessor will result in the record timestamp being ignored. Use regular put with null value instead.");
            }
            ValueAndTimestamp valueAndTimestamp = ValueAndTimestamp.make((Object)record.value(), (long)record.timestamp());
            this.store.put(record.key(), (Object)valueAndTimestamp);
            this.data.put((Integer)record.key(), Optional.ofNullable(valueAndTimestamp));
            int failedChecks = this.checkStoreContents();
            this.context.forward(record.withValue((Object)failedChecks));
        }

        private int checkStoreContents() {
            int failedChecks = 0;
            for (Map.Entry<Integer, Optional<ValueAndTimestamp<String>>> keyWithValueAndTimestamp : this.data.entrySet()) {
                Integer key = keyWithValueAndTimestamp.getKey();
                ValueAndTimestamp valueAndTimestamp = keyWithValueAndTimestamp.getValue().orElse(null);
                ValueAndTimestamp record = (ValueAndTimestamp)this.store.get((Object)key);
                if (Objects.equals(record, valueAndTimestamp)) continue;
                ++failedChecks;
            }
            return failedChecks;
        }
    }

    private static class VersionedStoreContentCheckerProcessor
    implements Processor<Integer, String, Integer, Integer> {
        private ProcessorContext<Integer, Integer> context;
        private VersionedKeyValueStore<Integer, String> store;
        private final boolean writeToStore;
        private final DataTracker data;

        VersionedStoreContentCheckerProcessor(boolean writeToStore) {
            this(writeToStore, new DataTracker());
        }

        VersionedStoreContentCheckerProcessor(boolean writeToStore, DataTracker initialData) {
            this.writeToStore = writeToStore;
            this.data = initialData;
        }

        public void init(ProcessorContext<Integer, Integer> context) {
            this.context = context;
            this.store = (VersionedKeyValueStore)context.getStateStore(VersionedKeyValueStoreIntegrationTest.STORE_NAME);
        }

        public void process(Record<Integer, String> record) {
            if (this.writeToStore) {
                if ("delete".equals(record.value())) {
                    this.store.delete(record.key(), record.timestamp());
                } else {
                    this.store.put(record.key(), record.value(), record.timestamp());
                }
                this.data.add((Integer)record.key(), record.timestamp(), (String)record.value());
            }
            int failedChecks = this.checkStoreContents();
            this.context.forward(record.withValue((Object)failedChecks));
        }

        private int checkStoreContents() {
            int failedChecks = 0;
            for (Map.Entry<Integer, Map<Long, Optional<String>>> keyWithTimestampsAndValues : this.data.data.entrySet()) {
                Integer key = keyWithTimestampsAndValues.getKey();
                Map<Long, Optional<String>> timestampsAndValues = keyWithTimestampsAndValues.getValue();
                long maxExpectedTimestamp = -1L;
                String expectedValueForMaxTimestamp = null;
                for (Map.Entry<Long, Optional<String>> timestampAndValue : timestampsAndValues.entrySet()) {
                    VersionedRecord versionedRecord;
                    Long expectedTimestamp = timestampAndValue.getKey();
                    String expectedValue = timestampAndValue.getValue().orElse(null);
                    if (expectedTimestamp > maxExpectedTimestamp) {
                        maxExpectedTimestamp = expectedTimestamp;
                        expectedValueForMaxTimestamp = expectedValue;
                    }
                    if (VersionedStoreContentCheckerProcessor.contentsMatch((VersionedRecord<String>)(versionedRecord = this.store.get((Object)key, expectedTimestamp.longValue())), expectedValue, expectedTimestamp)) continue;
                    ++failedChecks;
                }
                VersionedRecord versionedRecord = this.store.get((Object)key);
                if (VersionedStoreContentCheckerProcessor.contentsMatch((VersionedRecord<String>)versionedRecord, expectedValueForMaxTimestamp, maxExpectedTimestamp)) continue;
                ++failedChecks;
            }
            return failedChecks;
        }

        private static boolean contentsMatch(VersionedRecord<String> versionedRecord, String expectedValue, long expectedTimestamp) {
            if (expectedValue == null) {
                return versionedRecord == null;
            }
            if (versionedRecord == null) {
                return false;
            }
            return expectedValue.equals(versionedRecord.value()) && expectedTimestamp == versionedRecord.timestamp();
        }
    }
}

