/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.SnapshotUpdate;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.CommitSummary;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
import org.apache.iceberg.flink.sink.FlinkManifestUtil;
import org.apache.iceberg.flink.sink.FlinkWriteResult;
import org.apache.iceberg.flink.sink.IcebergFilesCommitterMetrics;
import org.apache.iceberg.flink.sink.ManifestOutputFileFactory;
import org.apache.iceberg.flink.sink.SinkUtil;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.base.Strings;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.ThreadPools;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class IcebergFilesCommitter
extends AbstractStreamOperator<Void>
implements OneInputStreamOperator<FlinkWriteResult, Void>,
BoundedOneInput {
    private static final long serialVersionUID = 1L;
    private static final long INITIAL_CHECKPOINT_ID = -1L;
    private static final byte[] EMPTY_MANIFEST_DATA = new byte[0];
    private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
    private static final String FLINK_JOB_ID = "flink.job-id";
    private static final String OPERATOR_ID = "flink.operator-id";
    private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
    static final String MAX_CONTINUOUS_EMPTY_COMMITS = "flink.max-continuous-empty-commits";
    private final TableLoader tableLoader;
    private final boolean replacePartitions;
    private final Map<String, String> snapshotProperties;
    private final NavigableMap<Long, byte[]> dataFilesPerCheckpoint = Maps.newTreeMap();
    private final Map<Long, List<WriteResult>> writeResultsSinceLastSnapshot = Maps.newHashMap();
    private final String branch;
    private transient String flinkJobId;
    private transient String operatorUniqueId;
    private transient Table table;
    private transient IcebergFilesCommitterMetrics committerMetrics;
    private transient ManifestOutputFileFactory manifestOutputFileFactory;
    private transient long maxCommittedCheckpointId;
    private transient int continuousEmptyCheckpoints;
    private transient int maxContinuousEmptyCommits;
    private static final ListStateDescriptor<String> JOB_ID_DESCRIPTOR = new ListStateDescriptor("iceberg-flink-job-id", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
    private transient ListState<String> jobIdState;
    private static final ListStateDescriptor<SortedMap<Long, byte[]>> STATE_DESCRIPTOR = IcebergFilesCommitter.buildStateDescriptor();
    private transient ListState<SortedMap<Long, byte[]>> checkpointsState;
    private final Integer workerPoolSize;
    private final PartitionSpec spec;
    private transient ExecutorService workerPool;

    IcebergFilesCommitter(StreamOperatorParameters<Void> parameters, TableLoader tableLoader, boolean replacePartitions, Map<String, String> snapshotProperties, Integer workerPoolSize, String branch, PartitionSpec spec) {
        super(parameters);
        this.tableLoader = tableLoader;
        this.replacePartitions = replacePartitions;
        this.snapshotProperties = snapshotProperties;
        this.workerPoolSize = workerPoolSize;
        this.branch = branch;
        this.spec = spec;
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        this.flinkJobId = this.getContainingTask().getEnvironment().getJobID().toString();
        this.operatorUniqueId = this.getRuntimeContext().getOperatorUniqueID();
        this.tableLoader.open();
        this.table = this.tableLoader.loadTable();
        this.committerMetrics = new IcebergFilesCommitterMetrics((MetricGroup)this.metrics, this.table.name());
        this.maxContinuousEmptyCommits = PropertyUtil.propertyAsInt(this.table.properties(), MAX_CONTINUOUS_EMPTY_COMMITS, 10);
        Preconditions.checkArgument(this.maxContinuousEmptyCommits > 0, "flink.max-continuous-empty-commits must be positive");
        int subTaskId = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
        int attemptId = this.getRuntimeContext().getTaskInfo().getAttemptNumber();
        this.manifestOutputFileFactory = FlinkManifestUtil.createOutputFileFactory(() -> this.table, this.table.properties(), this.flinkJobId, this.operatorUniqueId, subTaskId, attemptId);
        this.maxCommittedCheckpointId = -1L;
        this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
        this.jobIdState = context.getOperatorStateStore().getListState(JOB_ID_DESCRIPTOR);
        if (context.isRestored()) {
            Iterable jobIdIterable = (Iterable)this.jobIdState.get();
            if (jobIdIterable == null || !jobIdIterable.iterator().hasNext()) {
                LOG.warn("Failed to restore committer state. This can happen when operator uid changed and Flink allowNonRestoredState is enabled. Best practice is to explicitly set the operator id via FlinkSink#Builder#uidPrefix() so that the committer operator uid is stable. Otherwise, Flink auto generate an operator uid based on job topology.With that, operator uid is subjective to change upon topology change.");
                return;
            }
            String restoredFlinkJobId = (String)jobIdIterable.iterator().next();
            Preconditions.checkState(!Strings.isNullOrEmpty(restoredFlinkJobId), "Flink job id parsed from checkpoint snapshot shouldn't be null or empty");
            this.maxCommittedCheckpointId = SinkUtil.getMaxCommittedCheckpointId(this.table, restoredFlinkJobId, this.operatorUniqueId, this.branch);
            NavigableMap<Long, byte[]> uncommittedDataFiles = Maps.newTreeMap((SortedMap)((Iterable)this.checkpointsState.get()).iterator().next()).tailMap(this.maxCommittedCheckpointId, false);
            if (!uncommittedDataFiles.isEmpty()) {
                long maxUncommittedCheckpointId = (Long)uncommittedDataFiles.lastKey();
                this.commitUpToCheckpoint(uncommittedDataFiles, restoredFlinkJobId, this.operatorUniqueId, maxUncommittedCheckpointId);
            }
        }
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        long checkpointId = context.getCheckpointId();
        LOG.info("Start to flush snapshot state to state backend, table: {}, checkpointId: {}", (Object)this.table, (Object)checkpointId);
        long startNano = System.nanoTime();
        this.writeToManifestUptoLatestCheckpoint(checkpointId);
        this.checkpointsState.clear();
        this.checkpointsState.add(this.dataFilesPerCheckpoint);
        this.jobIdState.clear();
        this.jobIdState.add((Object)this.flinkJobId);
        this.committerMetrics.checkpointDuration(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano));
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        if (checkpointId > this.maxCommittedCheckpointId) {
            LOG.info("Checkpoint {} completed. Attempting commit.", (Object)checkpointId);
            this.commitUpToCheckpoint(this.dataFilesPerCheckpoint, this.flinkJobId, this.operatorUniqueId, checkpointId);
            this.maxCommittedCheckpointId = checkpointId;
        } else {
            LOG.info("Skipping committing checkpoint {}. {} is already committed.", (Object)checkpointId, (Object)this.maxCommittedCheckpointId);
        }
        this.table = this.tableLoader.loadTable();
    }

    private void commitUpToCheckpoint(NavigableMap<Long, byte[]> deltaManifestsMap, String newFlinkJobId, String operatorId, long checkpointId) throws IOException {
        NavigableMap<Long, byte[]> pendingMap = deltaManifestsMap.headMap(checkpointId, true);
        ArrayList<ManifestFile> manifests = Lists.newArrayList();
        TreeMap<Long, WriteResult> pendingResults = Maps.newTreeMap();
        for (Map.Entry e : pendingMap.entrySet()) {
            if (Arrays.equals(EMPTY_MANIFEST_DATA, (byte[])e.getValue())) continue;
            DeltaManifests deltaManifests = (DeltaManifests)SimpleVersionedSerialization.readVersionAndDeSerialize((SimpleVersionedSerializer)DeltaManifestsSerializer.INSTANCE, (byte[])((byte[])e.getValue()));
            pendingResults.put((Long)e.getKey(), FlinkManifestUtil.readCompletedFiles(deltaManifests, this.table.io(), this.table.specs()));
            manifests.addAll(deltaManifests.manifests());
        }
        CommitSummary summary = new CommitSummary(pendingResults);
        this.commitPendingResult(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
        this.committerMetrics.updateCommitSummary(summary);
        pendingMap.clear();
        FlinkManifestUtil.deleteCommittedManifests(this.table, manifests, newFlinkJobId, checkpointId);
    }

    private void commitPendingResult(NavigableMap<Long, WriteResult> pendingResults, CommitSummary summary, String newFlinkJobId, String operatorId, long checkpointId) {
        long totalFiles = summary.dataFilesCount() + summary.deleteFilesCount();
        int n = this.continuousEmptyCheckpoints = totalFiles == 0L ? this.continuousEmptyCheckpoints + 1 : 0;
        if (totalFiles != 0L || this.continuousEmptyCheckpoints % this.maxContinuousEmptyCommits == 0) {
            if (this.replacePartitions) {
                this.replacePartitions(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
            } else {
                this.commitDeltaTxn(pendingResults, summary, newFlinkJobId, operatorId, checkpointId);
            }
            this.continuousEmptyCheckpoints = 0;
        } else {
            LOG.info("Skip commit for checkpoint {} due to no data files or delete files.", (Object)checkpointId);
        }
    }

    private void replacePartitions(NavigableMap<Long, WriteResult> pendingResults, CommitSummary summary, String newFlinkJobId, String operatorId, long checkpointId) {
        Preconditions.checkState(summary.deleteFilesCount() == 0L, "Cannot overwrite partitions with delete files.");
        ReplacePartitions dynamicOverwrite = (ReplacePartitions)this.table.newReplacePartitions().scanManifestsWith(this.workerPool);
        for (WriteResult result : pendingResults.values()) {
            Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files.");
            Arrays.stream(result.dataFiles()).forEach(dynamicOverwrite::addFile);
        }
        this.commitOperation(dynamicOverwrite, summary, "dynamic partition overwrite", newFlinkJobId, operatorId, checkpointId);
    }

    private void commitDeltaTxn(NavigableMap<Long, WriteResult> pendingResults, CommitSummary summary, String newFlinkJobId, String operatorId, long checkpointId) {
        if (summary.deleteFilesCount() == 0L) {
            AppendFiles appendFiles = (AppendFiles)this.table.newAppend().scanManifestsWith(this.workerPool);
            for (WriteResult result : pendingResults.values()) {
                Preconditions.checkState(result.referencedDataFiles().length == 0, "Should have no referenced data files for append.");
                Arrays.stream(result.dataFiles()).forEach(appendFiles::appendFile);
            }
            this.commitOperation(appendFiles, summary, "append", newFlinkJobId, operatorId, checkpointId);
        } else {
            for (Map.Entry e : pendingResults.entrySet()) {
                WriteResult result = (WriteResult)e.getValue();
                RowDelta rowDelta = (RowDelta)this.table.newRowDelta().scanManifestsWith(this.workerPool);
                Arrays.stream(result.dataFiles()).forEach(rowDelta::addRows);
                Arrays.stream(result.deleteFiles()).forEach(rowDelta::addDeletes);
                this.commitOperation(rowDelta, summary, "rowDelta", newFlinkJobId, operatorId, (Long)e.getKey());
            }
        }
    }

    private void commitOperation(SnapshotUpdate<?> operation, CommitSummary summary, String description, String newFlinkJobId, String operatorId, long checkpointId) {
        LOG.info("Committing {} for checkpoint {} to table {} branch {} with summary: {}", new Object[]{description, checkpointId, this.table.name(), this.branch, summary});
        this.snapshotProperties.forEach(operation::set);
        operation.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
        operation.set(FLINK_JOB_ID, newFlinkJobId);
        operation.set(OPERATOR_ID, operatorId);
        operation.toBranch(this.branch);
        long startNano = System.nanoTime();
        operation.commit();
        long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNano);
        LOG.info("Committed {} to table: {}, branch: {}, checkpointId {} in {} ms", new Object[]{description, this.table.name(), this.branch, checkpointId, durationMs});
        this.committerMetrics.commitDuration(durationMs);
    }

    public void processElement(StreamRecord<FlinkWriteResult> element) {
        FlinkWriteResult flinkWriteResult = (FlinkWriteResult)element.getValue();
        List writeResults = this.writeResultsSinceLastSnapshot.computeIfAbsent(flinkWriteResult.checkpointId(), k -> Lists.newArrayList());
        writeResults.add(flinkWriteResult.writeResult());
    }

    public void endInput() throws IOException {
        long currentCheckpointId = Long.MAX_VALUE;
        this.writeToManifestUptoLatestCheckpoint(currentCheckpointId);
        this.commitUpToCheckpoint(this.dataFilesPerCheckpoint, this.flinkJobId, this.operatorUniqueId, currentCheckpointId);
    }

    private void writeToManifestUptoLatestCheckpoint(long checkpointId) throws IOException {
        if (!this.writeResultsSinceLastSnapshot.containsKey(checkpointId)) {
            this.dataFilesPerCheckpoint.put(checkpointId, EMPTY_MANIFEST_DATA);
        }
        for (Map.Entry<Long, List<WriteResult>> writeResultsOfCheckpoint : this.writeResultsSinceLastSnapshot.entrySet()) {
            this.dataFilesPerCheckpoint.put(writeResultsOfCheckpoint.getKey(), this.writeToManifest(writeResultsOfCheckpoint.getKey(), writeResultsOfCheckpoint.getValue()));
        }
        this.writeResultsSinceLastSnapshot.clear();
    }

    private byte[] writeToManifest(long checkpointId, List<WriteResult> writeResults) throws IOException {
        WriteResult result = WriteResult.builder().addAll(writeResults).build();
        DeltaManifests deltaManifests = FlinkManifestUtil.writeCompletedFiles(result, () -> this.manifestOutputFileFactory.create(checkpointId), this.spec);
        return SimpleVersionedSerialization.writeVersionAndSerialize((SimpleVersionedSerializer)DeltaManifestsSerializer.INSTANCE, (Object)deltaManifests);
    }

    public void open() throws Exception {
        super.open();
        String operatorID = this.getRuntimeContext().getOperatorUniqueID();
        this.workerPool = ThreadPools.newFixedThreadPool("iceberg-worker-pool-" + operatorID, this.workerPoolSize);
    }

    public void close() throws Exception {
        if (this.tableLoader != null) {
            this.tableLoader.close();
        }
        if (this.workerPool != null) {
            this.workerPool.shutdown();
        }
    }

    @VisibleForTesting
    static ListStateDescriptor<SortedMap<Long, byte[]>> buildStateDescriptor() {
        Comparator longComparator = Comparators.forType(Types.LongType.get());
        SortedMapTypeInfo sortedMapTypeInfo = new SortedMapTypeInfo((TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, (TypeInformation)PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO, longComparator);
        return new ListStateDescriptor("iceberg-files-committer-state", (TypeInformation)sortedMapTypeInfo);
    }
}

