/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst.sync;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.ICloseableRegistry;
import org.apache.flink.core.fs.Path;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.CompositeKeySerializationUtils;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.InternalKeyContextImpl;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.PriorityQueueSetFactory;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSnapshotRestoreWrapper;
import org.apache.flink.runtime.state.metrics.LatencyTrackingStateConfig;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
import org.apache.flink.state.forst.ForStConfigurableOptions;
import org.apache.flink.state.forst.ForStDBTtlCompactFiltersManager;
import org.apache.flink.state.forst.ForStDBWriteBatchWrapper;
import org.apache.flink.state.forst.ForStNativeMetricMonitor;
import org.apache.flink.state.forst.ForStNativeMetricOptions;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStResourceContainer;
import org.apache.flink.state.forst.ForStStateBackend;
import org.apache.flink.state.forst.datatransfer.ForStStateDataTransfer;
import org.apache.flink.state.forst.fs.cache.FileBasedCache;
import org.apache.flink.state.forst.restore.ForStHeapTimersFullRestoreOperation;
import org.apache.flink.state.forst.restore.ForStIncrementalRestoreOperation;
import org.apache.flink.state.forst.restore.ForStNoneRestoreOperation;
import org.apache.flink.state.forst.restore.ForStRestoreOperation;
import org.apache.flink.state.forst.restore.ForStRestoreResult;
import org.apache.flink.state.forst.snapshot.ForStIncrementalSnapshotStrategy;
import org.apache.flink.state.forst.snapshot.ForStNativeFullSnapshotStrategy;
import org.apache.flink.state.forst.snapshot.ForStSnapshotStrategyBase;
import org.apache.flink.state.forst.sync.ForStDBPriorityQueueSetFactory;
import org.apache.flink.state.forst.sync.ForStPriorityQueueConfig;
import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ResourceGuard;
import org.forstdb.ColumnFamilyHandle;
import org.forstdb.ColumnFamilyOptions;
import org.forstdb.RocksDB;

public class ForStSyncKeyedStateBackendBuilder<K>
extends AbstractKeyedStateBackendBuilder<K> {
    // Identifier of the owning operator; forwarded to the incremental restore operation.
    private final String operatorIdentifier;
    // Selects the priority-queue implementation (HEAP or ForStDB) and its cache size.
    private final ForStPriorityQueueConfig priorityQueueConfig;
    // Stored by the constructor; not read anywhere else in this class.
    private final LocalRecoveryConfig localRecoveryConfig;
    // Produces ColumnFamilyOptions for a given name; null-checked in the constructor.
    private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
    // Source of DB/read/write options, paths and the file system; closed if build() fails.
    private final ForStResourceContainer optionsContainer;
    // Metric group handed to the restore operations.
    private final MetricGroup metricGroup;
    // Metrics callback handed to the incremental restore operation.
    private final StateBackend.CustomInitializationMetrics customInitializationMetrics;
    // Chooses between the incremental and the native full snapshot strategy.
    private boolean enableIncrementalCheckpointing;
    // Native metric options handed to the restore operations.
    private ForStNativeMetricOptions nativeMetricOptions;
    // Write batch size in bytes; starts at the configured default of WRITE_BATCH_SIZE.
    private long writeBatchSize = ((MemorySize)ForStConfigurableOptions.WRITE_BATCH_SIZE.defaultValue()).getBytes();
    // Set only by the @VisibleForTesting constructor; NOTE(review): never read in this class.
    private RocksDB injectedTestDB;
    // NOTE(review): neither written after initialization nor read in this class.
    private boolean incrementalRestoreAsyncCompactAfterRescale = false;
    // The next three values are forwarded unchanged to the incremental restore operation.
    private double overlapFractionThreshold = (Double)ForStConfigurableOptions.RESTORE_OVERLAP_FRACTION_THRESHOLD.defaultValue();
    private boolean useIngestDbRestoreMode = (Boolean)ForStConfigurableOptions.USE_INGEST_DB_RESTORE_MODE.defaultValue();
    private boolean rescalingUseDeleteFilesInRange = (Boolean)ForStConfigurableOptions.USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue();
    // Set only by the @VisibleForTesting constructor; NOTE(review): never read in this class.
    private ColumnFamilyHandle injectedDefaultColumnFamilyHandle;
    // NOTE(review): never assigned or read in this class.
    private AsyncExceptionHandler asyncExceptionHandler;
    // Claim mode forwarded to the incremental restore operation; DEFAULT unless overridden.
    private RecoveryClaimMode recoveryClaimMode;

    /**
     * Creates a builder with incremental checkpointing disabled, default native metric
     * options and {@link RecoveryClaimMode#DEFAULT}.
     *
     * <p>Arguments not stored on this class are passed straight to the superclass
     * constructor.
     *
     * @param columnFamilyOptionsFactory must not be {@code null}
     * @param stateHandles state handles to restore from
     */
    public ForStSyncKeyedStateBackendBuilder(
            String operatorIdentifier,
            ClassLoader userCodeClassLoader,
            ForStResourceContainer optionsContainer,
            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
            TaskKvStateRegistry kvStateRegistry,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            ExecutionConfig executionConfig,
            LocalRecoveryConfig localRecoveryConfig,
            ForStPriorityQueueConfig priorityQueueConfig,
            TtlTimeProvider ttlTimeProvider,
            LatencyTrackingStateConfig latencyTrackingStateConfig,
            MetricGroup metricGroup,
            StateBackend.CustomInitializationMetrics customInitializationMetrics,
            @Nonnull Collection<KeyedStateHandle> stateHandles,
            StreamCompressionDecorator keyGroupCompressionDecorator,
            CloseableRegistry cancelStreamRegistry) {
        super(
                kvStateRegistry,
                keySerializer,
                userCodeClassLoader,
                numberOfKeyGroups,
                keyGroupRange,
                executionConfig,
                ttlTimeProvider,
                latencyTrackingStateConfig,
                stateHandles,
                keyGroupCompressionDecorator,
                cancelStreamRegistry);

        // Collaborators supplied by the caller.
        this.columnFamilyOptionsFactory = Preconditions.checkNotNull(columnFamilyOptionsFactory);
        this.operatorIdentifier = operatorIdentifier;
        this.optionsContainer = optionsContainer;
        this.priorityQueueConfig = priorityQueueConfig;
        this.localRecoveryConfig = localRecoveryConfig;
        this.metricGroup = metricGroup;
        this.customInitializationMetrics = customInitializationMetrics;

        // Defaults that the fluent setters may override before build().
        this.recoveryClaimMode = RecoveryClaimMode.DEFAULT;
        this.nativeMetricOptions = new ForStNativeMetricOptions();
        this.enableIncrementalCheckpointing = false;
    }

    /**
     * Test-only constructor: delegates to the public constructor with a no-op
     * initialization-metrics callback and records an externally supplied database and
     * default column family handle.
     */
    @VisibleForTesting
    ForStSyncKeyedStateBackendBuilder(
            String operatorIdentifier,
            ClassLoader userCodeClassLoader,
            ForStResourceContainer optionsContainer,
            Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
            TaskKvStateRegistry kvStateRegistry,
            TypeSerializer<K> keySerializer,
            int numberOfKeyGroups,
            KeyGroupRange keyGroupRange,
            ExecutionConfig executionConfig,
            LocalRecoveryConfig localRecoveryConfig,
            ForStPriorityQueueConfig forStPriorityQueueConfig,
            TtlTimeProvider ttlTimeProvider,
            LatencyTrackingStateConfig latencyTrackingStateConfig,
            MetricGroup metricGroup,
            @Nonnull Collection<KeyedStateHandle> stateHandles,
            StreamCompressionDecorator keyGroupCompressionDecorator,
            RocksDB injectedTestDB,
            ColumnFamilyHandle injectedDefaultColumnFamilyHandle,
            CloseableRegistry cancelStreamRegistry) {
        this(
                operatorIdentifier,
                userCodeClassLoader,
                optionsContainer,
                columnFamilyOptionsFactory,
                kvStateRegistry,
                keySerializer,
                numberOfKeyGroups,
                keyGroupRange,
                executionConfig,
                localRecoveryConfig,
                forStPriorityQueueConfig,
                ttlTimeProvider,
                latencyTrackingStateConfig,
                metricGroup,
                // Initialization metrics are discarded for this constructor.
                (metricName, metricValue) -> {},
                stateHandles,
                keyGroupCompressionDecorator,
                cancelStreamRegistry);
        this.injectedDefaultColumnFamilyHandle = injectedDefaultColumnFamilyHandle;
        this.injectedTestDB = injectedTestDB;
    }

    /**
     * Chooses the snapshot strategy created by {@code build()}: incremental when
     * {@code true}, native full otherwise.
     *
     * @param enableIncrementalCheckpointing whether to use incremental snapshots
     * @return this builder, for chaining
     */
    public ForStSyncKeyedStateBackendBuilder<K> setEnableIncrementalCheckpointing(
            final boolean enableIncrementalCheckpointing) {
        this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
        return this;
    }

    /**
     * Replaces the native metric options that are handed to the restore operations.
     *
     * @param nativeMetricOptions options to use; stored as given, without validation
     * @return this builder, for chaining
     */
    public ForStSyncKeyedStateBackendBuilder<K> setNativeMetricOptions(
            final ForStNativeMetricOptions nativeMetricOptions) {
        this.nativeMetricOptions = nativeMetricOptions;
        return this;
    }

    /**
     * Sets the write batch size used for the write batch wrapper and the restore operations.
     *
     * @param writeBatchSize batch size; must be zero or positive
     * @return this builder, for chaining
     */
    public ForStSyncKeyedStateBackendBuilder<K> setWriteBatchSize(final long writeBatchSize) {
        final boolean nonNegative = writeBatchSize >= 0L;
        Preconditions.checkArgument(nonNegative, "Write batch size should be non negative.");
        this.writeBatchSize = writeBatchSize;
        return this;
    }

    /**
     * Ensures that {@code directory} exists and is a directory, creating it (including
     * missing parents) when necessary.
     *
     * @param directory the directory to verify or create
     * @throws IOException if the path exists but is not a directory, or if it could not be
     *     created
     */
    private static void checkAndCreateDirectory(File directory) throws IOException {
        if (directory.exists()) {
            if (!directory.isDirectory()) {
                throw new IOException("Not a directory: " + directory);
            }
            return;
        }
        // mkdirs() also returns false when the directory was created by someone else between
        // the exists() check above and this call. Only fail if, after the attempt, the path
        // still is not a directory.
        if (!directory.mkdirs() && !directory.isDirectory()) {
            throw new IOException(String.format("Could not create ForSt data directory at %s.", directory));
        }
    }

    /**
     * Restores (or freshly opens) the ForSt database and assembles a
     * {@link ForStSyncKeyedStateBackend} around it.
     *
     * <p>Steps, in order: mark the current thread via {@code FileBasedCache.setFlinkThread()},
     * prepare the directories of the options container, run the restore operation chosen by
     * {@code getForStDBRestoreOperation}, then create the write batch wrapper, the shared
     * composite key builder, the snapshot strategy and the priority queue factory.
     *
     * <p>If any step throws, every resource acquired so far is closed quietly, the registered
     * compaction filter factories are disposed, and deletion of the base path is attempted
     * (a failure there is only logged) before the error is propagated.
     *
     * @return the fully wired keyed state backend
     * @throws BackendBuildingException if restoring or wiring fails; a
     *     {@code BackendBuildingException} raised internally is rethrown as is, anything else
     *     is wrapped
     */
    public ForStSyncKeyedStateBackend<K> build() throws BackendBuildingException {
        PriorityQueueSetFactory priorityQueueFactory;
        SerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
        ForStDBWriteBatchWrapper writeBatchWrapper = null;
        ColumnFamilyHandle defaultColumnFamilyHandle = null;
        ForStNativeMetricMonitor nativeMetricMonitor = null;
        CloseableRegistry cancelRegistryForBackend = new CloseableRegistry();
        LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation = new LinkedHashMap<>();
        LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates = new LinkedHashMap<>();
        RocksDB db = null;
        ForStRestoreOperation restoreOperation = null;
        // Never assigned in this method: the backend always receives null here.
        CompletableFuture<Void> asyncCompactAfterRestoreFuture = null;
        ForStDBTtlCompactFiltersManager ttlCompactFiltersManager = new ForStDBTtlCompactFiltersManager(this.ttlTimeProvider, this.optionsContainer.getQueryTimeAfterNumEntries(), this.optionsContainer.getPeriodicCompactionTime());
        ForStSnapshotStrategyBase<K, ?> checkpointStrategy = null;
        ResourceGuard forStResourceGuard = new ResourceGuard();
        int keyGroupPrefixBytes = CompositeKeySerializationUtils.computeRequiredBytesInKeyGroupPrefix(this.numberOfKeyGroups);
        try {
            FileBasedCache.setFlinkThread();
            // NOTE(review): the snapshot strategy is always initialized with a fresh UID, an
            // empty file map and checkpoint id -1; nothing from the restore result is fed back
            // into these three values. Confirm this is intended for incremental restores.
            UUID backendUID = UUID.randomUUID();
            SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> materializedSstFiles = new TreeMap<>();
            long lastCompletedCheckpointId = -1L;
            this.optionsContainer.prepareDirectories();
            restoreOperation = this.getForStDBRestoreOperation(keyGroupPrefixBytes, kvStateInformation, registeredPQStates, ttlCompactFiltersManager);
            ForStRestoreResult restoreResult = restoreOperation.restore();
            db = restoreResult.getDb();
            defaultColumnFamilyHandle = restoreResult.getDefaultColumnFamilyHandle();
            nativeMetricMonitor = restoreResult.getNativeMetricMonitor();
            writeBatchWrapper = new ForStDBWriteBatchWrapper(db, this.optionsContainer.getWriteOptions(), this.writeBatchSize);
            sharedRocksKeyBuilder = new SerializedCompositeKeyBuilder<>(this.keySerializerProvider.currentSchemaSerializer(), keyGroupPrefixBytes, 32);
            checkpointStrategy = this.initializeSnapshotStrategy(db, forStResourceGuard, this.keySerializerProvider.currentSchemaSerializer(), kvStateInformation, this.keyGroupRange, keyGroupPrefixBytes, backendUID, materializedSstFiles, lastCompletedCheckpointId);
            priorityQueueFactory = this.initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db, writeBatchWrapper, nativeMetricMonitor);
        }
        catch (Throwable e) {
            // Cleanup order is significant: column family handles are closed before the
            // database, and their options are collected first so they can be closed after it.
            ArrayList<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<>(kvStateInformation.values().size());
            IOUtils.closeQuietly(cancelRegistryForBackend);
            IOUtils.closeQuietly(writeBatchWrapper);
            IOUtils.closeQuietly(forStResourceGuard);
            ForStOperationUtils.addColumnFamilyOptionsToCloseLater(columnFamilyOptions, defaultColumnFamilyHandle);
            IOUtils.closeQuietly(defaultColumnFamilyHandle);
            IOUtils.closeQuietly(nativeMetricMonitor);
            for (ForStOperationUtils.ForStKvStateInfo kvStateInfo : kvStateInformation.values()) {
                ForStOperationUtils.addColumnFamilyOptionsToCloseLater(columnFamilyOptions, kvStateInfo.columnFamilyHandle);
                IOUtils.closeQuietly(kvStateInfo.columnFamilyHandle);
            }
            IOUtils.closeQuietly(db);
            IOUtils.closeQuietly(restoreOperation);
            IOUtils.closeAllQuietly(columnFamilyOptions);
            IOUtils.closeQuietly(this.optionsContainer);
            ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
            kvStateInformation.clear();
            try {
                FileUtils.deleteDirectory(new File(this.optionsContainer.getBasePath().getPath()));
            }
            catch (Exception ex) {
                // Best effort only: a leftover directory must not mask the original failure.
                this.logger.warn("Failed to delete base path for ForSt: {}", this.optionsContainer.getBasePath(), ex);
            }
            if (e instanceof BackendBuildingException) {
                throw (BackendBuildingException)e;
            }
            String errMsg = "Caught unexpected exception.";
            this.logger.error(errMsg, e);
            throw new BackendBuildingException(errMsg, e);
        }
        InternalKeyContextImpl<K> keyContext = new InternalKeyContextImpl<>(this.keyGroupRange, this.numberOfKeyGroups);
        this.logger.info("Finished building ForSt keyed state-backend at {}.", this.optionsContainer.getBasePath());
        return new ForStSyncKeyedStateBackend<K>(this.userCodeClassLoader, this.optionsContainer, this.columnFamilyOptionsFactory, this.kvStateRegistry, this.keySerializerProvider.currentSchemaSerializer(), this.executionConfig, this.ttlTimeProvider, this.latencyTrackingStateConfig, db, kvStateInformation, registeredPQStates, keyGroupPrefixBytes, cancelRegistryForBackend, this.keyGroupCompressionDecorator, forStResourceGuard, checkpointStrategy, writeBatchWrapper, defaultColumnFamilyHandle, nativeMetricMonitor, sharedRocksKeyBuilder, priorityQueueFactory, ttlCompactFiltersManager, keyContext, this.writeBatchSize, asyncCompactAfterRestoreFuture);
    }

    /**
     * Sets the overlap fraction threshold that is forwarded to the incremental restore
     * operation.
     *
     * @param overlapFractionThreshold threshold value; stored as given, without validation
     * @return this builder, for chaining
     */
    public ForStSyncKeyedStateBackendBuilder<K> setOverlapFractionThreshold(
            final double overlapFractionThreshold) {
        this.overlapFractionThreshold = overlapFractionThreshold;
        return this;
    }

    /**
     * Sets the ingest-DB restore flag that is forwarded to the incremental restore operation.
     *
     * @param useIngestDbRestoreMode flag value to forward
     * @return this builder, for chaining
     */
    public ForStSyncKeyedStateBackendBuilder<K> setUseIngestDbRestoreMode(
            final boolean useIngestDbRestoreMode) {
        this.useIngestDbRestoreMode = useIngestDbRestoreMode;
        return this;
    }

    /**
     * Sets the delete-files-in-range flag that is forwarded to the incremental restore
     * operation.
     *
     * @param rescalingUseDeleteFilesInRange flag value to forward
     * @return this builder, for chaining
     */
    public ForStSyncKeyedStateBackendBuilder<K> setRescalingUseDeleteFilesInRange(
            final boolean rescalingUseDeleteFilesInRange) {
        this.rescalingUseDeleteFilesInRange = rescalingUseDeleteFilesInRange;
        return this;
    }

    /**
     * Overrides the recovery claim mode (initially {@code RecoveryClaimMode.DEFAULT}) that is
     * forwarded to the incremental restore operation.
     *
     * @param recoveryClaimMode claim mode to forward; stored as given, without validation
     * @return this builder, for chaining
     */
    public ForStSyncKeyedStateBackendBuilder<K> setRecoveryClaimMode(
            final RecoveryClaimMode recoveryClaimMode) {
        this.recoveryClaimMode = recoveryClaimMode;
        return this;
    }

    /**
     * Creates the snapshot strategy for the backend: a
     * {@link ForStIncrementalSnapshotStrategy} when incremental checkpointing is enabled,
     * otherwise a {@link ForStNativeFullSnapshotStrategy}.
     *
     * <p>Both strategies share a {@link ForStStateDataTransfer} built on the options
     * container's file system. {@code uploadedStateHandles} and
     * {@code lastCompletedCheckpointId} are only consumed by the incremental strategy.
     *
     * @throws IOException declared by the signature; presumably raised while setting up the
     *     data transfer or the strategy
     */
    private ForStSnapshotStrategyBase<K, ?> initializeSnapshotStrategy(
            @Nonnull RocksDB db,
            @Nonnull ResourceGuard forstResourceGuard,
            @Nonnull TypeSerializer<K> keySerializer,
            @Nonnull LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
            @Nonnull KeyGroupRange keyGroupRange,
            @Nonnegative int keyGroupPrefixBytes,
            @Nonnull UUID backendUID,
            @Nonnull SortedMap<Long, Collection<IncrementalKeyedStateHandle.HandleAndLocalPath>> uploadedStateHandles,
            long lastCompletedCheckpointId) throws IOException {
        // First argument is presumably the number of transfer threads - TODO confirm.
        final ForStStateDataTransfer dataTransfer =
                new ForStStateDataTransfer(4, this.optionsContainer.getFileSystem());

        if (!this.enableIncrementalCheckpointing) {
            return new ForStNativeFullSnapshotStrategy<K>(
                    db,
                    forstResourceGuard,
                    this.optionsContainer,
                    keySerializer,
                    kvStateInformation,
                    keyGroupRange,
                    keyGroupPrefixBytes,
                    backendUID,
                    dataTransfer);
        }

        return new ForStIncrementalSnapshotStrategy<K>(
                db,
                forstResourceGuard,
                this.optionsContainer,
                keySerializer,
                kvStateInformation,
                keyGroupRange,
                keyGroupPrefixBytes,
                backendUID,
                uploadedStateHandles,
                dataTransfer,
                lastCompletedCheckpointId);
    }

    /**
     * Picks the restore operation that matches the state handles given to this builder.
     *
     * <ul>
     *   <li>no handles (empty or all {@code null}): {@link ForStNoneRestoreOperation};
     *   <li>first handle is an {@link IncrementalRemoteKeyedStateHandle}:
     *       {@link ForStIncrementalRestoreOperation};
     *   <li>otherwise, with HEAP priority queues: {@link ForStHeapTimersFullRestoreOperation};
     *   <li>anything else is unsupported.
     * </ul>
     *
     * @throws UnsupportedOperationException for a non-incremental restore when the priority
     *     queue type is not HEAP
     */
    private ForStRestoreOperation getForStDBRestoreOperation(
            int keyGroupPrefixBytes,
            LinkedHashMap<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
            LinkedHashMap<String, HeapPriorityQueueSnapshotRestoreWrapper<?>> registeredPQStates,
            ForStDBTtlCompactFiltersManager ttlCompactFiltersManager) {
        // With a remote ForSt path configured the instance path is the fixed "/db";
        // otherwise the local ForSt path is used.
        final Path instanceForStPath;
        if (this.optionsContainer.getPathContainer().getRemoteForStPath() != null) {
            instanceForStPath = new Path("/db");
        } else {
            instanceForStPath = this.optionsContainer.getPathContainer().getLocalForStPath();
        }

        if (CollectionUtil.isEmptyOrAllElementsNull(this.restoreStateHandles)) {
            return new ForStNoneRestoreOperation(
                    Collections.emptyMap(),
                    instanceForStPath,
                    this.optionsContainer.getDbOptions(),
                    this.columnFamilyOptionsFactory,
                    this.nativeMetricOptions,
                    this.metricGroup,
                    ttlCompactFiltersManager,
                    this.writeBatchSize,
                    this.optionsContainer.getWriteBufferManagerCapacity());
        }

        // Only the first handle decides which kind of restore is performed.
        final KeyedStateHandle firstStateHandle = this.restoreStateHandles.iterator().next();
        if (firstStateHandle instanceof IncrementalRemoteKeyedStateHandle) {
            return new ForStIncrementalRestoreOperation(
                    this.operatorIdentifier,
                    this.keyGroupRange,
                    keyGroupPrefixBytes,
                    this.cancelStreamRegistry,
                    this.userCodeClassLoader,
                    kvStateInformation,
                    this.keySerializerProvider,
                    this.optionsContainer,
                    this.optionsContainer.getBasePath(),
                    instanceForStPath,
                    this.optionsContainer.getDbOptions(),
                    this.columnFamilyOptionsFactory,
                    this.nativeMetricOptions,
                    this.metricGroup,
                    ttlCompactFiltersManager,
                    this.writeBatchSize,
                    this.optionsContainer.getWriteBufferManagerCapacity(),
                    this.customInitializationMetrics,
                    CollectionUtil.checkedSubTypeCast(this.restoreStateHandles, IncrementalRemoteKeyedStateHandle.class),
                    this.overlapFractionThreshold,
                    this.useIngestDbRestoreMode,
                    this.rescalingUseDeleteFilesInRange,
                    this.recoveryClaimMode);
        }

        final boolean heapTimers =
                this.priorityQueueConfig.getPriorityQueueStateType() == ForStStateBackend.PriorityQueueStateType.HEAP;
        if (!heapTimers) {
            throw new UnsupportedOperationException("Not support restoring yet for ForStStateBackend");
        }
        return new ForStHeapTimersFullRestoreOperation(
                this.keyGroupRange,
                this.numberOfKeyGroups,
                this.userCodeClassLoader,
                kvStateInformation,
                registeredPQStates,
                this.createHeapQueueFactory(),
                this.keySerializerProvider,
                instanceForStPath,
                this.optionsContainer.getDbOptions(),
                this.columnFamilyOptionsFactory,
                this.nativeMetricOptions,
                this.metricGroup,
                ttlCompactFiltersManager,
                this.writeBatchSize,
                this.optionsContainer.getWriteBufferManagerCapacity(),
                this.restoreStateHandles,
                this.cancelStreamRegistry);
    }

    /**
     * Creates the priority queue set factory selected by the priority queue configuration.
     *
     * <p>HEAP yields a heap-based factory (see {@code createHeapQueueFactory()}); ForStDB
     * yields a {@link ForStDBPriorityQueueSetFactory} that works on the given database,
     * write batch wrapper and metric monitor.
     *
     * @param keyGroupPrefixBytes number of bytes used for the key-group prefix
     * @param kvStateInformation registered state information, shared with the factory
     * @param db the opened database
     * @param writeBatchWrapper write batch wrapper bound to {@code db}
     * @param nativeMetricMonitor metric monitor handed to the ForStDB factory
     * @return the factory for the configured queue type
     * @throws IllegalArgumentException if the configured queue type is neither HEAP nor ForStDB
     */
    private PriorityQueueSetFactory initPriorityQueueFactory(
            int keyGroupPrefixBytes,
            Map<String, ForStOperationUtils.ForStKvStateInfo> kvStateInformation,
            RocksDB db,
            ForStDBWriteBatchWrapper writeBatchWrapper,
            ForStNativeMetricMonitor nativeMetricMonitor) {
        final ForStStateBackend.PriorityQueueStateType queueType =
                this.priorityQueueConfig.getPriorityQueueStateType();
        // Return straight from each case: every path stays typed as PriorityQueueSetFactory
        // instead of passing through an untyped Object local.
        switch (queueType) {
            case HEAP: {
                return this.createHeapQueueFactory();
            }
            case ForStDB: {
                return new ForStDBPriorityQueueSetFactory(
                        this.keyGroupRange,
                        keyGroupPrefixBytes,
                        this.numberOfKeyGroups,
                        kvStateInformation,
                        db,
                        this.optionsContainer.getReadOptions(),
                        writeBatchWrapper,
                        nativeMetricMonitor,
                        this.columnFamilyOptionsFactory,
                        this.optionsContainer.getWriteBufferManagerCapacity(),
                        this.priorityQueueConfig.getForStDBPriorityQueueSetCacheSize());
            }
            default: {
                throw new IllegalArgumentException("Unknown priority queue state type: " + queueType);
            }
        }
    }

    /**
     * Creates a heap-based priority queue set factory for this builder's key-group range and
     * total number of key groups.
     *
     * @return a new {@link HeapPriorityQueueSetFactory}
     */
    private HeapPriorityQueueSetFactory createHeapQueueFactory() {
        // The third argument (128) is presumably an initial/minimum queue capacity - TODO confirm.
        final HeapPriorityQueueSetFactory heapFactory =
                new HeapPriorityQueueSetFactory(this.keyGroupRange, this.numberOfKeyGroups, 128);
        return heapFactory;
    }
}

