/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(value=MockitoJUnitRunner.StrictStubs.class)
public class HighAvailabilityStreamsPartitionAssignorTest {
    private final List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]));
    private final Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
    private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    private static final String USER_END_POINT = "localhost:8080";
    private static final String APPLICATION_ID = "stream-partition-assignor-test";
    @Mock
    private TaskManager taskManager;
    @Mock
    private Admin adminClient;
    private StreamsConfig streamsConfig = new StreamsConfig(this.configProps());
    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
    private TopologyMetadata topologyMetadata = new TopologyMetadata(this.builder, this.streamsConfig);
    @Mock
    private StreamsMetadataState streamsMetadataState;
    @Mock
    private Consumer<byte[], byte[]> consumer;
    private final Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<String, ConsumerPartitionAssignor.Subscription>();
    private ReferenceContainer referenceContainer;
    private final MockTime time = new MockTime();

    private Map<String, Object> configProps() {
        HashMap<String, Object> configurationMap = new HashMap<String, Object>();
        configurationMap.put("application.id", APPLICATION_ID);
        configurationMap.put("bootstrap.servers", USER_END_POINT);
        this.referenceContainer = new ReferenceContainer();
        this.referenceContainer.mainConsumer = this.consumer;
        this.referenceContainer.adminClient = this.adminClient;
        this.referenceContainer.taskManager = this.taskManager;
        this.referenceContainer.streamsMetadataState = this.streamsMetadataState;
        this.referenceContainer.time = this.time;
        configurationMap.put("__reference.container.instance__", this.referenceContainer);
        return configurationMap;
    }

    private void configurePartitionAssignorWith(Map<String, Object> props) {
        Map<String, Object> configMap = this.configProps();
        configMap.putAll(props);
        this.streamsConfig = new StreamsConfig(configMap);
        this.topologyMetadata = new TopologyMetadata(this.builder, this.streamsConfig);
        this.partitionAssignor.configure(configMap);
        this.overwriteInternalTopicManagerWithMock();
    }

    private void createMockTaskManager() {
        Mockito.when((Object)this.taskManager.topologyMetadata()).thenReturn((Object)this.topologyMetadata);
        Mockito.when((Object)this.taskManager.processId()).thenReturn((Object)AssignmentTestUtils.PID_1);
        this.topologyMetadata.buildAndRewriteTopology();
    }

    private void overwriteInternalTopicManagerWithMock() {
        MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager((Time)this.time, this.streamsConfig, this.mockClientSupplier.restoreConsumer, false);
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)mockInternalTopicManager);
    }

    @Before
    public void setUp() {
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS);
    }

    @Test
    public void shouldReturnAllActiveTasksToPreviousOwnerRegardlessOfBalanceAndTriggerRebalanceIfEndOffsetFetchFailsAndHighAvailabilityEnabled() {
        long rebalanceInterval = 300000L;
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor1"});
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        this.createMockTaskManager();
        Mockito.when((Object)this.adminClient.listOffsets((Map)ArgumentMatchers.any())).thenThrow(new Throwable[]{new StreamsException("Should be handled")});
        this.configurePartitionAssignorWith(Collections.singletonMap("probing.rebalance.interval.ms", 300000L));
        String firstConsumer = "consumer1";
        String newConsumer = "consumer2";
        this.subscriptions.put("consumer1", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("source1"), HighAvailabilityStreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.PID_1, allTasks).encode()));
        this.subscriptions.put("consumer2", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("source1"), HighAvailabilityStreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.PID_2, AssignmentTestUtils.EMPTY_TASKS).encode()));
        Map assignments = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        AssignmentInfo firstConsumerUserData = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)assignments.get("consumer1")).userData());
        List firstConsumerActiveTasks = firstConsumerUserData.activeTasks();
        AssignmentInfo newConsumerUserData = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)assignments.get("consumer2")).userData());
        List newConsumerActiveTasks = newConsumerUserData.activeTasks();
        ArrayList sortedExpectedTasks = new ArrayList(allTasks);
        Collections.sort(sortedExpectedTasks);
        MatcherAssert.assertThat((Object)firstConsumerActiveTasks, (Matcher)CoreMatchers.equalTo(sortedExpectedTasks));
        MatcherAssert.assertThat((Object)newConsumerActiveTasks, (Matcher)Matchers.empty());
        MatcherAssert.assertThat((Object)(this.time.milliseconds() + 300000L), (Matcher)Matchers.anyOf((Matcher)Matchers.is((Object)firstConsumerUserData.nextRebalanceMs()), (Matcher)Matchers.is((Object)newConsumerUserData.nextRebalanceMs())));
    }

    @Test
    public void shouldScheduleProbingRebalanceOnThisClientIfWarmupTasksRequired() {
        long rebalanceInterval = 300000L;
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockApiProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor1"});
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        this.createMockTaskManager();
        this.adminClient = AssignmentTestUtils.createMockAdminClientForAssignor(HighAvailabilityStreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(Collections.singletonList("stream-partition-assignor-test-store1-changelog"), Collections.singletonList(3)));
        this.configurePartitionAssignorWith(Collections.singletonMap("probing.rebalance.interval.ms", 300000L));
        String firstConsumer = "consumer1";
        String newConsumer = "consumer2";
        this.subscriptions.put("consumer1", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("source1"), HighAvailabilityStreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.PID_1, allTasks).encode()));
        this.subscriptions.put("consumer2", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("source1"), HighAvailabilityStreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.PID_2, AssignmentTestUtils.EMPTY_TASKS).encode()));
        Map assignments = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        List firstConsumerActiveTasks = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)assignments.get("consumer1")).userData()).activeTasks();
        List newConsumerActiveTasks = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)assignments.get("consumer2")).userData()).activeTasks();
        ArrayList sortedExpectedTasks = new ArrayList(allTasks);
        Collections.sort(sortedExpectedTasks);
        MatcherAssert.assertThat((Object)firstConsumerActiveTasks, (Matcher)CoreMatchers.equalTo(sortedExpectedTasks));
        MatcherAssert.assertThat((Object)newConsumerActiveTasks, (Matcher)Matchers.empty());
        MatcherAssert.assertThat((Object)this.referenceContainer.assignmentErrorCode.get(), (Matcher)CoreMatchers.equalTo((Object)AssignorError.NONE.code()));
        long nextScheduledRebalanceOnThisClient = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)assignments.get("consumer1")).userData()).nextRebalanceMs();
        long nextScheduledRebalanceOnOtherClient = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)assignments.get("consumer2")).userData()).nextRebalanceMs();
        MatcherAssert.assertThat((Object)nextScheduledRebalanceOnThisClient, (Matcher)CoreMatchers.equalTo((Object)(this.time.milliseconds() + 300000L)));
        MatcherAssert.assertThat((Object)nextScheduledRebalanceOnOtherClient, (Matcher)CoreMatchers.equalTo((Object)Long.MAX_VALUE));
    }

    private static Map<TopicPartition, Long> getTopicPartitionOffsetsMap(List<String> changelogTopics, List<Integer> topicsNumPartitions) {
        if (changelogTopics.size() != topicsNumPartitions.size()) {
            throw new IllegalStateException("Passed in " + changelogTopics.size() + " changelog topic names, but " + topicsNumPartitions.size() + " different numPartitions for the topics");
        }
        HashMap<TopicPartition, Long> changelogEndOffsets = new HashMap<TopicPartition, Long>();
        for (int i = 0; i < changelogTopics.size(); ++i) {
            String topic = changelogTopics.get(i);
            int numPartitions = topicsNumPartitions.get(i);
            for (int partition = 0; partition < numPartitions; ++partition) {
                changelogEndOffsets.put(new TopicPartition(topic, partition), Long.MAX_VALUE);
            }
        }
        return changelogEndOffsets;
    }

    private static SubscriptionInfo getInfo(ProcessId processId, Set<TaskId> prevTasks) {
        return new SubscriptionInfo(11, 11, processId, null, HighAvailabilityStreamsPartitionAssignorTest.getTaskOffsetSums(prevTasks), 0, 0, Collections.emptyMap());
    }

    private static Map<TaskId, Long> getTaskOffsetSums(Set<TaskId> activeTasks) {
        Map<TaskId, Long> taskOffsetSums = activeTasks.stream().collect(Collectors.toMap(t -> t, t -> -2L));
        taskOffsetSums.putAll(AssignmentTestUtils.EMPTY_TASKS.stream().collect(Collectors.toMap(t -> t, t -> 0L)));
        return taskOffsetSums;
    }
}

