/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.Timer;
import java.util.TimerTask;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class LocalInputChannel
extends InputChannel
implements NotificationListener {
    private static final Logger LOG = LoggerFactory.getLogger(LocalInputChannel.class);
    private final Object requestLock = new Object();
    private final ResultPartitionManager partitionManager;
    private final TaskEventDispatcher taskEventDispatcher;
    private volatile ResultSubpartitionView subpartitionView;
    private volatile boolean isReleased;
    private volatile Buffer lookAhead;

    LocalInputChannel(SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, ResultPartitionManager partitionManager, TaskEventDispatcher taskEventDispatcher) {
        this(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher, (Tuple2<Integer, Integer>)new Tuple2((Object)0, (Object)0));
    }

    LocalInputChannel(SingleInputGate inputGate, int channelIndex, ResultPartitionID partitionId, ResultPartitionManager partitionManager, TaskEventDispatcher taskEventDispatcher, Tuple2<Integer, Integer> initialAndMaxBackoff) {
        super(inputGate, channelIndex, partitionId, initialAndMaxBackoff);
        this.partitionManager = Preconditions.checkNotNull(partitionManager);
        this.taskEventDispatcher = Preconditions.checkNotNull(taskEventDispatcher);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    void requestSubpartition(int subpartitionIndex) throws IOException, InterruptedException {
        Object object = this.requestLock;
        synchronized (object) {
            if (this.subpartitionView == null) {
                LOG.debug("{}: Requesting LOCAL subpartition {} of partition {}.", new Object[]{this, subpartitionIndex, this.partitionId});
                try {
                    this.subpartitionView = this.partitionManager.createSubpartitionView(this.partitionId, subpartitionIndex, this.inputGate.getBufferProvider());
                }
                catch (PartitionNotFoundException notFound) {
                    if (this.increaseBackoff()) {
                        this.inputGate.retriggerPartitionRequest(this.partitionId.getPartitionId());
                        return;
                    }
                    throw notFound;
                }
                if (this.subpartitionView == null) {
                    throw new IOException("Error requesting subpartition.");
                }
                this.getNextLookAhead();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void retriggerSubpartitionRequest(Timer timer, final int subpartitionIndex) throws IOException, InterruptedException {
        Object object = this.requestLock;
        synchronized (object) {
            Preconditions.checkState(this.subpartitionView == null, "Already requested partition.");
            timer.schedule(new TimerTask(){

                @Override
                public void run() {
                    try {
                        LocalInputChannel.this.requestSubpartition(subpartitionIndex);
                    }
                    catch (Throwable t) {
                        LocalInputChannel.this.setError(t);
                    }
                }
            }, this.getCurrentBackoff());
        }
    }

    @Override
    Buffer getNextBuffer() throws IOException, InterruptedException {
        this.checkError();
        Preconditions.checkState(this.subpartitionView != null, "Queried for a buffer before requesting the subpartition.");
        if (this.lookAhead == null) {
            this.lookAhead = this.subpartitionView.getNextBuffer();
        }
        Buffer next = this.lookAhead;
        this.lookAhead = null;
        if (!next.isBuffer() && EventSerializer.fromBuffer(next, this.getClass().getClassLoader()).getClass() == EndOfPartitionEvent.class) {
            return next;
        }
        this.getNextLookAhead();
        return next;
    }

    @Override
    void sendTaskEvent(TaskEvent event) throws IOException {
        this.checkError();
        Preconditions.checkState(this.subpartitionView != null, "Tried to send task event to producer before requesting the subpartition.");
        if (!this.taskEventDispatcher.publish(this.partitionId, event)) {
            throw new IOException("Error while publishing event " + event + " to producer. The producer could not be found.");
        }
    }

    @Override
    boolean isReleased() {
        return this.isReleased;
    }

    @Override
    void notifySubpartitionConsumed() throws IOException {
        if (this.subpartitionView != null) {
            this.subpartitionView.notifySubpartitionConsumed();
        }
    }

    @Override
    void releaseAllResources() throws IOException {
        if (!this.isReleased) {
            if (this.lookAhead != null) {
                this.lookAhead.recycle();
                this.lookAhead = null;
            }
            if (this.subpartitionView != null) {
                this.subpartitionView.releaseAllResources();
                this.subpartitionView = null;
            }
            this.isReleased = true;
        }
    }

    public String toString() {
        return "LocalInputChannel [" + this.partitionId + "]";
    }

    @Override
    public void onNotification() {
        if (this.isReleased) {
            return;
        }
        try {
            this.getNextLookAhead();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void getNextLookAhead() throws IOException, InterruptedException {
        block3: {
            ResultSubpartitionView view = this.subpartitionView;
            if (view == null) {
                return;
            }
            do {
                this.lookAhead = view.getNextBuffer();
                if (this.lookAhead != null) break block3;
                if (!view.registerListener(this)) continue;
                return;
            } while (!view.isReleased());
            Throwable cause = view.getFailureCause();
            if (cause != null) {
                this.setError(new ProducerFailedException(cause));
            }
            return;
        }
        this.notifyAvailableBuffer();
    }
}

