/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleScheduler;
import org.apache.tez.runtime.library.shuffle.common.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;

/**
 * Translates events delivered to a shuffle input into calls on a
 * {@link ShuffleScheduler}.
 *
 * <p>Two event types are acted upon:
 * <ul>
 *   <li>{@link DataMovementEvent} - either reported to the scheduler as an
 *       already-completed copy (when the payload flags the partition as empty)
 *       or registered as a known map output to be fetched;</li>
 *   <li>{@link InputFailedEvent} - the corresponding input attempt is marked
 *       obsolete on the scheduler.</li>
 * </ul>
 * Events of any other type are ignored without any logging.
 */
public class ShuffleInputEventHandler {
    private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandler.class);

    private final InputContext inputContext;
    private final ShuffleScheduler scheduler;
    private final boolean sslShuffle;

    /** Largest run duration seen in any payload so far; starts at 0. */
    private int maxMapRuntime = 0;

    /**
     * @param inputContext context whose application id is used when building fetch URIs
     * @param scheduler    scheduler that receives the outcome of every handled event
     * @param sslShuffle   passed through to the base-URI construction in {@link #getBaseURI}
     */
    public ShuffleInputEventHandler(InputContext inputContext, ShuffleScheduler scheduler, boolean sslShuffle) {
        this.scheduler = scheduler;
        this.inputContext = inputContext;
        this.sslShuffle = sslShuffle;
    }

    /**
     * Handles every event of the list, in iteration order.
     *
     * @param events events to handle
     * @throws IOException if handling one of the events fails with an I/O error
     */
    public void handleEvents(List<Event> events) throws IOException {
        for (Event current : events) {
            handleEvent(current);
        }
    }

    /**
     * Dispatches a single event on its runtime type. Anything that is neither a
     * {@link DataMovementEvent} nor an {@link InputFailedEvent} is dropped.
     */
    private void handleEvent(Event event) throws IOException {
        if (event instanceof DataMovementEvent) {
            processDataMovementEvent((DataMovementEvent) event);
            return;
        }
        if (event instanceof InputFailedEvent) {
            processTaskFailedEvent((InputFailedEvent) event);
        }
    }

    /**
     * Handles a data movement event: logs it, forwards a new maximum run duration
     * to the scheduler if there is one, and then either short-circuits an empty
     * partition or registers the output location with the scheduler.
     */
    private void processDataMovementEvent(DataMovementEvent dmEvent) throws IOException {
        ShuffleUserPayloads.DataMovementEventPayloadProto payload = parsePayload(dmEvent);
        int partitionId = dmEvent.getSourceIndex();

        LOG.info("DataMovementEvent partitionId:" + partitionId
                + ", targetIndex: " + dmEvent.getTargetIndex()
                + ", attemptNum: " + dmEvent.getVersion()
                + ", payload: " + ShuffleUtils.stringify(payload));

        // The scheduler is only told about the run duration when it raises the maximum.
        int runDuration = payload.getRunDuration();
        if (runDuration > maxMapRuntime) {
            maxMapRuntime = runDuration;
            scheduler.informMaxMapRunTime(maxMapRuntime);
        }

        if (payload.hasEmptyPartitions() && reportIfPartitionEmpty(dmEvent, payload, partitionId)) {
            // Already reported as succeeded; nothing to register for fetching.
            return;
        }

        InputAttemptIdentifier srcAttempt = new InputAttemptIdentifier(
                dmEvent.getTargetIndex(), dmEvent.getVersion(), payload.getPathComponent());
        String host = payload.getHost();
        int port = payload.getPort();
        String baseUri = getBaseURI(host, port, partitionId).toString();
        scheduler.addKnownMapOutput(host, port, partitionId, baseUri, srcAttempt);
    }

    /**
     * Decodes the user payload of the event into its protobuf form.
     *
     * @throws TezUncheckedException if the payload is not a valid protobuf message
     */
    private static ShuffleUserPayloads.DataMovementEventPayloadProto parsePayload(DataMovementEvent dmEvent) {
        try {
            ByteBuffer rawPayload = dmEvent.getUserPayload();
            return ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(rawPayload));
        } catch (InvalidProtocolBufferException e) {
            throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
        }
    }

    /**
     * Checks the payload's empty-partition bit set for {@code partitionId}. When the
     * bit is set, the attempt is reported to the scheduler as a succeeded copy with
     * null/zero arguments instead of being fetched.
     *
     * @return {@code true} if the partition was empty and has been reported,
     *         {@code false} if it still has to be fetched
     * @throws TezUncheckedException if any step, including the scheduler call,
     *         fails with an {@link IOException}
     */
    private boolean reportIfPartitionEmpty(DataMovementEvent dmEvent,
            ShuffleUserPayloads.DataMovementEventPayloadProto payload, int partitionId) {
        try {
            ByteString compressed = payload.getEmptyPartitions();
            byte[] decompressed = TezCommonUtils.decompressByteStringToByteArray(compressed);
            BitSet emptyPartitions = TezUtilsInternal.fromByteArray(decompressed);
            if (!emptyPartitions.get(partitionId)) {
                return false;
            }
            InputAttemptIdentifier srcAttempt =
                    new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion());
            LOG.info("Source partition: " + partitionId
                    + " did not generate any data. SrcAttempt: [" + srcAttempt + "]. Not fetching.");
            scheduler.copySucceeded(srcAttempt, null, 0L, 0L, 0L, null);
            return true;
        } catch (IOException e) {
            throw new TezUncheckedException("Unable to set the empty partition to succeeded", e);
        }
    }

    /**
     * Handles an input-failed event by marking the identified input attempt as
     * obsolete on the scheduler and then logging it.
     */
    private void processTaskFailedEvent(InputFailedEvent ifEvent) {
        InputAttemptIdentifier failedAttempt =
                new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion());
        scheduler.obsoleteInput(failedAttempt);
        LOG.info("Obsoleting output of src-task: " + failedAttempt);
    }

    /**
     * Builds the base URI for fetching from the shuffle handler at the given
     * location, using this input's application id and the configured SSL flag.
     *
     * @param host        shuffle handler host
     * @param port        shuffle handler port
     * @param partitionId partition to fetch
     * @return the URI parsed from the string produced by
     *         {@code ShuffleUtils.constructBaseURIForShuffleHandler}
     */
    @VisibleForTesting
    URI getBaseURI(String host, int port, int partitionId) {
        String applicationId = inputContext.getApplicationId().toString();
        StringBuilder baseUrl = ShuffleUtils.constructBaseURIForShuffleHandler(
                host, port, partitionId, applicationId, sslShuffle);
        return URI.create(baseUrl.toString());
    }
}

