/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Integration test that starts several {@link KafkaStreams} clients against an embedded
 * single-broker cluster, each configured with client tags and a number of standby replicas,
 * and then inspects the local thread metadata of every client to verify how standby tasks
 * were spread over the configured tags relative to their active task.
 *
 * <p>NOTE(review): {@link #CLUSTER} is started in {@link #createTopics()} but nothing in this
 * class stops it. Consider adding an {@code @AfterClass} hook that shuts the cluster down —
 * confirm the exact method against the {@code EmbeddedKafkaCluster} API.
 */
@Category(value={IntegrationTest.class})
public class RackAwarenessIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final String TAG_VALUE_K8_CLUSTER_1 = "k8s-cluster-1";
    private static final String TAG_VALUE_K8_CLUSTER_2 = "k8s-cluster-2";
    private static final String TAG_VALUE_K8_CLUSTER_3 = "k8s-cluster-3";
    private static final String TAG_VALUE_EU_CENTRAL_1A = "eu-central-1a";
    private static final String TAG_VALUE_EU_CENTRAL_1B = "eu-central-1b";
    private static final String TAG_VALUE_EU_CENTRAL_1C = "eu-central-1c";
    private static final int DEFAULT_NUMBER_OF_STATEFUL_SUB_TOPOLOGIES = 1;
    private static final int DEFAULT_NUMBER_OF_PARTITIONS_OF_SUB_TOPOLOGIES = 2;
    /** Upper bound used both for closing clients and for waiting until they are RUNNING. */
    private static final Duration DEFAULT_TIMEOUT = Duration.ofMillis(60000L);
    @Rule
    public TestName testName = new TestName();
    private static final String INPUT_TOPIC = "input-topic";
    private static final String TAG_ZONE = "zone";
    private static final String TAG_CLUSTER = "cluster";
    private List<KafkaStreamsWithConfiguration> kafkaStreamsInstances;
    private Properties baseConfiguration;
    private Topology topology;

    /**
     * Starts the embedded cluster and creates the input topic (6 partitions, replication factor 1).
     */
    @BeforeClass
    public static void createTopics() throws Exception {
        CLUSTER.start();
        CLUSTER.createTopic(INPUT_TOPIC, 6, 1);
    }

    /**
     * Resets the list of started clients and builds the configuration shared by every client
     * of the current test; the application id is derived from the test name.
     */
    @Before
    public void setup() {
        this.kafkaStreamsInstances = new ArrayList<>();
        this.baseConfiguration = new Properties();
        final String safeTestName = IntegrationTestUtils.safeUniqueTestName(this.getClass(), this.testName);
        final String applicationId = "app-" + safeTestName;
        this.baseConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        this.baseConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        this.baseConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
        this.baseConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
        this.baseConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
    }

    /**
     * Closes every client still tracked by the test and purges its local state.
     *
     * @throws IOException if purging the local state of a client fails
     */
    @After
    public void cleanup() throws IOException {
        for (final KafkaStreamsWithConfiguration instance : this.kafkaStreamsInstances) {
            instance.kafkaStreams.close(DEFAULT_TIMEOUT);
            IntegrationTestUtils.purgeLocalStreamsState(instance.configuration);
        }
        this.kafkaStreamsInstances.clear();
    }

    @Test
    public void shouldDoRebalancingWithMaximumNumberOfClientTags() throws Exception {
        this.initTopology(3, 3);
        final int numberOfStandbyReplicas = 1;
        final int numberOfClientTags = 5;

        final List<String> clientTagKeys = new ArrayList<>();
        final Map<String, String> clientTags1 = new HashMap<>();
        final Map<String, String> clientTags2 = new HashMap<>();
        for (int i = 0; i < numberOfClientTags; i++) {
            final String key = "key-" + i;
            clientTagKeys.add(key);
            clientTags1.put(key, "value-1-" + i);
            clientTags2.put(key, "value-2-" + i);
        }

        // Sanity-check the fixtures before any client is started.
        Assert.assertEquals(numberOfClientTags, clientTagKeys.size());
        Stream.of(clientTags1, clientTags2).forEach(clientTags -> Assert.assertEquals(
            String.format("clientsTags with content '%s' did not match expected size", clientTags),
            numberOfClientTags,
            clientTags.size()));

        this.createAndStart(clientTags1, clientTagKeys, numberOfStandbyReplicas);
        this.createAndStart(clientTags1, clientTagKeys, numberOfStandbyReplicas);
        this.createAndStart(clientTags2, clientTagKeys, numberOfStandbyReplicas);

        this.waitUntilAllKafkaStreamsClientsAreRunning();
        Assert.assertTrue(this.isIdealTaskDistributionReachedForTags(clientTagKeys));

        // Removing the first client must leave the remaining two with an ideal distribution.
        this.stopKafkaStreamsInstanceWithIndex(0);

        this.waitUntilAllKafkaStreamsClientsAreRunning();
        Assert.assertTrue(this.isIdealTaskDistributionReachedForTags(clientTagKeys));
    }

    @Test
    public void shouldDistributeStandbyReplicasWhenAllClientsAreLocatedOnASameClusterTag() throws Exception {
        this.initTopology();
        final int numberOfStandbyReplicas = 1;
        final List<String> assignmentTags = Arrays.asList(TAG_ZONE, TAG_CLUSTER);

        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_1), assignmentTags, numberOfStandbyReplicas);
        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_1), assignmentTags, numberOfStandbyReplicas);
        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_1), assignmentTags, numberOfStandbyReplicas);

        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), assignmentTags, numberOfStandbyReplicas);
        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), assignmentTags, numberOfStandbyReplicas);
        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), assignmentTags, numberOfStandbyReplicas);

        this.waitUntilAllKafkaStreamsClientsAreRunning();
        // Every client shares the same cluster tag value, so only the zone tag is checked.
        Assert.assertTrue(this.isIdealTaskDistributionReachedForTags(Collections.singletonList(TAG_ZONE)));
    }

    @Test
    public void shouldDistributeStandbyReplicasOverMultipleClientTags() throws Exception {
        this.initTopology();
        final int numberOfStandbyReplicas = 2;
        final List<String> assignmentTags = Arrays.asList(TAG_ZONE, TAG_CLUSTER);

        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_1), assignmentTags, numberOfStandbyReplicas);
        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), assignmentTags, numberOfStandbyReplicas);
        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_1), assignmentTags, numberOfStandbyReplicas);

        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_2), assignmentTags, numberOfStandbyReplicas);
        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_2), assignmentTags, numberOfStandbyReplicas);
        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_2), assignmentTags, numberOfStandbyReplicas);

        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_3), assignmentTags, numberOfStandbyReplicas);
        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_3), assignmentTags, numberOfStandbyReplicas);
        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_3), assignmentTags, numberOfStandbyReplicas);

        this.waitUntilAllKafkaStreamsClientsAreRunning();
        Assert.assertTrue(this.isIdealTaskDistributionReachedForTags(assignmentTags));
    }

    @Test
    public void shouldDistributeStandbyReplicasWhenIdealDistributionCanNotBeAchieved() throws Exception {
        this.initTopology();
        final int numberOfStandbyReplicas = 2;
        final List<String> assignmentTags = Arrays.asList(TAG_ZONE, TAG_CLUSTER);

        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_1), assignmentTags, numberOfStandbyReplicas);
        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_1), assignmentTags, numberOfStandbyReplicas);
        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_1), assignmentTags, numberOfStandbyReplicas);

        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1A, TAG_VALUE_K8_CLUSTER_2), assignmentTags, numberOfStandbyReplicas);
        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1B, TAG_VALUE_K8_CLUSTER_2), assignmentTags, numberOfStandbyReplicas);
        this.createAndStart(buildClientTags(TAG_VALUE_EU_CENTRAL_1C, TAG_VALUE_K8_CLUSTER_2), assignmentTags, numberOfStandbyReplicas);

        this.waitUntilAllKafkaStreamsClientsAreRunning();
        // Three zones but only two clusters for one active plus two standbys: the zone tag can
        // be fully spread, the cluster tag only partially.
        Assert.assertTrue(this.isIdealTaskDistributionReachedForTags(Collections.singletonList(TAG_ZONE)));
        Assert.assertTrue(this.isPartialTaskDistributionReachedForTags(Collections.singletonList(TAG_CLUSTER)));
    }

    /**
     * Closes the client at the given position and stops tracking it, so that subsequent
     * waits and distribution checks only consider the remaining clients.
     *
     * @param index position of the client in {@link #kafkaStreamsInstances}
     */
    private void stopKafkaStreamsInstanceWithIndex(int index) {
        this.kafkaStreamsInstances.get(index).kafkaStreams.close(DEFAULT_TIMEOUT);
        this.kafkaStreamsInstances.remove(index);
    }

    /** Waits, using {@link #DEFAULT_TIMEOUT}, until all tracked clients are RUNNING. */
    private void waitUntilAllKafkaStreamsClientsAreRunning() throws Exception {
        this.waitUntilAllKafkaStreamsClientsAreRunning(DEFAULT_TIMEOUT);
    }

    /**
     * Waits until all tracked clients report {@link KafkaStreams.State#RUNNING}.
     *
     * @param timeout maximum time handed to the underlying wait helper
     */
    private void waitUntilAllKafkaStreamsClientsAreRunning(Duration timeout) throws Exception {
        final List<KafkaStreams> clients = this.kafkaStreamsInstances.stream()
            .map(instance -> instance.kafkaStreams)
            .collect(Collectors.toList());
        IntegrationTestUtils.waitForApplicationState(clients, KafkaStreams.State.RUNNING, timeout);
    }

    /**
     * Checks that, for every active task that has standbys, at least one standby runs on a
     * client whose values differ from the active client's for all of the given tags.
     *
     * @param tagsToCheck tag keys to compare
     * @return {@code true} if the condition holds for every collected distribution;
     *         {@code false} if it does not, or if no distribution could be collected
     */
    private boolean isPartialTaskDistributionReachedForTags(Collection<String> tagsToCheck) {
        final Predicate<TaskClientTagDistribution> partialDistribution = distribution ->
            tagsAmongstActiveAndAtLeastOneStandbyTaskIsDifferent(
                distribution.standbyTasksClientTags,
                distribution.activeTaskClientTags.clientTags,
                tagsToCheck);
        return this.isTaskDistributionTestSuccessful(partialDistribution);
    }

    /**
     * Checks that, for every active task that has standbys, the standbys differ from each
     * other and from the active client's values for all of the given tags.
     *
     * @param tagsToCheck tag keys to compare
     * @return {@code true} if the condition holds for every collected distribution;
     *         {@code false} if it does not, or if no distribution could be collected
     */
    private boolean isIdealTaskDistributionReachedForTags(Collection<String> tagsToCheck) {
        final Predicate<TaskClientTagDistribution> idealDistribution = distribution ->
            tagsAmongstStandbyTasksAreDifferent(distribution.standbyTasksClientTags, tagsToCheck)
                && tagsAmongstActiveAndAllStandbyTasksAreDifferent(
                    distribution.standbyTasksClientTags,
                    distribution.activeTaskClientTags.clientTags,
                    tagsToCheck);
        return this.isTaskDistributionTestSuccessful(idealDistribution);
    }

    /**
     * Applies the predicate to every collected task distribution.
     *
     * @return {@code false} when nothing was collected (so an empty result never passes
     *         vacuously), otherwise whether all distributions satisfy the predicate
     */
    private boolean isTaskDistributionTestSuccessful(Predicate<TaskClientTagDistribution> taskClientTagDistributionPredicate) {
        final List<TaskClientTagDistribution> distributions = this.getTasksClientTagDistributions();
        if (distributions.isEmpty()) {
            return false;
        }
        return distributions.stream().allMatch(taskClientTagDistributionPredicate);
    }

    /**
     * @return {@code true} if every standby differs from the active task's client on each of
     *         the given tags
     */
    private static boolean tagsAmongstActiveAndAllStandbyTasksAreDifferent(Collection<TaskClientTags> standbyTasks, Map<String, String> activeTaskClientTags, Collection<String> tagsToCheck) {
        return standbyTasks.stream().allMatch(standbyTask ->
            tagsToCheck.stream().noneMatch(tag ->
                activeTaskClientTags.get(tag).equals(standbyTask.clientTags.get(tag))));
    }

    /**
     * @return {@code true} if at least one standby differs from the active task's client on
     *         each of the given tags
     */
    private static boolean tagsAmongstActiveAndAtLeastOneStandbyTaskIsDifferent(Collection<TaskClientTags> standbyTasks, Map<String, String> activeTaskClientTags, Collection<String> tagsToCheck) {
        return standbyTasks.stream().anyMatch(standbyTask ->
            tagsToCheck.stream().noneMatch(tag ->
                activeTaskClientTags.get(tag).equals(standbyTask.clientTags.get(tag))));
    }

    /**
     * Checks that no two standbys share a value for any of the given tags.
     *
     * <p>Values are compared per tag key. Counting all values in one shared map would report a
     * false duplicate whenever two different tag keys happen to carry the same value.
     *
     * @return {@code true} if, for each tag, all standby clients have distinct values
     */
    private static boolean tagsAmongstStandbyTasksAreDifferent(Collection<TaskClientTags> standbyTasks, Collection<String> tagsToCheck) {
        return tagsToCheck.stream().allMatch(tag -> {
            final long distinctValues = standbyTasks.stream()
                .map(standbyTask -> standbyTask.clientTags.get(tag))
                .distinct()
                .count();
            return distinctValues == standbyTasks.size();
        });
    }

    /** Builds the topology with the default number of partitions and stateful sub-topologies. */
    private void initTopology() {
        this.initTopology(DEFAULT_NUMBER_OF_PARTITIONS_OF_SUB_TOPOLOGIES, DEFAULT_NUMBER_OF_STATEFUL_SUB_TOPOLOGIES);
    }

    /**
     * Builds the topology under test: one repartitioned pass-through branch plus the requested
     * number of repartitioned, key-grouped reduce branches reading from the input topic.
     *
     * @param numberOfPartitionsOfSubTopologies partition count used for every repartition step
     * @param numberOfStatefulSubTopologies number of reduce branches to add
     */
    private void initTopology(int numberOfPartitionsOfSubTopologies, int numberOfStatefulSubTopologies) {
        final StreamsBuilder builder = new StreamsBuilder();
        final String stateStoreName = "myTransformState";
        final StoreBuilder<?> keyValueStoreBuilder = Stores.keyValueStoreBuilder(
            Stores.persistentKeyValueStore(stateStoreName),
            Serdes.Integer(),
            Serdes.Integer());
        builder.addStateStore(keyValueStoreBuilder);

        final KStream<Integer, Integer> stream =
            builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.Integer()));

        // Branch that only repartitions and passes every record through.
        stream.repartition(Repartitioned.numberOfPartitions(numberOfPartitionsOfSubTopologies))
            .filter((k, v) -> true);

        // One repartition + reduce branch per requested stateful sub-topology.
        for (int i = 0; i < numberOfStatefulSubTopologies; i++) {
            stream.repartition(Repartitioned.numberOfPartitions(numberOfPartitionsOfSubTopologies))
                .groupByKey()
                .reduce(Integer::sum);
        }

        this.topology = builder.build();
    }

    /**
     * Collects, for every active task found in the local thread metadata of the tracked
     * clients, the client tags of its host together with those of all its standbys.
     * Active tasks for which no standby is found are skipped.
     */
    private List<TaskClientTagDistribution> getTasksClientTagDistributions() {
        final List<TaskClientTagDistribution> distributions = new ArrayList<>();
        for (final KafkaStreamsWithConfiguration kafkaStreamsInstance : this.kafkaStreamsInstances) {
            final StreamsConfig config = new StreamsConfig(kafkaStreamsInstance.configuration);
            for (final ThreadMetadata localThreadsMetadata : kafkaStreamsInstance.kafkaStreams.metadataForLocalThreads()) {
                localThreadsMetadata.activeTasks().forEach(activeTask -> {
                    final TaskId activeTaskId = activeTask.taskId();
                    final Map<String, String> clientTags = config.getClientTags();
                    final List<TaskClientTags> standbyTasks = this.findStandbysForActiveTask(activeTaskId);
                    if (!standbyTasks.isEmpty()) {
                        final TaskClientTags activeTaskView = new TaskClientTags(activeTaskId, clientTags);
                        distributions.add(new TaskClientTagDistribution(activeTaskView, standbyTasks));
                    }
                });
            }
        }
        return distributions;
    }

    /**
     * Scans the local thread metadata of every tracked client for standby tasks with the given
     * id and returns them paired with the client tags of the hosting client.
     *
     * @param taskId id of the active task whose standbys are looked up
     */
    private List<TaskClientTags> findStandbysForActiveTask(TaskId taskId) {
        final List<TaskClientTags> standbyTasks = new ArrayList<>();
        for (final KafkaStreamsWithConfiguration kafkaStreamsInstance : this.kafkaStreamsInstances) {
            for (final ThreadMetadata localThreadsMetadata : kafkaStreamsInstance.kafkaStreams.metadataForLocalThreads()) {
                localThreadsMetadata.standbyTasks().forEach(standbyTask -> {
                    final TaskId standbyTaskId = standbyTask.taskId();
                    if (taskId.equals(standbyTaskId)) {
                        final StreamsConfig config = new StreamsConfig(kafkaStreamsInstance.configuration);
                        standbyTasks.add(new TaskClientTags(standbyTaskId, config.getClientTags()));
                    }
                });
            }
        }
        return standbyTasks;
    }

    /** Builds a mutable two-entry tag map holding the given zone and cluster values. */
    private static Map<String, String> buildClientTags(String zone, String cluster) {
        final Map<String, String> clientTags = new HashMap<>();
        clientTags.put(TAG_ZONE, zone);
        clientTags.put(TAG_CLUSTER, cluster);
        return clientTags;
    }

    /**
     * Creates a client for the current topology with the given tags and standby count,
     * registers it for cleanup and then starts it.
     */
    private void createAndStart(Map<String, String> clientTags, Collection<String> rackAwareAssignmentTags, int numberOfStandbyReplicas) {
        final Properties streamsConfiguration = this.createStreamsConfiguration(clientTags, rackAwareAssignmentTags, numberOfStandbyReplicas);
        final KafkaStreams kafkaStreams = new KafkaStreams(this.topology, streamsConfiguration);
        // Register before starting so cleanup() closes the client even if start() fails.
        this.kafkaStreamsInstances.add(new KafkaStreamsWithConfiguration(streamsConfiguration, kafkaStreams));
        kafkaStreams.start();
    }

    /**
     * Derives a per-client configuration from the base configuration: sets the standby count,
     * the rack-aware assignment tags and the prefixed client tags, and overrides the state
     * directory with a fresh temp directory whose prefix is built from the tag values.
     */
    private Properties createStreamsConfiguration(Map<String, String> clientTags, Collection<String> rackAwareAssignmentTags, int numStandbyReplicas) {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.putAll(this.baseConfiguration);
        streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbyReplicas);
        streamsConfiguration.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, String.join(",", rackAwareAssignmentTags));
        clientTags.forEach((key, value) -> streamsConfiguration.put(StreamsConfig.clientTagPrefix(key), value));
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory(String.join("-", clientTags.values())).getPath());
        return streamsConfiguration;
    }

    /** A task id paired with the client tags of the client hosting that task. */
    private static final class TaskClientTags {
        private final TaskId taskId;
        private final Map<String, String> clientTags;

        TaskClientTags(TaskId taskId, Map<String, String> clientTags) {
            this.taskId = taskId;
            this.clientTags = clientTags;
        }

        @Override
        public String toString() {
            return "TaskClientTags{taskId=" + this.taskId + ", clientTags=" + this.clientTags + '}';
        }
    }

    /** The client tags of an active task together with those of all of its standbys. */
    private static final class TaskClientTagDistribution {
        private final TaskClientTags activeTaskClientTags;
        private final List<TaskClientTags> standbyTasksClientTags;

        TaskClientTagDistribution(TaskClientTags activeTaskClientTags, List<TaskClientTags> standbyTasksClientTags) {
            this.activeTaskClientTags = activeTaskClientTags;
            this.standbyTasksClientTags = standbyTasksClientTags;
        }

        @Override
        public String toString() {
            return "TaskDistribution{activeTaskClientTagsView=" + this.activeTaskClientTags + ", standbyTasks=" + this.standbyTasksClientTags + '}';
        }
    }

    /** A started client together with the configuration it was created from. */
    private static final class KafkaStreamsWithConfiguration {
        private final Properties configuration;
        private final KafkaStreams kafkaStreams;

        KafkaStreamsWithConfiguration(Properties configuration, KafkaStreams kafkaStreams) {
            this.configuration = configuration;
            this.kafkaStreams = kafkaStreams;
        }
    }
}

