/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst;

import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendBuilder;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.v2.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.state.forst.ForStConfigurableOptions;
import org.apache.flink.state.forst.ForStDBTtlCompactFiltersManager;
import org.apache.flink.state.forst.ForStKeyedStateBackend;
import org.apache.flink.state.forst.ForStNativeMetricMonitor;
import org.apache.flink.state.forst.ForStNativeMetricOptions;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStResourceContainer;
import org.apache.flink.state.forst.ForStStateBackend;
import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer;
import org.apache.flink.state.forst.fs.cache.FileBasedCache;
import org.apache.flink.state.forst.restore.ForStHeapTimersFullRestoreOperation;
import org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation;
import org.apache.flink.state.forst.restore.ForStNoneRestoreOperation;
import org.apache.flink.state.forst.restore.ForStRestoreOperation;
import org.apache.flink.state.forst.restore.ForStRestoreResult;
import org.apache.flink.state.forst.snapshot.ForStIncrementalSnapshotStrategy;
import org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy;
import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
import org.apache.flink.state.forst.sync.ForStPriorityQueueConfig;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.forstdb.ColumnFamilyHandle;
import org.forstdb.ColumnFamilyOptions;
import org.forstdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ForStKeyedStateBackendBuilder<K>
implements StateBackendBuilder<ForStKeyedStateBackend<K>, BackendBuildingException> {
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    private static final int KEY_SERIALIZER_BUFFER_START_SIZE = 32;
    private static final int VALUE_SERIALIZER_BUFFER_START_SIZE = 128;
    private long writeBatchSize = ((MemorySize)ForStConfigurableOptions.WRITE_BATCH_SIZE.defaultValue()).getBytes();
    private final String operatorIdentifier;
    private final ForStPriorityQueueConfig priorityQueueConfig;
    protected final ClassLoader userCodeClassLoader;
    protected final CloseableRegistry cancelStreamRegistry;
    private final StateSerializerProvider<K> keySerializerProvider;
    private final int numberOfKeyGroups;
    private final KeyGroupRange keyGroupRange;
    private final ExecutionConfig executionConfig;
    private final TtlTimeProvider ttlTimeProvider;
    private final Collection<KeyedStateHandle> restoreStateHandles;
    private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
    private final ForStResourceContainer optionsContainer;
    private final MetricGroup metricGroup;
    private final StateBackend.CustomInitializationMetrics customInitializationMetrics;
    private boolean enableIncrementalCheckpointing;
    private ForStNativeMetricOptions nativeMetricOptions;
    private boolean rescalingUseDeleteFilesInRange = false;
    private double overlapFractionThreshold = 0.5;
    private boolean useIngestDbRestoreMode = false;
    private RecoveryClaimMode recoveryClaimMode = RecoveryClaimMode.DEFAULT;

    public ForStKeyedStateBackendBuilder(String operatorIdentifier, ClassLoader userCodeClassLoader, ForStResourceContainer optionsContainer, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig, ForStPriorityQueueConfig priorityQueueConfig, TtlTimeProvider ttlTimeProvider, MetricGroup metricGroup, StateBackend.CustomInitializationMetrics customInitializationMetrics, @Nonnull Collection<KeyedStateHandle> stateHandles, CloseableRegistry cancelStreamRegistry) {
        this.operatorIdentifier = operatorIdentifier;
        this.userCodeClassLoader = userCodeClassLoader;
        this.optionsContainer = optionsContainer;
        this.columnFamilyOptionsFactory = (Function)Preconditions.checkNotNull(columnFamilyOptionsFactory);
        this.keySerializerProvider = StateSerializerProvider.fromNewRegisteredSerializer(keySerializer);
        this.numberOfKeyGroups = numberOfKeyGroups;
        this.keyGroupRange = keyGroupRange;
        this.executionConfig = executionConfig;
        this.priorityQueueConfig = priorityQueueConfig;
        this.ttlTimeProvider = ttlTimeProvider;
        this.metricGroup = metricGroup;
        this.customInitializationMetrics = customInitializationMetrics;
        this.restoreStateHandles = stateHandles;
        this.nativeMetricOptions = new ForStNativeMetricOptions();
        this.cancelStreamRegistry = cancelStreamRegistry;
    }

    ForStKeyedStateBackendBuilder<K> setEnableIncrementalCheckpointing(boolean enableIncrementalCheckpointing) {
        this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
        return this;
    }

    ForStKeyedStateBackendBuilder<K> setNativeMetricOptions(ForStNativeMetricOptions nativeMetricOptions) {
        this.nativeMetricOptions = nativeMetricOptions;
        return this;
    }

    ForStKeyedStateBackendBuilder<K> setOverlapFractionThreshold(double overlapFractionThreshold) {
        this.overlapFractionThreshold = overlapFractionThreshold;
        return this;
    }

    ForStKeyedStateBackendBuilder<K> setUseIngestDbRestoreMode(boolean useIngestDbRestoreMode) {
        this.useIngestDbRestoreMode = useIngestDbRestoreMode;
        return this;
    }

    ForStKeyedStateBackendBuilder<K> setRescalingUseDeleteFilesInRange(boolean rescalingUseDeleteFilesInRange) {
        this.rescalingUseDeleteFilesInRange = rescalingUseDeleteFilesInRange;
        return this;
    }

    ForStKeyedStateBackendBuilder<K> setRecoveryClaimMode(RecoveryClaimMode recoveryClaimMode) {
        this.recoveryClaimMode = recoveryClaimMode;
        return this;
    }

    public ForStKeyedStateBackend<K> build() throws BackendBuildingException {
        HeapPriorityQueueSetFactory priorityQueueFactory;
        ColumnFamilyHandle defaultColumnFamilyHandle = null;
        ForStNativeMetricMonitor nativeMetricMonitor = null;
        CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
        LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation = new LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo>();
        LinkedHashMap registeredPQStates = new LinkedHashMap();
        ForStDBTtlCompactFiltersManager ttlCompactFiltersManager = new ForStDBTtlCompactFiltersManager(this.ttlTimeProvider, this.optionsContainer.getQueryTimeAfterNumEntries(), this.optionsContainer.getPeriodicCompactionTime());
        RocksDB db = null;
        ForStRestoreOperation restoreOperation = null;
        int keyGroupPrefixBytes = CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix((int)this.numberOfKeyGroups);
        ResourceGuard forstResourceGuard = new ResourceGuard();
        ForStSnapshotStrategyBase<K, ?> snapshotStrategy = null;
        Supplier serializedKeyBuilder = () -> new SerializedCompositeKeyBuilder(this.keySerializerProvider.currentSchemaSerializer().duplicate(), keyGroupPrefixBytes, 32);
        Supplier<DataOutputSerializer> valueSerializerView = () -> new DataOutputSerializer(128);
        Supplier<DataInputDeserializer> valueDeserializerView = DataInputDeserializer::new;
        UUID backendUID = UUID.randomUUID();
        try {
            FileBasedCache.setFlinkThread();
            this.optionsContainer.prepareDirectories();
            restoreOperation = this.getForStRestoreOperation(keyGroupPrefixBytes, kvStateInformation, registeredPQStates, ttlCompactFiltersManager);
            ForStRestoreResult restoreResult = restoreOperation.restore();
            db = restoreResult.getDb();
            defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle();
            nativeMetricMonitor = restoreResult.getNativeMetricMonitor();
            SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> materializedSstFiles = new TreeMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>>();
            long lastCompletedCheckpointId = -1L;
            if (restoreOperation instanceof ForStIncrementalRestoreOperation) {
                backendUID = restoreResult.getBackendUID();
                lastCompletedCheckpointId = restoreResult.getLastCompletedCheckpointId();
                if (this.recoveryClaimMode != RecoveryClaimMode.NO_CLAIM) {
                    materializedSstFiles = restoreResult.getRestoredSstFiles();
                }
            }
            snapshotStrategy = this.initializeSnapshotStrategy(db, forstResourceGuard, this.keySerializerProvider.currentSchemaSerializer(), kvStateInformation, this.keyGroupRange, keyGroupPrefixBytes, backendUID, materializedSstFiles, lastCompletedCheckpointId);
            priorityQueueFactory = this.createHeapQueueFactory();
        }
        catch (Throwable e) {
            IOUtils.closeQuietly((AutoCloseable)cancelStreamRegistryForBackend);
            IOUtils.closeQuietly(defaultColumnFamilyHandle);
            IOUtils.closeQuietly(nativeMetricMonitor);
            IOUtils.closeQuietly(db);
            IOUtils.closeQuietly((AutoCloseable)restoreOperation);
            try {
                this.optionsContainer.clearDirectories();
                this.optionsContainer.forceClearRemoteDirectories();
            }
            catch (Exception ex) {
                this.logger.warn("Failed to delete ForSt: {}.", (Object)this.optionsContainer.getPathContainer(), (Object)ex);
            }
            IOUtils.closeQuietly((AutoCloseable)this.optionsContainer);
            IOUtils.closeQuietly(snapshotStrategy);
            if (e instanceof BackendBuildingException) {
                throw (BackendBuildingException)e;
            }
            String errMsg = "Caught unexpected exception.";
            this.logger.error(errMsg, e);
            throw new BackendBuildingException(errMsg, e);
        }
        InternalKeyContextImpl keyContext = new InternalKeyContextImpl(this.keyGroupRange, this.numberOfKeyGroups);
        this.logger.info("Finished building ForSt keyed state-backend at {}", (Object)this.optionsContainer.getPathContainer());
        return new ForStKeyedStateBackend(backendUID, this.executionConfig, this.optionsContainer, forstResourceGuard, keyGroupPrefixBytes, this.keySerializerProvider.currentSchemaSerializer(), serializedKeyBuilder, valueSerializerView, valueDeserializerView, db, kvStateInformation, registeredPQStates, this.columnFamilyOptionsFactory, defaultColumnFamilyHandle, snapshotStrategy, (PriorityQueueSetFactory)priorityQueueFactory, cancelStreamRegistryForBackend, nativeMetricMonitor, keyContext, this.ttlTimeProvider, ttlCompactFiltersManager);
    }

    private ForStRestoreOperation getForStRestoreOperation(int keyGroupPrefixBytes, LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation, LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates, ForStDBTtlCompactFiltersManager ttlCompactFiltersManager) {
        Path instanceForStPath;
        Path path = instanceForStPath = this.optionsContainer.getPathContainer().getRemoteForStPath() == null ? this.optionsContainer.getPathContainer().getLocalForStPath() : new Path("/db");
        if (CollectionUtil.isEmptyOrAllElementsNull(this.restoreStateHandles)) {
            return new ForStNoneRestoreOperation(Collections.emptyMap(), instanceForStPath, this.optionsContainer.getDbOptions(), this.columnFamilyOptionsFactory, this.nativeMetricOptions, this.metricGroup, ttlCompactFiltersManager, this.writeBatchSize, this.optionsContainer.getWriteBufferManagerCapacity());
        }
        KeyedStateHandle firstStateHandle = this.restoreStateHandles.iterator().next();
        if (firstStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
            return new ForStIncrementalRestoreOperation<K>(this.operatorIdentifier, this.keyGroupRange, keyGroupPrefixBytes, this.cancelStreamRegistry, this.userCodeClassLoader, kvStateInformation, this.keySerializerProvider, this.optionsContainer, this.optionsContainer.getBasePath(), instanceForStPath, this.optionsContainer.getDbOptions(), this.columnFamilyOptionsFactory, this.nativeMetricOptions, this.metricGroup, ttlCompactFiltersManager, this.writeBatchSize, this.optionsContainer.getWriteBufferManagerCapacity(), this.customInitializationMetrics, CollectionUtil.checkedSubTypeCast(this.restoreStateHandles, IncrementalRemoteKeyedStateHandle.class), this.overlapFractionThreshold, this.useIngestDbRestoreMode, this.rescalingUseDeleteFilesInRange, this.recoveryClaimMode, RegisteredKeyValueStateBackendMetaInfo::fromMetaInfoSnapshot);
        }
        if (this.priorityQueueConfig.getPriorityQueueStateType() == ForStStateBackend.PriorityQueueStateType.HEAP) {
            return new ForStHeapTimersFullRestoreOperation<K>(this.keyGroupRange, this.numberOfKeyGroups, this.userCodeClassLoader, kvStateInformation, registeredPQStates, this.createHeapQueueFactory(), this.keySerializerProvider, instanceForStPath, this.optionsContainer.getDbOptions(), this.columnFamilyOptionsFactory, this.nativeMetricOptions, this.metricGroup, ttlCompactFiltersManager, this.writeBatchSize, this.optionsContainer.getWriteBufferManagerCapacity(), this.restoreStateHandles, (ICloseableRegistry)this.cancelStreamRegistry);
        }
        throw new UnsupportedOperationException("Not support restoring yet for ForStStateBackend");
    }

    private ForStSnapshotStrategyBase<K, ?> initializeSnapshotStrategy(@Nonnull RocksDB db, @Nonnull ResourceGuard forstResourceGuard, @Nonnull TypeSerializer<K> keySerializer, @Nonnull LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation, @Nonnull KeyGroupRange keyGroupRange, @Nonnegative int keyGroupPrefixBytes, @Nonnull UUID backendUID, @Nonnull SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> uploadedStateHandles, long lastCompletedCheckpointId) {
        ForStStateDataTransfer stateTransfer = new ForStStateDataTransfer(4, this.optionsContainer.getFileSystem());
        if (this.enableIncrementalCheckpointing) {
            return new ForStIncrementalSnapshotStrategy<K>(db, forstResourceGuard, this.optionsContainer, keySerializer, kvStateInformation, keyGroupRange, keyGroupPrefixBytes, backendUID, uploadedStateHandles, stateTransfer, lastCompletedCheckpointId);
        }
        return new ForStNativeFullSnapshotStrategy<K>(db, forstResourceGuard, this.optionsContainer, keySerializer, kvStateInformation, keyGroupRange, keyGroupPrefixBytes, backendUID, stateTransfer);
    }

    private HeapPriorityQueueSetFactory createHeapQueueFactory() {
        return new HeapPriorityQueueSetFactory(this.keyGroupRange, this.numberOfKeyGroups, 128);
    }
}

