/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.lifecycle;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.StreamSupport;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.operators.lifecycle.TestJobExecutor;
import org.apache.flink.runtime.operators.lifecycle.TestJobWithDescription;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher;
import org.apache.flink.runtime.operators.lifecycle.event.CheckpointCompletedEvent;
import org.apache.flink.runtime.operators.lifecycle.graph.TestJobBuilders;
import org.apache.flink.runtime.operators.lifecycle.validation.DrainingValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.FinishingValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.TestJobDataFlowValidator;
import org.apache.flink.runtime.operators.lifecycle.validation.TestOperatorLifecycleValidator;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingConsumer;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized integration test that runs a test job on a mini cluster, finishes one of its
 * sources (for the parameterized subtask scope), waits for checkpoints to complete, optionally
 * triggers a failover, and finally finishes all sources and validates the operator lifecycle and
 * the data flow of the job.
 *
 * <p>Parameters (see {@link #parameters()}): the graph builder, the scope of subtasks to finish
 * first, whether to trigger a failover, and the failover strategy set on the cluster.
 */
@RunWith(Parameterized.class)
public class PartiallyFinishedSourcesITCase
extends TestLogger {
    /** Provides a fresh folder for checkpoint storage for each built job. */
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /** Shared objects handed to the graph builder when the test job is built. */
    @Rule
    public final SharedObjects sharedObjects = SharedObjects.create();

    /** Cluster started in {@link #init()} and stopped in {@link #tearDown()}. */
    private MiniClusterWithClientResource miniClusterResource;

    /** Builder of the job graph under test. */
    @Parameterized.Parameter(0)
    public TestJobBuilders.TestingGraphBuilder graphBuilder;

    /** Scope of subtasks of the chosen source that are finished first. */
    @Parameterized.Parameter(1)
    public TestCommandDispatcher.TestCommandScope subtaskScope;

    /** Whether a failover is triggered before all sources are finished. */
    @Parameterized.Parameter(2)
    public boolean failover;

    /** Value for {@link JobManagerOptions#EXECUTION_FAILOVER_STRATEGY}. */
    @Parameterized.Parameter(3)
    public String failoverStrategy;

    /**
     * Starts a mini cluster with one task manager and four slots, configured with the
     * parameterized failover strategy.
     *
     * @throws Exception if the cluster fails to start
     */
    @Before
    public void init() throws Exception {
        Configuration configuration = new Configuration();
        // The failover strategy is set on the cluster level, taken from the test parameter.
        configuration.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, failoverStrategy);

        MiniClusterResourceConfiguration clusterConfiguration =
                new MiniClusterResourceConfiguration.Builder()
                        .setConfiguration(configuration)
                        .setNumberTaskManagers(1)
                        .setNumberSlotsPerTaskManager(4)
                        .build();
        miniClusterResource = new MiniClusterWithClientResource(clusterConfiguration);
        miniClusterResource.before();
    }

    /** Stops the mini cluster if {@link #init()} got as far as creating it. */
    @After
    public void tearDown() {
        if (miniClusterResource != null) {
            miniClusterResource.after();
        }
    }

    /**
     * Finishes one source partially or fully, waits for checkpoints, optionally fails over, then
     * finishes everything and validates the outcome.
     *
     * @throws Exception if job execution or validation fails
     */
    @Test
    public void test() throws Exception {
        TestJobWithDescription testJob = buildJob();

        // Pick whichever source the job description yields first.
        String finishingOperatorID = testJob.sources.iterator().next();
        JobVertexID finishingVertexID = findJobVertexID(testJob, finishingOperatorID);

        TestJobExecutor executor =
                TestJobExecutor.execute(testJob, miniClusterResource)
                        .waitForEvent(CheckpointCompletedEvent.class)
                        .sendOperatorCommand(
                                finishingOperatorID, TestCommand.FINISH_SOURCES, subtaskScope)
                        .waitForSubtasksToFinish(finishingVertexID, subtaskScope)
                        // Two completed checkpoints are awaited after the subtasks finished.
                        // NOTE(review): presumably the first one may have been started before
                        // the subtasks finished and is therefore skipped - confirm.
                        .waitForEvent(CheckpointCompletedEvent.class)
                        .waitForEvent(CheckpointCompletedEvent.class);

        if (failover) {
            executor.triggerFailover();
        }

        executor.sendBroadcastCommand(
                        TestCommand.FINISH_SOURCES,
                        TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS)
                .waitForTermination()
                .assertFinishedSuccessfully();

        TestOperatorLifecycleValidator.checkOperatorsLifecycle(
                testJob, new DrainingValidator(), new FinishingValidator());
        TestJobDataFlowValidator.checkDataFlow(testJob);
    }

    /**
     * Builds the test job with checkpoints after finished tasks enabled, a single zero-delay
     * restart, and checkpoint storage in a fresh temporary folder.
     *
     * @return the built job together with its description
     * @throws Exception if the graph builder fails
     */
    private TestJobWithDescription buildJob() throws Exception {
        // Typed locals give the lambdas a parameterized target type; a raw ThrowingConsumer
        // target would type the lambda parameters as Object.
        ThrowingConsumer<Configuration, Exception> configureJob =
                cfg ->
                        cfg.setBoolean(
                                ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
                                true);

        ThrowingConsumer<StreamExecutionEnvironment, Exception> configureEnvironment =
                env -> {
                    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));
                    env.getCheckpointConfig().setCheckpointTimeout(30000L);
                    env.getCheckpointConfig()
                            .setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
                    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
                    env.getCheckpointConfig()
                            .setCheckpointStorage(TEMPORARY_FOLDER.newFolder().toURI());
                };

        return graphBuilder.build(sharedObjects, configureJob, configureEnvironment);
    }

    /**
     * Finds the job vertex that contains the operator with the given ID.
     *
     * @param testJob job whose graph is searched
     * @param finishingOperatorID string form of the operator ID to look for
     * @return ID of a vertex containing a matching operator
     * @throws RuntimeException if no vertex contains such an operator
     */
    private JobVertexID findJobVertexID(TestJobWithDescription testJob, String finishingOperatorID) {
        return StreamSupport.stream(testJob.jobGraph.getVertices().spliterator(), false)
                .filter(
                        vertex ->
                                vertex.getOperatorIDs().stream()
                                        .anyMatch(idPair -> matches(idPair, finishingOperatorID)))
                .findAny()
                .orElseThrow(
                        () -> new RuntimeException("Vertex not found: " + finishingOperatorID))
                .getID();
    }

    /**
     * Checks whether the operator ID pair denotes the given operator. The user-defined ID is
     * compared if present, otherwise the generated one.
     *
     * @param idPair operator ID pair of a job vertex
     * @param operatorID string form of the operator ID to compare with
     * @return {@code true} if the effective ID's string form equals {@code operatorID}
     */
    private boolean matches(OperatorIDPair idPair, String operatorID) {
        return idPair.getUserDefinedOperatorID()
                .orElse(idPair.getGeneratedOperatorID())
                .toString()
                .equals(operatorID);
    }

    /**
     * Produces the test parameters: every (graph builder, subtask scope, failover) combination
     * listed below, once for each failover strategy ("full" and "region").
     *
     * @return parameter rows in the order graph builder, subtask scope, failover flag, strategy
     */
    @Parameterized.Parameters(name="{0} {1}, failover: {2}, strategy: {3}")
    public static List<Object[]> parameters() {
        List<String> failoverStrategies = Arrays.asList("full", "region");
        // Rows are heterogeneous, so they are typed as List<Object> rather than raw List.
        List<List<Object>> rest =
                Arrays.asList(
                        Arrays.<Object>asList(
                                TestJobBuilders.SIMPLE_GRAPH_BUILDER,
                                TestCommandDispatcher.TestCommandScope.SINGLE_SUBTASK,
                                true),
                        Arrays.<Object>asList(
                                TestJobBuilders.COMPLEX_GRAPH_BUILDER,
                                TestCommandDispatcher.TestCommandScope.SINGLE_SUBTASK,
                                true),
                        Arrays.<Object>asList(
                                TestJobBuilders.COMPLEX_GRAPH_BUILDER,
                                TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS,
                                true),
                        Arrays.<Object>asList(
                                TestJobBuilders.SIMPLE_GRAPH_BUILDER,
                                TestCommandDispatcher.TestCommandScope.SINGLE_SUBTASK,
                                false),
                        Arrays.<Object>asList(
                                TestJobBuilders.COMPLEX_GRAPH_BUILDER,
                                TestCommandDispatcher.TestCommandScope.SINGLE_SUBTASK,
                                false),
                        Arrays.<Object>asList(
                                TestJobBuilders.COMPLEX_GRAPH_BUILDER,
                                TestCommandDispatcher.TestCommandScope.ALL_SUBTASKS,
                                false));

        List<Object[]> result = new ArrayList<>();
        for (String strategy : failoverStrategies) {
            for (List<Object> otherParams : rest) {
                // Arrays.asList is fixed-size, so copy before appending the strategy.
                List<Object> fullList = new ArrayList<>(otherParams);
                fullList.add(strategy);
                result.add(fullList.toArray());
            }
        }
        return result;
    }
}

