/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.kafka;

import com.google.auto.value.AutoValue;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.io.kafka.AutoValue_KafkaMetrics_KafkaMetricsImpl;
import org.apache.beam.sdk.io.kafka.KafkaSinkMetrics;
import org.apache.beam.sdk.metrics.Gauge;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.util.Preconditions;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public interface KafkaMetrics {
    /**
     * Reports the latency of one successful RPC.
     *
     * @param var1 topic name (called {@code topic} by the implementations in this file)
     * @param var2 elapsed time of the call (called {@code elapsedTime} by the implementations)
     */
    public void updateSuccessfulRpcMetrics(@UnknownKeyFor @NonNull @Initialized String var1, @UnknownKeyFor @NonNull @Initialized Duration var2);

    /**
     * Reports the current backlog of one topic partition.
     *
     * @param var1 topic name
     * @param var2 partition id
     * @param var3 backlog value; presumably a byte count, going by the method name (TODO confirm)
     */
    public void updateBacklogBytes(@UnknownKeyFor @NonNull @Initialized String var1, @UnknownKeyFor @NonNull @Initialized int var2, @UnknownKeyFor @NonNull @Initialized long var3);

    /**
     * Publishes whatever the implementation has buffered so far. {@code KafkaMetricsImpl} writes
     * its buffered backlogs and latencies out here; {@code NoOpKafkaMetrics} does nothing.
     */
    public void flushBufferedMetrics();

    /**
     * {@link KafkaMetrics} implementation that buffers updates in memory and publishes them in a
     * single {@link #flushBufferedMetrics()} call.
     *
     * <p>While {@link #isWritable()} is {@code true}, RPC latencies are queued per topic and the
     * latest backlog value is kept per gauge name. The first flush flips the flag to
     * {@code false}; from then on updates are ignored and further flushes only log a warning.
     *
     * <p>NOTE(review): the writable check and the buffer write are not one atomic step, so an
     * update that races with the flush may be buffered after the flush has read the buffers.
     */
    @AutoValue
    public static abstract class KafkaMetricsImpl
    implements KafkaMetrics {
        private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(KafkaMetricsImpl.class);
        /** Latency histograms shared by every instance of this class, keyed by topic name. */
        private static final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Histogram> LATENCY_HISTOGRAMS = new ConcurrentHashMap<String, Histogram>();

        /** Buffered RPC latencies, keyed by topic name. */
        abstract @UnknownKeyFor @NonNull @Initialized ConcurrentHashMap<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized ConcurrentLinkedQueue<@UnknownKeyFor @NonNull @Initialized Duration>> perTopicRpcLatencies();

        /** Most recently reported backlog value, keyed by the backlog gauge's metric name. */
        abstract @UnknownKeyFor @NonNull @Initialized ConcurrentHashMap<@UnknownKeyFor @NonNull @Initialized MetricName, @UnknownKeyFor @NonNull @Initialized Long> perTopicPartitionBacklogs();

        /** {@code true} until the first {@link #flushBufferedMetrics()} call. */
        abstract @UnknownKeyFor @NonNull @Initialized AtomicBoolean isWritable();

        /**
         * Creates an empty container that accepts updates.
         *
         * @return a new instance with empty buffers and {@code isWritable()} set to {@code true}
         */
        public static @UnknownKeyFor @NonNull @Initialized KafkaMetricsImpl create() {
            return new AutoValue_KafkaMetrics_KafkaMetricsImpl(new ConcurrentHashMap<String, ConcurrentLinkedQueue<Duration>>(), new ConcurrentHashMap<MetricName, Long>(), new AtomicBoolean(true));
        }

        /**
         * Buffers one RPC latency sample for {@code topic}; ignored once the container has been
         * flushed.
         *
         * @param topic topic the RPC was issued for
         * @param elapsedTime latency of the RPC
         */
        @Override
        public void updateSuccessfulRpcMetrics(@UnknownKeyFor @NonNull @Initialized String topic, @UnknownKeyFor @NonNull @Initialized Duration elapsedTime) {
            if (!this.isWritable().get()) {
                return;
            }
            ConcurrentHashMap<String, ConcurrentLinkedQueue<Duration>> latenciesByTopic = this.perTopicRpcLatencies();
            // Plain get() first keeps the common "queue already exists" path lock-free.
            ConcurrentLinkedQueue<Duration> latencies = latenciesByTopic.get(topic);
            if (latencies == null) {
                // computeIfAbsent returns the queue that is actually registered in the map, so a
                // sample is never added to a queue that lost a concurrent registration race.
                latencies = latenciesByTopic.computeIfAbsent(topic, key -> new ConcurrentLinkedQueue<Duration>());
            }
            latencies.add(elapsedTime);
        }

        /**
         * Buffers the latest backlog value for a topic partition, replacing any earlier value for
         * the same gauge; ignored once the container has been flushed.
         *
         * @param topicName topic name
         * @param partitionId partition id within the topic
         * @param backlog backlog value to report
         */
        @Override
        public void updateBacklogBytes(@UnknownKeyFor @NonNull @Initialized String topicName, @UnknownKeyFor @NonNull @Initialized int partitionId, @UnknownKeyFor @NonNull @Initialized long backlog) {
            if (!this.isWritable().get()) {
                return;
            }
            MetricName metricName = KafkaSinkMetrics.getMetricGaugeName(topicName, partitionId);
            this.perTopicPartitionBacklogs().put(metricName, backlog);
        }

        /** Writes every buffered latency, in milliseconds, to the POLL histogram of its topic. */
        private void recordRpcLatencyMetrics() {
            for (Map.Entry<String, ConcurrentLinkedQueue<Duration>> topicLatencies : this.perTopicRpcLatencies().entrySet()) {
                String topic = topicLatencies.getKey();
                Histogram topicHistogram = LATENCY_HISTOGRAMS.get(topic);
                if (topicHistogram == null) {
                    // Atomic create-and-register: concurrent flushes agree on one cached
                    // histogram per topic instead of overwriting each other's entry.
                    topicHistogram = LATENCY_HISTOGRAMS.computeIfAbsent(topic, key -> KafkaSinkMetrics.createRPCLatencyHistogram(KafkaSinkMetrics.RpcMethod.POLL, key));
                }
                // Checked once per topic rather than once per sample.
                Preconditions.checkArgumentNotNull((Object)topicHistogram);
                for (Duration latency : topicLatencies.getValue()) {
                    topicHistogram.update((double)latency.toMillis());
                }
            }
        }

        /** Sets the backlog gauge of every buffered entry to its buffered value. */
        private void recordBacklogBytesInternal() {
            for (Map.Entry<MetricName, Long> backlog : this.perTopicPartitionBacklogs().entrySet()) {
                Gauge gauge = KafkaSinkMetrics.createBacklogGauge(backlog.getKey());
                gauge.set(backlog.getValue().longValue());
            }
        }

        /**
         * Publishes the buffered backlogs and latencies, then leaves the container read-only.
         * Only the first call publishes; later calls log a warning and return.
         */
        @Override
        public void flushBufferedMetrics() {
            // The compare-and-set guarantees that exactly one caller performs the flush.
            if (!this.isWritable().compareAndSet(true, false)) {
                LOG.warn("Updating stale Kafka metrics container");
                return;
            }
            this.recordBacklogBytesInternal();
            this.recordRpcLatencyMetrics();
        }
    }

    /**
     * {@link KafkaMetrics} implementation that discards every update and publishes nothing.
     * A single shared instance is handed out by {@link #getInstance()}.
     */
    public static class NoOpKafkaMetrics
    implements KafkaMetrics {
        /** The only instance; final so the shared reference can never be reassigned. */
        private static final @UnknownKeyFor @NonNull @Initialized NoOpKafkaMetrics INSTANCE = new NoOpKafkaMetrics();

        /** Private so that instances are only obtained through {@link #getInstance()}. */
        private NoOpKafkaMetrics() {
        }

        /** Does nothing; the latency sample is dropped. */
        @Override
        public void updateSuccessfulRpcMetrics(@UnknownKeyFor @NonNull @Initialized String topic, @UnknownKeyFor @NonNull @Initialized Duration elapsedTime) {
        }

        /** Does nothing; the backlog value is dropped. */
        @Override
        public void updateBacklogBytes(@UnknownKeyFor @NonNull @Initialized String topic, @UnknownKeyFor @NonNull @Initialized int partitionId, @UnknownKeyFor @NonNull @Initialized long backlog) {
        }

        /** Does nothing; there is never anything buffered. */
        @Override
        public void flushBufferedMetrics() {
        }

        /**
         * Returns the shared no-op instance.
         *
         * @return the singleton {@code NoOpKafkaMetrics}
         */
        static @UnknownKeyFor @NonNull @Initialized NoOpKafkaMetrics getInstance() {
            return INSTANCE;
        }
    }
}

