/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.sink;

import java.util.Arrays;
import java.util.Map;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.flink.maintenance.operator.TableChange;
import org.apache.iceberg.flink.sink.DeltaManifests;
import org.apache.iceberg.flink.sink.DeltaManifestsSerializer;
import org.apache.iceberg.flink.sink.FlinkManifestUtil;
import org.apache.iceberg.flink.sink.IcebergCommittable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class CommittableToTableChangeConverter
extends ProcessFunction<CommittableMessage<IcebergCommittable>, TableChange> {
    private static final Logger LOG = LoggerFactory.getLogger(CommittableToTableChangeConverter.class);
    private final FileIO io;
    private final String tableName;
    private final Map<Integer, PartitionSpec> specs;
    private transient String flinkJobId;

    public CommittableToTableChangeConverter(FileIO fileIO, String tableName, Map<Integer, PartitionSpec> specs) {
        Preconditions.checkNotNull(fileIO, "FileIO should not be null");
        Preconditions.checkNotNull(tableName, "TableName should not be null");
        Preconditions.checkNotNull(specs, "Specs should not be null");
        this.io = fileIO;
        this.tableName = tableName;
        this.specs = specs;
    }

    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
        Preconditions.checkState(this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks() == 1, "CommittableToTableChangeConverter must run with parallelism 1, current parallelism: %s", this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks());
        this.flinkJobId = this.getRuntimeContext().getJobInfo().getJobId().toString();
    }

    public void processElement(CommittableMessage<IcebergCommittable> value, ProcessFunction.Context ctx, Collector<TableChange> out) throws Exception {
        if (value instanceof CommittableWithLineage) {
            WriteResult writeResult;
            DeltaManifests deltaManifests;
            IcebergCommittable committable = (IcebergCommittable)((CommittableWithLineage)value).getCommittable();
            if (committable == null || committable.manifest().length == 0) {
                return;
            }
            try {
                deltaManifests = (DeltaManifests)SimpleVersionedSerialization.readVersionAndDeSerialize((SimpleVersionedSerializer)DeltaManifestsSerializer.INSTANCE, (byte[])committable.manifest());
                writeResult = FlinkManifestUtil.readCompletedFiles(deltaManifests, this.io, this.specs);
            }
            catch (Exception e) {
                LOG.warn("Unable to read delta manifests for table {} at checkpoint {}", new Object[]{this.tableName, committable.checkpointId(), e});
                return;
            }
            TableChange tableChange = new TableChange(Arrays.asList(writeResult.dataFiles()), Arrays.asList(writeResult.deleteFiles()));
            out.collect((Object)tableChange);
            FlinkManifestUtil.deleteCommittedManifests(this.tableName, this.io, deltaManifests.manifests(), this.flinkJobId, committable.checkpointId());
        }
    }
}

