/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.iceberg;

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.iceberg.types.Types;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.iceberg.IcebergMetadataCommitter;
import org.apache.paimon.iceberg.IcebergOptions;
import org.apache.paimon.iceberg.metadata.IcebergMetadata;
import org.apache.paimon.iceberg.metadata.IcebergSchema;
import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergRestMetadataCommitter
implements IcebergMetadataCommitter {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergRestMetadataCommitter.class);
    private static final String REST_CATALOG_NAME = "rest-catalog";
    private final RESTCatalog restCatalog;
    private final String icebergDatabaseName;
    private final TableIdentifier icebergTableIdentifier;
    private final IcebergOptions icebergOptions;
    private Table icebergTable;

    public IcebergRestMetadataCommitter(FileStoreTable table) {
        Options options = new Options(table.options());
        this.icebergOptions = new IcebergOptions(options);
        Identifier identifier = (Identifier)Preconditions.checkNotNull((Object)table.catalogEnvironment().identifier());
        String icebergDatabase = (String)options.get(IcebergOptions.METASTORE_DATABASE);
        String icebergTable = (String)options.get(IcebergOptions.METASTORE_TABLE);
        this.icebergDatabaseName = icebergDatabase != null && !icebergDatabase.isEmpty() ? icebergDatabase : identifier.getDatabaseName();
        String icebergTableName = icebergTable != null && !icebergTable.isEmpty() ? icebergTable : identifier.getTableName();
        this.icebergTableIdentifier = TableIdentifier.of((Namespace)Namespace.of((String[])new String[]{this.icebergDatabaseName}), (String)icebergTableName);
        Map restConfigs = this.icebergOptions.icebergRestConfig();
        try {
            Configuration hadoopConf = new Configuration();
            hadoopConf.setClassLoader(IcebergRestMetadataCommitter.class.getClassLoader());
            this.restCatalog = this.initRestCatalog(restConfigs, hadoopConf);
        }
        catch (Exception e) {
            throw new RuntimeException("Fail to initialize iceberg rest catalog.", e);
        }
    }

    public String identifier() {
        return "rest";
    }

    public void commitMetadata(Path newMetadataPath, @Nullable Path baseMetadataPath) {
        throw new UnsupportedOperationException();
    }

    public void commitMetadata(IcebergMetadata newIcebergMetadata, @Nullable IcebergMetadata baseIcebergMetadata) {
        try {
            this.commitMetadataImpl(newIcebergMetadata, baseIcebergMetadata);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void commitMetadataImpl(IcebergMetadata newIcebergMetadata, @Nullable IcebergMetadata baseIcebergMetadata) {
        TableMetadata.Builder updatdeBuilder;
        newIcebergMetadata = this.adjustMetadataForRest(newIcebergMetadata);
        TableMetadata newMetadata = TableMetadataParser.fromJson((String)newIcebergMetadata.toJson());
        if (!this.databaseExists()) {
            this.createDatabase();
        }
        try {
            if (!this.tableExists()) {
                LOG.info("Table {} does not exist, create it.", (Object)this.icebergTableIdentifier);
                this.icebergTable = this.createTable();
                updatdeBuilder = this.updatesForCorrectBase(((BaseTable)this.icebergTable).operations().current(), newMetadata, true);
            } else {
                this.icebergTable = this.getTable();
                TableMetadata metadata = ((BaseTable)this.icebergTable).operations().current();
                boolean withBase = IcebergRestMetadataCommitter.checkBase(metadata, newMetadata, baseIcebergMetadata);
                if (withBase) {
                    LOG.info("create updates with base metadata.");
                    updatdeBuilder = this.updatesForCorrectBase(metadata, newMetadata, false);
                } else {
                    LOG.info("create updates without base metadata. currentSnapshotId for base metadata: {}, for new metadata:{}", (Object)metadata.currentSnapshot().snapshotId(), (Object)newMetadata.currentSnapshot().snapshotId());
                    updatdeBuilder = this.updatesForIncorrectBase(newMetadata);
                }
            }
        }
        catch (Exception e) {
            throw new RuntimeException("Fail to create table or get table: " + this.icebergTableIdentifier, e);
        }
        TableMetadata updatedForCommit = updatdeBuilder.build();
        if (LOG.isDebugEnabled()) {
            LOG.debug("updates:{}", (Object)IcebergRestMetadataCommitter.updatesToString(updatedForCommit.changes()));
        }
        try {
            ((BaseTable)this.icebergTable).operations().commit(((BaseTable)this.icebergTable).operations().current(), updatedForCommit);
        }
        catch (Exception e) {
            throw new RuntimeException("Fail to commit metadata to rest catalog.", e);
        }
    }

    private TableMetadata.Builder updatesForCorrectBase(TableMetadata base, TableMetadata newMetadata, boolean isNewTable) {
        TableMetadata.Builder updateBuilder = TableMetadata.buildFrom((TableMetadata)base);
        int schemaId = this.icebergTable.schema().schemaId();
        if (isNewTable) {
            Preconditions.checkArgument((schemaId == 0 ? 1 : 0) != 0, (String)"the schema id for newly created iceberg table should be 0, but is %s", (Object[])new Object[]{schemaId});
            this.addAndSetCurrentSchema(newMetadata.schemas(), newMetadata.currentSchemaId(), updateBuilder);
            updateBuilder.addPartitionSpec(newMetadata.spec());
            updateBuilder.setDefaultPartitionSpec(newMetadata.defaultSpecId());
            this.addNewSnapshot(newMetadata.currentSnapshot(), updateBuilder);
        } else {
            Preconditions.checkArgument((newMetadata.currentSchemaId() >= schemaId ? 1 : 0) != 0, (String)"the new metadata has correct base, but the schemaId(%s) in iceberg table is greater than currentSchemaId(%s) in new metadata.", (Object[])new Object[]{schemaId, newMetadata.currentSchemaId()});
            if (newMetadata.currentSchemaId() != schemaId) {
                this.addAndSetCurrentSchema(newMetadata.schemas().stream().filter(schema -> schema.schemaId() > schemaId).collect(Collectors.toList()), newMetadata.currentSchemaId(), updateBuilder);
            }
            this.addNewSnapshot(newMetadata.currentSnapshot(), updateBuilder);
            HashSet<Long> snapshotIdsToRemove = new HashSet<Long>();
            this.icebergTable.snapshots().forEach(snapshot -> snapshotIdsToRemove.add(snapshot.snapshotId()));
            Set snapshotIdsInNewMetadata = newMetadata.snapshots().stream().map(Snapshot::snapshotId).collect(Collectors.toSet());
            snapshotIdsToRemove.removeAll(snapshotIdsInNewMetadata);
            this.removeSnapshots(snapshotIdsToRemove, updateBuilder);
        }
        return updateBuilder;
    }

    private TableMetadata.Builder updatesForIncorrectBase(TableMetadata newMetadata) {
        LOG.info("the base metadata is incorrect, we'll recreate the iceberg table.");
        this.icebergTable = this.recreateTable();
        return this.updatesForCorrectBase(((BaseTable)this.icebergTable).operations().current(), newMetadata, true);
    }

    private RESTCatalog initRestCatalog(Map<String, String> restConfigs, Configuration conf) {
        restConfigs.put("type", "rest");
        Catalog catalog = CatalogUtil.buildIcebergCatalog((String)REST_CATALOG_NAME, restConfigs, (Object)conf);
        return (RESTCatalog)catalog;
    }

    private boolean databaseExists() {
        return this.restCatalog.namespaceExists(Namespace.of((String[])new String[]{this.icebergDatabaseName}));
    }

    private boolean tableExists() {
        return this.restCatalog.tableExists(this.icebergTableIdentifier);
    }

    private void createDatabase() {
        this.restCatalog.createNamespace(Namespace.of((String[])new String[]{this.icebergDatabaseName}));
    }

    private Table createTable() {
        Schema emptySchema = new Schema(new Types.NestedField[0]);
        return this.restCatalog.createTable(this.icebergTableIdentifier, emptySchema);
    }

    private Table getTable() {
        return this.restCatalog.loadTable(this.icebergTableIdentifier);
    }

    private void dropTable() {
        this.restCatalog.dropTable(this.icebergTableIdentifier, false);
    }

    private Table recreateTable() {
        try {
            this.dropTable();
            return this.createTable();
        }
        catch (Exception e) {
            throw new RuntimeException("Fail to recreate iceberg table.", e);
        }
    }

    private void addNewSnapshot(Snapshot newSnapshot, TableMetadata.Builder update) {
        update.setBranchSnapshot(newSnapshot, "main");
    }

    private void removeSnapshots(Set<Long> snapshotIds, TableMetadata.Builder update) {
        update.removeSnapshots(snapshotIds);
    }

    private void addAndSetCurrentSchema(List<Schema> schemas, int currentSchemaId, TableMetadata.Builder update) {
        for (Schema schema : schemas) {
            update.addSchema(schema);
        }
        update.setCurrentSchema(currentSchemaId);
        HashMap<String, String> properties = new HashMap<String, String>();
        properties.put("write.metadata.previous-versions-max", String.valueOf(this.icebergOptions.previousVersionsMax()));
        properties.put("write.metadata.delete-after-commit.enabled", String.valueOf(this.icebergOptions.deleteAfterCommitEnabled()));
        update.setProperties(properties);
    }

    private static boolean checkBase(TableMetadata currentMetadata, TableMetadata newMetadata, @Nullable IcebergMetadata baseIcebergMetadata) {
        if (baseIcebergMetadata == null) {
            LOG.info("new metadata without base metadata cause base metadata from upstream is null.");
            return false;
        }
        return currentMetadata.currentSnapshot().snapshotId() == newMetadata.currentSnapshot().snapshotId() - 1L;
    }

    private IcebergMetadata adjustMetadataForRest(IcebergMetadata newIcebergMetadata) {
        List schemas = newIcebergMetadata.schemas().stream().map(schema -> new IcebergSchema(schema.schemaId() + 1, schema.fields())).collect(Collectors.toList());
        int currentSchemaId = newIcebergMetadata.currentSchemaId() + 1;
        List snapshots = newIcebergMetadata.snapshots().stream().map(snapshot -> new IcebergSnapshot(snapshot.sequenceNumber(), snapshot.snapshotId(), snapshot.timestampMs(), snapshot.summary(), snapshot.manifestList(), snapshot.schemaId() + 1)).collect(Collectors.toList());
        return new IcebergMetadata(newIcebergMetadata.formatVersion(), newIcebergMetadata.tableUuid(), newIcebergMetadata.location(), newIcebergMetadata.currentSnapshotId(), newIcebergMetadata.lastColumnId(), schemas, currentSchemaId, newIcebergMetadata.partitionSpecs(), newIcebergMetadata.lastPartitionId(), snapshots, newIcebergMetadata.currentSnapshotId(), newIcebergMetadata.refs());
    }

    private static String updateToString(MetadataUpdate update) {
        if (update instanceof MetadataUpdate.AddSnapshot) {
            return String.format("AddSnapshot(%s)", ((MetadataUpdate.AddSnapshot)update).snapshot().snapshotId());
        }
        if (update instanceof MetadataUpdate.RemoveSnapshot) {
            return String.format("RemoveSnapshot(%s)", ((MetadataUpdate.RemoveSnapshot)update).snapshotId());
        }
        if (update instanceof MetadataUpdate.SetSnapshotRef) {
            return String.format("SetSnapshotRef(%s, %s, %s)", ((MetadataUpdate.SetSnapshotRef)update).name(), ((MetadataUpdate.SetSnapshotRef)update).type(), ((MetadataUpdate.SetSnapshotRef)update).snapshotId());
        }
        if (update instanceof MetadataUpdate.AddSchema) {
            return String.format("AddSchema(%s)", ((MetadataUpdate.AddSchema)update).schema().schemaId());
        }
        if (update instanceof MetadataUpdate.SetCurrentSchema) {
            return String.format("SetCurrentSchema(%s)", ((MetadataUpdate.SetCurrentSchema)update).schemaId());
        }
        if (update instanceof MetadataUpdate.SetProperties) {
            return String.format("SetProperties(%s)", ((MetadataUpdate.SetProperties)update).updated());
        }
        return update.toString();
    }

    private static String updatesToString(List<MetadataUpdate> updates) {
        return updates.stream().map(IcebergRestMetadataCommitter::updateToString).collect(Collectors.joining(", "));
    }
}

