/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.api.java.operators;

import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.SingleInputOperator;
import org.apache.flink.api.common.operators.UnaryOperatorInformation;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.operators.translation.KeyExtractingMapper;
import org.apache.flink.api.java.operators.translation.KeyRemovingMapper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;

/**
 * Java API operator that re-partitions a {@link DataSet} without changing its records.
 *
 * <p>The operator is translated into a {@link PartitionOperatorBase} of the common API:
 * <ul>
 *   <li>{@code REBALANCE} - a partition operator without keys,</li>
 *   <li>{@code HASH} with {@link Keys.ExpressionKeys} - a partition operator on the logical
 *       key positions computed from the keys,</li>
 *   <li>{@code HASH} with {@link Keys.SelectorFunctionKeys} - a three-operator chain
 *       (key-extracting map, partition on tuple field 0, key-removing map),</li>
 *   <li>{@code RANGE} - rejected, not supported.</li>
 * </ul>
 *
 * @param <T> type of the records of the partitioned DataSet (input and output type are identical)
 */
public class PartitionOperator<T>
extends SingleInputUdfOperator<T, T, PartitionOperator<T>> {
    /** Keys to partition on; {@code null} when the partition method needs no keys. */
    private final Keys<T> pKeys;
    /** The partitioning strategy applied by this operator; never {@code null}. */
    private final PartitionOperatorBase.PartitionMethod pMethod;

    /**
     * Creates a partition operator for the given method and (optional) keys.
     *
     * @param input   the DataSet to partition
     * @param pMethod the partitioning method; must not be {@code null}
     * @param pKeys   the partitioning keys; required for {@code HASH}, may be {@code null} otherwise
     * @throws IllegalArgumentException      if {@code pMethod} is {@code null}, if {@code HASH} is
     *                                       requested without keys, or if expression keys are used
     *                                       on a DataSet whose type is not a {@link CompositeType}
     * @throws UnsupportedOperationException if {@code RANGE} partitioning is requested
     */
    public PartitionOperator(DataSet<T> input, PartitionOperatorBase.PartitionMethod pMethod, Keys<T> pKeys) {
        super(input, input.getType());
        // Fail fast: a null method would otherwise only surface when the plan is translated.
        if (pMethod == null) {
            throw new IllegalArgumentException("Partition method must not be null");
        }
        if (pMethod == PartitionOperatorBase.PartitionMethod.HASH && pKeys == null) {
            throw new IllegalArgumentException("Hash Partitioning requires keys");
        }
        if (pMethod == PartitionOperatorBase.PartitionMethod.RANGE) {
            throw new UnsupportedOperationException("Range Partitioning not yet supported");
        }
        if (pKeys instanceof Keys.ExpressionKeys && !(input.getType() instanceof CompositeType)) {
            throw new IllegalArgumentException("Hash Partitioning with key fields only possible on Composite-type DataSets");
        }
        this.pMethod = pMethod;
        this.pKeys = pKeys;
    }

    /**
     * Creates a partition operator for a method that does not need keys.
     *
     * @param input   the DataSet to partition
     * @param pMethod the partitioning method; must not be {@code null}
     * @throws IllegalArgumentException      if {@code pMethod} is {@code null} or is {@code HASH}
     *                                       (which requires keys)
     * @throws UnsupportedOperationException if {@code RANGE} partitioning is requested
     */
    public PartitionOperator(DataSet<T> input, PartitionOperatorBase.PartitionMethod pMethod) {
        this(input, pMethod, null);
    }

    /**
     * Translates this operator into its common-API representation and wires it to {@code input}.
     *
     * @param input the already translated input operator
     * @return the last operator of the translated partition step
     * @throws UnsupportedOperationException if the partition method or the key type cannot be translated
     */
    @Override
    protected SingleInputOperator<?, T, ?> translateToDataFlow(Operator<T> input) {
        final String name = "Partition";

        if (this.pMethod == PartitionOperatorBase.PartitionMethod.REBALANCE) {
            UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(this.getType(), this.getType());
            PartitionOperatorBase<T> noop = new PartitionOperatorBase<T>(operatorInfo, this.pMethod, name);
            noop.setInput(input);
            noop.setDegreeOfParallelism(this.getParallelism());
            return noop;
        }

        if (this.pMethod == PartitionOperatorBase.PartitionMethod.HASH) {
            if (this.pKeys instanceof Keys.ExpressionKeys) {
                int[] logicalKeyPositions = this.pKeys.computeLogicalKeyPositions();
                UnaryOperatorInformation<T, T> operatorInfo = new UnaryOperatorInformation<T, T>(this.getType(), this.getType());
                PartitionOperatorBase<T> noop = new PartitionOperatorBase<T>(operatorInfo, this.pMethod, logicalKeyPositions, name);
                noop.setInput(input);
                noop.setDegreeOfParallelism(this.getParallelism());
                return noop;
            }
            if (this.pKeys instanceof Keys.SelectorFunctionKeys) {
                Keys.SelectorFunctionKeys<T, ?> selectorKeys = (Keys.SelectorFunctionKeys<T, ?>) this.pKeys;
                MapOperatorBase<?, T, ?> po = PartitionOperator.translateSelectorFunctionReducer(
                        selectorKeys, this.pMethod, this.getType(), name, input, this.getParallelism());
                return po;
            }
            throw new UnsupportedOperationException("Unrecognized key type.");
        }

        if (this.pMethod == PartitionOperatorBase.PartitionMethod.RANGE) {
            throw new UnsupportedOperationException("Range partitioning not yet supported");
        }

        // Previously this fell through to "return null", which only failed later with an
        // unrelated NullPointerException. Report the unsupported method explicitly instead.
        throw new UnsupportedOperationException("Unsupported partitioning method: " + this.pMethod);
    }

    /**
     * Builds the operator chain used for partitioning on a key-selector function:
     * a map typed {@code T -> Tuple2<K, T>} using the keys' extractor, a partition operator on
     * tuple field 0, and a map typed {@code Tuple2<K, T> -> T} that restores the original type.
     *
     * <p>The key-extracting map takes the degree of parallelism of {@code input}; the partition
     * operator and the key-removing map use {@code partitionDop}.
     *
     * @param rawKeys      the selector-function keys (key type not yet bound)
     * @param pMethod      the partition method handed to the partition operator
     * @param inputType    type information of the records
     * @param name         name of the partition operator
     * @param input        the input operator the chain is attached to
     * @param partitionDop degree of parallelism for the partition operator and the key-removing map
     * @param <T>          record type
     * @param <K>          key type produced by the selector function
     * @return the key-removing map, i.e. the last operator of the chain
     */
    private static <T, K> MapOperatorBase<Tuple2<K, T>, T, ?> translateSelectorFunctionReducer(Keys.SelectorFunctionKeys<T, ?> rawKeys, PartitionOperatorBase.PartitionMethod pMethod, TypeInformation<T> inputType, String name, Operator<T> input, int partitionDop) {
        // Binding the wildcard key type to K is safe: K is a free type variable of this method
        // and is only used to keep the three operators of the chain consistently typed.
        @SuppressWarnings("unchecked")
        final Keys.SelectorFunctionKeys<T, K> keys = (Keys.SelectorFunctionKeys<T, K>) rawKeys;

        TupleTypeInfo<Tuple2<K, T>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, T>>(keys.getKeyType(), inputType);
        UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>> operatorInfo =
                new UnaryOperatorInformation<Tuple2<K, T>, Tuple2<K, T>>(typeInfoWithKey, typeInfoWithKey);

        KeyExtractingMapper<T, K> extractor = new KeyExtractingMapper<T, K>(keys.getKeyExtractor());
        MapOperatorBase<T, Tuple2<K, T>, KeyExtractingMapper<T, K>> keyExtractingMap =
                new MapOperatorBase<T, Tuple2<K, T>, KeyExtractingMapper<T, K>>(
                        extractor, new UnaryOperatorInformation<T, Tuple2<K, T>>(inputType, typeInfoWithKey), "Key Extractor");

        // The key is field 0 of the (key, record) tuple, hence the key position {0}.
        PartitionOperatorBase<Tuple2<K, T>> noop =
                new PartitionOperatorBase<Tuple2<K, T>>(operatorInfo, pMethod, new int[]{0}, name);

        // Named "Key Remover" (was a copy-pasted "Key Extractor") so the two helper maps can be
        // told apart in plans and logs.
        MapOperatorBase<Tuple2<K, T>, T, KeyRemovingMapper<T, K>> keyRemovingMap =
                new MapOperatorBase<Tuple2<K, T>, T, KeyRemovingMapper<T, K>>(
                        new KeyRemovingMapper<T, K>(), new UnaryOperatorInformation<Tuple2<K, T>, T>(typeInfoWithKey, inputType), "Key Remover");

        keyExtractingMap.setInput(input);
        noop.setInput(keyExtractingMap);
        keyRemovingMap.setInput(noop);

        keyExtractingMap.setDegreeOfParallelism(input.getDegreeOfParallelism());
        noop.setDegreeOfParallelism(partitionDop);
        keyRemovingMap.setDegreeOfParallelism(partitionDop);
        return keyRemovingMap;
    }
}

