/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.base.source.reader.external;

import java.util.ArrayList;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.base.source.reader.external.JdbcSourceFetchTaskContext;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractScanFetchTask
implements FetchTask {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractScanFetchTask.class);
    protected volatile boolean taskRunning = false;
    protected final SnapshotSplit snapshotSplit;
    private SnapshotPhaseHooks snapshotPhaseHooks = SnapshotPhaseHooks.empty();

    public AbstractScanFetchTask(SnapshotSplit snapshotSplit) {
        this.snapshotSplit = snapshotSplit;
    }

    @Override
    public void execute(FetchTask.Context context) throws Exception {
        StreamSplit backfillStreamSplit;
        boolean streamBackfillRequired;
        LOG.info("Execute ScanFetchTask for split: {}", (Object)this.snapshotSplit);
        DataSourceDialect dialect = context.getDataSourceDialect();
        SourceConfig sourceConfig = context.getSourceConfig();
        this.taskRunning = true;
        if (this.snapshotPhaseHooks.getPreLowWatermarkAction() != null) {
            this.snapshotPhaseHooks.getPreLowWatermarkAction().accept(sourceConfig, this.snapshotSplit);
        }
        Offset lowWatermark = dialect.displayCurrentOffset(sourceConfig);
        LOG.info("Snapshot step 1 - Determining low watermark {} for split {}", (Object)lowWatermark, (Object)this.snapshotSplit);
        this.dispatchLowWaterMarkEvent(context, this.snapshotSplit, lowWatermark);
        if (this.snapshotPhaseHooks.getPostLowWatermarkAction() != null) {
            this.snapshotPhaseHooks.getPostLowWatermarkAction().accept(sourceConfig, this.snapshotSplit);
        }
        LOG.info("Snapshot step 2 - Snapshotting data");
        this.executeDataSnapshot(context);
        if (this.snapshotPhaseHooks.getPreHighWatermarkAction() != null) {
            this.snapshotPhaseHooks.getPreHighWatermarkAction().accept(sourceConfig, this.snapshotSplit);
        }
        Offset highWatermark = context.getSourceConfig().isSkipSnapshotBackfill() ? lowWatermark : dialect.displayCurrentOffset(sourceConfig);
        LOG.info("Snapshot step 3 - Determining high watermark {} for split {}", (Object)highWatermark, (Object)this.snapshotSplit);
        this.dispatchHighWaterMarkEvent(context, this.snapshotSplit, highWatermark);
        if (this.snapshotPhaseHooks.getPostHighWatermarkAction() != null) {
            this.snapshotPhaseHooks.getPostHighWatermarkAction().accept(sourceConfig, this.snapshotSplit);
        }
        if (!(streamBackfillRequired = (backfillStreamSplit = this.createBackfillStreamSplit(lowWatermark, highWatermark)).getEndingOffset().isAfter(backfillStreamSplit.getStartingOffset()))) {
            LOG.info("Skip the backfill {} for split {}: low watermark >= high watermark", (Object)backfillStreamSplit, (Object)this.snapshotSplit);
            this.dispatchEndWaterMarkEvent(context, backfillStreamSplit, backfillStreamSplit.getEndingOffset());
        } else {
            this.executeBackfillTask(context, backfillStreamSplit);
        }
        this.taskRunning = false;
    }

    protected StreamSplit createBackfillStreamSplit(Offset lowWatermark, Offset highWatermark) {
        return new StreamSplit(this.snapshotSplit.splitId(), lowWatermark, highWatermark, new ArrayList<FinishedSnapshotSplitInfo>(), this.snapshotSplit.getTableSchemas(), 0);
    }

    protected abstract void executeBackfillTask(FetchTask.Context var1, StreamSplit var2) throws Exception;

    protected abstract void executeDataSnapshot(FetchTask.Context var1) throws Exception;

    protected void dispatchLowWaterMarkEvent(FetchTask.Context context, SourceSplitBase split, Offset lowWatermark) throws Exception {
        if (context instanceof JdbcSourceFetchTaskContext) {
            ((JdbcSourceFetchTaskContext)context).getWaterMarkDispatcher().dispatchWatermarkEvent(((JdbcSourceFetchTaskContext)context).getPartition().getSourcePartition(), split, lowWatermark, WatermarkKind.LOW);
            return;
        }
        throw new UnsupportedOperationException("Unsupported Context type: " + context.getClass().toString());
    }

    protected void dispatchHighWaterMarkEvent(FetchTask.Context context, SourceSplitBase split, Offset highWatermark) throws Exception {
        if (context instanceof JdbcSourceFetchTaskContext) {
            ((JdbcSourceFetchTaskContext)context).getWaterMarkDispatcher().dispatchWatermarkEvent(((JdbcSourceFetchTaskContext)context).getPartition().getSourcePartition(), split, highWatermark, WatermarkKind.HIGH);
            return;
        }
        throw new UnsupportedOperationException("Unsupported Context type: " + context.getClass().toString());
    }

    protected void dispatchEndWaterMarkEvent(FetchTask.Context context, SourceSplitBase split, Offset endWatermark) throws Exception {
        if (context instanceof JdbcSourceFetchTaskContext) {
            ((JdbcSourceFetchTaskContext)context).getWaterMarkDispatcher().dispatchWatermarkEvent(((JdbcSourceFetchTaskContext)context).getPartition().getSourcePartition(), split, endWatermark, WatermarkKind.END);
            return;
        }
        throw new UnsupportedOperationException("Unsupported Context type: " + context.getClass().toString());
    }

    @Override
    public boolean isRunning() {
        return this.taskRunning;
    }

    public SnapshotSplit getSplit() {
        return this.snapshotSplit;
    }

    @Override
    public void close() {
        this.taskRunning = false;
    }

    @VisibleForTesting
    public void setSnapshotPhaseHooks(SnapshotPhaseHooks snapshotPhaseHooks) {
        this.snapshotPhaseHooks = snapshotPhaseHooks;
    }
}

