/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.shuffle.impl;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.BitSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.Inflater;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.library.common.CompositeInputAttemptIdentifier;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.DiskFetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.MemoryFetchedInput;
import org.apache.tez.runtime.library.common.shuffle.ShuffleEventHandler;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.impl.ShuffleManager;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dispatches shuffle input events to a {@link ShuffleManager}.
 *
 * <p>Three event types are understood:
 * <ul>
 *   <li>{@link DataMovementEvent} - a single source output became available;</li>
 *   <li>{@link CompositeRoutedDataMovementEvent} - a run of {@code getCount()} consecutive
 *       source partitions became available; handled as one composite input when
 *       {@code compositeFetch} is enabled, otherwise expanded into individual
 *       {@link DataMovementEvent}s;</li>
 *   <li>{@link InputFailedEvent} - a previously announced input is obsolete.</li>
 * </ul>
 * Any other event type results in a {@link TezUncheckedException}.
 *
 * <p>NOTE(review): a single {@link Inflater} is created in the constructor and reused for
 * every event; nothing in this class ends it. Confirm whether the owner is expected to
 * release it elsewhere.
 */
public class ShuffleInputEventHandlerImpl implements ShuffleEventHandler {
    private static final Logger LOG = LoggerFactory.getLogger(ShuffleInputEventHandlerImpl.class);

    /** Number of events between two consecutive progress log lines. */
    private static final int LOG_EVENT_INTERVAL = 50;

    private final ShuffleManager shuffleManager;
    private final FetchedInputAllocator inputAllocator;
    private final CompressionCodec codec;
    private final boolean ifileReadAhead;
    private final int ifileReadAheadLength;
    /** True only when this task attempt's number is 0 (see constructor). */
    private final boolean useSharedInputs;
    private final InputContext inputContext;
    private final boolean compositeFetch;
    private final Inflater inflater;

    /** Progress is logged once the total event count exceeds this threshold. */
    private final AtomicInteger nextToLogEventCount = new AtomicInteger(0);
    private final AtomicInteger numDmeEvents = new AtomicInteger(0);
    private final AtomicInteger numObsoletionEvents = new AtomicInteger(0);
    private final AtomicInteger numDmeEventsNoData = new AtomicInteger(0);

    /**
     * Creates a handler bound to the given shuffle components.
     *
     * @param inputContext         context of the input; supplies the task attempt number and
     *                             the source vertex name used in progress logs
     * @param shuffleManager       receiver of known, completed and obsoleted inputs
     * @param inputAllocator       allocates the {@link FetchedInput} for data carried inline
     *                             in an event payload
     * @param codec                codec passed through when inline data is copied to memory
     * @param ifileReadAhead       read-ahead flag passed through to {@link ShuffleUtils}
     * @param ifileReadAheadLength read-ahead length passed through to {@link ShuffleUtils}
     * @param compositeFetch       whether composite events are handled as one composite input
     *                             instead of being expanded per partition
     */
    public ShuffleInputEventHandlerImpl(InputContext inputContext, ShuffleManager shuffleManager, FetchedInputAllocator inputAllocator, CompressionCodec codec, boolean ifileReadAhead, int ifileReadAheadLength, boolean compositeFetch) {
        this.inputContext = inputContext;
        this.shuffleManager = shuffleManager;
        this.inputAllocator = inputAllocator;
        this.codec = codec;
        this.ifileReadAhead = ifileReadAhead;
        this.ifileReadAheadLength = ifileReadAheadLength;
        this.useSharedInputs = inputContext.getTaskAttemptNumber() == 0;
        this.compositeFetch = compositeFetch;
        this.inflater = TezCommonUtils.newInflater();
    }

    /**
     * Handles the given events one by one, in list order.
     *
     * @param events events to process
     * @throws IOException if processing an event fails with an I/O error
     * @throws TezUncheckedException if an event has an unexpected type or an unparsable payload
     */
    @Override
    public void handleEvents(List<Event> events) throws IOException {
        for (Event event : events) {
            handleEvent(event);
        }
    }

    /**
     * Dispatches a single event by its runtime type, updates the event counters and
     * periodically logs progress.
     */
    private void handleEvent(Event event) throws IOException {
        if (event instanceof DataMovementEvent) {
            numDmeEvents.incrementAndGet();
            DataMovementEvent dmEvent = (DataMovementEvent) event;
            ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
                    parseShufflePayload(dmEvent.getUserPayload());
            BitSet emptyPartitionsBitSet = decodeEmptyPartitions(shufflePayload);
            processDataMovementEvent(dmEvent, shufflePayload, emptyPartitionsBitSet);
            shuffleManager.updateEventReceivedTime();
        } else if (event instanceof CompositeRoutedDataMovementEvent) {
            CompositeRoutedDataMovementEvent crdme = (CompositeRoutedDataMovementEvent) event;
            ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload =
                    parseShufflePayload(crdme.getUserPayload());
            BitSet emptyPartitionsBitSet = decodeEmptyPartitions(shufflePayload);
            int count = crdme.getCount();
            if (compositeFetch) {
                numDmeEvents.addAndGet(count);
                processCompositeRoutedDataMovementEvent(crdme, shufflePayload, emptyPartitionsBitSet);
            } else {
                // Composite fetch disabled: treat every covered partition as its own DME.
                for (int offset = 0; offset < count; ++offset) {
                    numDmeEvents.incrementAndGet();
                    processDataMovementEvent(crdme.expand(offset), shufflePayload, emptyPartitionsBitSet);
                }
            }
            shuffleManager.updateEventReceivedTime();
        } else if (event instanceof InputFailedEvent) {
            numObsoletionEvents.incrementAndGet();
            processInputFailedEvent((InputFailedEvent) event);
        } else {
            throw new TezUncheckedException("Unexpected event type: " + event.getClass().getName());
        }

        int eventsSeen = numDmeEvents.get() + numObsoletionEvents.get();
        int logThreshold = nextToLogEventCount.get();
        if (eventsSeen > logThreshold) {
            logProgress(false);
            // A composite event can raise the counter by more than one interval at once.
            // Advance the threshold by as many whole intervals as needed to reach the
            // current count, so that subsequent events do not each trigger a log line
            // while the threshold catches up.
            int intervals = (eventsSeen - logThreshold + LOG_EVENT_INTERVAL - 1) / LOG_EVENT_INTERVAL;
            nextToLogEventCount.addAndGet(intervals * LOG_EVENT_INTERVAL);
        }
    }

    /**
     * Parses the user payload of a data movement event.
     *
     * @throws TezUncheckedException if the payload is not a valid
     *         {@code DataMovementEventPayloadProto}
     */
    private ShuffleUserPayloads.DataMovementEventPayloadProto parseShufflePayload(ByteBuffer userPayload) {
        try {
            return ShuffleUserPayloads.DataMovementEventPayloadProto.parseFrom(ByteString.copyFrom(userPayload));
        } catch (InvalidProtocolBufferException e) {
            throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
        }
    }

    /**
     * Decompresses the empty-partitions field of the payload into a {@link BitSet}.
     *
     * @return the decoded bit set, or {@code null} when the payload has no empty-partitions field
     * @throws TezUncheckedException if decompression fails
     */
    private BitSet decodeEmptyPartitions(ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload) {
        if (!shufflePayload.hasEmptyPartitions()) {
            return null;
        }
        try {
            byte[] emptyPartitions =
                    TezCommonUtils.decompressByteStringToByteArray(shufflePayload.getEmptyPartitions(), inflater);
            return TezUtilsInternal.fromByteArray(emptyPartitions);
        } catch (IOException e) {
            throw new TezUncheckedException("Unable to set the empty partition to succeeded", e);
        }
    }

    /**
     * Logs the event counters seen so far at INFO level.
     *
     * @param updateOnClose when {@code true}, the suffix {@code ", updateOnClose"} is appended
     *                      to the log line
     */
    @Override
    public void logProgress(boolean updateOnClose) {
        LOG.info(inputContext.getSourceVertexName()
                + ": numDmeEventsSeen=" + numDmeEvents.get()
                + ", numDmeEventsSeenWithNoData=" + numDmeEventsNoData.get()
                + ", numObsoletionEventsSeen=" + numObsoletionEvents.get()
                + (updateOnClose ? ", updateOnClose" : ""));
    }

    /**
     * Processes one data movement event.
     *
     * <p>If the payload marks the event's source index as empty, the input is reported as
     * completed without data. Otherwise the approximate record count is updated and the
     * input is either materialised from the inline data in the payload or registered as a
     * known input to be fetched from the payload's host and port.
     *
     * @param emptyPartitionsBitSet decoded empty-partitions bit set; only consulted when the
     *                              payload has an empty-partitions field
     */
    private void processDataMovementEvent(DataMovementEvent dme, ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException {
        int srcIndex = dme.getSourceIndex();
        if (LOG.isDebugEnabled()) {
            LOG.debug("DME srcIdx: " + srcIndex + ", targetIndex: " + dme.getTargetIndex() + ", attemptNum: " + dme.getVersion() + ", payload: " + ShuffleUtils.stringify(shufflePayload));
        }

        if (shufflePayload.hasEmptyPartitions() && emptyPartitionsBitSet.get(srcIndex)) {
            CompositeInputAttemptIdentifier emptyAttemptIdentifier =
                    constructInputAttemptIdentifier(dme.getTargetIndex(), 1, dme.getVersion(), shufflePayload, false);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Source partition: " + srcIndex + " did not generate any data. SrcAttempt: [" + emptyAttemptIdentifier + "]. Not fetching.");
            }
            numDmeEventsNoData.getAndIncrement();
            shuffleManager.addCompletedInputWithNoData(emptyAttemptIdentifier.expand(0));
            return;
        }

        shuffleManager.updateApproximateInputRecords(shufflePayload.getNumRecord());

        CompositeInputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(
                dme.getTargetIndex(), 1, dme.getVersion(), shufflePayload, useSharedInputs && srcIndex == 0);
        if (shufflePayload.hasData()) {
            // The data travelled inside the event itself; copy it into a fetched input directly.
            String hostIdentifier = shufflePayload.getHost() + ":" + shufflePayload.getPort();
            ShuffleUserPayloads.DataProto dataProto = shufflePayload.getData();
            FetchedInput fetchedInput = inputAllocator.allocate(
                    dataProto.getRawLength(), dataProto.getCompressedLength(), srcAttemptIdentifier);
            moveDataToFetchedInput(dataProto, fetchedInput, hostIdentifier);
            shuffleManager.addCompletedInputWithData(srcAttemptIdentifier, fetchedInput);
            // Parameterized so the message is only built when debug logging is enabled.
            LOG.debug("Payload via DME : {}", srcAttemptIdentifier);
        } else {
            shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, srcIndex);
        }
    }

    /**
     * Copies data carried inline in an event payload into the allocated fetched input,
     * using the disk or memory path of {@link ShuffleUtils} depending on the input's type.
     *
     * @throws TezUncheckedException if the fetched input is neither a disk nor a memory input
     */
    private void moveDataToFetchedInput(ShuffleUserPayloads.DataProto dataProto, FetchedInput fetchedInput, String hostIdentifier) throws IOException {
        switch (fetchedInput.getType()) {
            case DISK:
                // NOTE(review): the disk path passes getUncompressedLength() while the memory
                // path passes getRawLength(); meaning of the trailing 'true' is not visible here.
                ShuffleUtils.shuffleToDisk(((DiskFetchedInput) fetchedInput).getOutputStream(), hostIdentifier, dataProto.getData().newInput(), dataProto.getCompressedLength(), dataProto.getUncompressedLength(), LOG, fetchedInput.getInputAttemptIdentifier(), ifileReadAhead, ifileReadAheadLength, true);
                break;
            case MEMORY:
                ShuffleUtils.shuffleToMemory(((MemoryFetchedInput) fetchedInput).getBytes(), dataProto.getData().newInput(), dataProto.getRawLength(), dataProto.getCompressedLength(), codec, ifileReadAhead, ifileReadAheadLength, LOG, fetchedInput.getInputAttemptIdentifier());
                break;
            default:
                throw new TezUncheckedException("Unexpected type: " + fetchedInput.getType());
        }
    }

    /**
     * Processes a composite event as a single composite input covering
     * {@code crdme.getCount()} partitions starting at {@code crdme.getSourceIndex()}.
     *
     * <p>Each covered partition flagged as empty is reported as completed without data.
     * Unless every covered partition is empty, the composite input is then registered as a
     * known input to be fetched.
     *
     * @param emptyPartitionsBitSet decoded empty-partitions bit set; only consulted when the
     *                              payload has an empty-partitions field
     */
    private void processCompositeRoutedDataMovementEvent(CompositeRoutedDataMovementEvent crdme, ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload, BitSet emptyPartitionsBitSet) throws IOException {
        int partitionId = crdme.getSourceIndex();
        int count = crdme.getCount();
        if (LOG.isDebugEnabled()) {
            LOG.debug("DME srcIdx: " + partitionId + ", targetIndex: " + crdme.getTargetIndex() + ", count:" + count + ", attemptNum: " + crdme.getVersion() + ", payload: " + ShuffleUtils.stringify(shufflePayload));
        }

        if (shufflePayload.hasEmptyPartitions()) {
            CompositeInputAttemptIdentifier compositeInputAttemptIdentifier =
                    constructInputAttemptIdentifier(crdme.getTargetIndex(), count, crdme.getVersion(), shufflePayload, false);
            boolean allPartitionsEmpty = true;
            for (int i = 0; i < count; ++i) {
                int srcPartitionId = partitionId + i;
                if (!emptyPartitionsBitSet.get(srcPartitionId)) {
                    allPartitionsEmpty = false;
                    continue;
                }
                InputAttemptIdentifier emptyAttemptIdentifier = compositeInputAttemptIdentifier.expand(i);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Source partition: " + srcPartitionId + " did not generate any data. SrcAttempt: [" + emptyAttemptIdentifier + "]. Not fetching.");
                }
                numDmeEventsNoData.getAndIncrement();
                shuffleManager.addCompletedInputWithNoData(emptyAttemptIdentifier);
            }
            if (allPartitionsEmpty) {
                // Nothing left to fetch for this composite event.
                return;
            }
        }

        CompositeInputAttemptIdentifier srcAttemptIdentifier = constructInputAttemptIdentifier(
                crdme.getTargetIndex(), count, crdme.getVersion(), shufflePayload, useSharedInputs && partitionId == 0);
        shuffleManager.addKnownInput(shufflePayload.getHost(), shufflePayload.getPort(), srcAttemptIdentifier, partitionId);
    }

    /**
     * Marks the input identified by the failed event's target index and version as obsolete.
     */
    private void processInputFailedEvent(InputFailedEvent ife) {
        InputAttemptIdentifier srcAttemptIdentifier = new InputAttemptIdentifier(ife.getTargetIndex(), ife.getVersion());
        shuffleManager.obsoleteKnownInput(srcAttemptIdentifier);
    }

    /**
     * Builds the identifier for a source attempt described by an event payload.
     *
     * <p>When the payload carries a spill id, the identifier also records the spill id and
     * whether this is the final or an incremental update (from the payload's last-event flag).
     *
     * @param targetIndex      target index of the event
     * @param targetIndexCount number of consecutive inputs the identifier covers
     * @param version          attempt version of the event
     * @param shufflePayload   payload supplying the optional path component and spill details
     * @param isShared         shared flag passed through to the identifier
     * @return the constructed identifier
     */
    private CompositeInputAttemptIdentifier constructInputAttemptIdentifier(int targetIndex, int targetIndexCount, int version, ShuffleUserPayloads.DataMovementEventPayloadProto shufflePayload, boolean isShared) {
        String pathComponent = shufflePayload.hasPathComponent() ? shufflePayload.getPathComponent() : null;
        if (!shufflePayload.hasSpillId()) {
            return new CompositeInputAttemptIdentifier(targetIndex, version, pathComponent, isShared, targetIndexCount);
        }
        int spillEventId = shufflePayload.getSpillId();
        InputAttemptIdentifier.SPILL_INFO spillInfo = shufflePayload.getLastEvent()
                ? InputAttemptIdentifier.SPILL_INFO.FINAL_UPDATE
                : InputAttemptIdentifier.SPILL_INFO.INCREMENTAL_UPDATE;
        return new CompositeInputAttemptIdentifier(targetIndex, version, pathComponent, isShared, spillInfo, spillEventId, targetIndexCount);
    }
}

