/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.TaskManager;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockProcessorSupplier;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;

public class HighAvailabilityStreamsPartitionAssignorTest {
    private final List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]));
    private final Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
    private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
    private static final String USER_END_POINT = "localhost:8080";
    private static final String APPLICATION_ID = "stream-partition-assignor-test";
    private TaskManager taskManager;
    private Admin adminClient;
    private StreamsConfig streamsConfig = new StreamsConfig(this.configProps());
    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
    private final StreamsMetadataState streamsMetadataState = (StreamsMetadataState)EasyMock.createNiceMock(StreamsMetadataState.class);
    private final Map<String, ConsumerPartitionAssignor.Subscription> subscriptions = new HashMap<String, ConsumerPartitionAssignor.Subscription>();
    private final AtomicInteger assignmentError = new AtomicInteger();
    private final AtomicLong nextProbingRebalanceMs = new AtomicLong(Long.MAX_VALUE);
    private final MockTime time = new MockTime();

    private Map<String, Object> configProps() {
        HashMap<String, Object> configurationMap = new HashMap<String, Object>();
        configurationMap.put("application.id", APPLICATION_ID);
        configurationMap.put("bootstrap.servers", USER_END_POINT);
        configurationMap.put("__task.manager.instance__", this.taskManager);
        configurationMap.put("__streams.metadata.state.instance__", this.streamsMetadataState);
        configurationMap.put("__streams.admin.client.instance__", this.adminClient);
        configurationMap.put("__assignment.error.code__", this.assignmentError);
        configurationMap.put("__next.probing.rebalance.ms__", this.nextProbingRebalanceMs);
        configurationMap.put("__time__", this.time);
        configurationMap.put("internal.task.assignor.class", HighAvailabilityTaskAssignor.class.getName());
        return configurationMap;
    }

    private void configurePartitionAssignorWith(Map<String, Object> props) {
        Map<String, Object> configMap = this.configProps();
        configMap.putAll(props);
        this.streamsConfig = new StreamsConfig(configMap);
        this.partitionAssignor.configure(configMap);
        EasyMock.replay((Object[])new Object[]{this.taskManager, this.adminClient});
        this.overwriteInternalTopicManagerWithMock();
    }

    private void createMockTaskManager(Set<TaskId> activeTasks) {
        this.createMockTaskManager(HighAvailabilityStreamsPartitionAssignorTest.getTaskOffsetSums(activeTasks));
    }

    private void createMockTaskManager(Map<TaskId, Long> taskOffsetSums) {
        this.taskManager = (TaskManager)EasyMock.createNiceMock(TaskManager.class);
        EasyMock.expect((Object)this.taskManager.builder()).andReturn((Object)this.builder).anyTimes();
        EasyMock.expect((Object)this.taskManager.getTaskOffsetSums()).andReturn(taskOffsetSums).anyTimes();
        EasyMock.expect((Object)this.taskManager.processId()).andReturn((Object)AssignmentTestUtils.UUID_1).anyTimes();
        this.builder.setApplicationId(APPLICATION_ID);
        this.builder.buildTopology();
    }

    private void createMockAdminClient(Map<TopicPartition, Long> changelogEndOffsets) {
        this.adminClient = (Admin)EasyMock.createMock(AdminClient.class);
        ListOffsetsResult result = (ListOffsetsResult)EasyMock.createNiceMock(ListOffsetsResult.class);
        KafkaFutureImpl allFuture = new KafkaFutureImpl();
        allFuture.complete(changelogEndOffsets.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, t -> {
            ListOffsetsResult.ListOffsetsResultInfo info = (ListOffsetsResult.ListOffsetsResultInfo)EasyMock.createNiceMock(ListOffsetsResult.ListOffsetsResultInfo.class);
            EasyMock.expect((Object)info.offset()).andStubReturn(t.getValue());
            EasyMock.replay((Object[])new Object[]{info});
            return info;
        })));
        EasyMock.expect((Object)this.adminClient.listOffsets((Map)EasyMock.anyObject())).andStubReturn((Object)result);
        EasyMock.expect((Object)result.all()).andReturn((Object)allFuture);
        EasyMock.replay((Object[])new Object[]{result});
    }

    private void overwriteInternalTopicManagerWithMock() {
        MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(this.streamsConfig, this.mockClientSupplier.restoreConsumer, false);
        this.partitionAssignor.setInternalTopicManager((InternalTopicManager)mockInternalTopicManager);
    }

    @Before
    public void setUp() {
        this.createMockAdminClient(AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS);
    }

    @Test
    public void shouldReturnAllActiveTasksToPreviousOwnerRegardlessOfBalanceAndTriggerRebalanceIfEndOffsetFetchFailsAndHighAvailabilityEnabled() {
        long rebalanceInterval = 300000L;
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor1"});
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        this.createMockTaskManager(allTasks);
        this.adminClient = (Admin)EasyMock.createMock(AdminClient.class);
        EasyMock.expect((Object)this.adminClient.listOffsets((Map)EasyMock.anyObject())).andThrow((Throwable)new StreamsException("Should be handled"));
        this.configurePartitionAssignorWith(Collections.singletonMap("probing.rebalance.interval.ms", 300000L));
        String firstConsumer = "consumer1";
        String newConsumer = "consumer2";
        this.subscriptions.put("consumer1", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("source1"), HighAvailabilityStreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_1, allTasks).encode()));
        this.subscriptions.put("consumer2", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("source1"), HighAvailabilityStreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_2, AssignmentTestUtils.EMPTY_TASKS).encode()));
        Map assignments = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        AssignmentInfo firstConsumerUserData = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)assignments.get("consumer1")).userData());
        List firstConsumerActiveTasks = firstConsumerUserData.activeTasks();
        AssignmentInfo newConsumerUserData = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)assignments.get("consumer2")).userData());
        List newConsumerActiveTasks = newConsumerUserData.activeTasks();
        MatcherAssert.assertThat((Object)firstConsumerActiveTasks, (Matcher)CoreMatchers.equalTo(new ArrayList(allTasks)));
        MatcherAssert.assertThat((Object)newConsumerActiveTasks, (Matcher)Matchers.empty());
        MatcherAssert.assertThat((Object)(this.time.milliseconds() + 300000L), (Matcher)Matchers.anyOf((Matcher)Matchers.is((Object)firstConsumerUserData.nextRebalanceMs()), (Matcher)Matchers.is((Object)newConsumerUserData.nextRebalanceMs())));
    }

    @Test
    public void shouldScheduleProbingRebalanceOnThisClientIfWarmupTasksRequired() {
        long rebalanceInterval = 300000L;
        this.builder.addSource(null, "source1", null, null, null, new String[]{"topic1"});
        this.builder.addProcessor("processor1", new MockProcessorSupplier(), new String[]{"source1"});
        this.builder.addStateStore((StoreBuilder)new MockKeyValueStoreBuilder("store1", false), new String[]{"processor1"});
        Set allTasks = Utils.mkSet((Object[])new TaskId[]{AssignmentTestUtils.TASK_0_0, AssignmentTestUtils.TASK_0_1, AssignmentTestUtils.TASK_0_2});
        this.createMockTaskManager(allTasks);
        this.createMockAdminClient(HighAvailabilityStreamsPartitionAssignorTest.getTopicPartitionOffsetsMap(Collections.singletonList("stream-partition-assignor-test-store1-changelog"), Collections.singletonList(3)));
        this.configurePartitionAssignorWith(Collections.singletonMap("probing.rebalance.interval.ms", 300000L));
        String firstConsumer = "consumer1";
        String newConsumer = "consumer2";
        this.subscriptions.put("consumer1", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("source1"), HighAvailabilityStreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_1, allTasks).encode()));
        this.subscriptions.put("consumer2", new ConsumerPartitionAssignor.Subscription(Collections.singletonList("source1"), HighAvailabilityStreamsPartitionAssignorTest.getInfo(AssignmentTestUtils.UUID_2, AssignmentTestUtils.EMPTY_TASKS).encode()));
        Map assignments = this.partitionAssignor.assign(this.metadata, new ConsumerPartitionAssignor.GroupSubscription(this.subscriptions)).groupAssignment();
        List firstConsumerActiveTasks = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)assignments.get("consumer1")).userData()).activeTasks();
        List newConsumerActiveTasks = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)assignments.get("consumer2")).userData()).activeTasks();
        MatcherAssert.assertThat((Object)firstConsumerActiveTasks, (Matcher)CoreMatchers.equalTo(new ArrayList(allTasks)));
        MatcherAssert.assertThat((Object)newConsumerActiveTasks, (Matcher)Matchers.empty());
        MatcherAssert.assertThat((Object)this.assignmentError.get(), (Matcher)CoreMatchers.equalTo((Object)AssignorError.NONE.code()));
        long nextScheduledRebalanceOnThisClient = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)assignments.get("consumer1")).userData()).nextRebalanceMs();
        long nextScheduledRebalanceOnOtherClient = AssignmentInfo.decode((ByteBuffer)((ConsumerPartitionAssignor.Assignment)assignments.get("consumer2")).userData()).nextRebalanceMs();
        MatcherAssert.assertThat((Object)nextScheduledRebalanceOnThisClient, (Matcher)CoreMatchers.equalTo((Object)(this.time.milliseconds() + 300000L)));
        MatcherAssert.assertThat((Object)nextScheduledRebalanceOnOtherClient, (Matcher)CoreMatchers.equalTo((Object)Long.MAX_VALUE));
    }

    private static Map<TopicPartition, Long> getTopicPartitionOffsetsMap(List<String> changelogTopics, List<Integer> topicsNumPartitions) {
        if (changelogTopics.size() != topicsNumPartitions.size()) {
            throw new IllegalStateException("Passed in " + changelogTopics.size() + " changelog topic names, but " + topicsNumPartitions.size() + " different numPartitions for the topics");
        }
        HashMap<TopicPartition, Long> changelogEndOffsets = new HashMap<TopicPartition, Long>();
        for (int i = 0; i < changelogTopics.size(); ++i) {
            String topic = changelogTopics.get(i);
            int numPartitions = topicsNumPartitions.get(i);
            for (int partition = 0; partition < numPartitions; ++partition) {
                changelogEndOffsets.put(new TopicPartition(topic, partition), Long.MAX_VALUE);
            }
        }
        return changelogEndOffsets;
    }

    private static SubscriptionInfo getInfo(UUID processId, Set<TaskId> prevTasks) {
        return new SubscriptionInfo(8, 8, processId, null, HighAvailabilityStreamsPartitionAssignorTest.getTaskOffsetSums(prevTasks), 0);
    }

    private static Map<TaskId, Long> getTaskOffsetSums(Set<TaskId> activeTasks) {
        Map<TaskId, Long> taskOffsetSums = activeTasks.stream().collect(Collectors.toMap(t -> t, t -> -2L));
        taskOffsetSums.putAll(AssignmentTestUtils.EMPTY_TASKS.stream().collect(Collectors.toMap(t -> t, t -> 0L)));
        return taskOffsetSums;
    }
}

