package org.apache.flink.table.planner.plan.nodes.exec.batch;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.codegen.CodeGeneratorContext;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.fusion.FusionCodegenUtil;
import org.apache.flink.table.planner.plan.fusion.OpFusionCodegenSpecGenerator;
import org.apache.flink.table.planner.plan.fusion.generator.OneInputOpFusionCodegenSpecGenerator;
import org.apache.flink.table.planner.plan.fusion.spec.OutputFusionCodegenSpec;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
import org.apache.flink.table.runtime.operators.fusion.OperatorFusionCodegenFactory;
import org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory;
import org.apache.flink.table.runtime.operators.multipleinput.TableOperatorWrapperGenerator;
import org.apache.flink.table.runtime.operators.multipleinput.input.InputSelectionSpec;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.util.Preconditions;
import scala.Tuple2;

/* loaded from: input_file:org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecMultipleInput.class */
public class BatchExecMultipleInput extends ExecNodeBase<RowData> implements BatchExecNode<RowData>, SingleTransformationTranslator<RowData> {
    private final ExecNode<?> rootNode;
    private final List<ExecNode<?>> memberExecNodes;
    private final List<ExecEdge> originalEdges;

    public BatchExecMultipleInput(ReadableConfig readableConfig, List<InputProperty> list, ExecNode<?> execNode, List<ExecNode<?>> list2, List<ExecEdge> list3, String str) {
        super(ExecNodeContext.newNodeId(), ExecNodeContext.newContext(BatchExecMultipleInput.class), ExecNodeContext.newPersistedConfig(BatchExecMultipleInput.class, readableConfig), list, execNode.getOutputType(), str);
        this.rootNode = execNode;
        this.memberExecNodes = list2;
        Preconditions.checkArgument(list.size() == list3.size());
        this.originalEdges = list3;
    }

    @Override // org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase
    protected Transformation<RowData> translateToPlanInternal(PlannerBase plannerBase, ExecNodeConfig execNodeConfig) {
        StreamOperatorFactory batchMultipleInputStreamOperatorFactory;
        int parallelism;
        int maxParallelism;
        long managedMemoryWeight;
        List<Transformation<?>> arrayList = new ArrayList();
        Iterator<ExecEdge> it = getInputEdges().iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().translateToPlan(plannerBase));
        }
        Transformation<?> translateToPlan = this.rootNode.translateToPlan(plannerBase);
        int[] array = getInputProperties().stream().map((v0) -> {
            return v0.getPriority();
        }).mapToInt(num -> {
            return num.intValue();
        }).toArray();
        ResourceSpec resourceSpec = null;
        ResourceSpec resourceSpec2 = null;
        if (((Boolean) execNodeConfig.get(ExecutionConfigOptions.TABLE_EXEC_OPERATOR_FUSION_CODEGEN_ENABLED)).booleanValue() && allSupportFusionCodegen()) {
            ArrayList arrayList2 = new ArrayList();
            int i = 0;
            for (ExecEdge execEdge : this.originalEdges) {
                int i2 = i + 1;
                BatchExecNode batchExecNode = (BatchExecNode) execEdge.getSource();
                BatchExecInputAdapter batchExecInputAdapter = new BatchExecInputAdapter(i2, TableConfig.getDefault(), InputProperty.builder().priority(array[i]).build(), batchExecNode.getOutputType(), "BatchInputAdapter");
                batchExecInputAdapter.setInputEdges(Collections.singletonList(ExecEdge.builder().source(batchExecNode).target(batchExecInputAdapter).build()));
                BatchExecNode batchExecNode2 = (BatchExecNode) execEdge.getTarget();
                int indexOf = batchExecNode2.getInputEdges().indexOf(execEdge);
                Preconditions.checkArgument(indexOf >= 0);
                batchExecNode2.replaceInputEdge(indexOf, ExecEdge.builder().source(batchExecInputAdapter).target(batchExecNode2).build());
                arrayList2.add(new InputSelectionSpec(i2, array[i]));
                i++;
            }
            CodeGeneratorContext codeGeneratorContext = new CodeGeneratorContext(execNodeConfig, plannerBase.getFlinkContext().getClassLoader());
            OpFusionCodegenSpecGenerator translateToFusionCodegenSpec = this.rootNode.translateToFusionCodegenSpec(plannerBase, codeGeneratorContext);
            OneInputOpFusionCodegenSpecGenerator oneInputOpFusionCodegenSpecGenerator = new OneInputOpFusionCodegenSpecGenerator(translateToFusionCodegenSpec, 0L, getOutputType(), new OutputFusionCodegenSpec(new CodeGeneratorContext(execNodeConfig, plannerBase.getFlinkContext().getClassLoader(), codeGeneratorContext)));
            translateToFusionCodegenSpec.addOutput(1, oneInputOpFusionCodegenSpecGenerator);
            Tuple2<OperatorFusionCodegenFactory<RowData>, Object> generateFusionOperator = FusionCodegenUtil.generateFusionOperator(oneInputOpFusionCodegenSpecGenerator, arrayList2, codeGeneratorContext);
            batchMultipleInputStreamOperatorFactory = (StreamOperatorFactory) generateFusionOperator._1;
            Pair<Integer, Integer> inputMaxParallelism = getInputMaxParallelism(arrayList);
            parallelism = ((Integer) inputMaxParallelism.getLeft()).intValue();
            maxParallelism = ((Integer) inputMaxParallelism.getRight()).intValue();
            managedMemoryWeight = ((Long) generateFusionOperator._2).longValue();
        } else {
            TableOperatorWrapperGenerator tableOperatorWrapperGenerator = new TableOperatorWrapperGenerator(arrayList, translateToPlan, array);
            tableOperatorWrapperGenerator.generate();
            List inputTransformAndInputSpecPairs = tableOperatorWrapperGenerator.getInputTransformAndInputSpecPairs();
            batchMultipleInputStreamOperatorFactory = new BatchMultipleInputStreamOperatorFactory((List) inputTransformAndInputSpecPairs.stream().map((v0) -> {
                return v0.getValue();
            }).collect(Collectors.toList()), tableOperatorWrapperGenerator.getHeadWrappers(), tableOperatorWrapperGenerator.getTailWrapper());
            parallelism = tableOperatorWrapperGenerator.getParallelism();
            maxParallelism = tableOperatorWrapperGenerator.getMaxParallelism();
            managedMemoryWeight = tableOperatorWrapperGenerator.getManagedMemoryWeight() << 20;
            resourceSpec = tableOperatorWrapperGenerator.getMinResources();
            resourceSpec2 = tableOperatorWrapperGenerator.getPreferredResources();
            arrayList = (List) inputTransformAndInputSpecPairs.stream().map((v0) -> {
                return v0.getKey();
            }).collect(Collectors.toList());
        }
        MultipleInputTransformation multipleInputTransformation = new MultipleInputTransformation(createTransformationName(execNodeConfig), batchMultipleInputStreamOperatorFactory, InternalTypeInfo.of(getOutputType()), parallelism, false);
        multipleInputTransformation.setDescription(createTransformationDescription(execNodeConfig));
        if (maxParallelism > 0) {
            multipleInputTransformation.setMaxParallelism(maxParallelism);
        }
        Iterator<Transformation<?>> it2 = arrayList.iterator();
        while (it2.hasNext()) {
            multipleInputTransformation.addInput(it2.next());
        }
        if (resourceSpec != null && resourceSpec2 != null) {
            multipleInputTransformation.setResources(resourceSpec, resourceSpec2);
        }
        multipleInputTransformation.setDescription(createTransformationDescription(execNodeConfig));
        ExecNodeUtil.setManagedMemoryWeight(multipleInputTransformation, managedMemoryWeight);
        multipleInputTransformation.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
        return multipleInputTransformation;
    }

    public List<ExecEdge> getOriginalEdges() {
        return this.originalEdges;
    }

    @VisibleForTesting
    public ExecNode<?> getRootNode() {
        return this.rootNode;
    }

    private boolean allSupportFusionCodegen() {
        return ((Boolean) this.memberExecNodes.stream().map((v0) -> {
            return v0.supportFusionCodegen();
        }).reduce(true, (bool, bool2) -> {
            return Boolean.valueOf(bool.booleanValue() && bool2.booleanValue());
        })).booleanValue();
    }

    private Pair<Integer, Integer> getInputMaxParallelism(List<Transformation<?>> list) {
        int i = -1;
        int i2 = -1;
        for (Transformation<?> transformation : list) {
            i = Math.max(i, transformation.getParallelism());
            i2 = Math.max(i2, transformation.getMaxParallelism());
        }
        return Pair.of(Integer.valueOf(i), Integer.valueOf(i2));
    }
}
