/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.RuntimeTask;
import org.apache.tez.runtime.RuntimeUtils;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalIOProcessor;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.Output;
import org.apache.tez.runtime.api.Processor;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.TezOutputContext;
import org.apache.tez.runtime.api.TezProcessorContext;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezInputContextImpl;
import org.apache.tez.runtime.api.impl.TezOutputContextImpl;
import org.apache.tez.runtime.api.impl.TezProcessorContextImpl;
import org.apache.tez.runtime.api.impl.TezUmbilical;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Runtime task that drives one {@link LogicalIOProcessor} together with the
 * {@link LogicalInput}s and {@link LogicalOutput}s listed in its task spec.
 *
 * <p>Inputs and outputs are created and initialized on a pool of initializer
 * threads, the processor is initialized on the thread calling
 * {@code initialize()}, and events handed to {@code handleEvents} are queued
 * and dispatched by a dedicated event router thread.
 */
@InterfaceAudience.Private
public class LogicalIOProcessorRuntimeTask
extends RuntimeTask {
    private static final Log LOG = LogFactory.getLog(LogicalIOProcessorRuntimeTask.class);
    // Input specs taken from the task spec at construction time.
    private final List<InputSpec> inputSpecs;
    // Inputs keyed by source vertex name; filled in by the input initializers.
    private final ConcurrentHashMap<String, LogicalInput> inputsMap;
    // Input contexts keyed by source vertex name; filled in by the input initializers.
    private final ConcurrentHashMap<String, TezInputContext> inputContextMap;
    // Output specs taken from the task spec at construction time.
    private final List<OutputSpec> outputSpecs;
    // Outputs keyed by destination vertex name; filled in by the output initializers.
    private final ConcurrentHashMap<String, LogicalOutput> outputsMap;
    // Output contexts keyed by destination vertex name; filled in by the output initializers.
    private final ConcurrentHashMap<String, TezOutputContext> outputContextMap;
    // Descriptor the processor instance is created from.
    private final ProcessorDescriptor processorDescriptor;
    // Processor instance created in the constructor.
    private final LogicalIOProcessor processor;
    // Context handed to the processor; assigned when the processor is initialized.
    private TezProcessorContext processorContext;
    // Inputs in input-spec order, as passed to the processor's run method.
    private final LinkedHashMap<String, LogicalInput> runInputMap;
    // Outputs in output-spec order, as passed to the processor's run method.
    private final LinkedHashMap<String, LogicalOutput> runOutputMap;
    // Metadata forwarded unchanged to every input, output and processor context.
    private final Map<String, ByteBuffer> serviceConsumerMetadata;
    // Daemon thread pool that runs the input/output initializers.
    private final ExecutorService initializerExecutor;
    // Completion service over initializerExecutor, used to wait for the initializers.
    private final CompletionService<Void> initializerCompletionService;
    // Events queued by handleEvents and drained by the event router thread.
    private LinkedBlockingQueue<TezEvent> eventsToBeProcessed;
    // Thread dispatching queued events; null until the router is started.
    private Thread eventRouterThread = null;
    // Application attempt number forwarded to every context.
    private final int appAttemptNumber;

    /**
     * Creates the task in the {@code NEW} state, instantiates the processor
     * named by the task spec's processor descriptor, and prepares the
     * executor used later to initialize inputs and outputs.
     *
     * @param taskSpec specification supplying the inputs, outputs and processor descriptor
     * @param appAttemptNumber application attempt number forwarded to the contexts
     * @param tezConf configuration handed to the superclass
     * @param tezUmbilical umbilical handed to the superclass
     * @param serviceConsumerMetadata metadata forwarded to the contexts
     * @throws IOException declared by the signature; nothing in this body
     *         visibly throws it (presumably kept for callers - confirm)
     */
    public LogicalIOProcessorRuntimeTask(TaskSpec taskSpec, int appAttemptNumber, Configuration tezConf, TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata) throws IOException {
        super(taskSpec, tezConf, tezUmbilical);
        LOG.info("Initializing LogicalIOProcessorRuntimeTask with TaskSpec: " + taskSpec);
        int numInputs = taskSpec.getInputs().size();
        int numOutputs = taskSpec.getOutputs().size();
        this.inputSpecs = taskSpec.getInputs();
        this.inputsMap = new ConcurrentHashMap<String, LogicalInput>(numInputs);
        this.inputContextMap = new ConcurrentHashMap<String, TezInputContext>(numInputs);
        this.outputSpecs = taskSpec.getOutputs();
        this.outputsMap = new ConcurrentHashMap<String, LogicalOutput>(numOutputs);
        this.outputContextMap = new ConcurrentHashMap<String, TezOutputContext>(numOutputs);
        this.runInputMap = new LinkedHashMap<String, LogicalInput>();
        this.runOutputMap = new LinkedHashMap<String, LogicalOutput>();
        this.processorDescriptor = taskSpec.getProcessorDescriptor();
        this.processor = this.createProcessor(this.processorDescriptor);
        this.serviceConsumerMetadata = serviceConsumerMetadata;
        this.eventsToBeProcessed = new LinkedBlockingQueue<TezEvent>();
        this.state = RuntimeTask.State.NEW;
        this.appAttemptNumber = appAttemptNumber;
        // newFixedThreadPool rejects a pool size of zero with IllegalArgumentException,
        // so a task without any inputs or outputs still gets a single-thread pool.
        int numInitializers = Math.max(1, numInputs + numOutputs);
        this.initializerExecutor = Executors.newFixedThreadPool(numInitializers, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Initializer %d").build());
        this.initializerCompletionService = new ExecutorCompletionService<Void>(this.initializerExecutor);
    }

    /**
     * Moves the task from {@code NEW} to {@code INITED}, initializes all
     * inputs and outputs on the initializer pool while the processor is
     * initialized on the calling thread, then builds the ordered input and
     * output maps and starts the event router thread.
     *
     * @throws Exception the cause of the first failed initializer when that
     *         cause is an {@link Exception}, otherwise a new {@link Exception}
     *         wrapping the {@link ExecutionException}; also anything thrown
     *         while initializing the processor or waiting for initializers
     */
    public void initialize() throws Exception {
        LOG.info("Initializing LogicalProcessorIORuntimeTask");
        Preconditions.checkState(this.state == RuntimeTask.State.NEW, (Object)"Already initialized");
        this.state = RuntimeTask.State.INITED;

        // Queue one initializer per input and per output, counting them as we go.
        int pending = 0;
        for (InputSpec spec : this.taskSpec.getInputs()) {
            this.initializerCompletionService.submit(new InitializeInputCallable(spec));
            pending++;
        }
        for (OutputSpec spec : this.taskSpec.getOutputs()) {
            this.initializerCompletionService.submit(new InitializeOutputCallable(spec));
            pending++;
        }
        // No further submissions follow; already queued initializers still run.
        this.initializerExecutor.shutdown();

        // The processor is initialized here while the IO initializers run on the pool.
        this.initializeLogicalIOProcessor();

        while (pending > 0) {
            LOG.info("Waiting for " + pending + " initializers to finish");
            Future<Void> finished = this.initializerCompletionService.take();
            try {
                finished.get();
            }
            catch (ExecutionException failure) {
                Throwable cause = failure.getCause();
                if (cause instanceof Exception) {
                    throw (Exception)cause;
                }
                throw new Exception(failure);
            }
            pending--;
        }
        LOG.info("All initializers finished");

        // Insert in spec order so the linked maps keep that order.
        for (InputSpec spec : this.inputSpecs) {
            String sourceVertex = spec.getSourceVertexName();
            this.runInputMap.put(sourceVertex, this.inputsMap.get(sourceVertex));
        }
        for (OutputSpec spec : this.outputSpecs) {
            String destinationVertex = spec.getDestinationVertexName();
            this.runOutputMap.put(destinationVertex, this.outputsMap.get(destinationVertex));
        }
        this.startRouterThread();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Moves the task from {@code INITED} to {@code RUNNING} and then runs the
     * processor with the ordered input and output maps.
     *
     * @throws IllegalStateException if the task is not in the {@code INITED} state
     * @throws Exception anything thrown by the processor's run method
     */
    public void run() throws Exception {
        // NOTE(review): the monitor is whichever State constant the field refers to
        // on entry, and the field is reassigned inside the block - confirm this lock
        // actually guards what it is meant to guard.
        synchronized (this.state) {
            Preconditions.checkState(this.state == RuntimeTask.State.INITED, (Object)("Can only run while in INITED state. Current: " + this.state));
            this.state = RuntimeTask.State.RUNNING;
        }
        this.processor.run(this.runInputMap, this.runOutputMap);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Moves the task from {@code RUNNING} to {@code CLOSED}, then closes every
     * input, the processor and every output in that order, forwarding the
     * events returned by each input/output close through the umbilical.
     *
     * <p>Whether or not the state check or any close call throws, the task is
     * marked done and the event router thread (if started) is interrupted.
     *
     * @throws IllegalStateException if the task is not in the {@code RUNNING} state
     * @throws Exception anything thrown while closing an input, the processor or an output
     */
    public void close() throws Exception {
        try {
            // The message previously said "Can only run ...", copied from run().
            Preconditions.checkState(this.state == RuntimeTask.State.RUNNING, (Object)("Can only close while in RUNNING state. Current: " + this.state));
            this.state = RuntimeTask.State.CLOSED;
            for (InputSpec inputSpec : this.inputSpecs) {
                String srcVertexName = inputSpec.getSourceVertexName();
                List<Event> closeInputEvents = this.inputsMap.get(srcVertexName).close();
                this.sendTaskGeneratedEvents(closeInputEvents, EventMetaData.EventProducerConsumerType.INPUT, this.taskSpec.getVertexName(), srcVertexName, this.taskSpec.getTaskAttemptID());
            }
            this.processor.close();
            for (OutputSpec outputSpec : this.outputSpecs) {
                String destVertexName = outputSpec.getDestinationVertexName();
                List<Event> closeOutputEvents = this.outputsMap.get(destVertexName).close();
                this.sendTaskGeneratedEvents(closeOutputEvents, EventMetaData.EventProducerConsumerType.OUTPUT, this.taskSpec.getVertexName(), destVertexName, this.taskSpec.getTaskAttemptID());
            }
        }
        finally {
            this.setTaskDone();
            if (this.eventRouterThread != null) {
                this.eventRouterThread.interrupt();
            }
        }
    }

    /**
     * Creates the processor context, stores it on this task and initializes
     * the processor with it, logging before and after.
     *
     * @throws Exception anything thrown by the processor's initialize method
     */
    private void initializeLogicalIOProcessor() throws Exception {
        LOG.info("Initializing processor, processorClassName=" + this.processorDescriptor.getClassName());
        TezProcessorContext context = this.createProcessorContext();
        this.processorContext = context;
        this.processor.initialize(context);
        LOG.info("Initialized processor, processorClassName=" + this.processorDescriptor.getClassName());
    }

    /**
     * Builds the context for one input.
     *
     * @param inputSpec spec of the input the context is created for
     * @return a new input context for the edge from the spec's source vertex
     */
    private TezInputContext createInputContext(InputSpec inputSpec) {
        return new TezInputContextImpl(
            this.tezConf,
            this.appAttemptNumber,
            this.tezUmbilical,
            this.taskSpec.getVertexName(),
            inputSpec.getSourceVertexName(),
            this.taskSpec.getTaskAttemptID(),
            this.tezCounters,
            // Fall back to the processor's payload when the input descriptor has none.
            inputSpec.getInputDescriptor().getUserPayload() == null
                ? this.taskSpec.getProcessorDescriptor().getUserPayload()
                : inputSpec.getInputDescriptor().getUserPayload(),
            this,
            this.serviceConsumerMetadata,
            System.getenv());
    }

    /**
     * Builds the context for one output.
     *
     * @param outputSpec spec of the output the context is created for
     * @return a new output context for the edge to the spec's destination vertex
     */
    private TezOutputContext createOutputContext(OutputSpec outputSpec) {
        return new TezOutputContextImpl(
            this.tezConf,
            this.appAttemptNumber,
            this.tezUmbilical,
            this.taskSpec.getVertexName(),
            outputSpec.getDestinationVertexName(),
            this.taskSpec.getTaskAttemptID(),
            this.tezCounters,
            // Fall back to the processor's payload when the output descriptor has none.
            outputSpec.getOutputDescriptor().getUserPayload() == null
                ? this.taskSpec.getProcessorDescriptor().getUserPayload()
                : outputSpec.getOutputDescriptor().getUserPayload(),
            this,
            this.serviceConsumerMetadata,
            System.getenv());
    }

    /**
     * Builds the context handed to the processor, using the processor
     * descriptor's user payload.
     *
     * @return a new processor context for this task
     */
    private TezProcessorContext createProcessorContext() {
        return new TezProcessorContextImpl(
            this.tezConf,
            this.appAttemptNumber,
            this.tezUmbilical,
            this.taskSpec.getVertexName(),
            this.taskSpec.getTaskAttemptID(),
            this.tezCounters,
            this.processorDescriptor.getUserPayload(),
            this,
            this.serviceConsumerMetadata,
            System.getenv());
    }

    /**
     * Instantiates the input class named by the spec's input descriptor.
     *
     * @param inputSpec spec whose input descriptor names the class to create
     * @return the new instance as a {@link LogicalInput}
     * @throws TezUncheckedException if the instance is not a {@link LogicalInput}
     */
    private LogicalInput createInput(InputSpec inputSpec) {
        LOG.info("Creating Input");
        Input candidate = (Input)RuntimeUtils.createClazzInstance(inputSpec.getInputDescriptor().getClassName());
        if (candidate instanceof LogicalInput) {
            return (LogicalInput)candidate;
        }
        throw new TezUncheckedException(candidate.getClass().getName()
            + " is not a sub-type of LogicalInput."
            + " Only LogicalInput sub-types supported by LogicalIOProcessor.");
    }

    /**
     * Instantiates the output class named by the spec's output descriptor.
     *
     * @param outputSpec spec whose output descriptor names the class to create
     * @return the new instance as a {@link LogicalOutput}
     * @throws TezUncheckedException if the instance is not a {@link LogicalOutput}
     */
    private LogicalOutput createOutput(OutputSpec outputSpec) {
        LOG.info("Creating Output");
        Output candidate = (Output)RuntimeUtils.createClazzInstance(outputSpec.getOutputDescriptor().getClassName());
        if (candidate instanceof LogicalOutput) {
            return (LogicalOutput)candidate;
        }
        throw new TezUncheckedException(candidate.getClass().getName()
            + " is not a sub-type of LogicalOutput."
            + " Only LogicalOutput sub-types supported by LogicalIOProcessor.");
    }

    /**
     * Instantiates the processor class named by the descriptor.
     *
     * @param processorDescriptor descriptor naming the class to create
     * @return the new instance as a {@link LogicalIOProcessor}
     * @throws TezUncheckedException if the instance is not a {@link LogicalIOProcessor}
     */
    private LogicalIOProcessor createProcessor(ProcessorDescriptor processorDescriptor) {
        Processor candidate = (Processor)RuntimeUtils.createClazzInstance(processorDescriptor.getClassName());
        if (candidate instanceof LogicalIOProcessor) {
            return (LogicalIOProcessor)candidate;
        }
        throw new TezUncheckedException(candidate.getClass().getName()
            + " is not a sub-type of LogicalIOProcessor."
            + " Only LogicalIOProcessor sub-types supported by LogicalIOProcessorRuntimeTask.");
    }

    /**
     * Wraps each event with metadata describing its producer and hands the
     * wrapped events to the umbilical. Does nothing for a null or empty list.
     *
     * @param events events produced by an input or output; may be null
     * @param generator kind of component that produced the events
     * @param taskVertexName name of the vertex this task belongs to
     * @param edgeVertexName name of the vertex on the other end of the edge
     * @param taskAttemptID attempt id recorded in the metadata
     */
    private void sendTaskGeneratedEvents(List<Event> events, EventMetaData.EventProducerConsumerType generator, String taskVertexName, String edgeVertexName, TezTaskAttemptID taskAttemptID) {
        boolean nothingToSend = events == null || events.isEmpty();
        if (nothingToSend) {
            return;
        }
        // One metadata instance is shared by every wrapped event.
        EventMetaData producerInfo = new EventMetaData(generator, taskVertexName, edgeVertexName, taskAttemptID);
        List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
        for (Event raw : events) {
            wrapped.add(new TezEvent(raw, producerInfo));
        }
        if (LOG.isDebugEnabled()) {
            for (TezEvent outgoing : wrapped) {
                LOG.debug("Generated event info, eventMetaData=" + producerInfo.toString() + ", eventType=" + outgoing.getEventType());
            }
        }
        this.tezUmbilical.addEvents(wrapped);
    }

    /**
     * Delivers one event to the input, output or processor named by its
     * destination info. A system event is only logged.
     *
     * <p>Any throwable raised during delivery is logged, recorded as a fatal
     * error on this task and reported through the umbilical.
     *
     * @param e event to deliver
     * @return {@code true} if the event was handled, {@code false} if delivery failed
     */
    private boolean handleEvent(TezEvent e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Handling TezEvent in task, taskAttemptId=" + this.taskSpec.getTaskAttemptID() + ", eventType=" + e.getEventType() + ", eventSourceInfo=" + e.getSourceInfo() + ", eventDestinationInfo=" + e.getDestinationInfo());
        }
        try {
            switch (e.getDestinationInfo().getEventGenerator()) {
                case INPUT: {
                    LogicalInput target = this.inputsMap.get(e.getDestinationInfo().getEdgeVertexName());
                    if (target == null) {
                        throw new TezUncheckedException("Unhandled event for invalid target: " + e);
                    }
                    target.handleEvents(Collections.singletonList(e.getEvent()));
                    break;
                }
                case OUTPUT: {
                    LogicalOutput target = this.outputsMap.get(e.getDestinationInfo().getEdgeVertexName());
                    if (target == null) {
                        throw new TezUncheckedException("Unhandled event for invalid target: " + e);
                    }
                    target.handleEvents(Collections.singletonList(e.getEvent()));
                    break;
                }
                case PROCESSOR: {
                    this.processor.handleEvents(Collections.singletonList(e.getEvent()));
                    break;
                }
                case SYSTEM: {
                    LOG.warn("Trying to send a System event in a Task: " + e);
                    break;
                }
                default: {
                    break;
                }
            }
        }
        catch (Throwable failure) {
            LOG.warn("Failed to handle event", failure);
            this.setFatalError(failure, "Failed to handle event");
            // Report the failing component (the event's destination) as the error source.
            EventMetaData failedComponent = new EventMetaData(e.getDestinationInfo().getEventGenerator(), this.taskSpec.getVertexName(), e.getDestinationInfo().getEdgeVertexName(), this.getTaskAttemptID());
            this.tezUmbilical.signalFatalError(this.getTaskAttemptID(), StringUtils.stringifyException(failure), failedComponent);
            return false;
        }
        return true;
    }

    /**
     * Adds the size of the batch to the event counter and queues the events
     * for the event router thread. A null or empty collection is ignored.
     *
     * @param events events to queue; may be null
     */
    @Override
    public synchronized void handleEvents(Collection<TezEvent> events) {
        boolean nothingReceived = events == null || events.isEmpty();
        if (nothingReceived) {
            return;
        }
        int received = events.size();
        this.eventCounter.addAndGet(received);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received events to be processed by task, taskAttemptId=" + this.taskSpec.getTaskAttemptID() + ", eventCount=" + received + ", newEventCounter=" + this.eventCounter.get());
        }
        this.eventsToBeProcessed.addAll(events);
    }

    /**
     * Creates, names and starts the thread that takes queued events one at a
     * time and passes them to {@code handleEvent}. The thread stops when the
     * task is done, when it is interrupted, or when an event fails to be
     * handled.
     */
    private void startRouterThread() {
        Runnable router = new Runnable(){

            public void run() {
                while (!isTaskDone() && !Thread.currentThread().isInterrupted()) {
                    try {
                        TezEvent next = eventsToBeProcessed.take();
                        if (next != null && !handleEvent(next)) {
                            LOG.warn("Stopping Event Router thread as failed to handle event: " + next);
                            return;
                        }
                    }
                    catch (InterruptedException interrupted) {
                        // An interrupt after the task is done is the normal shutdown path.
                        if (!isTaskDone()) {
                            LOG.warn("Event Router thread interrupted. Returning.");
                        }
                        return;
                    }
                }
            }
        };
        Thread routerThread = new Thread(router);
        this.eventRouterThread = routerThread;
        routerThread.setName("TezTaskEventRouter[" + this.taskSpec.getTaskAttemptID().toString() + "]");
        routerThread.start();
    }

    /**
     * Marks the task as done and interrupts the event router thread if one
     * has been started.
     */
    public synchronized void cleanup() {
        this.setTaskDone();
        Thread router = this.eventRouterThread;
        if (router == null) {
            return;
        }
        router.interrupt();
    }

    /**
     * Exposes the input contexts for tests.
     *
     * @return the values view of the input context map
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public Collection<TezInputContext> getInputContexts() {
        Collection<TezInputContext> contexts = inputContextMap.values();
        return contexts;
    }

    /**
     * Exposes the output contexts for tests.
     *
     * @return the values view of the output context map
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public Collection<TezOutputContext> getOutputContexts() {
        Collection<TezOutputContext> contexts = outputContextMap.values();
        return contexts;
    }

    /**
     * Exposes the processor context for tests.
     *
     * @return the processor context, or {@code null} if the processor has not been initialized yet
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public TezProcessorContext getProcessorContext() {
        return processorContext;
    }

    /**
     * Exposes the processor instance for tests.
     *
     * @return the processor created in the constructor
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public LogicalIOProcessor getProcessor() {
        return processor;
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    /**
     * Initializer task for a single output: creates the output and its
     * context, registers both under the destination vertex name, initializes
     * the output and forwards the events returned by that initialization.
     */
    private class InitializeOutputCallable
    implements Callable<Void> {
        private final OutputSpec outputSpec;

        /**
         * @param outputSpec spec of the output this task initializes
         */
        public InitializeOutputCallable(OutputSpec outputSpec) {
            this.outputSpec = outputSpec;
        }

        /**
         * Creates, registers and initializes the output.
         *
         * @return always {@code null}
         * @throws Exception anything thrown while creating or initializing the output
         */
        @Override
        public Void call() throws Exception {
            LOG.info("Initializing Output using OutputSpec: " + this.outputSpec);
            String edgeName = this.outputSpec.getDestinationVertexName();
            LogicalOutput output = LogicalIOProcessorRuntimeTask.this.createOutput(this.outputSpec);
            TezOutputContext outputContext = LogicalIOProcessorRuntimeTask.this.createOutputContext(this.outputSpec);
            // Registered before initialize() is called on the output.
            LogicalIOProcessorRuntimeTask.this.outputsMap.put(edgeName, output);
            LogicalIOProcessorRuntimeTask.this.outputContextMap.put(edgeName, outputContext);
            // createOutput only ever returns a non-null LogicalOutput, so no type check is needed.
            output.setNumPhysicalOutputs(this.outputSpec.getPhysicalEdgeCount());
            // This message previously said "Input" although an output is being initialized.
            LOG.info("Initializing Output with dest edge: " + edgeName);
            List<Event> events = output.initialize(outputContext);
            LogicalIOProcessorRuntimeTask.this.sendTaskGeneratedEvents(events, EventMetaData.EventProducerConsumerType.OUTPUT, outputContext.getTaskVertexName(), outputContext.getDestinationVertexName(), LogicalIOProcessorRuntimeTask.this.taskSpec.getTaskAttemptID());
            LOG.info("Initialized Output with dest edge: " + edgeName);
            return null;
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    /**
     * Initializer task for a single input: creates the input and its context,
     * registers both under the source vertex name, initializes the input and
     * forwards the events returned by that initialization.
     */
    private class InitializeInputCallable
    implements Callable<Void> {
        private final InputSpec inputSpec;

        /**
         * @param inputSpec spec of the input this task initializes
         */
        public InitializeInputCallable(InputSpec inputSpec) {
            this.inputSpec = inputSpec;
        }

        /**
         * Creates, registers and initializes the input.
         *
         * @return always {@code null}
         * @throws Exception anything thrown while creating or initializing the input
         */
        @Override
        public Void call() throws Exception {
            LOG.info("Initializing Input using InputSpec: " + inputSpec);
            String sourceEdge = inputSpec.getSourceVertexName();
            LogicalInput created = createInput(inputSpec);
            TezInputContext context = createInputContext(inputSpec);
            // Registered before initialize() is called on the input.
            inputsMap.put(sourceEdge, created);
            inputContextMap.put(sourceEdge, context);
            // createInput only ever returns a non-null LogicalInput, so the call is unconditional.
            created.setNumPhysicalInputs(inputSpec.getPhysicalEdgeCount());
            LOG.info("Initializing Input with src edge: " + sourceEdge);
            List<Event> initEvents = created.initialize(context);
            sendTaskGeneratedEvents(initEvents, EventMetaData.EventProducerConsumerType.INPUT, context.getTaskVertexName(), context.getSourceVertexName(), LogicalIOProcessorRuntimeTask.this.taskSpec.getTaskAttemptID());
            LOG.info("Initialized Input with src edge: " + sourceEdge);
            return null;
        }
    }
}

