/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.java.operators;

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.operators.SingleInputOperator;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.PlanUnwrappingReduceGroupOperator;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfoBase;
import org.apache.flink.util.Collector;

public class DistinctOperator<T>
extends SingleInputOperator<T, T, DistinctOperator<T>> {
    private final Keys<T> keys;

    public DistinctOperator(DataSet<T> input, Keys<T> keys) {
        super(input, input.getType());
        if (keys == null) {
            if (input.getType().isTupleType()) {
                TupleTypeInfoBase tupleType = (TupleTypeInfoBase)input.getType();
                int[] allFields = new int[tupleType.getArity()];
                for (int i = 0; i < tupleType.getArity(); ++i) {
                    allFields[i] = i;
                }
                keys = new Keys.ExpressionKeys<T>(allFields, input.getType(), true);
            } else {
                throw new InvalidProgramException("Distinction on all fields is only possible on tuple data types.");
            }
        }
        if (keys instanceof Keys.ExpressionKeys && !(input.getType() instanceof CompositeType)) {
            throw new InvalidProgramException("Distinction on field positions is only possible on composite type DataSets.");
        }
        this.keys = keys;
    }

    @Override
    protected GroupReduceOperatorBase<?, T, ?> translateToDataFlow(Operator<T> input) {
        DistinctFunction function = new DistinctFunction();
        String name = ((Object)((Object)function)).getClass().getName();
        if (this.keys instanceof Keys.ExpressionKeys) {
            int[] logicalKeyPositions = this.keys.computeLogicalKeyPositions();
            UnaryOperatorInformation operatorInfo = new UnaryOperatorInformation(this.getInputType(), this.getResultType());
            GroupReduceOperatorBase po = new GroupReduceOperatorBase(function, operatorInfo, logicalKeyPositions, name);
            po.setCombinable(true);
            po.setInput(input);
            po.setDegreeOfParallelism(this.getParallelism());
            return po;
        }
        if (this.keys instanceof Keys.SelectorFunctionKeys) {
            Keys.SelectorFunctionKeys selectorKeys = (Keys.SelectorFunctionKeys)this.keys;
            PlanUnwrappingReduceGroupOperator po = DistinctOperator.translateSelectorFunctionDistinct(selectorKeys, function, this.getInputType(), this.getResultType(), name, input);
            po.setDegreeOfParallelism(this.getParallelism());
            return po;
        }
        throw new UnsupportedOperationException("Unrecognized key type.");
    }

    private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionDistinct(Keys.SelectorFunctionKeys<IN, ?> rawKeys, RichGroupReduceFunction<IN, OUT> function, TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input) {
        Keys.SelectorFunctionKeys<IN, ?> keys = rawKeys;
        TupleTypeInfo typeInfoWithKey = new TupleTypeInfo(keys.getKeyType(), inputType);
        KeyExtractingMapper extractor = new KeyExtractingMapper(keys.getKeyExtractor());
        PlanUnwrappingReduceGroupOperator reducer = new PlanUnwrappingReduceGroupOperator(function, keys, name, outputType, typeInfoWithKey, true);
        MapOperatorBase mapper = new MapOperatorBase(extractor, new UnaryOperatorInformation(inputType, typeInfoWithKey), "Key Extractor");
        reducer.setInput((Operator)mapper);
        mapper.setInput(input);
        mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
        return reducer;
    }

    @RichGroupReduceFunction.Combinable
    public static final class DistinctFunction<T>
    extends RichGroupReduceFunction<T, T> {
        private static final long serialVersionUID = 1L;

        public void reduce(Iterable<T> values, Collector<T> out) {
            out.collect(values.iterator().next());
        }
    }
}

