/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;

@Timeout(value=600L)
@Tag(value="integration")
public class GlobalThreadShutDownOrderTest {
    private static final int NUM_BROKERS = 1;
    private static final Properties BROKER_CONFIG = new Properties();
    private final AtomicInteger closeCounter = new AtomicInteger(0);
    public static final EmbeddedKafkaCluster CLUSTER;
    private final MockTime mockTime;
    private final String globalStore = "globalStore";
    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private String globalStoreTopic;
    private String streamTopic;
    private final List<Long> retrievedValuesList;
    private boolean firstRecordProcessed;

    public GlobalThreadShutDownOrderTest() {
        this.mockTime = GlobalThreadShutDownOrderTest.CLUSTER.time;
        this.globalStore = "globalStore";
        this.retrievedValuesList = new ArrayList<Long>();
    }

    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @BeforeEach
    public void before(TestInfo testInfo) throws Exception {
        this.builder = new StreamsBuilder();
        this.createTopics();
        this.streamsConfiguration = new Properties();
        String safeTestName = IntegrationTestUtils.safeUniqueTestName(testInfo);
        this.streamsConfiguration.put("application.id", "app-" + safeTestName);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("auto.offset.reset", "earliest");
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("statestore.cache.max.bytes", (Object)0);
        this.streamsConfiguration.put("commit.interval.ms", (Object)100L);
        Consumed stringLongConsumed = Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Long());
        KeyValueStoreBuilder storeBuilder = new KeyValueStoreBuilder(Stores.persistentKeyValueStore((String)"globalStore"), Serdes.String(), Serdes.Long(), (Time)this.mockTime);
        this.builder.addGlobalStore((StoreBuilder)storeBuilder, this.globalStoreTopic, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.Long()), new MockApiProcessorSupplier());
        this.builder.stream(this.streamTopic, stringLongConsumed).process(() -> new GlobalStoreProcessor("globalStore"), new String[0]);
    }

    @AfterEach
    public void after() throws Exception {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldFinishGlobalStoreOperationOnShutDown() throws Exception {
        this.kafkaStreams = new KafkaStreams(this.builder.build(), this.streamsConfiguration);
        this.populateTopics(this.globalStoreTopic);
        this.populateTopics(this.streamTopic);
        this.kafkaStreams.start();
        TestUtils.waitForCondition(() -> this.firstRecordProcessed, (long)30000L, (String)"Has not processed record within 30 seconds");
        this.kafkaStreams.close(Duration.ofSeconds(30L));
        List<Long> expectedRetrievedValues = Arrays.asList(1L, 2L, 3L, 4L);
        Assertions.assertEquals(expectedRetrievedValues, this.retrievedValuesList);
        Assertions.assertEquals((int)1, (int)this.closeCounter.get());
    }

    private void createTopics() throws Exception {
        this.streamTopic = "stream-topic";
        this.globalStoreTopic = "global-store-topic";
        CLUSTER.createTopics(this.streamTopic);
        CLUSTER.createTopic(this.globalStoreTopic);
    }

    private void populateTopics(String topicName) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronously(topicName, Arrays.asList(new KeyValue((Object)"A", (Object)1L), new KeyValue((Object)"B", (Object)2L), new KeyValue((Object)"C", (Object)3L), new KeyValue((Object)"D", (Object)4L)), TestUtils.producerConfig((String)CLUSTER.bootstrapServers(), StringSerializer.class, LongSerializer.class, (Properties)new Properties()), (Time)this.mockTime);
    }

    static {
        BROKER_CONFIG.put("transaction.state.log.replication.factor", (Object)1);
        BROKER_CONFIG.put("transaction.state.log.min.isr", (Object)1);
        CLUSTER = new EmbeddedKafkaCluster(1, BROKER_CONFIG);
    }

    private class GlobalStoreProcessor
    implements Processor<String, Long, Void, Void> {
        private KeyValueStore<String, Long> store;
        private final String storeName;

        GlobalStoreProcessor(String storeName) {
            this.storeName = storeName;
        }

        public void init(ProcessorContext<Void, Void> context) {
            this.store = (KeyValueStore)context.getStateStore(this.storeName);
        }

        public void process(Record<String, Long> record) {
            GlobalThreadShutDownOrderTest.this.firstRecordProcessed = true;
        }

        public void close() {
            GlobalThreadShutDownOrderTest.this.closeCounter.getAndIncrement();
            List<String> keys = Arrays.asList("A", "B", "C", "D");
            for (String key : keys) {
                Utils.sleep((long)1000L);
                GlobalThreadShutDownOrderTest.this.retrievedValuesList.add(this.store.get((Object)key));
            }
        }
    }
}

