/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.iterative.task;

import java.io.IOException;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobInfo;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.aggregators.LongSumAggregator;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
import org.apache.flink.api.common.operators.util.JoinHashMap;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
import org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
import org.apache.flink.runtime.iterative.concurrent.Broker;
import org.apache.flink.runtime.iterative.concurrent.IterationAggregatorBroker;
import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
import org.apache.flink.runtime.iterative.io.SolutionSetObjectsUpdateOutputCollector;
import org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector;
import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
import org.apache.flink.runtime.iterative.task.RuntimeAggregatorRegistry;
import org.apache.flink.runtime.iterative.task.Terminable;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.ResettableDriver;
import org.apache.flink.runtime.operators.hash.CompactingHashTable;
import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.types.Value;
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.UserCodeClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for batch tasks that are executed repeatedly as part of an iteration.
 *
 * <p>The task keeps track of the current superstep number, lazily derives the broker key under
 * which iteration-shared objects (workset back channel, aggregator registry, solution set) are
 * looked up, and can be asked to terminate via {@link #requestTermination()}.
 *
 * @param <S> type of the user function run by the driver
 * @param <OT> type of the records produced by this task
 */
public abstract class AbstractIterativeTask<S extends Function, OT>
extends BatchTask<S, OT>
implements Terminable {
    private static final Logger log = LoggerFactory.getLogger(AbstractIterativeTask.class);

    /** Name under which the workset element count aggregator is looked up in {@link #initialize()}. */
    private static final String WORKSET_EMPTY_AGGREGATOR_NAME = "pact.runtime.workset-empty-aggregator";

    /** Workset element count aggregator; only set for workset updates inside a workset iteration. */
    protected LongSumAggregator worksetAggregator;

    /** Back channel fetched from the broker; only set when this task is a workset update. */
    protected BlockingBackChannel worksetBackChannel;

    /** Flags read from the last task's configuration in {@link #initialize()}. */
    protected boolean isWorksetIteration;
    protected boolean isWorksetUpdate;
    protected boolean isSolutionSetUpdate;

    /** Lazily resolved in {@link #getIterationAggregators()}. */
    private RuntimeAggregatorRegistry iterationAggregators;

    /** Lazily built in {@link #brokerKey()}. */
    private String brokerKey;

    /** Current superstep, starting at 1. */
    private int superstepNum = 1;

    private volatile boolean terminationRequested;

    /**
     * Creates the task for the given environment.
     *
     * @param environment the environment passed on to the superclass
     */
    public AbstractIterativeTask(Environment environment) {
        super(environment);
    }

    /**
     * Runs the superclass initialization, excludes every resettable driver input from input
     * resets, reads the iteration flags from the last task's configuration and, for workset
     * updates, fetches the back channel and (in workset iterations) the element count aggregator.
     *
     * @throws RuntimeException if the workset element count aggregator is required but missing
     * @throws Exception if the superclass initialization fails
     */
    @Override
    protected void initialize() throws Exception {
        super.initialize();

        if (this.driver instanceof ResettableDriver) {
            final ResettableDriver<?, ?> resDriver = (ResettableDriver<?, ?>) this.driver;
            for (int i = 0; i < resDriver.getNumberOfInputs(); i++) {
                if (resDriver.isInputResettable(i)) {
                    this.excludeFromReset(i);
                }
            }
        }

        final TaskConfig config = this.getLastTasksConfig();
        this.isWorksetIteration = config.getIsWorksetIteration();
        this.isWorksetUpdate = config.getIsWorksetUpdate();
        this.isSolutionSetUpdate = config.getIsSolutionSetUpdate();

        if (this.isWorksetUpdate) {
            // getAndRemove: the back channel handle is taken out of the broker by this task.
            this.worksetBackChannel = BlockingBackChannelBroker.instance().getAndRemove(this.brokerKey());

            if (this.isWorksetIteration) {
                this.worksetAggregator =
                        (LongSumAggregator) this.getIterationAggregators().getAggregator(WORKSET_EMPTY_AGGREGATOR_NAME);
                if (this.worksetAggregator == null) {
                    throw new RuntimeException("Missing workset elements count aggregator.");
                }
            }
        }
    }

    /**
     * Executes one superstep.
     *
     * <p>In the first iteration a resettable driver is initialized. In every later iteration the
     * driver is reset or re-instantiated, all inputs are reset and the iterative broadcast inputs
     * are read again. After the superclass has run, the iterative broadcast variables are
     * released; this release step is not reached if the superclass run throws.
     *
     * @throws Exception if preparing or running the superstep fails
     */
    @Override
    public void run() throws Exception {
        if (this.inFirstIteration()) {
            if (this.driver instanceof ResettableDriver) {
                ((ResettableDriver<?, ?>) this.driver).initialize();
            }
        } else {
            this.reinstantiateDriver();
            this.resetAllInputs();

            for (int i : this.iterativeBroadcastInputs) {
                final String name = this.getTaskConfig().getBroadcastInputName(i);
                this.readAndSetBroadcastInput(i, name, this.runtimeUdfContext, this.superstepNum);
            }
        }

        super.run();

        for (int i : this.iterativeBroadcastInputs) {
            final String name = this.getTaskConfig().getBroadcastInputName(i);
            this.releaseBroadcastVariables(name, this.superstepNum, this.runtimeUdfContext);
        }
    }

    /**
     * Closes strategies and caches via the superclass and, regardless of the outcome, tears down
     * a resettable driver. Errors raised by the teardown are logged and not propagated.
     */
    @Override
    protected void closeLocalStrategiesAndCaches() {
        try {
            super.closeLocalStrategiesAndCaches();
        } finally {
            if (this.driver instanceof ResettableDriver) {
                final ResettableDriver<?, ?> resDriver = (ResettableDriver<?, ?>) this.driver;
                try {
                    resDriver.teardown();
                } catch (Throwable t) {
                    log.error("Error while shutting down an iterative operator.", t);
                }
            }
        }
    }

    /**
     * Creates the runtime context handed to user functions; it additionally exposes the superstep
     * number and the iteration aggregators.
     *
     * @param metrics the metric group passed on to the context
     * @return a new iteration-aware runtime context
     */
    @Override
    public DistributedRuntimeUDFContext createRuntimeContext(OperatorMetricGroup metrics) {
        final Environment env = this.getEnvironment();
        return new IterativeRuntimeUdfContext(
                env.getJobInfo(),
                env.getTaskInfo(),
                env.getUserCodeClassLoader(),
                this.getExecutionConfig(),
                env.getDistributedCacheEntries(),
                this.accumulatorMap,
                metrics,
                env.getExternalResourceInfoProvider());
    }

    /**
     * @return {@code true} while the superstep counter is still at its initial value of 1
     */
    protected boolean inFirstIteration() {
        return this.superstepNum == 1;
    }

    /**
     * @return the current superstep number (1-based)
     */
    protected int currentIteration() {
        return this.superstepNum;
    }

    /** Advances the superstep counter by one. */
    protected void incrementIterationCounter() {
        this.superstepNum++;
    }

    /**
     * Returns the key used for all broker lookups of this task, building it on first use as
     * {@code <jobId>#<iterationId>#<subtaskIndex>}.
     *
     * @return the cached broker key
     */
    public String brokerKey() {
        if (this.brokerKey == null) {
            final int iterationId = this.config.getIterationId();
            this.brokerKey = this.getEnvironment().getJobID().toString()
                    + '#'
                    + iterationId
                    + '#'
                    + this.getEnvironment().getTaskInfo().getIndexOfThisSubtask();
        }
        return this.brokerKey;
    }

    /**
     * Prepares the driver for another superstep: a resettable driver is reset in place, any other
     * driver is replaced by a fresh instance that is set up against this task.
     *
     * @throws Exception if resetting fails, or wrapping any error raised by the driver setup
     */
    private void reinstantiateDriver() throws Exception {
        if (this.driver instanceof ResettableDriver) {
            final ResettableDriver<?, ?> resDriver = (ResettableDriver<?, ?>) this.driver;
            resDriver.reset();
        } else {
            // Keep the class token parameterized so the instantiated driver is typed without a cast.
            final Class<? extends Driver<S, OT>> driverClass = this.config.getDriver();
            this.driver = InstantiationUtil.instantiate(driverClass, Driver.class);

            try {
                this.driver.setup(this);
            } catch (Throwable t) {
                throw new Exception(
                        "The pact driver setup for '"
                                + this.getEnvironment().getTaskInfo().getTaskName()
                                + "' , caused an error: "
                                + t.getMessage(),
                        t);
            }
        }
    }

    /**
     * Returns the aggregator registry of this iteration, fetching it from the aggregator broker
     * on first use.
     *
     * @return the cached registry (whatever the broker returned for this task's broker key)
     */
    public RuntimeAggregatorRegistry getIterationAggregators() {
        if (this.iterationAggregators == null) {
            this.iterationAggregators =
                    (RuntimeAggregatorRegistry) IterationAggregatorBroker.instance().get(this.brokerKey());
        }
        return this.iterationAggregators;
    }

    /**
     * Brings all iterative inputs into a consistent state at the end of a superstep.
     *
     * <p>For each unfinished iterative input that has not yet reached the end of the superstep,
     * the remaining records are read and discarded. Unfinished readers are then advanced to the
     * next superstep. Iterative broadcast inputs must already be at the end of the superstep.
     *
     * @throws IllegalStateException if the task has no iterative input at all, or if an
     *     unfinished iterative broadcast input has not reached the end of the superstep
     * @throws IOException if draining an input fails
     */
    protected void verifyEndOfSuperstepState() throws IOException {
        if (this.iterativeInputs.length == 0 && this.iterativeBroadcastInputs.length == 0) {
            throw new IllegalStateException("Error: Iterative task without a single iterative input.");
        }

        for (int inputNum : this.iterativeInputs) {
            final MutableReader<?> reader = this.inputReaders[inputNum];
            if (reader.isFinished()) {
                continue;
            }

            if (reader.hasReachedEndOfSuperstep()) {
                reader.startNextSuperstep();
            } else {
                // Read and drop everything that was not consumed during this superstep.
                @SuppressWarnings("unchecked")
                final MutableObjectIterator<Object> inIter =
                        (MutableObjectIterator<Object>) this.inputIterators[inputNum];
                Object o = this.inputSerializers[inputNum].getSerializer().createInstance();
                while ((o = inIter.next(o)) != null) {
                    // intentionally empty: records are discarded
                }

                // Draining may have finished the reader; only advance it if it did not.
                if (!reader.isFinished()) {
                    reader.startNextSuperstep();
                }
            }
        }

        for (int inputNum : this.iterativeBroadcastInputs) {
            final MutableReader<?> reader = this.broadcastInputReaders[inputNum];
            if (reader.isFinished()) {
                continue;
            }

            if (!reader.hasReachedEndOfSuperstep()) {
                throw new IllegalStateException("An iterative broadcast input has not been fully consumed.");
            }
            reader.startNextSuperstep();
        }
    }

    /**
     * @return {@code true} once {@link #requestTermination()} has been called
     */
    @Override
    public boolean terminationRequested() {
        return this.terminationRequested;
    }

    /** Marks this task as asked to terminate. */
    @Override
    public void requestTermination() {
        this.terminationRequested = true;
    }

    /**
     * Requests termination and then cancels the task via the superclass.
     *
     * @throws Exception if the superclass cancellation fails
     */
    @Override
    public void cancel() throws Exception {
        this.requestTermination();
        super.cancel();
    }

    /**
     * Creates a collector that writes records to the write end of the workset back channel.
     *
     * @param delegate collector handed to the created collector; may be {@code null} (see the
     *     no-argument overload)
     * @return the workset update collector
     * @throws RuntimeException if no output serializer is configured
     */
    protected Collector<OT> createWorksetUpdateOutputCollector(Collector<OT> delegate) {
        final DataOutputView outputView = this.worksetBackChannel.getWriteEnd();
        final TypeSerializer<OT> serializer = this.getOutputSerializer();
        return new WorksetUpdateOutputCollector<OT>(outputView, serializer, delegate);
    }

    /**
     * Creates a workset update collector without a delegate.
     *
     * @return the workset update collector
     */
    protected Collector<OT> createWorksetUpdateOutputCollector() {
        return this.createWorksetUpdateOutputCollector(null);
    }

    /**
     * Creates a collector that updates the solution set registered under this task's broker key.
     * The collector type depends on whether the solution set handle is a
     * {@link CompactingHashTable} or a {@link JoinHashMap}.
     *
     * @param delegate collector handed to the created collector
     * @return the solution set update collector
     * @throws RuntimeException if the handle is of neither supported type (including {@code null})
     */
    protected Collector<OT> createSolutionSetUpdateOutputCollector(Collector<OT> delegate) {
        final Broker<Object> solutionSetBroker = SolutionSetBroker.instance();
        final Object ss = solutionSetBroker.get(this.brokerKey());

        if (ss instanceof CompactingHashTable) {
            @SuppressWarnings("unchecked")
            final CompactingHashTable<OT> solutionSet = (CompactingHashTable<OT>) ss;
            return new SolutionSetUpdateOutputCollector<OT>(solutionSet, delegate);
        } else if (ss instanceof JoinHashMap) {
            @SuppressWarnings("unchecked")
            final JoinHashMap<OT> map = (JoinHashMap<OT>) ss;
            return new SolutionSetObjectsUpdateOutputCollector<OT>(map, delegate);
        } else {
            throw new RuntimeException("Unrecognized solution set handle: " + ss);
        }
    }

    /**
     * Resolves the output serializer from the last task's configuration.
     *
     * @return the serializer obtained from the configured factory
     * @throws RuntimeException if the configuration holds no output serializer factory
     */
    private TypeSerializer<OT> getOutputSerializer() {
        final TypeSerializerFactory<OT> serializerFactory =
                this.getLastTasksConfig().getOutputSerializer(this.getUserCodeClassLoader());
        if (serializerFactory == null) {
            throw new RuntimeException("Missing output serializer for workset update.");
        }
        return serializerFactory.getSerializer();
    }

    /**
     * Runtime context for user functions inside an iteration. It reads the superstep number and
     * the aggregator registry from the enclosing task, which is why it is an inner class.
     */
    private class IterativeRuntimeUdfContext
    extends DistributedRuntimeUDFContext
    implements IterationRuntimeContext {
        /** Passes all arguments through to the superclass constructor unchanged. */
        public IterativeRuntimeUdfContext(
                JobInfo jobInfo,
                TaskInfo taskInfo,
                UserCodeClassLoader userCodeClassLoader,
                ExecutionConfig executionConfig,
                Map<String, Future<Path>> cpTasks,
                Map<String, Accumulator<?, ?>> accumulatorMap,
                OperatorMetricGroup metrics,
                ExternalResourceInfoProvider externalResourceInfoProvider) {
            super(jobInfo, taskInfo, userCodeClassLoader, executionConfig, cpTasks, accumulatorMap, metrics, externalResourceInfoProvider);
        }

        /**
         * @return the enclosing task's current superstep number
         */
        @Override
        public int getSuperstepNumber() {
            return AbstractIterativeTask.this.superstepNum;
        }

        /**
         * @param name name of the aggregator
         * @return the aggregator the enclosing task's registry holds under {@code name}
         */
        @Override
        public <T extends Aggregator<?>> T getIterationAggregator(String name) {
            return AbstractIterativeTask.this.getIterationAggregators().getAggregator(name);
        }

        /**
         * @param name name of the aggregator
         * @return the previous global aggregate the registry holds under {@code name}, cast
         *     unchecked to the caller's expected type
         */
        @Override
        @SuppressWarnings("unchecked")
        public <T extends Value> T getPreviousIterationAggregate(String name) {
            return (T) AbstractIterativeTask.this.getIterationAggregators().getPreviousGlobalAggregate(name);
        }

        /**
         * Registers the accumulator only during the first iteration; calls made in later
         * supersteps are silently ignored.
         */
        @Override
        public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> newAccumulator) {
            if (AbstractIterativeTask.this.inFirstIteration()) {
                super.addAccumulator(name, newAccumulator);
            }
        }
    }
}

