/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.SpeculativeExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.SpeculativeExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.scheduler.InternalFailuresListener;
import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class SpeculativeExecutionVertexTest {
    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    private TestingInternalFailuresListener internalFailuresListener;

    SpeculativeExecutionVertexTest() {
    }

    @BeforeEach
    void setUp() {
        this.internalFailuresListener = new TestingInternalFailuresListener();
    }

    @Test
    void testCreateSpeculativeExecution() throws Exception {
        SpeculativeExecutionVertex ev = this.createSpeculativeExecutionVertex();
        Assertions.assertThat((Collection)ev.getCurrentExecutions()).hasSize(1);
        ev.createNewSpeculativeExecution(System.currentTimeMillis());
        Assertions.assertThat((Collection)ev.getCurrentExecutions()).hasSize(2);
    }

    @Test
    void testResetExecutionVertex() throws Exception {
        SpeculativeExecutionVertex ev = this.createSpeculativeExecutionVertex();
        Execution e1 = ev.getCurrentExecutionAttempt();
        Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
        e1.transitionState(ExecutionState.RUNNING);
        e1.markFinished();
        e2.cancel();
        ev.resetForNewExecution();
        Assertions.assertThat((Object)((ArchivedExecution)ev.getExecutionHistory().getHistoricalExecution(0).get()).getAttemptId()).isEqualTo((Object)e1.getAttemptId());
        Assertions.assertThat((Object)((ArchivedExecution)ev.getExecutionHistory().getHistoricalExecution(1).get()).getAttemptId()).isEqualTo((Object)e2.getAttemptId());
        Assertions.assertThat((Collection)ev.getCurrentExecutions()).hasSize(1);
        Assertions.assertThat((int)ev.getCurrentExecutionAttempt().getAttemptNumber()).isEqualTo(2);
    }

    @Test
    void testCancel() throws Exception {
        SpeculativeExecutionVertex ev = this.createSpeculativeExecutionVertex();
        Execution e1 = ev.getCurrentExecutionAttempt();
        Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
        ev.cancel();
        Assertions.assertThat((Comparable)e1.getState()).isSameAs((Object)ExecutionState.CANCELED);
        Assertions.assertThat((Comparable)e2.getState()).isSameAs((Object)ExecutionState.CANCELED);
    }

    @Test
    void testSuspend() throws Exception {
        SpeculativeExecutionVertex ev = this.createSpeculativeExecutionVertex();
        Execution e1 = ev.getCurrentExecutionAttempt();
        Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
        ev.suspend();
        Assertions.assertThat((Comparable)e1.getState()).isSameAs((Object)ExecutionState.CANCELED);
        Assertions.assertThat((Comparable)e2.getState()).isSameAs((Object)ExecutionState.CANCELED);
    }

    @Test
    void testFail() throws Exception {
        SpeculativeExecutionVertex ev = this.createSpeculativeExecutionVertex();
        Execution e1 = ev.getCurrentExecutionAttempt();
        Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
        ev.fail((Throwable)new Exception("Forced test failure."));
        Assertions.assertThat(this.internalFailuresListener.getFailedTasks()).containsExactly((Object[])new ExecutionAttemptID[]{e1.getAttemptId(), e2.getAttemptId()});
    }

    @Test
    void testMarkFailed() throws Exception {
        SpeculativeExecutionVertex ev = this.createSpeculativeExecutionVertex();
        Execution e1 = ev.getCurrentExecutionAttempt();
        Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
        ev.markFailed((Throwable)new Exception("Forced test failure."));
        Assertions.assertThat(this.internalFailuresListener.getFailedTasks()).containsExactly((Object[])new ExecutionAttemptID[]{e1.getAttemptId(), e2.getAttemptId()});
    }

    @Test
    void testVertexTerminationAndJobTermination() throws Exception {
        JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
        ExecutionGraph eg = this.createExecutionGraph(jobGraph);
        eg.transitionToRunning();
        SpeculativeExecutionVertex ev = (SpeculativeExecutionVertex)eg.getJobVertex(jobVertex.getID()).getTaskVertices()[0];
        Execution e1 = ev.getCurrentExecutionAttempt();
        Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis());
        CompletableFuture terminationFuture = ev.getTerminationFuture();
        e1.transitionState(ExecutionState.RUNNING);
        e1.markFinished();
        Assertions.assertThat((CompletableFuture)terminationFuture).isNotDone();
        Assertions.assertThat((Comparable)eg.getState()).isSameAs((Object)JobStatus.RUNNING);
        e2.cancel();
        Assertions.assertThat((CompletableFuture)terminationFuture).isDone();
        Assertions.assertThat((Comparable)eg.getState()).isSameAs((Object)JobStatus.FINISHED);
    }

    @Test
    void testArchiveFailedExecutions() throws Exception {
        SpeculativeExecutionVertex ev = this.createSpeculativeExecutionVertex();
        Execution e1 = ev.getCurrentExecutionAttempt();
        e1.transitionState(ExecutionState.RUNNING);
        Execution e2 = ev.createNewSpeculativeExecution(0L);
        e2.transitionState(ExecutionState.FAILED);
        ev.archiveFailedExecution(e2.getAttemptId());
        Assertions.assertThat((Collection)ev.getCurrentExecutions()).hasSize(1);
        Assertions.assertThat((Object)ev.currentExecution).isSameAs((Object)e1);
        Execution e3 = ev.createNewSpeculativeExecution(0L);
        e3.transitionState(ExecutionState.RUNNING);
        e1.transitionState(ExecutionState.FAILED);
        ev.archiveFailedExecution(e1.getAttemptId());
        Assertions.assertThat((Collection)ev.getCurrentExecutions()).hasSize(1);
        Assertions.assertThat((Object)ev.currentExecution).isSameAs((Object)e3);
    }

    @Test
    void testArchiveTheOnlyCurrentExecution() throws Exception {
        SpeculativeExecutionVertex ev = this.createSpeculativeExecutionVertex();
        Execution e1 = ev.getCurrentExecutionAttempt();
        e1.transitionState(ExecutionState.FAILED);
        ev.archiveFailedExecution(e1.getAttemptId());
        Assertions.assertThat((Collection)ev.getCurrentExecutions()).hasSize(1);
        Assertions.assertThat((Object)ev.currentExecution).isSameAs((Object)e1);
    }

    @Test
    void testArchiveNonFailedExecutionWithArchiveFailedExecutionMethod() {
        org.junit.jupiter.api.Assertions.assertThrows(IllegalStateException.class, () -> {
            SpeculativeExecutionVertex ev = this.createSpeculativeExecutionVertex();
            Execution e1 = ev.getCurrentExecutionAttempt();
            e1.transitionState(ExecutionState.FAILED);
            Execution e2 = ev.createNewSpeculativeExecution(0L);
            e2.transitionState(ExecutionState.RUNNING);
            ev.archiveFailedExecution(e2.getAttemptId());
        });
    }

    @Test
    void testGetExecutionState() throws Exception {
        SpeculativeExecutionVertex ev = this.createSpeculativeExecutionVertex();
        Execution e1 = ev.getCurrentExecutionAttempt();
        e1.transitionState(ExecutionState.CANCELED);
        Assertions.assertThat((Comparable)ev.getExecutionState()).isSameAs((Object)ExecutionState.CANCELED);
        ArrayList<ExecutionState> statesSortedByPriority = new ArrayList<ExecutionState>();
        statesSortedByPriority.add(ExecutionState.FAILED);
        statesSortedByPriority.add(ExecutionState.CANCELING);
        statesSortedByPriority.add(ExecutionState.CREATED);
        statesSortedByPriority.add(ExecutionState.SCHEDULED);
        statesSortedByPriority.add(ExecutionState.DEPLOYING);
        statesSortedByPriority.add(ExecutionState.INITIALIZING);
        statesSortedByPriority.add(ExecutionState.RUNNING);
        statesSortedByPriority.add(ExecutionState.FINISHED);
        for (ExecutionState state : statesSortedByPriority) {
            Execution execution = ev.createNewSpeculativeExecution(0L);
            execution.transitionState(state);
            Assertions.assertThat((Comparable)ev.getExecutionState()).isSameAs((Object)state);
        }
    }

    @Test
    void testGetNextInputSplit() throws Exception {
        TestInputSource source = new TestInputSource();
        JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1);
        jobVertex.setInputSplitSource((InputSplitSource)source);
        SpeculativeExecutionVertex ev = this.createSpeculativeExecutionVertex(jobVertex);
        int numExecutions = 3;
        for (int i = 0; i < 2; ++i) {
            ev.createNewSpeculativeExecution(0L);
        }
        ArrayList executions = new ArrayList(ev.getCurrentExecutions());
        HashMap<Integer, List> splitsOfAttempts = new HashMap<Integer, List>();
        Random rand = new Random();
        while (executions.size() > 0) {
            int index = rand.nextInt(executions.size());
            Execution execution = (Execution)executions.get(index);
            Optional split = execution.getNextInputSplit();
            if (split.isPresent()) {
                splitsOfAttempts.computeIfAbsent(execution.getAttemptNumber(), k -> new ArrayList()).add(split.get());
                continue;
            }
            executions.remove(index);
        }
        Assertions.assertThat(splitsOfAttempts).hasSize(3);
        Assertions.assertThat((List)((List)splitsOfAttempts.get(0))).containsExactlyInAnyOrder((Object[])source.splits);
        Assertions.assertThat((List)((List)splitsOfAttempts.get(1))).isEqualTo(splitsOfAttempts.get(0));
        Assertions.assertThat((List)((List)splitsOfAttempts.get(2))).isEqualTo(splitsOfAttempts.get(0));
    }

    private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception {
        return this.createSpeculativeExecutionVertex(ExecutionGraphTestUtils.createNoOpVertex(1));
    }

    private SpeculativeExecutionVertex createSpeculativeExecutionVertex(JobVertex jobVertex) throws Exception {
        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
        ExecutionGraph executionGraph = this.createExecutionGraph(jobGraph);
        return (SpeculativeExecutionVertex)executionGraph.getJobVertex(jobVertex.getID()).getTaskVertices()[0];
    }

    private ExecutionGraph createExecutionGraph(JobGraph jobGraph) throws Exception {
        DefaultExecutionGraph executionGraph = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).setExecutionJobVertexFactory((ExecutionJobVertex.Factory)new SpeculativeExecutionJobVertex.Factory()).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        executionGraph.setInternalTaskFailuresListener((InternalFailuresListener)this.internalFailuresListener);
        executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        return executionGraph;
    }

    private class TestInputSource
    extends GenericInputFormat<Integer> {
        private GenericInputSplit[] splits;

        private TestInputSource() {
        }

        public GenericInputSplit[] createInputSplits(int numSplitsHint) {
            int numSplits = numSplitsHint * 10;
            this.splits = new GenericInputSplit[numSplits];
            for (int i = 0; i < numSplits; ++i) {
                this.splits[i] = new GenericInputSplit(i, numSplits);
            }
            return this.splits;
        }

        public boolean reachedEnd() throws IOException {
            return false;
        }

        public Integer nextRecord(Integer reuse) throws IOException {
            return null;
        }
    }
}

