/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.maintenance.api;

import java.time.Duration;
import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.maintenance.api.MaintenanceTaskBuilder;
import org.apache.iceberg.flink.maintenance.api.RewriteDataFilesConfig;
import org.apache.iceberg.flink.maintenance.api.TaskResult;
import org.apache.iceberg.flink.maintenance.api.Trigger;
import org.apache.iceberg.flink.maintenance.operator.DataFileRewriteCommitter;
import org.apache.iceberg.flink.maintenance.operator.DataFileRewritePlanner;
import org.apache.iceberg.flink.maintenance.operator.DataFileRewriteRunner;
import org.apache.iceberg.flink.maintenance.operator.TaskResultAggregator;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

/**
 * Entry point for configuring the data file rewrite maintenance task.
 *
 * <p>The class itself cannot be instantiated; obtain a {@link Builder} through {@link #builder()}.
 */
public class RewriteDataFiles {
    static final String PLANNER_TASK_NAME = "RDF Planner";
    static final String REWRITE_TASK_NAME = "Rewrite";
    static final String COMMIT_TASK_NAME = "Rewrite commit";
    static final String AGGREGATOR_TASK_NAME = "Rewrite aggregator";

    private RewriteDataFiles() {
        // Holder for the constants and the nested builder only.
    }

    /** Creates a new {@link Builder} populated with the default settings. */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Collects the rewrite settings and, in {@link #append(DataStream)}, wires the planner, rewrite,
     * commit and aggregator operators behind a trigger stream.
     *
     * <p>Defaults: partial progress disabled, 10 partial-progress commits, {@code Long.MAX_VALUE}
     * rewrite bytes, no rewrite options and an always-true filter.
     */
    public static class Builder
    extends MaintenanceTaskBuilder<Builder> {
        private boolean partialProgressEnabled = false;
        private int partialProgressMaxCommits = 10;
        private final Map<String, String> rewriteOptions = Maps.newHashMapWithExpectedSize(6);
        private long maxRewriteBytes = Long.MAX_VALUE;
        private Expression filter = Expressions.alwaysTrue();

        @Override
        String maintenanceTaskName() {
            return "RewriteDataFiles";
        }

        /**
         * Switches partial progress on or off. While it is off, {@link #append(DataStream)} hands
         * {@code 1} to the planner in place of the value set by {@link #partialProgressMaxCommits(int)}.
         */
        public Builder partialProgressEnabled(boolean newPartialProgressEnabled) {
            this.partialProgressEnabled = newPartialProgressEnabled;
            return this;
        }

        /** Sets the commit count handed to the planner when partial progress is enabled. */
        public Builder partialProgressMaxCommits(int newPartialProgressMaxCommits) {
            this.partialProgressMaxCommits = newPartialProgressMaxCommits;
            return this;
        }

        /** Sets the maximum rewrite bytes value handed to the planner. */
        public Builder maxRewriteBytes(long newMaxRewriteBytes) {
            this.maxRewriteBytes = newMaxRewriteBytes;
            return this;
        }

        /** Stores the {@code target-file-size-bytes} rewrite option. */
        public Builder targetFileSizeBytes(long targetFileSizeBytes) {
            return putRewriteOption("target-file-size-bytes", String.valueOf(targetFileSizeBytes));
        }

        /** Stores the {@code min-file-size-bytes} rewrite option. */
        public Builder minFileSizeBytes(long minFileSizeBytes) {
            return putRewriteOption("min-file-size-bytes", String.valueOf(minFileSizeBytes));
        }

        /** Stores the {@code max-file-size-bytes} rewrite option. */
        public Builder maxFileSizeBytes(long maxFileSizeBytes) {
            return putRewriteOption("max-file-size-bytes", String.valueOf(maxFileSizeBytes));
        }

        /** Stores the {@code min-input-files} rewrite option. */
        public Builder minInputFiles(int minInputFiles) {
            return putRewriteOption("min-input-files", String.valueOf(minInputFiles));
        }

        /** Stores the {@code delete-file-threshold} rewrite option. */
        public Builder deleteFileThreshold(int deleteFileThreshold) {
            return putRewriteOption("delete-file-threshold", String.valueOf(deleteFileThreshold));
        }

        /** Stores the {@code rewrite-all} rewrite option. */
        public Builder rewriteAll(boolean rewriteAll) {
            return putRewriteOption("rewrite-all", String.valueOf(rewriteAll));
        }

        /** Stores the {@code max-file-group-size-bytes} rewrite option. */
        public Builder maxFileGroupSizeBytes(long maxFileGroupSizeBytes) {
            return putRewriteOption("max-file-group-size-bytes", String.valueOf(maxFileGroupSizeBytes));
        }

        /** Stores the {@code max-files-to-rewrite} rewrite option. */
        public Builder maxFilesToRewrite(int maxFilesToRewrite) {
            return putRewriteOption("max-files-to-rewrite", String.valueOf(maxFilesToRewrite));
        }

        /** Replaces the filter expression handed to the planner. */
        public Builder filter(Expression newFilter) {
            this.filter = newFilter;
            return this;
        }

        /**
         * Applies every setting carried by the given config object: the partial progress flag and
         * commit count, the maximum rewrite bytes, and the scheduling values (commit count, data file
         * count, data file size, interval in seconds). Afterwards all of the config's properties are
         * copied into the rewrite options, replacing entries already stored under the same key.
         */
        public Builder config(RewriteDataFilesConfig rewriteDataFilesConfig) {
            this.partialProgressEnabled(rewriteDataFilesConfig.partialProgressEnable())
                .partialProgressMaxCommits(rewriteDataFilesConfig.partialProgressMaxCommits())
                .maxRewriteBytes(rewriteDataFilesConfig.maxRewriteBytes())
                .scheduleOnCommitCount(rewriteDataFilesConfig.scheduleOnCommitCount())
                .scheduleOnDataFileCount(rewriteDataFilesConfig.scheduleOnDataFileCount())
                .scheduleOnDataFileSize(rewriteDataFilesConfig.scheduleOnDataFileSize())
                .scheduleOnInterval(Duration.ofSeconds(rewriteDataFilesConfig.scheduleOnIntervalSecond()));

            this.rewriteOptions.putAll(rewriteDataFilesConfig.properties());
            return this;
        }

        /**
         * Attaches the rewrite pipeline to the trigger stream: a non-parallel planner, a rebalanced
         * rewrite step running with the configured parallelism, a non-parallel committer and a
         * non-parallel aggregator. The aggregator receives the triggers unioned with the committer
         * output on one input and the error side outputs of the three earlier steps on the other.
         */
        // The element types of the intermediate streams are not visible in this file, so the
        // operator handles stay raw and the conversions below are unchecked.
        @SuppressWarnings({"rawtypes", "unchecked"})
        @Override
        DataStream<TaskResult> append(DataStream<Trigger> trigger) {
            // Step 1: plan the rewrite groups.
            int plannerCommits = this.partialProgressEnabled ? this.partialProgressMaxCommits : 1;
            ProcessFunction planner = new DataFileRewritePlanner(
                    tableName(),
                    taskName(),
                    index(),
                    tableLoader(),
                    plannerCommits,
                    this.maxRewriteBytes,
                    this.rewriteOptions,
                    this.filter);
            SingleOutputStreamOperator planned = trigger.process(planner)
                    .name(operatorName(PLANNER_TASK_NAME))
                    .uid(PLANNER_TASK_NAME + uidSuffix())
                    .slotSharingGroup(slotSharingGroup())
                    .forceNonParallel();

            // Step 2: spread the planned groups over the parallel rewrite step.
            DataStream rebalanced = planned.rebalance();
            ProcessFunction runner = new DataFileRewriteRunner(tableName(), taskName(), index());
            SingleOutputStreamOperator rewritten = rebalanced.process(runner)
                    .name(operatorName(REWRITE_TASK_NAME))
                    .uid(REWRITE_TASK_NAME + uidSuffix())
                    .slotSharingGroup(slotSharingGroup())
                    .setParallelism(parallelism().intValue());

            // Step 3: commit the rewritten groups.
            String commitOperatorName = operatorName(COMMIT_TASK_NAME);
            TypeInformation<Trigger> commitOutputType = TypeInformation.of(Trigger.class);
            OneInputStreamOperator committer =
                    new DataFileRewriteCommitter(tableName(), taskName(), index(), tableLoader());
            SingleOutputStreamOperator updated = rewritten
                    .transform(commitOperatorName, commitOutputType, committer)
                    .uid(COMMIT_TASK_NAME + uidSuffix())
                    .slotSharingGroup(slotSharingGroup())
                    .forceNonParallel();

            // Step 4: aggregate. The two unions are built in this order on purpose, matching the
            // order in which the stream graph pieces have always been created.
            DataStream triggersAndCommits = trigger.union(updated);
            DataStream errors = planned.getSideOutput(TaskResultAggregator.ERROR_STREAM)
                    .union(
                            rewritten.getSideOutput(TaskResultAggregator.ERROR_STREAM),
                            updated.getSideOutput(TaskResultAggregator.ERROR_STREAM));

            return triggersAndCommits
                    .connect(errors)
                    .transform(
                            operatorName(AGGREGATOR_TASK_NAME),
                            TypeInformation.of(TaskResult.class),
                            (TwoInputStreamOperator)new TaskResultAggregator(tableName(), taskName(), index()))
                    .uid(AGGREGATOR_TASK_NAME + uidSuffix())
                    .slotSharingGroup(slotSharingGroup())
                    .forceNonParallel();
        }

        /** Stores one rewrite option under the given key and returns this builder for chaining. */
        private Builder putRewriteOption(String key, String value) {
            this.rewriteOptions.put(key, value);
            return this;
        }
    }
}

