/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iceberg.flink.source.reader;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.iceberg.flink.source.reader.RecordAndPosition;
import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
import org.apache.iceberg.flink.source.reader.SplitWatermarkExtractor;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Record emitter that, in addition to forwarding records, emits a watermark each time records
 * from a different split than the previous one start arriving.
 *
 * <p>The watermark for a split is obtained from the configured {@link SplitWatermarkExtractor}.
 * A watermark is emitted only when it is not lower than the last value tracked by this emitter;
 * otherwise the tracked value is kept and the situation is logged at INFO level.
 *
 * @param <T> type of the records handed to the {@link SourceOutput}
 */
class WatermarkExtractorRecordEmitter<T> implements SerializableRecordEmitter<T> {
    private static final Logger LOG = LoggerFactory.getLogger(WatermarkExtractorRecordEmitter.class);

    /** Computes the watermark associated with a split. */
    private final SplitWatermarkExtractor timeExtractor;

    /** Id of the split the previous record came from; {@code null} until the first record is seen. */
    private String lastSplitId = null;

    /**
     * Highest watermark accepted so far.
     *
     * <p>NOTE(review): this starts at 0 (the default for a {@code long} field), so a first split
     * whose extracted watermark is negative takes the "lower watermark" branch and nothing is
     * emitted for it - confirm whether that is intended.
     */
    private long watermark;

    /**
     * Creates an emitter that derives watermarks with the given extractor.
     *
     * @param timeExtractor extractor used to compute the watermark of each new split
     */
    WatermarkExtractorRecordEmitter(SplitWatermarkExtractor timeExtractor) {
        this.timeExtractor = timeExtractor;
    }

    /**
     * Forwards one record to the output and records the read position on the split, emitting a
     * watermark first when the record is the first one seen from a different split.
     *
     * @param element record together with its file offset and record offset
     * @param output output that receives the watermark (if any) and the record
     * @param split split the record was read from; its position is updated after the record is
     *     collected
     */
    public void emitRecord(RecordAndPosition<T> element, SourceOutput<T> output, IcebergSourceSplit split) {
        String currentSplitId = split.splitId();
        if (!currentSplitId.equals(lastSplitId)) {
            long newWatermark = timeExtractor.extractWatermark(split);
            if (newWatermark < watermark) {
                // Never move the tracked watermark backwards; only report the out-of-order split.
                LOG.info(
                    "Received a new split with lower watermark. Previous watermark = {}, current watermark = {}, previous split = {}, current split = {}",
                    watermark,
                    newWatermark,
                    lastSplitId,
                    currentSplitId);
            } else {
                watermark = newWatermark;
                output.emitWatermark(new Watermark(watermark));
                // Log the split the watermark was extracted from. lastSplitId still refers to the
                // previous split at this point (null for the very first one).
                LOG.debug("Watermark = {} emitted based on split = {}", watermark, currentSplitId);
            }
            lastSplitId = currentSplitId;
        }
        output.collect(element.record());
        split.updatePosition(element.fileOffset(), element.recordOffset());
    }
}

