/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.TaskContext;
import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.runtime.util.NonReusingKeyGroupedIterator;
import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Single-input driver that pre-aggregates records with a {@link GroupCombineFunction}.
 *
 * <p>Incoming records are written into an in-memory sort buffer. Whenever the buffer cannot
 * accept another record, its current content is sorted, every key group is handed to the
 * combiner, the buffer is reset and the write is retried. Any records still buffered when the
 * input is exhausted are combined in a final pass.
 *
 * @param <IN> type of the records consumed from the input
 * @param <OUT> type of the records emitted by the combiner
 */
public class GroupReduceCombineDriver<IN, OUT>
        implements Driver<GroupCombineFunction<IN, OUT>, OUT> {

    private static final Logger LOG = LoggerFactory.getLogger(GroupReduceCombineDriver.class);

    /** Largest serialized record length (inclusive) for which the fixed-length sorter is used. */
    private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;

    private TaskContext<GroupCombineFunction<IN, OUT>, OUT> taskContext;
    private InMemorySorter<IN> sorter;
    private GroupCombineFunction<IN, OUT> combiner;
    private TypeSerializer<IN> serializer;
    private TypeComparator<IN> groupingComparator;
    private final QuickSort sortAlgo = new QuickSort();
    private Collector<OUT> output;
    private List<MemorySegment> memory;

    /** Number of records that did not fit even into an empty sort buffer. */
    private long oversizedRecordCount;

    /** Cleared by {@link #cancel()}; checked by every processing loop. */
    private volatile boolean running = true;

    private boolean objectReuseEnabled = false;

    /**
     * Stores the task context and marks the driver as running.
     *
     * @param context the context this driver obtains its input, configuration and memory from
     */
    @Override
    public void setup(TaskContext<GroupCombineFunction<IN, OUT>, OUT> context) {
        this.taskContext = context;
        this.running = true;
    }

    /** @return always {@code 1}; this driver reads a single input */
    @Override
    public int getNumberOfInputs() {
        return 1;
    }

    /** @return the {@link GroupCombineFunction} class token, typed for this driver */
    @Override
    public Class<GroupCombineFunction<IN, OUT>> getStubType() {
        // A class literal only exists for the raw type, so the parameterized token has to be
        // produced through an unchecked cast. The cast is safe because of erasure.
        @SuppressWarnings("unchecked")
        final Class<GroupCombineFunction<IN, OUT>> clazz =
                (Class<GroupCombineFunction<IN, OUT>>) (Class<?>) GroupCombineFunction.class;
        return clazz;
    }

    /** @return always {@code 2}: comparator 0 is used for sorting, comparator 1 for grouping */
    @Override
    public int getNumberOfDriverComparators() {
        return 2;
    }

    /**
     * Validates the configured strategy, resolves serializer, comparators, combiner and output
     * collector from the task context, allocates the driver memory and creates the sorter.
     *
     * @throws Exception if the configured driver strategy is not
     *     {@link DriverStrategy#SORTED_GROUP_COMBINE}, or if any of the lookups or the memory
     *     allocation fails
     */
    @Override
    public void prepare() throws Exception {
        final DriverStrategy driverStrategy = this.taskContext.getTaskConfig().getDriverStrategy();
        if (driverStrategy != DriverStrategy.SORTED_GROUP_COMBINE) {
            throw new Exception("Invalid strategy " + driverStrategy + " for group reduce combiner.");
        }

        final TypeSerializerFactory<IN> serializerFactory = this.taskContext.getInputSerializer(0);
        this.serializer = serializerFactory.getSerializer();

        final TypeComparator<IN> sortingComparator = this.taskContext.getDriverComparator(0);
        this.groupingComparator = this.taskContext.getDriverComparator(1);
        this.combiner = this.taskContext.getStub();
        this.output = this.taskContext.getOutputCollector();

        final MemoryManager memManager = this.taskContext.getMemoryManager();
        final int numMemoryPages = memManager.computeNumberOfPages(
                this.taskContext.getTaskConfig().getRelativeMemoryDriver());
        this.memory = memManager.allocatePages(this.taskContext.getContainingTask(), numMemoryPages);

        // Use the fixed-length sorter only for short, fixed-size records whose comparator
        // supports serialization with key normalization; otherwise use the normalized-key sorter.
        final int recordLength = this.serializer.getLength();
        final boolean useFixedLengthSorter =
                sortingComparator.supportsSerializationWithKeyNormalization()
                        && recordLength > 0
                        && recordLength <= THRESHOLD_FOR_IN_PLACE_SORTING;
        if (useFixedLengthSorter) {
            this.sorter = new FixedLengthRecordSorter<IN>(
                    this.serializer, sortingComparator.duplicate(), this.memory);
        } else {
            this.sorter = new NormalizedKeySorter<IN>(
                    this.serializer, sortingComparator.duplicate(), this.memory);
        }

        final ExecutionConfig executionConfig = this.taskContext.getExecutionConfig();
        this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
        LOG.debug("GroupReduceCombineDriver object reuse: {}.",
                this.objectReuseEnabled ? "ENABLED" : "DISABLED");
    }

    /**
     * Drains the input into the sort buffer, combining whenever the buffer is full, and runs a
     * final combine pass over the remaining buffered records unless the driver was canceled.
     *
     * @throws Exception if reading the input, writing to the sorter or the combiner fails
     */
    @Override
    public void run() throws Exception {
        LOG.debug("Combiner starting.");

        final MutableObjectIterator<IN> in = this.taskContext.getInput(0);

        if (this.objectReuseEnabled) {
            // The same instance is offered to the iterator on every call.
            IN value = this.serializer.createInstance();
            while (this.running && (value = in.next(value)) != null) {
                if (!this.sorter.write(value)) {
                    sortAndCombineAndRetryWrite(value);
                }
            }
        } else {
            IN value;
            while (this.running && (value = in.next()) != null) {
                if (!this.sorter.write(value)) {
                    sortAndCombineAndRetryWrite(value);
                }
            }
        }

        if (this.running) {
            sortAndCombine();
        }
    }

    /**
     * Sorts the buffered records and invokes the combiner once per key group. Does nothing when
     * the buffer is empty. The buffer is not reset here.
     */
    private void sortAndCombine() throws Exception {
        final InMemorySorter<IN> sorter = this.sorter;
        if (sorter.isEmpty()) {
            return;
        }

        this.sortAlgo.sort(sorter);

        final GroupCombineFunction<IN, OUT> combiner = this.combiner;
        final Collector<OUT> output = this.output;

        if (this.objectReuseEnabled) {
            final ReusingKeyGroupedIterator<IN> keyIter = new ReusingKeyGroupedIterator<IN>(
                    sorter.getIterator(), this.serializer, this.groupingComparator);
            while (this.running && keyIter.nextKey()) {
                combiner.combine(keyIter.getValues(), output);
            }
        } else {
            final NonReusingKeyGroupedIterator<IN> keyIter = new NonReusingKeyGroupedIterator<IN>(
                    sorter.getIterator(), this.groupingComparator);
            while (this.running && keyIter.nextKey()) {
                combiner.combine(keyIter.getValues(), output);
            }
        }
    }

    /**
     * Combines and clears the current buffer content, then retries writing {@code value}. A
     * record that does not fit into the freshly reset buffer is counted as oversized and passed
     * to the combiner on its own as a single-element group.
     *
     * @param value the record the sorter previously refused to accept
     */
    private void sortAndCombineAndRetryWrite(IN value) throws Exception {
        sortAndCombine();
        this.sorter.reset();

        if (this.sorter.write(value)) {
            return;
        }

        ++this.oversizedRecordCount;
        LOG.debug("Cannot write record to fresh sort buffer, record is too large. Oversized record count: {}",
                this.oversizedRecordCount);

        final Set<IN> input = Collections.singleton(value);
        this.combiner.combine(input, this.output);
        this.sorter.reset();
    }

    /**
     * Disposes the sorter, if one was created, and returns the driver memory to the memory
     * manager. The memory is released even when disposing the sorter fails.
     *
     * @throws Exception if disposing the sorter or releasing the memory fails
     */
    @Override
    public void cleanup() throws Exception {
        try {
            if (this.sorter != null) {
                this.sorter.dispose();
            }
        } finally {
            this.taskContext.getMemoryManager().release(this.memory);
        }
    }

    /**
     * Signals the processing loops to stop, then disposes the sorter on a best-effort basis and
     * releases the driver memory.
     */
    @Override
    public void cancel() {
        this.running = false;

        // NOTE(review): the volatile flag suggests this is called from a different thread than
        // run(); if so, the sorter may still be in use while it is disposed here - confirm.
        if (this.sorter != null) {
            try {
                this.sorter.dispose();
            } catch (Exception ignored) {
                // Best effort only: a failure to dispose must not prevent the memory release below.
            }
        }

        this.taskContext.getMemoryManager().release(this.memory);
    }

    /**
     * @return the number of records that were too large for an empty sort buffer and were
     *     therefore combined individually
     */
    public long getOversizedRecordCount() {
        return this.oversizedRecordCount;
    }
}

