/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(value=MockitoJUnitRunner.StrictStubs.class)
public class RackAwarenessStreamsPartitionAssignorTest {
    // Partition metadata shared by all tests: topics "topic0" .. "topic4" with partitions 0-2 each,
    // every partition created with Node.noNode() and empty node arrays.
    private final List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic0", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic0", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic0", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic4", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic4", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic4", 2, Node.noNode(), new Node[0], new Node[0]));
    // Consumer (group member) ids available to the tests.
    final String consumer1 = "consumer1";
    final String consumer2 = "consumer2";
    final String consumer3 = "consumer3";
    final String consumer4 = "consumer4";
    final String consumer5 = "consumer5";
    final String consumer6 = "consumer6";
    final String consumer7 = "consumer7";
    final String consumer8 = "consumer8";
    final String consumer9 = "consumer9";
    // Cluster passed to assign(): one Node.noNode() entry plus the partitions listed in infos.
    private final Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
    // Client tag keys; populated with "key-0" .. "key-4" by the static initializer of this class.
    private static final List<String> ALL_TAG_KEYS = new ArrayList<String>();
    // The assignor under test.
    private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
    // Supplies the restore consumer handed to the mock internal topic manager.
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    // Used as the "bootstrap.servers" value in configProps().
    private static final String USER_END_POINT = "localhost:8080";
    // Used as the "application.id" value; it is also the prefix of the changelog topic names in the tests.
    private static final String APPLICATION_ID = "stream-partition-assignor-test";
    // Both stay null until a test assigns them (createMockTaskManager() / setUp()).
    private TaskManager taskManager;
    private Admin adminClient;
    // NOTE: instance initializers run in declaration order, so this first configProps() call sees
    // adminClient, taskManager, streamsMetadataState and time as null. configurePartitionAssignorWith()
    // rebuilds the config once those fields have been set.
    private StreamsConfig streamsConfig = new StreamsConfig(this.configProps());
    // Topology builder filled by setupTopology().
    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
    private TopologyMetadata topologyMetadata = new TopologyMetadata(this.builder, this.streamsConfig);
    private final StreamsMetadataState streamsMetadataState = (StreamsMetadataState)Mockito.mock(StreamsMetadataState.class);
    // Group subscriptions keyed by consumer id; filled by each test before calling assign().
    private final Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<String, ConsumerPartitionAssignor.Subscription>();
    private final MockTime time = new MockTime();

    /**
     * Builds the base configuration shared by {@link StreamsConfig} and the assignor.
     *
     * <p>The map carries the application id, the bootstrap servers, a {@link ReferenceContainer}
     * populated from the current field values of this test, the list of rack-aware assignment
     * tags (all of {@code ALL_TAG_KEYS}) and a {@code "dummy"} client tag value for each of those keys.
     *
     * @return a fresh, mutable configuration map
     */
    private Map<String, Object> configProps() {
        // Snapshot of the collaborators as they are *right now*; fields that have not been
        // assigned yet end up as null inside the container.
        ReferenceContainer references = new ReferenceContainer();
        references.mainConsumer = (Consumer)Mockito.mock(Consumer.class);
        references.adminClient = adminClient;
        references.taskManager = taskManager;
        references.streamsMetadataState = streamsMetadataState;
        references.time = time;

        Map<String, Object> props = new HashMap<>();
        props.put("application.id", APPLICATION_ID);
        props.put("bootstrap.servers", USER_END_POINT);
        props.put("__reference.container.instance__", references);
        props.put("rack.aware.assignment.tags", String.join(",", ALL_TAG_KEYS));
        for (String tagKey : ALL_TAG_KEYS) {
            props.put(StreamsConfig.clientTagPrefix(tagKey), "dummy");
        }
        return props;
    }

    /**
     * Rebuilds the configuration from {@link #configProps()} overlaid with {@code props},
     * refreshes {@code streamsConfig} and {@code topologyMetadata} from it, configures the
     * assignor and finally swaps in the mock internal topic manager.
     *
     * @param props overrides applied on top of the base configuration
     */
    private void configurePartitionAssignorWith(Map<String, Object> props) {
        Map<String, Object> effectiveConfig = configProps();
        effectiveConfig.putAll(props);

        streamsConfig = new StreamsConfig(effectiveConfig);
        topologyMetadata = new TopologyMetadata(builder, streamsConfig);

        partitionAssignor.configure(effectiveConfig);
        // Must run after configure(): the mock is built from the streamsConfig assigned above.
        overwriteInternalTopicManagerWithMock();
    }

    /**
     * Installs a Mockito {@link TaskManager} that reports the current {@code topologyMetadata}
     * and process id {@code PID_1}, then builds and rewrites the topology.
     */
    private void createMockTaskManager() {
        TaskManager mockedTaskManager = Mockito.mock(TaskManager.class);
        // The stub captures the TopologyMetadata instance held by the field at this moment.
        Mockito.when(mockedTaskManager.topologyMetadata()).thenReturn(topologyMetadata);
        Mockito.when(mockedTaskManager.processId()).thenReturn(AssignmentTestUtils.PID_1);
        taskManager = mockedTaskManager;

        topologyMetadata.buildAndRewriteTopology();
    }

    /**
     * Replaces the assignor's internal topic manager with a {@link MockInternalTopicManager}
     * built from the test clock, the current {@code streamsConfig} and the mock restore consumer.
     */
    private void overwriteInternalTopicManagerWithMock() {
        partitionAssignor.setInternalTopicManager(
            new MockInternalTopicManager(time, streamsConfig, mockClientSupplier.restoreConsumer, false));
    }

    /**
     * Gives every test a default mock admin client created from
     * {@code AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS}; the tests in this class
     * replace it with one that knows their changelog topics.
     */
    @Before
    public void setUp() {
        adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(
            AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS);
    }

    /**
     * Three stateful and two stateless sub-topologies, one standby replica, and clients tagged
     * with every key in {@code ALL_TAG_KEYS}. Consumers 1 and 2 share one set of tag values,
     * consumer 3 has a different value for each key.
     *
     * <p>The ideal distribution is verified after the initial assignment and again after a
     * second rebalance in which only consumers 2 and 3 subscribe, reporting the active tasks
     * they were given in the first round.
     */
    @Test
    public void shouldDistributeWithMaximumNumberOfClientTags() {
        setupTopology(3, 2);
        createMockTaskManager();
        // The two stateless sub-topologies take indices 0 and 1, so the stores are store2 .. store4.
        adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(
            getTopicPartitionOffsetsMap(
                Arrays.asList(
                    "stream-partition-assignor-test-store2-changelog",
                    "stream-partition-assignor-test-store3-changelog",
                    "stream-partition-assignor-test-store4-changelog"),
                Arrays.asList(3, 3, 3)));
        configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1));

        Map<String, String> clientTags1 = new HashMap<>();
        Map<String, String> clientTags2 = new HashMap<>();
        for (int i = 0; i < ALL_TAG_KEYS.size(); ++i) {
            String key = ALL_TAG_KEYS.get(i);
            clientTags1.put(key, "value-1-" + i);
            clientTags2.put(key, "value-2-" + i);
        }

        Map<String, Map<String, String>> hostTags = new HashMap<>();
        subscriptions.put("consumer1", getSubscription(AssignmentTestUtils.PID_1, AssignmentTestUtils.EMPTY_TASKS, clientTags1));
        hostTags.put("consumer1", clientTags1);
        subscriptions.put("consumer2", getSubscription(AssignmentTestUtils.PID_2, AssignmentTestUtils.EMPTY_TASKS, clientTags1));
        hostTags.put("consumer2", clientTags1);
        subscriptions.put("consumer3", getSubscription(AssignmentTestUtils.PID_3, AssignmentTestUtils.EMPTY_TASKS, clientTags2));
        hostTags.put("consumer3", clientTags2);

        Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor
            .assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions))
            .groupAssignment();
        verifyIdealTaskDistributionReached(getClientTagDistributions(assignments, hostTags), ALL_TAG_KEYS);

        // Second rebalance: consumer1 no longer subscribes; the remaining two report the
        // active tasks they received in the first assignment.
        subscriptions.clear();
        subscriptions.put(
            "consumer2",
            getSubscription(
                AssignmentTestUtils.PID_2,
                AssignmentInfo.decode(assignments.get("consumer2").userData()).activeTasks(),
                clientTags1));
        subscriptions.put(
            "consumer3",
            getSubscription(
                AssignmentTestUtils.PID_3,
                AssignmentInfo.decode(assignments.get("consumer3").userData()).activeTasks(),
                clientTags2));

        assignments = partitionAssignor
            .assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions))
            .groupAssignment();
        verifyIdealTaskDistributionReached(getClientTagDistributions(assignments, hostTags), ALL_TAG_KEYS);
    }

    /**
     * Three stateful sub-topologies, one standby replica, six clients. Consumers 1-3 share one
     * pair of values for the first two tag keys and consumers 4-6 share another pair.
     * The ideal distribution is verified for the second tag key only.
     */
    @Test
    public void shouldDistributeOnDistinguishingTagSubset() {
        setupTopology(3, 0);
        createMockTaskManager();
        adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(
            getTopicPartitionOffsetsMap(
                Arrays.asList(
                    "stream-partition-assignor-test-store0-changelog",
                    "stream-partition-assignor-test-store1-changelog",
                    "stream-partition-assignor-test-store2-changelog"),
                Arrays.asList(3, 3, 3)));
        configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1));

        Map<String, String> clientTags1 = new HashMap<>();
        Map<String, String> clientTags2 = new HashMap<>();
        clientTags1.put(ALL_TAG_KEYS.get(0), "value-1-all");
        clientTags2.put(ALL_TAG_KEYS.get(0), "value-2-all");
        clientTags1.put(ALL_TAG_KEYS.get(1), "value-1-1");
        clientTags2.put(ALL_TAG_KEYS.get(1), "value-2-2");

        List<ProcessId> processIds = Arrays.asList(
            AssignmentTestUtils.PID_1, AssignmentTestUtils.PID_2, AssignmentTestUtils.PID_3,
            AssignmentTestUtils.PID_4, AssignmentTestUtils.PID_5, AssignmentTestUtils.PID_6);

        Map<String, Map<String, String>> hostTags = new HashMap<>();
        for (int idx = 0; idx < processIds.size(); idx++) {
            // consumer1..consumer3 carry clientTags1, consumer4..consumer6 carry clientTags2.
            String consumer = "consumer" + (idx + 1);
            Map<String, String> tags = idx < 3 ? clientTags1 : clientTags2;
            subscriptions.put(consumer, getSubscription(processIds.get(idx), AssignmentTestUtils.EMPTY_TASKS, tags));
            hostTags.put(consumer, tags);
        }

        Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor
            .assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions))
            .groupAssignment();
        verifyIdealTaskDistributionReached(
            getClientTagDistributions(assignments, hostTags),
            Collections.singletonList(ALL_TAG_KEYS.get(1)));
    }

    /**
     * Three stateful sub-topologies, two standby replicas, nine clients. The clients form a
     * 3x3 grid over the first two tag keys: the first key takes {@code value-0-1..3} and the
     * second key takes {@code value-1-1..3}, so every combination appears exactly once.
     * The ideal distribution is verified for both tag keys.
     */
    @Test
    public void shouldDistributeWithMultipleStandbys() {
        setupTopology(3, 0);
        createMockTaskManager();
        adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(
            getTopicPartitionOffsetsMap(
                Arrays.asList(
                    "stream-partition-assignor-test-store0-changelog",
                    "stream-partition-assignor-test-store1-changelog",
                    "stream-partition-assignor-test-store2-changelog"),
                Arrays.asList(3, 3, 3)));
        configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 2));

        List<ProcessId> processIds = Arrays.asList(
            AssignmentTestUtils.PID_1, AssignmentTestUtils.PID_2, AssignmentTestUtils.PID_3,
            AssignmentTestUtils.PID_4, AssignmentTestUtils.PID_5, AssignmentTestUtils.PID_6,
            AssignmentTestUtils.PID_7, AssignmentTestUtils.PID_8, AssignmentTestUtils.PID_9);

        Map<String, Map<String, String>> hostTags = new HashMap<>();
        for (int idx = 0; idx < processIds.size(); idx++) {
            // consumerN (N = idx + 1): first key advances every three consumers,
            // second key cycles 1, 2, 3 -> consumer1=(1,1), consumer2=(1,2), ... consumer9=(3,3).
            String consumer = "consumer" + (idx + 1);
            Map<String, String> tags = Utils.mkMap(
                Utils.mkEntry(ALL_TAG_KEYS.get(0), "value-0-" + (idx / 3 + 1)),
                Utils.mkEntry(ALL_TAG_KEYS.get(1), "value-1-" + (idx % 3 + 1)));
            subscriptions.put(consumer, getSubscription(processIds.get(idx), AssignmentTestUtils.EMPTY_TASKS, tags));
            hostTags.put(consumer, tags);
        }

        Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor
            .assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions))
            .groupAssignment();
        verifyIdealTaskDistributionReached(
            getClientTagDistributions(assignments, hostTags),
            Arrays.asList(ALL_TAG_KEYS.get(0), ALL_TAG_KEYS.get(1)));
    }

    /**
     * Three stateful sub-topologies, two standby replicas, but only six clients. The first tag
     * key has just two distinct values ({@code value-0-1}, {@code value-0-2}) while the second
     * key has three ({@code value-1-1..3}).
     *
     * <p>The ideal distribution is verified for the second tag key, and the partial
     * distribution (at least one standby differing from the active) for the first tag key.
     */
    @Test
    public void shouldDistributePartiallyWhenDoNotHaveEnoughClients() {
        setupTopology(3, 0);
        createMockTaskManager();
        adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(
            getTopicPartitionOffsetsMap(
                Arrays.asList(
                    "stream-partition-assignor-test-store0-changelog",
                    "stream-partition-assignor-test-store1-changelog",
                    "stream-partition-assignor-test-store2-changelog"),
                Arrays.asList(3, 3, 3)));
        configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 2));

        List<ProcessId> processIds = Arrays.asList(
            AssignmentTestUtils.PID_1, AssignmentTestUtils.PID_2, AssignmentTestUtils.PID_3,
            AssignmentTestUtils.PID_4, AssignmentTestUtils.PID_5, AssignmentTestUtils.PID_6);

        Map<String, Map<String, String>> hostTags = new HashMap<>();
        for (int idx = 0; idx < processIds.size(); idx++) {
            // consumerN (N = idx + 1): consumer1=(1,1), consumer2=(1,2), consumer3=(1,3),
            // consumer4=(2,1), consumer5=(2,2), consumer6=(2,3).
            String consumer = "consumer" + (idx + 1);
            Map<String, String> tags = Utils.mkMap(
                Utils.mkEntry(ALL_TAG_KEYS.get(0), "value-0-" + (idx / 3 + 1)),
                Utils.mkEntry(ALL_TAG_KEYS.get(1), "value-1-" + (idx % 3 + 1)));
            subscriptions.put(consumer, getSubscription(processIds.get(idx), AssignmentTestUtils.EMPTY_TASKS, tags));
            hostTags.put(consumer, tags);
        }

        Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor
            .assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions))
            .groupAssignment();
        verifyIdealTaskDistributionReached(
            getClientTagDistributions(assignments, hostTags),
            Collections.singletonList(ALL_TAG_KEYS.get(1)));
        verifyPartialTaskDistributionReached(
            getClientTagDistributions(assignments, hostTags),
            Collections.singletonList(ALL_TAG_KEYS.get(0)));
    }

    /**
     * Groups the client tags of every consumer by the tasks it was assigned.
     *
     * @param assignments per-consumer assignments whose user data decodes to an {@link AssignmentInfo}
     * @param hostTags    client tags keyed by consumer id
     * @return for each task, the tags of the client holding the active task and the tags of
     *         each client holding a standby of it
     */
    private Map<TaskId, ClientTagDistribution> getClientTagDistributions(Map<String, ConsumerPartitionAssignor.Assignment> assignments, Map<String, Map<String, String>> hostTags) {
        Map<TaskId, ClientTagDistribution> distributions = new HashMap<>();
        assignments.forEach((consumer, assignment) -> {
            AssignmentInfo decoded = AssignmentInfo.decode(assignment.userData());
            for (TaskId active : decoded.activeTasks()) {
                distributions
                    .computeIfAbsent(active, ClientTagDistribution::new)
                    .addActiveTags(hostTags.get(consumer));
            }
            for (TaskId standby : decoded.standbyTasks().keySet()) {
                distributions
                    .computeIfAbsent(standby, ClientTagDistribution::new)
                    .addStandbyTags(hostTags.get(consumer));
            }
        });
        return distributions;
    }

    /**
     * Fails with an {@link AssertionError} unless, for every task and for the given tags, the
     * standbys pass {@code tagsAmongStandbysAreDifferent} and the active/standby pairs pass
     * {@code tagsAmongActiveAndAllStandbysAreDifferent}.
     *
     * @param taskClientTags tag distribution per task
     * @param tagsToCheck    tag keys that must be spread out
     */
    private void verifyIdealTaskDistributionReached(Map<TaskId, ClientTagDistribution> taskClientTags, List<String> tagsToCheck) {
        taskClientTags.forEach((taskId, distribution) -> {
            String subject = "task " + taskId + "'s tag-distribution for " + tagsToCheck;
            if (!tagsAmongStandbysAreDifferent(distribution, tagsToCheck)) {
                throw new AssertionError(subject + " among standbys is not ideal: " + distribution);
            }
            if (!tagsAmongActiveAndAllStandbysAreDifferent(distribution, tagsToCheck)) {
                throw new AssertionError(subject + " between active and standbys is not ideal: " + distribution);
            }
        });
    }

    /**
     * Fails with an {@link AssertionError} unless every task has at least one standby whose
     * values for all of {@code tagsToCheck} differ from those of the active task's client.
     *
     * @param taskClientTags tag distribution per task
     * @param tagsToCheck    tag keys to compare between the active and the standbys
     */
    private void verifyPartialTaskDistributionReached(Map<TaskId, ClientTagDistribution> taskClientTags, List<String> tagsToCheck) {
        for (Map.Entry<TaskId, ClientTagDistribution> entry : taskClientTags.entrySet()) {
            if (!RackAwarenessStreamsPartitionAssignorTest.tagsAmongActiveAndAtLeastOneStandbyIsDifferent(entry.getValue(), tagsToCheck)) {
                // Leading space before "between" was missing, gluing the tag list to the next word.
                throw new AssertionError("task " + entry.getKey() + "'s tag-distribution for " + tagsToCheck
                    + " between active and standbys is not partially ideal: " + entry.getValue());
            }
        }
    }

    /**
     * Returns {@code true} when no standby shares a value with the active client for any of
     * {@code tagsToCheck}; also {@code true} when there are no standbys.
     */
    private static boolean tagsAmongActiveAndAllStandbysAreDifferent(ClientTagDistribution tagDistribution, List<String> tagsToCheck) {
        for (Map<String, String> standbyTags : tagDistribution.standbysClientTags) {
            for (String tag : tagsToCheck) {
                if (tagDistribution.activeClientTags.get(tag).equals(standbyTags.get(tag))) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Returns {@code true} when at least one standby differs from the active client on every
     * one of {@code tagsToCheck}; {@code false} when there are no standbys.
     */
    private static boolean tagsAmongActiveAndAtLeastOneStandbyIsDifferent(ClientTagDistribution tagDistribution, List<String> tagsToCheck) {
        for (Map<String, String> standbyTags : tagDistribution.standbysClientTags) {
            boolean differsOnEveryTag = true;
            for (String tag : tagsToCheck) {
                if (tagDistribution.activeClientTags.get(tag).equals(standbyTags.get(tag))) {
                    differsOnEveryTag = false;
                    break;
                }
            }
            if (differsOnEveryTag) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns {@code true} when, for each of {@code tagsToCheck}, no two standbys of the task
     * carry the same value for that tag.
     *
     * <p>Occurrences are counted per (tag key, tag value) pair. Counting by value alone would
     * report a false duplicate whenever two different tag keys happen to use the same value.
     *
     * @param tagDistribution tags of the clients hosting the task
     * @param tagsToCheck     tag keys whose values must be unique among the standbys
     */
    private static boolean tagsAmongStandbysAreDifferent(ClientTagDistribution tagDistribution, List<String> tagsToCheck) {
        // Key is the two-element list [tagKey, tagValue].
        Map<List<String>, Integer> occurrences = new HashMap<>();
        for (Map<String, String> standbyTags : tagDistribution.standbysClientTags) {
            for (Map.Entry<String, String> tag : standbyTags.entrySet()) {
                if (tagsToCheck.contains(tag.getKey())) {
                    occurrences.merge(Arrays.asList(tag.getKey(), tag.getValue()), 1, Integer::sum);
                }
            }
        }
        return occurrences.values().stream().noneMatch(count -> count > 1);
    }

    /**
     * Adds independent source-to-processor sub-topologies to {@code builder}. Sub-topology
     * {@code i} reads {@code "topic" + i}; the stateless ones take the lowest indices and the
     * stateful ones follow, each with a key-value store named {@code "store" + i}.
     *
     * @param numOfStatefulTopologies  number of sub-topologies that get a state store
     * @param numOfStatelessTopologies number of sub-topologies without a state store
     * @throws IllegalArgumentException if more than 5 sub-topologies are requested in total
     */
    private void setupTopology(int numOfStatefulTopologies, int numOfStatelessTopologies) {
        int totalTopologies = numOfStatefulTopologies + numOfStatelessTopologies;
        if (totalTopologies > 5) {
            // Report the total that was actually checked, not just the stateful count.
            throw new IllegalArgumentException("Should not have more than 5 topologies, but have " + totalTopologies);
        }
        int i;
        for (i = 0; i < numOfStatelessTopologies; ++i) {
            this.builder.addSource(null, "source" + i, null, null, null, new String[]{"topic" + i});
            this.builder.addProcessor("processor" + i, new MockApiProcessorSupplier(), new String[]{"source" + i});
        }
        for (i = numOfStatelessTopologies; i < totalTopologies; ++i) {
            this.builder.addSource(null, "source" + i, null, null, null, new String[]{"topic" + i});
            this.builder.addProcessor("processor" + i, new MockApiProcessorSupplier(), new String[]{"source" + i});
            this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store" + i, false), new String[]{"processor" + i});
        }
    }

    /**
     * Builds end offsets for the given changelog topics: every partition
     * {@code 0 .. numPartitions - 1} of every topic is mapped to {@link Long#MAX_VALUE}.
     *
     * @param changelogTopics     changelog topic names
     * @param topicsNumPartitions partition count for the topic at the same index
     * @return end offsets keyed by topic partition
     * @throws IllegalStateException if the two lists differ in size
     */
    private static Map<TopicPartition, Long> getTopicPartitionOffsetsMap(List<String> changelogTopics, List<Integer> topicsNumPartitions) {
        int topicCount = changelogTopics.size();
        if (topicCount != topicsNumPartitions.size()) {
            throw new IllegalStateException("Passed in " + changelogTopics.size() + " changelog topic names, but " + topicsNumPartitions.size() + " different numPartitions for the topics");
        }

        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        for (int topicIdx = 0; topicIdx < topicCount; topicIdx++) {
            String changelog = changelogTopics.get(topicIdx);
            int partitionCount = topicsNumPartitions.get(topicIdx);
            for (int p = 0; p < partitionCount; p++) {
                endOffsets.put(new TopicPartition(changelog, p), Long.MAX_VALUE);
            }
        }
        return endOffsets;
    }

    /**
     * Creates a subscription to topic {@code "source1"} whose user data is an encoded
     * {@link SubscriptionInfo} carrying the process id, the offset sums derived from
     * {@code prevActiveTasks} and the client tags.
     *
     * @param processId       process id of the subscribing client
     * @param prevActiveTasks tasks reported as previously active
     * @param clientTags      client tags of the subscribing client
     */
    private static ConsumerPartitionAssignor.Subscription getSubscription(ProcessId processId, Collection<TaskId> prevActiveTasks, Map<String, String> clientTags) {
        Map<TaskId, Long> offsetSums = getTaskOffsetSums(prevActiveTasks);
        SubscriptionInfo subscriptionInfo = new SubscriptionInfo(11, 11, processId, null, offsetSums, 0, 0, clientTags);
        return new ConsumerPartitionAssignor.Subscription(Collections.singletonList("source1"), subscriptionInfo.encode());
    }

    /**
     * Maps every task in {@code activeTasks} to {@code -2L}, then overlays every task in
     * {@code AssignmentTestUtils.EMPTY_TASKS} with {@code 0L}.
     * NOTE(review): -2L is presumably the "latest offset" sentinel for running active tasks; confirm.
     *
     * @param activeTasks tasks to report as active
     * @return mutable map of task offset sums
     */
    private static Map<TaskId, Long> getTaskOffsetSums(Collection<TaskId> activeTasks) {
        Map<TaskId, Long> offsetSums = activeTasks.stream()
            .collect(Collectors.toMap(taskId -> taskId, taskId -> -2L));
        Map<TaskId, Long> emptyTaskSums = AssignmentTestUtils.EMPTY_TASKS.stream()
            .collect(Collectors.toMap(taskId -> taskId, taskId -> 0L));
        offsetSums.putAll(emptyTaskSums);
        return offsetSums;
    }

    static {
        for (int i = 0; i < 5; ++i) {
            ALL_TAG_KEYS.add("key-" + i);
        }
    }

    /**
     * Client tags of the hosts a single task was assigned to: one tag map for the client
     * holding the active task and one tag map per client holding a standby.
     */
    private static final class ClientTagDistribution {
        private final TaskId taskId;
        private final Map<String, String> activeClientTags = new HashMap<>();
        private final List<Map<String, String>> standbysClientTags = new ArrayList<>();

        ClientTagDistribution(TaskId taskId) {
            this.taskId = taskId;
        }

        /**
         * Records the tags of the client holding the active task.
         *
         * @throws IllegalStateException if non-empty active tags were already recorded
         */
        void addActiveTags(Map<String, String> activeClientTags) {
            boolean alreadyRecorded = !this.activeClientTags.isEmpty();
            if (alreadyRecorded) {
                throw new IllegalStateException("Found multiple active tasks for " + this.taskId + ", this should not happen");
            }
            this.activeClientTags.putAll(activeClientTags);
        }

        /** Records the tags of one more client holding a standby of the task. */
        void addStandbyTags(Map<String, String> standbyClientTags) {
            standbysClientTags.add(standbyClientTags);
        }

        @Override
        public String toString() {
            return new StringBuilder("ClientTagDistribution{")
                .append("taskId=").append(taskId)
                .append(", activeClientTags=").append(activeClientTags)
                .append(", standbysClientTags=").append(standbysClientTags)
                .append('}')
                .toString();
        }
    }
}

