/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover.flip1;

import java.util.HashSet;
import java.util.Iterator;
import java.util.stream.Stream;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.failover.flip1.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.flip1.ResultPartitionAvailabilityChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.PartitionConnectionException;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
import org.apache.flink.runtime.topology.Vertex;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Test;

public class RestartPipelinedRegionFailoverStrategyTest
extends TestLogger {
    @Test
    public void testRegionFailoverForRegionInternalErrors() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v4 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v5 = topology.newExecutionVertex(ExecutionState.SCHEDULED);
        TestingSchedulingExecutionVertex v6 = topology.newExecutionVertex(ExecutionState.RUNNING);
        topology.connect(v1, v4, ResultPartitionType.BLOCKING);
        topology.connect(v1, v5, ResultPartitionType.BLOCKING);
        topology.connect(v2, v4, ResultPartitionType.BLOCKING);
        topology.connect(v2, v5, ResultPartitionType.BLOCKING);
        topology.connect(v3, v6, ResultPartitionType.BLOCKING);
        RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy((SchedulingTopology)topology);
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v1).restarts(new SchedulingExecutionVertex[]{v1, v4, v5});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v2).restarts(new SchedulingExecutionVertex[]{v2, v4, v5});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v3).restarts(new SchedulingExecutionVertex[]{v3, v6});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v4).restarts(new SchedulingExecutionVertex[]{v4});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v5).restarts(new SchedulingExecutionVertex[]{v5});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v6).restarts(new SchedulingExecutionVertex[]{v6});
    }

    @Test
    public void testRegionFailoverForDataConsumptionErrors() throws Exception {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v4 = topology.newExecutionVertex(ExecutionState.RUNNING);
        TestingSchedulingExecutionVertex v5 = topology.newExecutionVertex(ExecutionState.RUNNING);
        TestingSchedulingExecutionVertex v6 = topology.newExecutionVertex(ExecutionState.RUNNING);
        topology.connect(v1, v4, ResultPartitionType.BLOCKING);
        topology.connect(v1, v5, ResultPartitionType.BLOCKING);
        topology.connect(v2, v4, ResultPartitionType.BLOCKING);
        topology.connect(v2, v5, ResultPartitionType.BLOCKING);
        topology.connect(v3, v6, ResultPartitionType.BLOCKING);
        RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy((SchedulingTopology)topology);
        Iterator<TestingSchedulingResultPartition> v4InputEdgeIterator = v4.getConsumedResults().iterator();
        TestingSchedulingResultPartition v1out = v4InputEdgeIterator.next();
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v4).partitionConnectionCause(v1out).restarts(new SchedulingExecutionVertex[]{v1, v4, v5});
        TestingSchedulingResultPartition v2out = v4InputEdgeIterator.next();
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v4).partitionConnectionCause(v2out).restarts(new SchedulingExecutionVertex[]{v2, v4, v5});
        Iterator<TestingSchedulingResultPartition> v5InputEdgeIterator = v5.getConsumedResults().iterator();
        v1out = v5InputEdgeIterator.next();
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v5).partitionConnectionCause(v1out).restarts(new SchedulingExecutionVertex[]{v1, v4, v5});
        v2out = v5InputEdgeIterator.next();
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v5).partitionConnectionCause(v2out).restarts(new SchedulingExecutionVertex[]{v2, v4, v5});
        TestingSchedulingResultPartition v3out = v6.getConsumedResults().iterator().next();
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v6).partitionConnectionCause(v3out).restarts(new SchedulingExecutionVertex[]{v3, v6});
    }

    @Test
    public void testRegionFailoverForVariousResultPartitionAvailabilityCombinations() throws Exception {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex(ExecutionState.RUNNING);
        topology.connect(v1, v3, ResultPartitionType.BLOCKING);
        topology.connect(v2, v3, ResultPartitionType.BLOCKING);
        TestResultPartitionAvailabilityChecker availabilityChecker = new TestResultPartitionAvailabilityChecker();
        RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy((SchedulingTopology)topology, (ResultPartitionAvailabilityChecker)availabilityChecker);
        IntermediateResultPartitionID rp1ID = v1.getProducedResults().iterator().next().getId();
        IntermediateResultPartitionID rp2ID = v2.getProducedResults().iterator().next().getId();
        availabilityChecker.failedPartitions.clear();
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v1).restarts(new SchedulingExecutionVertex[]{v1, v3});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v2).restarts(new SchedulingExecutionVertex[]{v2, v3});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v3).restarts(new SchedulingExecutionVertex[]{v3});
        availabilityChecker.failedPartitions.clear();
        availabilityChecker.markResultPartitionFailed(rp1ID);
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v1).restarts(new SchedulingExecutionVertex[]{v1, v3});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v2).restarts(new SchedulingExecutionVertex[]{v1, v2, v3});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v3).restarts(new SchedulingExecutionVertex[]{v1, v3});
        availabilityChecker.failedPartitions.clear();
        availabilityChecker.markResultPartitionFailed(rp2ID);
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v1).restarts(new SchedulingExecutionVertex[]{v1, v2, v3});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v2).restarts(new SchedulingExecutionVertex[]{v2, v3});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v3).restarts(new SchedulingExecutionVertex[]{v2, v3});
        availabilityChecker.failedPartitions.clear();
        availabilityChecker.markResultPartitionFailed(rp1ID);
        availabilityChecker.markResultPartitionFailed(rp2ID);
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v1).restarts(new SchedulingExecutionVertex[]{v1, v2, v3});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v2).restarts(new SchedulingExecutionVertex[]{v1, v2, v3});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v3).restarts(new SchedulingExecutionVertex[]{v1, v2, v3});
    }

    @Test
    public void testRegionFailoverForMultipleVerticesRegions() throws Exception {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex(ExecutionState.FINISHED);
        TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex(ExecutionState.RUNNING);
        TestingSchedulingExecutionVertex v4 = topology.newExecutionVertex(ExecutionState.RUNNING);
        TestingSchedulingExecutionVertex v5 = topology.newExecutionVertex(ExecutionState.FAILED);
        TestingSchedulingExecutionVertex v6 = topology.newExecutionVertex(ExecutionState.CANCELED);
        topology.connect(v1, v2, ResultPartitionType.PIPELINED);
        topology.connect(v2, v3, ResultPartitionType.BLOCKING);
        topology.connect(v3, v4, ResultPartitionType.PIPELINED);
        topology.connect(v4, v5, ResultPartitionType.BLOCKING);
        topology.connect(v5, v6, ResultPartitionType.PIPELINED);
        RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy((SchedulingTopology)topology);
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v3).restarts(new SchedulingExecutionVertex[]{v3, v4, v5, v6});
        TestingSchedulingResultPartition v2out = v3.getConsumedResults().iterator().next();
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v3).partitionConnectionCause(v2out).restarts(new SchedulingExecutionVertex[]{v1, v2, v3, v4, v5, v6});
    }

    @Test
    public void testRegionFailoverDoesNotRestartCreatedExecutions() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex(ExecutionState.CREATED);
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex(ExecutionState.CREATED);
        topology.connect(v1, v2, ResultPartitionType.BLOCKING);
        RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy((SchedulingTopology)topology);
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v2).restarts(new SchedulingExecutionVertex[0]);
        TestingSchedulingResultPartition v1out = v2.getConsumedResults().iterator().next();
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v2).partitionConnectionCause(v1out).restarts(new SchedulingExecutionVertex[0]);
    }

    @Test
    public void testRegionFailoverForPipelinedApproximate() {
        TestingSchedulingTopology topology = new TestingSchedulingTopology();
        TestingSchedulingExecutionVertex v1 = topology.newExecutionVertex(ExecutionState.RUNNING);
        TestingSchedulingExecutionVertex v2 = topology.newExecutionVertex(ExecutionState.RUNNING);
        TestingSchedulingExecutionVertex v3 = topology.newExecutionVertex(ExecutionState.RUNNING);
        TestingSchedulingExecutionVertex v4 = topology.newExecutionVertex(ExecutionState.RUNNING);
        topology.connect(v1, v2, ResultPartitionType.PIPELINED_APPROXIMATE);
        topology.connect(v1, v3, ResultPartitionType.PIPELINED_APPROXIMATE);
        topology.connect(v2, v4, ResultPartitionType.PIPELINED_APPROXIMATE);
        topology.connect(v3, v4, ResultPartitionType.PIPELINED_APPROXIMATE);
        RestartPipelinedRegionFailoverStrategy strategy = new RestartPipelinedRegionFailoverStrategy((SchedulingTopology)topology);
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v1).restarts(new SchedulingExecutionVertex[]{v1, v2, v3, v4});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v2).restarts(new SchedulingExecutionVertex[]{v2, v4});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v3).restarts(new SchedulingExecutionVertex[]{v3, v4});
        RestartPipelinedRegionFailoverStrategyTest.verifyThatFailedExecution((FailoverStrategy)strategy, v4).restarts(new SchedulingExecutionVertex[]{v4});
    }

    private static VerificationContext verifyThatFailedExecution(FailoverStrategy strategy, SchedulingExecutionVertex executionVertex) {
        return new VerificationContext(strategy, executionVertex);
    }

    private static class TestResultPartitionAvailabilityChecker
    implements ResultPartitionAvailabilityChecker {
        private final HashSet<IntermediateResultPartitionID> failedPartitions = new HashSet();

        public boolean isAvailable(IntermediateResultPartitionID resultPartitionID) {
            return !this.failedPartitions.contains(resultPartitionID);
        }

        public void markResultPartitionFailed(IntermediateResultPartitionID resultPartitionID) {
            this.failedPartitions.add(resultPartitionID);
        }

        public void removeResultPartitionFromFailedState(IntermediateResultPartitionID resultPartitionID) {
            this.failedPartitions.remove(resultPartitionID);
        }
    }

    private static class VerificationContext {
        private final FailoverStrategy strategy;
        private final SchedulingExecutionVertex executionVertex;
        private Throwable cause = new Exception("Test failure");

        private VerificationContext(FailoverStrategy strategy, SchedulingExecutionVertex executionVertex) {
            this.strategy = strategy;
            this.executionVertex = executionVertex;
        }

        private VerificationContext partitionConnectionCause(SchedulingResultPartition failedProducer) {
            return this.cause((Throwable)new PartitionConnectionException(new ResultPartitionID((IntermediateResultPartitionID)failedProducer.getId(), new ExecutionAttemptID()), (Throwable)new Exception("Test failure")));
        }

        private VerificationContext cause(Throwable cause) {
            this.cause = cause;
            return this;
        }

        private void restarts(SchedulingExecutionVertex ... expectedResult) {
            MatcherAssert.assertThat((Object)this.strategy.getTasksNeedingRestart((ExecutionVertexID)this.executionVertex.getId(), this.cause), (Matcher)Matchers.containsInAnyOrder((Object[])Stream.of(expectedResult).map(Vertex::getId).toArray()));
        }
    }
}

