/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.io.OutputStream;
import java.util.function.BiFunction;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.changelog.fs.AbstractStateChangeFsUploader;
import org.apache.flink.changelog.fs.ChangelogStorageMetricGroup;
import org.apache.flink.changelog.fs.OutputStreamWithPos;
import org.apache.flink.changelog.fs.TaskChangelogRegistry;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link AbstractStateChangeFsUploader} that writes each upload to a newly created file
 * on a {@link FileSystem}.
 *
 * <p>Files are created under {@code <basePath>/<jobID as hex string>/dstl}, where
 * {@code basePath} is the path handed to the constructor.
 */
public class StateChangeFsUploader
extends AbstractStateChangeFsUploader {
    private static final Logger LOG = LoggerFactory.getLogger(StateChangeFsUploader.class);

    /** Name of the sub-directory (below the per-job directory) that holds the uploaded files. */
    @VisibleForTesting
    public static final String PATH_SUB_DIR = "dstl";

    /** Resolved upload directory: {@code <basePath>/<jobID hex>/dstl}. */
    private final Path basePath;

    /** File system on which the upload files are created. */
    private final FileSystem fileSystem;

    /**
     * Creates an uploader that builds its state handles with {@code FileStateHandle::new}.
     *
     * @param jobID job whose hex string becomes part of the upload directory
     * @param basePath root directory; the job id and {@link #PATH_SUB_DIR} are appended to it
     * @param fileSystem file system used to create the upload files
     * @param compression forwarded to the superclass and later passed to the stream's {@code wrap}
     * @param bufferSize forwarded to the superclass and later passed to the stream's {@code wrap}
     * @param metrics forwarded to the superclass
     * @param changelogRegistry forwarded to the superclass
     */
    @VisibleForTesting
    public StateChangeFsUploader(JobID jobID, Path basePath, FileSystem fileSystem, boolean compression, int bufferSize, ChangelogStorageMetricGroup metrics, TaskChangelogRegistry changelogRegistry) {
        this(jobID, basePath, fileSystem, compression, bufferSize, metrics, changelogRegistry, FileStateHandle::new);
    }

    /**
     * Creates an uploader with a caller-supplied state handle factory.
     *
     * @param jobID job whose hex string becomes part of the upload directory
     * @param basePath root directory; the job id and {@link #PATH_SUB_DIR} are appended to it
     * @param fileSystem file system used to create the upload files
     * @param compression forwarded to the superclass and later passed to the stream's {@code wrap}
     * @param bufferSize forwarded to the superclass and later passed to the stream's {@code wrap}
     * @param metrics forwarded to the superclass
     * @param changelogRegistry forwarded to the superclass
     * @param handleFactory forwarded to the superclass; maps a path and a {@code Long} to a
     *     {@link StreamStateHandle}
     */
    public StateChangeFsUploader(JobID jobID, Path basePath, FileSystem fileSystem, boolean compression, int bufferSize, ChangelogStorageMetricGroup metrics, TaskChangelogRegistry changelogRegistry, BiFunction<Path, Long, StreamStateHandle> handleFactory) {
        super(compression, bufferSize, metrics, changelogRegistry, handleFactory);
        this.basePath = new Path(basePath, String.format("%s/%s", jobID.toHexString(), PATH_SUB_DIR));
        this.fileSystem = fileSystem;
    }

    /**
     * Returns the resolved upload directory ({@code <basePath>/<jobID hex>/dstl}), not the raw
     * path that was passed to the constructor.
     */
    @VisibleForTesting
    public Path getBasePath() {
        return this.basePath;
    }

    /**
     * Creates a new file below the upload directory and returns a stream positioned on it.
     *
     * <p>The file is created with {@link FileSystem.WriteMode#NO_OVERWRITE}, and the resulting
     * stream is wrapped using this uploader's compression flag and buffer size.
     *
     * @return the wrapped stream for the newly created file
     * @throws IOException if the file cannot be created or the stream cannot be wrapped
     */
    @Override
    public OutputStreamWithPos prepareStream() throws IOException {
        String fileName = this.generateFileName();
        LOG.debug("upload tasks to {}", fileName);
        Path path = new Path(this.basePath, fileName);
        OutputStream rawStream = this.fileSystem.create(path, FileSystem.WriteMode.NO_OVERWRITE);
        try {
            OutputStreamWithPos outputStream = new OutputStreamWithPos(rawStream, path);
            outputStream.wrap(this.compression, this.bufferSize);
            return outputStream;
        } catch (Throwable failure) {
            // The caller never sees the stream when setup fails, so it must be released here;
            // otherwise the underlying file-system stream would leak.
            try {
                rawStream.close();
            } catch (Throwable closeFailure) {
                // Keep the original failure as the primary error and attach the close problem.
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /** Intentionally empty: this implementation performs no cleanup of its own on close. */
    @Override
    public void close() {
    }
}

