/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.MiniClusterResource;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/**
 * Integration test that runs a two-vertex job (sender -> receiver) on a shared
 * {@link MiniCluster} and checks that the job still finishes successfully after
 * one receiver subtask throws an exception on its first invocation.
 */
public class JobRecoveryITCase
extends TestLogger {
    /** Number of task managers started by the shared mini cluster. */
    private static final int NUM_TMS = 1;
    /** Number of slots offered by each task manager. */
    private static final int SLOTS_PER_TM = 11;
    /** Parallelism of both job vertices; sized to the slots available in the cluster. */
    private static final int PARALLELISM = NUM_TMS * SLOTS_PER_TM;

    /** Mini cluster shared by all test methods of this class. */
    @ClassRule
    public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
            new MiniClusterResourceConfiguration.Builder()
                    .setNumberTaskManagers(NUM_TMS)
                    .setNumberSlotsPerTaskManager(SLOTS_PER_TM)
                    .build());

    /** Runs the recovery scenario without assigning the vertices to a common slot sharing group. */
    @Test
    public void testTaskFailureRecovery() throws Exception {
        runTaskFailureRecoveryTest(createJobGraph(false));
    }

    /** Runs the recovery scenario with both vertices placed in the same slot sharing group. */
    @Test
    public void testTaskFailureWithSlotSharingRecovery() throws Exception {
        runTaskFailureRecoveryTest(createJobGraph(true));
    }

    /**
     * Submits the given job to the shared mini cluster, waits for its result and
     * asserts that the result reports success.
     *
     * @param jobGraph the job to execute
     * @throws Exception if submitting the job or waiting for its result fails
     */
    private void runTaskFailureRecoveryTest(JobGraph jobGraph) throws Exception {
        final MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();

        // Block until the submission has been acknowledged before asking for the result.
        miniCluster.submitJob(jobGraph).get();

        final CompletableFuture<JobResult> jobResultFuture = miniCluster.requestJobResult(jobGraph.getJobID());

        Assert.assertThat(jobResultFuture.get().isSuccess(), Matchers.is(true));
    }

    /**
     * Builds a streaming job graph consisting of a sender vertex connected pointwise
     * to a {@link FailingOnceReceiver} vertex through a pipelined result partition.
     * The job is configured with a fixed-delay restart strategy allowing a single
     * restart attempt with zero delay.
     *
     * <p>As a side effect the shared failure flag of {@link FailingOnceReceiver} is
     * reset, so every graph created here triggers the injected failure again.
     *
     * @param slotSharingEnabled whether sender and receiver are put into one common
     *     {@link SlotSharingGroup}
     * @return the assembled job graph
     * @throws IOException declared by the job graph building calls
     */
    private JobGraph createJobGraph(boolean slotSharingEnabled) throws IOException {
        final JobVertex sender = new JobVertex("Sender");
        sender.setParallelism(PARALLELISM);
        sender.setInvokableClass(TestingAbstractInvokables.Sender.class);

        final JobVertex receiver = new JobVertex("Receiver");
        receiver.setParallelism(PARALLELISM);
        receiver.setInvokableClass(FailingOnceReceiver.class);
        // Re-arm the one-shot failure for the job that is about to be created.
        FailingOnceReceiver.reset();

        if (slotSharingEnabled) {
            final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
            receiver.setSlotSharingGroup(slotSharingGroup);
            sender.setSlotSharingGroup(slotSharingGroup);
        }

        receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

        final ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));

        return JobGraphBuilder.newStreamingJobGraphBuilder()
                .addJobVertices(Arrays.asList(sender, receiver))
                .setJobName(getClass().getSimpleName())
                .setExecutionConfig(executionConfig)
                .build();
    }

    /**
     * Receiver whose subtask with index 0 throws a {@link FlinkRuntimeException} the
     * first time it is invoked after {@link #reset()}; every other invocation
     * delegates to the parent receiver.
     */
    public static final class FailingOnceReceiver
    extends TestingAbstractInvokables.Receiver {
        /**
         * Set once the injected failure has been thrown. Static, so the flag is shared
         * by all instances of this class; volatile, so an update is visible across threads.
         */
        private static volatile boolean failed = false;

        public FailingOnceReceiver(Environment environment) {
            super(environment);
        }

        @Override
        public void invoke() throws Exception {
            // Only subtask 0 ever fails, and only while the flag has not been set yet.
            if (!failed && getEnvironment().getTaskInfo().getIndexOfThisSubtask() == 0) {
                failed = true;
                throw new FlinkRuntimeException(getClass().getSimpleName());
            }
            super.invoke();
        }

        /** Clears the failure flag so that the next invocation of subtask 0 fails again. */
        private static void reset() {
            failed = false;
        }
    }
}

