/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.rebalance;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceObserver;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZkBasedTableRebalanceObserver
implements TableRebalanceObserver {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZkBasedTableRebalanceObserver.class);
    private final String _tableNameWithType;
    private final String _rebalanceJobId;
    private final PinotHelixResourceManager _pinotHelixResourceManager;
    private TableRebalanceProgressStats _tableRebalanceProgressStats;
    private int _numUpdatesToZk;

    public ZkBasedTableRebalanceObserver(String tableNameWithType, String rebalanceJobId, PinotHelixResourceManager pinotHelixResourceManager) {
        Preconditions.checkState((tableNameWithType != null ? 1 : 0) != 0, (Object)"Table name cannot be null");
        Preconditions.checkState((rebalanceJobId != null ? 1 : 0) != 0, (Object)"rebalanceId cannot be null");
        Preconditions.checkState((pinotHelixResourceManager != null ? 1 : 0) != 0, (Object)"PinotHelixManager cannot be null");
        this._tableNameWithType = tableNameWithType;
        this._rebalanceJobId = rebalanceJobId;
        this._pinotHelixResourceManager = pinotHelixResourceManager;
        this._tableRebalanceProgressStats = new TableRebalanceProgressStats();
        this._numUpdatesToZk = 0;
    }

    @Override
    public void onTrigger(TableRebalanceObserver.Trigger trigger, Map<String, Map<String, String>> currentState, Map<String, Map<String, String>> targetState) {
        switch (trigger) {
            case START_TRIGGER: {
                this.updateOnStart(currentState, targetState);
                this.trackStatsInZk();
                break;
            }
            case IDEAL_STATE_CHANGE_TRIGGER: {
                TableRebalanceProgressStats.RebalanceStateStats latest = ZkBasedTableRebalanceObserver.getDifferenceBetweenTableRebalanceStates(targetState, currentState);
                if (!TableRebalanceProgressStats.statsDiffer(this._tableRebalanceProgressStats.getCurrentToTargetConvergence(), latest)) break;
                this._tableRebalanceProgressStats.setCurrentToTargetConvergence(latest);
                this.trackStatsInZk();
                break;
            }
            case EXTERNAL_VIEW_TO_IDEAL_STATE_CONVERGENCE_TRIGGER: {
                TableRebalanceProgressStats.RebalanceStateStats latest = ZkBasedTableRebalanceObserver.getDifferenceBetweenTableRebalanceStates(targetState, currentState);
                if (!TableRebalanceProgressStats.statsDiffer(this._tableRebalanceProgressStats.getExternalViewToIdealStateConvergence(), latest)) break;
                this._tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(latest);
                this.trackStatsInZk();
                break;
            }
            default: {
                throw new IllegalArgumentException("Unimplemented trigger: " + trigger);
            }
        }
    }

    private void updateOnStart(Map<String, Map<String, String>> currentState, Map<String, Map<String, String>> targetState) {
        Preconditions.checkState((this._tableRebalanceProgressStats.getStatus() != RebalanceResult.Status.IN_PROGRESS.toString() ? 1 : 0) != 0, (Object)"Rebalance Observer onStart called multiple times");
        this._tableRebalanceProgressStats.setStatus(RebalanceResult.Status.IN_PROGRESS.toString());
        this._tableRebalanceProgressStats.setInitialToTargetStateConvergence(ZkBasedTableRebalanceObserver.getDifferenceBetweenTableRebalanceStates(targetState, currentState));
        this._tableRebalanceProgressStats.setStartTimeMs(System.currentTimeMillis());
    }

    @Override
    public void onSuccess(String msg) {
        Preconditions.checkState((this._tableRebalanceProgressStats.getStatus() != RebalanceResult.Status.DONE.toString() ? 1 : 0) != 0, (Object)"Table Rebalance already completed");
        long timeToFinishInSeconds = (System.currentTimeMillis() - this._tableRebalanceProgressStats.getStartTimeMs()) / 1000L;
        this._tableRebalanceProgressStats.setCompletionStatusMsg(msg);
        this._tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds);
        this._tableRebalanceProgressStats.setStatus(RebalanceResult.Status.DONE.toString());
        TableRebalanceProgressStats.RebalanceStateStats stats = new TableRebalanceProgressStats.RebalanceStateStats();
        this._tableRebalanceProgressStats.setExternalViewToIdealStateConvergence(stats);
        this._tableRebalanceProgressStats.setCurrentToTargetConvergence(stats);
        this.trackStatsInZk();
    }

    @Override
    public void onError(String errorMsg) {
        long timeToFinishInSeconds = (System.currentTimeMillis() - this._tableRebalanceProgressStats.getStartTimeMs()) / 1000L;
        this._tableRebalanceProgressStats.setTimeToFinishInSeconds(timeToFinishInSeconds);
        this._tableRebalanceProgressStats.setStatus(RebalanceResult.Status.FAILED.toString());
        this._tableRebalanceProgressStats.setCompletionStatusMsg(errorMsg);
        this.trackStatsInZk();
    }

    public int getNumUpdatesToZk() {
        return this._numUpdatesToZk;
    }

    private void trackStatsInZk() {
        HashMap<String, String> jobMetadata = new HashMap<String, String>();
        jobMetadata.put("tableName", this._tableNameWithType);
        jobMetadata.put("jobId", this._rebalanceJobId);
        jobMetadata.put("submissionTimeMs", Long.toString(System.currentTimeMillis()));
        jobMetadata.put("jobType", ControllerJobType.TABLE_REBALANCE.name());
        try {
            jobMetadata.put("REBALANCE_PROGRESS_STATS", JsonUtils.objectToString((Object)this._tableRebalanceProgressStats));
        }
        catch (JsonProcessingException e) {
            LOGGER.error("Error serialising rebalance stats to JSON for persisting to ZK {}", (Object)this._rebalanceJobId, (Object)e);
        }
        this._pinotHelixResourceManager.addControllerJobToZK(this._rebalanceJobId, jobMetadata, ZKMetadataProvider.constructPropertyStorePathForControllerJob((ControllerJobType)ControllerJobType.TABLE_REBALANCE));
        ++this._numUpdatesToZk;
        LOGGER.debug("Number of updates to Zk: {} for rebalanceJob: {}  ", (Object)this._numUpdatesToZk, (Object)this._rebalanceJobId);
    }

    public static TableRebalanceProgressStats.RebalanceStateStats getDifferenceBetweenTableRebalanceStates(Map<String, Map<String, String>> targetState, Map<String, Map<String, String>> sourceState) {
        TableRebalanceProgressStats.RebalanceStateStats rebalanceStats = new TableRebalanceProgressStats.RebalanceStateStats();
        for (Map.Entry<String, Map<String, String>> entry : targetState.entrySet()) {
            String segmentName = entry.getKey();
            Map<String, String> sourceInstanceStateMap = sourceState.get(segmentName);
            if (sourceInstanceStateMap == null) {
                ++rebalanceStats._segmentsMissing;
                ++rebalanceStats._segmentsToRebalance;
                continue;
            }
            Map<String, String> targetStateInstanceStateMap = entry.getValue();
            boolean hasSegmentConverged = true;
            for (Map.Entry<String, String> instanceStateEntry : targetStateInstanceStateMap.entrySet()) {
                String instanceName;
                String sourceInstanceState;
                String targetStateInstanceState = instanceStateEntry.getValue();
                if (targetStateInstanceState.equals("OFFLINE") || targetStateInstanceState.equals(sourceInstanceState = sourceInstanceStateMap.get(instanceName = instanceStateEntry.getKey()))) continue;
                ++rebalanceStats._replicasToRebalance;
                hasSegmentConverged = false;
            }
            if (hasSegmentConverged) continue;
            ++rebalanceStats._segmentsToRebalance;
        }
        int totalSegments = targetState.size();
        rebalanceStats._percentSegmentsToRebalance = totalSegments == 0 ? 0.0 : (double)rebalanceStats._segmentsToRebalance / (double)totalSegments * 100.0;
        return rebalanceStats;
    }
}

