/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.tests.SmokeTestClient;
import org.apache.kafka.streams.tests.SmokeTestDriver;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;

/**
 * Integration test that runs {@link SmokeTestDriver} against an embedded three-broker cluster
 * while {@link SmokeTestClient} instances are continuously started and closed, so the
 * application keeps rebalancing for as long as the driver is producing and verifying data.
 */
@Timeout(600)
@Tag("integration")
public class SmokeTestDriverIntegrationTest {
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3);

    /**
     * Starts the shared embedded cluster once before any test in this class runs.
     *
     * @throws IOException if the cluster fails to start
     */
    @BeforeAll
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    /** Stops the shared embedded cluster after all tests in this class have run. */
    @AfterAll
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Cycles Streams clients in and out while the driver thread is alive, then asserts that the
     * driver's verification passed.
     *
     * @param stateUpdaterEnabled value stored under the {@code __state.updater.enabled__} config key
     * @param processingThreadsEnabled value stored under the {@code __processing.threads.enabled__} config key
     * @throws InterruptedException if the test thread is interrupted while sleeping or joining
     */
    @ParameterizedTest
    @CsvSource({"false, false", "true, false", "true, true"})
    public void shouldWorkWithRebalance(boolean stateUpdaterEnabled, boolean processingThreadsEnabled) throws InterruptedException {
        // Turn any exit()/halt() call made during the test into a test failure instead of killing the JVM.
        Exit.setExitProcedure((statusCode, message) -> {
            throw new AssertionError("Test called exit(). code:" + statusCode + " message:" + message);
        });
        Exit.setHaltProcedure((statusCode, message) -> {
            throw new AssertionError("Test called halt(). code:" + statusCode + " message:" + message);
        });
        ArrayList<SmokeTestClient> clients = new ArrayList<>();
        try {
            IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, SmokeTestDriver.topics());
            String bootstrapServers = CLUSTER.bootstrapServers();
            Driver driver = new Driver(bootstrapServers, 10, 1000);
            driver.start();
            System.out.println("started driver");

            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("__state.updater.enabled__", stateUpdaterEnabled);
            props.put("__processing.threads.enabled__", processingThreadsEnabled);
            props.put("session.timeout.ms", 10000);

            try {
                int numClientsCreated = 0;
                // Keep cycling Streams instances for as long as the driver is running.
                while (driver.isAlive()) {
                    Thread.sleep(1000L);
                    SmokeTestClient newest = new SmokeTestClient("streams-" + numClientsCreated++);
                    newest.start(props);
                    // Track the client only once start() returned, so cleanup never waits on
                    // an instance that failed to start.
                    clients.add(newest);
                    if (clients.size() >= 3) {
                        // Retire the oldest instance and wait until it is fully closed.
                        SmokeTestClient oldest = clients.remove(0);
                        oldest.closeAsync();
                        awaitClosed(oldest);
                    }
                }
                // Wait for verification to finish.
                driver.join();
            } finally {
                // Whether or not anything above failed, stop every instance that is still running.
                closeAll(clients);
            }

            Throwable failure = driver.exception();
            if (failure != null) {
                // AssertionError(Object) records a Throwable argument as the cause.
                throw new AssertionError(failure);
            }
            SmokeTestDriver.VerificationResult result = driver.result();
            Assertions.assertNotNull(result, "driver finished without producing a verification result");
            Assertions.assertTrue(result.passed(), result.result());
        } finally {
            // Do not leak the throwing overrides into other tests running in the same JVM.
            // NOTE(review): if the cycling loop was aborted, the driver thread may still be
            // running at this point; it is not stopped here.
            Exit.resetExitProcedure();
            Exit.resetHaltProcedure();
        }
    }

    /** Polls every 100 ms until the given client reports that it is closed. */
    private static void awaitClosed(SmokeTestClient client) throws InterruptedException {
        while (!client.closed()) {
            Thread.sleep(100L);
        }
    }

    /** Asks every client to close first, then waits for each of them, so they shut down in parallel. */
    private static void closeAll(Iterable<SmokeTestClient> clients) throws InterruptedException {
        for (SmokeTestClient client : clients) {
            client.closeAsync();
        }
        for (SmokeTestClient client : clients) {
            awaitClosed(client);
        }
    }

    /**
     * Background thread that calls {@link SmokeTestDriver#generate} followed by
     * {@link SmokeTestDriver#verify} and records either the verification result or the failure.
     * Both accessors are meant to be read after {@link Thread#join()} has returned.
     */
    private static class Driver
    extends Thread {
        private final String bootstrapServers;
        private final int numKeys;
        private final int maxRecordsPerKey;
        private Throwable exception = null;
        private SmokeTestDriver.VerificationResult result;

        private Driver(String bootstrapServers, int numKeys, int maxRecordsPerKey) {
            this.bootstrapServers = bootstrapServers;
            this.numKeys = numKeys;
            this.maxRecordsPerKey = maxRecordsPerKey;
        }

        @Override
        public void run() {
            try {
                Map<String, Set<Integer>> allData = SmokeTestDriver.generate(this.bootstrapServers, this.numKeys, this.maxRecordsPerKey, Duration.ofSeconds(20L));
                this.result = SmokeTestDriver.verify(this.bootstrapServers, allData, this.maxRecordsPerKey);
            }
            catch (Throwable ex) {
                // Catch Errors as well: the exit/halt overrides installed by the test throw
                // AssertionError, which would otherwise kill this thread silently and leave
                // both result and exception unset.
                this.exception = ex;
            }
        }

        /** Returns the failure thrown by generate/verify, or {@code null} if none was thrown. */
        public Throwable exception() {
            return this.exception;
        }

        /** Returns the verification result, or {@code null} if the run failed before producing one. */
        SmokeTestDriver.VerificationResult result() {
            return this.result;
        }
    }
}

