/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.translators;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.graph.SimpleTransformationTranslator;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
import org.apache.flink.streaming.runtime.translators.BatchExecutionUtils;
import org.apache.flink.util.Preconditions;

@Internal
public class MultiInputTransformationTranslator<OUT>
extends SimpleTransformationTranslator<OUT, AbstractMultipleInputTransformation<OUT>> {
    @Override
    protected Collection<Integer> translateForBatchInternal(AbstractMultipleInputTransformation<OUT> transformation, TransformationTranslator.Context context) {
        Collection<Integer> ids = this.translateInternal(transformation, context);
        this.maybeApplyBatchExecutionSettings(transformation, context);
        return ids;
    }

    @Override
    protected Collection<Integer> translateForStreamingInternal(AbstractMultipleInputTransformation<OUT> transformation, TransformationTranslator.Context context) {
        Collection<Integer> ids = this.translateInternal(transformation, context);
        if (transformation.isOutputOnlyAfterEndOfStream()) {
            this.maybeApplyBatchExecutionSettings(transformation, context);
        }
        return ids;
    }

    private Collection<Integer> translateInternal(AbstractMultipleInputTransformation<OUT> transformation, TransformationTranslator.Context context) {
        Preconditions.checkNotNull(transformation);
        Preconditions.checkNotNull((Object)context);
        List<Transformation<?>> inputTransformations = transformation.getInputs();
        Preconditions.checkArgument((!inputTransformations.isEmpty() ? 1 : 0) != 0, (Object)"Empty inputs for MultipleInputTransformation. Did you forget to add inputs?");
        MultipleInputSelectionHandler.checkSupportedInputCount(inputTransformations.size());
        StreamGraph streamGraph = context.getStreamGraph();
        String slotSharingGroup = context.getSlotSharingGroup();
        int transformationId = transformation.getId();
        ExecutionConfig executionConfig = streamGraph.getExecutionConfig();
        streamGraph.addMultipleInputOperator(transformationId, slotSharingGroup, transformation.getCoLocationGroupKey(), transformation.getOperatorFactory(), transformation.getInputTypes(), transformation.getOutputType(), transformation.getName());
        streamGraph.setAttribute(transformationId, transformation.getAttribute());
        int parallelism = transformation.getParallelism() != -1 ? transformation.getParallelism() : executionConfig.getParallelism();
        streamGraph.setParallelism(transformationId, parallelism, transformation.isParallelismConfigured());
        streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
        if (transformation instanceof KeyedMultipleInputTransformation) {
            KeyedMultipleInputTransformation keyedTransform = (KeyedMultipleInputTransformation)transformation;
            TypeSerializer keySerializer = keyedTransform.getStateKeyType().createSerializer(executionConfig.getSerializerConfig());
            streamGraph.setMultipleInputStateKey(transformationId, keyedTransform.getStateKeySelectors(), keySerializer);
        }
        for (int i = 0; i < inputTransformations.size(); ++i) {
            Transformation<?> inputTransformation = inputTransformations.get(i);
            Collection<Integer> inputIds = context.getStreamNodeIds(inputTransformation);
            for (Integer inputId : inputIds) {
                streamGraph.addEdge(inputId, transformationId, i + 1);
            }
        }
        streamGraph.setSupportsConcurrentExecutionAttempts(transformationId, transformation.isSupportsConcurrentExecutionAttempts());
        return Collections.singleton(transformationId);
    }

    private void maybeApplyBatchExecutionSettings(AbstractMultipleInputTransformation<OUT> transformation, TransformationTranslator.Context context) {
        if (transformation instanceof KeyedMultipleInputTransformation && !transformation.isInternalSorterSupported()) {
            KeyedMultipleInputTransformation keyedTransformation = (KeyedMultipleInputTransformation)transformation;
            List<Transformation<?>> inputs = transformation.getInputs();
            List<KeySelector<?, ?>> keySelectors = keyedTransformation.getStateKeySelectors();
            StreamConfig.InputRequirement[] inputRequirements = (StreamConfig.InputRequirement[])IntStream.range(0, inputs.size()).mapToObj(idx -> {
                if (keySelectors.get(idx) != null) {
                    return StreamConfig.InputRequirement.SORTED;
                }
                return StreamConfig.InputRequirement.PASS_THROUGH;
            }).toArray(StreamConfig.InputRequirement[]::new);
            BatchExecutionUtils.applyBatchExecutionSettings(transformation.getId(), context, inputRequirements);
        }
    }
}

