/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.translators;

import java.util.Collection;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.TransformationTranslator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BatchGroupedReduceOperator;
import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamGroupedReduceAsyncStateOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedReduceOperator;
import org.apache.flink.streaming.api.transformations.ReduceTransformation;
import org.apache.flink.streaming.runtime.translators.AbstractOneInputTransformationTranslator;
import org.apache.flink.streaming.runtime.translators.BatchExecutionUtils;

/**
 * Translator for {@link ReduceTransformation}: builds the grouped-reduce operator that matches
 * the requested execution mode, wraps it in a {@link SimpleOperatorFactory} and hands it to
 * {@code translateInternal} of the parent class.
 *
 * @param <IN> element type of the reduced stream (used as both input and output type)
 * @param <KEY> key type of the transformation being translated
 */
public class ReduceTransformationTranslator<IN, KEY>
        extends AbstractOneInputTransformationTranslator<IN, IN, ReduceTransformation<IN, KEY>> {

    /**
     * Translates the transformation for batch execution using a {@link BatchGroupedReduceOperator}.
     *
     * @param transformation the reduce transformation to translate
     * @param context translation context; supplies the stream graph whose execution config
     *     provides the serializer config
     * @return the collection returned by {@code translateInternal}
     */
    @Override
    public Collection<Integer> translateForBatchInternal(
            ReduceTransformation<IN, KEY> transformation,
            TransformationTranslator.Context context) {
        BatchGroupedReduceOperator<IN, KEY> groupedReduce =
                new BatchGroupedReduceOperator<>(
                        transformation.getReducer(),
                        transformation
                                .getInputType()
                                .createSerializer(
                                        context.getStreamGraph()
                                                .getExecutionConfig()
                                                .getSerializerConfig()));
        SimpleOperatorFactory<IN> operatorFactory = SimpleOperatorFactory.of(groupedReduce);
        operatorFactory.setChainingStrategy(transformation.getChainingStrategy());

        Collection<Integer> ids =
                translateInternal(
                        transformation,
                        operatorFactory,
                        transformation.getInputType(),
                        transformation.getKeySelector(),
                        transformation.getKeyTypeInfo(),
                        context);

        // Applied after translateInternal, using the transformation's id and a SORTED input
        // requirement.
        BatchExecutionUtils.applyBatchExecutionSettings(
                transformation.getId(), context, StreamConfig.InputRequirement.SORTED);
        return ids;
    }

    /**
     * Translates the transformation for streaming execution. Uses
     * {@link StreamGroupedReduceAsyncStateOperator} when
     * {@code transformation.isEnableAsyncState()} is {@code true}, otherwise
     * {@link StreamGroupedReduceOperator}.
     *
     * @param transformation the reduce transformation to translate
     * @param context translation context; supplies the stream graph whose execution config
     *     provides the serializer config
     * @return the collection returned by {@code translateInternal}
     */
    @Override
    public Collection<Integer> translateForStreamingInternal(
            ReduceTransformation<IN, KEY> transformation,
            TransformationTranslator.Context context) {
        // Only the selected branch is evaluated, so exactly one serializer is created.
        AbstractStreamOperator<IN> groupedReduce =
                transformation.isEnableAsyncState()
                        ? new StreamGroupedReduceAsyncStateOperator<IN>(
                                transformation.getReducer(),
                                transformation
                                        .getInputType()
                                        .createSerializer(
                                                context.getStreamGraph()
                                                        .getExecutionConfig()
                                                        .getSerializerConfig()))
                        : new StreamGroupedReduceOperator<IN>(
                                transformation.getReducer(),
                                transformation
                                        .getInputType()
                                        .createSerializer(
                                                context.getStreamGraph()
                                                        .getExecutionConfig()
                                                        .getSerializerConfig()));
        SimpleOperatorFactory<IN> operatorFactory = SimpleOperatorFactory.of(groupedReduce);
        operatorFactory.setChainingStrategy(transformation.getChainingStrategy());

        return translateInternal(
                transformation,
                operatorFactory,
                transformation.getInputType(),
                transformation.getKeySelector(),
                transformation.getKeyTypeInfo(),
                context);
    }
}

