/*
 * Decompiled with CFR 0.152.
 */
package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTaskManager;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeReplicaLogDirsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionReplica;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.LogDirNotFoundException;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutorAdminUtils {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorAdminUtils.class);

    private ExecutorAdminUtils() {
    }

    static Map<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> getLogdirInfoForExecutionTask(Collection<ExecutionTask> tasks, AdminClient adminClient, KafkaCruiseControlConfig config) {
        HashSet replicasToCheck = new HashSet(tasks.size());
        HashMap<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo> logdirInfoByTask = new HashMap<ExecutionTask, DescribeReplicaLogDirsResult.ReplicaLogDirInfo>(tasks.size());
        HashMap taskByReplica = new HashMap(tasks.size());
        tasks.forEach(t -> {
            TopicPartitionReplica tpr = new TopicPartitionReplica(t.proposal().topic(), t.proposal().partitionId(), t.brokerId());
            replicasToCheck.add(tpr);
            taskByReplica.put(tpr, t);
        });
        Map logDirsByReplicas = adminClient.describeReplicaLogDirs(replicasToCheck).values();
        for (Map.Entry entry : logDirsByReplicas.entrySet()) {
            try {
                DescribeReplicaLogDirsResult.ReplicaLogDirInfo info = (DescribeReplicaLogDirsResult.ReplicaLogDirInfo)((KafkaFuture)entry.getValue()).get(config.getLong("logdir.response.timeout.ms").longValue(), TimeUnit.MILLISECONDS);
                logdirInfoByTask.put((ExecutionTask)taskByReplica.get(entry.getKey()), info);
            }
            catch (InterruptedException | ExecutionException | TimeoutException e) {
                LOG.warn("Encounter exception {} when fetching logdir information for replica {}", (Object)e.getMessage(), entry.getKey());
            }
        }
        return logdirInfoByTask;
    }

    static void executeIntraBrokerReplicaMovements(List<ExecutionTask> tasksToExecute, AdminClient adminClient, ExecutionTaskManager executionTaskManager, KafkaCruiseControlConfig config) {
        HashMap replicaAssignment = new HashMap(tasksToExecute.size());
        HashMap replicaToTask = new HashMap(tasksToExecute.size());
        tasksToExecute.forEach(t -> {
            TopicPartitionReplica tpr = new TopicPartitionReplica(t.proposal().topic(), t.proposal().partitionId(), t.brokerId());
            replicaAssignment.put(tpr, t.proposal().replicasToMoveBetweenDisksByBroker().get(t.brokerId()).logdir());
            replicaToTask.put(tpr, t);
        });
        for (Map.Entry entry : adminClient.alterReplicaLogDirs(replicaAssignment).values().entrySet()) {
            try {
                ((KafkaFuture)entry.getValue()).get(config.getLong("logdir.response.timeout.ms").longValue(), TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException | ExecutionException | TimeoutException | KafkaStorageException | LogDirNotFoundException | ReplicaNotAvailableException e) {
                LOG.warn("Encounter exception {} when trying to execute task {}, mark task dead.", (Object)e.getMessage(), replicaToTask.get(entry.getKey()));
                executionTaskManager.markTaskAborting((ExecutionTask)replicaToTask.get(entry.getKey()));
                executionTaskManager.markTaskDead((ExecutionTask)replicaToTask.get(entry.getKey()));
            }
        }
    }

    static boolean hasOngoingIntraBrokerReplicaMovement(Collection<Integer> brokersToCheck, AdminClient adminClient, KafkaCruiseControlConfig config) throws InterruptedException, ExecutionException, TimeoutException {
        Map logDirsByBrokerId = adminClient.describeLogDirs(brokersToCheck).values();
        for (Map.Entry entry : logDirsByBrokerId.entrySet()) {
            Map logInfos = (Map)((KafkaFuture)entry.getValue()).get(config.getLong("logdir.response.timeout.ms").longValue(), TimeUnit.MILLISECONDS);
            for (DescribeLogDirsResponse.LogDirInfo info : logInfos.values()) {
                if (info.error != Errors.NONE || !info.replicaInfos.values().stream().anyMatch(i -> i.isFuture)) continue;
                return true;
            }
        }
        return false;
    }
}

