/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.internals.BrokerAndSortedReplicas;
import com.linkedin.kafka.cruisecontrol.analyzer.kafkaassigner.KafkaAssignerUtils;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
import java.util.StringJoiner;
import java.util.TreeSet;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaAssignerDiskUsageDistributionGoal
implements Goal {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaAssignerDiskUsageDistributionGoal.class);
    private static final double BALANCE_MARGIN = 0.9;
    private static final double USAGE_EQUALITY_DELTA = 1.0E-4;
    private static final double REPLICA_CONVERGENCE_DELTA = 0.4;
    private BalancingConstraint _balancingConstraint;
    private double _minMonitoredPartitionPercentage = 0.995;

    public KafkaAssignerDiskUsageDistributionGoal() {
    }

    KafkaAssignerDiskUsageDistributionGoal(BalancingConstraint constraint) {
        this._balancingConstraint = constraint;
    }

    @Override
    public Goal.ClusterModelStatsComparator clusterModelStatsComparator() {
        return new DiskDistributionGoalStatsComparator();
    }

    public void configure(Map<String, ?> configs) {
        KafkaCruiseControlConfig parsedConfig = new KafkaCruiseControlConfig(configs, false);
        this._balancingConstraint = new BalancingConstraint(parsedConfig);
        this._minMonitoredPartitionPercentage = parsedConfig.getDouble("min.valid.partition.ratio");
    }

    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        return new ModelCompletenessRequirements(1, this._minMonitoredPartitionPercentage, true);
    }

    @Override
    public boolean optimize(ClusterModel clusterModel, Set<Goal> optimizedGoals, OptimizationOptions optimizationOptions) {
        boolean improved;
        KafkaAssignerUtils.sanityCheckOptimizationOptions(optimizationOptions);
        Set<String> excludedTopics = optimizationOptions.excludedTopics();
        double meanDiskUsage = clusterModel.load().expectedUtilizationFor(Resource.DISK) / clusterModel.capacityFor(Resource.DISK);
        double upperThreshold = meanDiskUsage * (1.0 + this.balancePercentageWithMargin());
        double lowerThreshold = meanDiskUsage * Math.max(0.0, 1.0 - this.balancePercentageWithMargin());
        Comparator<BrokerAndSortedReplicas> brokerComparator = Comparator.comparingDouble(this::diskUsage).thenComparingInt(bs -> bs.broker().id());
        Comparator<Replica> replicaComparator = Comparator.comparingDouble(this::replicaSize).thenComparing(r -> r);
        TreeSet<BrokerAndSortedReplicas> allBrokers = new TreeSet<BrokerAndSortedReplicas>(brokerComparator);
        clusterModel.aliveBrokers().forEach(b -> allBrokers.add(new BrokerAndSortedReplicas((Broker)b, replicaComparator)));
        int numIterations = 0;
        do {
            improved = false;
            LOG.debug("Starting iteration {}", (Object)numIterations);
            ArrayList<BrokerAndSortedReplicas> allBrokerAndSortedReplicas = new ArrayList<BrokerAndSortedReplicas>(allBrokers);
            for (BrokerAndSortedReplicas bas : allBrokerAndSortedReplicas) {
                if (!this.checkAndOptimize(allBrokers, bas, clusterModel, meanDiskUsage, lowerThreshold, upperThreshold, excludedTopics)) continue;
                improved = true;
            }
            ++numIterations;
        } while (improved);
        boolean succeeded = this.isOptimized(clusterModel, upperThreshold, lowerThreshold);
        LOG.debug("Finished optimization in {} iterations.", (Object)numIterations);
        return succeeded;
    }

    private boolean isOptimized(ClusterModel clusterModel, double upperThreshold, double lowerThreshold) {
        StringJoiner joiner;
        HashSet<Broker> brokersAboveUpperThreshold = new HashSet<Broker>();
        HashSet<Broker> brokersUnderLowerThreshold = new HashSet<Broker>();
        for (Broker broker : clusterModel.aliveBrokers()) {
            double diskUsage = this.diskUsage(broker);
            if (diskUsage < lowerThreshold) {
                brokersUnderLowerThreshold.add(broker);
                continue;
            }
            if (!(diskUsage > upperThreshold)) continue;
            brokersAboveUpperThreshold.add(broker);
        }
        if (!brokersUnderLowerThreshold.isEmpty()) {
            joiner = new StringJoiner(", ");
            brokersUnderLowerThreshold.forEach(b -> joiner.add(String.format("%d:(%.3f)", b.id(), this.diskUsage((Broker)b))));
            LOG.warn("There are still {} brokers under the lower threshold of {}. The brokers are {}", new Object[]{brokersUnderLowerThreshold.size(), this.dWrap(lowerThreshold), joiner.toString()});
        }
        if (!brokersAboveUpperThreshold.isEmpty()) {
            joiner = new StringJoiner(", ");
            brokersAboveUpperThreshold.forEach(b -> joiner.add(String.format("%d:(%.3f)", b.id(), this.diskUsage((Broker)b))));
            LOG.warn("There are still {} brokers above the upper threshold of {}. The brokers are {}", new Object[]{brokersAboveUpperThreshold.size(), this.dWrap(upperThreshold), joiner.toString()});
        }
        return brokersUnderLowerThreshold.isEmpty() && brokersAboveUpperThreshold.isEmpty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean checkAndOptimize(SortedSet<BrokerAndSortedReplicas> allBrokers, BrokerAndSortedReplicas toOptimize, ClusterModel clusterModel, double meanDiskUsage, double lowerThreshold, double upperThreshold, Set<String> excludedTopics) {
        ArrayList<BrokerAndSortedReplicas> candidateBrokersToSwapWith;
        if (LOG.isTraceEnabled()) {
            LOG.trace("Optimizing broker {}. BrokerDiskUsage = {}, meanDiskUsage = {}", new Object[]{toOptimize.broker(), this.dWrap(this.diskUsage(toOptimize.broker())), this.dWrap(meanDiskUsage)});
        }
        double brokerDiskUsage = this.diskUsage(toOptimize.broker());
        boolean improved = false;
        if (brokerDiskUsage > upperThreshold) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Broker {} disk usage {} is above upper threshold of {}", new Object[]{toOptimize.broker().id(), this.dWrap(brokerDiskUsage), this.dWrap(upperThreshold)});
            }
            candidateBrokersToSwapWith = new ArrayList<BrokerAndSortedReplicas>(allBrokers.headSet(toOptimize));
        } else if (brokerDiskUsage < lowerThreshold) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Broker {} disk usage {} is below lower threshold of {}", new Object[]{toOptimize.broker().id(), this.dWrap(brokerDiskUsage), this.dWrap(lowerThreshold)});
            }
            candidateBrokersToSwapWith = new ArrayList<BrokerAndSortedReplicas>(allBrokers.tailSet(toOptimize));
            Collections.reverse(candidateBrokersToSwapWith);
        } else {
            return false;
        }
        for (BrokerAndSortedReplicas toSwapWith : candidateBrokersToSwapWith) {
            block10: {
                if (toSwapWith == toOptimize || Math.abs(this.diskUsage(toSwapWith) - this.diskUsage(toOptimize)) < 1.0E-4) continue;
                allBrokers.removeAll(Arrays.asList(toOptimize, toSwapWith));
                try {
                    if (!this.swapReplicas(toOptimize, toSwapWith, meanDiskUsage, clusterModel, excludedTopics)) break block10;
                    improved = true;
                }
                catch (Throwable throwable) {
                    allBrokers.addAll(Arrays.asList(toOptimize, toSwapWith));
                    throw throwable;
                }
                allBrokers.addAll(Arrays.asList(toOptimize, toSwapWith));
                break;
            }
            allBrokers.addAll(Arrays.asList(toOptimize, toSwapWith));
        }
        return improved;
    }

    boolean swapReplicas(BrokerAndSortedReplicas toSwap, BrokerAndSortedReplicas toSwapWith, double meanDiskUsage, ClusterModel clusterModel, Set<String> excludedTopics) {
        Iterator<ReplicaWrapper> toSwapIter;
        if (LOG.isTraceEnabled()) {
            LOG.trace("Swapping replicas between broker {}({}) and broker {}({})", new Object[]{toSwap.broker().id(), this.dWrap(this.brokerSize(toSwap)), toSwapWith.broker().id(), this.dWrap(this.brokerSize(toSwapWith))});
        }
        double sizeToChange = toSwap.broker().capacityFor(Resource.DISK) * meanDiskUsage - this.brokerSize(toSwap);
        NavigableSet<ReplicaWrapper> sortedReplicasToSwap = this.sortReplicasAscend(toSwap, excludedTopics);
        NavigableSet<ReplicaWrapper> sortedLeadersToSwapWith = this.sortReplicasAscend(toSwapWith, excludedTopics);
        NavigableSet<ReplicaWrapper> sortedFollowersToSwapWith = this.sortedFollowerReplicas(toSwapWith, excludedTopics);
        Iterator<ReplicaWrapper> iterator = toSwapIter = sizeToChange > 0.0 ? sortedReplicasToSwap.iterator() : sortedReplicasToSwap.descendingIterator();
        while (toSwapIter.hasNext()) {
            Replica replicaToSwapWith;
            double currentSizeOfBrokerToSwapWith;
            double currentSizeOfBrokerToSwap;
            Replica replicaToSwap = toSwapIter.next().replica();
            if (excludedTopics.contains(replicaToSwap.topicPartition().topic()) || !this.possibleToMove(replicaToSwap, toSwapWith.broker(), clusterModel)) continue;
            NavigableSet<ReplicaWrapper> sortedReplicasToSwapWith = replicaToSwap.isLeader() ? sortedLeadersToSwapWith : sortedFollowersToSwapWith;
            double sizeToSwap = this.replicaSize(replicaToSwap);
            if (sizeToChange < 0.0 && sizeToSwap == 0.0) break;
            double maxSize = Double.MAX_VALUE;
            double minSize = Double.MIN_VALUE;
            if (sizeToChange > 0.0) {
                minSize = sizeToSwap;
                double maxSizeOfBrokerToSwap = this.diskUsage(toSwapWith) * toSwap.broker().capacityFor(Resource.DISK);
                currentSizeOfBrokerToSwap = this.brokerSize(toSwap);
                maxSize = Math.min(maxSize, maxSizeOfBrokerToSwap - (currentSizeOfBrokerToSwap - sizeToSwap));
                double minSizeOfBrokerToSwapWith = this.diskUsage(toSwap) * toSwapWith.broker().capacityFor(Resource.DISK);
                currentSizeOfBrokerToSwapWith = this.brokerSize(toSwapWith);
                maxSize = Math.min(maxSize, currentSizeOfBrokerToSwapWith + sizeToSwap - minSizeOfBrokerToSwapWith);
            } else {
                maxSize = sizeToSwap;
                double minSizeOfBrokerToSwap = this.diskUsage(toSwapWith) * toSwap.broker().capacityFor(Resource.DISK);
                currentSizeOfBrokerToSwap = this.brokerSize(toSwap);
                minSize = Math.max(minSize, minSizeOfBrokerToSwap - (currentSizeOfBrokerToSwap - sizeToSwap));
                double maxSizeOfBrokerToSwapWith = this.diskUsage(toSwap) * toSwapWith.broker().capacityFor(Resource.DISK);
                currentSizeOfBrokerToSwapWith = this.brokerSize(toSwapWith);
                minSize = Math.max(minSize, currentSizeOfBrokerToSwapWith + sizeToSwap - maxSizeOfBrokerToSwapWith);
            }
            minSize += 0.4;
            maxSize -= 0.4;
            double targetSize = sizeToSwap + sizeToChange;
            if (LOG.isTraceEnabled()) {
                LOG.trace("replicaToSwap: {}(size={}), targetSize={}, minSize={}, maxSize={}", new Object[]{replicaToSwap, this.dWrap(this.replicaSize(replicaToSwap)), this.dWrap(targetSize), this.dWrap(minSize), this.dWrap(maxSize)});
            }
            if ((replicaToSwapWith = sortedReplicasToSwapWith.isEmpty() ? null : this.findReplicaToSwapWith(replicaToSwap, sortedReplicasToSwapWith, targetSize, minSize, maxSize, clusterModel)) == null) continue;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Found replica to swap. Swapping {}({}) on broker {}({}) and {}({}) on broker {}({})", new Object[]{replicaToSwap.topicPartition(), this.dWrap(this.replicaSize(replicaToSwap)), toSwap.broker().id(), this.dWrap(this.brokerSize(toSwap)), replicaToSwapWith.topicPartition(), this.dWrap(this.replicaSize(replicaToSwapWith)), toSwapWith.broker().id(), this.dWrap(this.brokerSize(toSwapWith))});
            }
            clusterModel.relocateReplica(replicaToSwapWith.topicPartition(), toSwapWith.broker().id(), toSwap.broker().id());
            clusterModel.relocateReplica(replicaToSwap.topicPartition(), toSwap.broker().id(), toSwapWith.broker().id());
            toSwap.sortedReplicas().remove(replicaToSwap);
            toSwap.sortedReplicas().add(replicaToSwapWith);
            toSwapWith.sortedReplicas().remove(replicaToSwapWith);
            toSwapWith.sortedReplicas().add(replicaToSwap);
            return true;
        }
        LOG.trace("Nothing to swap between broker {} and broker {}", (Object)toSwap.broker().id(), (Object)toSwapWith.broker().id());
        return false;
    }

    Replica findReplicaToSwapWith(Replica replica, NavigableSet<ReplicaWrapper> sortedReplicasToSearch, double targetSize, double minSize, double maxSize, ClusterModel clusterModel) {
        if (minSize > maxSize) {
            return null;
        }
        NavigableSet<ReplicaWrapper> candidates = sortedReplicasToSearch.subSet(ReplicaWrapper.greaterThan(minSize), false, ReplicaWrapper.lessThan(maxSize), false);
        if (candidates.isEmpty()) {
            return null;
        }
        Iterator<ReplicaWrapper> ascendingLargerIter = null;
        Iterator<ReplicaWrapper> descendingLessIter = null;
        if (targetSize <= minSize) {
            ascendingLargerIter = candidates.iterator();
        } else if (targetSize >= maxSize) {
            descendingLessIter = candidates.descendingIterator();
        } else {
            ascendingLargerIter = candidates.tailSet(ReplicaWrapper.greaterThanOrEqualsTo(targetSize), true).iterator();
            descendingLessIter = candidates.headSet(ReplicaWrapper.lessThanOrEqualsTo(targetSize), true).descendingIterator();
        }
        ReplicaWrapper low = null;
        ReplicaWrapper high = null;
        ReplicaWrapper candidateReplica = null;
        do {
            double highDiff;
            if (candidateReplica == high) {
                ReplicaWrapper replicaWrapper = high = ascendingLargerIter != null && ascendingLargerIter.hasNext() ? ascendingLargerIter.next() : null;
            }
            if (candidateReplica == low) {
                ReplicaWrapper replicaWrapper = low = descendingLessIter != null && descendingLessIter.hasNext() ? descendingLessIter.next() : null;
            }
            if (high == null && low == null) {
                return null;
            }
            if (high == null) {
                candidateReplica = low;
                continue;
            }
            if (low == null) {
                candidateReplica = high;
                continue;
            }
            double lowDiff = targetSize - low.size();
            ReplicaWrapper replicaWrapper = candidateReplica = lowDiff <= (highDiff = high.size() - targetSize) ? low : high;
        } while (!this.canSwap(replica, candidateReplica.replica(), clusterModel));
        return candidateReplica.replica();
    }

    private boolean possibleToMove(Replica replica, Broker destinationBroker, ClusterModel clusterModel) {
        TopicPartition tp = replica.topicPartition();
        boolean case1 = !clusterModel.partition(tp).partitionRacks().contains(destinationBroker.rack());
        boolean case2 = replica.broker().rack() == destinationBroker.rack() && destinationBroker.replica(tp) == null;
        return case1 || case2;
    }

    boolean canSwap(Replica r1, Replica r2, ClusterModel clusterModel) {
        boolean inSameRack = r1.broker().rack() == r2.broker().rack() && r1.broker() != r2.broker();
        boolean rackAware = !clusterModel.partition(r1.topicPartition()).partitionRacks().contains(r2.broker().rack()) && !clusterModel.partition(r2.topicPartition()).partitionRacks().contains(r1.broker().rack());
        boolean sameRole = r1.isLeader() == r2.isLeader();
        return (inSameRack || rackAware) && sameRole;
    }

    private NavigableSet<ReplicaWrapper> sortReplicasAscend(BrokerAndSortedReplicas bas, Set<String> excludedTopics) {
        TreeSet<ReplicaWrapper> sortedReplicas = new TreeSet<ReplicaWrapper>();
        bas.sortedReplicas().forEach(r -> {
            if (!excludedTopics.contains(r.topicPartition().topic())) {
                sortedReplicas.add(new ReplicaWrapper((Replica)r, this.replicaSize((Replica)r)));
            }
        });
        return sortedReplicas;
    }

    private NavigableSet<ReplicaWrapper> sortedFollowerReplicas(BrokerAndSortedReplicas bas, Set<String> excludedTopics) {
        TreeSet<ReplicaWrapper> sortedFollowers = new TreeSet<ReplicaWrapper>();
        bas.sortedReplicas().forEach(r -> {
            if (!r.isLeader() || excludedTopics.contains(r.topicPartition().topic())) {
                sortedFollowers.add(new ReplicaWrapper((Replica)r, this.replicaSize((Replica)r)));
            }
        });
        return sortedFollowers;
    }

    @Override
    public ActionAcceptance actionAcceptance(BalancingAction action, ClusterModel clusterModel) {
        throw new IllegalStateException("No goal should be executed after " + this.name());
    }

    @Override
    public String name() {
        return KafkaAssignerDiskUsageDistributionGoal.class.getSimpleName();
    }

    @Override
    public void finish() {
    }

    @Override
    public boolean isHardGoal() {
        return true;
    }

    private double diskUsage(BrokerAndSortedReplicas bas) {
        double diskCapacity = bas.broker().capacityFor(Resource.DISK);
        return Double.compare(diskCapacity, 0.0) < 1 ? 0.0 : bas.broker().load().expectedUtilizationFor(Resource.DISK) / diskCapacity;
    }

    private double diskUsage(Broker broker) {
        double diskCapacity = broker.capacityFor(Resource.DISK);
        return Double.compare(diskCapacity, 0.0) < 1 ? 0.0 : broker.load().expectedUtilizationFor(Resource.DISK) / diskCapacity;
    }

    private double replicaSize(Replica replica) {
        return replica.load().expectedUtilizationFor(Resource.DISK);
    }

    private double brokerSize(BrokerAndSortedReplicas bas) {
        return this.diskUsage(bas) * bas.broker().capacityFor(Resource.DISK);
    }

    private double balancePercentageWithMargin() {
        return (this._balancingConstraint.resourceBalancePercentage(Resource.DISK) - 1.0) * 0.9;
    }

    private DoubleWrapper dWrap(double value) {
        return new DoubleWrapper(value);
    }

    private static class DoubleWrapper {
        final double _value;

        private DoubleWrapper(double value) {
            this._value = value;
        }

        public String toString() {
            return Double.toString(this._value);
        }
    }

    static class ReplicaWrapper
    implements Comparable<ReplicaWrapper> {
        private final Replica _replica;
        private final double _size;

        ReplicaWrapper(Replica replica, double size) {
            this._replica = replica;
            this._size = size;
        }

        private double size() {
            return this._size;
        }

        private Replica replica() {
            return this._replica;
        }

        @Override
        public int compareTo(ReplicaWrapper o) {
            if (o == null) {
                throw new IllegalArgumentException("Cannot compare to a null object.");
            }
            int result = Double.compare(this.size(), o.size());
            if (result != 0) {
                return result;
            }
            if (!(this.replica() != Replica.MAX_REPLICA && this.replica() != Replica.MIN_REPLICA || o.replica() != Replica.MAX_REPLICA && o.replica() != Replica.MIN_REPLICA)) {
                return 0;
            }
            if (this.replica() == Replica.MAX_REPLICA || o.replica() == Replica.MIN_REPLICA) {
                return 1;
            }
            if (this.replica() == Replica.MIN_REPLICA || o.replica() == Replica.MAX_REPLICA) {
                return -1;
            }
            return this.replica().compareTo(o.replica());
        }

        public boolean equals(Object obj) {
            return obj instanceof ReplicaWrapper && ((ReplicaWrapper)obj).size() == this._size && ((ReplicaWrapper)obj).replica().equals(this._replica);
        }

        public int hashCode() {
            return Objects.hash(this._replica, this._size);
        }

        private static ReplicaWrapper greaterThanOrEqualsTo(double size) {
            return new ReplicaWrapper(Replica.MIN_REPLICA, size);
        }

        private static ReplicaWrapper greaterThan(double size) {
            return new ReplicaWrapper(Replica.MAX_REPLICA, size);
        }

        private static ReplicaWrapper lessThanOrEqualsTo(double size) {
            return new ReplicaWrapper(Replica.MAX_REPLICA, size);
        }

        private static ReplicaWrapper lessThan(double size) {
            return new ReplicaWrapper(Replica.MIN_REPLICA, size);
        }
    }

    private class DiskDistributionGoalStatsComparator
    implements Goal.ClusterModelStatsComparator {
        private String _reasonForLastNegativeResult;

        private DiskDistributionGoalStatsComparator() {
        }

        @Override
        public int compare(ClusterModelStats stats1, ClusterModelStats stats2) {
            int numBalancedBroker1 = stats1.numBalancedBrokersByResource().get((Object)Resource.DISK);
            int numBalancedBroker2 = stats2.numBalancedBrokersByResource().get((Object)Resource.DISK);
            if (numBalancedBroker2 > numBalancedBroker1) {
                this._reasonForLastNegativeResult = String.format("Violated %s. [Number of Balanced Brokers] for resource %s. post-optimization:%d pre-optimization:%d", new Object[]{KafkaAssignerDiskUsageDistributionGoal.this.name(), Resource.DISK, numBalancedBroker1, numBalancedBroker2});
                return -1;
            }
            return 1;
        }

        @Override
        public String explainLastComparison() {
            return this._reasonForLastNegativeResult;
        }
    }
}

