/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.api.bridge.java.internal;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogManager;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.ExternalSchemaTranslator;
import org.apache.flink.table.catalog.FunctionCatalog;
import org.apache.flink.table.catalog.GenericInMemoryCatalog;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.SchemaResolver;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.delegation.Executor;
import org.apache.flink.table.delegation.ExecutorFactory;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.delegation.PlannerFactory;
import org.apache.flink.table.descriptors.ConnectorDescriptor;
import org.apache.flink.table.descriptors.StreamTableDescriptor;
import org.apache.flink.table.expressions.ApiExpressionUtils;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionParser;
import org.apache.flink.table.factories.ComponentFactoryService;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.functions.UserDefinedFunctionHelper;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.operations.ExternalModifyOperation;
import org.apache.flink.table.operations.JavaDataStreamQueryOperation;
import org.apache.flink.table.operations.JavaExternalQueryOperation;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.OutputConversionModifyOperation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.utils.OperationTreeBuilder;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.sources.TableSourceValidation;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.typeutils.FieldInfoUtils;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

@Internal
public final class StreamTableEnvironmentImpl
extends TableEnvironmentImpl
implements StreamTableEnvironment {
    /** Stream execution environment this table environment is bound to; assigned once in the constructor. */
    private final StreamExecutionEnvironment executionEnvironment;

    /**
     * Creates the environment from already constructed components.
     *
     * <p>Every argument except {@code executionEnvironment} is forwarded to the
     * {@link TableEnvironmentImpl} constructor; {@code executionEnvironment} is stored by this
     * class itself.
     *
     * @param executionEnvironment stream execution environment kept by this instance
     */
    public StreamTableEnvironmentImpl(CatalogManager catalogManager, ModuleManager moduleManager, FunctionCatalog functionCatalog, TableConfig tableConfig, StreamExecutionEnvironment executionEnvironment, Planner planner, Executor executor, boolean isStreamingMode, ClassLoader userClassLoader) {
        super(catalogManager, moduleManager, tableConfig, executor, functionCatalog, planner, isStreamingMode, userClassLoader);
        this.executionEnvironment = executionEnvironment;
    }

    /**
     * Builds a {@link StreamTableEnvironment} on top of the given stream execution environment.
     *
     * <p>Creates a new module manager, a catalog manager whose default catalog is a
     * {@link GenericInMemoryCatalog} named after the built-in catalog and database of
     * {@code settings}, and a function catalog. The executor and planner are discovered through
     * {@link ComponentFactoryService}. The context class loader of the calling thread is passed
     * on as the user class loader.
     *
     * @param executionEnvironment stream environment the resulting table environment is bound to
     * @param settings environment settings; must request streaming mode
     * @param tableConfig table configuration handed to the catalog manager, function catalog and planner
     * @return the newly created table environment
     * @throws TableException if {@code settings} does not request streaming mode
     */
    public static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment, EnvironmentSettings settings, TableConfig tableConfig) {
        if (!settings.isStreamingMode()) {
            throw new TableException("StreamTableEnvironment can not run in batch mode for now, please use TableEnvironment.");
        }

        final ClassLoader userClassLoader = Thread.currentThread().getContextClassLoader();
        final ModuleManager modules = new ModuleManager();
        final CatalogManager catalogs =
                CatalogManager.newBuilder()
                        .classLoader(userClassLoader)
                        .config((ReadableConfig) tableConfig.getConfiguration())
                        .defaultCatalog(
                                settings.getBuiltInCatalogName(),
                                (Catalog) new GenericInMemoryCatalog(
                                        settings.getBuiltInCatalogName(),
                                        settings.getBuiltInDatabaseName()))
                        .executionConfig(executionEnvironment.getConfig())
                        .build();
        final FunctionCatalog functions = new FunctionCatalog(tableConfig, catalogs, modules);

        final Executor executor =
                StreamTableEnvironmentImpl.lookupExecutor(settings.toExecutorProperties(), executionEnvironment);

        final Map<String, String> plannerProperties = settings.toPlannerProperties();
        final PlannerFactory plannerFactory =
                (PlannerFactory) ComponentFactoryService.find(PlannerFactory.class, plannerProperties);
        final Planner planner =
                plannerFactory.create(plannerProperties, executor, tableConfig, functions, catalogs);

        return new StreamTableEnvironmentImpl(
                catalogs,
                modules,
                functions,
                tableConfig,
                executionEnvironment,
                planner,
                executor,
                settings.isStreamingMode(),
                userClassLoader);
    }

    /**
     * Finds an {@link ExecutorFactory} matching the given properties and obtains an
     * {@link Executor} by reflectively invoking the factory's public
     * {@code create(Map, StreamExecutionEnvironment)} method.
     *
     * @param executorProperties properties used for factory discovery and passed to {@code create}
     * @param executionEnvironment stream environment passed to {@code create}
     * @return the executor returned by the factory
     * @throws TableException wrapping any exception raised during discovery or the reflective call
     */
    private static Executor lookupExecutor(Map<String, String> executorProperties, StreamExecutionEnvironment executionEnvironment) {
        try {
            final ExecutorFactory factory =
                    (ExecutorFactory) ComponentFactoryService.find(ExecutorFactory.class, executorProperties);
            final Object created =
                    factory.getClass()
                            .getMethod("create", Map.class, StreamExecutionEnvironment.class)
                            .invoke(factory, executorProperties, executionEnvironment);
            return (Executor) created;
        } catch (Exception e) {
            throw new TableException("Could not instantiate the executor. Make sure a planner module is on the classpath", e);
        }
    }

    /**
     * Registers a table function under {@code name} as a temporary system function, using the
     * return type reported by {@link UserDefinedFunctionHelper}.
     */
    @Override
    public <T> void registerFunction(String name, TableFunction<T> tableFunction) {
        final TypeInformation<T> resultType =
                UserDefinedFunctionHelper.getReturnTypeOfTableFunction(tableFunction);
        functionCatalog.registerTempSystemTableFunction(name, tableFunction, resultType);
    }

    /**
     * Registers an aggregate function under {@code name} as a temporary system function, using
     * the result and accumulator types reported by {@link UserDefinedFunctionHelper}.
     */
    @Override
    public <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction) {
        final TypeInformation<T> resultType =
                UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(aggregateFunction);
        final TypeInformation<ACC> accumulatorType =
                UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(aggregateFunction);
        functionCatalog.registerTempSystemAggregateFunction(name, aggregateFunction, resultType, accumulatorType);
    }

    /**
     * Registers a table aggregate function under {@code name} as a temporary system function,
     * using the result and accumulator types reported by {@link UserDefinedFunctionHelper}.
     */
    @Override
    public <T, ACC> void registerFunction(String name, TableAggregateFunction<T, ACC> tableAggregateFunction) {
        final TypeInformation<T> resultType =
                UserDefinedFunctionHelper.getReturnTypeOfAggregateFunction(tableAggregateFunction);
        final TypeInformation<ACC> accumulatorType =
                UserDefinedFunctionHelper.getAccumulatorTypeOfAggregateFunction(tableAggregateFunction);
        functionCatalog.registerTempSystemAggregateFunction(name, tableAggregateFunction, resultType, accumulatorType);
    }

    /**
     * Converts the stream into a table without a user-provided schema or view path, declaring
     * the stream as insert-only.
     */
    @Override
    public <T> Table fromDataStream(DataStream<T> dataStream) {
        final ChangelogMode insertOnly = ChangelogMode.insertOnly();
        return fromStreamInternal(dataStream, null, null, insertOnly);
    }

    /**
     * Converts the stream into a table using the given schema, declaring the stream as
     * insert-only.
     *
     * @throws NullPointerException if {@code schema} is null
     */
    @Override
    public <T> Table fromDataStream(DataStream<T> dataStream, Schema schema) {
        Preconditions.checkNotNull(schema, "Schema must not be null.");
        final ChangelogMode insertOnly = ChangelogMode.insertOnly();
        return fromStreamInternal(dataStream, schema, null, insertOnly);
    }

    /**
     * Converts a stream of rows into a table without a user-provided schema, using
     * {@link ChangelogMode#all()} as the changelog mode.
     */
    @Override
    public Table fromChangelogStream(DataStream<Row> dataStream) {
        final ChangelogMode allKinds = ChangelogMode.all();
        return fromStreamInternal(dataStream, null, null, allKinds);
    }

    /**
     * Converts a stream of rows into a table using the given schema and
     * {@link ChangelogMode#all()} as the changelog mode.
     *
     * @throws NullPointerException if {@code schema} is null
     */
    @Override
    public Table fromChangelogStream(DataStream<Row> dataStream, Schema schema) {
        Preconditions.checkNotNull(schema, "Schema must not be null.");
        final ChangelogMode allKinds = ChangelogMode.all();
        return fromStreamInternal(dataStream, schema, null, allKinds);
    }

    /**
     * Converts a stream of rows into a table using the given schema and the caller-supplied
     * changelog mode.
     *
     * @throws NullPointerException if {@code schema} is null (the changelog mode is checked by
     *     the internal conversion)
     */
    @Override
    public Table fromChangelogStream(DataStream<Row> dataStream, Schema schema, ChangelogMode changelogMode) {
        Preconditions.checkNotNull(schema, "Schema must not be null.");
        return fromStreamInternal(dataStream, schema, null, changelogMode);
    }

    /**
     * Registers the stream as a temporary view at {@code path}. The table is built as
     * insert-only, without a user-provided schema, and {@code path} is also used to derive the
     * identifier of the underlying scan operation.
     */
    @Override
    public <T> void createTemporaryView(String path, DataStream<T> dataStream) {
        final ChangelogMode insertOnly = ChangelogMode.insertOnly();
        createTemporaryView(path, fromStreamInternal(dataStream, null, path, insertOnly));
    }

    /**
     * Registers the stream as a temporary view at {@code path} using the given schema. The table
     * is built as insert-only and {@code path} is also used to derive the identifier of the
     * underlying scan operation. The schema is passed through without a null check here.
     */
    @Override
    public <T> void createTemporaryView(String path, DataStream<T> dataStream, Schema schema) {
        final ChangelogMode insertOnly = ChangelogMode.insertOnly();
        createTemporaryView(path, fromStreamInternal(dataStream, schema, path, insertOnly));
    }

    /**
     * Shared implementation behind the {@code fromDataStream}, {@code fromChangelogStream} and
     * schema-aware {@code createTemporaryView} entry points.
     *
     * <p>Translates the stream's type information (plus the optional schema) via
     * {@link ExternalSchemaTranslator#fromExternal}, resolves the resulting schema and wraps the
     * stream in a {@link JavaExternalQueryOperation}. If the translation reports projections, a
     * projection on those column names is placed on top of the scan.
     *
     * @param dataStream stream to expose as a table; must not be null
     * @param schema optional user schema, passed through to the schema translation
     * @param viewPath optional path parsed into the operation's identifier; when null a synthetic
     *     name derived from the stream id is used instead
     * @param changelogMode changelog mode attached to the scan operation; must not be null
     * @return table backed by the scan (and optional projection)
     * @throws NullPointerException if {@code dataStream} or {@code changelogMode} is null
     */
    private <T> Table fromStreamInternal(DataStream<T> dataStream, @Nullable Schema schema, @Nullable String viewPath, ChangelogMode changelogMode) {
        Preconditions.checkNotNull(dataStream, "Data stream must not be null.");
        Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null.");

        final CatalogManager catalogManager = getCatalogManager();
        final SchemaResolver schemaResolver = catalogManager.getSchemaResolver();
        final OperationTreeBuilder operationTreeBuilder = getOperationTreeBuilder();

        final UnresolvedIdentifier unresolvedIdentifier;
        if (viewPath != null) {
            unresolvedIdentifier = getParser().parseIdentifier(viewPath);
        } else {
            unresolvedIdentifier = UnresolvedIdentifier.of(new String[]{"Unregistered_DataStream_Source_" + dataStream.getId()});
        }
        final ObjectIdentifier objectIdentifier = catalogManager.qualifyIdentifier(unresolvedIdentifier);

        final ExternalSchemaTranslator.InputResult schemaTranslationResult =
                ExternalSchemaTranslator.fromExternal(catalogManager.getDataTypeFactory(), dataStream.getType(), schema);
        final ResolvedSchema resolvedSchema = schemaTranslationResult.getSchema().resolve(schemaResolver);

        final JavaExternalQueryOperation<T> scanOperation =
                new JavaExternalQueryOperation<T>(
                        objectIdentifier,
                        dataStream,
                        schemaTranslationResult.getPhysicalDataType(),
                        schemaTranslationResult.isTopLevelRecord(),
                        changelogMode,
                        resolvedSchema);

        // Must be List<String>, not a raw List: with a raw list the stream pipeline below is raw
        // as well, the method reference cannot be applied and collect(...) yields Object.
        final List<String> projections = schemaTranslationResult.getProjections();
        if (projections == null) {
            // No projection reported: expose the scan directly.
            return createTable(scanOperation);
        }

        final QueryOperation projectOperation =
                operationTreeBuilder.project(
                        projections.stream().map(ApiExpressionUtils::unresolvedRef).collect(Collectors.toList()),
                        scanOperation);
        return createTable(projectOperation);
    }

    /**
     * Converts the table into a stream of rows, using the source row data type of the table's
     * resolved schema as the target type.
     *
     * @throws NullPointerException if {@code table} is null
     */
    @Override
    public DataStream<Row> toDataStream(Table table) {
        Preconditions.checkNotNull(table, "Table must not be null.");
        return toDataStream(table, (AbstractDataType<?>) table.getResolvedSchema().toSourceRowDataType());
    }

    /**
     * Converts the table into a stream of {@code targetClass} instances.
     *
     * <p>{@code Row.class} is special-cased and delegates to {@link #toDataStream(Table)}; any
     * other class is turned into a data type with {@link DataTypes#of(Class)} first.
     *
     * @param table table to convert; must not be null
     * @param targetClass class of the stream elements; must not be null
     * @return the converted stream
     * @throws NullPointerException if {@code table} or {@code targetClass} is null
     */
    @Override
    public <T> DataStream<T> toDataStream(Table table, Class<T> targetClass) {
        Preconditions.checkNotNull(table, "Table must not be null.");
        Preconditions.checkNotNull(targetClass, "Target class must not be null.");
        if (targetClass == Row.class) {
            // DataStream<Row> is not assignable to DataStream<T> without a cast. The cast is
            // safe because targetClass is Row.class here, i.e. T is Row.
            @SuppressWarnings("unchecked")
            final DataStream<T> rowStream = (DataStream<T>) toDataStream(table);
            return rowStream;
        }
        return toDataStream(table, (AbstractDataType<?>) DataTypes.of(targetClass));
    }

    /**
     * Converts the table into an insert-only stream whose element type is described by
     * {@code targetDataType}.
     *
     * @throws NullPointerException if {@code table} or {@code targetDataType} is null
     */
    @Override
    public <T> DataStream<T> toDataStream(Table table, AbstractDataType<?> targetDataType) {
        Preconditions.checkNotNull(table, "Table must not be null.");
        Preconditions.checkNotNull(targetDataType, "Target data type must not be null.");
        final ExternalSchemaTranslator.OutputResult translation =
                ExternalSchemaTranslator.fromInternal(
                        getCatalogManager().getDataTypeFactory(),
                        table.getResolvedSchema(),
                        targetDataType);
        return toStreamInternal(table, translation, ChangelogMode.insertOnly());
    }

    /**
     * Converts the table into a changelog stream of rows. No target schema is supplied to the
     * schema translation and {@code null} is passed as the changelog mode.
     *
     * @throws NullPointerException if {@code table} is null
     */
    @Override
    public DataStream<Row> toChangelogStream(Table table) {
        Preconditions.checkNotNull(table, "Table must not be null.");
        final ExternalSchemaTranslator.OutputResult translation =
                ExternalSchemaTranslator.fromInternal(table.getResolvedSchema(), null);
        return toStreamInternal(table, translation, null);
    }

    /**
     * Converts the table into a changelog stream of rows shaped by {@code targetSchema};
     * {@code null} is passed as the changelog mode.
     *
     * @throws NullPointerException if {@code table} or {@code targetSchema} is null
     */
    @Override
    public DataStream<Row> toChangelogStream(Table table, Schema targetSchema) {
        Preconditions.checkNotNull(table, "Table must not be null.");
        Preconditions.checkNotNull(targetSchema, "Target schema must not be null.");
        final ExternalSchemaTranslator.OutputResult translation =
                ExternalSchemaTranslator.fromInternal(table.getResolvedSchema(), targetSchema);
        return toStreamInternal(table, translation, null);
    }

    /**
     * Converts the table into a changelog stream of rows shaped by {@code targetSchema}, using
     * the caller-supplied changelog mode.
     *
     * @throws NullPointerException if any argument is null
     */
    @Override
    public DataStream<Row> toChangelogStream(Table table, Schema targetSchema, ChangelogMode changelogMode) {
        Preconditions.checkNotNull(table, "Table must not be null.");
        Preconditions.checkNotNull(targetSchema, "Target schema must not be null.");
        Preconditions.checkNotNull(changelogMode, "Changelog mode must not be null.");
        final ExternalSchemaTranslator.OutputResult translation =
                ExternalSchemaTranslator.fromInternal(table.getResolvedSchema(), targetSchema);
        return toStreamInternal(table, translation, changelogMode);
    }

    /**
     * Shared implementation behind {@code toDataStream} and {@code toChangelogStream}.
     *
     * <p>Applies the projections reported by the schema translation (if any) on top of the
     * table's query, resolves the translated schema and wraps everything in an
     * {@link ExternalModifyOperation} registered under a synthetic sink identifier. When the
     * translation carries no physical data type, the physical row type of the resolved schema is
     * used instead.
     *
     * @param table table being converted
     * @param schemaTranslationResult outcome of the internal-to-external schema translation
     * @param changelogMode changelog mode handed to the modify operation; may be null
     */
    private <T> DataStream<T> toStreamInternal(Table table, ExternalSchemaTranslator.OutputResult schemaTranslationResult, @Nullable ChangelogMode changelogMode) {
        final CatalogManager catalogs = getCatalogManager();
        final SchemaResolver resolver = catalogs.getSchemaResolver();
        final OperationTreeBuilder treeBuilder = getOperationTreeBuilder();

        // Project onto the translated column names if requested, otherwise use the query as is.
        final QueryOperation sinkInput =
                schemaTranslationResult
                        .getProjections()
                        .map(names ->
                                treeBuilder.project(
                                        names.stream().map(ApiExpressionUtils::unresolvedRef).collect(Collectors.toList()),
                                        table.getQueryOperation()))
                        .orElseGet(() -> table.getQueryOperation());

        final ResolvedSchema sinkSchema = resolver.resolve(schemaTranslationResult.getSchema());

        final UnresolvedIdentifier syntheticName =
                UnresolvedIdentifier.of(new String[]{"Unregistered_DataStream_Sink_" + ExternalModifyOperation.getUniqueId()});
        final ObjectIdentifier sinkIdentifier = catalogs.qualifyIdentifier(syntheticName);

        final ExternalModifyOperation sinkOperation =
                new ExternalModifyOperation(
                        sinkIdentifier,
                        sinkInput,
                        sinkSchema,
                        changelogMode,
                        schemaTranslationResult.getPhysicalDataType().orElseGet(() -> sinkSchema.toPhysicalRowDataType()));
        return toStreamInternal(table, (ModifyOperation) sinkOperation);
    }

    /**
     * Translates the modify operation with the planner, adds the single resulting
     * transformation to the stream execution environment and wraps it in a {@link DataStream}.
     *
     * @param table table the operation originates from (used for error reporting)
     * @param modifyOperation operation to translate
     */
    private <T> DataStream<T> toStreamInternal(Table table, ModifyOperation modifyOperation) {
        final List<Transformation<?>> translated =
                planner.translate(Collections.singletonList(modifyOperation));
        final Transformation<T> sinkTransformation = getTransformation(table, translated);
        executionEnvironment.addOperator(sinkTransformation);
        return new DataStream<>(executionEnvironment, sinkTransformation);
    }

    /**
     * Converts the stream into a table whose fields are given as an expression string.
     *
     * <p>The string is parsed with {@link ExpressionParser#parseExpressionList} and the result
     * is forwarded to the {@code Expression...} overload.
     *
     * @param dataStream stream to convert
     * @param fields field expressions in string form
     * @return the resulting table
     */
    @Override
    public <T> Table fromDataStream(DataStream<T> dataStream, String fields) {
        // Typed list: on a raw List, toArray(T[]) erases to Object[], which matches none of the
        // fromDataStream overloads.
        final List<Expression> expressions = ExpressionParser.parseExpressionList(fields);
        return fromDataStream(dataStream, expressions.toArray(new Expression[0]));
    }

    /**
     * Converts the stream into a table using the given field expressions, going through a
     * {@link JavaDataStreamQueryOperation}.
     */
    @Override
    public <T> Table fromDataStream(DataStream<T> dataStream, Expression ... fields) {
        final List<Expression> fieldList = Arrays.asList(fields);
        return createTable(asQueryOperation(dataStream, Optional.of(fieldList)));
    }

    /** Registers the stream under {@code name}; simply delegates to {@code createTemporaryView(name, dataStream)}. */
    @Override
    public <T> void registerDataStream(String name, DataStream<T> dataStream) {
        createTemporaryView(name, dataStream);
    }

    /**
     * Registers the stream under {@code name} with fields given as an expression string; simply
     * delegates to {@code createTemporaryView(name, dataStream, fields)}.
     */
    @Override
    public <T> void registerDataStream(String name, DataStream<T> dataStream, String fields) {
        createTemporaryView(name, dataStream, fields);
    }

    /**
     * Registers the stream as a temporary view at {@code path}; the table is built with
     * {@code fromDataStream(dataStream, fields)} from the expression string.
     */
    @Override
    public <T> void createTemporaryView(String path, DataStream<T> dataStream, String fields) {
        createTemporaryView(path, fromDataStream(dataStream, fields));
    }

    /**
     * Registers the stream as a temporary view at {@code path}; the table is built with
     * {@code fromDataStream(dataStream, fields)} from the given field expressions.
     */
    @Override
    public <T> void createTemporaryView(String path, DataStream<T> dataStream, Expression ... fields) {
        createTemporaryView(path, fromDataStream(dataStream, fields));
    }

    /**
     * Attaches {@code identifier} to a {@link JavaDataStreamQueryOperation} by rebuilding it
     * with the same stream, field indices and resolved schema. Any other kind of operation is
     * returned unchanged.
     */
    protected QueryOperation qualifyQueryOperation(ObjectIdentifier identifier, QueryOperation queryOperation) {
        if (!(queryOperation instanceof JavaDataStreamQueryOperation)) {
            return queryOperation;
        }
        final JavaDataStreamQueryOperation streamOperation = (JavaDataStreamQueryOperation) queryOperation;
        return new JavaDataStreamQueryOperation(
                identifier,
                streamOperation.getDataStream(),
                streamOperation.getFieldIndices(),
                streamOperation.getResolvedSchema());
    }

    /**
     * Converts the table into an append stream of {@code clazz} instances; the class is first
     * turned into type information and then handed to the {@code TypeInformation} overload.
     */
    @Override
    public <T> DataStream<T> toAppendStream(Table table, Class<T> clazz) {
        return toAppendStream(table, extractTypeInformation(table, clazz));
    }

    /**
     * Converts the table into an append stream by translating an
     * {@link OutputConversionModifyOperation} in {@code APPEND} update mode whose output type is
     * derived from {@code typeInfo}.
     */
    @Override
    public <T> DataStream<T> toAppendStream(Table table, TypeInformation<T> typeInfo) {
        final ModifyOperation appendConversion =
                new OutputConversionModifyOperation(
                        table.getQueryOperation(),
                        TypeConversions.fromLegacyInfoToDataType(typeInfo),
                        OutputConversionModifyOperation.UpdateMode.APPEND);
        return toStreamInternal(table, appendConversion);
    }

    /**
     * Converts the table into a retract stream of {@code (flag, clazz instance)} tuples; the
     * class is first turned into type information and then handed to the
     * {@code TypeInformation} overload.
     */
    @Override
    public <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, Class<T> clazz) {
        return toRetractStream(table, extractTypeInformation(table, clazz));
    }

    /**
     * Converts the table into a retract stream by translating an
     * {@link OutputConversionModifyOperation} in {@code RETRACT} update mode. The output type is
     * {@code typeInfo} wrapped together with a boolean flag.
     */
    @Override
    public <T> DataStream<Tuple2<Boolean, T>> toRetractStream(Table table, TypeInformation<T> typeInfo) {
        final ModifyOperation retractConversion =
                new OutputConversionModifyOperation(
                        table.getQueryOperation(),
                        wrapWithChangeFlag(typeInfo),
                        OutputConversionModifyOperation.UpdateMode.RETRACT);
        return toStreamInternal(table, retractConversion);
    }

    /**
     * Delegates to the superclass {@code connect} and narrows the returned descriptor to
     * {@link StreamTableDescriptor}.
     */
    @Override
    public StreamTableDescriptor connect(ConnectorDescriptor connectorDescriptor) {
        final StreamTableDescriptor descriptor = (StreamTableDescriptor) super.connect(connectorDescriptor);
        return descriptor;
    }

    /** Returns the stream execution environment this table environment was created with. */
    @Internal
    public StreamExecutionEnvironment execEnv() {
        return executionEnvironment;
    }

    /**
     * Creates the pipeline for everything buffered so far: translates and clears the buffer,
     * then asks the inherited {@code execEnv} field to build a pipeline with the table config
     * and the given job name.
     *
     * @param jobName name passed to {@code createPipeline}
     */
    public Pipeline getPipeline(String jobName) {
        // this.execEnv is the field inherited from the superclass, not this class's execEnv() accessor.
        return this.execEnv.createPipeline(
                translateAndClearBuffer(),
                tableConfig,
                jobName);
    }

    /**
     * Runs the superclass validation and then checks that the time characteristic of the stream
     * environment is compatible with a rowtime attribute, if the source declares one.
     */
    protected void validateTableSource(TableSource<?> tableSource) {
        super.validateTableSource(tableSource);
        final boolean hasRowtime = TableSourceValidation.hasRowtimeAttribute(tableSource);
        validateTimeCharacteristic(hasRowtime);
    }

    /**
     * Derives type information for {@code clazz} with {@link TypeExtractor}.
     *
     * @param table table being converted; only used to describe the query in the error message
     * @param clazz target class
     * @throws ValidationException wrapping any exception thrown by the type extraction
     */
    private <T> TypeInformation<T> extractTypeInformation(Table table, Class<T> clazz) {
        try {
            return TypeExtractor.createTypeInfo(clazz);
        } catch (Exception cause) {
            final String message =
                    String.format(
                            "Could not convert query: %s to a DataStream of class %s",
                            table.getQueryOperation().asSummaryString(),
                            clazz.getSimpleName());
            throw new ValidationException(message, cause);
        }
    }

    /**
     * Returns the only element of {@code transformations}.
     *
     * @param table table the transformations were produced for; used in the error message
     * @param transformations result of translating a single modify operation
     * @return the single transformation, viewed as producing {@code T}
     * @throws TableException if the list does not contain exactly one transformation
     */
    private <T> Transformation<T> getTransformation(Table table, List<Transformation<?>> transformations) {
        if (transformations.size() != 1) {
            throw new TableException(String.format("Expected a single transformation for query: %s\n Got: %s", table.getQueryOperation().asSummaryString(), transformations));
        }
        // Transformation<?> is not assignable to Transformation<T> without a cast. The element
        // type is not checked here; callers choose T to match the operation they translated.
        @SuppressWarnings("unchecked")
        final Transformation<T> single = (Transformation<T>) transformations.get(0);
        return single;
    }

    /**
     * Builds the data type of a two-field tuple made of a boolean flag followed by
     * {@code outputType}, converted from legacy type information.
     */
    private <T> DataType wrapWithChangeFlag(TypeInformation<T> outputType) {
        final TypeInformation<?>[] flagAndPayload = {Types.BOOLEAN(), outputType};
        final TupleTypeInfo flaggedTuple = new TupleTypeInfo(flagAndPayload);
        return TypeConversions.fromLegacyInfoToDataType((TypeInformation) flaggedTuple);
    }

    /**
     * Wraps the stream in a {@link JavaDataStreamQueryOperation}.
     *
     * <p>With explicit field expressions the field info is derived from the stream type and
     * those expressions, and the time characteristic is validated if a rowtime attribute is
     * defined. Without them the field info is derived from the stream type alone.
     *
     * @param dataStream stream to wrap
     * @param fields optional field expressions
     */
    private <T> JavaDataStreamQueryOperation<T> asQueryOperation(DataStream<T> dataStream, Optional<List<Expression>> fields) {
        final TypeInformation<T> streamType = dataStream.getType();

        final FieldInfoUtils.TypeInfoSchema typeInfoSchema;
        if (fields.isPresent()) {
            typeInfoSchema = FieldInfoUtils.getFieldsInfo(streamType, fields.get().toArray(new Expression[0]));
            validateTimeCharacteristic(typeInfoSchema.isRowtimeDefined());
        } else {
            typeInfoSchema = FieldInfoUtils.getFieldsInfo(streamType);
        }

        return new JavaDataStreamQueryOperation<T>(
                dataStream,
                typeInfoSchema.getIndices(),
                typeInfoSchema.toResolvedSchema());
    }

    /**
     * Rejects a rowtime attribute unless the stream environment uses the {@code EventTime}
     * time characteristic.
     *
     * @param isRowtimeDefined whether a rowtime attribute is present; nothing is checked if false
     * @throws ValidationException if a rowtime attribute is defined and the environment's time
     *     characteristic is not {@code EventTime}
     */
    private void validateTimeCharacteristic(boolean isRowtimeDefined) {
        if (!isRowtimeDefined) {
            return;
        }
        if (executionEnvironment.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
            return;
        }
        throw new ValidationException(
                String.format(
                        "A rowtime attribute requires an EventTime time characteristic in stream environment. But is: %s",
                        executionEnvironment.getStreamTimeCharacteristic()));
    }
}

