/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.controller.helix.core.realtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.helix.AccessOption;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.AbstractMetrics;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Finds partition groups of a realtime table that currently have no segment in the CONSUMING state
 * in the ideal state, and publishes the result as controller gauges.
 *
 * <p>Two detection modes are used, depending on whether the largest stream offsets could be fetched
 * at construction time:
 * <ul>
 *   <li>Stream offsets available: every partition group reported by the stream is checked. A partition
 *   group with neither a consuming nor a completed segment counts as a "new partition group". A partition
 *   group with only a completed segment counts as missing only if that segment's end offset is smaller
 *   than the largest stream offset.</li>
 *   <li>Stream offsets unavailable: only the ideal state is used. Every partition group whose latest
 *   segment is completed and which has no consuming segment counts as missing.</li>
 * </ul>
 */
public class MissingConsumingSegmentFinder {
    private static final Logger LOGGER = LoggerFactory.getLogger(MissingConsumingSegmentFinder.class);
    private final String _realtimeTableName;
    private final SegmentMetadataFetcher _segmentMetadataFetcher;
    private final Map<Integer, StreamPartitionMsgOffset> _partitionGroupIdToLargestStreamOffsetMap;
    private final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory;
    // Null when the instance is built through the test-only constructor.
    private final ControllerMetrics _controllerMetrics;

    /**
     * Creates a finder and eagerly tries to fetch the largest offset of every partition group of the stream.
     *
     * <p>NOTE: this constructor mutates the given {@code streamConfig} by setting its offset criteria to
     * {@link OffsetCriteria#LARGEST_OFFSET_CRITERIA}.
     *
     * <p>A failure to fetch the stream metadata is logged and otherwise ignored; the finder then falls back
     * to ideal-state-only detection.
     *
     * @param realtimeTableName table name used for metadata lookups and as the gauge/meter table key
     * @param propertyStore property store used to read segment ZK metadata
     * @param controllerMetrics metrics sink for gauges and ZK fetch failure meters
     * @param streamConfig stream configuration of the table
     */
    public MissingConsumingSegmentFinder(String realtimeTableName, ZkHelixPropertyStore<ZNRecord> propertyStore, ControllerMetrics controllerMetrics, PartitionLevelStreamConfig streamConfig) {
        _realtimeTableName = realtimeTableName;
        _controllerMetrics = controllerMetrics;
        _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics);
        _streamPartitionMsgOffsetFactory = StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
        _partitionGroupIdToLargestStreamOffsetMap = new HashMap<>();
        streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA);
        try {
            PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, Collections.emptyList())
                .forEach(metadata -> _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset()));
        } catch (Exception e) {
            // Best effort: keep going without stream offsets, but record the cause (trailing Throwable
            // argument is logged as the exception) so the failure can be diagnosed.
            LOGGER.warn("Problem encountered in fetching stream metadata for topic: {} of table: {}. Continue finding missing consuming segment only with ideal state information.", streamConfig.getTopicName(), streamConfig.getTableNameWithType(), e);
        }
    }

    /**
     * Test-only constructor that injects all collaborators directly.
     *
     * <p>No {@link ControllerMetrics} is available on instances created this way, so
     * {@link #findAndEmitMetrics(IdealState)} will fail with a {@link NullPointerException};
     * use {@link #findMissingSegments(Map, Instant)} instead.
     */
    @VisibleForTesting
    MissingConsumingSegmentFinder(String realtimeTableName, SegmentMetadataFetcher segmentMetadataFetcher, Map<Integer, StreamPartitionMsgOffset> partitionGroupIdToLargestStreamOffsetMap, StreamPartitionMsgOffsetFactory streamPartitionMsgOffsetFactory) {
        _realtimeTableName = realtimeTableName;
        _segmentMetadataFetcher = segmentMetadataFetcher;
        _partitionGroupIdToLargestStreamOffsetMap = partitionGroupIdToLargestStreamOffsetMap;
        _streamPartitionMsgOffsetFactory = streamPartitionMsgOffsetFactory;
        _controllerMetrics = null;
    }

    /**
     * Runs the detection against the given ideal state (using the current time) and sets the three
     * missing-consuming-segment table gauges: total count, new partition group count, and max duration
     * in minutes.
     *
     * @param idealState ideal state of the realtime table
     */
    public void findAndEmitMetrics(IdealState idealState) {
        MissingSegmentInfo info = findMissingSegments(idealState.getRecord().getMapFields(), Instant.now());
        _controllerMetrics.setValueOfTableGauge(_realtimeTableName, ControllerGauge.MISSING_CONSUMING_SEGMENT_TOTAL_COUNT, info._totalCount);
        _controllerMetrics.setValueOfTableGauge(_realtimeTableName, ControllerGauge.MISSING_CONSUMING_SEGMENT_NEW_PARTITION_COUNT, info._newPartitionGroupCount);
        _controllerMetrics.setValueOfTableGauge(_realtimeTableName, ControllerGauge.MISSING_CONSUMING_SEGMENT_MAX_DURATION_MINUTES, info._maxDurationInMinutes);
    }

    /**
     * Computes the missing consuming segment statistics.
     *
     * @param idealStateMap segment name to (instance to state) map; entries whose name
     *                      {@link LLCSegmentName#of(String)} maps to {@code null} are ignored
     * @param now reference time used to compute how long a partition group has been without a consuming segment
     * @return the aggregated counts and the maximum duration (in minutes) across the affected partition groups
     */
    @VisibleForTesting
    MissingSegmentInfo findMissingSegments(Map<String, Map<String, String>> idealStateMap, Instant now) {
        // For every partition group, track the highest-sequence segment that is CONSUMING on at least one
        // instance, and the highest-sequence segment that is ONLINE (and CONSUMING nowhere).
        Map<Integer, LLCSegmentName> partitionGroupIdToLatestConsumingSegmentMap = new HashMap<>();
        Map<Integer, LLCSegmentName> partitionGroupIdToLatestCompletedSegmentMap = new HashMap<>();
        idealStateMap.forEach((segmentName, instanceToStatusMap) -> {
            LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
            if (llcSegmentName != null) {
                if (instanceToStatusMap.containsValue("CONSUMING")) {
                    updateMap(partitionGroupIdToLatestConsumingSegmentMap, llcSegmentName);
                } else if (instanceToStatusMap.containsValue("ONLINE")) {
                    updateMap(partitionGroupIdToLatestCompletedSegmentMap, llcSegmentName);
                }
            }
        });

        MissingSegmentInfo missingSegmentInfo = new MissingSegmentInfo();
        if (!_partitionGroupIdToLargestStreamOffsetMap.isEmpty()) {
            // Stream offsets are known: walk the partition groups reported by the stream.
            _partitionGroupIdToLargestStreamOffsetMap.forEach((partitionGroupId, largestStreamOffset) -> {
                if (!partitionGroupIdToLatestConsumingSegmentMap.containsKey(partitionGroupId)) {
                    LLCSegmentName latestCompletedSegment = partitionGroupIdToLatestCompletedSegmentMap.get(partitionGroupId);
                    if (latestCompletedSegment == null) {
                        // Neither a consuming nor a completed segment exists for this partition group.
                        missingSegmentInfo._newPartitionGroupCount++;
                        missingSegmentInfo._totalCount++;
                    } else {
                        SegmentZKMetadata segmentZKMetadata = _segmentMetadataFetcher.fetchSegmentZkMetadata(_realtimeTableName, latestCompletedSegment.getSegmentName());
                        StreamPartitionMsgOffset completedSegmentEndOffset = _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset());
                        // Only count the partition group when the completed segment ends before the
                        // largest stream offset.
                        if (completedSegmentEndOffset.compareTo(largestStreamOffset) < 0) {
                            missingSegmentInfo._totalCount++;
                            updateMaxDurationInfo(missingSegmentInfo, partitionGroupId, segmentZKMetadata.getCreationTime(), now);
                        }
                    }
                }
            });
        } else {
            // No stream offsets: rely on the ideal state only.
            partitionGroupIdToLatestCompletedSegmentMap.forEach((partitionGroupId, latestCompletedSegment) -> {
                if (!partitionGroupIdToLatestConsumingSegmentMap.containsKey(partitionGroupId)) {
                    missingSegmentInfo._totalCount++;
                    long segmentCompletionTimeMillis = _segmentMetadataFetcher.fetchSegmentCompletionTime(_realtimeTableName, latestCompletedSegment.getSegmentName());
                    updateMaxDurationInfo(missingSegmentInfo, partitionGroupId, segmentCompletionTimeMillis, now);
                }
            });
        }
        return missingSegmentInfo;
    }

    /**
     * Raises {@code missingSegmentInfo._maxDurationInMinutes} if the time elapsed between the given epoch
     * millis and {@code now} (in whole minutes) is larger, and logs a warning for the partition group.
     */
    private void updateMaxDurationInfo(MissingSegmentInfo missingSegmentInfo, Integer partitionGroupId, long segmentCompletionTimeMillis, Instant now) {
        long duration = Duration.between(Instant.ofEpochMilli(segmentCompletionTimeMillis), now).toMinutes();
        if (duration > missingSegmentInfo._maxDurationInMinutes) {
            missingSegmentInfo._maxDurationInMinutes = duration;
        }
        LOGGER.warn("PartitionGroupId {} hasn't had a consuming segment for {} minutes!", partitionGroupId, duration);
    }

    /**
     * Records {@code llcSegmentName} as the latest segment of its partition group unless the map already
     * holds a segment with a strictly larger sequence number.
     */
    private void updateMap(Map<Integer, LLCSegmentName> partitionGroupIdToLatestSegmentMap, LLCSegmentName llcSegmentName) {
        int partitionGroupId = llcSegmentName.getPartitionGroupId();
        partitionGroupIdToLatestSegmentMap.compute(partitionGroupId, (pid, existingSegment) -> {
            if (existingSegment == null) {
                return llcSegmentName;
            }
            return existingSegment.getSequenceNumber() > llcSegmentName.getSequenceNumber() ? existingSegment : llcSegmentName;
        });
    }

    /**
     * Reads segment ZK metadata from the property store and meters fetch failures.
     */
    static class SegmentMetadataFetcher {
        private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
        private final ControllerMetrics _controllerMetrics;

        public SegmentMetadataFetcher(ZkHelixPropertyStore<ZNRecord> propertyStore, ControllerMetrics controllerMetrics) {
            _propertyStore = propertyStore;
            _controllerMetrics = controllerMetrics;
        }

        /**
         * Fetches the ZK metadata of a segment.
         *
         * @throws IllegalStateException if no record exists for the segment
         */
        public SegmentZKMetadata fetchSegmentZkMetadata(String tableName, String segmentName) {
            return fetchSegmentZkMetadata(tableName, segmentName, null);
        }

        /**
         * Returns the mtime of the {@link Stat} filled in while reading the segment's ZK metadata record;
         * callers use it as the segment completion time in epoch millis.
         *
         * @throws IllegalStateException if no record exists for the segment
         */
        public long fetchSegmentCompletionTime(String tableName, String segmentName) {
            Stat stat = new Stat();
            fetchSegmentZkMetadata(tableName, segmentName, stat);
            return stat.getMtime();
        }

        private SegmentZKMetadata fetchSegmentZkMetadata(String tableName, String segmentName, Stat stat) {
            try {
                ZNRecord znRecord = _propertyStore.get(ZKMetadataProvider.constructPropertyStorePathForSegment(tableName, segmentName), stat, AccessOption.PERSISTENT);
                Preconditions.checkState(znRecord != null, "Failed to find segment ZK metadata for segment: %s of table: %s", segmentName, tableName);
                return new SegmentZKMetadata(znRecord);
            } catch (Exception e) {
                // Meter the failure (including the missing-record case above), then propagate unchanged.
                _controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
                throw e;
            }
        }
    }

    /**
     * Mutable accumulator for the result of {@link #findMissingSegments(Map, Instant)}.
     */
    @VisibleForTesting
    static class MissingSegmentInfo {
        // Number of partition groups counted as missing a consuming segment (includes new partition groups).
        long _totalCount;
        // Number of partition groups that have neither a consuming nor a completed segment.
        long _newPartitionGroupCount;
        // Largest duration, in minutes, passed to updateMaxDurationInfo for any counted partition group.
        long _maxDurationInMinutes;

        MissingSegmentInfo() {
        }
    }
}

