/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.maintenance.api;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.util.OutputTag;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.Schema;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.flink.maintenance.api.MaintenanceTaskBuilder;
import org.apache.iceberg.flink.maintenance.api.TaskResult;
import org.apache.iceberg.flink.maintenance.api.Trigger;
import org.apache.iceberg.flink.maintenance.operator.DeleteFilesProcessor;
import org.apache.iceberg.flink.maintenance.operator.FileNameReader;
import org.apache.iceberg.flink.maintenance.operator.FileUriKeySelector;
import org.apache.iceberg.flink.maintenance.operator.ListFileSystemFiles;
import org.apache.iceberg.flink.maintenance.operator.ListMetadataFiles;
import org.apache.iceberg.flink.maintenance.operator.MetadataTablePlanner;
import org.apache.iceberg.flink.maintenance.operator.OrphanFilesDetector;
import org.apache.iceberg.flink.maintenance.operator.SkipOnError;
import org.apache.iceberg.flink.maintenance.operator.TaskResultAggregator;
import org.apache.iceberg.flink.source.ScanContext;
import org.apache.iceberg.relocated.com.google.common.base.Splitter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.ThreadPools;

public class DeleteOrphanFiles {
    private static final Schema FILE_PATH_SCHEMA = new Schema(DataFile.FILE_PATH);
    private static final ScanContext FILE_PATH_SCAN_CONTEXT = ScanContext.builder().streaming(true).project(FILE_PATH_SCHEMA).build();
    private static final Splitter COMMA_SPLITTER = Splitter.on(",");
    @Internal
    public static final OutputTag<Exception> ERROR_STREAM = new OutputTag("error-stream", TypeInformation.of(Exception.class));
    static final String PLANNER_TASK_NAME = "Table Planner";
    static final String READER_TASK_NAME = "Files Reader";
    static final String FILESYSTEM_FILES_TASK_NAME = "Filesystem Files";
    static final String METADATA_FILES_TASK_NAME = "List metadata Files";
    static final String DELETE_FILES_TASK_NAME = "Delete File";
    static final String AGGREGATOR_TASK_NAME = "Orphan Files Aggregator";
    static final String FILTER_FILES_TASK_NAME = "Filter File";
    static final String SKIP_ON_ERROR_TASK_NAME = "Skip On Error";

    public static Builder builder() {
        return new Builder();
    }

    private DeleteOrphanFiles() {
    }

    private static Map<String, String> flattenMap(Map<String, String> map) {
        HashMap<String, String> flattenedMap = Maps.newHashMap();
        if (map != null) {
            for (String key : map.keySet()) {
                String value = map.get(key);
                for (String splitKey : COMMA_SPLITTER.split(key)) {
                    flattenedMap.put(splitKey.trim(), value.trim());
                }
            }
        }
        return flattenedMap;
    }

    public static class Builder
    extends MaintenanceTaskBuilder<Builder> {
        private String location;
        private Duration minAge = Duration.ofDays(3L);
        private int planningWorkerPoolSize = ThreadPools.WORKER_THREAD_POOL_SIZE;
        private int deleteBatchSize = 1000;
        private boolean usePrefixListing = false;
        private Map<String, String> equalSchemes = Maps.newHashMap(ImmutableMap.of("s3n", "s3", "s3a", "s3"));
        private final Map<String, String> equalAuthorities = Maps.newHashMap();
        private DeleteOrphanFiles.PrefixMismatchMode prefixMismatchMode = DeleteOrphanFiles.PrefixMismatchMode.ERROR;

        @Override
        String maintenanceTaskName() {
            return "DeleteOrphanFiles";
        }

        public Builder location(String newLocation) {
            this.location = newLocation;
            return this;
        }

        public Builder usePrefixListing(boolean newUsePrefixListing) {
            this.usePrefixListing = newUsePrefixListing;
            return this;
        }

        public Builder prefixMismatchMode(DeleteOrphanFiles.PrefixMismatchMode newPrefixMismatchMode) {
            this.prefixMismatchMode = newPrefixMismatchMode;
            return this;
        }

        public Builder minAge(Duration newMinAge) {
            this.minAge = newMinAge;
            return this;
        }

        public Builder planningWorkerPoolSize(int newPlanningWorkerPoolSize) {
            this.planningWorkerPoolSize = newPlanningWorkerPoolSize;
            return this;
        }

        public Builder equalSchemes(Map<String, String> newEqualSchemes) {
            this.equalSchemes.putAll(DeleteOrphanFiles.flattenMap(newEqualSchemes));
            return this;
        }

        public Builder equalAuthorities(Map<String, String> newEqualAuthorities) {
            this.equalAuthorities.putAll(DeleteOrphanFiles.flattenMap(newEqualAuthorities));
            return this;
        }

        public Builder deleteBatchSize(int newDeleteBatchSize) {
            this.deleteBatchSize = newDeleteBatchSize;
            return this;
        }

        @Override
        DataStream<TaskResult> append(DataStream<Trigger> trigger) {
            this.tableLoader().open();
            SingleOutputStreamOperator splits = trigger.process((ProcessFunction)new MetadataTablePlanner(this.taskName(), this.index(), this.tableLoader(), FILE_PATH_SCAN_CONTEXT, MetadataTableType.ALL_FILES, this.planningWorkerPoolSize)).name(this.operatorName(DeleteOrphanFiles.PLANNER_TASK_NAME)).uid(DeleteOrphanFiles.PLANNER_TASK_NAME + this.uidSuffix()).slotSharingGroup(this.slotSharingGroup()).forceNonParallel();
            SingleOutputStreamOperator tableDataFiles = splits.rebalance().process((ProcessFunction)new FileNameReader(this.taskName(), this.index(), this.tableLoader(), FILE_PATH_SCHEMA, FILE_PATH_SCAN_CONTEXT, MetadataTableType.ALL_FILES)).name(this.operatorName(DeleteOrphanFiles.READER_TASK_NAME)).uid(DeleteOrphanFiles.READER_TASK_NAME + this.uidSuffix()).slotSharingGroup(this.slotSharingGroup()).setParallelism(this.parallelism().intValue());
            SingleOutputStreamOperator tableMetadataFiles = trigger.process((ProcessFunction)new ListMetadataFiles(this.taskName(), this.index(), this.tableLoader())).name(this.operatorName(DeleteOrphanFiles.METADATA_FILES_TASK_NAME)).uid(DeleteOrphanFiles.METADATA_FILES_TASK_NAME + this.uidSuffix()).slotSharingGroup(this.slotSharingGroup()).forceNonParallel();
            SingleOutputStreamOperator allFsFiles = trigger.process((ProcessFunction)new ListFileSystemFiles(this.taskName(), this.index(), this.tableLoader(), this.location, this.minAge.toMillis(), this.usePrefixListing)).name(this.operatorName(DeleteOrphanFiles.FILESYSTEM_FILES_TASK_NAME)).uid(DeleteOrphanFiles.FILESYSTEM_FILES_TASK_NAME + this.uidSuffix()).slotSharingGroup(this.slotSharingGroup()).forceNonParallel();
            SingleOutputStreamOperator filesToDelete = tableMetadataFiles.union(new DataStream[]{tableDataFiles}).keyBy((KeySelector)new FileUriKeySelector(this.equalSchemes, this.equalAuthorities)).connect((DataStream)allFsFiles.keyBy((KeySelector)new FileUriKeySelector(this.equalSchemes, this.equalAuthorities))).process((KeyedCoProcessFunction)new OrphanFilesDetector(this.prefixMismatchMode, this.equalSchemes, this.equalAuthorities)).slotSharingGroup(this.slotSharingGroup()).name(this.operatorName(DeleteOrphanFiles.FILTER_FILES_TASK_NAME)).uid(DeleteOrphanFiles.FILTER_FILES_TASK_NAME + this.uidSuffix()).setParallelism(this.parallelism().intValue());
            DataStream errorStream = tableMetadataFiles.getSideOutput(ERROR_STREAM).union(new DataStream[]{allFsFiles.getSideOutput(ERROR_STREAM), tableDataFiles.getSideOutput(ERROR_STREAM), splits.getSideOutput(ERROR_STREAM), filesToDelete.getSideOutput(ERROR_STREAM)});
            SingleOutputStreamOperator filesOrSkip = filesToDelete.connect(errorStream).transform(this.operatorName(DeleteOrphanFiles.SKIP_ON_ERROR_TASK_NAME), TypeInformation.of(String.class), (TwoInputStreamOperator)new SkipOnError()).uid(DeleteOrphanFiles.SKIP_ON_ERROR_TASK_NAME + this.uidSuffix()).slotSharingGroup(this.slotSharingGroup()).forceNonParallel();
            filesOrSkip.rebalance().transform(this.operatorName(DeleteOrphanFiles.DELETE_FILES_TASK_NAME), TypeInformation.of(Void.class), (OneInputStreamOperator)new DeleteFilesProcessor(this.tableLoader().loadTable(), this.taskName(), this.index(), this.deleteBatchSize)).uid(DeleteOrphanFiles.DELETE_FILES_TASK_NAME + this.uidSuffix()).slotSharingGroup(this.slotSharingGroup()).setParallelism(this.parallelism().intValue());
            return trigger.connect(errorStream).transform(this.operatorName(DeleteOrphanFiles.AGGREGATOR_TASK_NAME), TypeInformation.of(TaskResult.class), (TwoInputStreamOperator)new TaskResultAggregator(this.tableName(), this.taskName(), this.index())).uid(DeleteOrphanFiles.AGGREGATOR_TASK_NAME + this.uidSuffix()).slotSharingGroup(this.slotSharingGroup()).forceNonParallel();
        }
    }
}

