/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetricsBuilder;
import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.checkpoint.filemerging.SubtaskFileMergingManagerRestoreOperation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.AsyncKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.state.v2.adaptor.AsyncKeyedStateBackendAdaptor;
import org.apache.flink.runtime.util.OperatorSubtaskDescriptionText;
import org.apache.flink.streaming.api.operators.BackendRestorerProcedure;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.StreamOperatorStateContext;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTaskCancellationContext;
import org.apache.flink.util.CloseableIterable;
import org.apache.flink.util.Disposable;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.clock.SystemClock;

public class StreamTaskStateInitializerImpl
implements StreamTaskStateInitializer {
    /** Task environment; read for task info, job id, metric group, user-code class loader and KV-state registry. */
    private final Environment environment;
    /** Taken from {@code environment.getTaskStateManager()} in the constructor; never null. Source of the restored operator state. */
    private final TaskStateManager taskStateManager;
    /** State backend used to create the operator state backend and the (sync or async) keyed state backend; never null. */
    private final StateBackend stateBackend;
    /** Receives restore metrics, including the per-location "RestoredStateSizeBytes.*" values. */
    private final SubTaskInitializationMetricsBuilder initializationMetrics;
    /** Passed through to the keyed state backend parameters. */
    private final TtlTimeProvider ttlTimeProvider;
    /** Factory for the timer service managers created alongside a keyed state backend; never null. */
    private final InternalTimeServiceManager.Provider timeServiceManagerProvider;
    /** Passed through to {@code timeServiceManagerProvider.create(...)}. */
    private final StreamTaskCancellationContext cancellationContext;

    /**
     * Convenience constructor that fills in default collaborators: a metrics builder stamped with
     * the current {@link SystemClock} time, {@link TtlTimeProvider#DEFAULT},
     * {@code InternalTimeServiceManagerImpl::create} as timer-service factory and an
     * always-running cancellation context.
     *
     * @param environment task environment; its task state manager must be non-null
     * @param stateBackend state backend used to create the state backends; must be non-null
     */
    public StreamTaskStateInitializerImpl(Environment environment, StateBackend stateBackend) {
        this(
                environment,
                stateBackend,
                new SubTaskInitializationMetricsBuilder(SystemClock.getInstance().absoluteTimeMillis()),
                TtlTimeProvider.DEFAULT,
                InternalTimeServiceManagerImpl::create,
                StreamTaskCancellationContext.alwaysRunning());
    }

    /**
     * Creates the initializer with explicitly supplied collaborators.
     *
     * @param environment task environment; {@code environment.getTaskStateManager()} must be non-null
     * @param stateBackend state backend used to create the state backends; must be non-null
     * @param initializationMetrics builder that receives restore metrics
     * @param ttlTimeProvider time provider handed to the keyed state backend
     * @param timeServiceManagerProvider factory for timer service managers; must be non-null
     * @param cancellationContext cancellation context handed to the timer service manager factory
     * @throws NullPointerException if one of the values documented as non-null is null
     */
    public StreamTaskStateInitializerImpl(
            Environment environment,
            StateBackend stateBackend,
            SubTaskInitializationMetricsBuilder initializationMetrics,
            TtlTimeProvider ttlTimeProvider,
            InternalTimeServiceManager.Provider timeServiceManagerProvider,
            StreamTaskCancellationContext cancellationContext) {
        this.environment = environment;
        this.taskStateManager = Preconditions.checkNotNull(environment.getTaskStateManager());
        this.stateBackend = Preconditions.checkNotNull(stateBackend);
        this.initializationMetrics = initializationMetrics;
        this.ttlTimeProvider = ttlTimeProvider;
        this.timeServiceManagerProvider = Preconditions.checkNotNull(timeServiceManagerProvider);
        this.cancellationContext = cancellationContext;
    }

    /**
     * Creates (and, where state is available, restores) everything one operator needs to access
     * its state: the operator state backend, the keyed state backend (synchronous or asynchronous
     * flavor), the matching timer service manager and the raw keyed / raw operator state inputs.
     *
     * <p>The raw state iterables are registered with {@code streamTaskCloseableRegistry}. Sizes of
     * all restored state objects are gathered in one stats collector and reported to the
     * initialization metrics as {@code "RestoredStateSizeBytes.<location>"}.
     *
     * <p>If any step fails, every backend and raw input created so far is unregistered from the
     * registry, closed quietly and (for backends) disposed before the failure is rethrown.
     *
     * @param operatorID id of the operator whose state is initialized
     * @param operatorClassName operator class name, used only for the descriptive identifier text
     * @param processingTimeService handed to the timer service manager factory
     * @param keyContext handed to the timer service manager factory
     * @param keySerializer key serializer; when {@code null} the keyed backend factory method of
     *     this class returns {@code null}
     * @param streamTaskCloseableRegistry registry that owns the created closeable resources
     * @param metricGroup metric group handed to the keyed state backend
     * @param managedMemoryFraction managed memory fraction handed to the keyed state backend
     * @param isUsingCustomRawKeyedState when {@code true}, restored raw keyed state is NOT passed
     *     to the timer service manager as timer state
     * @param isAsyncState selects the asynchronous keyed backend / timer manager pair instead of
     *     the synchronous one
     * @return the assembled state context
     * @throws Exception wrapping any failure that occurred while building the context
     */
    @Override
    public StreamOperatorStateContext streamOperatorStateContext(
            @Nonnull OperatorID operatorID,
            @Nonnull String operatorClassName,
            @Nonnull ProcessingTimeService processingTimeService,
            @Nonnull KeyContext keyContext,
            @Nullable TypeSerializer<?> keySerializer,
            @Nonnull CloseableRegistry streamTaskCloseableRegistry,
            @Nonnull MetricGroup metricGroup,
            double managedMemoryFraction,
            boolean isUsingCustomRawKeyedState,
            boolean isAsyncState)
            throws Exception {
        TaskInfo taskInfo = this.environment.getTaskInfo();
        this.registerRestoredStateToFileMergingManager(this.environment.getJobID(), taskInfo, operatorID);
        OperatorSubtaskDescriptionText operatorSubtaskDescription =
                new OperatorSubtaskDescriptionText(
                        operatorID,
                        operatorClassName,
                        taskInfo.getIndexOfThisSubtask(),
                        taskInfo.getNumberOfParallelSubtasks());
        String operatorIdentifierText = operatorSubtaskDescription.toString();
        PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates =
                this.taskStateManager.prioritizedOperatorState(operatorID);

        // Declared with the closeable backend type: the value is registered with / closed through
        // the closeable registry below and handed to the context as a checkpointable backend.
        CheckpointableKeyedStateBackend<?> keyedStatedBackend = null;
        AsyncKeyedStateBackend<?> asyncKeyedStateBackend = null;
        OperatorStateBackend operatorStateBackend = null;
        CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs = null;
        CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs = null;
        InternalTimeServiceManager<?> timeServiceManager = null;
        InternalTimeServiceManager<?> asyncTimeServiceManager = null;
        StateObject.StateObjectSizeStatsCollector statsCollector =
                StateObject.StateObjectSizeStatsCollector.create();
        try {
            // -------- keyed state backend (async or sync flavor) --------
            if (isAsyncState) {
                if (this.stateBackend.supportsAsyncKeyedStateBackend()) {
                    asyncKeyedStateBackend =
                            this.keyedStatedBackend(
                                    keySerializer,
                                    operatorIdentifierText,
                                    prioritizedOperatorSubtaskStates,
                                    streamTaskCloseableRegistry,
                                    metricGroup,
                                    managedMemoryFraction,
                                    statsCollector,
                                    StateBackend::createAsyncKeyedStateBackend);
                } else {
                    // No native async backend: wrap a synchronous keyed backend in the adaptor.
                    asyncKeyedStateBackend =
                            new AsyncKeyedStateBackendAdaptor<>(
                                    this.keyedStatedBackend(
                                            keySerializer,
                                            operatorIdentifierText,
                                            prioritizedOperatorSubtaskStates,
                                            streamTaskCloseableRegistry,
                                            metricGroup,
                                            managedMemoryFraction,
                                            statsCollector,
                                            StateBackend::createKeyedStateBackend));
                }
            } else {
                keyedStatedBackend =
                        this.keyedStatedBackend(
                                keySerializer,
                                operatorIdentifierText,
                                prioritizedOperatorSubtaskStates,
                                streamTaskCloseableRegistry,
                                metricGroup,
                                managedMemoryFraction,
                                statsCollector,
                                StateBackend::createKeyedStateBackend);
            }

            // -------- operator state backend --------
            operatorStateBackend =
                    this.operatorStateBackend(
                            operatorIdentifierText,
                            prioritizedOperatorSubtaskStates,
                            streamTaskCloseableRegistry,
                            statsCollector);

            // -------- raw state streams --------
            rawKeyedStateInputs =
                    this.rawKeyedStateInputs(
                            prioritizedOperatorSubtaskStates.getPrioritizedRawKeyedState().iterator(),
                            statsCollector);
            streamTaskCloseableRegistry.registerCloseable(rawKeyedStateInputs);
            rawOperatorStateInputs =
                    this.rawOperatorStateInputs(
                            prioritizedOperatorSubtaskStates.getPrioritizedRawOperatorState().iterator(),
                            statsCollector);
            streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);

            // -------- timer service manager --------
            // Raw keyed state is only interpreted as timer state when this is a restore and the
            // operator does not use raw keyed state for its own purposes.
            Iterable<KeyGroupStatePartitionStreamProvider> restoredRawKeyedStateTimers =
                    prioritizedOperatorSubtaskStates.isRestored() && !isUsingCustomRawKeyedState
                            ? rawKeyedStateInputs
                            : Collections.emptyList();
            if (isAsyncState) {
                if (asyncKeyedStateBackend != null) {
                    asyncTimeServiceManager =
                            this.timeServiceManagerProvider.create(
                                    this.environment.getMetricGroup().getIOMetricGroup(),
                                    asyncKeyedStateBackend,
                                    asyncKeyedStateBackend.getKeyGroupRange(),
                                    this.environment.getUserCodeClassLoader().asClassLoader(),
                                    keyContext,
                                    processingTimeService,
                                    restoredRawKeyedStateTimers,
                                    this.cancellationContext);
                }
            } else if (keyedStatedBackend != null) {
                timeServiceManager =
                        this.timeServiceManagerProvider.create(
                                this.environment.getMetricGroup().getIOMetricGroup(),
                                keyedStatedBackend,
                                keyedStatedBackend.getKeyGroupRange(),
                                this.environment.getUserCodeClassLoader().asClassLoader(),
                                keyContext,
                                processingTimeService,
                                restoredRawKeyedStateTimers,
                                this.cancellationContext);
            }

            // -------- size stats for channel state, then publish all collected stats --------
            Stream.concat(
                            prioritizedOperatorSubtaskStates.getPrioritizedInputChannelState().stream(),
                            prioritizedOperatorSubtaskStates.getPrioritizedResultSubpartitionState().stream())
                    .filter(Objects::nonNull)
                    .forEach(channelHandle -> channelHandle.collectSizeStats(statsCollector));
            statsCollector
                    .getStats()
                    .forEach(
                            (location, metricValue) ->
                                    this.initializationMetrics.addDurationMetric(
                                            "RestoredStateSizeBytes." + location, (long) metricValue));

            return new StreamOperatorStateContextImpl(
                    prioritizedOperatorSubtaskStates.getRestoredCheckpointId(),
                    operatorStateBackend,
                    keySerializer,
                    keyedStatedBackend,
                    asyncKeyedStateBackend,
                    timeServiceManager,
                    asyncTimeServiceManager,
                    rawOperatorStateInputs,
                    rawKeyedStateInputs);
        }
        catch (Exception ex) {
            // Clean up whatever was created before the failure; nothing has been published yet.
            if (keyedStatedBackend != null) {
                if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
                    IOUtils.closeQuietly(keyedStatedBackend);
                }
                keyedStatedBackend.dispose();
            }
            if (asyncKeyedStateBackend != null) {
                if (streamTaskCloseableRegistry.unregisterCloseable(asyncKeyedStateBackend)) {
                    IOUtils.closeQuietly(asyncKeyedStateBackend);
                }
                asyncKeyedStateBackend.dispose();
            }
            if (operatorStateBackend != null) {
                if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
                    IOUtils.closeQuietly(operatorStateBackend);
                }
                operatorStateBackend.dispose();
            }
            if (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
                IOUtils.closeQuietly(rawKeyedStateInputs);
            }
            if (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
                IOUtils.closeQuietly(rawOperatorStateInputs);
            }
            throw new Exception("Exception while creating StreamOperatorStateContext.", ex);
        }
    }

    /**
     * Hands the job-manager-restored state of the given operator to the file-merging snapshot
     * manager by running a {@link SubtaskFileMergingManagerRestoreOperation}.
     *
     * <p>Does nothing when the task state manager has no file-merging manager, no restore
     * checkpoint id, or no restored subtask state for the operator.
     *
     * @param jobID id of the running job
     * @param taskInfo info of the current task
     * @param operatorID operator whose restored state is registered
     */
    private void registerRestoredStateToFileMergingManager(JobID jobID, TaskInfo taskInfo, OperatorID operatorID) {
        FileMergingSnapshotManager mergingManager = this.taskStateManager.getFileMergingSnapshotManager();
        Optional<Long> checkpointId = this.taskStateManager.getRestoreCheckpointId();
        if (mergingManager == null || !checkpointId.isPresent()) {
            return;
        }

        Optional<OperatorSubtaskState> restoredState =
                this.taskStateManager.getSubtaskJobManagerRestoredState(operatorID);
        if (!restoredState.isPresent()) {
            return;
        }

        new SubtaskFileMergingManagerRestoreOperation(
                        checkpointId.get(),
                        mergingManager,
                        jobID,
                        taskInfo,
                        operatorID,
                        restoredState.get())
                .restore();
    }

    /**
     * Creates the operator state backend and restores it from the prioritized managed operator
     * state via a {@link BackendRestorerProcedure}.
     *
     * <p>A dedicated {@link CloseableRegistry} is registered with {@code backendCloseableRegistry}
     * for the duration of the restore and handed to the backend parameters; it is unregistered and
     * closed quietly once the restore finishes, whether it succeeded or failed.
     *
     * @param operatorIdentifierText descriptive operator identifier, used in the log description
     * @param prioritizedOperatorSubtaskStates restored state alternatives for the operator
     * @param backendCloseableRegistry registry the restore resources are attached to
     * @param statsCollector collector for the sizes of the restored state objects
     * @return the created operator state backend
     * @throws Exception if creating or restoring the backend fails
     */
    protected OperatorStateBackend operatorStateBackend(String operatorIdentifierText, PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates, CloseableRegistry backendCloseableRegistry, StateObject.StateObjectSizeStatsCollector statsCollector) throws Exception {
        final String logDescription = "operator state backend for " + operatorIdentifierText;

        final CloseableRegistry restoreCancelRegistry = new CloseableRegistry();
        backendCloseableRegistry.registerCloseable(restoreCancelRegistry);

        BackendRestorerProcedure<OperatorStateBackend, OperatorStateHandle> restorer =
                new BackendRestorerProcedure<>(
                        stateHandles ->
                                this.stateBackend.createOperatorStateBackend(
                                        new OperatorStateBackendParametersImpl(
                                                this.environment,
                                                operatorIdentifierText,
                                                stateHandles,
                                                restoreCancelRegistry)),
                        backendCloseableRegistry,
                        logDescription);
        try {
            return restorer.createAndRestore(
                    prioritizedOperatorSubtaskStates.getPrioritizedManagedOperatorState(),
                    statsCollector);
        }
        finally {
            // The restore-scoped registry is only needed while the restore is running.
            if (backendCloseableRegistry.unregisterCloseable(restoreCancelRegistry)) {
                IOUtils.closeQuietly(restoreCancelRegistry);
            }
        }
    }

    /**
     * Creates a keyed state backend through {@code keyedStateBackendCreator} and restores it from
     * the prioritized managed keyed state via a {@link BackendRestorerProcedure}.
     *
     * <p>The key-group range is computed from the task's max parallelism, parallelism and subtask
     * index. The state backend handed to the creator is resolved by
     * {@code StateBackendLoader.loadStateBackendFromKeyedStateHandles} from the configured backend
     * and the state handles being restored. A dedicated {@link CloseableRegistry} is registered
     * with {@code backendCloseableRegistry} for the duration of the restore and is unregistered
     * and closed quietly afterwards, whether the restore succeeded or failed.
     *
     * @param keySerializer serializer for the keys; when {@code null} no backend is created
     * @param operatorIdentifierText descriptive operator identifier, used in the log description
     * @param prioritizedOperatorSubtaskStates restored state alternatives for the operator
     * @param backendCloseableRegistry registry the restore resources are attached to
     * @param metricGroup metric group handed to the backend parameters
     * @param managedMemoryFraction managed memory fraction handed to the backend parameters
     * @param statsCollector collector for the sizes of the restored state objects
     * @param keyedStateBackendCreator factory that builds the concrete backend
     * @return the created backend, or {@code null} if {@code keySerializer} is {@code null}
     * @throws Exception if creating or restoring the backend fails
     */
    protected <K, R extends Disposable & Closeable> R keyedStatedBackend(TypeSerializer<K> keySerializer, String operatorIdentifierText, PrioritizedOperatorSubtaskState prioritizedOperatorSubtaskStates, CloseableRegistry backendCloseableRegistry, MetricGroup metricGroup, double managedMemoryFraction, StateObject.StateObjectSizeStatsCollector statsCollector, KeyedStateBackendCreator<K, R> keyedStateBackendCreator) throws Exception {
        if (keySerializer == null) {
            return null;
        }

        final String logDescription = "keyed state backend for " + operatorIdentifierText;
        final TaskInfo taskInfo = this.environment.getTaskInfo();
        final KeyGroupRange keyGroupRange =
                KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(
                        taskInfo.getMaxNumberOfParallelSubtasks(),
                        taskInfo.getNumberOfParallelSubtasks(),
                        taskInfo.getIndexOfThisSubtask());

        final CloseableRegistry restoreCancelRegistry = new CloseableRegistry();
        backendCloseableRegistry.registerCloseable(restoreCancelRegistry);

        BackendRestorerProcedure<R, KeyedStateHandle> restorer =
                new BackendRestorerProcedure<>(
                        stateHandles -> {
                            KeyedStateBackendParametersImpl<K> backendParameters =
                                    new KeyedStateBackendParametersImpl<>(
                                            this.environment,
                                            this.environment.getJobID(),
                                            operatorIdentifierText,
                                            keySerializer,
                                            taskInfo.getMaxNumberOfParallelSubtasks(),
                                            keyGroupRange,
                                            this.environment.getTaskKvStateRegistry(),
                                            this.ttlTimeProvider,
                                            metricGroup,
                                            this.initializationMetrics::addDurationMetric,
                                            stateHandles,
                                            restoreCancelRegistry,
                                            managedMemoryFraction);
                            StateBackend resolvedBackend =
                                    StateBackendLoader.loadStateBackendFromKeyedStateHandles(
                                            this.stateBackend,
                                            this.environment.getUserCodeClassLoader().asClassLoader(),
                                            stateHandles);
                            return keyedStateBackendCreator.create(resolvedBackend, backendParameters);
                        },
                        backendCloseableRegistry,
                        logDescription);
        try {
            return restorer.createAndRestore(
                    prioritizedOperatorSubtaskStates.getPrioritizedManagedKeyedState(),
                    statsCollector);
        }
        finally {
            // The restore-scoped registry is only needed while the restore is running.
            if (backendCloseableRegistry.unregisterCloseable(restoreCancelRegistry)) {
                IOUtils.closeQuietly(restoreCancelRegistry);
            }
        }
    }

    /**
     * Exposes the restored raw operator state as an iterable of stream providers.
     *
     * <p>Only the first restore alternative is used; a second alternative is rejected with an
     * {@link IllegalStateException}. Sizes of the handles are reported to {@code statsCollector}.
     * Each iterator obtained from the result reads the {@code "_default_"} state name and
     * registers the streams it opens with a registry that is closed by the iterable's
     * {@code close()}.
     *
     * @param restoreStateAlternatives iterator over the prioritized raw operator state
     * @param statsCollector collector for the sizes of the restored state objects
     * @return iterable over the raw operator state partitions, or an empty iterable if there is
     *     no (non-null) raw operator state
     */
    protected CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs(@Nonnull Iterator<StateObjectCollection<OperatorStateHandle>> restoreStateAlternatives, @Nonnull StateObject.StateObjectSizeStatsCollector statsCollector) {
        if (!restoreStateAlternatives.hasNext()) {
            return CloseableIterable.empty();
        }

        final Collection<OperatorStateHandle> rawOperatorState = restoreStateAlternatives.next();
        Preconditions.checkState(
                !restoreStateAlternatives.hasNext(),
                "Local recovery is currently not implemented for raw operator state, but found state alternative.");
        if (rawOperatorState == null) {
            return CloseableIterable.empty();
        }

        rawOperatorState.forEach(handle -> handle.collectSizeStats(statsCollector));
        return new CloseableIterable<StatePartitionStreamProvider>() {
            /** Tracks the streams opened by iterators of this iterable. */
            final CloseableRegistry closeableRegistry = new CloseableRegistry();

            @Override
            public void close() throws IOException {
                closeableRegistry.close();
            }

            @Nonnull
            @Override
            public Iterator<StatePartitionStreamProvider> iterator() {
                return new OperatorStateStreamIterator(
                        "_default_", rawOperatorState.iterator(), closeableRegistry);
            }
        };
    }

    /**
     * Exposes the restored raw keyed state as an iterable of per-key-group stream providers.
     *
     * <p>Only the first restore alternative is used; a second alternative is rejected with an
     * {@link IllegalStateException}. The handles must be {@link KeyGroupsStateHandle}s (null
     * entries are dropped); their sizes are reported to {@code statsCollector}. Streams opened by
     * iterators of the result are registered with a registry that is closed by the iterable's
     * {@code close()}.
     *
     * @param restoreStateAlternatives iterator over the prioritized raw keyed state
     * @param statsCollector collector for the sizes of the restored state objects
     * @return iterable over the raw keyed state partitions, or an empty iterable if there is no
     *     (non-null) raw keyed state
     */
    protected CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs(@Nonnull Iterator<StateObjectCollection<KeyedStateHandle>> restoreStateAlternatives, @Nonnull StateObject.StateObjectSizeStatsCollector statsCollector) {
        if (!restoreStateAlternatives.hasNext()) {
            return CloseableIterable.empty();
        }

        Collection<KeyedStateHandle> rawKeyedState = restoreStateAlternatives.next();
        Preconditions.checkState(
                !restoreStateAlternatives.hasNext(),
                "Local recovery is currently not implemented for raw keyed state, but found state alternative.");
        if (rawKeyedState == null) {
            return CloseableIterable.empty();
        }

        final Collection<KeyGroupsStateHandle> keyGroupsStateHandles =
                StreamTaskStateInitializerImpl.transform(rawKeyedState);
        keyGroupsStateHandles.forEach(handle -> handle.collectSizeStats(statsCollector));

        // Tracks the streams opened by iterators of the returned iterable.
        final CloseableRegistry closeableRegistry = new CloseableRegistry();
        return new CloseableIterable<KeyGroupStatePartitionStreamProvider>() {
            @Override
            public void close() throws IOException {
                closeableRegistry.close();
            }

            @Override
            public Iterator<KeyGroupStatePartitionStreamProvider> iterator() {
                return new KeyGroupStreamIterator(keyGroupsStateHandles.iterator(), closeableRegistry);
            }
        };
    }

    /**
     * Narrows a collection of keyed state handles to {@link KeyGroupsStateHandle}s.
     *
     * <p>Null elements are skipped. Any non-null element of another type is rejected with the
     * exception produced by {@code StateUtil.unexpectedStateHandleException}.
     *
     * @param keyedStateHandles handles to convert, may be {@code null}
     * @return a new list with the converted handles in iteration order, or {@code null} if the
     *     input was {@code null}
     */
    private static Collection<KeyGroupsStateHandle> transform(Collection<KeyedStateHandle> keyedStateHandles) {
        if (keyedStateHandles == null) {
            return null;
        }

        List<KeyGroupsStateHandle> converted = new ArrayList<>(keyedStateHandles.size());
        for (KeyedStateHandle handle : keyedStateHandles) {
            if (handle == null) {
                continue;
            }
            if (!(handle instanceof KeyGroupsStateHandle)) {
                throw StateUtil.unexpectedStateHandleException(KeyGroupsStateHandle.class, handle.getClass());
            }
            converted.add((KeyGroupsStateHandle) handle);
        }
        return converted;
    }

    /**
     * Plain value holder implementing {@link StreamOperatorStateContext}: it stores the objects
     * passed to its constructor and returns them unchanged from the accessors.
     */
    private static class StreamOperatorStateContextImpl implements StreamOperatorStateContext {
        /** Id of the restored checkpoint, or {@code null} if none was provided. */
        @Nullable
        private final Long restoredCheckpointId;
        private final OperatorStateBackend operatorStateBackend;
        @Nullable
        private final TypeSerializer<?> keySerializer;
        private final CheckpointableKeyedStateBackend<?> keyedStateBackend;
        private final AsyncKeyedStateBackend<?> asyncKeyedStateBackend;
        private final InternalTimeServiceManager<?> internalTimeServiceManager;
        private final InternalTimeServiceManager<?> asyncInternalTimeServiceManager;
        private final CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs;
        private final CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs;

        /** Stores all arguments as-is; no validation or copying is performed. */
        StreamOperatorStateContextImpl(
                @Nullable Long restoredCheckpointId,
                OperatorStateBackend operatorStateBackend,
                @Nullable TypeSerializer<?> keySerializer,
                CheckpointableKeyedStateBackend<?> keyedStateBackend,
                AsyncKeyedStateBackend<?> asyncKeyedStateBackend,
                InternalTimeServiceManager<?> internalTimeServiceManager,
                InternalTimeServiceManager<?> asyncInternalTimeServiceManager,
                CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs,
                CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs) {
            this.restoredCheckpointId = restoredCheckpointId;
            this.operatorStateBackend = operatorStateBackend;
            this.keySerializer = keySerializer;
            this.keyedStateBackend = keyedStateBackend;
            this.asyncKeyedStateBackend = asyncKeyedStateBackend;
            this.internalTimeServiceManager = internalTimeServiceManager;
            this.asyncInternalTimeServiceManager = asyncInternalTimeServiceManager;
            this.rawOperatorStateInputs = rawOperatorStateInputs;
            this.rawKeyedStateInputs = rawKeyedStateInputs;
        }

        /** Returns the restored checkpoint id, or an empty optional if the stored id is null. */
        @Override
        public OptionalLong getRestoredCheckpointId() {
            if (restoredCheckpointId == null) {
                return OptionalLong.empty();
            }
            return OptionalLong.of(restoredCheckpointId);
        }

        @Override
        public TypeSerializer<?> keySerializer() {
            return keySerializer;
        }

        @Override
        public CheckpointableKeyedStateBackend<?> keyedStateBackend() {
            return keyedStateBackend;
        }

        @Override
        public AsyncKeyedStateBackend<?> asyncKeyedStateBackend() {
            return asyncKeyedStateBackend;
        }

        @Override
        public OperatorStateBackend operatorStateBackend() {
            return operatorStateBackend;
        }

        @Override
        public InternalTimeServiceManager<?> internalTimerServiceManager() {
            return internalTimeServiceManager;
        }

        @Override
        public InternalTimeServiceManager<?> asyncInternalTimerServiceManager() {
            return asyncInternalTimeServiceManager;
        }

        @Override
        public CloseableIterable<StatePartitionStreamProvider> rawOperatorStateInputs() {
            return rawOperatorStateInputs;
        }

        @Override
        public CloseableIterable<KeyGroupStatePartitionStreamProvider> rawKeyedStateInputs() {
            return rawKeyedStateInputs;
        }
    }

    /**
     * Base class for read-only iterators that walk a sequence of state handles and serve
     * partitions from one input stream per handle.
     *
     * <p>At most one stream is open at a time; it is registered with the supplied
     * {@link CloseableRegistry} while open.
     *
     * @param <T> type of the stream providers produced by the iterator
     * @param <H> type of the state handles being iterated
     */
    private abstract static class AbstractStateStreamIterator<T extends StatePartitionStreamProvider, H extends StreamStateHandle>
            implements Iterator<T> {
        protected final Iterator<H> stateHandleIterator;
        protected final CloseableRegistry closableRegistry;
        /** Handle the iterator is currently positioned on; null until the first handle is taken. */
        protected H currentStateHandle;
        /** Stream opened for {@link #currentStateHandle}; null while no stream is open. */
        protected FSDataInputStream currentStream;

        AbstractStateStreamIterator(Iterator<H> stateHandleIterator, CloseableRegistry closableRegistry) {
            this.stateHandleIterator = Preconditions.checkNotNull(stateHandleIterator);
            this.closableRegistry = Preconditions.checkNotNull(closableRegistry);
        }

        /**
         * Opens the input stream of the current handle and registers it with the registry.
         *
         * @throws IllegalStateException if a stream is already open
         * @throws IOException if opening or registering the stream fails
         */
        protected void openCurrentStream() throws IOException {
            Preconditions.checkState(currentStream == null);

            FSDataInputStream opened = currentStateHandle.openInputStream();
            closableRegistry.registerCloseable(opened);
            currentStream = opened;
        }

        /**
         * Forgets the current stream; it is closed quietly only if it was still registered with
         * the registry.
         */
        protected void closeCurrentStream() {
            if (closableRegistry.unregisterCloseable(currentStream)) {
                IOUtils.closeQuietly(currentStream);
            }
            currentStream = null;
        }

        /** Always throws: this iterator is read-only. */
        @Override
        public void remove() {
            throw new UnsupportedOperationException("Read only Iterator");
        }
    }

    /**
     * Iterates over all partitions that a sequence of {@link OperatorStateHandle}s holds for one
     * state name. Handles without offsets for that name are skipped.
     *
     * <p>All partitions of one handle are served from the same stream, repositioned with
     * {@code seek} before each provider is returned.
     */
    private static class OperatorStateStreamIterator
            extends AbstractStateStreamIterator<StatePartitionStreamProvider, OperatorStateHandle> {
        private final String stateName;
        /** Partition offsets of the current handle for {@link #stateName}; null before the first match. */
        private long[] offsets;
        /** Index into {@link #offsets} of the next partition to serve. */
        private int offPos;

        OperatorStateStreamIterator(String stateName, Iterator<OperatorStateHandle> stateHandleIterator, CloseableRegistry closableRegistry) {
            super(stateHandleIterator, closableRegistry);
            this.stateName = Preconditions.checkNotNull(stateName);
        }

        /**
         * Returns {@code true} while the current handle has unserved offsets, otherwise closes the
         * current stream and advances to the next handle that has offsets for the state name.
         */
        @Override
        public boolean hasNext() {
            if (null != this.offsets && this.offPos < this.offsets.length) {
                return true;
            }

            this.closeCurrentStream();

            while (this.stateHandleIterator.hasNext()) {
                // The field is typed as OperatorStateHandle, so no cast is needed (or legal via
                // the StreamStateHandle supertype).
                this.currentStateHandle = this.stateHandleIterator.next();
                OperatorStateHandle.StateMetaInfo metaInfo =
                        this.currentStateHandle.getStateNameToPartitionOffsets().get(this.stateName);
                if (null == metaInfo) {
                    continue;
                }
                long[] metaOffsets = metaInfo.getOffsets();
                if (null == metaOffsets || metaOffsets.length <= 0) {
                    continue;
                }

                this.offsets = metaOffsets;
                this.offPos = 0;

                // currentStream was already reset by closeCurrentStream() above; kept as a
                // defensive guard so a stream of a previous handle is never reused.
                if (this.closableRegistry.unregisterCloseable(this.currentStream)) {
                    IOUtils.closeQuietly(this.currentStream);
                    this.currentStream = null;
                }
                return true;
            }
            return false;
        }

        /**
         * Returns a provider for the next partition. I/O failures while opening or seeking the
         * stream are not thrown; they are wrapped in the returned provider.
         *
         * @throws NoSuchElementException if no partition is left
         */
        @Override
        public StatePartitionStreamProvider next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException("Iterator exhausted");
            }
            long offset = this.offsets[this.offPos++];
            try {
                if (null == this.currentStream) {
                    this.openCurrentStream();
                }
                this.currentStream.seek(offset);
                return new StatePartitionStreamProvider(this.currentStream);
            }
            catch (IOException ioex) {
                return new StatePartitionStreamProvider(ioex);
            }
        }
    }

    /**
     * Iterates over all key-group partitions of a sequence of {@link KeyGroupsStateHandle}s.
     * Handles with an empty key-group range, and key groups whose offset is {@code -1}, are
     * skipped.
     *
     * <p>All key groups of one handle are served from the same stream, repositioned with
     * {@code seek} before each provider is returned.
     */
    private static class KeyGroupStreamIterator
            extends AbstractStateStreamIterator<KeyGroupStatePartitionStreamProvider, KeyGroupsStateHandle> {
        /** (key group id, offset) pairs left in the current handle; null until a non-empty handle is found. */
        private Iterator<Tuple2<Integer, Long>> currentOffsetsIterator;

        KeyGroupStreamIterator(Iterator<KeyGroupsStateHandle> stateHandleIterator, CloseableRegistry closableRegistry) {
            super(stateHandleIterator, closableRegistry);
        }

        /**
         * Returns {@code true} while the current handle has unserved key groups, otherwise closes
         * the current stream and advances to the next handle that has at least one key group with
         * a usable offset.
         */
        @Override
        public boolean hasNext() {
            // Guard on the offsets iterator itself: it stays null while only handles with an
            // empty key-group range have been seen, even though currentStateHandle is already set.
            // Checking the handle instead would throw a NullPointerException on a repeated call.
            if (null != this.currentOffsetsIterator && this.currentOffsetsIterator.hasNext()) {
                return true;
            }

            this.closeCurrentStream();

            while (this.stateHandleIterator.hasNext()) {
                // The field is typed as KeyGroupsStateHandle, so no cast is needed (or legal via
                // the StreamStateHandle supertype).
                this.currentStateHandle = this.stateHandleIterator.next();
                if (this.currentStateHandle.getKeyGroupRange().getNumberOfKeyGroups() <= 0) {
                    continue;
                }
                this.currentOffsetsIterator =
                        KeyGroupStreamIterator.unsetOffsetsSkippingIterator(this.currentStateHandle);
                if (this.currentOffsetsIterator.hasNext()) {
                    return true;
                }
            }
            return false;
        }

        /**
         * Returns an iterator over the handle's (key group id, offset) pairs that leaves out every
         * pair whose offset is {@code -1} (presumably the "unset" marker, per the method name).
         */
        private static Iterator<Tuple2<Integer, Long>> unsetOffsetsSkippingIterator(KeyGroupsStateHandle keyGroupsStateHandle) {
            return StreamSupport.stream(keyGroupsStateHandle.getGroupRangeOffsets().spliterator(), false)
                    .filter(keyGroupIdAndOffset -> keyGroupIdAndOffset.f1 != -1L)
                    .iterator();
        }

        /**
         * Returns a provider for the next key group. I/O failures while opening or seeking the
         * stream are not thrown; they are wrapped in the returned provider together with the key
         * group id.
         *
         * @throws NoSuchElementException if no key group is left
         */
        @Override
        public KeyGroupStatePartitionStreamProvider next() {
            if (!this.hasNext()) {
                throw new NoSuchElementException("Iterator exhausted");
            }
            Tuple2<Integer, Long> keyGroupOffset = this.currentOffsetsIterator.next();
            try {
                if (null == this.currentStream) {
                    this.openCurrentStream();
                }
                this.currentStream.seek(keyGroupOffset.f1);
                return new KeyGroupStatePartitionStreamProvider(this.currentStream, keyGroupOffset.f0);
            }
            catch (IOException ioex) {
                return new KeyGroupStatePartitionStreamProvider(ioex, keyGroupOffset.f0);
            }
        }
    }

    @FunctionalInterface
    protected static interface KeyedStateBackendCreator<K, R extends Disposable & Closeable> {
        public R create(StateBackend var1, StateBackend.KeyedStateBackendParameters<K> var2) throws Exception;
    }
}

