/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.PullingAsyncDataInput;
import org.apache.flink.runtime.io.network.api.EndOfData;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.RecoveryMetadata;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.PrioritizedDeque;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.GateNotificationHelper;
import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RecoveredInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteChannelStateChecker;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.UnknownInputChannel;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageInputChannelId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyServiceImpl;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerClient;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.throughput.BufferDebloater;
import org.apache.flink.runtime.throughput.ThroughputCalculator;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SupplierWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SingleInputGate
extends IndexedInputGate {
    private static final Logger LOG = LoggerFactory.getLogger(SingleInputGate.class);
    private final Object requestLock = new Object();
    private final String owningTaskName;
    private final int gateIndex;
    private final IntermediateDataSetID consumedResultId;
    private final ResultPartitionType consumedPartitionType;
    private final int numberOfInputChannels;
    private final Map<IntermediateResultPartitionID, Map<InputChannelInfo, InputChannel>> inputChannels;
    @GuardedBy(value="requestLock")
    private final InputChannel[] channels;
    private final PrioritizedDeque<InputChannel> inputChannelsWithData = new PrioritizedDeque();
    @GuardedBy(value="inputChannelsWithData")
    private final BitSet enqueuedInputChannelsWithData;
    @GuardedBy(value="inputChannelsWithData")
    private final BitSet channelsWithEndOfPartitionEvents;
    @GuardedBy(value="inputChannelsWithData")
    private final BitSet channelsWithEndOfUserRecords;
    @GuardedBy(value="inputChannelsWithData")
    private int[] lastPrioritySequenceNumber;
    private final PartitionProducerStateProvider partitionProducerStateProvider;
    private BufferPool bufferPool;
    private boolean hasReceivedAllEndOfPartitionEvents;
    private boolean hasReceivedEndOfData;
    private boolean requestedPartitionsFlag;
    private final List<TaskEvent> pendingEvents = new ArrayList<TaskEvent>();
    private int numberOfUninitializedChannels;
    private Timer retriggerLocalRequestTimer;
    private final SupplierWithException<BufferPool, IOException> bufferPoolFactory;
    private final CompletableFuture<Void> closeFuture;
    @Nullable
    private final BufferDecompressor bufferDecompressor;
    private final MemorySegmentProvider memorySegmentProvider;
    private final MemorySegment unpooledSegment;
    private final ThroughputCalculator throughputCalculator;
    private final BufferDebloater bufferDebloater;
    private boolean shouldDrainOnEndOfData = true;
    @Nullable
    private TieredStorageConsumerClient tieredStorageConsumerClient;
    @Nullable
    private List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs;
    @Nullable
    private AvailabilityNotifier availabilityNotifier;
    private final Map<Integer, Tuple2<Boolean, Integer>> lastBufferStatusMapInTieredStore = new HashMap<Integer, Tuple2<Boolean, Integer>>();
    private final int[] endOfDatas;
    private final int[] endOfPartitions;

    public SingleInputGate(String owningTaskName, int gateIndex, IntermediateDataSetID consumedResultId, ResultPartitionType consumedPartitionType, int numberOfInputChannels, PartitionProducerStateProvider partitionProducerStateProvider, SupplierWithException<BufferPool, IOException> bufferPoolFactory, @Nullable BufferDecompressor bufferDecompressor, MemorySegmentProvider memorySegmentProvider, int segmentSize, ThroughputCalculator throughputCalculator, @Nullable BufferDebloater bufferDebloater) {
        this.owningTaskName = Preconditions.checkNotNull(owningTaskName);
        Preconditions.checkArgument(0 <= gateIndex, "The gate index must be positive.");
        this.gateIndex = gateIndex;
        this.consumedResultId = Preconditions.checkNotNull(consumedResultId);
        this.consumedPartitionType = Preconditions.checkNotNull(consumedPartitionType);
        this.bufferPoolFactory = Preconditions.checkNotNull(bufferPoolFactory);
        Preconditions.checkArgument(numberOfInputChannels > 0);
        this.numberOfInputChannels = numberOfInputChannels;
        this.inputChannels = CollectionUtil.newHashMapWithExpectedSize(numberOfInputChannels);
        this.channels = new InputChannel[numberOfInputChannels];
        this.channelsWithEndOfPartitionEvents = new BitSet(numberOfInputChannels);
        this.channelsWithEndOfUserRecords = new BitSet(numberOfInputChannels);
        this.enqueuedInputChannelsWithData = new BitSet(numberOfInputChannels);
        this.lastPrioritySequenceNumber = new int[numberOfInputChannels];
        Arrays.fill(this.lastPrioritySequenceNumber, Integer.MIN_VALUE);
        this.partitionProducerStateProvider = Preconditions.checkNotNull(partitionProducerStateProvider);
        this.bufferDecompressor = bufferDecompressor;
        this.memorySegmentProvider = Preconditions.checkNotNull(memorySegmentProvider);
        this.closeFuture = new CompletableFuture();
        this.unpooledSegment = MemorySegmentFactory.allocateUnpooledSegment(segmentSize);
        this.bufferDebloater = bufferDebloater;
        this.throughputCalculator = Preconditions.checkNotNull(throughputCalculator);
        this.tieredStorageConsumerClient = null;
        this.tieredStorageConsumerSpecs = null;
        this.availabilityNotifier = null;
        this.endOfDatas = new int[numberOfInputChannels];
        Arrays.fill(this.endOfDatas, 0);
        this.endOfPartitions = new int[numberOfInputChannels];
        Arrays.fill(this.endOfPartitions, 0);
    }

    protected PrioritizedDeque<InputChannel> getInputChannelsWithData() {
        return this.inputChannelsWithData;
    }

    @Override
    public void setup() throws IOException {
        Preconditions.checkState(this.bufferPool == null, "Bug in input gate setup logic: Already registered buffer pool.");
        BufferPool bufferPool = this.bufferPoolFactory.get();
        this.setBufferPool(bufferPool);
        if (this.tieredStorageConsumerClient != null) {
            this.tieredStorageConsumerClient.setup(bufferPool);
        }
        this.setupChannels();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletableFuture<Void> getStateConsumedFuture() {
        Object object = this.requestLock;
        synchronized (object) {
            ArrayList futures = new ArrayList(this.numberOfInputChannels);
            for (InputChannel inputChannel : this.inputChannels()) {
                if (!(inputChannel instanceof RecoveredInputChannel)) continue;
                futures.add(((RecoveredInputChannel)inputChannel).getStateConsumedFuture());
            }
            return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void requestPartitions() {
        Object object = this.requestLock;
        synchronized (object) {
            if (!this.requestedPartitionsFlag) {
                if (this.closeFuture.isDone()) {
                    throw new IllegalStateException("Already released.");
                }
                long numInputChannels = this.inputChannels.values().stream().mapToLong(x -> x.values().size()).sum();
                if ((long)this.numberOfInputChannels != numInputChannels) {
                    throw new IllegalStateException(String.format("Bug in input gate setup logic: mismatch between number of total input channels [%s] and the currently set number of input channels [%s].", numInputChannels, this.numberOfInputChannels));
                }
                this.convertRecoveredInputChannels();
                this.internalRequestPartitions();
            }
            this.requestedPartitionsFlag = true;
            if (this.enabledTieredStorage()) {
                this.tieredStorageConsumerClient.start();
            }
        }
    }

    @VisibleForTesting
    public void convertRecoveredInputChannels() {
        LOG.debug("Converting recovered input channels ({} channels)", (Object)this.getNumberOfInputChannels());
        for (Map<InputChannelInfo, InputChannel> inputChannelsForCurrentPartition : this.inputChannels.values()) {
            HashSet<InputChannelInfo> oldInputChannelInfos = new HashSet<InputChannelInfo>(inputChannelsForCurrentPartition.keySet());
            for (InputChannelInfo inputChannelInfo : oldInputChannelInfos) {
                InputChannel inputChannel = inputChannelsForCurrentPartition.get(inputChannelInfo);
                if (!(inputChannel instanceof RecoveredInputChannel)) continue;
                try {
                    InputChannel realInputChannel = ((RecoveredInputChannel)inputChannel).toInputChannel();
                    inputChannel.releaseAllResources();
                    inputChannelsForCurrentPartition.remove(inputChannelInfo);
                    inputChannelsForCurrentPartition.put(realInputChannel.getChannelInfo(), realInputChannel);
                    this.channels[inputChannel.getChannelIndex()] = realInputChannel;
                }
                catch (Throwable t) {
                    inputChannel.setError(t);
                    return;
                }
            }
        }
    }

    private void internalRequestPartitions() {
        for (InputChannel inputChannel : this.inputChannels()) {
            try {
                inputChannel.requestSubpartitions();
            }
            catch (Throwable t) {
                inputChannel.setError(t);
                return;
            }
        }
    }

    @Override
    public void finishReadRecoveredState() throws IOException {
        for (InputChannel channel : this.channels) {
            if (!(channel instanceof RecoveredInputChannel)) continue;
            ((RecoveredInputChannel)channel).finishReadRecoveredState();
        }
    }

    @Override
    public int getNumberOfInputChannels() {
        return this.numberOfInputChannels;
    }

    @Override
    public int getGateIndex() {
        return this.gateIndex;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public List<InputChannelInfo> getUnfinishedChannels() {
        ArrayList<InputChannelInfo> unfinishedChannels = new ArrayList<InputChannelInfo>(this.numberOfInputChannels - this.channelsWithEndOfPartitionEvents.cardinality());
        PrioritizedDeque<InputChannel> prioritizedDeque = this.inputChannelsWithData;
        synchronized (prioritizedDeque) {
            int i = this.channelsWithEndOfPartitionEvents.nextClearBit(0);
            while (i < this.numberOfInputChannels) {
                unfinishedChannels.add(this.getChannel(i).getChannelInfo());
                i = this.channelsWithEndOfPartitionEvents.nextClearBit(i + 1);
            }
        }
        return unfinishedChannels;
    }

    @VisibleForTesting
    int getBuffersInUseCount() {
        int total = 0;
        for (InputChannel channel : this.channels) {
            total += channel.getBuffersInUseCount();
        }
        return total;
    }

    @VisibleForTesting
    public void announceBufferSize(int newBufferSize) {
        for (InputChannel channel : this.channels) {
            if (channel.isReleased()) continue;
            channel.announceBufferSize(newBufferSize);
        }
    }

    @Override
    public void triggerDebloating() {
        if (this.isFinished() || this.closeFuture.isDone()) {
            return;
        }
        Preconditions.checkState(this.bufferDebloater != null, "Buffer debloater should not be null");
        long currentThroughput = this.throughputCalculator.calculateThroughput();
        this.bufferDebloater.recalculateBufferSize(currentThroughput, this.getBuffersInUseCount()).ifPresent(this::announceBufferSize);
    }

    public Duration getLastEstimatedTimeToConsume() {
        return this.bufferDebloater.getLastEstimatedTimeToConsumeBuffers();
    }

    public ResultPartitionType getConsumedPartitionType() {
        return this.consumedPartitionType;
    }

    BufferProvider getBufferProvider() {
        return this.bufferPool;
    }

    public BufferPool getBufferPool() {
        return this.bufferPool;
    }

    MemorySegmentProvider getMemorySegmentProvider() {
        return this.memorySegmentProvider;
    }

    public String getOwningTaskName() {
        return this.owningTaskName;
    }

    public int getNumberOfQueuedBuffers() {
        for (int retry = 0; retry < 3; ++retry) {
            try {
                int totalBuffers = 0;
                for (InputChannel channel : this.inputChannels()) {
                    totalBuffers += channel.unsynchronizedGetNumberOfQueuedBuffers();
                }
                return totalBuffers;
            }
            catch (Exception ex) {
                LOG.debug("Fail to get number of queued buffers :", (Throwable)ex);
                continue;
            }
        }
        return 0;
    }

    public long getSizeOfQueuedBuffers() {
        for (int retry = 0; retry < 3; ++retry) {
            try {
                long totalSize = 0L;
                for (InputChannel channel : this.inputChannels()) {
                    totalSize += channel.unsynchronizedGetSizeOfQueuedBuffers();
                }
                return totalSize;
            }
            catch (Exception ex) {
                LOG.debug("Fail to get size of queued buffers :", (Throwable)ex);
                continue;
            }
        }
        return 0L;
    }

    public CompletableFuture<Void> getCloseFuture() {
        return this.closeFuture;
    }

    @Override
    public InputChannel getChannel(int channelIndex) {
        return this.channels[channelIndex];
    }

    public void setBufferPool(BufferPool bufferPool) {
        Preconditions.checkState(this.bufferPool == null, "Bug in input gate setup logic: buffer pool hasalready been set for this input gate.");
        this.bufferPool = Preconditions.checkNotNull(bufferPool);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @VisibleForTesting
    public void setupChannels() throws IOException {
        this.bufferPool.reserveSegments(1);
        Object object = this.requestLock;
        synchronized (object) {
            for (InputChannel inputChannel : this.inputChannels()) {
                inputChannel.setup();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setInputChannels(InputChannel ... channels) {
        if (channels.length != this.numberOfInputChannels) {
            throw new IllegalArgumentException("Expected " + this.numberOfInputChannels + " channels, but got " + channels.length);
        }
        Object object = this.requestLock;
        synchronized (object) {
            System.arraycopy(channels, 0, this.channels, 0, this.numberOfInputChannels);
            for (InputChannel inputChannel : channels) {
                if (this.inputChannels.computeIfAbsent(inputChannel.getPartitionId().getPartitionId(), ignored -> new HashMap()).put(inputChannel.getChannelInfo(), inputChannel) != null || !(inputChannel instanceof UnknownInputChannel)) continue;
                ++this.numberOfUninitializedChannels;
            }
        }
    }

    public void setTieredStorageService(List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, TieredStorageConsumerClient client, TieredStorageNettyServiceImpl nettyService) {
        this.tieredStorageConsumerSpecs = tieredStorageConsumerSpecs;
        this.tieredStorageConsumerClient = client;
        if (client != null) {
            this.availabilityNotifier = new AvailabilityNotifierImpl();
            this.setupTieredStorageNettyService(nettyService, tieredStorageConsumerSpecs);
            client.registerAvailabilityNotifier(this.availabilityNotifier);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateInputChannel(ResourceID localLocation, NettyShuffleDescriptor shuffleDescriptor) throws IOException, InterruptedException {
        Object object = this.requestLock;
        synchronized (object) {
            if (this.closeFuture.isDone()) {
                return;
            }
            IntermediateResultPartitionID partitionId = shuffleDescriptor.getResultPartitionID().getPartitionId();
            HashMap<InputChannelInfo, InputChannel> newInputChannels = new HashMap<InputChannelInfo, InputChannel>();
            for (InputChannel current : this.inputChannels.get(partitionId).values()) {
                InputChannel newChannel;
                if (!(current instanceof UnknownInputChannel)) continue;
                UnknownInputChannel unknownChannel = (UnknownInputChannel)current;
                boolean isLocal = shuffleDescriptor.isLocalTo(localLocation);
                if (isLocal) {
                    newChannel = unknownChannel.toLocalInputChannel(shuffleDescriptor.getResultPartitionID());
                } else {
                    RemoteInputChannel remoteInputChannel = unknownChannel.toRemoteInputChannel(shuffleDescriptor.getConnectionId(), shuffleDescriptor.getResultPartitionID());
                    remoteInputChannel.setup();
                    newChannel = remoteInputChannel;
                }
                LOG.debug("{}: Updated unknown input channel to {}.", (Object)this.owningTaskName, (Object)newChannel);
                newInputChannels.put(newChannel.getChannelInfo(), newChannel);
                this.channels[current.getChannelIndex()] = newChannel;
                if (this.requestedPartitionsFlag) {
                    newChannel.requestSubpartitions();
                }
                for (TaskEvent event : this.pendingEvents) {
                    newChannel.sendTaskEvent(event);
                }
                if (--this.numberOfUninitializedChannels == 0) {
                    this.pendingEvents.clear();
                }
                if (!this.enabledTieredStorage()) continue;
                TieredStoragePartitionId tieredStoragePartitionId = TieredStorageIdMappingUtils.convertId(shuffleDescriptor.getResultPartitionID());
                TieredStorageConsumerSpec spec = Preconditions.checkNotNull(this.tieredStorageConsumerSpecs).get(current.getChannelIndex());
                for (int subpartitionId : spec.getSubpartitionIds().values()) {
                    this.tieredStorageConsumerClient.updateTierShuffleDescriptors(tieredStoragePartitionId, spec.getInputChannelId(), new TieredStorageSubpartitionId(subpartitionId), Preconditions.checkNotNull(shuffleDescriptor.getTierShuffleDescriptors()));
                }
                this.queueChannel(newChannel, null, false);
            }
            this.inputChannels.put(partitionId, newInputChannels);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void retriggerPartitionRequest(IntermediateResultPartitionID partitionId, InputChannelInfo inputChannelInfo) throws IOException {
        Object object = this.requestLock;
        synchronized (object) {
            if (!this.closeFuture.isDone()) {
                InputChannel ch = this.inputChannels.get(partitionId).get(inputChannelInfo);
                Preconditions.checkNotNull(ch, "Unknown input channel with ID " + partitionId);
                LOG.debug("{}: Retriggering partition request {}:{}.", new Object[]{this.owningTaskName, ch.partitionId, ch.getConsumedSubpartitionIndexSet()});
                if (ch.getClass() == RemoteInputChannel.class) {
                    RemoteInputChannel rch = (RemoteInputChannel)ch;
                    rch.retriggerSubpartitionRequest();
                } else if (ch.getClass() == LocalInputChannel.class) {
                    LocalInputChannel ich = (LocalInputChannel)ch;
                    if (this.retriggerLocalRequestTimer == null) {
                        this.retriggerLocalRequestTimer = new Timer(true);
                    }
                    ich.retriggerSubpartitionRequest(this.retriggerLocalRequestTimer);
                } else {
                    throw new IllegalStateException("Unexpected type of channel to retrigger partition: " + ch.getClass());
                }
            }
        }
    }

    @VisibleForTesting
    Timer getRetriggerLocalRequestTimer() {
        return this.retriggerLocalRequestTimer;
    }

    MemorySegment getUnpooledSegment() {
        return this.unpooledSegment;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws IOException {
        boolean released = false;
        PrioritizedDeque<InputChannel> prioritizedDeque = this.requestLock;
        synchronized (prioritizedDeque) {
            if (!this.closeFuture.isDone()) {
                try {
                    LOG.debug("{}: Releasing {}.", (Object)this.owningTaskName, (Object)this);
                    if (this.retriggerLocalRequestTimer != null) {
                        this.retriggerLocalRequestTimer.cancel();
                    }
                    for (InputChannel inputChannel : this.inputChannels()) {
                        try {
                            inputChannel.releaseAllResources();
                        }
                        catch (IOException e) {
                            LOG.warn("{}: Error during release of channel resources: {}.", new Object[]{this.owningTaskName, e.getMessage(), e});
                        }
                    }
                    if (this.bufferPool != null) {
                        this.bufferPool.lazyDestroy();
                    }
                }
                finally {
                    released = true;
                    this.closeFuture.complete(null);
                }
            }
        }
        if (released) {
            prioritizedDeque = this.inputChannelsWithData;
            synchronized (prioritizedDeque) {
                this.inputChannelsWithData.notifyAll();
            }
            if (this.enabledTieredStorage()) {
                this.tieredStorageConsumerClient.close();
            }
        }
    }

    @Override
    public boolean isFinished() {
        return this.hasReceivedAllEndOfPartitionEvents;
    }

    @Override
    public PullingAsyncDataInput.EndOfDataStatus hasReceivedEndOfData() {
        if (!this.hasReceivedEndOfData) {
            return PullingAsyncDataInput.EndOfDataStatus.NOT_END_OF_DATA;
        }
        if (this.shouldDrainOnEndOfData) {
            return PullingAsyncDataInput.EndOfDataStatus.DRAINED;
        }
        return PullingAsyncDataInput.EndOfDataStatus.STOPPED;
    }

    public String toString() {
        return "SingleInputGate{owningTaskName='" + this.owningTaskName + '\'' + ", gateIndex=" + this.gateIndex + '}';
    }

    @Override
    public Optional<BufferOrEvent> getNext() throws IOException, InterruptedException {
        return this.getNextBufferOrEvent(true);
    }

    @Override
    public Optional<BufferOrEvent> pollNext() throws IOException, InterruptedException {
        return this.getNextBufferOrEvent(false);
    }

    private Optional<BufferOrEvent> getNextBufferOrEvent(boolean blocking) throws IOException, InterruptedException {
        if (this.hasReceivedAllEndOfPartitionEvents) {
            return Optional.empty();
        }
        if (this.closeFuture.isDone()) {
            throw new CancelTaskException("Input gate is already closed.");
        }
        Optional<InputGate.InputWithData<InputChannel, Buffer>> next = this.waitAndGetNextData(blocking);
        if (!next.isPresent()) {
            this.throughputCalculator.pauseMeasurement();
            return Optional.empty();
        }
        this.throughputCalculator.resumeMeasurement();
        InputGate.InputWithData<InputChannel, Buffer> inputWithData = next.get();
        BufferOrEvent bufferOrEvent = this.transformToBufferOrEvent((Buffer)inputWithData.data, inputWithData.moreAvailable, (InputChannel)inputWithData.input, inputWithData.morePriorityEvents);
        this.throughputCalculator.incomingDataSize(bufferOrEvent.getSize());
        return Optional.of(bufferOrEvent);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     * Converted monitor instructions to comments
     * Lifted jumps to return sites
     */
    private Optional<InputGate.InputWithData<InputChannel, Buffer>> waitAndGetNextData(boolean blocking) throws IOException, InterruptedException {
        boolean bl;
        boolean morePriorityEvents;
        Optional<Buffer> buffer;
        InputChannel inputChannel;
        block6: while (true) {
            PrioritizedDeque<InputChannel> prioritizedDeque = this.inputChannelsWithData;
            // MONITORENTER : prioritizedDeque
            Optional<InputChannel> inputChannelOpt = this.getChannel(blocking);
            if (!inputChannelOpt.isPresent()) {
                // MONITOREXIT : prioritizedDeque
                return Optional.empty();
            }
            inputChannel = inputChannelOpt.get();
            buffer = this.readRecoveredOrNormalBuffer(inputChannel);
            if (!buffer.isPresent()) {
                this.checkUnavailability();
                // MONITOREXIT : prioritizedDeque
                continue;
            }
            int numSubpartitions = inputChannel.getConsumedSubpartitionIndexSet().size();
            if (numSubpartitions <= 1) break;
            switch (buffer.get().getDataType()) {
                case END_OF_DATA: {
                    int n = inputChannel.getChannelIndex();
                    this.endOfDatas[n] = this.endOfDatas[n] + 1;
                    if (this.endOfDatas[inputChannel.getChannelIndex()] >= numSubpartitions) break block6;
                    buffer.get().recycleBuffer();
                    // MONITOREXIT : prioritizedDeque
                    continue block6;
                }
                case END_OF_PARTITION: {
                    int n = inputChannel.getChannelIndex();
                    this.endOfPartitions[n] = this.endOfPartitions[n] + 1;
                    if (this.endOfPartitions[inputChannel.getChannelIndex()] >= numSubpartitions) break block6;
                    buffer.get().recycleBuffer();
                    // MONITOREXIT : prioritizedDeque
                    continue block6;
                }
            }
            break;
        }
        boolean bl2 = morePriorityEvents = this.inputChannelsWithData.getNumPriorityElements() > 0;
        if (buffer.get().getDataType().hasPriority() && !morePriorityEvents) {
            this.priorityAvailabilityHelper.resetUnavailable();
        }
        this.checkUnavailability();
        Buffer buffer2 = buffer.get();
        if (!this.inputChannelsWithData.isEmpty()) {
            bl = true;
            return Optional.of(new InputGate.InputWithData<InputChannel, Buffer>(inputChannel, buffer2, bl, morePriorityEvents));
        }
        bl = false;
        // MONITOREXIT : prioritizedDeque
        return Optional.of(new InputGate.InputWithData<InputChannel, Buffer>(inputChannel, buffer2, bl, morePriorityEvents));
    }

    private Optional<Buffer> readRecoveredOrNormalBuffer(InputChannel inputChannel) throws IOException, InterruptedException {
        if (inputChannel instanceof RecoveredInputChannel && !inputChannel.isReleased()) {
            Optional<Buffer> buffer = this.readBufferFromInputChannel(inputChannel);
            if (!((RecoveredInputChannel)inputChannel).getStateConsumedFuture().isDone()) {
                return buffer;
            }
        }
        return this.enabledTieredStorage() ? this.readBufferFromTieredStore(inputChannel) : this.readBufferFromInputChannel(inputChannel);
    }

    private Optional<Buffer> readBufferFromInputChannel(InputChannel inputChannel) throws IOException, InterruptedException {
        Buffer buffer;
        Optional<InputChannel.BufferAndAvailability> bufferAndAvailabilityOpt = inputChannel.getNextBuffer();
        if (!bufferAndAvailabilityOpt.isPresent()) {
            return Optional.empty();
        }
        InputChannel.BufferAndAvailability bufferAndAvailability = bufferAndAvailabilityOpt.get();
        if (bufferAndAvailability.moreAvailable()) {
            this.queueChannelUnsafe(inputChannel, bufferAndAvailability.morePriorityEvents());
        }
        if (bufferAndAvailability.hasPriority()) {
            this.lastPrioritySequenceNumber[inputChannel.getChannelIndex()] = bufferAndAvailability.getSequenceNumber();
        }
        if ((buffer = bufferAndAvailability.buffer()).getDataType() == Buffer.DataType.RECOVERY_METADATA) {
            RecoveryMetadata recoveryMetadata = (RecoveryMetadata)EventSerializer.fromSerializedEvent(buffer.getNioBufferReadable(), this.getClass().getClassLoader());
            this.lastBufferStatusMapInTieredStore.put(inputChannel.getChannelIndex(), Tuple2.of(buffer.getDataType().isPartialRecord(), recoveryMetadata.getFinalBufferSubpartitionId()));
        }
        return Optional.of(bufferAndAvailability.buffer());
    }

    private Optional<Buffer> readBufferFromTieredStore(InputChannel inputChannel) throws IOException {
        Optional<Buffer> buffer;
        TieredStorageConsumerSpec tieredStorageConsumerSpec = Preconditions.checkNotNull(this.tieredStorageConsumerSpecs).get(inputChannel.getChannelIndex());
        Tuple2 lastBufferStatus = this.lastBufferStatusMapInTieredStore.computeIfAbsent(inputChannel.getChannelIndex(), key -> Tuple2.of(false, -1));
        boolean isLastBufferPartialRecord = (Boolean)lastBufferStatus.f0;
        int lastSubpartitionId = (Integer)lastBufferStatus.f1;
        do {
            int subpartitionId;
            if ((subpartitionId = isLastBufferPartialRecord ? lastSubpartitionId : Preconditions.checkNotNull(this.tieredStorageConsumerClient).peekNextBufferSubpartitionId(tieredStorageConsumerSpec.getPartitionId(), tieredStorageConsumerSpec.getSubpartitionIds())) < 0) {
                return Optional.empty();
            }
            buffer = Preconditions.checkNotNull(this.tieredStorageConsumerClient).getNextBuffer(tieredStorageConsumerSpec.getPartitionId(), new TieredStorageSubpartitionId(subpartitionId));
            if (!buffer.isPresent()) continue;
            if (!(inputChannel instanceof RecoveredInputChannel)) {
                this.queueChannel(Preconditions.checkNotNull(inputChannel), null, false);
            }
            this.lastBufferStatusMapInTieredStore.put(inputChannel.getChannelIndex(), Tuple2.of(buffer.get().getDataType().isPartialRecord(), subpartitionId));
            break;
        } while (!isLastBufferPartialRecord && inputChannel.getConsumedSubpartitionIndexSet().size() > 1);
        return buffer;
    }

    private boolean enabledTieredStorage() {
        return this.tieredStorageConsumerClient != null;
    }

    private void checkUnavailability() {
        assert (Thread.holdsLock(this.inputChannelsWithData));
        if (this.inputChannelsWithData.isEmpty()) {
            this.availabilityHelper.resetUnavailable();
        }
    }

    private BufferOrEvent transformToBufferOrEvent(Buffer buffer, boolean moreAvailable, InputChannel currentChannel, boolean morePriorityEvents) throws IOException, InterruptedException {
        if (buffer.isBuffer()) {
            return this.transformBuffer(buffer, moreAvailable, currentChannel, morePriorityEvents);
        }
        return this.transformEvent(buffer, moreAvailable, currentChannel, morePriorityEvents);
    }

    private BufferOrEvent transformBuffer(Buffer buffer, boolean moreAvailable, InputChannel currentChannel, boolean morePriorityEvents) {
        return new BufferOrEvent(this.decompressBufferIfNeeded(buffer), currentChannel.getChannelInfo(), moreAvailable, morePriorityEvents);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private BufferOrEvent transformEvent(Buffer buffer, boolean moreAvailable, InputChannel currentChannel, boolean morePriorityEvents) throws IOException, InterruptedException {
        AbstractEvent event;
        try {
            event = EventSerializer.fromBuffer(buffer, this.getClass().getClassLoader());
        }
        finally {
            buffer.recycleBuffer();
        }
        if (event.getClass() == EndOfPartitionEvent.class) {
            PrioritizedDeque<InputChannel> prioritizedDeque = this.inputChannelsWithData;
            synchronized (prioritizedDeque) {
                Preconditions.checkState(!this.channelsWithEndOfPartitionEvents.get(currentChannel.getChannelIndex()));
                this.channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
                this.hasReceivedAllEndOfPartitionEvents = this.channelsWithEndOfPartitionEvents.cardinality() == this.numberOfInputChannels;
                this.enqueuedInputChannelsWithData.clear(currentChannel.getChannelIndex());
                if (this.inputChannelsWithData.contains(currentChannel)) {
                    this.inputChannelsWithData.getAndRemove(channel -> channel == currentChannel);
                }
            }
            if (this.hasReceivedAllEndOfPartitionEvents) {
                Preconditions.checkState(!moreAvailable || !this.pollNext().isPresent());
                moreAvailable = false;
                this.markAvailable();
            }
            currentChannel.releaseAllResources();
        } else if (event.getClass() == EndOfData.class) {
            PrioritizedDeque<InputChannel> prioritizedDeque = this.inputChannelsWithData;
            synchronized (prioritizedDeque) {
                Preconditions.checkState(!this.channelsWithEndOfUserRecords.get(currentChannel.getChannelIndex()));
                this.channelsWithEndOfUserRecords.set(currentChannel.getChannelIndex());
                this.hasReceivedEndOfData = this.channelsWithEndOfUserRecords.cardinality() == this.numberOfInputChannels;
                this.shouldDrainOnEndOfData &= ((EndOfData)event).getStopMode() == StopMode.DRAIN;
            }
        }
        return new BufferOrEvent(event, buffer.getDataType().hasPriority(), currentChannel.getChannelInfo(), moreAvailable, buffer.getSize(), morePriorityEvents);
    }

    private Buffer decompressBufferIfNeeded(Buffer buffer) {
        if (buffer.isCompressed()) {
            try {
                Preconditions.checkNotNull(this.bufferDecompressor, "Buffer decompressor not set.");
                Buffer buffer2 = this.bufferDecompressor.decompressToIntermediateBuffer(buffer);
                return buffer2;
            }
            finally {
                buffer.recycleBuffer();
            }
        }
        return buffer;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void markAvailable() {
        CompletableFuture<?> toNotify;
        PrioritizedDeque<InputChannel> prioritizedDeque = this.inputChannelsWithData;
        synchronized (prioritizedDeque) {
            toNotify = this.availabilityHelper.getUnavailableToResetAvailable();
        }
        toNotify.complete(null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void sendTaskEvent(TaskEvent event) throws IOException {
        Object object = this.requestLock;
        synchronized (object) {
            for (InputChannel inputChannel : this.inputChannels()) {
                inputChannel.sendTaskEvent(event);
            }
            if (this.numberOfUninitializedChannels > 0) {
                this.pendingEvents.add(event);
            }
        }
    }

    @Override
    public void resumeConsumption(InputChannelInfo channelInfo) throws IOException {
        Preconditions.checkState(!this.isFinished(), "InputGate already finished.");
        this.channels[channelInfo.getInputChannelIdx()].resumeConsumption();
    }

    @Override
    public void acknowledgeAllRecordsProcessed(InputChannelInfo channelInfo) throws IOException {
        Preconditions.checkState(!this.isFinished(), "InputGate already finished.");
        if (!this.enabledTieredStorage()) {
            this.channels[channelInfo.getInputChannelIdx()].acknowledgeAllRecordsProcessed();
        }
    }

    void notifyChannelNonEmpty(InputChannel channel) {
        if (this.enabledTieredStorage()) {
            TieredStorageConsumerSpec tieredStorageConsumerSpec = Preconditions.checkNotNull(this.tieredStorageConsumerSpecs).get(channel.getChannelIndex());
            Preconditions.checkNotNull(this.availabilityNotifier).notifyAvailable(tieredStorageConsumerSpec.getPartitionId(), tieredStorageConsumerSpec.getInputChannelId());
        } else {
            this.queueChannel(Preconditions.checkNotNull(channel), null, false);
        }
    }

    void notifyPriorityEvent(InputChannel inputChannel, int prioritySequenceNumber) {
        this.queueChannel(Preconditions.checkNotNull(inputChannel), prioritySequenceNumber, false);
    }

    void notifyPriorityEventForce(InputChannel inputChannel) {
        this.queueChannel(Preconditions.checkNotNull(inputChannel), null, true);
    }

    void triggerPartitionStateCheck(ResultPartitionID partitionId, InputChannelInfo inputChannelInfo) {
        this.partitionProducerStateProvider.requestPartitionProducerState(this.consumedResultId, partitionId, responseHandle -> {
            boolean isProducingState = new RemoteChannelStateChecker(partitionId, this.owningTaskName).isProducerReadyOrAbortConsumption((PartitionProducerStateProvider.ResponseHandle)responseHandle);
            if (isProducingState) {
                try {
                    this.retriggerPartitionRequest(partitionId.getPartitionId(), inputChannelInfo);
                }
                catch (IOException t) {
                    responseHandle.failConsumption(t);
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     * Converted monitor instructions to comments
     * Lifted jumps to return sites
     */
    private void queueChannel(InputChannel channel, @Nullable Integer prioritySequenceNumber, boolean forcePriority) {
        boolean priority;
        Throwable throwable;
        GateNotificationHelper notification;
        block25: {
            block26: {
                block23: {
                    block24: {
                        notification = new GateNotificationHelper(this, this.inputChannelsWithData);
                        throwable = null;
                        PrioritizedDeque<InputChannel> prioritizedDeque = this.inputChannelsWithData;
                        // MONITORENTER : prioritizedDeque
                        boolean bl = priority = prioritySequenceNumber != null || forcePriority;
                        if (forcePriority || !priority || !this.isOutdated(prioritySequenceNumber, this.lastPrioritySequenceNumber[channel.getChannelIndex()])) break block23;
                        // MONITOREXIT : prioritizedDeque
                        if (notification == null) return;
                        if (throwable == null) break block24;
                        try {
                            notification.close();
                            return;
                        }
                        catch (Throwable throwable2) {
                            throwable.addSuppressed(throwable2);
                            return;
                        }
                    }
                    notification.close();
                    return;
                }
                try {
                    if (this.queueChannelUnsafe(channel, priority)) break block25;
                    // MONITOREXIT : prioritizedDeque
                    if (notification == null) return;
                    if (throwable == null) break block26;
                }
                catch (Throwable throwable3) {
                    throwable = throwable3;
                    throw throwable3;
                }
                try {
                    notification.close();
                    return;
                }
                catch (Throwable throwable4) {
                    throwable.addSuppressed(throwable4);
                    return;
                }
            }
            notification.close();
            return;
        }
        try {
            if (priority && this.inputChannelsWithData.getNumPriorityElements() == 1) {
                notification.notifyPriority();
            }
            if (this.inputChannelsWithData.size() == 1) {
                notification.notifyDataAvailable();
            }
            // MONITOREXIT : prioritizedDeque
            return;
        }
        catch (Throwable throwable5) {
            throw throwable5;
        }
        finally {
            if (notification != null) {
                if (throwable != null) {
                    try {
                        notification.close();
                    }
                    catch (Throwable throwable6) {
                        throwable.addSuppressed(throwable6);
                    }
                } else {
                    notification.close();
                }
            }
        }
    }

    private boolean isOutdated(int sequenceNumber, int lastSequenceNumber) {
        if (lastSequenceNumber < 0 != sequenceNumber < 0 && Math.max(lastSequenceNumber, sequenceNumber) > 0x3FFFFFFF) {
            return lastSequenceNumber < 0;
        }
        return lastSequenceNumber >= sequenceNumber;
    }

    private boolean queueChannelUnsafe(InputChannel channel, boolean priority) {
        assert (Thread.holdsLock(this.inputChannelsWithData));
        if (this.channelsWithEndOfPartitionEvents.get(channel.getChannelIndex())) {
            return false;
        }
        boolean alreadyEnqueued = this.enqueuedInputChannelsWithData.get(channel.getChannelIndex());
        if (alreadyEnqueued && (!priority || this.inputChannelsWithData.containsPriorityElement(channel))) {
            return false;
        }
        this.inputChannelsWithData.add(channel, priority, alreadyEnqueued);
        if (!alreadyEnqueued) {
            this.enqueuedInputChannelsWithData.set(channel.getChannelIndex());
        }
        return true;
    }

    private Optional<InputChannel> getChannel(boolean blocking) throws InterruptedException {
        assert (Thread.holdsLock(this.inputChannelsWithData));
        while (this.inputChannelsWithData.isEmpty()) {
            if (this.closeFuture.isDone()) {
                throw new IllegalStateException("Released");
            }
            if (blocking) {
                this.inputChannelsWithData.wait();
                continue;
            }
            this.availabilityHelper.resetUnavailable();
            return Optional.empty();
        }
        InputChannel inputChannel = this.inputChannelsWithData.poll();
        this.enqueuedInputChannelsWithData.clear(inputChannel.getChannelIndex());
        return Optional.of(inputChannel);
    }

    private void setupTieredStorageNettyService(TieredStorageNettyServiceImpl nettyService, List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs) {
        ArrayList<Supplier<InputChannel>> channelSuppliers = new ArrayList<Supplier<InputChannel>>();
        int index = 0;
        while (index < this.channels.length) {
            int channelIndex = index++;
            channelSuppliers.add(() -> this.channels[channelIndex]);
        }
        nettyService.setupInputChannels(tieredStorageConsumerSpecs, channelSuppliers);
    }

    @VisibleForTesting
    public Map<Tuple2<IntermediateResultPartitionID, InputChannelInfo>, InputChannel> getInputChannels() {
        HashMap<Tuple2<IntermediateResultPartitionID, InputChannelInfo>, InputChannel> result = new HashMap<Tuple2<IntermediateResultPartitionID, InputChannelInfo>, InputChannel>();
        for (Map.Entry<IntermediateResultPartitionID, Map<InputChannelInfo, InputChannel>> mapEntry : this.inputChannels.entrySet()) {
            for (Map.Entry<InputChannelInfo, InputChannel> entry : mapEntry.getValue().entrySet()) {
                result.put(Tuple2.of(mapEntry.getKey(), entry.getKey()), entry.getValue());
            }
        }
        return result;
    }

    public Iterable<InputChannel> inputChannels() {
        return () -> new Iterator<InputChannel>(){
            private final Iterator mapIterator;
            private Iterator iterator;
            {
                this.mapIterator = SingleInputGate.this.inputChannels.values().iterator();
                this.iterator = null;
            }

            @Override
            public boolean hasNext() {
                return this.iterator != null && this.iterator.hasNext() || this.mapIterator.hasNext();
            }

            @Override
            public InputChannel next() {
                if ((this.iterator == null || !this.iterator.hasNext()) && this.mapIterator.hasNext()) {
                    this.iterator = ((Map)this.mapIterator.next()).values().iterator();
                }
                if (this.iterator == null || !this.iterator.hasNext()) {
                    return null;
                }
                return (InputChannel)this.iterator.next();
            }
        };
    }

    private class AvailabilityNotifierImpl
    implements AvailabilityNotifier {
        private AvailabilityNotifierImpl() {
        }

        @Override
        public void notifyAvailable(TieredStoragePartitionId partitionId, TieredStorageInputChannelId inputChannelId) {
            Map channels = (Map)SingleInputGate.this.inputChannels.get(partitionId.getPartitionID().getPartitionId());
            if (channels == null) {
                return;
            }
            InputChannelInfo inputChannelInfo = new InputChannelInfo(SingleInputGate.this.gateIndex, inputChannelId.getInputChannelId());
            InputChannel inputChannel = (InputChannel)channels.get(inputChannelInfo);
            if (inputChannel != null) {
                SingleInputGate.this.queueChannel(inputChannel, null, false);
            }
        }
    }
}

