/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.debezium.internal;

import io.debezium.config.Configuration;
import io.debezium.relational.history.AbstractDatabaseHistory;
import io.debezium.relational.history.DatabaseHistoryException;
import io.debezium.relational.history.DatabaseHistoryListener;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.HistoryRecordComparator;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;
import org.apache.flink.cdc.debezium.internal.SchemaRecord;
import org.apache.flink.cdc.debezium.utils.DatabaseHistoryUtil;

/**
 * A database history implementation that keeps its {@link SchemaRecord}s in an in-memory
 * {@link ConcurrentLinkedQueue} and shares that queue through {@link DatabaseHistoryUtil},
 * keyed by the instance name read from the configuration.
 */
public class FlinkDatabaseHistory
extends AbstractDatabaseHistory {
    /** Configuration key under which the history instance name is looked up. */
    public static final String DATABASE_HISTORY_INSTANCE_NAME = "database.history.instance.name";

    /** Records held by this history; assigned in {@code configure}. */
    private ConcurrentLinkedQueue<SchemaRecord> schemaRecords;

    /** Name used to retrieve, register and remove this history in {@link DatabaseHistoryUtil}. */
    private String instanceName;

    /**
     * Copies the records that {@link DatabaseHistoryUtil#retrieveHistory} returns for the given
     * name into a new queue.
     *
     * @param instanceName name the history is looked up under
     * @return a fresh queue containing the retrieved records
     */
    private ConcurrentLinkedQueue<SchemaRecord> getRegisteredHistoryRecord(String instanceName) {
        Collection<SchemaRecord> retrieved = DatabaseHistoryUtil.retrieveHistory(instanceName);
        return new ConcurrentLinkedQueue<>(retrieved);
    }

    /**
     * Configures the parent class, reads the instance name from {@code config}, loads the
     * records retrieved for that name and registers the resulting queue under the same name.
     *
     * @param config configuration providing {@link #DATABASE_HISTORY_INSTANCE_NAME}
     * @param comparator forwarded unchanged to the parent class
     * @param listener forwarded unchanged to the parent class
     * @param useCatalogBeforeSchema forwarded unchanged to the parent class
     */
    @Override
    public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener, boolean useCatalogBeforeSchema) {
        super.configure(config, comparator, listener, useCatalogBeforeSchema);

        String configuredName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
        this.instanceName = configuredName;
        this.schemaRecords = this.getRegisteredHistoryRecord(configuredName);

        // Register the very queue this instance appends to, not a copy of it.
        // NOTE(review): presumably so that other holders of the registry see later
        // additions - confirm against DatabaseHistoryUtil.
        DatabaseHistoryUtil.registerHistory(configuredName, this.schemaRecords);
    }

    /** Stops the parent class, then removes this instance's entry from {@link DatabaseHistoryUtil}. */
    @Override
    public void stop() {
        super.stop();
        DatabaseHistoryUtil.removeHistory(this.instanceName);
    }

    /**
     * Wraps the given history record in a {@link SchemaRecord} and appends it to the queue.
     *
     * @param record the history record to keep
     * @throws DatabaseHistoryException declared by the overridden method
     */
    @Override
    protected void storeRecord(HistoryRecord record) throws DatabaseHistoryException {
        SchemaRecord wrapped = new SchemaRecord(record);
        this.schemaRecords.add(wrapped);
    }

    /**
     * Passes the history record of every queued {@link SchemaRecord} to the given consumer,
     * in the queue's stream order.
     *
     * @param records consumer receiving each history record
     */
    @Override
    protected void recoverRecords(Consumer<HistoryRecord> records) {
        this.schemaRecords
                .stream()
                .map(schemaRecord -> schemaRecord.getHistoryRecord())
                .forEach(records);
    }

    /**
     * Reports whether any record is currently held.
     *
     * @return {@code true} when the queue contains at least one record
     */
    @Override
    public boolean exists() {
        boolean nothingStored = this.schemaRecords.isEmpty();
        return !nothingStored;
    }

    /**
     * Always reports the storage as present.
     *
     * @return {@code true}
     */
    @Override
    public boolean storageExists() {
        return true;
    }

    /**
     * Returns a fixed, human-readable name for this history.
     *
     * @return the constant description string
     */
    @Override
    public String toString() {
        return "Flink Database History";
    }

    /**
     * Checks the given records by inspecting only the first one: an empty collection is
     * accepted, otherwise the result is whatever the first record's
     * {@code isHistoryRecord()} returns.
     *
     * @param records the records to check
     * @return {@code true} if {@code records} is empty or its first element is a history record
     */
    public static boolean isCompatible(Collection<SchemaRecord> records) {
        Iterator<SchemaRecord> cursor = records.iterator();
        if (!cursor.hasNext()) {
            return true;
        }
        // Only the first element is examined; later ones are never looked at.
        SchemaRecord first = cursor.next();
        return first.isHistoryRecord();
    }
}

