/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.SnapshotRecord;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.data.Envelope;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.ConnectorEvent;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.signal.Signal;
import io.debezium.pipeline.source.snapshot.incremental.IncrementalSnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.pipeline.txmetadata.TransactionMonitor;
import io.debezium.relational.history.ConnectTableChangeSerializer;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.TopicSelector;
import io.debezium.util.SchemaNameAdjuster;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventDispatcher<P extends Partition, T extends DataCollectionId>
implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);
    // Resolves the destination topic for a data collection (topicNameFor) and the primary topic used for schema change records.
    private final TopicSelector<T> topicSelector;
    // Source of per-data-collection schemas (schemaFor).
    private final DatabaseSchema<T> schema;
    // The same object as `schema` when schema.isHistorized() is true; otherwise null.
    private final HistorizedDatabaseSchema<T> historizedSchema;
    // Destination of every event produced by this dispatcher (data changes, tombstones, heartbeats, transaction and schema change messages).
    private final ChangeEventQueue<DataChangeEvent> queue;
    // Decides, via isIncluded, whether events of a data collection are dispatched or filtered.
    private final DataCollectionFilters.DataCollectionFilter<T> filter;
    // Turns a SourceRecord into the DataChangeEvent that is put on the queue.
    private final ChangeEventCreator changeEventCreator;
    private final Heartbeat heartbeat;
    // Starts as the no-op listener; replaceable through setEventListener.
    private DataChangeEventListener<P> eventListener = DataChangeEventListener.NO_OP();
    // When true, the streaming receiver follows every DELETE record with a tombstone (same key, null value).
    private final boolean emitTombstonesOnDelete;
    // Consulted when a streamed data collection has no schema; defaults to errorOnMissingSchema when none is supplied.
    private final InconsistentSchemaHandler<P, T> inconsistentSchemaHandler;
    private final TransactionMonitor transactionMonitor;
    private final CommonConnectorConfig connectorConfig;
    // Operations taken from connectorConfig.getSkippedOperations().
    private final EnumSet<Envelope.Operation> skippedOperations;
    // True when connectorConfig.supportsOperationFiltering() is true or skippedOperations is empty; bypasses the skip check.
    private final boolean neverSkip;
    // Key schema of schema change records: a struct with a single "databaseName" string field.
    private final Schema schemaChangeKeySchema;
    // Value schema of schema change records: source, ts_ms, databaseName, schemaName, ddl, tableChanges.
    private final Schema schemaChangeValueSchema;
    // Provides the element schema for, and serializes, the "tableChanges" field.
    private final ConnectTableChangeSerializer tableChangesSerializer;
    private final Signal<P> signal;
    // Null until set through setIncrementalSnapshotChangeEventSource; every use is null-checked.
    private IncrementalSnapshotChangeEventSource<P, T> incrementalSnapshotChangeEventSource;
    // Receiver that builds and enqueues the records of streamed data change events.
    private final StreamingChangeRecordReceiver streamingReceiver;

    /**
     * Creates a dispatcher without a custom inconsistent-schema handler, using the heartbeat returned by
     * {@code connectorConfig.createHeartbeat(topicSelector, schemaNameAdjuster, null, null)}.
     */
    public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector, DatabaseSchema<T> schema,
                           ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter,
                           ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider,
                           SchemaNameAdjuster schemaNameAdjuster) {
        // Routed through the heartbeat-accepting overload, which supplies the null handler itself.
        this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator, metadataProvider,
                connectorConfig.createHeartbeat(topicSelector, schemaNameAdjuster, null, null), schemaNameAdjuster);
    }

    /**
     * Creates a dispatcher with a caller-supplied heartbeat and no custom inconsistent-schema handler
     * (a {@code null} handler is passed on, which the full constructor replaces with {@code errorOnMissingSchema}).
     */
    public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector, DatabaseSchema<T> schema,
                           ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter,
                           ChangeEventCreator changeEventCreator, EventMetadataProvider metadataProvider, Heartbeat heartbeat,
                           SchemaNameAdjuster schemaNameAdjuster) {
        this(connectorConfig, topicSelector, schema, queue, filter, changeEventCreator,
                null, // no custom InconsistentSchemaHandler
                metadataProvider, heartbeat, schemaNameAdjuster);
    }

    public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector, DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter, ChangeEventCreator changeEventCreator, InconsistentSchemaHandler<P, T> inconsistentSchemaHandler, EventMetadataProvider metadataProvider, Heartbeat heartbeat, SchemaNameAdjuster schemaNameAdjuster) {
        this.tableChangesSerializer = new ConnectTableChangeSerializer(schemaNameAdjuster);
        this.connectorConfig = connectorConfig;
        this.topicSelector = topicSelector;
        this.schema = schema;
        this.historizedSchema = schema.isHistorized() ? (HistorizedDatabaseSchema)schema : null;
        this.queue = queue;
        this.filter = filter;
        this.changeEventCreator = changeEventCreator;
        this.streamingReceiver = new StreamingChangeRecordReceiver();
        this.emitTombstonesOnDelete = connectorConfig.isEmitTombstoneOnDelete();
        this.inconsistentSchemaHandler = inconsistentSchemaHandler != null ? inconsistentSchemaHandler : this::errorOnMissingSchema;
        this.skippedOperations = connectorConfig.getSkippedOperations();
        this.neverSkip = connectorConfig.supportsOperationFiltering() || this.skippedOperations.isEmpty();
        this.transactionMonitor = new TransactionMonitor(connectorConfig, metadataProvider, schemaNameAdjuster, this::enqueueTransactionMessage);
        this.signal = new Signal(connectorConfig, this);
        this.heartbeat = heartbeat;
        this.schemaChangeKeySchema = SchemaBuilder.struct().name(schemaNameAdjuster.adjust("io.debezium.connector." + connectorConfig.getConnectorName() + ".SchemaChangeKey")).field("databaseName", Schema.STRING_SCHEMA).build();
        this.schemaChangeValueSchema = SchemaBuilder.struct().name(schemaNameAdjuster.adjust("io.debezium.connector." + connectorConfig.getConnectorName() + ".SchemaChangeValue")).field("source", connectorConfig.getSourceInfoStructMaker().schema()).field("ts_ms", Schema.INT64_SCHEMA).field("databaseName", Schema.OPTIONAL_STRING_SCHEMA).field("schemaName", Schema.OPTIONAL_STRING_SCHEMA).field("ddl", Schema.OPTIONAL_STRING_SCHEMA).field("tableChanges", SchemaBuilder.array((Schema)this.tableChangesSerializer.getChangeSchema()).build()).build();
    }

    /**
     * Emits the change records of a snapshotted row: each record is first reported to the event listener and then
     * handed to {@code receiver}.
     *
     * @throws IllegalArgumentException if no schema is registered for {@code dataCollectionId}
     *                                  (raised by {@link #errorOnMissingSchema})
     */
    public void dispatchSnapshotEvent(P partition, T dataCollectionId, ChangeRecordEmitter<P> changeRecordEmitter, final SnapshotReceiver<P> receiver) throws InterruptedException {
        final DataCollectionSchema tableSchema = schema.schemaFor(dataCollectionId);
        if (tableSchema == null) {
            // Always throws, so the emitter below never sees a null schema.
            errorOnMissingSchema(partition, dataCollectionId, changeRecordEmitter);
        }

        changeRecordEmitter.emitChangeRecords(tableSchema, (eventPartition, emittedSchema, operation, key, value, offset, headers) -> {
            eventListener.onEvent(eventPartition, tableSchema.id(), offset, key, value, operation);
            // The looked-up schema, not the one passed back by the emitter, is forwarded to the receiver.
            receiver.changeRecord(eventPartition, tableSchema, operation, key, value, offset, headers);
        });
    }

    /**
     * Returns a new receiver that holds back the most recent snapshot record until the next one arrives or the
     * snapshot is completed.
     */
    public SnapshotReceiver<P> getSnapshotChangeEventReceiver() {
        final SnapshotReceiver<P> bufferingReceiver = new BufferingSnapshotChangeRecordReceiver();
        return bufferingReceiver;
    }

    /**
     * Returns a new receiver for incremental snapshot records that notifies {@code dataListener} and enqueues each
     * record immediately.
     */
    public SnapshotReceiver<P> getIncrementalSnapshotChangeEventReceiver(DataChangeEventListener<P> dataListener) {
        final SnapshotReceiver<P> incrementalReceiver = new IncrementalSnapshotChangeRecordReceiver(dataListener);
        return incrementalReceiver;
    }

    /**
     * Dispatches a streamed data change event.
     * <p>
     * Events of data collections rejected by the filter are reported to the listener as filtered and passed to
     * {@link #dispatchFilteredEvent}. For included collections the schema is looked up (falling back to the
     * inconsistent-schema handler when absent) and the emitted records are processed: signals are handled, and unless
     * the operation is skipped, the transaction monitor, the listener, the incremental snapshot source (if set) and
     * the streaming receiver are invoked. A heartbeat is attempted afterwards in both cases.
     * <p>
     * Failures are handled according to {@code connectorConfig.getEventProcessingFailureHandlingMode()}: FAIL rethrows
     * as {@link ConnectException}; WARN and SKIP log the failure (with its cause) and return {@code false}.
     *
     * @return {@code true} if the event was handed to the emitter; {@code false} if it was filtered, had no schema
     *         and the handler returned an empty replacement, or failed in WARN/SKIP mode
     * @throws InterruptedException if the thread is interrupted while dispatching; interruption is never treated as
     *                              an event processing failure
     * @throws ConnectException     if processing fails and the failure handling mode is FAIL
     */
    public boolean dispatchDataChangeEvent(P partition, T dataCollectionId, ChangeRecordEmitter<P> changeRecordEmitter) throws InterruptedException {
        try {
            boolean handled = false;
            if (!filter.isIncluded(dataCollectionId)) {
                LOGGER.trace("Filtered data change event for {}", dataCollectionId);
                eventListener.onFilteredEvent(partition, "source = " + dataCollectionId, changeRecordEmitter.getOperation());
                dispatchFilteredEvent(changeRecordEmitter.getPartition(), changeRecordEmitter.getOffset());
            }
            else {
                DataCollectionSchema dataCollectionSchema = schema.schemaFor(dataCollectionId);
                if (dataCollectionSchema == null) {
                    final Optional<DataCollectionSchema> replacementSchema = inconsistentSchemaHandler.handle(partition, dataCollectionId, changeRecordEmitter);
                    if (!replacementSchema.isPresent()) {
                        // The handler chose to ignore the event; no heartbeat is attempted on this path.
                        return false;
                    }
                    dataCollectionSchema = replacementSchema.get();
                }

                changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new ChangeRecordEmitter.Receiver<P>() {

                    @Override
                    public void changeRecord(P partition, DataCollectionSchema schema, Envelope.Operation operation, Object key, Struct value, OffsetContext offset, ConnectHeaders headers) throws InterruptedException {
                        // Signals are processed even when the CREATE operation itself is configured to be skipped.
                        if (operation == Envelope.Operation.CREATE && signal.isSignal(dataCollectionId)) {
                            signal.process(partition, value, offset);
                        }

                        if (neverSkip || !skippedOperations.contains(operation)) {
                            transactionMonitor.dataEvent(partition, dataCollectionId, offset, key, value);
                            eventListener.onEvent(partition, dataCollectionId, offset, key, value, operation);
                            if (incrementalSnapshotChangeEventSource != null) {
                                incrementalSnapshotChangeEventSource.processMessage(partition, dataCollectionId, key, offset);
                            }
                            streamingReceiver.changeRecord(partition, schema, operation, key, value, offset, headers);
                        }
                    }
                });
                handled = true;
            }

            heartbeat.heartbeat(changeRecordEmitter.getPartition().getSourcePartition(), changeRecordEmitter.getOffset().getOffset(), this::enqueueHeartbeat);
            return handled;
        }
        catch (InterruptedException e) {
            // An interrupt is a request to stop, not a faulty event: propagate it instead of
            // swallowing it (WARN/SKIP) or wrapping it in a ConnectException (FAIL).
            throw e;
        }
        catch (Exception e) {
            switch (connectorConfig.getEventProcessingFailureHandlingMode()) {
                case FAIL:
                    throw new ConnectException("Error while processing event at offset " + changeRecordEmitter.getOffset().getOffset(), e);
                case WARN:
                    // The trailing Throwable is logged with its stack trace; the message itself is unchanged.
                    LOGGER.warn("Error while processing event at offset {}", changeRecordEmitter.getOffset().getOffset(), e);
                    break;
                case SKIP:
                    LOGGER.debug("Error while processing event at offset {}", changeRecordEmitter.getOffset().getOffset(), e);
                    break;
            }
            return false;
        }
    }

    /**
     * Forwards a filtered event to the incremental snapshot source; does nothing when none is set.
     */
    public void dispatchFilteredEvent(P partition, OffsetContext offset) throws InterruptedException {
        if (incrementalSnapshotChangeEventSource == null) {
            return;
        }
        incrementalSnapshotChangeEventSource.processFilteredEvent(partition, offset);
    }

    /**
     * Reports a transaction commit to the transaction monitor and then, if one is set, to the incremental
     * snapshot source.
     */
    public void dispatchTransactionCommittedEvent(P partition, OffsetContext offset) throws InterruptedException {
        transactionMonitor.transactionComittedEvent(partition, offset);
        if (incrementalSnapshotChangeEventSource == null) {
            return;
        }
        incrementalSnapshotChangeEventSource.processTransactionCommittedEvent(partition, offset);
    }

    /**
     * Reports a transaction start to the transaction monitor and then, if one is set, to the incremental
     * snapshot source.
     */
    public void dispatchTransactionStartedEvent(P partition, String transactionId, OffsetContext offset) throws InterruptedException {
        transactionMonitor.transactionStartedEvent(partition, transactionId, offset);
        if (incrementalSnapshotChangeEventSource == null) {
            return;
        }
        incrementalSnapshotChangeEventSource.processTransactionStartedEvent(partition, offset);
    }

    /**
     * Passes a connector event straight to the registered event listener.
     */
    public void dispatchConnectorEvent(P partition, ConnectorEvent event) {
        eventListener.onConnectorEvent(partition, event);
    }

    /**
     * Missing-schema policy that fails: the event is reported to the listener as erroneous and an exception is thrown.
     *
     * @return never returns normally
     * @throws IllegalArgumentException always
     */
    public Optional<DataCollectionSchema> errorOnMissingSchema(P partition, T dataCollectionId, ChangeRecordEmitter<P> changeRecordEmitter) {
        final String sourceDescription = "source = " + dataCollectionId;
        eventListener.onErroneousEvent(partition, sourceDescription, changeRecordEmitter.getOperation());
        throw new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId);
    }

    /**
     * Missing-schema policy that ignores the event: no replacement schema is offered.
     *
     * @return always an empty {@link Optional}
     */
    public Optional<DataCollectionSchema> ignoreMissingSchema(T dataCollectionId, ChangeRecordEmitter<P> changeRecordEmitter) {
        final Optional<DataCollectionSchema> noReplacement = Optional.empty();
        return noReplacement;
    }

    /**
     * Emits a schema change for a single data collection and then notifies the incremental snapshot source, if set.
     * <p>
     * The event is dropped when the (non-null) data collection is excluded by the filter and either the schema is
     * not historized or the historized schema stores only captured tables.
     */
    public void dispatchSchemaChangeEvent(P partition, T dataCollectionId, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException {
        final boolean excluded = dataCollectionId != null && !filter.isIncluded(dataCollectionId);
        if (excluded && (historizedSchema == null || historizedSchema.storeOnlyCapturedTables())) {
            LOGGER.trace("Filtering schema change event for {}", dataCollectionId);
            return;
        }

        schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver());

        if (incrementalSnapshotChangeEventSource == null) {
            return;
        }
        incrementalSnapshotChangeEventSource.processSchemaChange(partition, dataCollectionId);
    }

    /**
     * Emits a schema change that affects several data collections.
     * <p>
     * A {@code null} or empty collection counts as not filtered. Otherwise the event is dropped only when none of
     * the ids passes the filter and either the schema is not historized or the historized schema stores only
     * captured tables.
     */
    public void dispatchSchemaChangeEvent(Collection<T> dataCollectionIds, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException {
        // anyMatch stops at the first included id, like the early-exit loop it replaces.
        final boolean anyNonfilteredEvent = dataCollectionIds == null
                || dataCollectionIds.isEmpty()
                || dataCollectionIds.stream().anyMatch(filter::isIncluded);

        final boolean dropEvent = !anyNonfilteredEvent
                && (historizedSchema == null || historizedSchema.storeOnlyCapturedTables());
        if (dropEvent) {
            LOGGER.trace("Filtering schema change event for {}", dataCollectionIds);
            return;
        }

        schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver());
    }

    /**
     * Requests a forced heartbeat ({@code Heartbeat.forcedBeat}) for the given partition and offset; any record
     * produced is put on the queue.
     */
    public void alwaysDispatchHeartbeatEvent(P partition, OffsetContext offset) throws InterruptedException {
        heartbeat.forcedBeat(
                partition.getSourcePartition(),
                offset.getOffset(),
                this::enqueueHeartbeat);
    }

    /**
     * Requests a regular heartbeat ({@code Heartbeat.heartbeat}) for the given partition and offset; any record
     * produced is put on the queue.
     */
    public void dispatchHeartbeatEvent(P partition, OffsetContext offset) throws InterruptedException {
        heartbeat.heartbeat(
                partition.getSourcePartition(),
                offset.getOffset(),
                this::enqueueHeartbeat);
    }

    /**
     * @return whether the configured heartbeat reports itself as enabled
     */
    public boolean heartbeatsEnabled() {
        final boolean enabled = heartbeat.isEnabled();
        return enabled;
    }

    /** Wraps a heartbeat record in a {@link DataChangeEvent} and puts it on the queue. */
    private void enqueueHeartbeat(SourceRecord record) throws InterruptedException {
        final DataChangeEvent heartbeatEvent = new DataChangeEvent(record);
        queue.enqueue(heartbeatEvent);
    }

    /** Wraps a transaction metadata record in a {@link DataChangeEvent} and puts it on the queue. */
    private void enqueueTransactionMessage(SourceRecord record) throws InterruptedException {
        final DataChangeEvent transactionEvent = new DataChangeEvent(record);
        queue.enqueue(transactionEvent);
    }

    /** Wraps a schema change record in a {@link DataChangeEvent} and puts it on the queue. */
    private void enqueueSchemaChangeMessage(SourceRecord record) throws InterruptedException {
        final DataChangeEvent schemaChangeEvent = new DataChangeEvent(record);
        queue.enqueue(schemaChangeEvent);
    }

    /**
     * Forwards a server heartbeat to the incremental snapshot source; does nothing when none is set.
     */
    public void dispatchServerHeartbeatEvent(P partition, OffsetContext offset) throws InterruptedException {
        if (incrementalSnapshotChangeEventSource == null) {
            return;
        }
        incrementalSnapshotChangeEventSource.processHeartbeat(partition, offset);
    }

    /**
     * Replaces the listener notified about dispatched, filtered, erroneous and connector events.
     * The argument is stored as given; no null check is performed.
     */
    public void setEventListener(DataChangeEventListener<P> eventListener) {
        this.eventListener = eventListener;
    }

    /**
     * Sets or clears the incremental snapshot source that is notified about dispatched events.
     *
     * @param incrementalSnapshotChangeEventSource the source to use; an empty {@link Optional} clears the current one
     */
    @SuppressWarnings("unchecked")
    public void setIncrementalSnapshotChangeEventSource(Optional<IncrementalSnapshotChangeEventSource<P, ? extends DataCollectionId>> incrementalSnapshotChangeEventSource) {
        // The wildcard-typed source cannot be assigned to the <P, T> field without a cast. The cast is
        // unchecked and erased at runtime, so the stored reference is exactly the one supplied.
        this.incrementalSnapshotChangeEventSource = (IncrementalSnapshotChangeEventSource<P, T>) incrementalSnapshotChangeEventSource.orElse(null);
    }

    /**
     * @return the database schema this dispatcher was constructed with
     */
    public DatabaseSchema<T> getSchema() {
        return schema;
    }

    /**
     * @return the schema viewed as a historized schema, or {@code null} when the schema is not historized
     */
    public HistorizedDatabaseSchema<T> getHistorizedSchema() {
        return historizedSchema;
    }

    /**
     * @return the incremental snapshot source currently set, or {@code null} when none has been set
     */
    public IncrementalSnapshotChangeEventSource<P, T> getIncrementalSnapshotChangeEventSource() {
        return incrementalSnapshotChangeEventSource;
    }

    /**
     * Closes the heartbeat, but only when heartbeats are enabled.
     */
    @Override
    public void close() {
        if (!heartbeatsEnabled()) {
            return;
        }
        heartbeat.close();
    }

    /**
     * Policy applied by {@code dispatchDataChangeEvent} when no schema is registered for the data collection of a
     * streamed event.
     */
    @FunctionalInterface
    public interface InconsistentSchemaHandler<P extends Partition, T extends DataCollectionId> {

        /**
         * @return a replacement schema to dispatch the event with, or an empty {@link Optional} to leave the event
         *         undispatched
         */
        Optional<DataCollectionSchema> handle(P partition, T dataCollectionId, ChangeRecordEmitter changeRecordEmitter);
    }

    /**
     * Receiver for streamed change records: builds a {@link SourceRecord} for each record and enqueues it, followed
     * by a tombstone for DELETE operations when tombstones are enabled.
     */
    private final class StreamingChangeRecordReceiver implements ChangeRecordEmitter.Receiver<P> {

        private StreamingChangeRecordReceiver() {
        }

        /**
         * @throws NullPointerException if {@code value} is {@code null}
         */
        @Override
        @SuppressWarnings("unchecked")
        public void changeRecord(P partition, DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object key, Struct value, OffsetContext offsetContext, ConnectHeaders headers) throws InterruptedException {
            Objects.requireNonNull(value, "value must not be null");
            LOGGER.trace("Received change record for {} operation on key {}", operation, key);

            // A keyless TRUNCATE record carries no key schema either.
            final Schema keySchema;
            if (key == null && operation == Envelope.Operation.TRUNCATE) {
                keySchema = null;
            }
            else {
                keySchema = dataCollectionSchema.keySchema();
            }
            final String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());

            final SourceRecord changeRecord = new SourceRecord(
                    partition.getSourcePartition(),
                    offsetContext.getOffset(),
                    topicName,
                    null,
                    keySchema,
                    key,
                    dataCollectionSchema.getEnvelopeSchema().schema(),
                    value,
                    null,
                    headers);
            queue.enqueue(changeEventCreator.createDataChangeEvent(changeRecord));

            if (!emitTombstonesOnDelete || operation != Envelope.Operation.DELETE) {
                return;
            }
            // Tombstone: same topic, partition, key, timestamp and headers, but no value and no value schema.
            final SourceRecord tombStone = changeRecord.newRecord(
                    changeRecord.topic(),
                    changeRecord.kafkaPartition(),
                    changeRecord.keySchema(),
                    changeRecord.key(),
                    null,
                    null,
                    changeRecord.timestamp(),
                    changeRecord.headers());
            queue.enqueue(changeEventCreator.createDataChangeEvent(tombStone));
        }
    }

    /**
     * Change record receiver used while snapshotting; adds a hook invoked once the snapshot is complete.
     */
    public interface SnapshotReceiver<P extends Partition> extends ChangeRecordEmitter.Receiver<P> {

        /**
         * Signals that no further snapshot records will follow.
         */
        void completeSnapshot() throws InterruptedException;
    }

    private final class BufferingSnapshotChangeRecordReceiver
    implements SnapshotReceiver<P> {
        private Supplier<DataChangeEvent> bufferedEvent;

        private BufferingSnapshotChangeRecordReceiver() {
        }

        @Override
        public void changeRecord(P partition, DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object key, Struct value, OffsetContext offsetContext, ConnectHeaders headers) throws InterruptedException {
            Objects.requireNonNull(value, "value must not be null");
            LOGGER.trace("Received change record for {} operation on key {}", (Object)operation, key);
            if (this.bufferedEvent != null) {
                EventDispatcher.this.queue.enqueue(this.bufferedEvent.get());
            }
            Schema keySchema = dataCollectionSchema.keySchema();
            String topicName = EventDispatcher.this.topicSelector.topicNameFor(dataCollectionSchema.id());
            this.bufferedEvent = () -> {
                SourceRecord record = new SourceRecord(partition.getSourcePartition(), offsetContext.getOffset(), topicName, null, keySchema, key, dataCollectionSchema.getEnvelopeSchema().schema(), (Object)value, null, (Iterable)headers);
                return EventDispatcher.this.changeEventCreator.createDataChangeEvent(record);
            };
        }

        @Override
        public void completeSnapshot() throws InterruptedException {
            if (this.bufferedEvent != null) {
                Struct source;
                SnapshotRecord snapshot;
                DataChangeEvent event = this.bufferedEvent.get();
                Struct envelope = (Struct)event.getRecord().value();
                if (envelope.schema().field("source") != null && (snapshot = SnapshotRecord.fromSource(source = envelope.getStruct("source"))) == SnapshotRecord.TRUE) {
                    SnapshotRecord.LAST.toSource(source);
                }
                EventDispatcher.this.queue.enqueue(event);
                this.bufferedEvent = null;
            }
        }
    }

    /**
     * Snapshot receiver for incremental snapshots: each record is reported to the supplied listener and enqueued
     * immediately, without buffering.
     */
    private final class IncrementalSnapshotChangeRecordReceiver implements SnapshotReceiver<P> {

        public final DataChangeEventListener<P> dataListener;

        public IncrementalSnapshotChangeRecordReceiver(DataChangeEventListener<P> dataListener) {
            this.dataListener = dataListener;
        }

        /**
         * Builds the record, notifies the listener and enqueues the record.
         *
         * @throws NullPointerException if {@code value} is {@code null}
         */
        @Override
        @SuppressWarnings("unchecked")
        public void changeRecord(P partition, DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object key, Struct value, OffsetContext offsetContext, ConnectHeaders headers) throws InterruptedException {
            Objects.requireNonNull(value, "value must not be null");
            LOGGER.trace("Received change record for {} operation on key {}", operation, key);

            final Schema keySchema = dataCollectionSchema.keySchema();
            final String topicName = topicSelector.topicNameFor((T) dataCollectionSchema.id());
            final SourceRecord record = new SourceRecord(
                    partition.getSourcePartition(),
                    offsetContext.getOffset(),
                    topicName,
                    null,
                    keySchema,
                    key,
                    dataCollectionSchema.getEnvelopeSchema().schema(),
                    value,
                    null,
                    headers);

            // The listener receives the record key, as in the other onEvent call sites of this
            // dispatcher; previously the key *schema* was passed in the key position.
            dataListener.onEvent(partition, dataCollectionSchema.id(), offsetContext, key, value, operation);
            queue.enqueue(changeEventCreator.createDataChangeEvent(record));
        }

        /** Nothing is buffered, so there is nothing to flush. */
        @Override
        public void completeSnapshot() throws InterruptedException {
        }
    }

    /**
     * Receiver for schema change events: applies each change to the historized schema (when there is one) and, if
     * {@code connectorConfig.isSchemaChangesHistoryEnabled()} is true, enqueues a schema change record on the
     * primary topic.
     */
    private final class SchemaChangeEventReceiver implements SchemaChangeEventEmitter.Receiver {

        private SchemaChangeEventReceiver() {
        }

        /** Builds the record key: a struct holding the database name. */
        private Struct schemaChangeRecordKey(SchemaChangeEvent event) {
            final Struct result = new Struct(schemaChangeKeySchema);
            result.put("databaseName", event.getDatabase());
            return result;
        }

        /** Builds the record value from the event's source, timestamp, database, schema, DDL and table changes. */
        private Struct schemaChangeRecordValue(SchemaChangeEvent event) {
            final Struct result = new Struct(schemaChangeValueSchema);
            result.put("source", event.getSource());
            result.put("ts_ms", event.getTimestamp().toEpochMilli());
            result.put("databaseName", event.getDatabase());
            result.put("schemaName", event.getSchema());
            result.put("ddl", event.getDdl());
            result.put("tableChanges", tableChangesSerializer.serialize(event.getTableChanges()));
            return result;
        }

        @Override
        public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException {
            // historizedSchema is null when the schema is not historized, a case the
            // dispatchSchemaChangeEvent methods explicitly let through. There is then no
            // history to apply the change to, so skip instead of failing with an NPE.
            if (historizedSchema != null) {
                historizedSchema.applySchemaChange(event);
            }

            if (!connectorConfig.isSchemaChangesHistoryEnabled()) {
                return;
            }

            final String topicName = topicSelector.getPrimaryTopic();
            // Schema change records always go to partition 0 of the primary topic.
            final Integer partition = 0;
            final Struct key = schemaChangeRecordKey(event);
            final Struct value = schemaChangeRecordValue(event);
            final SourceRecord record = new SourceRecord(event.getPartition(), event.getOffset(), topicName, partition,
                    schemaChangeKeySchema, key, schemaChangeValueSchema, value);
            enqueueSchemaChangeMessage(record);
        }
    }
}

