/*
 * Decompiled with CFR 0.152.
 */
package io.debezium.pipeline;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.data.Envelope;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.pipeline.source.spi.DataChangeEventListener;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DataCollectionSchema;
import io.debezium.schema.DatabaseSchema;
import io.debezium.schema.HistorizedDatabaseSchema;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.schema.TopicSelector;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventDispatcher<T extends DataCollectionId> {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventDispatcher.class);
    private final TopicSelector<T> topicSelector;
    private final DatabaseSchema<T> schema;
    private final HistorizedDatabaseSchema<T> historizedSchema;
    private final ChangeEventQueue<DataChangeEvent> queue;
    private final DataCollectionFilters.DataCollectionFilter<T> filter;
    private final ChangeEventCreator changeEventCreator;
    private final Heartbeat heartbeat;
    private DataChangeEventListener eventListener = DataChangeEventListener.NO_OP;
    private final StreamingChangeRecordReceiver streamingReceiver;

    public EventDispatcher(CommonConnectorConfig connectorConfig, TopicSelector<T> topicSelector, DatabaseSchema<T> schema, ChangeEventQueue<DataChangeEvent> queue, DataCollectionFilters.DataCollectionFilter<T> filter, ChangeEventCreator changeEventCreator) {
        this.topicSelector = topicSelector;
        this.schema = schema;
        this.historizedSchema = schema instanceof HistorizedDatabaseSchema ? (HistorizedDatabaseSchema)schema : null;
        this.queue = queue;
        this.filter = filter;
        this.changeEventCreator = changeEventCreator;
        this.streamingReceiver = new StreamingChangeRecordReceiver();
        this.heartbeat = Heartbeat.create(connectorConfig.getConfig(), topicSelector.getHeartbeatTopic(), connectorConfig.getLogicalName());
    }

    public void dispatchSnapshotEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter, SnapshotReceiver receiver) throws InterruptedException {
        DataCollectionSchema dataCollectionSchema = this.schema.schemaFor(dataCollectionId);
        if (dataCollectionSchema == null) {
            throw new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId);
        }
        changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new ChangeRecordEmitter.Receiver((DataCollectionId)dataCollectionId, receiver, dataCollectionSchema){
            final /* synthetic */ DataCollectionId val$dataCollectionId;
            final /* synthetic */ SnapshotReceiver val$receiver;
            final /* synthetic */ DataCollectionSchema val$dataCollectionSchema;
            {
                this.val$dataCollectionId = dataCollectionId;
                this.val$receiver = snapshotReceiver;
                this.val$dataCollectionSchema = dataCollectionSchema;
            }

            @Override
            public void changeRecord(DataCollectionSchema schema, Envelope.Operation operation, Object key, Struct value, OffsetContext offset) throws InterruptedException {
                EventDispatcher.this.eventListener.onEvent("source = " + this.val$dataCollectionId + ", id = " + key + ", offset = " + offset.getSourceInfo());
                this.val$receiver.changeRecord(this.val$dataCollectionSchema, operation, key, value, offset);
            }
        });
    }

    public SnapshotReceiver getSnapshotChangeEventReceiver() {
        return new BufferingSnapshotChangeRecordReceiver();
    }

    public void dispatchDataChangeEvent(T dataCollectionId, ChangeRecordEmitter changeRecordEmitter) throws InterruptedException {
        if (!this.filter.isIncluded(dataCollectionId)) {
            this.eventListener.onSkippedEvent("source = " + dataCollectionId);
            LOGGER.trace("Skipping data change event for {}", dataCollectionId);
        } else {
            DataCollectionSchema dataCollectionSchema = this.schema.schemaFor(dataCollectionId);
            if (dataCollectionSchema == null) {
                throw new IllegalArgumentException("No metadata registered for captured table " + dataCollectionId);
            }
            changeRecordEmitter.emitChangeRecords(dataCollectionSchema, new ChangeRecordEmitter.Receiver((DataCollectionId)dataCollectionId){
                final /* synthetic */ DataCollectionId val$dataCollectionId;
                {
                    this.val$dataCollectionId = dataCollectionId;
                }

                @Override
                public void changeRecord(DataCollectionSchema schema, Envelope.Operation operation, Object key, Struct value, OffsetContext offset) throws InterruptedException {
                    EventDispatcher.this.eventListener.onEvent("operation = " + (Object)((Object)operation) + ", source = " + this.val$dataCollectionId + ", id = " + key + ", offset = " + offset.getSourceInfo());
                    EventDispatcher.this.streamingReceiver.changeRecord(schema, operation, key, value, offset);
                }
            });
        }
        this.heartbeat.heartbeat(changeRecordEmitter.getOffset().getPartition(), changeRecordEmitter.getOffset().getOffset(), this::enqueueHeartbeat);
    }

    public void dispatchSchemaChangeEvent(T dataCollectionId, SchemaChangeEventEmitter schemaChangeEventEmitter) throws InterruptedException {
        if (!this.filter.isIncluded(dataCollectionId)) {
            LOGGER.trace("Skipping data change event for {}", dataCollectionId);
            return;
        }
        schemaChangeEventEmitter.emitSchemaChangeEvent(new SchemaChangeEventReceiver());
    }

    public void dispatchHeartbeatEvent(OffsetContext offset) throws InterruptedException {
        this.heartbeat.forcedBeat(offset.getPartition(), offset.getOffset(), this::enqueueHeartbeat);
    }

    private void enqueueHeartbeat(SourceRecord record) throws InterruptedException {
        this.queue.enqueue(new DataChangeEvent(record));
    }

    public void setEventListener(DataChangeEventListener eventListener) {
        this.eventListener = eventListener;
    }

    private final class SchemaChangeEventReceiver
    implements SchemaChangeEventEmitter.Receiver {
        private SchemaChangeEventReceiver() {
        }

        @Override
        public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException {
            EventDispatcher.this.historizedSchema.applySchemaChange(event);
        }
    }

    private final class BufferingSnapshotChangeRecordReceiver
    implements SnapshotReceiver {
        private Supplier<DataChangeEvent> bufferedEvent;

        private BufferingSnapshotChangeRecordReceiver() {
        }

        @Override
        public void changeRecord(DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object key, Struct value, OffsetContext offsetContext) throws InterruptedException {
            Objects.requireNonNull(key, "key must not be null");
            Objects.requireNonNull(value, "key must not be null");
            LOGGER.trace("Received change record for {} operation on key {}", (Object)operation, key);
            if (this.bufferedEvent != null) {
                EventDispatcher.this.queue.enqueue(this.bufferedEvent.get());
            }
            Schema keySchema = dataCollectionSchema.keySchema();
            String topicName = EventDispatcher.this.topicSelector.topicNameFor(dataCollectionSchema.id());
            this.bufferedEvent = () -> {
                SourceRecord record = new SourceRecord(offsetContext.getPartition(), offsetContext.getOffset(), topicName, null, keySchema, key, dataCollectionSchema.getEnvelopeSchema().schema(), (Object)value);
                return EventDispatcher.this.changeEventCreator.createDataChangeEvent(record);
            };
        }

        @Override
        public void completeSnapshot() throws InterruptedException {
            if (this.bufferedEvent != null) {
                EventDispatcher.this.queue.enqueue(this.bufferedEvent.get());
                this.bufferedEvent = null;
            }
        }
    }

    private final class StreamingChangeRecordReceiver
    implements ChangeRecordEmitter.Receiver {
        private StreamingChangeRecordReceiver() {
        }

        @Override
        public void changeRecord(DataCollectionSchema dataCollectionSchema, Envelope.Operation operation, Object key, Struct value, OffsetContext offsetContext) throws InterruptedException {
            Objects.requireNonNull(key, "key must not be null");
            Objects.requireNonNull(value, "key must not be null");
            LOGGER.trace("Received change record for {} operation on key {}", (Object)operation, key);
            Schema keySchema = dataCollectionSchema.keySchema();
            String topicName = EventDispatcher.this.topicSelector.topicNameFor(dataCollectionSchema.id());
            SourceRecord record = new SourceRecord(offsetContext.getPartition(), offsetContext.getOffset(), topicName, null, keySchema, key, dataCollectionSchema.getEnvelopeSchema().schema(), (Object)value);
            EventDispatcher.this.queue.enqueue(EventDispatcher.this.changeEventCreator.createDataChangeEvent(record));
            boolean emitTombstonesOnDelete = true;
            if (emitTombstonesOnDelete && operation == Envelope.Operation.DELETE) {
                SourceRecord tombStone = record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), null, null, record.timestamp());
                EventDispatcher.this.queue.enqueue(EventDispatcher.this.changeEventCreator.createDataChangeEvent(tombStone));
            }
        }
    }

    public static interface SnapshotReceiver
    extends ChangeRecordEmitter.Receiver {
        public void completeSnapshot() throws InterruptedException;
    }
}

