/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst;

import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.asyncprocessing.StateExecutor;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.v2.StateDescriptor;
import org.apache.flink.runtime.state.v2.ValueStateDescriptor;
import org.apache.flink.state.forst.ForStNativeMetricMonitor;
import org.apache.flink.state.forst.ForStOperationUtils;
import org.apache.flink.state.forst.ForStResourceContainer;
import org.apache.flink.state.forst.ForStStateExecutor;
import org.apache.flink.state.forst.ForStValueState;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.RocksDB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ForStKeyedStateBackend<K>
implements AsyncKeyedStateBackend {
    private static final Logger LOG = LoggerFactory.getLogger(ForStKeyedStateBackend.class);
    protected final TypeSerializer<K> keySerializer;
    private final Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder;
    private final Supplier<DataOutputSerializer> valueSerializerView;
    private final Supplier<DataInputDeserializer> valueDeserializerView;
    private final ForStResourceContainer optionsContainer;
    private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
    private final ColumnFamilyHandle defaultColumnFamily;
    private final ForStNativeMetricMonitor nativeMetricMonitor;
    protected final RocksDB db;
    private StateRequestHandler stateRequestHandler;
    private final Object lock = new Object();
    @GuardedBy(value="lock")
    private final Set<StateExecutor> managedStateExecutors;
    @GuardedBy(value="lock")
    private boolean closed = false;
    private boolean disposed = false;

    public ForStKeyedStateBackend(ForStResourceContainer optionsContainer, TypeSerializer<K> keySerializer, Supplier<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder, Supplier<DataOutputSerializer> valueSerializerView, Supplier<DataInputDeserializer> valueDeserializerView, RocksDB db, Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, ColumnFamilyHandle defaultColumnFamilyHandle, ForStNativeMetricMonitor nativeMetricMonitor) {
        this.optionsContainer = (ForStResourceContainer)Preconditions.checkNotNull((Object)optionsContainer);
        this.keySerializer = keySerializer;
        this.serializedKeyBuilder = serializedKeyBuilder;
        this.valueSerializerView = valueSerializerView;
        this.valueDeserializerView = valueDeserializerView;
        this.db = db;
        this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
        this.defaultColumnFamily = defaultColumnFamilyHandle;
        this.nativeMetricMonitor = nativeMetricMonitor;
        this.managedStateExecutors = new HashSet<StateExecutor>(1);
    }

    public void setup(@Nonnull StateRequestHandler stateRequestHandler) {
        this.stateRequestHandler = stateRequestHandler;
    }

    @Nonnull
    public <SV, S extends State> S createState(@Nonnull StateDescriptor<SV> stateDesc) {
        Preconditions.checkNotNull((Object)this.stateRequestHandler, (String)"A non-null stateRequestHandler must be setup before createState");
        ColumnFamilyHandle columnFamilyHandle = ForStOperationUtils.createColumnFamilyHandle(stateDesc.getStateId(), this.db, this.columnFamilyOptionsFactory);
        if (stateDesc.getType() == StateDescriptor.Type.VALUE) {
            return (S)new ForStValueState(this.stateRequestHandler, columnFamilyHandle, (ValueStateDescriptor)stateDesc, this.serializedKeyBuilder, this.valueSerializerView, this.valueDeserializerView);
        }
        throw new UnsupportedOperationException(String.format("Unsupported state type: %s", stateDesc.getType()));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nonnull
    public StateExecutor createStateExecutor() {
        Object object = this.lock;
        synchronized (object) {
            if (this.closed) {
                throw new FlinkRuntimeException("Attempt to create StateExecutor after ForStKeyedStateBackend is disposed.");
            }
            ForStStateExecutor stateExecutor = new ForStStateExecutor(4, this.db, this.optionsContainer.getWriteOptions());
            this.managedStateExecutors.add(stateExecutor);
            return stateExecutor;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void dispose() {
        if (this.disposed) {
            return;
        }
        Object object = this.lock;
        synchronized (object) {
            if (!this.closed) {
                IOUtils.closeQuietly((AutoCloseable)((Object)this));
            }
        }
        if (this.db != null) {
            if (this.nativeMetricMonitor != null) {
                this.nativeMetricMonitor.close();
            }
            IOUtils.closeQuietly((AutoCloseable)this.defaultColumnFamily);
            IOUtils.closeQuietly((AutoCloseable)this.db);
            LOG.info("Closed ForSt State Backend. Cleaning up ForSt local working directory {}, remote working directory {}.", (Object)this.optionsContainer.getLocalBasePath(), (Object)this.optionsContainer.getRemoteBasePath());
            try {
                this.optionsContainer.clearDirectories();
            }
            catch (Exception ex) {
                LOG.warn("Could not delete ForSt local working directory {}, remote working directory {}.", new Object[]{this.optionsContainer.getLocalBasePath(), this.optionsContainer.getRemoteBasePath(), ex});
            }
            IOUtils.closeQuietly((AutoCloseable)this.optionsContainer);
        }
        this.disposed = true;
    }

    @VisibleForTesting
    File getLocalBasePath() {
        return this.optionsContainer.getLocalBasePath();
    }

    @VisibleForTesting
    Path getRemoteBasePath() {
        return this.optionsContainer.getRemoteBasePath();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void close() throws IOException {
        Object object = this.lock;
        synchronized (object) {
            if (this.closed) {
                return;
            }
            this.closed = true;
            for (StateExecutor executor : this.managedStateExecutors) {
                executor.shutdown();
            }
        }
    }
}

