/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.testutils;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.ResettableDriver;
import org.apache.flink.runtime.operators.TaskContext;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.sort.Sorter;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public abstract class BinaryOperatorTestBase<S extends Function, IN, OUT>
extends TestLogger
implements TaskContext<S, OUT> {
    protected static final int PAGE_SIZE = 32768;
    private final IOManager ioManager;
    private final MemoryManager memManager;
    private final List<MutableObjectIterator<IN>> inputs;
    private final List<TypeComparator<IN>> comparators;
    private final List<Sorter<IN>> sorters;
    private final AbstractInvokable owner;
    private final TaskConfig taskConfig;
    private final TaskManagerRuntimeInfo taskManageInfo;
    protected final long perSortMem;
    protected final double perSortFractionMem;
    private Collector<OUT> output;
    protected int numFileHandles;
    private S stub;
    private Driver<S, IN> driver;
    private volatile boolean running = true;
    private ExecutionConfig executionConfig;
    private List<TypeSerializer<IN>> inputSerializers = new ArrayList<TypeSerializer<IN>>();

    protected BinaryOperatorTestBase(ExecutionConfig executionConfig, long memory, int maxNumSorters, long perSortMemory) {
        if (memory < 0L || maxNumSorters < 0 || perSortMemory < 0L) {
            throw new IllegalArgumentException();
        }
        long totalMem = Math.max(memory, 0L) + (long)Math.max(maxNumSorters, 0) * perSortMemory;
        this.perSortMem = perSortMemory;
        this.perSortFractionMem = (double)perSortMemory / (double)totalMem;
        this.ioManager = new IOManagerAsync();
        this.memManager = totalMem > 0L ? MemoryManagerBuilder.newBuilder().setMemorySize(totalMem).build() : null;
        this.inputs = new ArrayList<MutableObjectIterator<IN>>();
        this.comparators = new ArrayList<TypeComparator<IN>>();
        this.sorters = new ArrayList<Sorter<IN>>();
        this.owner = new DummyInvokable();
        this.taskConfig = new TaskConfig(new Configuration());
        this.executionConfig = executionConfig;
        this.taskManageInfo = new TestingTaskManagerRuntimeInfo();
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() throws IOException {
        LinkedList<Object[]> configs = new LinkedList<Object[]>();
        ExecutionConfig withReuse = new ExecutionConfig();
        withReuse.enableObjectReuse();
        ExecutionConfig withoutReuse = new ExecutionConfig();
        withoutReuse.disableObjectReuse();
        Object[] a = new Object[]{withoutReuse};
        configs.add(a);
        Object[] b = new Object[]{withReuse};
        configs.add(b);
        return configs;
    }

    public void addInput(MutableObjectIterator<IN> input, TypeSerializer<IN> serializer) {
        this.inputs.add(input);
        this.sorters.add(null);
        this.inputSerializers.add(serializer);
    }

    public void addInputSorted(MutableObjectIterator<IN> input, TypeSerializer<IN> serializer, TypeComparator<IN> comp) throws Exception {
        this.inputSerializers.add(serializer);
        ExternalSorter sorter = ExternalSorter.newBuilder((MemoryManager)this.memManager, (AbstractInvokable)this.owner, serializer, comp).maxNumFileHandles(32).enableSpilling(this.ioManager, (double)0.8f).memoryFraction(this.perSortFractionMem).objectReuse(false).largeRecords(true).build(input);
        this.sorters.add((Sorter<IN>)sorter);
        this.inputs.add(null);
    }

    public void addDriverComparator(TypeComparator<IN> comparator) {
        this.comparators.add(comparator);
    }

    public void setOutput(Collector<OUT> output) {
        this.output = output;
    }

    public void setOutput(List<OUT> output, TypeSerializer<OUT> outSerializer) {
        this.output = new ListOutputCollector<OUT>(output, outSerializer);
    }

    public int getNumFileHandlesForSort() {
        return this.numFileHandles;
    }

    public void setNumFileHandlesForSort(int numFileHandles) {
        this.numFileHandles = numFileHandles;
    }

    public void testDriver(Driver driver, Class stubClass) throws Exception {
        this.testDriverInternal(driver, stubClass);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void testDriverInternal(Driver driver, Class stubClass) throws Exception {
        this.driver = driver;
        driver.setup((TaskContext)this);
        this.stub = (Function)stubClass.newInstance();
        this.running = true;
        boolean stubOpen = false;
        try {
            try {
                driver.prepare();
            }
            catch (Throwable t) {
                throw new Exception("The data preparation caused an error: " + t.getMessage(), t);
            }
            try {
                FunctionUtils.openFunction(this.stub, (Configuration)this.getTaskConfig().getStubParameters());
                stubOpen = true;
            }
            catch (Throwable t) {
                throw new Exception("The user defined 'open()' method caused an exception: " + t.getMessage(), t);
            }
            if (!this.running) {
                return;
            }
            driver.run();
            if (this.running) {
                FunctionUtils.closeFunction(this.stub);
                stubOpen = false;
            }
            this.output.close();
        }
        catch (Exception ex) {
            if (stubOpen) {
                try {
                    FunctionUtils.closeFunction(this.stub);
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
            }
            if (this.driver instanceof ResettableDriver) {
                ResettableDriver resDriver = (ResettableDriver)this.driver;
                try {
                    resDriver.teardown();
                }
                catch (Throwable t) {
                    throw new Exception("Error while shutting down an iterative operator: " + t.getMessage(), t);
                }
            }
            if (this.running) {
                throw ex;
            }
        }
        finally {
            driver.cleanup();
        }
    }

    public void testResettableDriver(ResettableDriver driver, Class stubClass, int iterations) throws Exception {
        driver.setup((TaskContext)this);
        for (int i = 0; i < iterations; ++i) {
            if (i == 0) {
                driver.initialize();
            } else {
                driver.reset();
            }
            this.testDriver((Driver)driver, stubClass);
        }
        driver.teardown();
    }

    public void cancel() throws Exception {
        this.running = false;
        while (this.driver == null) {
            Thread.sleep(200L);
        }
        this.driver.cancel();
    }

    public TaskConfig getTaskConfig() {
        return this.taskConfig;
    }

    public TaskManagerRuntimeInfo getTaskManagerInfo() {
        return this.taskManageInfo;
    }

    public ExecutionConfig getExecutionConfig() {
        return this.executionConfig;
    }

    public ClassLoader getUserCodeClassLoader() {
        return ((Object)((Object)this)).getClass().getClassLoader();
    }

    public IOManager getIOManager() {
        return this.ioManager;
    }

    public MemoryManager getMemoryManager() {
        return this.memManager;
    }

    public <X> MutableObjectIterator<X> getInput(int index) {
        MutableObjectIterator in = this.inputs.get(index);
        if (in == null) {
            try {
                in = this.sorters.get(index).getIterator();
            }
            catch (InterruptedException e) {
                throw new RuntimeException("Interrupted");
            }
            catch (IOException e) {
                throw new RuntimeException("IOException");
            }
            this.inputs.set(index, in);
        }
        MutableObjectIterator<IN> input = this.inputs.get(index);
        return input;
    }

    public <X> TypeSerializerFactory<X> getInputSerializer(int index) {
        TypeSerializer<IN> ser = this.inputSerializers.get(index);
        return new RuntimeSerializerFactory(ser, ser.createInstance().getClass());
    }

    public <X> TypeComparator<X> getDriverComparator(int index) {
        TypeComparator<IN> comparator = this.comparators.get(index);
        return comparator;
    }

    public S getStub() {
        return this.stub;
    }

    public Collector<OUT> getOutputCollector() {
        return this.output;
    }

    public AbstractInvokable getContainingTask() {
        return this.owner;
    }

    public String formatLogString(String message) {
        return "Driver Tester: " + message;
    }

    public OperatorMetricGroup getMetricGroup() {
        return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
    }

    @After
    public void shutdownAll() throws Exception {
        for (Sorter<IN> sorter : this.sorters) {
            if (sorter == null) continue;
            sorter.close();
        }
        this.sorters.clear();
        this.ioManager.close();
        MemoryManager memMan = this.getMemoryManager();
        if (memMan != null) {
            Assert.assertTrue((String)"Memory Manager managed memory was not completely freed.", (boolean)memMan.verifyEmpty());
            memMan.shutdown();
        }
    }

    public static final class CountingOutputCollector<OUT>
    implements Collector<OUT> {
        private int num;

        public void collect(OUT record) {
            ++this.num;
        }

        public void close() {
        }

        public int getNumberOfRecords() {
            return this.num;
        }
    }

    private static final class ListOutputCollector<OUT>
    implements Collector<OUT> {
        private final List<OUT> output;
        private final TypeSerializer<OUT> serializer;

        public ListOutputCollector(List<OUT> outputList, TypeSerializer<OUT> serializer) {
            this.output = outputList;
            this.serializer = serializer;
        }

        public void collect(OUT record) {
            this.output.add(this.serializer.copy(record));
        }

        public void close() {
        }
    }
}

