/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.functions;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.typeutils.TypeExtractionUtils;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogFunction;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.AsyncScalarFunction;
import org.apache.flink.table.functions.AsyncTableFunction;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.functions.ImperativeAggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.SpecializedFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunction;
import org.apache.flink.table.functions.python.utils.PythonFunctionUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.extraction.ExtractionUtils;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

@Internal
public final class UserDefinedFunctionHelper {
    public static final String SCALAR_EVAL = "eval";
    public static final String ASYNC_SCALAR_EVAL = "eval";
    public static final String TABLE_EVAL = "eval";
    public static final String AGGREGATE_ACCUMULATE = "accumulate";
    public static final String AGGREGATE_RETRACT = "retract";
    public static final String AGGREGATE_MERGE = "merge";
    public static final String TABLE_AGGREGATE_ACCUMULATE = "accumulate";
    public static final String TABLE_AGGREGATE_RETRACT = "retract";
    public static final String TABLE_AGGREGATE_MERGE = "merge";
    public static final String TABLE_AGGREGATE_EMIT = "emitValue";
    public static final String TABLE_AGGREGATE_EMIT_RETRACT = "emitUpdateWithRetract";
    public static final String ASYNC_TABLE_EVAL = "eval";
    public static final String PROCESS_TABLE_EVAL = "eval";
    public static final String PROCESS_TABLE_ON_TIMER = "onTimer";
    public static final String DEFAULT_ACCUMULATOR_NAME = "acc";

    public static <T, ACC> TypeInformation<T> getReturnTypeOfAggregateFunction(ImperativeAggregateFunction<T, ACC> aggregateFunction) {
        return UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction, null);
    }

    public static <T, ACC> TypeInformation<T> getReturnTypeOfAggregateFunction(ImperativeAggregateFunction<T, ACC> aggregateFunction, TypeInformation<T> scalaType) {
        TypeInformation<T> userProvidedType = aggregateFunction.getResultType();
        if (userProvidedType != null) {
            return userProvidedType;
        }
        if (scalaType != null) {
            return scalaType;
        }
        return TypeExtractor.createTypeInfo(aggregateFunction, ImperativeAggregateFunction.class, aggregateFunction.getClass(), 0);
    }

    public static <T, ACC> TypeInformation<ACC> getAccumulatorTypeOfAggregateFunction(ImperativeAggregateFunction<T, ACC> aggregateFunction) {
        return UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregateFunction, null);
    }

    public static <T, ACC> TypeInformation<ACC> getAccumulatorTypeOfAggregateFunction(ImperativeAggregateFunction<T, ACC> aggregateFunction, TypeInformation<ACC> scalaType) {
        TypeInformation<ACC> userProvidedType = aggregateFunction.getAccumulatorType();
        if (userProvidedType != null) {
            return userProvidedType;
        }
        if (scalaType != null) {
            return scalaType;
        }
        return TypeExtractor.createTypeInfo(aggregateFunction, ImperativeAggregateFunction.class, aggregateFunction.getClass(), 1);
    }

    public static <T> TypeInformation<T> getReturnTypeOfTableFunction(TableFunction<T> tableFunction) {
        return UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction, null);
    }

    public static <T> TypeInformation<T> getReturnTypeOfTableFunction(TableFunction<T> tableFunction, TypeInformation<T> scalaType) {
        TypeInformation<T> userProvidedType = tableFunction.getResultType();
        if (userProvidedType != null) {
            return userProvidedType;
        }
        if (scalaType != null) {
            return scalaType;
        }
        return TypeExtractor.createTypeInfo(tableFunction, TableFunction.class, tableFunction.getClass(), 0);
    }

    public static UserDefinedFunction instantiateFunction(ClassLoader classLoader, @Nullable ReadableConfig config, String name, CatalogFunction catalogFunction) {
        try {
            switch (catalogFunction.getFunctionLanguage()) {
                case PYTHON: {
                    if (config == null) {
                        throw new IllegalStateException("Python functions are not supported at this location.");
                    }
                    return (UserDefinedFunction)((Object)PythonFunctionUtils.getPythonFunction(catalogFunction.getClassName(), config, classLoader));
                }
                case JAVA: 
                case SCALA: {
                    Class<?> functionClass = classLoader.loadClass(catalogFunction.getClassName());
                    return UserDefinedFunctionHelper.instantiateFunction(functionClass);
                }
            }
            throw new IllegalArgumentException("Unknown function language: " + String.valueOf((Object)catalogFunction.getFunctionLanguage()));
        }
        catch (Exception e) {
            throw new ValidationException(String.format("Cannot instantiate user-defined function '%s'.", name), e);
        }
    }

    public static UserDefinedFunction instantiateFunction(Class<?> functionClass) {
        if (!UserDefinedFunction.class.isAssignableFrom(functionClass)) {
            throw new ValidationException(String.format("Function '%s' does not extend from '%s'.", functionClass.getName(), UserDefinedFunction.class.getName()));
        }
        UserDefinedFunctionHelper.validateClass(functionClass, true);
        try {
            return (UserDefinedFunction)functionClass.newInstance();
        }
        catch (Exception e) {
            throw new ValidationException(String.format("Cannot instantiate user-defined function class '%s'.", functionClass.getName()), e);
        }
    }

    public static void prepareInstance(ReadableConfig config, UserDefinedFunction function) {
        UserDefinedFunctionHelper.validateClass(function.getClass(), false);
        UserDefinedFunctionHelper.cleanFunction(config, function);
    }

    public static boolean isClassNameSerializable(UserDefinedFunction function) {
        Class<?> functionClass = function.getClass();
        if (!InstantiationUtil.hasPublicNullaryConstructor(functionClass)) {
            return false;
        }
        Class<?> currentClass = functionClass;
        while (!currentClass.equals(UserDefinedFunction.class)) {
            for (Field field : currentClass.getDeclaredFields()) {
                if (Modifier.isTransient(field.getModifiers()) || Modifier.isStatic(field.getModifiers())) continue;
                return false;
            }
            currentClass = currentClass.getSuperclass();
        }
        return true;
    }

    public static String generateInlineFunctionName(UserDefinedFunction function) {
        return String.format("*%s*", function.functionIdentifier());
    }

    public static void validateClass(Class<? extends UserDefinedFunction> functionClass) {
        UserDefinedFunctionHelper.validateClass(functionClass, true);
    }

    public static void validateClassForRuntime(Class<? extends UserDefinedFunction> functionClass, String methodName, Class<?>[] argumentClasses, Class<?> outputClass, String functionName) {
        List<Method> methods = ExtractionUtils.collectMethods(functionClass, methodName);
        boolean isMatching = methods.stream().anyMatch(method -> ExtractionUtils.isInvokable(ExtractionUtils.Autoboxing.JVM, method, argumentClasses) && ExtractionUtils.isAssignable(outputClass, method.getReturnType(), ExtractionUtils.Autoboxing.JVM));
        if (!isMatching) {
            throw new ValidationException(String.format("Could not find an implementation method '%s' in class '%s' for function '%s' that matches the following signature:\n%s", methodName, functionClass.getName(), functionName, ExtractionUtils.createMethodSignatureString(methodName, argumentClasses, outputClass)));
        }
    }

    public static UserDefinedFunction createSpecializedFunction(String functionName, FunctionDefinition definition, final CallContext callContext, final ClassLoader builtInClassLoader, final @Nullable ReadableConfig configuration, final @Nullable SpecializedFunction.ExpressionEvaluatorFactory evaluatorFactory) {
        if (definition instanceof SpecializedFunction) {
            SpecializedFunction specialized = (SpecializedFunction)definition;
            SpecializedFunction.SpecializedContext specializedContext = new SpecializedFunction.SpecializedContext(){

                @Override
                public CallContext getCallContext() {
                    return callContext;
                }

                @Override
                public ReadableConfig getConfiguration() {
                    if (configuration == null) {
                        throw new TableException("Access to configuration is currently not supported for all kinds of calls.");
                    }
                    return configuration;
                }

                @Override
                public ClassLoader getBuiltInClassLoader() {
                    return builtInClassLoader;
                }

                @Override
                public SpecializedFunction.ExpressionEvaluator createEvaluator(Expression expression, DataType outputDataType, DataTypes.Field ... args) {
                    if (evaluatorFactory == null) {
                        throw new TableException("Access to expression evaluation is currently not supported for all kinds of calls.");
                    }
                    return evaluatorFactory.createEvaluator(expression, outputDataType, args);
                }

                @Override
                public SpecializedFunction.ExpressionEvaluator createEvaluator(String sqlExpression, DataType outputDataType, DataTypes.Field ... args) {
                    if (evaluatorFactory == null) {
                        throw new TableException("Access to expression evaluation is currently not supported for all kinds of calls.");
                    }
                    return evaluatorFactory.createEvaluator(sqlExpression, outputDataType, args);
                }

                @Override
                public SpecializedFunction.ExpressionEvaluator createEvaluator(BuiltInFunctionDefinition function, DataType outputDataType, DataType ... args) {
                    if (evaluatorFactory == null) {
                        throw new TableException("Access to expression evaluation is currently not supported for all kinds of calls.");
                    }
                    return evaluatorFactory.createEvaluator(function, outputDataType, args);
                }
            };
            UserDefinedFunction udf = specialized.specialize(specializedContext);
            Preconditions.checkState(udf.getKind() == definition.getKind(), "Function kind must not change during specialization.");
            return udf;
        }
        if (definition instanceof UserDefinedFunction) {
            return (UserDefinedFunction)definition;
        }
        throw new TableException(String.format("Could not find a runtime implementation for function definition '%s'.", functionName));
    }

    private static void validateClass(Class<? extends UserDefinedFunction> functionClass, boolean requiresDefaultConstructor) {
        if (TableFunction.class.isAssignableFrom(functionClass)) {
            UserDefinedFunctionHelper.validateNotSingleton(functionClass);
        }
        UserDefinedFunctionHelper.validateInstantiation(functionClass, requiresDefaultConstructor);
        UserDefinedFunctionHelper.validateImplementationMethods(functionClass);
    }

    private static void validateNotSingleton(Class<?> clazz) {
        if (Arrays.stream(clazz.getFields()).anyMatch(f -> f.getName().equals("MODULE$"))) {
            throw new ValidationException(String.format("Function implemented by class %s is a Scala object. This is forbidden because of concurrency problems when using them.", clazz.getName()));
        }
    }

    private static void validateImplementationMethods(Class<? extends UserDefinedFunction> functionClass) {
        if (ScalarFunction.class.isAssignableFrom(functionClass)) {
            UserDefinedFunctionHelper.validateImplementationMethod(functionClass, false, false, "eval");
        } else if (AsyncScalarFunction.class.isAssignableFrom(functionClass)) {
            UserDefinedFunctionHelper.validateImplementationMethod(functionClass, false, false, "eval");
            UserDefinedFunctionHelper.validateAsyncImplementationMethod(functionClass, false, "eval");
        } else if (TableFunction.class.isAssignableFrom(functionClass)) {
            UserDefinedFunctionHelper.validateImplementationMethod(functionClass, true, false, "eval");
        } else if (AsyncTableFunction.class.isAssignableFrom(functionClass)) {
            UserDefinedFunctionHelper.validateImplementationMethod(functionClass, true, false, "eval");
            UserDefinedFunctionHelper.validateAsyncImplementationMethod(functionClass, true, "eval");
        } else if (AggregateFunction.class.isAssignableFrom(functionClass)) {
            UserDefinedFunctionHelper.validateImplementationMethod(functionClass, true, false, "accumulate");
            UserDefinedFunctionHelper.validateImplementationMethod(functionClass, true, true, "retract");
            UserDefinedFunctionHelper.validateImplementationMethod(functionClass, true, true, "merge");
        } else if (TableAggregateFunction.class.isAssignableFrom(functionClass)) {
            UserDefinedFunctionHelper.validateImplementationMethod(functionClass, true, false, "accumulate");
            UserDefinedFunctionHelper.validateImplementationMethod(functionClass, true, true, "retract");
            UserDefinedFunctionHelper.validateImplementationMethod(functionClass, true, true, "merge");
            UserDefinedFunctionHelper.validateImplementationMethod(functionClass, true, false, TABLE_AGGREGATE_EMIT, TABLE_AGGREGATE_EMIT_RETRACT);
        }
    }

    private static void validateImplementationMethod(Class<? extends UserDefinedFunction> clazz, boolean rejectStatic, boolean isOptional, String ... methodNameOptions) {
        HashSet<String> nameSet = new HashSet<String>(Arrays.asList(methodNameOptions));
        List<Method> methods = TypeExtractionUtils.getAllDeclaredMethods(clazz);
        boolean found = false;
        for (Method method : methods) {
            if (!nameSet.contains(method.getName())) continue;
            found = true;
            int modifier = method.getModifiers();
            if (!Modifier.isPublic(modifier)) {
                throw new ValidationException(String.format("Method '%s' of function class '%s' is not public.", method.getName(), clazz.getName()));
            }
            if (Modifier.isAbstract(modifier)) {
                throw new ValidationException(String.format("Method '%s' of function class '%s' must not be abstract.", method.getName(), clazz.getName()));
            }
            if (!rejectStatic || !Modifier.isStatic(modifier)) continue;
            throw new ValidationException(String.format("Method '%s' of function class '%s' must not be static.", method.getName(), clazz.getName()));
        }
        if (!found && !isOptional) {
            throw new ValidationException(String.format("Function class '%s' does not implement a method named %s.", clazz.getName(), nameSet.stream().map(s -> "'" + s + "'").collect(Collectors.joining(" or "))));
        }
    }

    private static void validateAsyncImplementationMethod(Class<? extends UserDefinedFunction> clazz, boolean verifyFutureContainsCollection, String ... methodNameOptions) {
        HashSet<String> nameSet = new HashSet<String>(Arrays.asList(methodNameOptions));
        List<Method> methods = TypeExtractionUtils.getAllDeclaredMethods(clazz);
        for (Method method : methods) {
            if (!nameSet.contains(method.getName())) continue;
            if (!method.getReturnType().equals(Void.TYPE)) {
                throw new ValidationException(String.format("Method '%s' of function class '%s' must be void.", method.getName(), clazz.getName()));
            }
            boolean foundParam = false;
            if (method.getParameterCount() >= 1) {
                Type firstParam = method.getGenericParameterTypes()[0];
                if (TypeExtractionUtils.isGenericOfClass(CompletableFuture.class, firstParam = ExtractionUtils.resolveVariableWithClassContext(clazz, firstParam))) {
                    Type firstTypeArgument;
                    Optional<ParameterizedType> parameterized = TypeExtractionUtils.getParameterizedType(firstParam);
                    if (!verifyFutureContainsCollection) {
                        foundParam = true;
                    } else if (parameterized.isPresent() && parameterized.get().getActualTypeArguments().length > 0 && TypeExtractionUtils.isGenericOfClass(Collection.class, firstTypeArgument = parameterized.get().getActualTypeArguments()[0])) {
                        foundParam = true;
                    }
                }
            }
            if (foundParam) continue;
            if (!verifyFutureContainsCollection) {
                throw new ValidationException(String.format("Method '%s' of function class '%s' must have a first argument of type java.util.concurrent.CompletableFuture.", method.getName(), clazz.getName()));
            }
            throw new ValidationException(String.format("Method '%s' of function class '%s' must have a first argument of type java.util.concurrent.CompletableFuture<java.util.Collection>.", method.getName(), clazz.getName()));
        }
    }

    private static void validateInstantiation(Class<?> clazz, boolean requiresDefaultConstructor) {
        if (!InstantiationUtil.isPublic(clazz)) {
            throw new ValidationException(String.format("Function class '%s' is not public.", clazz.getName()));
        }
        if (!InstantiationUtil.isProperClass(clazz)) {
            throw new ValidationException(String.format("Function class '%s' is not a proper class. It is either abstract, an interface, or a primitive type.", clazz.getName()));
        }
        if (requiresDefaultConstructor && !InstantiationUtil.hasPublicNullaryConstructor(clazz)) {
            throw new ValidationException(String.format("Function class '%s' must have a public default constructor.", clazz.getName()));
        }
    }

    private static void cleanFunction(ReadableConfig config, UserDefinedFunction function) {
        ExecutionConfig.ClosureCleanerLevel level = config.get(PipelineOptions.CLOSURE_CLEANER_LEVEL);
        try {
            ClosureCleaner.clean(function, level, true);
        }
        catch (Throwable t) {
            throw new ValidationException(String.format("Function class '%s' is not serializable. Make sure that the class is self-contained (i.e. no references to outer classes) and all inner fields are serializable as well.", function.getClass()), t);
        }
    }

    private UserDefinedFunctionHelper() {
    }
}

