/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.RunnableFuture;
import org.apache.commons.io.IOUtils;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources;
import org.apache.flink.runtime.io.async.AsyncStoppableTaskWithCallback;
import org.apache.flink.runtime.state.ArrayListSerializer;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.runtime.state.OperatorBackendSerializationProxy;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.RegisteredOperatorBackendStateMetaInfo;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class DefaultOperatorStateBackend
implements OperatorStateBackend {
    /** Logger used for the snapshot timing messages emitted by this class. */
    private static final Logger LOG = LoggerFactory.getLogger(DefaultOperatorStateBackend.class);
    /** State name constant; it is not referenced anywhere else in this class. */
    public static final String DEFAULT_OPERATOR_STATE_NAME = "_default_";
    /** Every list state known to this backend, keyed by state name; filled on restore and on first access. */
    private final Map<String, PartitionableListState<?>> registeredStates;
    /** Streams opened during snapshot/restore are registered here; {@link #close()} closes the registry. */
    private final CloseableRegistry closeStreamOnCancelRegistry = new CloseableRegistry();
    /** Serializer handed to the descriptor built by {@code getSerializableListState}. */
    private final JavaSerializer<Serializable> javaSerializer;
    /** Installed as the thread context class loader while state is deep-copied or restored. */
    private final ClassLoader userClassloader;
    /** Passed to state descriptors so they can initialize their serializer if none is set yet. */
    private final ExecutionConfig executionConfig;
    /** When {@code false}, {@code snapshot} runs the write task itself before returning it. */
    private final boolean asynchronousSnapshots;
    /** Meta info snapshots read during restore, keyed by state name; consulted for the serializer compatibility check. */
    private final Map<String, RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredStateMetaInfos;
    /** States already handed out through {@code getListState}, keyed by name, so repeated lookups return the same instance. */
    private final HashMap<String, PartitionableListState<?>> accessedStatesByName;

    /**
     * Creates a backend that starts without any registered, accessed or restored state.
     *
     * @param userClassLoader class loader installed as the thread context class loader while state
     *        is copied or restored; must not be {@code null}
     * @param executionConfig configuration handed to state descriptors when they initialize
     *        their serializer (stored as given, without a null check)
     * @param asynchronousSnapshots if {@code false}, {@code snapshot} executes the write task
     *        before returning it
     * @throws IOException declared in the signature; no statement in this constructor throws it
     */
    public DefaultOperatorStateBackend(ClassLoader userClassLoader, ExecutionConfig executionConfig, boolean asynchronousSnapshots) throws IOException {
        // Fail fast on a missing class loader before anything else is set up.
        this.userClassloader = Preconditions.checkNotNull(userClassLoader);
        this.executionConfig = executionConfig;
        this.asynchronousSnapshots = asynchronousSnapshots;
        this.javaSerializer = new JavaSerializer<Serializable>();
        this.registeredStates = new HashMap<String, PartitionableListState<?>>();
        this.accessedStatesByName = new HashMap<String, PartitionableListState<?>>();
        this.restoredStateMetaInfos = new HashMap<String, RegisteredOperatorBackendStateMetaInfo.Snapshot<?>>();
    }

    /**
     * Returns the execution config this backend was constructed with.
     *
     * @return the stored {@link ExecutionConfig}; may be {@code null} because the constructor
     *         does not validate it
     */
    public ExecutionConfig getExecutionConfig() {
        return executionConfig;
    }

    /**
     * Returns the names of all states currently registered with this backend.
     *
     * @return the key set of the internal registry map; this is a live view of that map,
     *         not a copy
     */
    public Set<String> getRegisteredStateNames() {
        final Set<String> stateNames = registeredStates.keySet();
        return stateNames;
    }

    /**
     * Closes the registry that tracks the streams opened by snapshot and restore.
     *
     * @throws IOException if closing the registry fails
     */
    @Override
    public void close() throws IOException {
        closeStreamOnCancelRegistry.close();
    }

    /**
     * Closes this backend, ignoring any error raised while closing, and then drops all
     * registered states.
     */
    @Override
    public void dispose() {
        // Best-effort close: closeQuietly suppresses whatever close() throws.
        final Closeable self = this;
        IOUtils.closeQuietly(self);
        registeredStates.clear();
    }

    /**
     * Returns the list state for the given descriptor, registered with the
     * {@code SPLIT_DISTRIBUTE} assignment mode.
     *
     * @param stateDescriptor descriptor naming the state and its element serializer
     * @param <S> element type of the list state
     * @return the list state registered under the descriptor's name
     * @throws Exception if the state lookup or registration fails
     */
    public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        final OperatorStateHandle.Mode mode = OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
        return getListState(stateDescriptor, mode);
    }

    /**
     * Returns the list state for the given descriptor, registered with the
     * {@code BROADCAST} assignment mode.
     *
     * @param stateDescriptor descriptor naming the state and its element serializer
     * @param <S> element type of the list state
     * @return the list state registered under the descriptor's name
     * @throws Exception if the state lookup or registration fails
     */
    public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        final OperatorStateHandle.Mode mode = OperatorStateHandle.Mode.BROADCAST;
        return getListState(stateDescriptor, mode);
    }

    /**
     * Delegates directly to {@link #getListState(ListStateDescriptor)}.
     *
     * @param stateDescriptor descriptor naming the state and its element serializer
     * @param <S> element type of the list state
     * @return the list state registered under the descriptor's name
     * @throws Exception if the state lookup or registration fails
     * @deprecated call {@link #getListState(ListStateDescriptor)} instead; this method only
     *             forwards to it
     */
    @Deprecated
    public <S> ListState<S> getOperatorState(ListStateDescriptor<S> stateDescriptor) throws Exception {
        final ListState<S> state = getListState(stateDescriptor);
        return state;
    }

    /**
     * Returns a list state with the given name whose elements are serialized by this backend's
     * {@code JavaSerializer}.
     *
     * @param stateName name under which the state is registered
     * @param <T> element type of the list state
     * @return the list state registered under {@code stateName}
     * @throws Exception if the state lookup or registration fails
     * @deprecated this method only builds a descriptor and forwards to
     *             {@link #getListState(ListStateDescriptor)}
     */
    @Deprecated
    public <T extends Serializable> ListState<T> getSerializableListState(String stateName) throws Exception {
        final ListStateDescriptor<Serializable> descriptor = new ListStateDescriptor<Serializable>(stateName, this.javaSerializer);
        // The shared serializer is typed for Serializable; callers narrow the element type themselves.
        @SuppressWarnings("unchecked")
        final ListState<T> state = (ListState<T>) getListState(descriptor);
        return state;
    }

    /**
     * Snapshots every registered list state into a checkpoint stream.
     *
     * <p>Synchronous part (executed by the caller): each registered list state is deep-copied while
     * the user class loader is installed as the thread context class loader, so that the write
     * phase works on copies rather than on the live state.
     *
     * <p>Write part (the returned task): opens a checkpoint stream from {@code streamFactory},
     * writes the meta info of all states through an {@code OperatorBackendSerializationProxy},
     * then the number of states, then the elements of every state, recording each element's
     * stream offset. If {@code asynchronousSnapshots} is {@code false} the task is run here before
     * it is returned.
     *
     * @param checkpointId id passed to the stream factory when the output stream is created
     * @param timestamp timestamp passed to the stream factory when the output stream is created
     * @param streamFactory factory for the checkpoint output stream
     * @param checkpointOptions not read by this implementation
     * @return a completed future holding {@code null} when no state is registered; otherwise the
     *         write task, whose result is {@code null} if the stream was no longer registered or
     *         produced no handle
     * @throws Exception if deep-copying the state fails or the synchronously executed task fails
     */
    @Override
    public RunnableFuture<OperatorStateHandle> snapshot(final long checkpointId, final long timestamp, final CheckpointStreamFactory streamFactory, CheckpointOptions checkpointOptions) throws Exception {
        final long syncStartTime = System.currentTimeMillis();
        if (this.registeredStates.isEmpty()) {
            return DoneFuture.nullValue();
        }

        // Typed map (instead of a raw HashMap) so the loops below can iterate its entries directly.
        final Map<String, PartitionableListState<?>> registeredStatesDeepCopies =
                new HashMap<String, PartitionableListState<?>>(this.registeredStates.size());

        final ClassLoader snapshotClassLoader = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(this.userClassloader);
        try {
            // Copy eagerly in the synchronous phase; the write phase must not touch live state.
            for (Map.Entry<String, PartitionableListState<?>> entry : this.registeredStates.entrySet()) {
                PartitionableListState<?> listState = entry.getValue();
                if (null != listState) {
                    listState = listState.deepCopy();
                }
                registeredStatesDeepCopies.put(entry.getKey(), listState);
            }
        } finally {
            Thread.currentThread().setContextClassLoader(snapshotClassLoader);
        }

        AbstractAsyncCallableWithResources<OperatorStateHandle> ioCallable = new AbstractAsyncCallableWithResources<OperatorStateHandle>() {
            /** Checkpoint stream of this snapshot; assigned in {@code openOutStream}. */
            CheckpointStreamFactory.CheckpointStateOutputStream out = null;

            @Override
            protected void acquireResources() throws Exception {
                this.openOutStream();
            }

            @Override
            protected void releaseResources() throws Exception {
                this.closeOutStream();
            }

            @Override
            protected void stopOperation() throws Exception {
                this.closeOutStream();
            }

            /** Creates the checkpoint stream and registers it so that closing the backend also closes it. */
            private void openOutStream() throws Exception {
                this.out = streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
                DefaultOperatorStateBackend.this.closeStreamOnCancelRegistry.registerCloseable(this.out);
            }

            /** Closes the stream only if it is still registered, i.e. nobody else closed or claimed it. */
            private void closeOutStream() {
                if (DefaultOperatorStateBackend.this.closeStreamOnCancelRegistry.unregisterCloseable(this.out)) {
                    IOUtils.closeQuietly(this.out);
                }
            }

            @Override
            public OperatorStateHandle performOperation() throws Exception {
                final long asyncStartTime = System.currentTimeMillis();
                final CheckpointStreamFactory.CheckpointStateOutputStream localOut = this.out;
                final int stateCount = registeredStatesDeepCopies.size();

                final Map<String, OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =
                        new HashMap<String, OperatorStateHandle.StateMetaInfo>(stateCount);

                // Header: the meta info snapshot of every state, followed by the number of states.
                final List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> metaInfoSnapshots =
                        new ArrayList<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>>(stateCount);
                for (Map.Entry<String, PartitionableListState<?>> entry : registeredStatesDeepCopies.entrySet()) {
                    metaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());
                }
                final DataOutputView dov = new DataOutputViewStreamWrapper(localOut);
                final OperatorBackendSerializationProxy backendSerializationProxy = new OperatorBackendSerializationProxy(metaInfoSnapshots);
                backendSerializationProxy.write(dov);
                dov.writeInt(stateCount);

                // Body: the elements of each state; remember where every element starts.
                for (Map.Entry<String, PartitionableListState<?>> entry : registeredStatesDeepCopies.entrySet()) {
                    final PartitionableListState<?> value = entry.getValue();
                    final long[] partitionOffsets = value.write(localOut);
                    final OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
                    writtenStatesMetaData.put(entry.getKey(), new OperatorStateHandle.StateMetaInfo(partitionOffsets, mode));
                }

                // Only the party that successfully unregisters the stream may close it and take the handle.
                OperatorStateHandle retValue = null;
                if (DefaultOperatorStateBackend.this.closeStreamOnCancelRegistry.unregisterCloseable(this.out)) {
                    final StreamStateHandle stateHandle = this.out.closeAndGetHandle();
                    if (stateHandle != null) {
                        retValue = new OperatorStateHandle(writtenStatesMetaData, stateHandle);
                    }
                }
                if (DefaultOperatorStateBackend.this.asynchronousSnapshots) {
                    LOG.info("DefaultOperatorStateBackend snapshot ({}, asynchronous part) in thread {} took {} ms.", new Object[]{streamFactory, Thread.currentThread(), System.currentTimeMillis() - asyncStartTime});
                }
                return retValue;
            }
        };

        AsyncStoppableTaskWithCallback<OperatorStateHandle> task = AsyncStoppableTaskWithCallback.from(ioCallable);
        if (!this.asynchronousSnapshots) {
            // Synchronous mode: do the write on the calling thread before handing the task back.
            task.run();
        }
        LOG.info("DefaultOperatorStateBackend snapshot ({}, synchronous part) in thread {} took {} ms.", new Object[]{streamFactory, Thread.currentThread(), System.currentTimeMillis() - syncStartTime});
        return task;
    }

    /**
     * Restores operator state from the given state handles.
     *
     * <p>For every non-null handle the stream is opened and registered with the close-on-cancel
     * registry. The meta info snapshots are read first; each one is remembered by name and, if no
     * state with that name is registered yet, an empty list state is registered for it. Then the
     * elements of every state listed in the handle are deserialized from their recorded offsets
     * and appended to the matching list state.
     *
     * @param restoreSnapshots handles to restore from; {@code null} and {@code null} elements
     *        are skipped
     * @throws IOException if a restored state has no usable previous serializer
     * @throws Exception if reading from a handle fails for any other reason
     */
    @Override
    public void restore(Collection<OperatorStateHandle> restoreSnapshots) throws Exception {
        if (null == restoreSnapshots) {
            return;
        }
        for (OperatorStateHandle stateHandle : restoreSnapshots) {
            if (stateHandle == null) {
                continue;
            }
            FSDataInputStream in = stateHandle.openInputStream();
            this.closeStreamOnCancelRegistry.registerCloseable(in);
            ClassLoader restoreClassLoader = Thread.currentThread().getContextClassLoader();
            try {
                Thread.currentThread().setContextClassLoader(this.userClassloader);
                OperatorBackendSerializationProxy backendSerializationProxy = new OperatorBackendSerializationProxy(this.userClassloader);
                backendSerializationProxy.read(new DataInputViewStreamWrapper(in));
                List<RegisteredOperatorBackendStateMetaInfo.Snapshot<?>> restoredMetaInfoSnapshots = backendSerializationProxy.getStateMetaInfoSnapshots();
                for (RegisteredOperatorBackendStateMetaInfo.Snapshot<?> snapshot : restoredMetaInfoSnapshots) {
                    if (snapshot.getPartitionStateSerializer() == null || snapshot.getPartitionStateSerializer() instanceof UnloadableDummyTypeSerializer) {
                        throw new IOException("Unable to restore operator state [" + snapshot.getName() + "]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.");
                    }
                    this.restoredStateMetaInfos.put(snapshot.getName(), snapshot);
                    if (null == this.registeredStates.get(snapshot.getName())) {
                        PartitionableListState<?> listState = newListStateFor(snapshot);
                        this.registeredStates.put(listState.getStateMetaInfo().getName(), listState);
                    }
                }
                for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> entry : stateHandle.getStateNameToPartitionOffsets().entrySet()) {
                    PartitionableListState<?> stateListForName = this.registeredStates.get(entry.getKey());
                    Preconditions.checkState(null != stateListForName, "Found state without corresponding meta info: " + entry.getKey());
                    DefaultOperatorStateBackend.deserializeStateValues(stateListForName, in, entry.getValue());
                }
            } finally {
                Thread.currentThread().setContextClassLoader(restoreClassLoader);
                // Deliberately a plain 'if': a 'continue' here would make the finally block complete
                // abruptly and silently discard any exception thrown from the try block.
                if (this.closeStreamOnCancelRegistry.unregisterCloseable(in)) {
                    IOUtils.closeQuietly(in);
                }
            }
        }
    }

    /** Builds an empty list state whose meta info mirrors the name, serializer and mode of a restored snapshot. */
    private static <S> PartitionableListState<S> newListStateFor(RegisteredOperatorBackendStateMetaInfo.Snapshot<S> snapshot) {
        return new PartitionableListState<S>(new RegisteredOperatorBackendStateMetaInfo<S>(snapshot.getName(), snapshot.getPartitionStateSerializer(), snapshot.getAssignmentMode()));
    }

    /**
     * Looks up or registers the list state described by {@code stateDescriptor}.
     *
     * <p>A state that was already handed out under the same name is returned as is, after checking
     * that name and assignment mode match. Otherwise the descriptor's serializer is initialized;
     * a state that is not registered yet is created and registered, while a state that is already
     * registered (which in this class happens through restore) is checked for serializer
     * compatibility against its restored meta info and then switched to the new serializer.
     *
     * @param stateDescriptor descriptor providing the state name and element serializer; must not
     *        be {@code null}
     * @param mode assignment mode the state is expected to have
     * @param <S> element type of the list state
     * @return the list state registered under the descriptor's name
     * @throws IOException declared in the signature; not thrown directly by this body
     * @throws StateMigrationException if the restored serializer requires a state migration
     * @throws IllegalStateException if the name or mode differs from the existing registration
     */
    private <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor, OperatorStateHandle.Mode mode) throws IOException, StateMigrationException {
        Preconditions.checkNotNull(stateDescriptor);
        String name = Preconditions.checkNotNull(stateDescriptor.getName());

        // The maps store states with a wildcard element type; the descriptor fixes it to S.
        @SuppressWarnings("unchecked")
        PartitionableListState<S> previous = (PartitionableListState<S>) this.accessedStatesByName.get(name);
        if (previous != null) {
            DefaultOperatorStateBackend.checkStateNameAndMode(previous.getStateMetaInfo(), name, mode);
            return previous;
        }

        stateDescriptor.initializeSerializerUnlessSet(this.getExecutionConfig());
        TypeSerializer<S> partitionStateSerializer = Preconditions.checkNotNull(stateDescriptor.getElementSerializer());

        @SuppressWarnings("unchecked")
        PartitionableListState<S> partitionableListState = (PartitionableListState<S>) this.registeredStates.get(name);
        if (null == partitionableListState) {
            // First sighting of this name: register a fresh, empty state.
            partitionableListState = new PartitionableListState<S>(new RegisteredOperatorBackendStateMetaInfo<S>(name, partitionStateSerializer, mode));
            this.registeredStates.put(name, partitionableListState);
        } else {
            // Registered but never accessed: verify the new serializer against the restored one.
            DefaultOperatorStateBackend.checkStateNameAndMode(partitionableListState.getStateMetaInfo(), name, mode);

            @SuppressWarnings("unchecked")
            RegisteredOperatorBackendStateMetaInfo.Snapshot<S> restoredMetaInfo =
                    (RegisteredOperatorBackendStateMetaInfo.Snapshot<S>) this.restoredStateMetaInfos.get(name);
            CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(restoredMetaInfo.getPartitionStateSerializer(), UnloadableDummyTypeSerializer.class, restoredMetaInfo.getPartitionStateSerializerConfigSnapshot(), partitionStateSerializer);
            if (stateCompatibility.isRequiresMigration()) {
                throw new StateMigrationException("State migration isn't supported, yet.");
            }
            partitionableListState.setStateMetaInfo(new RegisteredOperatorBackendStateMetaInfo<S>(name, partitionStateSerializer, mode));
        }

        this.accessedStatesByName.put(name, partitionableListState);
        return partitionableListState;
    }

    /**
     * Reads one element per recorded offset from {@code in} and appends it to the given state.
     *
     * <p>Nothing is read when {@code metaInfo} or its offset array is {@code null}.
     *
     * @param stateListForName list state receiving the deserialized elements
     * @param in stream positioned via {@code seek} before each element is read
     * @param metaInfo carrier of the element offsets; may be {@code null}
     * @param <S> element type of the list state
     * @throws IOException if seeking or deserializing fails
     */
    private static <S> void deserializeStateValues(PartitionableListState<S> stateListForName, FSDataInputStream in, OperatorStateHandle.StateMetaInfo metaInfo) throws IOException {
        if (metaInfo == null) {
            return;
        }
        final long[] elementOffsets = metaInfo.getOffsets();
        if (elementOffsets == null) {
            return;
        }
        final DataInputView inputView = new DataInputViewStreamWrapper(in);
        final TypeSerializer<S> elementSerializer = stateListForName.getStateMetaInfo().getPartitionStateSerializer();
        for (int i = 0; i < elementOffsets.length; i++) {
            // Each element was written at its own offset, so jump there before reading it.
            in.seek(elementOffsets[i]);
            stateListForName.add(elementSerializer.deserialize(inputView));
        }
    }

    /**
     * Verifies that an existing registration matches the name and assignment mode a caller asks for.
     *
     * @param previousMetaInfo meta info of the state that is already registered
     * @param expectedName name the caller is registering with
     * @param expectedMode assignment mode the caller is registering with
     * @throws IllegalStateException if the name or the assignment mode differs
     */
    private static void checkStateNameAndMode(RegisteredOperatorBackendStateMetaInfo<?> previousMetaInfo, String expectedName, OperatorStateHandle.Mode expectedMode) {
        final boolean sameName = previousMetaInfo.getName().equals(expectedName);
        final String nameMismatch = "Incompatible state names. Was [" + previousMetaInfo.getName() + "], registered with [" + expectedName + "].";
        Preconditions.checkState(sameName, nameMismatch);

        final boolean sameMode = previousMetaInfo.getAssignmentMode().equals(expectedMode);
        final String modeMismatch = "Incompatible state assignment modes. Was [" + previousMetaInfo.getAssignmentMode() + "], registered with [" + expectedMode + "].";
        Preconditions.checkState(sameMode, modeMismatch);
    }

    static final class PartitionableListState<S>
    implements ListState<S> {
        private RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo;
        private final ArrayList<S> internalList;
        private final ArrayListSerializer<S> internalListCopySerializer;

        public PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
            this(stateMetaInfo, new ArrayList());
        }

        private PartitionableListState(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo, ArrayList<S> internalList) {
            this.stateMetaInfo = (RegisteredOperatorBackendStateMetaInfo)Preconditions.checkNotNull(stateMetaInfo);
            this.internalList = (ArrayList)Preconditions.checkNotNull(internalList);
            this.internalListCopySerializer = new ArrayListSerializer<S>(stateMetaInfo.getPartitionStateSerializer());
        }

        private PartitionableListState(PartitionableListState<S> toCopy) {
            this(toCopy.stateMetaInfo, toCopy.internalListCopySerializer.copy(toCopy.internalList));
        }

        public void setStateMetaInfo(RegisteredOperatorBackendStateMetaInfo<S> stateMetaInfo) {
            this.stateMetaInfo = stateMetaInfo;
        }

        public RegisteredOperatorBackendStateMetaInfo<S> getStateMetaInfo() {
            return this.stateMetaInfo;
        }

        public PartitionableListState<S> deepCopy() {
            return new PartitionableListState<S>(this);
        }

        public void clear() {
            this.internalList.clear();
        }

        public Iterable<S> get() {
            return this.internalList;
        }

        public void add(S value) {
            this.internalList.add(value);
        }

        public String toString() {
            return "PartitionableListState{stateMetaInfo=" + this.stateMetaInfo + ", internalList=" + this.internalList + '}';
        }

        public long[] write(FSDataOutputStream out) throws IOException {
            long[] partitionOffsets = new long[this.internalList.size()];
            DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper((OutputStream)out);
            for (int i = 0; i < this.internalList.size(); ++i) {
                S element = this.internalList.get(i);
                partitionOffsets[i] = out.getPos();
                this.getStateMetaInfo().getPartitionStateSerializer().serialize(element, (DataOutputView)dov);
            }
            return partitionOffsets;
        }
    }
}

