/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.debezium.task;

import com.github.shyiko.mysql.binlog.event.Event;
import io.debezium.DebeziumException;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import io.debezium.connector.mysql.MySqlOffsetContext;
import io.debezium.connector.mysql.MySqlPartition;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSource;
import io.debezium.connector.mysql.MySqlStreamingChangeEventSourceMetrics;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import java.util.function.Predicate;
import org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.EventDispatcherImpl;
import org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import org.apache.flink.cdc.connectors.mysql.debezium.reader.StoppableChangeEventSourceContext;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlBinlogSplitReadTask
extends MySqlStreamingChangeEventSource {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlBinlogSplitReadTask.class);
    private final MySqlBinlogSplit binlogSplit;
    private final EventDispatcherImpl<TableId> eventDispatcher;
    private final SignalEventDispatcher signalEventDispatcher;
    private final ErrorHandler errorHandler;
    private final Predicate<Event> eventFilter;
    private ChangeEventSource.ChangeEventSourceContext context;

    public MySqlBinlogSplitReadTask(MySqlConnectorConfig connectorConfig, MySqlConnection connection, EventDispatcherImpl<TableId> dispatcher, SignalEventDispatcher signalEventDispatcher, ErrorHandler errorHandler, Clock clock, MySqlTaskContext taskContext, MySqlStreamingChangeEventSourceMetrics metrics, MySqlBinlogSplit binlogSplit, Predicate<Event> eventFilter) {
        super(connectorConfig, connection, dispatcher, errorHandler, clock, taskContext, metrics);
        this.binlogSplit = binlogSplit;
        this.eventDispatcher = dispatcher;
        this.errorHandler = errorHandler;
        this.signalEventDispatcher = signalEventDispatcher;
        this.eventFilter = eventFilter;
    }

    @Override
    public void execute(ChangeEventSource.ChangeEventSourceContext context, MySqlPartition partition, MySqlOffsetContext offsetContext) throws InterruptedException {
        this.context = context;
        super.execute(context, partition, offsetContext);
    }

    @Override
    protected void handleEvent(MySqlPartition partition, MySqlOffsetContext offsetContext, Event event) {
        BinlogOffset currentBinlogOffset;
        if (!this.eventFilter.test(event)) {
            return;
        }
        super.handleEvent(partition, offsetContext, event);
        if (this.isBoundedRead() && (currentBinlogOffset = RecordUtils.getBinlogPosition(offsetContext.getOffset())).isAtOrAfter(this.binlogSplit.getEndingOffset())) {
            try {
                this.signalEventDispatcher.dispatchWatermarkEvent(this.binlogSplit, currentBinlogOffset, SignalEventDispatcher.WatermarkKind.BINLOG_END);
            }
            catch (InterruptedException e) {
                LOG.error("Send signal event error.", (Throwable)e);
                this.errorHandler.setProducerThrowable((Throwable)new DebeziumException("Error processing binlog signal event", (Throwable)e));
            }
            ((StoppableChangeEventSourceContext)this.context).stopChangeEventSource();
        }
    }

    private boolean isBoundedRead() {
        return !BinlogOffsetUtils.isNonStoppingOffset(this.binlogSplit.getEndingOffset());
    }
}

