/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.checkpointing.utils.AccumulatingIntegerSink;
import org.apache.flink.test.checkpointing.utils.CancellingIntegerSource;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class UnalignedCheckpointCompatibilityITCase
extends TestLogger {
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static final int TOTAL_ELEMENTS = 20;
    private static final int FIRST_RUN_EL_COUNT = 10;
    private static final int FIRST_RUN_BACKPRESSURE_MS = 100;
    private static final int PARALLELISM = 1;
    private final boolean startAligned;
    private final CheckpointType type;
    private static MiniClusterWithClientResource miniCluster;

    @Parameterized.Parameters(name="type: {0}, startAligned: {1}")
    public static Object[][] parameters() {
        return new Object[][]{{CheckpointType.CHECKPOINT, true}, {CheckpointType.CHECKPOINT, false}, {CheckpointType.SAVEPOINT, true}, {CheckpointType.SAVEPOINT, false}};
    }

    public UnalignedCheckpointCompatibilityITCase(CheckpointType type, boolean startAligned) {
        this.startAligned = startAligned;
        this.type = type;
    }

    @BeforeClass
    public static void setupMiniCluster() throws Exception {
        File folder = temporaryFolder.getRoot();
        Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)folder.toURI().toString());
        conf.set(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, (Object)Integer.MAX_VALUE);
        miniCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(conf).build());
        miniCluster.before();
    }

    @AfterClass
    public static void teardownMiniCluster() {
        miniCluster.after();
    }

    @Before
    public void cleanDirectory() throws IOException {
        FileUtils.cleanDirectory((File)temporaryFolder.getRoot());
    }

    @Test
    public void test() throws Exception {
        Tuple2<String, Map<String, Object>> pathAndAccumulators = this.type.isSavepoint() ? this.runAndTakeSavepoint() : this.runAndTakeExternalCheckpoint();
        String savepointPath = (String)pathAndAccumulators.f0;
        Map accumulatorsBeforeBarrier = (Map)pathAndAccumulators.f1;
        Map<String, Object> accumulatorsAfterBarrier = this.runFromSavepoint(savepointPath, !this.startAligned, 20);
        if (this.type.isSavepoint()) {
            Assert.assertEquals(UnalignedCheckpointCompatibilityITCase.intRange(0, 20), UnalignedCheckpointCompatibilityITCase.extractAndConcat(accumulatorsBeforeBarrier, accumulatorsAfterBarrier));
        }
    }

    private Tuple2<String, Map<String, Object>> runAndTakeSavepoint() throws Exception {
        JobClient jobClient = UnalignedCheckpointCompatibilityITCase.submitJobInitially(this.env(this.startAligned, 0));
        CommonTestUtils.waitForAllTaskRunning(() -> (AccessExecutionGraph)miniCluster.getMiniCluster().getExecutionGraph(jobClient.getJobID()).get());
        Thread.sleep(100L);
        CompletionStage accFuture = jobClient.getJobExecutionResult().thenApply(JobExecutionResult::getAllAccumulatorResults);
        CompletableFuture savepointFuture = jobClient.stopWithSavepoint(false, this.tempFolder().toURI().toString());
        return new Tuple2(savepointFuture.get(), accFuture.get());
    }

    private Tuple2<String, Map<String, Object>> runAndTakeExternalCheckpoint() throws Exception {
        JobClient jobClient = UnalignedCheckpointCompatibilityITCase.submitJobInitially(this.env(this.startAligned, 100));
        File attemptDir = UnalignedCheckpointCompatibilityITCase.waitForChild(temporaryFolder.getRoot(), (dir, name) -> true);
        File checkpointDir = UnalignedCheckpointCompatibilityITCase.waitForChild(attemptDir, (dir, name) -> name.startsWith("chk-"));
        File metadata = UnalignedCheckpointCompatibilityITCase.waitForChild(checkpointDir, (dir, name) -> name.equals("_metadata"));
        this.cancelJob(jobClient);
        return new Tuple2((Object)metadata.getParentFile().toString(), Collections.emptyMap());
    }

    private static JobClient submitJobInitially(StreamExecutionEnvironment env) throws Exception {
        return env.executeAsync(UnalignedCheckpointCompatibilityITCase.dag(10, true, 100, env));
    }

    private Map<String, Object> runFromSavepoint(String path, boolean isAligned, int totalCount) throws Exception {
        StreamExecutionEnvironment env = this.env(isAligned, 50);
        StreamGraph streamGraph = UnalignedCheckpointCompatibilityITCase.dag(totalCount, false, 0, env);
        streamGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)path));
        return env.execute(streamGraph).getJobExecutionResult().getAllAccumulatorResults();
    }

    private static File waitForChild(File dir, FilenameFilter filenameFilter) throws InterruptedException {
        File[] files = dir.listFiles(filenameFilter);
        while (files.length == 0) {
            Thread.sleep(50L);
            files = dir.listFiles(filenameFilter);
        }
        return Arrays.stream(files).max(Comparator.naturalOrder()).get();
    }

    private void cancelJob(JobClient jobClient) throws InterruptedException, ExecutionException {
        jobClient.cancel().get();
        try {
            jobClient.getJobExecutionResult();
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private StreamExecutionEnvironment env(boolean isAligned, int checkpointingInterval) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.setRestartStrategy((RestartStrategies.RestartStrategyConfiguration)new RestartStrategies.NoRestartStrategyConfiguration());
        env.getCheckpointConfig().enableUnalignedCheckpoints(!isAligned);
        env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        if (checkpointingInterval > 0) {
            env.enableCheckpointing((long)checkpointingInterval);
        }
        return env;
    }

    private static StreamGraph dag(int numElements, boolean continueAfterNumElementsReached, int sinkDelayMillis, StreamExecutionEnvironment env) {
        env.addSource((SourceFunction)CancellingIntegerSource.upTo(numElements, continueAfterNumElementsReached)).addSink((SinkFunction)new AccumulatingIntegerSink(sinkDelayMillis));
        return env.getStreamGraph();
    }

    private static List<Integer> intRange(int from, int to) {
        return IntStream.range(from, to).boxed().collect(Collectors.toList());
    }

    private static List<Integer> extractAndConcat(Map<String, Object> ... accumulators) {
        return Stream.of(accumulators).map(AccumulatingIntegerSink::getOutput).peek(l -> Preconditions.checkState((!l.isEmpty() ? 1 : 0) != 0)).flatMap(Collection::stream).collect(Collectors.toList());
    }

    private File tempFolder() throws IOException {
        return temporaryFolder.newFolder();
    }
}

