/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
import org.apache.iceberg.flink.sink.FlinkManifestUtil;
import org.apache.iceberg.flink.sink.ManifestOutputFileFactory;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class IcebergFilesCommitter
extends AbstractStreamOperator<Void>
implements OneInputStreamOperator<WriteResult, Void>,
BoundedOneInput {
    private static final long serialVersionUID = 1L;
    // NOTE(review): not referenced elsewhere in this decompiled file; the literal -1L is used
    // where a "nothing committed yet" checkpoint id is needed — presumably this constant inlined.
    private static final long INITIAL_CHECKPOINT_ID = -1L;
    // Marker stored for a checkpoint that received no write results (see writeToManifest).
    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
    private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
    // Snapshot summary keys written by commitOperation and read back by getMaxCommittedCheckpointId.
    private static final String FLINK_JOB_ID = "flink.job-id";
    private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
    // Loader used to open and load the target table in initializeState; closed in dispose.
    private final TableLoader tableLoader;
    // When true, pending files are committed through ReplacePartitions instead of append/row-delta.
    private final boolean replacePartitions;
    // Checkpoint id -> serialized DeltaManifests (or EMPTY_MANIFEST_DATA) that are not yet committed.
    private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
    // Write results received via processElement since the last snapshotState/endInput.
    private final List<WriteResult> writeResultsOfCurrentCkpt = Lists.newArrayList();
    // The fields below are (re)assigned in initializeState.
    private transient String flinkJobId;
    private transient Table table;
    private transient ManifestOutputFileFactory manifestOutputFileFactory;
    // Highest checkpoint id whose files are known to be committed to the table.
    private transient long maxCommittedCheckpointId;
    // Operator state holding the id of the Flink job that wrote the checkpoint.
    private static final ListStateDescriptor<String> JOB_ID_DESCRIPTOR = new ListStateDescriptor("iceberg-flink-job-id", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
    private transient ListState<String> jobIdState;
    // Operator state holding a copy of dataFilesPerCheckpoint.
    private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = IcebergFilesCommitter.buildStateDescriptor();
    private transient ListState<SortedMap<Long, byte[]>> checkpointsState;

    /**
     * Creates a committer for the table provided by {@code tableLoader}.
     *
     * @param tableLoader loader that is opened in {@code initializeState} to obtain the table
     * @param replacePartitions {@code true} to commit pending files as a partition overwrite,
     *     {@code false} to commit them as appends / row deltas
     */
    IcebergFilesCommitter(TableLoader tableLoader, boolean replacePartitions) {
        this.replacePartitions = replacePartitions;
        this.tableLoader = tableLoader;
    }

    /**
     * Loads the target table, prepares the manifest file factory and registers the operator
     * state. When the job is restored, any checkpoint whose files were written to state but not
     * yet committed to the table is committed immediately.
     *
     * <p>If the job is flagged as restored but no committer state is present, the operator logs a
     * warning and starts from a fresh state instead of failing on an empty iterator.
     *
     * @param context Flink state initialization context
     * @throws Exception if the parent initialization, table loading or the recovery commit fails
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.flinkJobId = this.getContainingTask().getEnvironment().getJobID().toString();

        // Open the loader and load the table this operator commits to.
        this.tableLoader.open();
        this.table = this.tableLoader.loadTable();

        int subTaskId = this.getRuntimeContext().getIndexOfThisSubtask();
        int attemptId = this.getRuntimeContext().getAttemptNumber();
        this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(this.table, this.flinkJobId, subTaskId, attemptId);
        this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;

        this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
        this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
        if (!context.isRestored()) {
            return;
        }

        Iterable<String> restoredJobIds = this.jobIdState.get();
        if (restoredJobIds == null || !restoredJobIds.iterator().hasNext()) {
            // Nothing was restored for this operator, so there is nothing to recover.
            LOG.warn("Failed to restore committer state for table {}: no job id found in the restored operator state. Proceeding with a brand new committer state.", this.table);
            return;
        }

        String restoredFlinkJobId = restoredJobIds.iterator().next();
        Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId), "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");

        // Look up what the restored job already committed, then keep only the newer checkpoints.
        this.maxCommittedCheckpointId = IcebergFilesCommitter.getMaxCommittedCheckpointId(this.table, restoredFlinkJobId);
        NavigableMap<Long, byte[]> uncommittedDataFiles = Maps.newTreeMap(this.checkpointsState.get().iterator().next())
                .tailMap(this.maxCommittedCheckpointId, false);
        if (!uncommittedDataFiles.isEmpty()) {
            long maxUncommittedCheckpointId = uncommittedDataFiles.lastKey();
            this.commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, maxUncommittedCheckpointId);
        }
    }

    /**
     * Persists the write results buffered since the previous checkpoint.
     *
     * <p>The buffered results are written to a manifest (see {@code writeToManifest}), recorded
     * under this checkpoint id, and the whole pending map plus the current job id are stored in
     * operator state. The buffer is cleared afterwards.
     *
     * @param context Flink snapshot context providing the checkpoint id
     * @throws Exception if the parent snapshot, manifest writing or state update fails
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        long checkpointId = context.getCheckpointId();
        LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", this.table, checkpointId);

        // Record the files of this checkpoint before replacing the stored state.
        this.dataFilesPerCheckpoint.put(checkpointId, this.writeToManifest(checkpointId));

        // Replace the previous state contents with the current pending map.
        this.checkpointsState.clear();
        this.checkpointsState.add(this.dataFilesPerCheckpoint);

        this.jobIdState.clear();
        this.jobIdState.add(this.flinkJobId);

        // The buffered results now live in dataFilesPerCheckpoint; start a fresh buffer.
        this.writeResultsOfCurrentCkpt.clear();
    }

    /**
     * Commits all pending checkpoints up to and including {@code checkpointId}.
     *
     * <p>A notification whose id is not greater than the highest committed id is ignored: an
     * earlier commit already covered every pending checkpoint up to that id.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if the parent notification or the commit fails
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        if (checkpointId <= this.maxCommittedCheckpointId) {
            return;
        }
        this.commitUpToCheckpoint(this.dataFilesPerCheckpoint, this.flinkJobId, checkpointId);
        this.maxCommittedCheckpointId = checkpointId;
    }

    /**
     * Commits every entry of {@code deltaManifestsMap} whose checkpoint id is less than or equal
     * to {@code checkpointId}, removes those entries from the map, and then deletes the temporary
     * manifest files they referenced.
     *
     * @param deltaManifestsMap checkpoint id to serialized {@code DeltaManifests}; committed
     *     entries are removed from this map
     * @param newFlinkJobId job id recorded in the snapshot summary
     * @param checkpointId highest checkpoint id (inclusive) to commit
     * @throws IOException if a serialized manifest cannot be read back
     */
    private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap, String newFlinkJobId, long checkpointId) throws IOException {
        // headMap is a live view: clearing it below removes the entries from deltaManifestsMap.
        NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
        List<ManifestFile> manifests = Lists.newArrayList();
        NavigableMap<Long, WriteResult> pendingResults = Maps.newTreeMap();

        for (Map.Entry<Long, byte[]> pending : pendingMap.entrySet()) {
            byte[] serialized = pending.getValue();
            if (!Arrays.equals(EMPTY_MANIFEST_DATA, serialized)) {
                DeltaManifests deltaManifests = SimpleVersionedSerialization.readVersionAndDeSerialize(DeltaManifestsSerializer.INSTANCE, serialized);
                pendingResults.put(pending.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, this.table.io()));
                manifests.addAll(deltaManifests.manifests());
            }
        }

        if (this.replacePartitions) {
            this.replacePartitions(pendingResults, newFlinkJobId, checkpointId);
        } else {
            this.commitDeltaTxn(pendingResults, newFlinkJobId, checkpointId);
        }

        // Only drop the pending entries once the commit above has succeeded.
        pendingMap.clear();

        // Best-effort cleanup: a failed delete is logged and does not fail the operator.
        for (ManifestFile manifest : manifests) {
            try {
                this.table.io().deleteFile(manifest.path());
            } catch (Exception e) {
                String details = MoreObjects.toStringHelper(this)
                        .add("flinkJobId", newFlinkJobId)
                        .add("checkpointId", checkpointId)
                        .add("manifestPath", manifest.path())
                        .toString();
                LOG.warn("The iceberg transaction has been committed, but we failed to clean the temporary flink manifests: {}", details, e);
            }
        }
    }

    /**
     * Commits all pending data files as a single dynamic partition overwrite.
     *
     * @param pendingResults write results keyed by checkpoint id
     * @param newFlinkJobId job id recorded in the snapshot summary
     * @param checkpointId checkpoint id recorded in the snapshot summary
     * @throws IllegalStateException if any result carries delete files or referenced data files
     */
    private void replacePartitions(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
        // Reject the whole batch up front if any result contains delete files.
        int deleteFilesNum = 0;
        for (WriteResult pending : pendingResults.values()) {
            deleteFilesNum += pending.deleteFiles().length;
        }
        Preconditions.checkState(deleteFilesNum == 0, "Cannot overwrite partitions with delete files.");

        ReplacePartitions dynamicOverwrite = this.table.newReplacePartitions();
        int numFiles = 0;
        for (WriteResult pending : pendingResults.values()) {
            Preconditions.checkState(pending.referencedDataFiles().length == 0, "Should have no referenced data files.");
            numFiles += pending.dataFiles().length;
            Arrays.stream(pending.dataFiles()).forEach(file -> dynamicOverwrite.addFile(file));
        }

        this.commitOperation(dynamicOverwrite, numFiles, 0, "dynamic partition overwrite", newFlinkJobId, checkpointId);
    }

    /**
     * Commits the pending write results either as one append or as a sequence of row deltas.
     *
     * <p>When no result contains delete files, all data files are combined into a single append
     * stamped with {@code checkpointId}. Otherwise each checkpoint's result is committed as its
     * own row delta, in the map's iteration order, stamped with that checkpoint's id.
     *
     * @param pendingResults write results keyed by checkpoint id
     * @param newFlinkJobId job id recorded in the snapshot summary
     * @param checkpointId checkpoint id recorded in the summary of the append commit
     */
    private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, String newFlinkJobId, long checkpointId) {
        int deleteFilesNum = 0;
        for (WriteResult pending : pendingResults.values()) {
            deleteFilesNum += pending.deleteFiles().length;
        }

        if (deleteFilesNum == 0) {
            // Append-only path: one commit for all checkpoints.
            AppendFiles appendFiles = this.table.newAppend();
            int numFiles = 0;
            for (WriteResult pending : pendingResults.values()) {
                Preconditions.checkState(pending.referencedDataFiles().length == 0, "Should have no referenced data files.");
                numFiles += pending.dataFiles().length;
                Arrays.stream(pending.dataFiles()).forEach(file -> appendFiles.appendFile(file));
            }
            this.commitOperation(appendFiles, numFiles, 0, "append", newFlinkJobId, checkpointId);
            return;
        }

        // Delete files are present: commit checkpoint by checkpoint.
        for (Map.Entry<Long, WriteResult> pending : pendingResults.entrySet()) {
            WriteResult result = pending.getValue();
            RowDelta rowDelta = this.table.newRowDelta()
                    .validateDataFilesExist(ImmutableList.copyOf(result.referencedDataFiles()))
                    .validateDeletedFiles();

            int numDataFiles = result.dataFiles().length;
            Arrays.stream(result.dataFiles()).forEach(file -> rowDelta.addRows(file));

            int numDeleteFiles = result.deleteFiles().length;
            Arrays.stream(result.deleteFiles()).forEach(file -> rowDelta.addDeletes(file));

            long entryCheckpointId = pending.getKey();
            this.commitOperation(rowDelta, numDataFiles, numDeleteFiles, "rowDelta", newFlinkJobId, entryCheckpointId);
        }
    }

    /**
     * Stamps {@code operation} with the checkpoint id and job id, commits it, and logs how long
     * the commit took.
     *
     * @param operation pending snapshot update to commit
     * @param numDataFiles number of data files, used for logging only
     * @param numDeleteFiles number of delete files, used for logging only
     * @param description short operation name used in the log message
     * @param newFlinkJobId value stored under the {@code flink.job-id} summary key
     * @param checkpointId value stored under the {@code flink.max-committed-checkpoint-id} key
     */
    private void commitOperation(SnapshotUpdate<?> operation, int numDataFiles, int numDeleteFiles, String description, String newFlinkJobId, long checkpointId) {
        LOG.info("Committing {} with {} data files and {} delete files to table {}", description, numDataFiles, numDeleteFiles, this.table);
        operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
        operation.set(FLINK_JOB_ID, newFlinkJobId);

        long startedAt = System.currentTimeMillis();
        operation.commit();
        long elapsedMs = System.currentTimeMillis() - startedAt;
        LOG.info("Committed in {} ms", elapsedMs);
    }

    /**
     * Buffers an incoming write result until the next checkpoint or end of input.
     *
     * @param element record wrapping the write result
     */
    @Override
    public void processElement(StreamRecord<WriteResult> element) {
        WriteResult received = element.getValue();
        this.writeResultsOfCurrentCkpt.add(received);
    }

    /**
     * Flushes the buffered write results and commits everything that is still pending.
     *
     * <p>{@code Long.MAX_VALUE} is used as the checkpoint id so that the inclusive head-map
     * lookup in {@code commitUpToCheckpoint} covers every pending entry.
     *
     * @throws IOException if writing or reading a manifest fails
     */
    @Override
    public void endInput() throws IOException {
        final long finalCheckpointId = Long.MAX_VALUE;
        byte[] manifestData = this.writeToManifest(finalCheckpointId);
        this.dataFilesPerCheckpoint.put(finalCheckpointId, manifestData);
        this.writeResultsOfCurrentCkpt.clear();
        this.commitUpToCheckpoint(this.dataFilesPerCheckpoint, this.flinkJobId, finalCheckpointId);
    }

    /**
     * Writes the buffered write results to manifest files and returns their serialized form.
     *
     * @param checkpointId checkpoint id passed to the manifest output file factory
     * @return the versioned, serialized {@code DeltaManifests}, or {@code EMPTY_MANIFEST_DATA}
     *     when nothing is buffered
     * @throws IOException if writing the manifests or serializing them fails
     */
    private byte[] writeToManifest(long checkpointId) throws IOException {
        if (this.writeResultsOfCurrentCkpt.isEmpty()) {
            // No files for this checkpoint: store the shared empty marker instead.
            return EMPTY_MANIFEST_DATA;
        }

        WriteResult combined = WriteResult.builder()
                .addAll(this.writeResultsOfCurrentCkpt)
                .build();
        DeltaManifests written = FlinkManifestUtil.writeCompletedFiles(
                combined,
                () -> this.manifestOutputFileFactory.create(checkpointId),
                this.table.spec());
        return SimpleVersionedSerialization.writeVersionAndSerialize(DeltaManifestsSerializer.INSTANCE, written);
    }

    /**
     * Releases the table loader and then lets the base operator release its own resources.
     *
     * <p>The parent {@code dispose()} runs in a {@code finally} block so that it is still invoked
     * when closing the loader throws.
     *
     * @throws Exception if closing the table loader or the parent disposal fails
     */
    @Override
    public void dispose() throws Exception {
        try {
            if (this.tableLoader != null) {
                this.tableLoader.close();
            }
        } finally {
            super.dispose();
        }
    }

    /**
     * Builds the descriptor of the list state that stores the pending checkpoint map.
     *
     * @return descriptor named {@code iceberg-files-committer-state} for a sorted map from
     *     {@code Long} to {@code byte[]}
     */
    private static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor() {
        Comparator<Long> keyOrder = Comparators.forType(Types.LongType.get());
        SortedMapTypeInfo<Long, byte[]> mapTypeInfo = new SortedMapTypeInfo<>(
                BasicTypeInfo.LONG_TYPE_INFO,
                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO,
                keyOrder);
        return new ListStateDescriptor<>("iceberg-files-committer-state", mapTypeInfo);
    }

    /**
     * Finds the checkpoint id recorded by the most recent snapshot that {@code flinkJobId}
     * committed.
     *
     * <p>Starts at the table's current snapshot and follows parent ids until a snapshot is found
     * whose summary carries the given job id together with a committed checkpoint id.
     *
     * @param table table whose snapshot history is searched
     * @param flinkJobId job id to match against the {@code flink.job-id} summary entry
     * @return the recorded checkpoint id, or {@code -1} when no matching snapshot is reachable
     * @throws NumberFormatException if the recorded checkpoint id is not a valid long
     */
    static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
        Snapshot current = table.currentSnapshot();
        while (current != null) {
            Map<String, String> summary = current.summary();
            if (flinkJobId.equals(summary.get(FLINK_JOB_ID))) {
                String recorded = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
                if (recorded != null) {
                    return Long.parseLong(recorded);
                }
            }

            // Walk up the lineage; a missing parent id ends the search.
            Long parentSnapshotId = current.parentId();
            if (parentSnapshotId == null) {
                current = null;
            } else {
                current = table.snapshot(parentSnapshotId);
            }
        }
        return INITIAL_CHECKPOINT_ID;
    }
}

