/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import org.apache.flink.AttributeBuilder;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.TraceOptions;
import org.apache.flink.events.EventBuilder;
import org.apache.flink.events.Events;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
import org.apache.flink.runtime.checkpoint.CheckpointStatsListener;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
import org.apache.flink.runtime.checkpoint.JobInitializationMetrics;
import org.apache.flink.runtime.checkpoint.JobInitializationMetricsBuilder;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
import org.apache.flink.runtime.checkpoint.TaskStateStats;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
import org.apache.flink.runtime.rest.util.RestMapperUtils;
import org.apache.flink.runtime.util.LongArrayList;
import org.apache.flink.runtime.util.stats.StatsSummary;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.traces.Span;
import org.apache.flink.traces.SpanBuilder;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultCheckpointStatsTracker
implements CheckpointStatsTracker {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultCheckpointStatsTracker.class);
    private static final ObjectMapper MAPPER = RestMapperUtils.getStrictObjectMapper();
    private static final List<CheckpointSpanMetric> CHECKPOINT_SPAN_METRICS = Arrays.asList(CheckpointSpanMetric.of("StateSizeBytes", TaskStateStats.TaskStateStatsSummary::getStateSizeStats, SubtaskStateStats::getStateSize), CheckpointSpanMetric.of("CheckpointedSizeBytes", TaskStateStats.TaskStateStatsSummary::getCheckpointedSize, SubtaskStateStats::getCheckpointedSize), CheckpointSpanMetric.of("CheckpointStartDelayMs", TaskStateStats.TaskStateStatsSummary::getCheckpointStartDelayStats, SubtaskStateStats::getCheckpointStartDelay), CheckpointSpanMetric.of("AlignmentDurationMs", TaskStateStats.TaskStateStatsSummary::getAlignmentDurationStats, SubtaskStateStats::getAlignmentDuration), CheckpointSpanMetric.of("SyncCheckpointDurationMs", TaskStateStats.TaskStateStatsSummary::getSyncCheckpointDurationStats, SubtaskStateStats::getSyncCheckpointDuration), CheckpointSpanMetric.of("AsyncCheckpointDurationMs", TaskStateStats.TaskStateStatsSummary::getAsyncCheckpointDurationStats, SubtaskStateStats::getAsyncCheckpointDuration), CheckpointSpanMetric.of("ProcessedDataBytes", TaskStateStats.TaskStateStatsSummary::getProcessedDataStats, SubtaskStateStats::getProcessedData), CheckpointSpanMetric.of("PersistedDataBytes", TaskStateStats.TaskStateStatsSummary::getPersistedDataStats, SubtaskStateStats::getPersistedData));
    private final TraceOptions.CheckpointSpanDetailLevel checkpointSpanDetailLevel;
    private final ReentrantLock statsReadWriteLock = new ReentrantLock();
    private final CheckpointStatsCounts counts = new CheckpointStatsCounts();
    private final CompletedCheckpointStatsSummary summary = new CompletedCheckpointStatsSummary();
    private final CheckpointStatsHistory history;
    private final JobManagerJobMetricGroup metricGroup;
    private Optional<JobInitializationMetricsBuilder> jobInitializationMetricsBuilder = Optional.empty();
    @Nullable
    private final CheckpointStatsListener checkpointStatsListener;
    private volatile CheckpointStatsSnapshot latestSnapshot;
    private volatile boolean dirty;
    @Nullable
    private volatile CompletedCheckpointStats latestCompletedCheckpoint;
    @VisibleForTesting
    static final String NUMBER_OF_CHECKPOINTS_METRIC = "totalNumberOfCheckpoints";
    @VisibleForTesting
    static final String NUMBER_OF_IN_PROGRESS_CHECKPOINTS_METRIC = "numberOfInProgressCheckpoints";
    @VisibleForTesting
    static final String NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC = "numberOfCompletedCheckpoints";
    @VisibleForTesting
    static final String NUMBER_OF_FAILED_CHECKPOINTS_METRIC = "numberOfFailedCheckpoints";
    @VisibleForTesting
    static final String LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC = "lastCheckpointRestoreTimestamp";
    @VisibleForTesting
    static final String LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC = "lastCheckpointSize";
    @VisibleForTesting
    static final String LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC = "lastCheckpointFullSize";
    @VisibleForTesting
    static final String LATEST_COMPLETED_CHECKPOINT_METADATA_SIZE_METRIC = "lastCheckpointMetadataSize";
    @VisibleForTesting
    static final String LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC = "lastCheckpointDuration";
    @VisibleForTesting
    static final String LATEST_COMPLETED_CHECKPOINT_PROCESSED_DATA_METRIC = "lastCheckpointProcessedData";
    @VisibleForTesting
    static final String LATEST_COMPLETED_CHECKPOINT_PERSISTED_DATA_METRIC = "lastCheckpointPersistedData";
    @VisibleForTesting
    static final String LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC = "lastCheckpointExternalPath";
    @VisibleForTesting
    static final String LATEST_COMPLETED_CHECKPOINT_ID_METRIC = "lastCompletedCheckpointId";
    @VisibleForTesting
    static final String LATEST_CHECKPOINT_COMPLETED_TIMESTAMP = "lastCheckpointCompletedTimestamp";

    public DefaultCheckpointStatsTracker(int numRememberedCheckpoints, JobManagerJobMetricGroup metricGroup) {
        this(numRememberedCheckpoints, metricGroup, TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT, null);
    }

    public DefaultCheckpointStatsTracker(int numRememberedCheckpoints, JobManagerJobMetricGroup metricGroup, TraceOptions.CheckpointSpanDetailLevel checkpointSpanDetailLevel, @Nullable CheckpointStatsListener checkpointStatsListener) {
        Preconditions.checkArgument(numRememberedCheckpoints >= 0, "Negative number of remembered checkpoints");
        this.history = new CheckpointStatsHistory(numRememberedCheckpoints);
        this.metricGroup = metricGroup;
        this.checkpointSpanDetailLevel = checkpointSpanDetailLevel;
        this.checkpointStatsListener = checkpointStatsListener;
        this.latestSnapshot = new CheckpointStatsSnapshot(this.counts.createSnapshot(), this.summary.createSnapshot(), this.history.createSnapshot(), null);
        this.registerMetrics(metricGroup);
    }

    @Override
    public CheckpointStatsSnapshot createSnapshot() {
        CheckpointStatsSnapshot snapshot = this.latestSnapshot;
        if (this.dirty && this.statsReadWriteLock.tryLock()) {
            try {
                this.latestSnapshot = snapshot = new CheckpointStatsSnapshot(this.counts.createSnapshot(), this.summary.createSnapshot(), this.history.createSnapshot(), this.jobInitializationMetricsBuilder.flatMap(JobInitializationMetricsBuilder::buildRestoredCheckpointStats).orElse(null));
                this.dirty = false;
            }
            finally {
                this.statsReadWriteLock.unlock();
            }
        }
        return snapshot;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public PendingCheckpointStats reportPendingCheckpoint(long checkpointId, long triggerTimestamp, CheckpointProperties props, Map<JobVertexID, Integer> vertexToDop) {
        PendingCheckpointStats pending = new PendingCheckpointStats(checkpointId, triggerTimestamp, props, vertexToDop);
        this.statsReadWriteLock.lock();
        try {
            this.counts.incrementInProgressCheckpoints();
            this.history.addInProgressCheckpoint(pending);
            this.dirty = true;
        }
        finally {
            this.statsReadWriteLock.unlock();
        }
        return pending;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void reportRestoredCheckpoint(long checkpointID, CheckpointProperties properties, String externalPath, long stateSize) {
        this.statsReadWriteLock.lock();
        try {
            this.counts.incrementRestoredCheckpoints();
            Preconditions.checkState(this.jobInitializationMetricsBuilder.isPresent(), "JobInitializationMetrics should have been set first, before RestoredCheckpointStats");
            this.jobInitializationMetricsBuilder.get().setRestoredCheckpointStats(checkpointID, stateSize, properties, externalPath);
            this.dirty = true;
        }
        finally {
            this.statsReadWriteLock.unlock();
        }
    }

    @Override
    public void reportCompletedCheckpoint(CompletedCheckpointStats completed) {
        this.statsReadWriteLock.lock();
        try {
            this.latestCompletedCheckpoint = completed;
            this.counts.incrementCompletedCheckpoints();
            this.history.replacePendingCheckpointById(completed);
            this.summary.updateSummary(completed);
            this.dirty = true;
            this.logCheckpointStatistics(completed);
            if (this.checkpointStatsListener != null) {
                this.checkpointStatsListener.onCompletedCheckpoint();
            }
        }
        finally {
            this.statsReadWriteLock.unlock();
        }
    }

    @Override
    public void reportFailedCheckpoint(FailedCheckpointStats failed) {
        this.statsReadWriteLock.lock();
        try {
            this.counts.incrementFailedCheckpoints();
            this.history.replacePendingCheckpointById(failed);
            this.dirty = true;
            this.logCheckpointStatistics(failed);
            if (this.checkpointStatsListener != null) {
                this.checkpointStatsListener.onFailedCheckpoint();
            }
        }
        finally {
            this.statsReadWriteLock.unlock();
        }
    }

    private void logCheckpointStatistics(AbstractCheckpointStats checkpointStats) {
        try {
            EventBuilder eventBuilder = Events.CheckpointEvent.builder(CheckpointStatsTracker.class).setObservedTsMillis(checkpointStats.getLatestAckTimestamp()).setSeverity("INFO");
            this.addCommonCheckpointStatsAttributes((AttributeBuilder)eventBuilder, checkpointStats);
            this.metricGroup.addEvent(eventBuilder);
            SpanBuilder spanBuilder = Span.builder(CheckpointStatsTracker.class, (String)"Checkpoint").setStartTsMillis(checkpointStats.getTriggerTimestamp()).setEndTsMillis(checkpointStats.getLatestAckTimestamp());
            this.addCommonCheckpointStatsAttributes((AttributeBuilder)spanBuilder, checkpointStats);
            this.addCheckpointAggregationStats(checkpointStats, spanBuilder);
            this.metricGroup.addSpan(spanBuilder);
            if (LOG.isDebugEnabled()) {
                StringWriter sw = new StringWriter();
                MAPPER.writeValue(sw, (Object)CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true));
                String jsonDump = sw.toString();
                LOG.debug("CheckpointStatistics (for jobID={}, checkpointId={}) dump = {} ", new Object[]{this.metricGroup.jobId(), checkpointStats.checkpointId, jsonDump});
            }
        }
        catch (Exception ex) {
            LOG.warn("Fail to log CheckpointStatistics", (Throwable)ex);
        }
    }

    private AttributeBuilder addCommonCheckpointStatsAttributes(AttributeBuilder attributeBuilder, AbstractCheckpointStats checkpointStats) {
        attributeBuilder.setAttribute("checkpointId", checkpointStats.getCheckpointId()).setAttribute("fullSize", checkpointStats.getStateSize()).setAttribute("checkpointedSize", checkpointStats.getCheckpointedSize()).setAttribute("metadataSize", checkpointStats.getMetadataSize()).setAttribute("checkpointStatus", checkpointStats.getStatus().name()).setAttribute("isUnaligned", Boolean.toString(checkpointStats.isUnalignedCheckpoint())).setAttribute("checkpointType", checkpointStats.getProperties().getCheckpointType().getName());
        return attributeBuilder;
    }

    private void addCheckpointAggregationStats(AbstractCheckpointStats checkpointStats, SpanBuilder checkpointSpanBuilder) {
        ArrayList<TaskStateStats> sortedTaskStateStats = new ArrayList<TaskStateStats>(checkpointStats.getAllTaskStateStats());
        sortedTaskStateStats.sort((x, y) -> Long.signum(x.getSummaryStats().getCheckpointStartDelayStats().getMinimum() - y.getSummaryStats().getCheckpointStartDelayStats().getMinimum()));
        CHECKPOINT_SPAN_METRICS.stream().map(metric -> TaskStatsAggregator.aggregate(sortedTaskStateStats, metric)).forEach(aggregator -> {
            String metricName = aggregator.getMetricName();
            checkpointSpanBuilder.setAttribute("max" + metricName, aggregator.getTotalMax());
            if (!this.shouldSkipSumMetricNameInCheckpointSpanForCompatibility(metricName)) {
                checkpointSpanBuilder.setAttribute("sum" + metricName, aggregator.getTotalSum());
            }
            if (this.checkpointSpanDetailLevel == TraceOptions.CheckpointSpanDetailLevel.SPAN_PER_CHECKPOINT_WITH_TASKS) {
                checkpointSpanBuilder.setAttribute("perTaskMax" + metricName, Arrays.toString(aggregator.getValuesMax().getInternalArray()));
                checkpointSpanBuilder.setAttribute("perTaskSum" + metricName, Arrays.toString(aggregator.getValuesSum().getInternalArray()));
            }
        });
        if (this.checkpointSpanDetailLevel == TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_TASK || this.checkpointSpanDetailLevel == TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_SUBTASK) {
            for (TaskStateStats taskStats : sortedTaskStateStats) {
                checkpointSpanBuilder.addChild(this.createTaskSpan(checkpointStats, taskStats, this.checkpointSpanDetailLevel == TraceOptions.CheckpointSpanDetailLevel.CHILDREN_SPANS_PER_SUBTASK));
            }
        }
    }

    private SpanBuilder createTaskSpan(AbstractCheckpointStats checkpointStats, TaskStateStats taskStats, boolean addSubtaskSpans) {
        long taskStartTs = checkpointStats.getTriggerTimestamp() + taskStats.getSummaryStats().getCheckpointStartDelayStats().getMinimum();
        SpanBuilder taskSpanBuilder = Span.builder(CheckpointStatsTracker.class, (String)"Checkpoint_Task").setStartTsMillis(taskStartTs).setEndTsMillis(taskStats.getLatestAckTimestamp()).setAttribute("checkpointId", checkpointStats.getCheckpointId()).setAttribute("jobVertexId", taskStats.getJobVertexId().toString());
        for (CheckpointSpanMetric spanMetric : CHECKPOINT_SPAN_METRICS) {
            String metricName = spanMetric.metricName;
            StatsSummary statsSummary = spanMetric.taskStatsSummaryExtractor.extract(taskStats.getSummaryStats());
            taskSpanBuilder.setAttribute("max" + metricName, statsSummary.getMaximum());
            taskSpanBuilder.setAttribute("sum" + metricName, statsSummary.getSum());
        }
        if (addSubtaskSpans) {
            this.addSubtaskSpans(checkpointStats, taskStats, taskSpanBuilder);
        }
        return taskSpanBuilder;
    }

    private void addSubtaskSpans(AbstractCheckpointStats checkpointStats, TaskStateStats taskStats, SpanBuilder taskSpanBuilder) {
        for (SubtaskStateStats subtaskStat : taskStats.getSubtaskStats()) {
            if (subtaskStat == null) continue;
            long subTaskStartTs = checkpointStats.getTriggerTimestamp() + subtaskStat.getCheckpointStartDelay();
            SpanBuilder subTaskSpanBuilder = Span.builder(CheckpointStatsTracker.class, (String)"Checkpoint_Subtask").setStartTsMillis(subTaskStartTs).setEndTsMillis(subtaskStat.getAckTimestamp()).setAttribute("checkpointId", checkpointStats.getCheckpointId()).setAttribute("jobVertexId", taskStats.getJobVertexId().toString()).setAttribute("subtaskId", (long)subtaskStat.getSubtaskIndex());
            for (CheckpointSpanMetric spanMetric : CHECKPOINT_SPAN_METRICS) {
                String metricName = spanMetric.metricName;
                long metricValue = spanMetric.subtaskMetricExtractor.extract(subtaskStat);
                subTaskSpanBuilder.setAttribute(metricName, metricValue);
            }
            taskSpanBuilder.addChild(subTaskSpanBuilder);
        }
    }

    private boolean shouldSkipSumMetricNameInCheckpointSpanForCompatibility(String metricName) {
        return metricName.equals("StateSizeBytes") || metricName.equals("CheckpointedSizeBytes");
    }

    @Override
    public void reportFailedCheckpointsWithoutInProgress() {
        this.statsReadWriteLock.lock();
        try {
            this.counts.incrementFailedCheckpointsWithoutInProgress();
            this.dirty = true;
            if (this.checkpointStatsListener != null) {
                this.checkpointStatsListener.onFailedCheckpoint();
            }
        }
        finally {
            this.statsReadWriteLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public PendingCheckpointStats getPendingCheckpointStats(long checkpointId) {
        this.statsReadWriteLock.lock();
        try {
            AbstractCheckpointStats stats = this.history.getCheckpointById(checkpointId);
            PendingCheckpointStats pendingCheckpointStats = stats instanceof PendingCheckpointStats ? (PendingCheckpointStats)stats : null;
            return pendingCheckpointStats;
        }
        finally {
            this.statsReadWriteLock.unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void reportIncompleteStats(long checkpointId, ExecutionAttemptID attemptId, CheckpointMetrics metrics) {
        this.statsReadWriteLock.lock();
        try {
            AbstractCheckpointStats stats = this.history.getCheckpointById(checkpointId);
            if (stats instanceof PendingCheckpointStats) {
                ((PendingCheckpointStats)stats).reportSubtaskStats(attemptId.getJobVertexId(), new SubtaskStateStats(attemptId.getSubtaskIndex(), System.currentTimeMillis(), metrics.getBytesPersistedOfThisCheckpoint(), metrics.getTotalBytesPersisted(), metrics.getSyncDurationMillis(), metrics.getAsyncDurationMillis(), metrics.getBytesProcessedDuringAlignment(), metrics.getBytesPersistedDuringAlignment(), metrics.getAlignmentDurationNanos() / 1000000L, metrics.getCheckpointStartDelayNanos() / 1000000L, metrics.getUnalignedCheckpoint(), false));
                this.dirty = true;
            }
        }
        finally {
            this.statsReadWriteLock.unlock();
        }
    }

    @Override
    public void reportInitializationStarted(Set<ExecutionAttemptID> toInitialize, long initializationStartTs) {
        this.jobInitializationMetricsBuilder = Optional.of(new JobInitializationMetricsBuilder(toInitialize, initializationStartTs));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void reportInitializationMetrics(ExecutionAttemptID executionAttemptId, SubTaskInitializationMetrics initializationMetrics) {
        this.statsReadWriteLock.lock();
        try {
            if (!this.jobInitializationMetricsBuilder.isPresent()) {
                LOG.warn("Attempted to report SubTaskInitializationMetrics [{}] without jobInitializationMetricsBuilder present", (Object)initializationMetrics);
                return;
            }
            JobInitializationMetricsBuilder builder = this.jobInitializationMetricsBuilder.get();
            builder.reportInitializationMetrics(executionAttemptId, initializationMetrics);
            if (builder.isComplete()) {
                this.traceInitializationMetrics(builder.build());
            }
        }
        catch (Exception ex) {
            LOG.warn("Failed to log SubTaskInitializationMetrics [{}]", (Object)initializationMetrics, (Object)ex);
        }
        finally {
            this.statsReadWriteLock.unlock();
        }
    }

    private void traceInitializationMetrics(JobInitializationMetrics jobInitializationMetrics) {
        SpanBuilder span = Span.builder(CheckpointStatsTracker.class, (String)"JobInitialization").setStartTsMillis(jobInitializationMetrics.getStartTs()).setEndTsMillis(jobInitializationMetrics.getEndTs()).setAttribute("initializationStatus", jobInitializationMetrics.getStatus().name());
        for (JobInitializationMetrics.SumMaxDuration duration : jobInitializationMetrics.getDurationMetrics().values()) {
            this.setDurationSpanAttribute(span, duration);
        }
        if (jobInitializationMetrics.getCheckpointId() != -1L) {
            span.setAttribute("checkpointId", jobInitializationMetrics.getCheckpointId());
        }
        if (jobInitializationMetrics.getStateSize() != -1L) {
            span.setAttribute("fullSize", jobInitializationMetrics.getStateSize());
        }
        this.metricGroup.addSpan(span);
    }

    private void setDurationSpanAttribute(SpanBuilder span, JobInitializationMetrics.SumMaxDuration duration) {
        span.setAttribute("max" + duration.getName(), duration.getMax());
        span.setAttribute("sum" + duration.getName(), duration.getSum());
    }

    private void registerMetrics(MetricGroup metricGroup) {
        metricGroup.gauge(NUMBER_OF_CHECKPOINTS_METRIC, (Gauge)new CheckpointsCounter());
        metricGroup.gauge(NUMBER_OF_IN_PROGRESS_CHECKPOINTS_METRIC, (Gauge)new InProgressCheckpointsCounter());
        metricGroup.gauge(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC, (Gauge)new CompletedCheckpointsCounter());
        metricGroup.gauge(NUMBER_OF_FAILED_CHECKPOINTS_METRIC, (Gauge)new FailedCheckpointsCounter());
        metricGroup.gauge(LATEST_RESTORED_CHECKPOINT_TIMESTAMP_METRIC, (Gauge)new LatestRestoredCheckpointTimestampGauge());
        metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_SIZE_METRIC, (Gauge)new LatestCompletedCheckpointSizeGauge());
        metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_FULL_SIZE_METRIC, (Gauge)new LatestCompletedCheckpointFullSizeGauge());
        metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_METADATA_SIZE_METRIC, (Gauge)new LatestCompletedCheckpointMetadataSizeGauge());
        metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_DURATION_METRIC, (Gauge)new LatestCompletedCheckpointDurationGauge());
        metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_PROCESSED_DATA_METRIC, (Gauge)new LatestCompletedCheckpointProcessedDataGauge());
        metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_PERSISTED_DATA_METRIC, (Gauge)new LatestCompletedCheckpointPersistedDataGauge());
        metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_EXTERNAL_PATH_METRIC, (Gauge)new LatestCompletedCheckpointExternalPathGauge());
        metricGroup.gauge(LATEST_COMPLETED_CHECKPOINT_ID_METRIC, (Gauge)new LatestCompletedCheckpointIdGauge());
        metricGroup.gauge(LATEST_CHECKPOINT_COMPLETED_TIMESTAMP, (Gauge)new LatestCheckpointCompletedTimestampGauge());
    }

    static class TaskStatsAggregator {
        final String metricName;
        final LongArrayList valuesMax;
        final LongArrayList valuesSum;

        TaskStatsAggregator(String metric, LongArrayList valuesMax, LongArrayList valuesSum) {
            this.metricName = metric;
            this.valuesMax = valuesMax;
            this.valuesSum = valuesSum;
        }

        public static TaskStatsAggregator aggregate(Collection<TaskStateStats> allTaskStateStats, CheckpointSpanMetric metricDescriptor) {
            LongArrayList valuesMax = new LongArrayList(allTaskStateStats.size());
            LongArrayList valuesSum = new LongArrayList(allTaskStateStats.size());
            for (TaskStateStats taskStats : allTaskStateStats) {
                StatsSummary statsSummary = metricDescriptor.taskStatsSummaryExtractor.extract(taskStats.getSummaryStats());
                valuesMax.add(statsSummary.getMaximum());
                valuesSum.add(statsSummary.getSum());
            }
            return new TaskStatsAggregator(metricDescriptor.metricName, valuesMax, valuesSum);
        }

        public LongArrayList getValuesMax() {
            return this.valuesMax;
        }

        public LongArrayList getValuesSum() {
            return this.valuesSum;
        }

        public String getMetricName() {
            return this.metricName;
        }

        public long getTotalMax() {
            return Arrays.stream(this.valuesMax.getInternalArray()).filter(val -> val > 0L).max().orElse(0L);
        }

        public long getTotalSum() {
            return Arrays.stream(this.valuesSum.getInternalArray()).filter(val -> val >= 0L).sum();
        }
    }

    private class LatestCheckpointCompletedTimestampGauge
    implements Gauge<Long> {
        private LatestCheckpointCompletedTimestampGauge() {
        }

        public Long getValue() {
            CompletedCheckpointStats completed = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            if (completed != null) {
                return completed.getLatestAckTimestamp();
            }
            return -1L;
        }
    }

    private class LatestCompletedCheckpointIdGauge
    implements Gauge<Long> {
        private LatestCompletedCheckpointIdGauge() {
        }

        public Long getValue() {
            CompletedCheckpointStats completed = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            if (completed != null) {
                return completed.getCheckpointId();
            }
            return -1L;
        }
    }

    private class LatestCompletedCheckpointExternalPathGauge
    implements Gauge<String> {
        private LatestCompletedCheckpointExternalPathGauge() {
        }

        public String getValue() {
            CompletedCheckpointStats completed = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            if (completed != null && completed.getExternalPath() != null) {
                return completed.getExternalPath();
            }
            return "n/a";
        }
    }

    private class LatestCompletedCheckpointPersistedDataGauge
    implements Gauge<Long> {
        private LatestCompletedCheckpointPersistedDataGauge() {
        }

        public Long getValue() {
            CompletedCheckpointStats completed = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            if (completed != null) {
                return completed.getPersistedData();
            }
            return -1L;
        }
    }

    private class LatestCompletedCheckpointProcessedDataGauge
    implements Gauge<Long> {
        private LatestCompletedCheckpointProcessedDataGauge() {
        }

        public Long getValue() {
            CompletedCheckpointStats completed = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            if (completed != null) {
                return completed.getProcessedData();
            }
            return -1L;
        }
    }

    private class LatestCompletedCheckpointDurationGauge
    implements Gauge<Long> {
        private LatestCompletedCheckpointDurationGauge() {
        }

        public Long getValue() {
            CompletedCheckpointStats completed = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            if (completed != null) {
                return completed.getEndToEndDuration();
            }
            return -1L;
        }
    }

    private class LatestCompletedCheckpointMetadataSizeGauge
    implements Gauge<Long> {
        private LatestCompletedCheckpointMetadataSizeGauge() {
        }

        public Long getValue() {
            CompletedCheckpointStats completed = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            return completed != null ? completed.getMetadataSize() : -1L;
        }
    }

    private class LatestCompletedCheckpointFullSizeGauge
    implements Gauge<Long> {
        private LatestCompletedCheckpointFullSizeGauge() {
        }

        public Long getValue() {
            CompletedCheckpointStats completed = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            if (completed != null) {
                return completed.getStateSize();
            }
            return -1L;
        }
    }

    private class LatestCompletedCheckpointSizeGauge
    implements Gauge<Long> {
        private LatestCompletedCheckpointSizeGauge() {
        }

        public Long getValue() {
            CompletedCheckpointStats completed = DefaultCheckpointStatsTracker.this.latestCompletedCheckpoint;
            if (completed != null) {
                return completed.getCheckpointedSize();
            }
            return -1L;
        }
    }

    private class LatestRestoredCheckpointTimestampGauge
    implements Gauge<Long> {
        private LatestRestoredCheckpointTimestampGauge() {
        }

        public Long getValue() {
            return DefaultCheckpointStatsTracker.this.jobInitializationMetricsBuilder.map(JobInitializationMetricsBuilder::getStartTs).orElse(-1L);
        }
    }

    private class FailedCheckpointsCounter
    implements Gauge<Long> {
        private FailedCheckpointsCounter() {
        }

        public Long getValue() {
            return DefaultCheckpointStatsTracker.this.counts.getNumberOfFailedCheckpoints();
        }
    }

    private class CompletedCheckpointsCounter
    implements Gauge<Long> {
        private CompletedCheckpointsCounter() {
        }

        public Long getValue() {
            return DefaultCheckpointStatsTracker.this.counts.getNumberOfCompletedCheckpoints();
        }
    }

    private class InProgressCheckpointsCounter
    implements Gauge<Integer> {
        private InProgressCheckpointsCounter() {
        }

        public Integer getValue() {
            return DefaultCheckpointStatsTracker.this.counts.getNumberOfInProgressCheckpoints();
        }
    }

    private class CheckpointsCounter
    implements Gauge<Long> {
        private CheckpointsCounter() {
        }

        public Long getValue() {
            return DefaultCheckpointStatsTracker.this.counts.getTotalNumberOfCheckpoints();
        }
    }

    static final class CheckpointSpanMetric {
        final String metricName;
        final TaskStatsSummaryExtractor taskStatsSummaryExtractor;
        final SubtaskMetricExtractor subtaskMetricExtractor;

        private CheckpointSpanMetric(String metricName, TaskStatsSummaryExtractor taskStatsSummaryExtractor, SubtaskMetricExtractor subtaskMetricExtractor) {
            this.metricName = metricName;
            this.taskStatsSummaryExtractor = taskStatsSummaryExtractor;
            this.subtaskMetricExtractor = subtaskMetricExtractor;
        }

        static CheckpointSpanMetric of(String metricName, TaskStatsSummaryExtractor taskStatsSummaryExtractor, SubtaskMetricExtractor subtaskMetricExtractor) {
            return new CheckpointSpanMetric(metricName, taskStatsSummaryExtractor, subtaskMetricExtractor);
        }
    }

    @FunctionalInterface
    static interface SubtaskMetricExtractor {
        public long extract(SubtaskStateStats var1);
    }

    @FunctionalInterface
    static interface TaskStatsSummaryExtractor {
        public StatsSummary extract(TaskStateStats.TaskStateStatsSummary var1);
    }
}

