/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.broadcast.input;

import com.google.common.base.Preconditions;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.library.broadcast.input.BroadcastShuffleManager;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.shuffle.common.DiskFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInput;
import org.apache.tez.runtime.library.shuffle.common.FetchedInputAllocator;
import org.apache.tez.runtime.library.shuffle.common.MemoryFetchedInput;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;

/**
 * Dispatches input events for a broadcast shuffle to a {@link BroadcastShuffleManager}.
 *
 * <p>Two event types are handled:
 * <ul>
 *   <li>{@link DataMovementEvent}: the payload is parsed and, depending on its contents, the
 *       source attempt is registered with the shuffle manager as completed with inline data,
 *       as a known input identified by host/port, or as completed with no data.</li>
 *   <li>{@link InputFailedEvent}: the corresponding source attempt is marked obsolete.</li>
 * </ul>
 * Any other event type results in a {@link TezUncheckedException}.
 */
public class BroadcastShuffleInputEventHandler {
    private static final Log LOG = LogFactory.getLog(BroadcastShuffleInputEventHandler.class);
    private final BroadcastShuffleManager shuffleManager;
    private final FetchedInputAllocator inputAllocator;
    private final CompressionCodec codec;
    private final boolean ifileReadAhead;
    private final int ifileReadAheadLength;

    /**
     * Creates a handler bound to the given shuffle manager and allocator.
     *
     * @param inputContext         not used by this class; retained so existing callers keep compiling
     * @param shuffleManager       receives completed, known and obsolete input notifications
     * @param inputAllocator       allocates the {@link FetchedInput} that inline event data is copied into
     * @param codec                passed through to {@code ShuffleUtils.shuffleToMemory}
     * @param ifileReadAhead       passed through to {@code ShuffleUtils.shuffleToMemory}
     * @param ifileReadAheadLength passed through to {@code ShuffleUtils.shuffleToMemory}
     */
    public BroadcastShuffleInputEventHandler(TezInputContext inputContext, BroadcastShuffleManager shuffleManager, FetchedInputAllocator inputAllocator, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength) {
        this.shuffleManager = shuffleManager;
        this.inputAllocator = inputAllocator;
        this.codec = codec;
        this.ifileReadAhead = ifileReadAhead;
        this.ifileReadAheadLength = ifileReadAheadLength;
    }

    /**
     * Handles every event in the list, in iteration order.
     *
     * @param events events to process
     * @throws IOException           if copying inline event data into a fetched input fails
     * @throws TezUncheckedException if an event has an unsupported type or an unparseable payload
     */
    public void handleEvents(List<Event> events) throws IOException {
        for (Event event : events) {
            handleEvent(event);
        }
    }

    /**
     * Routes a single event to the matching processing method based on its runtime type.
     */
    private void handleEvent(Event event) throws IOException {
        if (event instanceof DataMovementEvent) {
            processDataMovementEvent((DataMovementEvent) event);
        } else if (event instanceof InputFailedEvent) {
            processInputFailedEvent((InputFailedEvent) event);
        } else {
            throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
        }
    }

    /**
     * Parses the payload of a data movement event and registers the source attempt with the
     * shuffle manager.
     *
     * @throws IllegalArgumentException if the event's source index is not 0
     * @throws TezUncheckedException    if the payload cannot be parsed
     * @throws IOException              if inline data cannot be moved into the allocated input
     */
    private void processDataMovementEvent(DataMovementEvent dme) throws IOException {
        Preconditions.checkArgument(dme.getSourceIndex() == 0,
                "Unexpected srcIndex: " + dme.getSourceIndex() + " on DataMovementEvent. Can only be 0");

        ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload;
        try {
            shufflePayload = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(dme.getUserPayload());
        } catch (InvalidProtocolBufferException e) {
            throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
        }

        // Skip building the (fairly long) message when INFO logging is disabled.
        if (LOG.isInfoEnabled()) {
            LOG.info("Processing DataMovementEvent with srcIndex: " + dme.getSourceIndex() + ", targetIndex: " + dme.getTargetIndex() + ", attemptNum: " + dme.getVersion() + ", payload: " + stringify(shufflePayload));
        }

        if (!shufflePayload.getOutputGenerated()) {
            // The source produced no output; nothing to fetch for this attempt.
            shuffleManager.addCompletedInputWithNoData(new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion()));
            return;
        }

        InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dme.getTargetIndex(), dme.getVersion(), shufflePayload.getPathComponent());
        if (shufflePayload.hasData()) {
            // The data travelled inside the event itself: copy it into a freshly allocated input.
            ShuffleUserPayloads.DataProto dataProto = shufflePayload.getData();
            FetchedInput fetchedInput = inputAllocator.allocate(dataProto.getRawLength(), dataProto.getCompressedLength(), srcAttemptIdentifier);
            moveDataToFetchedInput(dataProto, fetchedInput);
            shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
        } else {
            // NOTE(review): the meaning of the trailing 0 is defined by
            // BroadcastShuffleManager.addKnownInput - confirm there before changing it.
            shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, 0);
        }
    }

    /**
     * Copies the bytes carried in {@code dataProto} into {@code fetchedInput}, using the
     * {@code ShuffleUtils} routine that matches the input's type.
     *
     * @throws TezUncheckedException if the fetched input has a type other than DISK or MEMORY
     * @throws IOException           declared because the {@code ShuffleUtils} copy routines are
     *                               invoked here
     */
    private void moveDataToFetchedInput(ShuffleUserPayloads.DataProto dataProto, FetchedInput fetchedInput) throws IOException {
        switch (fetchedInput.getType()) {
            case DISK: {
                ShuffleUtils.shuffleToDisk((DiskFetchedInput) fetchedInput, dataProto.getData().newInput(), dataProto.getCompressedLength(), LOG);
                break;
            }
            case MEMORY: {
                ShuffleUtils.shuffleToMemory((MemoryFetchedInput) fetchedInput, dataProto.getData().newInput(), dataProto.getRawLength(), dataProto.getCompressedLength(), codec, ifileReadAhead, ifileReadAheadLength, LOG);
                break;
            }
            default: {
                throw new TezUncheckedException("Unexpected type: " + fetchedInput.getType());
            }
        }
    }

    /**
     * Tells the shuffle manager that the source attempt named by the event is obsolete.
     */
    private void processInputFailedEvent(InputFailedEvent ife) {
        InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
        shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
    }

    /**
     * Renders the payload fields used for logging as a bracketed, comma-separated list.
     * The inline data itself is not included, only whether it is present.
     */
    private String stringify(ShuffleUserPayloads.DataMovementEventPayloadProto dmProto) {
        StringBuilder sb = new StringBuilder();
        sb.append("[");
        sb.append("outputGenerated: ").append(dmProto.getOutputGenerated()).append(", ");
        sb.append("host: ").append(dmProto.getHost()).append(", ");
        sb.append("port: ").append(dmProto.getPort()).append(", ");
        sb.append("pathComponent: ").append(dmProto.getPathComponent()).append(", ");
        sb.append("runDuration: ").append(dmProto.getRunDuration()).append(", ");
        sb.append("hasData: ").append(dmProto.hasData());
        // Close the bracket opened above; the original left it unbalanced.
        sb.append("]");
        return sb.toString();
    }
}

