/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;

public class StandbyTaskEOSIntegrationTest {
    private static final long REBALANCE_TIMEOUT = Duration.ofMinutes(2L).toMillis();
    private static final int KEY_0 = 0;
    private static final int KEY_1 = 1;
    private final AtomicBoolean skipRecord = new AtomicBoolean(false);
    private String appId;
    private String inputTopic;
    private String storeName;
    private String outputTopic;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

    @Before
    public void createTopics() throws Exception {
        this.appId = "standbyTest";
        this.inputTopic = "testInputTopic";
        this.outputTopic = "testOutputTopic";
        this.storeName = "dedupStore";
        CLUSTER.deleteTopicsAndWait(this.inputTopic, this.outputTopic);
        CLUSTER.createTopic(this.inputTopic, 1, 3);
        CLUSTER.createTopic(this.outputTopic, 1, 3);
    }

    @Test
    public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
        String base = TestUtils.tempDirectory((String)this.appId).getPath();
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, Collections.singletonList(new KeyValue((Object)0, (Object)0)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, (Properties)new Properties()), 10L);
        try (KafkaStreams streamInstanceOne = this.buildWithDeduplicationTopology(base + "-1");
             KafkaStreams streamInstanceTwo = this.buildWithDeduplicationTopology(base + "-2");
             KafkaStreams streamInstanceOneRecovery = this.buildWithDeduplicationTopology(base + "-1");){
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30L));
            IntegrationTestUtils.waitUntilMinRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class), this.outputTopic, 1);
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceTwo), Duration.ofSeconds(30L));
            TestUtils.waitForCondition(() -> ((ReadOnlyKeyValueStore)streamInstanceTwo.store(StoreQueryParameters.fromNameAndType((String)this.storeName, (QueryableStoreType)QueryableStoreTypes.keyValueStore()).enableStaleStores())).get((Object)0) != null, (long)REBALANCE_TIMEOUT, (String)"Could not get key from standby store");
            TestUtils.waitForCondition(() -> ((ReadOnlyKeyValueStore)streamInstanceOne.store(StoreQueryParameters.fromNameAndType((String)this.storeName, (QueryableStoreType)QueryableStoreTypes.keyValueStore()))).get((Object)0) != null, (String)"Could not get key from main store");
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, Collections.singletonList(new KeyValue((Object)1, (Object)0)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, (Properties)new Properties()), 10L);
            TestUtils.waitForCondition(() -> streamInstanceOne.state() == KafkaStreams.State.ERROR, (String)"Stream instance 1 did not go into error state");
            streamInstanceOne.close();
            IntegrationTestUtils.waitUntilMinRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class), this.outputTopic, 2);
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOneRecovery), Duration.ofSeconds(30L));
            TestUtils.waitForCondition(() -> ((ReadOnlyKeyValueStore)streamInstanceOneRecovery.store(StoreQueryParameters.fromNameAndType((String)this.storeName, (QueryableStoreType)QueryableStoreTypes.keyValueStore()).enableStaleStores())).get((Object)0) != null, (String)"Could not get key from recovered standby store");
            streamInstanceTwo.close();
            TestUtils.waitForCondition(() -> ((ReadOnlyKeyValueStore)streamInstanceOneRecovery.store(StoreQueryParameters.fromNameAndType((String)this.storeName, (QueryableStoreType)QueryableStoreTypes.keyValueStore()))).get((Object)0) != null, (long)REBALANCE_TIMEOUT, (String)"Could not get key from recovered main store");
            this.skipRecord.set(false);
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, Collections.singletonList(new KeyValue((Object)1, (Object)0)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, (Properties)new Properties()), 10L);
            TestUtils.waitForCondition(() -> streamInstanceOneRecovery.state() == KafkaStreams.State.ERROR, (String)"Stream instance 1 did not go into error state");
        }
    }

    private KafkaStreams buildWithDeduplicationTopology(String stateDirPath) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)this.storeName), (Serde)Serdes.Integer(), (Serde)Serdes.Integer()));
        builder.stream(this.inputTopic).transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>(){
            private KeyValueStore store;

            public void init(ProcessorContext context) {
                this.store = (KeyValueStore)context.getStateStore(StandbyTaskEOSIntegrationTest.this.storeName);
            }

            public KeyValue<Integer, Integer> transform(Integer key, Integer value) {
                if (StandbyTaskEOSIntegrationTest.this.skipRecord.get()) {
                    return KeyValue.pair((Object)key, (Object)value);
                }
                if (this.store.get((Object)key) != null) {
                    return null;
                }
                this.store.put((Object)key, (Object)value);
                this.store.flush();
                if (key == 1) {
                    StandbyTaskEOSIntegrationTest.this.skipRecord.set(true);
                    throw new RuntimeException("Injected test error");
                }
                return KeyValue.pair((Object)key, (Object)value);
            }

            public void close() {
            }
        }, new String[]{this.storeName}).to(this.outputTopic);
        return new KafkaStreams(builder.build(), this.props(stateDirPath));
    }

    private Properties props(String stateDirPath) {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", this.appId);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("cache.max.bytes.buffering", (Object)0);
        streamsConfiguration.put("state.dir", stateDirPath);
        streamsConfiguration.put("num.standby.replicas", (Object)1);
        streamsConfiguration.put("processing.guarantee", "exactly_once");
        streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("commit.interval.ms", (Object)1000);
        streamsConfiguration.put("auto.offset.reset", "earliest");
        return streamsConfiguration;
    }
}

