/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class StandbyTaskEOSIntegrationTest {
    private static final long REBALANCE_TIMEOUT = Duration.ofMinutes(2L).toMillis();
    private static final int KEY_0 = 0;
    private static final int KEY_1 = 1;
    @Parameterized.Parameter
    public String eosConfig;
    private final AtomicBoolean skipRecord = new AtomicBoolean(false);
    private String appId;
    private String inputTopic;
    private String storeName;
    private String outputTopic;
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);
    @Rule
    public TestName testName = new TestName();

    @Parameterized.Parameters(name="{0}")
    public static Collection<String[]> data() {
        return Arrays.asList({"exactly_once"}, {"exactly_once_beta"});
    }

    @Before
    public void createTopics() throws Exception {
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        this.appId = "app-" + safeTestName;
        this.inputTopic = "input-" + safeTestName;
        this.outputTopic = "output-" + safeTestName;
        this.storeName = "store-" + safeTestName;
        CLUSTER.deleteTopicsAndWait(this.inputTopic, this.outputTopic, this.appId + "-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog");
        CLUSTER.createTopic(this.inputTopic, 1, 3);
        CLUSTER.createTopic(this.outputTopic, 1, 3);
    }

    @Test
    public void surviveWithOneTaskAsStandby() throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, Collections.singletonList(new KeyValue((Object)0, (Object)0)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, (Properties)new Properties()), 10L);
        String stateDirPath = TestUtils.tempDirectory((String)this.appId).getPath();
        CountDownLatch instanceLatch = new CountDownLatch(1);
        try (KafkaStreams streamInstanceOne = this.buildStreamWithDirtyStateDir(stateDirPath + "/" + this.appId + "-1/", instanceLatch);
             KafkaStreams streamInstanceTwo = this.buildStreamWithDirtyStateDir(stateDirPath + "/" + this.appId + "-2/", instanceLatch);){
            streamInstanceOne.start();
            streamInstanceTwo.start();
            Assert.assertTrue((boolean)instanceLatch.await(15L, TimeUnit.SECONDS));
            TestUtils.waitForCondition(() -> streamInstanceOne.state().equals((Object)KafkaStreams.State.RUNNING), (String)"Stream instance one should be up and running by now");
            TestUtils.waitForCondition(() -> streamInstanceTwo.state().equals((Object)KafkaStreams.State.RUNNING), (String)"Stream instance two should be up and running by now");
            streamInstanceOne.close(Duration.ZERO);
            streamInstanceTwo.close(Duration.ZERO);
            streamInstanceOne.cleanUp();
            streamInstanceTwo.cleanUp();
        }
    }

    private KafkaStreams buildStreamWithDirtyStateDir(String stateDirPath, CountDownLatch recordProcessLatch) throws Exception {
        StreamsBuilder builder = new StreamsBuilder();
        TaskId taskId = new TaskId(0, 0);
        Properties props = this.props(stateDirPath);
        StateDirectory stateDirectory = new StateDirectory(new StreamsConfig((Map)props), (Time)new MockTime(), true);
        new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ".checkpoint")).write(Collections.singletonMap(new TopicPartition("unknown-topic", 0), 5L));
        Assert.assertTrue((boolean)new File(stateDirectory.directoryForTask(taskId), "rocksdb/KSTREAM-AGGREGATE-STATE-STORE-0000000001").mkdirs());
        builder.stream(this.inputTopic, Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.Integer())).groupByKey().count().toStream().peek((key, value) -> recordProcessLatch.countDown());
        return new KafkaStreams(builder.build(), props);
    }

    @Test
    public void shouldWipeOutStandbyStateDirectoryIfCheckpointIsMissing() throws Exception {
        String base = TestUtils.tempDirectory((String)this.appId).getPath();
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, Collections.singletonList(new KeyValue((Object)0, (Object)0)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, (Properties)new Properties()), 10L);
        try (KafkaStreams streamInstanceOne = this.buildWithDeduplicationTopology(base + "-1");
             KafkaStreams streamInstanceTwo = this.buildWithDeduplicationTopology(base + "-2");
             KafkaStreams streamInstanceOneRecovery = this.buildWithDeduplicationTopology(base + "-1");){
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOne), Duration.ofSeconds(30L));
            IntegrationTestUtils.waitUntilMinRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class), this.outputTopic, 1);
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceTwo), Duration.ofSeconds(30L));
            TestUtils.waitForCondition(() -> ((ReadOnlyKeyValueStore)streamInstanceTwo.store(StoreQueryParameters.fromNameAndType((String)this.storeName, (QueryableStoreType)QueryableStoreTypes.keyValueStore()).enableStaleStores())).get((Object)0) != null, (long)REBALANCE_TIMEOUT, (String)"Could not get key from standby store");
            TestUtils.waitForCondition(() -> ((ReadOnlyKeyValueStore)streamInstanceOne.store(StoreQueryParameters.fromNameAndType((String)this.storeName, (QueryableStoreType)QueryableStoreTypes.keyValueStore()))).get((Object)0) != null, (String)"Could not get key from main store");
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, Collections.singletonList(new KeyValue((Object)1, (Object)0)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, (Properties)new Properties()), 10L);
            TestUtils.waitForCondition(() -> streamInstanceOne.state() == KafkaStreams.State.ERROR, (String)"Stream instance 1 did not go into error state");
            streamInstanceOne.close();
            IntegrationTestUtils.waitUntilMinRecordsReceived(TestUtils.consumerConfig((String)CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class), this.outputTopic, 2);
            IntegrationTestUtils.startApplicationAndWaitUntilRunning(Collections.singletonList(streamInstanceOneRecovery), Duration.ofSeconds(30L));
            TestUtils.waitForCondition(() -> ((ReadOnlyKeyValueStore)streamInstanceOneRecovery.store(StoreQueryParameters.fromNameAndType((String)this.storeName, (QueryableStoreType)QueryableStoreTypes.keyValueStore()).enableStaleStores())).get((Object)0) != null, (String)"Could not get key from recovered standby store");
            streamInstanceTwo.close();
            TestUtils.waitForCondition(() -> ((ReadOnlyKeyValueStore)streamInstanceOneRecovery.store(StoreQueryParameters.fromNameAndType((String)this.storeName, (QueryableStoreType)QueryableStoreTypes.keyValueStore()))).get((Object)0) != null, (long)REBALANCE_TIMEOUT, (String)"Could not get key from recovered main store");
            this.skipRecord.set(false);
            IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputTopic, Collections.singletonList(new KeyValue((Object)1, (Object)0)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class, (Properties)new Properties()), 10L);
            TestUtils.waitForCondition(() -> streamInstanceOneRecovery.state() == KafkaStreams.State.ERROR, (String)"Stream instance 1 did not go into error state");
        }
    }

    private KafkaStreams buildWithDeduplicationTopology(String stateDirPath) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.addStateStore(Stores.keyValueStoreBuilder((KeyValueBytesStoreSupplier)Stores.persistentKeyValueStore((String)this.storeName), (Serde)Serdes.Integer(), (Serde)Serdes.Integer()));
        builder.stream(this.inputTopic).transform(() -> new Transformer<Integer, Integer, KeyValue<Integer, Integer>>(){
            private KeyValueStore store;

            public void init(ProcessorContext context) {
                this.store = (KeyValueStore)context.getStateStore(StandbyTaskEOSIntegrationTest.this.storeName);
            }

            public KeyValue<Integer, Integer> transform(Integer key, Integer value) {
                if (StandbyTaskEOSIntegrationTest.this.skipRecord.get()) {
                    return KeyValue.pair((Object)key, (Object)value);
                }
                if (this.store.get((Object)key) != null) {
                    return null;
                }
                this.store.put((Object)key, (Object)value);
                this.store.flush();
                if (key == 1) {
                    StandbyTaskEOSIntegrationTest.this.skipRecord.set(true);
                    throw new RuntimeException("Injected test error");
                }
                return KeyValue.pair((Object)key, (Object)value);
            }

            public void close() {
            }
        }, new String[]{this.storeName}).to(this.outputTopic);
        return new KafkaStreams(builder.build(), this.props(stateDirPath));
    }

    private Properties props(String stateDirPath) {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", this.appId);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("cache.max.bytes.buffering", (Object)0);
        streamsConfiguration.put("state.dir", stateDirPath);
        streamsConfiguration.put("num.standby.replicas", (Object)1);
        streamsConfiguration.put("processing.guarantee", this.eosConfig);
        streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.Integer().getClass());
        streamsConfiguration.put("commit.interval.ms", (Object)1000);
        streamsConfiguration.put("acceptable.recovery.lag", (Object)0);
        streamsConfiguration.put("auto.offset.reset", "earliest");
        return streamsConfiguration;
    }
}

