/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import javax.annotation.Nonnull;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.SavepointType;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
import org.apache.flink.runtime.state.CheckpointedStateScope;
import org.apache.flink.runtime.state.FullSnapshotAsyncWriter;
import org.apache.flink.runtime.state.FullSnapshotResources;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.SnapshotStrategy;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SavepointSnapshotStrategy<K>
implements SnapshotStrategy<KeyedStateHandle, FullSnapshotResources<K>> {
    private static final Logger LOG = LoggerFactory.getLogger(SavepointSnapshotStrategy.class);
    private final FullSnapshotResources<K> savepointResources;

    public SavepointSnapshotStrategy(FullSnapshotResources<K> savepointResources) {
        this.savepointResources = savepointResources;
    }

    @Override
    public FullSnapshotResources<K> syncPrepareResources(long checkpointId) throws Exception {
        return this.savepointResources;
    }

    @Override
    public SnapshotStrategy.SnapshotResultSupplier<KeyedStateHandle> asyncSnapshot(FullSnapshotResources<K> savepointResources, long checkpointId, long timestamp, @Nonnull CheckpointStreamFactory streamFactory, @Nonnull CheckpointOptions checkpointOptions) {
        if (savepointResources.getMetaInfoSnapshots().isEmpty()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Asynchronous savepoint performed on empty keyed state at {}. Returning null.", (Object)timestamp);
            }
            return registry -> SnapshotResult.empty();
        }
        SupplierWithException checkpointStreamSupplier = () -> SavepointSnapshotStrategy.createSimpleStream(streamFactory);
        return new FullSnapshotAsyncWriter<K>(SavepointType.savepoint(SavepointFormatType.CANONICAL), (SupplierWithException<CheckpointStreamWithResultProvider, Exception>)checkpointStreamSupplier, savepointResources);
    }

    @Nonnull
    static CheckpointStreamWithResultProvider createSimpleStream(@Nonnull CheckpointStreamFactory primaryStreamFactory) throws IOException {
        CheckpointStateOutputStream primaryOut = primaryStreamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
        return new CheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);
    }
}

