/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventNullEdgeInitialized;
import org.apache.tez.dag.app.dag.impl.BroadcastEdgeManager;
import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManager;
import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;

public class Edge {
    private static final Log LOG = LogFactory.getLog(Edge.class);
    private EdgeProperty edgeProperty;
    private EdgeManagerPluginContext edgeManagerContext;
    private EdgeManagerPlugin edgeManager;
    private EventHandler eventHandler;
    private AtomicBoolean bufferEvents = new AtomicBoolean(false);
    private List<TezEvent> destinationEventBuffer = new ArrayList<TezEvent>();
    private List<TezEvent> sourceEventBuffer = new ArrayList<TezEvent>();
    private Vertex sourceVertex;
    private Vertex destinationVertex;
    private EventMetaData destinationMetaInfo;

    public Edge(EdgeProperty edgeProperty, EventHandler eventHandler) {
        this.edgeProperty = edgeProperty;
        this.eventHandler = eventHandler;
        this.createEdgeManager();
    }

    private void createEdgeManager() {
        switch (this.edgeProperty.getDataMovementType()) {
            case ONE_TO_ONE: {
                this.edgeManagerContext = new EdgeManagerPluginContextImpl(UserPayload.create(null));
                this.edgeManager = new OneToOneEdgeManager(this.edgeManagerContext);
                break;
            }
            case BROADCAST: {
                this.edgeManagerContext = new EdgeManagerPluginContextImpl(UserPayload.create(null));
                this.edgeManager = new BroadcastEdgeManager(this.edgeManagerContext);
                break;
            }
            case SCATTER_GATHER: {
                this.edgeManagerContext = new EdgeManagerPluginContextImpl(UserPayload.create(null));
                this.edgeManager = new ScatterGatherEdgeManager(this.edgeManagerContext);
                break;
            }
            case CUSTOM: {
                if (this.edgeProperty.getEdgeManagerDescriptor() == null) break;
                UserPayload payload = null;
                payload = this.edgeProperty.getEdgeManagerDescriptor().getUserPayload() != null && this.edgeProperty.getEdgeManagerDescriptor().getUserPayload().hasPayload() ? this.edgeProperty.getEdgeManagerDescriptor().getUserPayload() : UserPayload.create(null);
                this.edgeManagerContext = new EdgeManagerPluginContextImpl(payload);
                String edgeManagerClassName = this.edgeProperty.getEdgeManagerDescriptor().getClassName();
                this.edgeManager = (EdgeManagerPlugin)ReflectionUtils.createClazzInstance((String)edgeManagerClassName, (Class[])new Class[]{EdgeManagerPluginContext.class}, (Object[])new Object[]{this.edgeManagerContext});
                break;
            }
            default: {
                String message = "Unknown edge data movement type: " + this.edgeProperty.getDataMovementType();
                throw new TezUncheckedException(message);
            }
        }
    }

    public void initialize() {
        if (this.edgeManager != null) {
            this.edgeManager.initialize();
        }
        this.destinationMetaInfo = new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, this.destinationVertex.getName(), this.sourceVertex.getName(), null);
    }

    public synchronized void setCustomEdgeManager(EdgeManagerPluginDescriptor descriptor) {
        EdgeProperty modifiedEdgeProperty;
        this.edgeProperty = modifiedEdgeProperty = EdgeProperty.create((EdgeManagerPluginDescriptor)descriptor, (EdgeProperty.DataSourceType)this.edgeProperty.getDataSourceType(), (EdgeProperty.SchedulingType)this.edgeProperty.getSchedulingType(), (OutputDescriptor)this.edgeProperty.getEdgeSource(), (InputDescriptor)this.edgeProperty.getEdgeDestination());
        boolean wasUnInitialized = this.edgeManager == null;
        this.createEdgeManager();
        this.initialize();
        if (wasUnInitialized) {
            this.sendEvent((Event)new VertexEventNullEdgeInitialized(this.sourceVertex.getVertexId(), this, this.destinationVertex));
            this.sendEvent((Event)new VertexEventNullEdgeInitialized(this.destinationVertex.getVertexId(), this, this.sourceVertex));
        }
    }

    public EdgeProperty getEdgeProperty() {
        return this.edgeProperty;
    }

    public EdgeManagerPlugin getEdgeManager() {
        return this.edgeManager;
    }

    public void setSourceVertex(Vertex sourceVertex) {
        if (this.sourceVertex != null && this.sourceVertex != sourceVertex) {
            throw new TezUncheckedException("Source vertex exists: " + sourceVertex.getName());
        }
        this.sourceVertex = sourceVertex;
    }

    public void setDestinationVertex(Vertex destinationVertex) {
        if (this.destinationVertex != null && this.destinationVertex != destinationVertex) {
            throw new TezUncheckedException("Destination vertex exists: " + destinationVertex.getName());
        }
        this.destinationVertex = destinationVertex;
    }

    public InputSpec getDestinationSpec(int destinationTaskIndex) {
        Preconditions.checkState((this.edgeManager != null ? 1 : 0) != 0, (Object)"Edge Manager must be initialized by this time");
        return new InputSpec(this.sourceVertex.getName(), this.edgeProperty.getEdgeDestination(), this.edgeManager.getNumDestinationTaskPhysicalInputs(destinationTaskIndex));
    }

    public OutputSpec getSourceSpec(int sourceTaskIndex) {
        Preconditions.checkState((this.edgeManager != null ? 1 : 0) != 0, (Object)"Edge Manager must be initialized by this time");
        return new OutputSpec(this.destinationVertex.getName(), this.edgeProperty.getEdgeSource(), this.edgeManager.getNumSourceTaskPhysicalOutputs(sourceTaskIndex));
    }

    public void startEventBuffering() {
        this.bufferEvents.set(true);
    }

    public void stopEventBuffering() {
        this.bufferEvents.set(false);
        for (TezEvent event : this.destinationEventBuffer) {
            this.sendTezEventToDestinationTasks(event);
        }
        this.destinationEventBuffer.clear();
        for (TezEvent event : this.sourceEventBuffer) {
            this.sendTezEventToSourceTasks(event);
        }
        this.sourceEventBuffer.clear();
    }

    public void sendTezEventToSourceTasks(TezEvent tezEvent) {
        Preconditions.checkState((this.edgeManager != null ? 1 : 0) != 0, (Object)"Edge Manager must be initialized by this time");
        if (!this.bufferEvents.get()) {
            switch (tezEvent.getEventType()) {
                case INPUT_READ_ERROR_EVENT: {
                    InputReadErrorEvent event = (InputReadErrorEvent)tezEvent.getEvent();
                    TezTaskAttemptID destAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
                    int destTaskIndex = destAttemptId.getTaskID().getId();
                    int srcTaskIndex = this.edgeManager.routeInputErrorEventToSource(event, destTaskIndex, event.getIndex());
                    int numConsumers = this.edgeManager.getNumDestinationConsumerTasks(srcTaskIndex);
                    Task srcTask = this.sourceVertex.getTask(srcTaskIndex);
                    if (srcTask == null) {
                        throw new TezUncheckedException("Unexpected null task. sourceVertex=" + this.sourceVertex.getVertexId() + " srcIndex = " + srcTaskIndex + " destAttemptId=" + destAttemptId + " destIndex=" + destTaskIndex + " edgeManager=" + this.edgeManager.getClass().getName());
                    }
                    TezTaskID srcTaskId = srcTask.getTaskId();
                    int taskAttemptIndex = event.getVersion();
                    TezTaskAttemptID srcTaskAttemptId = TezTaskAttemptID.getInstance((TezTaskID)srcTaskId, (int)taskAttemptIndex);
                    this.sendEvent((Event)new TaskAttemptEventOutputFailed(srcTaskAttemptId, tezEvent, numConsumers));
                    break;
                }
                default: {
                    throw new TezUncheckedException("Unhandled tez event type: " + tezEvent.getEventType());
                }
            }
        } else {
            this.sourceEventBuffer.add(tezEvent);
        }
    }

    private void handleCompositeDataMovementEvent(TezEvent tezEvent) {
        CompositeDataMovementEvent compEvent = (CompositeDataMovementEvent)tezEvent.getEvent();
        EventMetaData srcInfo = tezEvent.getSourceInfo();
        for (DataMovementEvent dmEvent : compEvent.getEvents()) {
            TezEvent newEvent = new TezEvent((org.apache.tez.runtime.api.Event)dmEvent, srcInfo);
            this.sendTezEventToDestinationTasks(newEvent);
        }
    }

    void sendDmEventOrIfEventToTasks(TezEvent tezEvent, int srcTaskIndex, boolean isDataMovementEvent, Map<Integer, List<Integer>> taskAndInputIndices) {
        Preconditions.checkState((this.edgeManager != null ? 1 : 0) != 0, (Object)"Edge Manager must be initialized by this time");
        org.apache.tez.runtime.api.Event event = tezEvent.getEvent();
        boolean isFirstEvent = true;
        HashMap inputIndicesWithEvents = Maps.newHashMap();
        for (Map.Entry<Integer, List<Integer>> entry : taskAndInputIndices.entrySet()) {
            int destTaskIndex = entry.getKey();
            List<Integer> inputIndices = entry.getValue();
            for (int i = 0; i < inputIndices.size(); ++i) {
                Task destTask;
                Integer inputIndex = inputIndices.get(i);
                TezEvent tezEventToSend = (TezEvent)inputIndicesWithEvents.get(inputIndex);
                if (tezEventToSend == null) {
                    if (isFirstEvent) {
                        isFirstEvent = false;
                        if (isDataMovementEvent) {
                            ((DataMovementEvent)event).setTargetIndex(inputIndex.intValue());
                        } else {
                            ((InputFailedEvent)event).setTargetIndex(inputIndex.intValue());
                        }
                        tezEventToSend = tezEvent;
                    } else {
                        InputFailedEvent e;
                        if (isDataMovementEvent) {
                            DataMovementEvent dmEvent = (DataMovementEvent)event;
                            e = DataMovementEvent.create((int)dmEvent.getSourceIndex(), (int)inputIndex, (int)dmEvent.getVersion(), (ByteBuffer)dmEvent.getUserPayload());
                        } else {
                            InputFailedEvent ifEvent = (InputFailedEvent)event;
                            e = InputFailedEvent.create((int)inputIndex, (int)ifEvent.getVersion());
                        }
                        tezEventToSend = new TezEvent((org.apache.tez.runtime.api.Event)e, tezEvent.getSourceInfo());
                    }
                    tezEventToSend.setDestinationInfo(this.destinationMetaInfo);
                    inputIndicesWithEvents.put(inputIndex, tezEventToSend);
                }
                if ((destTask = this.destinationVertex.getTask(destTaskIndex)) == null) {
                    throw new TezUncheckedException("Unexpected null task. sourceVertex=" + this.sourceVertex.getVertexId() + " srcTaskIndex = " + srcTaskIndex + " destVertex=" + this.destinationVertex.getVertexId() + " destTaskIndex=" + destTaskIndex + " destNumTasks=" + this.destinationVertex.getTotalTasks() + " edgeManager=" + this.edgeManager.getClass().getName());
                }
                this.sendEventToTask(destTask, tezEventToSend);
            }
        }
    }

    public void sendTezEventToDestinationTasks(TezEvent tezEvent) {
        block14: {
            block13: {
                Preconditions.checkState((this.edgeManager != null ? 1 : 0) != 0, (Object)"Edge Manager must be initialized by this time");
                if (this.bufferEvents.get()) break block13;
                boolean isDataMovementEvent = true;
                switch (tezEvent.getEventType()) {
                    case COMPOSITE_DATA_MOVEMENT_EVENT: {
                        this.handleCompositeDataMovementEvent(tezEvent);
                        break;
                    }
                    case INPUT_FAILED_EVENT: {
                        isDataMovementEvent = false;
                    }
                    case DATA_MOVEMENT_EVENT: {
                        HashMap destTaskAndInputIndices = Maps.newHashMap();
                        TezTaskAttemptID srcAttemptId = tezEvent.getSourceInfo().getTaskAttemptID();
                        int srcTaskIndex = srcAttemptId.getTaskID().getId();
                        boolean routingRequired = true;
                        if (this.edgeManagerContext.getDestinationVertexNumTasks() == 0) {
                            routingRequired = false;
                            LOG.info((Object)("Not routing events since destination vertex has 0 tasks" + this.generateCommonDebugString(srcTaskIndex, tezEvent)));
                        } else if (this.edgeManagerContext.getDestinationVertexNumTasks() < 0) {
                            throw new TezUncheckedException("Internal error. Not expected to route events to a destination until parallelism is determined" + this.generateCommonDebugString(srcTaskIndex, tezEvent));
                        }
                        if (routingRequired) {
                            if (isDataMovementEvent) {
                                DataMovementEvent dmEvent = (DataMovementEvent)tezEvent.getEvent();
                                this.edgeManager.routeDataMovementEventToDestination(dmEvent, srcTaskIndex, dmEvent.getSourceIndex(), (Map)destTaskAndInputIndices);
                            } else {
                                this.edgeManager.routeInputSourceTaskFailedEventToDestination(srcTaskIndex, (Map)destTaskAndInputIndices);
                            }
                        }
                        if (!destTaskAndInputIndices.isEmpty()) {
                            this.sendDmEventOrIfEventToTasks(tezEvent, srcTaskIndex, isDataMovementEvent, destTaskAndInputIndices);
                            break;
                        }
                        if (routingRequired) {
                            throw new TezUncheckedException("Event must be routed." + this.generateCommonDebugString(srcTaskIndex, tezEvent));
                        }
                        break block14;
                    }
                    default: {
                        throw new TezUncheckedException("Unhandled tez event type: " + tezEvent.getEventType());
                    }
                }
                break block14;
            }
            this.destinationEventBuffer.add(tezEvent);
        }
    }

    private void sendEventToTask(Task task, TezEvent tezEvent) {
        task.registerTezEvent(tezEvent);
    }

    private void sendEvent(Event event) {
        this.eventHandler.handle(event);
    }

    public String getSourceVertexName() {
        return this.sourceVertex.getName();
    }

    public String getDestinationVertexName() {
        return this.destinationVertex.getName();
    }

    private String generateCommonDebugString(int srcTaskIndex, TezEvent tezEvent) {
        return " sourceVertex=" + this.sourceVertex.getVertexId() + " srcIndex = " + srcTaskIndex + " destAttemptId=" + this.destinationVertex.getVertexId() + " edgeManager=" + this.edgeManager.getClass().getName() + " Event type=" + tezEvent.getEventType();
    }

    class EdgeManagerPluginContextImpl
    implements EdgeManagerPluginContext {
        private final UserPayload userPayload;

        EdgeManagerPluginContextImpl(UserPayload userPayload) {
            this.userPayload = userPayload;
        }

        public UserPayload getUserPayload() {
            return this.userPayload;
        }

        public String getSourceVertexName() {
            return Edge.this.sourceVertex.getName();
        }

        public String getDestinationVertexName() {
            return Edge.this.destinationVertex.getName();
        }

        public int getSourceVertexNumTasks() {
            return Edge.this.sourceVertex.getTotalTasks();
        }

        public int getDestinationVertexNumTasks() {
            return Edge.this.destinationVertex.getTotalTasks();
        }
    }
}

