/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.samza.metrics;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.beam.runners.samza.metrics.SamzaTransformMetrics;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.samza.context.Context;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SamzaTransformMetricRegistry
implements Serializable {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(SamzaTransformMetricRegistry.class);
    private final @UnknownKeyFor @NonNull @Initialized ConcurrentHashMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ConcurrentHashMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ConcurrentHashMap<@UnknownKeyFor @NonNull @Initialized Long, @UnknownKeyFor @NonNull @Initialized Long>>> avgArrivalTimeMap;
    @SuppressFBWarnings(value={"SE_BAD_FIELD"})
    private final @UnknownKeyFor @NonNull @Initialized ConcurrentHashMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ConcurrentHashMap<@UnknownKeyFor @NonNull @Initialized BoundedWindow, @UnknownKeyFor @NonNull @Initialized Long>> avgArrivalTimeMapForGbk;
    private final @UnknownKeyFor @NonNull @Initialized SamzaTransformMetrics transformMetrics;

    public SamzaTransformMetricRegistry() {
        this.avgArrivalTimeMap = new ConcurrentHashMap();
        this.avgArrivalTimeMapForGbk = new ConcurrentHashMap();
        this.transformMetrics = new SamzaTransformMetrics();
    }

    @VisibleForTesting
    SamzaTransformMetricRegistry(@UnknownKeyFor @NonNull @Initialized SamzaTransformMetrics samzaTransformMetrics) {
        this.transformMetrics = samzaTransformMetrics;
        this.avgArrivalTimeMap = new ConcurrentHashMap();
        this.avgArrivalTimeMapForGbk = new ConcurrentHashMap();
    }

    public void register(@UnknownKeyFor @NonNull @Initialized String transformFullName, @UnknownKeyFor @NonNull @Initialized String pValue, @UnknownKeyFor @NonNull @Initialized Context ctx) {
        this.transformMetrics.register(transformFullName, ctx);
        this.avgArrivalTimeMap.putIfAbsent(transformFullName, new ConcurrentHashMap());
        this.avgArrivalTimeMap.get(transformFullName).putIfAbsent(pValue, new ConcurrentHashMap());
        this.avgArrivalTimeMapForGbk.putIfAbsent(transformFullName, new ConcurrentHashMap());
    }

    public @UnknownKeyFor @NonNull @Initialized SamzaTransformMetrics getTransformMetrics() {
        return this.transformMetrics;
    }

    public void updateArrivalTimeMap(@UnknownKeyFor @NonNull @Initialized String transformName, @UnknownKeyFor @NonNull @Initialized String pValue, @UnknownKeyFor @NonNull @Initialized long watermark, @UnknownKeyFor @NonNull @Initialized long avg) {
        if (this.avgArrivalTimeMap.get(transformName) != null && this.avgArrivalTimeMap.get(transformName).get(pValue) != null) {
            ConcurrentHashMap<Long, Long> avgArrivalTimeMapForPValue = this.avgArrivalTimeMap.get(transformName).get(pValue);
            avgArrivalTimeMapForPValue.put(watermark, avg);
            avgArrivalTimeMapForPValue.entrySet().removeIf(entry -> (Long)entry.getKey() < watermark);
        }
    }

    public void updateArrivalTimeMap(@UnknownKeyFor @NonNull @Initialized String transformName, @UnknownKeyFor @NonNull @Initialized BoundedWindow windowId, @UnknownKeyFor @NonNull @Initialized long avg) {
        ConcurrentHashMap<BoundedWindow, Long> avgArrivalTimeMapForTransform = this.avgArrivalTimeMapForGbk.get(transformName);
        if (avgArrivalTimeMapForTransform != null) {
            avgArrivalTimeMapForTransform.put(windowId, avg);
        }
    }

    public void emitLatencyMetric(@UnknownKeyFor @NonNull @Initialized String transformName, @UnknownKeyFor @NonNull @Initialized BoundedWindow windowId, @UnknownKeyFor @NonNull @Initialized long avgArrivalEndTime, @UnknownKeyFor @NonNull @Initialized String taskName) {
        Long avgArrivalStartTime;
        Long l = avgArrivalStartTime = this.avgArrivalTimeMapForGbk.get(transformName) != null ? this.avgArrivalTimeMapForGbk.get(transformName).remove(windowId) : null;
        if (avgArrivalStartTime == null || avgArrivalStartTime == 0L || avgArrivalEndTime == 0L) {
            LOG.debug("Failure to Emit Metric for Transform: {}, Start-Time: {} or End-Time: {} found is 0/null for windowId: {}, task: {}", new Object[]{transformName, avgArrivalStartTime, avgArrivalEndTime, windowId, taskName});
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Success Emit Metric for Transform: {}, window: {} for task: {}", new Object[]{transformName, windowId, taskName});
        }
        this.transformMetrics.getTransformLatencyMetric(transformName).update(avgArrivalEndTime - avgArrivalStartTime);
        this.transformMetrics.getTransformCacheSize(transformName).set((Object)this.avgArrivalTimeMapForGbk.get(transformName).size());
    }

    public void emitLatencyMetric(@UnknownKeyFor @NonNull @Initialized String transformName, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> inputs, @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String> outputs, @UnknownKeyFor @NonNull @Initialized Long watermark, @UnknownKeyFor @NonNull @Initialized String taskName) {
        ConcurrentHashMap<String, ConcurrentHashMap<Long, Long>> avgArrivalTimeMapForTransform = this.avgArrivalTimeMap.get(transformName);
        if (avgArrivalTimeMapForTransform == null || inputs.isEmpty() || outputs.isEmpty()) {
            return;
        }
        List inputPValuesAvgArrivalTimes = inputs.stream().map(avgArrivalTimeMapForTransform::get).map(map -> map == null ? null : (Long)map.remove(watermark)).filter(avgArrivalTime -> avgArrivalTime != null).collect(Collectors.toList());
        List outputPValuesAvgArrivalTimes = outputs.stream().map(avgArrivalTimeMapForTransform::get).map(map -> map == null ? null : (Long)map.remove(watermark)).filter(avgArrivalTime -> avgArrivalTime != null).collect(Collectors.toList());
        if (inputPValuesAvgArrivalTimes.isEmpty() || outputPValuesAvgArrivalTimes.isEmpty()) {
            LOG.debug("Failure to Emit Metric for Transform: {} inputArrivalTime: {} or outputArrivalTime: {} not found for Watermark: {} Task: {}", new Object[]{transformName, inputPValuesAvgArrivalTimes, inputPValuesAvgArrivalTimes, watermark, taskName});
            return;
        }
        long startTime = (Long)Collections.min(inputPValuesAvgArrivalTimes);
        long endTime = (Long)Collections.max(outputPValuesAvgArrivalTimes);
        long latency = endTime - startTime;
        this.transformMetrics.getTransformLatencyMetric(transformName).update(latency);
        this.transformMetrics.getTransformCacheSize(transformName).set((Object)avgArrivalTimeMapForTransform.values().stream().mapToLong(ConcurrentHashMap::size).sum());
        LOG.debug("Success Emit Metric Transform: {} for watermark: {} for task: {}", new Object[]{transformName, watermark, taskName});
    }

    @VisibleForTesting
    @Nullable @UnknownKeyFor @Initialized ConcurrentHashMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ConcurrentHashMap<@UnknownKeyFor @NonNull @Initialized Long, @UnknownKeyFor @NonNull @Initialized Long>> getAverageArrivalTimeMap(@UnknownKeyFor @NonNull @Initialized String transformName) {
        return this.avgArrivalTimeMap.get(transformName);
    }

    @VisibleForTesting
    @Nullable @UnknownKeyFor @Initialized ConcurrentHashMap<@UnknownKeyFor @NonNull @Initialized BoundedWindow, @UnknownKeyFor @NonNull @Initialized Long> getAverageArrivalTimeMapForGBK(@UnknownKeyFor @NonNull @Initialized String transformName) {
        return this.avgArrivalTimeMapForGbk.get(transformName);
    }
}

