/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.FinishedOperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.FullyFinishedOperatorState;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.VertexFinishedStateChecker;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.JobVertexConnectionUtils;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkRuntimeException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class VertexFinishedStateCheckerTest {
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();

    VertexFinishedStateCheckerTest() {
    }

    @Test
    void testRestoringPartiallyFinishedChainsFailsWithoutUidHash() throws Exception {
        this.testRestoringPartiallyFinishedChainsFails(false);
    }

    @Test
    void testRestoringPartiallyFinishedChainsFailsWithUidHash() throws Exception {
        this.testRestoringPartiallyFinishedChainsFails(true);
    }

    private void testRestoringPartiallyFinishedChainsFails(boolean useUidHash) throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        OperatorIDPair op1 = OperatorIDPair.of((OperatorID)new OperatorID(), (OperatorID)new OperatorID(), (String)"operatorName", (String)"operatorUid");
        OperatorIDPair op2 = OperatorIDPair.generatedIDOnly((OperatorID)new OperatorID());
        OperatorIDPair op3 = OperatorIDPair.generatedIDOnly((OperatorID)new OperatorID());
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID2, 1, 1, Collections.singletonList(op3), true).addJobVertex(jobVertexID1, 1, 1, Arrays.asList(op1, op2), true).build((ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor());
        HashMap<OperatorID, Object> operatorStates = new HashMap<OperatorID, Object>();
        operatorStates.put(useUidHash ? (OperatorID)op1.getUserDefinedOperatorID().get() : op1.getGeneratedOperatorID(), new FullyFinishedOperatorState(null, null, op1.getGeneratedOperatorID(), 1, 1));
        operatorStates.put(op2.getGeneratedOperatorID(), new OperatorState(null, null, op2.getGeneratedOperatorID(), 1, 1));
        HashSet<ExecutionJobVertex> vertices = new HashSet<ExecutionJobVertex>();
        vertices.add(graph.getJobVertex(jobVertexID1));
        VertexFinishedStateChecker finishedStateChecker = new VertexFinishedStateChecker(vertices, operatorStates);
        Assertions.assertThatThrownBy(() -> ((VertexFinishedStateChecker)finishedStateChecker).validateOperatorsFinishedState()).hasMessage("Can not restore vertex anon(" + jobVertexID1 + ") which contain mixed operator finished state: [ALL_RUNNING, FULLY_FINISHED]").isInstanceOf(FlinkRuntimeException.class);
    }

    @Test
    void testAddingRunningOperatorBeforeFinishedOneFails() throws Exception {
        JobVertexID jobVertexID2 = new JobVertexID();
        this.testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.ALL_RUNNING, jobVertexID2, "vert2", VertexFinishedStateChecker.VertexFinishedState.FULLY_FINISHED, new DistributionPattern[]{DistributionPattern.ALL_TO_ALL}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with fully finished vertices predeceased with the ones not fully finished. Task vertex vert2(" + jobVertexID2 + ") has a predecessor not fully finished");
    }

    @Test
    void testAddingPartiallyFinishedOperatorBeforeFinishedOneFails() throws Exception {
        JobVertexID jobVertexID2 = new JobVertexID();
        this.testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, jobVertexID2, "vert2", VertexFinishedStateChecker.VertexFinishedState.FULLY_FINISHED, new DistributionPattern[]{DistributionPattern.ALL_TO_ALL}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with fully finished vertices predeceased with the ones not fully finished. Task vertex vert2(" + jobVertexID2 + ") has a predecessor not fully finished");
    }

    @Test
    void testAddingAllRunningOperatorBeforePartiallyFinishedOneWithAllToAllFails() throws Exception {
        JobVertexID jobVertexID2 = new JobVertexID();
        this.testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.ALL_RUNNING, jobVertexID2, "vert2", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, new DistributionPattern[]{DistributionPattern.ALL_TO_ALL}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with partially finished vertices predeceased with running or partially finished ones and connected via the ALL_TO_ALL edges. Task vertex vert2(" + jobVertexID2 + ") has a all running predecessor");
    }

    @Test
    void testAddingPartiallyFinishedOperatorBeforePartiallyFinishedOneWithAllToAllFails() throws Exception {
        JobVertexID jobVertexID2 = new JobVertexID();
        this.testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, jobVertexID2, "vert2", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, new DistributionPattern[]{DistributionPattern.ALL_TO_ALL}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with partially finished vertices predeceased with running or partially finished ones and connected via the ALL_TO_ALL edges. Task vertex vert2(" + jobVertexID2 + ") has a partially finished predecessor");
    }

    @Test
    void testAddingPartiallyFinishedOperatorBeforePartiallyFinishedOneWithPointwiseAndAllToAllFails() throws Exception {
        JobVertexID jobVertexID2 = new JobVertexID();
        this.testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, jobVertexID2, "vert2", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, new DistributionPattern[]{DistributionPattern.POINTWISE, DistributionPattern.ALL_TO_ALL}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with partially finished vertices predeceased with running or partially finished ones and connected via the ALL_TO_ALL edges. Task vertex vert2(" + jobVertexID2 + ") has a partially finished predecessor");
    }

    @Test
    void testAddingAllRunningOperatorBeforePartiallyFinishedOneFails() throws Exception {
        JobVertexID jobVertexID2 = new JobVertexID();
        this.testAddingOperatorsBeforePartiallyOrFullyFinishedOne(new JobVertexID(), "vert1", VertexFinishedStateChecker.VertexFinishedState.ALL_RUNNING, jobVertexID2, "vert2", VertexFinishedStateChecker.VertexFinishedState.PARTIALLY_FINISHED, new DistributionPattern[]{DistributionPattern.POINTWISE}, FlinkRuntimeException.class, "Illegal JobGraph modification. Cannot run a program with partially finished vertices predeceased with all running ones. Task vertex vert2(" + jobVertexID2 + ") has a all running predecessor");
    }

    private void testAddingOperatorsBeforePartiallyOrFullyFinishedOne(JobVertexID firstVertexId, String firstVertexName, VertexFinishedStateChecker.VertexFinishedState firstOperatorFinishedState, JobVertexID secondVertexId, String secondVertexName, VertexFinishedStateChecker.VertexFinishedState secondOperatorFinishedState, DistributionPattern[] distributionPatterns, Class<? extends Throwable> expectedExceptionalClass, String expectedMessage) throws Exception {
        OperatorIDPair op1 = OperatorIDPair.generatedIDOnly((OperatorID)new OperatorID());
        OperatorIDPair op2 = OperatorIDPair.generatedIDOnly((OperatorID)new OperatorID());
        JobVertex vertex1 = new JobVertex(firstVertexName, firstVertexId, Collections.singletonList(op1));
        JobVertex vertex2 = new JobVertex(secondVertexName, secondVertexId, Collections.singletonList(op2));
        vertex1.setInvokableClass(NoOpInvokable.class);
        vertex2.setInvokableClass(NoOpInvokable.class);
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(vertex1, true).addJobVertex(vertex2, false).setDistributionPattern(distributionPatterns[0]).build((ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor());
        for (int i = 1; i < distributionPatterns.length; ++i) {
            JobVertexConnectionUtils.connectNewDataSetAsInput(vertex2, vertex1, distributionPatterns[i], ResultPartitionType.PIPELINED);
        }
        HashMap<OperatorID, OperatorState> operatorStates = new HashMap<OperatorID, OperatorState>();
        operatorStates.put(op1.getGeneratedOperatorID(), this.createOperatorState(op1.getGeneratedOperatorID(), firstOperatorFinishedState));
        operatorStates.put(op2.getGeneratedOperatorID(), this.createOperatorState(op2.getGeneratedOperatorID(), secondOperatorFinishedState));
        HashSet<ExecutionJobVertex> vertices = new HashSet<ExecutionJobVertex>();
        vertices.add(graph.getJobVertex(vertex1.getID()));
        vertices.add(graph.getJobVertex(vertex2.getID()));
        VertexFinishedStateChecker finishedStateChecker = new VertexFinishedStateChecker(vertices, operatorStates);
        Assertions.assertThatThrownBy(() -> ((VertexFinishedStateChecker)finishedStateChecker).validateOperatorsFinishedState()).hasMessage(expectedMessage).isInstanceOf(expectedExceptionalClass);
    }

    private OperatorState createOperatorState(OperatorID operatorId, VertexFinishedStateChecker.VertexFinishedState finishedState) {
        switch (finishedState) {
            case ALL_RUNNING: {
                return new OperatorState(null, null, operatorId, 2, 2);
            }
            case PARTIALLY_FINISHED: {
                OperatorState operatorState = new OperatorState(null, null, operatorId, 2, 2);
                operatorState.putState(0, (OperatorSubtaskState)FinishedOperatorSubtaskState.INSTANCE);
                return operatorState;
            }
            case FULLY_FINISHED: {
                return new FullyFinishedOperatorState(null, null, operatorId, 2, 2);
            }
        }
        throw new UnsupportedOperationException("Not supported finished state: " + finishedState);
    }
}

