/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.cdc.base.source.reader.external;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.data.Envelope;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.spi.Partition;
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import io.debezium.util.SchemaNameAdjuster;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect;
import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher;
import org.apache.seatunnel.connectors.cdc.base.source.reader.external.FetchTask;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SnapshotSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import org.apache.seatunnel.connectors.cdc.debezium.ConnectTableChangeSerializer;
import org.apache.seatunnel.connectors.cdc.debezium.EmbeddedDatabaseHistory;

public abstract class JdbcSourceFetchTaskContext
implements FetchTask.Context {
    protected final JdbcSourceConfig sourceConfig;
    protected final JdbcDataSourceDialect dataSourceDialect;
    protected final CommonConnectorConfig dbzConnectorConfig;
    protected final SchemaNameAdjuster schemaNameAdjuster;
    protected final ConnectTableChangeSerializer tableChangeSerializer = new ConnectTableChangeSerializer();
    protected final JsonConverter jsonConverter;

    public JdbcSourceFetchTaskContext(JdbcSourceConfig sourceConfig, JdbcDataSourceDialect dataSourceDialect) {
        this.sourceConfig = sourceConfig;
        this.dataSourceDialect = dataSourceDialect;
        this.dbzConnectorConfig = sourceConfig.getDbzConnectorConfig();
        this.schemaNameAdjuster = SchemaNameAdjuster.create();
        this.jsonConverter = new JsonConverter();
        this.jsonConverter.configure(Collections.singletonMap("schemas.enable", true), false);
    }

    @Override
    public TableId getTableId(SourceRecord record) {
        return SourceRecordUtils.getTableId(record);
    }

    @Override
    public boolean isDataChangeRecord(SourceRecord record) {
        return SourceRecordUtils.isDataChangeRecord(record);
    }

    @Override
    public boolean isRecordBetween(SourceRecord record, Object[] splitStart, Object[] splitEnd) {
        SeaTunnelRowType splitKeyType = this.getSplitType(this.getDatabaseSchema().tableFor(this.getTableId(record)));
        Object[] key = SourceRecordUtils.getSplitKey(splitKeyType, record, this.getSchemaNameAdjuster());
        return SourceRecordUtils.splitKeyRangeContains(key, splitStart, splitEnd);
    }

    @Override
    public void rewriteOutputBuffer(Map<Struct, SourceRecord> outputBuffer, SourceRecord changeRecord) {
        Struct key = (Struct)changeRecord.key();
        Struct value = (Struct)changeRecord.value();
        if (value != null) {
            Envelope.Operation operation = Envelope.Operation.forCode((String)value.getString("op"));
            switch (operation) {
                case CREATE: 
                case UPDATE: {
                    Envelope envelope = Envelope.fromSchema((Schema)changeRecord.valueSchema());
                    Struct source = value.getStruct("source");
                    Struct after = value.getStruct("after");
                    Instant fetchTs = Instant.ofEpochMilli((Long)source.get("ts_ms"));
                    SourceRecord record = new SourceRecord(changeRecord.sourcePartition(), changeRecord.sourceOffset(), changeRecord.topic(), changeRecord.kafkaPartition(), changeRecord.keySchema(), changeRecord.key(), changeRecord.valueSchema(), (Object)envelope.read((Object)after, source, fetchTs));
                    outputBuffer.put(key, record);
                    break;
                }
                case DELETE: {
                    outputBuffer.remove(key);
                    break;
                }
                case READ: {
                    throw new IllegalStateException(String.format("Data change record shouldn't use READ operation, the the record is %s.", changeRecord));
                }
            }
        }
    }

    @Override
    public List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords) {
        return snapshotRecords.stream().map(record -> {
            Envelope envelope = Envelope.fromSchema((Schema)record.valueSchema());
            Struct value = (Struct)record.value();
            Struct updateAfter = value.getStruct("after");
            Struct source = value.getStruct("source");
            source.put("ts_ms", (Object)0L);
            Instant fetchTs = Instant.ofEpochMilli(value.getInt64("ts_ms"));
            SourceRecord sourceRecord = new SourceRecord(record.sourcePartition(), record.sourceOffset(), record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), (Object)envelope.read((Object)updateAfter, source, fetchTs));
            return sourceRecord;
        }).collect(Collectors.toList());
    }

    protected void registerDatabaseHistory(SourceSplitBase sourceSplitBase, JdbcConnection connection) {
        ArrayList<TableChanges.TableChange> engineHistory = new ArrayList<TableChanges.TableChange>();
        if (sourceSplitBase instanceof SnapshotSplit) {
            SnapshotSplit snapshotSplit = (SnapshotSplit)sourceSplitBase;
            engineHistory.add(this.dataSourceDialect.queryTableSchema(connection, snapshotSplit.getTableId()));
        } else {
            IncrementalSplit incrementalSplit = (IncrementalSplit)sourceSplitBase;
            Map<TableId, byte[]> historyTableChanges = incrementalSplit.getHistoryTableChanges();
            for (TableId tableId : incrementalSplit.getTableIds()) {
                if (historyTableChanges != null && historyTableChanges.containsKey(tableId)) {
                    SchemaAndValue schemaAndValue = this.jsonConverter.toConnectData("topic", historyTableChanges.get(tableId));
                    Struct deserializedStruct = (Struct)schemaAndValue.value();
                    TableChanges tableChanges = this.tableChangeSerializer.deserialize(Collections.singletonList(deserializedStruct), false);
                    Iterator iterator = tableChanges.iterator();
                    TableChanges.TableChange tableChange = null;
                    while (iterator.hasNext()) {
                        if (tableChange != null) {
                            throw new IllegalStateException("The table changes should only have one element");
                        }
                        tableChange = (TableChanges.TableChange)iterator.next();
                    }
                    engineHistory.add(tableChange);
                    continue;
                }
                engineHistory.add(this.dataSourceDialect.queryTableSchema(connection, tableId));
            }
        }
        EmbeddedDatabaseHistory.registerHistory(this.sourceConfig.getDbzConfiguration().getString("database.history.instance.name"), engineHistory);
    }

    public SourceConfig getSourceConfig() {
        return this.sourceConfig;
    }

    @Override
    public boolean isExactlyOnce() {
        return this.sourceConfig.isExactlyOnce();
    }

    public JdbcDataSourceDialect getDataSourceDialect() {
        return this.dataSourceDialect;
    }

    public CommonConnectorConfig getDbzConnectorConfig() {
        return this.dbzConnectorConfig;
    }

    public SchemaNameAdjuster getSchemaNameAdjuster() {
        return this.schemaNameAdjuster;
    }

    public abstract RelationalDatabaseSchema getDatabaseSchema();

    public abstract SeaTunnelRowType getSplitType(Table var1);

    public abstract ErrorHandler getErrorHandler();

    public abstract JdbcSourceEventDispatcher getDispatcher();

    public abstract OffsetContext getOffsetContext();

    public abstract Partition getPartition();
}

