/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.impl;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.net.URI;
import java.util.BitSet;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TezInputContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleScheduler;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;

public class ShuffleInputEventHandler {
    private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandler.class);
    private final ShuffleScheduler scheduler;
    private final TezInputContext inputContext;
    private int maxMapRuntime = 0;

    public ShuffleInputEventHandler(TezInputContext inputContext, ShuffleScheduler scheduler) {
        this.inputContext = inputContext;
        this.scheduler = scheduler;
    }

    public void handleEvents(List<Event> events) {
        for (Event event : events) {
            this.handleEvent(event);
        }
    }

    private void handleEvent(Event event) {
        if (event instanceof DataMovementEvent) {
            this.processDataMovementEvent((DataMovementEvent)event);
        } else if (event instanceof InputFailedEvent) {
            this.processTaskFailedEvent((InputFailedEvent)event);
        }
    }

    private void processDataMovementEvent(DataMovementEvent dmEvent) {
        ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload;
        try {
            shufflePayload = ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(dmEvent.getUserPayload());
        }
        catch (InvalidProtocolBufferException e) {
            throw new TezUncheckedException("Unable to parse DataMovementEvent payload", (Throwable)e);
        }
        int partitionId = dmEvent.getSourceIndex();
        URI baseUri = this.getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
        InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
        LOG.info((Object)("DataMovementEvent baseUri:" + baseUri + ", src: " + srcAttemptIdentifier));
        int duration = shufflePayload.getRunDuration();
        if (duration > this.maxMapRuntime) {
            this.maxMapRuntime = duration;
            this.scheduler.informMaxMapRunTime(this.maxMapRuntime);
        }
        if (shufflePayload.hasEmptyPartitions()) {
            try {
                byte[] emptyPartitions = TezUtils.decompressByteStringToByteArray((ByteString)shufflePayload.getEmptyPartitions());
                BitSet emptyPartitionsBitSet = TezUtils.fromByteArray((byte[])emptyPartitions);
                if (emptyPartitionsBitSet.get(partitionId)) {
                    LOG.info((Object)("Source partition: " + partitionId + " did not generate any data. SrcAttempt: [" + srcAttemptIdentifier + "]. Not fetching."));
                    this.scheduler.copySucceeded(srcAttemptIdentifier, null, 0L, 0L, 0L, null);
                    return;
                }
            }
            catch (IOException e) {
                throw new TezUncheckedException("Unable to set the empty partition to succeeded", (Throwable)e);
            }
        }
        this.scheduler.addKnownMapOutput(shufflePayload.getHost(), shufflePayload.getPort(), partitionId, baseUri.toString(), srcAttemptIdentifier);
    }

    private void processTaskFailedEvent(InputFailedEvent ifEvent) {
        InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion());
        this.scheduler.obsoleteInput(taIdentifier);
        LOG.info((Object)("Obsoleting output of src-task: " + taIdentifier));
    }

    private URI getBaseURI(String host, int port, int partitionId) {
        StringBuilder sb = new StringBuilder("http://");
        sb.append(host);
        sb.append(":");
        sb.append(String.valueOf(port));
        sb.append("/");
        sb.append("mapOutput?job=");
        sb.append(this.inputContext.getApplicationId().toString().replace("application", "job"));
        sb.append("&reduce=");
        sb.append(partitionId);
        sb.append("&map=");
        URI u = URI.create(sb.toString());
        return u;
    }
}

