/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.CheckpointableInput;
import org.apache.flink.runtime.io.network.partition.consumer.EndOfChannelStateEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointableTask;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.operators.testutils.DummyCheckpointInvokable;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput;
import org.apache.flink.streaming.runtime.io.DataInputStatus;
import org.apache.flink.streaming.runtime.io.MockInputGate;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.io.StreamTaskInput;
import org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierTracker;
import org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate;
import org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler;
import org.apache.flink.streaming.runtime.io.checkpointing.UpstreamRecoveryTracker;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.tasks.TestSubtaskCheckpointCoordinator;
import org.apache.flink.streaming.runtime.watermarkstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Test;

public class StreamTaskNetworkInputTest {
    private static final int PAGE_SIZE = 1000;
    private final IOManager ioManager = new IOManagerAsync();

    @After
    public void tearDown() throws Exception {
        this.ioManager.close();
    }

    @Test
    public void testIsAvailableWithBufferedDataInDeserializer() throws Exception {
        List<BufferOrEvent> buffers = Collections.singletonList(this.createDataBuffer());
        VerifyRecordsDataOutput output = new VerifyRecordsDataOutput();
        StreamTaskNetworkInput<Long> input = this.createStreamTaskNetworkInput(buffers);
        StreamTaskNetworkInputTest.assertHasNextElement(input, output);
        StreamTaskNetworkInputTest.assertHasNextElement(input, output);
        Assertions.assertThat((int)output.getNumberOfEmittedRecords()).isEqualTo(2);
    }

    @Test
    public void testNoDataProcessedAfterCheckpointBarrier() throws Exception {
        CheckpointBarrier barrier = new CheckpointBarrier(0L, 0L, CheckpointOptions.forCheckpointWithDefaultLocation());
        ArrayList<BufferOrEvent> buffers = new ArrayList<BufferOrEvent>(2);
        buffers.add(new BufferOrEvent((AbstractEvent)barrier, new InputChannelInfo(0, 0)));
        buffers.add(this.createDataBuffer());
        VerifyRecordsDataOutput output = new VerifyRecordsDataOutput();
        StreamTaskNetworkInput<Long> input = this.createStreamTaskNetworkInput(buffers);
        StreamTaskNetworkInputTest.assertHasNextElement(input, output);
        Assertions.assertThat((int)output.getNumberOfEmittedRecords()).isZero();
    }

    private Map<InputChannelInfo, TestRecordDeserializer> createDeserializers(CheckpointableInput inputGate) {
        return inputGate.getChannelInfos().stream().collect(Collectors.toMap(Function.identity(), unused -> new TestRecordDeserializer(this.ioManager.getSpillingDirectoriesPaths())));
    }

    @Test
    public void testSnapshotAfterEndOfPartition() throws Exception {
        int numInputChannels = 1;
        int channelId = 0;
        int checkpointId = 0;
        VerifyRecordsDataOutput output = new VerifyRecordsDataOutput();
        LongSerializer inSerializer = LongSerializer.INSTANCE;
        StreamTestSingleInputGate inputGate = new StreamTestSingleInputGate(numInputChannels, 0, inSerializer, 1024);
        StreamTaskNetworkInput input = new StreamTaskNetworkInput(new CheckpointedInputGate((InputGate)inputGate.getInputGate(), (CheckpointBarrierHandler)SingleCheckpointBarrierHandler.createUnalignedCheckpointBarrierHandler((SubtaskCheckpointCoordinator)TestSubtaskCheckpointCoordinator.INSTANCE, (String)"test", (CheckpointableTask)new DummyCheckpointInvokable(), (Clock)SystemClock.getInstance(), (boolean)false, (CheckpointableInput[])new CheckpointableInput[]{inputGate.getInputGate()}), (MailboxExecutor)new SyncMailboxExecutor()), (TypeSerializer)inSerializer, this.ioManager, new StatusWatermarkValve(numInputChannels), 0, () -> false);
        inputGate.sendEvent((AbstractEvent)new CheckpointBarrier((long)checkpointId, 0L, CheckpointOptions.forCheckpointWithDefaultLocation().toUnaligned()), channelId);
        inputGate.sendElement(new StreamRecord((Object)42L), channelId);
        StreamTaskNetworkInputTest.assertHasNextElement(input, output);
        StreamTaskNetworkInputTest.assertHasNextElement(input, output);
        Assertions.assertThat((int)output.getNumberOfEmittedRecords()).isOne();
        inputGate.sendEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, channelId);
        input.emitNext(output);
        CompletableFuture completableFuture = input.prepareSnapshot(ChannelStateWriter.NO_OP, (long)checkpointId);
        completableFuture.join();
    }

    @Test
    public void testReleasingDeserializerTimely() throws Exception {
        int numInputChannels = 2;
        LongSerializer inSerializer = LongSerializer.INSTANCE;
        StreamTestSingleInputGate<Long> inputGate = new StreamTestSingleInputGate<Long>(numInputChannels, 0, (TypeSerializer<Long>)inSerializer, 1024);
        NoOpDataOutput output = new NoOpDataOutput();
        Map<InputChannelInfo, TestRecordDeserializer> deserializers = this.createDeserializers((CheckpointableInput)inputGate.getInputGate());
        HashMap<InputChannelInfo, TestRecordDeserializer> copiedDeserializers = new HashMap<InputChannelInfo, TestRecordDeserializer>(deserializers);
        TestStreamTaskNetworkInput input = new TestStreamTaskNetworkInput(inputGate, inSerializer, numInputChannels, deserializers);
        for (InputChannelInfo channelInfo : inputGate.getInputGate().getChannelInfos()) {
            Assertions.assertThat((Object)((Object)deserializers.get(channelInfo))).isNotNull();
            inputGate.sendEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, channelInfo.getInputChannelIdx());
            input.emitNext(output);
            Assertions.assertThat((boolean)((TestRecordDeserializer)((Object)copiedDeserializers.get(channelInfo))).isCleared()).isTrue();
            Assertions.assertThat((Object)((Object)deserializers.get(channelInfo))).isNull();
        }
    }

    @Test
    public void testInputStatusAfterEndOfRecovery() throws Exception {
        int numInputChannels = 2;
        LongSerializer inSerializer = LongSerializer.INSTANCE;
        StreamTestSingleInputGate<Long> inputGate = new StreamTestSingleInputGate<Long>(numInputChannels, 0, (TypeSerializer<Long>)inSerializer, 1024);
        NoOpDataOutput output = new NoOpDataOutput();
        Map<InputChannelInfo, TestRecordDeserializer> deserializers = this.createDeserializers((CheckpointableInput)inputGate.getInputGate());
        TestStreamTaskNetworkInput input = new TestStreamTaskNetworkInput(inputGate, inSerializer, numInputChannels, deserializers);
        inputGate.sendElement(new StreamRecord((Object)42L), 0);
        Assertions.assertThat((Comparable)input.emitNext(output)).isEqualTo((Object)DataInputStatus.MORE_AVAILABLE);
        inputGate.sendEvent((AbstractEvent)EndOfChannelStateEvent.INSTANCE, 0);
        Assertions.assertThat((Comparable)input.emitNext(output)).isEqualTo((Object)DataInputStatus.MORE_AVAILABLE);
        inputGate.sendEvent((AbstractEvent)EndOfChannelStateEvent.INSTANCE, 1);
        Assertions.assertThat((Comparable)input.emitNext(output)).isEqualTo((Object)DataInputStatus.END_OF_RECOVERY);
    }

    @Test
    public void testRecordsAreProcessedInBatches() throws Exception {
        int i;
        int numInputChannels = 2;
        Random random = new Random();
        LongSerializer inSerializer = LongSerializer.INSTANCE;
        StreamTestSingleInputGate inputGate = new StreamTestSingleInputGate(numInputChannels, 0, inSerializer, 1024);
        StreamTaskNetworkInput input = new StreamTaskNetworkInput(StreamTaskNetworkInputTest.createCheckpointedInputGate((InputGate)inputGate.getInputGate()), (TypeSerializer)inSerializer, this.ioManager, new StatusWatermarkValve(numInputChannels), 0, () -> true);
        VerifyRecordsDataOutput output = new VerifyRecordsDataOutput();
        int expectedElementCount = 3;
        for (i = 0; i < expectedElementCount; ++i) {
            inputGate.sendElement(new StreamRecord((Object)i), random.nextInt(numInputChannels));
        }
        Assertions.assertThat((Comparable)input.emitNext(output)).isEqualTo((Object)DataInputStatus.NOTHING_AVAILABLE);
        Assertions.assertThat((int)output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount);
        inputGate.sendEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, 0);
        for (i = 0; i < expectedElementCount; ++i) {
            inputGate.sendElement(new StreamRecord((Object)i), 1);
        }
        Assertions.assertThat((Comparable)input.emitNext(output)).isEqualTo((Object)DataInputStatus.NOTHING_AVAILABLE);
        Assertions.assertThat((int)output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount * 2);
    }

    @Test
    public void testBatchProcessingRecordsCanBeInterrupted() throws Exception {
        int i;
        int numInputChannels = 2;
        Random random = new Random();
        LongSerializer inSerializer = LongSerializer.INSTANCE;
        StreamTestSingleInputGate inputGate = new StreamTestSingleInputGate(numInputChannels, 0, inSerializer, 1024);
        AtomicBoolean canEmitBatchOfRecords = new AtomicBoolean();
        StreamTaskNetworkInput input = new StreamTaskNetworkInput(StreamTaskNetworkInputTest.createCheckpointedInputGate((InputGate)inputGate.getInputGate()), (TypeSerializer)inSerializer, this.ioManager, new StatusWatermarkValve(numInputChannels), 0, canEmitBatchOfRecords::get);
        VerifyRecordsDataOutput output = new VerifyRecordsDataOutput();
        int expectedElementCount = 5;
        for (i = 0; i < expectedElementCount; ++i) {
            inputGate.sendElement(new StreamRecord((Object)i), random.nextInt(numInputChannels));
        }
        canEmitBatchOfRecords.set(false);
        Assertions.assertThat((Comparable)input.emitNext(output)).isEqualTo((Object)DataInputStatus.MORE_AVAILABLE);
        Assertions.assertThat((int)output.getNumberOfEmittedRecords()).isOne();
        canEmitBatchOfRecords.set(true);
        Assertions.assertThat((Comparable)input.emitNext(output)).isEqualTo((Object)DataInputStatus.NOTHING_AVAILABLE);
        Assertions.assertThat((int)output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount);
        inputGate.sendEvent((AbstractEvent)EndOfPartitionEvent.INSTANCE, 0);
        for (i = 0; i < expectedElementCount; ++i) {
            inputGate.sendElement(new StreamRecord((Object)i), 1);
        }
        canEmitBatchOfRecords.set(false);
        Assertions.assertThat((Comparable)input.emitNext(output)).isEqualTo((Object)DataInputStatus.MORE_AVAILABLE);
        Assertions.assertThat((int)output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount);
        canEmitBatchOfRecords.set(true);
        Assertions.assertThat((Comparable)input.emitNext(output)).isEqualTo((Object)DataInputStatus.NOTHING_AVAILABLE);
        Assertions.assertThat((int)output.getNumberOfEmittedRecords()).isEqualTo(expectedElementCount * 2);
    }

    private BufferOrEvent createDataBuffer() throws IOException {
        try (BufferBuilder bufferBuilder = BufferBuilderTestUtils.createEmptyBufferBuilder((int)1000);){
            BufferConsumer bufferConsumer = bufferBuilder.createBufferConsumer();
            this.serializeRecord(42L, bufferBuilder);
            this.serializeRecord(44L, bufferBuilder);
            BufferOrEvent bufferOrEvent = new BufferOrEvent(bufferConsumer.build(), new InputChannelInfo(0, 0));
            return bufferOrEvent;
        }
    }

    private StreamTaskNetworkInput<Long> createStreamTaskNetworkInput(List<BufferOrEvent> buffers) {
        return new StreamTaskNetworkInput(StreamTaskNetworkInputTest.createCheckpointedInputGate((InputGate)new MockInputGate(1, buffers, false)), (TypeSerializer)LongSerializer.INSTANCE, this.ioManager, new StatusWatermarkValve(1), 0, () -> false);
    }

    private static CheckpointedInputGate createCheckpointedInputGate(InputGate inputGate) {
        return new CheckpointedInputGate(inputGate, (CheckpointBarrierHandler)new CheckpointBarrierTracker(1, (CheckpointableTask)new DummyCheckpointInvokable(), (Clock)SystemClock.getInstance(), false), (MailboxExecutor)new SyncMailboxExecutor(), UpstreamRecoveryTracker.forInputGate((InputGate)inputGate));
    }

    private void serializeRecord(long value, BufferBuilder bufferBuilder) throws IOException {
        DataOutputSerializer serializer = new DataOutputSerializer(128);
        SerializationDelegate serializationDelegate = new SerializationDelegate((TypeSerializer)new StreamElementSerializer((TypeSerializer)LongSerializer.INSTANCE));
        serializationDelegate.setInstance((Object)new StreamRecord((Object)value));
        ByteBuffer serializedRecord = RecordWriter.serializeRecord((DataOutputSerializer)serializer, (IOReadableWritable)serializationDelegate);
        bufferBuilder.appendAndCommit(serializedRecord);
        Assertions.assertThat((boolean)bufferBuilder.isFull()).isFalse();
    }

    private static <T> void assertHasNextElement(StreamTaskInput<T> input, PushingAsyncDataInput.DataOutput<T> output) throws Exception {
        Assertions.assertThat((boolean)input.getAvailableFuture().isDone()).isTrue();
        DataInputStatus status = input.emitNext(output);
        Assertions.assertThat((Comparable)status).isEqualTo((Object)DataInputStatus.MORE_AVAILABLE);
    }

    private static class TestStreamTaskNetworkInput
    extends AbstractStreamTaskNetworkInput<Long, TestRecordDeserializer> {
        public TestStreamTaskNetworkInput(StreamTestSingleInputGate<Long> inputGate, LongSerializer inSerializer, int numInputChannels, Map<InputChannelInfo, TestRecordDeserializer> deserializers) {
            super(StreamTaskNetworkInputTest.createCheckpointedInputGate((InputGate)inputGate.getInputGate()), (TypeSerializer)inSerializer, new StatusWatermarkValve(numInputChannels), 0, deserializers, () -> false);
        }

        public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException {
            throw new UnsupportedOperationException();
        }
    }

    private static class VerifyRecordsDataOutput<T>
    extends NoOpDataOutput<T> {
        private int numberOfEmittedRecords;

        private VerifyRecordsDataOutput() {
        }

        @Override
        public void emitRecord(StreamRecord<T> record) {
            ++this.numberOfEmittedRecords;
        }

        int getNumberOfEmittedRecords() {
            return this.numberOfEmittedRecords;
        }
    }

    private static class NoOpDataOutput<T>
    implements PushingAsyncDataInput.DataOutput<T> {
        private NoOpDataOutput() {
        }

        public void emitRecord(StreamRecord<T> record) {
        }

        public void emitWatermark(Watermark watermark) {
        }

        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
        }

        public void emitLatencyMarker(LatencyMarker latencyMarker) {
        }
    }

    private static class TestRecordDeserializer
    extends SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>> {
        private boolean cleared = false;

        public TestRecordDeserializer(String[] tmpDirectories) {
            super(tmpDirectories);
        }

        public void clear() {
            this.cleared = true;
        }

        public boolean isCleared() {
            return this.cleared;
        }
    }
}

