/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.SingleGroupPartitionGrouperStub;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

public class StreamPartitionAssignorTest {
    // Topic-partition fixtures: topic1 and topic2 each contribute partitions 0-2,
    // topic3 contributes partitions 0-3.
    private TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private TopicPartition t2p0 = new TopicPartition("topic2", 0);
    private TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private TopicPartition t2p2 = new TopicPartition("topic2", 2);
    private TopicPartition t3p0 = new TopicPartition("topic3", 0);
    private TopicPartition t3p1 = new TopicPartition("topic3", 1);
    private TopicPartition t3p2 = new TopicPartition("topic3", 2);
    private TopicPartition t3p3 = new TopicPartition("topic3", 3);

    // Topic names passed to checkAssignment(...) by the tests that read topic1 and topic2.
    // The strings are passed directly (no Object[] cast) so the element type is inferred as
    // String and matches the declared Set<String>.
    private Set<String> allTopics = Utils.mkSet("topic1", "topic2");

    // One PartitionInfo per topic-partition fixture above.
    private List<PartitionInfo> infos = Arrays.asList(
            new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
            new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]));

    // Cluster metadata handed to assign(...) by the tests; built from the partition entries above.
    // NOTE(review): the explicit <String> witnesses assume the last two constructor parameters
    // are Set<String> - confirm against the Cluster class on the test classpath.
    private Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), this.infos, Collections.<String>emptySet(), Collections.<String>emptySet());

    // Task ids (topic group 0, partitions 0-3) used as expected values by the tests.
    private final TaskId task0 = new TaskId(0, 0);
    private final TaskId task1 = new TaskId(0, 1);
    private final TaskId task2 = new TaskId(0, 2);
    private final TaskId task3 = new TaskId(0, 3);

    // Same value as the "localhost:2171" literal the tests pass to SubscriptionInfo and that
    // configProps() uses for bootstrap.servers.
    private final String userEndPoint = "localhost:2171";

    // Assignor under test plus the collaborators shared by the test methods.
    private final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    private final TopologyBuilder builder = new TopologyBuilder();
    private final StreamsConfig config = new StreamsConfig(this.configProps());

    /**
     * Builds the baseline Streams configuration used by the tests in this class.
     *
     * <p>Every call returns a new, plain {@link Properties} instance, so a test may add or
     * override entries on the returned object without affecting other callers.
     *
     * @return mutable properties containing the {@code application.id},
     *         {@code bootstrap.servers}, {@code buffered.records.per.partition} and
     *         {@code timestamp.extractor} entries
     */
    private Properties configProps() {
        // A plain Properties object is populated here instead of using double-brace
        // initialization: the latter defines an anonymous Properties subclass whose
        // instances hold a hidden reference to the enclosing test instance.
        Properties props = new Properties();
        props.setProperty("application.id", "stream-partition-assignor-test");
        props.setProperty("bootstrap.servers", "localhost:2171");
        props.setProperty("buffered.records.per.partition", "3");
        props.setProperty("timestamp.extractor", MockTimestampExtractor.class.getName());
        return props;
    }

    /**
     * Checks the subscription produced by the assignor for a thread that reports fixed
     * previous and cached task sets: the (sorted) topic list must be topic1, topic2 and the
     * user data must equal an encoded {@code SubscriptionInfo} built from the process id, the
     * previous tasks, the cached tasks minus the previous tasks, and {@code null} as the
     * fourth constructor argument.
     */
    @Test
    public void testSubscription() throws Exception {
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        // Both sets are captured by the anonymous StreamThread below, hence final.
        final Set<TaskId> previouslyRun = Utils.mkSet(new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
        final Set<TaskId> cached = Utils.mkSet(
                new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
                new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));

        String clientId = "client-id";
        UUID processId = UUID.randomUUID();

        // The thread is stubbed so the assignor sees exactly the task sets defined above.
        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L) {
            @Override
            public Set<TaskId> prevTasks() {
                return previouslyRun;
            }

            @Override
            public Set<TaskId> cachedTasks() {
                return cached;
            }
        };
        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));

        PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2"));

        // Sort in place so the comparison does not depend on the order the topics come back in.
        Collections.sort(subscription.topics());
        Assert.assertEquals(Utils.mkList("topic1", "topic2"), subscription.topics());

        // Expected standby tasks: everything cached that was not previously run.
        Set<TaskId> expectedStandby = new HashSet<TaskId>(cached);
        expectedStandby.removeAll(previouslyRun);
        SubscriptionInfo expectedInfo = new SubscriptionInfo(processId, previouslyRun, expectedStandby, null);
        Assert.assertEquals(expectedInfo.encode(), subscription.userData());
    }

    /**
     * Runs {@code assign} for three consumers, where consumer10 and consumer11 share one
     * process UUID and consumer20 uses another, over a topology reading topic1 and topic2.
     *
     * <p>Asserts that consumer10 and consumer11 between them hold {t1p0, t2p0} and
     * {t1p1, t2p1} (in either order), that consumer20 holds {t1p2, t2p2}, that the first two
     * consumers' active tasks are task0 and task1, and that all three consumers together
     * cover exactly task0, task1 and task2.
     */
    @Test
    public void testAssignBasic() throws Exception {
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        List<String> topics = Utils.mkList("topic1", "topic2");
        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);

        Set<TaskId> prevTasks10 = Utils.mkSet(task0);
        Set<TaskId> prevTasks11 = Utils.mkSet(task1);
        Set<TaskId> prevTasks20 = Utils.mkSet(task2);
        Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
        Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
        Set<TaskId> standbyTasks20 = Utils.mkSet(task0);

        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        String client1 = "client1";

        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));

        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, "localhost:2171").encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, "localhost:2171").encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, "localhost:2171").encode()));

        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

        // The two consumers of the first process may receive their partition pairs in either
        // order, so compare as a set of partition sets.
        Set<Set<TopicPartition>> expectedFirstProcess = new HashSet<Set<TopicPartition>>();
        expectedFirstProcess.add(Utils.mkSet(t1p0, t2p0));
        expectedFirstProcess.add(Utils.mkSet(t1p1, t2p1));
        Set<Set<TopicPartition>> actualFirstProcess = new HashSet<Set<TopicPartition>>();
        actualFirstProcess.add(new HashSet<TopicPartition>(assignments.get("consumer10").partitions()));
        actualFirstProcess.add(new HashSet<TopicPartition>(assignments.get("consumer11").partitions()));
        Assert.assertEquals(expectedFirstProcess, actualFirstProcess);

        Assert.assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<TopicPartition>(assignments.get("consumer20").partitions()));

        // Active tasks of the two consumers that share uuid1.
        Set<TaskId> allActiveTasks = new HashSet<TaskId>();
        AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
        allActiveTasks.addAll(info10.activeTasks);
        AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
        allActiveTasks.addAll(info11.activeTasks);
        Assert.assertEquals(Utils.mkSet(task0, task1), allActiveTasks);

        // Adding the third consumer must complete the full task set. The size and equality
        // checks are each made once; the original repeated both assertions verbatim.
        AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
        allActiveTasks.addAll(info20.activeTasks);
        Assert.assertEquals(3, allActiveTasks.size());
        Assert.assertEquals(allTasks, allActiveTasks);
    }

    /**
     * Runs {@code assign} with {@code partition.grouper} set to
     * {@code SingleGroupPartitionGrouperStub} on a topology with two independent
     * source/processor/store chains (topic1 and topic2).
     *
     * <p>The single consumer subscribes to both topics; the assignment is checked against
     * topic1 only and its active tasks must be exactly task0, task1 and task2.
     */
    @Test
    public void testAssignWithPartialTopology() throws Exception {
        Properties props = configProps();
        props.put("partition.grouper", SingleGroupPartitionGrouperStub.class);
        // Local config deliberately shadows the field: it carries the grouper override.
        StreamsConfig config = new StreamsConfig(props);

        builder.addSource("source1", "topic1");
        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
        builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor2");

        List<String> subscribedTopics = Utils.mkList("topic1", "topic2");
        Set<TaskId> expectedTasks = Utils.mkSet(task0, task1, task2);

        UUID processId = UUID.randomUUID();
        String clientId = "client1";
        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", clientId));
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));

        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        SubscriptionInfo subscriptionInfo = new SubscriptionInfo(processId, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), "localhost:2171");
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(subscribedTopics, subscriptionInfo.encode()));

        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

        // Only topic1 is expected in the resulting assignment.
        AssignmentInfo info10 = checkAssignment(Utils.mkSet("topic1"), assignments.get("consumer10"));
        Set<TaskId> activeTasks = new HashSet<TaskId>();
        activeTasks.addAll(info10.activeTasks);

        Assert.assertEquals(3, activeTasks.size());
        Assert.assertEquals(expectedTasks, new HashSet<TaskId>(activeTasks));
    }

    /**
     * Runs {@code assign} twice for a single consumer: first against a cluster that has no
     * partition metadata, then against the regular fixture cluster.
     *
     * <p>With empty metadata the consumer must receive no partitions and no active tasks.
     * With the fixture metadata it must receive all partitions of topic1 and topic2 and the
     * active tasks task0, task1 and task2.
     */
    @Test
    public void testAssignEmptyMetadata() throws Exception {
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        List<String> topics = Utils.mkList("topic1", "topic2");
        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
        Set<TaskId> prevTasks10 = Utils.mkSet(task0);
        Set<TaskId> standbyTasks10 = Utils.mkSet(task1);

        // Same single node as the fixture cluster, but without any partition information.
        Cluster emptyMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), Collections.<PartitionInfo>emptySet(), Collections.<String>emptySet(), Collections.<String>emptySet());

        UUID uuid1 = UUID.randomUUID();
        String client1 = "client1";
        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));

        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, "localhost:2171").encode()));

        // Round 1: no metadata, so nothing may be assigned.
        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(emptyMetadata, subscriptions);
        Assert.assertEquals(Collections.<TopicPartition>emptySet(), new HashSet<TopicPartition>(assignments.get("consumer10").partitions()));

        Set<TaskId> allActiveTasks = new HashSet<TaskId>();
        AssignmentInfo info10 = checkAssignment(Collections.<String>emptySet(), assignments.get("consumer10"));
        allActiveTasks.addAll(info10.activeTasks);
        Assert.assertEquals(0, allActiveTasks.size());
        Assert.assertEquals(Collections.<TaskId>emptySet(), new HashSet<TaskId>(allActiveTasks));

        // Round 2: full metadata, so the only consumer receives every partition of both topics.
        // Each expected partition is listed once; the original listed t1p0/t2p0 twice and
        // wrapped both sides in a redundant singleton set.
        assignments = partitionAssignor.assign(metadata, subscriptions);
        Assert.assertEquals(Utils.mkSet(t1p0, t2p0, t1p1, t2p1, t1p2, t2p2), new HashSet<TopicPartition>(assignments.get("consumer10").partitions()));

        info10 = checkAssignment(allTopics, assignments.get("consumer10"));
        allActiveTasks.addAll(info10.activeTasks);
        // The size and equality checks are each made once; the original repeated both verbatim.
        Assert.assertEquals(3, allActiveTasks.size());
        Assert.assertEquals(allTasks, allActiveTasks);
    }

    /**
     * Runs {@code assign} for three consumers whose subscriptions report task0, task1 and
     * task2 as previous tasks, on a topology that additionally reads topic3.
     *
     * <p>The union of the decoded active tasks must be task0 through task3, and the union of
     * the assigned partitions must be every partition of topic1, topic2 and topic3.
     */
    @Test
    public void testAssignWithNewTasks() throws Exception {
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addSource("source3", "topic3");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3");

        List<String> subscribedTopics = Utils.mkList("topic1", "topic2", "topic3");
        Set<TaskId> expectedTasks = Utils.mkSet(task0, task1, task2, task3);

        Set<TaskId> prevTasks10 = Utils.mkSet(task0);
        Set<TaskId> prevTasks11 = Utils.mkSet(task1);
        Set<TaskId> prevTasks20 = Utils.mkSet(task2);
        Set<TaskId> noStandby = Collections.<TaskId>emptySet();

        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        String clientId = "client1";
        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", clientId, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", clientId));
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));

        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(subscribedTopics, new SubscriptionInfo(uuid1, prevTasks10, noStandby, "localhost:2171").encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(subscribedTopics, new SubscriptionInfo(uuid1, prevTasks11, noStandby, "localhost:2171").encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(subscribedTopics, new SubscriptionInfo(uuid2, prevTasks20, noStandby, "localhost:2171").encode()));

        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

        // Collect active tasks and partitions across all three consumers, in the same
        // consumer order as before.
        Set<TaskId> collectedTasks = new HashSet<TaskId>();
        Set<TopicPartition> collectedPartitions = new HashSet<TopicPartition>();
        for (String consumer : Arrays.asList("consumer10", "consumer11", "consumer20")) {
            PartitionAssignor.Assignment assignment = assignments.get(consumer);
            AssignmentInfo decoded = AssignmentInfo.decode(assignment.userData());
            collectedTasks.addAll(decoded.activeTasks);
            collectedPartitions.addAll(assignment.partitions());
        }

        Assert.assertEquals(expectedTasks, collectedTasks);
        Assert.assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), collectedPartitions);
    }

    /**
     * Runs {@code assign} for three consumers on a topology with two processors that own
     * state stores (store1 on processor-1; store2 and store3 on processor-2).
     *
     * <p>Each consumer must receive two partitions and two active tasks, the union of the
     * active tasks must be the six tasks (0,0)-(0,2) and (1,0)-(1,2), and
     * {@code tasksForState} must map store1 to the group-0 tasks and store2/store3 to the
     * group-1 tasks.
     */
    @Test
    public void testAssignWithStates() throws Exception {
        String applicationId = "test";
        builder.setApplicationId(applicationId);
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1");
        builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1");
        builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2");
        builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2");
        builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2");

        List<String> subscribedTopics = Utils.mkList("topic1", "topic2");

        TaskId task00 = new TaskId(0, 0);
        TaskId task01 = new TaskId(0, 1);
        TaskId task02 = new TaskId(0, 2);
        TaskId task10 = new TaskId(1, 0);
        TaskId task11 = new TaskId(1, 1);
        TaskId task12 = new TaskId(1, 2);
        List<TaskId> tasks = Utils.mkList(task00, task01, task02, task10, task11, task12);

        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        String clientId = "client1";
        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, clientId));
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));

        Set<TaskId> none = Collections.<TaskId>emptySet();
        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(subscribedTopics, new SubscriptionInfo(uuid1, none, none, "localhost:2171").encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(subscribedTopics, new SubscriptionInfo(uuid1, none, none, "localhost:2171").encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(subscribedTopics, new SubscriptionInfo(uuid2, none, none, "localhost:2171").encode()));

        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
        PartitionAssignor.Assignment assignment10 = assignments.get("consumer10");
        PartitionAssignor.Assignment assignment11 = assignments.get("consumer11");
        PartitionAssignor.Assignment assignment20 = assignments.get("consumer20");

        // Every consumer gets two partitions...
        Assert.assertEquals(2, assignment10.partitions().size());
        Assert.assertEquals(2, assignment11.partitions().size());
        Assert.assertEquals(2, assignment20.partitions().size());

        // ...and two active tasks.
        AssignmentInfo info10 = AssignmentInfo.decode(assignment10.userData());
        AssignmentInfo info11 = AssignmentInfo.decode(assignment11.userData());
        AssignmentInfo info20 = AssignmentInfo.decode(assignment20.userData());
        Assert.assertEquals(2, info10.activeTasks.size());
        Assert.assertEquals(2, info11.activeTasks.size());
        Assert.assertEquals(2, info20.activeTasks.size());

        Set<TaskId> assignedTasks = new HashSet<TaskId>();
        assignedTasks.addAll(info10.activeTasks);
        assignedTasks.addAll(info11.activeTasks);
        assignedTasks.addAll(info20.activeTasks);
        Assert.assertEquals(new HashSet<TaskId>(tasks), assignedTasks);

        // Each store's changelog topic must belong to the topic group of its processor.
        Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = thread10.builder.topicGroups();
        Assert.assertEquals(Utils.mkSet(task00, task01, task02), tasksForState(applicationId, "store1", tasks, topicGroups));
        Assert.assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store2", tasks, topicGroups));
        Assert.assertEquals(Utils.mkSet(task10, task11, task12), tasksForState(applicationId, "store3", tasks, topicGroups));
    }

    /**
     * Selects the tasks whose topic group owns the changelog topic of the given store.
     *
     * @param applicationId application id used to derive the changelog topic name
     * @param storeName     store name used to derive the changelog topic name
     * @param tasks         candidate tasks to filter
     * @param topicGroups   topic-group id mapped to that group's topic information
     * @return every task in {@code tasks} whose {@code topicGroupId} equals the id of a group
     *         whose {@code stateChangelogTopics} keys contain the derived changelog topic
     */
    private Set<TaskId> tasksForState(String applicationId, String storeName, List<TaskId> tasks, Map<Integer, TopologyBuilder.TopicsInfo> topicGroups) {
        final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName);
        final Set<TaskId> matching = new HashSet<TaskId>();

        for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> group : topicGroups.entrySet()) {
            // Only groups that own the store's changelog topic contribute tasks.
            if (group.getValue().stateChangelogTopics.keySet().contains(changelogTopic)) {
                for (TaskId candidate : tasks) {
                    if (candidate.topicGroupId == group.getKey()) {
                        matching.add(candidate);
                    }
                }
            }
        }
        return matching;
    }

    /**
     * Runs {@code assign} with {@code num.standby.replicas} set to 1 for three consumers,
     * where consumer10 and consumer11 share one process UUID and consumer20 uses another.
     *
     * <p>Asserts that the two consumers of the shared process do not get the same standby
     * task set, that together they hold active tasks task0/task1 and standby task task2, and
     * that across all three consumers both the active and the standby tasks cover task0,
     * task1 and task2.
     */
    @Test
    public void testAssignWithStandbyReplicas() throws Exception {
        Properties props = configProps();
        props.setProperty("num.standby.replicas", "1");
        // Local config deliberately shadows the field: it carries the standby-replica setting.
        StreamsConfig config = new StreamsConfig(props);

        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        List<String> subscribedTopics = Utils.mkList("topic1", "topic2");
        Set<TaskId> expectedTasks = Utils.mkSet(task0, task1, task2);

        Set<TaskId> prevTasks10 = Utils.mkSet(task0);
        Set<TaskId> prevTasks11 = Utils.mkSet(task1);
        Set<TaskId> prevTasks20 = Utils.mkSet(task2);
        Set<TaskId> standbyTasks10 = Utils.mkSet(task1);
        Set<TaskId> standbyTasks11 = Utils.mkSet(task2);
        Set<TaskId> standbyTasks20 = Utils.mkSet(task0);

        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        String clientId = "client1";
        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", clientId, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", clientId));
        partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer));

        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(subscribedTopics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, "localhost:2171").encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(subscribedTopics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, "localhost:2171").encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(subscribedTopics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, "localhost:2171").encode()));

        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);

        Set<TaskId> activeSoFar = new HashSet<TaskId>();
        Set<TaskId> standbySoFar = new HashSet<TaskId>();

        // The two consumers that share uuid1.
        AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
        activeSoFar.addAll(info10.activeTasks);
        standbySoFar.addAll(info10.standbyTasks.keySet());

        AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
        activeSoFar.addAll(info11.activeTasks);
        standbySoFar.addAll(info11.standbyTasks.keySet());

        Assert.assertNotEquals("same processId has same set of standby tasks", info11.standbyTasks.keySet(), info10.standbyTasks.keySet());
        Assert.assertEquals(Utils.mkSet(task0, task1), new HashSet<TaskId>(activeSoFar));
        Assert.assertEquals(Utils.mkSet(task2), new HashSet<TaskId>(standbySoFar));

        // The consumer of the second process completes both task sets.
        AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
        activeSoFar.addAll(info20.activeTasks);
        standbySoFar.addAll(info20.standbyTasks.keySet());

        Assert.assertEquals(3, activeSoFar.size());
        Assert.assertEquals(expectedTasks, activeSoFar);
        Assert.assertEquals(3, standbySoFar.size());
        Assert.assertEquals(expectedTasks, standbySoFar);
    }

    /**
     * Feeds a hand-built assignment into {@code onAssignment} and checks that the assignor
     * afterwards reports the expected active-task and standby-task partition maps.
     */
    @Test
    public void testOnAssignment() throws Exception {
        TopicPartition t2p3 = new TopicPartition("topic2", 3);

        // A fresh builder local to this test; it shadows the shared field on purpose.
        TopologyBuilder builder = new TopologyBuilder();
        builder.addSource("source1", "topic1");
        builder.addSource("source2", "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        UUID processId = UUID.randomUUID();
        String clientId = "client1";
        StreamThread thread = new StreamThread(builder, config, mockClientSupplier, "test", clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));

        List<TaskId> activeTaskIds = Utils.mkList(task0, task3);

        // Expected task -> partitions maps after the assignment has been applied.
        Map<TaskId, Set<TopicPartition>> expectedActive = new HashMap<TaskId, Set<TopicPartition>>();
        expectedActive.put(task0, Utils.mkSet(t1p0));
        expectedActive.put(task3, Utils.mkSet(t2p3));

        Map<TaskId, Set<TopicPartition>> expectedStandby = new HashMap<TaskId, Set<TopicPartition>>();
        expectedStandby.put(task1, Utils.mkSet(t1p0));
        expectedStandby.put(task2, Utils.mkSet(t2p0));

        AssignmentInfo info = new AssignmentInfo(activeTaskIds, expectedStandby, new HashMap<HostInfo, Set<TopicPartition>>());
        PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());

        partitionAssignor.onAssignment(assignment);

        Assert.assertEquals(expectedActive, partitionAssignor.activeTasks());
        Assert.assertEquals(expectedStandby, partitionAssignor.standbyTasks());
    }

    /**
     * Runs {@code assign} on a topology that writes to and reads from the internal topic
     * "topicX" and checks, via the mock internal topic manager, that exactly one topic was
     * made ready and that the value recorded for "test-topicX" equals the number of expected
     * tasks (three).
     */
    @Test
    public void testAssignWithInternalTopics() throws Exception {
        String applicationId = "test";
        builder.setApplicationId(applicationId);
        builder.addInternalTopic("topicX");
        builder.addSource("source1", "topic1");
        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
        builder.addSink("sink1", "topicX", "processor1");
        builder.addSource("source2", "topicX");
        builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");

        List<String> subscribedTopics = Utils.mkList("topic1", "test-topicX");
        Set<TaskId> expectedTasks = Utils.mkSet(task0, task1, task2);

        UUID processId = UUID.randomUUID();
        String clientId = "client1";
        StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, clientId));

        // Keep a handle on the mock so its recorded topics can be inspected after assign().
        MockInternalTopicManager topicManager = new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer);
        partitionAssignor.setInternalTopicManager(topicManager);

        Set<TaskId> noTasks = Collections.emptySet();
        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(subscribedTopics, new SubscriptionInfo(processId, noTasks, noTasks, "localhost:2171").encode()));

        partitionAssignor.assign(metadata, subscriptions);

        Assert.assertEquals(1, topicManager.readyTopics.size());
        Assert.assertEquals(expectedTasks.size(), topicManager.readyTopics.get("test-topicX").intValue());
    }

    /**
     * Runs an assignment for a topology that chains two internal topics ({@code topicX}
     * feeds {@code processor2}, which writes to {@code topicZ}) and checks that the mock
     * internal topic manager recorded two ready topics, with the value recorded for
     * {@code test-topicZ} equal to the number of expected tasks.
     */
    @Test
    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception {
        String appId = "test";
        String clientId = "client1";
        UUID processId = UUID.randomUUID();

        // topic1 -> processor1 -> topicX (internal) -> processor2 -> topicZ (internal) -> source3
        this.builder.setApplicationId(appId);
        this.builder.addInternalTopic("topicX");
        this.builder.addSource("source1", new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addSink("sink1", "topicX", new String[]{"processor1"});
        this.builder.addSource("source2", new String[]{"topicX"});
        this.builder.addInternalTopic("topicZ");
        this.builder.addProcessor("processor2", new MockProcessorSupplier(), new String[]{"source2"});
        this.builder.addSink("sink2", "topicZ", new String[]{"processor2"});
        this.builder.addSource("source3", new String[]{"topicZ"});

        // Wire the assignor to a stream thread and a mock topic manager that records ready topics.
        StreamThread streamThread = new StreamThread(this.builder, this.config, (KafkaClientSupplier)this.mockClientSupplier, appId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        this.partitionAssignor.configure(this.config.getConsumerConfigs(streamThread, appId, clientId));
        MockInternalTopicManager topicManager = new MockInternalTopicManager(streamThread.config, this.mockClientSupplier.restoreConsumer);
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)topicManager);

        // A single consumer with no previously assigned tasks subscribes to all three topics.
        List subscribedTopics = Utils.mkList((Object[])new String[]{"topic1", "test-topicX", "test-topicZ"});
        Set noTasks = Collections.emptySet();
        SubscriptionInfo subscriptionInfo = new SubscriptionInfo(processId, noTasks, noTasks, "localhost:2171");
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(subscribedTopics, subscriptionInfo.encode()));
        this.partitionAssignor.assign(this.metadata, subscriptions);

        Set expectedTasks = Utils.mkSet((Object[])new TaskId[]{this.task0, this.task1, this.task2});
        Assert.assertEquals(2L, (long)topicManager.readyTopics.size());
        Assert.assertEquals((long)expectedTasks.size(), (long)topicManager.readyTopics.get("test-topicZ").intValue());
    }

    /**
     * Configures the assignor from a {@code StreamsConfig} whose {@code application.server}
     * is {@code localhost:8080} and checks that the user data of the subscription it
     * produces decodes to a {@code SubscriptionInfo} carrying that same end point.
     */
    @Test
    public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
        String userEndPoint = "localhost:8080";
        String applicationId = "application-id";
        String client1 = "client1";

        Properties properties = this.configProps();
        properties.put("application.server", userEndPoint);
        StreamsConfig config = new StreamsConfig((Map)properties);

        this.builder.setApplicationId(applicationId);
        this.builder.addSource("source", new String[]{"input"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        this.builder.addSink("sink", "output", new String[]{"processor"});

        UUID uuid1 = UUID.randomUUID();
        StreamThread streamThread = new StreamThread(this.builder, config, (KafkaClientSupplier)this.mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        this.partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));

        // Call mkSet as varargs: an (Object[]) cast would pin T to Object and the result
        // could no longer be inferred as the Set<String> that subscription(...) takes.
        PartitionAssignor.Subscription subscription = this.partitionAssignor.subscription(Utils.mkSet("input"));
        SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode((ByteBuffer)subscription.userData());
        Assert.assertEquals((Object)userEndPoint, (Object)subscriptionInfo.userEndPoint);
    }

    /**
     * Assigns {@code topic1} to a single consumer that advertises {@code localhost:8080}
     * and checks that the decoded assignment maps that host to partitions 0, 1 and 2 of
     * {@code topic1}.
     */
    @Test
    public void shouldMapUserEndPointToTopicPartitions() throws Exception {
        String userEndPoint = "localhost:8080";
        String appId = "application-id";
        String clientId = "client1";

        Properties props = this.configProps();
        props.put("application.server", userEndPoint);
        StreamsConfig streamsConfig = new StreamsConfig((Map)props);

        this.builder.setApplicationId(appId);
        this.builder.addSource("source", new String[]{"topic1"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        this.builder.addSink("sink", "output", new String[]{"processor"});

        UUID processId = UUID.randomUUID();
        StreamThread thread = new StreamThread(this.builder, streamsConfig, (KafkaClientSupplier)this.mockClientSupplier, appId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L);

        // This test uses its own assignor instance rather than the shared fixture field.
        StreamPartitionAssignor assignor = new StreamPartitionAssignor();
        assignor.configure(streamsConfig.getConsumerConfigs(thread, appId, clientId));
        assignor.setInternalTopicManager((InternalTopicManager)new MockInternalTopicManager(thread.config, this.mockClientSupplier.restoreConsumer));

        List subscribedTopics = Utils.mkList((Object[])new String[]{"topic1"});
        Set noTasks = Collections.emptySet();
        SubscriptionInfo subscriptionInfo = new SubscriptionInfo(processId, noTasks, noTasks, userEndPoint);
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer1", new PartitionAssignor.Subscription(subscribedTopics, subscriptionInfo.encode()));

        Map assignments = assignor.assign(this.metadata, subscriptions);
        PartitionAssignor.Assignment consumerAssignment = (PartitionAssignor.Assignment)assignments.get("consumer1");
        AssignmentInfo decoded = AssignmentInfo.decode((ByteBuffer)consumerAssignment.userData());
        Set partitionsOnHost = (Set)decoded.partitionsByHost.get(new HostInfo("localhost", 8080));

        Set expectedPartitions = Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2)});
        Assert.assertEquals((Object)expectedPartitions, (Object)partitionsOnHost);
    }

    /**
     * Sets {@code application.server} to {@code localhost} (no port) and expects
     * configuring the assignor to throw a {@code ConfigException}; the test fails if no
     * such exception is raised.
     */
    @Test
    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
        String endPointWithoutPort = "localhost";
        String appId = "application-id";
        String clientId = "client1";

        Properties props = this.configProps();
        props.put("application.server", endPointWithoutPort);
        StreamsConfig streamsConfig = new StreamsConfig((Map)props);

        UUID processId = UUID.randomUUID();
        this.builder.setApplicationId(appId);
        StreamThread thread = new StreamThread(this.builder, streamsConfig, (KafkaClientSupplier)this.mockClientSupplier, appId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)new MockInternalTopicManager(thread.config, this.mockClientSupplier.restoreConsumer));

        boolean rejected = false;
        try {
            this.partitionAssignor.configure(streamsConfig.getConsumerConfigs(thread, appId, clientId));
        }
        catch (ConfigException expected) {
            // This is the outcome under test.
            rejected = true;
        }
        if (!rejected) {
            Assert.fail((String)"expected to an exception due to invalid config");
        }
    }

    /**
     * Sets {@code application.server} to {@code localhost:j87yhk} (non-numeric port) and
     * expects configuring the assignor to throw a {@code ConfigException}; the test fails
     * if no such exception is raised.
     */
    @Test
    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception {
        String endPointWithBadPort = "localhost:j87yhk";
        String appId = "application-id";
        String clientId = "client1";

        Properties props = this.configProps();
        props.put("application.server", endPointWithBadPort);
        StreamsConfig streamsConfig = new StreamsConfig((Map)props);

        UUID processId = UUID.randomUUID();
        this.builder.setApplicationId(appId);
        StreamThread thread = new StreamThread(this.builder, streamsConfig, (KafkaClientSupplier)this.mockClientSupplier, appId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L);

        boolean rejected = false;
        try {
            this.partitionAssignor.configure(streamsConfig.getConsumerConfigs(thread, appId, clientId));
        }
        catch (ConfigException expected) {
            // This is the outcome under test.
            rejected = true;
        }
        if (!rejected) {
            Assert.fail((String)"expected to an exception due to invalid config");
        }
    }

    /**
     * Delivers an assignment whose encoded info carries a host-to-partitions map and
     * checks that the assignor then returns an equal map from
     * {@code getPartitionsByHostState()}.
     */
    @Test
    public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
        Set<TopicPartition> hostedPartitions = Collections.singleton(new TopicPartition("topic", 0));
        Map<HostInfo, Set<TopicPartition>> expectedHostState = Collections.singletonMap(new HostInfo("localhost", 80), hostedPartitions);

        List<TopicPartition> assignedPartitions = Collections.singletonList(new TopicPartition("topic", 0));
        List<TaskId> activeTasks = Collections.singletonList(new TaskId(0, 0));
        AssignmentInfo info = new AssignmentInfo(activeTasks, Collections.emptyMap(), expectedHostState);
        PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(assignedPartitions, info.encode());

        this.partitionAssignor.onAssignment(assignment);

        Assert.assertEquals(expectedHostState, (Object)this.partitionAssignor.getPartitionsByHostState());
    }

    /**
     * Delivers an assignment whose host state contains partition 0 of {@code topic} and
     * checks that the assignor's cluster metadata then lists exactly that one partition
     * for {@code topic}.
     */
    @Test
    public void shouldSetClusterMetadataOnAssignment() throws Exception {
        List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
        Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(new HostInfo("localhost", 80), Collections.singleton(new TopicPartition("topic", 0)));
        AssignmentInfo assignmentInfo = new AssignmentInfo(Collections.singletonList(new TaskId(0, 0)), Collections.emptyMap(), hostState);
        this.partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode()));

        Cluster cluster = this.partitionAssignor.clusterMetadata();
        List partitionInfos = cluster.partitionsForTopic("topic");

        // Validate the list before indexing into it, so that a missing or empty result is
        // reported as an assertion failure instead of an NPE / IndexOutOfBoundsException.
        Assert.assertNotNull((Object)partitionInfos);
        Assert.assertEquals((long)1L, (long)partitionInfos.size());

        PartitionInfo partitionInfo = (PartitionInfo)partitionInfos.get(0);
        Assert.assertEquals((Object)"topic", (Object)partitionInfo.topic());
        Assert.assertEquals((long)0L, (long)partitionInfo.partition());
    }

    /**
     * Checks that {@code clusterMetadata()} returns a non-null value when the test body has
     * not delivered any assignment to the assignor.
     */
    @Test
    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception {
        Assert.assertNotNull((Object)this.partitionAssignor.clusterMetadata());
    }

    /**
     * Builds a DSL topology that joins a stream derived from {@code topic1} with a stream
     * read from {@code unknownTopic}, runs an assignment for one client subscribed only to
     * {@code unknownTopic}, and checks both the internal topics recorded as ready and the
     * partitions assigned to that client.
     *
     * NOTE(review): per the test name, {@code unknownTopic} is presumably absent from
     * {@code this.metadata}; that fixture is defined outside this method, so confirm there.
     */
    @Test
    public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
        String applicationId = "application-id";
        KStreamBuilder builder = new KStreamBuilder();
        builder.setApplicationId("application-id");
        // First branch: topic1 -> selectKey -> groupByKey -> count("count") -> toStream -> map.
        // The mapper bodies all return null; only the topology shape is relevant here.
        KStream stream1 = builder.stream(new String[]{"topic1"}).selectKey((KeyValueMapper)new KeyValueMapper<Object, Object, Object>(){

            public Object apply(Object key, Object value) {
                return null;
            }
        }).groupByKey().count("count").toStream().map((KeyValueMapper)new KeyValueMapper<Object, Long, KeyValue<Object, Object>>(){

            public KeyValue<Object, Object> apply(Object key, Long value) {
                return null;
            }
        });
        // Second branch: unknownTopic -> selectKey, joined with the first branch using
        // JoinWindows.of(0L).
        builder.stream(new String[]{"unknownTopic"}).selectKey((KeyValueMapper)new KeyValueMapper<Object, Object, Object>(){

            public Object apply(Object key, Object value) {
                return null;
            }
        }).join(stream1, new ValueJoiner(){

            public Object apply(Object value1, Object value2) {
                return null;
            }
        }, JoinWindows.of((long)0L));
        UUID uuid = UUID.randomUUID();
        String client = "client1";
        // Uses the local DSL builder (which shadows the fixture's builder field) together
        // with the fixture's config.
        StreamThread streamThread = new StreamThread((TopologyBuilder)builder, this.config, (KafkaClientSupplier)this.mockClientSupplier, "application-id", "client1", uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState((TopologyBuilder)builder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        this.partitionAssignor.configure(this.config.getConsumerConfigs(streamThread, "application-id", "client1"));
        MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(streamThread.config, this.mockClientSupplier.restoreConsumer);
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)mockInternalTopicManager);
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        Set emptyTasks = Collections.emptySet();
        // The only subscription lists "unknownTopic" and reports no previous tasks.
        subscriptions.put("client1", new PartitionAssignor.Subscription(Collections.singletonList("unknownTopic"), new SubscriptionInfo(uuid, emptyTasks, emptyTasks, "localhost:2171").encode()));
        Map assignment = this.partitionAssignor.assign(this.metadata, subscriptions);
        // Exactly the two count-related internal topics must be recorded as ready, each with value 3.
        HashMap<String, Integer> expectedCreatedInternalTopics = new HashMap<String, Integer>();
        expectedCreatedInternalTopics.put("application-id-count-repartition", 3);
        expectedCreatedInternalTopics.put("application-id-count-changelog", 3);
        Assert.assertThat(mockInternalTopicManager.readyTopics, (Matcher)CoreMatchers.equalTo(expectedCreatedInternalTopics));
        // The client must receive partitions 0-2 of topic1 and of the repartition topic, and
        // nothing else; both sides are compared as sets, so order is ignored.
        List<TopicPartition> expectedAssignment = Arrays.asList(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2), new TopicPartition("application-id-count-repartition", 0), new TopicPartition("application-id-count-repartition", 1), new TopicPartition("application-id-count-repartition", 2));
        Assert.assertThat(new HashSet(((PartitionAssignor.Assignment)assignment.get("client1")).partitions()), (Matcher)CoreMatchers.equalTo(new HashSet<TopicPartition>(expectedAssignment)));
    }

    /**
     * Delivers two successive assignments with different host-to-partitions maps and checks
     * that, after each one, {@code getPartitionsByHostState()} equals the map that was just
     * delivered.
     */
    @Test
    public void shouldUpdatePartitionHostInfoMapOnAssignment() throws Exception {
        TopicPartition partitionOne = new TopicPartition("topic", 1);
        TopicPartition partitionTwo = new TopicPartition("topic", 2);

        // mkSet is called as varargs: wrapping the arguments in an (Object[]) array pins T
        // to Object, which cannot be inferred as the Set<TopicPartition> these maps require.
        Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(new HostInfo("localhost", 9090), Utils.mkSet(partitionOne, partitionTwo));
        Map<HostInfo, Set<TopicPartition>> secondHostState = new HashMap<HostInfo, Set<TopicPartition>>();
        secondHostState.put(new HostInfo("localhost", 9090), Utils.mkSet(partitionOne));
        secondHostState.put(new HostInfo("other", 9090), Utils.mkSet(partitionTwo));

        this.partitionAssignor.onAssignment(this.createAssignment(firstHostState));
        Assert.assertEquals(firstHostState, (Object)this.partitionAssignor.getPartitionsByHostState());

        // The second assignment moves partitionTwo to another host.
        this.partitionAssignor.onAssignment(this.createAssignment(secondHostState));
        Assert.assertEquals(secondHostState, (Object)this.partitionAssignor.getPartitionsByHostState());
    }

    /**
     * Delivers two successive assignments, the second adding a partition of {@code topic2},
     * and checks that the topics reported by {@code clusterMetadata()} follow each
     * assignment.
     */
    @Test
    public void shouldUpdateClusterMetadataOnAssignment() throws Exception {
        TopicPartition topicOne = new TopicPartition("topic", 1);
        TopicPartition topicTwo = new TopicPartition("topic2", 2);

        // mkSet is called as varargs: wrapping the arguments in an (Object[]) array pins T
        // to Object, which cannot be inferred as the Set<TopicPartition> these maps require.
        Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(new HostInfo("localhost", 9090), Utils.mkSet(topicOne));
        Map<HostInfo, Set<TopicPartition>> secondHostState = Collections.singletonMap(new HostInfo("localhost", 9090), Utils.mkSet(topicOne, topicTwo));

        this.partitionAssignor.onAssignment(this.createAssignment(firstHostState));
        Assert.assertEquals((Object)Utils.mkSet("topic"), (Object)this.partitionAssignor.clusterMetadata().topics());

        this.partitionAssignor.onAssignment(this.createAssignment(secondHostState));
        Assert.assertEquals((Object)Utils.mkSet("topic", "topic2"), (Object)this.partitionAssignor.clusterMetadata().topics());
    }

    /**
     * Runs an assignment with {@code num.standby.replicas=1} for two consumers on different
     * hosts and checks the per-host partition sets decoded from consumer1's assignment:
     * neither host's set equals the full set of {@code topic1} partitions, but their union
     * does.
     */
    @Test
    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() throws Exception {
        String appId = "appId";
        String clientId = "client1";

        Properties standbyProps = this.configProps();
        standbyProps.setProperty("num.standby.replicas", "1");
        StreamsConfig standbyConfig = new StreamsConfig((Map)standbyProps);

        KStreamBuilder streamBuilder = new KStreamBuilder();
        streamBuilder.setApplicationId(appId);
        streamBuilder.stream(new String[]{"topic1"}).groupByKey().count("count");

        UUID processId = UUID.randomUUID();
        StreamThread thread = new StreamThread((TopologyBuilder)streamBuilder, standbyConfig, (KafkaClientSupplier)this.mockClientSupplier, appId, clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState((TopologyBuilder)streamBuilder, StreamsMetadataState.UNKNOWN_HOST), 0L);
        this.partitionAssignor.configure(standbyConfig.getConsumerConfigs(thread, appId, clientId));
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)new MockInternalTopicManager(thread.config, this.mockClientSupplier.restoreConsumer));

        // Two consumers, each with its own process id and advertised host.
        Set noTasks = Collections.emptySet();
        SubscriptionInfo localInfo = new SubscriptionInfo(processId, noTasks, noTasks, "localhost:2171");
        SubscriptionInfo otherInfo = new SubscriptionInfo(UUID.randomUUID(), noTasks, noTasks, "other:9090");
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer1", new PartitionAssignor.Subscription(Collections.singletonList("topic1"), localInfo.encode()));
        subscriptions.put("consumer2", new PartitionAssignor.Subscription(Collections.singletonList("topic1"), otherInfo.encode()));

        Map assignments = this.partitionAssignor.assign(this.metadata, subscriptions);
        PartitionAssignor.Assignment firstAssignment = (PartitionAssignor.Assignment)assignments.get("consumer1");
        AssignmentInfo decoded = AssignmentInfo.decode((ByteBuffer)firstAssignment.userData());
        Set partitionsOnLocalhost = (Set)decoded.partitionsByHost.get(new HostInfo("localhost", 2171));
        Set partitionsOnOther = (Set)decoded.partitionsByHost.get(new HostInfo("other", 9090));

        HashSet partitionsOnAnyHost = new HashSet(partitionsOnLocalhost);
        partitionsOnAnyHost.addAll(partitionsOnOther);

        Set everyPartition = Utils.mkSet((Object[])new TopicPartition[]{this.t1p0, this.t1p1, this.t1p2});
        Assert.assertThat((Object)partitionsOnLocalhost, (Matcher)CoreMatchers.not((Object)everyPartition));
        Assert.assertThat((Object)partitionsOnOther, (Matcher)CoreMatchers.not((Object)everyPartition));
        Assert.assertThat(partitionsOnAnyHost, (Matcher)CoreMatchers.equalTo((Object)everyPartition));
    }

    /**
     * Expects a {@code KafkaException} when the assignor is configured with a map that
     * contains only {@code num.standby.replicas}.
     */
    @Test(expected=KafkaException.class)
    public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() throws Exception {
        Map<String, Integer> configsWithoutThread = Collections.singletonMap("num.standby.replicas", 1);
        this.partitionAssignor.configure(configsWithoutThread);
    }

    /**
     * Expects a {@code KafkaException} when the value stored under
     * {@code __stream.thread.instance__} is a plain string.
     */
    @Test(expected=KafkaException.class)
    public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotStreamThreadInstance() throws Exception {
        Map<String, Object> configs = new HashMap<String, Object>();
        configs.put("__stream.thread.instance__", "i am not a stream thread");
        configs.put("num.standby.replicas", 1);
        this.partitionAssignor.configure(configs);
    }

    /**
     * Builds an assignment with no partitions, no active tasks and no standby tasks, whose
     * user data is the encoded form of an {@code AssignmentInfo} carrying the given host
     * state.
     *
     * @param firstHostState host-to-partitions map to embed in the assignment's user data
     * @return an assignment with an empty partition list and the encoded info as user data
     */
    private PartitionAssignor.Assignment createAssignment(Map<HostInfo, Set<TopicPartition>> firstHostState) {
        AssignmentInfo hostStateOnly = new AssignmentInfo(Collections.emptyList(), Collections.emptyMap(), firstHostState);
        ByteBuffer userData = hostStateOnly.encode();
        return new PartitionAssignor.Assignment(Collections.emptyList(), userData);
    }

    /**
     * Decodes the {@code AssignmentInfo} carried in the assignment's user data and verifies
     * it against the assigned partitions.
     *
     * <p>Checks performed:
     * <ul>
     *   <li>there are as many active tasks as assigned partitions;</li>
     *   <li>the active tasks are, in order, {@code TaskId(0, p)} for each assigned partition
     *       {@code p};</li>
     *   <li>the topics of the assigned partitions equal {@code expectedTopics};</li>
     *   <li>every standby partition has the same partition number as its task id;</li>
     *   <li>if there are any standby tasks, their topics equal {@code expectedTopics}.</li>
     * </ul>
     *
     * @param expectedTopics topics the active (and, when present, standby) partitions must cover exactly
     * @param assignment assignment whose partitions and user data are verified
     * @return the decoded assignment info, for further checks by the caller
     */
    private AssignmentInfo checkAssignment(Set<String> expectedTopics, PartitionAssignor.Assignment assignment) {
        AssignmentInfo info = AssignmentInfo.decode((ByteBuffer)assignment.userData());
        Assert.assertEquals((long)assignment.partitions().size(), (long)info.activeTasks.size());

        // Rebuild the expected active task list from the assigned partitions.
        List<TaskId> activeTasks = new ArrayList<TaskId>();
        Set<String> activeTopics = new HashSet<String>();
        for (TopicPartition partition : assignment.partitions()) {
            activeTasks.add(new TaskId(0, partition.partition()));
            activeTopics.add(partition.topic());
        }
        Assert.assertEquals(activeTasks, (Object)info.activeTasks);
        Assert.assertEquals(expectedTopics, activeTopics);

        // Typed entries let the partitions be iterated as TopicPartition; iterating a raw
        // Set yields Object elements and does not type-check.
        Set<String> standbyTopics = new HashSet<String>();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) {
            TaskId id = entry.getKey();
            for (TopicPartition partition : entry.getValue()) {
                Assert.assertEquals((long)id.partition, (long)partition.partition());
                standbyTopics.add(partition.topic());
            }
        }
        // The standby topic check only applies when standby tasks were assigned at all.
        if (info.standbyTasks.size() > 0) {
            Assert.assertEquals(expectedTopics, standbyTopics);
        }
        return info;
    }
}

