/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.CleanupWhenUnsuccessful;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
import org.apache.flink.runtime.operators.util.CloseableInputProvider;
import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
import org.apache.flink.runtime.operators.util.ReaderIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSinkTask<IT>
extends AbstractInvokable {
    private static final Logger LOG = LoggerFactory.getLogger(DataSinkTask.class);
    private volatile OutputFormat<IT> format;
    private MutableReader<?> inputReader;
    private MutableObjectIterator<IT> reader;
    private TypeSerializerFactory<IT> inputTypeSerializerFactory;
    private CloseableInputProvider<IT> localStrategy;
    private TaskConfig config;
    private volatile boolean taskCanceled;
    private volatile boolean cleanupCalled;

    public DataSinkTask(Environment environment) {
        super(environment);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    @Override
    public void invoke() throws Exception {
        block55: {
            block53: {
                OutputFormat<IT> format;
                MutableObjectIterator<IT> input;
                TypeSerializer serializer;
                boolean objectReuseEnabled;
                SimpleCounter numRecordsIn;
                block51: {
                    block52: {
                        MutableObjectIterator<IT> input1;
                        SimpleCounter tmpNumRecordsIn;
                        LOG.debug(this.getLogString("Start registering input and output"));
                        this.initOutputFormat();
                        try {
                            this.initInputReaders();
                        }
                        catch (Exception e) {
                            throw new RuntimeException("Initializing the input streams failed" + (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
                        }
                        LOG.debug(this.getLogString("Finished registering input and output"));
                        LOG.debug(this.getLogString("Starting data sink operator"));
                        DistributedRuntimeUDFContext ctx = this.createRuntimeContext();
                        try {
                            OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup)ctx.getMetricGroup()).getIOMetricGroup();
                            ioMetricGroup.reuseInputMetricsForTask();
                            ioMetricGroup.reuseOutputMetricsForTask();
                            tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
                        }
                        catch (Exception e) {
                            LOG.warn("An exception occurred during the metrics setup.", (Throwable)e);
                            tmpNumRecordsIn = new SimpleCounter();
                        }
                        numRecordsIn = tmpNumRecordsIn;
                        if (RichOutputFormat.class.isAssignableFrom(this.format.getClass())) {
                            ((RichOutputFormat)this.format).setRuntimeContext((RuntimeContext)ctx);
                            LOG.debug(this.getLogString("Rich Sink detected. Initializing runtime context."));
                        }
                        ExecutionConfig executionConfig = this.getExecutionConfig();
                        objectReuseEnabled = executionConfig.isObjectReuseEnabled();
                        switch (this.config.getInputLocalStrategy(0)) {
                            case NONE: {
                                this.localStrategy = null;
                                input1 = this.reader;
                                break;
                            }
                            case SORT: {
                                try {
                                    TypeComparatorFactory compFact = this.config.getInputComparator(0, this.getUserCodeClassLoader());
                                    if (compFact == null) {
                                        throw new Exception("Missing comparator factory for local strategy on input 0");
                                    }
                                    UnilateralSortMerger<IT> sorter = new UnilateralSortMerger<IT>(this.getEnvironment().getMemoryManager(), this.getEnvironment().getIOManager(), this.reader, this, this.inputTypeSerializerFactory, compFact.createComparator(), this.config.getRelativeMemoryInput(0), this.config.getFilehandlesInput(0), this.config.getSpillingThresholdInput(0), this.config.getUseLargeRecordHandler(), this.getExecutionConfig().isObjectReuseEnabled());
                                    this.localStrategy = sorter;
                                    input1 = sorter.getIterator();
                                    break;
                                }
                                catch (Exception e) {
                                    throw new RuntimeException("Initializing the input processing failed" + (e.getMessage() == null ? "." : ": " + e.getMessage()), e);
                                }
                            }
                            default: {
                                throw new RuntimeException("Invalid local strategy for DataSinkTask");
                            }
                        }
                        serializer = this.inputTypeSerializerFactory.getSerializer();
                        input = input1;
                        format = this.format;
                        if (!this.taskCanceled) break block51;
                        if (this.format == null) break block52;
                        try {
                            this.format.close();
                        }
                        catch (Throwable t) {
                            if (!LOG.isWarnEnabled()) break block52;
                            LOG.warn(this.getLogString("Error closing the output format"), t);
                        }
                    }
                    if (this.localStrategy != null) {
                        try {
                            this.localStrategy.close();
                        }
                        catch (Throwable t) {
                            LOG.error("Error closing local strategy", t);
                        }
                    }
                    BatchTask.clearReaders(new MutableReader[]{this.inputReader});
                    return;
                }
                LOG.debug(this.getLogString("Starting to produce output"));
                format.open(this.getEnvironment().getTaskInfo().getIndexOfThisSubtask(), this.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks());
                if (objectReuseEnabled) {
                    Object record = serializer.createInstance();
                    while (!this.taskCanceled && (record = input.next(record)) != null) {
                        numRecordsIn.inc();
                        format.writeRecord(record);
                    }
                } else {
                    Object record;
                    while (!this.taskCanceled && (record = input.next()) != null) {
                        numRecordsIn.inc();
                        format.writeRecord(record);
                    }
                }
                if (!this.taskCanceled) {
                    this.format.close();
                    this.format = null;
                }
                if (this.format == null) break block53;
                try {
                    this.format.close();
                }
                catch (Throwable t) {
                    if (!LOG.isWarnEnabled()) break block53;
                    LOG.warn(this.getLogString("Error closing the output format"), t);
                }
            }
            if (this.localStrategy != null) {
                try {
                    this.localStrategy.close();
                }
                catch (Throwable t) {
                    LOG.error("Error closing local strategy", t);
                }
            }
            BatchTask.clearReaders(new MutableReader[]{this.inputReader});
            break block55;
            catch (Exception ex) {
                block54: {
                    try {
                        try {
                            if (!this.cleanupCalled && this.format instanceof CleanupWhenUnsuccessful) {
                                this.cleanupCalled = true;
                                ((CleanupWhenUnsuccessful)this.format).tryCleanupOnError();
                            }
                        }
                        catch (Throwable t) {
                            LOG.error("Cleanup on error failed.", t);
                        }
                        ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
                        if (ex instanceof CancelTaskException) {
                            throw ex;
                        }
                        if (!this.taskCanceled) {
                            if (LOG.isErrorEnabled()) {
                                LOG.error(this.getLogString("Error in user code: " + ex.getMessage()), (Throwable)ex);
                            }
                            throw ex;
                        }
                        if (this.format == null) break block54;
                    }
                    catch (Throwable throwable) {
                        block56: {
                            if (this.format != null) {
                                try {
                                    this.format.close();
                                }
                                catch (Throwable t) {
                                    if (!LOG.isWarnEnabled()) break block56;
                                    LOG.warn(this.getLogString("Error closing the output format"), t);
                                }
                            }
                        }
                        if (this.localStrategy != null) {
                            try {
                                this.localStrategy.close();
                            }
                            catch (Throwable t) {
                                LOG.error("Error closing local strategy", t);
                            }
                        }
                        BatchTask.clearReaders(new MutableReader[]{this.inputReader});
                        throw throwable;
                    }
                    try {
                        this.format.close();
                    }
                    catch (Throwable t) {
                        if (!LOG.isWarnEnabled()) break block54;
                        LOG.warn(this.getLogString("Error closing the output format"), t);
                    }
                }
                if (this.localStrategy != null) {
                    try {
                        this.localStrategy.close();
                    }
                    catch (Throwable t) {
                        LOG.error("Error closing local strategy", t);
                    }
                }
                BatchTask.clearReaders(new MutableReader[]{this.inputReader});
            }
        }
        if (!this.taskCanceled) {
            LOG.debug(this.getLogString("Finished data sink operator"));
        } else {
            LOG.debug(this.getLogString("Data sink operator cancelled"));
        }
    }

    @Override
    public void cancel() throws Exception {
        this.taskCanceled = true;
        OutputFormat<IT> format = this.format;
        if (format != null) {
            try {
                this.format.close();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            try {
                if (!this.cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
                    this.cleanupCalled = true;
                    ((CleanupWhenUnsuccessful)format).tryCleanupOnError();
                }
            }
            catch (Throwable t) {
                LOG.error("Cleanup on error failed.", t);
            }
        }
        LOG.debug(this.getLogString("Cancelling data sink operator"));
    }

    private void initOutputFormat() {
        ClassLoader userCodeClassLoader = this.getUserCodeClassLoader();
        Configuration taskConf = this.getTaskConfiguration();
        this.config = new TaskConfig(taskConf);
        try {
            this.format = (OutputFormat)this.config.getStubWrapper(userCodeClassLoader).getUserCodeObject(OutputFormat.class, userCodeClassLoader);
            if (!OutputFormat.class.isAssignableFrom(this.format.getClass())) {
                throw new RuntimeException("The class '" + this.format.getClass().getName() + "' is not a subclass of '" + OutputFormat.class.getName() + "' as is required.");
            }
        }
        catch (ClassCastException ccex) {
            throw new RuntimeException("The stub class is not a proper subclass of " + OutputFormat.class.getName(), ccex);
        }
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        try {
            thread.setContextClassLoader(userCodeClassLoader);
            this.format.configure(this.config.getStubParameters());
        }
        catch (Throwable t) {
            throw new RuntimeException("The user defined 'configure()' method in the Output Format caused an error: " + t.getMessage(), t);
        }
        finally {
            thread.setContextClassLoader(original);
        }
    }

    private void initInputReaders() throws Exception {
        int numGates = 0;
        int groupSize = this.config.getGroupSize(0);
        numGates += groupSize;
        if (groupSize == 1) {
            this.inputReader = new MutableRecordReader(this.getEnvironment().getInputGate(0), this.getEnvironment().getTaskManagerInfo().getTmpDirectories());
        } else if (groupSize > 1) {
            this.inputReader = new MutableRecordReader(new UnionInputGate(this.getEnvironment().getAllInputGates()), this.getEnvironment().getTaskManagerInfo().getTmpDirectories());
        } else {
            throw new Exception("Illegal input group size in task configuration: " + groupSize);
        }
        this.inputTypeSerializerFactory = this.config.getInputSerializer(0, this.getUserCodeClassLoader());
        ReaderIterator<IT> iter = new ReaderIterator<IT>(this.inputReader, this.inputTypeSerializerFactory.getSerializer());
        this.reader = iter;
        if (numGates != this.config.getNumInputs()) {
            throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
        }
    }

    private String getLogString(String message) {
        return BatchTask.constructLogString(message, this.getEnvironment().getTaskInfo().getTaskName(), this);
    }

    public DistributedRuntimeUDFContext createRuntimeContext() {
        Environment env = this.getEnvironment();
        return new DistributedRuntimeUDFContext(env.getTaskInfo(), this.getUserCodeClassLoader(), this.getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(), this.getEnvironment().getMetricGroup().getOrAddOperator(this.getEnvironment().getTaskInfo().getTaskName()));
    }
}

