/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.maintenance.operator;

import java.io.IOException;
import java.util.Set;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesCommitManager;
import org.apache.iceberg.actions.RewriteFileGroup;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.maintenance.api.Trigger;
import org.apache.iceberg.flink.maintenance.operator.DataFileRewriteRunner;
import org.apache.iceberg.flink.maintenance.operator.TableMaintenanceMetrics;
import org.apache.iceberg.flink.maintenance.operator.TaskResultAggregator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class DataFileRewriteCommitter
extends AbstractStreamOperator<Trigger>
implements OneInputStreamOperator<DataFileRewriteRunner.ExecutedGroup, Trigger> {
    private static final Logger LOG = LoggerFactory.getLogger(DataFileRewriteCommitter.class);
    private final String tableName;
    private final String taskName;
    private final int taskIndex;
    private final TableLoader tableLoader;
    private transient Table table;
    private transient RewriteDataFilesCommitManager.CommitService commitService;
    private transient Counter errorCounter;
    private transient Counter addedDataFileNumCounter;
    private transient Counter addedDataFileSizeCounter;
    private transient Counter removedDataFileNumCounter;
    private transient Counter removedDataFileSizeCounter;

    public DataFileRewriteCommitter(String tableName, String taskName, int taskIndex, TableLoader tableLoader) {
        Preconditions.checkNotNull(tableName, "Table name should no be null");
        Preconditions.checkNotNull(taskName, "Task name should no be null");
        Preconditions.checkNotNull(tableLoader, "Table loader should no be null");
        this.tableName = tableName;
        this.taskName = taskName;
        this.taskIndex = taskIndex;
        this.tableLoader = tableLoader;
    }

    public void open() throws Exception {
        super.open();
        this.tableLoader.open();
        this.table = this.tableLoader.loadTable();
        MetricGroup taskMetricGroup = TableMaintenanceMetrics.groupFor((RuntimeContext)this.getRuntimeContext(), this.tableName, this.taskName, this.taskIndex);
        this.errorCounter = taskMetricGroup.counter("error");
        this.addedDataFileNumCounter = taskMetricGroup.counter("addedDataFileNum");
        this.addedDataFileSizeCounter = taskMetricGroup.counter("addedDataFileSize");
        this.removedDataFileNumCounter = taskMetricGroup.counter("removedDataFileNum");
        this.removedDataFileSizeCounter = taskMetricGroup.counter("removedDataFileSize");
    }

    public void processElement(StreamRecord<DataFileRewriteRunner.ExecutedGroup> streamRecord) {
        DataFileRewriteRunner.ExecutedGroup executedGroup = (DataFileRewriteRunner.ExecutedGroup)streamRecord.getValue();
        try {
            if (this.commitService == null) {
                this.table.refresh();
                FlinkRewriteDataFilesCommitManager commitManager = new FlinkRewriteDataFilesCommitManager(this.table, executedGroup.snapshotId(), streamRecord.getTimestamp());
                this.commitService = commitManager.service(executedGroup.groupsPerCommit());
                this.commitService.start();
            }
            this.commitService.offer(executedGroup.group());
        }
        catch (Exception e) {
            LOG.warn("[For table {} with {}[{}] at {}]: Exception processing {}", new Object[]{this.tableName, this.taskName, this.taskIndex, streamRecord.getTimestamp(), executedGroup, e});
            this.output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord((Object)e));
            this.errorCounter.inc();
        }
    }

    public void processWatermark(Watermark mark) throws Exception {
        try {
            if (this.commitService != null) {
                this.commitService.close();
            }
            LOG.info("[For table {} with {}[{}] at {}]: Successfully completed data file compaction", new Object[]{this.tableName, this.taskName, this.taskIndex, mark.getTimestamp()});
        }
        catch (Exception e) {
            LOG.warn("[For table {} with {}[{}] at {}]: Exception closing commit service", new Object[]{this.tableName, this.taskName, this.taskIndex, mark.getTimestamp(), e});
            this.output.collect(TaskResultAggregator.ERROR_STREAM, new StreamRecord((Object)e));
            this.errorCounter.inc();
        }
        this.commitService = null;
        super.processWatermark(mark);
    }

    public void close() throws IOException {
        if (this.commitService != null) {
            this.commitService.close();
        }
    }

    private class FlinkRewriteDataFilesCommitManager
    extends RewriteDataFilesCommitManager {
        private final long timestamp;

        FlinkRewriteDataFilesCommitManager(Table table, long startingSnapshotId, long timestamp) {
            super(table, startingSnapshotId);
            this.timestamp = timestamp;
        }

        @Override
        public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
            super.commitFileGroups(fileGroups);
            LOG.info("[For table {} with {}[{}] at {}]: Committed {}", new Object[]{DataFileRewriteCommitter.this.tableName, DataFileRewriteCommitter.this.taskName, DataFileRewriteCommitter.this.taskIndex, this.timestamp, fileGroups});
            this.updateMetrics(fileGroups);
        }

        private void updateMetrics(Set<RewriteFileGroup> fileGroups) {
            for (RewriteFileGroup fileGroup : fileGroups) {
                for (DataFile added : fileGroup.addedFiles()) {
                    DataFileRewriteCommitter.this.addedDataFileNumCounter.inc();
                    DataFileRewriteCommitter.this.addedDataFileSizeCounter.inc(added.fileSizeInBytes());
                }
                for (DataFile rewritten : fileGroup.rewrittenFiles()) {
                    DataFileRewriteCommitter.this.removedDataFileNumCounter.inc();
                    DataFileRewriteCommitter.this.removedDataFileSizeCounter.inc(rewritten.fileSizeInBytes());
                }
            }
        }
    }
}

