/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.cdc.base.source;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.ChangeStreamTableSourceFactory;
import org.apache.seatunnel.api.table.factory.ChangeStreamTableSourceState;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BaseChangeStreamTableSourceFactory
implements ChangeStreamTableSourceFactory {
    private static final Logger log = LoggerFactory.getLogger(BaseChangeStreamTableSourceFactory.class);

    public <T, SplitT extends SourceSplit, StateT extends Serializable> TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
        return this.restoreSource(context, Collections.emptyList());
    }

    public <T, SplitT extends SourceSplit, StateT extends Serializable> TableSource<T, SplitT, StateT> restoreSource(TableSourceFactoryContext context, ChangeStreamTableSourceState<StateT, SplitT> state) {
        return this.restoreSource(context, this.getRestoreTableStruct(state));
    }

    public abstract <T, SplitT extends SourceSplit, StateT extends Serializable> TableSource<T, SplitT, StateT> restoreSource(TableSourceFactoryContext var1, List<CatalogTable> var2);

    protected <SplitT extends SourceSplit, StateT extends Serializable> List<CatalogTable> getRestoreTableStruct(ChangeStreamTableSourceState<StateT, SplitT> state) {
        List incrementalSplits = state.getSplits().stream().flatMap(Collection::stream).filter(e -> e != null).map(e -> (SourceSplitBase)SourceSplitBase.class.cast(e)).filter(e -> e.isIncrementalSplit()).map(e -> e.asIncrementalSplit()).collect(Collectors.toList());
        if (incrementalSplits.size() > 1) {
            throw new UnsupportedOperationException("Multiple incremental splits are not supported");
        }
        if (incrementalSplits.size() == 1) {
            IncrementalSplit incrementalSplit = (IncrementalSplit)incrementalSplits.get(0);
            if (incrementalSplit.getCheckpointTables() != null) {
                List<CatalogTable> checkpointTableStruct = incrementalSplit.getCheckpointTables();
                log.info("Restore source using checkpoint tables: {}", checkpointTableStruct);
                return checkpointTableStruct;
            }
            if (incrementalSplit.getCheckpointDataType() != null) {
                List checkpointDataTypeStruct = CatalogTableUtil.convertDataTypeToCatalogTables((SeaTunnelDataType)incrementalSplit.getCheckpointDataType(), (String)"default.default");
                log.info("Restore source using checkpoint tables: {}", (Object)checkpointDataTypeStruct);
                return checkpointDataTypeStruct;
            }
        }
        log.info("Restore source using checkpoint tables is empty");
        return Collections.emptyList();
    }

    protected List<CatalogTable> mergeTableStruct(List<CatalogTable> dbTableStruct, List<CatalogTable> restoreTableStruct) {
        if (!restoreTableStruct.isEmpty()) {
            Map<TablePath, CatalogTable> restoreTableMap = restoreTableStruct.stream().collect(Collectors.toMap(CatalogTable::getTablePath, t -> t));
            List<CatalogTable> mergedTableStruct = dbTableStruct.stream().map(e -> restoreTableMap.getOrDefault(e.getTablePath(), (CatalogTable)e)).collect(Collectors.toList());
            log.info("Merge db table struct with checkpoint table struct: {}", mergedTableStruct);
            return mergedTableStruct;
        }
        return dbTableStruct;
    }
}

