/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.GroupReduceCombineDriver;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.testutils.recordutils.RecordComparator;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class CombineTaskExternalITCase
extends DriverTestBase<RichGroupReduceFunction<Record, ?>> {
    private static final long COMBINE_MEM = 0x300000L;
    private final double combine_frac;
    private final ArrayList<Record> outList = new ArrayList();
    private final RecordComparator comparator = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});

    public CombineTaskExternalITCase(ExecutionConfig config) {
        super(config, 0x300000L, 0);
        this.combine_frac = 3145728.0 / (double)this.getMemoryManager().getMemorySize();
    }

    @Test
    public void testSingleLevelMergeCombineTask() {
        int keyCnt = 40000;
        int valCnt = 8;
        this.addInput(new UniformRecordGenerator(40000, 8, false));
        this.addDriverComparator(this.comparator);
        this.addDriverComparator(this.comparator);
        this.setOutput(this.outList);
        this.getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
        this.getTaskConfig().setRelativeMemoryDriver(this.combine_frac);
        this.getTaskConfig().setFilehandlesDriver(2);
        GroupReduceCombineDriver testTask = new GroupReduceCombineDriver();
        try {
            this.testDriver((Driver)testTask, MockCombiningReduceStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)"Invoke method caused exception.");
        }
        int expSum = 0;
        for (int i = 1; i < 8; ++i) {
            expSum += i;
        }
        HashMap<IntValue, IntValue> aggMap = new HashMap<IntValue, IntValue>();
        for (Record record : this.outList) {
            IntValue key = new IntValue();
            IntValue value = new IntValue();
            key = (IntValue)record.getField(0, (Value)key);
            value = (IntValue)record.getField(1, (Value)value);
            IntValue prevVal = (IntValue)aggMap.get(key);
            if (prevVal != null) {
                aggMap.put(key, new IntValue(prevVal.getValue() + value.getValue()));
                continue;
            }
            aggMap.put(key, value);
        }
        Assert.assertTrue((String)("Resultset size was " + aggMap.size() + ". Expected was " + 40000), (aggMap.size() == 40000 ? 1 : 0) != 0);
        for (IntValue integer : aggMap.values()) {
            Assert.assertTrue((String)"Incorrect result", (integer.getValue() == expSum ? 1 : 0) != 0);
        }
        this.outList.clear();
    }

    @Test
    public void testMultiLevelMergeCombineTask() throws Exception {
        int keyCnt = 100000;
        int valCnt = 8;
        this.addInput(new UniformRecordGenerator(100000, 8, false));
        this.addDriverComparator(this.comparator);
        this.addDriverComparator(this.comparator);
        this.setOutput(this.outList);
        this.getTaskConfig().setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
        this.getTaskConfig().setRelativeMemoryDriver(this.combine_frac);
        this.getTaskConfig().setFilehandlesDriver(2);
        GroupReduceCombineDriver testTask = new GroupReduceCombineDriver();
        try {
            this.testDriver((Driver)testTask, MockCombiningReduceStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)"Invoke method caused exception.");
        }
        int expSum = 0;
        for (int i = 1; i < 8; ++i) {
            expSum += i;
        }
        HashMap<IntValue, IntValue> aggMap = new HashMap<IntValue, IntValue>();
        for (Record record : this.outList) {
            IntValue key = new IntValue();
            IntValue value = new IntValue();
            key = (IntValue)record.getField(0, (Value)key);
            value = (IntValue)record.getField(1, (Value)value);
            IntValue prevVal = (IntValue)aggMap.get(key);
            if (prevVal != null) {
                aggMap.put(key, new IntValue(prevVal.getValue() + value.getValue()));
                continue;
            }
            aggMap.put(key, value);
        }
        Assert.assertTrue((String)("Resultset size was " + aggMap.size() + ". Expected was " + 100000), (aggMap.size() == 100000 ? 1 : 0) != 0);
        for (IntValue integer : aggMap.values()) {
            Assert.assertTrue((String)"Incorrect result", (integer.getValue() == expSum ? 1 : 0) != 0);
        }
        this.outList.clear();
    }

    public static final class MockFailingCombiningReduceStub
    implements GroupReduceFunction<Record, Record>,
    GroupCombineFunction<Record, Record> {
        private static final long serialVersionUID = 1L;
        private int cnt = 0;
        private final IntValue key = new IntValue();
        private final IntValue value = new IntValue();
        private final IntValue combineValue = new IntValue();

        public void reduce(Iterable<Record> records, Collector<Record> out) {
            Record element = null;
            int sum = 0;
            Iterator<Record> iterator = records.iterator();
            while (iterator.hasNext()) {
                Record next;
                element = next = iterator.next();
                element.getField(1, (Value)this.value);
                sum += this.value.getValue();
            }
            element.getField(0, (Value)this.key);
            this.value.setValue(sum - this.key.getValue());
            element.setField(1, (Value)this.value);
            out.collect((Object)element);
        }

        public void combine(Iterable<Record> records, Collector<Record> out) {
            Record element = null;
            int sum = 0;
            Iterator<Record> iterator = records.iterator();
            while (iterator.hasNext()) {
                Record next;
                element = next = iterator.next();
                element.getField(1, (Value)this.combineValue);
                sum += this.combineValue.getValue();
            }
            if (++this.cnt >= 10) {
                throw new ExpectedTestException();
            }
            this.combineValue.setValue(sum);
            element.setField(1, (Value)this.combineValue);
            out.collect((Object)element);
        }
    }

    public static class MockCombiningReduceStub
    implements GroupReduceFunction<Record, Record>,
    GroupCombineFunction<Record, Record> {
        private static final long serialVersionUID = 1L;
        private final IntValue theInteger = new IntValue();

        public void reduce(Iterable<Record> records, Collector<Record> out) {
            Record element = null;
            int sum = 0;
            Iterator<Record> iterator = records.iterator();
            while (iterator.hasNext()) {
                Record next;
                element = next = iterator.next();
                element.getField(1, (Value)this.theInteger);
                sum += this.theInteger.getValue();
            }
            this.theInteger.setValue(sum);
            element.setField(1, (Value)this.theInteger);
            out.collect((Object)element);
        }

        public void combine(Iterable<Record> records, Collector<Record> out) throws Exception {
            this.reduce(records, out);
        }
    }
}

