/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.RunnableFuture;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.v2.MapStateDescriptor;
import org.apache.flink.api.common.state.v2.ReducingStateDescriptor;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.HeapPriorityQueuesManager;
import org.apache.flink.runtime.state.InternalKeyContext;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.SnapshotExecutionType;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.v2.RegisteredKeyAndUserKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.v2.internal.InternalKeyedState;
import org.apache.flink.runtime.state.v2.ttl.TtlStateFactory;
import org.apache.flink.state.forst.ForStAggregatingState;
import org.apache.flink.state.forst.ForStDBTtlCompactFiltersManager;
import org.apache.flink.state.forst.ForStListState;
import org.apache.flink.state.forst.ForStMapState;
import org.apache.flink.state.forst.ForStNativeMetricMonitor;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStReducingState;
import org.apache.flink.state.forst.ForStResourceContainer;
import org.apache.flink.state.forst.ForStStateExecutor;
import org.apache.flink.state.forst.ForStValueState;
import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.apache.flink.util.StateMigrationException;
import org.forstdb.ColumnFamilyHandle;
import org.forstdb.ColumnFamilyOptions;
import org.forstdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Asynchronous keyed state backend that keeps its key/value state in a ForSt ({@link RocksDB})
 * instance.
 *
 * <p>The backend
 *
 * <ul>
 *   <li>creates and caches one state object per state id ({@link #getOrCreateKeyedState}),
 *   <li>tracks one {@link ForStOperationUtils.ForStKvStateInfo} (column family handle plus meta
 *       info) per registered state,
 *   <li>hands out {@link StateExecutor}s and shuts them down on {@link #dispose()},
 *   <li>delegates snapshots and checkpoint notifications to the configured snapshot strategy,
 *   <li>creates priority queues through the configured {@link PriorityQueueSetFactory}.
 * </ul>
 *
 * @param <K> type of the key
 */
public class ForStKeyedStateBackend<K> implements AsyncKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(ForStKeyedStateBackend.class);

    /** Used to initialize the serializer of state descriptors that have none set yet. */
    private final ExecutionConfig executionConfig;

    /** Passed to map states on creation. */
    private final int keyGroupPrefixBytes;

    /** Key group range taken from the {@link InternalKeyContext} given to the constructor. */
    private final KeyGroupRange keyGroupRange;

    protected final TtlTimeProvider ttlTimeProvider;
    protected final TypeSerializer<K> keySerializer;

    /** Suppliers handed to every created state object for building keys and (de)serializing. */
    private final Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder;

    private final Supplier<DataOutputSerializer> valueSerializerView;
    private final Supplier<DataInputDeserializer> valueDeserializerView;

    /** Identifier of this backend instance; only used for logging inside this class. */
    private final UUID backendUID;

    /** Source of options, I/O settings and working directories; closed on {@link #dispose()}. */
    private final ForStResourceContainer optionsContainer;

    /** Closed first during {@link #dispose()}, before any native resource is released. */
    private final ResourceGuard resourceGuard;

    private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;

    /** Handle of the default column family; closed right before the DB. */
    private final ColumnFamilyHandle defaultColumnFamily;

    /** May be {@code null}; the checkpoint notification methods guard against that. */
    private final ForStSnapshotStrategyBase<K, ?> snapshotStrategy;

    private final PriorityQueueSetFactory priorityQueueFactory;

    /** Non-null only when {@link #priorityQueueFactory} is a {@link HeapPriorityQueueSetFactory}. */
    private final HeapPriorityQueuesManager heapPriorityQueuesManager;

    private final CloseableRegistry cancelStreamRegistry;

    /** May be {@code null}; closed before the DB during {@link #dispose()}. */
    private final ForStNativeMetricMonitor nativeMetricMonitor;

    protected final RocksDB db;

    /** Must be provided through {@link #setup(StateRequestHandler)} before states are created. */
    private StateRequestHandler stateRequestHandler;

    /** Column family handle and meta info of every registered state, keyed by state id. */
    private final LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation;

    /** Cache of already created state objects, keyed by state id. */
    private final HashMap<String, InternalKeyedState<K, ?, ?>> keyValueStatesByName;

    /** Guards {@link #managedStateExecutors} and {@link #disposed}. */
    private final Object lock = new Object();

    @GuardedBy("lock")
    private final Set<StateExecutor> managedStateExecutors;

    @GuardedBy("lock")
    private boolean disposed = false;

    private final ForStDBTtlCompactFiltersManager ttlCompactFiltersManager;

    /**
     * Creates the backend from already opened/prepared resources. Only {@code optionsContainer}
     * is null-checked here; all other arguments are stored as given.
     */
    public ForStKeyedStateBackend(
            UUID backendUID,
            ExecutionConfig executionConfig,
            ForStResourceContainer optionsContainer,
            ResourceGuard resourceGuard,
            int keyGroupPrefixBytes,
            TypeSerializer<K> keySerializer,
            Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder,
            Supplier<DataOutputSerializer> valueSerializerView,
            Supplier<DataInputDeserializer> valueDeserializerView,
            RocksDB db,
            LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
            Map<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
            ColumnFamilyHandle defaultColumnFamilyHandle,
            ForStSnapshotStrategyBase<K, ?> snapshotStrategy,
            PriorityQueueSetFactory priorityQueueFactory,
            CloseableRegistry cancelStreamRegistry,
            ForStNativeMetricMonitor nativeMetricMonitor,
            InternalKeyContext<K> keyContext,
            TtlTimeProvider ttlTimeProvider,
            ForStDBTtlCompactFiltersManager ttlCompactFiltersManager) {
        this.backendUID = backendUID;
        this.executionConfig = executionConfig;
        this.optionsContainer = Preconditions.checkNotNull(optionsContainer);
        this.resourceGuard = resourceGuard;
        this.keyGroupPrefixBytes = keyGroupPrefixBytes;
        this.keyGroupRange = keyContext.getKeyGroupRange();
        this.keySerializer = keySerializer;
        this.serializedKeyBuilder = serializedKeyBuilder;
        this.valueSerializerView = valueSerializerView;
        this.valueDeserializerView = valueDeserializerView;
        this.db = db;
        this.kvStateInformation = kvStateInformation;
        this.keyValueStatesByName = new HashMap<>();
        this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
        this.defaultColumnFamily = defaultColumnFamilyHandle;
        this.snapshotStrategy = snapshotStrategy;
        this.cancelStreamRegistry = cancelStreamRegistry;
        this.nativeMetricMonitor = nativeMetricMonitor;
        this.ttlTimeProvider = ttlTimeProvider;
        this.ttlCompactFiltersManager = ttlCompactFiltersManager;
        this.managedStateExecutors = new HashSet<>(1);
        this.priorityQueueFactory = priorityQueueFactory;

        // A heap queue manager is only created for heap-based priority queue factories; for any
        // other factory the queues are created by the factory directly (see create(...)).
        if (priorityQueueFactory instanceof HeapPriorityQueueSetFactory) {
            this.heapPriorityQueuesManager =
                    new HeapPriorityQueuesManager(
                            registeredPQStates,
                            (HeapPriorityQueueSetFactory) priorityQueueFactory,
                            keyContext.getKeyGroupRange(),
                            keyContext.getNumberOfKeyGroups());
        } else {
            this.heapPriorityQueuesManager = null;
        }
    }

    /**
     * Installs the handler that is passed to every state object created afterwards.
     *
     * @param stateRequestHandler handler to use for newly created states
     */
    public void setup(@Nonnull StateRequestHandler stateRequestHandler) {
        this.stateRequestHandler = stateRequestHandler;
    }

    /**
     * Returns the state registered under the descriptor's state id, creating and caching it on
     * first access. The descriptor's serializer is initialized with the execution config if it
     * has not been initialized yet.
     *
     * @param defaultNamespace namespace handed to the created state
     * @param namespaceSerializer serializer of the namespace, must not be {@code null}
     * @param stateDesc descriptor of the requested state
     * @return the cached or newly created state, cast to the caller's expected type
     * @throws Exception if the state cannot be created
     */
    @SuppressWarnings("unchecked")
    public <N, S extends State, SV> S getOrCreateKeyedState(
            N defaultNamespace,
            TypeSerializer<N> namespaceSerializer,
            StateDescriptor<SV> stateDesc)
            throws Exception {
        Preconditions.checkNotNull(namespaceSerializer, "Namespace serializer");

        final String stateId = stateDesc.getStateId();
        InternalKeyedState<K, ?, ?> kvState = keyValueStatesByName.get(stateId);
        if (kvState == null) {
            if (!stateDesc.isSerializerInitialized()) {
                stateDesc.initializeSerializerUnlessSet(executionConfig);
            }
            kvState =
                    (InternalKeyedState<K, ?, ?>)
                            createState(defaultNamespace, namespaceSerializer, stateDesc);
            keyValueStatesByName.put(stateId, kvState);
        }
        return (S) kvState;
    }

    /**
     * Creates a new state object through {@link TtlStateFactory}, handing it this backend and
     * the backend's {@link TtlTimeProvider}.
     *
     * @throws Exception if the state cannot be created
     */
    @Nonnull
    @SuppressWarnings("unchecked")
    protected <N, S extends State, SV> S createState(
            @Nonnull N defaultNamespace,
            @Nonnull TypeSerializer<N> namespaceSerializer,
            @Nonnull StateDescriptor<SV> stateDesc)
            throws Exception {
        return (S)
                TtlStateFactory.createStateAndWrapWithTtlIfEnabled(
                        defaultNamespace, namespaceSerializer, stateDesc, this, ttlTimeProvider);
    }

    /**
     * Registers (or re-validates) the state's meta information and instantiates the ForSt state
     * object matching the descriptor's type.
     *
     * @return the newly created state object
     * @throws NullPointerException if {@link #setup(StateRequestHandler)} has not been called
     * @throws UnsupportedOperationException if the descriptor's type is not one of VALUE, LIST,
     *     MAP, REDUCING or AGGREGATING
     * @throws Exception if registering the state information fails
     */
    @Nonnull
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <N, S extends InternalKeyedState, SV> S createStateInternal(
            @Nonnull N defaultNamespace,
            @Nonnull TypeSerializer<N> namespaceSerializer,
            @Nonnull StateDescriptor<SV> stateDesc)
            throws Exception {
        Preconditions.checkNotNull(
                stateRequestHandler,
                "A non-null stateRequestHandler must be setup before createState");

        Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult =
                tryRegisterKvStateInformation(stateDesc, namespaceSerializer);
        ColumnFamilyHandle columnFamilyHandle = registerResult.f0;
        RegisteredKeyValueStateBackendMetaInfo<N, SV> metaInfo = registerResult.f1;

        // Every state gets a supplier of fresh duplicates of the namespace serializer.
        Supplier<TypeSerializer<N>> namespaceSerializerSupplier = namespaceSerializer::duplicate;

        // NOTE: the state classes are instantiated through their raw types because the result is
        // cast to the caller-chosen type S anyway.
        switch (stateDesc.getType()) {
            case VALUE:
                return (S)
                        new ForStValueState(
                                stateRequestHandler,
                                columnFamilyHandle,
                                metaInfo.getStateSerializer(),
                                serializedKeyBuilder,
                                defaultNamespace,
                                namespaceSerializerSupplier,
                                valueSerializerView,
                                valueDeserializerView);
            case LIST:
                return (S)
                        new ForStListState(
                                stateRequestHandler,
                                columnFamilyHandle,
                                metaInfo.getStateSerializer(),
                                serializedKeyBuilder,
                                defaultNamespace,
                                namespaceSerializerSupplier,
                                valueSerializerView,
                                valueDeserializerView);
            case MAP:
                {
                    Supplier<DataInputDeserializer> keyDeserializerView =
                            DataInputDeserializer::new;
                    RegisteredKeyAndUserKeyValueStateBackendMetaInfo mapStateMetaInfo =
                            (RegisteredKeyAndUserKeyValueStateBackendMetaInfo) metaInfo;
                    return (S)
                            ((InternalKeyedState)
                                    ForStMapState.create(
                                            mapStateMetaInfo.getUserKeySerializer(),
                                            mapStateMetaInfo.getStateSerializer(),
                                            stateRequestHandler,
                                            columnFamilyHandle,
                                            serializedKeyBuilder,
                                            defaultNamespace,
                                            namespaceSerializerSupplier,
                                            valueSerializerView,
                                            keyDeserializerView,
                                            valueDeserializerView,
                                            keyGroupPrefixBytes));
                }
            case REDUCING:
                return (S)
                        new ForStReducingState(
                                stateRequestHandler,
                                columnFamilyHandle,
                                ((ReducingStateDescriptor) stateDesc).getReduceFunction(),
                                metaInfo.getStateSerializer(),
                                serializedKeyBuilder,
                                defaultNamespace,
                                namespaceSerializerSupplier,
                                valueSerializerView,
                                valueDeserializerView);
            case AGGREGATING:
                return (S)
                        new ForStAggregatingState(
                                ((AggregatingStateDescriptor) stateDesc).getAggregateFunction(),
                                metaInfo.getStateSerializer(),
                                stateRequestHandler,
                                columnFamilyHandle,
                                serializedKeyBuilder,
                                defaultNamespace,
                                namespaceSerializerSupplier,
                                valueSerializerView,
                                valueDeserializerView);
            default:
                throw new UnsupportedOperationException(
                        String.format("Unsupported state type: %s", stateDesc.getType()));
        }
    }

    /**
     * Looks up the state info registered under the descriptor's state id. If one exists, its meta
     * info is updated against the new serializers and re-stored with the existing column family
     * handle; otherwise new meta info and a new state info are created and registered.
     *
     * @return the column family handle of the state together with its (updated or new) meta info
     * @throws Exception if the serializers are not acceptable or the state info cannot be created
     */
    private <N, SV>
            Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>>
                    tryRegisterKvStateInformation(
                            StateDescriptor<SV> stateDesc, TypeSerializer<N> namespaceSerializer)
                            throws Exception {
        final String stateId = stateDesc.getStateId();
        ForStOperationUtils.ForStKvStateInfo oldStateInfo = kvStateInformation.get(stateId);

        TypeSerializer<SV> stateSerializer = stateDesc.getSerializer();
        // Only map states carry a user key serializer.
        TypeSerializer<?> userKeySerializer =
                stateDesc instanceof MapStateDescriptor
                        ? ((MapStateDescriptor<?, ?>) stateDesc).getUserKeySerializer()
                        : null;

        ForStOperationUtils.ForStKvStateInfo newStateInfo;
        RegisteredKeyValueStateBackendMetaInfo<N, SV> newMetaInfo;
        if (oldStateInfo != null) {
            @SuppressWarnings("unchecked")
            RegisteredKeyValueStateBackendMetaInfo<N, SV> castedMetaInfo =
                    (RegisteredKeyValueStateBackendMetaInfo<N, SV>) oldStateInfo.metaInfo;

            newMetaInfo =
                    updateRestoredStateMetaInfo(
                            Tuple2.of(oldStateInfo.columnFamilyHandle, castedMetaInfo),
                            stateDesc,
                            stateSerializer,
                            userKeySerializer,
                            namespaceSerializer);

            // The existing column family is kept; only the meta info is replaced.
            newStateInfo =
                    new ForStOperationUtils.ForStKvStateInfo(
                            oldStateInfo.columnFamilyHandle, newMetaInfo);
            kvStateInformation.put(stateId, newStateInfo);
        } else {
            if (stateDesc.getType().equals(StateDescriptor.Type.MAP)) {
                newMetaInfo =
                        new RegisteredKeyAndUserKeyValueStateBackendMetaInfo<>(
                                stateId,
                                stateDesc.getType(),
                                namespaceSerializer,
                                stateSerializer,
                                userKeySerializer);
            } else {
                newMetaInfo =
                        new RegisteredKeyValueStateBackendMetaInfo<>(
                                stateId, stateDesc.getType(), namespaceSerializer, stateSerializer);
            }

            newStateInfo =
                    ForStOperationUtils.createStateInfo(
                            newMetaInfo,
                            db,
                            columnFamilyOptionsFactory,
                            ttlCompactFiltersManager,
                            optionsContainer.getWriteBufferManagerCapacity(),
                            cancelStreamRegistry);
            ForStOperationUtils.registerKvStateInformation(
                    kvStateInformation, nativeMetricMonitor, stateId, newStateInfo);
        }

        return Tuple2.of(newStateInfo.columnFamilyHandle, newMetaInfo);
    }

    /**
     * Updates the already registered meta info with the new namespace, state and (for map
     * states) user key serializers and rejects every combination that would need a migration.
     *
     * @return the updated meta info (the same instance that was passed in {@code oldStateInfo})
     * @throws StateMigrationException if the namespace serializer is not compatible as is, the
     *     state serializer is incompatible, or the user key serializer is not compatible as is
     * @throws UnsupportedOperationException if the state serializer differs from the previous
     *     one and is only compatible after migration
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private <N, UK, SV> RegisteredKeyValueStateBackendMetaInfo<N, SV> updateRestoredStateMetaInfo(
            Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> oldStateInfo,
            StateDescriptor<SV> stateDesc,
            TypeSerializer<SV> stateSerializer,
            TypeSerializer<UK> userKeySerializer,
            TypeSerializer<N> namespaceSerializer)
            throws Exception {
        RegisteredKeyValueStateBackendMetaInfo<N, SV> restoredKvStateMetaInfo = oldStateInfo.f1;

        // Read the previous serializer before updating so it is available for the error message.
        TypeSerializer<?> previousNamespaceSerializer =
                restoredKvStateMetaInfo.getNamespaceSerializer();
        TypeSerializerSchemaCompatibility<?> namespaceCompatibility =
                restoredKvStateMetaInfo.updateNamespaceSerializer(namespaceSerializer);
        if (namespaceCompatibility.isCompatibleAfterMigration()
                || namespaceCompatibility.isIncompatible()) {
            throw new StateMigrationException(
                    "The new namespace serializer ("
                            + namespaceSerializer
                            + ") must be compatible with the old namespace serializer ("
                            + previousNamespaceSerializer
                            + ").");
        }

        restoredKvStateMetaInfo.checkStateMetaInfo(stateDesc);

        // Same as above: capture the previous serializer before it is replaced.
        TypeSerializer<?> previousStateSerializer = restoredKvStateMetaInfo.getStateSerializer();
        TypeSerializerSchemaCompatibility<?> stateCompatibility =
                restoredKvStateMetaInfo.updateStateSerializer(stateSerializer);
        if (!stateSerializer.equals(previousStateSerializer)
                && stateCompatibility.isCompatibleAfterMigration()) {
            throw new UnsupportedOperationException("State migration not support yet.");
        }
        if (stateCompatibility.isIncompatible()) {
            throw new StateMigrationException(
                    "The new state serializer ("
                            + stateSerializer
                            + ") must not be incompatible with the old state serializer ("
                            + previousStateSerializer
                            + ").");
        }

        // A user key serializer is only present for map states, whose meta info is of the
        // key-and-user-key kind.
        if (userKeySerializer != null) {
            TypeSerializerSchemaCompatibility<?> userKeyCompatibility =
                    ((RegisteredKeyAndUserKeyValueStateBackendMetaInfo) restoredKvStateMetaInfo)
                            .updateUserKeySerializer(userKeySerializer);
            if (!userKeyCompatibility.isCompatibleAsIs()) {
                throw new StateMigrationException(
                        "The new serializer for a MapState requires state migration in order for the job to proceed. State migration not support yet.");
            }
        }
        return restoredKvStateMetaInfo;
    }

    /**
     * Creates a new {@link ForStStateExecutor} configured from the options container and
     * remembers it so that {@link #dispose()} can shut it down.
     *
     * @return the new executor
     * @throws FlinkRuntimeException if the backend has already been disposed
     */
    @Nonnull
    public StateExecutor createStateExecutor() {
        synchronized (lock) {
            if (disposed) {
                throw new FlinkRuntimeException(
                        "Attempt to create StateExecutor after ForStKeyedStateBackend is disposed.");
            }
            ForStStateExecutor stateExecutor =
                    new ForStStateExecutor(
                            optionsContainer.isCoordinatorInline(),
                            optionsContainer.isWriteInline(),
                            optionsContainer.getReadIoParallelism(),
                            optionsContainer.getWriteIoParallelism(),
                            db,
                            optionsContainer.getWriteOptions());
            managedStateExecutors.add(stateExecutor);
            return stateExecutor;
        }
    }

    /** Returns the key group range this backend was created for. */
    public KeyGroupRange getKeyGroupRange() {
        return keyGroupRange;
    }

    /**
     * Starts a snapshot by running the configured snapshot strategy through a
     * {@link SnapshotStrategyRunner} with {@link SnapshotExecutionType#ASYNCHRONOUS}.
     *
     * @return the future produced by the runner
     * @throws Exception if the runner fails to set up the snapshot
     */
    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
            long checkpointId,
            long timestamp,
            @Nonnull CheckpointStreamFactory streamFactory,
            @Nonnull CheckpointOptions checkpointOptions)
            throws Exception {
        return new SnapshotStrategyRunner<>(
                        snapshotStrategy.getDescription(),
                        snapshotStrategy,
                        cancelStreamRegistry,
                        SnapshotExecutionType.ASYNCHRONOUS)
                .snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
    }

    /** Forwards the notification to the snapshot strategy, if one is configured. */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (snapshotStrategy != null) {
            snapshotStrategy.notifyCheckpointComplete(checkpointId);
        }
    }

    /** Forwards the notification to the snapshot strategy, if one is configured. */
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        if (snapshotStrategy != null) {
            snapshotStrategy.notifyCheckpointAborted(checkpointId);
        }
    }

    /** Only logs the subsumed checkpoint; no other action is taken. */
    public void notifyCheckpointSubsumed(long checkpointId) throws Exception {
        LOG.info("Backend:{} checkpoint: {} subsumed.", backendUID, checkpointId);
    }

    /**
     * Releases all resources held by this backend. Calling it again after a completed disposal
     * is a no-op.
     *
     * <p>Order: resource guard, managed state executors, then (if a DB is present) metric
     * monitor, column family handles, DB, working directories and options container, and finally
     * the snapshot strategy.
     */
    public void dispose() {
        synchronized (lock) {
            if (disposed) {
                return;
            }

            resourceGuard.close();

            for (StateExecutor executor : managedStateExecutors) {
                executor.shutdown();
            }

            if (db != null) {
                if (nativeMetricMonitor != null) {
                    nativeMetricMonitor.close();
                }

                // Native column family handles should be released before the DB that owns them:
                // first the default column family ...
                IOUtils.closeQuietly(defaultColumnFamily);

                // ... then the column families of all registered states (previously these
                // handles were never closed) ...
                for (ForStOperationUtils.ForStKvStateInfo kvStateInfo :
                        kvStateInformation.values()) {
                    IOUtils.closeQuietly(kvStateInfo.columnFamilyHandle);
                }

                // ... and only then the DB instance itself.
                IOUtils.closeQuietly(db);

                LOG.info(
                        "Closed ForSt State Backend. Cleaning up ForSt local working directory {}, remote working directory {}.",
                        optionsContainer.getLocalBasePath(),
                        optionsContainer.getRemoteBasePath());

                try {
                    optionsContainer.clearDirectories();
                } catch (Exception ex) {
                    // Best effort: a failed cleanup must not prevent the remaining disposal.
                    LOG.warn(
                            "Could not delete ForSt local working directory {}, remote working directory {}.",
                            optionsContainer.getLocalBasePath(),
                            optionsContainer.getRemoteBasePath(),
                            ex);
                }

                IOUtils.closeQuietly(optionsContainer);
            }

            IOUtils.closeQuietly(snapshotStrategy);
            disposed = true;
        }
    }

    /** Always returns {@code true}. */
    public boolean isSafeToReuseKVState() {
        return true;
    }

    @VisibleForTesting
    Path getLocalBasePath() {
        return optionsContainer.getLocalBasePath();
    }

    @VisibleForTesting
    Path getRemoteBasePath() {
        return optionsContainer.getRemoteBasePath();
    }

    /** Equivalent to {@link #dispose()}. */
    public void close() throws IOException {
        dispose();
    }

    /**
     * Creates a priority queue for the given state name; same as calling
     * {@link #create(String, TypeSerializer, boolean)} with {@code false}.
     */
    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>>
            KeyGroupedInternalPriorityQueue<T> create(
                    @Nonnull String stateName,
                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer) {
        return create(stateName, byteOrderedElementSerializer, false);
    }

    /**
     * Creates a priority queue for the given state name. When a heap queue manager exists the
     * queue is created or updated through it; otherwise the configured factory is used.
     */
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T>>
            KeyGroupedInternalPriorityQueue<T> create(
                    @Nonnull String stateName,
                    @Nonnull TypeSerializer<T> byteOrderedElementSerializer,
                    boolean allowFutureMetadataUpdates) {
        if (heapPriorityQueuesManager != null) {
            return heapPriorityQueuesManager.createOrUpdate(
                    stateName, byteOrderedElementSerializer, allowFutureMetadataUpdates);
        }
        return priorityQueueFactory.create(
                stateName, byteOrderedElementSerializer, allowFutureMetadataUpdates);
    }
}

