/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.duty;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.ImmutableDruidDataSource;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CloneStatusManager;
import org.apache.druid.server.coordinator.DruidCluster;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.ServerCloneStatus;
import org.apache.druid.server.coordinator.ServerHolder;
import org.apache.druid.server.coordinator.duty.CoordinatorDuty;
import org.apache.druid.server.coordinator.loading.SegmentAction;
import org.apache.druid.server.coordinator.loading.SegmentLoadQueueManager;
import org.apache.druid.server.coordinator.stats.Dimension;
import org.apache.druid.server.coordinator.stats.RowKey;
import org.apache.druid.server.coordinator.stats.Stats;
import org.apache.druid.timeline.DataSegment;

/**
 * Coordinator duty that makes each configured clone target historical mirror its source historical.
 *
 * <p>The clone mappings are read from the coordinator dynamic config on every run; in that map the key is
 * the host of the target (clone) historical and the value is the host of the source historical. For every
 * mapping whose servers are both present in the cluster, segments projected on the source but not on the
 * target are queued for load on the target, and segments projected on the target but not on the source are
 * dropped from it (or their pending load is cancelled). After processing, a fresh status snapshot for all
 * mappings is published through the {@link CloneStatusManager}.
 */
public class CloneHistoricals
implements CoordinatorDuty {
    private static final Logger log = new Logger(CloneHistoricals.class);
    private final SegmentLoadQueueManager loadQueueManager;
    private final CloneStatusManager cloneStatusManager;

    /**
     * @param loadQueueManager   used to queue segment loads and drops on the clone targets
     * @param cloneStatusManager receives the per-target clone status snapshot computed on each run
     */
    public CloneHistoricals(SegmentLoadQueueManager loadQueueManager, CloneStatusManager cloneStatusManager) {
        this.loadQueueManager = loadQueueManager;
        this.cloneStatusManager = cloneStatusManager;
    }

    /**
     * Synchronizes every configured clone target with its source and publishes the resulting clone statuses.
     *
     * @param params runtime params of the current coordinator run
     * @return the same {@code params} instance, unchanged
     */
    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
        final Map<String, String> cloneServers = params.getCoordinatorDynamicConfig().getCloneServers();
        if (cloneServers.isEmpty()) {
            // No servers to be cloned. Still publish an (empty) snapshot: otherwise the statuses computed
            // for mappings that have since been removed from the config would be reported indefinitely.
            final Map<String, ServerCloneStatus> noStatuses = new HashMap<>();
            this.cloneStatusManager.updateStatus(noStatuses);
            return params;
        }

        // Index all historicals of all tiers by host, since the clone mappings refer to servers by host.
        // Note: Collectors.toMap throws IllegalStateException if two holders report the same host.
        final DruidCluster cluster = params.getDruidCluster();
        final Map<String, ServerHolder> hostToHistoricalMap = cluster.getHistoricals()
            .values()
            .stream()
            .flatMap(Collection::stream)
            .collect(Collectors.toMap(holder -> holder.getServer().getHost(), holder -> holder));

        for (Map.Entry<String, String> mapping : cloneServers.entrySet()) {
            final String targetHistoricalName = mapping.getKey();
            final String sourceHistoricalName = mapping.getValue();
            final ServerHolder targetServer = hostToHistoricalMap.get(targetHistoricalName);
            final ServerHolder sourceServer = hostToHistoricalMap.get(sourceHistoricalName);

            if (sourceServer == null || targetServer == null) {
                // When both are missing only the source is named, matching the check order below.
                log.error(
                    "Could not process clone mapping[%s] as historical[%s] does not exist.",
                    mapping,
                    sourceServer == null ? sourceHistoricalName : targetHistoricalName
                );
                continue;
            }
            this.syncTarget(sourceServer, targetServer, params);
        }

        this.cloneStatusManager.updateStatus(this.createCurrentStatusMap(hostToHistoricalMap, cloneServers));
        return params;
    }

    /**
     * Brings the projected segment set of {@code targetServer} in line with that of {@code sourceServer}.
     */
    private void syncTarget(ServerHolder sourceServer, ServerHolder targetServer, DruidCoordinatorRuntimeParams params) {
        final Set<DataSegment> sourceProjectedSegments = sourceServer.getProjectedSegments();
        final Set<DataSegment> targetProjectedSegments = targetServer.getProjectedSegments();

        // Load any segment that the source has (or will have) but the target does not.
        for (DataSegment segment : sourceProjectedSegments) {
            if (!targetProjectedSegments.contains(segment)) {
                this.loadSegmentOnTargetServer(segment, targetServer, params);
            }
        }
        // Remove any segment from the target that the source does not have.
        for (DataSegment segment : targetProjectedSegments) {
            if (!sourceProjectedSegments.contains(segment)) {
                this.dropSegmentFromTargetServer(segment, targetServer, params);
            }
        }
    }

    /**
     * Queues a load of the given segment on the target server and records the outcome in the coordinator
     * stats: {@code ASSIGN_SKIPPED} if no loadable copy of the segment could be resolved,
     * {@code ASSIGNED_TO_CLONE} if the load queue manager accepted the load. Nothing is recorded if the load
     * queue manager rejects the load.
     */
    private void loadSegmentOnTargetServer(DataSegment segment, ServerHolder targetServer, DruidCoordinatorRuntimeParams params) {
        final RowKey.Builder rowKey = RowKey
            .with(Dimension.SERVER, targetServer.getServer().getName())
            .with(Dimension.DATASOURCE, segment.getDataSource());

        final DataSegment loadableSegment = this.getLoadableSegment(segment, params);
        if (loadableSegment == null) {
            params.getCoordinatorStats().add(
                Stats.Segments.ASSIGN_SKIPPED,
                rowKey.and(Dimension.DESCRIPTION, "Segment not found in metadata cache"),
                1L
            );
            return;
        }
        if (this.loadQueueManager.loadSegment(loadableSegment, targetServer, SegmentAction.LOAD)) {
            params.getCoordinatorStats().add(Stats.Segments.ASSIGNED_TO_CLONE, rowKey.build(), 1L);
        }
    }

    /**
     * Removes the given segment from the target server. A segment that is still being loaded has its load
     * cancelled instead of being dropped; otherwise a drop is queued and, if accepted, counted under
     * {@code DROPPED_FROM_CLONE}.
     */
    private void dropSegmentFromTargetServer(DataSegment segment, ServerHolder targetServer, DruidCoordinatorRuntimeParams params) {
        if (targetServer.isLoadingSegment(segment)) {
            targetServer.cancelOperation(SegmentAction.LOAD, segment);
            return;
        }
        if (this.loadQueueManager.dropSegment(segment, targetServer)) {
            params.getCoordinatorStats().add(
                Stats.Segments.DROPPED_FROM_CLONE,
                RowKey.of(Dimension.SERVER, targetServer.getServer().getName()),
                1L
            );
        }
    }

    /**
     * Resolves the copy of the given segment held in the data sources snapshot of {@code params}.
     *
     * @return the segment from the snapshot, or {@code null} if the segment is not a used segment, its
     *         datasource is absent from the snapshot, or the datasource lookup itself yields {@code null}
     */
    @Nullable
    private DataSegment getLoadableSegment(DataSegment segmentToMove, DruidCoordinatorRuntimeParams params) {
        if (!params.isUsedSegment(segmentToMove)) {
            return null;
        }
        final ImmutableDruidDataSource datasource =
            params.getDataSourcesSnapshot().getDataSource(segmentToMove.getDataSource());
        return datasource == null ? null : datasource.getSegment(segmentToMove.getId());
    }

    /**
     * Builds the clone status of every configured mapping, keyed by target server name.
     *
     * @param historicalMap historicals currently in the cluster, keyed by host
     * @param cloneServers  clone mappings, target host to source host
     */
    private Map<String, ServerCloneStatus> createCurrentStatusMap(Map<String, ServerHolder> historicalMap, Map<String, String> cloneServers) {
        final Map<String, ServerCloneStatus> newStatusMap = new HashMap<>();
        for (Map.Entry<String, String> mapping : cloneServers.entrySet()) {
            final String targetServerName = mapping.getKey();
            final String sourceServerName = mapping.getValue();
            final ServerHolder targetServer = historicalMap.get(targetServerName);

            final ServerCloneStatus newStatus;
            if (targetServer == null) {
                // Nothing can be said about a target that is not part of the cluster.
                newStatus = ServerCloneStatus.unknown(sourceServerName, targetServerName);
            } else {
                newStatus = this.computeStatus(
                    sourceServerName,
                    targetServerName,
                    targetServer,
                    historicalMap.containsKey(sourceServerName)
                );
            }
            newStatusMap.put(targetServerName, newStatus);
        }
        return newStatusMap;
    }

    /**
     * Computes the status of a present target server from the operations currently queued on it: the
     * number of queued loads, the total size in bytes of the segments queued for load, and the number of
     * queued non-load operations (reported as drops).
     */
    private ServerCloneStatus computeStatus(String sourceServerName, String targetServerName, ServerHolder targetServer, boolean sourcePresent) {
        final ServerCloneStatus.State state = sourcePresent
            ? ServerCloneStatus.State.IN_PROGRESS
            : ServerCloneStatus.State.SOURCE_SERVER_MISSING;

        long segmentLoad = 0L;
        long segmentDrop = 0L;
        long bytesLeft = 0L;
        for (Map.Entry<DataSegment, SegmentAction> queuedSegment : targetServer.getQueuedSegments().entrySet()) {
            if (queuedSegment.getValue().isLoad()) {
                ++segmentLoad;
                bytesLeft += queuedSegment.getKey().getSize();
            } else {
                ++segmentDrop;
            }
        }
        return new ServerCloneStatus(sourceServerName, targetServerName, state, segmentLoad, segmentDrop, bytesLeft);
    }
}

