/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.rebalance.tenant;

import com.google.common.collect.Sets;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceContext;
import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceObserver;
import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer;
import org.apache.pinot.controller.helix.core.rebalance.tenant.ZkBasedTenantRebalanceObserver;
import org.apache.pinot.spi.config.table.TableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultTenantRebalancer
implements TenantRebalancer {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultTenantRebalancer.class);
    PinotHelixResourceManager _pinotHelixResourceManager;
    ExecutorService _executorService;

    public DefaultTenantRebalancer(PinotHelixResourceManager pinotHelixResourceManager, ExecutorService executorService) {
        this._pinotHelixResourceManager = pinotHelixResourceManager;
        this._executorService = executorService;
    }

    @Override
    public TenantRebalanceResult rebalance(TenantRebalanceContext context) {
        HashMap<String, RebalanceResult> rebalanceResult = new HashMap<String, RebalanceResult>();
        Set<String> tables = this.getTenantTables(context.getTenantName());
        tables.forEach(table -> {
            try {
                Configuration config = this.extractRebalanceConfig(context);
                config.setProperty("dryRun", (Object)true);
                rebalanceResult.put((String)table, this._pinotHelixResourceManager.rebalanceTable((String)table, config, false));
            }
            catch (TableNotFoundException exception) {
                rebalanceResult.put((String)table, new RebalanceResult(null, RebalanceResult.Status.FAILED, exception.getMessage(), null, null, null));
            }
        });
        if (context.isDryRun().booleanValue() || context.isDowntime().booleanValue()) {
            return new TenantRebalanceResult(null, rebalanceResult, context.isVerboseResult());
        }
        for (String table2 : rebalanceResult.keySet()) {
            RebalanceResult result = (RebalanceResult)rebalanceResult.get(table2);
            if (result.getStatus() != RebalanceResult.Status.DONE) continue;
            rebalanceResult.put(table2, new RebalanceResult(result.getJobId(), RebalanceResult.Status.IN_PROGRESS, "In progress, check controller task status for the", result.getInstanceAssignment(), result.getTierInstanceAssignment(), result.getSegmentAssignment()));
        }
        String tenantRebalanceJobId = this.createUniqueRebalanceJobIdentifier();
        ZkBasedTenantRebalanceObserver observer = new ZkBasedTenantRebalanceObserver(tenantRebalanceJobId, context.getTenantName(), tables, this._pinotHelixResourceManager);
        observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
        LinkedList sequentialQueue = new LinkedList();
        ConcurrentLinkedDeque parallelQueue = new ConcurrentLinkedDeque();
        int parallelism = Math.max(context.getDegreeOfParallelism(), 1);
        Set<String> dimTables = this.getDimensionalTables(context.getTenantName());
        AtomicInteger activeThreads = new AtomicInteger(parallelism);
        try {
            if (parallelism > 1) {
                Object parallelTables = !context.getParallelWhitelist().isEmpty() ? new HashSet<String>(context.getParallelWhitelist()) : new HashSet(tables);
                if (!context.getParallelBlacklist().isEmpty()) {
                    parallelTables = Sets.difference(parallelTables, context.getParallelBlacklist());
                }
                parallelTables.forEach(table -> {
                    if (dimTables.contains(table)) {
                        parallelQueue.addFirst(table);
                    } else {
                        parallelQueue.addLast(table);
                    }
                });
                Sets.difference(tables, (Set)parallelTables).forEach(table -> {
                    if (dimTables.contains(table)) {
                        sequentialQueue.addFirst(table);
                    } else {
                        sequentialQueue.addLast(table);
                    }
                });
            } else {
                tables.forEach(table -> {
                    if (dimTables.contains(table)) {
                        sequentialQueue.addFirst(table);
                    } else {
                        sequentialQueue.addLast(table);
                    }
                });
            }
            for (int i = 0; i < parallelism; ++i) {
                this._executorService.submit(() -> {
                    String table;
                    while ((table = (String)parallelQueue.pollFirst()) != null) {
                        Configuration config = this.extractRebalanceConfig(context);
                        config.setProperty("dryRun", (Object)false);
                        config.setProperty("jobId", (Object)((RebalanceResult)rebalanceResult.get(table)).getJobId());
                        this.rebalanceTable(table, config, observer);
                    }
                    if (activeThreads.decrementAndGet() == 0) {
                        String table2;
                        Configuration config = this.extractRebalanceConfig(context);
                        config.setProperty("dryRun", (Object)false);
                        while ((table2 = (String)sequentialQueue.pollFirst()) != null) {
                            config.setProperty("jobId", (Object)((RebalanceResult)rebalanceResult.get(table2)).getJobId());
                            this.rebalanceTable(table2, config, observer);
                        }
                        observer.onSuccess(String.format("Successfully rebalanced tenant %s.", context.getTenantName()));
                    }
                });
            }
        }
        catch (Exception exception) {
            observer.onError(String.format("Failed to rebalance the tenant %s. Cause: %s", context.getTenantName(), exception.getMessage()));
        }
        return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResult, context.isVerboseResult());
    }

    private Set<String> getDimensionalTables(String tenantName) {
        HashSet<String> dimTables = new HashSet<String>();
        for (String table : this._pinotHelixResourceManager.getAllTables()) {
            TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(table);
            if (tableConfig == null) {
                LOGGER.error("Unable to retrieve table config for table: {}", (Object)table);
                continue;
            }
            if (!tenantName.equals(tableConfig.getTenantConfig().getServer()) || !tableConfig.isDimTable()) continue;
            dimTables.add(table);
        }
        return dimTables;
    }

    private Configuration extractRebalanceConfig(TenantRebalanceContext context) {
        BaseConfiguration rebalanceConfig = new BaseConfiguration();
        rebalanceConfig.addProperty("dryRun", (Object)context.isDryRun());
        rebalanceConfig.addProperty("reassignInstances", (Object)context.isReassignInstances());
        rebalanceConfig.addProperty("includeConsuming", (Object)context.isIncludeConsuming());
        rebalanceConfig.addProperty("bootstrap", (Object)context.isBootstrap());
        rebalanceConfig.addProperty("downtime", (Object)context.isDowntime());
        rebalanceConfig.addProperty("minReplicasToKeepUpForNoDowntime", (Object)context.getMinAvailableReplicas());
        rebalanceConfig.addProperty("bestEfforts", (Object)context.isBestEfforts());
        rebalanceConfig.addProperty("externalViewCheckIntervalInMs", (Object)context.getExternalViewCheckIntervalInMs());
        rebalanceConfig.addProperty("externalViewStabilizationTimeoutInMs", (Object)context.getExternalViewStabilizationTimeoutInMs());
        rebalanceConfig.addProperty("updateTargetTier", (Object)context.isUpdateTargetTier());
        rebalanceConfig.addProperty("jobId", (Object)this.createUniqueRebalanceJobIdentifier());
        return rebalanceConfig;
    }

    private String createUniqueRebalanceJobIdentifier() {
        return UUID.randomUUID().toString();
    }

    private Set<String> getTenantTables(String tenantName) {
        HashSet<String> tables = new HashSet<String>();
        for (String table : this._pinotHelixResourceManager.getAllTables()) {
            TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(table);
            if (tableConfig == null) {
                LOGGER.error("Unable to retrieve table config for table: {}", (Object)table);
                continue;
            }
            String tableConfigTenant = tableConfig.getTenantConfig().getServer();
            if (!tenantName.equals(tableConfigTenant)) continue;
            tables.add(table);
        }
        return tables;
    }

    private void rebalanceTable(String tableName, Configuration config, TenantRebalanceObserver observer) {
        try {
            observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, tableName, config.getString("jobId"));
            RebalanceResult result = this._pinotHelixResourceManager.rebalanceTable(tableName, config, true);
            if (result.getStatus().equals((Object)RebalanceResult.Status.DONE)) {
                observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER, tableName, null);
            } else {
                observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, tableName, result.getDescription());
            }
        }
        catch (Throwable t) {
            observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_ERRORED_TRIGGER, tableName, String.format("Caught exception/error while rebalancing table: %s", tableName));
        }
    }
}

