/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SideOutputDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinarySection;
import org.apache.paimon.flink.action.TableActionBase;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.utils.BoundedOneInputOperator;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.operation.ListUnexistingFiles;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.utils.Preconditions;

/**
 * Action that finds data files which the table still references but which
 * {@link ListUnexistingFiles} reports as unexisting, and commits their removal from the table.
 *
 * <p>The job is built from three stages: a non-parallel source that emits every partition of the
 * table as serialized bytes, a worker that lists the unexisting files of each partition, and
 * (unless {@link #dryRun()} was requested) a non-parallel committer that commits all deletions at
 * the end of input. The paths of the affected files are exposed through a side output.
 */
public class RemoveUnexistingFilesAction extends TableActionBase {

    /** Side output on which the worker reports the key (path) of every file it wants removed. */
    private static final OutputTag<String> RESULT_SIDE_OUTPUT =
            new OutputTag<>("result-side-output", BasicTypeInfo.STRING_TYPE_INFO);

    private boolean dryRun = false;

    @Nullable
    private Integer parallelism = null;

    public RemoveUnexistingFilesAction(
            String databaseName, String tableName, Map<String, String> catalogConfig) {
        super(databaseName, tableName, catalogConfig);
    }

    /**
     * Switches the action to dry-run mode: the files are still listed and reported, but no
     * committer is attached to the job, so nothing is committed.
     *
     * @return this action, for chaining
     */
    public RemoveUnexistingFilesAction dryRun() {
        this.dryRun = true;
        return this;
    }

    /**
     * Sets the desired parallelism of the worker stage.
     *
     * @param parallelism upper bound for the worker parallelism; the effective value is also
     *     capped by the number of partitions
     * @return this action, for chaining
     */
    public RemoveUnexistingFilesAction withParallelism(int parallelism) {
        this.parallelism = parallelism;
        return this;
    }

    @Override
    public void build() throws Exception {
        buildDataStream();
    }

    /**
     * Builds the job topology on {@code env}.
     *
     * @return a stream carrying the key (path) of every file selected for removal
     * @throws Exception if the topology cannot be built
     */
    public DataStream<String> buildDataStream() throws Exception {
        FileStoreTable fileStoreTable = (FileStoreTable) this.table;
        List<BinaryRow> binaryPartitions = fileStoreTable.newScan().listPartitions();

        // Partitions travel as raw bytes; the worker wraps them back into a BinaryRow.
        List<byte[]> serializedPartitions =
                binaryPartitions.stream().map(BinarySection::toBytes).collect(Collectors.toList());
        SingleOutputStreamOperator<byte[]> source =
                this.env
                        .fromData(
                                serializedPartitions,
                                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
                        .name("Remove Unexisting Files Source")
                        .forceNonParallel();

        SingleOutputStreamOperator<Committable> worker =
                source.transform(
                        "Remove Unexisting Files Worker",
                        new CommittableTypeInfo(),
                        new WorkerOperator(fileStoreTable));
        if (this.parallelism != null) {
            // More subtasks than partitions would sit idle, so cap by the partition count. The cap
            // itself is kept at 1 or more: a table without partitions would otherwise request
            // parallelism 0, which Flink rejects. An invalid user-supplied value is still passed
            // through unchanged and rejected by Flink as before.
            int partitionBound = Math.max(1, binaryPartitions.size());
            worker = worker.setParallelism(Math.min(this.parallelism, partitionBound));
        }

        DataStream<String> result = worker.getSideOutput(RESULT_SIDE_OUTPUT);
        if (!this.dryRun) {
            worker.transform(
                            "Global Committer : " + this.table.name(),
                            new CommittableTypeInfo(),
                            new CommitOperator(fileStoreTable))
                    .forceNonParallel();
        }
        return result;
    }

    @Override
    public void run() throws Exception {
        build();
        this.env.execute("Remove Unexisting Files : " + this.table.name());
    }

    /**
     * Buffers every incoming commit message and commits all of them in a single call once the
     * input has ended.
     */
    private static class CommitOperator extends BoundedOneInputOperator<Committable, Committable> {
        private static final long serialVersionUID = 1L;

        private final FileStoreTable table;
        private transient List<CommitMessage> commitMessages;
        private transient TableCommitImpl commit;

        private CommitOperator(FileStoreTable table) {
            this.table = table;
        }

        @Override
        public void open() throws Exception {
            this.commitMessages = new ArrayList<>();
            // A random commit user is generated for every run of the operator.
            this.commit = this.table.newCommit(UUID.randomUUID().toString());
        }

        @Override
        public void processElement(StreamRecord<Committable> record) throws Exception {
            Committable committable = record.getValue();
            Preconditions.checkArgument(committable.kind() == Committable.Kind.FILE, "Committable has kind " + committable.kind() + ". This is unexpected!");
            this.commitMessages.add((CommitMessage) committable.wrappedCommittable());
        }

        @Override
        public void endInput() throws Exception {
            try {
                this.commit.commit(Long.MAX_VALUE, this.commitMessages);
            } catch (Exception e) {
                // Deliberately best-effort: a failed commit is only logged and the job is not
                // failed; the message tells the user to run the action again.
                LOG.warn("Commit failed due to exception. Consider running this action or procedure again.", (Throwable) e);
            }
        }

        @Override
        public void close() throws Exception {
            // open() may not have run (or may have failed) before close() is called.
            if (this.commit != null) {
                this.commit.close();
            }
        }
    }

    /**
     * Receives one serialized partition per record, lists the unexisting files of that partition
     * and emits one {@link Committable} per bucket that deletes them. The key (path) of every such
     * file is additionally emitted on {@link #RESULT_SIDE_OUTPUT}.
     */
    private static class WorkerOperator extends BoundedOneInputOperator<byte[], Committable> {
        private static final long serialVersionUID = 1L;

        private final FileStoreTable table;
        private transient ListUnexistingFiles operation;
        private transient int partitionArity;

        private WorkerOperator(FileStoreTable table) {
            this.table = table;
        }

        @Override
        public void open() throws Exception {
            this.operation = new ListUnexistingFiles(this.table);
            this.partitionArity = this.table.schema().partitionKeys().size();
        }

        @Override
        public void processElement(StreamRecord<byte[]> record) throws Exception {
            byte[] bytes = record.getValue();
            // A fresh row is created for each record instead of re-pointing one shared instance.
            // The row is handed to the emitted commit messages, and the committer buffers those
            // until end of input. NOTE(review): if records are passed downstream by reference
            // (e.g. chained operators without a copying serializer), re-pointing a shared row
            // would make every buffered message refer to the last partition - confirm.
            BinaryRow partition = new BinaryRow(this.partitionArity);
            partition.pointTo(MemorySegment.wrap(bytes), 0, bytes.length);

            Map<Integer, Map<String, DataFileMeta>> toDelete = this.operation.list(partition);
            for (Map.Entry<Integer, Map<String, DataFileMeta>> bucketEntry : toDelete.entrySet()) {
                Map<String, DataFileMeta> unexistingFiles = bucketEntry.getValue();
                // The files go into the second list of the DataIncrement; the other two lists and
                // the compact increment stay empty.
                DataIncrement deletions =
                        new DataIncrement(
                                Collections.emptyList(),
                                new ArrayList<DataFileMeta>(unexistingFiles.values()),
                                Collections.emptyList());
                CommitMessageImpl message =
                        new CommitMessageImpl(
                                partition,
                                bucketEntry.getKey(),
                                this.table.coreOptions().bucket(),
                                deletions,
                                CompactIncrement.emptyIncrement());
                this.output.collect(
                        new StreamRecord<>(
                                new Committable(Long.MAX_VALUE, Committable.Kind.FILE, message)));
                for (String path : unexistingFiles.keySet()) {
                    this.output.collect(RESULT_SIDE_OUTPUT, new StreamRecord<>(path));
                }
            }
        }

        @Override
        public void endInput() throws Exception {
            // Nothing is buffered in this operator; every record is handled in processElement.
        }
    }
}

