/*
 * Decompiled with CFR 0.152.
 */
package org.apache.linkis.engineconnplugin.flink.client.shims;

import java.lang.reflect.Method;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.bridge.java.internal.BatchTableEnvironmentImpl;
import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.descriptors.FunctionDescriptor;
import org.apache.flink.table.factories.BatchTableSinkFactory;
import org.apache.flink.table.factories.BatchTableSourceFactory;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.factories.ModuleFactory;
import org.apache.flink.table.factories.TableFactoryService;
import org.apache.flink.table.factories.TableSinkFactory;
import org.apache.flink.table.factories.TableSourceFactory;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.FunctionService;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.TemporalTableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.sinks.TableSink;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.linkis.engineconnplugin.flink.client.shims.FlinkShims;
import org.apache.linkis.engineconnplugin.flink.client.shims.SessionState;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.Environment;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.ExecutionEntry;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.SinkTableEntry;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.SourceSinkTableEntry;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.SourceTableEntry;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.TableEntry;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.TemporalTableEntry;
import org.apache.linkis.engineconnplugin.flink.client.shims.config.entries.ViewEntry;
import org.apache.linkis.engineconnplugin.flink.client.shims.errorcode.FlinkErrorCodeSummary;
import org.apache.linkis.engineconnplugin.flink.client.shims.exception.SqlExecutionException;

public class Flink1122Shims
extends FlinkShims {
    private SessionState sessionState;
    private TableEnvironmentInternal tableEnv;
    private Environment environment;
    private ClassLoader classLoader;
    private StreamExecutionEnvironment streamExecEnv;
    private Executor executor;
    private ExecutionEnvironment execEnv;

    public Flink1122Shims(String flinkVersion) {
        super(flinkVersion);
    }

    public CompletableFuture<String> triggerSavepoint(Object clusterClientObject, Object jobIdObject, String savepoint) {
        ClusterClient clusterClient = (ClusterClient)clusterClientObject;
        return clusterClient.triggerSavepoint((JobID)jobIdObject, savepoint);
    }

    public CompletableFuture<String> cancelWithSavepoint(Object clusterClientObject, Object jobIdObject, String savepoint) {
        ClusterClient clusterClient = (ClusterClient)clusterClientObject;
        return clusterClient.cancelWithSavepoint((JobID)jobIdObject, savepoint);
    }

    public CompletableFuture<String> stopWithSavepoint(Object clusterClientObject, Object jobIdObject, boolean advanceToEndOfEventTime, String savepoint) {
        ClusterClient clusterClient = (ClusterClient)clusterClientObject;
        return clusterClient.stopWithSavepoint((JobID)jobIdObject, advanceToEndOfEventTime, savepoint);
    }

    public Object initializeTableEnvironment(Object environmentObject, Object flinkConfigObject, Object streamExecEnvObject, Object sessionStateObject, ClassLoader classLoader) throws SqlExecutionException {
        boolean noInheritedState;
        Configuration flinkConfig = (Configuration)flinkConfigObject;
        SessionState sessionState = (SessionState)sessionStateObject;
        this.streamExecEnv = (StreamExecutionEnvironment)streamExecEnvObject;
        this.environment = (Environment)environmentObject;
        this.classLoader = classLoader;
        EnvironmentSettings settings = this.environment.getExecution().getEnvironmentSettings();
        TableConfig config = new TableConfig();
        this.environment.getConfiguration().asMap().forEach((k, v) -> config.getConfiguration().setString(k, v));
        boolean bl = noInheritedState = sessionState == null;
        if (noInheritedState) {
            ModuleManager moduleManager = new ModuleManager();
            CatalogManager catalogManager = CatalogManager.newBuilder().classLoader(classLoader).config((ReadableConfig)config.getConfiguration()).defaultCatalog(settings.getBuiltInCatalogName(), (Catalog)new GenericInMemoryCatalog(settings.getBuiltInCatalogName(), settings.getBuiltInDatabaseName())).build();
            FunctionCatalog functionCatalog = new FunctionCatalog(config, catalogManager, moduleManager);
            this.sessionState = SessionState.of(catalogManager, moduleManager, functionCatalog);
            this.createTableEnvironment(settings, config, catalogManager, moduleManager, functionCatalog);
            LinkedHashMap<String, Module> modules = new LinkedHashMap<String, Module>();
            this.environment.getModules().forEach((name, entry) -> modules.put((String)name, this.createModule(entry.asMap(), classLoader)));
            if (!modules.isEmpty()) {
                this.tableEnv.unloadModule("core");
                modules.forEach((arg_0, arg_1) -> ((TableEnvironmentInternal)this.tableEnv).loadModule(arg_0, arg_1));
            }
            this.registerFunctions();
            this.initializeCatalogs();
        } else {
            this.sessionState = sessionState;
            this.createTableEnvironment(settings, config, sessionState.catalogManager, sessionState.moduleManager, sessionState.functionCatalog);
        }
        return this.tableEnv;
    }

    private void createTableEnvironment(EnvironmentSettings settings, TableConfig config, CatalogManager catalogManager, ModuleManager moduleManager, FunctionCatalog functionCatalog) {
        if (this.environment.getExecution().isStreamingPlanner()) {
            this.execEnv = null;
            Map executorProperties = settings.toExecutorProperties();
            this.executor = Flink1122Shims.lookupExecutor(executorProperties, this.streamExecEnv);
            this.tableEnv = this.createStreamTableEnvironment(this.streamExecEnv, settings, config, this.executor, catalogManager, moduleManager, functionCatalog);
            return;
        }
        this.streamExecEnv = null;
        this.execEnv = this.createExecutionEnvironment();
        this.executor = null;
        this.tableEnv = new BatchTableEnvironmentImpl(this.execEnv, config, catalogManager, moduleManager);
    }

    private void initializeCatalogs() throws SqlExecutionException {
        TableEntry entry;
        this.wrapClassLoader(() -> this.environment.getCatalogs().forEach((name, entry) -> {
            Catalog catalog = this.createCatalog((String)name, entry.asMap(), this.classLoader);
            this.tableEnv.registerCatalog(name, catalog);
        }));
        HashMap<String, Object> tableSources = new HashMap<String, Object>();
        HashMap<String, Object> tableSinks = new HashMap<String, Object>();
        for (Map.Entry keyValue : this.environment.getTables().entrySet()) {
            String name = (String)keyValue.getKey();
            TableEntry entry2 = (TableEntry)keyValue.getValue();
            if (entry2 instanceof SourceTableEntry || entry2 instanceof SourceSinkTableEntry) {
                tableSources.put(name, Flink1122Shims.createTableSource(this.environment.getExecution(), entry2.asMap(), this.classLoader));
            }
            if (!(entry2 instanceof SinkTableEntry) && !(entry2 instanceof SourceSinkTableEntry)) continue;
            tableSinks.put(name, Flink1122Shims.createTableSink(this.environment.getExecution(), entry2.asMap(), this.classLoader));
        }
        tableSources.forEach((arg_0, arg_1) -> ((TableEnvironmentInternal)this.tableEnv).registerTableSourceInternal(arg_0, arg_1));
        tableSinks.forEach((arg_0, arg_1) -> ((TableEnvironmentInternal)this.tableEnv).registerTableSinkInternal(arg_0, arg_1));
        for (Map.Entry keyValue : this.environment.getTables().entrySet()) {
            entry = (TableEntry)keyValue.getValue();
            if (!(entry instanceof TemporalTableEntry)) continue;
            TemporalTableEntry temporalTableEntry = (TemporalTableEntry)entry;
            this.registerTemporalTable(temporalTableEntry);
        }
        for (Map.Entry keyValue : this.environment.getTables().entrySet()) {
            entry = (TableEntry)keyValue.getValue();
            if (!(entry instanceof ViewEntry)) continue;
            ViewEntry viewEntry = (ViewEntry)entry;
            this.registerView(viewEntry);
        }
        Optional catalog = this.environment.getExecution().getCurrentCatalog();
        catalog.ifPresent(arg_0 -> ((TableEnvironmentInternal)this.tableEnv).useCatalog(arg_0));
        Optional database = this.environment.getExecution().getCurrentDatabase();
        database.ifPresent(arg_0 -> ((TableEnvironmentInternal)this.tableEnv).useDatabase(arg_0));
    }

    private ExecutionEnvironment createExecutionEnvironment() {
        ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
        execEnv.setRestartStrategy(this.environment.getExecution().getRestartStrategy());
        execEnv.setParallelism(this.environment.getExecution().getParallelism());
        return execEnv;
    }

    private void registerFunctions() throws SqlExecutionException {
        LinkedHashMap<String, FunctionDefinition> functions = new LinkedHashMap<String, FunctionDefinition>();
        this.environment.getFunctions().forEach((name, entry) -> {
            UserDefinedFunction function = FunctionService.createFunction((FunctionDescriptor)entry.getDescriptor(), (ClassLoader)this.classLoader, (boolean)false);
            functions.put((String)name, (FunctionDefinition)function);
        });
        this.registerFunctions(functions);
    }

    private void registerFunctions(Map<String, FunctionDefinition> functions) throws SqlExecutionException {
        if (this.tableEnv instanceof StreamTableEnvironment) {
            StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment)this.tableEnv;
            for (Map.Entry<String, FunctionDefinition> keyValue : functions.entrySet()) {
                String k = keyValue.getKey();
                FunctionDefinition v = keyValue.getValue();
                if (v instanceof ScalarFunction) {
                    streamTableEnvironment.registerFunction(k, (ScalarFunction)v);
                    continue;
                }
                if (v instanceof AggregateFunction) {
                    streamTableEnvironment.registerFunction(k, (AggregateFunction)v);
                    continue;
                }
                if (v instanceof TableFunction) {
                    streamTableEnvironment.registerFunction(k, (TableFunction)v);
                    continue;
                }
                throw new SqlExecutionException(MessageFormat.format(FlinkErrorCodeSummary.SUPPORTED_FUNCTION_TYPE.getErrorDesc(), v.getClass().getName()));
            }
        } else {
            BatchTableEnvironment batchTableEnvironment = (BatchTableEnvironment)this.tableEnv;
            for (Map.Entry<String, FunctionDefinition> keyValue : functions.entrySet()) {
                String k = keyValue.getKey();
                FunctionDefinition v = keyValue.getValue();
                if (v instanceof ScalarFunction) {
                    batchTableEnvironment.registerFunction(k, (ScalarFunction)v);
                    continue;
                }
                if (v instanceof AggregateFunction) {
                    batchTableEnvironment.registerFunction(k, (AggregateFunction)v);
                    continue;
                }
                if (v instanceof TableFunction) {
                    batchTableEnvironment.registerFunction(k, (TableFunction)v);
                    continue;
                }
                throw new SqlExecutionException(MessageFormat.format(FlinkErrorCodeSummary.SUPPORTED_FUNCTION_TYPE.getErrorDesc(), v.getClass().getName()));
            }
        }
    }

    private void registerView(ViewEntry viewEntry) throws SqlExecutionException {
        try {
            this.tableEnv.registerTable(viewEntry.getName(), this.tableEnv.sqlQuery(viewEntry.getQuery()));
        }
        catch (Exception e) {
            throw new SqlExecutionException("Invalid view '" + viewEntry.getName() + "' with query:\n" + viewEntry.getQuery() + "\nCause: " + e.getMessage());
        }
    }

    private void registerTemporalTable(TemporalTableEntry temporalTableEntry) throws SqlExecutionException {
        try {
            Table table = this.tableEnv.scan(new String[]{temporalTableEntry.getHistoryTable()});
            TemporalTableFunction function = table.createTemporalTableFunction(temporalTableEntry.getTimeAttribute(), String.join((CharSequence)",", temporalTableEntry.getPrimaryKeyFields()));
            if (this.tableEnv instanceof StreamTableEnvironment) {
                StreamTableEnvironment streamTableEnvironment = (StreamTableEnvironment)this.tableEnv;
                streamTableEnvironment.registerFunction(temporalTableEntry.getName(), (TableFunction)function);
            } else {
                BatchTableEnvironment batchTableEnvironment = (BatchTableEnvironment)this.tableEnv;
                batchTableEnvironment.registerFunction(temporalTableEntry.getName(), (TableFunction)function);
            }
        }
        catch (Exception e) {
            throw new SqlExecutionException("Invalid temporal table '" + temporalTableEntry.getName() + "' over table '" + temporalTableEntry.getHistoryTable() + ".\nCause: " + e.getMessage());
        }
    }

    private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
        try {
            ExecutorFactory executorFactory = (ExecutorFactory)ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
            Method createMethod = executorFactory.getClass().getMethod("create", Map.class, StreamExecutionEnvironment.class);
            return (Executor)createMethod.invoke((Object)executorFactory, executorProperties, executionEnvironment);
        }
        catch (Exception e) {
            throw new TableException("Could not instantiate the executor. Make sure a planner module is on the classpath", (Throwable)e);
        }
    }

    private Module createModule(Map<String, String> moduleProperties, ClassLoader classLoader) {
        ModuleFactory factory = (ModuleFactory)TableFactoryService.find(ModuleFactory.class, moduleProperties, (ClassLoader)classLoader);
        return factory.createModule(moduleProperties);
    }

    private Catalog createCatalog(String name, Map<String, String> catalogProperties, ClassLoader classLoader) {
        CatalogFactory factory = (CatalogFactory)TableFactoryService.find(CatalogFactory.class, catalogProperties, (ClassLoader)classLoader);
        return factory.createCatalog(name, catalogProperties);
    }

    private static TableSource<?> createTableSource(ExecutionEntry execution, Map<String, String> sourceProperties, ClassLoader classLoader) throws SqlExecutionException {
        if (execution.isStreamingPlanner()) {
            TableSourceFactory factory = (TableSourceFactory)TableFactoryService.find(TableSourceFactory.class, sourceProperties, (ClassLoader)classLoader);
            return factory.createTableSource(sourceProperties);
        }
        if (execution.isBatchPlanner()) {
            BatchTableSourceFactory factory = (BatchTableSourceFactory)TableFactoryService.find(BatchTableSourceFactory.class, sourceProperties, (ClassLoader)classLoader);
            return factory.createBatchTableSource(sourceProperties);
        }
        throw new SqlExecutionException(FlinkErrorCodeSummary.SUPPORTED_SOURCES.getErrorDesc());
    }

    private static TableSink<?> createTableSink(ExecutionEntry execution, Map<String, String> sinkProperties, ClassLoader classLoader) throws SqlExecutionException {
        if (execution.isStreamingPlanner()) {
            TableSinkFactory factory = (TableSinkFactory)TableFactoryService.find(TableSinkFactory.class, sinkProperties, (ClassLoader)classLoader);
            return factory.createTableSink(sinkProperties);
        }
        if (execution.isBatchPlanner()) {
            BatchTableSinkFactory factory = (BatchTableSinkFactory)TableFactoryService.find(BatchTableSinkFactory.class, sinkProperties, (ClassLoader)classLoader);
            return factory.createBatchTableSink(sinkProperties);
        }
        throw new SqlExecutionException(FlinkErrorCodeSummary.SUPPORTED_SINKS.getErrorDesc());
    }

    private TableEnvironmentInternal createStreamTableEnvironment(StreamExecutionEnvironment env, EnvironmentSettings settings, TableConfig config, Executor executor, CatalogManager catalogManager, ModuleManager moduleManager, FunctionCatalog functionCatalog) {
        Map plannerProperties = settings.toPlannerProperties();
        Planner planner = ((PlannerFactory)ComponentFactoryService.find(PlannerFactory.class, (Map)plannerProperties)).create(plannerProperties, executor, config, functionCatalog, catalogManager);
        return new StreamTableEnvironmentImpl(catalogManager, moduleManager, functionCatalog, config, env, planner, executor, settings.isStreamingMode(), this.classLoader);
    }

    void wrapClassLoader(Runnable runnable) {
        try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of((ClassLoader)this.classLoader);){
            runnable.run();
        }
    }
}

