/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Base class for parameterized streaming fault-tolerance tests.
 *
 * <p>Each test is executed once per {@link FailoverStrategy}. Before every test a fresh mini
 * cluster with {@link #NUM_TASK_MANAGERS} task managers and {@link #NUM_TASK_SLOTS} slots per
 * task manager is started; after every test it is shut down again.
 *
 * <p>Subclasses define the job topology in {@link #testProgram(StreamExecutionEnvironment)} and
 * verify the outcome in {@link #postSubmit()}.
 */
@RunWith(Parameterized.class)
public abstract class StreamFaultToleranceTestBase extends TestLogger {

    /** Number of task managers started in the mini cluster. */
    protected static final int NUM_TASK_MANAGERS = 3;

    /** Number of slots offered by each task manager. */
    protected static final int NUM_TASK_SLOTS = 4;

    /** Job parallelism; uses every slot of the mini cluster. */
    protected static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;

    /** Failover strategy under test, injected by the {@link Parameterized} runner. */
    @Parameterized.Parameter
    public FailoverStrategy failoverStrategy;

    /** Cluster used by the currently running test; recreated in {@link #setup()}. */
    private static MiniClusterWithClientResource cluster;

    /** Class-level temporary folder; a new sub-folder is handed to the changelog storage per test. */
    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();

    /**
     * Supplies the failover strategies every test is run with.
     *
     * @return the strategies to parameterize the test with
     */
    @Parameterized.Parameters(name = "FailoverStrategy: {0}")
    public static Collection<FailoverStrategy> parameters() {
        return Arrays.asList(
                FailoverStrategy.RestartAllFailoverStrategy,
                FailoverStrategy.RestartPipelinedRegionFailoverStrategy);
    }

    /**
     * Starts a mini cluster configured for the current {@link #failoverStrategy}.
     *
     * @throws Exception if the temporary folder cannot be created or the cluster fails to start
     * @throws IllegalStateException if the failover strategy has no known configuration value
     */
    @Before
    public void setup() throws Exception {
        Configuration configuration = new Configuration();
        switch (failoverStrategy) {
            case RestartPipelinedRegionFailoverStrategy:
                configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "region");
                break;
            case RestartAllFailoverStrategy:
                configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
                break;
            default:
                // Fail loudly instead of silently running with an unconfigured strategy.
                throw new IllegalStateException(
                        "Unsupported failover strategy: " + failoverStrategy);
        }

        // Changelog storage gets its own fresh directory for each test.
        FsStateChangelogStorageFactory.configure(configuration, tempFolder.newFolder());

        cluster =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(configuration)
                                .setNumberTaskManagers(NUM_TASK_MANAGERS)
                                .setNumberSlotsPerTaskManager(NUM_TASK_SLOTS)
                                .build());
        cluster.before();
    }

    /** Shuts down the mini cluster started by {@link #setup()}, if there is one. */
    @After
    public void shutDownExistingCluster() {
        if (cluster != null) {
            cluster.after();
            cluster = null;
        }
    }

    /**
     * Implementations are expected to assemble the test topology on the given environment.
     *
     * @param env the pre-configured execution environment the job is built on
     */
    public abstract void testProgram(StreamExecutionEnvironment env);

    /**
     * Hook invoked after the job has finished; implementations verify the job's results here.
     *
     * @throws Exception if verification fails
     */
    public abstract void postSubmit() throws Exception;

    /**
     * Runs the program defined by {@link #testProgram(StreamExecutionEnvironment)} with
     * checkpointing enabled, waits for it to finish, and then calls {@link #postSubmit()}.
     *
     * <p>A job that fails is only tolerated when a {@link SuccessException} is found in the cause
     * chain. Any other failure is reported with its original cause attached, and unexpected
     * exceptions propagate unchanged so that the test report contains the full stack trace.
     *
     * @throws Exception if building the job or {@link #postSubmit()} fails
     */
    @Test
    public void runCheckpointedProgram() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(PARALLELISM);
        env.enableCheckpointing(500L);
        // Restart immediately and (practically) without limit so injected failures are recovered.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 0L));

        testProgram(env);

        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        try {
            TestUtils.submitJobAndWaitForResult(
                    cluster.getClusterClient(), jobGraph, getClass().getClassLoader());
        } catch (Exception e) {
            if (!ExceptionUtils.findThrowable(e, SuccessException.class).isPresent()) {
                // Keep the real job failure as the cause instead of a bare assertion failure.
                throw new AssertionError(
                        "Job failed and no SuccessException was found in the cause chain.", e);
            }
        }

        postSubmit();
    }

    /** Serializable holder for a prefix, a value and a count. */
    public static class PrefixCount implements Serializable {

        public String prefix;
        public String value;
        public long count;

        /** Creates an empty instance; all fields keep their default values. */
        public PrefixCount() {}

        /**
         * Creates a fully populated instance.
         *
         * @param prefix the prefix
         * @param value the value
         * @param count the count
         */
        public PrefixCount(String prefix, String value, long count) {
            this.prefix = prefix;
            this.value = value;
            this.count = count;
        }

        /**
         * Returns {@code prefix + " / " + value}; the count is not part of the string.
         *
         * @return the textual representation
         */
        @Override
        public String toString() {
            return prefix + " / " + value;
        }
    }

    /** Failover strategies the tests are parameterized with. */
    public enum FailoverStrategy {
        /** Configured as {@code "full"} in {@link #setup()}. */
        RestartAllFailoverStrategy,
        /** Configured as {@code "region"} in {@link #setup()}. */
        RestartPipelinedRegionFailoverStrategy
    }
}

