/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.List;
import java.util.Optional;
import java.util.Timer;
import java.util.TimerTask;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.CompositeBuffer;
import org.apache.flink.runtime.io.network.buffer.FileRegionBuffer;
import org.apache.flink.runtime.io.network.buffer.FullyFilledBuffer;
import org.apache.flink.runtime.io.network.logger.NetworkActionsLogger;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.ChannelStatePersister;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalInputChannel
extends InputChannel
implements BufferAvailabilityListener {
    private static final Logger LOG = LoggerFactory.getLogger(LocalInputChannel.class);
    private final Object requestLock = new Object();
    private final ResultPartitionManager partitionManager;
    private final TaskEventPublisher taskEventPublisher;
    @Nullable
    private volatile ResultSubpartitionView subpartitionView;
    private volatile boolean isReleased;
    private final ChannelStatePersister channelStatePersister;
    private final Deque<ResultSubpartition.BufferAndBacklog> toBeConsumedBuffers = new ArrayDeque<ResultSubpartition.BufferAndBacklog>();

    public LocalInputChannel(SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, ResultSubpartitionIndexSet consumedSubpartitionIndexSet, ResultPartitionManager partitionManager, TaskEventPublisher taskEventPublisher, int initialBackoff, int maxBackoff, Counter numBytesIn, Counter numBuffersIn, ChannelStateWriter stateWriter) {
        super(inputGate, channelIndex, partitionId, consumedSubpartitionIndexSet, initialBackoff, maxBackoff, numBytesIn, numBuffersIn);
        this.partitionManager = Preconditions.checkNotNull(partitionManager);
        this.taskEventPublisher = Preconditions.checkNotNull(taskEventPublisher);
        this.channelStatePersister = new ChannelStatePersister(stateWriter, this.getChannelInfo());
    }

    @Override
    public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException {
        this.channelStatePersister.startPersisting(barrier.getId(), Collections.emptyList());
    }

    @Override
    public void checkpointStopped(long checkpointId) {
        this.channelStatePersister.stopPersisting(checkpointId);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected void requestSubpartitions() throws IOException {
        Preconditions.checkState(this.toBeConsumedBuffers.isEmpty());
        boolean retriggerRequest = false;
        boolean notifyDataAvailable = false;
        Object object = this.requestLock;
        synchronized (object) {
            Preconditions.checkState(!this.isReleased, "LocalInputChannel has been released already");
            if (this.subpartitionView == null) {
                LOG.debug("{}: Requesting LOCAL subpartitions {} of partition {}. {}", new Object[]{this, this.consumedSubpartitionIndexSet, this.partitionId, this.channelStatePersister});
                try {
                    ResultSubpartitionView subpartitionView = this.partitionManager.createSubpartitionView(this.partitionId, this.consumedSubpartitionIndexSet, this);
                    if (subpartitionView == null) {
                        throw new IOException("Error requesting subpartition.");
                    }
                    this.subpartitionView = subpartitionView;
                    if (this.isReleased) {
                        subpartitionView.releaseAllResources();
                        this.subpartitionView = null;
                    } else {
                        notifyDataAvailable = true;
                    }
                }
                catch (PartitionNotFoundException notFound) {
                    if (this.increaseBackoff()) {
                        retriggerRequest = true;
                    }
                    throw notFound;
                }
            }
        }
        if (notifyDataAvailable) {
            this.notifyDataAvailable(this.subpartitionView);
        }
        if (retriggerRequest) {
            this.inputGate.retriggerPartitionRequest(this.partitionId.getPartitionId(), this.channelInfo);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void retriggerSubpartitionRequest(Timer timer) {
        Object object = this.requestLock;
        synchronized (object) {
            Preconditions.checkState(this.subpartitionView == null, "already requested partition");
            timer.schedule(new TimerTask(){

                @Override
                public void run() {
                    try {
                        LocalInputChannel.this.requestSubpartitions();
                    }
                    catch (Throwable t) {
                        LocalInputChannel.this.setError(t);
                    }
                }
            }, this.getCurrentBackoff());
        }
    }

    @Override
    protected int peekNextBufferSubpartitionIdInternal() throws IOException {
        this.checkError();
        ResultSubpartitionView subpartitionView = this.subpartitionView;
        if (subpartitionView == null) {
            if (this.isReleased) {
                return -1;
            }
            subpartitionView = this.checkAndWaitForSubpartitionView();
        }
        return subpartitionView.peekNextBufferSubpartitionId();
    }

    @Override
    public Optional<InputChannel.BufferAndAvailability> getNextBuffer() throws IOException {
        this.checkError();
        if (!this.toBeConsumedBuffers.isEmpty()) {
            return this.getBufferAndAvailability(this.toBeConsumedBuffers.removeFirst());
        }
        ResultSubpartitionView subpartitionView = this.subpartitionView;
        if (subpartitionView == null) {
            if (this.isReleased) {
                return Optional.empty();
            }
            subpartitionView = this.checkAndWaitForSubpartitionView();
        }
        ResultSubpartition.BufferAndBacklog next = subpartitionView.getNextBuffer();
        while (next != null && next.buffer().readableBytes() == 0) {
            next.buffer().recycleBuffer();
            next = subpartitionView.getNextBuffer();
            this.numBuffersIn.inc();
        }
        if (next == null) {
            if (subpartitionView.isReleased()) {
                throw new CancelTaskException("Consumed partition " + String.valueOf(subpartitionView) + " has been released.");
            }
            return Optional.empty();
        }
        Buffer buffer = next.buffer();
        if (buffer instanceof FullyFilledBuffer) {
            List<Buffer> partialBuffers = ((FullyFilledBuffer)buffer).getPartialBuffers();
            int seq = next.getSequenceNumber();
            for (Buffer partialBuffer : partialBuffers) {
                this.toBeConsumedBuffers.add(new ResultSubpartition.BufferAndBacklog(partialBuffer, next.buffersInBacklog(), buffer.getDataType(), seq++));
            }
            return this.getBufferAndAvailability(this.toBeConsumedBuffers.removeFirst());
        }
        return this.getBufferAndAvailability(next);
    }

    private Optional<InputChannel.BufferAndAvailability> getBufferAndAvailability(ResultSubpartition.BufferAndBacklog next) throws IOException {
        Buffer buffer = next.buffer();
        if (buffer instanceof FileRegionBuffer) {
            buffer = ((FileRegionBuffer)buffer).readInto(this.inputGate.getUnpooledSegment());
        }
        if (buffer instanceof CompositeBuffer) {
            buffer = ((CompositeBuffer)buffer).getFullBufferData(this.inputGate.getUnpooledSegment());
        }
        this.numBytesIn.inc((long)buffer.readableBytes());
        this.numBuffersIn.inc();
        this.channelStatePersister.checkForBarrier(buffer);
        this.channelStatePersister.maybePersist(buffer);
        NetworkActionsLogger.traceInput("LocalInputChannel#getNextBuffer", buffer, this.inputGate.getOwningTaskName(), this.channelInfo, this.channelStatePersister, next.getSequenceNumber());
        return Optional.of(new InputChannel.BufferAndAvailability(buffer, next.getNextDataType(), next.buffersInBacklog(), next.getSequenceNumber()));
    }

    @Override
    public void notifyDataAvailable(ResultSubpartitionView view) {
        this.notifyChannelNonEmpty();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private ResultSubpartitionView checkAndWaitForSubpartitionView() {
        Object object = this.requestLock;
        synchronized (object) {
            Preconditions.checkState(!this.isReleased, "released");
            Preconditions.checkState(this.subpartitionView != null, "Queried for a buffer before requesting the subpartition.");
            return this.subpartitionView;
        }
    }

    @Override
    public void resumeConsumption() {
        Preconditions.checkState(!this.isReleased, "Channel released.");
        ResultSubpartitionView subpartitionView = Preconditions.checkNotNull(this.subpartitionView);
        subpartitionView.resumeConsumption();
        if (subpartitionView.getAvailabilityAndBacklog(true).isAvailable()) {
            this.notifyChannelNonEmpty();
        }
    }

    @Override
    public void acknowledgeAllRecordsProcessed() throws IOException {
        Preconditions.checkState(!this.isReleased, "Channel released.");
        this.subpartitionView.acknowledgeAllDataProcessed();
    }

    @Override
    void sendTaskEvent(TaskEvent event) throws IOException {
        this.checkError();
        Preconditions.checkState(this.subpartitionView != null, "Tried to send task event to producer before requesting the subpartition.");
        if (!this.taskEventPublisher.publish(this.partitionId, event)) {
            throw new IOException("Error while publishing event " + String.valueOf(event) + " to producer. The producer could not be found.");
        }
    }

    @Override
    boolean isReleased() {
        return this.isReleased;
    }

    @Override
    void releaseAllResources() throws IOException {
        if (!this.isReleased) {
            this.isReleased = true;
            ResultSubpartitionView view = this.subpartitionView;
            if (view != null) {
                view.releaseAllResources();
                this.subpartitionView = null;
            }
        }
    }

    @Override
    void announceBufferSize(int newBufferSize) {
        Preconditions.checkState(!this.isReleased, "Channel released.");
        ResultSubpartitionView view = this.subpartitionView;
        if (view != null) {
            view.notifyNewBufferSize(newBufferSize);
        }
    }

    @Override
    int getBuffersInUseCount() {
        ResultSubpartitionView view = this.subpartitionView;
        return view == null ? 0 : view.getNumberOfQueuedBuffers();
    }

    @Override
    public int unsynchronizedGetNumberOfQueuedBuffers() {
        ResultSubpartitionView view = this.subpartitionView;
        if (view != null) {
            return view.unsynchronizedGetNumberOfQueuedBuffers();
        }
        return 0;
    }

    @Override
    public void notifyRequiredSegmentId(int subpartitionId, int segmentId) {
        if (this.subpartitionView != null) {
            Preconditions.checkNotNull(this.subpartitionView).notifyRequiredSegmentId(subpartitionId, segmentId);
        }
    }

    public String toString() {
        return "LocalInputChannel [" + String.valueOf(this.partitionId) + "]";
    }

    @VisibleForTesting
    ResultSubpartitionView getSubpartitionView() {
        return this.subpartitionView;
    }
}

