/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.Public;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.sink.AdaptiveParallelism;
import org.apache.paimon.flink.sink.DynamicBucketCompactSink;
import org.apache.paimon.flink.sink.FixedBucketSink;
import org.apache.paimon.flink.sink.FlinkSink;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.LocalMergeOperator;
import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.flink.sink.PostponeBucketChannelComputer;
import org.apache.paimon.flink.sink.PostponeBucketSink;
import org.apache.paimon.flink.sink.RowAppendTableSink;
import org.apache.paimon.flink.sink.RowDataChannelComputer;
import org.apache.paimon.flink.sink.RowDataHashPartitionChannelComputer;
import org.apache.paimon.flink.sink.RowDynamicBucketSink;
import org.apache.paimon.flink.sink.index.GlobalDynamicBucketSink;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.flink.utils.ParallelismUtils;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Builder that wires a Flink {@link DataStream} into a sink writing to a Paimon {@link FileStoreTable}.
 *
 * <p>Supply the input with {@link #forRow} or {@link #forRowData}. Optionally configure
 * {@link #overwrite}, {@link #parallelism} and {@link #clusteringIfPossible}. Then call
 * {@link #build()}, which converts the input to {@link InternalRow}s. It optionally inserts a local
 * merge operator, and dispatches to a sink implementation chosen by the table's {@link BucketMode}.
 */
@Public
public class FlinkSinkBuilder {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkSinkBuilder.class);

    /**
     * Smallest clustering sample factor accepted by {@link #clusteringIfPossible}.
     * Also the per-sink-subtask local sample size used when the upstream parallelism is unknown.
     */
    private static final int MIN_CLUSTERING_SAMPLE_FACTOR = 20;

    protected final FileStoreTable table;

    /** Input stream, set by {@link #forRow} or {@link #forRowData}. */
    private DataStream<RowData> input;

    /** Partition spec to overwrite; {@code null} unless an {@code overwrite(...)} method was called. */
    @Nullable
    protected Map<String, String> overwritePartition;

    /** Explicit sink parallelism; may also be filled in by the build steps below. */
    @Nullable
    private Integer parallelism;

    /** Sort/cluster settings computed by {@link #clusteringIfPossible}; {@code null} means no sorting. */
    @Nullable
    private TableSortInfo tableSortInfo;

    protected boolean compactSink = false;

    @Nullable
    protected LogSinkFunction logSinkFunction;

    /**
     * Creates a builder for the given table.
     *
     * @param table the target table; must be a {@link FileStoreTable}
     * @throws UnsupportedOperationException if {@code table} is not a {@link FileStoreTable}
     */
    public FlinkSinkBuilder(Table table) {
        if (!(table instanceof FileStoreTable)) {
            throw new UnsupportedOperationException("Unsupported table type: " + table);
        }
        this.table = (FileStoreTable) table;
    }

    /**
     * Uses a stream of external {@link Row}s as input, converting every row to {@link RowData}.
     *
     * @param input the external row stream
     * @param rowDataType data type of the rows; its logical type is cast to a Flink row type
     * @return this builder
     */
    public FlinkSinkBuilder forRow(DataStream<Row> input, DataType rowDataType) {
        org.apache.flink.table.types.logical.RowType rowType =
                (org.apache.flink.table.types.logical.RowType) rowDataType.getLogicalType();
        DataType[] fieldDataTypes = rowDataType.getChildren().toArray(new DataType[0]);
        DataFormatConverters.RowConverter converter =
                new DataFormatConverters.RowConverter(fieldDataTypes);
        SingleOutputStreamOperator<RowData> converted =
                input.<RowData>map(converter::toInternal)
                        .returns(org.apache.flink.table.runtime.typeutils.InternalTypeInfo.of(rowType));
        // Run the conversion at the same parallelism as the upstream operator.
        ParallelismUtils.setParallelism(converted, input.getParallelism(), false);
        this.input = converted;
        return this;
    }

    /**
     * Uses a stream of {@link RowData} as input, unchanged.
     *
     * @param input the row-data stream
     * @return this builder
     */
    public FlinkSinkBuilder forRowData(DataStream<RowData> input) {
        this.input = input;
        return this;
    }

    /** Requests an overwrite with an empty partition spec. */
    public FlinkSinkBuilder overwrite() {
        return overwrite(new HashMap<>());
    }

    /**
     * Requests an overwrite of the given partition spec.
     *
     * @param overwritePartition static partition spec passed to the sink implementation
     * @return this builder
     */
    public FlinkSinkBuilder overwrite(Map<String, String> overwritePartition) {
        this.overwritePartition = overwritePartition;
        return this;
    }

    /**
     * Sets the sink parallelism.
     *
     * @param parallelism the parallelism, or {@code null} to leave it undetermined
     * @return this builder
     */
    public FlinkSinkBuilder parallelism(@Nullable Integer parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    /**
     * Configures range-based clustering (sorting) of the input before writing, when supported.
     *
     * <p>Does nothing if no clustering columns are given. Logs a warning and does nothing if the
     * input is streaming or the table is not {@link BucketMode#BUCKET_UNAWARE}. The input must
     * already be set.
     *
     * @param clusteringColumns raw clustering-column option value, parsed by
     *     {@link CoreOptions#clusteringColumns}
     * @param clusteringStrategy raw clustering-strategy option value
     * @param sortInCluster whether to sort within each cluster
     * @param sampleFactor per-sink-subtask sample factor; must be at least 20
     * @return this builder
     * @throws IllegalStateException if the input is unset, a column is unknown, or
     *     {@code sampleFactor} is too small
     */
    public FlinkSinkBuilder clusteringIfPossible(
            String clusteringColumns,
            String clusteringStrategy,
            boolean sortInCluster,
            int sampleFactor) {
        List<String> columns = CoreOptions.clusteringColumns(clusteringColumns);
        if (columns.isEmpty()) {
            return this;
        }
        Preconditions.checkState(input != null, "The input stream should be specified earlier.");
        if (FlinkSink.isStreaming(input) || table.bucketMode() != BucketMode.BUCKET_UNAWARE) {
            LOG.warn(
                    "Clustering is enabled; however, it has been skipped as it only supports the bucket unaware table without primary keys and BATCH execution mode.");
            return this;
        }

        List<String> fieldNames = table.schema().fieldNames();
        Preconditions.checkState(
                new HashSet<>(fieldNames).containsAll(columns),
                String.format(
                        "Field names %s should contains all clustering column names %s.",
                        fieldNames, columns));
        Preconditions.checkState(
                sampleFactor >= MIN_CLUSTERING_SAMPLE_FACTOR,
                "The minimum allowed "
                        + FlinkConnectorOptions.CLUSTERING_SAMPLE_FACTOR.key()
                        + " is "
                        + MIN_CLUSTERING_SAMPLE_FACTOR
                        + ".");

        TableSortInfo.Builder sortInfoBuilder = new TableSortInfo.Builder();
        sortInfoBuilder.setSortStrategy(
                CoreOptions.clusteringStrategy(clusteringStrategy, columns.size()));

        int upstreamParallelism = input.getParallelism();
        // An explicit sink.parallelism table option wins; otherwise follow the upstream parallelism.
        // NOTE(review): when the upstream parallelism is unset (<= 0) and sink.parallelism is
        // absent, sinkParallelism stays non-positive here. Confirm TableSorter tolerates that.
        String sinkParallelismValue =
                table.options().get(FlinkConnectorOptions.SINK_PARALLELISM.key());
        int sinkParallelism =
                sinkParallelismValue == null
                        ? upstreamParallelism
                        : Integer.parseInt(sinkParallelismValue);
        sortInfoBuilder
                .setSortColumns(columns)
                .setSortInCluster(sortInCluster)
                .setSinkParallelism(sinkParallelism);

        int globalSampleSize = sinkParallelism * sampleFactor;
        // With a known upstream parallelism, spread the global sample over the upstream subtasks.
        // Each subtask still samples at least sampleFactor. Otherwise fall back to a fixed size.
        int localSampleSize =
                upstreamParallelism > 0
                        ? Math.max(sampleFactor, globalSampleSize / upstreamParallelism)
                        : sinkParallelism * MIN_CLUSTERING_SAMPLE_FACTOR;
        tableSortInfo =
                sortInfoBuilder
                        .setRangeNumber(sinkParallelism)
                        .setGlobalSampleSize(globalSampleSize)
                        .setLocalSampleSize(localSampleSize)
                        .build();
        return this;
    }

    /**
     * Builds the sink topology.
     *
     * <p>Steps:
     * <ul>
     *   <li>Resolve any parallelism conflict with adaptive batch scheduling.</li>
     *   <li>Apply configured clustering.</li>
     *   <li>Convert the input to {@link InternalRow}.</li>
     *   <li>Insert a local-merge operator when enabled for a primary-key table.</li>
     *   <li>Dispatch on {@link BucketMode}.</li>
     * </ul>
     * The builder's input is not replaced, so repeated calls do not stack sort stages.
     *
     * @return the created sink
     * @throws IllegalStateException if no input was specified
     * @throws UnsupportedOperationException if the table's bucket mode is not supported
     */
    public DataStreamSink<?> build() {
        Preconditions.checkState(input != null, "The input stream should be specified earlier.");
        setParallelismIfAdaptiveConflict();

        DataStream<InternalRow> internalInput =
                mapToInternalRow(trySortInput(input), table.rowType());
        if (table.coreOptions().localMergeEnabled()
                && !table.schema().primaryKeys().isEmpty()) {
            SingleOutputStreamOperator<InternalRow> merged =
                    internalInput
                            .forward()
                            .transform(
                                    "local merge",
                                    internalInput.getType(),
                                    new LocalMergeOperator.Factory(table.schema()));
            ParallelismUtils.forwardParallelism(merged, internalInput);
            internalInput = merged;
        }

        BucketMode bucketMode = table.bucketMode();
        switch (bucketMode) {
            case POSTPONE_MODE:
                return buildPostponeBucketSink(internalInput);
            case HASH_FIXED:
                return buildForFixedBucket(internalInput);
            case HASH_DYNAMIC:
                return buildDynamicBucketSink(internalInput, false);
            case KEY_DYNAMIC:
                return buildDynamicBucketSink(internalInput, true);
            case BUCKET_UNAWARE:
                return buildUnawareBucketSink(internalInput);
            default:
                throw new UnsupportedOperationException("Unsupported bucket mode: " + bucketMode);
        }
    }

    /**
     * Wraps every {@link RowData} as a Paimon {@link InternalRow} via {@link FlinkRowWrapper}.
     *
     * @param input the Flink row-data stream
     * @param rowType the Paimon row type, used to build the output type information
     * @return the mapped stream, forwarding the input's parallelism
     */
    public static DataStream<InternalRow> mapToInternalRow(
            DataStream<RowData> input, RowType rowType) {
        SingleOutputStreamOperator<InternalRow> result =
                input.<InternalRow>map(FlinkRowWrapper::new)
                        .returns(InternalTypeInfo.fromRowType(rowType));
        ParallelismUtils.forwardParallelism(result, input);
        return result;
    }

    /**
     * Builds a dynamic-bucket sink.
     *
     * @param input the internal-row stream
     * @param globalIndex {@code true} for the global-index variant (KEY_DYNAMIC)
     * @return the created sink
     * @throws IllegalArgumentException if a log sink function is configured
     */
    protected DataStreamSink<?> buildDynamicBucketSink(
            DataStream<InternalRow> input, boolean globalIndex) {
        Preconditions.checkArgument(
                logSinkFunction == null, "Dynamic bucket mode can not work with log system.");
        if (globalIndex) {
            return new GlobalDynamicBucketSink(table, overwritePartition).build(input, parallelism);
        }
        if (compactSink) {
            return new DynamicBucketCompactSink(table, overwritePartition)
                    .build(input, parallelism);
        }
        return new RowDynamicBucketSink(table, overwritePartition).build(input, parallelism);
    }

    /**
     * Builds a fixed-bucket sink.
     *
     * <p>For a non-partitioned table with no explicit parallelism, the writer parallelism is capped
     * at the bucket count when it is smaller than the input parallelism. The cap is stored in
     * {@link #parallelism}.
     */
    protected DataStreamSink<?> buildForFixedBucket(DataStream<InternalRow> input) {
        int bucketNums = table.bucketSpec().getNumBuckets();
        if (parallelism == null
                && bucketNums < input.getParallelism()
                && table.partitionKeys().isEmpty()) {
            LOG.warn(
                    "For non-partitioned table, if bucketNums is less than the parallelism of inputOperator, then the parallelism of writerOperator will be set to bucketNums.");
            parallelism = bucketNums;
        }
        DataStream<InternalRow> partitioned =
                FlinkStreamPartitioner.partition(
                        input,
                        new RowDataChannelComputer(table.schema(), logSinkFunction != null),
                        parallelism);
        FixedBucketSink sink = new FixedBucketSink(table, overwritePartition, logSinkFunction);
        return sink.sinkFrom(partitioned);
    }

    /**
     * Builds a postpone-bucket sink.
     *
     * <p>Records are hash-partitioned by partition when the table is partitioned and uses the HASH
     * partition sink strategy. Otherwise they are routed by {@link PostponeBucketChannelComputer}.
     */
    private DataStreamSink<?> buildPostponeBucketSink(DataStream<InternalRow> input) {
        ChannelComputer<InternalRow> channelComputer;
        if (useHashPartitionSinkStrategy()) {
            channelComputer = new RowDataHashPartitionChannelComputer(table.schema());
        } else {
            channelComputer = new PostponeBucketChannelComputer(table.schema());
        }
        DataStream<InternalRow> partitioned =
                FlinkStreamPartitioner.partition(input, channelComputer, parallelism);
        return new PostponeBucketSink(table, overwritePartition).sinkFrom(partitioned);
    }

    /**
     * Builds a bucket-unaware (append-only) sink.
     *
     * <p>The input is hash-partitioned by partition only under the HASH partition sink strategy.
     *
     * @throws IllegalArgumentException if the table has primary keys
     */
    private DataStreamSink<?> buildUnawareBucketSink(DataStream<InternalRow> input) {
        Preconditions.checkArgument(
                table.primaryKeys().isEmpty(),
                "Unaware bucket mode only works with append-only table for now.");
        DataStream<InternalRow> toSink = input;
        if (useHashPartitionSinkStrategy()) {
            toSink =
                    FlinkStreamPartitioner.partition(
                            input,
                            new RowDataHashPartitionChannelComputer(table.schema()),
                            parallelism);
        }
        return new RowAppendTableSink(table, overwritePartition, logSinkFunction, parallelism)
                .sinkFrom(toSink);
    }

    /** True when the table is partitioned and its partition sink strategy is HASH. */
    private boolean useHashPartitionSinkStrategy() {
        return !table.partitionKeys().isEmpty()
                && table.coreOptions().partitionSinkStrategy()
                        == CoreOptions.PartitionSinkStrategy.HASH;
    }

    /** Returns {@code input} sorted per {@link #tableSortInfo}, or unchanged when none is set. */
    private DataStream<RowData> trySortInput(DataStream<RowData> input) {
        if (tableSortInfo == null) {
            return input;
        }
        TableSorter sorter =
                TableSorter.getSorter(input.getExecutionEnvironment(), input, table, tableSortInfo);
        return sorter.sort();
    }

    /**
     * Pins the sink parallelism for batch HASH_DYNAMIC writes under adaptive parallelism.
     *
     * <p>Applies only when all of the following hold:
     * <ul>
     *   <li>no parallelism has been set (null or -1);</li>
     *   <li>the job is not streaming;</li>
     *   <li>adaptive parallelism is enabled;</li>
     *   <li>the table is {@link BucketMode#HASH_DYNAMIC}.</li>
     * </ul>
     * In that case it uses the input parallelism when positive. Otherwise it uses the
     * AdaptiveBatchScheduler's default max parallelism, and logs a warning.
     */
    private void setParallelismIfAdaptiveConflict() {
        try {
            boolean parallelismUndefined = parallelism == null || parallelism == -1;
            boolean isStreaming = FlinkSink.isStreaming(input);
            boolean isAdaptiveParallelismEnabled =
                    AdaptiveParallelism.isEnabled(input.getExecutionEnvironment());
            boolean hashDynamicMode = table.bucketMode() == BucketMode.HASH_DYNAMIC;
            if (!parallelismUndefined
                    || isStreaming
                    || !isAdaptiveParallelismEnabled
                    || !hashDynamicMode) {
                return;
            }

            List<String> messages = new ArrayList<>();
            messages.add("Dynamic Bucket Mode");
            String parallelismSource;
            if (input.getParallelism() > 0) {
                parallelismSource = "input parallelism";
                parallelism = input.getParallelism();
            } else {
                parallelismSource = "AdaptiveBatchScheduler's default max parallelism";
                parallelism =
                        AdaptiveParallelism.getDefaultMaxParallelism(
                                input.getExecutionEnvironment().getConfiguration(),
                                input.getExecutionConfig());
            }
            String msg =
                    String.format(
                            "Paimon Sink with %s does not support Flink's Adaptive Parallelism mode. Configuring sink parallelism to `%s` instead. You can also set Paimon `sink.parallelism` manually to override this configuration.",
                            messages,
                            parallelismSource);
            LOG.warn(msg);
        } catch (NoClassDefFoundError ignored) {
            // Deliberately best-effort. Presumably raised when the running Flink version lacks
            // classes that AdaptiveParallelism depends on; in that case there is nothing to adjust.
        }
    }
}

