/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsBuilderTest;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.processor.DefaultPartitionGrouper;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.StateStoreSupplier;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.SingleGroupPartitionGrouperStub;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.processor.internals.ThreadDataProvider;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.Assert;
import org.junit.Test;

public class StreamPartitionAssignorTest {
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final TopicPartition t1p3 = new TopicPartition("topic1", 3);
    private final TopicPartition t2p0 = new TopicPartition("topic2", 0);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
    private final TopicPartition t2p3 = new TopicPartition("topic2", 3);
    private final TopicPartition t3p0 = new TopicPartition("topic3", 0);
    private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
    private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
    private final TopicPartition t3p3 = new TopicPartition("topic3", 3);
    private final Set<String> allTopics = Utils.mkSet((Object[])new String[]{"topic1", "topic2"});
    private final List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]));
    private final Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
    private final TaskId task0 = new TaskId(0, 0);
    private final TaskId task1 = new TaskId(0, 1);
    private final TaskId task2 = new TaskId(0, 2);
    private final TaskId task3 = new TaskId(0, 3);
    private final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
    private final StreamsConfig config = new StreamsConfig((Map)this.configProps());
    private final ThreadDataProvider threadDataProvider = (ThreadDataProvider)EasyMock.createNiceMock(ThreadDataProvider.class);
    private final Map<String, Object> configurationMap = new HashMap<String, Object>();
    private final DefaultPartitionGrouper defaultPartitionGrouper = new DefaultPartitionGrouper();
    private final SingleGroupPartitionGrouperStub stubPartitionGrouper = new SingleGroupPartitionGrouperStub();
    private final String userEndPoint = "localhost:8080";

    private Properties configProps() {
        return new Properties(){
            {
                this.setProperty("application.id", "stream-partition-assignor-test");
                this.setProperty("bootstrap.servers", "localhost:8080");
                this.setProperty("buffered.records.per.partition", "3");
                this.setProperty("default.timestamp.extractor", MockTimestampExtractor.class.getName());
            }
        };
    }

    private void configurePartitionAssignor(int standbyReplicas, String endPoint) {
        this.configurationMap.put("__stream.thread.instance__", this.threadDataProvider);
        this.configurationMap.put("num.standby.replicas", standbyReplicas);
        this.configurationMap.put("application.server", endPoint);
        this.partitionAssignor.configure(this.configurationMap);
    }

    private void mockThreadDataProvider(Set<TaskId> prevTasks, Set<TaskId> cachedTasks, UUID processId, PartitionGrouper partitionGrouper, InternalTopologyBuilder builder) {
        EasyMock.expect((Object)this.threadDataProvider.name()).andReturn((Object)"name").anyTimes();
        EasyMock.expect((Object)this.threadDataProvider.prevActiveTasks()).andReturn(prevTasks).anyTimes();
        EasyMock.expect((Object)this.threadDataProvider.cachedTasks()).andReturn(cachedTasks).anyTimes();
        EasyMock.expect((Object)this.threadDataProvider.config()).andReturn((Object)this.config).anyTimes();
        EasyMock.expect((Object)this.threadDataProvider.builder()).andReturn((Object)builder).anyTimes();
        EasyMock.expect((Object)this.threadDataProvider.processId()).andReturn((Object)processId).anyTimes();
        EasyMock.expect((Object)this.threadDataProvider.partitionGrouper()).andReturn((Object)partitionGrouper).anyTimes();
        EasyMock.replay((Object[])new Object[]{this.threadDataProvider});
    }

    @Test
    public void shouldInterleaveTasksByGroupId() {
        TaskId taskIdA0 = new TaskId(0, 0);
        TaskId taskIdA1 = new TaskId(0, 1);
        TaskId taskIdA2 = new TaskId(0, 2);
        TaskId taskIdA3 = new TaskId(0, 3);
        TaskId taskIdB0 = new TaskId(1, 0);
        TaskId taskIdB1 = new TaskId(1, 1);
        TaskId taskIdB2 = new TaskId(1, 2);
        TaskId taskIdC0 = new TaskId(2, 0);
        TaskId taskIdC1 = new TaskId(2, 1);
        List<TaskId> expectedSubList1 = Arrays.asList(taskIdA0, taskIdA3, taskIdB2);
        List<TaskId> expectedSubList2 = Arrays.asList(taskIdA1, taskIdB0, taskIdC0);
        List<TaskId> expectedSubList3 = Arrays.asList(taskIdA2, taskIdB1, taskIdC1);
        List<List> embeddedList = Arrays.asList(expectedSubList1, expectedSubList2, expectedSubList3);
        List<TaskId> tasks = Arrays.asList(taskIdC0, taskIdC1, taskIdB0, taskIdB1, taskIdB2, taskIdA0, taskIdA1, taskIdA2, taskIdA3);
        Collections.shuffle(tasks);
        List interleavedTaskIds = this.partitionAssignor.interleaveTasksByGroupId(tasks, 3);
        Assert.assertThat((Object)interleavedTaskIds, (Matcher)CoreMatchers.equalTo(embeddedList));
    }

    @Test
    public void testSubscription() {
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source1", "source2"});
        Set prevTasks = Utils.mkSet((Object[])new TaskId[]{new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1)});
        Set cachedTasks = Utils.mkSet((Object[])new TaskId[]{new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1), new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2)});
        UUID processId = UUID.randomUUID();
        this.mockThreadDataProvider(prevTasks, cachedTasks, processId, this.stubPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, null);
        PartitionAssignor.Subscription subscription = this.partitionAssignor.subscription(Utils.mkSet((Object[])new String[]{"topic1", "topic2"}));
        Collections.sort(subscription.topics());
        Assert.assertEquals((Object)Utils.mkList((Object[])new String[]{"topic1", "topic2"}), (Object)subscription.topics());
        HashSet standbyTasks = new HashSet(cachedTasks);
        standbyTasks.removeAll(prevTasks);
        SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks, null);
        Assert.assertEquals((Object)info.encode(), (Object)subscription.userData());
    }

    @Test
    public void testAssignBasic() {
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source1", "source2"});
        List topics = Utils.mkList((Object[])new String[]{"topic1", "topic2"});
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{this.task0, this.task1, this.task2});
        Set prevTasks10 = Utils.mkSet((Object[])new TaskId[]{this.task0});
        Set prevTasks11 = Utils.mkSet((Object[])new TaskId[]{this.task1});
        Set prevTasks20 = Utils.mkSet((Object[])new TaskId[]{this.task2});
        Set standbyTasks10 = Utils.mkSet((Object[])new TaskId[]{this.task1});
        Set standbyTasks11 = Utils.mkSet((Object[])new TaskId[]{this.task2});
        Set standbyTasks20 = Utils.mkSet((Object[])new TaskId[]{this.task0});
        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        this.mockThreadDataProvider(prevTasks10, standbyTasks10, uuid1, this.stubPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, null);
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)new MockInternalTopicManager(this.config, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, "localhost:8080").encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, "localhost:8080").encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, "localhost:8080").encode()));
        Map assignments = this.partitionAssignor.assign(this.metadata, subscriptions);
        Assert.assertEquals((Object)Utils.mkSet((Object[])new Set[]{Utils.mkSet((Object[])new TopicPartition[]{this.t1p0, this.t2p0}), Utils.mkSet((Object[])new TopicPartition[]{this.t1p1, this.t2p1})}), (Object)Utils.mkSet((Object[])new HashSet[]{new HashSet(((PartitionAssignor.Assignment)assignments.get("consumer10")).partitions()), new HashSet(((PartitionAssignor.Assignment)assignments.get("consumer11")).partitions())}));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.t1p2, this.t2p2}), new HashSet(((PartitionAssignor.Assignment)assignments.get("consumer20")).partitions()));
        AssignmentInfo info10 = this.checkAssignment(this.allTopics, (PartitionAssignor.Assignment)assignments.get("consumer10"));
        HashSet allActiveTasks = new HashSet(info10.activeTasks);
        AssignmentInfo info11 = this.checkAssignment(this.allTopics, (PartitionAssignor.Assignment)assignments.get("consumer11"));
        allActiveTasks.addAll(info11.activeTasks);
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{this.task0, this.task1}), allActiveTasks);
        AssignmentInfo info20 = this.checkAssignment(this.allTopics, (PartitionAssignor.Assignment)assignments.get("consumer20"));
        allActiveTasks.addAll(info20.activeTasks);
        Assert.assertEquals((long)3L, (long)allActiveTasks.size());
        Assert.assertEquals((Object)allTasks, new HashSet(allActiveTasks));
        Assert.assertEquals((long)3L, (long)allActiveTasks.size());
        Assert.assertEquals((Object)allTasks, allActiveTasks);
    }

    @Test
    public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() {
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addProcessor("processorII", new MockProcessorSupplier(), new String[]{"source2"});
        List<PartitionInfo> localInfos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 3, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 3, Node.noNode(), new Node[0], new Node[0]));
        Cluster localMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), localInfos, Collections.emptySet(), Collections.emptySet());
        List topics = Utils.mkList((Object[])new String[]{"topic1", "topic2"});
        TaskId taskIdA0 = new TaskId(0, 0);
        TaskId taskIdA1 = new TaskId(0, 1);
        TaskId taskIdA2 = new TaskId(0, 2);
        TaskId taskIdA3 = new TaskId(0, 3);
        TaskId taskIdB0 = new TaskId(1, 0);
        TaskId taskIdB1 = new TaskId(1, 1);
        TaskId taskIdB2 = new TaskId(1, 2);
        TaskId taskIdB3 = new TaskId(1, 3);
        UUID uuid1 = UUID.randomUUID();
        this.mockThreadDataProvider(new HashSet<TaskId>(), new HashSet<TaskId>(), uuid1, (PartitionGrouper)this.defaultPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, null);
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)new MockInternalTopicManager(this.config, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet(), new HashSet(), "localhost:8080").encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, new HashSet(), new HashSet(), "localhost:8080").encode()));
        Map assignments = this.partitionAssignor.assign(localMetadata, subscriptions);
        Assert.assertEquals((Object)Utils.mkSet((Object[])new Set[]{Utils.mkSet((Object[])new TopicPartition[]{this.t2p2, this.t1p0, this.t1p2, this.t2p0}), Utils.mkSet((Object[])new TopicPartition[]{this.t1p1, this.t2p1, this.t1p3, this.t2p3})}), (Object)Utils.mkSet((Object[])new HashSet[]{new HashSet(((PartitionAssignor.Assignment)assignments.get("consumer10")).partitions()), new HashSet(((PartitionAssignor.Assignment)assignments.get("consumer11")).partitions())}));
        AssignmentInfo info10 = AssignmentInfo.decode((ByteBuffer)((PartitionAssignor.Assignment)assignments.get("consumer10")).userData());
        List<TaskId> expectedInfo10TaskIds = Arrays.asList(taskIdA1, taskIdA3, taskIdB1, taskIdB3);
        Assert.assertEquals(expectedInfo10TaskIds, (Object)info10.activeTasks);
        AssignmentInfo info11 = AssignmentInfo.decode((ByteBuffer)((PartitionAssignor.Assignment)assignments.get("consumer11")).userData());
        List<TaskId> expectedInfo11TaskIds = Arrays.asList(taskIdA0, taskIdA2, taskIdB0, taskIdB2);
        Assert.assertEquals(expectedInfo11TaskIds, (Object)info11.activeTasks);
    }

    @Test
    public void testAssignWithPartialTopology() {
        Properties props = this.configProps();
        props.put("partition.grouper", SingleGroupPartitionGrouperStub.class);
        StreamsConfig config = new StreamsConfig((Map)props);
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("store1", false), new String[]{"processor1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor2", new MockProcessorSupplier(), new String[]{"source2"});
        this.builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("store2", false), new String[]{"processor2"});
        List topics = Utils.mkList((Object[])new String[]{"topic1", "topic2"});
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{this.task0, this.task1, this.task2});
        UUID uuid1 = UUID.randomUUID();
        this.mockThreadDataProvider(Collections.emptySet(), Collections.emptySet(), uuid1, this.stubPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, null);
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)new MockInternalTopicManager(config, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), "localhost:8080").encode()));
        Map assignments = this.partitionAssignor.assign(this.metadata, subscriptions);
        AssignmentInfo info10 = this.checkAssignment(Utils.mkSet((Object[])new String[]{"topic1"}), (PartitionAssignor.Assignment)assignments.get("consumer10"));
        HashSet allActiveTasks = new HashSet(info10.activeTasks);
        Assert.assertEquals((long)3L, (long)allActiveTasks.size());
        Assert.assertEquals((Object)allTasks, new HashSet(allActiveTasks));
    }

    @Test
    public void testAssignEmptyMetadata() {
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source1", "source2"});
        List topics = Utils.mkList((Object[])new String[]{"topic1", "topic2"});
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{this.task0, this.task1, this.task2});
        Set prevTasks10 = Utils.mkSet((Object[])new TaskId[]{this.task0});
        Set standbyTasks10 = Utils.mkSet((Object[])new TaskId[]{this.task1});
        Cluster emptyMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
        UUID uuid1 = UUID.randomUUID();
        this.mockThreadDataProvider(prevTasks10, standbyTasks10, uuid1, this.stubPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, null);
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, "localhost:8080").encode()));
        Map assignments = this.partitionAssignor.assign(emptyMetadata, subscriptions);
        Assert.assertEquals(Collections.emptySet(), new HashSet(((PartitionAssignor.Assignment)assignments.get("consumer10")).partitions()));
        AssignmentInfo info10 = this.checkAssignment(Collections.emptySet(), (PartitionAssignor.Assignment)assignments.get("consumer10"));
        HashSet allActiveTasks = new HashSet(info10.activeTasks);
        Assert.assertEquals((long)0L, (long)allActiveTasks.size());
        Assert.assertEquals(Collections.emptySet(), new HashSet(allActiveTasks));
        assignments = this.partitionAssignor.assign(this.metadata, subscriptions);
        Assert.assertEquals((Object)Utils.mkSet((Object[])new Set[]{Utils.mkSet((Object[])new TopicPartition[]{this.t1p0, this.t2p0, this.t1p0, this.t2p0, this.t1p1, this.t2p1, this.t1p2, this.t2p2})}), (Object)Utils.mkSet((Object[])new HashSet[]{new HashSet(((PartitionAssignor.Assignment)assignments.get("consumer10")).partitions())}));
        info10 = this.checkAssignment(this.allTopics, (PartitionAssignor.Assignment)assignments.get("consumer10"));
        allActiveTasks.addAll(info10.activeTasks);
        Assert.assertEquals((long)3L, (long)allActiveTasks.size());
        Assert.assertEquals((Object)allTasks, new HashSet(allActiveTasks));
        Assert.assertEquals((long)3L, (long)allActiveTasks.size());
        Assert.assertEquals((Object)allTasks, allActiveTasks);
    }

    @Test
    public void testAssignWithNewTasks() {
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addSource(null, "source3", null, null, null, new String[]{"topic3"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source1", "source2", "source3"});
        List topics = Utils.mkList((Object[])new String[]{"topic1", "topic2", "topic3"});
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{this.task0, this.task1, this.task2, this.task3});
        Set prevTasks10 = Utils.mkSet((Object[])new TaskId[]{this.task0});
        Set prevTasks11 = Utils.mkSet((Object[])new TaskId[]{this.task1});
        Set prevTasks20 = Utils.mkSet((Object[])new TaskId[]{this.task2});
        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        this.mockThreadDataProvider(prevTasks10, Collections.emptySet(), uuid1, this.stubPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, null);
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)new MockInternalTopicManager(this.config, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.emptySet(), "localhost:8080").encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.emptySet(), "localhost:8080").encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.emptySet(), "localhost:8080").encode()));
        Map assignments = this.partitionAssignor.assign(this.metadata, subscriptions);
        AssignmentInfo info = AssignmentInfo.decode((ByteBuffer)((PartitionAssignor.Assignment)assignments.get("consumer10")).userData());
        HashSet allActiveTasks = new HashSet(info.activeTasks);
        HashSet allPartitions = new HashSet(((PartitionAssignor.Assignment)assignments.get("consumer10")).partitions());
        info = AssignmentInfo.decode((ByteBuffer)((PartitionAssignor.Assignment)assignments.get("consumer11")).userData());
        allActiveTasks.addAll(info.activeTasks);
        allPartitions.addAll(((PartitionAssignor.Assignment)assignments.get("consumer11")).partitions());
        info = AssignmentInfo.decode((ByteBuffer)((PartitionAssignor.Assignment)assignments.get("consumer20")).userData());
        allActiveTasks.addAll(info.activeTasks);
        allPartitions.addAll(((PartitionAssignor.Assignment)assignments.get("consumer20")).partitions());
        Assert.assertEquals((Object)allTasks, allActiveTasks);
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.t1p0, this.t1p1, this.t1p2, this.t2p0, this.t2p1, this.t2p2, this.t3p0, this.t3p1, this.t3p2, this.t3p3}), allPartitions);
    }

    @Test
    public void testAssignWithStates() {
        String applicationId = "test";
        this.builder.setApplicationId(applicationId);
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor-1", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("store1", false), new String[]{"processor-1"});
        this.builder.addProcessor("processor-2", new MockProcessorSupplier(), new String[]{"source2"});
        this.builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("store2", false), new String[]{"processor-2"});
        this.builder.addStateStore((StateStoreSupplier)new MockStateStoreSupplier("store3", false), new String[]{"processor-2"});
        List topics = Utils.mkList((Object[])new String[]{"topic1", "topic2"});
        TaskId task00 = new TaskId(0, 0);
        TaskId task01 = new TaskId(0, 1);
        TaskId task02 = new TaskId(0, 2);
        TaskId task10 = new TaskId(1, 0);
        TaskId task11 = new TaskId(1, 1);
        TaskId task12 = new TaskId(1, 2);
        List tasks = Utils.mkList((Object[])new TaskId[]{task00, task01, task02, task10, task11, task12});
        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        this.mockThreadDataProvider(Collections.emptySet(), Collections.emptySet(), uuid1, (PartitionGrouper)this.defaultPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, null);
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)new MockInternalTopicManager(this.config, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), "localhost:8080").encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.emptySet(), Collections.emptySet(), "localhost:8080").encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.emptySet(), Collections.emptySet(), "localhost:8080").encode()));
        Map assignments = this.partitionAssignor.assign(this.metadata, subscriptions);
        Assert.assertEquals((long)2L, (long)((PartitionAssignor.Assignment)assignments.get("consumer10")).partitions().size());
        Assert.assertEquals((long)2L, (long)((PartitionAssignor.Assignment)assignments.get("consumer11")).partitions().size());
        Assert.assertEquals((long)2L, (long)((PartitionAssignor.Assignment)assignments.get("consumer20")).partitions().size());
        AssignmentInfo info10 = AssignmentInfo.decode((ByteBuffer)((PartitionAssignor.Assignment)assignments.get("consumer10")).userData());
        AssignmentInfo info11 = AssignmentInfo.decode((ByteBuffer)((PartitionAssignor.Assignment)assignments.get("consumer11")).userData());
        AssignmentInfo info20 = AssignmentInfo.decode((ByteBuffer)((PartitionAssignor.Assignment)assignments.get("consumer20")).userData());
        Assert.assertEquals((long)2L, (long)info10.activeTasks.size());
        Assert.assertEquals((long)2L, (long)info11.activeTasks.size());
        Assert.assertEquals((long)2L, (long)info20.activeTasks.size());
        HashSet allTasks = new HashSet();
        allTasks.addAll(info10.activeTasks);
        allTasks.addAll(info11.activeTasks);
        allTasks.addAll(info20.activeTasks);
        Assert.assertEquals(new HashSet(tasks), allTasks);
        Map topicGroups = this.builder.topicGroups();
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{task00, task01, task02}), this.tasksForState(applicationId, "store1", tasks, topicGroups));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{task10, task11, task12}), this.tasksForState(applicationId, "store2", tasks, topicGroups));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{task10, task11, task12}), this.tasksForState(applicationId, "store3", tasks, topicGroups));
    }

    private Set<TaskId> tasksForState(String applicationId, String storeName, List<TaskId> tasks, Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups) {
        String changelogTopic = ProcessorStateManager.storeChangelogTopic((String)applicationId, (String)storeName);
        HashSet<TaskId> ids = new HashSet<TaskId>();
        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
            Set stateChangelogTopics = entry.getValue().stateChangelogTopics.keySet();
            if (!stateChangelogTopics.contains(changelogTopic)) continue;
            for (TaskId id : tasks) {
                if (id.topicGroupId != entry.getKey()) continue;
                ids.add(id);
            }
        }
        return ids;
    }

    @Test
    public void testAssignWithStandbyReplicas() {
        Properties props = this.configProps();
        props.setProperty("num.standby.replicas", "1");
        StreamsConfig config = new StreamsConfig((Map)props);
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source1", "source2"});
        List topics = Utils.mkList((Object[])new String[]{"topic1", "topic2"});
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{this.task0, this.task1, this.task2});
        Set prevTasks00 = Utils.mkSet((Object[])new TaskId[]{this.task0});
        Set prevTasks01 = Utils.mkSet((Object[])new TaskId[]{this.task1});
        Set prevTasks02 = Utils.mkSet((Object[])new TaskId[]{this.task2});
        Set standbyTasks01 = Utils.mkSet((Object[])new TaskId[]{this.task1});
        Set standbyTasks02 = Utils.mkSet((Object[])new TaskId[]{this.task2});
        Set standbyTasks00 = Utils.mkSet((Object[])new TaskId[]{this.task0});
        UUID uuid1 = UUID.randomUUID();
        UUID uuid2 = UUID.randomUUID();
        this.mockThreadDataProvider(prevTasks00, standbyTasks01, uuid1, (PartitionGrouper)this.defaultPartitionGrouper, this.builder);
        this.configurePartitionAssignor(1, null);
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)new MockInternalTopicManager(config, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks00, standbyTasks01, "localhost:8080").encode()));
        subscriptions.put("consumer11", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks01, standbyTasks02, "localhost:8080").encode()));
        subscriptions.put("consumer20", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks02, standbyTasks00, "any:9097").encode()));
        Map assignments = this.partitionAssignor.assign(this.metadata, subscriptions);
        AssignmentInfo info10 = this.checkAssignment(this.allTopics, (PartitionAssignor.Assignment)assignments.get("consumer10"));
        HashSet allActiveTasks = new HashSet(info10.activeTasks);
        HashSet allStandbyTasks = new HashSet(info10.standbyTasks.keySet());
        AssignmentInfo info11 = this.checkAssignment(this.allTopics, (PartitionAssignor.Assignment)assignments.get("consumer11"));
        allActiveTasks.addAll(info11.activeTasks);
        allStandbyTasks.addAll(info11.standbyTasks.keySet());
        Assert.assertNotEquals((String)"same processId has same set of standby tasks", info11.standbyTasks.keySet(), info10.standbyTasks.keySet());
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{this.task0, this.task1}), new HashSet(allActiveTasks));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{this.task2}), new HashSet(allStandbyTasks));
        AssignmentInfo info20 = this.checkAssignment(this.allTopics, (PartitionAssignor.Assignment)assignments.get("consumer20"));
        allActiveTasks.addAll(info20.activeTasks);
        allStandbyTasks.addAll(info20.standbyTasks.keySet());
        Assert.assertEquals((long)3L, (long)allActiveTasks.size());
        Assert.assertEquals((Object)allTasks, allActiveTasks);
        Assert.assertEquals((long)3L, (long)allStandbyTasks.size());
        Assert.assertEquals((Object)allTasks, allStandbyTasks);
    }

    @Test
    public void testOnAssignment() {
        TopicPartition t2p3 = new TopicPartition("topic2", 3);
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source1", "source2"});
        UUID uuid = UUID.randomUUID();
        this.mockThreadDataProvider(Collections.emptySet(), Collections.emptySet(), uuid, (PartitionGrouper)this.defaultPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, null);
        List activeTaskList = Utils.mkList((Object[])new TaskId[]{this.task0, this.task3});
        HashMap<TaskId, Set> activeTasks = new HashMap<TaskId, Set>();
        HashMap<TaskId, Set> standbyTasks = new HashMap<TaskId, Set>();
        activeTasks.put(this.task0, Utils.mkSet((Object[])new TopicPartition[]{this.t1p0}));
        activeTasks.put(this.task3, Utils.mkSet((Object[])new TopicPartition[]{t2p3}));
        standbyTasks.put(this.task1, Utils.mkSet((Object[])new TopicPartition[]{this.t1p0}));
        standbyTasks.put(this.task2, Utils.mkSet((Object[])new TopicPartition[]{this.t2p0}));
        AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, new HashMap());
        PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList((Object[])new TopicPartition[]{this.t1p0, t2p3}), info.encode());
        this.partitionAssignor.onAssignment(assignment);
        Assert.assertEquals(activeTasks, (Object)this.partitionAssignor.activeTasks());
        Assert.assertEquals(standbyTasks, (Object)this.partitionAssignor.standbyTasks());
    }

    @Test
    public void testAssignWithInternalTopics() {
        String applicationId = "test";
        this.builder.setApplicationId(applicationId);
        this.builder.addInternalTopic("topicX");
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addSink("sink1", "topicX", null, null, null, new String[]{"processor1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topicX"});
        this.builder.addProcessor("processor2", new MockProcessorSupplier(), new String[]{"source2"});
        List topics = Utils.mkList((Object[])new String[]{"topic1", "test-topicX"});
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{this.task0, this.task1, this.task2});
        UUID uuid1 = UUID.randomUUID();
        this.mockThreadDataProvider(Collections.emptySet(), Collections.emptySet(), uuid1, (PartitionGrouper)this.defaultPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, null);
        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(this.config, this.mockClientSupplier.restoreConsumer);
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)internalTopicManager);
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        Set emptyTasks = Collections.emptySet();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, "localhost:8080").encode()));
        this.partitionAssignor.assign(this.metadata, subscriptions);
        Assert.assertEquals((long)1L, (long)internalTopicManager.readyTopics.size());
        Assert.assertEquals((long)allTasks.size(), (long)internalTopicManager.readyTopics.get("test-topicX").intValue());
    }

    @Test
    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
        String applicationId = "test";
        this.builder.setApplicationId(applicationId);
        this.builder.addInternalTopic("topicX");
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addSink("sink1", "topicX", null, null, null, new String[]{"processor1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topicX"});
        this.builder.addInternalTopic("topicZ");
        this.builder.addProcessor("processor2", new MockProcessorSupplier(), new String[]{"source2"});
        this.builder.addSink("sink2", "topicZ", null, null, null, new String[]{"processor2"});
        this.builder.addSource(null, "source3", null, null, null, new String[]{"topicZ"});
        List topics = Utils.mkList((Object[])new String[]{"topic1", "test-topicX", "test-topicZ"});
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{this.task0, this.task1, this.task2});
        UUID uuid1 = UUID.randomUUID();
        this.mockThreadDataProvider(Collections.emptySet(), Collections.emptySet(), uuid1, (PartitionGrouper)this.defaultPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, null);
        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(this.config, this.mockClientSupplier.restoreConsumer);
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)internalTopicManager);
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        Set emptyTasks = Collections.emptySet();
        subscriptions.put("consumer10", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, "localhost:8080").encode()));
        this.partitionAssignor.assign(this.metadata, subscriptions);
        Assert.assertEquals((long)2L, (long)internalTopicManager.readyTopics.size());
        Assert.assertEquals((long)allTasks.size(), (long)internalTopicManager.readyTopics.get("test-topicZ").intValue());
    }

    @Test
    public void shouldAddUserDefinedEndPointToSubscription() {
        this.builder.setApplicationId("application-id");
        this.builder.addSource(null, "source", null, null, null, new String[]{"input"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        this.builder.addSink("sink", "output", null, null, null, new String[]{"processor"});
        UUID uuid1 = UUID.randomUUID();
        this.mockThreadDataProvider(Collections.emptySet(), Collections.emptySet(), uuid1, (PartitionGrouper)this.defaultPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, "localhost:8080");
        PartitionAssignor.Subscription subscription = this.partitionAssignor.subscription(Utils.mkSet((Object[])new String[]{"input"}));
        SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode((ByteBuffer)subscription.userData());
        Assert.assertEquals((Object)"localhost:8080", (Object)subscriptionInfo.userEndPoint);
    }

    @Test
    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() {
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        Set<TaskId> emptyTasks = Collections.emptySet();
        subscriptions.put("consumer1", new PartitionAssignor.Subscription(Collections.singletonList("topic1"), new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()));
        subscriptions.put("consumer2", new PartitionAssignor.Subscription(Collections.singletonList("topic1"), new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()));
        this.mockThreadDataProvider(emptyTasks, emptyTasks, UUID.randomUUID(), (PartitionGrouper)this.defaultPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, null);
        Map assignment = this.partitionAssignor.assign(this.metadata, subscriptions);
        Assert.assertEquals((long)2L, (long)assignment.size());
        Assert.assertEquals((long)1L, (long)AssignmentInfo.decode((ByteBuffer)((PartitionAssignor.Assignment)assignment.get((Object)"consumer1")).userData()).version);
        Assert.assertEquals((long)1L, (long)AssignmentInfo.decode((ByteBuffer)((PartitionAssignor.Assignment)assignment.get((Object)"consumer2")).userData()).version);
    }

    @Test
    public void shouldDownGradeSubscription() {
        Set<TaskId> emptyTasks = Collections.emptySet();
        this.mockThreadDataProvider(emptyTasks, emptyTasks, UUID.randomUUID(), (PartitionGrouper)this.defaultPartitionGrouper, this.builder);
        this.configurationMap.put("upgrade.from", "0.10.0");
        this.configurePartitionAssignor(0, null);
        PartitionAssignor.Subscription subscription = this.partitionAssignor.subscription(Utils.mkSet((Object[])new String[]{"topic1"}));
        Assert.assertEquals((long)1L, (long)SubscriptionInfo.decode((ByteBuffer)subscription.userData()).version);
    }

    @Test
    public void shouldMapUserEndPointToTopicPartitions() {
        String applicationId = "application-id";
        this.builder.setApplicationId("application-id");
        this.builder.addSource(null, "source", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source"});
        this.builder.addSink("sink", "output", null, null, null, new String[]{"processor"});
        List topics = Utils.mkList((Object[])new String[]{"topic1"});
        UUID uuid1 = UUID.randomUUID();
        this.mockThreadDataProvider(Collections.emptySet(), Collections.emptySet(), uuid1, (PartitionGrouper)this.defaultPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, "localhost:8080");
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)new MockInternalTopicManager(this.config, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        Set emptyTasks = Collections.emptySet();
        subscriptions.put("consumer1", new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, "localhost:8080").encode()));
        Map assignments = this.partitionAssignor.assign(this.metadata, subscriptions);
        PartitionAssignor.Assignment consumerAssignment = (PartitionAssignor.Assignment)assignments.get("consumer1");
        AssignmentInfo assignmentInfo = AssignmentInfo.decode((ByteBuffer)consumerAssignment.userData());
        Set topicPartitions = (Set)assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2)}), (Object)topicPartitions);
    }

    @Test
    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
        String myEndPoint = "localhost";
        String applicationId = "application-id";
        this.builder.setApplicationId("application-id");
        this.mockThreadDataProvider(Collections.emptySet(), Collections.emptySet(), UUID.randomUUID(), (PartitionGrouper)this.defaultPartitionGrouper, this.builder);
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)new MockInternalTopicManager(this.config, this.mockClientSupplier.restoreConsumer));
        try {
            this.configurePartitionAssignor(0, "localhost");
            Assert.fail((String)"expected to an exception due to invalid config");
        }
        catch (ConfigException configException) {
            // empty catch block
        }
    }

    @Test
    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
        String myEndPoint = "localhost:j87yhk";
        String applicationId = "application-id";
        this.builder.setApplicationId("application-id");
        try {
            this.configurePartitionAssignor(0, "localhost:j87yhk");
            Assert.fail((String)"expected to an exception due to invalid config");
        }
        catch (ConfigException configException) {
            // empty catch block
        }
    }

    @Test
    public void shouldExposeHostStateToTopicPartitionsOnAssignment() {
        List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
        Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(new HostInfo("localhost", 80), Collections.singleton(new TopicPartition("topic", 0)));
        AssignmentInfo assignmentInfo = new AssignmentInfo(Collections.singletonList(new TaskId(0, 0)), Collections.emptyMap(), hostState);
        this.mockThreadDataProvider(Collections.emptySet(), Collections.emptySet(), UUID.randomUUID(), (PartitionGrouper)this.defaultPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, null);
        this.partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode()));
        Assert.assertEquals(hostState, (Object)this.partitionAssignor.getPartitionsByHostState());
    }

    @Test
    public void shouldSetClusterMetadataOnAssignment() {
        List<TopicPartition> topic = Collections.singletonList(new TopicPartition("topic", 0));
        Map<HostInfo, Set<TopicPartition>> hostState = Collections.singletonMap(new HostInfo("localhost", 80), Collections.singleton(new TopicPartition("topic", 0)));
        AssignmentInfo assignmentInfo = new AssignmentInfo(Collections.singletonList(new TaskId(0, 0)), Collections.emptyMap(), hostState);
        this.mockThreadDataProvider(Collections.emptySet(), Collections.emptySet(), UUID.randomUUID(), (PartitionGrouper)this.defaultPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, null);
        this.partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode()));
        Cluster cluster = this.partitionAssignor.clusterMetadata();
        List partitionInfos = cluster.partitionsForTopic("topic");
        PartitionInfo partitionInfo = (PartitionInfo)partitionInfos.get(0);
        Assert.assertEquals((long)1L, (long)partitionInfos.size());
        Assert.assertEquals((Object)"topic", (Object)partitionInfo.topic());
        Assert.assertEquals((long)0L, (long)partitionInfo.partition());
    }

    @Test
    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() {
        Cluster cluster = this.partitionAssignor.clusterMetadata();
        Assert.assertNotNull((Object)cluster);
    }

    @Test
    public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
        String applicationId = "application-id";
        StreamsBuilder builder = new StreamsBuilder();
        InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
        internalTopologyBuilder.setApplicationId("application-id");
        KStream stream1 = builder.stream("topic1").selectKey((KeyValueMapper)new KeyValueMapper<Object, Object, Object>(){

            public Object apply(Object key, Object value) {
                return null;
            }
        }).groupByKey().count(Materialized.as((String)"count")).toStream().map((KeyValueMapper)new KeyValueMapper<Object, Long, KeyValue<Object, Object>>(){

            public KeyValue<Object, Object> apply(Object key, Long value) {
                return null;
            }
        });
        builder.stream("unknownTopic").selectKey((KeyValueMapper)new KeyValueMapper<Object, Object, Object>(){

            public Object apply(Object key, Object value) {
                return null;
            }
        }).join(stream1, new ValueJoiner(){

            public Object apply(Object value1, Object value2) {
                return null;
            }
        }, JoinWindows.of((long)0L));
        UUID uuid = UUID.randomUUID();
        String client = "client1";
        this.mockThreadDataProvider(Collections.emptySet(), Collections.emptySet(), UUID.randomUUID(), (PartitionGrouper)this.defaultPartitionGrouper, internalTopologyBuilder);
        this.configurePartitionAssignor(0, null);
        MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(this.config, this.mockClientSupplier.restoreConsumer);
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)mockInternalTopicManager);
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        Set emptyTasks = Collections.emptySet();
        subscriptions.put("client1", new PartitionAssignor.Subscription(Collections.singletonList("unknownTopic"), new SubscriptionInfo(uuid, emptyTasks, emptyTasks, "localhost:8080").encode()));
        Map assignment = this.partitionAssignor.assign(this.metadata, subscriptions);
        HashMap<String, Integer> expectedCreatedInternalTopics = new HashMap<String, Integer>();
        expectedCreatedInternalTopics.put("application-id-count-repartition", 3);
        expectedCreatedInternalTopics.put("application-id-count-changelog", 3);
        Assert.assertThat(mockInternalTopicManager.readyTopics, (Matcher)CoreMatchers.equalTo(expectedCreatedInternalTopics));
        List<TopicPartition> expectedAssignment = Arrays.asList(new TopicPartition("topic1", 0), new TopicPartition("topic1", 1), new TopicPartition("topic1", 2), new TopicPartition("application-id-count-repartition", 0), new TopicPartition("application-id-count-repartition", 1), new TopicPartition("application-id-count-repartition", 2));
        Assert.assertThat(new HashSet(((PartitionAssignor.Assignment)assignment.get("client1")).partitions()), (Matcher)CoreMatchers.equalTo(new HashSet<TopicPartition>(expectedAssignment)));
    }

    @Test
    public void shouldUpdatePartitionHostInfoMapOnAssignment() {
        TopicPartition partitionOne = new TopicPartition("topic", 1);
        TopicPartition partitionTwo = new TopicPartition("topic", 2);
        Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(new HostInfo("localhost", 9090), Utils.mkSet((Object[])new TopicPartition[]{partitionOne, partitionTwo}));
        HashMap<HostInfo, Set<TopicPartition>> secondHostState = new HashMap<HostInfo, Set<TopicPartition>>();
        secondHostState.put(new HostInfo("localhost", 9090), Utils.mkSet((Object[])new TopicPartition[]{partitionOne}));
        secondHostState.put(new HostInfo("other", 9090), Utils.mkSet((Object[])new TopicPartition[]{partitionTwo}));
        this.mockThreadDataProvider(Collections.emptySet(), Collections.emptySet(), UUID.randomUUID(), (PartitionGrouper)this.defaultPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, null);
        this.partitionAssignor.onAssignment(this.createAssignment(firstHostState));
        Assert.assertEquals(firstHostState, (Object)this.partitionAssignor.getPartitionsByHostState());
        this.partitionAssignor.onAssignment(this.createAssignment(secondHostState));
        Assert.assertEquals(secondHostState, (Object)this.partitionAssignor.getPartitionsByHostState());
    }

    @Test
    public void shouldUpdateClusterMetadataOnAssignment() {
        TopicPartition topicOne = new TopicPartition("topic", 1);
        TopicPartition topicTwo = new TopicPartition("topic2", 2);
        Map<HostInfo, Set<TopicPartition>> firstHostState = Collections.singletonMap(new HostInfo("localhost", 9090), Utils.mkSet((Object[])new TopicPartition[]{topicOne}));
        Map<HostInfo, Set<TopicPartition>> secondHostState = Collections.singletonMap(new HostInfo("localhost", 9090), Utils.mkSet((Object[])new TopicPartition[]{topicOne, topicTwo}));
        this.mockThreadDataProvider(Collections.emptySet(), Collections.emptySet(), UUID.randomUUID(), (PartitionGrouper)this.defaultPartitionGrouper, this.builder);
        this.configurePartitionAssignor(0, null);
        this.partitionAssignor.onAssignment(this.createAssignment(firstHostState));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"topic"}), (Object)this.partitionAssignor.clusterMetadata().topics());
        this.partitionAssignor.onAssignment(this.createAssignment(secondHostState));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new String[]{"topic", "topic2"}), (Object)this.partitionAssignor.clusterMetadata().topics());
    }

    @Test
    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
        String applicationId = "appId";
        StreamsBuilder builder = new StreamsBuilder();
        InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
        internalTopologyBuilder.setApplicationId("appId");
        builder.stream("topic1").groupByKey().count();
        UUID uuid = UUID.randomUUID();
        this.mockThreadDataProvider(Collections.emptySet(), Collections.emptySet(), uuid, (PartitionGrouper)this.defaultPartitionGrouper, internalTopologyBuilder);
        this.configurePartitionAssignor(1, "localhost:8080");
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)new MockInternalTopicManager(this.config, this.mockClientSupplier.restoreConsumer));
        HashMap<String, PartitionAssignor.Subscription> subscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        Set emptyTasks = Collections.emptySet();
        subscriptions.put("consumer1", new PartitionAssignor.Subscription(Collections.singletonList("topic1"), new SubscriptionInfo(uuid, emptyTasks, emptyTasks, "localhost:8080").encode()));
        subscriptions.put("consumer2", new PartitionAssignor.Subscription(Collections.singletonList("topic1"), new SubscriptionInfo(UUID.randomUUID(), emptyTasks, emptyTasks, "other:9090").encode()));
        Set allPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.t1p0, this.t1p1, this.t1p2});
        Map assign = this.partitionAssignor.assign(this.metadata, subscriptions);
        PartitionAssignor.Assignment consumer1Assignment = (PartitionAssignor.Assignment)assign.get("consumer1");
        AssignmentInfo assignmentInfo = AssignmentInfo.decode((ByteBuffer)consumer1Assignment.userData());
        Set consumer1partitions = (Set)assignmentInfo.partitionsByHost.get(new HostInfo("localhost", 8080));
        Set consumer2Partitions = (Set)assignmentInfo.partitionsByHost.get(new HostInfo("other", 9090));
        HashSet allAssignedPartitions = new HashSet(consumer1partitions);
        allAssignedPartitions.addAll(consumer2Partitions);
        Assert.assertThat((Object)consumer1partitions, (Matcher)CoreMatchers.not((Object)allPartitions));
        Assert.assertThat((Object)consumer2Partitions, (Matcher)CoreMatchers.not((Object)allPartitions));
        Assert.assertThat(allAssignedPartitions, (Matcher)CoreMatchers.equalTo((Object)allPartitions));
    }

    @Test(expected=KafkaException.class)
    public void shouldThrowKafkaExceptionIfStreamThreadNotConfigured() {
        this.partitionAssignor.configure(Collections.singletonMap("num.standby.replicas", 1));
    }

    @Test(expected=KafkaException.class)
    public void shouldThrowKafkaExceptionIfStreamThreadConfigIsNotThreadDataProviderInstance() {
        HashMap<String, Object> config = new HashMap<String, Object>();
        config.put("num.standby.replicas", 1);
        config.put("__stream.thread.instance__", "i am not a stream thread");
        this.partitionAssignor.configure(config);
    }

    private PartitionAssignor.Assignment createAssignment(Map<HostInfo, Set<TopicPartition>> firstHostState) {
        AssignmentInfo info = new AssignmentInfo(Collections.emptyList(), Collections.emptyMap(), firstHostState);
        return new PartitionAssignor.Assignment(Collections.emptyList(), info.encode());
    }

    private AssignmentInfo checkAssignment(Set<String> expectedTopics, PartitionAssignor.Assignment assignment) {
        AssignmentInfo info = AssignmentInfo.decode((ByteBuffer)assignment.userData());
        Assert.assertEquals((long)assignment.partitions().size(), (long)info.activeTasks.size());
        ArrayList<TaskId> activeTasks = new ArrayList<TaskId>();
        HashSet<String> activeTopics = new HashSet<String>();
        for (TopicPartition partition : assignment.partitions()) {
            activeTasks.add(new TaskId(0, partition.partition()));
            activeTopics.add(partition.topic());
        }
        Assert.assertEquals(activeTasks, (Object)info.activeTasks);
        Assert.assertEquals(expectedTopics, activeTopics);
        HashSet<String> standbyTopics = new HashSet<String>();
        for (Map.Entry entry : info.standbyTasks.entrySet()) {
            TaskId id = (TaskId)entry.getKey();
            Set partitions = (Set)entry.getValue();
            for (TopicPartition partition : partitions) {
                Assert.assertEquals((long)id.partition, (long)partition.partition());
                standbyTopics.add(partition.topic());
            }
        }
        if (info.standbyTasks.size() > 0) {
            Assert.assertEquals(expectedTopics, standbyTopics);
        }
        return info;
    }
}

