/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.postgres.source.fetch;

import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresEventDispatcher;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresPartition;
import io.debezium.connector.postgresql.PostgresSchema;
import io.debezium.connector.postgresql.PostgresStreamingChangeEventSource;
import io.debezium.connector.postgresql.PostgresTaskContext;
import io.debezium.connector.postgresql.connection.Lsn;
import io.debezium.connector.postgresql.connection.PostgresConnection;
import io.debezium.connector.postgresql.connection.ReplicationConnection;
import io.debezium.connector.postgresql.spi.Snapshotter;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import java.util.HashMap;
import javax.annotation.Nullable;
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
import org.apache.flink.cdc.connectors.base.WatermarkDispatcher;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
import org.apache.flink.cdc.connectors.postgres.source.fetch.PostgresSourceFetchTaskContext;
import org.apache.flink.cdc.connectors.postgres.source.fetch.StoppableChangeEventSourceContext;
import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PostgresStreamFetchTask
implements FetchTask<SourceSplitBase> {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresStreamFetchTask.class);
    private final StreamSplit split;
    private final StoppableChangeEventSourceContext changeEventSourceContext;
    private volatile boolean taskRunning = false;
    private volatile boolean stopped = false;
    private StreamSplitReadTask streamSplitReadTask;
    private Long lastCommitLsn;

    public PostgresStreamFetchTask(StreamSplit streamSplit) {
        this.split = streamSplit;
        this.changeEventSourceContext = new StoppableChangeEventSourceContext();
    }

    @Override
    public void execute(FetchTask.Context context) throws Exception {
        if (this.stopped) {
            LOG.debug("StreamFetchTask for split: {} is already stopped and can not be executed", (Object)this.split);
            return;
        }
        LOG.debug("execute StreamFetchTask for split: {}", (Object)this.split);
        PostgresSourceFetchTaskContext sourceFetchContext = (PostgresSourceFetchTaskContext)context;
        this.taskRunning = true;
        this.streamSplitReadTask = new StreamSplitReadTask(sourceFetchContext.getDbzConnectorConfig(), sourceFetchContext.getSnapShotter(), sourceFetchContext.getConnection(), (PostgresEventDispatcher<TableId>)sourceFetchContext.getEventDispatcher(), sourceFetchContext.getWaterMarkDispatcher(), sourceFetchContext.getErrorHandler(), sourceFetchContext.getTaskContext().getClock(), sourceFetchContext.getDatabaseSchema(), sourceFetchContext.getTaskContext(), sourceFetchContext.getReplicationConnection(), this.split);
        this.streamSplitReadTask.execute((ChangeEventSource.ChangeEventSourceContext)this.changeEventSourceContext, sourceFetchContext.getPartition(), sourceFetchContext.getOffsetContext());
    }

    @Override
    public void close() {
        LOG.debug("stopping StreamFetchTask for split: {}", (Object)this.split);
        if (this.streamSplitReadTask != null) {
            ((StoppableChangeEventSourceContext)this.streamSplitReadTask.context).stopChangeEventSource();
        }
        this.stopped = true;
        this.taskRunning = false;
    }

    @Override
    public boolean isRunning() {
        return this.taskRunning;
    }

    @Override
    public SourceSplitBase getSplit() {
        return this.split;
    }

    public void commitCurrentOffset(@Nullable Offset offsetToCommit) {
        if (this.streamSplitReadTask != null && this.streamSplitReadTask.offsetContext != null) {
            PostgresOffsetContext postgresOffsetContext = this.streamSplitReadTask.offsetContext;
            Long commitLsn = (Long)postgresOffsetContext.getOffset().get("lsn_commit");
            if (offsetToCommit != null) {
                commitLsn = ((PostgresOffset)offsetToCommit).getLsn().asLong();
            }
            if (commitLsn != null && (this.lastCommitLsn == null || Lsn.valueOf(commitLsn).compareTo(Lsn.valueOf(this.lastCommitLsn)) > 0)) {
                this.lastCommitLsn = commitLsn;
                HashMap<String, Long> offsets = new HashMap<String, Long>();
                offsets.put("lsn_commit", this.lastCommitLsn);
                LOG.debug("Committing offset {} for {}", (Object)Lsn.valueOf(this.lastCommitLsn), (Object)this.streamSplitReadTask.streamSplit);
                this.streamSplitReadTask.commitOffset(offsets);
            }
        }
    }

    @VisibleForTesting
    StoppableChangeEventSourceContext getChangeEventSourceContext() {
        return this.changeEventSourceContext;
    }

    public static class StreamSplitReadTask
    extends PostgresStreamingChangeEventSource {
        private static final Logger LOG = LoggerFactory.getLogger(StreamSplitReadTask.class);
        private final StreamSplit streamSplit;
        private final WatermarkDispatcher watermarkDispatcher;
        private final ErrorHandler errorHandler;
        public ChangeEventSource.ChangeEventSourceContext context;
        public PostgresOffsetContext offsetContext;

        public StreamSplitReadTask(PostgresConnectorConfig connectorConfig, Snapshotter snapshotter, PostgresConnection connection, PostgresEventDispatcher<TableId> eventDispatcher, WatermarkDispatcher watermarkDispatcher, ErrorHandler errorHandler, Clock clock, PostgresSchema schema, PostgresTaskContext taskContext, ReplicationConnection replicationConnection, StreamSplit streamSplit) {
            super(connectorConfig, snapshotter, connection, eventDispatcher, errorHandler, clock, schema, taskContext, replicationConnection);
            this.streamSplit = streamSplit;
            this.watermarkDispatcher = watermarkDispatcher;
            this.errorHandler = errorHandler;
        }

        @Override
        public void execute(ChangeEventSource.ChangeEventSourceContext context, PostgresPartition partition, PostgresOffsetContext offsetContext) throws InterruptedException {
            this.context = context;
            this.offsetContext = offsetContext;
            LOG.info("Execute StreamSplitReadTask for split: {}", (Object)this.streamSplit);
            offsetContext.setStreamingStoppingLsn(((PostgresOffset)this.streamSplit.getEndingOffset()).getLsn());
            super.execute(context, partition, offsetContext);
            if (this.isBoundedRead()) {
                LOG.debug("StreamSplit is bounded read: {}", (Object)this.streamSplit);
                PostgresOffset currentOffset = PostgresOffset.of(offsetContext.getOffset());
                try {
                    this.watermarkDispatcher.dispatchWatermarkEvent(partition.getSourcePartition(), this.streamSplit, currentOffset, WatermarkKind.END);
                    LOG.info("StreamSplitReadTask finished for {} at {}", (Object)this.streamSplit, (Object)currentOffset);
                }
                catch (InterruptedException e) {
                    LOG.error("Send signal event error.", (Throwable)e);
                    this.errorHandler.setProducerThrowable((Throwable)new FlinkRuntimeException("Error processing WAL signal event", (Throwable)e));
                }
                ((StoppableChangeEventSourceContext)context).stopChangeEventSource();
            }
        }

        private boolean isBoundedRead() {
            return !PostgresOffset.NO_STOPPING_OFFSET.getLsn().equals(((PostgresOffset)this.streamSplit.getEndingOffset()).getLsn());
        }
    }
}

