/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.validation;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ValidationMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.util.SegmentIntervalUtils;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.joda.time.base.BaseInterval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OfflineSegmentIntervalChecker
extends ControllerPeriodicTask<Void> {
    private static final Logger LOGGER = LoggerFactory.getLogger(OfflineSegmentIntervalChecker.class);
    private final ValidationMetrics _validationMetrics;

    public OfflineSegmentIntervalChecker(ControllerConf config, PinotHelixResourceManager pinotHelixResourceManager, LeadControllerManager leadControllerManager, ValidationMetrics validationMetrics, ControllerMetrics controllerMetrics) {
        super("OfflineSegmentIntervalChecker", config.getOfflineSegmentIntervalCheckerFrequencyInSeconds(), config.getOfflineSegmentIntervalCheckerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager, controllerMetrics);
        this._validationMetrics = validationMetrics;
    }

    @Override
    protected void processTable(String tableNameWithType) {
        TableType tableType = TableNameBuilder.getTableTypeFromTableName((String)tableNameWithType);
        if (tableType == TableType.OFFLINE) {
            TableConfig tableConfig = this._pinotHelixResourceManager.getTableConfig(tableNameWithType);
            if (tableConfig == null) {
                LOGGER.warn("Failed to find table config for table: {}, skipping validation", (Object)tableNameWithType);
                return;
            }
            this.validateOfflineSegmentPush(tableConfig);
        }
    }

    private void validateOfflineSegmentPush(TableConfig tableConfig) {
        long endTimeMs;
        SegmentsValidationAndRetentionConfig validationConfig;
        String offlineTableName = tableConfig.getTableName();
        List<SegmentZKMetadata> segmentsZKMetadata = this._pinotHelixResourceManager.getSegmentsZKMetadata(offlineTableName);
        int numMissingSegments = 0;
        int numSegments = segmentsZKMetadata.size();
        if (SegmentIntervalUtils.eligibleForMissingSegmentCheck(numSegments, validationConfig = tableConfig.getValidationConfig())) {
            ArrayList<Interval> segmentIntervals = new ArrayList<Interval>(numSegments);
            int numSegmentsWithInvalidIntervals = 0;
            for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
                long startTimeMs = segmentZKMetadata.getStartTimeMs();
                endTimeMs = segmentZKMetadata.getEndTimeMs();
                if (TimeUtils.timeValueInValidRange((long)startTimeMs) && TimeUtils.timeValueInValidRange((long)endTimeMs)) {
                    segmentIntervals.add(new Interval(startTimeMs, endTimeMs));
                    continue;
                }
                ++numSegmentsWithInvalidIntervals;
            }
            if (numSegmentsWithInvalidIntervals > 0) {
                LOGGER.warn("Table: {} has {} segments with invalid interval", (Object)offlineTableName, (Object)numSegmentsWithInvalidIntervals);
            }
            Duration frequency = SegmentIntervalUtils.convertToDuration(IngestionConfigUtils.getBatchSegmentIngestionFrequency((TableConfig)tableConfig));
            numMissingSegments = OfflineSegmentIntervalChecker.computeNumMissingSegments(segmentIntervals, frequency);
        }
        this._validationMetrics.updateMissingSegmentCountGauge(offlineTableName, numMissingSegments);
        long maxSegmentEndTime = Long.MIN_VALUE;
        long maxSegmentPushTime = Long.MIN_VALUE;
        for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
            long segmentRefreshTime;
            long segmentPushTime;
            long segmentUpdateTime;
            endTimeMs = segmentZKMetadata.getEndTimeMs();
            if (TimeUtils.timeValueInValidRange((long)endTimeMs) && maxSegmentEndTime < endTimeMs) {
                maxSegmentEndTime = endTimeMs;
            }
            if (maxSegmentPushTime >= (segmentUpdateTime = Math.max(segmentPushTime = segmentZKMetadata.getPushTime(), segmentRefreshTime = segmentZKMetadata.getRefreshTime()))) continue;
            maxSegmentPushTime = segmentUpdateTime;
        }
        this._validationMetrics.updateOfflineSegmentDelayGauge(offlineTableName, maxSegmentEndTime);
        this._validationMetrics.updateLastPushTimeGauge(offlineTableName, maxSegmentPushTime);
        this._validationMetrics.updateTotalDocumentCountGauge(offlineTableName, OfflineSegmentIntervalChecker.computeOfflineTotalDocumentInSegments(segmentsZKMetadata));
        this._validationMetrics.updateSegmentCountGauge(offlineTableName, (long)numSegments);
    }

    @Override
    protected void nonLeaderCleanup(List<String> tableNamesWithType) {
        for (String tableNameWithType : tableNamesWithType) {
            TableType tableType = TableNameBuilder.getTableTypeFromTableName((String)tableNameWithType);
            if (tableType != TableType.OFFLINE) continue;
            this._validationMetrics.cleanupMissingSegmentCountGauge(tableNameWithType);
            this._validationMetrics.cleanupOfflineSegmentDelayGauge(tableNameWithType);
            this._validationMetrics.cleanupLastPushTimeGauge(tableNameWithType);
            this._validationMetrics.cleanupTotalDocumentCountGauge(tableNameWithType);
            this._validationMetrics.cleanupSegmentCountGauge(tableNameWithType);
        }
    }

    @VisibleForTesting
    static int computeNumMissingSegments(List<Interval> segmentIntervals, Duration frequency) {
        int numSegments = segmentIntervals.size();
        if (numSegments < 2) {
            return 0;
        }
        segmentIntervals.sort(Comparator.comparingLong(BaseInterval::getStartMillis));
        int numMissingSegments = 0;
        long frequencyMs = frequency.getMillis();
        long lastStartTimeMs = -1L;
        for (Interval segmentInterval : segmentIntervals) {
            long startTimeMs = segmentInterval.getStartMillis();
            if (lastStartTimeMs != -1L && startTimeMs - lastStartTimeMs > frequencyMs) {
                numMissingSegments = (int)((long)numMissingSegments + (startTimeMs - lastStartTimeMs - frequencyMs) / frequencyMs);
            }
            long endTimeMs = segmentInterval.getEndMillis();
            while (startTimeMs + frequencyMs <= endTimeMs) {
                startTimeMs += frequencyMs;
            }
            lastStartTimeMs = Math.max(lastStartTimeMs, startTimeMs);
        }
        return numMissingSegments;
    }

    @VisibleForTesting
    static long computeOfflineTotalDocumentInSegments(List<SegmentZKMetadata> segmentsZKMetadata) {
        long numTotalDocs = 0L;
        for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
            numTotalDocs += segmentZKMetadata.getTotalDocs();
        }
        return numTotalDocs;
    }

    @VisibleForTesting
    public ValidationMetrics getValidationMetrics() {
        return this._validationMetrics;
    }

    public void cleanUpTask() {
        LOGGER.info("Unregister all the validation metrics.");
        this._validationMetrics.unregisterAllMetrics();
    }
}

