package org.apache.kafka.streams.processor.assignment;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.KafkaStreamsAssignment;
import org.apache.kafka.streams.processor.assignment.TaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ConstrainedPrioritySet;
import org.apache.kafka.streams.processor.internals.assignment.Graph;
import org.apache.kafka.streams.processor.internals.assignment.MinTrafficGraphConstructor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareGraphConstructorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils.class */
public final class TaskAssignmentUtils {
    private static final Logger LOG = LoggerFactory.getLogger(TaskAssignmentUtils.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils$AssignmentGraph.class */
    public static final class AssignmentGraph {
        public final Graph<Integer> graph;
        public final Map<TaskId, ProcessId> clientByTask;
        public final Map<ProcessId, Integer> taskCountByClient;

        public AssignmentGraph(Graph<Integer> graph, Map<TaskId, ProcessId> map, Map<ProcessId, Integer> map2) {
            this.graph = graph;
            this.clientByTask = map;
            this.taskCountByClient = map2;
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils$MoveStandbyTaskPredicate.class */
    public interface MoveStandbyTaskPredicate {
        boolean canMoveStandbyTask(KafkaStreamsState kafkaStreamsState, KafkaStreamsState kafkaStreamsState2, TaskId taskId, Map<ProcessId, KafkaStreamsAssignment> map);
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils$RackAwareOptimizationParams.class */
    public static final class RackAwareOptimizationParams {
        private final ApplicationState applicationState;
        private final Optional<Integer> trafficCostOverride;
        private final Optional<Integer> nonOverlapCostOverride;
        private final Optional<SortedSet<TaskId>> tasksToOptimize;

        private RackAwareOptimizationParams(ApplicationState applicationState, Optional<Integer> optional, Optional<Integer> optional2, Optional<SortedSet<TaskId>> optional3) {
            this.applicationState = applicationState;
            this.trafficCostOverride = optional;
            this.nonOverlapCostOverride = optional2;
            this.tasksToOptimize = optional3;
        }

        public static RackAwareOptimizationParams of(ApplicationState applicationState) {
            return new RackAwareOptimizationParams(applicationState, Optional.empty(), Optional.empty(), Optional.empty());
        }

        public RackAwareOptimizationParams forStatefulTasks() {
            return forTasks((SortedSet) this.applicationState.allTasks().values().stream().filter((v0) -> {
                return v0.isStateful();
            }).map((v0) -> {
                return v0.id();
            }).collect(Collectors.toCollection(TreeSet::new)));
        }

        public RackAwareOptimizationParams forStatelessTasks() {
            return forTasks((SortedSet) this.applicationState.allTasks().values().stream().filter(taskInfo -> {
                return !taskInfo.isStateful();
            }).map((v0) -> {
                return v0.id();
            }).collect(Collectors.toCollection(TreeSet::new)));
        }

        public RackAwareOptimizationParams forTasks(SortedSet<TaskId> sortedSet) {
            return new RackAwareOptimizationParams(this.applicationState, this.trafficCostOverride, this.nonOverlapCostOverride, Optional.of(sortedSet));
        }

        public RackAwareOptimizationParams withTrafficCostOverride(int i) {
            return new RackAwareOptimizationParams(this.applicationState, Optional.of(Integer.valueOf(i)), this.nonOverlapCostOverride, this.tasksToOptimize);
        }

        public RackAwareOptimizationParams withNonOverlapCostOverride(int i) {
            return new RackAwareOptimizationParams(this.applicationState, this.trafficCostOverride, Optional.of(Integer.valueOf(i)), this.tasksToOptimize);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/assignment/TaskAssignmentUtils$TagStatistics.class */
    public static class TagStatistics {
        private final Map<String, Set<String>> tagKeyToValues;
        private final Map<KeyValue<String, String>, Set<ProcessId>> tagEntryToClients;

        public TagStatistics(ApplicationState applicationState) {
            Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = applicationState.kafkaStreamsStates(false);
            HashMap hashMap = new HashMap();
            HashMap hashMap2 = new HashMap();
            for (KafkaStreamsState kafkaStreamsState : kafkaStreamsStates.values()) {
                kafkaStreamsState.clientTags().forEach((str, str2) -> {
                    ((Set) hashMap.computeIfAbsent(str, str -> {
                        return new HashSet();
                    })).add(str2);
                    ((Set) hashMap2.computeIfAbsent(new KeyValue(str, str2), keyValue -> {
                        return new HashSet();
                    })).add(kafkaStreamsState.processId());
                });
            }
            this.tagKeyToValues = hashMap;
            this.tagEntryToClients = hashMap2;
        }
    }

    private TaskAssignmentUtils() {
    }

    public static TaskAssignor.AssignmentError validateTaskAssignment(ApplicationState applicationState, TaskAssignor.TaskAssignment taskAssignment) {
        Set<TaskId> keySet = applicationState.allTasks().keySet();
        Collection<KafkaStreamsAssignment> assignment = taskAssignment.assignment();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (KafkaStreamsAssignment kafkaStreamsAssignment : assignment) {
            for (KafkaStreamsAssignment.AssignedTask assignedTask : kafkaStreamsAssignment.tasks().values()) {
                if (!keySet.contains(assignedTask.id())) {
                    LOG.error("Assignment is invalid: task {} assigned to KafkaStreams client {} was unknown", assignedTask.id(), kafkaStreamsAssignment.processId().id());
                    return TaskAssignor.AssignmentError.UNKNOWN_TASK_ID;
                }
                if (hashMap.containsKey(assignedTask.id()) && assignedTask.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
                    LOG.error("Assignment is invalid: active task {} was assigned to multiple KafkaStreams clients: {} and {}", new Object[]{assignedTask.id(), kafkaStreamsAssignment.processId().id(), ((ProcessId) hashMap.get(assignedTask.id())).id()});
                    return TaskAssignor.AssignmentError.ACTIVE_TASK_ASSIGNED_MULTIPLE_TIMES;
                }
                if (assignedTask.type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
                    hashMap.put(assignedTask.id(), kafkaStreamsAssignment.processId());
                } else {
                    hashMap2.put(assignedTask.id(), kafkaStreamsAssignment.processId());
                }
            }
        }
        for (TaskInfo taskInfo : applicationState.allTasks().values()) {
            if (!taskInfo.isStateful() && hashMap2.containsKey(taskInfo.id())) {
                LOG.error("Assignment is invalid: standby task for stateless task {} was assigned to KafkaStreams client {}", taskInfo.id(), ((ProcessId) hashMap2.get(taskInfo.id())).id());
                return TaskAssignor.AssignmentError.INVALID_STANDBY_TASK;
            }
        }
        Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = applicationState.kafkaStreamsStates(false);
        Set<ProcessId> set = (Set) assignment.stream().map((v0) -> {
            return v0.processId();
        }).collect(Collectors.toSet());
        Iterator<Map.Entry<ProcessId, KafkaStreamsState>> it = kafkaStreamsStates.entrySet().iterator();
        while (it.hasNext()) {
            ProcessId key = it.next().getKey();
            if (!set.contains(key)) {
                LOG.error("Assignment is invalid: KafkaStreams client {} has no assignment", key.id());
                return TaskAssignor.AssignmentError.MISSING_PROCESS_ID;
            }
        }
        for (ProcessId processId : set) {
            if (!kafkaStreamsStates.containsKey(processId)) {
                LOG.error("Assignment is invalid: the KafkaStreams client {} is unknown", processId.id());
                return TaskAssignor.AssignmentError.UNKNOWN_PROCESS_ID;
            }
        }
        return TaskAssignor.AssignmentError.NONE;
    }

    public static Map<ProcessId, KafkaStreamsAssignment> identityAssignment(ApplicationState applicationState) {
        HashMap hashMap = new HashMap();
        applicationState.kafkaStreamsStates(false).forEach((processId, kafkaStreamsState) -> {
            HashSet hashSet = new HashSet();
            kafkaStreamsState.previousActiveTasks().forEach(taskId -> {
                hashSet.add(new KafkaStreamsAssignment.AssignedTask(taskId, KafkaStreamsAssignment.AssignedTask.Type.ACTIVE));
            });
            kafkaStreamsState.previousStandbyTasks().forEach(taskId2 -> {
                hashSet.add(new KafkaStreamsAssignment.AssignedTask(taskId2, KafkaStreamsAssignment.AssignedTask.Type.STANDBY));
            });
            hashMap.put(processId, KafkaStreamsAssignment.of(processId, hashSet));
        });
        return hashMap;
    }

    public static void defaultStandbyTaskAssignment(ApplicationState applicationState, Map<ProcessId, KafkaStreamsAssignment> map) {
        if (applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty()) {
            loadBasedStandbyTaskAssignment(applicationState, map);
        } else {
            tagBasedStandbyTaskAssignment(applicationState, map);
        }
    }

    public static void optimizeRackAwareActiveTasks(RackAwareOptimizationParams rackAwareOptimizationParams, Map<ProcessId, KafkaStreamsAssignment> map) {
        ApplicationState applicationState = rackAwareOptimizationParams.applicationState;
        SortedSet<TaskId> tasksToOptimize = getTasksToOptimize(map, rackAwareOptimizationParams, KafkaStreamsAssignment.AssignedTask.Type.ACTIVE);
        if (!tasksToOptimize.isEmpty() && canPerformRackAwareOptimization(applicationState, rackAwareOptimizationParams, KafkaStreamsAssignment.AssignedTask.Type.ACTIVE)) {
            initializeAssignmentsForAllClients(applicationState, map);
            int intValue = ((Integer) rackAwareOptimizationParams.trafficCostOverride.orElseGet(() -> {
                return Integer.valueOf(applicationState.assignmentConfigs().rackAwareTrafficCost().getAsInt());
            })).intValue();
            int intValue2 = ((Integer) rackAwareOptimizationParams.nonOverlapCostOverride.orElseGet(() -> {
                return Integer.valueOf(applicationState.assignmentConfigs().rackAwareNonOverlapCost().getAsInt());
            })).intValue();
            Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = applicationState.kafkaStreamsStates(false);
            ArrayList arrayList = new ArrayList(tasksToOptimize);
            Map map2 = (Map) applicationState.allTasks().values().stream().filter(taskInfo -> {
                return tasksToOptimize.contains(taskInfo.id());
            }).collect(Collectors.toMap((v0) -> {
                return v0.id();
            }, (v0) -> {
                return v0.topicPartitions();
            }));
            ArrayList arrayList2 = new ArrayList(kafkaStreamsStates.keySet());
            LOG.info("Assignment before active task optimization has cost {}", Long.valueOf(computeTotalAssignmentCost(map2, arrayList, arrayList2, map, kafkaStreamsStates, intValue, intValue2, false, false)));
            RackAwareGraphConstructor create = RackAwareGraphConstructorFactory.create(applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), (Collection<TaskId>) arrayList);
            AssignmentGraph buildTaskGraph = buildTaskGraph(map, kafkaStreamsStates, arrayList, arrayList2, map2, intValue, intValue2, false, false, create);
            buildTaskGraph.graph.solveMinCostFlow();
            create.assignTaskFromMinCostFlow(buildTaskGraph.graph, arrayList2, arrayList, map, buildTaskGraph.taskCountByClient, buildTaskGraph.clientByTask, (kafkaStreamsAssignment, taskId) -> {
                kafkaStreamsAssignment.assignTask(new KafkaStreamsAssignment.AssignedTask(taskId, KafkaStreamsAssignment.AssignedTask.Type.ACTIVE));
            }, (kafkaStreamsAssignment2, taskId2) -> {
                kafkaStreamsAssignment2.removeTask(new KafkaStreamsAssignment.AssignedTask(taskId2, KafkaStreamsAssignment.AssignedTask.Type.ACTIVE));
            }, (kafkaStreamsAssignment3, taskId3) -> {
                return kafkaStreamsAssignment3.tasks().containsKey(taskId3) && kafkaStreamsAssignment3.tasks().get(taskId3).type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE;
            });
        }
    }

    public static void optimizeRackAwareStandbyTasks(RackAwareOptimizationParams rackAwareOptimizationParams, Map<ProcessId, KafkaStreamsAssignment> map) {
        ApplicationState applicationState = rackAwareOptimizationParams.applicationState;
        SortedSet<TaskId> tasksToOptimize = getTasksToOptimize(map, rackAwareOptimizationParams, KafkaStreamsAssignment.AssignedTask.Type.STANDBY);
        if (!tasksToOptimize.isEmpty() && canPerformRackAwareOptimization(applicationState, rackAwareOptimizationParams, KafkaStreamsAssignment.AssignedTask.Type.STANDBY)) {
            initializeAssignmentsForAllClients(applicationState, map);
            int intValue = ((Integer) rackAwareOptimizationParams.trafficCostOverride.orElseGet(() -> {
                return Integer.valueOf(applicationState.assignmentConfigs().rackAwareTrafficCost().getAsInt());
            })).intValue();
            int intValue2 = ((Integer) rackAwareOptimizationParams.nonOverlapCostOverride.orElseGet(() -> {
                return Integer.valueOf(applicationState.assignmentConfigs().rackAwareNonOverlapCost().getAsInt());
            })).intValue();
            Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = applicationState.kafkaStreamsStates(false);
            Map map2 = (Map) applicationState.allTasks().values().stream().collect(Collectors.toMap((v0) -> {
                return v0.id();
            }, taskInfo -> {
                return (Set) taskInfo.topicPartitions().stream().filter((v0) -> {
                    return v0.isChangelog();
                }).collect(Collectors.toSet());
            }));
            ArrayList arrayList = new ArrayList(kafkaStreamsStates.keySet());
            LOG.info("Assignment before standby task optimization has cost {}", Long.valueOf(computeTotalAssignmentCost(map2, new ArrayList(tasksToOptimize), arrayList, map, kafkaStreamsStates, intValue, intValue2, true, true)));
            MoveStandbyTaskPredicate standbyTaskMovePredicate = getStandbyTaskMovePredicate(applicationState);
            BiFunction biFunction = (kafkaStreamsAssignment, kafkaStreamsAssignment2) -> {
                return (List) kafkaStreamsAssignment.tasks().values().stream().filter(assignedTask -> {
                    return assignedTask.type() == KafkaStreamsAssignment.AssignedTask.Type.STANDBY;
                }).filter(assignedTask2 -> {
                    return !kafkaStreamsAssignment2.tasks().containsKey(assignedTask2.id());
                }).filter(assignedTask3 -> {
                    return standbyTaskMovePredicate.canMoveStandbyTask((KafkaStreamsState) kafkaStreamsStates.get(kafkaStreamsAssignment.processId()), (KafkaStreamsState) kafkaStreamsStates.get(kafkaStreamsAssignment.processId()), assignedTask3.id(), map);
                }).map((v0) -> {
                    return v0.id();
                }).sorted().collect(Collectors.toList());
            };
            long currentTimeMillis = System.currentTimeMillis();
            boolean z = true;
            int i = 0;
            RackAwareGraphConstructor create = RackAwareGraphConstructorFactory.create(applicationState.assignmentConfigs().rackAwareAssignmentStrategy(), tasksToOptimize);
            while (z && i < 4) {
                z = false;
                i++;
                for (int i2 = 0; i2 < map.size(); i2++) {
                    ProcessId processId = (ProcessId) arrayList.get(i2);
                    KafkaStreamsAssignment kafkaStreamsAssignment3 = map.get(processId);
                    for (int i3 = i2 + 1; i3 < map.size(); i3++) {
                        ProcessId processId2 = (ProcessId) arrayList.get(i3);
                        KafkaStreamsAssignment kafkaStreamsAssignment4 = map.get(processId2);
                        if (!kafkaStreamsStates.get(processId).rackId().get().equals(kafkaStreamsStates.get(processId2).rackId().get())) {
                            List list = (List) biFunction.apply(kafkaStreamsAssignment3, kafkaStreamsAssignment4);
                            List list2 = (List) biFunction.apply(kafkaStreamsAssignment4, kafkaStreamsAssignment3);
                            if (!list.isEmpty() && !list2.isEmpty()) {
                                List<TaskId> list3 = (List) Stream.concat(list.stream(), list2.stream()).sorted().collect(Collectors.toList());
                                List<ProcessId> list4 = (List) Stream.of((Object[]) new ProcessId[]{processId, processId2}).sorted().collect(Collectors.toList());
                                AssignmentGraph buildTaskGraph = buildTaskGraph(map, kafkaStreamsStates, list3, list4, map2, intValue, intValue2, true, true, create);
                                buildTaskGraph.graph.solveMinCostFlow();
                                z |= create.assignTaskFromMinCostFlow(buildTaskGraph.graph, list4, list3, map, buildTaskGraph.taskCountByClient, buildTaskGraph.clientByTask, (kafkaStreamsAssignment5, taskId) -> {
                                    kafkaStreamsAssignment5.assignTask(new KafkaStreamsAssignment.AssignedTask(taskId, KafkaStreamsAssignment.AssignedTask.Type.STANDBY));
                                }, (kafkaStreamsAssignment6, taskId2) -> {
                                    kafkaStreamsAssignment6.removeTask(new KafkaStreamsAssignment.AssignedTask(taskId2, KafkaStreamsAssignment.AssignedTask.Type.STANDBY));
                                }, (kafkaStreamsAssignment7, taskId3) -> {
                                    return kafkaStreamsAssignment7.tasks().containsKey(taskId3) && kafkaStreamsAssignment7.tasks().get(taskId3).type() == KafkaStreamsAssignment.AssignedTask.Type.STANDBY;
                                });
                            }
                        }
                    }
                }
            }
            LOG.info("Assignment after {} rounds and {} milliseconds for standby task optimization is {}\n with cost {}", new Object[]{Integer.valueOf(i), Long.valueOf(System.currentTimeMillis() - currentTimeMillis), map, Long.valueOf(computeTotalAssignmentCost(map2, new ArrayList(tasksToOptimize), arrayList, map, kafkaStreamsStates, intValue, intValue2, true, true))});
        }
    }

    private static long computeTotalAssignmentCost(Map<TaskId, Set<TaskTopicPartition>> map, List<TaskId> list, List<ProcessId> list2, Map<ProcessId, KafkaStreamsAssignment> map2, Map<ProcessId, KafkaStreamsState> map3, int i, int i2, boolean z, boolean z2) {
        if (list.isEmpty()) {
            return 0L;
        }
        return buildTaskGraph(map2, map3, list, list2, map, i, i2, z, z2, new MinTrafficGraphConstructor()).graph.totalCost();
    }

    private static AssignmentGraph buildTaskGraph(Map<ProcessId, KafkaStreamsAssignment> map, Map<ProcessId, KafkaStreamsState> map2, List<TaskId> list, List<ProcessId> list2, Map<TaskId, Set<TaskTopicPartition>> map3, int i, int i2, boolean z, boolean z2, RackAwareGraphConstructor<KafkaStreamsAssignment> rackAwareGraphConstructor) {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        KafkaStreamsAssignment.AssignedTask.Type type = z2 ? KafkaStreamsAssignment.AssignedTask.Type.STANDBY : KafkaStreamsAssignment.AssignedTask.Type.ACTIVE;
        return new AssignmentGraph(rackAwareGraphConstructor.constructTaskGraph(list2, list, map, hashMap, hashMap2, (kafkaStreamsAssignment, taskId) -> {
            return kafkaStreamsAssignment.tasks().containsKey(taskId) && kafkaStreamsAssignment.tasks().get(taskId).type() == type;
        }, (taskId2, processId, z3, i3, i4, z4) -> {
            return (!z3 ? i2 : 0) + getCrossRackTrafficCost((Set) map3.get(taskId2), ((KafkaStreamsState) map2.get(processId)).rackId().get(), i);
        }, i, i2, z, z2), hashMap, hashMap2);
    }

    private static int getCrossRackTrafficCost(Set<TaskTopicPartition> set, String str, int i) {
        int i2 = 0;
        Iterator<TaskTopicPartition> it = set.iterator();
        while (it.hasNext()) {
            if (!it.next().rackIds().get().contains(str)) {
                i2 += i;
            }
        }
        return i2;
    }

    private static boolean canPerformRackAwareOptimization(ApplicationState applicationState, RackAwareOptimizationParams rackAwareOptimizationParams, KafkaStreamsAssignment.AssignedTask.Type type) {
        AssignmentConfigs assignmentConfigs = applicationState.assignmentConfigs();
        String rackAwareAssignmentStrategy = assignmentConfigs.rackAwareAssignmentStrategy();
        if ("none".equals(rackAwareAssignmentStrategy)) {
            LOG.warn("Rack aware task assignment optimization disabled: rack aware strategy was set to {}", rackAwareAssignmentStrategy);
            return false;
        }
        if (!assignmentConfigs.rackAwareTrafficCost().isPresent()) {
            LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_TRAFFIC_COST_CONFIG);
            return false;
        }
        if (assignmentConfigs.rackAwareNonOverlapCost().isPresent()) {
            return hasValidRackInformation(applicationState, type);
        }
        LOG.warn("Rack aware task assignment optimization unavailable: must configure {}", StreamsConfig.RACK_AWARE_ASSIGNMENT_NON_OVERLAP_COST_CONFIG);
        return false;
    }

    private static boolean hasValidRackInformation(ApplicationState applicationState, KafkaStreamsAssignment.AssignedTask.Type type) {
        Iterator<KafkaStreamsState> it = applicationState.kafkaStreamsStates(false).values().iterator();
        while (it.hasNext()) {
            if (!hasValidRackInformation(it.next())) {
                return false;
            }
        }
        Iterator<TaskInfo> it2 = applicationState.allTasks().values().iterator();
        while (it2.hasNext()) {
            if (!hasValidRackInformation(it2.next(), type)) {
                return false;
            }
        }
        return true;
    }

    private static boolean hasValidRackInformation(KafkaStreamsState kafkaStreamsState) {
        if (kafkaStreamsState.rackId().isPresent()) {
            return true;
        }
        LOG.error("KafkaStreams client {} doesn't have a rack id configured.", kafkaStreamsState.processId().id());
        return false;
    }

    private static boolean hasValidRackInformation(TaskInfo taskInfo, KafkaStreamsAssignment.AssignedTask.Type type) {
        for (TaskTopicPartition taskTopicPartition : type == KafkaStreamsAssignment.AssignedTask.Type.STANDBY ? (Collection) taskInfo.topicPartitions().stream().filter((v0) -> {
            return v0.isChangelog();
        }).collect(Collectors.toSet()) : taskInfo.topicPartitions()) {
            Optional<Set<String>> rackIds = taskTopicPartition.rackIds();
            if (!rackIds.isPresent() || rackIds.get().isEmpty()) {
                LOG.error("Topic partition {} for task {} does not have racks configured.", taskTopicPartition, taskInfo.id());
                return false;
            }
        }
        return true;
    }

    private static Map<ProcessId, KafkaStreamsAssignment> tagBasedStandbyTaskAssignment(ApplicationState applicationState, Map<ProcessId, KafkaStreamsAssignment> map) {
        initializeAssignmentsForAllClients(applicationState, map);
        int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas();
        Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = applicationState.kafkaStreamsStates(false);
        HashSet hashSet = new HashSet(applicationState.assignmentConfigs().rackAwareAssignmentTags());
        TagStatistics tagStatistics = new TagStatistics(applicationState);
        ConstrainedPrioritySet standbyTaskPriorityListByLoad = standbyTaskPriorityListByLoad(kafkaStreamsStates, map);
        Set<TaskId> set = (Set) applicationState.allTasks().values().stream().filter(taskInfo -> {
            return taskInfo.topicPartitions().stream().anyMatch((v0) -> {
                return v0.isChangelog();
            });
        }).map((v0) -> {
            return v0.id();
        }).collect(Collectors.toSet());
        Map map2 = (Map) set.stream().collect(Collectors.toMap(Function.identity(), taskId -> {
            return Integer.valueOf(numStandbyReplicas);
        }));
        HashMap hashMap = new HashMap();
        for (TaskId taskId2 : set) {
            for (KafkaStreamsAssignment kafkaStreamsAssignment : map.values()) {
                if (kafkaStreamsAssignment.tasks().containsKey(taskId2) && kafkaStreamsAssignment.tasks().get(taskId2).type() == KafkaStreamsAssignment.AssignedTask.Type.ACTIVE) {
                    assignStandbyTasksToClientsWithDifferentTags(numStandbyReplicas, standbyTaskPriorityListByLoad, taskId2, kafkaStreamsAssignment.processId(), hashSet, kafkaStreamsStates, map, map2, tagStatistics.tagKeyToValues, tagStatistics.tagEntryToClients, hashMap);
                }
            }
        }
        if (!map2.isEmpty()) {
            assignPendingStandbyTasksToLeastLoadedClients(map, numStandbyReplicas, standbyTaskPriorityListByLoad, map2);
        }
        return map;
    }

    private static Map<ProcessId, KafkaStreamsAssignment> loadBasedStandbyTaskAssignment(ApplicationState applicationState, Map<ProcessId, KafkaStreamsAssignment> map) {
        initializeAssignmentsForAllClients(applicationState, map);
        int numStandbyReplicas = applicationState.assignmentConfigs().numStandbyReplicas();
        Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = applicationState.kafkaStreamsStates(false);
        Set set = (Set) applicationState.allTasks().values().stream().filter(taskInfo -> {
            return taskInfo.topicPartitions().stream().anyMatch((v0) -> {
                return v0.isChangelog();
            });
        }).map((v0) -> {
            return v0.id();
        }).collect(Collectors.toSet());
        Map map2 = (Map) set.stream().collect(Collectors.toMap(Function.identity(), taskId -> {
            return Integer.valueOf(numStandbyReplicas);
        }));
        ConstrainedPrioritySet standbyTaskPriorityListByLoad = standbyTaskPriorityListByLoad(kafkaStreamsStates, map);
        standbyTaskPriorityListByLoad.offerAll(kafkaStreamsStates.keySet());
        Iterator it = set.iterator();
        while (it.hasNext()) {
            assignStandbyTasksForActiveTask(numStandbyReplicas, map, map2, standbyTaskPriorityListByLoad, (TaskId) it.next());
        }
        return map;
    }

    private static void assignStandbyTasksForActiveTask(int i, Map<ProcessId, KafkaStreamsAssignment> map, Map<TaskId, Integer> map2, ConstrainedPrioritySet constrainedPrioritySet, TaskId taskId) {
        ProcessId poll;
        int intValue = map2.get(taskId).intValue();
        while (intValue > 0 && (poll = constrainedPrioritySet.poll(taskId)) != null) {
            map.get(poll).assignTask(new KafkaStreamsAssignment.AssignedTask(taskId, KafkaStreamsAssignment.AssignedTask.Type.STANDBY));
            intValue--;
            constrainedPrioritySet.offer(poll);
        }
        map2.put(taskId, Integer.valueOf(intValue));
        if (intValue > 0) {
            LOG.warn("Unable to assign {} of {} standby tasks for task [{}]. There is not enough available capacity. You should increase the number of application instances to maintain the requested number of standby replicas.", new Object[]{Integer.valueOf(intValue), Integer.valueOf(i), taskId});
        }
    }

    private static void assignStandbyTasksToClientsWithDifferentTags(int i, ConstrainedPrioritySet constrainedPrioritySet, TaskId taskId, ProcessId processId, Set<String> set, Map<ProcessId, KafkaStreamsState> map, Map<ProcessId, KafkaStreamsAssignment> map2, Map<TaskId, Integer> map3, Map<String, Set<String>> map4, Map<KeyValue<String, String>, Set<ProcessId>> map5, Map<TaskId, ProcessId> map6) {
        constrainedPrioritySet.offerAll(map.keySet());
        int i2 = 1;
        int intValue = map3.get(taskId).intValue();
        HashMap hashMap = new HashMap();
        ProcessId processId2 = processId;
        do {
            updateClientsOnAlreadyUsedTagEntries(map.get(processId2), i2, set, map5, map4, hashMap);
            ProcessId poll = constrainedPrioritySet.poll(taskId, processId3 -> {
                return Boolean.valueOf(!isClientUsedOnAnyOfTheTagEntries(processId3, hashMap));
            });
            if (poll == null) {
                break;
            }
            KafkaStreamsState kafkaStreamsState = map.get(poll);
            i2++;
            intValue--;
            LOG.debug("Assigning {} out of {} standby tasks for an active task [{}] with client tags {}. Standby task client tags are {}.", new Object[]{Integer.valueOf(i - intValue), Integer.valueOf(i), taskId, map.get(processId).clientTags(), kafkaStreamsState.clientTags()});
            map2.get(kafkaStreamsState.processId()).assignTask(new KafkaStreamsAssignment.AssignedTask(taskId, KafkaStreamsAssignment.AssignedTask.Type.STANDBY));
            processId2 = poll;
        } while (intValue > 0);
        if (intValue <= 0) {
            map3.remove(taskId);
            return;
        }
        map6.put(taskId, processId);
        map3.put(taskId, Integer.valueOf(intValue));
        LOG.warn("Rack aware standby task assignment was not able to assign {} of {} standby tasks for the active task [{}] with the rack aware assignment tags {}. This may happen when there aren't enough application instances on different tag dimensions compared to an active and corresponding standby task. Consider launching application instances on different tag dimensions than [{}]. Standby task assignment will fall back to assigning standby tasks to the least loaded clients.", new Object[]{Integer.valueOf(intValue), Integer.valueOf(i), taskId, set, map.get(processId).clientTags()});
    }

    private static boolean isClientUsedOnAnyOfTheTagEntries(ProcessId processId, Map<KeyValue<String, String>, Set<ProcessId>> map) {
        return map.values().stream().anyMatch(set -> {
            return set.contains(processId);
        });
    }

    private static void updateClientsOnAlreadyUsedTagEntries(KafkaStreamsState kafkaStreamsState, int i, Set<String> set, Map<KeyValue<String, String>, Set<ProcessId>> map, Map<String, Set<String>> map2, Map<KeyValue<String, String>, Set<ProcessId>> map3) {
        for (Map.Entry<String, String> entry : kafkaStreamsState.clientTags().entrySet()) {
            String key = entry.getKey();
            if (set.contains(key)) {
                Set<String> set2 = map2.get(key);
                if (set2.size() <= i) {
                    set2.forEach(str -> {
                    });
                } else {
                    KeyValue<String, String> keyValue = new KeyValue<>(key, entry.getValue());
                    map3.put(keyValue, map.get(keyValue));
                }
            } else {
                LOG.warn("Client tag with key [{}] will be ignored when computing rack aware standby task assignment because it is not part of the configured rack awareness [{}].", key, set);
            }
        }
    }

    private static MoveStandbyTaskPredicate getStandbyTaskMovePredicate(ApplicationState applicationState) {
        if (!(!applicationState.assignmentConfigs().rackAwareAssignmentTags().isEmpty())) {
            return (kafkaStreamsState, kafkaStreamsState2, taskId, map) -> {
                return true;
            };
        }
        BiConsumer biConsumer = (kafkaStreamsState3, set) -> {
            Map<String, String> clientTags = kafkaStreamsState3.clientTags();
            if (clientTags != null) {
                set.addAll((Collection) clientTags.entrySet().stream().map(entry -> {
                    return KeyValue.pair(entry.getKey(), entry.getValue());
                }).collect(Collectors.toList()));
            }
        };
        Map<ProcessId, KafkaStreamsState> kafkaStreamsStates = applicationState.kafkaStreamsStates(false);
        return (kafkaStreamsState4, kafkaStreamsState5, taskId2, map2) -> {
            HashSet hashSet = new HashSet();
            HashSet hashSet2 = new HashSet();
            for (KafkaStreamsAssignment kafkaStreamsAssignment : map2.values()) {
                boolean containsKey = kafkaStreamsAssignment.tasks().containsKey(taskId2);
                boolean equals = kafkaStreamsAssignment.processId().equals(kafkaStreamsState4.processId());
                boolean equals2 = kafkaStreamsAssignment.processId().equals(kafkaStreamsState5.processId());
                if (containsKey && !equals && !equals2) {
                    KafkaStreamsState kafkaStreamsState4 = (KafkaStreamsState) kafkaStreamsStates.get(kafkaStreamsAssignment.processId());
                    biConsumer.accept(kafkaStreamsState4, hashSet);
                    biConsumer.accept(kafkaStreamsState4, hashSet2);
                }
            }
            biConsumer.accept(kafkaStreamsState4, hashSet);
            biConsumer.accept(kafkaStreamsState5, hashSet2);
            return hashSet2.size() >= hashSet.size();
        };
    }

    private static ConstrainedPrioritySet standbyTaskPriorityListByLoad(Map<ProcessId, KafkaStreamsState> map, Map<ProcessId, KafkaStreamsAssignment> map2) {
        return new ConstrainedPrioritySet((processId, taskId) -> {
            return Boolean.valueOf(!((KafkaStreamsAssignment) map2.get(processId)).tasks().containsKey(taskId));
        }, processId2 -> {
            return Double.valueOf(((KafkaStreamsAssignment) map2.get(processId2)).tasks().size() / ((KafkaStreamsState) map.get(processId2)).numProcessingThreads());
        });
    }

    private static void assignPendingStandbyTasksToLeastLoadedClients(Map<ProcessId, KafkaStreamsAssignment> map, int i, ConstrainedPrioritySet constrainedPrioritySet, Map<TaskId, Integer> map2) {
        constrainedPrioritySet.offerAll(map.keySet());
        Iterator<Map.Entry<TaskId, Integer>> it = map2.entrySet().iterator();
        while (it.hasNext()) {
            assignStandbyTasksForActiveTask(i, map, map2, constrainedPrioritySet, it.next().getKey());
        }
    }

    private static void initializeAssignmentsForAllClients(ApplicationState applicationState, Map<ProcessId, KafkaStreamsAssignment> map) {
        for (ProcessId processId : applicationState.kafkaStreamsStates(false).keySet()) {
            if (!map.containsKey(processId)) {
                map.put(processId, KafkaStreamsAssignment.of(processId, new HashSet()));
            }
        }
    }

    private static SortedSet<TaskId> getTasksToOptimize(Map<ProcessId, KafkaStreamsAssignment> map, RackAwareOptimizationParams rackAwareOptimizationParams, KafkaStreamsAssignment.AssignedTask.Type type) {
        return (rackAwareOptimizationParams == null || !rackAwareOptimizationParams.tasksToOptimize.isPresent()) ? (SortedSet) map.values().stream().flatMap(kafkaStreamsAssignment -> {
            return kafkaStreamsAssignment.tasks().values().stream();
        }).filter(assignedTask -> {
            return assignedTask.type() == type;
        }).map((v0) -> {
            return v0.id();
        }).collect(Collectors.toCollection(TreeSet::new)) : (SortedSet) rackAwareOptimizationParams.tasksToOptimize.get();
    }
}
