/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Inflater;
import org.apache.hadoop.util.StringInterner;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleScheduler;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Translates the events received by a shuffle input into calls on a {@link ShuffleScheduler}.
 *
 * <p>Three event types are handled:
 * <ul>
 *   <li>{@link DataMovementEvent}: registers one source output with the scheduler, or marks it
 *       as already copied when the payload flags its partition as empty;</li>
 *   <li>{@link CompositeRoutedDataMovementEvent}: either registered as a single composite
 *       output (when {@code compositeFetch} is enabled) or expanded into individual
 *       {@link DataMovementEvent}s;</li>
 *   <li>{@link InputFailedEvent}: obsoletes the referenced source attempt.</li>
 * </ul>
 * Any other event type is ignored.
 *
 * <p>NOTE(review): a single {@link Inflater} is created in the constructor and reused for every
 * payload; it is never released by this class, and {@code java.util.zip.Inflater} should not be
 * used by several threads at once. Presumably callers serialize {@link #handleEvents(List)} and
 * the instance lives as long as the input - confirm against the caller.
 */
public class ShuffleInputEventHandlerOrderedGrouped
implements ShuffleEventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleInputEventHandlerOrderedGrouped.class);

    /** Number of events between two periodic progress log lines. */
    private static final int LOG_PROGRESS_EVENT_INTERVAL = 50;

    private final ShuffleScheduler scheduler;
    private final InputContext inputContext;
    private final boolean compositeFetch;
    /** Reused for decompressing the empty-partitions field of every payload. */
    private final Inflater inflater;
    /** Event total that must be exceeded before the next periodic progress log line. */
    private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
    private final AtomicInteger numDmeEvents = new AtomicInteger(0);
    private final AtomicInteger numObsoletionEvents = new AtomicInteger(0);
    private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0);

    /**
     * Creates a handler bound to the given input and scheduler.
     *
     * @param inputContext context of the input; used here for the source vertex name in log output
     * @param scheduler scheduler that is told about known, empty and obsolete source outputs
     * @param compositeFetch when {@code true}, a {@link CompositeRoutedDataMovementEvent} is passed
     *     to the scheduler as one composite identifier instead of being expanded per partition
     */
    public ShuffleInputEventHandlerOrderedGrouped(InputContext inputContext, ShuffleScheduler scheduler, boolean compositeFetch) {
        this.inputContext = inputContext;
        this.scheduler = scheduler;
        this.compositeFetch = compositeFetch;
        this.inflater = TezCommonUtils.newInflater();
    }

    /**
     * Handles the given events one by one, in list order.
     *
     * @param events events to process
     * @throws IOException if processing one of the events fails with an I/O error
     */
    @Override
    public void handleEvents(List<Event> events) throws IOException {
        for (Event event : events) {
            this.handleEvent(event);
        }
    }

    /**
     * Logs the event counters collected so far at INFO level.
     *
     * @param updateOnClose when {@code true}, {@code ", updateOnClose"} is appended to the message
     */
    @Override
    public void logProgress(boolean updateOnClose) {
        LOG.info("{}: numDmeEventsSeen={}, numDmeEventsSeenWithNoData={}, numObsoletionEventsSeen={}{}",
            this.inputContext.getSourceVertexName(),
            this.numDmeEvents.get(),
            this.numDmeEventsNoData.get(),
            this.numObsoletionEvents.get(),
            updateOnClose ? ", updateOnClose" : "");
    }

    /**
     * Dispatches a single event according to its type, updates the counters and emits the
     * periodic progress log line when it is due.
     *
     * @param event event to process; unsupported types are ignored
     * @throws IOException if the scheduler reports an I/O error
     * @throws TezUncheckedException if the event payload cannot be parsed or decompressed
     */
    private void handleEvent(Event event) throws IOException {
        if (event instanceof DataMovementEvent) {
            this.numDmeEvents.incrementAndGet();
            DataMovementEvent dmEvent = (DataMovementEvent) event;
            ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = this.parsePayload(dmEvent.getUserPayload());
            BitSet emptyPartitionsBitSet = this.decodeEmptyPartitions(shufflePayload);
            this.processDataMovementEvent(dmEvent, shufflePayload, emptyPartitionsBitSet);
            this.scheduler.updateEventReceivedTime();
        } else if (event instanceof CompositeRoutedDataMovementEvent) {
            CompositeRoutedDataMovementEvent crdme = (CompositeRoutedDataMovementEvent) event;
            ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload = this.parsePayload(crdme.getUserPayload());
            BitSet emptyPartitionsBitSet = this.decodeEmptyPartitions(shufflePayload);
            if (this.compositeFetch) {
                // The composite event stands for getCount() individual data movement events.
                this.numDmeEvents.addAndGet(crdme.getCount());
                this.processCompositeRoutedDataMovementEvent(crdme, shufflePayload, emptyPartitionsBitSet);
            } else {
                // All expanded events share the payload and empty-partition set parsed above.
                for (int offset = 0; offset < crdme.getCount(); ++offset) {
                    this.numDmeEvents.incrementAndGet();
                    this.processDataMovementEvent(crdme.expand(offset), shufflePayload, emptyPartitionsBitSet);
                }
            }
            this.scheduler.updateEventReceivedTime();
        } else if (event instanceof InputFailedEvent) {
            this.numObsoletionEvents.incrementAndGet();
            this.processTaskFailedEvent((InputFailedEvent) event);
        }
        this.logProgressIfDue();
    }

    /**
     * Emits the progress log line once the number of events seen exceeds the current threshold,
     * then moves the threshold forward.
     */
    private void logProgressIfDue() {
        int eventsSeen = this.numDmeEvents.get() + this.numObsoletionEvents.get();
        if (eventsSeen <= this.nextToLogEventCount.get()) {
            return;
        }
        this.logProgress(false);
        // A composite event can raise the total by more than one interval at once. Keep stepping
        // the threshold until it has caught up; otherwise every following event would log
        // until the threshold eventually overtook the total.
        int threshold;
        do {
            threshold = this.nextToLogEventCount.addAndGet(LOG_PROGRESS_EVENT_INTERVAL);
        } while (eventsSeen > threshold);
    }

    /**
     * Parses the user payload of a data movement event.
     *
     * @param userPayload raw payload bytes of the event
     * @return the parsed payload
     * @throws TezUncheckedException if the bytes are not a valid payload message
     */
    private ShuffleUserPayloads.DataMovementEventPayloadProto parsePayload(ByteBuffer userPayload) {
        try {
            return ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(userPayload));
        }
        catch (InvalidProtocolBufferException e) {
            throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
        }
    }

    /**
     * Decompresses the empty-partitions field of the payload into a bit set.
     *
     * @param shufflePayload parsed event payload
     * @return the decoded bit set, or {@code null} when the payload has no empty-partitions field
     * @throws TezUncheckedException if decompression fails
     */
    private BitSet decodeEmptyPartitions(ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload) {
        if (!shufflePayload.hasEmptyPartitions()) {
            return null;
        }
        try {
            byte[] emptyPartitions = TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions(), this.inflater);
            return TezUtilsInternal.fromByteArray(emptyPartitions);
        }
        catch (IOException e) {
            throw new TezUncheckedException("Unable to set the empty partition to succeeded", e);
        }
    }

    /**
     * Processes one data movement event: if its source partition is flagged in the
     * empty-partition set the attempt is reported to the scheduler as copied, otherwise the
     * output location is registered with the scheduler.
     *
     * @param dmEvent event to process
     * @param shufflePayload parsed payload of the event
     * @param emptyPartitionsBitSet decoded empty-partition set; only read when the payload has
     *     an empty-partitions field
     * @throws IOException if registering the output with the scheduler fails
     * @throws TezUncheckedException if reporting the empty partition as copied fails
     */
    private void processDataMovementEvent(DataMovementEvent dmEvent, ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException {
        int partitionId = dmEvent.getSourceIndex();
        CompositeInputAttemptIdentifier srcAttemptIdentifier = this.constructInputAttemptIdentifier(dmEvent.getTargetIndex(), 1, dmEvent.getVersion(), shufflePayload);
        if (LOG.isDebugEnabled()) {
            LOG.debug("DME srcIdx: {}, targetIdx: {}, attemptNum: {}, payload: {}",
                partitionId, dmEvent.getTargetIndex(), dmEvent.getVersion(), ShuffleUtils.stringify(shufflePayload));
        }
        if (shufflePayload.hasEmptyPartitions() && emptyPartitionsBitSet.get(partitionId)) {
            LOG.debug("Source partition: {} did not generate any data. SrcAttempt: [{}]. Not fetching.",
                partitionId, srcAttemptIdentifier);
            this.numDmeEventsNoData.getAndIncrement();
            try {
                this.scheduler.copySucceeded(srcAttemptIdentifier.expand(0), null, 0L, 0L, 0L, null, true);
            }
            catch (IOException e) {
                throw new TezUncheckedException("Unable to set the empty partition to succeeded", e);
            }
            return;
        }
        this.scheduler.addKnownMapOutput(StringInterner.weakIntern(shufflePayload.getHost()), shufflePayload.getPort(), partitionId, srcAttemptIdentifier);
    }

    /**
     * Processes a composite event without expanding it: every partition in the range
     * {@code [sourceIndex, sourceIndex + count)} that is flagged as empty is reported to the
     * scheduler as copied, and the composite output is registered with the scheduler unless all
     * partitions in the range are empty.
     *
     * @param crdmEvent composite event to process
     * @param shufflePayload parsed payload of the event
     * @param emptyPartitionsBitSet decoded empty-partition set; only read when the payload has
     *     an empty-partitions field
     * @throws IOException if the scheduler reports an I/O error
     */
    private void processCompositeRoutedDataMovementEvent(CompositeRoutedDataMovementEvent crdmEvent, ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException {
        int partitionId = crdmEvent.getSourceIndex();
        int partitionCount = crdmEvent.getCount();
        CompositeInputAttemptIdentifier compositeInputAttemptIdentifier = this.constructInputAttemptIdentifier(crdmEvent.getTargetIndex(), partitionCount, crdmEvent.getVersion(), shufflePayload);
        if (LOG.isDebugEnabled()) {
            LOG.debug("DME srcIdx: {}, targetIdx: {}, count:{}, attemptNum: {}, payload: {}",
                partitionId, crdmEvent.getTargetIndex(), partitionCount, crdmEvent.getVersion(), ShuffleUtils.stringify(shufflePayload));
        }
        if (shufflePayload.hasEmptyPartitions()) {
            boolean allPartitionsEmpty = true;
            for (int i = 0; i < partitionCount; ++i) {
                int srcPartitionId = partitionId + i;
                if (!emptyPartitionsBitSet.get(srcPartitionId)) {
                    allPartitionsEmpty = false;
                    continue;
                }
                InputAttemptIdentifier srcInputAttemptIdentifier = compositeInputAttemptIdentifier.expand(i);
                LOG.debug("Source partition: {} did not generate any data. SrcAttempt: [{}]. Not fetching.",
                    srcPartitionId, srcInputAttemptIdentifier);
                this.numDmeEventsNoData.getAndIncrement();
                this.scheduler.copySucceeded(srcInputAttemptIdentifier, null, 0L, 0L, 0L, null, true);
            }
            if (allPartitionsEmpty) {
                // Nothing left to fetch for this composite output.
                return;
            }
        }
        this.scheduler.addKnownMapOutput(StringInterner.weakIntern(shufflePayload.getHost()), shufflePayload.getPort(), partitionId, compositeInputAttemptIdentifier);
    }

    /**
     * Tells the scheduler that the source attempt referenced by the event is obsolete.
     *
     * @param ifEvent failure event naming the source task index and attempt version
     */
    private void processTaskFailedEvent(InputFailedEvent ifEvent) {
        InputAttemptIdentifier taIdentifier = new InputAttemptIdentifier(ifEvent.getTargetIndex(), ifEvent.getVersion());
        this.scheduler.obsoleteInput(taIdentifier);
        LOG.debug("Obsoleting output of src-task: {}", taIdentifier);
    }

    /**
     * Builds the identifier of a source attempt from event fields and payload.
     *
     * @param targetIndex target index taken from the event
     * @param targetIndexCount number of consecutive inputs the identifier covers
     * @param version attempt version taken from the event
     * @param shufflePayload parsed payload; supplies the optional path component and spill details
     * @return an identifier carrying spill information when the payload has a spill id, otherwise
     *     a plain composite identifier
     */
    private CompositeInputAttemptIdentifier constructInputAttemptIdentifier(int targetIndex, int targetIndexCount, int version, ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload) {
        String pathComponent = shufflePayload.hasPathComponent() ? StringInterner.weakIntern(shufflePayload.getPathComponent()) : null;
        if (!shufflePayload.hasSpillId()) {
            return new CompositeInputAttemptIdentifier(targetIndex, version, pathComponent, targetIndexCount);
        }
        // The payload's last-event flag selects between a final and an incremental spill update.
        InputAttemptIdentifier.SPILL_INFO spillInfo = shufflePayload.getLastEvent()
            ? InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE
            : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE;
        return new CompositeInputAttemptIdentifier(targetIndex, version, pathComponent, false, spillInfo, shufflePayload.getSpillId(), targetIndexCount);
    }
}

