/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.List;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.internals.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;

/**
 * A {@link KeyValueStore} decorator that forwards every call to an inner store and
 * records the latency of each operation against a per-operation {@link Sensor}.
 *
 * <p>The sensors are registered in {@link #init(ProcessorContext, StateStore)}; until
 * that method has run, {@code metrics} and all sensors are {@code null}, so the
 * metered operations cannot be used.
 *
 * <p>To avoid allocating a {@link Runnable} per call, each operation hands its
 * arguments (and receives its result) through the instance fields {@code key},
 * {@code value} and {@code entries}, which the pre-built delegates read and write.
 * Nothing in this class synchronizes access to those fields, so overlapping calls on
 * the same instance from different threads can overwrite each other's arguments or
 * results.
 *
 * @param <K> key type of the wrapped store
 * @param <V> value type of the wrapped store
 */
public class MeteredKeyValueStore<K, V>
extends WrappedStateStore.AbstractWrappedStateStore
implements KeyValueStore<K, V> {
    /** The store every operation is delegated to. */
    protected final KeyValueStore<K, V> inner;
    /** Scope name passed to the metrics registry when sensors are created. */
    protected final String metricScope;
    /** Clock used for iterator timing and handed to the metrics helper. */
    protected final Time time;

    // One latency/throughput sensor per operation; all assigned in init().
    private Sensor putTime;
    private Sensor putIfAbsentTime;
    private Sensor getTime;
    private Sensor deleteTime;
    private Sensor putAllTime;
    private Sensor allTime;
    private Sensor rangeTime;
    private Sensor flushTime;
    private Sensor restoreTime;
    private StreamsMetricsImpl metrics;

    // Hand-off slots shared between the public methods and the delegates below.
    private K key;
    private V value;
    private List<KeyValue<K, V>> entries;
    private ProcessorContext context;
    private StateStore root;

    /** Reads {@code key}, stores the inner store's answer in {@code value}. */
    private final Runnable getDelegate = new Runnable() {
        @Override
        public void run() {
            value = inner.get(key);
        }
    };

    /** Writes the {@code key}/{@code value} pair to the inner store. */
    private final Runnable putDelegate = new Runnable() {
        @Override
        public void run() {
            inner.put(key, value);
        }
    };

    /** Calls {@code putIfAbsent} and replaces {@code value} with what the inner store returned. */
    private final Runnable putIfAbsentDelegate = new Runnable() {
        @Override
        public void run() {
            value = inner.putIfAbsent(key, value);
        }
    };

    /** Writes the list held in {@code entries} to the inner store. */
    private final Runnable putAllDelegate = new Runnable() {
        @Override
        public void run() {
            inner.putAll(entries);
        }
    };

    /** Deletes {@code key} and stores the inner store's return value in {@code value}. */
    private final Runnable deleteDelegate = new Runnable() {
        @Override
        public void run() {
            value = inner.delete(key);
        }
    };

    /** Flushes the inner store. */
    private final Runnable flushDelegate = new Runnable() {
        @Override
        public void run() {
            inner.flush();
        }
    };

    /** Initializes the inner store with the {@code context} and {@code root} captured by init(). */
    private final Runnable initDelegate = new Runnable() {
        @Override
        public void run() {
            inner.init(context, root);
        }
    };

    /**
     * Creates a metered wrapper around {@code inner}.
     *
     * @param inner       the store to delegate to
     * @param metricScope scope name used when registering sensors
     * @param time        clock to measure with; {@code null} falls back to {@link Time#SYSTEM}
     */
    public MeteredKeyValueStore(KeyValueStore<K, V> inner, String metricScope, Time time) {
        super(inner);
        this.inner = inner;
        this.metricScope = metricScope;
        this.time = time != null ? time : Time.SYSTEM;
    }

    /**
     * Registers one sensor per operation and then initializes the inner store, timing
     * that initialization against the {@code "restore"} sensor.
     *
     * @param context processor context; its {@code metrics()} must be a
     *                {@link StreamsMetricsImpl}, otherwise the cast below throws
     *                {@link ClassCastException}
     * @param root    root store forwarded unchanged to the inner store's {@code init}
     */
    @Override
    public void init(ProcessorContext context, StateStore root) {
        String name = this.name();
        this.context = context;
        this.root = root;
        this.metrics = (StreamsMetricsImpl)context.metrics();
        this.putTime = this.latencySensor(name, "put");
        this.putIfAbsentTime = this.latencySensor(name, "put-if-absent");
        this.getTime = this.latencySensor(name, "get");
        this.deleteTime = this.latencySensor(name, "delete");
        this.putAllTime = this.latencySensor(name, "put-all");
        this.allTime = this.latencySensor(name, "all");
        this.rangeTime = this.latencySensor(name, "range");
        this.flushTime = this.latencySensor(name, "flush");
        this.restoreTime = this.latencySensor(name, "restore");
        // Sensors must exist before this call: the inner init is itself measured.
        this.metrics.measureLatencyNs(this.time, this.initDelegate, this.restoreTime);
    }

    /** Registers a DEBUG-level latency/throughput sensor for {@code operation} on this store. */
    private Sensor latencySensor(String storeName, String operation) {
        return this.metrics.addLatencyAndThroughputSensor(this.metricScope, storeName, operation, Sensor.RecordingLevel.DEBUG, new String[0]);
    }

    /**
     * Looks up {@code key} in the inner store, measured by the {@code "get"} sensor.
     *
     * @return whatever the inner store's {@code get} returned
     */
    @Override
    public V get(K key) {
        this.key = key;
        this.metrics.measureLatencyNs(this.time, this.getDelegate, this.getTime);
        return this.value;
    }

    /** Stores the pair in the inner store, measured by the {@code "put"} sensor. */
    @Override
    public void put(K key, V value) {
        this.key = key;
        this.value = value;
        this.metrics.measureLatencyNs(this.time, this.putDelegate, this.putTime);
    }

    /**
     * Delegates to the inner store's {@code putIfAbsent}, measured by the
     * {@code "put-if-absent"} sensor.
     *
     * @return whatever the inner store's {@code putIfAbsent} returned
     */
    @Override
    public V putIfAbsent(K key, V value) {
        this.key = key;
        this.value = value;
        this.metrics.measureLatencyNs(this.time, this.putIfAbsentDelegate, this.putIfAbsentTime);
        // The delegate overwrote this.value with the inner store's result.
        return this.value;
    }

    /** Writes all {@code entries} to the inner store, measured by the {@code "put-all"} sensor. */
    @Override
    public void putAll(List<KeyValue<K, V>> entries) {
        this.entries = entries;
        this.metrics.measureLatencyNs(this.time, this.putAllDelegate, this.putAllTime);
    }

    /**
     * Deletes {@code key} from the inner store, measured by the {@code "delete"} sensor.
     *
     * @return whatever the inner store's {@code delete} returned
     */
    @Override
    public V delete(K key) {
        this.key = key;
        this.metrics.measureLatencyNs(this.time, this.deleteDelegate, this.deleteTime);
        return this.value;
    }

    /**
     * Returns the inner store's range iterator wrapped so that the time between this
     * call and the iterator's {@code close()} is recorded on the {@code "range"} sensor.
     */
    @Override
    public KeyValueIterator<K, V> range(K from, K to) {
        return new MeteredKeyValueIterator<K, V>(this.inner.range(from, to), this.rangeTime);
    }

    /**
     * Returns the inner store's full iterator wrapped so that the time between this
     * call and the iterator's {@code close()} is recorded on the {@code "all"} sensor.
     */
    @Override
    public KeyValueIterator<K, V> all() {
        return new MeteredKeyValueIterator<K, V>(this.inner.all(), this.allTime);
    }

    /** Forwards directly to the inner store; this call is not metered. */
    @Override
    public long approximateNumEntries() {
        return this.inner.approximateNumEntries();
    }

    /** Flushes the inner store, measured by the {@code "flush"} sensor. */
    @Override
    public void flush() {
        this.metrics.measureLatencyNs(this.time, this.flushDelegate, this.flushTime);
    }

    /**
     * Iterator decorator that records, on {@code close()}, the elapsed time since the
     * decorator was constructed. All iteration calls are forwarded unchanged.
     */
    private class MeteredKeyValueIterator<K1, V1>
    implements KeyValueIterator<K1, V1> {
        private final KeyValueIterator<K1, V1> iter;
        private final Sensor sensor;
        /** Clock reading taken at construction; start of the recorded interval. */
        private final long startNs;

        public MeteredKeyValueIterator(KeyValueIterator<K1, V1> iter, Sensor sensor) {
            this.iter = iter;
            this.sensor = sensor;
            this.startNs = MeteredKeyValueStore.this.time.nanoseconds();
        }

        @Override
        public boolean hasNext() {
            return this.iter.hasNext();
        }

        @Override
        public KeyValue<K1, V1> next() {
            return this.iter.next();
        }

        @Override
        public void remove() {
            this.iter.remove();
        }

        /**
         * Closes the wrapped iterator and records the latency; the recording sits in
         * {@code finally} so it happens even when the wrapped {@code close()} throws.
         */
        @Override
        public void close() {
            try {
                this.iter.close();
            }
            finally {
                MeteredKeyValueStore.this.metrics.recordLatency(this.sensor, this.startNs, MeteredKeyValueStore.this.time.nanoseconds());
            }
        }

        @Override
        public K1 peekNextKey() {
            return this.iter.peekNextKey();
        }
    }
}

