/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.utils;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.Changelog;
import org.apache.paimon.Snapshot;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.HintFileUtils;
import org.apache.paimon.utils.StringUtils;
import org.apache.paimon.utils.ThreadPoolUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Locates, reads and writes the changelog metadata files of one branch of a table.
 *
 * <p>Every changelog lives in {@code <branch path>/changelog/} under the name
 * {@code changelog-<id>}. The "latest" and "earliest" hint handling in that directory is
 * delegated to {@link HintFileUtils}.
 */
public class ChangelogManager implements Serializable {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogManager.class);

    /** File name prefix of a changelog file; the id is appended directly after it. */
    public static final String CHANGELOG_PREFIX = "changelog-";

    private final FileIO fileIO;
    private final Path tablePath;
    private final String branch;

    /**
     * Creates a manager for the changelogs of the given table and branch.
     *
     * @param fileIO file system access used for every read and write
     * @param tablePath root path of the table
     * @param branchName branch to operate on; may be {@code null}, it is passed through
     *     {@link BranchManager#normalizeBranch} before being stored
     */
    public ChangelogManager(FileIO fileIO, Path tablePath, @Nullable String branchName) {
        this.fileIO = fileIO;
        this.tablePath = tablePath;
        this.branch = BranchManager.normalizeBranch(branchName);
    }

    /** Returns the {@link FileIO} this manager was created with. */
    public FileIO fileIO() {
        return this.fileIO;
    }

    /**
     * Looks up the latest changelog id in the changelog directory via
     * {@link HintFileUtils#findLatest}.
     *
     * @return the id found by the lookup; nullable (presumably {@code null} when no changelog
     *     exists - confirm against {@code HintFileUtils})
     * @throws RuntimeException wrapping the {@link IOException} if the lookup fails
     */
    @Nullable
    public Long latestLongLivedChangelogId() {
        try {
            return HintFileUtils.findLatest(
                    this.fileIO,
                    this.changelogDirectory(),
                    CHANGELOG_PREFIX,
                    this::longLivedChangelogPath);
        } catch (IOException e) {
            throw new RuntimeException("Failed to find latest changelog id", e);
        }
    }

    /**
     * Looks up the earliest changelog id in the changelog directory via
     * {@link HintFileUtils#findEarliest}.
     *
     * @return the id found by the lookup; nullable (presumably {@code null} when no changelog
     *     exists - confirm against {@code HintFileUtils})
     * @throws RuntimeException wrapping the {@link IOException} if the lookup fails
     */
    @Nullable
    public Long earliestLongLivedChangelogId() {
        try {
            return HintFileUtils.findEarliest(
                    this.fileIO,
                    this.changelogDirectory(),
                    CHANGELOG_PREFIX,
                    this::longLivedChangelogPath);
        } catch (IOException e) {
            throw new RuntimeException("Failed to find earliest changelog id", e);
        }
    }

    /**
     * Checks whether the changelog file for the given id exists.
     *
     * @param snapshotId id of the changelog
     * @return the result of {@code fileIO.exists} on the changelog path
     * @throws RuntimeException wrapping the {@link IOException} if the existence check fails
     */
    public boolean longLivedChangelogExists(long snapshotId) {
        Path path = this.longLivedChangelogPath(snapshotId);
        try {
            return this.fileIO.exists(path);
        } catch (IOException e) {
            throw new RuntimeException(
                    "Failed to determine if changelog #" + snapshotId + " exists in path " + path,
                    e);
        }
    }

    /**
     * Reads the changelog with the given id from its file.
     *
     * @param snapshotId id of the changelog
     * @return the changelog produced by {@code Changelog.fromPath}
     */
    public Changelog longLivedChangelog(long snapshotId) {
        return Changelog.fromPath(this.fileIO, this.longLivedChangelogPath(snapshotId));
    }

    /**
     * Reads the changelog with the given id from its file; performs the same read as
     * {@link #longLivedChangelog(long)}.
     *
     * @param snapshotId id of the changelog
     * @return the changelog produced by {@code Changelog.fromPath}
     */
    public Changelog changelog(long snapshotId) {
        return Changelog.fromPath(this.fileIO, this.longLivedChangelogPath(snapshotId));
    }

    /**
     * Builds the path of the changelog file for the given id:
     * {@code <branch path>/changelog/changelog-<id>}.
     *
     * @param snapshotId id of the changelog
     * @return path of the changelog file (the file is not checked for existence)
     */
    public Path longLivedChangelogPath(long snapshotId) {
        return new Path(
                BranchManager.branchPath(this.tablePath, this.branch)
                        + "/changelog/"
                        + CHANGELOG_PREFIX
                        + snapshotId);
    }

    /** Returns the directory holding all changelog files: {@code <branch path>/changelog}. */
    public Path changelogDirectory() {
        return new Path(BranchManager.branchPath(this.tablePath, this.branch) + "/changelog");
    }

    /**
     * Writes the JSON form of a changelog to the file of the given id.
     *
     * @param changelog changelog to persist
     * @param id id that determines the target file name
     * @throws IOException if the write fails
     */
    public void commitChangelog(Changelog changelog, long id) throws IOException {
        // NOTE(review): the trailing 'true' is presumably the overwrite flag of writeFile - confirm.
        this.fileIO.writeFile(this.longLivedChangelogPath(id), changelog.toJson(), true);
    }

    /**
     * Records the given id as the latest hint of the changelog directory.
     *
     * @param snapshotId id to store in the hint
     * @throws IOException if the hint cannot be written
     */
    public void commitLongLivedChangelogLatestHint(long snapshotId) throws IOException {
        HintFileUtils.commitLatestHint(this.fileIO, snapshotId, this.changelogDirectory());
    }

    /**
     * Records the given id as the earliest hint of the changelog directory.
     *
     * @param snapshotId id to store in the hint
     * @throws IOException if the hint cannot be written
     */
    public void commitLongLivedChangelogEarliestHint(long snapshotId) throws IOException {
        HintFileUtils.commitEarliestHint(this.fileIO, snapshotId, this.changelogDirectory());
    }

    /**
     * Reads the changelog with the given id through {@code Changelog.tryFromPath}.
     *
     * @param snapshotId id of the changelog
     * @return the changelog read from the file
     * @throws FileNotFoundException declared by this method; presumably raised when the changelog
     *     file does not exist - confirm against {@code Changelog.tryFromPath}
     */
    public Changelog tryGetChangelog(long snapshotId) throws FileNotFoundException {
        Path changelogPath = this.longLivedChangelogPath(snapshotId);
        return Changelog.tryFromPath(this.fileIO, changelogPath);
    }

    /**
     * Lists the versioned files in the changelog directory, reads each one with
     * {@link #changelog(long)} and returns them ordered by ascending id.
     *
     * @return iterator over the changelogs sorted by {@code Snapshot::id}
     * @throws IOException if listing the changelog directory fails
     */
    public Iterator<Changelog> changelogs() throws IOException {
        return FileUtils.listVersionedFiles(this.fileIO, this.changelogDirectory(), CHANGELOG_PREFIX)
                .map(this::changelog)
                .sorted(Comparator.comparingLong(Snapshot::id))
                .iterator();
    }

    /**
     * Reads every changelog file currently listed in the changelog directory, tolerating files
     * that cannot be used.
     *
     * <p>A file that raises {@link FileNotFoundException} while being read is skipped. A file
     * whose content is null or whitespace-only is logged with a warning and skipped as well,
     * instead of being handed to the JSON parser.
     *
     * <p>The files are read through a thread pool, so the order of the returned list should not
     * be relied upon.
     *
     * @return the changelogs that could be read (a synchronized list)
     * @throws IOException if listing fails, or if reading a file fails with anything other than
     *     {@link FileNotFoundException}
     */
    public List<Changelog> safelyGetAllChangelogs() throws IOException {
        List<Path> paths =
                FileUtils.listVersionedFiles(
                                this.fileIO, this.changelogDirectory(), CHANGELOG_PREFIX)
                        .map(this::longLivedChangelogPath)
                        .collect(Collectors.toList());

        // Synchronized because the consumer below is executed by several pool threads.
        List<Changelog> changelogs = Collections.synchronizedList(new ArrayList<>(paths.size()));

        ChangelogManager.collectSnapshots(
                path -> {
                    try {
                        String changelogStr = this.fileIO.readFileUtf8(path);
                        if (StringUtils.isNullOrWhitespaceOnly(changelogStr)) {
                            // Nothing to parse: warn and skip rather than feeding empty
                            // content to Changelog.fromJson.
                            LOG.warn("Changelog file is empty, path: {}", path);
                        } else {
                            changelogs.add(Changelog.fromJson(changelogStr));
                        }
                    } catch (FileNotFoundException ignored) {
                        // The file vanished between listing and reading; this method is
                        // best-effort, so the entry is simply left out.
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                },
                paths);
        return changelogs;
    }

    /**
     * Deletes the latest hint of the changelog directory.
     *
     * @throws IOException if the deletion fails
     */
    public void deleteLatestHint() throws IOException {
        HintFileUtils.deleteLatestHint(this.fileIO, this.changelogDirectory());
    }

    /**
     * Deletes the earliest hint of the changelog directory.
     *
     * @throws IOException if the deletion fails
     */
    public void deleteEarliestHint() throws IOException {
        HintFileUtils.deleteEarliestHint(this.fileIO, this.changelogDirectory());
    }

    /**
     * Applies {@code pathConsumer} to every path using a temporary thread pool named
     * {@code CHANGELOG_COLLECTOR}, sized by the number of available processors.
     *
     * @param pathConsumer action to run for each path
     * @param paths paths to process
     * @throws IOException wrapping any {@link RuntimeException} raised while executing
     */
    private static void collectSnapshots(Consumer<Path> pathConsumer, List<Path> paths)
            throws IOException {
        ThreadPoolExecutor executor =
                ThreadPoolUtils.createCachedThreadPool(
                        Runtime.getRuntime().availableProcessors(), "CHANGELOG_COLLECTOR");
        try {
            ThreadPoolUtils.randomlyOnlyExecute(executor, pathConsumer, paths);
        } catch (RuntimeException e) {
            throw new IOException(e);
        } finally {
            // Always release the pool, whether or not execution succeeded.
            executor.shutdown();
        }
    }
}

