/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.metrics;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.internal.InternalAggregatingState;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalReducingState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.metrics.MetricsTrackingAggregatingState;
import org.apache.flink.runtime.state.metrics.MetricsTrackingListState;
import org.apache.flink.runtime.state.metrics.MetricsTrackingMapState;
import org.apache.flink.runtime.state.metrics.MetricsTrackingReducingState;
import org.apache.flink.runtime.state.metrics.MetricsTrackingStateFactory;
import org.apache.flink.runtime.state.metrics.MetricsTrackingValueState;
import org.apache.flink.runtime.state.metrics.SizeTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackend;
import org.apache.flink.runtime.state.ttl.mock.MockKeyedStateBackendBuilder;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
public class MetricsTrackingStateFactoryTest {
    @Parameter
    public Tuple2<Boolean, Boolean> enableLatencyOrSizeTracking;

    @Parameters(name="enable latency or size tracking: {0}")
    public static Collection<Tuple2<Boolean, Boolean>> enabled() {
        return Arrays.asList(Tuple2.of((Object)true, (Object)true), Tuple2.of((Object)true, (Object)false), Tuple2.of((Object)false, (Object)true), Tuple2.of((Object)false, (Object)false));
    }

    private LatencyTrackingStateConfig getLatencyTrackingStateConfig() {
        UnregisteredMetricsGroup metricsGroup = new UnregisteredMetricsGroup();
        return ((LatencyTrackingStateConfig.Builder)((LatencyTrackingStateConfig.Builder)LatencyTrackingStateConfig.newBuilder().setEnabled(((Boolean)this.enableLatencyOrSizeTracking.f0).booleanValue())).setMetricGroup((MetricGroup)metricsGroup)).build();
    }

    private SizeTrackingStateConfig getSizeTrackingStateConfig() {
        UnregisteredMetricsGroup metricsGroup = new UnregisteredMetricsGroup();
        return ((SizeTrackingStateConfig.Builder)((SizeTrackingStateConfig.Builder)SizeTrackingStateConfig.newBuilder().setEnabled(((Boolean)this.enableLatencyOrSizeTracking.f1).booleanValue())).setMetricGroup((MetricGroup)metricsGroup)).build();
    }

    @TestTemplate
    <K, N> void testTrackValueState() throws Exception {
        try (MockKeyedStateBackend<String> backend = this.createMock();){
            ValueStateDescriptor valueStateDescriptor = new ValueStateDescriptor("value", String.class);
            InternalValueState valueState = (InternalValueState)backend.createOrUpdateInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)valueStateDescriptor);
            InternalKvState latencyTrackingState = MetricsTrackingStateFactory.createStateAndWrapWithMetricsTrackingIfEnabled((InternalKvState)valueState, null, (StateDescriptor)valueStateDescriptor, (LatencyTrackingStateConfig)this.getLatencyTrackingStateConfig(), (SizeTrackingStateConfig)this.getSizeTrackingStateConfig());
            if (((Boolean)this.enableLatencyOrSizeTracking.f0).booleanValue() || ((Boolean)this.enableLatencyOrSizeTracking.f1).booleanValue()) {
                Assertions.assertThat((Object)latencyTrackingState).isInstanceOf(MetricsTrackingValueState.class);
            } else {
                Assertions.assertThat((Object)latencyTrackingState).isEqualTo((Object)valueState);
            }
        }
    }

    @TestTemplate
    <K, N> void testTrackListState() throws Exception {
        try (MockKeyedStateBackend<String> backend = this.createMock();){
            ListStateDescriptor listStateDescriptor = new ListStateDescriptor("list", String.class);
            InternalListState listState = (InternalListState)backend.createOrUpdateInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)listStateDescriptor);
            InternalKvState latencyTrackingState = MetricsTrackingStateFactory.createStateAndWrapWithMetricsTrackingIfEnabled((InternalKvState)listState, null, (StateDescriptor)listStateDescriptor, (LatencyTrackingStateConfig)this.getLatencyTrackingStateConfig(), (SizeTrackingStateConfig)this.getSizeTrackingStateConfig());
            if (((Boolean)this.enableLatencyOrSizeTracking.f0).booleanValue() || ((Boolean)this.enableLatencyOrSizeTracking.f1).booleanValue()) {
                Assertions.assertThat((Object)latencyTrackingState).isInstanceOf(MetricsTrackingListState.class);
            } else {
                Assertions.assertThat((Object)latencyTrackingState).isEqualTo((Object)listState);
            }
        }
    }

    @TestTemplate
    <K, N> void testTrackMapState() throws Exception {
        try (MockKeyedStateBackend<String> backend = this.createMock();){
            MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("map", String.class, Long.class);
            InternalMapState mapState = (InternalMapState)backend.createOrUpdateInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)mapStateDescriptor);
            InternalKvState latencyTrackingState = MetricsTrackingStateFactory.createStateAndWrapWithMetricsTrackingIfEnabled((InternalKvState)mapState, null, (StateDescriptor)mapStateDescriptor, (LatencyTrackingStateConfig)this.getLatencyTrackingStateConfig(), (SizeTrackingStateConfig)this.getSizeTrackingStateConfig());
            if (((Boolean)this.enableLatencyOrSizeTracking.f0).booleanValue() || ((Boolean)this.enableLatencyOrSizeTracking.f1).booleanValue()) {
                Assertions.assertThat((Object)latencyTrackingState).isInstanceOf(MetricsTrackingMapState.class);
            } else {
                Assertions.assertThat((Object)latencyTrackingState).isEqualTo((Object)mapState);
            }
        }
    }

    @TestTemplate
    <K, N> void testTrackReducingState() throws Exception {
        try (MockKeyedStateBackend<String> backend = this.createMock();){
            ReducingStateDescriptor reducingStateDescriptor = new ReducingStateDescriptor("reducing", Long::sum, Long.class);
            InternalReducingState reducingState = (InternalReducingState)backend.createOrUpdateInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)reducingStateDescriptor);
            InternalKvState latencyTrackingState = MetricsTrackingStateFactory.createStateAndWrapWithMetricsTrackingIfEnabled((InternalKvState)reducingState, null, (StateDescriptor)reducingStateDescriptor, (LatencyTrackingStateConfig)this.getLatencyTrackingStateConfig(), (SizeTrackingStateConfig)this.getSizeTrackingStateConfig());
            if (((Boolean)this.enableLatencyOrSizeTracking.f0).booleanValue() || ((Boolean)this.enableLatencyOrSizeTracking.f1).booleanValue()) {
                Assertions.assertThat((Object)latencyTrackingState).isInstanceOf(MetricsTrackingReducingState.class);
            } else {
                Assertions.assertThat((Object)latencyTrackingState).isEqualTo((Object)reducingState);
            }
        }
    }

    @TestTemplate
    <K, N> void testTrackAggregatingState() throws Exception {
        try (MockKeyedStateBackend<String> backend = this.createMock();){
            AggregatingStateDescriptor aggregatingStateDescriptor = new AggregatingStateDescriptor("aggregate", (AggregateFunction)new AggregateFunction<Long, Long, Long>(){
                private static final long serialVersionUID = 1L;

                public Long createAccumulator() {
                    return 0L;
                }

                public Long add(Long value, Long accumulator) {
                    return value + accumulator;
                }

                public Long getResult(Long accumulator) {
                    return accumulator;
                }

                public Long merge(Long a, Long b) {
                    return a + b;
                }
            }, Long.class);
            InternalAggregatingState aggregatingState = (InternalAggregatingState)backend.createOrUpdateInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)aggregatingStateDescriptor);
            InternalKvState latencyTrackingState = MetricsTrackingStateFactory.createStateAndWrapWithMetricsTrackingIfEnabled((InternalKvState)aggregatingState, null, (StateDescriptor)aggregatingStateDescriptor, (LatencyTrackingStateConfig)this.getLatencyTrackingStateConfig(), (SizeTrackingStateConfig)this.getSizeTrackingStateConfig());
            if (((Boolean)this.enableLatencyOrSizeTracking.f0).booleanValue() || ((Boolean)this.enableLatencyOrSizeTracking.f1).booleanValue()) {
                Assertions.assertThat((Object)latencyTrackingState).isInstanceOf(MetricsTrackingAggregatingState.class);
            } else {
                Assertions.assertThat((Object)latencyTrackingState).isEqualTo((Object)aggregatingState);
            }
        }
    }

    private MockKeyedStateBackend<String> createMock() {
        return new MockKeyedStateBackendBuilder(new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()), StringSerializer.INSTANCE, this.getClass().getClassLoader(), 1, KeyGroupRange.of((int)0, (int)0), new ExecutionConfig(), TtlTimeProvider.DEFAULT, this.getLatencyTrackingStateConfig(), this.getSizeTrackingStateConfig(), (Collection<KeyedStateHandle>)Collections.emptyList(), UncompressedStreamCompressionDecorator.INSTANCE, new CloseableRegistry(), MockKeyedStateBackend.MockSnapshotSupplier.EMPTY).build();
    }
}

