/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.TestingMiniCluster;
import org.apache.flink.runtime.minicluster.TestingMiniClusterConfiguration;
import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.streaming.api.graph.ExecutionPlan;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.SerializedThrowable;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class JobIntermediateDatasetReuseTest {
    /** Logger used by the nested {@code Sender} and {@code Receiver} tasks for debug-level tracing of each record. */
    private static final Logger LOG = LoggerFactory.getLogger(JobIntermediateDatasetReuseTest.class);

    /** Creates the test instance; the class declares no instance state, so nothing is initialised. */
    JobIntermediateDatasetReuseTest() {}

    /**
     * Reuses the persisted dataset with a producer parallelism of 1 and a consumer parallelism
     * of 1, and requires the consuming job to finish successfully.
     */
    @Test
    void testClusterPartitionReuse() throws Exception {
        final Consumer<JobResult> expectSuccess =
                result -> Assertions.assertThat(result.isSuccess()).isTrue();

        internalTestClusterPartitionReuse(1, 1, expectSuccess);
    }

    /**
     * Same scenario as the single-subtask reuse test, but with producer and consumer both
     * running at a parallelism of 64; the consuming job must still succeed.
     */
    @Test
    void testClusterPartitionReuseMultipleParallelism() throws Exception {
        final int parallelism = 64;

        internalTestClusterPartitionReuse(
                parallelism,
                parallelism,
                result -> {
                    Assertions.assertThat(result.isSuccess()).isTrue();
                });
    }

    /**
     * Consumes the dataset with more consumer subtasks (2) than producer subtasks (1). The
     * consuming job must fail, and its failure chain must contain a
     * {@link ClusterDatasetCorruptedException}.
     */
    @Test
    void testClusterPartitionReuseWithMoreConsumerParallelismThrowException() throws Exception {
        final int producerParallelism = 1;
        final int consumerParallelism = 2;

        final Consumer<JobResult> expectCorruptedDataset =
                result -> {
                    Assertions.assertThat(result.isSuccess()).isFalse();
                    final Throwable corruption = getClusterDatasetCorruptedException(result);
                    Assertions.assertThat(corruption).isNotNull();
                };

        internalTestClusterPartitionReuse(
                producerParallelism, consumerParallelism, expectCorruptedDataset);
    }

    /**
     * Consumes the dataset with fewer consumer subtasks (1) than producer subtasks (2). The
     * consuming job must fail, and its failure chain must contain a
     * {@link ClusterDatasetCorruptedException}.
     */
    @Test
    void testClusterPartitionReuseWithLessConsumerParallelismThrowException() throws Exception {
        final int producerParallelism = 2;
        final int consumerParallelism = 1;

        final Consumer<JobResult> expectCorruptedDataset =
                result -> {
                    Assertions.assertThat(result.isSuccess()).isFalse();
                    final Throwable corruption = getClusterDatasetCorruptedException(result);
                    Assertions.assertThat(corruption).isNotNull();
                };

        internalTestClusterPartitionReuse(
                producerParallelism, consumerParallelism, expectCorruptedDataset);
    }

    /**
     * Runs the producing job, then a second job that consumes the producing job's intermediate
     * dataset, and hands the second job's result to the supplied verification.
     *
     * <p>Both jobs run on one freshly started {@link TestingMiniCluster}, which is closed when
     * this method returns or throws. The first job must succeed before the second is submitted.
     *
     * @param producerParallelism parallelism passed to {@code createFirstJobGraph}
     * @param consumerParallelism parallelism passed to {@code createSecondJobGraph}
     * @param jobResultVerification assertions applied to the {@link JobResult} of the second job
     * @throws Exception if starting the cluster, submitting a job or waiting for a result fails
     */
    private void internalTestClusterPartitionReuse(int producerParallelism, int consumerParallelism, Consumer<JobResult> jobResultVerification) throws Exception {
        final TestingMiniClusterConfiguration miniClusterConfiguration =
                TestingMiniClusterConfiguration.newBuilder().build();

        try (TestingMiniCluster miniCluster =
                TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) {
            miniCluster.start();

            // Both job graphs refer to the same dataset id; that is what links the two jobs.
            final IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();

            final JobGraph firstJobGraph =
                    createFirstJobGraph(producerParallelism, intermediateDataSetID);
            miniCluster.submitJob(firstJobGraph).get();
            final CompletableFuture<JobResult> firstJobResultFuture =
                    miniCluster.requestJobResult(firstJobGraph.getJobID());
            final JobResult firstJobResult = firstJobResultFuture.get();
            Assertions.assertThat(firstJobResult.isSuccess()).isTrue();

            final JobGraph secondJobGraph =
                    createSecondJobGraph(consumerParallelism, intermediateDataSetID);
            miniCluster.submitJob(secondJobGraph).get();
            final CompletableFuture<JobResult> secondJobResultFuture =
                    miniCluster.requestJobResult(secondJobGraph.getJobID());
            final JobResult secondJobResult = secondJobResultFuture.get();

            jobResultVerification.accept(secondJobResult);
        }
    }

    /**
     * Checks that a dataset lost through a task manager replacement is reported as corrupted and
     * can be consumed again once it has been re-produced.
     *
     * <ol>
     *   <li>Run the producing job and require success.
     *   <li>Terminate task manager 0 and start a new task manager.
     *   <li>Run the consuming job. It must fail with a {@link ClusterDatasetCorruptedException}
     *       whose first corrupted dataset id equals the id used by the producing job.
     *   <li>Re-run the producing job under a fresh {@link JobID} and require success.
     *   <li>Re-run the consuming job under a fresh {@link JobID} and require success.
     * </ol>
     */
    @Test
    void testClusterPartitionReuseWithTMFail() throws Exception {
        final TestingMiniClusterConfiguration miniClusterConfiguration =
                TestingMiniClusterConfiguration.newBuilder().build();

        try (TestingMiniCluster miniCluster =
                TestingMiniCluster.newBuilder(miniClusterConfiguration).build()) {
            miniCluster.start();

            final IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
            final JobGraph firstJobGraph = createFirstJobGraph(1, intermediateDataSetID);
            JobResult jobResult = submitAndAwaitResult(miniCluster, firstJobGraph);
            Assertions.assertThat(jobResult.isSuccess()).isTrue();

            // Replace the task manager after the producing job has finished.
            miniCluster.terminateTaskManager(0);
            miniCluster.startTaskManager();

            final JobGraph secondJobGraph = createSecondJobGraph(1, intermediateDataSetID);
            // NOTE(review): presumably the large restart budget lets the job keep retrying until
            // the loss of the dataset is detected; confirm against the scheduler behaviour.
            RestartStrategyUtils.configureFixedDelayRestartStrategy(secondJobGraph, 1024, 1000L);
            jobResult = submitAndAwaitResult(miniCluster, secondJobGraph);
            Assertions.assertThat(jobResult.isSuccess()).isFalse();

            final ClusterDatasetCorruptedException exception =
                    getClusterDatasetCorruptedException(jobResult);
            Assertions.assertThat(exception).isNotNull();
            Assertions.assertThat(exception.getCorruptedClusterDatasetIds().get(0))
                    .isEqualTo(intermediateDataSetID);

            // Re-submit both graphs under new job ids: produce the dataset again, then consume it.
            firstJobGraph.setJobID(new JobID());
            jobResult = submitAndAwaitResult(miniCluster, firstJobGraph);
            Assertions.assertThat(jobResult.isSuccess()).isTrue();

            secondJobGraph.setJobID(new JobID());
            jobResult = submitAndAwaitResult(miniCluster, secondJobGraph);
            Assertions.assertThat(jobResult.isSuccess()).isTrue();
        }
    }

    /** Submits {@code jobGraph}, waits for the submission to complete, then waits for and returns the job's result. */
    private static JobResult submitAndAwaitResult(TestingMiniCluster miniCluster, JobGraph jobGraph) throws Exception {
        miniCluster.submitJob(jobGraph).get();
        return miniCluster.requestJobResult(jobGraph.getJobID()).get();
    }

    /**
     * Searches the failure of {@code jobResult} for a {@link ClusterDatasetCorruptedException}.
     *
     * <p>First asserts that the result carries a serialized throwable. That throwable is then
     * deserialized with the current thread's context class loader, and its cause chain is walked
     * from the outermost throwable inwards.
     *
     * @param jobResult result of a job that is expected to have failed
     * @return the first {@link ClusterDatasetCorruptedException} in the cause chain, or
     *     {@code null} if the chain contains none
     */
    private ClusterDatasetCorruptedException getClusterDatasetCorruptedException(JobResult jobResult) {
        Assertions.assertThat(jobResult.getSerializedThrowable().isPresent()).isTrue();

        final SerializedThrowable serialized = jobResult.getSerializedThrowable().get();
        Throwable current =
                serialized.deserializeError(Thread.currentThread().getContextClassLoader());

        while (current != null) {
            if (current instanceof ClusterDatasetCorruptedException) {
                return (ClusterDatasetCorruptedException) current;
            }
            current = current.getCause();
        }
        return null;
    }

    /**
     * Builds the consuming job: a single {@code Receiver} vertex named "Receiver 2" that is
     * registered to consume the given intermediate dataset.
     *
     * @param parallelism parallelism of the receiver vertex
     * @param intermediateDataSetID id of the dataset the vertex consumes
     * @return a job graph named "Second Job" containing only that vertex
     */
    private JobGraph createSecondJobGraph(int parallelism, IntermediateDataSetID intermediateDataSetID) {
        final JobVertex consumer = new JobVertex("Receiver 2", null);
        consumer.setParallelism(parallelism);
        consumer.setInvokableClass(Receiver.class);
        consumer.addIntermediateDataSetIdToConsume(intermediateDataSetID);

        final JobVertex[] vertices = {consumer};
        return new JobGraph(null, "Second Job", vertices);
    }

    /**
     * Builds the producing job: a {@code Sender} vertex connected to a {@code Receiver} vertex
     * through a new {@code POINTWISE}, {@code BLOCKING_PERSISTENT} dataset with the given id.
     *
     * @param parallelism parallelism applied to both vertices
     * @param intermediateDataSetID id assigned to the dataset between the two vertices
     * @return a job graph named "First Job" containing the sender followed by the receiver
     */
    private JobGraph createFirstJobGraph(int parallelism, IntermediateDataSetID intermediateDataSetID) {
        final JobVertex producer = new JobVertex("Sender");
        producer.setParallelism(parallelism);
        producer.setInvokableClass(Sender.class);

        final JobVertex consumer = new JobVertex("Receiver");
        consumer.setParallelism(parallelism);
        consumer.setInvokableClass(Receiver.class);

        JobVertexConnectionUtils.connectNewDataSetAsInput(
                consumer,
                producer,
                DistributionPattern.POINTWISE,
                ResultPartitionType.BLOCKING_PERSISTENT,
                intermediateDataSetID,
                false);

        final JobVertex[] vertices = {producer, consumer};
        return new JobGraph(null, "First Job", vertices);
    }

    public static class Receiver
    extends AbstractInvokable {
        public Receiver(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            int index = this.getIndexInSubtaskGroup();
            RecordReader reader = new RecordReader((InputGate)this.getEnvironment().getInputGate(0), IntValue.class, this.getEnvironment().getTaskManagerInfo().getTmpDirectories());
            for (int i = index; i < index + 100; ++i) {
                int value = ((IntValue)reader.next()).getValue();
                LOG.debug("Receiver({}) received {}", (Object)index, (Object)value);
                Assertions.assertThat((int)value).isEqualTo(i);
            }
            Assertions.assertThat((Comparable)((IntValue)reader.next())).isNull();
        }
    }

    public static class Sender
    extends AbstractInvokable {
        public Sender(Environment environment) {
            super(environment);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            int index = this.getIndexInSubtaskGroup();
            try (RecordWriter writer = new RecordWriterBuilder().build(this.getEnvironment().getWriter(0));){
                for (int i = index; i < index + 100; ++i) {
                    writer.emit((IOReadableWritable)new IntValue(i));
                    LOG.debug("Sender({}) emit {}", (Object)index, (Object)i);
                }
                writer.flushAll();
            }
        }
    }
}

