/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.iterative.aggregators;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class AggregatorsITCase
extends MultipleProgramsTestBase {
    private static final int MAX_ITERATIONS = 20;
    private static final int parallelism = 2;
    private static final String NEGATIVE_ELEMENTS_AGGR = "count.negative.elements";
    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();

    public AggregatorsITCase(MultipleProgramsTestBase.TestExecutionMode mode) {
        super(mode);
    }

    @Test
    public void testDistributedCacheWithIterations() throws Exception {
        String testString = "Et tu, Brute?";
        String testName = "testing_caesar";
        File folder = tempFolder.newFolder();
        File resultFile = new File(folder, UUID.randomUUID().toString());
        String testPath = resultFile.toString();
        String resultPath = resultFile.toURI().toString();
        File tempFile = new File(testPath);
        try (FileWriter writer = new FileWriter(tempFile);){
            writer.write("Et tu, Brute?");
        }
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.registerCachedFile(resultPath, "testing_caesar");
        IterativeDataSet solution = env.fromElements((Object[])new Long[]{1L}).iterate(2);
        solution.closeWith((DataSet)env.generateSequence(1L, 2L).filter((FilterFunction)new RichFilterFunction<Long>(){

            public void open(Configuration parameters) throws Exception {
                File file = this.getRuntimeContext().getDistributedCache().getFile("testing_caesar");
                BufferedReader reader = new BufferedReader(new FileReader(file));
                String output = reader.readLine();
                reader.close();
                Assert.assertEquals((Object)output, (Object)"Et tu, Brute?");
            }

            public boolean filter(Long value) throws Exception {
                return false;
            }
        }).withBroadcastSet((DataSet)solution, "SOLUTION")).output((OutputFormat)new DiscardingOutputFormat());
        env.execute();
    }

    @Test
    public void testAggregatorWithoutParameterForIterate() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
        IterativeDataSet iteration = initialSolutionSet.iterate(20);
        LongSumAggregator aggr = new LongSumAggregator();
        iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr);
        iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr, (ConvergenceCriterion)new NegativeElementsConvergenceCriterion());
        MapOperator updatedDs = iteration.map((MapFunction)new SubtractOneMap());
        List result = iteration.closeWith((DataSet)updatedDs).collect();
        Collections.sort(result);
        List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
        Assert.assertEquals(expected, (Object)result);
    }

    @Test
    public void testAggregatorWithParameterForIterate() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
        IterativeDataSet iteration = initialSolutionSet.iterate(20);
        LongSumAggregatorWithParameter aggr = new LongSumAggregatorWithParameter(0);
        iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr);
        iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr, (ConvergenceCriterion)new NegativeElementsConvergenceCriterion());
        MapOperator updatedDs = iteration.map((MapFunction)new SubtractOneMapWithParam());
        List result = iteration.closeWith((DataSet)updatedDs).collect();
        Collections.sort(result);
        List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
        Assert.assertEquals(expected, (Object)result);
    }

    @Test
    public void testConvergenceCriterionWithParameterForIterate() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataSet<Integer> initialSolutionSet = CollectionDataSets.getIntegerDataSet(env);
        IterativeDataSet iteration = initialSolutionSet.iterate(20);
        LongSumAggregator aggr = new LongSumAggregator();
        iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr);
        iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr, (ConvergenceCriterion)new NegativeElementsConvergenceCriterionWithParam(3));
        MapOperator updatedDs = iteration.map((MapFunction)new SubtractOneMap());
        List result = iteration.closeWith((DataSet)updatedDs).collect();
        Collections.sort(result);
        List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
        Assert.assertEquals(expected, (Object)result);
    }

    @Test
    public void testAggregatorWithoutParameterForIterateDelta() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        MapOperator initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map((MapFunction)new TupleMakerMap());
        DeltaIteration iteration = initialSolutionSet.iterateDelta((DataSet)initialSolutionSet, 20, new int[]{0});
        LongSumAggregator aggr = new LongSumAggregator();
        iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr);
        MapOperator updatedDs = iteration.getWorkset().map((MapFunction)new AggregateMapDelta());
        FlatMapOperator newElements = updatedDs.join((DataSet)iteration.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).flatMap((FlatMapFunction)new UpdateFilter());
        DataSet iterationRes = iteration.closeWith((DataSet)newElements, (DataSet)newElements);
        List result = iterationRes.map((MapFunction)new ProjectSecondMapper()).collect();
        Collections.sort(result);
        List<Integer> expected = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5);
        Assert.assertEquals(expected, (Object)result);
    }

    @Test
    public void testAggregatorWithParameterForIterateDelta() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        MapOperator initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map((MapFunction)new TupleMakerMap());
        DeltaIteration iteration = initialSolutionSet.iterateDelta((DataSet)initialSolutionSet, 20, new int[]{0});
        LongSumAggregatorWithParameter aggr = new LongSumAggregatorWithParameter(4);
        iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr);
        MapOperator updatedDs = iteration.getWorkset().map((MapFunction)new AggregateMapDelta());
        FlatMapOperator newElements = updatedDs.join((DataSet)iteration.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).flatMap((FlatMapFunction)new UpdateFilter());
        DataSet iterationRes = iteration.closeWith((DataSet)newElements, (DataSet)newElements);
        List result = iterationRes.map((MapFunction)new ProjectSecondMapper()).collect();
        Collections.sort(result);
        List<Integer> expected = Arrays.asList(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5);
        Assert.assertEquals((Object)result, expected);
    }

    @Test
    public void testConvergenceCriterionWithParameterForIterateDelta() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        MapOperator initialSolutionSet = CollectionDataSets.getIntegerDataSet(env).map((MapFunction)new TupleMakerMap());
        DeltaIteration iteration = initialSolutionSet.iterateDelta((DataSet)initialSolutionSet, 20, new int[]{0});
        LongSumAggregator aggr = new LongSumAggregator();
        iteration.registerAggregator(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr);
        iteration.registerAggregationConvergenceCriterion(NEGATIVE_ELEMENTS_AGGR, (Aggregator)aggr, (ConvergenceCriterion)new NegativeElementsConvergenceCriterionWithParam(3));
        MapOperator updatedDs = iteration.getWorkset().map((MapFunction)new AggregateAndSubtractOneDelta());
        JoinOperator.ProjectJoin newElements = updatedDs.join((DataSet)iteration.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).projectFirst(new int[]{0, 1});
        DataSet iterationRes = iteration.closeWith((DataSet)newElements, (DataSet)newElements);
        List result = iterationRes.map((MapFunction)new ProjectSecondMapper()).collect();
        Collections.sort(result);
        List<Integer> expected = Arrays.asList(-3, -2, -2, -1, -1, -1, 0, 0, 0, 0, 1, 1, 1, 1, 1);
        Assert.assertEquals(expected, (Object)result);
    }

    private static final class AggregateAndSubtractOneDelta
    extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private LongSumAggregator aggr;
        private LongValue previousAggr;
        private int superstep;

        private AggregateAndSubtractOneDelta() {
        }

        public void open(Configuration conf) {
            this.aggr = (LongSumAggregator)this.getIterationRuntimeContext().getIterationAggregator(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
            this.superstep = this.getIterationRuntimeContext().getSuperstepNumber();
            if (this.superstep > 1) {
                this.previousAggr = (LongValue)this.getIterationRuntimeContext().getPreviousIterationAggregate(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
                Assert.assertEquals((long)(this.superstep - 1), (long)this.previousAggr.getValue());
            }
        }

        public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
            if ((Integer)value.f1 == 1) {
                this.aggr.aggregate(1L);
            }
            Tuple2<Integer, Integer> tuple2 = value;
            Integer n = (Integer)tuple2.f1;
            tuple2.f1 = (Integer)tuple2.f1 - 1;
            Integer n2 = tuple2.f1;
            return value;
        }
    }

    private static final class ProjectSecondMapper
    extends RichMapFunction<Tuple2<Integer, Integer>, Integer> {
        private ProjectSecondMapper() {
        }

        public Integer map(Tuple2<Integer, Integer> value) {
            return (Integer)value.f1;
        }
    }

    private static final class UpdateFilter
    extends RichFlatMapFunction<Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>, Tuple2<Integer, Integer>> {
        private int superstep;

        private UpdateFilter() {
        }

        public void open(Configuration conf) {
            this.superstep = this.getIterationRuntimeContext().getSuperstepNumber();
        }

        public void flatMap(Tuple2<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> value, Collector<Tuple2<Integer, Integer>> out) {
            if ((Integer)((Tuple2)value.f0).f1 > this.superstep) {
                out.collect(value.f0);
            }
        }
    }

    private static final class AggregateMapDelta
    extends RichMapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private LongSumAggregator aggr;
        private LongValue previousAggr;
        private int superstep;

        private AggregateMapDelta() {
        }

        public void open(Configuration conf) {
            this.aggr = (LongSumAggregator)this.getIterationRuntimeContext().getIterationAggregator(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
            this.superstep = this.getIterationRuntimeContext().getSuperstepNumber();
            if (this.superstep > 1) {
                this.previousAggr = (LongValue)this.getIterationRuntimeContext().getPreviousIterationAggregate(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
                Assert.assertEquals((long)(this.superstep - 1), (long)this.previousAggr.getValue());
            }
        }

        public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) {
            if ((Integer)value.f1 == this.superstep) {
                this.aggr.aggregate(1L);
            }
            return value;
        }
    }

    private static final class TupleMakerMap
    extends RichMapFunction<Integer, Tuple2<Integer, Integer>> {
        private Random rnd;

        private TupleMakerMap() {
        }

        public void open(Configuration parameters) {
            this.rnd = new Random(-4539650767900909907L + (long)this.getRuntimeContext().getIndexOfThisSubtask());
        }

        public Tuple2<Integer, Integer> map(Integer value) {
            Integer nodeId = this.rnd.nextInt(100000);
            return new Tuple2((Object)nodeId, (Object)value);
        }
    }

    private static class LongSumAggregatorWithParameter
    extends LongSumAggregator {
        private int value;

        public LongSumAggregatorWithParameter(int val) {
            this.value = val;
        }

        public int getValue() {
            return this.value;
        }
    }

    private static final class SubtractOneMapWithParam
    extends RichMapFunction<Integer, Integer> {
        private LongSumAggregatorWithParameter aggr;

        private SubtractOneMapWithParam() {
        }

        public void open(Configuration conf) {
            this.aggr = (LongSumAggregatorWithParameter)this.getIterationRuntimeContext().getIterationAggregator(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
        }

        public Integer map(Integer value) {
            Integer newValue = value - 1;
            if (newValue < this.aggr.getValue()) {
                this.aggr.aggregate(1L);
            }
            return newValue;
        }
    }

    private static final class SubtractOneMap
    extends RichMapFunction<Integer, Integer> {
        private LongSumAggregator aggr;

        private SubtractOneMap() {
        }

        public void open(Configuration conf) {
            this.aggr = (LongSumAggregator)this.getIterationRuntimeContext().getIterationAggregator(AggregatorsITCase.NEGATIVE_ELEMENTS_AGGR);
        }

        public Integer map(Integer value) {
            Integer newValue = value - 1;
            if (newValue < 0) {
                this.aggr.aggregate(1L);
            }
            return newValue;
        }
    }

    private static final class NegativeElementsConvergenceCriterionWithParam
    implements ConvergenceCriterion<LongValue> {
        private int value;

        public NegativeElementsConvergenceCriterionWithParam(int val) {
            this.value = val;
        }

        public int getValue() {
            return this.value;
        }

        public boolean isConverged(int iteration, LongValue value) {
            return value.getValue() > (long)this.value;
        }
    }

    private static final class NegativeElementsConvergenceCriterion
    implements ConvergenceCriterion<LongValue> {
        private NegativeElementsConvergenceCriterion() {
        }

        public boolean isConverged(int iteration, LongValue value) {
            return value.getValue() > 3L;
        }
    }
}

