/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot;

import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.SnapshotResult;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Map;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.source.split.wartermark.WatermarkKind;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.PostgresSourceFetchTaskContext;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.PostgresSnapshotSplitReadTask;
import org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.reader.snapshot.SnapshotSplitChangeEventSourceContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FetchTask} that reads a single {@link SnapshotSplit} through a
 * {@link PostgresSnapshotSplitReadTask} and, when the fetch context reports exactly-once
 * semantics, dispatches an {@code END} watermark event for the range between the low and high
 * watermarks captured during the snapshot read.
 *
 * <p>The running flag is {@code volatile} so that {@link #shutdown()} and {@link #isRunning()}
 * observe the latest value when called from a thread other than the one running
 * {@link #execute(FetchTask.Context)}.
 */
public class PostgresSnapshotFetchTask
implements FetchTask<SourceSplitBase> {
    private static final Logger log = LoggerFactory.getLogger(PostgresSnapshotFetchTask.class);
    private final SnapshotSplit split;
    private volatile boolean taskRunning = false;
    private PostgresSnapshotSplitReadTask snapshotSplitReadTask;

    /**
     * Creates a fetch task for the given snapshot split.
     *
     * @param split the snapshot split this task reads
     */
    public PostgresSnapshotFetchTask(SnapshotSplit split) {
        this.split = split;
    }

    /**
     * Runs the snapshot read for this task's split.
     *
     * <p>The task is marked as running for the duration of the call and is always marked as not
     * running when the call ends, whether it returns normally or throws.
     *
     * @param context the fetch context; must be a {@link PostgresSourceFetchTaskContext}
     * @throws IllegalStateException if the snapshot result is neither completed nor skipped
     * @throws Exception if the underlying snapshot read or the watermark dispatch fails
     */
    public void execute(FetchTask.Context context) throws Exception {
        PostgresSourceFetchTaskContext sourceFetchContext = (PostgresSourceFetchTaskContext)context;
        this.taskRunning = true;
        try {
            this.snapshotSplitReadTask = new PostgresSnapshotSplitReadTask(sourceFetchContext.getDbzConnectorConfig(), sourceFetchContext.getOffsetContext(), (SnapshotProgressListener)sourceFetchContext.getSnapshotChangeEventSourceMetrics(), sourceFetchContext.getDatabaseSchema(), sourceFetchContext.getDataConnection(), (JdbcSourceEventDispatcher)sourceFetchContext.getDispatcher(), this.split);
            SnapshotSplitChangeEventSourceContext changeEventSourceContext = new SnapshotSplitChangeEventSourceContext();
            SnapshotResult<PostgresOffsetContext> snapshotResult = this.snapshotSplitReadTask.execute((ChangeEventSource.ChangeEventSourceContext)changeEventSourceContext, sourceFetchContext.getPartition(), sourceFetchContext.getOffsetContext());
            if (!snapshotResult.isCompletedOrSkipped()) {
                throw new IllegalStateException(String.format("Read snapshot for split %s fail", this.split));
            }
            boolean changed = changeEventSourceContext.getHighWatermark().isAfter(changeEventSourceContext.getLowWatermark());
            if (!context.isExactlyOnce()) {
                // Without exactly-once there is nothing further to dispatch; only note
                // (at debug level) that the watermarks moved while the snapshot was read.
                if (changed) {
                    log.debug("Skip merge changelog(exactly-once) for snapshot split {}", this.split);
                }
                return;
            }
            IncrementalSplit backfillSplit = this.createBackFillWalSplit(changeEventSourceContext);
            this.dispatchBinlogEndEvent(backfillSplit, sourceFetchContext.getPartition().getSourcePartition(), sourceFetchContext.getDispatcher());
        } finally {
            // Reset on every exit path so isRunning() never reports a task that has
            // already failed or been interrupted.
            this.taskRunning = false;
        }
    }

    /**
     * Builds an {@link IncrementalSplit} that shares this split's id and table and carries the
     * low and high watermarks recorded by the given context (presumably as its start and stop
     * offsets -- confirm against the {@code IncrementalSplit} constructor).
     */
    private IncrementalSplit createBackFillWalSplit(SnapshotSplitChangeEventSourceContext sourceContext) {
        return new IncrementalSplit(this.split.splitId(), Collections.singletonList(this.split.getTableId()), (Offset)sourceContext.getLowWatermark(), (Offset)sourceContext.getHighWatermark(), new ArrayList<>());
    }

    /**
     * Dispatches a watermark event of kind {@link WatermarkKind#END} carrying the stop offset
     * of the given split.
     */
    private void dispatchBinlogEndEvent(IncrementalSplit backFillBinlogSplit, Map<String, ?> sourcePartition, JdbcSourceEventDispatcher eventDispatcher) throws InterruptedException {
        eventDispatcher.dispatchWatermarkEvent(sourcePartition, (SourceSplitBase)backFillBinlogSplit, backFillBinlogSplit.getStopOffset(), WatermarkKind.END);
    }

    /**
     * Returns whether {@link #execute(FetchTask.Context)} is currently in progress and
     * {@link #shutdown()} has not been called since it started.
     */
    public boolean isRunning() {
        return this.taskRunning;
    }

    /**
     * Marks this task as not running. This only clears the flag; it does not itself interrupt
     * an in-progress snapshot read.
     */
    public void shutdown() {
        this.taskRunning = false;
    }

    /**
     * Returns the snapshot split this task was created for.
     */
    public SourceSplitBase getSplit() {
        return this.split;
    }
}

