package org.apache.flink.changelog.fs;

import java.io.IOException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.ChangelogTaskLocalStateStore;
import org.apache.flink.runtime.state.LocalSnapshotDirectoryProvider;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/changelog/fs/DuplicatingStateChangeFsUploader.class */
/**
 * State change uploader that writes every changelog file twice: once to {@link #fileSystem}
 * under {@link #basePath}, and once to the local task-owned directory resolved through
 * {@link #localSnapshotDirectoryProvider}.
 *
 * <p>Both copies share the same generated file name; see {@link #prepareStream()}.
 */
public class DuplicatingStateChangeFsUploader extends AbstractStateChangeFsUploader {
    private static final Logger LOG = LoggerFactory.getLogger(DuplicatingStateChangeFsUploader.class);

    /** Directory of the primary copies: {@code <path>/<job id hex>/<PATH_SUB_DIR>}. */
    private final Path basePath;

    /** File system on which the primary copy is created. */
    private final FileSystem fileSystem;

    /** Used to resolve the local task-owned directory that receives the secondary copy. */
    private final LocalSnapshotDirectoryProvider localSnapshotDirectoryProvider;

    /** Job this uploader belongs to; part of both the primary and the local location. */
    private final JobID jobID;

    /**
     * Creates the uploader.
     *
     * @param jobID job whose hex string becomes part of the primary base path
     * @param path root under which the per-job primary directory is placed
     * @param fileSystem file system used to create the primary files
     * @param z forwarded as the first superclass constructor argument (presumably the
     *     compression flag; confirm against {@code AbstractStateChangeFsUploader})
     * @param i forwarded as the second superclass constructor argument (presumably the buffer
     *     size; confirm against {@code AbstractStateChangeFsUploader})
     * @param changelogStorageMetricGroup forwarded to the superclass
     * @param taskChangelogRegistry forwarded to the superclass
     * @param localSnapshotDirectoryProvider provider used to locate the local copy's directory
     */
    public DuplicatingStateChangeFsUploader(JobID jobID, Path path, FileSystem fileSystem, boolean z, int i, ChangelogStorageMetricGroup changelogStorageMetricGroup, TaskChangelogRegistry taskChangelogRegistry, LocalSnapshotDirectoryProvider localSnapshotDirectoryProvider) {
        super(z, i, changelogStorageMetricGroup, taskChangelogRegistry, (filePath, stateSize) -> new FileStateHandle(filePath, stateSize));
        this.basePath = new Path(path, String.format("%s/%s", jobID.toHexString(), StateChangeFsUploader.PATH_SUB_DIR));
        this.fileSystem = fileSystem;
        this.localSnapshotDirectoryProvider = localSnapshotDirectoryProvider;
        this.jobID = jobID;
    }

    /**
     * Opens a primary stream under {@link #basePath} and a secondary stream in the local
     * task-owned directory, both with {@code NO_OVERWRITE}, and returns them combined in a
     * {@link DuplicatingOutputStreamWithPos} that has been wrapped with this uploader's
     * compression and buffer-size settings.
     *
     * <p>If anything fails after the primary stream has been opened, every stream opened so far
     * is closed before the original exception is rethrown, so that a failed preparation does not
     * leak open streams. Files that were already created are not deleted here.
     *
     * @return the duplicating stream, ready to be written to
     * @throws IOException if either stream cannot be created or wrapped
     */
    @Override // org.apache.flink.changelog.fs.AbstractStateChangeFsUploader
    public OutputStreamWithPos prepareStream() throws IOException {
        final String fileName = generateFileName();
        LOG.debug("upload tasks to {}", fileName);

        final Path primaryPath = new Path(this.basePath, fileName);
        final FSDataOutputStream primaryStream = this.fileSystem.create(primaryPath, FileSystem.WriteMode.NO_OVERWRITE);

        FSDataOutputStream localStream = null;
        try {
            final Path localPath = new Path(ChangelogTaskLocalStateStore.getLocalTaskOwnedDirectory(this.localSnapshotDirectoryProvider, this.jobID), fileName);
            localStream = localPath.getFileSystem().create(localPath, FileSystem.WriteMode.NO_OVERWRITE);

            final DuplicatingOutputStreamWithPos duplicatingStream = new DuplicatingOutputStreamWithPos(primaryStream, primaryPath, localStream, localPath);
            duplicatingStream.wrap(this.compression, this.bufferSize);
            return duplicatingStream;
        } catch (IOException | RuntimeException failure) {
            // Ownership was never handed to the caller, so release the raw streams here.
            closeAfterFailure(localStream, failure);
            closeAfterFailure(primaryStream, failure);
            throw failure;
        }
    }

    /**
     * Closes {@code stream} (if non-null) while handling {@code failure}; a problem during close
     * is attached to {@code failure} as a suppressed exception instead of replacing it.
     */
    private static void closeAfterFailure(FSDataOutputStream stream, Throwable failure) {
        if (stream == null) {
            return;
        }
        try {
            stream.close();
        } catch (IOException | RuntimeException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }

    /** Does nothing; this class performs no cleanup of its own on close. */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
    }
}
