/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.postpone;

import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.flink.postpone.PostponeBucketCommittableRewriter;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.utils.BoundedOneInputOperator;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessageImpl;

public class RewritePostponeBucketCommittableOperator
extends BoundedOneInputOperator<Committable, Committable> {
    private static final long serialVersionUID = 1L;
    private final FileStoreTable table;
    private transient PostponeBucketCommittableRewriter rewriter;

    public RewritePostponeBucketCommittableOperator(FileStoreTable table) {
        this.table = table;
    }

    public void open() throws Exception {
        this.rewriter = new PostponeBucketCommittableRewriter(this.table);
    }

    public void processElement(StreamRecord<Committable> element) throws Exception {
        Committable committable = (Committable)element.getValue();
        if (committable.kind() != Committable.Kind.FILE) {
            this.output.collect(element);
        }
        this.rewriter.add((CommitMessageImpl)committable.wrappedCommittable());
    }

    public void endInput() throws Exception {
        this.emitAll(Long.MAX_VALUE);
    }

    protected void emitAll(long checkpointId) {
        this.rewriter.emitAll(checkpointId).forEach(c -> this.output.collect((Object)new StreamRecord(c)));
    }
}

