/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.v2.ttl;

import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.v2.ListStateDescriptor;
import org.apache.flink.api.common.state.v2.MapStateDescriptor;
import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompositeSerializer;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.ttl.TtlReduceFunction;
import org.apache.flink.runtime.state.ttl.TtlStateContext;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.ttl.TtlValue;
import org.apache.flink.runtime.state.v2.ttl.TtlAggregateFunction;
import org.apache.flink.runtime.state.v2.ttl.TtlAggregatingState;
import org.apache.flink.runtime.state.v2.ttl.TtlListState;
import org.apache.flink.runtime.state.v2.ttl.TtlMapState;
import org.apache.flink.runtime.state.v2.ttl.TtlReducingState;
import org.apache.flink.runtime.state.v2.ttl.TtlValueState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;

/**
 * Factory that creates state objects for the v2 state API and, when the descriptor's
 * {@link StateTtlConfig} is enabled, wraps them in their TTL-aware counterparts
 * ({@link TtlValueState}, {@link TtlListState}, {@link TtlMapState}, {@link TtlReducingState},
 * {@link TtlAggregatingState}).
 *
 * <p>The only public entry points are {@link #createStateAndWrapWithTtlIfEnabled} and
 * {@link #isTtlStateSerializer}; instances of the factory itself are created internally.
 *
 * @param <K> key type of the {@link AsyncKeyedStateBackend} the state is created on
 * @param <N> namespace type
 * @param <SV> value type of the user supplied {@link StateDescriptor}
 * @param <TTLSV> not referenced by the code in this class; kept for signature compatibility
 * @param <S> upper bound of the created state
 * @param <IS> type of the state object handed back to the caller
 */
public class TtlStateFactory<K, N, SV, TTLSV, S extends State, IS> {
    /** Per state type creation routine, populated once in the constructor. */
    private final Map<StateDescriptor.Type, SupplierWithException<IS, Exception>> stateFactories;

    private final N defaultNamespace;
    private final TypeSerializer<N> namespaceSerializer;
    private final StateDescriptor<SV> stateDesc;
    private final AsyncKeyedStateBackend<K> stateBackend;
    private final TtlTimeProvider timeProvider;

    @Nonnull
    private final StateTtlConfig ttlConfig;

    /** Time-to-live in milliseconds, taken from {@link #ttlConfig}. Not read inside this class. */
    private final long ttl;

    /**
     * Creates the state described by {@code stateDesc} on {@code stateBackend}, wrapping it with
     * TTL handling when the descriptor's TTL config is enabled.
     *
     * <p>When TTL is disabled the call is forwarded directly to
     * {@code stateBackend.createStateInternal(...)}.
     *
     * @param defaultNamespace namespace passed through to the backend (not null-checked here)
     * @param namespaceSerializer serializer for the namespace; must not be {@code null}
     * @param stateDesc descriptor of the state to create; must not be {@code null}
     * @param stateBackend backend that creates the underlying state; must not be {@code null}
     * @param timeProvider time source handed to the TTL wrappers; must not be {@code null}
     * @return the created (and possibly TTL-wrapped) state
     * @throws UnsupportedOperationException if TTL is enabled but the cleanup strategy is not the
     *     RocksDB compaction filter, or the update type is {@code OnReadAndWrite}
     * @throws FlinkRuntimeException if TTL is enabled and the descriptor's state type has no
     *     registered TTL factory
     * @throws Exception if the backend or the state specific factory fails
     */
    @SuppressWarnings("unchecked")
    public static <K, N, SV, TTLSV, S extends State, IS extends S> IS createStateAndWrapWithTtlIfEnabled(
            N defaultNamespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<SV> stateDesc,
            AsyncKeyedStateBackend<K> stateBackend,
            TtlTimeProvider timeProvider)
            throws Exception {
        Preconditions.checkNotNull(namespaceSerializer);
        Preconditions.checkNotNull(stateDesc);
        Preconditions.checkNotNull(stateBackend);
        Preconditions.checkNotNull(timeProvider);

        StateTtlConfig config = stateDesc.getTtlConfig();
        if (!config.isEnabled()) {
            // No TTL requested: hand out the backend's plain state.
            return (IS)
                    stateBackend.createStateInternal(
                            defaultNamespace, namespaceSerializer, stateDesc);
        }

        if (!config.getCleanupStrategies().inRocksdbCompactFilter()) {
            throw new UnsupportedOperationException(
                    "Only ROCKSDB_COMPACTION_FILTER strategy is supported in state V2.");
        }
        if (config.getUpdateType() == StateTtlConfig.UpdateType.OnReadAndWrite) {
            throw new UnsupportedOperationException(
                    "OnReadAndWrite update type is not supported in state V2.");
        }

        return new TtlStateFactory<K, N, SV, TTLSV, S, IS>(
                        defaultNamespace, namespaceSerializer, stateDesc, stateBackend, timeProvider)
                .createState();
    }

    /**
     * Tells whether the given serializer is a {@link TtlSerializer}.
     *
     * @param typeSerializer serializer to inspect; {@code null} yields {@code false}
     * @return {@code true} if {@code typeSerializer} is an instance of {@link TtlSerializer}
     */
    public static boolean isTtlStateSerializer(TypeSerializer<?> typeSerializer) {
        return typeSerializer instanceof TtlSerializer;
    }

    private TtlStateFactory(
            N defaultNamespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<SV> stateDesc,
            AsyncKeyedStateBackend<K> stateBackend,
            TtlTimeProvider timeProvider) {
        this.defaultNamespace = defaultNamespace;
        this.namespaceSerializer = namespaceSerializer;
        this.stateDesc = stateDesc;
        this.stateBackend = stateBackend;
        this.ttlConfig = stateDesc.getTtlConfig();
        this.timeProvider = timeProvider;
        this.ttl = this.ttlConfig.getTimeToLive().toMillis();
        // Must come last: the suppliers are bound to this instance and read the fields above.
        this.stateFactories = createStateFactories();
    }

    /** Builds the lookup table from state type to the routine creating its TTL wrapper. */
    @SuppressWarnings("unchecked") // generic varargs array created by Stream.of
    private Map<StateDescriptor.Type, SupplierWithException<IS, Exception>> createStateFactories() {
        // Each method reference needs an explicit target type; Tuple2.of alone gives it none.
        return Stream.of(
                        Tuple2.of(
                                StateDescriptor.Type.VALUE,
                                (SupplierWithException<IS, Exception>) this::createValueState),
                        Tuple2.of(
                                StateDescriptor.Type.LIST,
                                (SupplierWithException<IS, Exception>) this::createListState),
                        Tuple2.of(
                                StateDescriptor.Type.MAP,
                                (SupplierWithException<IS, Exception>) this::createMapState),
                        Tuple2.of(
                                StateDescriptor.Type.REDUCING,
                                (SupplierWithException<IS, Exception>) this::createReducingState),
                        Tuple2.of(
                                StateDescriptor.Type.AGGREGATING,
                                (SupplierWithException<IS, Exception>)
                                        this::createAggregatingState))
                .collect(Collectors.toMap(entry -> entry.f0, entry -> entry.f1));
    }

    /**
     * Creates the TTL-wrapped state matching the descriptor's type.
     *
     * @throws FlinkRuntimeException if no factory is registered for the descriptor's type
     */
    private IS createState() throws Exception {
        SupplierWithException<IS, Exception> stateFactory = stateFactories.get(stateDesc.getType());
        if (stateFactory == null) {
            String message =
                    String.format(
                            "State type: %s is not supported by %s",
                            stateDesc.getType(), TtlStateFactory.class);
            throw new FlinkRuntimeException(message);
        }
        return stateFactory.get();
    }

    /**
     * Creates a {@link TtlValueState}. The descriptor is reused as is when its serializer is
     * already a {@link TtlSerializer}; otherwise a new descriptor with a wrapping
     * {@link TtlSerializer} is built under the same state id.
     */
    @SuppressWarnings("unchecked")
    private IS createValueState() throws Exception {
        ValueStateDescriptor<TtlValue<SV>> ttlDescriptor =
                stateDesc.getSerializer() instanceof TtlSerializer
                        ? (ValueStateDescriptor<TtlValue<SV>>) stateDesc
                        : new ValueStateDescriptor<>(
                                stateDesc.getStateId(),
                                new TtlSerializer<>(
                                        LongSerializer.INSTANCE, stateDesc.getSerializer()));
        return (IS) new TtlValueState<>(createTtlStateContext(ttlDescriptor));
    }

    /**
     * Creates a {@link TtlListState}, wrapping the descriptor's serializer in a
     * {@link TtlSerializer} unless it already is one.
     */
    @SuppressWarnings("unchecked")
    private <T> IS createListState() throws Exception {
        ListStateDescriptor<T> listStateDesc = (ListStateDescriptor<T>) stateDesc;
        ListStateDescriptor<TtlValue<T>> ttlDescriptor =
                listStateDesc.getSerializer() instanceof TtlSerializer
                        ? (ListStateDescriptor<TtlValue<T>>) stateDesc
                        : new ListStateDescriptor<>(
                                stateDesc.getStateId(),
                                new TtlSerializer<>(
                                        LongSerializer.INSTANCE, listStateDesc.getSerializer()));
        return (IS) new TtlListState<>(createTtlStateContext(ttlDescriptor));
    }

    /**
     * Creates a {@link TtlMapState}. Only the serializer returned by
     * {@code getSerializer()} is wrapped; the user key serializer is passed through unchanged.
     */
    @SuppressWarnings("unchecked")
    private <UK, UV> IS createMapState() throws Exception {
        MapStateDescriptor<UK, UV> mapStateDesc = (MapStateDescriptor<UK, UV>) stateDesc;
        MapStateDescriptor<UK, TtlValue<UV>> ttlDescriptor =
                mapStateDesc.getSerializer() instanceof TtlSerializer
                        ? (MapStateDescriptor<UK, TtlValue<UV>>) stateDesc
                        : new MapStateDescriptor<>(
                                stateDesc.getStateId(),
                                mapStateDesc.getUserKeySerializer(),
                                new TtlSerializer<>(
                                        LongSerializer.INSTANCE, mapStateDesc.getSerializer()));
        return (IS) new TtlMapState<>(createTtlStateContext(ttlDescriptor));
    }

    /**
     * Creates a {@link TtlReducingState}. When a new descriptor has to be built, the user's
     * reduce function is wrapped in a {@link TtlReduceFunction}.
     */
    @SuppressWarnings("unchecked")
    private IS createReducingState() throws Exception {
        ReducingStateDescriptor<SV> reducingStateDesc = (ReducingStateDescriptor<SV>) stateDesc;
        ReducingStateDescriptor<TtlValue<SV>> ttlDescriptor =
                stateDesc.getSerializer() instanceof TtlSerializer
                        ? (ReducingStateDescriptor<TtlValue<SV>>) stateDesc
                        : new ReducingStateDescriptor<>(
                                stateDesc.getStateId(),
                                new TtlReduceFunction<>(
                                        reducingStateDesc.getReduceFunction(),
                                        ttlConfig,
                                        timeProvider),
                                new TtlSerializer<>(
                                        LongSerializer.INSTANCE, stateDesc.getSerializer()));
        return (IS) new TtlReducingState<>(createTtlStateContext(ttlDescriptor));
    }

    /**
     * Creates a {@link TtlAggregatingState}. The {@link TtlAggregateFunction} is always created,
     * because the resulting state needs it even when the original descriptor is reused.
     */
    @SuppressWarnings("unchecked")
    private <IN, OUT> IS createAggregatingState() throws Exception {
        AggregatingStateDescriptor<IN, SV, OUT> aggregatingStateDescriptor =
                (AggregatingStateDescriptor<IN, SV, OUT>) stateDesc;
        TtlAggregateFunction<IN, SV, OUT> ttlAggregateFunction =
                new TtlAggregateFunction<>(
                        aggregatingStateDescriptor.getAggregateFunction(), ttlConfig, timeProvider);
        AggregatingStateDescriptor<IN, TtlValue<SV>, OUT> ttlDescriptor =
                stateDesc.getSerializer() instanceof TtlSerializer
                        ? (AggregatingStateDescriptor<IN, TtlValue<SV>, OUT>) stateDesc
                        : new AggregatingStateDescriptor<>(
                                stateDesc.getStateId(),
                                ttlAggregateFunction,
                                new TtlSerializer<>(
                                        LongSerializer.INSTANCE, stateDesc.getSerializer()));
        return (IS)
                new TtlAggregatingState<>(
                        createTtlStateContext(ttlDescriptor), ttlAggregateFunction);
    }

    /**
     * Creates the backend state for {@code ttlDescriptor} and bundles it with the TTL settings
     * into a {@link TtlStateContext}.
     *
     * @param ttlDescriptor descriptor whose serializer already handles {@link TtlValue}s
     * @return context holding the backend state, the TTL config, the time provider, the
     *     serializer of the original descriptor and a no-op access callback
     */
    @SuppressWarnings("unchecked")
    private <OIS extends State, TTLS extends State, V, TTLV> TtlStateContext<OIS, V> createTtlStateContext(
            StateDescriptor<TTLV> ttlDescriptor) throws Exception {
        // The descriptor handed to the backend must carry the same TTL config as the original.
        ttlDescriptor.enableTimeToLive(stateDesc.getTtlConfig());
        OIS originalState =
                (OIS)
                        stateBackend.createStateInternal(
                                defaultNamespace, namespaceSerializer, ttlDescriptor);
        return new TtlStateContext<>(
                originalState,
                ttlConfig,
                timeProvider,
                (TypeSerializer<V>) stateDesc.getSerializer(),
                () -> {});
    }

    /**
     * Serializer snapshot for {@link TtlSerializer}, consisting of the snapshots of its timestamp
     * and value serializers (in that order).
     *
     * @param <T> type of the user value wrapped by {@link TtlValue}
     */
    public static final class TtlSerializerSnapshot<T>
            extends CompositeTypeSerializerSnapshot<TtlValue<T>, TtlSerializer<T>> {
        private static final int VERSION = 2;

        /** Public no-argument constructor; takes no serializer instance. */
        public TtlSerializerSnapshot() {}

        TtlSerializerSnapshot(TtlSerializer<T> serializerInstance) {
            super(serializerInstance);
        }

        @Override
        protected int getCurrentOuterSnapshotVersion() {
            return VERSION;
        }

        @Override
        protected TypeSerializer<?>[] getNestedSerializers(TtlSerializer<T> outerSerializer) {
            return new TypeSerializer<?>[] {
                outerSerializer.getTimestampSerializer(), outerSerializer.getValueSerializer()
            };
        }

        @Override
        @SuppressWarnings("unchecked")
        protected TtlSerializer<T> createOuterSerializerWithNestedSerializers(
                TypeSerializer<?>[] nestedSerializers) {
            // Order mirrors getNestedSerializers: [0] timestamp, [1] user value.
            TypeSerializer<Long> timestampSerializer = (TypeSerializer<Long>) nestedSerializers[0];
            TypeSerializer<T> valueSerializer = (TypeSerializer<T>) nestedSerializers[1];
            return new TtlSerializer<>(timestampSerializer, valueSerializer);
        }
    }

    /**
     * Serializer for {@link TtlValue}: a composite of a timestamp serializer (field 0) and the
     * user value serializer (field 1).
     *
     * @param <T> type of the user value wrapped by {@link TtlValue}
     */
    public static class TtlSerializer<T> extends CompositeSerializer<TtlValue<T>> {
        private static final long serialVersionUID = 131020282727167064L;

        /**
         * @param timestampSerializer serializer for the last access timestamp
         * @param userValueSerializer serializer for the user value; must not itself be a
         *     {@link TtlSerializer}
         * @throws IllegalArgumentException if {@code userValueSerializer} is a {@link TtlSerializer}
         */
        public TtlSerializer(
                TypeSerializer<Long> timestampSerializer, TypeSerializer<T> userValueSerializer) {
            super(true, timestampSerializer, userValueSerializer);
            // Guards against accidentally wrapping an already TTL-wrapped serializer.
            Preconditions.checkArgument(!(userValueSerializer instanceof TtlSerializer));
        }

        /** Constructor used by {@link #createSerializerInstance}; forwards both arguments as is. */
        public TtlSerializer(
                CompositeSerializer.PrecomputedParameters precomputed,
                TypeSerializer<?>... fieldSerializers) {
            super(precomputed, fieldSerializers);
        }

        /**
         * Builds a {@link TtlValue} from {@code [timestamp, userValue]}.
         *
         * @throws IllegalArgumentException if {@code values} does not hold exactly two elements
         */
        @Override
        @SuppressWarnings("unchecked")
        public TtlValue<T> createInstance(Object... values) {
            Preconditions.checkArgument(values.length == 2);
            return new TtlValue<>((T) values[1], (Long) values[0]);
        }

        @Override
        protected void setField(@Nonnull TtlValue<T> v, int index, Object fieldValue) {
            throw new UnsupportedOperationException("TtlValue is immutable");
        }

        /** Returns the timestamp for index 0 and the user value for any other index. */
        @Override
        protected Object getField(@Nonnull TtlValue<T> v, int index) {
            if (index == 0) {
                return v.getLastAccessTimestamp();
            }
            return v.getUserValue();
        }

        @Override
        protected CompositeSerializer<TtlValue<T>> createSerializerInstance(
                CompositeSerializer.PrecomputedParameters precomputed,
                TypeSerializer<?>... originalSerializers) {
            Preconditions.checkNotNull(originalSerializers);
            Preconditions.checkArgument(originalSerializers.length == 2);
            return new TtlSerializer<>(precomputed, originalSerializers);
        }

        @SuppressWarnings("unchecked")
        TypeSerializer<Long> getTimestampSerializer() {
            // Cast via wildcard: field 0 is the timestamp serializer given at construction.
            return (TypeSerializer<Long>) (TypeSerializer<?>) fieldSerializers[0];
        }

        @SuppressWarnings("unchecked")
        TypeSerializer<T> getValueSerializer() {
            // Cast via wildcard: field 1 is the user value serializer given at construction.
            return (TypeSerializer<T>) (TypeSerializer<?>) fieldSerializers[1];
        }

        @Override
        public TypeSerializerSnapshot<TtlValue<T>> snapshotConfiguration() {
            return new TtlSerializerSnapshot<>(this);
        }
    }
}

