/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.Optional;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.sink.BucketsRowChannelComputer;
import org.apache.paimon.flink.sink.CompactorSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;

public class CompactorSinkBuilder {
    private final FileStoreTable table;
    private DataStream<RowData> input;
    private final boolean fullCompaction;

    public CompactorSinkBuilder(FileStoreTable table, boolean fullCompaction) {
        this.table = table;
        this.fullCompaction = fullCompaction;
    }

    public CompactorSinkBuilder withInput(DataStream<RowData> input) {
        this.input = input;
        return this;
    }

    public DataStreamSink<?> build() {
        BucketMode bucketMode = this.table.bucketMode();
        switch (bucketMode) {
            case HASH_FIXED: 
            case HASH_DYNAMIC: {
                return this.buildForBucketAware();
            }
        }
        throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
    }

    private DataStreamSink<?> buildForBucketAware() {
        Integer parallelism = Optional.ofNullable(this.table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key())).map(Integer::valueOf).orElse(null);
        DataStream<RowData> partitioned = FlinkStreamPartitioner.partition(this.input, new BucketsRowChannelComputer(), parallelism);
        return new CompactorSink(this.table, this.fullCompaction).sinkFrom(partitioned);
    }
}

