/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.base.source.reader.external;

import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.pipeline.DataChangeEvent;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.Fetcher;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import org.apache.flink.shaded.guava31.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link Fetcher} that runs a snapshot-split {@link FetchTask} on a dedicated single-thread
 * executor and hands the records produced by that task to the caller.
 *
 * <p>Records are read from the {@link ChangeEventQueue} exposed by the {@link FetchTask.Context}.
 * Any failure raised on the executor thread is remembered and rethrown as a {@link
 * FlinkRuntimeException} from the polling methods.
 */
public class IncrementalSourceScanFetcher implements Fetcher<SourceRecords, SourceSplitBase> {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceScanFetcher.class);

    /** Maximum time {@link #close()} waits for the executor thread to terminate. */
    private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L;

    /**
     * Set to {@code true} when a task is submitted and back to {@code false} once the records of
     * the current split have been handed out.
     */
    public AtomicBoolean hasNextElement;

    /**
     * Set to {@code true} the first time {@link #pollSplitRecords()} is called while {@link
     * #hasNextElement} is {@code false}; reset on every {@link #submitTask(FetchTask)}.
     */
    public AtomicBoolean reachEnd;

    private final FetchTask.Context taskContext;
    private final ExecutorService executorService;
    private volatile ChangeEventQueue<DataChangeEvent> queue;
    private volatile Throwable readException;
    private FetchTask<SourceSplitBase> snapshotSplitReadTask;
    private SnapshotSplit currentSnapshotSplit;

    /**
     * Creates the fetcher and its single worker thread factory.
     *
     * @param taskContext context used to configure, run and read the output of fetch tasks
     * @param subtaskId appended to the worker thread name ({@code debezium-snapshot-reader-N})
     */
    public IncrementalSourceScanFetcher(FetchTask.Context taskContext, int subtaskId) {
        this.taskContext = taskContext;
        // Throwables that escape the submitted runnable (i.e. anything that is not an Exception)
        // reach the uncaught handler and are recorded the same way as caught exceptions.
        ThreadFactory threadFactory =
                new ThreadFactoryBuilder()
                        .setNameFormat("debezium-snapshot-reader-" + subtaskId)
                        .setUncaughtExceptionHandler(
                                (thread, throwable) -> setReadException(throwable))
                        .build();
        this.executorService = Executors.newSingleThreadExecutor(threadFactory);
        this.hasNextElement = new AtomicBoolean(false);
        this.reachEnd = new AtomicBoolean(false);
    }

    /**
     * Configures the context for the task's snapshot split and schedules the task on the worker
     * thread.
     *
     * @param fetchTask task whose split must be a snapshot split
     */
    @Override
    public void submitTask(FetchTask<SourceSplitBase> fetchTask) {
        this.snapshotSplitReadTask = fetchTask;
        this.currentSnapshotSplit = fetchTask.getSplit().asSnapshotSplit();
        taskContext.configure(currentSnapshotSplit);
        // The queue is fetched after configure(), as in the original statement order.
        this.queue = taskContext.getQueue();
        hasNextElement.set(true);
        reachEnd.set(false);
        executorService.execute(
                () -> {
                    try {
                        snapshotSplitReadTask.execute(taskContext);
                    } catch (Exception e) {
                        setReadException(e);
                    }
                });
    }

    /**
     * @return {@code true} if no split has been submitted yet, or if the current task is no longer
     *     running, has no further element and the end has been observed by a poll
     */
    @Override
    public boolean isFinished() {
        if (currentSnapshotSplit == null) {
            return true;
        }
        return !snapshotSplitReadTask.isRunning() && !hasNextElement.get() && reachEnd.get();
    }

    /**
     * Polls the records of the current split.
     *
     * @return an iterator over the polled records, or {@code null} when there is nothing left to
     *     read for the current split
     * @throws InterruptedException if interrupted while polling the queue
     * @throws FlinkRuntimeException if the read task failed
     */
    @Override
    @Nullable
    public Iterator<SourceRecords> pollSplitRecords() throws InterruptedException {
        checkReadException();
        if (!hasNextElement.get()) {
            reachEnd.compareAndSet(false, true);
            return null;
        }
        return taskContext.getSourceConfig().isSkipSnapshotBackfill()
                ? pollWithoutBuffer()
                : pollWithBuffer();
    }

    /**
     * Polls one batch from the queue and returns its records as-is, stopping at (and excluding) the
     * end watermark event. Seeing the end watermark marks the split as having no further element.
     *
     * @return an iterator over a single {@link SourceRecords} holding the polled batch
     * @throws InterruptedException if interrupted while polling the queue
     * @throws FlinkRuntimeException if the read task failed
     */
    public Iterator<SourceRecords> pollWithoutBuffer() throws InterruptedException {
        checkReadException();
        List<SourceRecord> records = new ArrayList<>();
        for (DataChangeEvent event : queue.poll()) {
            SourceRecord record = event.getRecord();
            if (WatermarkEvent.isEndWatermarkEvent(record)) {
                hasNextElement.set(false);
                break;
            }
            records.add(record);
        }
        return Collections.singletonList(new SourceRecords(records)).iterator();
    }

    /**
     * Drains the queue until the end watermark event and returns the whole split at once.
     *
     * <p>The first record must be the low watermark. Records seen before the high watermark are
     * buffered by record key (a later record with the same key replaces the earlier one). Records
     * seen after the high watermark that are data changes inside the split's key range are passed
     * to {@link FetchTask.Context#rewriteOutputBuffer} together with the buffer.
     *
     * @return an iterator over a single {@link SourceRecords} made of the low watermark, the
     *     buffered records after timestamp formatting, and the high watermark
     * @throws InterruptedException if interrupted while polling the queue
     * @throws FlinkRuntimeException if the read task failed
     * @throws IllegalStateException if the first record is not a low watermark event
     */
    public Iterator<SourceRecords> pollWithBuffer() throws InterruptedException {
        boolean reachChangeLogStart = false;
        boolean reachChangeLogEnd = false;
        SourceRecord lowWatermark = null;
        SourceRecord highWatermark = null;
        HashMap<Struct, SourceRecord> outputBuffer = new HashMap<>();

        while (!reachChangeLogEnd) {
            checkReadException();
            for (DataChangeEvent event : queue.poll()) {
                SourceRecord record = event.getRecord();
                if (lowWatermark == null) {
                    lowWatermark = record;
                    assertLowWatermark(lowWatermark);
                } else if (highWatermark == null && WatermarkEvent.isHighWatermarkEvent(record)) {
                    highWatermark = record;
                    reachChangeLogStart = true;
                } else if (reachChangeLogStart && WatermarkEvent.isEndWatermarkEvent(record)) {
                    // Anything after the end watermark in this batch is ignored.
                    reachChangeLogEnd = true;
                    break;
                } else if (!reachChangeLogStart) {
                    outputBuffer.put((Struct) record.key(), record);
                } else if (isChangeRecordInChunkRange(record)) {
                    taskContext.rewriteOutputBuffer(outputBuffer, record);
                }
            }
        }
        hasNextElement.set(false);

        List<SourceRecord> normalizedRecords = new ArrayList<>();
        normalizedRecords.add(lowWatermark);
        normalizedRecords.addAll(taskContext.formatMessageTimestamp(outputBuffer.values()));
        normalizedRecords.add(highWatermark);

        List<SourceRecords> sourceRecordsSet = new ArrayList<>();
        sourceRecordsSet.add(new SourceRecords(normalizedRecords));
        return sourceRecordsSet.iterator();
    }

    /** Rethrows a failure recorded by the worker thread, wrapped with the current split. */
    private void checkReadException() {
        Throwable failure = readException;
        if (failure != null) {
            throw new FlinkRuntimeException(
                    String.format(
                            "Read split %s error due to %s.",
                            currentSnapshotSplit, failure.getMessage()),
                    failure);
        }
    }

    /** Logs the failure and records it; later failures are attached as suppressed. */
    private void setReadException(Throwable throwable) {
        LOG.error(
                String.format(
                        "Execute snapshot read task for snapshot split %s fail",
                        currentSnapshotSplit),
                throwable);
        if (readException == null) {
            readException = throwable;
        } else {
            readException.addSuppressed(throwable);
        }
    }

    /**
     * Closes the task context, the current read task and the executor.
     *
     * <p>Each step is attempted independently so that a failure while closing the context or the
     * task does not leave the executor thread running. Failures are logged and not propagated. If
     * the calling thread is interrupted while waiting for the executor, its interrupt status is
     * restored.
     */
    @Override
    public void close() {
        try {
            if (taskContext != null) {
                taskContext.close();
            }
        } catch (Exception e) {
            LOG.error("Close scan fetcher error", e);
        }

        try {
            if (snapshotSplitReadTask != null) {
                snapshotSplitReadTask.close();
            }
        } catch (Exception e) {
            LOG.error("Close scan fetcher error", e);
        }

        if (executorService != null) {
            try {
                executorService.shutdown();
                if (!executorService.awaitTermination(
                        READER_CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                    LOG.warn(
                            "Failed to close the scan fetcher in {} seconds.",
                            READER_CLOSE_TIMEOUT_SECONDS);
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller instead of silently consuming it.
                Thread.currentThread().interrupt();
                LOG.error("Close scan fetcher error", e);
            } catch (Exception e) {
                LOG.error("Close scan fetcher error", e);
            }
        }
    }

    /** @return the executor that runs the snapshot read task */
    @VisibleForTesting
    public ExecutorService getExecutorService() {
        return executorService;
    }

    /** Fails with {@link IllegalStateException} unless the record is a low watermark event. */
    private void assertLowWatermark(SourceRecord lowWatermark) {
        Preconditions.checkState(
                WatermarkEvent.isLowWatermarkEvent(lowWatermark),
                String.format(
                        "The first record should be low watermark signal event, but actual is %s",
                        lowWatermark));
    }

    /** @return whether the record is a data change lying between the current split's bounds */
    private boolean isChangeRecordInChunkRange(SourceRecord record) {
        return taskContext.isDataChangeRecord(record)
                && taskContext.isRecordBetween(
                        record,
                        currentSnapshotSplit.getSplitStart(),
                        currentSnapshotSplit.getSplitEnd());
    }
}

