/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializer;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateSerializerImpl;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestDispatcherImpl;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.CheckpointInProgressRequest;
import org.apache.flink.runtime.checkpoint.channel.CheckpointStartRequest;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.ResultSubpartitionInfo;
import org.apache.flink.runtime.checkpoint.channel.SubtaskRegisterRequest;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith(value={ParameterizedTestExtension.class})
public class ChannelStateWriteRequestDispatcherTest {
    private static final JobID JOB_ID = new JobID();
    private static final JobVertexID JOB_VERTEX_ID = new JobVertexID();
    private static final int SUBTASK_INDEX = 0;
    @Parameter
    public Optional<Class<Exception>> expectedException;
    @Parameter(value=1)
    public List<ChannelStateWriteRequest> requests;
    private static final long CHECKPOINT_ID = 42L;

    @Parameters(name="expectedException={0} requests={1}")
    public static List<Object[]> data() {
        return Arrays.asList({Optional.empty(), Arrays.asList(ChannelStateWriteRequestDispatcherTest.start(), ChannelStateWriteRequestDispatcherTest.completeIn(), ChannelStateWriteRequestDispatcherTest.completeOut())}, {Optional.empty(), Arrays.asList(ChannelStateWriteRequestDispatcherTest.start(), ChannelStateWriteRequestDispatcherTest.writeIn(), ChannelStateWriteRequestDispatcherTest.completeIn())}, {Optional.empty(), Arrays.asList(ChannelStateWriteRequestDispatcherTest.start(), ChannelStateWriteRequestDispatcherTest.writeOut(), ChannelStateWriteRequestDispatcherTest.completeOut())}, {Optional.empty(), Arrays.asList(ChannelStateWriteRequestDispatcherTest.start(), ChannelStateWriteRequestDispatcherTest.writeOutFuture(), ChannelStateWriteRequestDispatcherTest.completeOut())}, {Optional.empty(), Arrays.asList(ChannelStateWriteRequestDispatcherTest.start(), ChannelStateWriteRequestDispatcherTest.completeIn(), ChannelStateWriteRequestDispatcherTest.writeOut())}, {Optional.empty(), Arrays.asList(ChannelStateWriteRequestDispatcherTest.start(), ChannelStateWriteRequestDispatcherTest.completeIn(), ChannelStateWriteRequestDispatcherTest.writeOutFuture())}, {Optional.empty(), Arrays.asList(ChannelStateWriteRequestDispatcherTest.start(), ChannelStateWriteRequestDispatcherTest.completeOut(), ChannelStateWriteRequestDispatcherTest.writeIn())}, {Optional.of(IllegalArgumentException.class), Collections.singletonList(ChannelStateWriteRequestDispatcherTest.writeIn())}, {Optional.of(IllegalArgumentException.class), Collections.singletonList(ChannelStateWriteRequestDispatcherTest.writeOut())}, {Optional.of(IllegalArgumentException.class), Collections.singletonList(ChannelStateWriteRequestDispatcherTest.writeOutFuture())}, {Optional.of(IllegalArgumentException.class), Collections.singletonList(ChannelStateWriteRequestDispatcherTest.completeIn())}, {Optional.of(IllegalArgumentException.class), Collections.singletonList(ChannelStateWriteRequestDispatcherTest.completeOut())}, {Optional.of(IllegalArgumentException.class), Arrays.asList(ChannelStateWriteRequestDispatcherTest.start(), ChannelStateWriteRequestDispatcherTest.completeIn(), ChannelStateWriteRequestDispatcherTest.completeIn())}, {Optional.of(IllegalArgumentException.class), Arrays.asList(ChannelStateWriteRequestDispatcherTest.start(), ChannelStateWriteRequestDispatcherTest.completeOut(), ChannelStateWriteRequestDispatcherTest.completeOut())}, {Optional.of(IllegalStateException.class), Arrays.asList(ChannelStateWriteRequestDispatcherTest.start(), ChannelStateWriteRequestDispatcherTest.completeIn(), ChannelStateWriteRequestDispatcherTest.writeIn())}, {Optional.of(IllegalStateException.class), Arrays.asList(ChannelStateWriteRequestDispatcherTest.start(), ChannelStateWriteRequestDispatcherTest.completeOut(), ChannelStateWriteRequestDispatcherTest.writeOut())}, {Optional.of(IllegalStateException.class), Arrays.asList(ChannelStateWriteRequestDispatcherTest.start(), ChannelStateWriteRequestDispatcherTest.completeOut(), ChannelStateWriteRequestDispatcherTest.writeOutFuture())}, {Optional.of(IllegalStateException.class), Arrays.asList(ChannelStateWriteRequestDispatcherTest.start(), ChannelStateWriteRequestDispatcherTest.start())});
    }

    private static CheckpointInProgressRequest completeOut() {
        return ChannelStateWriteRequest.completeOutput((JobVertexID)JOB_VERTEX_ID, (int)0, (long)42L);
    }

    private static CheckpointInProgressRequest completeIn() {
        return ChannelStateWriteRequest.completeInput((JobVertexID)JOB_VERTEX_ID, (int)0, (long)42L);
    }

    private static ChannelStateWriteRequest writeIn() {
        return ChannelStateWriteRequest.write((JobVertexID)JOB_VERTEX_ID, (int)0, (long)42L, (InputChannelInfo)new InputChannelInfo(1, 1), (CloseableIterator)CloseableIterator.ofElement((Object)new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment((int)1), FreeingBufferRecycler.INSTANCE), Buffer::recycleBuffer));
    }

    private static ChannelStateWriteRequest writeOut() {
        return ChannelStateWriteRequest.write((JobVertexID)JOB_VERTEX_ID, (int)0, (long)42L, (ResultSubpartitionInfo)new ResultSubpartitionInfo(1, 1), (Buffer[])new Buffer[]{new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment((int)1), FreeingBufferRecycler.INSTANCE)});
    }

    private static ChannelStateWriteRequest writeOutFuture() {
        CompletableFuture<List<NetworkBuffer>> outFuture = new CompletableFuture<List<NetworkBuffer>>();
        ChannelStateWriteRequest writeRequest = ChannelStateWriteRequest.write((JobVertexID)JOB_VERTEX_ID, (int)0, (long)42L, (ResultSubpartitionInfo)new ResultSubpartitionInfo(1, 1), outFuture);
        outFuture.complete(Collections.singletonList(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment((int)1), FreeingBufferRecycler.INSTANCE)));
        return writeRequest;
    }

    private static SubtaskRegisterRequest register() {
        return new SubtaskRegisterRequest(JOB_VERTEX_ID, 0);
    }

    private static CheckpointStartRequest start() {
        return new CheckpointStartRequest(JOB_VERTEX_ID, 0, 42L, new ChannelStateWriter.ChannelStateWriteResult(), new CheckpointStorageLocationReference(new byte[]{1}));
    }

    @TestTemplate
    void doRun() {
        ChannelStateWriteRequestDispatcherImpl processor = new ChannelStateWriteRequestDispatcherImpl(() -> new JobManagerCheckpointStorage().createCheckpointStorage(JOB_ID), (ChannelStateSerializer)new ChannelStateSerializerImpl());
        try {
            processor.dispatch((ChannelStateWriteRequest)ChannelStateWriteRequestDispatcherTest.register());
            for (ChannelStateWriteRequest request : this.requests) {
                processor.dispatch(request);
            }
        }
        catch (Throwable t) {
            if (this.expectedException.filter(e -> e.isInstance(t)).isPresent()) {
                return;
            }
            throw new RuntimeException("unexpected exception", t);
        }
        this.expectedException.ifPresent(e -> Assertions.fail((String)("expected exception " + e)));
    }
}

