/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.clone.spits;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.clone.spits.CommitMessageInfo;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;

public class CommitMessageTableOperator
extends AbstractStreamOperator<Long>
implements OneInputStreamOperator<CommitMessageInfo, Long>,
BoundedOneInput {
    private static final long serialVersionUID = 1L;
    private final Map<String, String> catalogConfig;
    private transient Map<Identifier, List<CommitMessage>> commitMessages;

    public CommitMessageTableOperator(Map<String, String> catalogConfig) {
        this.catalogConfig = catalogConfig;
    }

    public void open() throws Exception {
        super.open();
        this.commitMessages = new HashMap<Identifier, List<CommitMessage>>();
    }

    public void processElement(StreamRecord<CommitMessageInfo> streamRecord) throws Exception {
        CommitMessageInfo info = (CommitMessageInfo)streamRecord.getValue();
        List messages = this.commitMessages.computeIfAbsent(info.identifier(), k -> new ArrayList());
        messages.add(info.commitMessage());
    }

    public void endInput() throws Exception {
        try (Catalog catalog = FlinkCatalogFactory.createPaimonCatalog(Options.fromMap(this.catalogConfig));){
            for (Map.Entry<Identifier, List<CommitMessage>> entry : this.commitMessages.entrySet()) {
                List<CommitMessage> commitMessages = entry.getValue();
                Table table = catalog.getTable(entry.getKey());
                BatchTableCommit commit = table.newBatchWriteBuilder().withOverwrite().newCommit();
                try {
                    commit.commit(commitMessages);
                }
                finally {
                    if (commit == null) continue;
                    commit.close();
                }
            }
        }
    }
}

