/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.RunnableFuture;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.NonClosingCheckpointOutputStream;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;

public class StateSnapshotContextSynchronousImpl
implements StateSnapshotContext,
Closeable {
    private final long checkpointId;
    private final long checkpointTimestamp;
    private final CheckpointStreamFactory streamFactory;
    private final KeyGroupRange keyGroupRange;
    private final CloseableRegistry closableRegistry;
    private KeyedStateCheckpointOutputStream keyedStateCheckpointOutputStream;
    private OperatorStateCheckpointOutputStream operatorStateCheckpointOutputStream;

    @VisibleForTesting
    public StateSnapshotContextSynchronousImpl(long checkpointId, long checkpointTimestamp) {
        this.checkpointId = checkpointId;
        this.checkpointTimestamp = checkpointTimestamp;
        this.streamFactory = null;
        this.keyGroupRange = KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        this.closableRegistry = null;
    }

    public StateSnapshotContextSynchronousImpl(long checkpointId, long checkpointTimestamp, CheckpointStreamFactory streamFactory, KeyGroupRange keyGroupRange, CloseableRegistry closableRegistry) {
        this.checkpointId = checkpointId;
        this.checkpointTimestamp = checkpointTimestamp;
        this.streamFactory = (CheckpointStreamFactory)Preconditions.checkNotNull((Object)streamFactory);
        this.keyGroupRange = (KeyGroupRange)Preconditions.checkNotNull((Object)keyGroupRange);
        this.closableRegistry = (CloseableRegistry)Preconditions.checkNotNull((Object)closableRegistry);
    }

    @Override
    public long getCheckpointId() {
        return this.checkpointId;
    }

    @Override
    public long getCheckpointTimestamp() {
        return this.checkpointTimestamp;
    }

    private CheckpointStreamFactory.CheckpointStateOutputStream openAndRegisterNewStream() throws Exception {
        CheckpointStreamFactory.CheckpointStateOutputStream cout = this.streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        this.closableRegistry.registerCloseable((Closeable)((Object)cout));
        return cout;
    }

    @Override
    public KeyedStateCheckpointOutputStream getRawKeyedOperatorStateOutput() throws Exception {
        if (null == this.keyedStateCheckpointOutputStream) {
            Preconditions.checkState((this.keyGroupRange != KeyGroupRange.EMPTY_KEY_GROUP_RANGE ? 1 : 0) != 0, (Object)"Not a keyed operator");
            this.keyedStateCheckpointOutputStream = new KeyedStateCheckpointOutputStream(this.openAndRegisterNewStream(), this.keyGroupRange);
        }
        return this.keyedStateCheckpointOutputStream;
    }

    @Override
    public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Exception {
        if (null == this.operatorStateCheckpointOutputStream) {
            this.operatorStateCheckpointOutputStream = new OperatorStateCheckpointOutputStream(this.openAndRegisterNewStream());
        }
        return this.operatorStateCheckpointOutputStream;
    }

    @Nonnull
    public RunnableFuture<SnapshotResult<KeyedStateHandle>> getKeyedStateStreamFuture() throws IOException {
        KeyedStateHandle keyGroupsStateHandle = this.closeAndUnregisterStreamToObtainStateHandle(this.keyedStateCheckpointOutputStream);
        return this.toDoneFutureOfSnapshotResult(keyGroupsStateHandle);
    }

    @Nonnull
    public RunnableFuture<SnapshotResult<OperatorStateHandle>> getOperatorStateStreamFuture() throws IOException {
        OperatorStateHandle operatorStateHandle = this.closeAndUnregisterStreamToObtainStateHandle(this.operatorStateCheckpointOutputStream);
        return this.toDoneFutureOfSnapshotResult(operatorStateHandle);
    }

    private <T extends StateObject> RunnableFuture<SnapshotResult<T>> toDoneFutureOfSnapshotResult(T handle) {
        SnapshotResult<T> snapshotResult = SnapshotResult.of(handle);
        return DoneFuture.of(snapshotResult);
    }

    private <T extends StreamStateHandle> T closeAndUnregisterStreamToObtainStateHandle(NonClosingCheckpointOutputStream<T> stream) throws IOException {
        if (stream != null && this.closableRegistry.unregisterCloseable((Closeable)((Object)stream.getDelegate()))) {
            return stream.closeAndGetHandle();
        }
        return null;
    }

    private <T extends StreamStateHandle> void closeAndUnregisterStream(NonClosingCheckpointOutputStream<? extends T> stream) throws IOException {
        Preconditions.checkNotNull(stream);
        CheckpointStreamFactory.CheckpointStateOutputStream delegate = stream.getDelegate();
        if (this.closableRegistry.unregisterCloseable((Closeable)((Object)delegate))) {
            delegate.close();
        }
    }

    @Override
    public void close() throws IOException {
        IOException exception = null;
        if (this.keyedStateCheckpointOutputStream != null) {
            try {
                this.closeAndUnregisterStream(this.keyedStateCheckpointOutputStream);
            }
            catch (IOException e) {
                exception = new IOException("Could not close the raw keyed state checkpoint output stream.", e);
            }
        }
        if (this.operatorStateCheckpointOutputStream != null) {
            try {
                this.closeAndUnregisterStream(this.operatorStateCheckpointOutputStream);
            }
            catch (IOException e) {
                exception = (IOException)ExceptionUtils.firstOrSuppressed((Throwable)new IOException("Could not close the raw operator state checkpoint output stream.", e), (Throwable)exception);
            }
        }
        if (exception != null) {
            throw exception;
        }
    }
}

