/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.SingleGroupPartitionGrouperStub;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class StreamsPartitionAssignorTest {
    // Member ids used as the consumer names in the thread-level assignment tests
    // (the tests that call StreamsPartitionAssignor.assignTasksToThreads directly).
    private static final String CONSUMER_1 = "consumer1";
    private static final String CONSUMER_2 = "consumer2";
    private static final String CONSUMER_3 = "consumer3";
    private static final String CONSUMER_4 = "consumer4";
    // Names of the two source topics; handed to checkAssignment(...) by the full-assignor tests.
    // Plain varargs call so that the element type is inferred as String (an Object[] argument
    // would make the result a Set<Object>, which is not assignable to Set<String>).
    private final Set<String> allTopics = Utils.mkSet("topic1", "topic2");
    // Topic-partition handles used in assertions. Note that partition 3 of topic1/topic2 is
    // declared here but is NOT part of `infos` below (only topic3 has a partition 3 there).
    private final TopicPartition t1p0 = new TopicPartition("topic1", 0);
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final TopicPartition t1p3 = new TopicPartition("topic1", 3);
    private final TopicPartition t2p0 = new TopicPartition("topic2", 0);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
    private final TopicPartition t2p3 = new TopicPartition("topic2", 3);
    private final TopicPartition t3p0 = new TopicPartition("topic3", 0);
    private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
    private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
    private final TopicPartition t3p3 = new TopicPartition("topic3", 3);
    // Partition metadata for the default cluster: topic1 and topic2 with partitions 0-2,
    // topic3 with partitions 0-3; every partition uses Node.noNode() as leader and has no replicas.
    private final List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]));
    // Subscription info built by getInfo(...) from UUID_1 and two empty task sets.
    private final SubscriptionInfo defaultSubscriptionInfo = StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS);
    // Default cluster metadata: a single Node.noNode() entry plus the partitions listed in `infos`.
    private final Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
    // The assignor under test.
    private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
    // Its restoreConsumer is passed to the MockInternalTopicManager.
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    // USER_END_POINT doubles as the bootstrap.servers value in configProps().
    private static final String USER_END_POINT = "localhost:8080";
    private static final String OTHER_END_POINT = "other:9090";
    private static final String APPLICATION_ID = "stream-partition-assignor-test";
    // Mocks assigned by createMockTaskManager(...) and createMockAdminClient(...); both are
    // put into replay mode by configurePartitionAssignorWith(...).
    private TaskManager taskManager;
    private Admin adminClient;
    private InternalTopologyBuilder builder = new InternalTopologyBuilder();
    private StreamsMetadataState streamsMetadataState = (StreamsMetadataState)EasyMock.createNiceMock(StreamsMetadataState.class);
    // Subscriptions keyed by consumer id; filled by individual tests and passed to assign(...).
    private final Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<String, ConsumerPartitionAssignor.Subscription>();
    // Task assignor class for this parameterized run (set by the constructor).
    private final Class<? extends TaskAssignor> taskAssignor;
    // The three objects below are handed to the assignor through the config map in configProps().
    private final AtomicInteger assignmentError = new AtomicInteger();
    private final AtomicLong nextScheduledRebalanceMs = new AtomicLong(Long.MAX_VALUE);
    private final MockTime time = new MockTime();

    /**
     * Builds a fresh, mutable configuration map for the assignor under test.
     *
     * <p>The map holds the application id, the bootstrap servers, the internal
     * {@code __...__} entries carrying this test's task manager, metadata state, admin client,
     * error-code holder, next-rebalance holder and mock time, and the class name of the
     * task assignor selected for this parameterized run.
     *
     * @return a new map on every call; callers may add overrides to it
     */
    private Map<String, Object> configProps() {
        final Map<String, Object> props = new HashMap<>();

        // Regular Streams settings.
        props.put("application.id", APPLICATION_ID);
        props.put("bootstrap.servers", USER_END_POINT);

        // Internal entries carrying the collaborators owned by this test instance.
        props.put("__task.manager.instance__", taskManager);
        props.put("__streams.metadata.state.instance__", streamsMetadataState);
        props.put("__streams.admin.client.instance__", adminClient);
        props.put("__assignment.error.code__", assignmentError);
        props.put("__next.probing.rebalance.ms__", nextScheduledRebalanceMs);
        props.put("__time__", time);

        // Task assignor implementation for this parameterized run.
        props.put("internal.task.assignor.class", taskAssignor.getName());
        return props;
    }

    /**
     * Creates the default mock task manager and then configures the assignor with the
     * default (non-overridden) properties.
     *
     * @return the mock internal topic manager installed on the assignor
     */
    private MockInternalTopicManager configureDefault() {
        createDefaultMockTaskManager();
        final MockInternalTopicManager installedTopicManager = configureDefaultPartitionAssignor();
        return installedTopicManager;
    }

    /**
     * Configures the assignor with the base properties from {@code configProps()} only.
     * The task manager mock must already have been created by the caller.
     *
     * @return the mock internal topic manager installed on the assignor
     */
    private MockInternalTopicManager configureDefaultPartitionAssignor() {
        final Map<String, Object> noOverrides = Collections.emptyMap();
        return configurePartitionAssignorWith(noOverrides);
    }

    /**
     * Configures the assignor with the base properties plus the given overrides, switches the
     * task manager and admin client mocks to replay mode, and installs a mock internal topic
     * manager (created with {@code mockCreateInternalTopics = false}).
     *
     * @param props entries that are added on top of (and may replace) the base properties
     * @return the mock internal topic manager installed on the assignor
     */
    private MockInternalTopicManager configurePartitionAssignorWith(Map<String, Object> props) {
        final Map<String, Object> assignorConfig = configProps();
        assignorConfig.putAll(props);

        // The assignor is configured while the mocks are still in record mode; they are
        // replayed only afterwards, exactly in this order.
        partitionAssignor.configure(assignorConfig);
        EasyMock.replay(taskManager, adminClient);

        return overwriteInternalTopicManagerWithMock(false);
    }

    /**
     * Creates the task manager mock with the empty task-offset-sum map and process id
     * {@code UUID_1}.
     */
    private void createDefaultMockTaskManager() {
        final Map<TaskId, Long> emptyOffsetSums = AssignmentTestUtils.EMPTY_TASK_OFFSET_SUMS;
        createMockTaskManager(emptyOffsetSums, AssignmentTestUtils.UUID_1);
    }

    /**
     * Creates the task manager mock for process id {@code UUID_1}, using the offset sums that
     * {@code getTaskOffsetSums} derives from the given task sets.
     *
     * @param activeTasks  task ids passed as the first argument of {@code getTaskOffsetSums}
     * @param standbyTasks task ids passed as the second argument of {@code getTaskOffsetSums}
     */
    private void createMockTaskManager(Set<TaskId> activeTasks, Set<TaskId> standbyTasks) {
        final Map<TaskId, Long> offsetSums = getTaskOffsetSums(activeTasks, standbyTasks);
        createMockTaskManager(offsetSums, AssignmentTestUtils.UUID_1);
    }

    /**
     * Replaces {@code taskManager} with a new nice mock and records its stubbed answers.
     *
     * <p>The mock is left in record mode; {@code configurePartitionAssignorWith} replays it.
     * The shared topology builder also gets the application id set and its topology built.
     *
     * @param taskOffsetSums value returned by the mock's {@code getTaskOffsetSums()}
     * @param processId      value returned by the mock's {@code processId()}
     */
    private void createMockTaskManager(Map<TaskId, Long> taskOffsetSums, UUID processId) {
        taskManager = EasyMock.createNiceMock(TaskManager.class);

        // Each stub may be hit any number of times.
        EasyMock.expect(taskManager.builder()).andReturn(builder).anyTimes();
        EasyMock.expect(taskManager.getTaskOffsetSums()).andReturn(taskOffsetSums).anyTimes();
        EasyMock.expect(taskManager.processId()).andReturn(processId).anyTimes();

        builder.setApplicationId(APPLICATION_ID);
        builder.buildTopology();
    }

    /**
     * Replaces {@code adminClient} with a mock whose {@code listOffsets(...)} reports the given
     * end offsets.
     *
     * <p>For every entry a replayed nice mock of {@code ListOffsetsResultInfo} is created whose
     * {@code offset()} returns the entry's value. These are wrapped in an already-completed
     * future that the (replayed) {@code ListOffsetsResult} mock returns from {@code all()}.
     * The admin client mock itself stays in record mode; {@code configurePartitionAssignorWith}
     * replays it.
     *
     * @param changelogEndOffsets end offset to report for each topic partition
     */
    private void createMockAdminClient(Map<TopicPartition, Long> changelogEndOffsets) {
        adminClient = EasyMock.createMock(AdminClient.class);
        final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);

        // Typed future (instead of a raw KafkaFutureImpl) so the payload is checked by the compiler.
        final KafkaFutureImpl<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> allFuture =
            new KafkaFutureImpl<>();
        allFuture.complete(changelogEndOffsets.entrySet().stream().collect(Collectors.toMap(
            Map.Entry::getKey,
            endOffset -> {
                final ListOffsetsResult.ListOffsetsResultInfo info =
                    EasyMock.createNiceMock(ListOffsetsResult.ListOffsetsResultInfo.class);
                EasyMock.expect(info.offset()).andStubReturn(endOffset.getValue());
                EasyMock.replay(info);
                return info;
            })));

        EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result);
        EasyMock.expect(result.all()).andReturn(allFuture);
        EasyMock.replay(result);
    }

    /**
     * Creates a {@link MockInternalTopicManager} from the base properties and the mock client
     * supplier's restore consumer, and installs it on the assignor under test.
     *
     * @param mockCreateInternalTopics forwarded unchanged to the mock topic manager's constructor
     * @return the mock that was installed
     */
    private MockInternalTopicManager overwriteInternalTopicManagerWithMock(boolean mockCreateInternalTopics) {
        final StreamsConfig streamsConfig = new StreamsConfig(configProps());
        final MockInternalTopicManager topicManagerMock = new MockInternalTopicManager(
            streamsConfig,
            mockClientSupplier.restoreConsumer,
            mockCreateInternalTopics);
        partitionAssignor.setInternalTopicManager(topicManagerMock);
        return topicManagerMock;
    }

    @Parameterized.Parameters(name="task assignor = {0}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList({HighAvailabilityTaskAssignor.class}, {StickyTaskAssignor.class}, {FallbackPriorTaskAssignor.class});
    }

    public StreamsPartitionAssignorTest(Class<? extends TaskAssignor> taskAssignor) {
        this.taskAssignor = taskAssignor;
        this.createMockAdminClient(AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS);
    }

    /**
     * With {@code upgrade.from} set to {@code "2.3"} the assignor must report exactly one
     * supported protocol: EAGER, and not COOPERATIVE.
     */
    @Test
    public void shouldUseEagerRebalancingProtocol() {
        createDefaultMockTaskManager();
        configurePartitionAssignorWith(Collections.singletonMap("upgrade.from", "2.3"));

        Assert.assertEquals(1L, partitionAssignor.supportedProtocols().size());
        Assert.assertTrue(
            partitionAssignor.supportedProtocols().contains(ConsumerPartitionAssignor.RebalanceProtocol.EAGER));
        Assert.assertFalse(
            partitionAssignor.supportedProtocols().contains(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE));
    }

    /**
     * With the default configuration the assignor must report two supported protocols,
     * one of which is COOPERATIVE.
     */
    @Test
    public void shouldUseCooperativeRebalancingProtocol() {
        configureDefault();

        Assert.assertEquals(2L, partitionAssignor.supportedProtocols().size());
        Assert.assertTrue(
            partitionAssignor.supportedProtocols().contains(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE));
    }

    /**
     * Three consumers previously owned 3, 2 and 3 of the eight tasks. With the same consumers
     * and tasks, {@code assignTasksToThreads} must return an assignment equivalent to the
     * previous one (as judged by {@code assertEquivalentAssignment}).
     */
    @Test
    public void shouldProduceStickyAndBalancedAssignmentWhenNothingChanges() {
        final List<TaskId> allTasks = Arrays.asList(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
            AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1,
            AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_1_3);

        // Typed (instead of raw) so that the helper calls below are compile-time checked.
        final Map<String, List<TaskId>> previousAssignment = Utils.mkMap(
            Utils.mkEntry(CONSUMER_1, Arrays.asList(
                AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3)),
            Utils.mkEntry(CONSUMER_2, Arrays.asList(
                AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0)),
            Utils.mkEntry(CONSUMER_3, Arrays.asList(
                AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2)));

        final ClientState state = new ClientState();
        final SortedSet<String> consumers = Utils.mkSortedSet(CONSUMER_1, CONSUMER_2, CONSUMER_3);

        // Register each consumer's previous tasks (no standby tasks) with the client state.
        state.addPreviousTasksAndOffsetSums(CONSUMER_1, getTaskOffsetSums(
            Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3),
            AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(CONSUMER_2, getTaskOffsetSums(
            Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0),
            AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(CONSUMER_3, getTaskOffsetSums(
            Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2),
            AssignmentTestUtils.EMPTY_TASKS));
        state.initializePrevTasks(Collections.emptyMap());
        state.computeTaskLags(AssignmentTestUtils.UUID_1, getTaskEndOffsetSums(allTasks));

        assertEquivalentAssignment(
            previousAssignment,
            StreamsPartitionAssignor.assignTasksToThreads(allTasks, Collections.emptySet(), consumers, state));
    }

    /**
     * Starting from the 3/2/3 previous assignment, a ninth task ({@code TASK_2_0}) is added.
     * The new assignment must be equivalent to the previous one with the new task appended to
     * {@code CONSUMER_2}, the consumer that previously owned only two tasks.
     */
    @Test
    public void shouldProduceStickyAndBalancedAssignmentWhenNewTasksAreAdded() {
        // Mutable copy: the new task is appended further down.
        final List<TaskId> allTasks = new ArrayList<>(Arrays.asList(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
            AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1,
            AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_1_3));

        // Mutable value lists: the expected assignment is updated in place before the comparison.
        final Map<String, List<TaskId>> previousAssignment = Utils.mkMap(
            Utils.mkEntry(CONSUMER_1, new ArrayList<>(Arrays.asList(
                AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3))),
            Utils.mkEntry(CONSUMER_2, new ArrayList<>(Arrays.asList(
                AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0))),
            Utils.mkEntry(CONSUMER_3, new ArrayList<>(Arrays.asList(
                AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2))));

        final ClientState state = new ClientState();
        final SortedSet<String> consumers = Utils.mkSortedSet(CONSUMER_1, CONSUMER_2, CONSUMER_3);

        state.addPreviousTasksAndOffsetSums(CONSUMER_1, getTaskOffsetSums(
            Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3),
            AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(CONSUMER_2, getTaskOffsetSums(
            Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0),
            AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(CONSUMER_3, getTaskOffsetSums(
            Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2),
            AssignmentTestUtils.EMPTY_TASKS));
        state.initializePrevTasks(Collections.emptyMap());
        // Lags are computed for the original eight tasks only, i.e. before the new task is added.
        state.computeTaskLags(AssignmentTestUtils.UUID_1, getTaskEndOffsetSums(allTasks));

        final TaskId newTask = AssignmentTestUtils.TASK_2_0;
        allTasks.add(newTask);
        state.assignActiveTasks(allTasks);

        final Map<String, List<TaskId>> newAssignment =
            StreamsPartitionAssignor.assignTasksToThreads(allTasks, Collections.emptySet(), consumers, state);

        previousAssignment.get(CONSUMER_2).add(newTask);
        assertEquivalentAssignment(previousAssignment, newAssignment);
    }

    /**
     * Starting from the 3/2/3 previous assignment, {@code CONSUMER_3} is removed. Both remaining
     * consumers must keep all of their previous tasks and end up with four tasks each.
     */
    @Test
    public void shouldProduceMaximallyStickyAssignmentWhenMemberLeaves() {
        final List<TaskId> allTasks = Arrays.asList(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
            AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1,
            AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_1_3);

        final Map<String, List<TaskId>> previousAssignment = Utils.mkMap(
            Utils.mkEntry(CONSUMER_1, Arrays.asList(
                AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3)),
            Utils.mkEntry(CONSUMER_2, Arrays.asList(
                AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0)),
            Utils.mkEntry(CONSUMER_3, Arrays.asList(
                AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2)));

        final ClientState state = new ClientState();
        final SortedSet<String> consumers = Utils.mkSortedSet(CONSUMER_1, CONSUMER_2, CONSUMER_3);

        state.addPreviousTasksAndOffsetSums(CONSUMER_1, getTaskOffsetSums(
            Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3),
            AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(CONSUMER_2, getTaskOffsetSums(
            Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0),
            AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(CONSUMER_3, getTaskOffsetSums(
            Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2),
            AssignmentTestUtils.EMPTY_TASKS));
        state.initializePrevTasks(Collections.emptyMap());
        state.computeTaskLags(AssignmentTestUtils.UUID_1, getTaskEndOffsetSums(allTasks));

        // CONSUMER_3 leaves the group; its previous tasks stay registered in the client state.
        consumers.remove(CONSUMER_3);

        final Map<String, List<TaskId>> assignment =
            StreamsPartitionAssignor.assignTasksToThreads(allTasks, Collections.emptySet(), consumers, state);

        // The remaining members keep everything they had ...
        Assert.assertTrue(assignment.get(CONSUMER_1).containsAll(previousAssignment.get(CONSUMER_1)));
        Assert.assertTrue(assignment.get(CONSUMER_2).containsAll(previousAssignment.get(CONSUMER_2)));
        // ... and split the eight tasks evenly.
        MatcherAssert.assertThat(assignment.get(CONSUMER_1).size(), CoreMatchers.equalTo(4));
        MatcherAssert.assertThat(assignment.get(CONSUMER_2).size(), CoreMatchers.equalTo(4));
    }

    /**
     * Starting from the 3/2/3 previous assignment, {@code CONSUMER_4} joins with no previous
     * tasks. Consumers 1 and 3 may only keep tasks they already owned, consumer 2 must keep all
     * of its previous tasks, and every consumer must end up with exactly two tasks.
     */
    @Test
    public void shouldProduceStickyEnoughAssignmentWhenNewMemberJoins() {
        final List<TaskId> allTasks = Arrays.asList(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1,
            AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3,
            AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1,
            AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_1_3);

        final Map<String, List<TaskId>> previousAssignment = Utils.mkMap(
            Utils.mkEntry(CONSUMER_1, Arrays.asList(
                AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3)),
            Utils.mkEntry(CONSUMER_2, Arrays.asList(
                AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0)),
            Utils.mkEntry(CONSUMER_3, Arrays.asList(
                AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2)));

        final ClientState state = new ClientState();
        final SortedSet<String> consumers = Utils.mkSortedSet(CONSUMER_1, CONSUMER_2, CONSUMER_3);

        state.addPreviousTasksAndOffsetSums(CONSUMER_1, getTaskOffsetSums(
            Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3),
            AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(CONSUMER_2, getTaskOffsetSums(
            Arrays.asList(AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0),
            AssignmentTestUtils.EMPTY_TASKS));
        state.addPreviousTasksAndOffsetSums(CONSUMER_3, getTaskOffsetSums(
            Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_2),
            AssignmentTestUtils.EMPTY_TASKS));

        // CONSUMER_4 joins the group without any previous tasks.
        consumers.add(CONSUMER_4);
        state.addPreviousTasksAndOffsetSums(
            CONSUMER_4, getTaskOffsetSums(AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS));

        state.initializePrevTasks(Collections.emptyMap());
        state.computeTaskLags(AssignmentTestUtils.UUID_1, getTaskEndOffsetSums(allTasks));

        final Map<String, List<TaskId>> assignment =
            StreamsPartitionAssignor.assignTasksToThreads(allTasks, Collections.emptySet(), consumers, state);

        // Consumers 1 and 3 only give tasks away; they receive nothing new.
        Assert.assertTrue(previousAssignment.get(CONSUMER_1).containsAll(assignment.get(CONSUMER_1)));
        Assert.assertTrue(previousAssignment.get(CONSUMER_3).containsAll(assignment.get(CONSUMER_3)));
        // Consumer 2 keeps everything it had.
        Assert.assertTrue(assignment.get(CONSUMER_2).containsAll(previousAssignment.get(CONSUMER_2)));

        MatcherAssert.assertThat(assignment.get(CONSUMER_1).size(), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(assignment.get(CONSUMER_2).size(), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(assignment.get(CONSUMER_3).size(), CoreMatchers.equalTo(2));
        MatcherAssert.assertThat(assignment.get(CONSUMER_4).size(), CoreMatchers.equalTo(2));
    }

    /**
     * With three consumers that have no previous tasks, nine tasks passed in shuffled order
     * must be assigned exactly as listed in the expected map (lists compared with
     * {@code equalTo}, so element order matters).
     */
    @Test
    public void shouldInterleaveTasksByGroupIdDuringNewAssignment() {
        final List<TaskId> allTasks = Arrays.asList(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2,
            AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1,
            AssignmentTestUtils.TASK_1_2, AssignmentTestUtils.TASK_2_0, AssignmentTestUtils.TASK_2_1);

        final Map<String, List<TaskId>> assignment = Utils.mkMap(
            Utils.mkEntry(CONSUMER_1, new ArrayList<>(Arrays.asList(
                AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_2))),
            Utils.mkEntry(CONSUMER_2, new ArrayList<>(Arrays.asList(
                AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_2_0))),
            Utils.mkEntry(CONSUMER_3, new ArrayList<>(Arrays.asList(
                AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_2_1))));

        final ClientState state = new ClientState();
        final SortedSet<String> consumers = Utils.mkSortedSet(CONSUMER_1, CONSUMER_2, CONSUMER_3);
        state.addPreviousTasksAndOffsetSums(CONSUMER_1, Collections.emptyMap());
        state.addPreviousTasksAndOffsetSums(CONSUMER_2, Collections.emptyMap());
        state.addPreviousTasksAndOffsetSums(CONSUMER_3, Collections.emptyMap());

        // Randomize the input order; the expected result above is fixed regardless.
        Collections.shuffle(allTasks);

        final Map<String, List<TaskId>> interleavedTaskIds =
            StreamsPartitionAssignor.assignTasksToThreads(allTasks, Collections.emptySet(), consumers, state);

        MatcherAssert.assertThat(interleavedTaskIds, CoreMatchers.equalTo(assignment));
    }

    /**
     * With {@code upgrade.from = "2.3"} the assignor must use the EAGER protocol, and the user
     * data it produces for the subscription must decode to the {@code SubscriptionInfo} that
     * {@code getInfo} builds from {@code UUID_1} and the same active/standby task sets.
     */
    @Test
    public void testEagerSubscription() {
        builder.addSource(null, "source1", null, null, null, "topic1");
        builder.addSource(null, "source2", null, null, null, "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        final Set<TaskId> prevTasks = Utils.mkSet(new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
        final Set<TaskId> standbyTasks = Utils.mkSet(new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));

        createMockTaskManager(prevTasks, standbyTasks);
        configurePartitionAssignorWith(Collections.singletonMap("upgrade.from", "2.3"));
        MatcherAssert.assertThat(
            partitionAssignor.rebalanceProtocol(),
            CoreMatchers.equalTo(ConsumerPartitionAssignor.RebalanceProtocol.EAGER));

        final Set<String> topics = Utils.mkSet("topic1", "topic2");
        final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(
            new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));

        // The topic list was copied from a set, so sort it before comparing against a fixed order.
        Collections.sort(subscription.topics());
        Assert.assertEquals(Arrays.asList("topic1", "topic2"), subscription.topics());

        final SubscriptionInfo info = getInfo(AssignmentTestUtils.UUID_1, prevTasks, standbyTasks);
        Assert.assertEquals(info, SubscriptionInfo.decode(subscription.userData()));
    }

    /**
     * With the default configuration, the user data the assignor produces for the subscription
     * must decode to the {@code SubscriptionInfo} that {@code getInfo} builds from
     * {@code UUID_1} and the same active/standby task sets.
     */
    @Test
    public void testCooperativeSubscription() {
        builder.addSource(null, "source1", null, null, null, "topic1");
        builder.addSource(null, "source2", null, null, null, "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");

        final Set<TaskId> prevTasks = Utils.mkSet(new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1));
        // NOTE(review): the standby set also contains the three active task ids above, unlike in
        // testEagerSubscription. The same sets feed both sides of the final comparison, so the
        // test passes either way; confirm whether the overlap is intended.
        final Set<TaskId> standbyTasks = Utils.mkSet(
            new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1),
            new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2));

        createMockTaskManager(prevTasks, standbyTasks);
        configureDefaultPartitionAssignor();

        final Set<String> topics = Utils.mkSet("topic1", "topic2");
        final ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(
            new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));

        // The topic list was copied from a set, so sort it before comparing against a fixed order.
        Collections.sort(subscription.topics());
        Assert.assertEquals(Arrays.asList("topic1", "topic2"), subscription.topics());

        final SubscriptionInfo info = getInfo(AssignmentTestUtils.UUID_1, prevTasks, standbyTasks);
        Assert.assertEquals(info, SubscriptionInfo.decode(subscription.userData()));
    }

    /**
     * Runs a full {@code assign(...)} for three consumers: "consumer10" and "consumer11" share
     * process id {@code UUID_1}, "consumer20" uses {@code UUID_2}. The topology has two sources
     * feeding one processor with a state store.
     *
     * <p>Expected: consumer10 and consumer11 together own partitions 0 and 1 of both topics
     * (in either order), consumer20 owns partition 2 of both topics, and the active tasks of
     * all three consumers add up to exactly TASK_0_0, TASK_0_1 and TASK_0_2.
     */
    @Test
    public void testAssignBasic() {
        builder.addSource(null, "source1", null, null, null, "topic1");
        builder.addSource(null, "source2", null, null, null, "topic2");
        builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2");
        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor");

        final List<String> topics = Arrays.asList("topic1", "topic2");
        final Set<TaskId> allTasks = Utils.mkSet(
            AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2);

        final Set<TaskId> prevTasks10 = Utils.mkSet(AssignmentTestUtils.TASK_0_0);
        final Set<TaskId> prevTasks11 = Utils.mkSet(AssignmentTestUtils.TASK_0_1);
        final Set<TaskId> prevTasks20 = Utils.mkSet(AssignmentTestUtils.TASK_0_2);
        final Set<TaskId> standbyTasks10 = AssignmentTestUtils.EMPTY_TASKS;
        final Set<TaskId> standbyTasks11 = Utils.mkSet(AssignmentTestUtils.TASK_0_2);
        final Set<TaskId> standbyTasks20 = Utils.mkSet(AssignmentTestUtils.TASK_0_0);

        createMockTaskManager(prevTasks10, standbyTasks10);
        // Replaces the admin client mock created in the constructor with one that knows the
        // store's changelog topic.
        createMockAdminClient(getTopicPartitionOffsetsMap(
            Collections.singletonList("stream-partition-assignor-test-store-changelog"),
            Collections.singletonList(3)));
        configureDefaultPartitionAssignor();

        subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(
            topics, getInfo(AssignmentTestUtils.UUID_1, prevTasks10, standbyTasks10).encode()));
        subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(
            topics, getInfo(AssignmentTestUtils.UUID_1, prevTasks11, standbyTasks11).encode()));
        subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(
            topics, getInfo(AssignmentTestUtils.UUID_2, prevTasks20, standbyTasks20).encode()));

        final Map<String, ConsumerPartitionAssignor.Assignment> assignments = partitionAssignor
            .assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(subscriptions))
            .groupAssignment();

        // Assigned partitions: the two UUID_1 consumers may receive their pairs in either order,
        // so compare as a set of sets.
        Assert.assertEquals(
            Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)),
            Utils.mkSet(
                new HashSet<>(assignments.get("consumer10").partitions()),
                new HashSet<>(assignments.get("consumer11").partitions())));
        Assert.assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions()));

        // Assignment info: collect the active tasks consumer by consumer.
        final AssignmentInfo info10 = checkAssignment(allTopics, assignments.get("consumer10"));
        final Set<TaskId> allActiveTasks = new HashSet<>(info10.activeTasks());

        final AssignmentInfo info11 = checkAssignment(allTopics, assignments.get("consumer11"));
        allActiveTasks.addAll(info11.activeTasks());
        Assert.assertEquals(
            Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1), allActiveTasks);

        final AssignmentInfo info20 = checkAssignment(allTopics, assignments.get("consumer20"));
        allActiveTasks.addAll(info20.activeTasks());

        // allActiveTasks is already a HashSet, so one size check and one equality check suffice.
        Assert.assertEquals(3L, allActiveTasks.size());
        Assert.assertEquals(allTasks, allActiveTasks);
    }

    /**
     * Two consumers that send the same default subscription info are assigned against
     * local metadata containing four partitions each of topic1 and topic2. The test pins
     * both the partition split and the exact active task list decoded for each consumer.
     */
    @Test
    public void shouldAssignEvenlyAcrossConsumersOneClientMultipleThreads() {
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addProcessor("processorII", new MockProcessorSupplier(), new String[]{"source2"});

        // Partitions 0..3 of topic1 followed by partitions 0..3 of topic2.
        List<PartitionInfo> partitionInfos = new ArrayList<PartitionInfo>();
        for (String topicName : new String[]{"topic1", "topic2"}) {
            for (int partition = 0; partition < 4; ++partition) {
                partitionInfos.add(new PartitionInfo(topicName, partition, Node.noNode(), new Node[0], new Node[0]));
            }
        }
        Cluster localMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), partitionInfos, Collections.emptySet(), Collections.emptySet());

        List<String> topics = Arrays.asList("topic1", "topic2");
        this.configureDefault();
        for (String consumer : new String[]{"consumer10", "consumer11"}) {
            this.subscriptions.put(consumer, new ConsumerPartitionAssignor.Subscription(topics, this.defaultSubscriptionInfo.encode()));
        }

        Map assignments = this.partitionAssignor.assign(localMetadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        ConsumerPartitionAssignor.Assignment assignment10 = (ConsumerPartitionAssignor.Assignment)assignments.get("consumer10");
        ConsumerPartitionAssignor.Assignment assignment11 = (ConsumerPartitionAssignor.Assignment)assignments.get("consumer11");

        // Compared as a set of sets, so it does not matter which consumer got which half.
        Set expectedPartitionSets = Utils.mkSet((Object[])new Set[]{Utils.mkSet((Object[])new TopicPartition[]{this.t2p2, this.t1p0, this.t1p2, this.t2p0}), Utils.mkSet((Object[])new TopicPartition[]{this.t1p1, this.t2p1, this.t1p3, this.t2p3})});
        Set actualPartitionSets = Utils.mkSet((Object[])new HashSet[]{new HashSet(assignment10.partitions()), new HashSet(assignment11.partitions())});
        Assert.assertEquals((Object)expectedPartitionSets, (Object)actualPartitionSets);

        AssignmentInfo decoded10 = AssignmentInfo.decode((ByteBuffer)assignment10.userData());
        List<TaskId> expectedTasks10 = Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_2);
        Assert.assertEquals(expectedTasks10, (Object)decoded10.activeTasks());

        AssignmentInfo decoded11 = AssignmentInfo.decode((ByteBuffer)assignment11.userData());
        List<TaskId> expectedTasks11 = Arrays.asList(AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_3, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_3);
        Assert.assertEquals(expectedTasks11, (Object)decoded11.activeTasks());
    }

    /**
     * Builds two independent sub-topologies (topic1 -> processor1/store1 and
     * topic2 -> processor2/store2), configures the assignor with
     * {@code SingleGroupPartitionGrouperStub} as the partition grouper, and expects the
     * single consumer to receive only topic1 partitions and tasks 0_0, 0_1 and 0_2.
     */
    @Test
    public void testAssignWithPartialTopology() {
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor2", new MockProcessorSupplier(), new String[]{"source2"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store2", false), new String[]{"processor2"});

        this.createDefaultMockTaskManager();
        // Only the store1 changelog is registered with the mock admin client.
        this.createMockAdminClient(StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(Collections.singletonList("stream-partition-assignor-test-store1-changelog"), Collections.singletonList(3)));
        this.configurePartitionAssignorWith(Collections.singletonMap("partition.grouper", SingleGroupPartitionGrouperStub.class));

        List<String> subscribedTopics = Arrays.asList("topic1", "topic2");
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(subscribedTopics, this.defaultSubscriptionInfo.encode()));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();

        ConsumerPartitionAssignor.Assignment assignment10 = (ConsumerPartitionAssignor.Assignment)groupAssignment.get("consumer10");
        AssignmentInfo decoded10 = StreamsPartitionAssignorTest.checkAssignment(Utils.mkSet((Object[])new String[]{"topic1"}), assignment10);
        HashSet activeTasks = new HashSet(decoded10.activeTasks());

        Set expectedTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        Assert.assertEquals((long)3L, (long)activeTasks.size());
        Assert.assertEquals((Object)expectedTasks, new HashSet(activeTasks));
    }

    /**
     * Runs the assignor twice for a single consumer: first against a cluster that has no
     * partition metadata, where the consumer must receive no partitions and no active
     * tasks, and then against the regular test metadata, where it must receive every
     * partition of topic1/topic2 and all three tasks.
     */
    @Test
    public void testAssignEmptyMetadata() {
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source1", "source2"});
        List<String> topics = Arrays.asList("topic1", "topic2");
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        Set prevTasks10 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0});
        Set standbyTasks10 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_1});
        Cluster emptyMetadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
        this.createMockTaskManager(prevTasks10, standbyTasks10);
        this.configureDefaultPartitionAssignor();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(topics, StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_1, prevTasks10, standbyTasks10).encode()));

        // Pass 1: no partition metadata, so nothing can be assigned.
        Map assignments = this.partitionAssignor.assign(emptyMetadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        Assert.assertEquals(Collections.emptySet(), new HashSet(((ConsumerPartitionAssignor.Assignment)assignments.get("consumer10")).partitions()));
        AssignmentInfo info10 = StreamsPartitionAssignorTest.checkAssignment(Collections.emptySet(), (ConsumerPartitionAssignor.Assignment)assignments.get("consumer10"));
        HashSet allActiveTasks = new HashSet(info10.activeTasks());
        Assert.assertEquals((long)0L, (long)allActiveTasks.size());

        // Pass 2: real metadata, so the only consumer owns every partition and task.
        assignments = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.t1p0, this.t2p0, this.t1p1, this.t2p1, this.t1p2, this.t2p2}), new HashSet(((ConsumerPartitionAssignor.Assignment)assignments.get("consumer10")).partitions()));
        info10 = StreamsPartitionAssignorTest.checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment)assignments.get("consumer10"));
        allActiveTasks.addAll(info10.activeTasks());
        Assert.assertEquals((long)3L, (long)allActiveTasks.size());
        Assert.assertEquals((Object)allTasks, allActiveTasks);
    }

    /**
     * Three consumers report previous ownership of tasks 0_0, 0_1 and 0_2 for a topology
     * that also reads topic3. The union of all assignments must contain tasks 0_0 to 0_3
     * and every partition of the three topics, i.e. the task nobody owned before is
     * assigned as well.
     */
    @Test
    public void testAssignWithNewTasks() {
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addSource(null, "source3", null, null, null, new String[]{"topic3"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source1", "source2", "source3"});

        List<String> subscribedTopics = Arrays.asList("topic1", "topic2", "topic3");
        Set previous10 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0});
        Set previous11 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_1});
        Set previous20 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_2});

        this.createMockTaskManager(previous10, AssignmentTestUtils.EMPTY_TASKS);
        this.configureDefaultPartitionAssignor();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(subscribedTopics, StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_1, previous10, AssignmentTestUtils.EMPTY_TASKS).encode()));
        this.subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(subscribedTopics, StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_1, previous11, AssignmentTestUtils.EMPTY_TASKS).encode()));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(subscribedTopics, StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_2, previous20, AssignmentTestUtils.EMPTY_TASKS).encode()));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();

        // Union the active tasks and partitions handed to each consumer.
        HashSet assignedTasks = new HashSet();
        HashSet assignedPartitions = new HashSet();
        for (String consumer : new String[]{"consumer10", "consumer11", "consumer20"}) {
            ConsumerPartitionAssignor.Assignment assignment = (ConsumerPartitionAssignor.Assignment)groupAssignment.get(consumer);
            AssignmentInfo decoded = AssignmentInfo.decode((ByteBuffer)assignment.userData());
            assignedTasks.addAll(decoded.activeTasks());
            assignedPartitions.addAll(assignment.partitions());
        }

        Set expectedTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_0_3});
        Assert.assertEquals((Object)expectedTasks, assignedTasks);
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TopicPartition[]{this.t1p0, this.t1p1, this.t1p2, this.t2p0, this.t2p1, this.t2p2, this.t3p0, this.t3p1, this.t3p2, this.t3p3}), assignedPartitions);
    }

    /**
     * Two stateful sub-topologies (store1 on processor-1; store2 and store3 on
     * processor-2) are assigned across three consumers. Each consumer must get two
     * partitions and two active tasks, together covering all six tasks, and
     * {@code tasksForState} must map each store to the tasks of its topic group.
     */
    @Test
    public void testAssignWithStates() {
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor-1", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor-1"});
        this.builder.addProcessor("processor-2", new MockProcessorSupplier(), new String[]{"source2"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store2", false), new String[]{"processor-2"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store3", false), new String[]{"processor-2"});

        List<String> topics = Arrays.asList("topic1", "topic2");
        List<TaskId> tasks = Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2, AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2);
        this.createMockAdminClient(StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(Arrays.asList("stream-partition-assignor-test-store1-changelog", "stream-partition-assignor-test-store2-changelog", "stream-partition-assignor-test-store3-changelog"), Arrays.asList(3, 3, 3)));
        this.configureDefault();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(topics, this.defaultSubscriptionInfo.encode()));
        this.subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(topics, this.defaultSubscriptionInfo.encode()));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(topics, StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode()));
        Map assignments = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();

        String[] consumers = new String[]{"consumer10", "consumer11", "consumer20"};

        // Phase 1: every consumer owns exactly two partitions.
        for (String consumer : consumers) {
            Assert.assertEquals((long)2L, (long)((ConsumerPartitionAssignor.Assignment)assignments.get(consumer)).partitions().size());
        }

        // Phase 2: decode the user data of every assignment.
        List<AssignmentInfo> decodedInfos = new ArrayList<AssignmentInfo>();
        for (String consumer : consumers) {
            decodedInfos.add(AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)assignments.get(consumer)).userData()));
        }

        // Phase 3: every consumer runs exactly two active tasks.
        for (AssignmentInfo decoded : decodedInfos) {
            Assert.assertEquals((long)2L, (long)decoded.activeTasks().size());
        }

        // Phase 4: together the consumers cover all six tasks.
        HashSet assignedTasks = new HashSet();
        for (AssignmentInfo decoded : decodedInfos) {
            assignedTasks.addAll(decoded.activeTasks());
        }
        Assert.assertEquals(new HashSet<TaskId>(tasks), assignedTasks);

        Map topicGroups = this.builder.topicGroups();
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2}), StreamsPartitionAssignorTest.tasksForState("store1", tasks, topicGroups));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2}), StreamsPartitionAssignorTest.tasksForState("store2", tasks, topicGroups));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_1_0, AssignmentTestUtils.TASK_1_1, AssignmentTestUtils.TASK_1_2}), StreamsPartitionAssignorTest.tasksForState("store3", tasks, topicGroups));
    }

    /**
     * Selects the tasks that belong to a topic group owning the changelog topic of the
     * given store.
     *
     * @param storeName   name of the state store; its changelog topic name is derived via
     *                    {@code ProcessorStateManager.storeChangelogTopic} using
     *                    {@code APPLICATION_ID}
     * @param tasks       candidate tasks to filter
     * @param topicGroups topic group id mapped to that group's topics info
     * @return the subset of {@code tasks} whose {@code topicGroupId} matches a group whose
     *         {@code stateChangelogTopics} contains the store's changelog topic
     */
    private static Set<TaskId> tasksForState(String storeName, List<TaskId> tasks, Map<Integer, InternalTopologyBuilder.TopicsInfo> topicGroups) {
        String changelogTopic = ProcessorStateManager.storeChangelogTopic((String)APPLICATION_ID, (String)storeName);
        HashSet<TaskId> matchingTasks = new HashSet<TaskId>();
        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> group : topicGroups.entrySet()) {
            // Only groups that own this store's changelog are of interest.
            if (group.getValue().stateChangelogTopics.containsKey(changelogTopic)) {
                for (TaskId task : tasks) {
                    if (task.topicGroupId == group.getKey()) {
                        matchingTasks.add(task);
                    }
                }
            }
        }
        return matchingTasks;
    }

    /**
     * With {@code num.standby.replicas} set to 1 but a topology that has no state store,
     * neither consumer may be given any standby task.
     */
    @Test
    public void testAssignWithStandbyReplicasAndStatelessTasks() {
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1", "topic2"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source1"});

        this.createMockTaskManager(Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0}), Collections.emptySet());
        this.configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1));

        List<String> subscribedTopics = Arrays.asList("topic1", "topic2");
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(subscribedTopics, StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_1, Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0}), Collections.emptySet()).encode()));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(subscribedTopics, StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_2, Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_2}), Collections.emptySet()).encode()));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();

        // Validate each consumer's assignment in turn and require an empty standby map.
        for (String consumer : new String[]{"consumer10", "consumer20"}) {
            AssignmentInfo decoded = StreamsPartitionAssignorTest.checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment)groupAssignment.get(consumer));
            Assert.assertTrue((boolean)decoded.standbyTasks().isEmpty());
        }
    }

    /**
     * With {@code num.standby.replicas} set to 1 and a single state store built with
     * {@code withLoggingDisabled()}, neither consumer may be given any standby task.
     */
    @Test
    public void testAssignWithStandbyReplicasAndLoggingDisabled() {
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1", "topic2"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore(new MockKeyValueStoreBuilder("store1", false).withLoggingDisabled(), new String[]{"processor"});

        this.createMockTaskManager(Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0}), Collections.emptySet());
        this.configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1));

        List<String> subscribedTopics = Arrays.asList("topic1", "topic2");
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(subscribedTopics, StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_1, Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0}), Collections.emptySet()).encode()));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(subscribedTopics, StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_2, Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_2}), Collections.emptySet()).encode()));
        Map groupAssignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();

        // Validate each consumer's assignment in turn and require an empty standby map.
        for (String consumer : new String[]{"consumer10", "consumer20"}) {
            AssignmentInfo decoded = StreamsPartitionAssignorTest.checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment)groupAssignment.get(consumer));
            Assert.assertTrue((boolean)decoded.standbyTasks().isEmpty());
        }
    }

    /**
     * Assigns a stateful topology (one processor with store1, fed by topic1 and topic2)
     * to three consumers with {@code num.standby.replicas} set to 1. consumer10 and
     * consumer11 subscribe with UUID_1 and USER_END_POINT; consumer20 subscribes with
     * UUID_2 and OTHER_END_POINT.
     *
     * Verifies the active/standby task split across the consumers and the
     * partitions-by-host / standby-partitions-by-host maps carried in the assignment info.
     */
    @Test
    public void testAssignWithStandbyReplicas() {
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topic2"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source1", "source2"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor"});
        List<String> topics = Arrays.asList("topic1", "topic2");
        // Partitions 0..2 of every subscribed topic.
        Set allTopicPartitions = topics.stream().map(topic -> Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2))).flatMap(Collection::stream).collect(Collectors.toSet());
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        Set prevTasks00 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0});
        Set prevTasks01 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_1});
        Set prevTasks02 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_2});
        Set standbyTasks00 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0});
        Set standbyTasks01 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_1});
        Set standbyTasks02 = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_2});
        // NOTE(review): the mock task manager is seeded with standbyTasks01 while consumer10's
        // subscription below advertises EMPTY_TASKS as its standby set; confirm this is intended.
        this.createMockTaskManager(prevTasks00, standbyTasks01);
        this.createMockAdminClient(StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(Collections.singletonList("stream-partition-assignor-test-store1-changelog"), Collections.singletonList(3)));
        this.configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1));
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(topics, StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_1, prevTasks00, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode()));
        this.subscriptions.put("consumer11", new ConsumerPartitionAssignor.Subscription(topics, StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_1, prevTasks01, standbyTasks02, USER_END_POINT).encode()));
        this.subscriptions.put("consumer20", new ConsumerPartitionAssignor.Subscription(topics, StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_2, prevTasks02, standbyTasks00, OTHER_END_POINT).encode()));
        Map assignments = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        // Accumulate the tasks of the two UUID_1 consumers first.
        AssignmentInfo info10 = StreamsPartitionAssignorTest.checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment)assignments.get("consumer10"));
        HashSet allActiveTasks = new HashSet(info10.activeTasks());
        HashSet allStandbyTasks = new HashSet(info10.standbyTasks().keySet());
        AssignmentInfo info11 = StreamsPartitionAssignorTest.checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment)assignments.get("consumer11"));
        allActiveTasks.addAll(info11.activeTasks());
        allStandbyTasks.addAll(info11.standbyTasks().keySet());
        // The two UUID_1 consumers must not report identical standby task sets.
        Assert.assertNotEquals((String)"same processId has same set of standby tasks", info11.standbyTasks().keySet(), info10.standbyTasks().keySet());
        // Between them they are active for 0_0 and 0_1 and standby only for 0_2.
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1}), new HashSet(allActiveTasks));
        Assert.assertEquals((Object)Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_2}), new HashSet(allStandbyTasks));
        // Adding consumer20 must complete both the active and the standby sets.
        AssignmentInfo info20 = StreamsPartitionAssignorTest.checkAssignment(this.allTopics, (ConsumerPartitionAssignor.Assignment)assignments.get("consumer20"));
        allActiveTasks.addAll(info20.activeTasks());
        allStandbyTasks.addAll(info20.standbyTasks().keySet());
        Assert.assertEquals((long)3L, (long)allActiveTasks.size());
        Assert.assertEquals((Object)allTasks, allActiveTasks);
        Assert.assertEquals((long)3L, (long)allStandbyTasks.size());
        Assert.assertEquals((Object)allTasks, allStandbyTasks);
        // Host-level view: two hosts, together covering every topic partition as active...
        Map partitionsByHost = info10.partitionsByHost();
        Assert.assertEquals((long)2L, (long)partitionsByHost.size());
        Assert.assertEquals(allTopicPartitions, partitionsByHost.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()));
        // ...and, separately, every topic partition as standby.
        Map standbyPartitionsByHost = info10.standbyPartitionByHost();
        Assert.assertEquals((long)2L, (long)standbyPartitionsByHost.size());
        Assert.assertEquals(allTopicPartitions, standbyPartitionsByHost.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()));
        // No host may list the same partition as both active and standby.
        for (HostInfo hostInfo : partitionsByHost.keySet()) {
            Assert.assertTrue((boolean)Collections.disjoint((Collection)partitionsByHost.get(hostInfo), (Collection)standbyPartitionsByHost.get(hostInfo)));
        }
        // Every consumer must receive the same host maps.
        Assert.assertEquals((Object)partitionsByHost, (Object)info11.partitionsByHost());
        Assert.assertEquals((Object)partitionsByHost, (Object)info20.partitionsByHost());
        Assert.assertEquals((Object)standbyPartitionsByHost, (Object)info11.standbyPartitionByHost());
        Assert.assertEquals((Object)standbyPartitionsByHost, (Object)info20.standbyPartitionByHost());
    }

    /**
     * Feeds a hand-built assignment (active tasks 0_0 and 0_3 on t3p0/t3p3, standby tasks
     * 0_1 and 0_2) into {@code onAssignment} and verifies that the task manager receives
     * the active and standby task maps, and that the metadata state is notified with the
     * host state plus a cluster describing the assigned partitions.
     */
    @Test
    public void testOnAssignment() {
        this.taskManager = (TaskManager)EasyMock.createStrictMock(TaskManager.class);
        Map<HostInfo, Set> hostState = Collections.singletonMap(new HostInfo("localhost", 9090), Utils.mkSet((Object[])new TopicPartition[]{this.t3p0, this.t3p3}));
        HashMap<TaskId, Set> activeTasks = new HashMap<TaskId, Set>();
        activeTasks.put(AssignmentTestUtils.TASK_0_0, Utils.mkSet((Object[])new TopicPartition[]{this.t3p0}));
        activeTasks.put(AssignmentTestUtils.TASK_0_3, Utils.mkSet((Object[])new TopicPartition[]{this.t3p3}));
        HashMap<TaskId, Set> standbyTasks = new HashMap<TaskId, Set>();
        standbyTasks.put(AssignmentTestUtils.TASK_0_1, Utils.mkSet((Object[])new TopicPartition[]{this.t3p1}));
        standbyTasks.put(AssignmentTestUtils.TASK_0_2, Utils.mkSet((Object[])new TopicPartition[]{this.t3p2}));
        // Record the expected handleAssignment call on the strict task manager mock.
        this.taskManager.handleAssignment(activeTasks, standbyTasks);
        EasyMock.expectLastCall();
        this.streamsMetadataState = (StreamsMetadataState)EasyMock.createStrictMock(StreamsMetadataState.class);
        // Capture the Cluster argument passed to onChange so it can be inspected afterwards.
        Capture capturedCluster = EasyMock.newCapture();
        this.streamsMetadataState.onChange((Map)EasyMock.eq(hostState), (Map)EasyMock.anyObject(), (Cluster)EasyMock.capture((Capture)capturedCluster));
        EasyMock.expectLastCall();
        // Only streamsMetadataState is replayed here; presumably the taskManager mock is
        // replayed inside configureDefaultPartitionAssignor() -- TODO confirm.
        EasyMock.replay((Object[])new Object[]{this.streamsMetadataState});
        this.configureDefaultPartitionAssignor();
        List<TaskId> activeTaskList = Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_3);
        AssignmentInfo info = new AssignmentInfo(7, activeTaskList, standbyTasks, hostState, Collections.emptyMap(), 0);
        ConsumerPartitionAssignor.Assignment assignment = new ConsumerPartitionAssignor.Assignment(Arrays.asList(this.t3p0, this.t3p3), info.encode());
        this.partitionAssignor.onAssignment(assignment, null);
        EasyMock.verify((Object[])new Object[]{this.streamsMetadataState});
        EasyMock.verify((Object[])new Object[]{this.taskManager});
        // The captured cluster must contain only the topic of t3p0, with two partitions.
        Assert.assertEquals(Collections.singleton(this.t3p0.topic()), (Object)((Cluster)capturedCluster.getValue()).topics());
        Assert.assertEquals((long)2L, (long)((Cluster)capturedCluster.getValue()).partitionsForTopic(this.t3p0.topic()).size());
    }

    /**
     * A topology that writes to and reads back from the internal topic "topicX" must cause
     * exactly one internal topic to be made ready, with a partition count equal to the
     * number of expected tasks (three).
     */
    @Test
    public void testAssignWithInternalTopics() {
        this.builder.addInternalTopic("topicX", InternalTopicProperties.empty());
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addSink("sink1", "topicX", null, null, null, new String[]{"processor1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topicX"});
        this.builder.addProcessor("processor2", new MockProcessorSupplier(), new String[]{"source2"});

        // Name under which the internal topic appears in subscriptions and readyTopics.
        String prefixedTopicX = "stream-partition-assignor-test-topicX";
        List<String> subscribedTopics = Arrays.asList("topic1", prefixedTopicX);
        Set expectedTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});

        MockInternalTopicManager topicManager = this.configureDefault();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(subscribedTopics, this.defaultSubscriptionInfo.encode()));
        this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));

        Assert.assertEquals((long)1L, (long)topicManager.readyTopics.size());
        Assert.assertEquals((long)expectedTasks.size(), (long)topicManager.readyTopics.get(prefixedTopicX).intValue());
    }

    /**
     * Chains two internal topics (topic1 -> topicX -> topicZ). Both must be made ready, and
     * topicZ, whose source is itself an internal topic, must get a partition count equal
     * to the number of expected tasks (three).
     */
    @Test
    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
        this.builder.addInternalTopic("topicX", InternalTopicProperties.empty());
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addSink("sink1", "topicX", null, null, null, new String[]{"processor1"});
        this.builder.addSource(null, "source2", null, null, null, new String[]{"topicX"});
        this.builder.addInternalTopic("topicZ", InternalTopicProperties.empty());
        this.builder.addProcessor("processor2", new MockProcessorSupplier(), new String[]{"source2"});
        this.builder.addSink("sink2", "topicZ", null, null, null, new String[]{"processor2"});
        this.builder.addSource(null, "source3", null, null, null, new String[]{"topicZ"});

        // Names under which the internal topics appear in subscriptions and readyTopics.
        String prefixedTopicX = "stream-partition-assignor-test-topicX";
        String prefixedTopicZ = "stream-partition-assignor-test-topicZ";
        List<String> subscribedTopics = Arrays.asList("topic1", prefixedTopicX, prefixedTopicZ);
        Set expectedTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});

        MockInternalTopicManager topicManager = this.configureDefault();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(subscribedTopics, this.defaultSubscriptionInfo.encode()));
        this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));

        Assert.assertEquals((long)2L, (long)topicManager.readyTopics.size());
        Assert.assertEquals((long)expectedTasks.size(), (long)topicManager.readyTopics.get(prefixedTopicZ).intValue());
    }

    /**
     * Builds a DSL topology (topic1 mapped and joined with a count over topic3 grouped by a
     * new key) and checks two things after assignment: the set of internal topics made
     * ready, each with four partitions, and that the single client is assigned every
     * source and repartition topic partition.
     */
    @Test
    public void shouldGenerateTasksForAllCreatedPartitions() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KStream stream1 = streamsBuilder.stream("topic1").map(KeyValue::new);
        KTable table1 = streamsBuilder.table("topic3").groupBy(KeyValue::new).count();
        stream1.join(table1, (value1, value2) -> null);
        // Single member id, used both to register the subscription and to look up the result.
        String client = "client1";
        this.builder = TopologyWrapper.getInternalTopologyBuilder(streamsBuilder.build());
        this.createMockAdminClient(StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(Arrays.asList("stream-partition-assignor-test-topic3-STATE-STORE-0000000002-changelog", "stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog"), Arrays.asList(4, 4)));
        MockInternalTopicManager mockInternalTopicManager = this.configureDefault();
        this.subscriptions.put(client, new ConsumerPartitionAssignor.Subscription(Arrays.asList("topic1", "topic3"), this.defaultSubscriptionInfo.encode()));
        Map assignment = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();

        // Every repartition and changelog topic is expected to be created with four partitions.
        HashMap<String, Integer> expectedCreatedInternalTopics = new HashMap<String, Integer>();
        expectedCreatedInternalTopics.put("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
        expectedCreatedInternalTopics.put("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
        expectedCreatedInternalTopics.put("stream-partition-assignor-test-topic3-STATE-STORE-0000000002-changelog", 4);
        expectedCreatedInternalTopics.put("stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition", 4);
        MatcherAssert.assertThat(mockInternalTopicManager.readyTopics, (Matcher)CoreMatchers.equalTo(expectedCreatedInternalTopics));

        // topic1 contributes three partitions; topic3 and both repartition topics four each.
        List<TopicPartition> expectedAssignment = Arrays.asList(
            new TopicPartition("topic1", 0),
            new TopicPartition("topic1", 1),
            new TopicPartition("topic1", 2),
            new TopicPartition("topic3", 0),
            new TopicPartition("topic3", 1),
            new TopicPartition("topic3", 2),
            new TopicPartition("topic3", 3),
            new TopicPartition("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 0),
            new TopicPartition("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 1),
            new TopicPartition("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 2),
            new TopicPartition("stream-partition-assignor-test-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 3),
            new TopicPartition("stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition", 0),
            new TopicPartition("stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition", 1),
            new TopicPartition("stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition", 2),
            new TopicPartition("stream-partition-assignor-test-KSTREAM-MAP-0000000001-repartition", 3));
        MatcherAssert.assertThat(new HashSet(((ConsumerPartitionAssignor.Assignment)assignment.get(client)).partitions()), (Matcher)CoreMatchers.equalTo(new HashSet<TopicPartition>(expectedAssignment)));
    }

    /**
     * The endpoint configured via {@code application.server} must be present in the subscription user data
     * produced by the assignor.
     */
    @Test
    public void shouldAddUserDefinedEndPointToSubscription() {
        this.builder.addSource(null, "source", null, null, null, "input");
        this.builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        this.builder.addSink("sink", "output", null, null, null, "processor");
        this.createDefaultMockTaskManager();
        this.configurePartitionAssignorWith(Collections.singletonMap("application.server", USER_END_POINT));

        final Set<String> subscribedTopics = Utils.mkSet("input");
        final ByteBuffer encodedUserData = this.partitionAssignor.subscriptionUserData(subscribedTopics);
        final List<String> topicList = new ArrayList<>(subscribedTopics);
        final ConsumerPartitionAssignor.Subscription subscription =
            new ConsumerPartitionAssignor.Subscription(topicList, encodedUserData);

        // Decode what the consumer-level Subscription carries, not the raw buffer.
        final SubscriptionInfo decoded = SubscriptionInfo.decode(subscription.userData());
        Assert.assertEquals(USER_END_POINT, decoded.userEndPoint());
    }

    /**
     * After assignment, the decoded {@code partitionsByHost} entry for {@code localhost:8080} must contain
     * the three partitions of {@code topic1}.
     */
    @Test
    public void shouldMapUserEndPointToTopicPartitions() {
        this.builder.addSource(null, "source", null, null, null, "topic1");
        this.builder.addProcessor("processor", new MockProcessorSupplier(), "source");
        this.builder.addSink("sink", "output", null, null, null, "processor");
        final List<String> subscribedTopics = Collections.singletonList("topic1");

        this.createDefaultMockTaskManager();
        this.configurePartitionAssignorWith(Collections.singletonMap("application.server", USER_END_POINT));
        this.subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(
            subscribedTopics,
            StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode()));

        final Map<String, ConsumerPartitionAssignor.Assignment> groupAssignment = this.partitionAssignor
            .assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions))
            .groupAssignment();
        final AssignmentInfo decoded = AssignmentInfo.decode(groupAssignment.get(CONSUMER_1).userData());

        final Set<TopicPartition> expectedPartitions = Utils.mkSet(
            new TopicPartition("topic1", 0),
            new TopicPartition("topic1", 1),
            new TopicPartition("topic1", 2));
        final Set<TopicPartition> partitionsForHost = decoded.partitionsByHost().get(new HostInfo("localhost", 8080));
        Assert.assertEquals(expectedPartitions, partitionsForHost);
    }

    /**
     * Configuring the assignor with an {@code application.server} value that has no port
     * ({@code "localhost"}) must raise a {@link ConfigException}.
     */
    @Test
    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
        this.createDefaultMockTaskManager();
        // assertThrows fails the test when no ConfigException is raised, replacing the
        // try / fail / empty-catch pattern and matching the sibling port-is-not-an-integer test.
        Assert.assertThrows(ConfigException.class, () -> this.configurePartitionAssignorWith(Collections.singletonMap("application.server", "localhost")));
    }

    /**
     * Configuring the assignor with an {@code application.server} value whose port part is not numeric
     * must raise a {@link ConfigException}.
     */
    @Test
    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
        this.createDefaultMockTaskManager();
        final Map<String, Object> invalidEndPointConfig = Collections.singletonMap("application.server", "localhost:j87yhk");
        Assert.assertThrows(
            ConfigException.class,
            () -> this.configurePartitionAssignorWith(invalidEndPointConfig));
    }

    /**
     * Subscribes a single client to {@code unknownTopic} only and verifies that the assignment call
     * returns, that no internal topic is reported as ready, and that the client receives no partitions.
     */
    @Test
    public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
        final String client = "client1";
        final StreamsBuilder dsl = new StreamsBuilder();
        final KStream countedStream = dsl.stream("topic1")
            .selectKey((key, value) -> null)
            .groupByKey()
            .count(Materialized.as("count"))
            .toStream()
            .map((key, value) -> null);
        dsl.stream("unknownTopic")
            .selectKey((key, value) -> null)
            .join(countedStream, (value1, value2) -> null, JoinWindows.of(Duration.ofMillis(0L)));
        this.builder = TopologyWrapper.getInternalTopologyBuilder(dsl.build());

        final MockInternalTopicManager topicManager = this.configureDefault();
        this.subscriptions.put(client, new ConsumerPartitionAssignor.Subscription(
            Collections.singletonList("unknownTopic"),
            this.defaultSubscriptionInfo.encode()));
        final Map<String, ConsumerPartitionAssignor.Assignment> groupAssignment = this.partitionAssignor
            .assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions))
            .groupAssignment();

        MatcherAssert.assertThat(topicManager.readyTopics.isEmpty(), CoreMatchers.equalTo(true));
        MatcherAssert.assertThat(groupAssignment.get(client).partitions().isEmpty(), CoreMatchers.equalTo(true));
    }

    /**
     * Two consecutive {@code onAssignment} calls with different host maps must each be forwarded to
     * {@code StreamsMetadataState.onChange}, in that order (enforced by the strict mock).
     */
    @Test
    public void shouldUpdateClusterMetadataAndHostInfoOnAssignment() {
        final HostInfo localHost = new HostInfo("localhost", 9090);
        final Map<HostInfo, Set<TopicPartition>> initialHostState = Utils.mkMap(
            Utils.mkEntry(localHost, Utils.mkSet(this.t1p0, this.t1p1)),
            Utils.mkEntry(new HostInfo("otherhost", 9090), Utils.mkSet(this.t2p0, this.t2p1)));
        final Map<HostInfo, Set<TopicPartition>> newHostState = Utils.mkMap(
            Utils.mkEntry(new HostInfo("localhost", 9090), Utils.mkSet(this.t1p0, this.t1p1)),
            Utils.mkEntry(new HostInfo("newotherhost", 9090), Utils.mkSet(this.t2p0, this.t2p1)));

        // Record the two expected onChange invocations before switching the mock to replay mode.
        this.streamsMetadataState = EasyMock.createStrictMock(StreamsMetadataState.class);
        this.streamsMetadataState.onChange((Map)EasyMock.eq((Object)initialHostState), (Map)EasyMock.anyObject(), (Cluster)EasyMock.anyObject());
        this.streamsMetadataState.onChange((Map)EasyMock.eq((Object)newHostState), (Map)EasyMock.anyObject(), (Cluster)EasyMock.anyObject());
        EasyMock.replay(this.streamsMetadataState);

        this.createDefaultMockTaskManager();
        this.configureDefaultPartitionAssignor();

        this.partitionAssignor.onAssignment(StreamsPartitionAssignorTest.createAssignment(initialHostState), null);
        this.partitionAssignor.onAssignment(StreamsPartitionAssignorTest.createAssignment(newHostState), null);

        EasyMock.verify(this.taskManager, this.streamsMetadataState);
    }

    /**
     * With the assignor configured for endpoint {@code newhost:9090}, an assignment whose host map lacks
     * that endpoint must leave {@code nextScheduledRebalanceMs} at 0, and a following assignment that
     * contains it must leave the value at {@code Long.MAX_VALUE}.
     */
    @Test
    public void shouldTriggerImmediateRebalanceOnHostInfoChange() {
        final HostInfo otherHost = new HostInfo("otherhost", 9090);
        final Map<HostInfo, Set<TopicPartition>> staleHostState = Utils.mkMap(
            Utils.mkEntry(new HostInfo("localhost", 9090), Utils.mkSet(this.t1p0, this.t1p1)),
            Utils.mkEntry(otherHost, Utils.mkSet(this.t2p0, this.t2p1)));
        final Map<HostInfo, Set<TopicPartition>> currentHostState = Utils.mkMap(
            Utils.mkEntry(new HostInfo("newhost", 9090), Utils.mkSet(this.t1p0, this.t1p1)),
            Utils.mkEntry(new HostInfo("otherhost", 9090), Utils.mkSet(this.t2p0, this.t2p1)));

        this.createDefaultMockTaskManager();
        this.configurePartitionAssignorWith(Collections.singletonMap("application.server", "newhost:9090"));

        this.partitionAssignor.onAssignment(StreamsPartitionAssignorTest.createAssignment(staleHostState), null);
        MatcherAssert.assertThat(this.nextScheduledRebalanceMs.get(), Matchers.is(0L));

        this.partitionAssignor.onAssignment(StreamsPartitionAssignorTest.createAssignment(currentHostState), null);
        MatcherAssert.assertThat(this.nextScheduledRebalanceMs.get(), Matchers.is(Long.MAX_VALUE));
    }

    /**
     * Consumer 1 owns every partition of {@code topic1}; consumer 2 owns nothing. After assignment,
     * consumer 1 must not keep the full partition list, consumer 2 must receive neither partitions nor
     * tasks, and applying consumer 2's assignment must leave {@code nextScheduledRebalanceMs} at 0.
     */
    @Test
    public void shouldTriggerImmediateRebalanceOnTasksRevoked() {
        this.builder.addSource(null, "source1", null, null, null, "topic1");

        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2);
        final List<TopicPartition> allPartitions = Arrays.asList(this.t1p0, this.t1p1, this.t1p2);
        final List<String> subscribedTopics = Collections.singletonList("topic1");

        this.subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(
            subscribedTopics,
            StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_1, allTasks, AssignmentTestUtils.EMPTY_TASKS).encode(),
            allPartitions));
        this.subscriptions.put(CONSUMER_2, new ConsumerPartitionAssignor.Subscription(
            Collections.singletonList("topic1"),
            StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_1, AssignmentTestUtils.EMPTY_TASKS, allTasks).encode(),
            Collections.emptyList()));

        this.createMockTaskManager(allTasks, allTasks);
        this.configurePartitionAssignorWith(Collections.singletonMap("acceptable.recovery.lag", 0L));

        final Map<String, ConsumerPartitionAssignor.Assignment> groupAssignment = this.partitionAssignor
            .assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions))
            .groupAssignment();
        final ConsumerPartitionAssignor.Assignment consumer2Assignment = groupAssignment.get(CONSUMER_2);

        MatcherAssert.assertThat(groupAssignment.get(CONSUMER_1).partitions(), CoreMatchers.not(allPartitions));
        MatcherAssert.assertThat(consumer2Assignment.partitions(), CoreMatchers.equalTo(Collections.emptyList()));

        final AssignmentInfo consumer2Info = AssignmentInfo.decode(consumer2Assignment.userData());
        MatcherAssert.assertThat(consumer2Info.activeTasks(), CoreMatchers.equalTo(Collections.emptyList()));
        MatcherAssert.assertThat(consumer2Info.standbyTasks(), CoreMatchers.equalTo(Collections.emptyMap()));

        this.partitionAssignor.onAssignment(consumer2Assignment, null);
        MatcherAssert.assertThat(this.nextScheduledRebalanceMs.get(), Matchers.is(0L));
    }

    /**
     * With one standby replica and two clients on different endpoints, the active partitions reported per
     * host must not include standby partitions: neither host is active for all partitions, each host's
     * active set equals the other host's standby set, and the two active sets together cover
     * {@code topic1}.
     */
    @Test
    public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
        final StreamsBuilder dsl = new StreamsBuilder();
        dsl.stream("topic1").groupByKey().count();
        this.builder = TopologyWrapper.getInternalTopologyBuilder(dsl.build());

        this.createDefaultMockTaskManager();
        this.createMockAdminClient(StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(
            Collections.singletonList("stream-partition-assignor-test-KSTREAM-AGGREGATE-STATE-STORE-0000000001-changelog"),
            Collections.singletonList(3)));

        final Map<String, Object> assignorProps = new HashMap<>();
        assignorProps.put("num.standby.replicas", 1);
        assignorProps.put("application.server", USER_END_POINT);
        this.configurePartitionAssignorWith(assignorProps);

        this.subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(
            Collections.singletonList("topic1"),
            StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, USER_END_POINT).encode()));
        this.subscriptions.put(CONSUMER_2, new ConsumerPartitionAssignor.Subscription(
            Collections.singletonList("topic1"),
            StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS, OTHER_END_POINT).encode()));

        final Map<String, ConsumerPartitionAssignor.Assignment> groupAssignment = this.partitionAssignor
            .assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions))
            .groupAssignment();
        // Only consumer 1's assignment is decoded; both hosts are looked up in its host maps.
        final AssignmentInfo consumer1Info = AssignmentInfo.decode(groupAssignment.get(CONSUMER_1).userData());

        final HostInfo firstHost = new HostInfo("localhost", 8080);
        final HostInfo secondHost = new HostInfo("other", 9090);
        final Set<TopicPartition> firstHostActive = consumer1Info.partitionsByHost().get(firstHost);
        final Set<TopicPartition> secondHostActive = consumer1Info.partitionsByHost().get(secondHost);
        final Set<TopicPartition> firstHostStandby = consumer1Info.standbyPartitionByHost().get(firstHost);
        final Set<TopicPartition> secondHostStandby = consumer1Info.standbyPartitionByHost().get(secondHost);

        final Set<TopicPartition> allPartitions = Utils.mkSet(this.t1p0, this.t1p1, this.t1p2);
        final Set<TopicPartition> unionOfActive = new HashSet<>(firstHostActive);
        unionOfActive.addAll(secondHostActive);

        MatcherAssert.assertThat(firstHostActive, CoreMatchers.not(allPartitions));
        MatcherAssert.assertThat(secondHostActive, CoreMatchers.not(allPartitions));
        MatcherAssert.assertThat(firstHostActive, CoreMatchers.equalTo(secondHostStandby));
        MatcherAssert.assertThat(secondHostActive, CoreMatchers.equalTo(firstHostStandby));
        MatcherAssert.assertThat(unionOfActive, CoreMatchers.equalTo(allPartitions));
    }

    /**
     * {@code configure} must throw a {@link KafkaException} with the message
     * {@code "TaskManager is not specified"} when the {@code __task.manager.instance__} entry is absent.
     */
    @Test
    public void shouldThrowKafkaExceptionIfTaskMangerNotConfigured() {
        final Map<String, Object> config = this.configProps();
        config.remove("__task.manager.instance__");

        // assertThrows fails the test if no KafkaException is raised and returns it for inspection.
        final KafkaException expected = Assert.assertThrows(
            KafkaException.class,
            () -> this.partitionAssignor.configure(config));
        MatcherAssert.assertThat(expected.getMessage(), CoreMatchers.equalTo("TaskManager is not specified"));
    }

    /**
     * {@code configure} must throw a {@link KafkaException} naming the offending type when the
     * {@code __task.manager.instance__} entry holds a {@code String} instead of a task manager.
     */
    @Test
    public void shouldThrowKafkaExceptionIfTaskMangerConfigIsNotTaskManagerInstance() {
        final Map<String, Object> config = this.configProps();
        config.put("__task.manager.instance__", "i am not a task manager");

        // assertThrows fails the test if no KafkaException is raised and returns it for inspection.
        final KafkaException expected = Assert.assertThrows(
            KafkaException.class,
            () -> this.partitionAssignor.configure(config));
        MatcherAssert.assertThat(
            expected.getMessage(),
            CoreMatchers.equalTo("java.lang.String is not an instance of org.apache.kafka.streams.processor.internals.TaskManager"));
    }

    /**
     * {@code configure} must throw a {@link KafkaException} with the message
     * {@code "assignmentErrorCode is not specified"} when the {@code __assignment.error.code__} entry is
     * absent.
     */
    @Test
    public void shouldThrowKafkaExceptionAssignmentErrorCodeNotConfigured() {
        this.createDefaultMockTaskManager();
        final Map<String, Object> config = this.configProps();
        config.remove("__assignment.error.code__");

        // assertThrows fails the test if no KafkaException is raised and returns it for inspection.
        final KafkaException expected = Assert.assertThrows(
            KafkaException.class,
            () -> this.partitionAssignor.configure(config));
        MatcherAssert.assertThat(expected.getMessage(), CoreMatchers.equalTo("assignmentErrorCode is not specified"));
    }

    /**
     * {@code configure} must throw a {@link KafkaException} naming the offending type when the
     * {@code __assignment.error.code__} entry holds a {@code String} instead of an {@code AtomicInteger}.
     *
     * <p>NOTE(review): the method name mentions a "version probing flag", but the entry exercised here is
     * the assignment error code; the name is kept unchanged.
     */
    @Test
    public void shouldThrowKafkaExceptionIfVersionProbingFlagConfigIsNotAtomicInteger() {
        this.createDefaultMockTaskManager();
        final Map<String, Object> config = this.configProps();
        config.put("__assignment.error.code__", "i am not an AtomicInteger");

        // assertThrows fails the test if no KafkaException is raised and returns it for inspection.
        final KafkaException expected = Assert.assertThrows(
            KafkaException.class,
            () -> this.partitionAssignor.configure(config));
        MatcherAssert.assertThat(
            expected.getMessage(),
            CoreMatchers.equalTo("java.lang.String is not an instance of java.util.concurrent.atomic.AtomicInteger"));
    }

    /** Runs the lowest-version check for a group mixing subscription versions 1 and 2. */
    @Test
    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V2() {
        final int lowerVersion = 1;
        final int higherVersion = 2;
        this.shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(lowerVersion, higherVersion);
    }

    /** Runs the lowest-version check for a group mixing subscription versions 1 and 3. */
    @Test
    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV1V3() {
        final int lowerVersion = 1;
        final int higherVersion = 3;
        this.shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(lowerVersion, higherVersion);
    }

    /** Runs the lowest-version check for a group mixing subscription versions 2 and 3. */
    @Test
    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersionsV2V3() {
        final int lowerVersion = 2;
        final int higherVersion = 3;
        this.shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(lowerVersion, higherVersion);
    }

    /**
     * Subscribes two consumers with different subscription versions and asserts that both receive an
     * assignment encoded with the smaller version.
     *
     * @param smallestVersion subscription version used by consumer 1 and expected in both assignments
     * @param otherVersion    subscription version used by consumer 2
     */
    private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions(int smallestVersion, int otherVersion) {
        final List<String> subscribedTopics = Collections.singletonList("topic1");
        this.subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(
            subscribedTopics,
            StreamsPartitionAssignorTest.getInfoForOlderVersion(smallestVersion, AssignmentTestUtils.UUID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode()));
        this.subscriptions.put(CONSUMER_2, new ConsumerPartitionAssignor.Subscription(
            Collections.singletonList("topic1"),
            StreamsPartitionAssignorTest.getInfoForOlderVersion(otherVersion, AssignmentTestUtils.UUID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode()));
        this.configureDefault();

        final Map<String, ConsumerPartitionAssignor.Assignment> groupAssignment = this.partitionAssignor
            .assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions))
            .groupAssignment();

        MatcherAssert.assertThat(groupAssignment.size(), CoreMatchers.equalTo(2));
        final AssignmentInfo consumer1Info = AssignmentInfo.decode(groupAssignment.get(CONSUMER_1).userData());
        MatcherAssert.assertThat(consumer1Info.version(), CoreMatchers.equalTo(smallestVersion));
        final AssignmentInfo consumer2Info = AssignmentInfo.decode(groupAssignment.get(CONSUMER_2).userData());
        MatcherAssert.assertThat(consumer2Info.version(), CoreMatchers.equalTo(smallestVersion));
    }

    /**
     * With {@code upgrade.from} set to {@code 0.10.0}, the subscription user data must be encoded with
     * version 1.
     */
    @Test
    public void shouldDownGradeSubscriptionToVersion1() {
        this.createDefaultMockTaskManager();
        this.configurePartitionAssignorWith(Collections.singletonMap("upgrade.from", "0.10.0"));

        final Set<String> subscribedTopics = Utils.mkSet("topic1");
        final List<String> topicList = new ArrayList<>(subscribedTopics);
        final ByteBuffer encodedUserData = this.partitionAssignor.subscriptionUserData(subscribedTopics);
        final ConsumerPartitionAssignor.Subscription subscription =
            new ConsumerPartitionAssignor.Subscription(topicList, encodedUserData);

        final int encodedVersion = SubscriptionInfo.decode(subscription.userData()).version();
        MatcherAssert.assertThat(encodedVersion, CoreMatchers.equalTo(1));
    }

    /** Runs the version-2 downgrade check with {@code upgrade.from} set to {@code 0.10.1}. */
    @Test
    public void shouldDownGradeSubscriptionToVersion2For0101() {
        final String upgradeFrom = "0.10.1";
        this.shouldDownGradeSubscriptionToVersion2(upgradeFrom);
    }

    /** Runs the version-2 downgrade check with {@code upgrade.from} set to {@code 0.10.2}. */
    @Test
    public void shouldDownGradeSubscriptionToVersion2For0102() {
        final String upgradeFrom = "0.10.2";
        this.shouldDownGradeSubscriptionToVersion2(upgradeFrom);
    }

    /** Runs the version-2 downgrade check with {@code upgrade.from} set to {@code 0.11.0}. */
    @Test
    public void shouldDownGradeSubscriptionToVersion2For0110() {
        final String upgradeFrom = "0.11.0";
        this.shouldDownGradeSubscriptionToVersion2(upgradeFrom);
    }

    /** Runs the version-2 downgrade check with {@code upgrade.from} set to {@code 1.0}. */
    @Test
    public void shouldDownGradeSubscriptionToVersion2For10() {
        final String upgradeFrom = "1.0";
        this.shouldDownGradeSubscriptionToVersion2(upgradeFrom);
    }

    /** Runs the version-2 downgrade check with {@code upgrade.from} set to {@code 1.1}. */
    @Test
    public void shouldDownGradeSubscriptionToVersion2For11() {
        final String upgradeFrom = "1.1";
        this.shouldDownGradeSubscriptionToVersion2(upgradeFrom);
    }

    /**
     * Configures the assignor with the given {@code upgrade.from} value and asserts that the subscription
     * user data it produces is encoded with version 2.
     *
     * @param upgradeFromValue value stored under the {@code upgrade.from} config key
     */
    private void shouldDownGradeSubscriptionToVersion2(Object upgradeFromValue) {
        this.createDefaultMockTaskManager();
        this.configurePartitionAssignorWith(Collections.singletonMap("upgrade.from", upgradeFromValue));

        final Set<String> subscribedTopics = Utils.mkSet("topic1");
        final List<String> topicList = new ArrayList<>(subscribedTopics);
        final ByteBuffer encodedUserData = this.partitionAssignor.subscriptionUserData(subscribedTopics);
        final ConsumerPartitionAssignor.Subscription subscription =
            new ConsumerPartitionAssignor.Subscription(topicList, encodedUserData);

        final int encodedVersion = SubscriptionInfo.decode(subscription.userData()).version();
        MatcherAssert.assertThat(encodedVersion, CoreMatchers.equalTo(2));
    }

    /**
     * Consumer 1 currently owns all three partitions of {@code topic1} and consumer 2 joins with none.
     * The assignment returned for consumer 2 must contain no partitions, no active tasks and empty host
     * maps, be encoded with version 7, and carry error code 0.
     */
    @Test
    public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenNewConsumerJoins() {
        this.builder.addSource(null, "source1", null, null, null, "topic1");

        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2);
        this.subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(
            Collections.singletonList("topic1"),
            StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_1, allTasks, AssignmentTestUtils.EMPTY_TASKS).encode(),
            Arrays.asList(this.t1p0, this.t1p1, this.t1p2)));
        this.subscriptions.put(CONSUMER_2, new ConsumerPartitionAssignor.Subscription(
            Collections.singletonList("topic1"),
            StreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_2, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode(),
            Collections.emptyList()));

        this.createMockTaskManager(allTasks, allTasks);
        this.configureDefaultPartitionAssignor();

        final Map<String, ConsumerPartitionAssignor.Assignment> groupAssignment = this.partitionAssignor
            .assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions))
            .groupAssignment();
        MatcherAssert.assertThat(groupAssignment.size(), CoreMatchers.equalTo(2));

        final ConsumerPartitionAssignor.Assignment newConsumerAssignment = groupAssignment.get(CONSUMER_2);
        MatcherAssert.assertThat(newConsumerAssignment.partitions(), CoreMatchers.equalTo(Collections.emptyList()));

        final AssignmentInfo newConsumerInfo = AssignmentInfo.decode(newConsumerAssignment.userData());
        MatcherAssert.assertThat(newConsumerInfo.version(), Matchers.is(7));
        MatcherAssert.assertThat(newConsumerInfo.activeTasks(), Matchers.empty());
        MatcherAssert.assertThat(newConsumerInfo.partitionsByHost(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(newConsumerInfo.standbyPartitionByHost(), Matchers.anEmptyMap());
        MatcherAssert.assertThat(newConsumerInfo.errCode(), Matchers.is(0));
    }

    /**
     * Both consumers send a "future" subscription (see {@code encodeFutureSubscription}). The expected
     * result is an interleaved assignment: consumer 1 gets partitions 0 and 2 (tasks 0_0, 0_2), consumer 2
     * gets partition 1 (task 0_1), with no standby tasks or host maps even though one standby replica is
     * configured.
     */
    @Test
    public void shouldReturnInterleavedAssignmentForOnlyFutureInstancesDuringVersionProbing() {
        this.builder.addSource(null, "source1", null, null, null, "topic1");

        final Set<TaskId> allTasks = Utils.mkSet(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2);
        this.subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(
            Collections.singletonList("topic1"),
            StreamsPartitionAssignorTest.encodeFutureSubscription(),
            Collections.emptyList()));
        this.subscriptions.put(CONSUMER_2, new ConsumerPartitionAssignor.Subscription(
            Collections.singletonList("topic1"),
            StreamsPartitionAssignorTest.encodeFutureSubscription(),
            Collections.emptyList()));

        this.createMockTaskManager(allTasks, allTasks);
        this.configurePartitionAssignorWith(Collections.singletonMap("num.standby.replicas", 1));

        final Map<String, ConsumerPartitionAssignor.Assignment> groupAssignment = this.partitionAssignor
            .assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions))
            .groupAssignment();
        MatcherAssert.assertThat(groupAssignment.size(), CoreMatchers.equalTo(2));

        final ConsumerPartitionAssignor.Assignment firstAssignment = groupAssignment.get(CONSUMER_1);
        final AssignmentInfo expectedFirstInfo = new AssignmentInfo(
            7,
            Arrays.asList(AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_2),
            Collections.emptyMap(),
            Collections.emptyMap(),
            Collections.emptyMap(),
            0);
        MatcherAssert.assertThat(firstAssignment.partitions(), CoreMatchers.equalTo(Arrays.asList(this.t1p0, this.t1p2)));
        MatcherAssert.assertThat(AssignmentInfo.decode(firstAssignment.userData()), CoreMatchers.equalTo(expectedFirstInfo));

        final ConsumerPartitionAssignor.Assignment secondAssignment = groupAssignment.get(CONSUMER_2);
        final AssignmentInfo expectedSecondInfo = new AssignmentInfo(
            7,
            Collections.singletonList(AssignmentTestUtils.TASK_0_1),
            Collections.emptyMap(),
            Collections.emptyMap(),
            Collections.emptyMap(),
            0);
        MatcherAssert.assertThat(secondAssignment.partitions(), CoreMatchers.equalTo(Collections.singletonList(this.t1p1)));
        MatcherAssert.assertThat(AssignmentInfo.decode(secondAssignment.userData()), CoreMatchers.equalTo(expectedSecondInfo));
    }

    /** Delegates to the shared mixed-subscription check using subscription version 1. */
    @Test
    public void shouldThrowIfV1SubscriptionAndFutureSubscriptionIsMixed() {
        final int oldVersion = 1;
        this.shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(oldVersion);
    }

    /** Delegates to the shared mixed-subscription check using subscription version 2. */
    @Test
    public void shouldThrowIfV2SubscriptionAndFutureSubscriptionIsMixed() {
        final int oldVersion = 2;
        this.shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(oldVersion);
    }

    /**
     * Hand-builds a topology with one external source topic ("input-stream"), a branch node with two
     * children, and four internal repartition topics (two chained per branch), then runs a single
     * assignment against cluster metadata that contains only partition 0 of "input-stream".
     *
     * <p>There are no assertions: the test passes as long as {@code assign} returns without throwing.
     * Every processor is a {@code MockProcessorSupplier}; the node names only label the graph.
     */
    @Test
    public void shouldNotFailOnBranchedMultiLevelRepartitionConnectedTopology() {
        // Shared head: source -> FLATMAPVALUES -> BRANCH, with two BRANCHCHILD nodes (0003 and 0004).
        this.builder.addSource(null, "KSTREAM-SOURCE-0000000000", null, null, null, new String[]{"input-stream"});
        this.builder.addProcessor("KSTREAM-FLATMAPVALUES-0000000001", new MockProcessorSupplier(), new String[]{"KSTREAM-SOURCE-0000000000"});
        this.builder.addProcessor("KSTREAM-BRANCH-0000000002", new MockProcessorSupplier(), new String[]{"KSTREAM-FLATMAPVALUES-0000000001"});
        this.builder.addProcessor("KSTREAM-BRANCHCHILD-0000000003", new MockProcessorSupplier(), new String[]{"KSTREAM-BRANCH-0000000002"});
        this.builder.addProcessor("KSTREAM-BRANCHCHILD-0000000004", new MockProcessorSupplier(), new String[]{"KSTREAM-BRANCH-0000000002"});
        // Branch child 0003: MAP -> "odd_store-repartition" (filter, sink, source) -> REDUCE.
        this.builder.addProcessor("KSTREAM-MAP-0000000005", new MockProcessorSupplier(), new String[]{"KSTREAM-BRANCHCHILD-0000000003"});
        this.builder.addInternalTopic("odd_store-repartition", InternalTopicProperties.empty());
        this.builder.addProcessor("odd_store-repartition-filter", new MockProcessorSupplier(), new String[]{"KSTREAM-MAP-0000000005"});
        this.builder.addSink("odd_store-repartition-sink", "odd_store-repartition", null, null, null, new String[]{"odd_store-repartition-filter"});
        this.builder.addSource(null, "odd_store-repartition-source", null, null, null, new String[]{"odd_store-repartition"});
        this.builder.addProcessor("KSTREAM-REDUCE-0000000006", new MockProcessorSupplier(), new String[]{"odd_store-repartition-source"});
        // Second level of the 0003 branch: TOSTREAM -> PEEK -> MAP -> "odd_store_2-repartition" -> REDUCE.
        this.builder.addProcessor("KTABLE-TOSTREAM-0000000010", new MockProcessorSupplier(), new String[]{"KSTREAM-REDUCE-0000000006"});
        this.builder.addProcessor("KSTREAM-PEEK-0000000011", new MockProcessorSupplier(), new String[]{"KTABLE-TOSTREAM-0000000010"});
        this.builder.addProcessor("KSTREAM-MAP-0000000012", new MockProcessorSupplier(), new String[]{"KSTREAM-PEEK-0000000011"});
        this.builder.addInternalTopic("odd_store_2-repartition", InternalTopicProperties.empty());
        this.builder.addProcessor("odd_store_2-repartition-filter", new MockProcessorSupplier(), new String[]{"KSTREAM-MAP-0000000012"});
        this.builder.addSink("odd_store_2-repartition-sink", "odd_store_2-repartition", null, null, null, new String[]{"odd_store_2-repartition-filter"});
        this.builder.addSource(null, "odd_store_2-repartition-source", null, null, null, new String[]{"odd_store_2-repartition"});
        this.builder.addProcessor("KSTREAM-REDUCE-0000000013", new MockProcessorSupplier(), new String[]{"odd_store_2-repartition-source"});
        // Branch child 0004: MAP -> "even_store-repartition" (filter, sink, source) -> REDUCE.
        this.builder.addProcessor("KSTREAM-MAP-0000000017", new MockProcessorSupplier(), new String[]{"KSTREAM-BRANCHCHILD-0000000004"});
        this.builder.addInternalTopic("even_store-repartition", InternalTopicProperties.empty());
        this.builder.addProcessor("even_store-repartition-filter", new MockProcessorSupplier(), new String[]{"KSTREAM-MAP-0000000017"});
        this.builder.addSink("even_store-repartition-sink", "even_store-repartition", null, null, null, new String[]{"even_store-repartition-filter"});
        this.builder.addSource(null, "even_store-repartition-source", null, null, null, new String[]{"even_store-repartition"});
        this.builder.addProcessor("KSTREAM-REDUCE-0000000018", new MockProcessorSupplier(), new String[]{"even_store-repartition-source"});
        // Second level of the 0004 branch: TOSTREAM -> PEEK -> MAP -> "even_store_2-repartition" -> REDUCE.
        this.builder.addProcessor("KTABLE-TOSTREAM-0000000022", new MockProcessorSupplier(), new String[]{"KSTREAM-REDUCE-0000000018"});
        this.builder.addProcessor("KSTREAM-PEEK-0000000023", new MockProcessorSupplier(), new String[]{"KTABLE-TOSTREAM-0000000022"});
        this.builder.addProcessor("KSTREAM-MAP-0000000024", new MockProcessorSupplier(), new String[]{"KSTREAM-PEEK-0000000023"});
        this.builder.addInternalTopic("even_store_2-repartition", InternalTopicProperties.empty());
        this.builder.addProcessor("even_store_2-repartition-filter", new MockProcessorSupplier(), new String[]{"KSTREAM-MAP-0000000024"});
        this.builder.addSink("even_store_2-repartition-sink", "even_store_2-repartition", null, null, null, new String[]{"even_store_2-repartition-filter"});
        this.builder.addSource(null, "even_store_2-repartition-source", null, null, null, new String[]{"even_store_2-repartition"});
        this.builder.addProcessor("KSTREAM-REDUCE-0000000025", new MockProcessorSupplier(), new String[]{"even_store_2-repartition-source"});
        // The two second-level REDUCE nodes (0013 and 0025) each feed a join node; both joins feed one MERGE node.
        this.builder.addProcessor("KTABLE-JOINTHIS-0000000030", new MockProcessorSupplier(), new String[]{"KSTREAM-REDUCE-0000000013"});
        this.builder.addProcessor("KTABLE-JOINOTHER-0000000031", new MockProcessorSupplier(), new String[]{"KSTREAM-REDUCE-0000000025"});
        this.builder.addProcessor("KTABLE-MERGE-0000000029", new MockProcessorSupplier(), new String[]{"KTABLE-JOINTHIS-0000000030", "KTABLE-JOINOTHER-0000000031"});
        this.builder.addProcessor("KTABLE-TOSTREAM-0000000032", new MockProcessorSupplier(), new String[]{"KTABLE-MERGE-0000000029"});
        // The subscription lists the repartition topics with a "test-" prefix.
        // NOTE(review): presumably an application-id prefix — confirm against the fixture setup.
        List<String> topics = Arrays.asList("input-stream", "test-even_store-repartition", "test-even_store_2-repartition", "test-odd_store-repartition", "test-odd_store_2-repartition");
        this.configureDefault();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(topics, this.defaultSubscriptionInfo.encode()));
        // Local cluster metadata (shadows the `metadata` field): a single partition of "input-stream" only.
        Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), Collections.singletonList(new PartitionInfo("input-stream", 0, Node.noNode(), new Node[0], new Node[0])), Collections.emptySet(), Collections.emptySet());
        // The result is intentionally ignored; only successful completion matters.
        this.partitionAssignor.assign(metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));
    }

    /**
     * Values supplied for the four assignment-related config keys must be readable back through the
     * corresponding accessors on the configured assignor.
     */
    @Test
    public void shouldGetAssignmentConfigs() {
        this.createDefaultMockTaskManager();

        final Map<String, Object> assignorProps = this.configProps();
        assignorProps.put("acceptable.recovery.lag", 11);
        assignorProps.put("max.warmup.replicas", 33);
        assignorProps.put("num.standby.replicas", 44);
        assignorProps.put("probing.rebalance.interval.ms", 3300000L);
        this.partitionAssignor.configure(assignorProps);

        // The recovery lag is supplied as an int but is expected back as a long.
        MatcherAssert.assertThat(this.partitionAssignor.acceptableRecoveryLag(), CoreMatchers.equalTo(11L));
        MatcherAssert.assertThat(this.partitionAssignor.maxWarmupReplicas(), CoreMatchers.equalTo(33));
        MatcherAssert.assertThat(this.partitionAssignor.numStandbyReplicas(), CoreMatchers.equalTo(44));
        MatcherAssert.assertThat(this.partitionAssignor.probingRebalanceIntervalMs(), CoreMatchers.equalTo(3300000L));
    }

    /**
     * Checks that an {@code AssignorConfiguration} built from {@code configProps()}
     * exposes the value previously stored in {@code nextScheduledRebalanceMs}.
     */
    @Test
    public void shouldGetNextProbingRebalanceMs() {
        this.nextScheduledRebalanceMs.set(300000L);
        this.createDefaultMockTaskManager();

        // NOTE(review): presumably configProps() hands this.nextScheduledRebalanceMs to the configuration - confirm.
        AssignorConfiguration config = new AssignorConfiguration(this.configProps());

        MatcherAssert.assertThat(
            (Object)config.nextScheduledRebalanceMs().get(),
            (Matcher)CoreMatchers.equalTo((Object)300000L));
    }

    /**
     * Checks that an {@code AssignorConfiguration} built from {@code configProps()}
     * reports the timestamp that was set on {@code this.time}.
     */
    @Test
    public void shouldGetTime() {
        this.time.setCurrentTimeMs(Long.MAX_VALUE);
        this.createDefaultMockTaskManager();

        // NOTE(review): presumably configProps() hands this.time to the configuration - confirm.
        AssignorConfiguration config = new AssignorConfiguration(this.configProps());

        MatcherAssert.assertThat(
            (Object)config.time().milliseconds(),
            (Matcher)CoreMatchers.equalTo((Object)Long.MAX_VALUE));
    }

    /**
     * Expects {@code assign} to fail with an {@link IllegalStateException} when the mocked
     * admin client reports end offsets for fewer partitions of the store changelog than
     * the test intends the changelog to have.
     */
    @Test
    public void shouldThrowIllegalStateExceptionIfAnyPartitionsMissingFromChangelogEndOffsets() {
        // Intended partition count of the changelog.
        // NOTE(review): assumes this.metadata gives "topic1" this many partitions - confirm.
        int changelogNumPartitions = 3;

        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor1"});

        // Report end offsets for one partition fewer than intended, so the last partition is missing.
        this.createMockAdminClient(StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(
            Collections.singletonList("stream-partition-assignor-test-store1-changelog"),
            Collections.singletonList(changelogNumPartitions - 1)));
        this.configureDefault();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode()));

        Assert.assertThrows(IllegalStateException.class, () -> this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)));
    }

    /**
     * Expects {@code assign} to fail with an {@link IllegalStateException} when two state
     * stores are registered but the mocked admin client reports end offsets for the
     * changelog of only one of them.
     */
    @Test
    public void shouldThrowIllegalStateExceptionIfAnyTopicsMissingFromChangelogEndOffsets() {
        List<String> sourceTopics = Collections.singletonList("topic1");
        // Only store1's changelog is reported; nothing is reported for store2.
        List<String> reportedChangelogs = Collections.singletonList("stream-partition-assignor-test-store1-changelog");
        List<Integer> reportedPartitionCounts = Collections.singletonList(3);

        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store2", false), new String[]{"processor1"});

        this.createMockAdminClient(StreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(reportedChangelogs, reportedPartitionCounts));
        this.configureDefault();
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(sourceTopics, this.defaultSubscriptionInfo.encode()));

        Assert.assertThrows(IllegalStateException.class, () -> this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)));
    }

    /**
     * Runs an assignment for a topology with one state store while the admin client mock
     * only knows how to answer a {@code listOffsets} call with an empty request map, then
     * verifies the admin client mock.
     */
    @Test
    public void shouldSkipListOffsetsRequestForNewlyCreatedChangelogTopics() {
        this.adminClient = (Admin)EasyMock.createMock(AdminClient.class);
        ListOffsetsResult emptyListOffsets = (ListOffsetsResult)EasyMock.createNiceMock(ListOffsetsResult.class);
        KafkaFutureImpl noOffsetsFuture = new KafkaFutureImpl();
        noOffsetsFuture.complete(Collections.emptyMap());

        // The only listOffsets call recorded on the admin client is one with an empty request map.
        EasyMock.expect((Object)this.adminClient.listOffsets(Collections.emptyMap())).andStubReturn((Object)emptyListOffsets);
        EasyMock.expect((Object)emptyListOffsets.all()).andReturn((Object)noOffsetsFuture);

        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor1"});
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode()));

        EasyMock.replay((Object[])new Object[]{emptyListOffsets});
        this.configureDefault();
        // NOTE(review): `true` presumably makes the mocked topic manager report the changelog as newly created - confirm.
        this.overwriteInternalTopicManagerWithMock(true);

        this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));

        EasyMock.verify((Object[])new Object[]{this.adminClient});
    }

    /**
     * Runs an assignment for a topology with one state store and asserts that the request
     * map passed to the admin client's {@code listOffsets} has exactly the three expected
     * changelog partitions as its keys.
     */
    @Test
    public void shouldRequestEndOffsetsForPreexistingChangelogs() {
        Set expectedChangelogs = Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("stream-partition-assignor-test-store-changelog", 0), new TopicPartition("stream-partition-assignor-test-store-changelog", 1), new TopicPartition("stream-partition-assignor-test-store-changelog", 2)});
        this.adminClient = (Admin)EasyMock.createMock(AdminClient.class);
        ListOffsetsResult listOffsetsResult = (ListOffsetsResult)EasyMock.createNiceMock(ListOffsetsResult.class);

        // Complete the future with one mocked result per expected partition, each reporting Long.MAX_VALUE as its offset.
        KafkaFutureImpl endOffsetsFuture = new KafkaFutureImpl();
        endOffsetsFuture.complete(expectedChangelogs.stream().collect(Collectors.toMap(partition -> partition, partition -> {
            ListOffsetsResult.ListOffsetsResultInfo offsetInfo = (ListOffsetsResult.ListOffsetsResultInfo)EasyMock.createNiceMock(ListOffsetsResult.ListOffsetsResultInfo.class);
            EasyMock.expect((Object)offsetInfo.offset()).andStubReturn((Object)Long.MAX_VALUE);
            EasyMock.replay((Object[])new Object[]{offsetInfo});
            return offsetInfo;
        })));

        // Capture whatever request map the assignor passes to listOffsets so it can be inspected afterwards.
        Capture requestedOffsets = EasyMock.newCapture();
        EasyMock.expect((Object)this.adminClient.listOffsets((Map)EasyMock.capture((Capture)requestedOffsets))).andStubReturn((Object)listOffsetsResult);
        EasyMock.expect((Object)listOffsetsResult.all()).andReturn((Object)endOffsetsFuture);

        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store", false), new String[]{"processor1"});
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode()));

        EasyMock.replay((Object[])new Object[]{listOffsetsResult});
        this.configureDefault();
        // NOTE(review): `false` presumably makes the mocked topic manager treat the changelog as already existing - confirm.
        this.overwriteInternalTopicManagerWithMock(false);

        this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));

        EasyMock.verify((Object[])new Object[]{this.adminClient});
        MatcherAssert.assertThat(((Map)requestedOffsets.getValue()).keySet(), (Matcher)CoreMatchers.equalTo((Object)expectedChangelogs));
    }

    /**
     * Runs an assignment with the "topology.optimization" setting at "all" and a mocked main
     * consumer whose {@code committed} call is stubbed for three changelog partitions, then
     * verifies the consumer mock.
     */
    @Test
    public void shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
        Set changelogs = Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition("stream-partition-assignor-test-store-changelog", 0), new TopicPartition("stream-partition-assignor-test-store-changelog", 1), new TopicPartition("stream-partition-assignor-test-store-changelog", 2)});
        // NOTE(review): streamsBuilder is populated here but never read again in this method; nothing
        // visible connects it to this.builder - confirm the table topology is actually under test.
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table("topic1", Materialized.as((String)"store"));
        this.subscriptions.put("consumer10", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("topic1"), this.defaultSubscriptionInfo.encode()));
        Consumer consumerClient = (Consumer)EasyMock.createMock(Consumer.class);
        this.createDefaultMockTaskManager();
        // Make the task manager hand out the mocked consumer as its main consumer.
        EasyMock.expect((Object)this.taskManager.mainConsumer()).andStubReturn((Object)consumerClient);
        this.configurePartitionAssignorWith(Collections.singletonMap("topology.optimization", "all"));
        this.overwriteInternalTopicManagerWithMock(false);
        // Stub committed(changelogs) to return Long.MAX_VALUE as the committed offset of every partition.
        // NOTE(review): this is registered with andStubReturn, so verify() below presumably does not
        // require committed() to have been called - confirm the test fails when the request is skipped.
        EasyMock.expect((Object)consumerClient.committed(changelogs)).andStubReturn(changelogs.stream().collect(Collectors.toMap(tp -> tp, tp -> new OffsetAndMetadata(Long.MAX_VALUE))));
        EasyMock.replay((Object[])new Object[]{consumerClient});
        this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions));
        EasyMock.verify((Object[])new Object[]{consumerClient});
    }

    /**
     * Builds an 8-byte buffer holding the int value 8 twice.
     * The buffer is returned as written, i.e. with its position at the end.
     * NOTE(review): the two ints are presumably the used and the supported subscription
     * version, with 8 chosen to be newer than anything this code supports - confirm.
     *
     * @return the freshly allocated buffer
     */
    private static ByteBuffer encodeFutureSubscription() {
        final int futureVersion = 8;
        ByteBuffer encoded = ByteBuffer.allocate(2 * Integer.BYTES);
        encoded.putInt(futureVersion).putInt(futureVersion);
        return encoded;
    }

    /**
     * Registers one consumer whose subscription is encoded with {@code oldVersion} and one
     * consumer whose subscription comes from {@code encodeFutureSubscription()}, then expects
     * {@code assign} to fail with an {@link IllegalStateException}.
     *
     * @param oldVersion subscription version used for the first consumer
     */
    private void shouldThrowIfPreVersionProbingSubscriptionAndFutureSubscriptionIsMixed(int oldVersion) {
        List<String> topics = Collections.singletonList("topic1");

        ByteBuffer oldVersionData = StreamsPartitionAssignorTest.getInfoForOlderVersion(oldVersion, AssignmentTestUtils.UUID_1, AssignmentTestUtils.EMPTY_TASKS, AssignmentTestUtils.EMPTY_TASKS).encode();
        this.subscriptions.put(CONSUMER_1, new ConsumerPartitionAssignor.Subscription(topics, oldVersionData));

        ByteBuffer futureVersionData = StreamsPartitionAssignorTest.encodeFutureSubscription();
        this.subscriptions.put("future-consumer", new ConsumerPartitionAssignor.Subscription(topics, futureVersionData));

        this.configureDefault();
        Assert.assertThrows(IllegalStateException.class, () -> this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)));
    }

    /**
     * Creates an assignment without partitions whose user data is an encoded
     * {@code AssignmentInfo} carrying {@code firstHostState}; every other collection
     * passed to the info is empty.
     * NOTE(review): the leading 7 and trailing 0 are presumably the metadata version and
     * the error code - confirm against the AssignmentInfo constructor.
     *
     * @param firstHostState host-to-partitions map placed into the assignment info
     * @return the assignment wrapping the encoded info
     */
    private static ConsumerPartitionAssignor.Assignment createAssignment(Map<HostInfo, Set<TopicPartition>> firstHostState) {
        ByteBuffer encodedInfo = new AssignmentInfo(
            7,
            Collections.emptyList(),
            Collections.emptyMap(),
            firstHostState,
            Collections.emptyMap(),
            0
        ).encode();
        return new ConsumerPartitionAssignor.Assignment(Collections.emptyList(), encodedInfo);
    }

    /**
     * Decodes the {@code AssignmentInfo} from the assignment's user data and checks it
     * against the assigned partitions.
     * <ul>
     *   <li>The number of assigned partitions must equal the number of active tasks.</li>
     *   <li>The active tasks must equal, in order, one {@code TaskId(0, partition)} per
     *       assigned partition.</li>
     *   <li>The topics of the assigned partitions must equal {@code expectedTopics}.</li>
     *   <li>Every standby partition must have the same partition number as its task, and,
     *       if there are any standby tasks, their topics must equal {@code expectedTopics}.</li>
     * </ul>
     *
     * @param expectedTopics topics expected for the active (and, if present, standby) tasks
     * @param assignment     assignment to inspect
     * @return the decoded assignment info
     */
    private static AssignmentInfo checkAssignment(Set<String> expectedTopics, ConsumerPartitionAssignor.Assignment assignment) {
        AssignmentInfo info = AssignmentInfo.decode((ByteBuffer)assignment.userData());
        Assert.assertEquals((long)assignment.partitions().size(), (long)info.activeTasks().size());

        // Rebuild the expected active tasks from the assigned partitions; the first TaskId
        // argument is always 0 here.
        List<TaskId> activeTasks = new ArrayList<>();
        Set<String> activeTopics = new HashSet<>();
        for (TopicPartition partition : assignment.partitions()) {
            activeTasks.add(new TaskId(0, partition.partition()));
            activeTopics.add(partition.topic());
        }
        Assert.assertEquals(activeTasks, (Object)info.activeTasks());
        Assert.assertEquals(expectedTopics, activeTopics);

        // Typed entries replace the raw Map.Entry/Set, so the loops type-check without casts.
        Set<String> standbyTopics = new HashSet<>();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks().entrySet()) {
            TaskId id = entry.getKey();
            for (TopicPartition partition : entry.getValue()) {
                Assert.assertEquals((long)id.partition, (long)partition.partition());
                standbyTopics.add(partition.topic());
            }
        }
        // Standby topics are only compared when standby tasks were assigned at all.
        if (!info.standbyTasks().isEmpty()) {
            Assert.assertEquals(expectedTopics, standbyTopics);
        }
        return info;
    }

    /**
     * Asserts that both assignments have the same number of consumers and that each
     * consumer of {@code thisAssignment} is present in {@code otherAssignment} with the
     * same tasks, ignoring task order.
     * The task lists of both maps are sorted in place as part of the comparison.
     *
     * @param thisAssignment  consumer-to-tasks map driving the comparison
     * @param otherAssignment consumer-to-tasks map compared against
     */
    private static void assertEquivalentAssignment(Map<String, List<TaskId>> thisAssignment, Map<String, List<TaskId>> otherAssignment) {
        Assert.assertEquals((long)thisAssignment.size(), (long)otherAssignment.size());
        thisAssignment.forEach((consumer, thisTasks) -> {
            Assert.assertTrue((boolean)otherAssignment.containsKey(consumer));
            // Sort both sides so the comparison does not depend on task order.
            Collections.sort(thisTasks);
            List<TaskId> otherTasks = otherAssignment.get(consumer);
            Collections.sort(otherTasks);
            MatcherAssert.assertThat(thisTasks, (Matcher)CoreMatchers.equalTo(otherTasks));
        });
    }

    /**
     * Builds a map with one entry per partition of each given changelog topic, every
     * entry mapped to {@code Long.MAX_VALUE}.
     *
     * @param changelogTopics     changelog topic names
     * @param topicsNumPartitions number of partitions for the topic at the same index
     * @return map from each generated topic partition to {@code Long.MAX_VALUE}
     * @throws IllegalStateException if the two lists differ in size
     */
    private static Map<TopicPartition, Long> getTopicPartitionOffsetsMap(List<String> changelogTopics, List<Integer> topicsNumPartitions) {
        if (changelogTopics.size() != topicsNumPartitions.size()) {
            throw new IllegalStateException("Passed in " + changelogTopics.size() + " changelog topic names, but " + topicsNumPartitions.size() + " different numPartitions for the topics");
        }
        Map<TopicPartition, Long> endOffsets = new HashMap<>();
        int topicIndex = 0;
        for (String changelogTopic : changelogTopics) {
            int partitionCount = topicsNumPartitions.get(topicIndex);
            topicIndex++;
            int partition = 0;
            while (partition < partitionCount) {
                endOffsets.put(new TopicPartition(changelogTopic, partition), Long.MAX_VALUE);
                partition++;
            }
        }
        return endOffsets;
    }

    /**
     * Creates a {@code SubscriptionInfo} with both leading version arguments set to 7, no
     * user end point, and task offset sums derived from the given task sets.
     *
     * @param processId    process id placed into the subscription info
     * @param prevTasks    tasks passed to {@code getTaskOffsetSums} as active tasks
     * @param standbyTasks tasks passed to {@code getTaskOffsetSums} as standby tasks
     * @return the new subscription info
     */
    private static SubscriptionInfo getInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
        Map<TaskId, Long> offsetSums = StreamsPartitionAssignorTest.getTaskOffsetSums(prevTasks, standbyTasks);
        return new SubscriptionInfo(7, 7, processId, null, offsetSums);
    }

    /**
     * Creates a {@code SubscriptionInfo} with both leading version arguments set to 7, the
     * given user end point, and task offset sums derived from the given task sets.
     *
     * @param processId    process id placed into the subscription info
     * @param prevTasks    tasks passed to {@code getTaskOffsetSums} as active tasks
     * @param standbyTasks tasks passed to {@code getTaskOffsetSums} as standby tasks
     * @param userEndPoint end point string placed into the subscription info
     * @return the new subscription info
     */
    private static SubscriptionInfo getInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
        Map<TaskId, Long> offsetSums = StreamsPartitionAssignorTest.getTaskOffsetSums(prevTasks, standbyTasks);
        return new SubscriptionInfo(7, 7, processId, userEndPoint, offsetSums);
    }

    /**
     * Creates a {@code SubscriptionInfo} whose first version argument is {@code version}
     * and whose second is 7, with no user end point and task offset sums derived from the
     * given task sets.
     *
     * @param version      value passed as the first constructor argument
     * @param processId    process id placed into the subscription info
     * @param prevTasks    tasks passed to {@code getTaskOffsetSums} as active tasks
     * @param standbyTasks tasks passed to {@code getTaskOffsetSums} as standby tasks
     * @return the new subscription info
     */
    private static SubscriptionInfo getInfoForOlderVersion(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
        Map<TaskId, Long> offsetSums = StreamsPartitionAssignorTest.getTaskOffsetSums(prevTasks, standbyTasks);
        return new SubscriptionInfo(version, 7, processId, null, offsetSums);
    }

    /**
     * Maps every active task to {@code -2L} and every standby task to {@code 0L}.
     * A task present in both collections ends up with the standby value, because the
     * standby entries are added last.
     * NOTE(review): -2L is presumably a sentinel offset marking a running active task - confirm.
     *
     * @param activeTasks  tasks mapped to {@code -2L}
     * @param standbyTasks tasks mapped to {@code 0L}
     * @return combined task-to-offset-sum map
     */
    private static Map<TaskId, Long> getTaskOffsetSums(Collection<TaskId> activeTasks, Collection<TaskId> standbyTasks) {
        Map<TaskId, Long> offsetSums = activeTasks.stream().collect(Collectors.toMap(task -> task, task -> -2L));
        Map<TaskId, Long> standbySums = standbyTasks.stream().collect(Collectors.toMap(task -> task, task -> 0L));
        offsetSums.putAll(standbySums);
        return offsetSums;
    }

    /**
     * Maps every given task to {@code Long.MAX_VALUE}.
     *
     * @param allStatefulTasks tasks to include as keys
     * @return map from each task to {@code Long.MAX_VALUE}
     */
    private static Map<TaskId, Long> getTaskEndOffsetSums(Collection<TaskId> allStatefulTasks) {
        Map<TaskId, Long> endOffsetSums = allStatefulTasks.stream()
            .collect(Collectors.toMap(task -> task, task -> Long.MAX_VALUE));
        return endOffsetSums;
    }
}

