/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.Preconditions;

/**
 * Redistributes operator state handles over a new parallelism.
 *
 * <p>State registered with {@link OperatorStateHandle.Mode#SPLIT_DISTRIBUTE} is cut into slices of
 * partition offsets that are dealt out to the new subtasks in round-robin fashion. State registered
 * with {@link OperatorStateHandle.Mode#BROADCAST} is handed, unmodified, to every new subtask.
 * Within one subtask, all state that lives behind the same {@link StreamStateHandle} is merged into
 * a single {@link OperatorStateHandle}.
 */
public class RoundRobinOperatorStateRepartitioner
implements OperatorStateRepartitioner {
    /** Shared instance; this class declares no instance fields. */
    public static final OperatorStateRepartitioner INSTANCE = new RoundRobinOperatorStateRepartitioner();
    /** Not referenced anywhere in this class as it stands. */
    private static final boolean OPTIMIZE_MEMORY_USE = false;

    /**
     * Computes the state assignment for {@code parallelism} subtasks.
     *
     * @param previousParallelSubtaskStates state handles of the previous subtasks; the list itself
     *     must pass {@code Preconditions.checkNotNull}, individual {@code null} elements are skipped
     * @param parallelism the new number of subtasks; must be greater than zero
     * @return a list with exactly {@code parallelism} entries, where entry {@code i} holds the
     *     handles assigned to subtask {@code i} (possibly an empty collection)
     */
    @Override
    public List<Collection<OperatorStateHandle>> repartitionState(List<OperatorStateHandle> previousParallelSubtaskStates, int parallelism) {
        Preconditions.checkNotNull(previousParallelSubtaskStates);
        Preconditions.checkArgument(parallelism > 0);

        GroupByStateNameResults nameToStateByMode = groupByStateName(previousParallelSubtaskStates);
        List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList = repartition(nameToStateByMode, parallelism);

        // Flatten each per-subtask merge map into the collection of handles it contains.
        List<Collection<OperatorStateHandle>> result = new ArrayList<>(mergeMapList.size());
        for (Map<StreamStateHandle, OperatorStateHandle> mergeMap : mergeMapList) {
            result.add(new ArrayList<OperatorStateHandle>(mergeMap.values()));
        }
        return result;
    }

    /**
     * Groups all (delegate handle, meta info) pairs first by distribution mode and then by state name.
     *
     * @param previousParallelSubtaskStates the previous state handles; {@code null} elements are ignored
     * @return the grouping, containing a (possibly empty) map for every {@link OperatorStateHandle.Mode}
     */
    private GroupByStateNameResults groupByStateName(List<OperatorStateHandle> previousParallelSubtaskStates) {
        EnumMap<OperatorStateHandle.Mode, Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>>> nameToStateByMode = new EnumMap<>(OperatorStateHandle.Mode.class);
        // Pre-populate every mode so that later lookups by mode never yield null.
        for (OperatorStateHandle.Mode mode : OperatorStateHandle.Mode.values()) {
            nameToStateByMode.put(mode, new HashMap<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>>());
        }

        for (OperatorStateHandle psh : previousParallelSubtaskStates) {
            if (psh == null) {
                continue;
            }
            for (Map.Entry<String, OperatorStateHandle.StateMetaInfo> e : psh.getStateNameToPartitionOffsets().entrySet()) {
                OperatorStateHandle.StateMetaInfo metaInfo = e.getValue();
                Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> nameToState = nameToStateByMode.get(metaInfo.getDistributionMode());
                List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>> stateLocations = nameToState.get(e.getKey());
                if (stateLocations == null) {
                    stateLocations = new ArrayList<>();
                    nameToState.put(e.getKey(), stateLocations);
                }
                stateLocations.add(new Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>(psh.getDelegateStateHandle(), metaInfo));
            }
        }
        return new GroupByStateNameResults(nameToStateByMode);
    }

    /**
     * Builds one merge map (delegate handle -> merged operator state handle) per new subtask and
     * fills it with the split-distribute state followed by the broadcast state.
     *
     * @param nameToStateByMode the grouped previous state; its split-distribute entries are cleared
     *     (set to {@code null}) while they are processed
     * @param parallelism the new number of subtasks
     * @return a list of {@code parallelism} merge maps, indexed by subtask
     */
    private List<Map<StreamStateHandle, OperatorStateHandle>> repartition(GroupByStateNameResults nameToStateByMode, int parallelism) {
        List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList = new ArrayList<>(parallelism);
        for (int i = 0; i < parallelism; ++i) {
            mergeMapList.add(new HashMap<StreamStateHandle, OperatorStateHandle>());
        }
        repartitionSplitState(nameToStateByMode.getByMode(OperatorStateHandle.Mode.SPLIT_DISTRIBUTE), parallelism, mergeMapList);
        repartitionBroadcastState(nameToStateByMode.getByMode(OperatorStateHandle.Mode.BROADCAST), parallelism, mergeMapList);
        return mergeMapList;
    }

    /**
     * Splits every named state's partitions into contiguous offset slices and assigns them to the
     * subtasks, one named state at a time.
     */
    private void repartitionSplitState(
            Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> distributeNameToState,
            int parallelism,
            List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList) {
        // Subtask at which the distribution of the current named state begins.
        int startParallelOp = 0;
        for (Map.Entry<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> e : distributeNameToState.entrySet()) {
            List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>> current = e.getValue();

            int totalPartitions = 0;
            for (Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo> handleWithOffsets : current) {
                totalPartitions += handleWithOffsets.f1.getOffsets().length;
            }

            // Cursor into `current` (lstIdx) and into the offsets of the handle at that position (offsetIdx).
            int lstIdx = 0;
            int offsetIdx = 0;
            int baseFraction = totalPartitions / parallelism;
            int remainder = totalPartitions % parallelism;
            int newStartParallelOp = startParallelOp;

            for (int i = 0; i < parallelism; ++i) {
                // Wrap around so that distribution starts at startParallelOp instead of subtask 0.
                int parallelOpIdx = (i + startParallelOp) % parallelism;
                int numberOfPartitionsToAssign = baseFraction;
                if (remainder > 0) {
                    // Hand out the surplus partitions one per subtask while any are left.
                    ++numberOfPartitionsToAssign;
                    --remainder;
                } else if (remainder == 0) {
                    // First subtask that gets no surplus partition: the next named state starts here.
                    // Pushing remainder below zero makes this branch fire only once per named state.
                    newStartParallelOp = parallelOpIdx;
                    --remainder;
                }

                Map<StreamStateHandle, OperatorStateHandle> mergeMap = mergeMapList.get(parallelOpIdx);
                while (numberOfPartitionsToAssign > 0) {
                    Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo> handleWithOffsets = current.get(lstIdx);
                    long[] offsets = handleWithOffsets.f1.getOffsets();
                    int remaining = offsets.length - offsetIdx;
                    long[] offs;
                    if (remaining > numberOfPartitionsToAssign) {
                        // The current handle has more offsets than needed: take a slice and stay on it.
                        offs = Arrays.copyOfRange(offsets, offsetIdx, offsetIdx + numberOfPartitionsToAssign);
                        offsetIdx += numberOfPartitionsToAssign;
                    } else {
                        // Take everything that is left of the current handle and move on to the next one.
                        offs = Arrays.copyOfRange(offsets, offsetIdx, offsets.length);
                        offsetIdx = 0;
                        ++lstIdx;
                    }
                    // May overshoot below zero in the first branch, which simply ends the loop.
                    numberOfPartitionsToAssign -= remaining;

                    // NOTE(review): put() replaces any slice stored earlier for the same delegate handle
                    // and state name on this subtask. That only matters if `current` can contain the same
                    // delegate handle more than once - confirm against callers.
                    getOrCreateHandle(mergeMap, handleWithOffsets.f0)
                            .getStateNameToPartitionOffsets()
                            .put(e.getKey(), new OperatorStateHandle.StateMetaInfo(offs, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
                }
            }
            startParallelOp = newStartParallelOp;
            // Drop the reference to the processed list; the grouping holds null for this name afterwards.
            e.setValue(null);
        }
    }

    /** Registers every broadcast state, with its original meta info, on every subtask. */
    private void repartitionBroadcastState(
            Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> broadcastNameToState,
            int parallelism,
            List<Map<StreamStateHandle, OperatorStateHandle>> mergeMapList) {
        for (int i = 0; i < parallelism; ++i) {
            Map<StreamStateHandle, OperatorStateHandle> mergeMap = mergeMapList.get(i);
            for (Map.Entry<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> e : broadcastNameToState.entrySet()) {
                for (Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo> handleWithMetaInfo : e.getValue()) {
                    getOrCreateHandle(mergeMap, handleWithMetaInfo.f0)
                            .getStateNameToPartitionOffsets()
                            .put(e.getKey(), handleWithMetaInfo.f1);
                }
            }
        }
    }

    /**
     * Returns the operator state handle registered for {@code delegate} in {@code mergeMap},
     * creating and registering an empty one if none exists yet.
     */
    private static OperatorStateHandle getOrCreateHandle(Map<StreamStateHandle, OperatorStateHandle> mergeMap, StreamStateHandle delegate) {
        OperatorStateHandle operatorStateHandle = mergeMap.get(delegate);
        if (operatorStateHandle == null) {
            operatorStateHandle = new OperatorStateHandle(new HashMap<String, OperatorStateHandle.StateMetaInfo>(), delegate);
            mergeMap.put(delegate, operatorStateHandle);
        }
        return operatorStateHandle;
    }

    /** Holder for the previous state, grouped by distribution mode and then by state name. */
    private static final class GroupByStateNameResults {
        private final EnumMap<OperatorStateHandle.Mode, Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>>> byMode;

        /**
         * @param byMode the grouping to wrap; checked with {@code Preconditions.checkNotNull}
         */
        public GroupByStateNameResults(EnumMap<OperatorStateHandle.Mode, Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>>> byMode) {
            this.byMode = Preconditions.checkNotNull(byMode);
        }

        /**
         * @param mode the distribution mode to look up
         * @return the state-name grouping stored for {@code mode}
         */
        public Map<String, List<Tuple2<StreamStateHandle, OperatorStateHandle.StateMetaInfo>>> getByMode(OperatorStateHandle.Mode mode) {
            return this.byMode.get(mode);
        }
    }
}

