/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration test that restarts a Kafka Streams application after its local
 * state has been cleaned up, with an input topic that contains a record whose
 * value is {@code null}.
 *
 * <p>The test runs against a single-broker {@link EmbeddedKafkaCluster} that is
 * started once for the class and stopped when the class is done.
 */
@Category(value={IntegrationTest.class})
public class StateRestorationIntegrationTest {
    private static final String APPLICATION_ID = "restoration-test-app";
    private static final String STATE_STORE_NAME = "stateStore";
    private static final String INPUT_TOPIC = "input";
    private static final String OUTPUT_TOPIC = "output";

    /** Embedded cluster with one broker, shared by every test in this class. */
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    private final StreamsBuilder builder = new StreamsBuilder();
    /** Clock owned by the embedded cluster; handed to the synchronous producer helper. */
    private final MockTime mockTime = CLUSTER.time;
    private Properties streamsConfiguration;

    /**
     * Starts the embedded cluster before any test runs.
     *
     * @throws IOException if the cluster fails to start
     */
    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    /** Stops the embedded cluster after all tests have run. */
    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Builds the streams configuration (Integer key serde, ByteArray value serde
     * as defaults), creates the input and output topics, and purges any local
     * streams state left behind for this configuration.
     *
     * @throws Exception if topic creation or the state purge fails
     */
    @Before
    public void setUp() throws Exception {
        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
            APPLICATION_ID,
            CLUSTER.bootstrapServers(),
            Serdes.Integer().getClass().getName(),
            Serdes.ByteArray().getClass().getName(),
            new Properties());

        CLUSTER.createTopics(INPUT_TOPIC);
        CLUSTER.createTopics(OUTPUT_TOPIC);

        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
    }

    /**
     * Feeds the table a value, then a {@code null} for the same key, then a value
     * for another key; runs the topology until the output topic has the expected
     * records; cleans up local state; and finally runs a fresh instance against
     * one more record. The second run must reach the expected output without
     * failing.
     *
     * @throws Exception if producing, waiting for output, or running the
     *                   streams instance fails
     */
    @Test
    public void shouldRestoreNullRecord() throws Exception {
        builder.table(
                INPUT_TOPIC,
                Materialized.<Integer, Bytes>as(Stores.persistentTimestampedKeyValueStore(STATE_STORE_NAME))
                    .withKeySerde(Serdes.Integer())
                    .withValueSerde(Serdes.Bytes())
                    .withCachingDisabled())
            .toStream()
            .to(OUTPUT_TOPIC);

        Properties producerConfig = TestUtils.producerConfig(
            CLUSTER.bootstrapServers(), IntegerSerializer.class, BytesSerializer.class);
        Properties consumerConfig = TestUtils.consumerConfig(
            CLUSTER.bootstrapServers(), IntegerDeserializer.class, BytesDeserializer.class);

        // Key 3 is written and then immediately overwritten with a null value.
        List<KeyValue<Integer, Bytes>> initialKeyValues = Arrays.asList(
            KeyValue.pair(3, new Bytes(new byte[]{3})),
            KeyValue.<Integer, Bytes>pair(3, null),
            KeyValue.pair(1, new Bytes(new byte[]{1})));
        IntegrationTestUtils.produceKeyValuesSynchronously(
            INPUT_TOPIC, initialKeyValues, producerConfig, mockTime);

        KafkaStreams firstRun = runUntilOutputReceived(consumerConfig, initialKeyValues);
        // Clean up local state of the (already closed) instance so the restarted
        // instance presumably has to rebuild the store, which is the path under test.
        firstRun.cleanUp();

        List<KeyValue<Integer, Bytes>> newKeyValues =
            Collections.singletonList(KeyValue.pair(2, new Bytes(new byte[3])));
        IntegrationTestUtils.produceKeyValuesSynchronously(
            INPUT_TOPIC, newKeyValues, producerConfig, mockTime);

        runUntilOutputReceived(consumerConfig, newKeyValues);
    }

    /**
     * Starts a new streams instance for the current topology, waits until the
     * expected records are seen on the output topic, and always closes the
     * instance, even when the wait fails, so no stream threads outlive the test.
     *
     * @param consumerConfig  consumer settings used to read the output topic
     * @param expectedRecords records the wait helper must observe on the output topic
     * @return the closed streams instance, so the caller may invoke {@code cleanUp()}
     * @throws Exception if the instance cannot run or the expected records do not arrive
     */
    private KafkaStreams runUntilOutputReceived(Properties consumerConfig,
                                                List<KeyValue<Integer, Bytes>> expectedRecords) throws Exception {
        KafkaStreams streams = new KafkaStreams(builder.build(streamsConfiguration), streamsConfiguration);
        try {
            streams.start();
            IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
                consumerConfig, OUTPUT_TOPIC, expectedRecords);
        } finally {
            streams.close();
        }
        return streams;
    }
}

