package org.apache.flink.streaming.api.operators.sorted.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.RunnableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.Keyed;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.PriorityComparable;
import org.apache.flink.runtime.state.PriorityComparator;
import org.apache.flink.runtime.state.SavepointResources;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueElement;
import org.apache.flink.runtime.state.internal.InternalKvState;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend.class */
public class BatchExecutionKeyedStateBackend<K> implements CheckpointableKeyedStateBackend<K> {
    private static final Logger LOG = LoggerFactory.getLogger(BatchExecutionKeyedStateBackend.class);
    private static final Map<StateDescriptor.Type, StateFactory> STATE_FACTORIES = (Map) Stream.of((Object[]) new Tuple2[]{Tuple2.of(StateDescriptor.Type.VALUE, BatchExecutionKeyValueState::create), Tuple2.of(StateDescriptor.Type.LIST, BatchExecutionKeyListState::create), Tuple2.of(StateDescriptor.Type.MAP, BatchExecutionKeyMapState::create), Tuple2.of(StateDescriptor.Type.AGGREGATING, BatchExecutionKeyAggregatingState::create), Tuple2.of(StateDescriptor.Type.REDUCING, BatchExecutionKeyReducingState::create)}).collect(Collectors.toMap(tuple2 -> {
        return (StateDescriptor.Type) tuple2.f0;
    }, tuple22 -> {
        return (StateFactory) tuple22.f1;
    }));
    private final TypeSerializer<K> keySerializer;
    private final KeyGroupRange keyGroupRange;
    private final ExecutionConfig executionConfig;
    private K currentKey = null;
    private final List<KeyedStateBackend.KeySelectionListener<K>> keySelectionListeners = new ArrayList();
    private final Map<String, State> states = new HashMap();
    private final Map<String, KeyGroupedInternalPriorityQueue<?>> priorityQueues = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/flink/streaming/api/operators/sorted/state/BatchExecutionKeyedStateBackend$StateFactory.class */
    public interface StateFactory {
        /* JADX WARN: Incorrect return type in method signature: <T:Ljava/lang/Object;K:Ljava/lang/Object;N:Ljava/lang/Object;SV:Ljava/lang/Object;S::Lorg/apache/flink/api/common/state/State;IS:TS;>(Lorg/apache/flink/api/common/typeutils/TypeSerializer<TK;>;Lorg/apache/flink/api/common/typeutils/TypeSerializer<TN;>;Lorg/apache/flink/api/common/state/StateDescriptor<TS;TSV;>;)TIS; */
        State createState(TypeSerializer typeSerializer, TypeSerializer typeSerializer2, StateDescriptor stateDescriptor) throws Exception;
    }

    public BatchExecutionKeyedStateBackend(TypeSerializer<K> typeSerializer, KeyGroupRange keyGroupRange, ExecutionConfig executionConfig) {
        this.keySerializer = typeSerializer;
        this.keyGroupRange = keyGroupRange;
        this.executionConfig = executionConfig;
    }

    public void setCurrentKey(K k) {
        if (Objects.equals(k, this.currentKey)) {
            return;
        }
        notifyKeySelected(k);
        Iterator<State> it = this.states.values().iterator();
        while (it.hasNext()) {
            ((State) it.next()).clearAllNamespaces();
        }
        Iterator<KeyGroupedInternalPriorityQueue<?>> it2 = this.priorityQueues.values().iterator();
        while (it2.hasNext()) {
            do {
            } while (it2.next().poll() != null);
        }
        this.currentKey = k;
    }

    public K getCurrentKey() {
        return this.currentKey;
    }

    public TypeSerializer<K> getKeySerializer() {
        return this.keySerializer;
    }

    public <N, S extends State, T> void applyToAllKeys(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, T> stateDescriptor, KeyedStateFunction<K, S> keyedStateFunction) {
        LOG.debug("Not iterating over all keyed in BATCH execution mode in applyToAllKeys().");
    }

    public <N> Stream<K> getKeys(String str, N n) {
        LOG.debug("Returning an empty stream in BATCH execution mode in getKeys().");
        return Stream.empty();
    }

    public <N> Stream<Tuple2<K, N>> getKeysAndNamespaces(String str) {
        LOG.debug("Returning an empty stream in BATCH execution mode in getKeysAndNamespaces().");
        return Stream.empty();
    }

    public <N, S extends State, T> S getOrCreateKeyedState(TypeSerializer<N> typeSerializer, StateDescriptor<S, T> stateDescriptor) throws Exception {
        Preconditions.checkNotNull(typeSerializer, "Namespace serializer");
        Preconditions.checkNotNull(this.keySerializer, "State key serializer has not been configured in the config. This operation cannot use partitioned state.");
        if (!stateDescriptor.isSerializerInitialized()) {
            stateDescriptor.initializeSerializerUnlessSet(this.executionConfig);
        }
        State state = this.states.get(stateDescriptor.getName());
        if (state == null) {
            state = createState(typeSerializer, stateDescriptor);
            this.states.put(stateDescriptor.getName(), state);
        }
        return (S) state;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public <N, S extends State> S getPartitionedState(N n, TypeSerializer<N> typeSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        InternalKvState orCreateKeyedState = getOrCreateKeyedState(typeSerializer, stateDescriptor);
        orCreateKeyedState.setCurrentNamespace(n);
        return orCreateKeyedState;
    }

    public void dispose() {
    }

    private void notifyKeySelected(K k) {
        Iterator<KeyedStateBackend.KeySelectionListener<K>> it = this.keySelectionListeners.iterator();
        while (it.hasNext()) {
            it.next().keySelected(k);
        }
    }

    public void registerKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> keySelectionListener) {
        this.keySelectionListeners.add(keySelectionListener);
    }

    public boolean deregisterKeySelectionListener(KeyedStateBackend.KeySelectionListener<K> keySelectionListener) {
        return this.keySelectionListeners.remove(keySelectionListener);
    }

    /* JADX WARN: Incorrect return type in method signature: <N:Ljava/lang/Object;SV:Ljava/lang/Object;SEV:Ljava/lang/Object;S::Lorg/apache/flink/api/common/state/State;IS:TS;>(Lorg/apache/flink/api/common/typeutils/TypeSerializer<TN;>;Lorg/apache/flink/api/common/state/StateDescriptor<TS;TSV;>;Lorg/apache/flink/runtime/state/StateSnapshotTransformer$StateSnapshotTransformFactory<TSEV;>;)TIS; */
    @Nonnull
    public State createOrUpdateInternalState(@Nonnull TypeSerializer typeSerializer, @Nonnull StateDescriptor stateDescriptor, @Nonnull StateSnapshotTransformer.StateSnapshotTransformFactory stateSnapshotTransformFactory) throws Exception {
        return createState(typeSerializer, stateDescriptor);
    }

    /* JADX WARN: Incorrect return type in method signature: <N:Ljava/lang/Object;SV:Ljava/lang/Object;S::Lorg/apache/flink/api/common/state/State;IS:TS;>(Lorg/apache/flink/api/common/typeutils/TypeSerializer<TN;>;Lorg/apache/flink/api/common/state/StateDescriptor<TS;TSV;>;)TIS; */
    private State createState(@Nonnull TypeSerializer typeSerializer, @Nonnull StateDescriptor stateDescriptor) throws Exception {
        StateFactory stateFactory = STATE_FACTORIES.get(stateDescriptor.getType());
        if (stateFactory == null) {
            throw new FlinkRuntimeException(String.format("State %s is not supported by %s", stateDescriptor.getClass(), getClass()));
        }
        return stateFactory.createState(this.keySerializer, typeSerializer, stateDescriptor);
    }

    @Nonnull
    public <T extends HeapPriorityQueueElement & PriorityComparable<? super T> & Keyed<?>> KeyGroupedInternalPriorityQueue<T> create(@Nonnull String str, @Nonnull TypeSerializer<T> typeSerializer) {
        KeyGroupedInternalPriorityQueue<?> keyGroupedInternalPriorityQueue = this.priorityQueues.get(str);
        if (keyGroupedInternalPriorityQueue == null) {
            keyGroupedInternalPriorityQueue = new BatchExecutionInternalPriorityQueueSet(PriorityComparator.forPriorityComparableObjects(), StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM);
            this.priorityQueues.put(str, keyGroupedInternalPriorityQueue);
        }
        return (KeyGroupedInternalPriorityQueue<T>) keyGroupedInternalPriorityQueue;
    }

    public KeyGroupRange getKeyGroupRange() {
        return this.keyGroupRange;
    }

    public void close() throws IOException {
    }

    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(long j, long j2, @Nonnull CheckpointStreamFactory checkpointStreamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        throw new UnsupportedOperationException("Snapshotting is not supported in BATCH runtime mode.");
    }

    @Nonnull
    public SavepointResources<K> savepoint() throws Exception {
        throw new UnsupportedOperationException("Savepoints are not supported in BATCH runtime mode.");
    }
}
