/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Closeable;
import java.io.IOException;
import java.util.Optional;
import java.util.OptionalLong;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.SerializerFactory;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.FullSnapshotResources;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SavepointSnapshotStrategy;
import org.apache.flink.runtime.state.SnapshotStrategyRunner;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.shaded.guava32.com.google.common.io.Closer;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bundles the state backends of a single stream operator and drives their lifecycle.
 *
 * <p>The handler is created from a {@link StreamOperatorStateContext}, from which it takes the
 * key serializer, the operator state backend and the (optional) synchronous and asynchronous
 * keyed state backends. On top of these it offers state initialization, snapshotting,
 * checkpoint notifications, keyed-state access and disposal.
 */
@Internal
public class StreamOperatorStateHandler {
    protected static final Logger LOG = LoggerFactory.getLogger(StreamOperatorStateHandler.class);

    /** Serializer for the key type, as reported by the context; may be {@code null}. */
    @Nullable
    private final TypeSerializer<?> keySerializer;

    /** Asynchronous keyed state backend, as reported by the context; may be {@code null}. */
    @Nullable
    private final AsyncKeyedStateBackend<?> asyncKeyedStateBackend;

    /** Synchronous keyed state backend, as reported by the context; may be {@code null}. */
    @Nullable
    private final CheckpointableKeyedStateBackend<?> keyedStateBackend;

    /** Registry the backends and raw state inputs are unregistered from before being closed. */
    private final CloseableRegistry closeableRegistry;

    /** Keyed state store; {@code null} when neither keyed state backend is available. */
    @Nullable
    private final DefaultKeyedStateStore keyedStateStore;

    private final OperatorStateBackend operatorStateBackend;
    private final StreamOperatorStateContext context;

    /**
     * Creates a handler on top of the backends exposed by {@code context}.
     *
     * @param context source of the backends, the key serializer and the raw state inputs
     * @param executionConfig supplies the serializer config used when the keyed state store has
     *     to create a serializer from a {@link TypeInformation}
     * @param closeableRegistry registry consulted when closing backends and raw state inputs
     */
    public StreamOperatorStateHandler(StreamOperatorStateContext context, final ExecutionConfig executionConfig, CloseableRegistry closeableRegistry) {
        this.context = context;
        this.keySerializer = context.keySerializer();
        this.operatorStateBackend = context.operatorStateBackend();
        this.keyedStateBackend = context.keyedStateBackend();
        this.asyncKeyedStateBackend = context.asyncKeyedStateBackend();
        this.closeableRegistry = closeableRegistry;

        // A keyed state store only makes sense if at least one keyed backend exists.
        if (this.keyedStateBackend != null || this.asyncKeyedStateBackend != null) {
            this.keyedStateStore = new DefaultKeyedStateStore(
                    this.keyedStateBackend,
                    this.asyncKeyedStateBackend,
                    new SerializerFactory() {
                        @Override
                        public <T> TypeSerializer<T> createSerializer(TypeInformation<T> typeInformation) {
                            return typeInformation.createSerializer(executionConfig.getSerializerConfig());
                        }
                    });
        } else {
            this.keyedStateStore = null;
        }
    }

    /**
     * Builds a {@link StateInitializationContextImpl} from the context's restored checkpoint id,
     * the backends and the raw state inputs, and passes it to
     * {@link CheckpointedStreamOperator#initializeState(StateInitializationContext)}.
     *
     * <p>The raw operator and keyed state inputs are closed afterwards, whether or not
     * initialization succeeded, provided they were still registered in the closeable registry.
     *
     * @param streamOperator operator whose state is initialized
     * @throws Exception whatever the operator's {@code initializeState} throws
     */
    public void initializeOperatorState(CheckpointedStreamOperator streamOperator) throws Exception {
        CloseableIterable<KeyGroupStatePartitionStreamProvider> keyedStateInputs = this.context.rawKeyedStateInputs();
        CloseableIterable<StatePartitionStreamProvider> operatorStateInputs = this.context.rawOperatorStateInputs();
        try {
            OptionalLong checkpointId = this.context.getRestoredCheckpointId();
            // null signals "no restored checkpoint" to the initialization context.
            Long restoredCheckpointId = checkpointId.isPresent() ? Long.valueOf(checkpointId.getAsLong()) : null;
            StateInitializationContext initializationContext = new StateInitializationContextImpl(
                    restoredCheckpointId,
                    this.operatorStateBackend,
                    this.keyedStateStore,
                    keyedStateInputs,
                    operatorStateInputs);
            streamOperator.initializeState(initializationContext);
        } finally {
            closeFromRegistry(operatorStateInputs, this.closeableRegistry);
            closeFromRegistry(keyedStateInputs, this.closeableRegistry);
        }
    }

    /**
     * Quietly closes {@code closeable}, but only if it could be unregistered from
     * {@code registry}.
     */
    private static void closeFromRegistry(Closeable closeable, CloseableRegistry registry) {
        if (registry.unregisterCloseable(closeable)) {
            IOUtils.closeQuietly(closeable);
        }
    }

    /**
     * Closes and disposes the state backends.
     *
     * <p>A backend is registered for closing only if it could be unregistered from the closeable
     * registry; every non-null backend additionally has its {@code dispose()} registered. All
     * registered actions run when the {@link Closer} is closed at the end of the try block.
     *
     * @throws Exception if closing the {@link Closer} fails
     */
    public void dispose() throws Exception {
        try (Closer closer = Closer.create()) {
            if (this.closeableRegistry.unregisterCloseable(this.operatorStateBackend)) {
                closer.register(this.operatorStateBackend);
            }
            if (this.closeableRegistry.unregisterCloseable(this.keyedStateBackend)) {
                closer.register(this.keyedStateBackend);
            }
            if (this.closeableRegistry.unregisterCloseable(this.asyncKeyedStateBackend)) {
                closer.register(this.asyncKeyedStateBackend);
            }
            if (this.operatorStateBackend != null) {
                closer.register(this.operatorStateBackend::dispose);
            }
            if (this.keyedStateBackend != null) {
                closer.register(this.keyedStateBackend::dispose);
            }
            if (this.asyncKeyedStateBackend != null) {
                closer.register(this.asyncKeyedStateBackend::dispose);
            }
        }
    }

    /**
     * Snapshots the operator's state into a fresh {@link OperatorSnapshotFutures}.
     *
     * <p>The key group range for the snapshot context is taken from the synchronous keyed backend
     * if present, otherwise from the asynchronous one, otherwise it is the empty range.
     *
     * @return the futures collecting the individual snapshot parts
     * @throws CheckpointException if any step of the snapshot fails
     */
    public OperatorSnapshotFutures snapshotState(CheckpointedStreamOperator streamOperator, Optional<InternalTimeServiceManager<?>> timeServiceManager, String operatorName, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory factory, boolean isUsingCustomRawKeyedState, boolean useAsyncState) throws CheckpointException {
        KeyGroupRange keyGroupRange = KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        if (this.keyedStateBackend != null) {
            keyGroupRange = this.keyedStateBackend.getKeyGroupRange();
        } else if (this.asyncKeyedStateBackend != null) {
            keyGroupRange = this.asyncKeyedStateBackend.getKeyGroupRange();
        }

        OperatorSnapshotFutures snapshotInProgress = new OperatorSnapshotFutures();
        StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(checkpointId, timestamp, factory, keyGroupRange, this.closeableRegistry);
        this.snapshotState(streamOperator, timeServiceManager, operatorName, checkpointId, timestamp, checkpointOptions, factory, snapshotInProgress, snapshotContext, isUsingCustomRawKeyedState, useAsyncState);
        return snapshotInProgress;
    }

    /**
     * Performs the actual snapshot, filling {@code snapshotInProgress}.
     *
     * <p>Steps, in order: optionally write timers to raw keyed state (when a time service manager
     * is present and the relevant backend asks for legacy synchronous timer snapshots), let the
     * operator snapshot into {@code snapshotContext}, record the raw state futures, then trigger
     * the managed snapshots of the operator state backend and of the keyed backend selected by
     * {@code useAsyncState}.
     *
     * <p>On any exception the futures are cancelled, the snapshot context is closed
     * exceptionally (failures of either are attached as suppressed exceptions), and a
     * {@link CheckpointException} with reason {@code CHECKPOINT_DECLINED} is thrown.
     *
     * @throws CheckpointException wrapping the original failure
     */
    @VisibleForTesting
    void snapshotState(CheckpointedStreamOperator streamOperator, Optional<InternalTimeServiceManager<?>> timeServiceManager, String operatorName, long checkpointId, long timestamp, CheckpointOptions checkpointOptions, CheckpointStreamFactory factory, OperatorSnapshotFutures snapshotInProgress, StateSnapshotContextSynchronousImpl snapshotContext, boolean isUsingCustomRawKeyedState, boolean useAsyncState) throws CheckpointException {
        try {
            if (timeServiceManager.isPresent()) {
                final boolean requiresLegacyRawKeyedStateSnapshots;
                final InternalTimeServiceManager<?> manager;
                if (useAsyncState) {
                    Preconditions.checkState(this.asyncKeyedStateBackend != null, "keyedStateBackend should be available with timeServiceManager");
                    manager = timeServiceManager.get();
                    requiresLegacyRawKeyedStateSnapshots = this.asyncKeyedStateBackend.requiresLegacySynchronousTimerSnapshots(checkpointOptions.getCheckpointType());
                } else {
                    Preconditions.checkState(this.keyedStateBackend != null, "keyedStateBackend should be available with timeServiceManager");
                    manager = timeServiceManager.get();
                    // Only AbstractKeyedStateBackend exposes the legacy-timer query.
                    requiresLegacyRawKeyedStateSnapshots = this.keyedStateBackend instanceof AbstractKeyedStateBackend
                            && ((AbstractKeyedStateBackend<?>) this.keyedStateBackend).requiresLegacySynchronousTimerSnapshots(checkpointOptions.getCheckpointType());
                }
                if (requiresLegacyRawKeyedStateSnapshots) {
                    // Timers and custom raw keyed state would both write to the same raw output.
                    Preconditions.checkState(!isUsingCustomRawKeyedState, "Attempting to snapshot timers to raw keyed state, but this operator has custom raw keyed state to write.");
                    manager.snapshotToRawKeyedState(snapshotContext.getRawKeyedOperatorStateOutput(), operatorName);
                }
            }

            streamOperator.snapshotState(snapshotContext);
            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());

            if (null != this.operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(this.operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }

            if (useAsyncState && null != this.asyncKeyedStateBackend) {
                if (this.isCanonicalSavepoint(checkpointOptions.getCheckpointType())) {
                    // Caught below and reported as a declined checkpoint.
                    throw new UnsupportedOperationException("Not supported yet.");
                }
                snapshotInProgress.setKeyedStateManagedFuture(this.asyncKeyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
            }

            if (!useAsyncState && null != this.keyedStateBackend) {
                if (this.isCanonicalSavepoint(checkpointOptions.getCheckpointType())) {
                    SnapshotStrategyRunner<KeyedStateHandle, ? extends FullSnapshotResources<?>> snapshotRunner = StreamOperatorStateHandler.prepareCanonicalSavepoint(this.keyedStateBackend, this.closeableRegistry);
                    snapshotInProgress.setKeyedStateManagedFuture(snapshotRunner.snapshot(checkpointId, timestamp, factory, checkpointOptions));
                } else {
                    snapshotInProgress.setKeyedStateManagedFuture(this.keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));
                }
            }
        } catch (Exception snapshotException) {
            try {
                snapshotInProgress.cancel();
            } catch (Exception e) {
                snapshotException.addSuppressed(e);
            }
            String snapshotFailMessage = "Could not complete snapshot " + checkpointId + " for operator " + operatorName + ".";
            try {
                snapshotContext.closeExceptionally();
            } catch (IOException e) {
                snapshotException.addSuppressed(e);
            }
            throw new CheckpointException(snapshotFailMessage, CheckpointFailureReason.CHECKPOINT_DECLINED, snapshotException);
        }
    }

    /** Returns whether {@code snapshotType} is a savepoint in the {@code CANONICAL} format. */
    private boolean isCanonicalSavepoint(SnapshotType snapshotType) {
        return snapshotType.isSavepoint() && ((SavepointType) snapshotType).getFormatType() == SavepointFormatType.CANONICAL;
    }

    /**
     * Creates the runner used to take a canonical savepoint of {@code keyedStateBackend}.
     *
     * <p>The runner wraps a {@link SavepointSnapshotStrategy} built from the backend's savepoint
     * resources and uses the execution type those resources prefer.
     *
     * @param keyedStateBackend backend to take the savepoint of
     * @param closeableRegistry registry handed to the runner
     * @return a runner named {@code "Asynchronous full Savepoint"}
     * @throws Exception if the backend fails to provide its savepoint resources
     */
    @Nonnull
    public static SnapshotStrategyRunner<KeyedStateHandle, ? extends FullSnapshotResources<?>> prepareCanonicalSavepoint(CheckpointableKeyedStateBackend<?> keyedStateBackend, CloseableRegistry closeableRegistry) throws Exception {
        SavepointResources<?> savepointResources = keyedStateBackend.savepoint();
        SavepointSnapshotStrategy<?> savepointSnapshotStrategy = new SavepointSnapshotStrategy<>(savepointResources.getSnapshotResources());
        return new SnapshotStrategyRunner<>("Asynchronous full Savepoint", savepointSnapshotStrategy, closeableRegistry, savepointResources.getPreferredSnapshotExecutionType());
    }

    /**
     * Forwards a checkpoint-complete notification to the synchronous keyed backend (if it is a
     * {@link CheckpointListener}) and to the asynchronous keyed backend (if present).
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        if (this.keyedStateBackend instanceof CheckpointListener) {
            ((CheckpointListener) this.keyedStateBackend).notifyCheckpointComplete(checkpointId);
        }
        if (this.asyncKeyedStateBackend != null) {
            this.asyncKeyedStateBackend.notifyCheckpointComplete(checkpointId);
        }
    }

    /**
     * Forwards a checkpoint-aborted notification to the synchronous keyed backend (if it is a
     * {@link CheckpointListener}) and to the asynchronous keyed backend (if present).
     */
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        if (this.keyedStateBackend instanceof CheckpointListener) {
            ((CheckpointListener) this.keyedStateBackend).notifyCheckpointAborted(checkpointId);
        }
        if (this.asyncKeyedStateBackend != null) {
            this.asyncKeyedStateBackend.notifyCheckpointAborted(checkpointId);
        }
    }

    /**
     * Returns the key serializer, cast to the caller's key type; may be {@code null}.
     *
     * <p>The cast is unchecked: the field is stored with a wildcard key type and the caller is
     * trusted to ask for the right {@code K}.
     */
    @SuppressWarnings("unchecked")
    public <K> TypeSerializer<K> getKeySerializer() {
        return (TypeSerializer<K>) this.keySerializer;
    }

    /**
     * Returns the synchronous keyed state backend, cast to the caller's key type; may be
     * {@code null}. The cast is unchecked for the same reason as in {@link #getKeySerializer()}.
     */
    @SuppressWarnings("unchecked")
    public <K> KeyedStateBackend<K> getKeyedStateBackend() {
        return (KeyedStateBackend<K>) this.keyedStateBackend;
    }

    /**
     * Returns the asynchronous keyed state backend, cast to the caller's key type; may be
     * {@code null}. The cast is unchecked for the same reason as in {@link #getKeySerializer()}.
     */
    @Nullable
    @SuppressWarnings("unchecked")
    public <K> AsyncKeyedStateBackend<K> getAsyncKeyedStateBackend() {
        return (AsyncKeyedStateBackend<K>) this.asyncKeyedStateBackend;
    }

    /** Returns the operator state backend obtained from the context. */
    public OperatorStateBackend getOperatorStateBackend() {
        return this.operatorStateBackend;
    }

    /**
     * Gets or creates keyed state via the synchronous keyed state backend.
     *
     * @throws IllegalStateException if there is no synchronous keyed state backend
     * @throws Exception if the backend fails to provide the state
     */
    public <N, S extends State, T> S getOrCreateKeyedState(TypeSerializer<N> namespaceSerializer, StateDescriptor<S, T> stateDescriptor) throws Exception {
        if (this.keyedStateBackend != null) {
            return this.keyedStateBackend.getOrCreateKeyedState(namespaceSerializer, stateDescriptor);
        }
        throw new IllegalStateException("Cannot create partitioned state. The keyed state backend has not been set.This indicates that the operator is not partitioned/keyed.");
    }

    /**
     * Gets or creates (v2) keyed state via the asynchronous keyed state backend.
     *
     * @throws IllegalStateException if there is no asynchronous keyed state backend
     * @throws Exception if the backend fails to provide the state
     */
    public <N, S extends org.apache.flink.api.common.state.v2.State, T> S getOrCreateKeyedState(N defaultNamespace, TypeSerializer<N> namespaceSerializer, org.apache.flink.api.common.state.v2.StateDescriptor<T> stateDescriptor) throws Exception {
        if (this.asyncKeyedStateBackend != null) {
            return this.asyncKeyedStateBackend.getOrCreateKeyedState(defaultNamespace, namespaceSerializer, stateDescriptor);
        }
        throw new IllegalStateException("Cannot create partitioned state. The keyed state backend has not been set.This indicates that the operator is not partitioned/keyed.");
    }

    /**
     * Obtains partitioned state for the given namespace from the synchronous keyed state
     * backend.
     *
     * @throws RuntimeException if there is no synchronous keyed state backend
     * @throws Exception if the backend fails to provide the state
     */
    protected <S extends State, N> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        if (this.keyedStateBackend != null) {
            return this.keyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
        }
        throw new RuntimeException("Cannot create partitioned state. The keyed state backend has not been set. This indicates that the operator is not partitioned/keyed.");
    }

    /**
     * Sets the current key on the synchronous keyed state backend; a no-op when that backend is
     * absent.
     *
     * @throws RuntimeException wrapping any exception raised by the backend
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void setCurrentKey(Object key) {
        if (this.keyedStateBackend != null) {
            try {
                // The backend's key type is a wildcard here, so a raw reference is needed to
                // pass an untyped key through.
                CheckpointableKeyedStateBackend rawBackend = this.keyedStateBackend;
                rawBackend.setCurrentKey(key);
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while setting the current key context.", e);
            }
        }
    }

    /**
     * Returns the current key of the synchronous keyed state backend.
     *
     * @throws UnsupportedOperationException if there is no synchronous keyed state backend
     */
    public Object getCurrentKey() {
        if (this.keyedStateBackend != null) {
            return this.keyedStateBackend.getCurrentKey();
        }
        throw new UnsupportedOperationException("Key can only be retrieved on KeyedStream.");
    }

    /** Returns the asynchronous internal timer service manager exposed by the context. */
    public InternalTimeServiceManager<?> getAsyncInternalTimerServiceManager() {
        return this.context.asyncInternalTimerServiceManager();
    }

    /** Returns the keyed state store, or an empty optional when no keyed backend exists. */
    public Optional<KeyedStateStore> getKeyedStateStore() {
        return Optional.ofNullable(this.keyedStateStore);
    }

    /** Callbacks an operator provides so this handler can initialize and snapshot its state. */
    public static interface CheckpointedStreamOperator {
        /** Invoked by {@link StreamOperatorStateHandler#initializeOperatorState}. */
        public void initializeState(StateInitializationContext var1) throws Exception;

        /** Invoked while a snapshot is taken, before the backends' managed snapshots start. */
        public void snapshotState(StateSnapshotContext var1) throws Exception;
    }
}

