/*
 * Decompiled with CFR 0.152.
 */
package org.apache.seatunnel.connectors.seatunnel.databend.source;

import com.google.auto.service.AutoService;
import java.util.Collections;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.databend.catalog.DatabendCatalog;
import org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendOptions;
import org.apache.seatunnel.connectors.seatunnel.databend.config.DatabendSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.databend.exception.DatabendConnectorException;
import org.apache.seatunnel.connectors.seatunnel.databend.source.DatabendSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory that creates {@link DatabendSource} instances from connector options.
 *
 * <p>The factory is registered through {@link AutoService} under the identifier
 * {@code "Databend"}.
 */
@AutoService(Factory.class)
public class DatabendSourceFactory
implements TableSourceFactory {
    private static final Logger log = LoggerFactory.getLogger(DatabendSourceFactory.class);

    /** Prefix every accepted Databend JDBC URL must start with. */
    private static final String JDBC_URL_PREFIX = "jdbc:databend://";

    /** Catalog name used for the catalog instance and for fallback table identifiers. */
    private static final String DEFAULT_CATALOG_NAME = "default";

    /** Database name used when the {@code database} option is not configured. */
    private static final String DEFAULT_DATABASE = "default";

    /** Table name used in the table identifier when the {@code table} option is not configured. */
    private static final String DEFAULT_TABLE = "default";

    /**
     * Returns the identifier under which this factory is looked up.
     *
     * @return the constant {@code "Databend"}
     */
    public String factoryIdentifier() {
        return "Databend";
    }

    /**
     * Declares the options this source accepts: URL, username and password are required,
     * everything else is optional.
     *
     * @return the option rule for this connector
     */
    public OptionRule optionRule() {
        return OptionRule.builder()
                .required(
                        DatabendOptions.URL,
                        DatabendOptions.USERNAME,
                        DatabendOptions.PASSWORD)
                .optional(
                        DatabendOptions.DATABASE,
                        DatabendOptions.TABLE,
                        DatabendOptions.JDBC_CONFIG,
                        DatabendOptions.FETCH_SIZE,
                        DatabendSourceOptions.SQL,
                        DatabendOptions.QUERY,
                        DatabendOptions.SSL)
                .build();
    }

    /**
     * Returns the source implementation produced by this factory.
     *
     * @return {@code DatabendSource.class}
     */
    public Class<? extends SeaTunnelSource> getSourceClass() {
        return DatabendSource.class;
    }

    /**
     * Creates a lazily evaluated {@link TableSource}. All validation and catalog access
     * happens when the returned {@code TableSource} is invoked, not when this method returns.
     *
     * <p>When invoked, the returned object:
     * <ol>
     *   <li>validates that the URL starts with {@code jdbc:databend://};</li>
     *   <li>resolves the SQL statement to run (see {@link #buildSqlStatement});</li>
     *   <li>if a table is configured, tries to load its schema from the catalog, falling
     *       back to an empty schema when the lookup fails;</li>
     *   <li>builds the {@link DatabendSource}.</li>
     * </ol>
     *
     * @param context factory context carrying the connector options
     * @return a {@code TableSource} producing a {@code DatabendSource}
     * @throws DatabendConnectorException (when the returned source is invoked) if the URL is
     *     missing or has the wrong prefix, or if no SQL statement can be determined
     */
    public TableSource createSource(TableSourceFactoryContext context) {
        return () -> {
            ReadonlyConfig options = context.getOptions();

            // Read the URL once; a missing value is reported the same way as a bad prefix
            // instead of surfacing as a bare NullPointerException.
            String url = (String) options.get(DatabendOptions.URL);
            if (url == null || !url.startsWith(JDBC_URL_PREFIX)) {
                throw new DatabendConnectorException(DatabendConnectorErrorCode.CONNECT_FAILED, "Databend URL should start with 'jdbc:databend://'");
            }

            Boolean ssl = (Boolean) options.get(DatabendOptions.SSL);
            String username = (String) options.get(DatabendOptions.USERNAME);
            String password = (String) options.get(DatabendOptions.PASSWORD);
            Integer fetchSize = (Integer) options.get(DatabendOptions.FETCH_SIZE);
            String sql = this.buildSqlStatement(options);

            String database = options.getOptional(DatabendOptions.DATABASE).orElse(DEFAULT_DATABASE);
            boolean tableConfigured = options.getOptional(DatabendOptions.TABLE).isPresent();
            String table = options.getOptional(DatabendOptions.TABLE).orElse(DEFAULT_TABLE);

            CatalogTable catalogTable;
            if (tableConfigured) {
                catalogTable = this.resolveCatalogTable(options, database, table);
            } else {
                // Without a configured table there is nothing meaningful to look up:
                // querying the catalog for the placeholder name would either fail or
                // return the schema of an unrelated table.
                log.info("No table configured, will try to infer schema from query");
                catalogTable = this.emptyCatalogTable(database, table);
            }

            // Built outside of the catalog try/catch so that a failure here is not
            // mistaken for a catalog lookup failure.
            return new DatabendSource(catalogTable, sql, url, ssl, username, password, fetchSize);
        };
    }

    /**
     * Loads the table definition from the Databend catalog, falling back to an
     * empty-schema table when anything in the lookup fails. The catalog is always closed.
     */
    private CatalogTable resolveCatalogTable(ReadonlyConfig options, String database, String table) {
        DatabendCatalog catalog = new DatabendCatalog(options, DEFAULT_CATALOG_NAME);
        try {
            catalog.open();
            CatalogTable catalogTable = catalog.getTable(TablePath.of(database, table));
            log.info("Successfully retrieved catalog table: {}", catalogTable);
            return catalogTable;
        } catch (Exception e) {
            // Best effort: a failed lookup must not prevent the source from being created.
            log.warn("Failed to get table schema from catalog, will try to infer schema from query", e);
            return this.emptyCatalogTable(database, table);
        } finally {
            try {
                catalog.close();
            } catch (Exception closeFailure) {
                log.warn("Failed to close catalog", closeFailure);
            }
        }
    }

    /**
     * Builds a catalog table with no columns, no options, no partition keys and an empty
     * comment, identified by the default catalog name plus the given database and table.
     */
    private CatalogTable emptyCatalogTable(String database, String table) {
        TableSchema emptySchema = TableSchema.builder().build();
        TableIdentifier identifier = TableIdentifier.of(DEFAULT_CATALOG_NAME, database, table);
        return CatalogTable.of(identifier, emptySchema, Collections.emptyMap(), Collections.emptyList(), "", DEFAULT_CATALOG_NAME);
    }

    /**
     * Determines the SQL statement the source should execute.
     *
     * <p>Precedence: the {@code SQL} option, then the {@code QUERY} option, then a generated
     * {@code SELECT * FROM database.table} when both database and table are explicitly
     * configured.
     *
     * @param options connector options
     * @return the SQL statement to run
     * @throws DatabendConnectorException if none of the above is available
     */
    private String buildSqlStatement(ReadonlyConfig options) {
        String sql = options.getOptional(DatabendSourceOptions.SQL).orElse(null);
        if (sql != null) {
            return sql;
        }
        String query = options.getOptional(DatabendOptions.QUERY).orElse(null);
        if (query != null) {
            return query;
        }
        String database = options.getOptional(DatabendOptions.DATABASE).orElse(null);
        String table = options.getOptional(DatabendOptions.TABLE).orElse(null);
        if (database == null || table == null) {
            throw new DatabendConnectorException(DatabendConnectorErrorCode.SQL_OPERATION_FAILED, "Either SQL, query, or both database and table must be specified");
        }
        return String.format("SELECT * FROM %s.%s", database, table);
    }
}

