/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.easymock.EasyMock;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;

public final class AssignmentTestUtils {
    // Fixed client ids: UUID_n is uuidForInt(n).
    public static final UUID UUID_1 = AssignmentTestUtils.uuidForInt(1);
    public static final UUID UUID_2 = AssignmentTestUtils.uuidForInt(2);
    public static final UUID UUID_3 = AssignmentTestUtils.uuidForInt(3);
    public static final UUID UUID_4 = AssignmentTestUtils.uuidForInt(4);
    public static final UUID UUID_5 = AssignmentTestUtils.uuidForInt(5);
    public static final UUID UUID_6 = AssignmentTestUtils.uuidForInt(6);
    // Topic partitions: TP_t_p is partition p of the topic named "topic<t>".
    public static final TopicPartition TP_0_0 = new TopicPartition("topic0", 0);
    public static final TopicPartition TP_0_1 = new TopicPartition("topic0", 1);
    public static final TopicPartition TP_0_2 = new TopicPartition("topic0", 2);
    public static final TopicPartition TP_1_0 = new TopicPartition("topic1", 0);
    public static final TopicPartition TP_1_1 = new TopicPartition("topic1", 1);
    public static final TopicPartition TP_1_2 = new TopicPartition("topic1", 2);
    // Task ids: TASK_a_b is new TaskId(a, b); presumably (subtopology, partition) - confirm against TaskId.
    public static final TaskId TASK_0_0 = new TaskId(0, 0);
    public static final TaskId TASK_0_1 = new TaskId(0, 1);
    public static final TaskId TASK_0_2 = new TaskId(0, 2);
    public static final TaskId TASK_0_3 = new TaskId(0, 3);
    public static final TaskId TASK_0_4 = new TaskId(0, 4);
    public static final TaskId TASK_0_5 = new TaskId(0, 5);
    public static final TaskId TASK_0_6 = new TaskId(0, 6);
    public static final TaskId TASK_1_0 = new TaskId(1, 0);
    public static final TaskId TASK_1_1 = new TaskId(1, 1);
    public static final TaskId TASK_1_2 = new TaskId(1, 2);
    public static final TaskId TASK_1_3 = new TaskId(1, 3);
    public static final TaskId TASK_2_0 = new TaskId(2, 0);
    public static final TaskId TASK_2_1 = new TaskId(2, 1);
    public static final TaskId TASK_2_2 = new TaskId(2, 2);
    public static final TaskId TASK_2_3 = new TaskId(2, 3);
    // Task ids with a third, String argument: NAMED_TASK_Tn_a_b is new TaskId(a, b, "topology<n>").
    public static final TaskId NAMED_TASK_T0_0_0 = new TaskId(0, 0, "topology0");
    public static final TaskId NAMED_TASK_T0_0_1 = new TaskId(0, 1, "topology0");
    public static final TaskId NAMED_TASK_T0_1_0 = new TaskId(1, 0, "topology0");
    public static final TaskId NAMED_TASK_T0_1_1 = new TaskId(1, 1, "topology0");
    public static final TaskId NAMED_TASK_T1_0_0 = new TaskId(0, 0, "topology1");
    public static final TaskId NAMED_TASK_T1_0_1 = new TaskId(0, 1, "topology1");
    public static final TaskId NAMED_TASK_T2_0_0 = new TaskId(0, 0, "topology2");
    public static final TaskId NAMED_TASK_T2_2_0 = new TaskId(2, 0, "topology2");
    // Subtopologies 0-2, each built with null as the second argument (presumably "no named topology" - TODO confirm).
    public static final TopologyMetadata.Subtopology SUBTOPOLOGY_0 = new TopologyMetadata.Subtopology(0, null);
    public static final TopologyMetadata.Subtopology SUBTOPOLOGY_1 = new TopologyMetadata.Subtopology(1, null);
    public static final TopologyMetadata.Subtopology SUBTOPOLOGY_2 = new TopologyMetadata.Subtopology(2, null);
    // Shared "empty" fixtures.
    public static final Set<TaskId> EMPTY_TASKS = Collections.emptySet();
    // NOTE(review): unlike the other EMPTY_* constants this is a mutable HashMap shared by every user of the
    // class; nothing here prevents a caller from adding entries to it, so treat it as read-only.
    public static final Map<TopicPartition, Long> EMPTY_CHANGELOG_END_OFFSETS = new HashMap<TopicPartition, Long>();
    public static final List<String> EMPTY_RACK_AWARE_ASSIGNMENT_TAGS = Collections.emptyList();
    public static final Map<String, String> EMPTY_CLIENT_TAGS = Collections.emptyMap();

    /** Not instantiable: this final class only exposes static constants and static helper methods. */
    private AssignmentTestUtils() {
    }

    /**
     * Builds a map of client states keyed by generated client ids.
     *
     * <p>The states are numbered starting at 1 in argument order, and the state with number {@code i} is stored
     * under {@code uuidForInt(i)}.
     *
     * @param states the client states to register, in order
     * @return a new mutable map from generated id to the corresponding state
     */
    static Map<UUID, ClientState> getClientStatesMap(ClientState ... states) {
        Map<UUID, ClientState> statesById = new HashMap<>();
        for (int index = 0; index < states.length; index++) {
            // ids are 1-based while the array is 0-based
            statesById.put(AssignmentTestUtils.uuidForInt(index + 1), states[index]);
        }
        return statesById;
    }

    /**
     * Creates an EasyMock {@link AdminClient} whose {@code listOffsets(...)} call is stubbed to report the given
     * offsets.
     *
     * <p>For every entry of {@code changelogEndOffsets} a nice mock {@code ListOffsetsResultInfo} is created whose
     * {@code offset()} returns the entry's value. Those infos, keyed by partition, are the value of an already
     * completed future returned from {@code ListOffsetsResult.all()}.
     *
     * <p>Only the nested result and info mocks are replayed here; this method does not call {@code replay} on the
     * returned admin client itself.
     *
     * @param changelogEndOffsets the offset to report for each partition
     * @return the (not yet replayed) admin client mock
     */
    public static AdminClient createMockAdminClientForAssignor(Map<TopicPartition, Long> changelogEndOffsets) {
        AdminClient adminClient = EasyMock.createMock(AdminClient.class);
        ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);

        // One replayed info mock per partition, each answering offset() with the configured value.
        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsetInfos = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> endOffset : changelogEndOffsets.entrySet()) {
            ListOffsetsResult.ListOffsetsResultInfo info =
                EasyMock.createNiceMock(ListOffsetsResult.ListOffsetsResultInfo.class);
            EasyMock.expect(info.offset()).andStubReturn(endOffset.getValue());
            EasyMock.replay(info);
            offsetInfos.put(endOffset.getKey(), info);
        }

        KafkaFutureImpl<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>> allFuture =
            new KafkaFutureImpl<>();
        allFuture.complete(offsetInfos);

        EasyMock.expect(adminClient.listOffsets(EasyMock.anyObject())).andStubReturn(result);
        EasyMock.expect(result.all()).andStubReturn(allFuture);
        EasyMock.replay(result);
        return adminClient;
    }

    /**
     * Builds a {@link SubscriptionInfo} for {@code processId} with a {@code null} user endpoint, a zero
     * unique-field byte and {@link #EMPTY_CLIENT_TAGS}.
     *
     * @param processId    the id of the subscribing process
     * @param prevTasks    tasks to report as active (see {@code getTaskOffsetSums})
     * @param standbyTasks tasks to report as standby
     * @return the subscription info
     */
    public static SubscriptionInfo getInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
        Map<TaskId, Long> taskOffsetSums = AssignmentTestUtils.getTaskOffsetSums(prevTasks, standbyTasks);
        // NOTE(review): the leading 11, 11 and the second-to-last 0 are passed through unchanged; presumably the
        // protocol versions and an error code - confirm against SubscriptionInfo.
        // The explicit (byte) cast keeps the literal applicable to the argument position where the sibling
        // overloads pass a byte uniqueField; a bare int literal is not narrowed in a method/constructor call.
        return new SubscriptionInfo(11, 11, processId, null, taskOffsetSums, (byte) 0, 0, EMPTY_CLIENT_TAGS);
    }

    /**
     * Builds a {@link SubscriptionInfo} for {@code processId} that carries the given user endpoint, a zero
     * unique-field byte and {@link #EMPTY_CLIENT_TAGS}.
     *
     * @param processId    the id of the subscribing process
     * @param prevTasks    tasks to report as active (see {@code getTaskOffsetSums})
     * @param standbyTasks tasks to report as standby
     * @param userEndPoint the endpoint string passed through to the subscription info
     * @return the subscription info
     */
    public static SubscriptionInfo getInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
        Map<TaskId, Long> taskOffsetSums = AssignmentTestUtils.getTaskOffsetSums(prevTasks, standbyTasks);
        // NOTE(review): the leading 11, 11 and the second-to-last 0 are passed through unchanged; presumably the
        // protocol versions and an error code - confirm against SubscriptionInfo.
        // The explicit (byte) cast keeps the literal applicable to the argument position where the sibling
        // overloads pass a byte uniqueField; a bare int literal is not narrowed in a method/constructor call.
        return new SubscriptionInfo(11, 11, processId, userEndPoint, taskOffsetSums, (byte) 0, 0, EMPTY_CLIENT_TAGS);
    }

    /**
     * Builds a {@link SubscriptionInfo} for {@code processId} with a {@code null} user endpoint, the given
     * unique-field byte and {@link #EMPTY_CLIENT_TAGS}.
     *
     * @param processId    the id of the subscribing process
     * @param prevTasks    tasks to report as active (see {@code getTaskOffsetSums})
     * @param standbyTasks tasks to report as standby
     * @param uniqueField  the byte passed through to the subscription info
     * @return the subscription info
     */
    public static SubscriptionInfo getInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, byte uniqueField) {
        final String noUserEndPoint = null;
        final Map<TaskId, Long> offsetSums = AssignmentTestUtils.getTaskOffsetSums(prevTasks, standbyTasks);
        return new SubscriptionInfo(11, 11, processId, noUserEndPoint, offsetSums, uniqueField, 0, EMPTY_CLIENT_TAGS);
    }

    /**
     * Builds a {@link SubscriptionInfo} for {@code processId} with a {@code null} user endpoint, the given
     * unique-field byte and the given client tags.
     *
     * @param processId    the id of the subscribing process
     * @param prevTasks    tasks to report as active (see {@code getTaskOffsetSums})
     * @param standbyTasks tasks to report as standby
     * @param uniqueField  the byte passed through to the subscription info
     * @param clientTags   the client tags passed through to the subscription info
     * @return the subscription info
     */
    public static SubscriptionInfo getInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, byte uniqueField, Map<String, String> clientTags) {
        final String noUserEndPoint = null;
        final Map<TaskId, Long> offsetSums = AssignmentTestUtils.getTaskOffsetSums(prevTasks, standbyTasks);
        return new SubscriptionInfo(11, 11, processId, noUserEndPoint, offsetSums, uniqueField, 0, clientTags);
    }

    /**
     * Builds the task-to-offset-sum map reported in a subscription.
     *
     * <p>Every active task is mapped to {@code -2L} (presumably the sentinel for "currently active" - TODO
     * confirm) and every standby task to {@code 0L}. A task present in both collections ends up with the
     * standby value because standbys are written last.
     *
     * @param activeTasks  tasks to record with {@code -2L}
     * @param standbyTasks tasks to record with {@code 0L}
     * @return a new mutable map containing both groups
     */
    private static Map<TaskId, Long> getTaskOffsetSums(Collection<TaskId> activeTasks, Collection<TaskId> standbyTasks) {
        // Use an explicit HashMap: Collectors.toMap makes no guarantee that its result is mutable,
        // so calling putAll on it would rely on an implementation detail.
        Map<TaskId, Long> taskOffsetSums = new HashMap<>();
        for (TaskId activeTask : activeTasks) {
            taskOffsetSums.put(activeTask, -2L);
        }
        for (TaskId standbyTask : standbyTasks) {
            taskOffsetSums.put(standbyTask, 0L);
        }
        return taskOffsetSums;
    }

    /**
     * Returns a deterministic {@link UUID} for an integer.
     *
     * @param n the value used as the least significant 64 bits (widened from int to long)
     * @return the UUID whose most significant bits are zero and whose least significant bits are {@code n}
     */
    public static UUID uuidForInt(int n) {
        final long mostSignificantBits = 0L;
        final long leastSignificantBits = n;
        return new UUID(mostSignificantBits, leastSignificantBits);
    }

    /**
     * Validates an assignment that is not allowed to contain any warmup replicas.
     *
     * <p>Delegates to the six-argument overload with {@code maxWarmupReplicas} set to zero.
     *
     * @param numStandbyReplicas number of standby replicas passed on to the overload
     * @param statefulTasks      all stateful tasks
     * @param statelessTasks     all stateless tasks
     * @param assignedStates     the client states to validate, keyed by client id
     * @param failureContext     extra text appended to failure messages
     */
    static void assertValidAssignment(int numStandbyReplicas, Set<TaskId> statefulTasks, Set<TaskId> statelessTasks, Map<UUID, ClientState> assignedStates, StringBuilder failureContext) {
        final int noWarmupReplicas = 0;
        AssignmentTestUtils.assertValidAssignment(
            numStandbyReplicas,
            noWarmupReplicas,
            statefulTasks,
            statelessTasks,
            assignedStates,
            failureContext
        );
    }

    /**
     * Asserts that every task is assigned to the expected number of distinct clients.
     *
     * <p>For each stateful and stateless task the ids of all clients holding it as an active or standby task
     * are collected into a set (so a client counts once per task). A task is expected on
     * {@code min(number of clients, 1 + standbys)} clients, where {@code standbys} is zero for stateless tasks
     * and {@code numStandbyReplicas} otherwise. Up to {@code maxWarmupReplicas} tasks, in task order, may have
     * exactly one assignment more than expected.
     *
     * @param numStandbyReplicas number of standby replicas expected per stateful task
     * @param maxWarmupReplicas  number of tasks that may carry one extra assignment
     * @param statefulTasks      all stateful tasks
     * @param statelessTasks     all stateless tasks
     * @param assignedStates     the client states to validate, keyed by client id
     * @param failureContext     extra text appended to failure messages
     * @throws AssertionError if a client holds an unknown task, holds a stateless task as standby, or any task
     *                        is over- or under-assigned
     */
    static void assertValidAssignment(int numStandbyReplicas, int maxWarmupReplicas, Set<TaskId> statefulTasks, Set<TaskId> statelessTasks, Map<UUID, ClientState> assignedStates, StringBuilder failureContext) {
        // task -> ids of the clients it is assigned to; sorted so that failures are reported deterministically
        Map<TaskId, Set<UUID>> assignments = new TreeMap<>();
        for (TaskId taskId : statefulTasks) {
            assignments.put(taskId, new TreeSet<>());
        }
        for (TaskId taskId : statelessTasks) {
            assignments.put(taskId, new TreeSet<>());
        }
        for (Map.Entry<UUID, ClientState> entry : assignedStates.entrySet()) {
            AssignmentTestUtils.validateAndAddActiveAssignments(statefulTasks, statelessTasks, failureContext, assignments, entry);
            AssignmentTestUtils.validateAndAddStandbyAssignments(statefulTasks, statelessTasks, failureContext, assignments, entry);
        }

        int remainingWarmups = maxWarmupReplicas;
        Map<TaskId, Set<UUID>> misassigned = new TreeMap<>();
        for (Map.Entry<TaskId, Set<UUID>> entry : assignments.entrySet()) {
            int expectedStandbys = statelessTasks.contains(entry.getKey()) ? 0 : numStandbyReplicas;
            // one active plus the standbys, capped by the number of clients available to host them
            int expectedAssignments = Math.min(assignedStates.size(), 1 + expectedStandbys);
            int actualAssignments = entry.getValue().size();
            if (actualAssignments == expectedAssignments) {
                continue;
            }
            if (actualAssignments == expectedAssignments + 1 && remainingWarmups > 0) {
                // exactly one extra copy is tolerated as a warmup while the budget lasts
                remainingWarmups--;
                continue;
            }
            misassigned.put(entry.getKey(), entry.getValue());
        }

        // Guarded so the (potentially large) failure message is only built when there is something to report.
        if (!misassigned.isEmpty()) {
            MatcherAssert.assertThat(
                "Found some over- or under-assigned tasks in the final assignment with " + numStandbyReplicas
                    + " standby replicas and max warmups " + maxWarmupReplicas
                    + ", stateful tasks:" + statefulTasks
                    + ", and stateless tasks:" + statelessTasks
                    + failureContext,
                misassigned,
                Matchers.is(Collections.<TaskId, Set<UUID>>emptyMap())
            );
        }
    }

    /**
     * Records the client of {@code entry} as a holder of each of its standby tasks.
     *
     * @param statefulTasks  all stateful tasks (only used in the failure message)
     * @param statelessTasks all stateless tasks; none of them may appear as a standby
     * @param failureContext extra text appended to failure messages
     * @param assignments    task to client-id sets, updated in place; its keys are the known tasks
     * @param entry          the client id and its state
     * @throws AssertionError if a standby task is stateless or is not a key of {@code assignments}
     */
    private static void validateAndAddStandbyAssignments(Set<TaskId> statefulTasks, Set<TaskId> statelessTasks, StringBuilder failureContext, Map<TaskId, Set<UUID>> assignments, Map.Entry<UUID, ClientState> entry) {
        for (TaskId standbyTask : entry.getValue().standbyTasks()) {
            if (statelessTasks.contains(standbyTask)) {
                throw new AssertionError(
                    "Found a standby task for stateless task " + standbyTask + " on client " + entry
                        + " stateless tasks:" + statelessTasks + failureContext
                );
            }
            if (!assignments.containsKey(standbyTask)) {
                throw new AssertionError(
                    "Found an extra standby task " + standbyTask + " on client " + entry
                        + " but expected stateful tasks:" + statefulTasks + failureContext
                );
            }
            assignments.get(standbyTask).add(entry.getKey());
        }
    }

    /**
     * Records the client of {@code entry} as a holder of each of its active tasks.
     *
     * @param statefulTasks  all stateful tasks (only used in the failure message)
     * @param statelessTasks all stateless tasks (only used in the failure message)
     * @param failureContext extra text appended to failure messages
     * @param assignments    task to client-id sets, updated in place; its keys are the known tasks
     * @param entry          the client id and its state
     * @throws AssertionError if an active task is not a key of {@code assignments}
     */
    private static void validateAndAddActiveAssignments(Set<TaskId> statefulTasks, Set<TaskId> statelessTasks, StringBuilder failureContext, Map<TaskId, Set<UUID>> assignments, Map.Entry<UUID, ClientState> entry) {
        for (TaskId activeTask : entry.getValue().activeTasks()) {
            if (!assignments.containsKey(activeTask)) {
                throw new AssertionError(
                    "Found an extra active task " + activeTask + " on client " + entry
                        + " but expected stateful tasks:" + statefulTasks
                        + " and stateless tasks:" + statelessTasks + failureContext
                );
            }
            assignments.get(activeTask).add(entry.getKey());
        }
    }

    /**
     * Asserts that the stateful task load is balanced across clients.
     *
     * <p>A client's load is the number of its assigned tasks that are in {@code allStatefulTasks}, divided by
     * its capacity. The check fails when the largest and smallest load differ by more than {@code 1.0}.
     *
     * @param allStatefulTasks the set of tasks that count as stateful
     * @param clientStates     the client states to inspect, keyed by client id
     * @param failureContext   extra text appended to the failure message
     */
    static void assertBalancedStatefulAssignment(Set<TaskId> allStatefulTasks, Map<UUID, ClientState> clientStates, StringBuilder failureContext) {
        // Seed the running maximum with the lowest possible value. Double.MIN_VALUE must not be used here:
        // it is the smallest POSITIVE double, not the most negative one.
        double maxStateful = Double.NEGATIVE_INFINITY;
        double minStateful = Double.POSITIVE_INFINITY;
        for (ClientState clientState : clientStates.values()) {
            Set<TaskId> statefulTasks = Utils.intersection(HashSet::new, clientState.assignedTasks(), allStatefulTasks);
            double statefulTaskLoad = (double) statefulTasks.size() / clientState.capacity();
            maxStateful = Math.max(maxStateful, statefulTaskLoad);
            minStateful = Math.min(minStateful, statefulTaskLoad);
        }
        double statefulDiff = maxStateful - minStateful;
        if (statefulDiff > 1.0) {
            StringBuilder builder = new StringBuilder()
                .append("detected a stateful assignment balance factor violation: ")
                .append(statefulDiff)
                .append(">")
                .append(1.0)
                .append(" in: ");
            AssignmentTestUtils.appendClientStates(builder, clientStates);
            Assert.fail(builder.append(failureContext).toString());
        }
    }

    /**
     * Asserts that the active task load is balanced across clients.
     *
     * <p>The check fails when the largest and smallest {@code ClientState.activeTaskLoad()} differ by more
     * than {@code 1.0}.
     *
     * @param clientStates   the client states to inspect, keyed by client id
     * @param failureContext extra text appended to the failure message
     */
    static void assertBalancedActiveAssignment(Map<UUID, ClientState> clientStates, StringBuilder failureContext) {
        // Seed the running maximum with the lowest possible value. Double.MIN_VALUE must not be used here:
        // it is the smallest POSITIVE double, not the most negative one.
        double maxActive = Double.NEGATIVE_INFINITY;
        double minActive = Double.POSITIVE_INFINITY;
        for (ClientState clientState : clientStates.values()) {
            double activeTaskLoad = clientState.activeTaskLoad();
            maxActive = Math.max(maxActive, activeTaskLoad);
            minActive = Math.min(minActive, activeTaskLoad);
        }
        double activeDiff = maxActive - minActive;
        if (activeDiff > 1.0) {
            StringBuilder builder = new StringBuilder()
                .append("detected an active assignment balance factor violation: ")
                .append(activeDiff)
                .append(">")
                .append(1.0)
                .append(" in: ");
            AssignmentTestUtils.appendClientStates(builder, clientStates);
            Assert.fail(builder.append(failureContext).toString());
        }
    }

    /**
     * Fails the test when {@code analyzeTaskAssignmentBalance} reports at least one skewed subtopology.
     *
     * @param clientStates the client states to analyze, keyed by client id
     */
    static void assertBalancedTasks(Map<UUID, ClientState> clientStates) {
        final TaskSkewReport report = AssignmentTestUtils.analyzeTaskAssignmentBalance(clientStates);
        final boolean balanced = report.totalSkewedTasks() <= 0;
        if (balanced) {
            return;
        }
        Assert.fail("Expected a balanced task assignment, but was: " + report);
    }

    /**
     * Measures how evenly the active tasks of each subtopology are spread over the clients.
     *
     * <p>For every subtopology that has at least one active task, the number of its active tasks is counted per
     * client; clients without such a task count as zero. The skew of a subtopology is the difference between
     * the highest and lowest per-client count, and a subtopology is reported as skewed when that difference is
     * greater than one.
     *
     * @param clientStates the client states to analyze, keyed by client id
     * @return a report holding the largest skew, the skewed subtopologies and the per-client counts
     */
    static TaskSkewReport analyzeTaskAssignmentBalance(Map<UUID, ClientState> clientStates) {
        // Creates a zeroed counter for every client the first time a subtopology is seen.
        Function<Integer, Map<UUID, AtomicInteger>> initialClientCounts =
            i -> clientStates.keySet().stream().collect(Collectors.toMap(c -> c, c -> new AtomicInteger(0)));

        Map<Integer, Map<UUID, AtomicInteger>> subtopologyToClientsWithPartition = new TreeMap<>();
        for (Map.Entry<UUID, ClientState> entry : clientStates.entrySet()) {
            UUID client = entry.getKey();
            ClientState clientState = entry.getValue();
            for (TaskId task : clientState.activeTasks()) {
                int subtopology = task.subtopology();
                subtopologyToClientsWithPartition
                    .computeIfAbsent(subtopology, initialClientCounts)
                    .get(client)
                    .incrementAndGet();
            }
        }

        int maxTaskSkew = 0;
        Set<Integer> skewedSubtopologies = new TreeSet<>();
        for (Map.Entry<Integer, Map<UUID, AtomicInteger>> entry : subtopologyToClientsWithPartition.entrySet()) {
            Map<UUID, AtomicInteger> clientsWithPartition = entry.getValue();
            int max = Integer.MIN_VALUE;
            int min = Integer.MAX_VALUE;
            for (AtomicInteger count : clientsWithPartition.values()) {
                max = Math.max(max, count.get());
                min = Math.min(min, count.get());
            }
            int taskSkew = max - min;
            maxTaskSkew = Math.max(maxTaskSkew, taskSkew);
            if (taskSkew > 1) {
                skewedSubtopologies.add(entry.getKey());
            }
        }
        return new TaskSkewReport(maxTaskSkew, skewedSubtopologies, subtopologyToClientsWithPartition);
    }

    /**
     * Returns a matcher that accepts a {@link ClientState} whose {@code assignedTaskCount()} equals
     * {@code taskCount}; the property is described as "assignedTasks".
     *
     * @param taskCount the expected count
     * @return the matcher
     */
    static Matcher<ClientState> hasAssignedTasks(int taskCount) {
        final Function<ClientState, Integer> assignedCount = ClientState::assignedTaskCount;
        return AssignmentTestUtils.hasProperty("assignedTasks", assignedCount, taskCount);
    }

    /**
     * Returns a matcher that accepts a {@link ClientState} whose {@code activeTaskCount()} equals
     * {@code taskCount}; the property is described as "activeTasks".
     *
     * @param taskCount the expected count
     * @return the matcher
     */
    static Matcher<ClientState> hasActiveTasks(int taskCount) {
        final Function<ClientState, Integer> activeCount = ClientState::activeTaskCount;
        return AssignmentTestUtils.hasProperty("activeTasks", activeCount, taskCount);
    }

    /**
     * Returns a matcher that accepts a {@link ClientState} whose {@code standbyTaskCount()} equals
     * {@code taskCount}; the property is described as "standbyTasks".
     *
     * @param taskCount the expected count
     * @return the matcher
     */
    static Matcher<ClientState> hasStandbyTasks(int taskCount) {
        final Function<ClientState, Integer> standbyCount = ClientState::standbyTaskCount;
        return AssignmentTestUtils.hasProperty("standbyTasks", standbyCount, taskCount);
    }

    /**
     * Creates a matcher that compares one extracted property of a {@link ClientState} with an expected value.
     *
     * <p>The matcher rejects anything that is not a {@code ClientState}; otherwise it applies
     * {@code propertyExtractor} and compares the result to {@code propertyValue} with {@link Objects#equals}.
     * It describes itself as {@code propertyName}, a colon, and the expected value.
     *
     * @param propertyName      the name used in the matcher description
     * @param propertyExtractor reads the property from a client state
     * @param propertyValue     the expected property value
     * @param <V>               the property type
     * @return the matcher
     */
    static <V> Matcher<ClientState> hasProperty(final String propertyName, final Function<ClientState, V> propertyExtractor, final V propertyValue) {
        return new BaseMatcher<ClientState>() {

            @Override
            public boolean matches(Object actual) {
                if (!(actual instanceof ClientState)) {
                    return false;
                }
                final V extracted = propertyExtractor.apply((ClientState) actual);
                return Objects.equals(extracted, propertyValue);
            }

            @Override
            public void describeTo(Description description) {
                description.appendText(propertyName).appendText(":").appendValue(propertyValue);
            }
        };
    }

    /**
     * Appends a multi-line dump of the client states to {@code stringBuilder}.
     *
     * <p>The output is an opening brace line, one line per entry of the form
     * {@code "  <client id>: <client state>"}, and a closing brace line; every line ends with a newline.
     *
     * @param stringBuilder the builder to append to
     * @param clientStates  the client states to print, keyed by client id
     */
    static void appendClientStates(StringBuilder stringBuilder, Map<UUID, ClientState> clientStates) {
        stringBuilder.append("{\n");
        for (Map.Entry<UUID, ClientState> clientAndState : clientStates.entrySet()) {
            stringBuilder.append("  ");
            stringBuilder.append(clientAndState.getKey());
            stringBuilder.append(": ");
            stringBuilder.append(clientAndState.getValue());
            stringBuilder.append('\n');
        }
        stringBuilder.append("}\n");
    }

    /**
     * Immutable holder for the result of a task balance analysis: the largest skew found, the subtopologies
     * reported as skewed, and the per-client counts for every subtopology.
     */
    static final class TaskSkewReport {
        private final int maxTaskSkew;
        private final Set<Integer> skewedSubtopologies;
        private final Map<Integer, Map<UUID, AtomicInteger>> subtopologyToClientsWithPartition;

        private TaskSkewReport(int maxTaskSkew, Set<Integer> skewedSubtopologies, Map<Integer, Map<UUID, AtomicInteger>> subtopologyToClientsWithPartition) {
            this.maxTaskSkew = maxTaskSkew;
            this.skewedSubtopologies = skewedSubtopologies;
            this.subtopologyToClientsWithPartition = subtopologyToClientsWithPartition;
        }

        /** Returns the number of skewed subtopologies (the size of {@link #skewedSubtopologies()}). */
        int totalSkewedTasks() {
            return skewedSubtopologies.size();
        }

        /** Returns the set of skewed subtopologies exactly as passed to the constructor (not copied). */
        Set<Integer> skewedSubtopologies() {
            return skewedSubtopologies;
        }

        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder("TaskSkewReport{");
            text.append("maxTaskSkew=").append(maxTaskSkew);
            text.append(", skewedSubtopologies=").append(skewedSubtopologies);
            text.append(", subtopologyToClientsWithPartition=").append(subtopologyToClientsWithPartition);
            return text.append('}').toString();
        }
    }
}

