/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Locale;
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.IcebergFilesCommitter;
import org.apache.iceberg.flink.sink.IcebergStreamWriter;
import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.util.PropertyUtil;

/**
 * Static entry points for attaching an Iceberg writer/committer pipeline to a Flink
 * {@link DataStream}.
 *
 * <p>Obtain a {@link Builder} through {@link #forRowData(DataStream)}, {@link #forRow(DataStream, TableSchema)}
 * or {@link #builderFor(DataStream, MapFunction, TypeInformation)}, configure it, and call
 * {@link Builder#build()} to append the operators to the stream.
 */
public class FlinkSink {
    /** Operator name used for the writer stage; the simple class name of {@link IcebergStreamWriter}. */
    private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
    /** Operator name used for the committer stage; the simple class name of {@link IcebergFilesCommitter}. */
    private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();

    /** Not instantiable: this class only exposes static factory methods. */
    private FlinkSink() {
    }

    /**
     * Creates a builder for a stream of arbitrary elements by first mapping each element to
     * {@link RowData}.
     *
     * @param input the source stream
     * @param mapper function converting each input element to {@link RowData}
     * @param outputType type information describing the mapper's output
     * @param <T> element type of the input stream
     * @return a builder whose input is the mapped stream
     */
    public static <T> Builder builderFor(DataStream<T> input, MapFunction<T, RowData> mapper, TypeInformation<RowData> outputType) {
        DataStream<RowData> rowDataStream = input.map(mapper, outputType);
        return FlinkSink.forRowData(rowDataStream);
    }

    /**
     * Creates a builder for a stream of {@link Row} elements, converting each row to {@link RowData}
     * using the field data types of the given schema.
     *
     * @param input the stream of rows to write
     * @param tableSchema schema describing the rows; also registered on the returned builder via
     *     {@link Builder#tableSchema(TableSchema)}
     * @return a builder whose input is the converted stream
     */
    public static Builder forRow(DataStream<Row> input, TableSchema tableSchema) {
        RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
        DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
        DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(fieldDataTypes);
        return FlinkSink.builderFor(input, rowConverter::toInternal, RowDataTypeInfo.of(rowType))
            .tableSchema(tableSchema);
    }

    /**
     * Creates a builder for a stream that already carries {@link RowData} elements.
     *
     * @param input the stream of row data to write
     * @return a new builder with {@code input} as its input stream
     */
    public static Builder forRowData(DataStream<RowData> input) {
        return new Builder().forRowData(input);
    }

    /**
     * Creates the stream writer operator for the given table.
     *
     * <p>When {@code requestedSchema} is non-null it is converted to an Iceberg schema, its field ids
     * are reassigned against the table schema, and the result is validated against the table schema;
     * the writer then uses the row type of the requested schema. Otherwise the row type is derived
     * from the table schema.
     *
     * @param table the Iceberg table to write to; must not be null
     * @param requestedSchema optional Flink schema of the incoming records, may be null
     * @return a writer operator configured from the table's properties
     * @throws IllegalArgumentException if {@code table} is null
     */
    static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema) {
        Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");

        RowType flinkSchema;
        if (requestedSchema != null) {
            // Convert the Flink schema to an Iceberg schema, then align its ids with the table schema
            // so that the two can be validated against each other.
            Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), table.schema());
            TypeUtil.validateWriteSchema(table.schema(), writeSchema, true, true);
            flinkSchema = (RowType) requestedSchema.toRowDataType().getLogicalType();
        } else {
            flinkSchema = FlinkSchemaUtil.convert(table.schema());
        }

        Map<String, String> props = table.properties();
        long targetFileSize = FlinkSink.getTargetFileSizeBytes(props);
        FileFormat fileFormat = FlinkSink.getFileFormat(props);
        RowDataTaskWriterFactory taskWriterFactory = new RowDataTaskWriterFactory(
            table.schema(), flinkSchema, table.spec(), table.locationProvider(), table.io(), table.encryption(),
            targetFileSize, fileFormat, props);
        return new IcebergStreamWriter<RowData>(table.name(), taskWriterFactory);
    }

    /**
     * Reads the {@code write.format.default} table property, falling back to {@code parquet}, and
     * resolves it to a {@link FileFormat} constant after upper-casing.
     */
    private static FileFormat getFileFormat(Map<String, String> properties) {
        String formatString = properties.getOrDefault("write.format.default", "parquet");
        return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
    }

    /**
     * Reads the {@code write.target-file-size-bytes} table property as a long, passing
     * {@link Long#MAX_VALUE} as the default.
     */
    private static long getTargetFileSizeBytes(Map<String, String> properties) {
        return PropertyUtil.propertyAsLong(properties, "write.target-file-size-bytes", Long.MAX_VALUE);
    }

    /**
     * Fluent configuration for the Iceberg sink. Instances are created through the static factory
     * methods on {@link FlinkSink}.
     */
    public static class Builder {
        private DataStream<RowData> rowDataInput = null;
        private TableLoader tableLoader;
        private Table table;
        private TableSchema tableSchema;
        private boolean overwrite = false;
        /** Requested writer parallelism; {@code null} means "use the input stream's parallelism". */
        private Integer writeParallelism = null;

        private Builder() {
        }

        /** Sets the input stream; only reachable through the {@link FlinkSink} factory methods. */
        private Builder forRowData(DataStream<RowData> newRowDataInput) {
            this.rowDataInput = newRowDataInput;
            return this;
        }

        /**
         * Sets an already-loaded table. When left unset, {@link #build()} loads the table through the
         * table loader.
         *
         * @param newTable the table to write to
         * @return this builder
         */
        public Builder table(Table newTable) {
            this.table = newTable;
            return this;
        }

        /**
         * Sets the table loader. It is required by {@link #build()} and is passed to the files
         * committer.
         *
         * @param newTableLoader the loader for the target table
         * @return this builder
         */
        public Builder tableLoader(TableLoader newTableLoader) {
            this.tableLoader = newTableLoader;
            return this;
        }

        /**
         * Sets the Flink schema of the incoming records; may be left unset.
         *
         * @param newTableSchema the schema of the records to write
         * @return this builder
         */
        public Builder tableSchema(TableSchema newTableSchema) {
            this.tableSchema = newTableSchema;
            return this;
        }

        /**
         * Sets the overwrite flag handed to the files committer. Defaults to {@code false}.
         *
         * @param newOverwrite the overwrite flag
         * @return this builder
         */
        public Builder overwrite(boolean newOverwrite) {
            this.overwrite = newOverwrite;
            return this;
        }

        /**
         * Sets the parallelism of the writer operator. When not called, the input stream's parallelism
         * is used.
         *
         * @param newWriteParallelism the writer parallelism
         * @return this builder
         */
        public Builder writeParallelism(int newWriteParallelism) {
            this.writeParallelism = newWriteParallelism;
            return this;
        }

        /**
         * Appends the writer operator, the committer operator and a terminal discarding sink to the
         * input stream.
         *
         * <p>The writer runs with the configured write parallelism (or the input's parallelism when
         * none was configured); the committer and the sink are pinned to parallelism 1.
         *
         * @return the terminal sink, named {@code "IcebergSink <table name>"}
         * @throws IllegalArgumentException if no input stream was set
         * @throws NullPointerException if no table loader was set
         * @throws UncheckedIOException if an {@link IOException} is raised while loading the table
         *     or closing the loader
         */
        // The pipeline's element type changes from RowData to DataFile to Void while the declared
        // return type is DataStreamSink<RowData>, so the tail of the chain has to stay raw.
        @SuppressWarnings({"unchecked", "rawtypes"})
        public DataStreamSink<RowData> build() {
            Preconditions.checkArgument(this.rowDataInput != null, "Please use forRowData() to initialize the input DataStream.");
            Preconditions.checkNotNull(this.tableLoader, "Table loader shouldn't be null");

            if (this.table == null) {
                this.tableLoader.open();
                // The loader is closed again once the table has been loaded; the same loader
                // instance is still handed to the committer below.
                try (TableLoader loader = this.tableLoader) {
                    this.table = loader.loadTable();
                } catch (IOException e) {
                    throw new UncheckedIOException("Failed to load iceberg table from table loader: " + this.tableLoader, e);
                }
            }

            IcebergStreamWriter<RowData> streamWriter = FlinkSink.createStreamWriter(this.table, this.tableSchema);
            IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(this.tableLoader, this.overwrite);

            // Resolve the effective parallelism into a local so that building does not overwrite
            // the builder's own configuration.
            int parallelism = this.writeParallelism == null
                ? this.rowDataInput.getParallelism()
                : this.writeParallelism;

            SingleOutputStreamOperator committed = this.rowDataInput
                .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter)
                .setParallelism(parallelism)
                .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, (OneInputStreamOperator) filesCommitter)
                .setParallelism(1)
                .setMaxParallelism(1);

            return committed.addSink((SinkFunction) new DiscardingSink())
                .name(String.format("IcebergSink %s", this.table.name()))
                .setParallelism(1);
        }
    }
}

