/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.connector.vitess;

import io.debezium.connector.vitess.VitessChangeRecordEmitter;
import io.debezium.connector.vitess.VitessConnectorConfig;
import io.debezium.connector.vitess.VitessDatabaseSchema;
import io.debezium.connector.vitess.VitessOffsetContext;
import io.debezium.connector.vitess.VitessPartition;
import io.debezium.connector.vitess.connection.ReplicationConnection;
import io.debezium.connector.vitess.connection.ReplicationMessage;
import io.debezium.connector.vitess.connection.ReplicationMessageProcessor;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streaming change event source for the Vitess connector.
 *
 * <p>Starts streaming on the supplied {@link ReplicationConnection}, converts every received
 * {@link ReplicationMessage} into calls on the {@link EventDispatcher}, and keeps the
 * {@link VitessOffsetContext} up to date with the VGTID delivered alongside each message.
 */
public class VitessStreamingChangeEventSource
implements StreamingChangeEventSource<VitessPartition, VitessOffsetContext> {
    private static final Logger LOGGER = LoggerFactory.getLogger(VitessStreamingChangeEventSource.class);
    private final EventDispatcher<VitessPartition, TableId> dispatcher;
    private final ErrorHandler errorHandler;
    private final Clock clock;
    private final VitessDatabaseSchema schema;
    private final VitessConnectorConfig connectorConfig;
    private final ReplicationConnection replicationConnection;
    private final DelayStrategy pauseNoMessage;

    /**
     * Creates the streaming source.
     *
     * @param dispatcher receives transaction boundary events and data change events
     * @param errorHandler receives any throwable raised while streaming
     * @param clock passed to the initial offset context and to every change record emitter
     * @param schema passed to every change record emitter
     * @param connectorConfig connector configuration; its poll interval is used as the pause
     *        between checks of the running/error state in {@link #execute}
     * @param replicationConnection connection that is streamed from and closed by {@link #execute}
     */
    public VitessStreamingChangeEventSource(EventDispatcher<VitessPartition, TableId> dispatcher, ErrorHandler errorHandler, Clock clock, VitessDatabaseSchema schema, VitessConnectorConfig connectorConfig, ReplicationConnection replicationConnection) {
        this.dispatcher = dispatcher;
        this.errorHandler = errorHandler;
        this.clock = clock;
        this.schema = schema;
        this.connectorConfig = connectorConfig;
        this.replicationConnection = replicationConnection;
        this.pauseNoMessage = DelayStrategy.constant(connectorConfig.getPollInterval().toMillis());
        LOGGER.info("VitessStreamingChangeEventSource is created");
    }

    /**
     * Starts streaming and then waits until the context stops running or the error reference
     * handed to the replication connection is populated.
     *
     * <p>Any throwable raised here (including one taken from the error reference) is not
     * propagated to the caller; it is passed to {@link ErrorHandler#setProducerThrowable}.
     * The replication connection is always closed before this method returns.
     *
     * @param context consulted via {@code isRunning()} to decide when to stop waiting
     * @param partition partition forwarded to the dispatcher with every event
     * @param offsetContext offsets to resume from; when {@code null} an initial context is created
     */
    @Override
    public void execute(ChangeEventSource.ChangeEventSourceContext context, VitessPartition partition, VitessOffsetContext offsetContext) {
        if (offsetContext == null) {
            offsetContext = VitessOffsetContext.initialContext(this.connectorConfig, this.clock);
        }
        try {
            // Shared with the replication connection; presumably set by it when streaming
            // fails -- NOTE(review): confirm against ReplicationConnection.startStreaming.
            AtomicReference<Throwable> error = new AtomicReference<>();
            this.replicationConnection.startStreaming(offsetContext.getRestartVgtid(), this.newReplicationMessageProcessor(partition, offsetContext), error);
            while (context.isRunning() && error.get() == null) {
                this.pauseNoMessage.sleepWhen(true);
            }
            // Read the reference exactly once so the throwable that is logged is guaranteed
            // to be the same one that is propagated to the error handler.
            Throwable failure = error.get();
            if (failure != null) {
                LOGGER.error("Error during streaming", failure);
                throw failure;
            }
        }
        catch (Throwable e) {
            this.errorHandler.setProducerThrowable(e);
        }
        finally {
            try {
                this.replicationConnection.close();
            }
            catch (Exception e) {
                // Closing is best effort: log and continue so that a close failure does not
                // replace the outcome already reported above.
                LOGGER.error("Failed to close replicationConnection", e);
            }
        }
    }

    /**
     * Builds the callback that handles each replication message.
     *
     * <ul>
     *   <li>Transactional messages rotate the VGTID and, for BEGIN / COMMIT operations,
     *       dispatch the matching transaction boundary event.</li>
     *   <li>DDL and OTHER operations only rotate the VGTID.</li>
     *   <li>Every other operation is dispatched as a data change event for the table named
     *       in the message; on the last row of a transaction the VGTID is reset first.</li>
     * </ul>
     *
     * @param partition partition forwarded to the dispatcher
     * @param offsetContext offset context updated for every message
     * @return the message processor handed to the replication connection
     */
    private ReplicationMessageProcessor newReplicationMessageProcessor(VitessPartition partition, VitessOffsetContext offsetContext) {
        return (message, newVgtid, isLastRowOfTransaction) -> {
            if (message.isTransactionalMessage()) {
                offsetContext.rotateVgtid(newVgtid, message.getCommitTime());
                if (message.getOperation() == ReplicationMessage.Operation.BEGIN) {
                    this.dispatcher.dispatchTransactionStartedEvent(partition, message.getTransactionId(), offsetContext);
                } else if (message.getOperation() == ReplicationMessage.Operation.COMMIT) {
                    this.dispatcher.dispatchTransactionCommittedEvent(partition, offsetContext);
                }
                return;
            }
            if (message.getOperation() == ReplicationMessage.Operation.DDL || message.getOperation() == ReplicationMessage.Operation.OTHER) {
                offsetContext.rotateVgtid(newVgtid, message.getCommitTime());
                return;
            }
            String rawTableName = message.getTable();
            TableId tableId = VitessDatabaseSchema.parse(rawTableName);
            // The message is built lazily so the per-row happy path pays no string concatenation.
            Objects.requireNonNull(tableId, () -> "Unable to resolve a table id from table name '" + rawTableName + "'");
            offsetContext.event(tableId, message.getCommitTime());
            if (isLastRowOfTransaction) {
                offsetContext.resetVgtid(newVgtid, message.getCommitTime());
            }
            this.dispatcher.dispatchDataChangeEvent(partition, tableId, new VitessChangeRecordEmitter(partition, offsetContext, this.clock, this.connectorConfig, this.schema, message));
        };
    }
}

