/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.streaming.runtime.io.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.InputStatus;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.util.Preconditions;

@Internal
public final class StreamTaskNetworkInput<T>
implements StreamTaskInput<T> {
    private final CheckpointedInputGate checkpointedInputGate;
    private final DeserializationDelegate<StreamElement> deserializationDelegate;
    private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
    private final StatusWatermarkValve statusWatermarkValve;
    private final int inputIndex;
    private int lastChannel = -1;
    private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer = null;

    public StreamTaskNetworkInput(CheckpointedInputGate checkpointedInputGate, TypeSerializer<?> inputSerializer, IOManager ioManager, StatusWatermarkValve statusWatermarkValve, int inputIndex) {
        this.checkpointedInputGate = checkpointedInputGate;
        this.deserializationDelegate = new NonReusingDeserializationDelegate(new StreamElementSerializer(inputSerializer));
        this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[checkpointedInputGate.getNumberOfInputChannels()];
        for (int i = 0; i < this.recordDeserializers.length; ++i) {
            this.recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer(ioManager.getSpillingDirectoriesPaths());
        }
        this.statusWatermarkValve = (StatusWatermarkValve)Preconditions.checkNotNull((Object)statusWatermarkValve);
        this.inputIndex = inputIndex;
    }

    @VisibleForTesting
    StreamTaskNetworkInput(CheckpointedInputGate checkpointedInputGate, TypeSerializer<?> inputSerializer, StatusWatermarkValve statusWatermarkValve, int inputIndex, RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers) {
        this.checkpointedInputGate = checkpointedInputGate;
        this.deserializationDelegate = new NonReusingDeserializationDelegate(new StreamElementSerializer(inputSerializer));
        this.recordDeserializers = recordDeserializers;
        this.statusWatermarkValve = statusWatermarkValve;
        this.inputIndex = inputIndex;
    }

    @Override
    public InputStatus emitNext(PushingAsyncDataInput.DataOutput<T> output) throws Exception {
        while (true) {
            Optional<BufferOrEvent> bufferOrEvent;
            if (this.currentRecordDeserializer != null) {
                RecordDeserializer.DeserializationResult result = this.currentRecordDeserializer.getNextRecord(this.deserializationDelegate);
                if (result.isBufferConsumed()) {
                    this.currentRecordDeserializer.getCurrentBuffer().recycleBuffer();
                    this.currentRecordDeserializer = null;
                }
                if (result.isFullRecord()) {
                    this.processElement((StreamElement)this.deserializationDelegate.getInstance(), output);
                    return InputStatus.MORE_AVAILABLE;
                }
            }
            if (!(bufferOrEvent = this.checkpointedInputGate.pollNext()).isPresent()) break;
            this.processBufferOrEvent(bufferOrEvent.get());
        }
        if (this.checkpointedInputGate.isFinished()) {
            Preconditions.checkState((boolean)this.checkpointedInputGate.getAvailableFuture().isDone(), (Object)"Finished BarrierHandler should be available");
            if (!this.checkpointedInputGate.isEmpty()) {
                throw new IllegalStateException("Trailing data in checkpoint barrier handler.");
            }
            return InputStatus.END_OF_INPUT;
        }
        return InputStatus.NOTHING_AVAILABLE;
    }

    private void processElement(StreamElement recordOrMark, PushingAsyncDataInput.DataOutput<T> output) throws Exception {
        if (recordOrMark.isRecord()) {
            output.emitRecord(recordOrMark.asRecord());
        } else if (recordOrMark.isWatermark()) {
            this.statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), this.lastChannel);
        } else if (recordOrMark.isLatencyMarker()) {
            output.emitLatencyMarker(recordOrMark.asLatencyMarker());
        } else if (recordOrMark.isStreamStatus()) {
            this.statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), this.lastChannel);
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }

    private void processBufferOrEvent(BufferOrEvent bufferOrEvent) throws IOException {
        if (bufferOrEvent.isBuffer()) {
            this.lastChannel = bufferOrEvent.getChannelIndex();
            Preconditions.checkState((this.lastChannel != -1 ? 1 : 0) != 0);
            this.currentRecordDeserializer = this.recordDeserializers[this.lastChannel];
            Preconditions.checkState((this.currentRecordDeserializer != null ? 1 : 0) != 0, (Object)"currentRecordDeserializer has already been released");
            this.currentRecordDeserializer.setNextBuffer(bufferOrEvent.getBuffer());
        } else {
            AbstractEvent event = bufferOrEvent.getEvent();
            if (event.getClass() != EndOfPartitionEvent.class) {
                throw new IOException("Unexpected event: " + event);
            }
            this.releaseDeserializer(bufferOrEvent.getChannelIndex());
        }
    }

    @Override
    public int getInputIndex() {
        return this.inputIndex;
    }

    public CompletableFuture<?> getAvailableFuture() {
        if (this.currentRecordDeserializer != null) {
            return AVAILABLE;
        }
        return this.checkpointedInputGate.getAvailableFuture();
    }

    @Override
    public void close() throws IOException {
        for (int channelIndex = 0; channelIndex < this.recordDeserializers.length; ++channelIndex) {
            this.releaseDeserializer(channelIndex);
        }
        this.checkpointedInputGate.cleanup();
    }

    private void releaseDeserializer(int channelIndex) {
        RecordDeserializer<DeserializationDelegate<StreamElement>> deserializer = this.recordDeserializers[channelIndex];
        if (deserializer != null) {
            Buffer buffer = deserializer.getCurrentBuffer();
            if (buffer != null && !buffer.isRecycled()) {
                buffer.recycleBuffer();
            }
            deserializer.clear();
            this.recordDeserializers[channelIndex] = null;
        }
    }
}

