/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.copy;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.copy.CopyFileInfo;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;

public class SnapshotHintOperator
extends AbstractStreamOperator<CopyFileInfo>
implements OneInputStreamOperator<CopyFileInfo, CopyFileInfo>,
BoundedOneInput {
    private final Map<String, String> targetCatalogConfig;
    private Catalog targetCatalog;
    private Set<String> identifiers;

    public SnapshotHintOperator(Map<String, String> targetCatalogConfig) {
        this.targetCatalogConfig = targetCatalogConfig;
    }

    public void open() throws Exception {
        this.targetCatalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.targetCatalogConfig));
        this.identifiers = new HashSet<String>();
    }

    public void processElement(StreamRecord<CopyFileInfo> streamRecord) throws Exception {
        String identifier = ((CopyFileInfo)streamRecord.getValue()).getTargetIdentifier();
        this.identifiers.add(identifier);
    }

    public void endInput() throws Exception {
        for (String identifier : this.identifiers) {
            FileStoreTable targetTable = (FileStoreTable)this.targetCatalog.getTable(Identifier.fromString(identifier));
            this.commitSnapshotHintInTargetTable(targetTable.snapshotManager());
        }
    }

    private void commitSnapshotHintInTargetTable(SnapshotManager targetTableSnapshotManager) throws IOException {
        OptionalLong optionalSnapshotId = targetTableSnapshotManager.safelyGetAllSnapshots().stream().mapToLong(Snapshot::id).max();
        if (optionalSnapshotId.isPresent()) {
            long snapshotId = optionalSnapshotId.getAsLong();
            targetTableSnapshotManager.commitEarliestHint(snapshotId);
            targetTableSnapshotManager.commitLatestHint(snapshotId);
            for (Snapshot snapshot : targetTableSnapshotManager.safelyGetAllSnapshots()) {
                if (snapshot.id() == snapshotId) continue;
                targetTableSnapshotManager.deleteSnapshot(snapshot.id());
            }
        }
    }

    public void close() throws Exception {
        if (this.targetCatalog != null) {
            this.targetCatalog.close();
        }
    }
}

